You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2018/03/23 20:30:18 UTC

qpid-proton git commit: PROTON-1778: [ruby] threaded broker example for thread safe work_queue

Repository: qpid-proton
Updated Branches:
  refs/heads/master c0d83996d -> 8694821e5


PROTON-1778: [ruby] threaded broker example for thread safe work_queue


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/8694821e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/8694821e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/8694821e

Branch: refs/heads/master
Commit: 8694821e5f71a534957c00f4857478d79906636d
Parents: c0d8399
Author: Alan Conway <ac...@redhat.com>
Authored: Fri Mar 23 15:31:05 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Fri Mar 23 16:30:13 2018 -0400

----------------------------------------------------------------------
 examples/ruby/README.md                       |  36 ++--
 examples/ruby/broker.rb                       | 190 +++++++++++----------
 proton-c/bindings/ruby/README.rdoc            |  21 +++
 proton-c/bindings/ruby/lib/core/connection.rb |   3 +-
 proton-c/bindings/ruby/lib/core/container.rb  |   6 +-
 proton-c/bindings/ruby/lib/core/transfer.rb   |   3 +
 proton-c/bindings/ruby/lib/core/work_queue.rb |  25 ++-
 7 files changed, 175 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8694821e/examples/ruby/README.md
----------------------------------------------------------------------
diff --git a/examples/ruby/README.md b/examples/ruby/README.md
index 66f6b31..61e01ca 100644
--- a/examples/ruby/README.md
+++ b/examples/ruby/README.md
@@ -58,19 +58,35 @@ In this set of examples we see the following event occurring, in addition to wha
 
 ## Now About That Broker example
 
-The **broker.rb** example application is a nice demonstration of doing something more interesting in Ruby with Proton.
+The **broker.rb** example application is a nice demonstration of doing something more interesting in Ruby with Proton, and shows how to use multiple threads.
 
-The way the broker works is to listen to incoming connections, examine the components of the address for that connection, attach that connection to an exchange managing that address and then it sends any messages destined for that address to them.
+The broker listens for incoming connections and sender/receiver links. It uses the source and target address of senders and receivers to identify a queue. Messages from receivers go on the queue, and are sent via senders.
 
 The components of the broker example include:
- * **Broker** - A class that extends the MessagingHandler class. It accepts incoming connections, manages subscribing them to exchanges, and transfers messages between them.
- * **MessageQueue** - Distributes messages to subscriptions.
-
-The Broker manages a map connecting a queue address to the instance of Exchange that holds references to the endpoints of interest.
+ * **Broker** is a Listener::Handler that accepts connections, and manages the set of named queues.
+ * **BrokerHandler** extends MessagingHandler to accept incoming connections, senders and receivers and transfers messages between them and the Broker's queues.
+ * **MessageQueue** - A queue of messages that keeps track of waiting senders.
 
 The broker application demonstrates a new set of events:
 
