You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by mc...@apache.org on 2014/11/24 20:25:42 UTC
qpid-proton git commit: Revert "PROTON-752: Provide a non-blocking
means to receive messages in Ruby."
Repository: qpid-proton
Updated Branches:
refs/heads/master 0820a3722 -> 3ac2e3bd3
Revert "PROTON-752: Provide a non-blocking means to receive messages in Ruby."
This reverts commit 0820a3722b6ab5c2a5a4dbfac3428de7d22c1c6e.
Reverting this change so we can have a review first.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/3ac2e3bd
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/3ac2e3bd
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/3ac2e3bd
Branch: refs/heads/master
Commit: 3ac2e3bd34b90a7026103af47b1cb8fd2f7d5271
Parents: 0820a37
Author: Darryl L. Pierce <mc...@gmail.com>
Authored: Mon Nov 24 14:25:26 2014 -0500
Committer: Darryl L. Pierce <mc...@gmail.com>
Committed: Mon Nov 24 14:25:26 2014 -0500
----------------------------------------------------------------------
examples/messenger/ruby/passive_recv.rb | 122 +++++++++++++++---
.../bindings/ruby/lib/qpid_proton/messenger.rb | 128 -------------------
.../bindings/ruby/lib/qpid_proton/selectable.rb | 21 +--
3 files changed, 108 insertions(+), 163 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3ac2e3bd/examples/messenger/ruby/passive_recv.rb
----------------------------------------------------------------------
diff --git a/examples/messenger/ruby/passive_recv.rb b/examples/messenger/ruby/passive_recv.rb
index 878c801..a3625ac 100644
--- a/examples/messenger/ruby/passive_recv.rb
+++ b/examples/messenger/ruby/passive_recv.rb
@@ -31,26 +31,110 @@ end
addresses = ["~0.0.0.0"] if addresses.empty?
-msgr = Qpid::Proton::Messenger.receive_and_call(nil, :addresses => addresses) do |message|
- puts "Address: #{message.address}"
- subject = message.subject || "(no subject)"
- puts "Subject: #{subject}"
- puts "Body: #{message.body}"
- puts "Properties: #{message.properties}"
- puts "Instructions: #{message.instructions}"
- puts "Annotations: #{message.annotations}"
-
- if message.reply_to
- puts "=== Sending a reply to #{message.reply_to}"
- reply = Qpid::Proton::Message.new
- reply.address = message.reply_to
- reply.subject = "RE: #{message.subject}"
- reply.content = "Thanks for the message!"
-
- messenger.put(reply)
- messenger.send
+messenger = Qpid::Proton::Messenger.new
+messenger.passive = true
+
+begin
+ messenger.start
+rescue ProtonError => error
+ puts "ERROR: #{error.message}"
+ puts error.backtrace.join("\n")
+ exit
+end
+
+addresses.each do |address|
+ begin
+ messenger.subscribe(address)
+ rescue Qpid::Proton::ProtonError => error
+ puts "ERROR: #{error.message}"
+ exit
+ end
+end
+
+msg = Qpid::Proton::Message.new
+
+read_array = []
+write_array = []
+selectables = {}
+
+loop do
+
+ # wait for incoming messages
+ sel = messenger.selectable
+ while !sel.nil?
+ if sel.terminal?
+ selectables.delete(sel.fileno)
+ read_array.delete(sel)
+ write_array.delete(sel)
+ sel.free
+ else
+ sel.capacity
+ sel.pending
+ if !sel.registered?
+ read_array << sel
+ write_array << sel
+ selectables[sel.fileno] = sel
+ sel.registered = true
+ end
+ end
+ sel = messenger.selectable
end
+ unless selectables.empty?
+ rarray = []; read_array.each {|fd| rarray << fd.to_io }
+ warray = []; write_array.each {|fd| warray << fd.to_io }
+
+ if messenger.deadline > 0.0
+ result = IO.select(rarray, warray, nil, messenger.deadline)
+ else
+ result = IO.select(rarray, warray)
+ end
+
+ unless result.nil? && result.empty?
+ result.flatten.each do |io|
+ sel = selectables[io.fileno]
+
+ sel.writable if sel.pending > 0
+ sel.readable if sel.capacity > 0
+ end
+ end
+
+ begin
+ messenger.receive(10)
+ rescue Qpid::Proton::ProtonError => error
+ puts "ERROR: #{error.message}"
+ exit
+ end
+
+ while messenger.incoming.nonzero?
+ begin
+ messenger.get(msg)
+ rescue Qpid::Proton::Error => error
+ puts "ERROR: #{error.message}"
+ exit
+ end
+
+ puts "Address: #{msg.address}"
+ subject = msg.subject || "(no subject)"
+ puts "Subject: #{subject}"
+ puts "Body: #{msg.body}"
+ puts "Properties: #{msg.properties}"
+ puts "Instructions: #{msg.instructions}"
+ puts "Annotations: #{msg.annotations}"
+
+ if msg.reply_to
+ puts "=== Sending a reply to #{msg.reply_to}"
+ reply = Qpid::Proton::Message.new
+ reply.address = msg.reply_to
+ reply.subject = "RE: #{msg.subject}"
+ reply.content = "Thanks for the message!"
+
+ messenger.put(reply)
+ messenger.send
+ end
+ end
+ end
end
-Thread.list[1].join
+messenger.stop
+
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3ac2e3bd/proton-c/bindings/ruby/lib/qpid_proton/messenger.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/qpid_proton/messenger.rb b/proton-c/bindings/ruby/lib/qpid_proton/messenger.rb
index a8f7330..5a16c50 100644
--- a/proton-c/bindings/ruby/lib/qpid_proton/messenger.rb
+++ b/proton-c/bindings/ruby/lib/qpid_proton/messenger.rb
@@ -75,7 +75,6 @@ module Qpid # :nodoc:
#
def initialize(name = nil)
@impl = Cproton.pn_messenger(name)
- @interrupted = false
@selectables = {}
ObjectSpace.define_finalizer(self, self.class.finalize!(@impl))
end
@@ -408,7 +407,6 @@ module Qpid # :nodoc:
# originated the interrupt.
#
def interrupt
- @interrupted = true
Cproton.pn_messenger_interrupt(@impl)
end
@@ -697,132 +695,6 @@ module Qpid # :nodoc:
!window.nil? && [Float, Fixnum].include?(window.class)
end
- public
-
-
- #--
- # The following are class methods.
- #++
-
- # Receives messages from the provided instance of Messenger, and then
- # calls the supplied block for each Message received. If no instance
- # is provided then one is created using the provided list of options.
- #
- # This starts a new thread which will loop, waiting for and processing
- # incoming messages.
- #
- # ==== Arguments
- #
- # * messenger - The instance of Messenger.
- #
- # ==== Options
- #
- # * :addresses - An array of addresses to which to subscribe. Addresses
- # are required if no Messenger was supplied.
- #
- # ==== Examples
- #
- # # create a Messenger
- # messenger = Qpid::Proton::Messenger.new
- # # begin receiving messages
- # Qpid::Proton::Messenger.receive_and_call(messenger) do |message|
- # puts "Received: #{message.body}"
- # end
- #
- def self.receive_and_call(messenger, options = {}, &block)
- # if the messenger wasn't created then create one
- if messenger.nil?
- # if no addresses were supplied then raise an exception
- raise ArgumentError.new("no addresses") if options[:addresses].nil?
- # if no block was supplied then raise an exception
- raise ArgumentError.new("missing block") if block.nil?
-
- messenger = Qpid::Proton::Messenger.new
- Array(options[:addresses]).each do |address|
- messenger.subscribe address
- end
- end
-
- # set the messenger to passive mode
- messenger.passive = true
- messenger.start
-
- Thread.new(messenger, block) do |messenger, &block|
- read_array = []
- write_array = []
- selectables = {}
-
- aborted = false
-
- while !aborted do
- # refresh the list of fds to be processed
- sel = messenger.selectable
- while !sel.nil?
- if sel.terminal?
- selectables.delete(sel.fileno)
- read_array.delete(sel)
- write_array.delete(sel)
- sel.free
- else
- sel.capacity
- sel.pending
- if !sel.registered?
- read_array << sel
- write_array << sel
- selectables[sel.fileno] = sel
- sel.registered = true
- end
- end
- sel = messenger.selectable
- end
-
- unless selectables.empty?
- rarray = []; read_array.each {|fd| rarray << fd.to_io}
- warray = []; write_array.each {|fd| warray << fd.to_io}
-
- if messenger.deadline > 0.0
- result = IO.select(rarray, warray, nil, messenger.deadline)
- else
- result = IO.select(rarray, warray)
- end
-
- unless result.nil? && result.empty?
- result.flatten.each do |io|
- sel = selectables[io.fileno]
-
- sel.writable if sel.pending > 0
- sel.readable if sel.capacity > 0
- end
- end
-
- messenger.receive(10)
-
- # if this was interrupted then exit
- messenger.instance_eval do
- aborted = @interrupted
- @interrupted = false
- end
-
- if !aborted
- # process each message received
- while messenger.incoming.nonzero?
- message = Qpid::Proton::Message.new
- messenger.get(message)
- yield message
- end
- end
-
- end
-
- end
-
- end
-
- # return the messenger
- messenger
-
- end
-
end
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3ac2e3bd/proton-c/bindings/ruby/lib/qpid_proton/selectable.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/qpid_proton/selectable.rb b/proton-c/bindings/ruby/lib/qpid_proton/selectable.rb
index 8b1214a..33554cd 100644
--- a/proton-c/bindings/ruby/lib/qpid_proton/selectable.rb
+++ b/proton-c/bindings/ruby/lib/qpid_proton/selectable.rb
@@ -37,14 +37,6 @@ module Qpid # :nodoc:
@impl = impl
@io = nil
@freed = false
-
- ObjectSpace.define_finalizer(self, self.class.finalize!(@impl))
- end
-
- def self.finalize!(impl) # :nodoc:
- proc {
- impl.free
- }
end
# Returns the underlying file descriptor.
@@ -56,11 +48,7 @@ module Qpid # :nodoc:
end
def to_io
- if @io.nil?
- fileno = self.fileno
- @io = IO.new(fileno)
- end
- @io
+ @io ||= IO.new(fileno)
end
# The number of bytes the selectable is capable of consuming.
@@ -109,14 +97,15 @@ module Qpid # :nodoc:
end
def to_s
- return super if @freed
- "#{super} fileno=#{self.fileno} registered=#{self.registered?} terminal=#{self.terminal?}"
+ "fileno=#{self.fileno} registered=#{self.registered?} terminal=#{self.terminal?}"
end
def free
+ return if @freed
@freed = true
@messenger.unregister_selectable(fileno)
- @messenger = nil
+ @io.close unless @io.nil?
+ Cproton.pn_selectable_free(@impl)
@impl = nil
end
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org