You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/11/30 21:42:33 UTC
[02/12] qpid-proton git commit: PROTON-1064: [ruby] Event handling
refactor
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b883393b/proton-c/bindings/ruby/lib/handler/flow_controller.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/handler/flow_controller.rb b/proton-c/bindings/ruby/lib/handler/flow_controller.rb
new file mode 100644
index 0000000..38d925f
--- /dev/null
+++ b/proton-c/bindings/ruby/lib/handler/flow_controller.rb
@@ -0,0 +1,40 @@
+#--
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#++
+
+# @private
+module Qpid::Proton::Handler
+
+ # Mixin that keeps a receiver's credit topped up to a prefetch window.
+ # The window size is read from @handler.prefetch; a nil or false value disables the top-up.
+ # @handler is not assigned in this mixin, so the including class has to provide it.
+ module FlowController
+ # Every callback tops up credit via add_credit (the only helper this mixin defines), then chains to super.
+ def on_link_local_open(event) add_credit(event); super; end
+ def on_link_remote_open(event) add_credit(event); super; end
+ def on_delivery(event) add_credit(event); super; end
+ def on_link_flow(event) add_credit(event); super; end
+ # Give event.receiver the credit it is short of the window; skipped unless it is open and r.drained == 0.
+ def add_credit(event)
+ r = event.receiver
+ if r && r.open? && (r.drained == 0) && @handler.prefetch && (@handler.prefetch > r.credit)
+ r.flow(@handler.prefetch - r.credit)
+ end
+ end
+ end
+end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b883393b/proton-c/bindings/ruby/lib/handler/incoming_message_handler.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/handler/incoming_message_handler.rb b/proton-c/bindings/ruby/lib/handler/incoming_message_handler.rb
index db0f6db..9f34d0d 100644
--- a/proton-c/bindings/ruby/lib/handler/incoming_message_handler.rb
+++ b/proton-c/bindings/ruby/lib/handler/incoming_message_handler.rb
@@ -17,19 +17,18 @@
# under the License.
#++
+# @private
module Qpid::Proton::Handler
+ private
# A utility for simpler and more intuitive handling of delivery events
# related to incoming messages.
#
- class IncomingMessageHandler
-
- def initialize(auto_accept = true, delegate = nil)
- @delegate = delegate
- @auto_accept = auto_accept
- end
-
+ # uses @auto_accept
+ #
+ module IncomingMessageHandler
def on_delivery(event)
+ super
delivery = event.delivery
return unless delivery.link.receiver?
if delivery.readable? && !delivery.partial?
@@ -37,38 +36,20 @@ module Qpid::Proton::Handler
m.receive(delivery)
event.message = m
if event.link.local_closed?
- if @auto_accept
- delivery.update(Qpid::Proton::Disposition::RELEASED)
- delivery.settle
- end
+ delivery.settle Qpid::Proton::Delivery::RELEASED if @auto_accept
else
begin
self.on_message(event)
- if @auto_accept
- delivery.update(Qpid::Proton::Disposition::ACCEPTED)
- delivery.settle
- end
+ delivery.settle Qpid::Proton::Delivery::ACCEPTED if @auto_accept
rescue Qpid::Proton::Reject
- delivery.update(Qpid::Proton::Disposition::REJECTED)
- delivery.settle
+ delivery.settle REJECTED
rescue Qpid::Proton::Release
- delivery.update(Qpid::Proton::Disposition::MODIFIED)
- delivery.settle
+ delivery.settle MODIFIED
end
end
elsif delivery.updated? && delivery.settled?
self.on_settled(event)
end
end
-
- def on_message(event)
- Qpid::Proton::Event.dispatch(@delegate, :on_message, event) if !@delegate.nil?
- end
-
- def on_settled(event)
- Qpid::Proton::Event.dispatch(@delegate, :on_settled, event) if !@delegate.nil?
- end
-
end
-
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b883393b/proton-c/bindings/ruby/lib/handler/outgoing_message_handler.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/handler/outgoing_message_handler.rb b/proton-c/bindings/ruby/lib/handler/outgoing_message_handler.rb
index ee875b6..cedcead 100644
--- a/proton-c/bindings/ruby/lib/handler/outgoing_message_handler.rb
+++ b/proton-c/bindings/ruby/lib/handler/outgoing_message_handler.rb
@@ -22,20 +22,18 @@ module Qpid::Proton::Handler
# A utility for simpler and more intuitive handling of delivery events
# related to outgoing messages.
#
- class OutgoingMessageHandler
-
- def initialize(auto_settle = true, delegate = nil)
- @auto_settle = auto_settle
- @delegate = delegate
- end
+ # Uses {#@auto_settle}
+ module OutgoingMessageHandler
def on_link_flow(event)
+ super
self.on_sendable(event) if event.link.sender? && event.link.credit > 0 &&
(event.link.state & Qpid::Proton::Endpoint::LOCAL_ACTIVE) &&
(event.link.state & Qpid::Proton::Endpoint::REMOTE_ACTIVE)
end
def on_delivery(event)
+ super
delivery = event.delivery
if delivery.link.sender? && delivery.updated?
if delivery.remote_accepted?
@@ -49,52 +47,5 @@ module Qpid::Proton::Handler
delivery.settle if @auto_settle
end
end
-
- # Called when the sender link has credit and messages and be transferred.
- #
- # @param event [Qpid::Proton::Event::Event] The event.
- #
- def on_sendable(event)
- Qpid::Proton::Event.dispatch(@delegate, :on_sendable, event) if !@delegate.nil?
- end
-
- # Called when the remote peer accepts a sent message.
- #
- # @param event [Qpid::Proton::Event::Event] The event.
- #
- def on_accepted(event)
- Qpid::Proton::Event.dispatch(@delegate, :on_accepted, event) if !@delegate.nil?
- end
-
- # Called when the remote peer rejects a sent message.
- #
- # @param event [Qpid::Proton::Event::Event] The event.
- #
- def on_rejected(event)
- Qpid::Proton::Event.dispatch(@delegate, :on_rejected, event) if !@delegate.nil?
- end
-
- # Called when the remote peer releases an outgoing message.
- #
- # Note that this may be in resposnse to either the REELAASE or MODIFIED
- # state as defined by the AMQP specification.
- #
- # @param event [Qpid::Proton::Event::Event] The event.
- #
- def on_released(event)
- Qpid::Proton::Event.dispatch(@delegate, :on_released, event) if !@delegate.nil?
- end
-
- # Called when the remote peer has settled the outgoing message.
- #
- # This is the point at which it should never be retransmitted.
- #
- # @param event [Qpid::Proton::Event::Event] The event.
- #
- def on_settled(event)
- Qpid::Proton::Event.dispatch(@delegate, :on_settled, event) if !@delegate.nil?
- end
-
end
-
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b883393b/proton-c/bindings/ruby/lib/handler/wrapped_handler.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/handler/wrapped_handler.rb b/proton-c/bindings/ruby/lib/handler/wrapped_handler.rb
deleted file mode 100644
index 6d55dee..0000000
--- a/proton-c/bindings/ruby/lib/handler/wrapped_handler.rb
+++ /dev/null
@@ -1,76 +0,0 @@
-#--
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#++
-
-module Qpid::Proton::Handler
-
- class WrappedHandler
-
- # @private
- include Qpid::Proton::Util::Wrapper
-
- def self.wrap(impl, on_error = nil)
- return nil if impl.nil?
-
- result = self.fetch_instance(impl) || WrappedHandler.new(impl)
- result.on_error = on_error
- return result
- end
-
- include Qpid::Proton::Util::Handler
-
- def initialize(impl_or_constructor)
- if impl_or_constructor.is_a?(Method)
- @impl = impl_or_constructor.call
- else
- @impl = impl_or_constructor
- Cproton.pn_incref(@impl)
- end
- @on_error = nil
- self.class.store_instance(self)
- end
-
- def add(handler)
- return if handler.nil?
-
- impl = chandler(handler, self.method(:_on_error))
- Cproton.pn_handler_add(@impl, impl)
- Cproton.pn_decref(impl)
- end
-
- def clear
- Cproton.pn_handler_clear(@impl)
- end
-
- def on_error=(on_error)
- @on_error = on_error
- end
-
- private
-
- def _on_error(info)
- if self.has?['on_error']
- self['on_error'].call(info)
- else
- raise info
- end
- end
-
- end
-
-end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b883393b/proton-c/bindings/ruby/lib/qpid_proton.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/qpid_proton.rb b/proton-c/bindings/ruby/lib/qpid_proton.rb
index b47b863..d3040a0 100644
--- a/proton-c/bindings/ruby/lib/qpid_proton.rb
+++ b/proton-c/bindings/ruby/lib/qpid_proton.rb
@@ -21,10 +21,10 @@ require "cproton"
require "date"
require "weakref"
-if RUBY_VERSION < "1.9"
-require "kconv"
-else
+begin
require "securerandom"
+rescue LoadError
+ require "kconv" # Ruby < 1.9
end
DEPRECATION = "[DEPRECATION]"
@@ -41,9 +41,7 @@ require "util/version"
require "util/error_handler"
require "util/swig_helper"
require "util/wrapper"
-require "util/class_wrapper"
require "util/timeout"
-require "util/handler"
# Types
require "types/strings"
@@ -55,14 +53,9 @@ require "types/described"
require "codec/mapping"
require "codec/data"
-# Event API classes
-require "event/event_type"
-require "event/event_base"
-require "event/event"
-require "event/collector"
-
# Main Proton classes
require "core/condition"
+require "core/event"
require "core/uri"
require "core/message"
require "core/endpoint"
@@ -89,12 +82,11 @@ require "messenger/tracker"
require "messenger/messenger"
# Handler classes
-require "handler/c_adaptor"
-require "handler/wrapped_handler"
require "handler/endpoint_state_handler"
require "handler/incoming_message_handler"
require "handler/outgoing_message_handler"
-require "handler/c_flow_controller"
+require "handler/flow_controller"
+require "handler/adapter"
# Core classes that depend on Handler
require "core/messaging_handler"
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b883393b/proton-c/bindings/ruby/lib/util/class_wrapper.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/util/class_wrapper.rb b/proton-c/bindings/ruby/lib/util/class_wrapper.rb
deleted file mode 100644
index 134f655..0000000
--- a/proton-c/bindings/ruby/lib/util/class_wrapper.rb
+++ /dev/null
@@ -1,52 +0,0 @@
-#--
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#++
-
-module Qpid::Proton::Util
-
- # This mixin provides a method for mapping from an underlying Proton
- # C library class to a Ruby class.
- #
- # @private
- #
- module ClassWrapper
-
- WRAPPERS =
- {
- "pn_void" => proc {|x| Cproton.pn_void2rb(x)},
- "pn_rbref" => proc {|x| Cproton.pn_void2rb(x)},
- "pn_connection" => proc {|x| Qpid::Proton::Connection.wrap(Cproton.pn_cast_pn_connection(x))},
- "pn_session" => proc {|x| Qpid::Proton::Session.wrap(Cproton.pn_cast_pn_session(x))},
- "pn_link" => proc {|x| Qpid::Proton::Link.wrap(Cproton.pn_cast_pn_link(x))},
- "pn_delivery" => proc {|x| Qpid::Proton::Delivery.wrap(Cproton.pn_cast_pn_delivery(x))},
- "pn_transport" => proc {|x| Qpid::Proton::Transport.wrap(Cproton.pn_cast_pn_transport(x))},
- "pn_selectable" => proc {|x| Qpid::Proton::Selectable.wrap(Cproton.pn_cast_pn_selectable(x))},
- }
-
- def class_wrapper(clazz, c_impl, &block)
- proc_func = WRAPPERS[clazz]
- if !proc_func.nil?
- proc_func.yield(c_impl)
- elsif block_given?
- yield(c_impl)
- end
- end
-
- end
-
-end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b883393b/proton-c/bindings/ruby/lib/util/handler.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/util/handler.rb b/proton-c/bindings/ruby/lib/util/handler.rb
deleted file mode 100644
index e7d07b1..0000000
--- a/proton-c/bindings/ruby/lib/util/handler.rb
+++ /dev/null
@@ -1,41 +0,0 @@
-#--
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#++
-
-module Qpid::Proton::Util
-
- # @private
- module Handler
-
- def chandler(handler, on_error)
- return nil if handler.nil?
-
- if handler.instance_of?(Qpid::Proton::Handler::WrappedHandler)
- impl = handler.impl
- Cproton.pn_incref(impl)
- return impl
- else
- cadaptor = Qpid::Proton::Handler::CAdaptor.new(handler, on_error)
- rbhandler = Cproton.pn_rbhandler(cadaptor)
- return rbhandler
- end
- end
-
- end
-
-end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b883393b/proton-c/bindings/ruby/tests/test_adapter.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/test_adapter.rb b/proton-c/bindings/ruby/tests/test_adapter.rb
new file mode 100644
index 0000000..77aee76
--- /dev/null
+++ b/proton-c/bindings/ruby/tests/test_adapter.rb
@@ -0,0 +1,227 @@
+#--
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#++
+
+require 'minitest/autorun'
+require 'qpid_proton'
+require 'test_tools'
+include Qpid::Proton
+
+# Tests that use a mock handler which records the on_* calls it receives, so the order of callbacks can be asserted.
+class TestAllHandler < Minitest::Test
+ # Records each on_* call that reaches method_missing as [name, *args] in @calls.
+ class AllHandler < MessagingHandler
+ def initialize(*args)
+ super(*args)
+ @calls = []
+ end
+ # Recorded calls in arrival order; each entry is [method_name, *args].
+ attr_accessor :calls
+ # names: the recorded method names. events: the first argument of each recorded call.
+ def names; @calls.map { |c| c[0] }; end
+ def events; @calls.map { |c| c[1] }; end
+ # Calls whose name matches /^on_/ are recorded; anything else falls through to super.
+ def method_missing(name, *args) (/^on_/ =~ name) ? (@calls << [name] + args) : super; end
+ def respond_to_missing?(name, private=false); (/^on_/ =~ name); end
+ def respond_to?(name, all=false) super || respond_to_missing?(name); end # For ruby < 1.9.2
+ end
+ # Create a client handler (@ch) and a server handler (@sh) and join them with a DriverPair (@d).
+ def setup
+ @h = [AllHandler.new, AllHandler.new]
+ @ch, @sh = *@h
+ @d = DriverPair.new(*@h)
+ end
+ # Forget the calls recorded so far on the handler of each driver.
+ def clear; @d.each { |d| h = d.handler; h.calls.clear }; end
+ # A freshly created handler exposes the option values listed in want.
+ def test_handler_defaults
+ want = { :prefetch => 10, :auto_settle => true, :auto_accept => true, :auto_open => true, :auto_close => true, :peer_close_is_error => false }
+ assert_equal want, @ch.options
+ assert_equal want, @sh.options
+ end
+ # Default options: checks the callbacks seen on both sides when a sender is opened and the connection is then closed.
+ def test_auto_open_close
+ @d.client.connection.open; @d.client.connection.open_sender; @d.run
+ assert_equal [:on_connection_opened, :on_session_opened, :on_link_opened, :on_sendable], @ch.names
+ assert_equal [:on_connection_opening, :on_session_opening, :on_link_opening, :on_connection_opened, :on_session_opened, :on_link_opened], @sh.names
+ clear
+ @d.client.connection.close; @d.run
+ assert_equal [:on_connection_closed, :on_transport_closed], @ch.names
+ assert_equal [:on_connection_closing, :on_connection_closed, :on_transport_closed], @sh.names
+ end
+ # With auto_open and auto_close off, checks the callbacks seen as each side opens and closes explicitly.
+ def test_no_auto_open_close
+ [:auto_close, :auto_open].each { |k| @ch.options[k] = @sh.options[k] = false }
+ @d.client.connection.open; @d.run
+ assert_equal [:on_connection_opening], @sh.names
+ assert_equal [], @ch.names
+ @d.server.connection.open; @d.run
+ assert_equal [:on_connection_opened], @ch.names
+ assert_equal [:on_connection_opening, :on_connection_opened], @sh.names
+ clear
+ @d.client.connection.session.open; @d.run
+ assert_equal [:on_session_opening], @sh.names
+ assert_equal [], @ch.names
+ clear
+ @d.client.connection.close; @d.run
+ assert_equal [:on_connection_closing], @sh.names
+ assert_equal [], @ch.names
+ @d.server.connection.close; @d.run
+ assert_equal [:on_connection_closed, :on_transport_closed], @ch.names
+ assert_equal [:on_connection_closing, :on_connection_closed, :on_transport_closed], @sh.names
+ end
+ # Closes the client driver with "stop that", then checks the callbacks and transport conditions on both sides.
+ def test_transport_error
+ @d.client.connection.open; @d.run
+ clear
+ @d.client.close "stop that"; @d.run
+ assert_equal [:on_transport_closed], @ch.names
+ assert_equal [:on_transport_error, :on_transport_closed], @sh.names
+ assert_equal Condition.new("proton:io", "stop that (connection aborted)"), @d.client.transport.condition
+ assert_equal Condition.new("amqp:connection:framing-error", "connection aborted"), @d.server.transport.condition
+ end
+ # The server closes its connection with "bad dog"; checks the callbacks and the description seen by the client.
+ def test_connection_error
+ @ch.options[:auto_open] = @sh.options[:auto_open] = false
+ @d.client.connection.open; @d.run
+ @d.server.connection.close "bad dog"; @d.run
+ assert_equal [:on_connection_opened, :on_connection_error, :on_connection_closed, :on_transport_closed], @ch.names
+ assert_equal "bad dog", @ch.calls[2][1].condition.description # NOTE(review): calls[2] is :on_connection_closed in the list above, not :on_connection_error - confirm intended
+ assert_equal [:on_connection_opening, :on_connection_closed, :on_transport_closed], @sh.names
+ end
+ # The client closes a session with "bad dog"; checks the callbacks and the description seen by the server.
+ def test_session_error
+ @d.client.connection.open
+ s = @d.client.connection.session; s.open; @d.run
+ s.close "bad dog"; @d.run
+ assert_equal [:on_connection_opened, :on_session_opened, :on_session_closed], @ch.names
+ assert_equal [:on_connection_opening, :on_session_opening, :on_connection_opened, :on_session_opened, :on_session_error, :on_session_closed], @sh.names
+ assert_equal "bad dog", @sh.calls[-3][1].condition.description # NOTE(review): calls[-3] is :on_session_opened in the list above; :on_session_error is calls[-2] - confirm intended
+ end
+ # The client closes a sender link with "bad dog"; checks the callbacks and the description seen by the server.
+ def test_link_error
+ @d.client.connection.open
+ s = @d.client.connection.open_sender; @d.run
+ s.close "bad dog"; @d.run
+ assert_equal [:on_connection_opened, :on_session_opened, :on_link_opened, :on_sendable, :on_link_closed], @ch.names
+ assert_equal [:on_connection_opening, :on_session_opening, :on_link_opening,
+ :on_connection_opened, :on_session_opened, :on_link_opened,
+ :on_link_error, :on_link_closed], @sh.names
+ assert_equal "bad dog", @sh.calls[-3][1].condition.description # NOTE(review): calls[-3] is :on_link_opened in the list above; :on_link_error is calls[-2] - confirm intended
+ end
+ # All automatic options off: the test opens the session and link, grants credit and sends a message step by step.
+ def test_options_off
+ off = {:prefetch => 0, :auto_settle => false, :auto_accept => false, :auto_open => false, :auto_close => false}
+ @ch.options.replace(off)
+ @sh.options.replace(off)
+ @d.client.connection.open; @d.run
+ assert_equal [[], [:on_connection_opening]], [@ch.names, @sh.names]
+ @d.server.connection.open; @d.run
+ assert_equal [[:on_connection_opened], [:on_connection_opening, :on_connection_opened]], [@ch.names, @sh.names]
+ clear
+ s = @d.client.connection.open_sender; @d.run
+ assert_equal [[], [:on_session_opening, :on_link_opening]], [@ch.names, @sh.names]
+ @sh.events[1].session.open
+ r = @sh.events[1].link
+ r.open; @d.run
+ assert_equal [[:on_session_opened, :on_link_opened], [:on_session_opening, :on_link_opening, :on_session_opened, :on_link_opened]], [@ch.names, @sh.names]
+ clear
+ r.flow(1); @d.run
+ assert_equal [[:on_sendable], []], [@ch.names, @sh.names]
+ assert_equal 1, s.credit
+ clear
+ s.send Message.new("foo"); @d.run
+ assert_equal [[], [:on_message]], [@ch.names, @sh.names]
+ end
+ # With peer_close_is_error set on the client, a server close without a condition still produces :on_connection_error there.
+ def test_peer_close_is_error
+ @ch.options[:peer_close_is_error] = true
+ @d.client.connection.open; @d.run
+ @d.server.connection.close; @d.run
+ assert_equal [:on_connection_opened, :on_connection_error, :on_connection_closed, :on_transport_closed], @ch.names
+ assert_equal [:on_connection_opening, :on_connection_opened, :on_connection_closed, :on_transport_closed], @sh.names
+ end
+end
+
+# Tests with MessagingHandler subclasses that implement only a few callbacks (no catch-all method_missing).
+class TestUnhandled < Minitest::Test
+ # Send one message from client to server; the server must see its body and the client must see on_accepted.
+ def test_message
+ handler_class = Class.new(MessagingHandler) do
+ def on_message(event) @message = event.message; end
+ def on_accepted(event) @accepted = true; end
+ attr_accessor :message, :accepted, :sender
+ end
+ d = DriverPair.new(handler_class.new, handler_class.new)
+ d.client.connection.open;
+ s = d.client.connection.open_sender; d.run
+ assert_equal 10, s.credit # Default prefetch
+ s.send(Message.new("foo")); d.run
+ assert_equal "foo", d.server.handler.message.body
+ assert d.client.handler.accepted
+ end
+
+ # Verify on_unhandled is called for callbacks the handler does not define; it records event.method for each.
+ def test_unhandled
+ handler_class = Class.new(MessagingHandler) do
+ def initialize() super; @unhandled = []; end
+ def on_unhandled(event) @unhandled << event.method; end
+ attr_accessor :unhandled
+ end
+ d = DriverPair.new(handler_class.new, handler_class.new)
+ d.client.connection.open; d.run
+ assert_equal [:on_connection_opened], d.client.handler.unhandled
+ assert_equal [:on_connection_opening, :on_connection_opened], d.server.handler.unhandled
+ end
+
+ # Verify on_error is called: with on_error defined, error events go to @error and the rest to @unhandled.
+ def test_on_error
+ handler_class = Class.new(MessagingHandler) do
+ def initialize() super; @error = []; @unhandled = []; end
+ def on_error(event) @error << event.method; end
+ def on_unhandled(event) @unhandled << event.method; end
+ attr_accessor :error, :unhandled
+ end
+ d = DriverPair.new(handler_class.new, handler_class.new)
+ d.client.connection.open
+ r = d.client.connection.open_receiver; d.run
+ r.close "oops"; d.run
+ assert_equal [:on_connection_opened, :on_session_opened, :on_link_opened,
+ :on_link_closed, :on_connection_closed, :on_transport_closed], d.client.handler.unhandled
+ assert_equal [:on_connection_error], d.client.handler.error
+ assert_equal [:on_connection_opening, :on_session_opening, :on_link_opening,
+ :on_connection_opened, :on_session_opened, :on_link_opened, :on_sendable,
+ :on_link_closed, :on_connection_closed, :on_transport_closed], d.server.handler.unhandled
+ assert_equal [:on_link_error], d.server.handler.error
+
+ end
+
+ # Verify on_unhandled is called even for errors if there is no on_error (here :on_connection_error on the server).
+ def test_unhandled_error
+ handler_class = Class.new(MessagingHandler) do
+ def initialize() super; @unhandled = []; end
+ def on_unhandled(event) @unhandled << event.method; end
+ attr_accessor :unhandled
+ end
+ d = DriverPair.new(handler_class.new, handler_class.new)
+ d.client.connection.open; d.run
+ d.client.connection.close "oops"; d.run
+ assert_equal [:on_connection_opened, :on_connection_closed, :on_transport_closed], d.client.handler.unhandled
+ assert_equal [:on_connection_opening, :on_connection_opened, :on_connection_error, :on_connection_closed, :on_transport_closed], d.server.handler.unhandled
+ end
+end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b883393b/proton-c/bindings/ruby/tests/test_connection_driver.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/test_connection_driver.rb b/proton-c/bindings/ruby/tests/test_connection_driver.rb
index 8ce9fe8..f12076f 100644
--- a/proton-c/bindings/ruby/tests/test_connection_driver.rb
+++ b/proton-c/bindings/ruby/tests/test_connection_driver.rb
@@ -40,7 +40,7 @@ class HandlerDriverTest < Minitest::Test
end
sender = HandlerDriver.new(@sockets[0], send_class.new)
- sender.connection.open();
+ sender.connection.open(:container_id => "sender");
sender.connection.open_sender()
receiver = HandlerDriver.new(@sockets[1], recv_class.new)
drivers = [sender, receiver]
@@ -58,14 +58,16 @@ class HandlerDriverTest < Minitest::Test
end
def test_idle
- idle_class = Class.new(MessagingHandler) do
- def on_connection_bound(event) event.transport.idle_timeout = 10; end
- end
- drivers = [HandlerDriver.new(@sockets[0], idle_class.new), HandlerDriver.new(@sockets[1], nil)]
- drivers[0].connection.open()
+ drivers = [HandlerDriver.new(@sockets[0], nil), HandlerDriver.new(@sockets[1], nil)]
+ opts = {:idle_timeout=>10}
+ drivers[0].transport.apply(opts)
+ assert_equal 10, drivers[0].transport.idle_timeout
+ drivers[0].connection.open(opts)
+ drivers[1].transport.set_server
now = Time.now
drivers.each { |d| d.process(now) } until drivers[0].connection.open?
assert_equal(10, drivers[0].transport.idle_timeout)
+ assert_equal(5, drivers[1].transport.remote_idle_timeout) # proton changes the value
assert_in_delta(10, (drivers[0].tick(now) - now)*1000, 1)
end
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b883393b/proton-c/bindings/ruby/tests/test_container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/test_container.rb b/proton-c/bindings/ruby/tests/test_container.rb
index f89ffbe..173c421 100644
--- a/proton-c/bindings/ruby/tests/test_container.rb
+++ b/proton-c/bindings/ruby/tests/test_container.rb
@@ -160,27 +160,15 @@ class ContainerSASLTest < Minitest::Test
# Handler for test client/server that sets up server and client SASL options
class SASLHandler < TestHandler
- def initialize(url="amqp://", opts=nil, mechanisms=nil, insecure=nil, realm=nil)
+ def initialize(url="amqp://", opts=nil)
super()
- @url, @opts, @mechanisms, @insecure, @realm = url, opts, mechanisms, insecure, realm
+ @url, @opts = url, opts
end
def on_start(e)
- super
@client = e.container.connect("#{@url}:#{e.container.port}", @opts)
end
- def on_connection_bound(e)
- if e.connection != @client # Incoming server connection
- sasl = e.transport.sasl
- sasl.allow_insecure_mechs = @insecure unless @insecure.nil?
- sasl.allowed_mechs = @mechanisms unless @mechanisms.nil?
- # TODO aconway 2017-08-16: need `sasl.realm(@realm)` here for non-default realms.
- # That reqiures pn_sasl_set_realm() at the C layer - the realm should
- # be passed to cyrus_sasl_init_server()
- end
- end
-
attr_reader :auth_user
def on_connection_opened(e)
@@ -263,7 +251,7 @@ mech_list: EXTERNAL DIGEST-MD5 SCRAM-SHA-1 CRAM-MD5 PLAIN ANONYMOUS
# Don't set allow_insecure_mechs, but try to use PLAIN
s = SASLHandler.new("amqp://user:password@", {:sasl_allowed_mechs => "PLAIN", :sasl_allow_insecure_mechs => true})
e = assert_raises(TestError) { TestContainer.new(s, {:sasl_allowed_mechs => "PLAIN"}).run }
- assert_match(/PN_TRANSPORT_ERROR.*unauthorized-access/, e.to_s)
+ assert_match(/amqp:unauthorized-access.*Authentication failed/, e.to_s)
end
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b883393b/proton-c/bindings/ruby/tests/test_tools.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/test_tools.rb b/proton-c/bindings/ruby/tests/test_tools.rb
index e64d36b..120c488 100644
--- a/proton-c/bindings/ruby/tests/test_tools.rb
+++ b/proton-c/bindings/ruby/tests/test_tools.rb
@@ -67,28 +67,11 @@ class TestHandler < MessagingHandler
raise TestError.new("TestHandler has errors:\n #{text}")
end
- # TODO aconway 2017-08-15: implement in MessagingHandler
- def on_error(event, endpoint)
- @errors.push "#{event.type}: #{endpoint.condition.inspect}"
+ def on_error(event)
+ @errors.push "#{event.type}: #{event.condition.inspect}"
raise_errors if @raise_errors
end
- def on_transport_error(event)
- on_error(event, event.transport)
- end
-
- def on_connection_error(event)
- on_error(event, event.connection)
- end
-
- def on_session_error(event)
- on_error(event, event.session)
- end
-
- def on_link_error(event)
- on_error(event, event.link)
- end
-
def endpoint_opened(queue, endpoint)
queue.push(endpoint)
end
@@ -115,3 +98,24 @@ class ListenOnceHandler < ListenHandler
def on_error(l, e) raise TestError, e.inspect; end
def on_accept(l) l.close; super; end
end
+
+# Two HandlerDrivers joined by a local stream socket pair: element 0 is the client, element 1 the server.
+class DriverPair < Array
+
+ # Wrap each end of a new socket pair in a HandlerDriver, giving the first end client_handler
+ # and the second end server_handler, then call set_server on the server driver's transport.
+ def initialize(client_handler, server_handler)
+ ends = Socket.pair(:LOCAL, :STREAM, 0)
+ drivers = ends.zip([client_handler, server_handler]).map { |sock, handler| HandlerDriver.new(sock, handler) }
+ replace(drivers)
+ server.transport.set_server
+ end
+
+ alias :client :first
+ alias :server :last
+
+ # Process every driver once, then again for as long as a zero-timeout IO.select
+ # reports something to read; an error raised by select is treated as "nothing to do".
+ def run
+ each { |d| d.process }
+ each { |d| d.process } while (IO.select(self, [], [], 0) rescue nil)
+ end
+
+end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b883393b/proton-c/include/proton/cproton.i
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/cproton.i b/proton-c/include/proton/cproton.i
index 5f375de..464e74b 100644
--- a/proton-c/include/proton/cproton.i
+++ b/proton-c/include/proton/cproton.i
@@ -1068,3 +1068,4 @@ typedef unsigned long int uintptr_t;
%include "proton/url.h"
%include "proton/reactor.h"
%include "proton/handlers.h"
+%include "proton/cid.h"
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org