You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by mc...@apache.org on 2015/06/08 20:13:53 UTC

[24/31] qpid-proton git commit: PROTON-781: Added Container to the Ruby reactive APIs.

PROTON-781: Added Container to the Ruby reactive APIs.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/b8ba5acb
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/b8ba5acb
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/b8ba5acb

Branch: refs/heads/PROTON-781-ruby-reactor-apis
Commit: b8ba5acb285d8f540dbf2226bb97cb564e8c09ff
Parents: aa8222e
Author: Darryl L. Pierce <mc...@gmail.com>
Authored: Thu Feb 26 10:26:11 2015 -0500
Committer: Darryl L. Pierce <mc...@gmail.com>
Committed: Mon Jun 8 13:57:52 2015 -0400

----------------------------------------------------------------------
 proton-c/bindings/ruby/lib/qpid_proton.rb       |   1 +
 proton-c/bindings/ruby/lib/reactor/container.rb | 272 +++++++++++++++++++
 2 files changed, 273 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b8ba5acb/proton-c/bindings/ruby/lib/qpid_proton.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/qpid_proton.rb b/proton-c/bindings/ruby/lib/qpid_proton.rb
index f40c608..ba1e66e 100644
--- a/proton-c/bindings/ruby/lib/qpid_proton.rb
+++ b/proton-c/bindings/ruby/lib/qpid_proton.rb
@@ -106,6 +106,7 @@ require "reactor/urls"
 require "reactor/connector"
 require "reactor/backoff"
 require "reactor/session_per_connection"
