You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by mc...@apache.org on 2014/11/24 20:25:42 UTC

qpid-proton git commit: Revert "PROTON-752: Provide a non-blocking means to receive messages in Ruby."

Repository: qpid-proton
Updated Branches:
  refs/heads/master 0820a3722 -> 3ac2e3bd3


Revert "PROTON-752: Provide a non-blocking means to receive messages in Ruby."

This reverts commit 0820a3722b6ab5c2a5a4dbfac3428de7d22c1c6e.

Reverting this change so we can have a review first.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/3ac2e3bd
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/3ac2e3bd
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/3ac2e3bd

Branch: refs/heads/master
Commit: 3ac2e3bd34b90a7026103af47b1cb8fd2f7d5271
Parents: 0820a37
Author: Darryl L. Pierce <mc...@gmail.com>
Authored: Mon Nov 24 14:25:26 2014 -0500
Committer: Darryl L. Pierce <mc...@gmail.com>
Committed: Mon Nov 24 14:25:26 2014 -0500

----------------------------------------------------------------------
 examples/messenger/ruby/passive_recv.rb         | 122 +++++++++++++++---
 .../bindings/ruby/lib/qpid_proton/messenger.rb  | 128 -------------------
 .../bindings/ruby/lib/qpid_proton/selectable.rb |  21 +--
 3 files changed, 108 insertions(+), 163 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3ac2e3bd/examples/messenger/ruby/passive_recv.rb
----------------------------------------------------------------------
diff --git a/examples/messenger/ruby/passive_recv.rb b/examples/messenger/ruby/passive_recv.rb
index 878c801..a3625ac 100644
--- a/examples/messenger/ruby/passive_recv.rb
+++ b/examples/messenger/ruby/passive_recv.rb
@@ -31,26 +31,110 @@ end
 
 addresses = ["~0.0.0.0"] if addresses.empty?
 
-msgr = Qpid::Proton::Messenger.receive_and_call(nil, :addresses => addresses) do |message|
-  puts "Address: #{message.address}"
-  subject = message.subject || "(no subject)"
-  puts "Subject: #{subject}"
-  puts "Body: #{message.body}"
-  puts "Properties: #{message.properties}"
-  puts "Instructions: #{message.instructions}"
-  puts "Annotations: #{message.annotations}"
-
-  if message.reply_to
-    puts "=== Sending a reply to #{message.reply_to}"
-    reply = Qpid::Proton::Message.new
-    reply.address = message.reply_to
-    reply.subject = "RE: #{message.subject}"
-    reply.content = "Thanks for the message!"
-
-    messenger.put(reply)
-    messenger.send
+messenger = Qpid::Proton::Messenger.new
+messenger.passive = true
+
+begin
+  messenger.start
+rescue ProtonError => error
+  puts "ERROR: #{error.message}"
+  puts error.backtrace.join("\n")
+  exit
+end
+
+addresses.each do |address|
+  begin
+    messenger.subscribe(address)
+  rescue Qpid::Proton::ProtonError => error
+    puts "ERROR: #{error.message}"
+    exit
+  end
+end
+
+msg = Qpid::Proton::Message.new
+
+read_array = []
+write_array = []
+selectables = {}
+
+loop do
+
+  # wait for incoming messages
+  sel = messenger.selectable
+  while !sel.nil?
+    if sel.terminal?
+      selectables.delete(sel.fileno)
+      read_array.delete(sel)
+      write_array.delete(sel)
+      sel.free
+    else
+      sel.capacity
+      sel.pending
+      if !sel.registered?
+        read_array << sel
+        write_array << sel
+        selectables[sel.fileno] = sel
+        sel.registered = true
+      end
+    end
+    sel = messenger.selectable
   end
 
+  unless selectables.empty?
+    rarray = []; read_array.each {|fd| rarray << fd.to_io }
+    warray = []; write_array.each {|fd| warray << fd.to_io }
+
+    if messenger.deadline > 0.0
+      result = IO.select(rarray, warray, nil, messenger.deadline)
+    else
+      result = IO.select(rarray, warray)
+    end
+
+    unless result.nil? && result.empty?
+      result.flatten.each do |io|
+        sel = selectables[io.fileno]
+
+        sel.writable if sel.pending > 0
+        sel.readable if sel.capacity > 0
+      end
+    end
+
+    begin
+      messenger.receive(10)
+    rescue Qpid::Proton::ProtonError => error
+      puts "ERROR: #{error.message}"
+      exit
+    end
+
+    while messenger.incoming.nonzero?
+      begin
+        messenger.get(msg)
+      rescue Qpid::Proton::Error => error
+        puts "ERROR: #{error.message}"
+        exit
+      end
+
+      puts "Address: #{msg.address}"
+      subject = msg.subject || "(no subject)"
+      puts "Subject: #{subject}"
+      puts "Body: #{msg.body}"
+      puts "Properties: #{msg.properties}"
+      puts "Instructions: #{msg.instructions}"
+      puts "Annotations: #{msg.annotations}"
+
+      if msg.reply_to
+        puts "=== Sending a reply to #{msg.reply_to}"
+        reply = Qpid::Proton::Message.new
+        reply.address = msg.reply_to
+        reply.subject = "RE: #{msg.subject}"
+        reply.content = "Thanks for the message!"
+
+        messenger.put(reply)
+        messenger.send
+      end
+    end
+  end
 end
 
-Thread.list[1].join
+messenger.stop
+

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3ac2e3bd/proton-c/bindings/ruby/lib/qpid_proton/messenger.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/qpid_proton/messenger.rb b/proton-c/bindings/ruby/lib/qpid_proton/messenger.rb
index a8f7330..5a16c50 100644
--- a/proton-c/bindings/ruby/lib/qpid_proton/messenger.rb
+++ b/proton-c/bindings/ruby/lib/qpid_proton/messenger.rb
@@ -75,7 +75,6 @@ module Qpid # :nodoc:
       #
       def initialize(name = nil)
         @impl = Cproton.pn_messenger(name)