- * **on_link_open** - Fired when a remote link is opened. From this event the broker grabs the address and subscribes the link to an exchange for that address.
- * **on_link_close** - Fired when a remote link is closed. From this event the broker grabs the address and unsubscribes the link from that exchange.
- * **on_connection_close** - Fired when a remote connection is closed but the local end is still open.
- * **on_transport_close** - Fired when the protocol transport has closed. The broker removes all links for the disconnected connection, avoiding workign with endpoints that are now gone.
+ * **on_sender_open** - Fired when a sender link is opened, the broker gets the address and starts sending messages from the corresponding queue.
+ * **on_sender_close** - Fired when a sender link is closed, remove the sender from the queue so no more messages are sent.
+ * **on_connection_close** - Fired when the remote connection is closes, close all senders.
+ * **on_transport_close** - Fired when the transport (socket) has closed, close all senders.
+
+It also demonstrates aspects of multi-threaded proton:
+
+ * **Thread safe MessageQueue** Uses a Mutex to make actions atomic when called concurrently.
+
+ * **Using WorkQueue** Proton objects like Sender are not thread safe.  They are
+   normally only used in MessagingHandler#on_ callbacks.  To request work from a
+   different thread you can add a code block to a WorkQueue, as shown in
+   MessageQueue#push.
+
+ * **Listener::Handler** The broker creates a new BrokerHandler instance for
+   each accepted connection. The container ensures that calls on each handler instance
+   are serialized even if there are multiple threads in the container.
+
+ * **Calling Container#run in multiple threads** The Container uses threads that call
+   #run as a thread pool to dispatch calls to MessagingHandler instances. Even
+   if there are multiple threads, calls to handler instance are serialized.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8694821e/examples/ruby/broker.rb
----------------------------------------------------------------------
diff --git a/examples/ruby/broker.rb b/examples/ruby/broker.rb
index 9a023ba..8dbfe81 100644
--- a/examples/ruby/broker.rb
+++ b/examples/ruby/broker.rb
@@ -21,139 +21,142 @@ require 'qpid_proton'
 require 'optparse'
 require 'pathname'
 
+# Thread safe message queue that notifies waiting senders when messages arrive.
 class MessageQueue
 
-  def initialize(dynamic = false)
-    @dynamic = dynamic
-    @queue = Queue.new
-    @consumers = []
-  end
-
-  def subscribe(consumer)
-    @consumers << (consumer)
-  end
-
-  def unsubscribe(consumer)
-    if @consumers.include?(consumer)
-      @consumers.delete(consumer)
+  def initialize
+    @lock = Mutex.new           # Make ations on the queue atomic
+    @messages = []              # Messages on the queue
+    @waiting = []               # Senders that are waiting for messages
+  end
+
+  # Push a message onto the queue and notify any waiting senders
+  def push(message)
+    @lock.synchronize do
+      @messages << message
+      unless @waiting.empty?    # Notify waiting senders
+        # NOTE: the call to self.send_to is added to the sender's work_queue,
+        # and will be executed in the sender's thread
+        @waiting.each { |s| s.work_queue.add { self.send_to(s); } }
+        @waiting.clear
+      end
     end
-    @consumers.empty? && (@dynamic || @queue.empty?)
-  end
-
-  def publish(message)
-    @queue << message
-    self.dispatch
   end
 
-  def dispatch(consumer = nil)
-    if consumer
-      c = [consumer]
-    else
-      c = @consumers
-    end
-
-    while self.deliver_to(c) do
+  # Pop a message off the queue.
+  # If no messages available, record sender as waiting and return nil.
+  def pop(sender)
+    @lock.synchronize do
+      if @messages.empty?
+        @waiting << sender
+        nil
+      else
+        @messages.shift
+      end
     end
   end
 
-  def deliver_to(consumers)
-    result = false
-    consumers.each do |consumer|
-      if consumer.credit > 0 && !@queue.empty?
-        consumer.send(@queue.pop(true))
-        result = true
-      end
+  # NOTE: Called in sender's thread.
+  # Pull messages from the queue as long as sender has credit.
+  # If queue runs out of messages, record sender as waiting.
+  def send_to(sender)
+    while sender.credit > 0 && (message = pop(sender))
+      sender.send(message)
     end
-    return result
   end
 
+  def forget(sender)
+    @lock.synchronize { @waiting.delete(sender) }
+  end
 end
 
-class Broker < Qpid::Proton::MessagingHandler
-
-  def initialize(url)
-    super()
-    @url = url
-    @queues = {}
-    begin          # Optional SSL setup, ignore if we don't find cert files etc.
-      @ssl_domain = Qpid::Proton::SSLDomain.new(Qpid::Proton::SSLDomain::MODE_SERVER)
-      cert_passsword = "tserverpw"
-      if Gem.win_platform?       # Use P12 certs for windows schannel
-        @ssl_domain.credentials("ssl_certs/tserver-certificate.p12", "", cert_passsword)
-      else
-        @ssl_domain.credentials("ssl_certs/tserver-certificate.pem", "ssl_certs/tserver-private-key.pem", cert_passsword)
-      end
-      @ssl_domain.allow_unsecured_client # SSL is optional, this is not secure.
-    rescue
-      @ssl_domain = nil # Don't worry if we can't set up SSL.
-    end
-  end
 
