Posted to commits@qpid.apache.org by ac...@apache.org on 2017/12/13 20:37:07 UTC
[8/9] qpid-proton git commit: PROTON-1537: [ruby] Update tests to new
API, many fixes
PROTON-1537: [ruby] Update tests to new API, many fixes
Both the old and new APIs are supported for now; the old APIs cause [DEPRECATION]
warnings (disable with `ruby -W0`)
Old: Qpid::Proton::Reactor::Container, Qpid::Proton::Handler::MessagingHandler
New: Qpid::Proton::Container, Qpid::Proton::MessagingHandler
Both are implemented in terms of the new Container and ConnectionDriver.
Both styles of handler are implemented by Adapter classes that translate raw
proton events to API handler calls.
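
As a rough illustration of the adapter idea described above, here is a minimal standalone sketch. It does not use qpid_proton: `RawEvent`, `OldStyleAdapter`, `NewStyleAdapter` and both handler classes are hypothetical stand-ins invented for this example. It only shows how one raw event can be delivered either as a single event argument (old style) or as typed arguments (new style).

```ruby
# Hypothetical stand-in for a raw proton event; NOT the library's Event class.
RawEvent = Struct.new(:type, :delivery, :message)

# Old-style handler: every callback receives the whole event object.
class OldStyleHandler
  attr_reader :log
  def initialize; @log = []; end
  def on_message(event); @log << "old-style got #{event.message}"; end
end

# New-style handler: callbacks receive the objects they care about.
class NewStyleHandler
  attr_reader :log
  def initialize; @log = []; end
  def on_message(delivery, message); @log << "new-style got #{message}"; end
end

# Hypothetical adapter: passes the raw event straight through.
class OldStyleAdapter
  def initialize(handler); @handler = handler; end

  def dispatch(event)
    @handler.__send__(event.type, event) if @handler.respond_to?(event.type)
  end
end

# Hypothetical adapter: unpacks the raw event into typed arguments.
class NewStyleAdapter
  def initialize(handler); @handler = handler; end

  def dispatch(event)
    case event.type
    when :on_message
      forward(:on_message, event.delivery, event.message)
    end
  end

  private

  # Call the handler method only if the handler defines it.
  def forward(method, *args)
    (@handler.__send__(method, *args); true) if @handler.respond_to?(method)
  end
end

event = RawEvent.new(:on_message, :some_delivery, "hello")
old_handler = OldStyleHandler.new
new_handler = NewStyleHandler.new
OldStyleAdapter.new(old_handler).dispatch(event)
NewStyleAdapter.new(new_handler).dispatch(event)
```

Keeping the translation in the adapter means the code that produces raw events does not need to know which handler style it is feeding.
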
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/a13bc2b9
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/a13bc2b9
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/a13bc2b9
Branch: refs/heads/master
Commit: a13bc2b9fa08fd8ab068abdf8645d334cde1161c
Parents: 4678e74
Author: Alan Conway <ac...@redhat.com>
Authored: Mon Dec 11 15:36:07 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Wed Dec 13 14:09:26 2017 -0500
----------------------------------------------------------------------
examples/ruby/README.md | 10 +-
examples/ruby/broker.rb | 50 ++--
examples/ruby/client.rb | 22 +-
examples/ruby/direct_recv.rb | 12 +-
examples/ruby/direct_send.rb | 16 +-
examples/ruby/helloworld.rb | 24 +-
examples/ruby/server.rb | 35 +--
examples/ruby/simple_recv.rb | 12 +-
examples/ruby/simple_send.rb | 16 +-
proton-c/bindings/ruby/CMakeLists.txt | 13 +-
proton-c/bindings/ruby/lib/core/connection.rb | 1 +
.../bindings/ruby/lib/core/connection_driver.rb | 9 +-
proton-c/bindings/ruby/lib/core/container.rb | 38 +--
proton-c/bindings/ruby/lib/core/delivery.rb | 12 +-
proton-c/bindings/ruby/lib/core/event.rb | 2 +-
proton-c/bindings/ruby/lib/core/exceptions.rb | 4 +-
.../bindings/ruby/lib/core/messaging_handler.rb | 14 +-
proton-c/bindings/ruby/lib/core/transfer.rb | 6 +-
proton-c/bindings/ruby/lib/handler/adapter.rb | 61 ++---
.../ruby/lib/handler/messaging_adapter.rb | 125 ++++++++++
.../ruby/lib/handler/messaging_handler.rb | 5 +
.../ruby/lib/handler/old_messaging_adapter.rb | 151 ------------
.../lib/handler/reactor_messaging_adapter.rb | 158 +++++++++++++
proton-c/bindings/ruby/lib/qpid_proton.rb | 25 +-
proton-c/bindings/ruby/lib/reactor/container.rb | 86 +++----
proton-c/bindings/ruby/lib/types/array.rb | 7 +-
proton-c/bindings/ruby/spec/array_spec.rb | 42 +---
proton-c/bindings/ruby/spec/spec_helper.rb | 28 +--
.../ruby/tests/old_examples/old_example_test.rb | 4 +-
proton-c/bindings/ruby/tests/test_adapter.rb | 227 ------------------
.../ruby/tests/test_connection_driver.rb | 45 ++--
proton-c/bindings/ruby/tests/test_container.rb | 100 ++++----
proton-c/bindings/ruby/tests/test_delivery.rb | 68 +++---
.../ruby/tests/test_messaging_adapter.rb | 232 +++++++++++++++++++
.../bindings/ruby/tests/test_old_adapter.rb | 228 ++++++++++++++++++
proton-c/bindings/ruby/tests/test_tools.rb | 90 +++----
36 files changed, 1163 insertions(+), 815 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a13bc2b9/examples/ruby/README.md
----------------------------------------------------------------------
diff --git a/examples/ruby/README.md b/examples/ruby/README.md
index 938ef17..66f6b31 100644
--- a/examples/ruby/README.md
+++ b/examples/ruby/README.md
@@ -54,7 +54,7 @@ For example if you start `direct_recv.rb`, you can connect to it directly with
In this set of examples we see the following event occurring, in addition to what we've seen before:
- * **on_disconnected** - Fired when the transport is closed.
+ * **on_transport_close** - Fired when the network transport is closed.
## Now About That Broker example
@@ -70,7 +70,7 @@ The Broker manages a map connecting a queue address to the instance of Exchange
The broker application demonstrates a new set of events:
- * **on_link_opening** - Fired when a remote link is opened but the local end is not yet open. From this event the broker grabs the address and subscribes the link to an exchange for that address.
- * **on_link_closing** - Fired when a remote link is closed but the local end is still open. From this event the broker grabs the address and unsubscribes the link from that exchange.
- * **on_connection_closing** - Fired when a remote connection is closed but the local end is still open.
- * **on_disconnected** - Fired when the protocol transport has closed. The broker removes all links for the disconnected connection, avoiding workign with endpoints that are now gone.
+ * **on_link_open** - Fired when a remote link is opened. From this event the broker grabs the address and subscribes the link to an exchange for that address.
+ * **on_link_close** - Fired when a remote link is closed. From this event the broker grabs the address and unsubscribes the link from that exchange.
+ * **on_connection_close** - Fired when a remote connection is closed but the local end is still open.
+ * **on_transport_close** - Fired when the protocol transport has closed. The broker removes all links for the disconnected connection, avoiding workign with endpoints that are now gone.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a13bc2b9/examples/ruby/broker.rb
----------------------------------------------------------------------
diff --git a/examples/ruby/broker.rb b/examples/ruby/broker.rb
index 111d41b..aad32a9 100644
--- a/examples/ruby/broker.rb
+++ b/examples/ruby/broker.rb
@@ -69,7 +69,7 @@ class MessageQueue
end
-class Broker < Qpid::Proton::Handler::MessagingHandler
+class Broker < Qpid::Proton::MessagingHandler
def initialize(url)
super()
@@ -77,8 +77,8 @@ class Broker < Qpid::Proton::Handler::MessagingHandler
@queues = {}
end
- def on_start(event)
- @listener = event.container.listen(@url)
+ def on_container_start(container)
+ @listener = container.listen(@url)
STDOUT.puts "Listening on #{@url}"; STDOUT.flush
end
@@ -89,20 +89,20 @@ class Broker < Qpid::Proton::Handler::MessagingHandler
@queues[address]
end
- def on_link_opening(event)
- if event.link.sender?
- if event.link.remote_source.dynamic?
+ def on_link_open(link)
+ if link.sender?
+ if link.remote_source.dynamic?
address = SecureRandom.uuid
- event.link.source.address = address
+ link.source.address = address
q = MessageQueue.new(true)
@queues[address] = q
- q.subscribe(event.link)
- elsif event.link.remote_source.address
- event.link.source.address = event.link.remote_source.address
- self.queue(event.link.source.address).subscribe(event.link)
+ q.subscribe(link)
+ elsif link.remote_source.address
+ link.source.address = link.remote_source.address
+ self.queue(link.source.address).subscribe(link)
end
- elsif event.link.remote_target.address
- event.link.target.address = event.link.remote_target.address
+ elsif link.remote_target.address
+ link.target.address = link.remote_target.address
end
end
@@ -114,16 +114,16 @@ class Broker < Qpid::Proton::Handler::MessagingHandler
end
end
- def on_link_closing(event)
- self.unsubscribe(event.link) if event.link.sender?
+ def on_link_close(link)
+ self.unsubscribe(link) if link.sender?
end
- def on_connection_closing(event)
- self.remove_stale_consumers(event.connection)
+ def on_connection_close(connection)
+ self.remove_stale_consumers(connection)
end
- def on_disconnected(event)
- self.remove_stale_consumers(event.connection)
+ def on_transport_close(transport)
+ self.remove_stale_consumers(transport.connection)
end
def remove_stale_consumers(connection)
@@ -134,14 +134,14 @@ class Broker < Qpid::Proton::Handler::MessagingHandler
end
end
- def on_sendable(event)
- q = self.queue(event.link.source.address)
- q.dispatch(event.link)
+ def on_sendable(sender)
+ q = self.queue(sender.source.address)
+ q.dispatch(sender)
end
- def on_message(event)
- q = self.queue(event.link.target.address)
- q.publish(event.message)
+ def on_message(delivery, message)
+ q = self.queue(delivery.link.target.address)
+ q.publish(message)
end
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a13bc2b9/examples/ruby/client.rb
----------------------------------------------------------------------
diff --git a/examples/ruby/client.rb b/examples/ruby/client.rb
index 46c1251..1439169 100644
--- a/examples/ruby/client.rb
+++ b/examples/ruby/client.rb
@@ -20,7 +20,7 @@
require 'qpid_proton'
require 'optparse'
-class Client < Qpid::Proton::Handler::MessagingHandler
+class Client < Qpid::Proton::MessagingHandler
def initialize(url, address, requests)
super()
@@ -29,8 +29,8 @@ class Client < Qpid::Proton::Handler::MessagingHandler
@requests = requests
end
- def on_start(event)
- c = event.container.connect(@url)
+ def on_container_start(container)
+ c = container.connect(@url)
@sender = c.open_sender(@address)
@receiver = c.open_receiver({:dynamic => true})
end
@@ -45,24 +45,22 @@ class Client < Qpid::Proton::Handler::MessagingHandler
end
end
- def on_link_opened(event)
- if event.receiver == @receiver
- next_request
- end
+ def on_link_open(link)
+ next_request if link.receiver?
end
- def on_message(event)
- puts "<- #{event.message.body}"
+ def on_message(delivery, message)
+ puts "<- #{message.body}"
@requests.delete_at(0)
if !@requests.empty?
next_request
else
- event.connection.close
+ delivery.connection.close
end
end
- def on_transport_error(event)
- raise "Connection error: #{event.transport.condition}"
+ def on_transport_error(transport)
+ raise "Connection error: #{transport.condition}"
end
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a13bc2b9/examples/ruby/direct_recv.rb
----------------------------------------------------------------------
diff --git a/examples/ruby/direct_recv.rb b/examples/ruby/direct_recv.rb
index 2e3295f..327d8d0 100644
--- a/examples/ruby/direct_recv.rb
+++ b/examples/ruby/direct_recv.rb
@@ -20,7 +20,7 @@
require 'qpid_proton'
require 'optparse'
-class DirectReceive < Qpid::Proton::Handler::MessagingHandler
+class DirectReceive < Qpid::Proton::MessagingHandler
def initialize(url, address, count)
super()
@@ -35,16 +35,16 @@ class DirectReceive < Qpid::Proton::Handler::MessagingHandler
def on_accept(l) l.close; end
end
- def on_start(event)
- event.container.listen(@url, ListenOnce.new)
+ def on_container_start(container)
+ container.listen(@url, ListenOnce.new)
end
- def on_message(event)
+ def on_message(delivery, message)
if @expected.zero? || (@received < @expected)
- puts "Received: #{event.message.body}"
+ puts "Received: #{message.body}"
@received = @received + 1
if @received == @expected
- event.connection.close
+ delivery.connection.close
end
end
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a13bc2b9/examples/ruby/direct_send.rb
----------------------------------------------------------------------
diff --git a/examples/ruby/direct_send.rb b/examples/ruby/direct_send.rb
index 0ccfe38..6ea5fd3 100644
--- a/examples/ruby/direct_send.rb
+++ b/examples/ruby/direct_send.rb
@@ -20,7 +20,7 @@
require 'qpid_proton'
require 'optparse'
-class DirectSend < Qpid::Proton::Handler::MessagingHandler
+class DirectSend < Qpid::Proton::MessagingHandler
def initialize(url, address, expected)
super()
@@ -36,23 +36,23 @@ class DirectSend < Qpid::Proton::Handler::MessagingHandler
def on_accept(l) l.close; end
end
- def on_start(event)
- event.container.listen(@url, ListenOnce.new)
+ def on_container_start(container)
+ container.listen(@url, ListenOnce.new)
end
- def on_sendable(event)
- while event.sender.credit > 0 && @sent < @expected
+ def on_sendable(sender)
+ while sender.credit > 0 && @sent < @expected
msg = Qpid::Proton::Message.new("sequence #{@sent}", { :id => @sent } )
- event.sender.send(msg)
+ sender.send(msg)
@sent = @sent + 1
end
end
- def on_accepted(event)
+ def on_tracker_accept(tracker)
@confirmed = @confirmed + 1
if @confirmed == @expected
puts "All #{@expected} messages confirmed!"
- event.connection.close
+ tracker.connection.close
end
end
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a13bc2b9/examples/ruby/helloworld.rb
----------------------------------------------------------------------
diff --git a/examples/ruby/helloworld.rb b/examples/ruby/helloworld.rb
index 2b2d4f5..a03060c 100644
--- a/examples/ruby/helloworld.rb
+++ b/examples/ruby/helloworld.rb
@@ -20,33 +20,31 @@
require 'qpid_proton'
require 'optparse'
-class HelloWorld < Qpid::Proton::Handler::MessagingHandler
+class HelloWorld < Qpid::Proton::MessagingHandler
def initialize(url, address)
super()
@url, @address = url, address
end
- def on_start(event)
- conn = event.container.connect(@url)
+ def on_container_start(container)
+ conn = container.connect(@url)
conn.open_sender(@address)
conn.open_receiver(@address)
end
- def on_sendable(event)
- msg = Qpid::Proton::Message.new
- msg.body = "Hello world!"
- event.sender.send(msg)
- event.sender.close
+ def on_sendable(sender)
+ sender.send(Qpid::Proton::Message.new("Hello world!"))
+ sender.close
end
- def on_message(event)
- puts event.message.body
- event.connection.close
+ def on_message(delivery, message)
+ puts message.body
+ delivery.connection.close
end
- def on_transport_error(event)
- raise "Connection error: #{event.transport.condition}"
+ def on_transport_error(transport)
+ raise "Connection error: #{transport.condition}"
end
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a13bc2b9/examples/ruby/server.rb
----------------------------------------------------------------------
diff --git a/examples/ruby/server.rb b/examples/ruby/server.rb
index 87eba99..7a4832b 100644
--- a/examples/ruby/server.rb
+++ b/examples/ruby/server.rb
@@ -20,7 +20,7 @@
require 'qpid_proton'
require 'optparse'
-class Server < Qpid::Proton::Handler::MessagingHandler
+class Server < Qpid::Proton::MessagingHandler
def initialize(url, address)
super()
@@ -29,34 +29,35 @@ class Server < Qpid::Proton::Handler::MessagingHandler
@senders = {}
end
- def on_start(event)
- c = event.container.connect(@url)
+ def on_container_start(container)
+ c = container.connect(@url)
c.open_receiver(@address)
@relay = nil
end
- def on_connection_opened(event)
- if event.connection.remote_offered_capabilities &&
- event.connection.remote_offered_capabilities.contain?("ANONYMOUS-RELAY")
- @relay = event.connection.open_sender({:target => nil})
+ def on_connection_open(connection)
+ if connection.remote_offered_capabilities &&
+ connection.remote_offered_capabilities.contain?("ANONYMOUS-RELAY")
+ @relay = connection.open_sender({:target => nil})
end
end
- def on_message(event)
- msg = event.message
- return unless msg.reply_to # Not a request message
- puts "<- #{msg.body}"
- sender = @relay || (@senders[msg.reply_to] ||= event.connection.open_sender(msg.reply_to))
+ def on_message(delivery, message)
+ return unless message.reply_to # Not a request message
+ puts "<- #{message.body}"
+ unless (sender = @relay)
+ sender = (@senders[message.reply_to] ||= delivery.connection.open_sender(message.reply_to))
+ end
reply = Qpid::Proton::Message.new
- reply.address = msg.reply_to
- reply.body = msg.body.upcase
+ reply.address = message.reply_to
+ reply.body = message.body.upcase
puts "-> #{reply.body}"
- reply.correlation_id = msg.correlation_id
+ reply.correlation_id = message.correlation_id
sender.send(reply)
end
- def on_transport_error(event)
- raise "Connection error: #{event.transport.condition}"
+ def on_transport_error(transport)
+ raise "Connection error: #{transport.condition}"
end
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a13bc2b9/examples/ruby/simple_recv.rb
----------------------------------------------------------------------
diff --git a/examples/ruby/simple_recv.rb b/examples/ruby/simple_recv.rb
index 446d7da..8c8bf50 100644
--- a/examples/ruby/simple_recv.rb
+++ b/examples/ruby/simple_recv.rb
@@ -20,7 +20,7 @@
require 'qpid_proton'
require 'optparse'
-class SimpleReceive < Qpid::Proton::Handler::MessagingHandler
+class SimpleReceive < Qpid::Proton::MessagingHandler
def initialize(url, address, count)
super()
@@ -30,17 +30,17 @@ class SimpleReceive < Qpid::Proton::Handler::MessagingHandler
@received = 0
end
- def on_start(event)
- c = event.container.connect(@url)
+ def on_container_start(container)
+ c = container.connect(@url)
c.open_receiver(@address)
end
- def on_message(event)
+ def on_message(delivery, message)
if @expected.zero? || (@received < @expected)
- puts "Received: #{event.message.body}"
+ puts "Received: #{message.body}"
@received = @received + 1
if @received == @expected
- event.connection.close
+ delivery.connection.close
end
end
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a13bc2b9/examples/ruby/simple_send.rb
----------------------------------------------------------------------
diff --git a/examples/ruby/simple_send.rb b/examples/ruby/simple_send.rb
index be919d5..1a65cd5 100644
--- a/examples/ruby/simple_send.rb
+++ b/examples/ruby/simple_send.rb
@@ -20,7 +20,7 @@
require 'qpid_proton'
require 'optparse'
-class SimpleSend < Qpid::Proton::Handler::MessagingHandler
+class SimpleSend < Qpid::Proton::MessagingHandler
def initialize(url, address, expected)
super()
@@ -31,24 +31,24 @@ class SimpleSend < Qpid::Proton::Handler::MessagingHandler
@expected = expected
end
- def on_start(event)
- c = event.container.connect(@url)
+ def on_container_start(container)
+ c = container.connect(@url)
c.open_sender(@address)
end
- def on_sendable(event)
- while event.sender.credit > 0 && @sent < @expected
+ def on_sendable(sender)
+ while sender.credit > 0 && @sent < @expected
msg = Qpid::Proton::Message.new("sequence #{@sent}", { :id => @sent } )
- event.sender.send(msg)
+ sender.send(msg)
@sent = @sent + 1
end
end
- def on_accepted(event)
+ def on_tracker_accept(tracker)
@confirmed = @confirmed + 1
if @confirmed == @expected
puts "All #{@expected} messages confirmed!"
- event.connection.close
+ tracker.connection.close
end
end
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a13bc2b9/proton-c/bindings/ruby/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/CMakeLists.txt b/proton-c/bindings/ruby/CMakeLists.txt
index 8ff1323..e41b49f 100644
--- a/proton-c/bindings/ruby/CMakeLists.txt
+++ b/proton-c/bindings/ruby/CMakeLists.txt
@@ -103,18 +103,21 @@ execute_process(COMMAND ${RUBY_EXECUTABLE} -r minitest -e ""
RESULT_VARIABLE result OUTPUT_QUIET ERROR_QUIET)
if (result EQUAL 0) # Have minitest
set(test_env ${env_py} -- "PATH=${PATH}" "RUBYLIB=${RUBYLIB}" "SASLPASSWD=${SASLPASSWD_EXE}")
- macro(add_ruby_test file)
- get_filename_component(name ${file} NAME_WE)
+
+ macro(add_ruby_test script)
+ if(${script} MATCHES "old")
+ set(opt "RUBYOPT=-W0") # Suppress deprecation warnings
+ endif()
+ get_filename_component(name ${script} NAME_WE)
string(REPLACE "_" "-" name "ruby-${name}")
add_test(
NAME ${name}
- COMMAND ${test_env} ${RUBY_EXECUTABLE} ${file} -v
+ COMMAND ${test_env} ${opt} ${RUBY_EXECUTABLE} ${script} -v
${ARGN})
+
endmacro()
add_ruby_test(example_test.rb WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}/examples/ruby)
- # Old examples for backwards compatibility testing.
add_ruby_test(old_example_test.rb WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/tests/old_examples)
-
file(GLOB TESTS tests/test_*.rb)
file(GLOB SPECS spec/*_spec.rb)
foreach(t ${TESTS} ${SPECS})
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a13bc2b9/proton-c/bindings/ruby/lib/core/connection.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/connection.rb b/proton-c/bindings/ruby/lib/core/connection.rb
index 55873dd..dcbae1f 100644
--- a/proton-c/bindings/ruby/lib/core/connection.rb
+++ b/proton-c/bindings/ruby/lib/core/connection.rb
@@ -124,6 +124,7 @@ module Qpid::Proton
# on_connection_bound if not using a connection_driver)
@container = opts[:container]
cid = opts[:container_id] || (@container && @container.id) || SecureRandom.uuid
+ cid = cid.to_s if cid.is_a? Symbol # Allow symbols as container name
Cproton.pn_connection_set_container(@impl, cid)
Cproton.pn_connection_set_user(@impl, opts[:user]) if opts[:user]
Cproton.pn_connection_set_password(@impl, opts[:password]) if opts[:password]
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a13bc2b9/proton-c/bindings/ruby/lib/core/connection_driver.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/connection_driver.rb b/proton-c/bindings/ruby/lib/core/connection_driver.rb
index b796d4d..2ce132e 100644
--- a/proton-c/bindings/ruby/lib/core/connection_driver.rb
+++ b/proton-c/bindings/ruby/lib/core/connection_driver.rb
@@ -159,14 +159,13 @@ module Qpid::Proton
end
end
- # A {ConnectionDriver} that feeds events to a {Handler::MessagingHandler}
+ # A {ConnectionDriver} that feeds raw proton events to a handler.
class HandlerDriver < ConnectionDriver
- # Combine an {IO} with a {Handler::MessagingHandler} and provide
+ # Combine an {IO} with a handler and provide
# a simplified way to run the driver via {#process}
#
# @param io [IO]
- # @param handler [Handler::MessagingHandler] to receive events in
- # {#dispatch} and {#process}
+ # @param handler [Handler::MessagingHandler] to receive raw events in {#dispatch} and {#process}
def initialize(io, handler)
super(io)
@handler = handler
@@ -176,7 +175,7 @@ module Qpid::Proton
# @return [MessagingHandler] The handler dispatched to by {#process}
attr_reader :handler
- # Dispatch all events available from {#event} to {#handler}
+ # Dispatch all available raw proton events from {#event} to {#handler}
def dispatch()
each_event do |e|
case e.method # Events that affect the driver
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a13bc2b9/proton-c/bindings/ruby/lib/core/container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/container.rb b/proton-c/bindings/ruby/lib/core/container.rb
index fff32e6..f8c8c16 100644
--- a/proton-c/bindings/ruby/lib/core/container.rb
+++ b/proton-c/bindings/ruby/lib/core/container.rb
@@ -91,10 +91,10 @@ module Qpid::Proton
# Create a new Container
# @overload initialize(id=nil)
- # @param id [String] A unique ID for this container, use random UUID if nil.
+ # @param id [String,Symbol] A unique ID for this container, use random UUID if nil.
#
# @overload initialize(handler=nil, id=nil)
- # @param id [String] A unique ID for this container, use random UUID if nil.
+ # @param id [String,Symbol] A unique ID for this container, use random UUID if nil.
# @param handler [MessagingHandler] Optional default handler for connections
# that do not have their own handler (see {#connect} and {#listen})
#
@@ -102,12 +102,16 @@ module Qpid::Proton
# handler instance for each connection, as a shared handler may be called
# concurrently.
#
- def initialize(handler = nil, id = nil)
- # Allow ID as sole argument
- (handler, id = nil, handler.to_str) if (id.nil? && handler.respond_to?(:to_str))
- # Allow multiple handlers ofor backwards compatibility
- @handler = handler
- @id = ((id && id.to_s) || SecureRandom.uuid).freeze
+ def initialize(*args)
+ case args.size
+ when 2 then @handler, @id = args
+ when 1 then
+ @id = String.try_convert(args[0]) || (args[0].to_s if args[0].is_a? Symbol)
+ @handler = args[0] unless @id
+ else raise ArgumentError, "wrong number of arguments (given #{args.size}, expected 0..2"
+ end
+ @adapter = Handler::Adapter.adapt(@handler)
+ @id = (@id || SecureRandom.uuid).freeze
# Implementation note:
#
@@ -117,7 +121,7 @@ module Qpid::Proton
# - nil on the @work queue makes a #run thread exit
@work = Queue.new
- @work << :on_start << self # Issue on_start and start start selecting
+ @work << :start << self # Issue start and start start selecting
@wake = IO.pipe # Wakes #run thread in IO.select
@auto_stop = true # Stop when @active drops to 0
@@ -224,9 +228,8 @@ module Qpid::Proton
while task = @work.pop
case task
- when :on_start
- event = Event.new(nil, :on_start, self)
- @handler.on_start(event) if @handler.respond_to? :on_start
+ when :start
+ @adapter.on_container_start(self) if @adapter.respond_to? :on_container_start
when Container
r, w = [@wake[0]], []
@@ -264,8 +267,11 @@ module Qpid::Proton
end
ensure
@lock.synchronize do
- @running -= 1
- work_wake nil if @running > 0 # Tell the next thread to exit
+ if (@running -= 1) > 0
+ work_wake nil # Signal the next thread
+ else
+ @adapter.on_container_stop(self) if @adapter.respond_to? :on_container_stop
+ end
end
end
@@ -296,7 +302,7 @@ module Qpid::Proton
wake
end
- private
+ protected
def wake; @wake[1].write_nonblock('x') rescue nil; end
@@ -314,7 +320,7 @@ module Qpid::Proton
def connection_driver(io, opts=nil, server=false)
opts ||= {}
opts[:container] = self
- opts[:handler] ||= @handler
+ opts[:handler] ||= @adapter
ConnectionTask.new(self, io, opts, server)
end
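
The argument handling in the new `Container#initialize` can be sketched in isolation. The helper below, `parse_container_args`, is a hypothetical name and is not part of qpid_proton; it only mirrors the idea of accepting an ID alone, a handler alone, or both, and of allowing a Symbol as the ID. Unlike the hunk as shown, which has no `when 0` branch, this sketch handles the zero-argument form explicitly.

```ruby
require 'securerandom'

# Hypothetical helper, not library code: returns [handler, id].
def parse_container_args(*args)
  handler = id = nil
  case args.size
  when 0
    # Nothing supplied; the id falls back to a random UUID below.
  when 1
    # A String-like or Symbol argument is treated as the id,
    # anything else is treated as the handler.
    id = String.try_convert(args[0]) || (args[0].to_s if args[0].is_a?(Symbol))
    handler = args[0] unless id
  when 2
    handler, id = args
  else
    raise ArgumentError, "wrong number of arguments (given #{args.size}, expected 0..2)"
  end
  [handler, (id ? id.to_s : SecureRandom.uuid).freeze]
end

handler = Object.new
parse_container_args("my-id")          # id only
parse_container_args(:my_id)           # Symbol id, converted to a String
parse_container_args(handler)          # handler only, random id
parse_container_args(handler, "my-id") # both
```

`String.try_convert` returns nil for objects that do not respond to `to_str`, which is what lets a single non-string argument be recognised as a handler.
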
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a13bc2b9/proton-c/bindings/ruby/lib/core/delivery.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/delivery.rb b/proton-c/bindings/ruby/lib/core/delivery.rb
index ba89097..e86a06e 100644
--- a/proton-c/bindings/ruby/lib/core/delivery.rb
+++ b/proton-c/bindings/ruby/lib/core/delivery.rb
@@ -75,13 +75,15 @@ module Qpid::Proton
def complete?() readable? && !aborted? && !partial?; end
# Get the message from the delivery.
- # @raise [ProtonError] if the message is not {#complete?} or there is an
- # error decoding the message.
+ # @return [Message] The message
+ # @raise [AbortedError] if the message has been aborted (check with {#aborted?}
+ # @raise [UnderflowError] if the message is incomplete (check with {#complete?}
+ # @raise [::ArgumentError] if the delivery is not the current delivery on a receiving link.
def message
return @message if @message
- raise ProtonError("message aborted by sender") if aborted?
- raise ProtonError("incoming message incomplete") if partial?
- raise ProtonError("no incoming message") unless readable?
+ raise AbortedError, "message aborted by sender" if aborted?
+ raise UnderflowError, "incoming message incomplete" if partial?
+ raise ArgumentError, "no incoming message" unless readable?
@message = Message.new
@message.decode(link.receive(pending))
link.advance
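
The change above replaces a single generic error with three distinct exception classes, so a caller can react differently to each case. The sketch below illustrates that with stand-ins only: `FakeDelivery` and the locally defined `ProtonError`, `AbortedError` and `UnderflowError` are hypothetical, and the real library's class hierarchy may differ (in particular, the parent of `UnderflowError` is an assumption here).

```ruby
# Stand-in exception classes for illustration; NOT the qpid_proton definitions.
class ProtonError < StandardError; end
class AbortedError < ProtonError; end
class UnderflowError < ProtonError; end # assumed parent class

# Hypothetical delivery with the same three checks as the hunk above.
FakeDelivery = Struct.new(:aborted, :partial, :readable, :body) do
  def message
    raise AbortedError, "message aborted by sender" if aborted
    raise UnderflowError, "incoming message incomplete" if partial
    raise ArgumentError, "no incoming message" unless readable
    body
  end
end

# Map each failure mode to a different outcome.
def describe(delivery)
  "received #{delivery.message}"
rescue AbortedError
  "aborted by sender"
rescue UnderflowError
  "incomplete, wait for more data"
rescue ArgumentError
  "not a readable delivery"
end

describe(FakeDelivery.new(false, false, true, "hello"))
describe(FakeDelivery.new(true, false, true, nil))
```

Note that `raise SomeError, "text"` is the usual Ruby form; the removed lines wrote `raise ProtonError("text")`, which Ruby parses as a call to a method named `ProtonError`.
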
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a13bc2b9/proton-c/bindings/ruby/lib/core/event.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/event.rb b/proton-c/bindings/ruby/lib/core/event.rb
index 88fdbaf..079d1bd 100644
--- a/proton-c/bindings/ruby/lib/core/event.rb
+++ b/proton-c/bindings/ruby/lib/core/event.rb
@@ -130,7 +130,7 @@ module Qpid::Proton
case context
when Delivery then @delivery = @context
# deprecated: for backwards compat allow a Tracker to be treated as a Delivery
- when Tracker then @delivery = Delivery.new(Tracker.impl)
+ when Tracker then @delivery = Delivery.new(context.impl)
end
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a13bc2b9/proton-c/bindings/ruby/lib/core/exceptions.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/exceptions.rb b/proton-c/bindings/ruby/lib/core/exceptions.rb
index 34a3e60..66af5fc 100644
--- a/proton-c/bindings/ruby/lib/core/exceptions.rb
+++ b/proton-c/bindings/ruby/lib/core/exceptions.rb
@@ -117,9 +117,9 @@ module Qpid::Proton
class Release < ProtonError
end
- # Raised when a message is aborted.
+ # Raised when a message is aborted by the sender.
#
- class Aborted < ProtonError
+ class AbortedError < ProtonError
end
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a13bc2b9/proton-c/bindings/ruby/lib/core/messaging_handler.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/messaging_handler.rb b/proton-c/bindings/ruby/lib/core/messaging_handler.rb
index 785f730..b733d37 100644
--- a/proton-c/bindings/ruby/lib/core/messaging_handler.rb
+++ b/proton-c/bindings/ruby/lib/core/messaging_handler.rb
@@ -24,6 +24,9 @@ module Qpid::Proton
#
class MessagingHandler
+ # @private
+ def proton_adapter_class() Handler::MessagingAdapter; end
+
# Create a {MessagingHandler}
# @option opts [Integer] :prefetch (10)
# The number of messages to fetch in advance, 0 disables prefetch.
@@ -40,11 +43,8 @@ module Qpid::Proton
# @option opts [Boolean] :auto_close (true)
# If true, respond to a remote close automatically with a local close.
# If false, the application must call {Connection#close} to finish closing connections.
- # @option opts [Boolean] :peer_close_is_error (false)
- # If true, and the remote peer closes the connection without an error condition,
- # the set the local error condition {Condition}("error", "unexpected peer close")
- def initialize(opts=nil)
- @options = opts && opts.clone
+ def initialize(opts={})
+ @options = opts.clone
end
# @return [Hash] handler options, see {#initialize}
@@ -157,6 +157,10 @@ module Qpid::Proton
# The sending end settled a delivery
# @param delivery [Delivery] The delivery.
+ # @!method on_delivery_abort(delivery)
+ # A message was begun but aborted by the sender, so was not received.
+ # @param delivery [Delivery] The delivery.
+
# @!endgroup
# @!group Flow control events
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a13bc2b9/proton-c/bindings/ruby/lib/core/transfer.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/transfer.rb b/proton-c/bindings/ruby/lib/core/transfer.rb
index cdf3419..f9c67f5 100644
--- a/proton-c/bindings/ruby/lib/core/transfer.rb
+++ b/proton-c/bindings/ruby/lib/core/transfer.rb
@@ -22,13 +22,11 @@ module Qpid::Proton
# Common base class for {Tracker} and {Delivery}.
class Transfer
- private
-
+ # @!private
PROTON_METHOD_PREFIX = "pn_delivery"
+ # @!private
include Util::Wrapper
- protected
-
def self.wrap(impl)
return unless impl
self.fetch_instance(impl, :pn_delivery_attachments) ||
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a13bc2b9/proton-c/bindings/ruby/lib/handler/adapter.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/handler/adapter.rb b/proton-c/bindings/ruby/lib/handler/adapter.rb
index eb712c3..fc89b12 100644
--- a/proton-c/bindings/ruby/lib/handler/adapter.rb
+++ b/proton-c/bindings/ruby/lib/handler/adapter.rb
@@ -20,57 +20,60 @@
module Qpid::Proton
module Handler
- class MultiHandler
- def self.maybe(h)
- a = Array(h)
- a.size > 1 ? self.new(h) : h
- end
+ def self.handler_method?(method) /^on_/ =~ name; end
+
+ # Handler for an array of handlers of uniform type, with non-conflicting options
+ class ArrayHandler
+
+ def initialize(handlers)
+ raise "empty handler array" if handlers.empty?
+ adapters = handlers.map do |h|
+ h.__send__(proton_adapter_class) if h.respond_to? :proton_adapter_class
+ end.uniq
+ raise "handler array not uniform, adapters requested: #{adapters}" if adapters.size > 1
+ @proton_adapter_class = htypes[0]
- def initialize(a)
- @a = a;
@options = {}
@methods = Set.new
- @a.each do |h|
- @methods.merge(h.methods.select { |m| m.to_s.start_with?("on_") })
- @options.merge(h.options) do |k, a, b|
- raise ArgumentError, "handlers have conflicting option #{k} => #{a} != #{b}"
+ handlers.each do |h|
+ if h.respond_to?(:options)
+ @options.merge(h.options) do |k, a, b|
+ raise ArgumentError, "handler array has conflicting options for [#{k}]: #{a} != #{b}"
+ end
end
+ @methods.merge(h.methods.select { |m| handler_method? m }) # Event handler methods
end
end
- attr_reader :options
+ attr_reader :options, :proton_adapter_class
def method_missing(name, *args)
if respond_to_missing?(name)
- @a.each { |h| h.__send__(name, *args) if h.respond_to? name}
+ @adapters.each { |a| a.__send__(name, *args) if a.respond_to? name}
else
super
end
end
+
def respond_to_missing?(name, private=false); @methods.include?(name); end
def respond_to?(name, all=false) super || respond_to_missing?(name); end # For ruby < 1.9.2
end
- # Base adapter
+ # Base adapter for raw proton events
class Adapter
- def initialize(h)
- @handler = MultiHandler.maybe h
- end
+ def initialize(h) @handler = h; end
- def self.adapt(h)
- if h.respond_to? :proton_event_adapter
- a = h.proton_event_adapter
- a = a.new(h) if a.is_a? Class
- a
- else
- OldMessagingAdapter.new h
- end
- end
+ def adapter_class(h) nil; end # Adapters don't need adapting
- # Adapter is already an adapter
- def proton_event_adapter() self; end
+ # Create and return an adapter for handler, or return h if it does not need adapting.
+ def self.adapt(handler)
+ return unless handler
+ a = Array(handler)
+ h = (a.size == 1) ? a[0] : ArrayHandler.new(a)
+ a = h.respond_to?(:proton_adapter_class) ? h.proton_adapter_class.new(handler) : h
+ end
- def dispatch(method, *args)
+ def forward(method, *args)
(@handler.__send__(method, *args); true) if @handler.respond_to? method
end
end
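For readers following the ArrayHandler change above, here is a minimal, self-contained sketch of the fan-out idea: collect the on_* method names of all wrapped handlers, then forward each call to every handler that implements it. FanOutHandler, OpenLogger and MessageCounter are hypothetical names used only for illustration; this is not the proton class and it leaves out the adapter-class and options checks.

```ruby
require 'set'

# Hypothetical stand-in for the ArrayHandler idea (not the proton class):
# forwards each on_* call to every wrapped handler that implements it.
class FanOutHandler
  def initialize(handlers)
    raise ArgumentError, "empty handler array" if handlers.empty?
    @handlers = handlers
    @methods = Set.new
    # Remember which event methods at least one handler implements.
    handlers.each do |h|
      @methods.merge(h.methods.select { |m| m.to_s.start_with?("on_") })
    end
  end

  def method_missing(name, *args)
    if respond_to_missing?(name)
      @handlers.each { |h| h.__send__(name, *args) if h.respond_to?(name) }
    else
      super
    end
  end

  def respond_to_missing?(name, include_private = false)
    @methods.include?(name) || super
  end
end

# Two illustrative handlers that each implement a different event.
class OpenLogger
  attr_reader :seen
  def initialize; @seen = []; end
  def on_connection_open(connection); @seen << [:open, connection]; end
end

class MessageCounter
  attr_reader :count
  def initialize; @count = 0; end
  def on_message(delivery, message); @count += 1; end
end

logger = OpenLogger.new
counter = MessageCounter.new
fan_out = FanOutHandler.new([logger, counter])
fan_out.on_connection_open(:conn)       # reaches only OpenLogger
fan_out.on_message(:delivery, "hello")  # reaches only MessageCounter
```

Calling an event that no wrapped handler defines falls through to super and raises NoMethodError, which is the same shape as the method_missing branch in the diff.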
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a13bc2b9/proton-c/bindings/ruby/lib/handler/messaging_adapter.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/handler/messaging_adapter.rb b/proton-c/bindings/ruby/lib/handler/messaging_adapter.rb
new file mode 100644
index 0000000..4f24d09
--- /dev/null
+++ b/proton-c/bindings/ruby/lib/handler/messaging_adapter.rb
@@ -0,0 +1,125 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+# @private
+module Qpid::Proton
+ module Handler
+
+ # Adapt raw proton events to {MessagingHandler} events.
+ class MessagingAdapter < Adapter
+ def initialize handler
+ super
+ @opts = { :prefetch => 10, :auto_accept => true, :auto_settle => true,
+ :auto_open => true, :auto_close => true }
+ if handler.respond_to?(:options) && handler.options
+ @opts.update(handler.options)
+ handler.options.replace @opts
+ end
+ end
+
+ def delegate(method, *args)
+ forward(method, *args) or forward(:on_unhandled, method, *args)
+ end
+
+ def delegate_error(method, context)
+ unless forward(method, context) || forward(:on_error, context.condition)
+ forward(:on_unhandled, method, context)
+ # By default close the whole connection on an un-handled error
+ context.connection.close(context.condition) if @opts[:auto_close]
+ end
+ end
+
+ def on_container_start(container) delegate(:on_container_start, container); end
+ def on_container_stop(container) delegate(:on_container_stop, container); end
+
+ # Define repetitive on_xxx_open/close methods for each endpoint type
+ def self.open_close(endpoint)
+ Module.new do
+ define_method(:"on_#{endpoint}_remote_open") do |event|
+ delegate(:"on_#{endpoint}_open", event.context)
+ event.context.open if @opts[:auto_open] && event.context.local_uninit?
+ end
+
+ define_method(:"on_#{endpoint}_remote_close") do |event|
+ delegate_error(:"on_#{endpoint}_error", event.context) if event.context.condition
+ delegate(:"on_#{endpoint}_close", event.context)
+ event.context.close if @opts[:auto_close] && event.context.local_active?
+ end
+ end
+ end
+ # Generate and include open_close modules for each endpoint type
+ # Using modules so we can override to extend the behavior later in the handler.
+ [:connection, :session, :link].each { |endpoint| include open_close(endpoint) }
+
+ def on_transport_error(event) delegate_error(:on_transport_error, event.context); end
+ def on_transport_closed(event) delegate(:on_transport_close, event.context); end
+
+ # Add flow control for link opening events
+ def on_link_local_open(event) add_credit(event.receiver); end
+ def on_link_remote_open(event) super; add_credit(event.receiver); end
+
+
+ def on_delivery(event)
+ if event.link.receiver? # Incoming message
+ d = event.delivery
+ if d.aborted?
+ delegate(:on_delivery_abort, d)
+ elsif d.complete?
+ if d.link.local_closed? && @opts[:auto_accept]
+ d.release # Auto release after close
+ else
+ begin
+ delegate(:on_message, d, d.message)
+ d.accept if @opts[:auto_accept] && !d.settled?
+ rescue Reject
+ d.reject unless d.settled?
+ rescue Release
+ d.release unless d.settled?
+ end
+ end
+ end
+ delegate(:on_delivery_settle, d) if d.settled?
+ add_credit(event.receiver)
+ else # Outgoing message
+ t = event.tracker
+ case t.remote_state
+ when Delivery::ACCEPTED then delegate(:on_tracker_accept, t)
+ when Delivery::REJECTED then delegate(:on_tracker_reject, t)
+ when Delivery::RELEASED then delegate(:on_tracker_release, t)
+ when Delivery::MODIFIED then delegate(:on_tracker_modify, t)
+ end
+ delegate(:on_tracker_settle, t) if t.settled?
+ t.settle if @opts[:auto_settle]
+ end
+ end
+
+ def on_link_flow(event)
+ add_credit(event.receiver)
+ sender = event.sender
+ delegate(:on_sendable, sender) if sender && sender.open? && sender.credit > 0
+ end
+
+ def add_credit(r)
+ prefetch = @opts[:prefetch]
+ if r && r.open? && (r.drained == 0) && prefetch && (prefetch > r.credit)
+ r.flow(prefetch - r.credit)
+ end
+ end
+ end
+ end
+end
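The comment in MessagingAdapter notes that the open/close methods are generated in modules so they can be overridden later in the class. A minimal sketch of why that works, with no proton dependency: methods that come from an included module sit above the class in the ancestor chain, so a method defined in the class body can call super to reach the generated one. SketchAdapter and its @log array are hypothetical and exist only for this illustration.

```ruby
# Hypothetical adapter showing the Module.new + define_method + include pattern.
class SketchAdapter
  attr_reader :log

  def initialize
    @log = []
  end

  # Build an anonymous module holding one generated method per endpoint type.
  def self.open_close(endpoint)
    Module.new do
      define_method(:"on_#{endpoint}_remote_open") do |event|
        @log << :"on_#{endpoint}_open"
      end
    end
  end

  [:connection, :session, :link].each { |endpoint| include open_close(endpoint) }

  # Override in the class body: super reaches the generated module method.
  def on_link_remote_open(event)
    super
    @log << :add_credit
  end
end

adapter = SketchAdapter.new
adapter.on_session_remote_open(nil)
adapter.on_link_remote_open(nil)
# adapter.log is now [:on_session_open, :on_link_open, :add_credit]
```

Had the generated methods been defined directly on the class with define_method, the later def on_link_remote_open would have replaced the generated method instead of extending it.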
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a13bc2b9/proton-c/bindings/ruby/lib/handler/messaging_handler.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/handler/messaging_handler.rb b/proton-c/bindings/ruby/lib/handler/messaging_handler.rb
index 0e94c17..e4e5e12 100644
--- a/proton-c/bindings/ruby/lib/handler/messaging_handler.rb
+++ b/proton-c/bindings/ruby/lib/handler/messaging_handler.rb
@@ -22,6 +22,10 @@ module Qpid::Proton
# @deprecated use {Qpid::Proton::MessagingHandler}
class MessagingHandler
+ # @private
+ def proton_adapter_class() Handler::ReactorMessagingAdapter; end
+
+
# @overload initialize(opts)
# Create a {MessagingHandler} with options +opts+
# @option opts [Integer] :prefetch (10)
@@ -46,6 +50,7 @@ module Qpid::Proton
# @overload initialize(prefetch=10, auto_accept=true, auto_settle=true, peer_close_is_error=false)
# @deprecated use +initialize(opts)+ overload
def initialize(*args)
+ Qpid.deprecated MessagingHandler, Qpid::Proton::MessagingHandler
@options = {}
if args.size == 1 && args[0].is_a?(Hash)
@options.replace(args[0])
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a13bc2b9/proton-c/bindings/ruby/lib/handler/old_messaging_adapter.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/handler/old_messaging_adapter.rb b/proton-c/bindings/ruby/lib/handler/old_messaging_adapter.rb
deleted file mode 100644
index c43dc8c..0000000
--- a/proton-c/bindings/ruby/lib/handler/old_messaging_adapter.rb
+++ /dev/null
@@ -1,151 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-
-# @private
-module Qpid::Proton
- module Handler
-
- # Adapter to convert raw proton events to old {Handler::MessagingHandler} events
- class OldMessagingAdapter < Adapter
- def initialize handler
- super
- @opts = (handler.options if handler.respond_to?(:options)) || {}
- @opts[:prefetch] ||= 10
- @opts[:peer_close_is_error] = false unless @opts.include? :peer_close_is_error
- [:auto_accept, :auto_settle, :auto_open, :auto_close].each do |k|
- @opts[k] = true unless @opts.include? k
- end
- end
-
- def delegate(method, event)
- event.method = method # Update the event with the new method
- event.dispatch(@handler) || dispatch(:on_unhandled, event)
- end
- def delegate_error(method, event)
- event.method = method
- unless event.dispatch(@handler) # Default behaviour if not dispatched
- dispatch(:on_error, event) || dispatch(:on_unhandled, event)
- event.connection.close event.context.condition # Close the connection by default
- end
- end
-
- # Define repetative on_xxx_open/close methods for each endpoint type
- def self.open_close(endpoint)
- on_opening = :"on_#{endpoint}_opening"
- on_opened = :"on_#{endpoint}_opened"
- on_closing = :"on_#{endpoint}_closing"
- on_closed = :"on_#{endpoint}_closed"
- on_error = :"on_#{endpoint}_error"
-
- Module.new do
- define_method(:"on_#{endpoint}_local_open") do |event|
- delegate(on_opened, event) if event.context.remote_open?
- end
-
- define_method(:"on_#{endpoint}_remote_open") do |event|
- if event.context.local_open?
- delegate(on_opened, event)
- elsif event.context.local_uninit?
- delegate(on_opening, event)
- event.context.open if @opts[:auto_open]
- end
- end
-
- define_method(:"on_#{endpoint}_local_close") do |event|
- delegate(on_closed, event) if event.context.remote_closed?
- end
-
- define_method(:"on_#{endpoint}_remote_close") do |event|
- if event.context.remote_condition
- delegate_error(on_error, event)
- elsif event.context.local_closed?
- delegate(on_closed, event)
- elsif @opts[:peer_close_is_error]
- Condition.assign(event.context.__send__(:_remote_condition), "unexpected peer close")
- delegate_error(on_error, event)
- else
- delegate(on_closing, event)
- end
- event.context.close if @opts[:auto_close]
- end
- end
- end
- # Generate and include open_close modules for each endpoint type
- [:connection, :session, :link].each { |endpoint| include open_close(endpoint) }
-
- def on_transport_error(event) delegate_error(:on_transport_error, event); end
- def on_transport_closed(event) delegate(:on_transport_closed, event); end
-
- # Add flow control for link opening events
- def on_link_local_open(event) super; add_credit(event); end
- def on_link_remote_open(event) super; add_credit(event); end
-
-
- def on_delivery(event)
- if event.link.receiver? # Incoming message
- d = event.delivery
- if d.aborted?
- delegate(:on_aborted, event)
- d.settle
- elsif d.complete?
- if d.link.local_closed? && @opts[:auto_accept]
- d.release
- else
- begin
- delegate(:on_message, event)
- d.accept if @opts[:auto_accept]
- rescue Qpid::Proton::Reject
- d.reject
- rescue Qpid::Proton::Release
- d.release(true)
- end
- end
- end
- delegate(:on_settled, event) if d.settled?
- add_credit(event)
- else # Outgoing message
- t = event.tracker
- if t.updated?
- case t.remote_state
- when Qpid::Proton::Delivery::ACCEPTED then delegate(:on_accepted, event)
- when Qpid::Proton::Delivery::REJECTED then delegate(:on_rejected, event)
- when Qpid::Proton::Delivery::RELEASED then delegate(:on_released, event)
- when Qpid::Proton::Delivery::MODIFIED then delegate(:on_modified, event)
- end
- delegate(:on_settled, event) if t.settled?
- t.settle if @opts[:auto_settle]
- end
- end
- end
-
- def on_link_flow(event)
- add_credit(event)
- l = event.link
- delegate(:on_sendable, event) if l.sender? && l.open? && l.credit > 0
- end
-
- def add_credit(event)
- r = event.receiver
- prefetch = @opts[:prefetch]
- if r && r.open? && (r.drained == 0) && prefetch && (prefetch > r.credit)
- r.flow(prefetch - r.credit)
- end
- end
- end
- end
-end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a13bc2b9/proton-c/bindings/ruby/lib/handler/reactor_messaging_adapter.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/handler/reactor_messaging_adapter.rb b/proton-c/bindings/ruby/lib/handler/reactor_messaging_adapter.rb
new file mode 100644
index 0000000..7acbbae
--- /dev/null
+++ b/proton-c/bindings/ruby/lib/handler/reactor_messaging_adapter.rb
@@ -0,0 +1,158 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+# @private
+module Qpid::Proton
+ module Handler
+
+ # Adapter to convert raw proton events for the old {Handler::MessagingHandler}
+ # used by the Reactor.
+ class ReactorMessagingAdapter < Adapter
+ def initialize handler
+ super
+ @opts = (handler.options if handler.respond_to?(:options)) || {}
+ @opts[:prefetch] ||= 10
+ @opts[:peer_close_is_error] = false unless @opts.include? :peer_close_is_error
+ [:auto_accept, :auto_settle, :auto_open, :auto_close].each do |k|
+ @opts[k] = true unless @opts.include? k
+ end
+ end
+
+ alias dispatch forward
+
+ def delegate(method, event)
+ event.method = method # Update the event with the new method
+ event.dispatch(@handler) or dispatch(:on_unhandled, event)
+ end
+
+ def delegate_error(method, event)
+ event.method = method
+ unless event.dispatch(@handler) || dispatch(:on_error, event)
+ dispatch(:on_unhandled, event)
+ event.connection.close(event.context.condition) if @opts[:auto_close]
+ end
+ end
+
+ def on_container_start(container) delegate(:on_start, Event.new(nil, nil, container)); end
+ def on_container_stop(container) delegate(:on_stop, Event.new(nil, nil, container)); end
+
+ # Define repetitive on_xxx_open/close methods for each endpoint type
+ def self.open_close(endpoint)
+ on_opening = :"on_#{endpoint}_opening"
+ on_opened = :"on_#{endpoint}_opened"
+ on_closing = :"on_#{endpoint}_closing"
+ on_closed = :"on_#{endpoint}_closed"
+ on_error = :"on_#{endpoint}_error"
+
+ Module.new do
+ define_method(:"on_#{endpoint}_local_open") do |event|
+ delegate(on_opened, event) if event.context.remote_open?
+ end
+
+ define_method(:"on_#{endpoint}_remote_open") do |event|
+ if event.context.local_open?
+ delegate(on_opened, event)
+ elsif event.context.local_uninit?
+ delegate(on_opening, event)
+ event.context.open if @opts[:auto_open]
+ end
+ end
+
+ define_method(:"on_#{endpoint}_local_close") do |event|
+ delegate(on_closed, event) if event.context.remote_closed?
+ end
+
+ define_method(:"on_#{endpoint}_remote_close") do |event|
+ if event.context.remote_condition
+ delegate_error(on_error, event)
+ elsif event.context.local_closed?
+ delegate(on_closed, event)
+ elsif @opts[:peer_close_is_error]
+ Condition.assign(event.context.__send__(:_remote_condition), "unexpected peer close")
+ delegate_error(on_error, event)
+ else
+ delegate(on_closing, event)
+ end
+ event.context.close if @opts[:auto_close]
+ end
+ end
+ end
+ # Generate and include open_close modules for each endpoint type
+ [:connection, :session, :link].each { |endpoint| include open_close(endpoint) }
+
+ def on_transport_error(event) delegate_error(:on_transport_error, event); end
+ def on_transport_closed(event) delegate(:on_transport_closed, event); end
+
+ # Add flow control for link opening events
+ def on_link_local_open(event) super; add_credit(event); end
+ def on_link_remote_open(event) super; add_credit(event); end
+
+
+ def on_delivery(event)
+ if event.link.receiver? # Incoming message
+ d = event.delivery
+ if d.aborted?
+ delegate(:on_aborted, event)
+ d.settle
+ elsif d.complete?
+ if d.link.local_closed? && @opts[:auto_accept]
+ d.release
+ else
+ begin
+ delegate(:on_message, event)
+ d.accept if @opts[:auto_accept] && !d.settled?
+ rescue Qpid::Proton::Reject
+ d.reject
+ rescue Qpid::Proton::Release
+ d.release(true)
+ end
+ end
+ end
+ delegate(:on_settled, event) if d.settled?
+ add_credit(event)
+ else # Outgoing message
+ t = event.tracker
+ if t.updated?
+ case t.remote_state
+ when Qpid::Proton::Delivery::ACCEPTED then delegate(:on_accepted, event)
+ when Qpid::Proton::Delivery::REJECTED then delegate(:on_rejected, event)
+ when Qpid::Proton::Delivery::RELEASED then delegate(:on_released, event)
+ when Qpid::Proton::Delivery::MODIFIED then delegate(:on_modified, event)
+ end
+ delegate(:on_settled, event) if t.settled?
+ t.settle if @opts[:auto_settle]
+ end
+ end
+ end
+
+ def on_link_flow(event)
+ add_credit(event)
+ l = event.link
+ delegate(:on_sendable, event) if l.sender? && l.open? && l.credit > 0
+ end
+
+ def add_credit(event)
+ r = event.receiver
+ prefetch = @opts[:prefetch]
+ if r && r.open? && (r.drained == 0) && prefetch && (prefetch > r.credit)
+ r.flow(prefetch - r.credit)
+ end
+ end
+ end
+ end
+end
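Both adapters top up link credit with the same add_credit rule: if the receiver is open, is not draining, and holds less credit than the :prefetch option, issue only the difference. The sketch below reproduces that condition against a stub. FakeReceiver is a hypothetical stand-in for the real receiver object, and the free-standing add_credit takes the receiver and prefetch value as arguments purely for illustration.

```ruby
# Hypothetical receiver stub; only the calls used by add_credit are modelled.
FakeReceiver = Struct.new(:credit, :drained, :is_open) do
  def open?
    is_open
  end

  # Granting credit simply raises the local counter in this stub.
  def flow(n)
    self.credit += n
  end
end

# Same condition as the adapters' add_credit, with prefetch passed explicitly.
def add_credit(r, prefetch)
  if r && r.open? && (r.drained == 0) && prefetch && (prefetch > r.credit)
    r.flow(prefetch - r.credit)
  end
end

open_receiver = FakeReceiver.new(3, 0, true)
add_credit(open_receiver, 10)     # tops credit up from 3 to 10

closed_receiver = FakeReceiver.new(3, 0, false)
add_credit(closed_receiver, 10)   # closed link: credit stays at 3

manual_receiver = FakeReceiver.new(3, 0, true)
add_credit(manual_receiver, nil)  # no prefetch configured: credit stays at 3
```

Issuing only the difference keeps the outstanding credit at the prefetch level no matter how often the rule runs, which is why the adapters can call it from the link open, delivery and flow events alike.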
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a13bc2b9/proton-c/bindings/ruby/lib/qpid_proton.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/qpid_proton.rb b/proton-c/bindings/ruby/lib/qpid_proton.rb
index 2d93454..922504f 100644
--- a/proton-c/bindings/ruby/lib/qpid_proton.rb
+++ b/proton-c/bindings/ruby/lib/qpid_proton.rb
@@ -85,21 +85,22 @@ require "core/transport"
require "core/url"
require "core/connection_driver"
-# Messenger API classes
-require "messenger/subscription"
-require "messenger/tracker_status"
-require "messenger/tracker"
-require "messenger/messenger"
-
-# Handler classes
+# Handlers and adapters
require "handler/adapter"
-require "handler/old_messaging_adapter"
-# Core classes that depend on Handler
+require "handler/messaging_adapter"
require "core/messaging_handler"
+
+# Main container class
require "core/container"
-require "core/connection_driver"
-# Backwards compatibility shims
+# DEPRECATED Backwards compatibility shims for Reactor API
+require "handler/reactor_messaging_adapter"
+require "handler/messaging_handler" # Keep original name for compatibility
require "reactor/container"
-require "handler/messaging_handler"
+
+# DEPRECATED Messenger API classes
+require "messenger/subscription"
+require "messenger/tracker_status"
+require "messenger/tracker"
+require "messenger/messenger"
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a13bc2b9/proton-c/bindings/ruby/lib/reactor/container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/reactor/container.rb b/proton-c/bindings/ruby/lib/reactor/container.rb
index 8ce5ea6..a900d9e 100644
--- a/proton-c/bindings/ruby/lib/reactor/container.rb
+++ b/proton-c/bindings/ruby/lib/reactor/container.rb
@@ -16,58 +16,62 @@
# under the License.
-module Qpid::Proton::Reactor
+module Qpid::Proton
+ module Reactor
- # @deprecated use {Qpid::Proton::Container}
- class Container < Qpid::Proton::Container
+ # @deprecated use {Qpid::Proton::Container}
+ class Container < Qpid::Proton::Container
- private
- alias super_connect connect # Access to superclass method
+ private
+ alias super_connect connect # Access to superclass method
- public
+ public
- # @deprecated use {Qpid::Proton::Container}
- def initialize(handlers, opts=nil)
- Qpid.deprecated self.class, "Qpid::Proton::Container"
- super handlers || (opts && opts[:global_handler]), opts && opts[:container_id]
- end
+ # @deprecated use {Qpid::Proton::Container}
+ def initialize(handlers, opts=nil)
+ Qpid.deprecated self.class, "Qpid::Proton::Container"
+ h = handlers || (opts && opts[:global_handler])
+ id = opts && opts[:container_id]
+ super(h, id)
+ end
- alias container_id id
- alias global_handler handler
+ alias container_id id
+ alias global_handler handler
- def connect(opts=nil)
- url = opts && (opts[:url] || opts[:address])
- raise ::ArgumentError.new, "no :url or :address option provided" unless url
- super(url, opts)
- end
+ def connect(opts=nil)
+ url = opts && (opts[:url] || opts[:address])
+ raise ::ArgumentError.new, "no :url or :address option provided" unless url
+ super(url, opts)
+ end
- # @deprecated use {#connect} then {Connection#open_sender}
- def create_sender(context, opts=nil)
- c = context if context.is_a? Qpid::Proton::Connection
- unless c
- url = Qpid::Proton::uri context
- c = super_connect(url, opts)
- opts ||= {}
- opts[:target] ||= url.amqp_address
+ # @deprecated use {#connect} then {Connection#open_sender}
+ def create_sender(context, opts=nil)
+ c = context if context.is_a? Qpid::Proton::Connection
+ unless c
+ url = Qpid::Proton::uri context
+ c = super_connect(url, opts)
+ opts ||= {}
+ opts[:target] ||= url.amqp_address
+ end
+ c.open_sender opts
end
- c.open_sender opts
- end
- # @deprecated use {#connect} then {Connection#open_receiver}
- def create_receiver(context, opts=nil)
- c = context if context.is_a? Qpid::Proton::Connection
- unless c
- url = Qpid::Proton::uri context
- c = super_connect(url, opts)
- opts ||= {}
- opts[:source] ||= url.amqp_address
+ # @deprecated use {#connect} then {Connection#open_receiver}
+ def create_receiver(context, opts=nil)
+ c = context if context.is_a? Qpid::Proton::Connection
+ unless c
+ url = Qpid::Proton::uri context
+ c = super_connect(url, opts)
+ opts ||= {}
+ opts[:source] ||= url.amqp_address
+ end
+ c.open_receiver opts
end
- c.open_receiver opts
- end
- def listen(url, ssl_domain = nil)
- # TODO aconway 2017-11-29: ssl_domain
- super(url)
+ def listen(url, ssl_domain = nil)
+ # TODO aconway 2017-11-29: ssl_domain
+ super(url)
+ end
end
end
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a13bc2b9/proton-c/bindings/ruby/lib/types/array.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/types/array.rb b/proton-c/bindings/ruby/lib/types/array.rb
index d00a86b..4ea1d70 100644
--- a/proton-c/bindings/ruby/lib/types/array.rb
+++ b/proton-c/bindings/ruby/lib/types/array.rb
@@ -55,9 +55,14 @@ module Qpid::Proton
attr_reader :type
# @return [Object] Optional descriptor.
- def attr_reader() descriptor; end
+ attr_reader :descriptor
def inspect() "#{self.class.name}<#{type}>#{super}"; end
+
+ def <=>(x)
+ ret = [@type, @descriptor] <=> [x.type, x.descriptor]
+ ret == 0 ? super : ret
+ end
end
end
end
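The new <=> above orders arrays by type and descriptor first and falls back to element-wise comparison only when those match. A self-contained sketch of the same ordering follows. TypedArray is a hypothetical stand-in for UniformArray, and the constructor signature and the use of symbols as type codes are assumptions made for the example.

```ruby
# Hypothetical stand-in for UniformArray: an Array tagged with a type and
# an optional descriptor.
class TypedArray < Array
  attr_reader :type, :descriptor

  def initialize(type, descriptor = nil, elements = [])
    @type = type
    @descriptor = descriptor
    super(elements)
  end

  # Compare type and descriptor first; compare elements only on a tie.
  def <=>(other)
    ret = [@type, @descriptor] <=> [other.type, other.descriptor]
    ret == 0 ? super : ret
  end
end

a = TypedArray.new(:int, nil, [1, 2])
b = TypedArray.new(:int, nil, [1, 3])
c = TypedArray.new(:string, nil, [1, 2])

same_type_order = (a <=> b)  # -1: same type, so elements decide
diff_type_order = (a <=> c)  # -1: :int sorts before :string, elements are ignored
```

If the type or descriptor pair cannot be compared (for example nil against a symbol), the Array comparison yields nil and the method returns nil as well.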
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a13bc2b9/proton-c/bindings/ruby/spec/array_spec.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/spec/array_spec.rb b/proton-c/bindings/ruby/spec/array_spec.rb
index e760877..ce73760 100644
--- a/proton-c/bindings/ruby/spec/array_spec.rb
+++ b/proton-c/bindings/ruby/spec/array_spec.rb
@@ -38,65 +38,41 @@ describe "The extended array type" do
expect(value).respond_to? :proton_described?
end
- it "raises an error when getting from a nil Data object" do
- expect {
- Array.proton_get(nil)
- }.must_raise
- end
-
it "raises an error when the current object is not a list" do
@data.string = random_string(128)
@data.rewind
expect {
- Array.proton_get(@data)
+ @data.list
}.must_raise(TypeError)
end
- it "does not have an array header when it's a simple list" do
- assert !@list.proton_described?
- end
-
it "can be put into a Data object as a list" do
- @list.proton_put(@data)
- result = Array.proton_get(@data)
+ @data.list= @list
+ result = @data.list
expect(result).must_equal(@list)
- expect(result.proton_array_header) == (nil)
- end
-
- it "has an array header when it's an AMQP array" do
- expect(@undescribed.proton_array_header).wont_be_nil
- expect(@described.proton_array_header).wont_be_nil
end
it "raises an error when the elements of an Array are dissimilar and is put into a Data object" do
- value = []
- value.proton_array_header = Qpid::Proton::Types::ArrayHeader.new(Qpid::Proton::Codec::INT)
+ value = Qpid::Proton::Types::UniformArray.new(Qpid::Proton::Codec::INT)
value << random_string(16)
-
expect {
- value.proton_put(@data)
+ @data << value
}.must_raise(TypeError)
end
it "can be put into a Data object as an undescribed array" do
- @undescribed.proton_put(@data)
- result = Array.proton_get(@data)
+ @data << @undescribed
+ result = @data.array
expect(result).is_a? Qpid::Proton::Types::UniformArray
expect(@undescribed).must_equal(result)
- expect(result.proton_array_header).wont_be_nil
- expect(result.proton_array_header).must_equal(@undescribed.proton_array_header)
- assert !result.proton_array_header.described?
end
it "can be put into a Data object as a described array" do
- @described.proton_put(@data)
- result = Array.proton_get(@data)
+ @data << @described
+ result = @data.array
expect(@described).must_equal(result)
expect(result).is_a? Qpid::Proton::Types::UniformArray
- expect(result.proton_array_header).wont_be_nil
- expect(result.proton_array_header).must_equal(@described.proton_array_header)
- expect(result.proton_array_header.described?).must_equal(true)
end
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a13bc2b9/proton-c/bindings/ruby/spec/spec_helper.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/spec/spec_helper.rb b/proton-c/bindings/ruby/spec/spec_helper.rb
index 2698bb9..809da52 100644
--- a/proton-c/bindings/ruby/spec/spec_helper.rb
+++ b/proton-c/bindings/ruby/spec/spec_helper.rb
@@ -83,30 +83,24 @@ end
# Generates a random array of a random type.
# Returns both the array and the type.
def random_array(length, described = false, description = nil)
- result = []
- type = rand(128) % 4
+ choice = rand(128) % 4
+ type = [Qpid::Proton::Types::INT,
+ Qpid::Proton::Types::STRING,
+ Qpid::Proton::Types::DOUBLE,
+ Qpid::Proton::Types::UUID][choice]
+ result = Qpid::Proton::Types::UniformArray.new(type)
+
low = rand(512)
high = rand(8192)
(0...length).each do |element|
case
- when type == 0 then result << rand(1024)
- when type == 1 then result << random_string(rand(128))
- when type == 2 then result << rand * (low - high).abs + low
- when type == 3 then result << SecureRandom.uuid
+ when choice == 0 then result << rand(1024)
+ when choice == 1 then result << random_string(rand(128))
+ when choice == 2 then result << rand * (low - high).abs + low
+ when choice == 3 then result << SecureRandom.uuid
end
end
-
- # create the array header
- case
- when type == 0 then type = Qpid::Proton::Codec::INT
- when type == 1 then type = Qpid::Proton::Codec::STRING
- when type == 2 then type = Qpid::Proton::Codec::DOUBLE
- when type == 3 then type = Qpid::Proton::Codec::UUID
- end
-
- result.proton_array_header = Qpid::Proton::Types::ArrayHeader.new(type, description)
-
return result
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a13bc2b9/proton-c/bindings/ruby/tests/old_examples/old_example_test.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/old_examples/old_example_test.rb b/proton-c/bindings/ruby/tests/old_examples/old_example_test.rb
index de8077c..e8b21d2 100755
--- a/proton-c/bindings/ruby/tests/old_examples/old_example_test.rb
+++ b/proton-c/bindings/ruby/tests/old_examples/old_example_test.rb
@@ -28,7 +28,7 @@ def make_url(port, path) "amqp://:#{port}/#{path}"; end
class OldExampleTest < MiniTest::Test
def run_script(*args)
- IO.popen [RbConfig.ruby, "-W0", *args];
+ IO.popen [RbConfig.ruby, *args];
end
def assert_output(want, args)
@@ -91,7 +91,7 @@ end
# Start the broker before all tests.
$port = unused_port
-$broker = IO.popen [RbConfig.ruby, "-W0", "broker.rb", "-a", ":#{$port}"]
+$broker = IO.popen [RbConfig.ruby, "broker.rb", "-a", ":#{$port}"]
$broker.readline # Wait for "Listening"
# Kill the broker after all tests
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a13bc2b9/proton-c/bindings/ruby/tests/test_adapter.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/test_adapter.rb b/proton-c/bindings/ruby/tests/test_adapter.rb
deleted file mode 100644
index 62ef109..0000000
--- a/proton-c/bindings/ruby/tests/test_adapter.rb
+++ /dev/null
@@ -1,227 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-
-require 'minitest/autorun'
-require 'qpid_proton'
-require 'test_tools'
-include Qpid::Proton
-
-# Records every call
-class AllHandler < MessagingHandler
- def initialize(*args)
- super(*args)
- @calls = []
- end
-
- attr_accessor :calls
-
- def names; @calls.map { |c| c[0] }; end
- def events; @calls.map { |c| c[1] }; end
-
- def method_missing(name, *args) (/^on_/ =~ name) ? (@calls << [name] + args) : super; end
- def respond_to_missing?(name, private=false); (/^on_/ =~ name); end
- def respond_to?(name, all=false) super || respond_to_missing?(name); end # For ruby < 1.9.2
-end
-
-# Tests with Mock handler that handles all methods, expect both old and new calls
-class TestOldHandler < Minitest::Test
- def setup
- @h = [AllHandler.new, AllHandler.new]
- @ch, @sh = *@h
- @d = DriverPair.new(*@h)
- end
-
- def clear; @d.each { |d| h = d.handler; h.calls.clear }; end
-
- def test_handler_defaults
- want = { :prefetch => 10, :auto_settle => true, :auto_accept => true, :auto_open => true, :auto_close => true, :peer_close_is_error => false }
- assert_equal want, @ch.options
- assert_equal want, @sh.options
- end
-
- def test_auto_open_close
- @d.client.connection.open; @d.client.connection.open_sender; @d.run
- assert_equal [:on_connection_opened, :on_session_opened, :on_link_opened, :on_sendable], @ch.names
- assert_equal [:on_connection_opening, :on_session_opening, :on_link_opening, :on_connection_opened, :on_session_opened, :on_link_opened], @sh.names
- clear
- @d.client.connection.close; @d.run
- assert_equal [:on_connection_closed, :on_transport_closed], @ch.names
- assert_equal [:on_connection_closing, :on_connection_closed, :on_transport_closed], @sh.names
- end
-
- def test_no_auto_open_close
- [:auto_close, :auto_open].each { |k| @ch.options[k] = @sh.options[k] = false }
- @d.client.connection.open; @d.run
- assert_equal [:on_connection_opening], @sh.names
- assert_equal [], @ch.names
- @d.server.connection.open; @d.run
- assert_equal [:on_connection_opened], @ch.names
- assert_equal [:on_connection_opening, :on_connection_opened], @sh.names
- clear
- @d.client.connection.session.open; @d.run
- assert_equal [:on_session_opening], @sh.names
- assert_equal [], @ch.names
- clear
- @d.client.connection.close;
- 3.times { @d.process }
- assert_equal [:on_connection_closing], @sh.names
- assert_equal [], @ch.names
- @d.server.connection.close; @d.run
- assert_equal [:on_connection_closed, :on_transport_closed], @ch.names
- assert_equal [:on_connection_closing, :on_connection_closed, :on_transport_closed], @sh.names
- end
-
- def test_transport_error
- @d.client.connection.open; @d.run
- clear
- @d.client.close "stop that"; @d.run
- assert_equal [:on_transport_closed], @ch.names
- assert_equal [:on_transport_error, :on_transport_closed], @sh.names
- assert_equal Condition.new("proton:io", "stop that (connection aborted)"), @d.client.transport.condition
- assert_equal Condition.new("amqp:connection:framing-error", "connection aborted"), @d.server.transport.condition
- end
-
- def test_connection_error
- @ch.options[:auto_open] = @sh.options[:auto_open] = false
- @d.client.connection.open; @d.run
- @d.server.connection.close "bad dog"; @d.run
- assert_equal [:on_connection_opened, :on_connection_error, :on_connection_closed, :on_transport_closed], @ch.names
- assert_equal "bad dog", @ch.calls[2][1].condition.description
- assert_equal [:on_connection_opening, :on_connection_closed, :on_transport_closed], @sh.names
- end
-
- def test_session_error
- @d.client.connection.open
- s = @d.client.connection.session; s.open; @d.run
- s.close "bad dog"; @d.run
- assert_equal [:on_connection_opened, :on_session_opened, :on_session_closed], @ch.names
- assert_equal [:on_connection_opening, :on_session_opening, :on_connection_opened, :on_session_opened, :on_session_error, :on_session_closed], @sh.names
- assert_equal "bad dog", @sh.calls[-3][1].condition.description
- end
-
- def test_link_error
- @d.client.connection.open
- s = @d.client.connection.open_sender; @d.run
- s.close "bad dog"; @d.run
- assert_equal [:on_connection_opened, :on_session_opened, :on_link_opened, :on_sendable, :on_link_closed], @ch.names
- assert_equal [:on_connection_opening, :on_session_opening, :on_link_opening,
- :on_connection_opened, :on_session_opened, :on_link_opened,
- :on_link_error, :on_link_closed], @sh.names
- assert_equal "bad dog", @sh.calls[-3][1].condition.description
- end
-
- def test_options_off
- off = {:prefetch => 0, :auto_settle => false, :auto_accept => false, :auto_open => false, :auto_close => false}
- @ch.options.replace(off)
- @sh.options.replace(off)
- @d.client.connection.open; @d.run
- assert_equal [[], [:on_connection_opening]], [@ch.names, @sh.names]
- @d.server.connection.open; @d.run
- assert_equal [[:on_connection_opened], [:on_connection_opening, :on_connection_opened]], [@ch.names, @sh.names]
- clear
- s = @d.client.connection.open_sender; @d.run
- assert_equal [[], [:on_session_opening, :on_link_opening]], [@ch.names, @sh.names]
- @sh.events[1].session.open
- r = @sh.events[1].link
- r.open; @d.run
- assert_equal [[:on_session_opened, :on_link_opened], [:on_session_opening, :on_link_opening, :on_session_opened, :on_link_opened]], [@ch.names, @sh.names]
- clear
- r.flow(1); @d.run
- assert_equal [[:on_sendable], []], [@ch.names, @sh.names]
- assert_equal 1, s.credit
- clear
- s.send Message.new("foo"); @d.run
- assert_equal [[], [:on_message]], [@ch.names, @sh.names]
- end
-
- def test_peer_close_is_error
- @ch.options[:peer_close_is_error] = true
- @d.client.connection.open; @d.run
- @d.server.connection.close; @d.run
- assert_equal [:on_connection_opened, :on_connection_error, :on_connection_closed, :on_transport_closed], @ch.names
- assert_equal [:on_connection_opening, :on_connection_opened, :on_connection_closed, :on_transport_closed], @sh.names
- end
-end
-
-# Test with real handlers that implement a few methods
-class TestOldUnhandled < Minitest::Test
-
- def test_message
- handler_class = Class.new(MessagingHandler) do
- def on_message(event) @message = event.message; end
- def on_accepted(event) @accepted = true; end
- attr_accessor :message, :accepted, :sender
- end
- d = DriverPair.new(handler_class.new, handler_class.new)
- d.client.connection.open;
- s = d.client.connection.open_sender; d.run
- assert_equal 10, s.credit # Default prefetch
- s.send(Message.new("foo")); d.run
- assert_equal "foo", d.server.handler.message.body
- assert d.client.handler.accepted
- end
-
- # Verify on_unhandled is called
- def test_unhandled
- handler_class = Class.new(MessagingHandler) do
- def initialize() super; @unhandled = []; end
- def on_unhandled(event) @unhandled << event.method; end
- attr_accessor :unhandled
- end
- d = DriverPair.new(handler_class.new, handler_class.new)
- d.client.connection.open; d.run
- assert_equal [:on_connection_opened], d.client.handler.unhandled
- assert_equal [:on_connection_opening, :on_connection_opened], d.server.handler.unhandled
- end
-
- # Verify on_error is called
- def test_on_error
- handler_class = Class.new(MessagingHandler) do
- def initialize() super; @error = []; @unhandled = []; end
- def on_error(event) @error << event.method; end
- def on_unhandled(event) @unhandled << event.method; end
- attr_accessor :error, :unhandled
- end
- d = DriverPair.new(handler_class.new, handler_class.new)
- d.client.connection.open
- r = d.client.connection.open_receiver; d.run
- r.close "oops"; d.run
- assert_equal [:on_connection_opened, :on_session_opened, :on_link_opened,
- :on_link_closed, :on_connection_closed, :on_transport_closed], d.client.handler.unhandled
- assert_equal [:on_connection_error], d.client.handler.error
- assert_equal [:on_connection_opening, :on_session_opening, :on_link_opening,
- :on_connection_opened, :on_session_opened, :on_link_opened, :on_sendable,
- :on_link_closed, :on_connection_closed, :on_transport_closed], d.server.handler.unhandled
- assert_equal [:on_link_error], d.server.handler.error
-
- end
-
- # Verify on_unhandled is called even for errors if there is no on_error
- def test_unhandled_error
- handler_class = Class.new(MessagingHandler) do
- def initialize() super; @unhandled = []; end
- def on_unhandled(event) @unhandled << event.method; end
- attr_accessor :unhandled
- end
- d = DriverPair.new(handler_class.new, handler_class.new)
- d.client.connection.open; d.run
- d.client.connection.close "oops"; d.run
- assert_equal [:on_connection_opened, :on_connection_closed, :on_transport_closed], d.client.handler.unhandled
- assert_equal [:on_connection_opening, :on_connection_opened, :on_connection_error, :on_connection_closed, :on_transport_closed], d.server.handler.unhandled
- end
-end
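Note on the recording pattern: the AllHandler removed above captures every on_xxx callback through method_missing instead of defining each callback by hand. The following is a minimal stand-alone sketch of that idea. CallRecorder and the symbol arguments are hypothetical names chosen for illustration, and the sketch does not depend on qpid_proton.

```ruby
# Hypothetical stand-alone recorder, not part of qpid_proton.
class CallRecorder
  attr_reader :calls

  def initialize
    @calls = []
  end

  # Record any on_xxx call as [name, *args]; anything else goes to super,
  # which raises NoMethodError as usual.
  def method_missing(name, *args)
    if name.to_s.start_with?("on_")
      @calls << [name, *args]
      nil
    else
      super
    end
  end

  # Keep respond_to? consistent with method_missing.
  def respond_to_missing?(name, include_private = false)
    name.to_s.start_with?("on_") || super
  end

  # Names of the recorded callbacks, in call order.
  def names
    @calls.map(&:first)
  end
end

recorder = CallRecorder.new
recorder.on_connection_opened(:event_a)
recorder.on_transport_closed(:event_b)
```

A test can then compare recorder.names against the expected callback sequence, which is how the assertions on @ch.names and @sh.names above are written.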
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a13bc2b9/proton-c/bindings/ruby/tests/test_connection_driver.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/test_connection_driver.rb b/proton-c/bindings/ruby/tests/test_connection_driver.rb
index a9982b5..3386e49 100644
--- a/proton-c/bindings/ruby/tests/test_connection_driver.rb
+++ b/proton-c/bindings/ruby/tests/test_connection_driver.rb
@@ -19,32 +19,49 @@ require 'test_tools'
include Qpid::Proton
-class HandlerDriverTest < Minitest::Test
+# Test delivery of raw proton events
+
+class RawDriverTest < Minitest::Test
+
+ # Raw handler to record all on_xxx calls
+ class RecordingHandler
+ def initialize() @calls =[]; end
+ attr_reader :calls
+
+ def method_missing(name, *args) respond_to_missing?(name) ? @calls << name : super; end
+ def respond_to_missing?(name, private=false); (/^on_/ =~ name); end
+ def respond_to?(name, all=false) super || respond_to_missing?(name); end # For ruby < 1.9.2
+ end
def test_send_recv
- send_class = Class.new(MessagingHandler) do
- attr_reader :accepted
- def on_sendable(event) event.sender.send Message.new("foo"); end
- def on_accepted(event) event.connection.close; @accepted = true; end
+ send_class = Class.new do
+ attr_reader :outcome
+ def on_link_flow(event) event.sender.send Message.new("foo"); end
+ def on_delivery(event)
+ @outcome = event.delivery.state
+ event.connection.close;
+ end
end
- recv_class = Class.new(MessagingHandler) do
+ recv_class = Class.new do
attr_reader :message
- def on_link_opened(event) event.link.flow(1); event.link.open; end
- def on_message(event) @message = event.message; event.connection.close; end
+ def on_connection_remote_open(event) event.context.open; end
+ def on_session_remote_open(event) event.context.open; end
+ def on_link_remote_open(event) event.link.open; event.link.flow(1); end
+ def on_delivery(event) @message = event.message; event.delivery.accept; end
end
d = DriverPair.new(send_class.new, recv_class.new)
- d.client.connection.open(:container_id => "sender");
+ d.client.connection.open(:container_id => __method__);
d.client.connection.open_sender()
d.run
assert_equal(d.server.handler.message.body, "foo")
- assert(d.client.handler.accepted)
+ assert_equal Transfer::ACCEPTED, d.client.handler.outcome
end
def test_idle
- d = DriverPair.new(UnhandledHandler.new, UnhandledHandler.new)
+ d = DriverPair.new(RecordingHandler.new, RecordingHandler.new)
ms = 444
secs = Rational(ms, 1000) # Use rationals to keep it accurate
opts = {:idle_timeout => secs}
@@ -56,8 +73,8 @@ class HandlerDriverTest < Minitest::Test
start = Time.at(1) # Dummy timeline
tick = d.run start # Process all IO events
assert_equal(secs/4, tick - start)
- assert_equal [:on_connection_opened], d.client.handler.calls
- assert_equal [:on_connection_opening, :on_connection_opened], d.server.handler.calls
+ assert_equal [:on_connection_init, :on_connection_local_open, :on_connection_bound], d.client.handler.calls
+ assert_equal [:on_connection_init, :on_connection_bound, :on_connection_remote_open, :on_transport], d.server.handler.calls
assert_equal (ms), d.client.transport.idle_timeout
assert_equal (ms/2), d.server.transport.remote_idle_timeout # proton changes the value
assert_equal (secs/2), d.server.connection.idle_timeout
@@ -67,6 +84,6 @@ class HandlerDriverTest < Minitest::Test
d.run(start + secs - 0.001) # Should do nothing, timeout not reached
assert_equal [[],[]], d.collect { |x| x.handler.calls }
d.run(start + secs*2) # After 2x timeout, connections should close
- assert_equal [[:on_transport_error, :on_transport_closed], [:on_connection_error, :on_connection_closed, :on_transport_closed]], d.collect { |x| x.handler.calls }
+ assert_equal [[:on_transport_error, :on_transport_tail_closed, :on_transport_head_closed, :on_transport_closed], [:on_connection_remote_close, :on_transport_tail_closed, :on_transport_head_closed, :on_transport_closed]], d.collect { |x| x.handler.calls }
end
end
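Note on the Rational arithmetic in test_idle: the comment "Use rationals to keep it accurate" refers to the fact that Ruby Rational values divide and multiply exactly, so derived values such as secs/4 can be compared with assert_equal without floating-point rounding. The sketch below is illustrative only. The variable names other than ms and secs are hypothetical, and it makes no claim about how proton itself adjusts the timeout.

```ruby
ms   = 444
secs = Rational(ms, 1000)      # exactly 444/1000 (stored reduced as 111/250)

quarter = secs / 4             # exactly 111/1000
half_ms = secs / 2 * 1000      # exactly 222, still a Rational

# For contrast: binary floats cannot represent most decimal fractions exactly.
float_sum_exact    = (0.1 + 0.2 == 0.3)
rational_sum_exact = (Rational(1, 10) + Rational(2, 10) == Rational(3, 10))
```

Here float_sum_exact is false while rational_sum_exact is true, which is the reason to prefer Rational when a test asserts exact equality on computed times.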
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a13bc2b9/proton-c/bindings/ruby/tests/test_container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/test_container.rb b/proton-c/bindings/ruby/tests/test_container.rb
index bfa40b1..044ebf4 100644
--- a/proton-c/bindings/ruby/tests/test_container.rb
+++ b/proton-c/bindings/ruby/tests/test_container.rb
@@ -20,92 +20,79 @@ require 'test_tools'
require 'minitest/unit'
require 'socket'
-Message = Qpid::Proton::Message
-SASL = Qpid::Proton::SASL
-Disposition = Qpid::Proton::Disposition
-
-# Easier debugging of thread problems
-Thread::abort_on_exception=true
-
# Container that listens on a random port
-class TestContainer < Container
+class TestContainer < Qpid::Proton::Container
def initialize(handler, lopts=nil, id=nil)
super handler, id
- @server = TCPServer.open(0)
- @listener = listen_io(@server, ListenOnceHandler.new(lopts))
+ @listener = listen_io(TCPServer.open(0), ListenOnceHandler.new(lopts))
end
- def port() @server.addr[1]; end
- def url() "amqp://:#{port}"; end
+ def port() @listener.to_io.addr[1]; end
+ def url() "amqp://:#{port}"; end
end
class ContainerTest < Minitest::Test
+ include Qpid::Proton
def test_simple()
- sh = Class.new(MessagingHandler) do
+ send_handler = Class.new(MessagingHandler) do
attr_reader :accepted, :sent
- def on_sendable(e)
- e.link.send Message.new("foo") unless @sent
+ def on_sendable(sender)
+ sender.send Message.new("foo") unless @sent
@sent = true
end
- def on_accepted(e)
+ def on_tracker_accept(tracker)
@accepted = true
- e.connection.close
+ tracker.connection.close
end
end.new
- rh = Class.new(MessagingHandler) do
+ receive_handler = Class.new(MessagingHandler) do
attr_reader :message, :link
- def on_link_opening(e)
- @link = e.link
- e.link.open
- e.link.flow(1)
+ def on_link_open(link)
+ @link = link
+ @link.open
+ @link.flow(1)
end
- def on_message(e)
- @message = e.message;
- e.delivery.update Disposition::ACCEPTED
- e.delivery.settle
+ def on_message(delivery, message)
+ @message = message;
+ delivery.update Disposition::ACCEPTED
+ delivery.settle
end
end.new
- c = TestContainer.new(rh, {}, __method__)
- c.connect(c.url, {:handler => sh}).open_sender({:name => "testlink"})
+ c = TestContainer.new(receive_handler, {}, __method__)
+ c.connect(c.url, {:handler => send_handler}).open_sender({:name => "testlink"})
c.run
- assert sh.accepted
- assert_equal "testlink", rh.link.name
- assert_equal "foo", rh.message.body
- assert_equal "test_simple", rh.link.connection.container_id
+ assert send_handler.accepted
+ assert_equal "testlink", receive_handler.link.name
+ assert_equal "foo", receive_handler.message.body
+ assert_equal "test_simple", receive_handler.link.connection.container_id
end
class CloseOnOpenHandler < TestHandler
- def on_connection_opened(e) e.connection.close; end
- end
-
- def test_multi_handler
- handler_class = Class.new(CloseOnOpenHandler) do
- @@opened = 0
- def self.opened; @@opened; end
- def on_connection_opened(e) @@opened += 1; super; end
- end
- hs = 3.times.collect { handler_class.new }
- c = TestContainer.new(hs, {}, __method__)
- c.connect(c.url)
- c.run
- assert_equal 6, handler_class.opened # Opened at each end * 3 handlers
+ def on_connection_open(c) c.close; end
end
def test_auto_stop_one
# A listener and a connection
- c = Container.new(__method__)
+ start_stop_handler = Class.new do
+ def on_container_start(c) @start = c; end
+ def on_container_stop(c) @stop = c; end
+ attr_reader :start, :stop
+ end.new
+ c = Container.new(start_stop_handler, __method__)
threads = 3.times.collect { Thread.new { c.run } }
sleep(0.01) while c.running < 3
+ assert_equal c, start_stop_handler.start
l = c.listen_io(TCPServer.new(0), ListenOnceHandler.new({ :handler => CloseOnOpenHandler.new}))
c.connect("amqp://:#{l.to_io.addr[1]}", { :handler => CloseOnOpenHandler.new} )
threads.each { |t| assert t.join(1) }
+ assert_equal c, start_stop_handler.stop
assert_raises(Container::StoppedError) { c.run }
end
@@ -113,8 +100,8 @@ class ContainerTest < Minitest::Test
# Connect between different containers
c1, c2 = Container.new("#{__method__}-1"), Container.new("#{__method__}-2")
threads = [ Thread.new {c1.run }, Thread.new {c2.run } ]
- l = c1.listen_io(TCPServer.new(0), ListenOnceHandler.new({ :handler => CloseOnOpenHandler.new}))
- c2.connect("amqp://:#{l.to_io.addr[1]}", { :handler => CloseOnOpenHandler.new} )
+ l = c2.listen_io(TCPServer.new(0), ListenOnceHandler.new({ :handler => CloseOnOpenHandler.new}))
+ c1.connect(l.url, { :handler => CloseOnOpenHandler.new} )
threads.each { |t| assert t.join(1) }
assert_raises(Container::StoppedError) { c1.run }
assert_raises(Container::StoppedError) { c2.connect("") }
@@ -155,7 +142,6 @@ class ContainerTest < Minitest::Test
conn = c.connect("amqp://:#{l.to_io.addr[1]}")
c.stop
assert c.stopped
-
threads.each { |t| assert t.join(1) }
assert_raises(Container::StoppedError) { c.run }
@@ -167,6 +153,7 @@ end
class ContainerSASLTest < Minitest::Test
+ include Qpid::Proton
# Handler for test client/server that sets up server and client SASL options
class SASLHandler < TestHandler
@@ -176,24 +163,25 @@ class ContainerSASLTest < Minitest::Test
@url, @opts = url, opts
end
- def on_start(e)
- @client = e.container.connect("#{@url}:#{e.container.port}", @opts)
+ def on_container_start(container)
+ @client = container.connect("#{@url}:#{container.port}", @opts)
end
attr_reader :auth_user
- def on_connection_opened(e)
+ def on_connection_open(connection)
super
- if e.connection == @client
- e.connection.close
+ if connection == @client
+ connection.close
else
- @auth_user = e.transport.sasl.user
+ @auth_user = connection.transport.sasl.user
end
end
end
# Generate SASL server configuration files and database, initialize proton SASL
class SASLConfig
+ include Qpid::Proton
attr_reader :conf_dir, :conf_file, :conf_name, :database
def initialize()