+require "reactor/container"
 
 module Qpid::Proton
   # @private

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b8ba5acb/proton-c/bindings/ruby/lib/reactor/container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/reactor/container.rb b/proton-c/bindings/ruby/lib/reactor/container.rb
new file mode 100644
index 0000000..93b49bb
--- /dev/null
+++ b/proton-c/bindings/ruby/lib/reactor/container.rb
@@ -0,0 +1,272 @@
+#--
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#++
+
+module Qpid::Proton::Reactor
+
+  # @private
+  class InternalTransactionHandler < Qpid::Proton::Handler::OutgoingMessageHandler
+
+    def initialize
+      super
+    end
+
+    def on_settled(event)
+      if event.delivery.respond_to? :transaction
+        event.transaction = event.delivery.transaction
+        event.delivery.transaction.handle_outcome(event)
+      end
+    end
+
+  end
+
+
+  # A representation of the AMQP concept of a container which, loosely
+  # speaking, is something that establishes links to or from another
+  # container on which messages are transferred.
+  #
+  # This is an extension to the Reactor classthat adds convenience methods
+  # for creating instances of Qpid::Proton::Connection, Qpid::Proton::Sender
+  # and Qpid::Proton::Receiver.
+  #
+  # @example
+  #
+  class Container < Reactor
+
+    include Qpid::Proton::Util::Reactor
+
+    include Qpid::Proton::Util::UUID
+
+    attr_accessor :container_id
+    attr_accessor :global_handler
+
+    def initialize(handlers, options = {})
+      super(handlers, options)
+
+      # only do the following if we're creating a new instance
+      if !options.has_key?(:impl)
+        @ssl = SSLConfig.new
+        if options[:global_handler]
+          self.global_handler = GlobalOverrides.new(options[:global_handler])
+        else
+          # very ugly, but using self.global_handler doesn't work in the constructor
+          ghandler = Reactor.instance_method(:global_handler).bind(self).call
+          ghandler = GlobalOverrides.new(ghandler)
+          Reactor.instance_method(:global_handler=).bind(self).call(ghandler)
+        end
+        @trigger = nil
+        @container_id = generate_uuid
+      end
+    end
+
+    # Initiates the establishment of an AMQP connection.
+    #
+    # @param options [Hash] A hash of named arguments.
+    #
+    def connect(options = {})
+      conn = self.connection(options[:handler])
+      conn.container = self.container_id || generate_uuid
+      connector = Connector.new(conn)
+      conn.overrides = connector
+      if !options[:url].nil?
+        connector.address = URLs.new([options[:url]])
+      elsif !options[:urls].nil?
+        connector.address = URLs.new(options[:urls])
+      elsif !options[:address].nil?
+        connector.address = URLs.new([Qpid::Proton::URL.new(options[:address])])
+      else
+        raise ArgumentError.new("either :url or :urls or :address required")
+      end
+
+      connector.heartbeat = options[:heartbeat] if !options[:heartbeat].nil?
+      if !options[:reconnect].nil?
+        connector.reconnect = options[:reconnect]
+      else
+        connector.reconnect = Backoff.new()
+      end
+
+      connector.ssl_domain = SessionPerConnection.new # TODO seems this should be configurable
+
+      conn.open
+
+      return conn
+    end
+
+    def _session(context)
+      if context.is_a?(Qpid::Proton::URL)
+        return self._session(self.connect(:url => context))
+      elsif context.is_a?(Qpid::Proton::Session)
+        return context
+      elsif context.is_a?(Qpid::Proton::Connection)
+        if context.session_policy?
+          return context.session_policy.session(context)
+        else
+          return self.create_session(context)
+        end
+      else
+        return context.session
+      end
+    end
+
+    # Initiates the establishment of a link over which messages can be sent.
+    #
+    # @param context [String, URL] The context.
+    # @param opts [Hash] Additional options.
+    # @param opts [String, Qpid::Proton::URL] The target address.
+    # @param opts [String] :source The source address.
+    # @param opts [Boolean] :dynamic
+    # @param opts [Object] :handler
+    # @param opts [Object] :tag_generator The tag generator.
+    # @param opts [Hash] :options Addtional link options
+    #
+    # @return [Sender] The sender.
+    #
+    def create_sender(context, opts = {})
+      if context.is_a?(::String)
+        context = Qpid::Proton::URL.new(context)
+      end
+
+      target = opts[:target]
+      if context.is_a?(Qpid::Proton::URL) && target.nil?
+        target = context.path
+      end
+
+      session = self._session(context)
+
+      sender = session.sender(opts[:name] ||
+                              id(session.connection.container,
+                                target, opts[:source]))
+        sender.source.address = opts[:source] if !opts[:source].nil?
+        sender.target.address = target if target
+        sender.handler = opts[:handler] if !opts[:handler].nil?
+        sender.tag_generator = opts[:tag_generator] if !opts[:tag_gnenerator].nil?
+        self._apply_link_options(opts[:options], sender)
+        sender.open
+        return sender
+    end
+
+    # Initiates the establishment of a link over which messages can be received.
+    #
+    # There are two accepted arguments for the context
+    #  1. If a Connection is supplied then the link is established using that
+    # object. The source, and optionally the target, address can be supplied
+    #  2. If it is a String or a URL then a new Connection is created on which
+    # the link will be attached. If a path is specified, but not the source
+    # address, then the path of the URL is used as the target address.
+    #
+    # The name will be generated for the link if one is not specified.
+    #
+    # @param context [Connection, URL, String] The connection or the address.
+    # @param opts [Hash] Additional otpions.
+    # @option opts [String, Qpid::Proton::URL] The source address.
+    # @option opts [String] :target The target address
+    # @option opts [String] :name The link name.
+    # @option opts [Boolean] :dynamic
+    # @option opts [Object] :handler
+    # @option opts [Hash] :options Additional link options.
+    #
+    # @return [Receiver
+    #
+    def create_receiver(context, opts = {})
+      if context.is_a?(::String)
+        context = Qpid::Proton::URL.new(context)
+      end
+
+      source = opts[:source]
+      if context.is_a?(Qpid::Proton::URL) && source.nil?
+        source = context.path
+      end
+
+      session = self._session(context)
+
+      receiver = session.receiver(opts[:name] ||
+                                  id(session.connection.container,
+                                      source, opts[:target]))
+      receiver.source.address = source if source
+      receiver.source.dynamic = true if opts.has_key?(:dynamic) && opts[:dynamic]
+      receiver.target.address = opts[:target] if !opts[:target].nil?
+      receiver.handler = opts[:handler] if !opts[:handler].nil?
+      self._apply_link_options(opts[:options], receiver)
+      receiver.open
+      return receiver
+    end
+
+    def declare_transaction(context, handler = nil, settle_before_discharge = false)
+      if context.respond_to? :txn_ctl && !context.__send__(:txn_ctl).nil?
+        class << context
+          attr_accessor :txn_ctl
+        end
+        context.txn_ctl = self.create_sender(context, nil, "txn-ctl",
+        InternalTransactionHandler.new())
+      end
+      return Transaction.new(context.txn_ctl, handler, settle_before_discharge)
+    end
+
+    # Initiates a server socket, accepting incoming AMQP connections on the
+    # interface and port specified.
+    #
+    # @param url []
+    # @param ssl_domain []
+    #
+    def listen(url, ssl_domain = nil)
+      url = Qpid::Proton::URL.new(url)
+      acceptor = self.acceptor(url.host, url.port)
+      ssl_config = ssl_domain
+      if ssl_config.nil? && (url.scheme == 'amqps') && @ssl
+        ssl_config = @ssl.server
+      end
+      if !ssl_config.nil?
+        acceptor.ssl_domain(ssl_config)
+      end
+      return acceptor
+    end
+
+    def do_work(timeout = nil)
+      self.timeout = timeout unless timeout.nil?
+      self.process
+    end
+
+    def id(container, remote, local)
+      if !local.nil? && !remote.nil?
+        "#{container}-#{remote}-#{local}"
+      elsif !local.nil?
+        "#{container}-#{local}"
+      elsif !remote.nil?
+        "#{container}-#{remote}"
+      else
+        "#{container}-#{generate_uuid}"
+      end
+    end
+
+    def _apply_link_options(options, link)
+      if !options.nil? && !options.empty?
+        if !options.is_a?(::List)
+          options = [Options].flatten
+        end
+
+        options.each {|option| o.apply(link) if o.test(link)}
+      end
+    end
+
+    def to_s
+      "#{self.class}<@impl=#{Cproton.pni_address_of(@impl)}>"
+    end
+
+  end
+
+end


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org