-  def on_container_start(container)
-    # Options for incoming connections, provide SSL configuration if we have it.
-    opts = {:ssl_domain => @ssl_domain} if @ssl_domain
-    @listener = container.listen(@url, Qpid::Proton::Listener::Handler.new(opts))
-    STDOUT.puts "Listening on #{@url.inspect}"; STDOUT.flush
-  end
+# Handler for broker connections. In a multi-threaded application you should
+# normally create a separate handler instance for each connection.
+class BrokerHandler < Qpid::Proton::MessagingHandler
 
-  def queue(address)
-    unless @queues.has_key?(address)
-      @queues[address] = MessageQueue.new
-    end
-    @queues[address]
+  def initialize(broker)
+    @broker = broker
   end
 
   def on_sender_open(sender)
     if sender.remote_source.dynamic?
-      address = SecureRandom.uuid
-      sender.source.address = address
-      q = MessageQueue.new(true)
-      @queues[address] = q
-      q.subscribe(sender)
+      sender.source.address = SecureRandom.uuid
     elsif sender.remote_source.address
       sender.source.address = sender.remote_source.address
-      self.queue(sender.source.address).subscribe(sender)
+    else
+      sender.connection.close("no source address")
+      return
     end
+    q = @broker.queue(sender.source.address)
+    q.send_to(sender)
   end
 
   def on_receiver_open(receiver)
     if receiver.remote_target.address
       receiver.target.address = receiver.remote_target.address
-    end
-  end
-
-  def unsubscribe(link)
-    if @queues.has_key?(link.source.address)
-      if @queues[link.source.address].unsubscribe(link)
-        @queues.delete(link.source.address)
-      end
+    else
+      receiver.connection.close("no target address")
     end
   end
 
   def on_sender_close(sender)
-    self.unsubscribe(sender)
+    q = @broker.queue(sender.source.address)
+    q.forget(sender) if q
   end
 
   def on_connection_close(connection)
-    self.remove_stale_consumers(connection)
+    connection.each_sender { |s| on_sender_close(s) }
   end
 
   def on_transport_close(transport)
-    self.remove_stale_consumers(transport.connection)
-  end
-
-  def remove_stale_consumers(connection)
-    connection.each_sender { |s| unsubscribe(s) }
+    transport.connection.each_sender { |s| on_sender_close(s) }
   end
 
   def on_sendable(sender)
-    q = self.queue(sender.source.address)
-    q.dispatch(sender)
+    @broker.queue(sender.source.address).send_to(sender)
   end
 
   def on_message(delivery, message)
-    q = self.queue(delivery.link.target.address)
-    q.publish(message)
+    @broker.queue(delivery.receiver.target.address).push(message)
+  end
+end
+
+# Broker manages the queues and accepts incoming connections.
+class Broker < Qpid::Proton::Listener::Handler
+
+  def initialize
+    @queues = {}
+    @connection_options = {}
+    ssl_setup
+  end
+
+  def ssl_setup
+    # Optional SSL setup
+    ssl = Qpid::Proton::SSLDomain.new(Qpid::Proton::SSLDomain::MODE_SERVER)
+    cert_passsword = "tserverpw"
+    if Gem.win_platform?       # Use P12 certs for windows schannel
+      ssl.credentials("ssl_certs/tserver-certificate.p12", "", cert_passsword)
+    else
+      ssl.credentials("ssl_certs/tserver-certificate.pem", "ssl_certs/tserver-private-key.pem", cert_passsword)
+    end
+    ssl.allow_unsecured_client # SSL is optional, this is not secure.
+    @connection_options[:ssl_domain] = ssl if ssl
+  rescue
+    # Don't worry if we can't set up SSL.
+  end
+
+  def on_open(l)
+    STDOUT.puts "Listening on #{l}\n"; STDOUT.flush
+  end
+
+  # Create a new BrokerHandler instance for each connection we accept
+  def on_accept(l)
+    { :handler => BrokerHandler.new(self) }.update(@connection_options)
+  end
+
+  def queue(address)
+    @queues[address] ||= MessageQueue.new
   end
 
 end
