You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/11/30 21:42:34 UTC
[03/12] qpid-proton git commit: PROTON-1064: [ruby] Event handling
refactor
PROTON-1064: [ruby] Event handling refactor
Event/handler/dispatch rework:
- No wrapped C handlers, native ruby handlers
- Simplified event class
- MessagingHandler pure interface, can be duck-typed
- Default handler logic moved to Handler::Adapter
- on_error catches unhandled on_xxx_error events
- on_unhandled catches all other unhandled events
- dropped ruby Collector wrapper - now internal to connection_driver.
Other changes
- Added close(error) for all endpoints to set condition on close
- Associate Connection with its Container
- Transport options: sasl, idle_timeout etc.
- Removed unused classes, corrected/clarified docs
- Connection_driver fixes - delegate proton closes to IO
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/b883393b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/b883393b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/b883393b
Branch: refs/heads/master
Commit: b883393baae53bd59710c1c9595dab36139a8417
Parents: c4e5e58
Author: Alan Conway <ac...@redhat.com>
Authored: Fri Nov 24 11:44:35 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Nov 30 16:36:26 2017 -0500
----------------------------------------------------------------------
proton-c/bindings/ruby/lib/core/base_handler.rb | 31 --
proton-c/bindings/ruby/lib/core/condition.rb | 6 +-
proton-c/bindings/ruby/lib/core/connection.rb | 156 ++-------
.../bindings/ruby/lib/core/connection_driver.rb | 53 ++--
proton-c/bindings/ruby/lib/core/container.rb | 16 +-
proton-c/bindings/ruby/lib/core/delivery.rb | 32 +-
proton-c/bindings/ruby/lib/core/endpoint.rb | 35 +-
proton-c/bindings/ruby/lib/core/event.rb | 151 +++++++++
proton-c/bindings/ruby/lib/core/link.rb | 16 +-
proton-c/bindings/ruby/lib/core/message.rb | 11 -
.../bindings/ruby/lib/core/messaging_handler.rb | 261 +++++++--------
proton-c/bindings/ruby/lib/core/sender.rb | 11 +-
proton-c/bindings/ruby/lib/core/session.rb | 12 +-
proton-c/bindings/ruby/lib/core/transport.rb | 12 +-
proton-c/bindings/ruby/lib/event/collector.rb | 148 ---------
proton-c/bindings/ruby/lib/event/event.rb | 317 -------------------
proton-c/bindings/ruby/lib/event/event_base.rb | 91 ------
proton-c/bindings/ruby/lib/event/event_type.rb | 71 -----
proton-c/bindings/ruby/lib/handler/adapter.rb | 157 +++++++++
proton-c/bindings/ruby/lib/handler/c_adaptor.rb | 47 ---
.../ruby/lib/handler/c_flow_controller.rb | 33 --
.../ruby/lib/handler/endpoint_state_handler.rb | 131 +-------
.../ruby/lib/handler/flow_controller.rb | 40 +++
.../lib/handler/incoming_message_handler.rb | 39 +--
.../lib/handler/outgoing_message_handler.rb | 57 +---
.../ruby/lib/handler/wrapped_handler.rb | 76 -----
proton-c/bindings/ruby/lib/qpid_proton.rb | 20 +-
.../bindings/ruby/lib/util/class_wrapper.rb | 52 ---
proton-c/bindings/ruby/lib/util/handler.rb | 41 ---
proton-c/bindings/ruby/tests/test_adapter.rb | 227 +++++++++++++
.../ruby/tests/test_connection_driver.rb | 14 +-
proton-c/bindings/ruby/tests/test_container.rb | 18 +-
proton-c/bindings/ruby/tests/test_tools.rb | 42 +--
proton-c/include/proton/cproton.i | 1 +
34 files changed, 892 insertions(+), 1533 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b883393b/proton-c/bindings/ruby/lib/core/base_handler.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/base_handler.rb b/proton-c/bindings/ruby/lib/core/base_handler.rb
deleted file mode 100644
index 9a7ece4..0000000
--- a/proton-c/bindings/ruby/lib/core/base_handler.rb
+++ /dev/null
@@ -1,31 +0,0 @@
-#--
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#++
-
-module Qpid::Proton
-
- class BaseHandler
-
- # Override to process unhandled events.
- #
- def on_unhandled(method, *args)
- end
-
- end
-
-end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b883393b/proton-c/bindings/ruby/lib/core/condition.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/condition.rb b/proton-c/bindings/ruby/lib/core/condition.rb
index 9cd6eec..05231c6 100644
--- a/proton-c/bindings/ruby/lib/core/condition.rb
+++ b/proton-c/bindings/ruby/lib/core/condition.rb
@@ -58,7 +58,7 @@ module Qpid::Proton
# - String-like: return String.try_convert(obj)
# - nil: return nil
# @raise ::ArgumentError if obj is not convertible to {Condition}
- def self.convert(obj, default_name="proton")
+ def self.convert(obj, default_name="error")
case obj
when nil then nil
when Condition then obj
@@ -76,9 +76,11 @@ module Qpid::Proton
end
private
- def self.from_object(impl, cond)
+
+ def self.assign(impl, cond)
Cproton.pn_condition_clear(impl)
if cond
+ cond = self.convert(cond)
Cproton.pn_condition_set_name(impl, cond.name) if cond.name
Cproton.pn_condition_set_description(impl, cond.description) if cond.description
Codec::Data.from_object(Cproton.pn_condition_info(impl), cond.info) if cond.info
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b883393b/proton-c/bindings/ruby/lib/core/connection.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/connection.rb b/proton-c/bindings/ruby/lib/core/connection.rb
index d6ff029..25149ce 100644
--- a/proton-c/bindings/ruby/lib/core/connection.rb
+++ b/proton-c/bindings/ruby/lib/core/connection.rb
@@ -36,156 +36,68 @@ module Qpid::Proton
# @return [String] User name used for authentication (outgoing connection) or the authenticated user name (incoming connection)
proton_accessor :user
- # @private
- proton_writer :password
+ private
- # @private
+ proton_writer :password
attr_accessor :overrides
- # @private
attr_accessor :session_policy
-
- # @private
include Util::Wrapper
- # @private
def self.wrap(impl)
return nil if impl.nil?
-
self.fetch_instance(impl, :pn_connection_attachments) || Connection.new(impl)
end
- # Constructs a new instance of Connection.
- #
- # You do *not* need to provide the underlying C struct, as this is
- # automatically generated as needed. The argument is a convenience
- # for returning existing Connection objects.
- #
- # @param impl [pn_connection_t] The pn_connection_t struct.
- #
def initialize(impl = Cproton.pn_connection)
super()
@impl = impl
@overrides = nil
- @collector = nil
@session_policy = nil
@link_count = 0
@link_prefix = ""
self.class.store_instance(self, :pn_connection_attachments)
end
- def overrides?
- !@overrides.nil?
- end
+ public
- def session_policy?
- !@session_policy.nil?
- end
+ # @deprecated no replacement
+ def overrides?() deprecated __method__; false; end
- # This method is used when working within the context of an event.
- #
- # @return [Connection] The connection itself.
- #
- def connection
- self
- end
+ # @deprecated no replacement
+ def session_policy?() deprecated __method__; false; end
- # The Transport to which this connection is bound.
- #
- # @return [Transport] The transport, or nil if the Connection is unbound.
- #
- def transport
- Transport.wrap(Cproton.pn_connection_transport(@impl))
- end
+ # @return [Connection] self
+ def connection() self; end
- # Associates the connection with an event collector.
- #
- # By doing this, key changes in the endpoint's state are reported to
- # the connector via Event objects that can be inspected and processed.
+ # @return [Transport, nil] transport bound to this connection, or nil if unbound.
#
- # Note that, by registering a collector, the user is requesting that an
- # indefinite number of events be queued up on its behalf. This means
- # that, unless the application eventual processes these events, the
- # storage requirements for keeping them will grow without bound. So be
- # careful and do not register a collector with a connection unless the
- # application will process the events.
- #
- # @param collector [Event::Collector] The event collector.
- #
- def collect(collector)
- if collector.nil?
- Cproton.pn_connection_collect(@impl, nil)
- else
- Cproton.pn_connection_collect(@impl, collector.impl)
- end
- @collector = collector
- end
+ def transport() Transport.wrap(Cproton.pn_connection_transport(@impl)); end
- # Get the AMQP container name advertised by the remote connection.
- #
- # This will return nil until the REMOTE_ACTIVE state is reached.
- #
- # @return [String] The remote connection's AMQP container name.
- #
- # @see #container
- #
- def remote_container
- Cproton.pn_connection_remote_container(@impl)
- end
+ # @return AMQP container ID advertised by the remote peer
+ def remote_container_id() Cproton.pn_connection_remote_container(@impl); end
- # AMQP container ID string for the local end of the connection.
- def container_id
- Cproton.pn_connection_get_container(@impl)
- end
+ alias :remote_container :remote_container_id
- # Get the AMQP hostname set by the remote connection endpoint.
- #
- # This will return nil until the #REMOTE_ACTIVE state is
- # reached.
- #
- # @return [String] The remote connection's AMQP hostname.
- #
- # @see #hostname
- #
- def remote_hostname
- Cproton.pn_connection_remote_hostname(@impl)
- end
+ # @return [Container] the container managing this connection
+ attr_reader :container
- # Get the AMQP offered capabilities suppolied by the remote connection
- # endpoint.
- #
- # This object returned is valid until the connection is freed. The Data
- # object will be empty until the remote connection is opened, as
- # indicated by the #REMOTE_ACTIVE flag.
- #
- # @return [Data] The offered capabilities.
- #
+ # @return AMQP container ID for the local end of the connection
+ def container_id() Cproton.pn_connection_get_container(@impl); end
+
+ # @return [String] hostname used by the remote end of the connection
+ def remote_hostname() Cproton.pn_connection_remote_hostname(@impl); end
+
+ # @return [Array<Symbol>] offered capabilities provided by the remote peer
def remote_offered_capabilities
- # FIXME aconway 2017-11-22: doesn't match doc - returning object, not Data
Codec::Data.to_object(Cproton.pn_connection_remote_offered_capabilities(@impl))
end
- # Get the AMQP desired capabilities supplied by the remote connection
- # endpoint.
- #
- # The object returned is valid until the connection is freed. The Data
- # object will be empty until the remote connection is opened, as
- # indicated by the #REMOTE_ACTIVE flag.
- #
- # @return [Data] The desired capabilities.
- #
+ # @return [Array<Symbol>] desired capabilities provided by the remote peer
def remote_desired_capabilities
Codec::Data.to_object(Cproton.pn_connection_remote_desired_capabilities(@impl))
end
- # Get the AMQP connection properties supplie by the remote connection
- # endpoint.
- #
- # The object returned is valid until the connection is freed. The Data
- # object will be empty until the remote connection is opened, as
- # indicated by the #REMOTE_ACTIVE flag.
- #
- # @return [Data] The remote properties.
- #
+ # @return [Hash] connection-properties provided by the remote peer
def remote_properties
Codec::Data.to_object(Cproton.pn_connection_remote_properites(@impl))
end
@@ -213,7 +125,9 @@ module Qpid::Proton
# NOTE: Only connection options are set here. Transport options are set
# with {Transport#apply} from the connection_driver (or in
# on_connection_bound if not using a connection_driver)
- Cproton.pn_connection_set_container(@impl, opts[:container_id] || SecureRandom.uuid)
+ @container = opts[:container]
+ cid = opts[:container_id] || (@container && @container.id) || SecureRandom.uuid
+ Cproton.pn_connection_set_container(@impl, cid)
Cproton.pn_connection_set_user(@impl, opts[:user]) if opts[:user]
Cproton.pn_connection_set_password(@impl, opts[:password]) if opts[:password]
@link_prefix = opts[:link_prefix] || container_id
@@ -227,16 +141,10 @@ module Qpid::Proton
@link_prefix + "/" + (@link_count += 1).to_s(16)
end
- # Closes the connection.
- #
- # Once this operation has completed, the #LOCAL_CLOSED state flag will be
- # set.
- #
- def close(error = nil)
- if error
- @condition = Condition.convert error
- self._update_condition
- end
+ # Closes the local end of the connection. The remote end may or may not be closed.
+ # @param error [Condition] Optional error condition to send with the close.
+ def close(error=nil)
+ Condition.assign(_local_condition, error)
Cproton.pn_connection_close(@impl)
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b883393b/proton-c/bindings/ruby/lib/core/connection_driver.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/connection_driver.rb b/proton-c/bindings/ruby/lib/core/connection_driver.rb
index aeca133..1995f7d 100644
--- a/proton-c/bindings/ruby/lib/core/connection_driver.rb
+++ b/proton-c/bindings/ruby/lib/core/connection_driver.rb
@@ -65,7 +65,13 @@ module Qpid
def finished?() Cproton.pn_connection_driver_finished(@impl); end
# Get the next event to dispatch, nil if no events available
- def event() Event::Event.wrap(Cproton.pn_connection_driver_next_event(@impl)); end
+ def event()
+ e = Cproton.pn_connection_driver_next_event(@impl)
+ Event.new(e) if e
+ end
+
+ # True if {#event} will return non-nil
+ def event?() Cproton.pn_connection_driver_has_event(@impl); end
# Iterator for all available events
def each_event()
@@ -119,19 +125,23 @@ module Qpid
# transport will close itself once the protocol close is complete.
#
def close_write error=nil
- return if Cproton.pn_connection_driver_write_closed(@impl)
- set_error error if error
+ set_error error
Cproton.pn_connection_driver_write_close(@impl)
- @io.close_write
+ @io.close_write rescue nil # Allow double-close
end
+ # Is the read side of the driver closed?
+ def read_closed?() Cproton.pn_connection_driver_read_closed(@impl); end
+
+ # Is the write side of the driver closed?
+ def write_closed?() Cproton.pn_connection_driver_read_closed(@impl); end
+
# Disconnect the read side of the transport, without waiting for an AMQP
# close frame. See comments on {#close_write}
def close_read error=nil
- return if Cproton.pn_connection_driver_read_closed(@impl)
- set_error error if error
+ set_error error
Cproton.pn_connection_driver_read_close(@impl)
- @io.close_read
+ @io.close_read rescue nil # Allow double-close
end
# Disconnect both sides of the transport sending/waiting for AMQP close
@@ -143,10 +153,8 @@ module Qpid
private
- def set_error e
- if cond = Condition.convert(e, "proton:io")
- Cproton.pn_connection_driver_errorf(@impl, cond.name, "%s", cond.description)
- end
+ def set_error err
+ transport.condition ||= Condition.convert(err, "proton:io") if err
end
end
@@ -160,32 +168,37 @@ module Qpid
# {#dispatch} and {#process}
def initialize(io, handler)
super(io)
- @handler = handler || Handler::MessagingHandler.new
+ @handler = handler
+ @adapter = Handler::Adapter.try_convert(handler)
end
+ # @return [MessagingHandler] The handler dispatched to by {#process}
attr_reader :handler
# Dispatch all events available from {#event} to {#handler}
- # @param handlers [Enum<Handler::MessagingHandler>]
- def dispatch()
- each_event { |e| e.dispatch @handler }
+ def dispatch() each_event do |e|
+ e.dispatch self # See private on_transport_ methods below
+ e.dispatch @adapter
+ end
end
# Do {#read}, {#tick}, {#write} and {#dispatch} without blocking.
- #
- # @param [Handle::MessagingHanlder] handler A handler to dispatch
- # events to.
# @param [Time] now the current time
# @return [Time] Latest time to call {#process} again for scheduled events,
# or nil if there are no scheduled events
def process(now=Time.now)
read
next_tick = tick(now)
- dispatch # May generate more data to write
+ dispatch # Generate data for write
write
- dispatch # Make sure we consume all events
+ dispatch # Consume all events
return next_tick
end
+
+ private
+ def on_transport_tail_closed(event) close_read; end
+ def on_transport_head_closed(event) close_write; end
+ def on_transport_authenticated(event) connection.user = transport.user; end
end
end
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b883393b/proton-c/bindings/ruby/lib/core/container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/container.rb b/proton-c/bindings/ruby/lib/core/container.rb
index 28c394f..df89d1a 100644
--- a/proton-c/bindings/ruby/lib/core/container.rb
+++ b/proton-c/bindings/ruby/lib/core/container.rb
@@ -39,14 +39,6 @@ module Qpid::Proton
transport.set_server if server
transport.apply opts
connection.apply opts
- @container = container
- end
-
- def event
- # Add a container to the event
- e = super()
- e.container = @container if e
- e
end
end
@@ -237,11 +229,7 @@ module Qpid::Proton
case task
when :on_start
- # TODO aconway 2017-11-27: proper syntesized events
- event = Class.new do
- def initialize(c) @container = c; end
- attr_reader :container
- end.new(self)
+ event = Event.new(nil, :on_start, self)
@handler.on_start(event) if @handler.respond_to? :on_start
when Container
@@ -329,7 +317,7 @@ module Qpid::Proton
def connection_driver(io, opts=nil, server=false)
opts ||= {}
- opts[:container_id] ||= @id
+ opts[:container] = self
opts[:handler] ||= @handler
ConnectionTask.new(self, io, opts, server)
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b883393b/proton-c/bindings/ruby/lib/core/delivery.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/delivery.rb b/proton-c/bindings/ruby/lib/core/delivery.rb
index bb33207..a642a7b 100644
--- a/proton-c/bindings/ruby/lib/core/delivery.rb
+++ b/proton-c/bindings/ruby/lib/core/delivery.rb
@@ -123,6 +123,15 @@ module Qpid::Proton
#
proton_caller :settled?
+ # @!attribute [r] aborted?
+ #
+ # A delivery can be aborted before it is complete by the remote sender.
+ # The receiver must ignore the message and discard any partial data.
+ #
+ # @return [Boolean] Returns if a delivery is aborted.
+ #
+ proton_caller :aborted?
+
# Update the state of the delivery
# @param state [Integer] the delivery state, defined in {DeliveryState}
def update(state) Cproton.pn_delivery_update(@impl, state); end
@@ -141,8 +150,6 @@ module Qpid::Proton
# Reject a received message that is considered invalid.
def reject() settle REJECTED; end
- # FIXME aconway 2017-11-23: why the delivered argument?
-
# Release a received message making it available to other receivers.
def release(delivered = true) settle(delivered ? MODIFIED : RELEASED); end
@@ -262,6 +269,25 @@ module Qpid::Proton
self.remote_state == Disposition::MODIFIED
end
- end
+ # @return true if the delivery has a complete incoming message ready to decode
+ def message?
+ readable? && !aborted? && !partial?
+ end
+ # Decode the message from the delivery into a new {Message}
+ # @raise [ProtonError] unless {#message?}
+ def message
+ if message?
+ m = Message.new
+ m.decode(link.receive(pending))
+ link.advance
+ m
+ else
+ status = [("not readable" if !readable?),
+ ("aborted" if aborted?),
+ ("partial" if partial?)].compact.join(", ")
+ raise ProtonError, "incoming delivery #{status}"
+ end
+ end
+ end
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b883393b/proton-c/bindings/ruby/lib/core/endpoint.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/endpoint.rb b/proton-c/bindings/ruby/lib/core/endpoint.rb
index fe2eb7a..ea2b11f 100644
--- a/proton-c/bindings/ruby/lib/core/endpoint.rb
+++ b/proton-c/bindings/ruby/lib/core/endpoint.rb
@@ -33,7 +33,7 @@ module Qpid::Proton
class Endpoint
# The local connection is uninitialized.
- LOCAL_UNINIT = Cproton::PN_LOCAL_UNINIT
+ LOCAL_UNINIT = Cproton::PN_LOCAL_UNINIT
# The local connection is active.
LOCAL_ACTIVE = Cproton::PN_LOCAL_ACTIVE
# The local connection is closed.
@@ -47,32 +47,17 @@ module Qpid::Proton
REMOTE_CLOSED = Cproton::PN_REMOTE_CLOSED
# Bitmask for the local-only flags.
- LOCAL_MASK = Cproton::PN_LOCAL_UNINIT |
- Cproton::PN_LOCAL_ACTIVE |
- Cproton::PN_LOCAL_CLOSED
+ LOCAL_MASK = Cproton::PN_LOCAL_UNINIT | Cproton::PN_LOCAL_ACTIVE | Cproton::PN_LOCAL_CLOSED
# Bitmask for the remote-only flags.
- REMOTE_MASK = Cproton::PN_REMOTE_UNINIT |
- Cproton::PN_REMOTE_ACTIVE |
- Cproton::PN_REMOTE_CLOSED
+ REMOTE_MASK = Cproton::PN_REMOTE_UNINIT | Cproton::PN_REMOTE_ACTIVE | Cproton::PN_REMOTE_CLOSED
# @private
- def initialize
- @condition = nil
- end
-
+ def condition; remote_condition || local_condition; end
# @private
- def _update_condition
- Condition.from_object(self._local_condition, @condition)
- end
-
- def condition
- Condition.convert(_local_condition) || remote_condition; end
-
+ def remote_condition; Condition.convert(_remote_condition); end
# @private
- def remote_condition
- Condition.convert(_remote_condition)
- end
+ def local_condition; Condition.convert(_local_condition); end
# Return the transport associated with this endpoint.
#
@@ -96,7 +81,7 @@ module Qpid::Proton
check_state(LOCAL_UNINIT)
end
- def local_active?
+ def local_open?
check_state(LOCAL_ACTIVE)
end
@@ -108,12 +93,16 @@ module Qpid::Proton
check_state(REMOTE_UNINIT)
end
- def remote_active?
+ def remote_open?
check_state(REMOTE_ACTIVE)
end
def remote_closed?
check_state(REMOTE_CLOSED)
end
+
+ alias local_active? local_open?
+ alias remote_active? remote_open?
+
end
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b883393b/proton-c/bindings/ruby/lib/core/event.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/event.rb b/proton-c/bindings/ruby/lib/core/event.rb
new file mode 100644
index 0000000..136c120
--- /dev/null
+++ b/proton-c/bindings/ruby/lib/core/event.rb
@@ -0,0 +1,151 @@
+#--
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#++
+
+module Qpid::Proton
+ # AMQP protocol event.
+ #
+ # Includes a method name to call when the event is dispatched, and the context
+ # objects relevant to the event.
+ class Event
+ private
+
+ include Qpid::Proton::Util::Wrapper
+
+ EVENT_TYPE_NAMES = [:PN_EVENT_NONE,
+ :PN_CONNECTION_INIT,
+ :PN_CONNECTION_BOUND,
+ :PN_CONNECTION_UNBOUND,
+ :PN_CONNECTION_LOCAL_OPEN,
+ :PN_CONNECTION_REMOTE_OPEN,
+ :PN_CONNECTION_LOCAL_CLOSE,
+ :PN_CONNECTION_REMOTE_CLOSE,
+ :PN_CONNECTION_FINAL,
+ :PN_SESSION_INIT,
+ :PN_SESSION_LOCAL_OPEN,
+ :PN_SESSION_REMOTE_OPEN,
+ :PN_SESSION_LOCAL_CLOSE,
+ :PN_SESSION_REMOTE_CLOSE,
+ :PN_SESSION_FINAL,
+ :PN_LINK_INIT,
+ :PN_LINK_LOCAL_OPEN,
+ :PN_LINK_REMOTE_OPEN,
+ :PN_LINK_LOCAL_CLOSE,
+ :PN_LINK_REMOTE_CLOSE,
+ :PN_LINK_LOCAL_DETACH,
+ :PN_LINK_REMOTE_DETACH,
+ :PN_LINK_FLOW,
+ :PN_LINK_FINAL,
+ :PN_DELIVERY,
+ :PN_TRANSPORT,
+ :PN_TRANSPORT_AUTHENTICATED,
+ :PN_TRANSPORT_ERROR,
+ :PN_TRANSPORT_HEAD_CLOSED,
+ :PN_TRANSPORT_TAIL_CLOSED,
+ :PN_TRANSPORT_CLOSED]
+
+ TYPE_METHODS = EVENT_TYPE_NAMES.each_with_object({}) do |n, h|
+ type = Cproton.const_get(n)
+ h[type] = "on_#{Cproton.pn_event_type_name(type)[3..-1]}".downcase.to_sym
+ end
+
+ # Use Event.new(impl) to wrap a C event, or Event.new(nil, method, context)
+ # to create a pure-ruby event.
+ def initialize(impl, method=nil, context=nil)
+ @impl, @method, @context = impl, method, context
+ @method ||= TYPE_METHODS[Cproton.pn_event_type(@impl)] if @impl
+ end
+
+ def get(clazz, method=nil)
+ (ctx = context).is_a?(clazz) ? ctx : ctx.__send__(method) rescue nil
+ end
+
+ def _context
+ x = Cproton.pn_event_context(@impl)
+ case Cproton.pn_class_id(Cproton.pn_event_class(@impl))
+ when Cproton::CID_pn_transport then Transport.wrap(Cproton.pn_cast_pn_transport(x))
+ when Cproton::CID_pn_connection then Connection.wrap(Cproton.pn_cast_pn_connection(x))
+ when Cproton::CID_pn_session then Session.wrap(Cproton.pn_cast_pn_session(x))
+ when Cproton::CID_pn_link then Link.wrap(Cproton.pn_cast_pn_link(x))
+ when Cproton::CID_pn_delivery then Delivery.wrap(Cproton.pn_cast_pn_delivery(x))
+ else raise TypeError, "bad class-id #{pn_class_id(Cproton.pn_event_class(impl))}"
+ end
+ end
+
+ public
+
+ # Call handler.{#method}(self) if handler.respond_to? {#method}
+ # @return [Boolean] true if handler responded to the method, nil if not.
+ def dispatch(handler)
+ (handler.__send__(@method, self); true) if handler.respond_to? @method
+ end
+
+ # @return [Symbol] method name that this event will call in {#dispatch}
+ attr_accessor :method
+
+ alias :type :method
+
+ # @return [Object] the event context object
+ def context; return @context ||= _context; end
+
+ # @return [Container, nil] container for this event
+ def container() @container ||= get(Container, :container); end
+
+ # @return [Transport, nil] transport for this event
+ def transport() @transport ||= get(Transport, :transport); end
+
+ # @return [Connection, nil] the connection for this event
+ def connection() @connection ||= get(Connection, :connection); end
+
+ # @return [Session, nil] session for this event
+ def session() @session ||= get(Session, :session); end
+
+ # @return [Link, nil] link for this event
+ def link() @link ||= get(Link, :link); end
+
+ # @return [Sender, nil] sender associated with this event
+ def sender() link if link && link.sender?; end
+
+ # @return [Receiver, nil] receiver associated with this event
+ def receiver() link if link && link.receiver?; end
+
+ # @return [Delivery, nil] delivery for this event
+ def delivery() @delivery ||= get(Delivery); end
+
+ # @return [Tracker, nil] delivery for this event
+ def tracker() delivery; end
+
+ # @return [Message, nil] message for this event
+ def message() @message ||= delivery.message if delivery; end
+
+ def to_s() "#{self.class}(#{method}, #{context})"; end
+ def inspect() "#{self.class}(#{method.inspect}, #{context.inspect})"; end
+
+ # @return [Condition] Error condition associated with this event or nil if none.
+ def condition
+ (context.remote_condition if context.respond_to? :remote_condition) ||
+ (context.condition if context.respond_to? :condition)
+ end
+
+ # @deprecated use {#container}
+ def reactor() deprecated __method__, :container; container; end
+
+ # @deprecated use {Qpid::Proton::Event}
+ Event = self
+ end
+end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b883393b/proton-c/bindings/ruby/lib/core/link.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/link.rb b/proton-c/bindings/ruby/lib/core/link.rb
index 97f30f6..255e8b2 100644
--- a/proton-c/bindings/ruby/lib/core/link.rb
+++ b/proton-c/bindings/ruby/lib/core/link.rb
@@ -58,16 +58,12 @@ module Qpid::Proton
# @see Endpoint::LOCAL_ACTIVE
proton_caller :open
- # @!method close
- #
- # Closes the link.
- #
- # Once this operation has completed, the state flag will be set.
- # This may be called without first calling #open, which is the equivalent to
- # calling #open and then #close.
- #
- # @see Endpoint::LOCAL_CLOSED
- proton_caller :close
+ # Close the local end of the link. The remote end may or may not be closed.
+ # @param error [Condition] Optional error condition to send with the close.
+ def close(error=nil)
+ Condition.assign(_local_condition, error)
+ Cproton.pn_link_close(@impl)
+ end
# @!method detach
#
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b883393b/proton-c/bindings/ruby/lib/core/message.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/message.rb b/proton-c/bindings/ruby/lib/core/message.rb
index ae0f9d1..747414c 100644
--- a/proton-c/bindings/ruby/lib/core/message.rb
+++ b/proton-c/bindings/ruby/lib/core/message.rb
@@ -51,17 +51,6 @@ module Qpid::Proton
post_decode
end
- # Receive and decode a message from a delivery.
- #
- # @param delivery [Delivery] the delivery
- # @return [Integer] the number of bytes decoded
- def receive(delivery)
- raise RangeError, "delivery is incomplete" if delivery.partial?
- n = decode(delivery.link.receive(delivery.pending))
- delivery.link.advance
- return n
- end
-
def post_decode # :nodoc:
# decode elements from the message
@properties = {}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b883393b/proton-c/bindings/ruby/lib/core/messaging_handler.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/messaging_handler.rb b/proton-c/bindings/ruby/lib/core/messaging_handler.rb
index 3babc05..b6b07c7 100644
--- a/proton-c/bindings/ruby/lib/core/messaging_handler.rb
+++ b/proton-c/bindings/ruby/lib/core/messaging_handler.rb
@@ -19,198 +19,157 @@
module Qpid::Proton
- # A general purpose handler that simplifies processing events.
+ # A handler for AMQP messaging events.
+ #
+ # Subclass the handler and provide the #on_xxx methods with your event-handling code.
#
class MessagingHandler
- attr_reader :handlers
-
- # Creates a new instance.
- #
- # @param [Integer] prefetch
- # @param [Boolean] auto_accept
- # @param [Boolean] auto_settle
- # @param [Boolean] peer_close_is_error
- #
- def initialize(prefetch = 10, auto_accept = true, auto_settle = true, peer_close_is_error = false)
- @handlers = Array.new
- @handlers << Handler::CFlowController.new(prefetch) unless prefetch.zero?
- @handlers << Handler::EndpointStateHandler.new(peer_close_is_error, self)
- @handlers << Handler::IncomingMessageHandler.new(auto_accept, self)
- @handlers << Handler::OutgoingMessageHandler.new(auto_settle,self)
- end
-
+ # @overload initialize(opts)
+ # Create a {MessagingHandler} with options +opts+
+ # @option opts [Integer] :prefetch (10)
+ # The number of messages to fetch in advance, 0 disables prefetch.
+ # @option opts [Boolean] :auto_accept (true)
+ # If true, incoming messages are accepted automatically after {#on_message}.
+ # If false, the application can accept, reject or release the message
+ # by calling methods on {Delivery} when the message has been processed.
+ # @option opts [Boolean] :auto_settle (true) If true, outgoing
+ # messages are settled automatically when the remote peer settles. If false,
+ # the application must call {Delivery#settle} explicitly.
+ # @option opts [Boolean] :auto_open (true)
+ # If true, incoming connections are opened automatically.
+ # If false, the application must call {Connection#open} to open incoming connections.
+ # @option opts [Boolean] :auto_close (true)
+ # If true, respond to a remote close automatically with a local close.
+ # If false, the application must call {Connection#close} to finish closing connections.
+ # @option opts [Boolean] :peer_close_is_error (false)
+ # If true, and the remote peer closes the connection without an error condition,
+ # the set the local error condition {Condition}("error", "unexpected peer close")
+ #
+ # @overload initialize(prefetch=10, auto_accept=true, auto_settle=true, peer_close_is_error=false)
+ # @deprecated use +initialize(opts)+ overload
+ def initialize(*args)
+ @options = {}
+ if args.size == 1 && args[0].is_a?(Hash)
+ @options.replace(args[0])
+ else # Fill options from deprecated fixed arguments
+ [:prefetch, :auto_accept, :auto_settle, :peer_close_is_error].each do |k|
+ opts[k] = args.shift unless args.empty?
+ end
+ end
+ # NOTE: the options are processed by {Handler::Adapater}
+ end
+
+ public
+
+ # @private
+ # @return [Hash] handler options, see {#initialize}
+ attr_reader :options
+
+
+ # @!method on_transport_error(event)
+ # Called when the transport fails or closes unexpectedly.
+ # @param event [Event] The event.
+
+ # !@method on_connection_error(event)
# Called when the peer closes the connection with an error condition.
- #
- # @param event [Qpid::Proton::Event::Event] The event.
- #
- def on_connection_error(event)
- Handler::EndpointStateHandler.print_error(event.connection, "connection")
- end
+ # @param event [Event] The event.
- # Called when the peer closes the session with an error condition.
- #
- # @param event [Qpid:Proton::Event::Event] The event.
- #
- def on_session_error(event)
- Handler::EndpointStateHandler.print_error(event.session, "session")
- event.connection.close
- end
+ # @!method on_session_error(event)
+ # Called when the peer closes the session with an error condition.
+ # @param event [Qpid:Proton::Event] The event.
+ # @!method on_link_error(event)
# Called when the peer closes the link with an error condition.
- #
- # @param event [Qpid::Proton::Event::Event] The event.
- #
- def on_link_error(event)
- Handler::EndpointStateHandler.print_error(event.link, "link")
- event.connection.close
- end
-
- # Called when the event loop starts.
- #
- # @param event [Qpid::Proton::Event::Event] The event.
- #
- def on_reactor_init(event)
- self.on_start(event)
- end
+ # @param event [Event] The event.
+ # @!method on_start(event)
# Called when the event loop starts.
- #
- # This method needs to be overridden.
- #
- # @param event [Qpid::Proton::Event::Event] The event.
- #
- def on_start(event)
- end
+ # @param event [Event] The event.
+ # @!method on_connection_closed(event)
# Called when the connection is closed.
- #
- # This method needs to be overridden.
- #
- # @param event [Qpid::Proton::Event::Event] The event.
- #
- def on_connection_closed(event)
- end
+ # @param event [Event] The event.
+ # @!method on_session_closed(event)
# Called when the session is closed.
- #
- # This method needs to be overridden.
- #
- # @param event [Qpid::Proton::Event::Event] The event.
- #
- def on_session_closed(event)
- end
+ # @param event [Event] The event.
+ # @!method on_link_closed(event)
# Called when the link is closed.
- #
- # This method needs to be overridden.
- #
- # @param event [Qpid::Proton::Event::Event] The event.
- #
- def on_link_closed(event)
- end
+ # @param event [Event] The event.
+ # @!method on_connection_closing(event)
# Called when the peer initiates the closing of the connection.
- #
- # This method needs to be overridden.
- #
- # @param event [Qpid::Proton::Event::Event] The event.
- #
- def on_connection_closing(event)
- end
+ # @param event [Event] The event.
+ # @!method on_session_closing(event)
# Called when the peer initiates the closing of the session.
- #
- # This method needs to be overridden.
- #
- # @param event [Qpid::Proton::Event::Event] The event.
- #
- def on_session_closing(event)
- end
+ # @param event [Event] The event.
+ # @!method on_link_closing(event)
# Called when the peer initiates the closing of the link.
- #
- # This method needs to be overridden.
- #
- # @param event [Qpid::Proton::Event::Event] The event.
- #
- def on_link_closing(event)
- end
+ # @param event [Event] The event.
+ # @!method on_disconnected(event)
# Called when the socket is disconnected.
- #
- # This method needs to be overridden.
- #
- # @param event [Qpid::Proton::Event::Event] The event.
- #
- def on_disconnected(event)
- end
+ # @param event [Event] The event.
+ # @!method on_sendable(event)
# Called when the sender link has credit and messages can therefore
# be transferred.
- #
- # This method needs to be overridden.
- #
- # @param event [Qpid::Proton::Event::Event] The event.
- #
- def on_sendable(event)
- end
+ # @param event [Event] The event.
+ # @!method on_accepted(event)
# Called when the remote peer accepts an outgoing message.
- #
- # This method needs to be overridden.
- #
- # @param event [Qpid::Proton::Event::Event] The event.
- #
- def on_accepted(event)
- end
+ # @param event [Event] The event.
+ # @!method on_rejected(event)
# Called when the remote peer rejects an outgoing message.
- #
- # This method needs to be overridden.
- #
- # @param event [Qpid::Proton::Event::Event] The event.
- #
- def on_rejected(event)
- end
+ # @param event [Event] The event.
+ # @!method on_released(event)
# Called when the remote peer releases an outgoing message.
- #
# Note that this may be in response to either the RELEASE or
# MODIFIED state as defined by the AMPQ specification.
- #
- # This method needs to be overridden.
- #
- # @param event [Qpid::Proton::Event::Event] The event.
- #
- def on_released(event)
- end
+ # @param event [Event] The event.
+ # @!method on_settled(event)
# Called when the remote peer has settled hte outgoing message.
- #
# This is the point at which it should never be retransmitted.
- #
- # This method needs to be overridden.
- #
- # @param event [Qpid::Proton::Event::Event] The event.
- #
- def on_settled(event)
- end
+ # @param event [Event] The event.
+ # @!method on_message(event)
# Called when a message is received.
#
- # The message itself can be obtained as a property on the event. For
- # the purpose of referring to this message in further actions, such as
- # explicitly accepting it) the delivery should be used. This is also
- # obtainable vi a property on the event.
- #
- # This method needs to be overridden.
- #
- # @param event [Qpid::Proton::Event::Event] The event.
- #
- def on_message(event)
- end
+ # The message is available from {Event#message}, to accept or reject the message
+ # use {Event#delivery}
+ # @param event [Event] The event.
+
+ # @!method on_aborted(event)
+ # Called when message delivery is aborted by the sender.
+ # The {Event#delivery} provides information about the delivery, but the message should be ignored.
+
+ # @!method on_error(event)
+ # If +on_xxx_error+ method is missing, {#on_error} is called instead.
+ # If {#on_error} is missing, the connection is closed with the error.
+ # @param event [Event] the event, {Event#method} provides the original method name.
+ # @!method on_unhandled(event)
+ # If an +on_xxx+ method is missing, {#on_unhandled} is called instead.
+ # @param event [Event] the event, {Event#method} provides the original method name.
end
+ # An array of {MessagingHandler}, events are dispatched to each in turn
+ class MessagingHandlers < MessagingHandler
+ include Enumerable
+
+ # @param handlers an array of {MessagingHandler} objects
+ def initialize handlers; @handlers = handlers; end
+
+ def each(*args, &block) @handlers.each(*args, &block); end
+
+ def on_unhandled(event) each { |h| event.dispatch h }; end
+
+ end
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b883393b/proton-c/bindings/ruby/lib/core/sender.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/sender.rb b/proton-c/bindings/ruby/lib/core/sender.rb
index deeb0c5..dddde15 100644
--- a/proton-c/bindings/ruby/lib/core/sender.rb
+++ b/proton-c/bindings/ruby/lib/core/sender.rb
@@ -39,10 +39,10 @@ module Qpid::Proton
Cproton.pn_link_offered(@impl, n)
end
- # Sends the specified data to the remote endpoint.
+ # Send a message to the remote endpoint.
#
- # @param object [Object] The content to send.
- # @param tag [Object] The tag
+ # @param message [Message] The message to send.
+ # @param tag [Object] Optional unique delivery tag, one will be generated if not supplied.
#
# @return [Integer] The number of bytes sent.
#
@@ -56,7 +56,7 @@ module Qpid::Proton
# Send the specified bytes as part of the current delivery.
#
- # @param bytes [Array] The bytes to send.
+ # @param bytes [String] The bytes to send.
#
# @return [Integer] The number of bytes sent.
#
@@ -64,11 +64,12 @@ module Qpid::Proton
Cproton.pn_link_send(@impl, bytes)
end
+ # Generate a new unique delivery tag for this sender
def delivery_tag
@tag_count ||= 0
result = @tag_count.succ
@tag_count = result
- return "#{result}"
+ return result.to_s(32) # Base 32 compactness
end
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b883393b/proton-c/bindings/ruby/lib/core/session.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/session.rb b/proton-c/bindings/ruby/lib/core/session.rb
index 2c1faeb..6bed0ba 100644
--- a/proton-c/bindings/ruby/lib/core/session.rb
+++ b/proton-c/bindings/ruby/lib/core/session.rb
@@ -87,14 +87,10 @@ module Qpid::Proton
self.class.store_instance(self, :pn_session_attachments)
end
- # Closed the session.
- #
- # Once this operation has completed, the state flag will be set. This may be
- # called without calling #open, in which case it is the equivalence of
- # calling #open and then close immediately.
- #
- def close
- self._update_condition
+ # Close the local end of the session. The remote end may or may not be closed.
+ # @param error [Condition] Optional error condition to send with the close.
+ def close(error=nil)
+ Condition.assign(_local_condition, error)
Cproton.pn_session_close(@impl)
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b883393b/proton-c/bindings/ruby/lib/core/transport.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/transport.rb b/proton-c/bindings/ruby/lib/core/transport.rb
index 7589788..9b12be3 100644
--- a/proton-c/bindings/ruby/lib/core/transport.rb
+++ b/proton-c/bindings/ruby/lib/core/transport.rb
@@ -234,7 +234,7 @@ module Qpid::Proton
# Set the error condition for the transport.
# @param c [Condition] The condition to set
def condition=(c)
- Condition.from_object(Cproton.pn_transport_condition(@impl), Condition.convert(c))
+ Condition.assign(Cproton.pn_transport_condition(@impl), c)
end
# Binds to the given connection.
@@ -395,10 +395,14 @@ module Qpid::Proton
# @private
def apply opts
- if opts[:sasl_enabled] != false # SASL is not disabled.
- sasl.allow_insecure_mechs = opts[:sasl_allow_insecure_mechs] if opts[:sasl_allow_insecure_mechs]
- sasl.allowed_mechs = opts[:sasl_allowed_mechs] if opts[:sasl_allowed_mechs]
+ sasl if opts[:sasl_enabled] # Explicitly enabled
+ unless opts.include?(:sasl_enabled) && !opts[:sasl_enabled] # Not explicitly disabled
+ sasl.allowed_mechs = opts[:sasl_allowed_mechs] if opts.include? :sasl_allowed_mechs
+ sasl.allow_insecure_mechs = opts[:sasl_allow_insecure_mechs] if opts.include? :sasl_allow_insecure_mechs
end
+ self.channel_max= opts[:channel_max] if opts.include? :channel_max
+ self.max_frame_size= opts[:max_frame_size] if opts.include? :max_frame_size
+ self.idle_timeout= opts[:idle_timeout] if opts.include? :idle_timeout
end
end
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b883393b/proton-c/bindings/ruby/lib/event/collector.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/event/collector.rb b/proton-c/bindings/ruby/lib/event/collector.rb
deleted file mode 100644
index 74e0182..0000000
--- a/proton-c/bindings/ruby/lib/event/collector.rb
+++ /dev/null
@@ -1,148 +0,0 @@
-#--
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#++
-
-module Qpid::Proton::Event
-
- # A Collector is used to register interest in events produced by one
- # or more Connection objects.
- #
- # == Events
- #
- # @see Qpid::Proton::Event The list of predefined events.
- #
- # @example
- #
- # conn = Qpid::Proton::Connection.new
- # coll = Qpid::Proton::Event::Collector.new
- # conn.collect(coll)
- #
- # # transport setup not included here for brevity
- #
- # loop do
- #
- # # wait for an event and then perform the following
- #
- # event = collector.peek
- #
- # unless event.nil?
- # case event.type
- #
- # when Qpid::Proton::Event::CONNECTION_REMOTE_CLOSE
- # conn = event.context # the context here is the connection
- # # the remote connection closed, so only close our side if it's
- # # still open
- # if !(conn.state & Qpid::Proton::Endpoint::LOCAL_CLOSED)
- # conn.close
- # end
- #
- # when Qpid::proton::Event::SESSION_REMOTE_OPEN
- # session = event.session # the context here is the session
- # # the remote session is now open, so if the local session is
- # # uninitialized, then open it
- # if session.state & Qpid::Proton::Endpoint::LOCAL_UNINIT
- # session.incoming_capacity = 1000000
- # session.open
- # end
- #
- # end
- #
- # # remove the processed event and get the next event
- # # the loop will exit when we have no more events to process
- # collector.pop
- # event = collector.peek
- #
- # end
- #
- class Collector
-
- # @private
- attr_reader :impl
-
- # Creates a new Collector.
- #
- def initialize
- @impl = Cproton.pn_collector
- ObjectSpace.define_finalizer(self, self.class.finalize!(@impl))
- end
-
- # @private
- def self.finalize!(impl)
- proc {
- Cproton.pn_collector_free(impl)
- }
- end
-
- # Releases the collector.
- #
- # Once in a released state, a collector will drain any internally queued
- # events, shrink its memory footprint to a minimu, and discard any newly
- # created events.
- #
- def release
- Cproton.pn_collector_release(@impl)
- end
-
- # Place a new event on the collector.
- #
- # This operation will create a new event of the given type and context
- # and return a new Event instance. In some cases an event of a given
- # type can be elided. When this happens, this operation will return
- # nil.
- #
- # @param context [Object] The event context.
- # @param event_type [EventType] The event type.
- #
- # @return [Event] the event if it was queued
- # @return [nil] if it was elided
- #
- def put(context, event_type)
- Cproton.pn_collector_put(@impl, Cproton.pn_class(context.impl), context.impl, event_type.number)
- end
-
- # Access the head event.
- #
- # This operation will continue to return the same event until it is
- # cleared by using #pop. The pointer return by this operation will be
- # valid until ::pn_collector_pop is invoked or #free is called, whichever
- # happens sooner.
- #
- # @return [Event] the head event
- # @return [nil] if there are no events
- #
- # @see #pop
- # @see #put
- #
- def peek
- Event.wrap(Cproton.pn_collector_peek(@impl))
- end
-
- # Clear the head event.
- #
- # @return [Boolean] true if an event was removed
- #
- # @see #release
- # @see #peek
- #
- def pop
- Cproton.pn_collector_pop(@impl)
- end
-
- end
-
-end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b883393b/proton-c/bindings/ruby/lib/event/event.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/event/event.rb b/proton-c/bindings/ruby/lib/event/event.rb
deleted file mode 100644
index 92f7eb7..0000000
--- a/proton-c/bindings/ruby/lib/event/event.rb
+++ /dev/null
@@ -1,317 +0,0 @@
-#--
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#++
-
-module Qpid::Proton
-
- module Event
-
- # @private
- def self.event_type(const_name, method_name = nil) # :nodoc:
- unless Cproton.const_defined?(const_name)
- raise RuntimeError.new("no such constant: #{const_name}")
- end
-
- const_value = Cproton.const_get(const_name)
- method_name = "on_#{const_name.to_s[3..-1]}".downcase if method_name.nil?
-
- EventType.new(const_value, method_name)
- end
-
- # Defined as a programming convenience. No even of this type will ever
- # be generated.
- NONE = event_type(:PN_EVENT_NONE)
-
- # A reactor has been started.
- REACTOR_INIT = event_type(:PN_REACTOR_INIT)
- # A reactor has no more events to process.
- REACTOR_QUIESCED = event_type(:PN_REACTOR_QUIESCED)
- # A reactor has been stopred.
- REACTOR_FINAL = event_type(:PN_REACTOR_FINAL)
-
- # A timer event has occurred.
- TIMER_TASK = event_type(:PN_TIMER_TASK)
-
- # A connection has been created. This is the first even that will ever
- # be issued for a connection.
- CONNECTION_INIT = event_type(:PN_CONNECTION_INIT)
- # A conneciton has been bound toa transport.
- CONNECTION_BOUND = event_type(:PN_CONNECTION_BOUND)
- # A connection has been unbound from its transport.
- CONNECTION_UNBOUND = event_type(:PN_CONNECTION_UNBOUND)
- # A local connection endpoint has been opened.
- CONNECTION_LOCAL_OPEN = event_type(:PN_CONNECTION_LOCAL_OPEN)
- # A local connection endpoint has been closed.
- CONNECTION_LOCAL_CLOSE = event_type(:PN_CONNECTION_LOCAL_CLOSE)
- # A remote endpoint has opened its connection.
- CONNECTION_REMOTE_OPEN = event_type(:PN_CONNECTION_REMOTE_OPEN)
- # A remote endpoint has closed its connection.
- CONNECTION_REMOTE_CLOSE = event_type(:PN_CONNECTION_REMOTE_CLOSE)
- # A connection has been freed and any outstanding processing has been
- # completed. This is the final event htat will ever be issued for a
- # connection
- CONNECTION_FINAL = event_type(:PN_CONNECTION_FINAL)
-
- # A session has been created. This is the first event that will ever be
- # issues for a session.
- SESSION_INIT = event_type(:PN_SESSION_INIT)
- # A local session endpoint has been opened.
- SESSION_LOCAL_OPEN = event_type(:PN_SESSION_LOCAL_OPEN)
- # A local session endpoint has been closed.
- SESSION_LOCAL_CLOSE = event_type(:PN_SESSION_LOCAL_CLOSE)
- # A remote endpoint has opened its session.
- SESSION_REMOTE_OPEN = event_type(:PN_SESSION_REMOTE_OPEN)
- # A remote endpoint has closed its session.
- SESSION_REMOTE_CLOSE = event_type(:PN_SESSION_REMOTE_CLOSE)
- # A session has been freed and any outstanding processing has been
- # completed. This is the final event that will ever be issued for a
- # session
- SESSION_FINAL = event_type(:PN_SESSION_FINAL)
-
- # A link has been created. This is the first event that will ever be
- # issued for a link.
- LINK_INIT = event_type(:PN_LINK_INIT)
- # A local link endpoint has been opened.
- LINK_LOCAL_OPEN = event_type(:PN_LINK_LOCAL_OPEN)
- # A local link endpoint has been closed.
- LINK_LOCAL_CLOSE = event_type(:PN_LINK_LOCAL_CLOSE)
- # A local link endpoint has been detached.
- LINK_LOCAL_DETACH = event_type(:PN_LINK_LOCAL_DETACH)
- # A remote endpoint has opened its link.
- LINK_REMOTE_OPEN = event_type(:PN_LINK_REMOTE_OPEN)
- # A remote endpoint has closed its link.
- LINK_REMOTE_CLOSE = event_type(:PN_LINK_REMOTE_CLOSE)
- # A remote endpoint has detached its link.
- LINK_REMOTE_DETACH = event_type(:PN_LINK_REMOTE_DETACH)
- # The flow control state for a link has changed.
- LINK_FLOW = event_type(:PN_LINK_FLOW)
- # A link has been freed and any outstanding processing has been completed.
- # This is the final event htat will ever be issued for a link.
- LINK_FINAL = event_type(:PN_LINK_FINAL)
-
- # A delivery has been created or updated.
- DELIVERY = event_type(:PN_DELIVERY)
-
- # A transport has new data to read and/or write.
- TRANSPORT = event_type(:PN_TRANSPORT)
- # Indicates that a transport error has occurred.
- # @see Transport#condition To access the details of the error.
- TRANSPORT_ERROR = event_type(:PN_TRANSPORT_ERROR)
- # Indicates that the head of a transport has been closed. This means the
- # transport will never produce more bytes for output to the network.
- TRANSPORT_HEAD_CLOSED = event_type(:PN_TRANSPORT_HEAD_CLOSED)
- # Indicates that the trail of a transport has been closed. This means the
- # transport will never be able to process more bytes from the network.
- TRANSPORT_TAIL_CLOSED = event_type(:PN_TRANSPORT_TAIL_CLOSED)
- # Indicates that both the head and tail of a transport are closed.
- TRANSPORT_CLOSED = event_type(:PN_TRANSPORT_CLOSED)
-
- SELECTABLE_INIT = event_type(:PN_SELECTABLE_INIT)
- SELECTABLE_UPDATED = event_type(:PN_SELECTABLE_UPDATED)
- SELECTABLE_READABLE = event_type(:PN_SELECTABLE_READABLE)
- SELECTABLE_WRITABLE = event_type(:PN_SELECTABLE_WRITABLE)
- SELECTABLE_EXPIRED = event_type(:PN_SELECTABLE_EXPIRED)
- SELECTABLE_ERROR = event_type(:PN_SELECTABLE_ERROR)
- SELECTABLE_FINAL = event_type(:PN_SELECTABLE_FINAL)
-
- # An Event provides notification of a state change within the protocol
- # engine.
- #
- # Every event has a type that identifies what sort of state change has
- # occurred, along with a pointer to the object whose state has changed,
- # and also any associated objects.
- #
- # For more details on working with Event, please refer to Collector.
- #
- # @see Qpid::Proton::Event The list of predefined events.
- #
- class Event < EventBase
-
- # @private
- include Qpid::Proton::Util::ClassWrapper
- # @private
- include Qpid::Proton::Util::Wrapper
-
- # Creates a Ruby object for the given pn_event_t.
- #
- # @private
- def self.wrap(impl, number = nil)
- return nil if impl.nil?
-
- result = self.fetch_instance(impl, :pn_event_attachments)
- return result unless result.nil?
- number = Cproton.pn_event_type(impl) if number.nil?
- event = Event.new(impl, number)
- return event.context if event.context.is_a? EventBase
- return event
- end
-
- # @private
- def initialize(impl, number)
- @impl = impl
- class_name = Cproton.pn_class_name(Cproton.pn_event_class(impl))
- context = class_wrapper(class_name, Cproton.pn_event_context(impl))
- event_type = EventType.by_type(Cproton.pn_event_type(impl))
- super(class_name, context, event_type)
- @type = EventType.by_type(number)
- self.class.store_instance(self, :pn_event_attachments)
- end
-
- # Notifies the handler(s) of this event.
- #
- # If a handler responds to the event's method then that method is invoked
- # and passed the event. Otherwise, if the handler defines the
- # +on_unhandled+ method, then that will be invoked instead.
- #
- # If the handler defines a +handlers+ method then that will be invoked and
- # passed the event afterward.
- #
- # @example
- #
- # class FallbackEventHandler
- #
- # # since it now defines a handlers method, any event will iterate
- # # through them and invoke the +dispatch+ method on each
- # attr_accessor handlers
- #
- # def initialize
- # @handlers = []
- # end
- #
- # # invoked for any event not otherwise handled
- # def on_unhandled(event)
- # puts "Unable to invoke #{event.type.method} on #{event.context}."
- # end
- #
- # end
- #
- # @param handler [Object] An object which implements either the event's
- # handler method or else responds to :handlers with an array of other
- # handlers.
- #
- def dispatch(handler, type = nil)
- type = @type if type.nil?
- if handler.is_a?(Qpid::Proton::Handler::WrappedHandler)
- Cproton.pn_handler_dispatch(handler.impl, @impl, type.number)
- else
- result = Qpid::Proton::Event.dispatch(handler, type.method, self)
- if (result != "DELEGATED") && handler.respond_to?(:handlers) && handler.handlers
- handler.handlers.each do |hndlr|
- self.dispatch(hndlr)
- end
- end
- end
- end
-
- # @deprecated use {#container}
- def reactor
- deprecated __method__, :container
- end
-
- # @return container associated with this event
- attr_reader :container
-
- # Returns the transport for this event.
- #
- # @return [Transport, nil] The transport.
- #
- def transport
- Qpid::Proton::Transport.wrap(Cproton.pn_event_transport(@impl))
- end
-
- # Returns the Connection for this event.
- #
- # @return [Connection, nil] The connection.
- #
- def connection
- Qpid::Proton::Connection.wrap(Cproton.pn_event_connection(@impl))
- end
-
- # Returns the Session for this event.
- #
- # @return [Session, nil] The session
- #
- def session
- Qpid::Proton::Session.wrap(Cproton.pn_event_session(@impl))
- end
-
- # Returns the Link for this event.
- #
- # @return [Link, nil] The link.
- #
- def link
- Qpid::Proton::Link.wrap(Cproton.pn_event_link(@impl))
- end
-
- # Returns the Sender, or nil if there is no Link, associated with this
- # event if that link is a sender.
- #
- # @return [Sender, nil] The sender.
- #
- def sender
- return self.link if !self.link.nil? && self.link.sender?
- end
-
- # Returns the Receiver, or nil if there is no Link, associated with this
- # event if that link is a receiver.
- #
- # @return [Receiver, nil] The receiver.
- #
- def receiver
- return self.link if !self.link.nil? && self.link.receiver?
- end
-
- # Returns the Delivery associated with this event.
- #
- # @return [Delivery, nil] The delivery.
- #
- def delivery
- Qpid::Proton::Delivery.wrap(Cproton.pn_event_delivery(@impl))
- end
-
- # Sets the message.
- #
- # @param message [Qpid::Proton::Message] The message
- #
- def message=(message)
- @message = message
- end
-
- # Returns the message.
- #
- # @return [Qpid::Proton::Message] The message.
- #
- def message
- @message
- end
-
- # @private
- def to_s
- "#{self.type}(#{self.context})"
- end
-
- # @private
- def container=(c); @container = c; end
-
- # @return The remote error {Condition} or nil if there is none.
- def condition
- context.remote_condition if context.respond_to? :remote_condition
- end
- end
- end
-end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b883393b/proton-c/bindings/ruby/lib/event/event_base.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/event/event_base.rb b/proton-c/bindings/ruby/lib/event/event_base.rb
deleted file mode 100644
index 6ae6959..0000000
--- a/proton-c/bindings/ruby/lib/event/event_base.rb
+++ /dev/null
@@ -1,91 +0,0 @@
-#--
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#++
-
-module Qpid::Proton::Event
-
- # @private
- def self.dispatch(handler, method, *args)
- args = args.last unless args.nil?
- if handler.respond_to? method.to_sym
- return handler.__send__(method, args)
- elsif handler.respond_to? :on_unhandled
- return handler.__send__(:on_unhandled, method, args)
- end
- end
-
- # EventBase is the foundation for creating application-specific events.
- #
- # @example
- #
- # # SCENARIO: A continuation of the example in EventType.
- # #
- # # An Event class is defined to handle receiving encrypted
- # # data from a remote endpoint.
- #
- # class EncryptedDataEvent < EventBase
- # def initialize(message)
- # super(EncryptedDataEvent, message,
- # Qpid::Proton::Event::ENCRYPTED_RECV)
- # end
- # end
- #
- # # at another point, when encrypted data is received
- # msg = Qpid::Proton::Message.new
- # msg.decode(link.receive(link.pending))
- # if encrypted?(msg)
- # collector.put(EncryptedDataEvent.new(msg)
- # end
- #
- # @see EventType The EventType class for how ENCRYPTED_RECV was defined.
- #
- class EventBase
-
- # Returns the name for the class associated with this event.
- attr_reader :class_name
-
- # Returns the associated context object for the event.
- attr_reader :context
-
- # Returns the type of the event.
- attr_reader :type
-
- # Creates a new event with the specific class_name and context of the
- # specified type.
- #
- # @param class_name [String] The name of the class.
- # @param context [Object] The event context.
- # @param type [EventType] The event type.
- #
- def initialize(class_name, context, type)
- @class_name = class_name
- @context = context
- @type = type
- end
-
- # Invokes the type-specific method on the provided handler.
- #
- # @param handler [Object] The handler to be notified of this event.
- #
- def dispatch(handler)
- Qpid::Proton.dispatch(handler, @type.method, self)
- end
-
- end
-
-end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b883393b/proton-c/bindings/ruby/lib/event/event_type.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/event/event_type.rb b/proton-c/bindings/ruby/lib/event/event_type.rb
deleted file mode 100644
index aa5944d..0000000
--- a/proton-c/bindings/ruby/lib/event/event_type.rb
+++ /dev/null
@@ -1,71 +0,0 @@
-#--
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#++
-
-module Qpid::Proton::Event
-
- # Manages the association between an Event and the method which should
- # process on the context object associated with an occurance of the event.
- #
- # Each type is identified by a unique #type value.
- #
- # @example
- #
- # # SCENARIO: A part of an application handles extracting and decrypting
- # # data received from a remote endpoint.
- # #
- # # An EventType is created to notify handlers that such a
- # # situation has occurred.
- #
- # ENCRYPTED_RECV = 10000 # the unique constant value for the event
- #
- # # create a new event type which, when it occurs, invokes a method
- # # named :on_encrypted_data when a handler is notified of its occurrance
- # Qpid::Proton::Event::ENCRYPTED_RECV =
- # Qpid::Proton::Event::EventType.new(ENCRYPTED_RECV, :on_encrypted_data)
- #
- # @see EventBase EventBase for the rest of this example.
- # @see Qpid::Proton::Event::Event The Event class for more details on events.
- #
- class EventType
-
- # The method to invoke on any potential handler.
- attr_reader :method
- attr_reader :number
-
- def initialize(number, method)
- @number = number
- @name = Cproton.pn_event_type_name(@number)
- @method = method
- @@types ||= {}
- @@types[number] = self
- end
-
- # @private
- def to_s
- @name
- end
-
- # @private
- def self.by_type(type) # :nodoc:
- @@types[type]
- end
-
- end
-
-end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b883393b/proton-c/bindings/ruby/lib/handler/adapter.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/handler/adapter.rb b/proton-c/bindings/ruby/lib/handler/adapter.rb
new file mode 100644
index 0000000..2efa2b7
--- /dev/null
+++ b/proton-c/bindings/ruby/lib/handler/adapter.rb
@@ -0,0 +1,157 @@
+#--
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#++
+
+# @private
+module Qpid::Proton::Handler
+
+ # @private
+ # Adapter to convert raw proton events to {#MessagingHandler} events
+ class Adapter
+
+ def self.try_convert(h) h.is_a?(Adapter) ? h : Adapter.new(h); end
+
+ def initialize handler
+ @handler = handler || MessagingHandler.new # Pick up default MH behavior
+ @opts = handler.respond_to?(:options) ? handler.options : {}
+ @opts[:prefetch] ||= 10
+ @opts[:peer_close_is_error] = false unless @opts.include? :peer_close_is_error
+ [:auto_accept, :auto_settle, :auto_open, :auto_close].each do |k|
+ @opts[k] = true unless @opts.include? k
+ end
+ end
+
+ def dispatch(method, event)
+ (@handler.__send__(method, event); true) if @handler.respond_to? method
+ end
+
+ def delegate(method, event)
+ event.method = method # Update the event with the new method
+ event.dispatch(@handler) || dispatch(:on_unhandled, event)
+ end
+ def delegate_error(method, event)
+ event.method = method
+ unless event.dispatch(@handler) # Default behaviour if not dispatched
+ dispatch(:on_error, event) || dispatch(:on_unhandled, event)
+ event.connection.close event.context.condition # Close the connection by default
+ end
+ end
+
+ # Define repetative on_xxx_open/close methods for each endpoint type
+ def self.open_close(endpoint)
+ on_opening = :"on_#{endpoint}_opening"
+ on_opened = :"on_#{endpoint}_opened"
+ on_closing = :"on_#{endpoint}_closing"
+ on_closed = :"on_#{endpoint}_closed"
+ on_error = :"on_#{endpoint}_error"
+
+ Module.new do
+ define_method(:"on_#{endpoint}_local_open") do |event|
+ delegate(on_opened, event) if event.context.remote_open?
+ end
+
+ define_method(:"on_#{endpoint}_remote_open") do |event|
+ if event.context.local_open?
+ delegate(on_opened, event)
+ elsif event.context.local_uninit?
+ delegate(on_opening, event)
+ event.context.open if @opts[:auto_open]
+ end
+ end
+
+ define_method(:"on_#{endpoint}_local_close") do |event|
+ delegate(on_closed, event) if event.context.remote_closed?
+ end
+
+ define_method(:"on_#{endpoint}_remote_close") do |event|
+ if event.context.remote_condition
+ delegate_error(on_error, event)
+ elsif event.context.local_closed?
+ delegate(on_closed, event)
+ elsif @opts[:peer_close_is_error]
+ Condition.assign(event.context.__send__(:_remote_condition), "unexpected peer close")
+ delegate_error(on_error, event)
+ else
+ delegate(on_closing, event)
+ end
+ event.context.close if @opts[:auto_close]
+ end
+ end
+ end
+ # Generate and include open_close modules for each endpoint type
+ [:connection, :session, :link].each { |endpoint| include open_close(endpoint) }
+
+ def on_transport_error(event) delegate_error(:on_transport_error, event); end
+ def on_transport_closed(event) delegate(:on_transport_closed, event); end
+
+ # Add flow control for link opening events
+ def on_link_local_open(event) super; add_credit(event); end
+ def on_link_remote_open(event) super; add_credit(event); end
+
+
+ def on_delivery(event)
+ d = event.delivery
+ if d.link.receiver? # Incoming message
+ if d.aborted?
+ delegate(:on_aborted, event)
+ d.settle
+ elsif d.message?
+ if d.link.local_closed? && @opts[:auto_accept]
+ d.release
+ else
+ begin
+ delegate(:on_message, event)
+ d.accept if @opts[:auto_accept]
+ rescue Qpid::Proton::Reject
+ d.reject
+ rescue Qpid::Proton::Release
+ d.release(true)
+ end
+ end
+ elsif d.updated? && d.settled?
+ delegate(:on_settled, event)
+ end
+ add_credit(event)
+ else # Outgoing message
+ if d.updated?
+ case d.remote_state
+ when Qpid::Proton::Delivery::ACCEPTED then delegate(:on_accepted, event)
+ when Qpid::Proton::Delivery::REJECTED then delegate(:on_rejected, event)
+ when Qpid::Proton::Delivery::RELEASED, Qpid::Proton::Delivery::MODIFIED then delegate(:on_released, event)
+ end
+ delegate(:on_settled, event) if d.settled?
+ d.settle if @opts[:auto_settle]
+ end
+ end
+ end
+
+ def on_link_flow(event)
+ add_credit(event)
+ l = event.link
+ delegate(:on_sendable, event) if l.sender? && l.open? && l.credit > 0
+ end
+
+ def add_credit(event)
+ r = event.receiver
+ prefetch = @opts[:prefetch]
+ if r && r.open? && (r.drained == 0) && prefetch && (prefetch > r.credit)
+ r.flow(prefetch - r.credit)
+ end
+ end
+ end
+end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b883393b/proton-c/bindings/ruby/lib/handler/c_adaptor.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/handler/c_adaptor.rb b/proton-c/bindings/ruby/lib/handler/c_adaptor.rb
deleted file mode 100644
index ef4852e..0000000
--- a/proton-c/bindings/ruby/lib/handler/c_adaptor.rb
+++ /dev/null
@@ -1,47 +0,0 @@
-#--
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#++
-
-module Qpid::Proton::Handler
-
- # @private
- class CAdaptor
-
- def initialize(handler, on_error = nil)
- @handler = handler
- @on_error = on_error
- end
-
- def dispatch(cevent, ctype)
- event = Qpid::Proton::Event::Event.wrap(cevent, ctype)
- # TODO add a variable to enable this programmatically
- # print "EVENT: #{event} going to #{@handler}\n"
- event.dispatch(@handler)
- end
-
- def exception(error)
- if @on_error.nil?
- raise error
- else
- @on_error.call(error)
- end
- end
-
- end
-
-end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b883393b/proton-c/bindings/ruby/lib/handler/c_flow_controller.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/handler/c_flow_controller.rb b/proton-c/bindings/ruby/lib/handler/c_flow_controller.rb
deleted file mode 100644
index 377cc2f..0000000
--- a/proton-c/bindings/ruby/lib/handler/c_flow_controller.rb
+++ /dev/null
@@ -1,33 +0,0 @@
-#--
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#++
-
-module Qpid::Proton::Handler
-
- # @private
- class CFlowController < Qpid::Proton::Handler::WrappedHandler
-
- include Qpid::Proton::Util::Wrapper
-
- def initialize(window = 1024)
- super(Cproton.pn_flowcontroller(window))
- end
-
- end
-
-end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b883393b/proton-c/bindings/ruby/lib/handler/endpoint_state_handler.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/handler/endpoint_state_handler.rb b/proton-c/bindings/ruby/lib/handler/endpoint_state_handler.rb
index 7f0c85b..98b8d3a 100644
--- a/proton-c/bindings/ruby/lib/handler/endpoint_state_handler.rb
+++ b/proton-c/bindings/ruby/lib/handler/endpoint_state_handler.rb
@@ -17,36 +17,25 @@
# under the License.
#++
+# @private
module Qpid::Proton::Handler
- # A utility that exposes endpoint events; i.e., the open/close of a link,
- # session or connection, in a more intuitive manner.
- #
- # A XXX_opened method will be called when both local and remote peers have
- # opened the link, session or connection. This can be used to confirm a
- # locally initiated action for example.
+ # Mixin to convert raw proton endpoint events to {#MessagingHandler} events
#
# A XXX_opening method will be called when the remote peer has requested
# an open that was not initiated locally. By default this will simply open
# locally, which then trigtgers the XXX_opened called.
#
+ # A XXX_opened method will be called when both local and remote peers have
+ # opened the link, session or connection. This can be used to confirm a
+ # locally initiated action for example.
+ #
# The same applies to close.
#
- class EndpointStateHandler
-
- def initialize(peer_close_is_error = false, delegate = nil)
- @delegate = delegate
- @peer_close_is_error = peer_close_is_error
- end
-
- def self.print_error(endpoint, endpoint_type)
- if !endpoint.remote_condition.nil?
- elsif self.local_endpoint?(endpoint) && endpoint.remote_closed?
- logging.error("#{endpoint_type} closed by peer")
- end
- end
+ module EndpointStateHandler
def on_link_remote_close(event)
+ super
if !event.link.remote_condition.nil?
self.on_link_error(event)
elsif event.link.local_closed?
@@ -58,6 +47,7 @@ module Qpid::Proton::Handler
end
def on_session_remote_close(event)
+ super
if !event.session.remote_condition.nil?
self.on_session_error(event)
elsif event.session.local_closed?
@@ -69,6 +59,7 @@ module Qpid::Proton::Handler
end
def on_connection_remote_close(event)
+ super
if !event.connection.remote_condition.nil?
self.on_connection_error(event)
elsif event.connection.local_closed?
@@ -80,10 +71,12 @@ module Qpid::Proton::Handler
end
def on_connection_local_open(event)
+ super
self.on_connection_opened(event) if event.connection.remote_active?
end
def on_connection_remote_open(event)
+ super
if event.connection.local_active?
self.on_connection_opened(event)
elsif event.connection.local_uninit?
@@ -93,10 +86,12 @@ module Qpid::Proton::Handler
end
def on_session_local_open(event)
+ super
self.on_session_opened(event) if event.session.remote_active?
end
def on_session_remote_open(event)
+ super
if !(event.session.state & Qpid::Proton::Endpoint::LOCAL_ACTIVE).zero?
self.on_session_opened(event)
elsif event.session.local_uninit?
@@ -106,10 +101,12 @@ module Qpid::Proton::Handler
end
def on_link_local_open(event)
+ super
self.on_link_opened(event) if event.link.remote_active?
end
def on_link_remote_open(event)
+ super
if event.link.local_active?
self.on_link_opened(event)
elsif event.link.local_uninit?
@@ -117,101 +114,5 @@ module Qpid::Proton::Handler
event.link.open
end
end
-
- def on_connection_opened(event)
- Qpid::Proton::Event.dispatch(@delegate, :on_connection_opened, event) if !@delegate.nil?
- end
-
- def on_session_opened(event)
- Qpid::Proton::Event.dispatch(@delegate, :on_session_opened, event) if !@delegate.nil?
- end
-
- def on_link_opened(event)
- Qpid::Proton::Event.dispatch(@delegate, :on_link_opened, event) if !@delegate.nil?
- end
-
- def on_connection_opening(event)
- Qpid::Proton::Event.dispatch(@delegate, :on_connection_opening, event) if !@delegate.nil?
- end
-
- def on_session_opening(event)
- Qpid::Proton::Event.dispatch(@delegate, :on_session_opening, event) if !@delegate.nil?
- end
-
- def on_link_opening(event)
- Qpid::Proton::Event.dispatch(@delegate, :on_link_opening, event) if !@delegate.nil?
- end
-
- def on_connection_error(event)
- if !@delegate.nil?
- Qpid::Proton::Event.dispatch(@delegate, :on_connection_error, event)
- else
- self.log_error(event.connection, "connection")
- end
- end
-
- def on_session_error(event)
- if !@delegate.nil?
- Qpid::Proton::Event.dispatch(@delegate, :on_session_error, event)
- else
- self.log_error(event.session, "session")
- event.connection.close
- end
- end
-
- def on_link_error(event)
- if !@delegate.nil?
- Qpid::Proton::Event.dispatch(@delegate, :on_link_error, event)
- else
- self.log_error(event.link, "link")
- event.conneciton.close
- end
- end
-
- def on_connection_closed(event)
- Qpid::Proton::Event.dispatch(@delegate, :on_connection_closed, event) if !@delegate.nil?
- end
-
- def on_session_closed(event)
- Qpid::Proton::Event.dispatch(@delegate, :on_session_closed, event) if !@delegate.nil?
- end
-
- def on_link_closed(event)
- Qpid::Proton::Event.dispatch(@delegate, :on_link_closed, event) if !@delegate.nil?
- end
-
- def on_connection_closing(event)
- if !@delegate.nil?
- Qpid::Proton::Event.dispatch(@delegate, :on_connection_closing, event)
- elsif @peer_close_is_error
- self.on_connection_error(event)
- end
- end
-
- def on_session_closing(event)
- if !@delegate.nil?
- Qpid::Proton::Event.dispatch(@delegate, :on_session_closing, event)
- elsif @peer_close_is_error
- self.on_session_error(event)
- end
- end
-
- def on_link_closing(event)
- if !@delegate.nil?
- Qpid::Proton::Event.dispatch(@delegate, :on_link_closing, event)
- elsif @peer_close_is_error
- self.on_link_error(event)
- end
- end
-
- def on_transport_tail_closed(event)
- self.on_transport_closed(event)
- end
-
- def on_transport_closed(event)
- Qpid::Proton::Event.dispatch(@delegate, :on_disconnected, event) if !@delegate.nil?
- end
-
end
-
end
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org