-        @interrupted = false
         @selectables = {}
         ObjectSpace.define_finalizer(self, self.class.finalize!(@impl))
       end
@@ -408,7 +407,6 @@ module Qpid # :nodoc:
       # originated the interrupt.
       #
       def interrupt
-        @interrupted = true
         Cproton.pn_messenger_interrupt(@impl)
       end
 
@@ -697,132 +695,6 @@ module Qpid # :nodoc:
         !window.nil? && [Float, Fixnum].include?(window.class)
       end
 
-      public
-
-
-      #--
-      # The following are class methods.
-      #++
-
-      # Receives messages from the provided instance of Messenger, and then
-      # calls the supplied block for each Message received. If no instance
-      # is provided then one is created using the provided list of options.
-      #
-      # This starts a new thread which will loop, waiting for and processing
-      # incoming messages.
-      #
-      # ==== Arguments
-      #
-      # * messenger - The instance of Messenger.
-      #
-      # ==== Options
-      #
-      # * :addresses - An array of addresses to which to subscribe. Addresses
-      #   are required if no Messenger was supplied.
-      #
-      # ==== Examples
-      #
-      #   # create a Messenger
-      #   messenger = Qpid::Proton::Messenger.new
-      #   # begin receiving messages
-      #   Qpid::Proton::Messenger.receive_and_call(messenger) do |message|
-      #      puts "Received: #{message.body}"
-      #   end
-      #
-      def self.receive_and_call(messenger, options = {}, &block)
-        # if the messenger wasn't created then create one
-        if messenger.nil?
-          # if no addresses were supplied then raise an exception
-          raise ArgumentError.new("no addresses") if options[:addresses].nil?
-          # if no block was supplied then raise an exception
-          raise ArgumentError.new("missing block") if block.nil?
-
-          messenger = Qpid::Proton::Messenger.new
-          Array(options[:addresses]).each do |address|
-            messenger.subscribe address
-          end
-        end
-
-        # set the messenger to passive mode
-        messenger.passive = true
-        messenger.start
-
-        Thread.new(messenger, block) do |messenger, &block|
-          read_array = []
-          write_array = []
-          selectables = {}
-
-          aborted = false
-
-          while !aborted do
-            # refresh the list of fds to be processed
-            sel = messenger.selectable
-            while !sel.nil?
-              if sel.terminal?
-                selectables.delete(sel.fileno)
-                read_array.delete(sel)
-                write_array.delete(sel)
-                sel.free
-              else
-                sel.capacity
-                sel.pending
-                if !sel.registered?
-                  read_array << sel
-                  write_array << sel
-                  selectables[sel.fileno] = sel
-                  sel.registered = true
-                end
-              end
-              sel = messenger.selectable
-            end
-
-            unless selectables.empty?
-              rarray = []; read_array.each {|fd| rarray << fd.to_io}
-              warray = []; write_array.each {|fd| warray << fd.to_io}
-
-              if messenger.deadline > 0.0
-                result = IO.select(rarray, warray, nil, messenger.deadline)
-              else
-                result = IO.select(rarray, warray)
-              end
-
-              unless result.nil? && result.empty?
-                result.flatten.each do |io|
-                  sel = selectables[io.fileno]
-
-                  sel.writable if sel.pending > 0
-                  sel.readable if sel.capacity > 0
-                end
-              end
-
-              messenger.receive(10)
-
-              # if this was interrupted then exit
-              messenger.instance_eval do
-                aborted = @interrupted
-                @interrupted = false
-              end
-
-              if !aborted
-                # process each message received
-                while messenger.incoming.nonzero?
-                  message = Qpid::Proton::Message.new
-                  messenger.get(message)
-                  yield message
-                end
-              end
-
-            end
-
-          end
-
-        end
-
-        # return the messenger
-        messenger
-
-      end
-
     end
 
   end

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3ac2e3bd/proton-c/bindings/ruby/lib/qpid_proton/selectable.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/qpid_proton/selectable.rb b/proton-c/bindings/ruby/lib/qpid_proton/selectable.rb
index 8b1214a..33554cd 100644
--- a/proton-c/bindings/ruby/lib/qpid_proton/selectable.rb
+++ b/proton-c/bindings/ruby/lib/qpid_proton/selectable.rb
@@ -37,14 +37,6 @@ module Qpid # :nodoc:
         @impl = impl
         @io = nil
         @freed = false
-
-        ObjectSpace.define_finalizer(self, self.class.finalize!(@impl))
-      end
-
-      def self.finalize!(impl) # :nodoc:
-        proc {
-          impl.free
-        }
       end
 
       # Returns the underlying file descriptor.
@@ -56,11 +48,7 @@ module Qpid # :nodoc:
       end
 
       def to_io
-        if @io.nil?
-          fileno = self.fileno
-          @io = IO.new(fileno)
-        end
-        @io
+        @io ||= IO.new(fileno)
       end
 
       # The number of bytes the selectable is capable of consuming.
@@ -109,14 +97,15 @@ module Qpid # :nodoc:
       end
 
       def to_s
-        return super if @freed
-        "#{super} fileno=#{self.fileno} registered=#{self.registered?} terminal=#{self.terminal?}"
+        "fileno=#{self.fileno} registered=#{self.registered?} terminal=#{self.terminal?}"
       end
 
       def free
+        return if @freed
         @freed = true
         @messenger.unregister_selectable(fileno)
-        @messenger = nil
+        @io.close unless @io.nil?
+        Cproton.pn_selectable_free(@impl)
         @impl = nil
       end
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org