You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by mc...@apache.org on 2015/06/18 22:30:30 UTC
[13/32] qpid-proton git commit: PROTON-781: Added Container to the
Ruby reactive APIs.
PROTON-781: Added Container to the Ruby reactive APIs.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/b7ee18cd
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/b7ee18cd
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/b7ee18cd
Branch: refs/heads/master
Commit: b7ee18cdbcef119310927a527cda05e9ef4dca37
Parents: f9b4d3e
Author: Darryl L. Pierce <mc...@gmail.com>
Authored: Thu Feb 26 10:26:11 2015 -0500
Committer: Darryl L. Pierce <mc...@gmail.com>
Committed: Thu Jun 18 16:28:44 2015 -0400
----------------------------------------------------------------------
proton-c/bindings/ruby/lib/qpid_proton.rb | 1 +
proton-c/bindings/ruby/lib/reactor/container.rb | 272 +++++++++++++++++++
2 files changed, 273 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b7ee18cd/proton-c/bindings/ruby/lib/qpid_proton.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/qpid_proton.rb b/proton-c/bindings/ruby/lib/qpid_proton.rb
index f40c608..ba1e66e 100644
--- a/proton-c/bindings/ruby/lib/qpid_proton.rb
+++ b/proton-c/bindings/ruby/lib/qpid_proton.rb
@@ -106,6 +106,7 @@ require "reactor/urls"
require "reactor/connector"
require "reactor/backoff"
require "reactor/session_per_connection"
+require "reactor/container"
module Qpid::Proton
# @private
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b7ee18cd/proton-c/bindings/ruby/lib/reactor/container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/reactor/container.rb b/proton-c/bindings/ruby/lib/reactor/container.rb
new file mode 100644
index 0000000..93b49bb
--- /dev/null
+++ b/proton-c/bindings/ruby/lib/reactor/container.rb
@@ -0,0 +1,272 @@
+#--
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#++
+
+module Qpid::Proton::Reactor
+
+ # @private
+ class InternalTransactionHandler < Qpid::Proton::Handler::OutgoingMessageHandler
+
+ def initialize
+ super
+ end
+
+ def on_settled(event)
+ if event.delivery.respond_to? :transaction
+ event.transaction = event.delivery.transaction
+ event.delivery.transaction.handle_outcome(event)
+ end
+ end
+
+ end
+
+
+ # A representation of the AMQP concept of a container which, loosely
+ # speaking, is something that establishes links to or from another
+ # container on which messages are transferred.
+ #
+ # This is an extension to the Reactor class that adds convenience methods
+ # for creating instances of Qpid::Proton::Connection, Qpid::Proton::Sender
+ # and Qpid::Proton::Receiver.
+ #
+ # @example
+ #
+ class Container < Reactor
+
+ include Qpid::Proton::Util::Reactor
+
+ include Qpid::Proton::Util::UUID
+
+ attr_accessor :container_id
+ attr_accessor :global_handler
+
+ def initialize(handlers, options = {})
+ super(handlers, options)
+
+ # only do the following if we're creating a new instance
+ if !options.has_key?(:impl)
+ @ssl = SSLConfig.new
+ if options[:global_handler]
+ self.global_handler = GlobalOverrides.new(options[:global_handler])
+ else
+ # very ugly, but using self.global_handler doesn't work in the constructor
+ ghandler = Reactor.instance_method(:global_handler).bind(self).call
+ ghandler = GlobalOverrides.new(ghandler)
+ Reactor.instance_method(:global_handler=).bind(self).call(ghandler)
+ end
+ @trigger = nil
+ @container_id = generate_uuid
+ end
+ end
+
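+ # Initiates the establishment of an AMQP connection.
+ #
+ # One of :url, :urls or :address must be supplied; they are checked in
+ # that order and the first one present is used.
+ #
+ # @param options [Hash] A hash of named arguments.
+ # @option options [Object] :handler The handler for the new connection.
+ # @option options [Object] :url A single URL to connect to.
+ # @option options [Array] :urls A list of URLs to connect to.
+ # @option options [String] :address An address, parsed with Qpid::Proton::URL.
+ # @option options [Object] :heartbeat The heartbeat assigned to the connector.
+ # @option options [Object] :reconnect The reconnect strategy; a new Backoff
+ #   is used when omitted.
+ #
+ # @return [Object] The opened connection.
+ #
+ # @raise [ArgumentError] If none of :url, :urls or :address is given.
+ #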
+ def connect(options = {})
+ conn = self.connection(options[:handler])
+ conn.container = self.container_id || generate_uuid
+ connector = Connector.new(conn)
+ conn.overrides = connector
+ if !options[:url].nil?
+ connector.address = URLs.new([options[:url]])
+ elsif !options[:urls].nil?
+ connector.address = URLs.new(options[:urls])
+ elsif !options[:address].nil?
+ connector.address = URLs.new([Qpid::Proton::URL.new(options[:address])])
+ else
+ raise ArgumentError.new("either :url or :urls or :address required")
+ end
+
+ connector.heartbeat = options[:heartbeat] if !options[:heartbeat].nil?
+ if !options[:reconnect].nil?
+ connector.reconnect = options[:reconnect]
+ else
+ connector.reconnect = Backoff.new()
+ end
+
+ connector.ssl_domain = SessionPerConnection.new # TODO seems this should be configurable
+
+ conn.open
+
+ return conn
+ end
+
+ def _session(context)
+ if context.is_a?(Qpid::Proton::URL)
+ return self._session(self.connect(:url => context))
+ elsif context.is_a?(Qpid::Proton::Session)
+ return context
+ elsif context.is_a?(Qpid::Proton::Connection)
+ if context.session_policy?
+ return context.session_policy.session(context)
+ else
+ return self.create_session(context)
+ end
+ else
+ return context.session
+ end
+ end
+
+ # Initiates the establishment of a link over which messages can be sent.
+ #
+ # @param context [String, URL] The context.
+ # @param opts [Hash] Additional options.
+ # @option opts [String, Qpid::Proton::URL] :target The target address.
+ # @option opts [String] :source The source address.
+ # @option opts [String] :name The link name.
+ # @option opts [Boolean] :dynamic
+ # @option opts [Object] :handler
+ # @option opts [Object] :tag_generator The tag generator.
+ # @option opts [Hash] :options Additional link options
+ #
+ # @return [Sender] The sender.
+ #
+ def create_sender(context, opts = {})
+ if context.is_a?(::String)
+ context = Qpid::Proton::URL.new(context)
+ end
+
+ target = opts[:target]
+ if context.is_a?(Qpid::Proton::URL) && target.nil?
+ target = context.path
+ end
+
+ session = self._session(context)
+
+ sender = session.sender(opts[:name] ||
+ id(session.connection.container,
+ target, opts[:source]))
+ sender.source.address = opts[:source] if !opts[:source].nil?
+ sender.target.address = target if target
+ sender.handler = opts[:handler] if !opts[:handler].nil?
+ sender.tag_generator = opts[:tag_generator] if !opts[:tag_gnenerator].nil?
+ self._apply_link_options(opts[:options], sender)
+ sender.open
+ return sender
+ end
+
+ # Initiates the establishment of a link over which messages can be received.
+ #
+ # There are two accepted arguments for the context
+ # 1. If a Connection is supplied then the link is established using that
+ # object. The source, and optionally the target, address can be supplied
+ # 2. If it is a String or a URL then a new Connection is created on which
+ # the link will be attached. If a path is specified, but not the source
+ # address, then the path of the URL is used as the source address.
+ #
+ # The name will be generated for the link if one is not specified.
+ #
+ # @param context [Connection, URL, String] The connection or the address.
+ # @param opts [Hash] Additional options.
+ # @option opts [String, Qpid::Proton::URL] :source The source address.
+ # @option opts [String] :target The target address
+ # @option opts [String] :name The link name.
+ # @option opts [Boolean] :dynamic
+ # @option opts [Object] :handler
+ # @option opts [Hash] :options Additional link options.
+ #
+ # @return [Receiver] The receiver.
+ #
+ def create_receiver(context, opts = {})
+ if context.is_a?(::String)
+ context = Qpid::Proton::URL.new(context)
+ end
+
+ source = opts[:source]
+ if context.is_a?(Qpid::Proton::URL) && source.nil?
+ source = context.path
+ end
+
+ session = self._session(context)
+
+ receiver = session.receiver(opts[:name] ||
+ id(session.connection.container,
+ source, opts[:target]))
+ receiver.source.address = source if source
+ receiver.source.dynamic = true if opts.has_key?(:dynamic) && opts[:dynamic]
+ receiver.target.address = opts[:target] if !opts[:target].nil?
+ receiver.handler = opts[:handler] if !opts[:handler].nil?
+ self._apply_link_options(opts[:options], receiver)
+ receiver.open
+ return receiver
+ end
+
+ def declare_transaction(context, handler = nil, settle_before_discharge = false)
+ if context.respond_to? :txn_ctl && !context.__send__(:txn_ctl).nil?
+ class << context
+ attr_accessor :txn_ctl
+ end
+ context.txn_ctl = self.create_sender(context, nil, "txn-ctl",
+ InternalTransactionHandler.new())
+ end
+ return Transaction.new(context.txn_ctl, handler, settle_before_discharge)
+ end
+
+ # Initiates a server socket, accepting incoming AMQP connections on the
+ # interface and port specified.
+ #
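+ # @param url [String] The address to listen on; it is parsed with
+ #   Qpid::Proton::URL and its host and port are used for the acceptor.
+ # @param ssl_domain [Object, nil] The SSL configuration for the acceptor.
+ #   When nil and the URL scheme is amqps, the container's own server SSL
+ #   configuration is used instead.
+ #
+ # @return [Object] The acceptor.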
+ #
+ def listen(url, ssl_domain = nil)
+ url = Qpid::Proton::URL.new(url)
+ acceptor = self.acceptor(url.host, url.port)
+ ssl_config = ssl_domain
+ if ssl_config.nil? && (url.scheme == 'amqps') && @ssl
+ ssl_config = @ssl.server
+ end
+ if !ssl_config.nil?
+ acceptor.ssl_domain(ssl_config)
+ end
+ return acceptor
+ end
+
+ def do_work(timeout = nil)
+ self.timeout = timeout unless timeout.nil?
+ self.process
+ end
+
+ def id(container, remote, local)
+ if !local.nil? && !remote.nil?
+ "#{container}-#{remote}-#{local}"
+ elsif !local.nil?
+ "#{container}-#{local}"
+ elsif !remote.nil?
+ "#{container}-#{remote}"
+ else
+ "#{container}-#{generate_uuid}"
+ end
+ end
+
+ def _apply_link_options(options, link)
+ if !options.nil? && !options.empty?
+ if !options.is_a?(::List)
+ options = [Options].flatten
+ end
+
+ options.each {|option| o.apply(link) if o.test(link)}
+ end
+ end
+
+ def to_s
+ "#{self.class}<@impl=#{Cproton.pni_address_of(@impl)}>"
+ end
+
+ end
+
+end
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org