@@ -164,4 +167,9 @@ Start an example broker listening on URL"
   return 1
 end
 url, = ARGV
-Qpid::Proton::Container.new(Broker.new(url)).run
+container = Qpid::Proton::Container.new
+container.listen(url, Broker.new)
+
+# Run the container in multiple threads.
+threads = 4.times.map { Thread.new {  container.run }}
+threads.each { |t| t.join }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8694821e/proton-c/bindings/ruby/README.rdoc
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/README.rdoc b/proton-c/bindings/ruby/README.rdoc
index 5eec00f..ab623bb 100644
--- a/proton-c/bindings/ruby/README.rdoc
+++ b/proton-c/bindings/ruby/README.rdoc
@@ -118,4 +118,25 @@ connection and link can be reestablished) but it is possible for multiple copies
 of the same message are delivered, so the receiver must be aware of that. This
 is known as _at_least_once_ reliability.
 
+== Multi-threaded applications
 
+{Qpid::Proton::Container#run} can be called by multiple threads concurrently,
+giving the container a thread-pool to execute handler methods in parallel.
+
+Instances of {Qpid::Proton::Connection} and objects associated with it
+({Qpid::Proton::Session}, {Qpid::Proton::Sender}, {Qpid::Proton::Receiver},
+{Qpid::Proton::Delivery}, {Qpid::Proton::Tracker}) are not thread-safe and must
+be used correctly when multiple threads call {Qpid::Proton::Container#run}
+
+Calls to {Qpid::Proton::MessagingHandler} and {Qpid::Proton::Listener::Handler}
+methods by the {Qpid::Proton::Container} are automatically serialized for each
+connection instance.
+
+Other threads may have code similarly serialized by adding it to the
+{Qpid::Proton::Connection#work_queue} for the connection.  Each object related
+to a {Qpid::Proton::Connection} also provides a +work_queue+ method.
+
+You also need to use the {Qpid::Proton::WorkQueue} to communicate between a
+{Qpid::Proton::MessagingHandler} method call for one connection instance, and a
+different {Qpid::Proton::Connection} instance in the same container, as separate
+connections can be processed in parallel.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8694821e/proton-c/bindings/ruby/lib/core/connection.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/connection.rb b/proton-c/bindings/ruby/lib/core/connection.rb
index c0d161e..dc2590f 100644
--- a/proton-c/bindings/ruby/lib/core/connection.rb
+++ b/proton-c/bindings/ruby/lib/core/connection.rb
@@ -288,8 +288,7 @@ module Qpid::Proton
       @link_prefix + "/" +  (@link_count += 1).to_s(32)
     end
 
-    # @return [WorkQueue] work queue for code that should be run in the thread
-    # context for this connection
+    # @return [WorkQueue] work queue to execute code serialized correctly for this connection
     attr_reader :work_queue
 
     protected

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8694821e/proton-c/bindings/ruby/lib/core/container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/container.rb b/proton-c/bindings/ruby/lib/core/container.rb
index 78f8013..85dbe69 100644
--- a/proton-c/bindings/ruby/lib/core/container.rb
+++ b/proton-c/bindings/ruby/lib/core/container.rb
@@ -59,6 +59,7 @@ module Qpid::Proton
       when 1 then
         @id = String.try_convert(args[0]) || (args[0].to_s if args[0].is_a? Symbol)
         @handler = args[0] unless @id
+      when 0 then
       else raise ArgumentError, "wrong number of arguments (given #{args.size}, expected 0..2"
       end
       # Use an empty messaging adapter to give default behaviour if there's no global handler.
@@ -169,10 +170,11 @@ module Qpid::Proton
 
     # Run the container: wait for IO activity, dispatch events to handlers.
     #
-    # *Multiple threads* : More than one thread can call {#run} concurrently,
+    # *Multi-threaading* : More than one thread can call {#run} concurrently,
     # the container will use all {#run} threads as a thread pool. Calls to
     # {MessagingHandler} or {Listener::Handler} methods are serialized for each
-    # connection or listener, even if the container has multiple threads.
+    # connection or listener. See {WorkQueue} for coordinating with other
+    # threads.
     #
     # *Exceptions*: If any handler method raises an exception it will stop the
     # container, and the exception will be raised by all calls to {#run}. For

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8694821e/proton-c/bindings/ruby/lib/core/transfer.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/transfer.rb b/proton-c/bindings/ruby/lib/core/transfer.rb
index 4fc21a9..53adf69 100644
--- a/proton-c/bindings/ruby/lib/core/transfer.rb
+++ b/proton-c/bindings/ruby/lib/core/transfer.rb
@@ -88,6 +88,9 @@ module Qpid::Proton
     # @return [Transport] The parent connection's transport.
     def transport() self.connection.transport; end
 
+    # @return [WorkQueue] The parent connection's work-queue.
+    def work_queue() self.connection.work_queue; end
+
     # @deprecated internal use only
     proton_caller :writable?
     # @deprecated internal use only

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8694821e/proton-c/bindings/ruby/lib/core/work_queue.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/work_queue.rb b/proton-c/bindings/ruby/lib/core/work_queue.rb
index d08d883..d205a13 100644
--- a/proton-c/bindings/ruby/lib/core/work_queue.rb
+++ b/proton-c/bindings/ruby/lib/core/work_queue.rb
@@ -17,10 +17,24 @@
 
 module Qpid::Proton
 
-  # A queue of work items to be executed, possibly in a different thread.
+  # A thread-safe queue of work for multi-threaded programs.
+  #
+  # Instances of {Connection} and objects associated with it ({Session}, {Sender},
+  # {Receiver}, {Delivery}, {Tracker}) are not thread-safe and must be
+  # used correctly when multiple threads call {Container#run}
+  #
+  # Calls to {MessagingHandler} methods by the {Container} are automatically
+  # serialized for each connection instance. Other threads may have code
+  # similarly serialized by adding it to the {Connection#work_queue} for the
+  # connection.  Each object related to a {Connection} also provides a
+  # +work_queue+ method.
+  #
   class WorkQueue
 
-    # Add code to be executed by the WorkQueue immediately.
+    # Add code to be executed in series with other {Container} operations on the
+    # work queue's owner. The code will be executed as soon as possible.
+    #
+    # @note Thread Safe: may be called in any thread.
     # @param non_block [Boolean] if true raise {ThreadError} if the operation would block.
     # @yield [ ] the block will be invoked with no parameters in the {WorkQueue} context,
     #  which may be a different thread.
@@ -32,9 +46,12 @@ module Qpid::Proton
       @container.send :wake
     end
 
-    # Schedule work to be executed by the WorkQueue after a delay.
-    # Note that tasks scheduled after the WorkQueue closes will be silently dropped
+    # Schedule code to be executed after +delay+ seconds in series with other
+    # {Container} operations on the work queue's owner.
+    #
+    # Work scheduled for after the {WorkQueue} has closed will be silently dropped.
     #
+    # @note (see #add)
     # @param delay delay in seconds until the block is added to the queue.
     # @param (see #add)
     # @yield (see #add)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org