Posted to commits@qpid.apache.org by ac...@apache.org on 2017/11/21 17:51:36 UTC

qpid-proton git commit: PROTON-1636: Fix Container shut-down races

Repository: qpid-proton
Updated Branches:
  refs/heads/master b7d5c9eb5 -> 235f0a8aa


PROTON-1636: Fix Container shut-down races

- Fixed races in Container shut-down via #stop or #auto_stop=true
- Refactor ConnectionDriver; decouple from MessagingHandler
- Fixes to example issues that showed up during the work


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/235f0a8a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/235f0a8a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/235f0a8a

Branch: refs/heads/master
Commit: 235f0a8aa6238431cd90d5d03d9cbfcb94074b20
Parents: b7d5c9e
Author: Alan Conway <ac...@redhat.com>
Authored: Tue Nov 21 12:37:28 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Tue Nov 21 12:50:59 2017 -0500

----------------------------------------------------------------------
 examples/ruby/README.md                         |  19 --
 examples/ruby/broker.rb                         |   4 +-
 examples/ruby/direct_recv.rb                    |  12 +-
 examples/ruby/direct_send.rb                    |  12 +-
 examples/ruby/example_test.rb                   |  43 +--
 examples/ruby/helloworld_direct.rb              |  60 ----
 examples/ruby/simple_recv.rb                    |   4 +-
 examples/ruby/simple_send.rb                    |   3 +-
 .../bindings/ruby/lib/core/connection_driver.rb | 150 +++++-----
 proton-c/bindings/ruby/lib/core/container.rb    | 285 ++++++++++++-------
 proton-c/bindings/ruby/lib/core/listener.rb     | 108 +++----
 proton-c/bindings/ruby/lib/util/condition.rb    |   3 -
 .../ruby/tests/test_connection_driver.rb        |  19 +-
 proton-c/bindings/ruby/tests/test_container.rb  |  28 +-
 proton-c/bindings/ruby/tests/test_tools.rb      |   4 +-
 15 files changed, 365 insertions(+), 389 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/235f0a8a/examples/ruby/README.md
----------------------------------------------------------------------
diff --git a/examples/ruby/README.md b/examples/ruby/README.md
index 8526e29..938ef17 100644
--- a/examples/ruby/README.md
+++ b/examples/ruby/README.md
@@ -32,25 +32,6 @@ The following events occur while **helloworld.rb** runs:
  * **on_sendable** - Fired when a message can be sent.
  * **on_message** - Fired when a message is received.
 
-### Hello World Without A Broker required
-
-The next example we'll look at will send the classic "Hello world" message to itself directly,
-without going through a broker.
-
-To launch the example:
-
-```
- $ ruby helloworld_direct.rb //:9999
- Hello world!
-```
-
-Not very different from the example that uses the broker, which is what we'd expect from the outside. But let's take a look inside of the example and see how it's different at that level
-
-The direct version takes on the responsibility for listening to incoming connections as well as making an outgoing connection. So we see the following additional events occurring:
-
- * **on_accepted** - Fired when a message is received.
- * **on_connection_closed** - Fired when an endpoint closes its connection.
-
 ## More Complex Examples
 
 Now that we've covered the basics with the archetypical hello world app, let's look at some more interesting examples.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/235f0a8a/examples/ruby/broker.rb
----------------------------------------------------------------------
diff --git a/examples/ruby/broker.rb b/examples/ruby/broker.rb
index bd461ed..111d41b 100644
--- a/examples/ruby/broker.rb
+++ b/examples/ruby/broker.rb
@@ -78,8 +78,8 @@ class Broker < Qpid::Proton::Handler::MessagingHandler
   end
 
   def on_start(event)
-    @acceptor = event.container.listen(@url)
-    print "Listening on #{@url}\n"
+    @listener = event.container.listen(@url)
+    STDOUT.puts "Listening on #{@url}"; STDOUT.flush
   end
 
   def queue(address)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/235f0a8a/examples/ruby/direct_recv.rb
----------------------------------------------------------------------
diff --git a/examples/ruby/direct_recv.rb b/examples/ruby/direct_recv.rb
index b2b0ba9..2e3295f 100644
--- a/examples/ruby/direct_recv.rb
+++ b/examples/ruby/direct_recv.rb
@@ -30,8 +30,13 @@ class DirectReceive < Qpid::Proton::Handler::MessagingHandler
     @received = 0
   end
 
+  class ListenOnce < Qpid::Proton::Listener::Handler
+    def on_open(l) STDOUT.puts "Listening\n"; STDOUT.flush; end
+    def on_accept(l) l.close; end
+  end
+
   def on_start(event)
-    event.container.listen(@url)
+    event.container.listen(@url, ListenOnce.new)
   end
 
   def on_message(event)
@@ -39,7 +44,7 @@ class DirectReceive < Qpid::Proton::Handler::MessagingHandler
       puts "Received: #{event.message.body}"
       @received = @received + 1
       if @received == @expected
-        event.container.stop
+        event.connection.close
       end
     end
   end
@@ -51,5 +56,6 @@ Listen on URL and receive COUNT messages from ADDRESS"
   return 1
 end
 url, address, count = ARGV
-Qpid::Proton::Container.new(DirectReceive.new(url, address, count || 10)).run
+count = Integer(count || 10)
+Qpid::Proton::Container.new(DirectReceive.new(url, address, count)).run
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/235f0a8a/examples/ruby/direct_send.rb
----------------------------------------------------------------------
diff --git a/examples/ruby/direct_send.rb b/examples/ruby/direct_send.rb
index e54e1ab..0ccfe38 100644
--- a/examples/ruby/direct_send.rb
+++ b/examples/ruby/direct_send.rb
@@ -31,8 +31,13 @@ class DirectSend < Qpid::Proton::Handler::MessagingHandler
     @expected = expected
   end
 
+  class ListenOnce < Qpid::Proton::Listener::Handler
+    def on_open(l) STDOUT.puts "Listening\n"; STDOUT.flush; end
+    def on_accept(l) l.close; end
+  end
+
   def on_start(event)
-    event.container.listen(@url)
+    event.container.listen(@url, ListenOnce.new)
   end
 
   def on_sendable(event)
@@ -47,7 +52,7 @@ class DirectSend < Qpid::Proton::Handler::MessagingHandler
     @confirmed = @confirmed + 1
     if @confirmed == @expected
       puts "All #{@expected} messages confirmed!"
-      event.container.stop
+      event.connection.close
     end
   end
 end
@@ -58,4 +63,5 @@ Listen on URL and send COUNT messages to ADDRESS"
   return 1
 end
 url, address, count = ARGV
-Qpid::Proton::Container.new(DirectSend.new(url, address, count || 10)).run
+count = Integer(count || 10)
+Qpid::Proton::Container.new(DirectSend.new(url, address, count)).run

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/235f0a8a/examples/ruby/example_test.rb
----------------------------------------------------------------------
diff --git a/examples/ruby/example_test.rb b/examples/ruby/example_test.rb
index bcb6aa6..19f1638 100755
--- a/examples/ruby/example_test.rb
+++ b/examples/ruby/example_test.rb
@@ -22,17 +22,6 @@ require 'minitest/autorun'
 require 'qpid_proton'
 require 'socket'
 
-# Wait for the broker to be listening
-def wait_for(url, timeout = 5)
-  deadline = Time.now + 5
-  begin
-    TCPSocket.open("", URI(url).port).close
-  rescue Errno::ECONNREFUSED
-    retry if Time.now < deadline
-    raise
-  end
-end
-
 # URL with an unused port
 def test_url()
   "amqp://:#{TCPServer.open(0) { |s| s.addr[1] }}"
@@ -46,8 +35,8 @@ class ExampleTest < MiniTest::Test
   end
 
   def assert_output(want, *args)
-    out = run_script(*args)
-    assert_equal(want, out.read.strip)
+    p = run_script(*args)
+    assert_equal(want, p.read.strip)
   end
 
   def test_helloworld
@@ -77,43 +66,29 @@ EOS
     assert_output(want.strip, "simple_recv.rb", $url, __method__)
   end
 
-  def test_helloworld_direct
-    url = test_url
-    assert_output("Hello world!", "helloworld_direct.rb", url, __method__)
-  end
-
   def test_direct_recv
     url = test_url
-    p = run_script("direct_recv.rb", url, __method__)
-    wait_for url
-    assert_output("All 10 messages confirmed!", "simple_send.rb", url, __method__)
-    want = (0..9).reduce("") { |x,y| x << "Received: sequence #{y}\n" }
-    assert_equal(want.strip, p.read.strip)
+      p = run_script("direct_recv.rb", url, __method__)
+      p.readline                # Wait till ready
+      assert_output("All 10 messages confirmed!", "simple_send.rb", url, __method__)
+      want = (0..9).reduce("") { |x,y| x << "Received: sequence #{y}\n" }
+      assert_equal(want.strip, p.read.strip)
   end
 
   def test_direct_send
     url = test_url
     p = run_script("direct_send.rb", url, __method__)
-    wait_for url
+    p.readline                # Wait till ready
     want = (0..9).reduce("") { |x,y| x << "Received: sequence #{y}\n" }
     assert_output(want.strip, "simple_recv.rb", url, __method__)
     assert_equal("All 10 messages confirmed!", p.read.strip)
   end
-
-  def test_direct_send
-    url = test_url
-    p = run_script("direct_recv.rb", url, __method__)
-    wait_for url
-    assert_output("All 10 messages confirmed!", "simple_send.rb", url, __method__)
-    want = (0..9).reduce("") { |x,y| x << "Received: sequence #{y}\n" }
-    assert_equal(want.strip, p.read.strip)
-  end
 end
 
 # Start the broker before all tests.
 $url = test_url
 $broker = IO.popen([RbConfig.ruby, 'broker.rb', $url])
-wait_for $url
+$broker.readline
 
 # Kill the broker after all tests
 MiniTest.after_run do
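
The test changes above replace port polling (`wait_for`) with a handshake on the child's output: the child prints a line and flushes once it is listening, and the parent blocks in `readline` until that line arrives. A minimal sketch of that pattern follows, assuming a stand-in child script; `CHILD_SCRIPT` and `start_and_wait` are hypothetical names for illustration, not part of the commit.

```ruby
require 'rbconfig'

# Hypothetical stand-in for an example script: announce readiness on
# stdout, flush so the parent sees it immediately, then do the "work".
CHILD_SCRIPT = 'STDOUT.puts "Listening"; STDOUT.flush; STDOUT.puts "done"'

# Start the child and block until it has written its first line.
# IO#readline raises EOFError if the child exits without printing anything,
# so a child that fails to start is reported rather than waited on forever.
def start_and_wait(script)
  child = IO.popen([RbConfig.ruby, '-e', script])
  ready = child.readline
  [child, ready.strip]
end

child, ready = start_and_wait(CHILD_SCRIPT)
rest = child.read.strip   # remaining output after the readiness line
child.close
```

Without the explicit flush the readiness line could sit in the child's stdout buffer when stdout is a pipe, which is presumably why the examples call `STDOUT.flush` right after printing.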

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/235f0a8a/examples/ruby/helloworld_direct.rb
----------------------------------------------------------------------
diff --git a/examples/ruby/helloworld_direct.rb b/examples/ruby/helloworld_direct.rb
deleted file mode 100644
index dab368b..0000000
--- a/examples/ruby/helloworld_direct.rb
+++ /dev/null
@@ -1,60 +0,0 @@
-#--
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#++
-
-require 'qpid_proton'
-require 'optparse'
-
-class HelloWorldDirect < Qpid::Proton::Handler::MessagingHandler
-
-  include Qpid::Proton::Util::Wrapper
-
-  def initialize(url, address)
-    super()
-    @url, @address = url, address
-  end
-
-  def on_start(event)
-    event.container.listen(@url)
-    c = event.container.connect(@url) # Connect to self!
-    c.open_sender(@address)
-  end
-
-  def on_sendable(event)
-    msg = Qpid::Proton::Message.new
-    msg.body = "Hello world!"
-    event.sender.send(msg)
-    event.sender.close
-  end
-
-  def on_message(event)
-    puts "#{event.message.body}"
-  end
-
-  def on_accepted(event)
-    event.container.stop
-  end
-end
-
-if ARGV.size != 2
-  STDERR.puts "Usage: #{__FILE__} URL ADDRESS
-Listen on and connect to URL (connect to self), send a message to ADDRESS and receive it back"
-  return 1
-end
-url, address = ARGV
-Qpid::Proton::Container.new(HelloWorldDirect.new(url, address)).run

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/235f0a8a/examples/ruby/simple_recv.rb
----------------------------------------------------------------------
diff --git a/examples/ruby/simple_recv.rb b/examples/ruby/simple_recv.rb
index d1a9607..446d7da 100644
--- a/examples/ruby/simple_recv.rb
+++ b/examples/ruby/simple_recv.rb
@@ -52,6 +52,6 @@ Connect to URL and receive COUNT messages from ADDRESS"
   return 1
 end
 url, address, count = ARGV
-
-Qpid::Proton::Container.new(SimpleReceive.new(url, address, count || 10)).run
+count = Integer(count || 10)
+Qpid::Proton::Container.new(SimpleReceive.new(url, address, count)).run
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/235f0a8a/examples/ruby/simple_send.rb
----------------------------------------------------------------------
diff --git a/examples/ruby/simple_send.rb b/examples/ruby/simple_send.rb
index 25fc56c..be919d5 100644
--- a/examples/ruby/simple_send.rb
+++ b/examples/ruby/simple_send.rb
@@ -59,4 +59,5 @@ Connect to URL and send COUNT messages to ADDRESS"
   return 1
 end
 url, address, count = ARGV
-Qpid::Proton::Container.new(SimpleSend.new(url, address, count || 10)).run
+count = Integer(count || 10)
+Qpid::Proton::Container.new(SimpleSend.new(url, address, count)).run
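
The `Integer(count || 10)` change matters because values from `ARGV` are strings, while the examples compare integer counters against the expected count with `==`. A short sketch of the difference follows; `parse_count` is a hypothetical helper name used only for illustration.

```ruby
# Hypothetical helper mirroring `count = Integer(count || 10)` in the examples.
# Kernel#Integer raises ArgumentError for non-numeric strings instead of
# silently returning 0 the way String#to_i does.
def parse_count(arg, default = 10)
  Integer(arg || default)
end

from_argv = "10"                                # how a count arrives from ARGV
received = 10                                   # integer counter, as in the handlers
string_matches = (received == from_argv)        # Integer vs String comparison
integer_matches = (received == parse_count(from_argv))

bad_input_rejected =
  begin
    parse_count("ten")
    false
  rescue ArgumentError
    true
  end
```

With the string left unconverted the comparison never succeeds, so a handler waiting for `@received == @expected` would not reach its close call.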

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/235f0a8a/proton-c/bindings/ruby/lib/core/connection_driver.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/connection_driver.rb b/proton-c/bindings/ruby/lib/core/connection_driver.rb
index 6c85659..537e308 100644
--- a/proton-c/bindings/ruby/lib/core/connection_driver.rb
+++ b/proton-c/bindings/ruby/lib/core/connection_driver.rb
@@ -20,37 +20,27 @@ require 'socket'
 module Qpid
   module Proton
 
-    # Associate an AMQP {Connection} with an {IO} and a {MessagingHandler}
+    # Associate an AMQP {Connection} and {Transport} with an {IO}
     #
-    # - Read AMQP binary data from the {IO} (#read, #process)
-    # - Call on_* methods on the {MessagingHandler} for AMQP events (#dispatch, #process)
-    # - Write AMQP binary data to the {IO} (#write, #process)
+    # - {#read} reads AMQP binary data from the {IO} and generates events
+    # - {#tick} generates timing-related events
+    # - {#event} gets events to be dispatched to {Handler::MessagingHandler}s
+    # - {#write} writes AMQP binary data to the {IO}
     #
     # Thread safety: The {ConnectionDriver} is not thread safe but separate
     # {ConnectionDriver} instances can be processed concurrently. The
     # {Container} handles multiple connections concurrently in multiple threads.
     #
     class ConnectionDriver
-
-      # Create a {Connection} and associate it with +io+ and +handler+
-      #
-      # @param io [#read_nonblock, #write_nonblock] An {IO} or {IO}-like object that responds
-      #   to #read_nonblock and #write_nonblock.
-      # @param opts [Hash] See {Connection#open} - transport options are set here,
-      # remaining options
-      # @pram server [Bool] If true create a server (incoming) connection
-      def initialize(io, opts = {}, server=false)
-        @impl = Cproton.pni_connection_driver or raise RuntimeError, "cannot create connection driver"
+      # Create a {Connection} and {Transport} associated with +io+
+      # @param io [IO] An {IO} or {IO}-like object that responds
+      #   to {IO#read_nonblock} and {IO#write_nonblock}
+      def initialize(io)
+        @impl = Cproton.pni_connection_driver or raise NoMemoryError
         @io = io
-        @handler = opts[:handler] || Handler::MessagingHandler.new # Default handler if missing
-        @rbuf = ""              # String to re-use as read buffer
-        connection.apply opts
-        transport.set_server if server
-        transport.apply opts
+        @rbuf = ""              # String for re-usable read buffer
       end
 
-      attr_reader :handler
-
       # @return [Connection]
       def connection()
         @connection ||= Connection.wrap(Cproton.pni_connection_driver_connection(@impl))
@@ -74,18 +64,11 @@ module Qpid
       # transport are closed and there are no events to dispatch.
       def finished?() Cproton.pn_connection_driver_finished(@impl); end
 
-      # Dispatch available events, call the relevant on_* methods on the {#handler}.
-      def dispatch(extra_handlers = nil)
-        extra_handlers ||= []
-        while event = Event::Event.wrap(Cproton.pn_connection_driver_next_event(@impl))
-          pre_dispatch(event)
-          event.dispatch(@handler)
-          extra_handlers.each { |h| event.dispatch h }
-        end
-      end
+      # Get the next event to dispatch, nil if no events available
+      def event() Event::Event.wrap(Cproton.pn_connection_driver_next_event(@impl)); end
 
-      # Read from IO without blocking.
-      # IO errors are not raised, they are passed to {#handler}.on_transport_error by {#dispatch}
+      # Non-blocking read from {#io}, generate events for {#event}
+      # IO errors are returned as transport errors by {#event}, not raised
       def read
         size = Cproton.pni_connection_driver_read_size(@impl)
         return if size <= 0
@@ -95,12 +78,12 @@ module Qpid
         # Try again later.
       rescue EOFError         # EOF is not an error
         close_read
-      rescue IOError, SystemCallError => e     #  is passed to the transport
+      rescue IOError, SystemCallError => e
         close e
       end
 
-      # Write to IO without blocking.
-      # IO errors are not raised, they are passed to {#handler}.on_transport_error by {#dispatch}
+      # Non-blocking write to {#io}
+      # IO errors are returned as transport errors by {#event}, not raised
       def write
         n = @io.write_nonblock(Cproton.pn_connection_driver_write_buffer(@impl))
         Cproton.pn_connection_driver_write_done(@impl, n) if n > 0
@@ -110,13 +93,13 @@ module Qpid
         close e
       end
 
-      # Generate timed events and IO, for example idle-timeout and heart-beat events.
-      # May generate events for {#dispatch} and change the readable/writeable state.
+      # Handle time-related work, for example idle-timeout events.
+      # May generate events for {#event} and change {#can_read?}, {#can_write?}
       #
       # @param [Time] now the current time, defaults to {Time#now}.
       #
       # @return [Time] time of the next scheduled event, or nil if there are no
-      # scheduled events. If non-nil, tick() must be called again no later than
+      # scheduled events. If non-nil you must call {#tick} again no later than
       # this time.
       def tick(now=Time.now)
         transport = Cproton.pni_connection_driver_transport(@impl)
@@ -124,54 +107,79 @@ module Qpid
         return ms.zero? ? nil : Time.at(ms.to_r / 1000);
       end
 
-      # Do read, tick, write and dispatch without blocking.
-      # @param [Bool] io_readable true if the IO might be readable
-      # @param [Bool] io_writable true if the IO might be writeable
-      # @param [Time] now the current time
-      # @return [Time] Latest time to call {#process} again for scheduled events,
-      # or nil if there are no scheduled events
-      def process(io_readable=true, io_writable=true, now=Time.now)
-        read if io_readable
-        next_tick = tick(now)
-        if io_writable
-          dispatch
-          write
-        end
-        dispatch
-        return next_tick
+      # Disconnect the write side of the transport, *without* sending an AMQP
+      # close frame. To close politely, you should use {Connection#close}, the
+      # transport will close itself once the protocol close is complete.
+      #
+      def close_write error=nil
+        return if Cproton.pn_connection_driver_write_closed(@impl)
+        set_error error if error
+        Cproton.pn_connection_driver_write_close(@impl)
+        @io.close_write
       end
 
-      # Close the read side of the transport
-      def close_read
+      # Disconnect the read side of the transport, without waiting for an AMQP
+      # close frame. See comments on {#close_write}
+      def close_read error=nil
         return if Cproton.pn_connection_driver_read_closed(@impl)
+        set_error error if error
         Cproton.pn_connection_driver_read_close(@impl)
         @io.close_read
       end
 
-      # Close the write side of the transport
-      def close_write
-        return if Cproton.pn_connection_driver_write_closed(@impl)
-        Cproton.pn_connection_driver_write_close(@impl)
-        @io.close_write
+      # Disconnect both sides of the transport sending/waiting for AMQP close
+      # frames. See comments on {#close_write}
+      def close error=nil
+        close_write error
+        close_read
       end
 
-      # Close both sides of the IO with optional error
-      # @param error [Condition] If non-nil pass to {#handler}.on_transport_error on next {#dispatch}
-      # Note `error` can be any value accepted by [Condition##make]
-      def close(error=nil)
-        if error
-          cond = Condition.make(error, "proton:io")
+      private
+
+      def set_error e
+        if cond = Condition.make(e, "proton:io")
           Cproton.pn_connection_driver_errorf(@impl, cond.name, "%s", cond.description)
         end
-        close_read
-        close_write
       end
+    end
 
-      protected
+    # A {ConnectionDriver} that feeds events to a {Handler::MessagingHandler}
+    class HandlerDriver < ConnectionDriver
+      # Combine an {IO} with a {Handler::MessagingHandler} and provide
+      # a simplified way to run the driver via {#process}
+      #
+      # @param io [IO]
+      # @param handler [Handler::MessagingHandler] to receive events in
+      #   {#dispatch} and {#process}
+      def initialize(io, handler)
+        super(io)
+        @handler = handler || Handler::MessagingHandler.new
+      end
+
+      attr_reader :handler
 
-      # Override in subclass to add event context
-      def pre_dispatch(event) event; end
+      # Dispatch all events available from {#event} to {#handler}
+      # @param handlers [Enum<Handler::MessagingHandler>]
+      def dispatch()
+        while e = event
+          e.dispatch @handler
+        end
+      end
 
+      # Do {#read}, {#tick}, {#write} and {#dispatch} without blocking.
+      #
+      # @param [Handle::MessagingHanlder] handler A handler to dispatch
+      #   events to.
+      # @param [Time] now the current time
+      # @return [Time] Latest time to call {#process} again for scheduled events,
+      #   or nil if there are no scheduled events
+      def process(now=Time.now)
+        read
+        next_tick = tick(now)
+        write
+        dispatch
+        return next_tick
+      end
     end
   end
 end

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/235f0a8a/proton-c/bindings/ruby/lib/core/container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/container.rb b/proton-c/bindings/ruby/lib/core/container.rb
index 6e14d11..6fbdbb5 100644
--- a/proton-c/bindings/ruby/lib/core/container.rb
+++ b/proton-c/bindings/ruby/lib/core/container.rb
@@ -22,62 +22,138 @@ require 'set'
 require_relative 'listener'
 
 module Qpid::Proton
-  # An AMQP container manages a set of {Connection}s which contain {#Sender} and
-  # {#Receiver} links to transfer messages.
+  # An AMQP container manages a set of {Listener}s and {Connection}s which
+  # contain {#Sender} and {#Receiver} links to transfer messages.  Usually, each
+  # AMQP client or server process has a single container for all of its
+  # connections and links.
   #
-  # TODO aconway 2017-10-26: documentthreading/dispatch role
-  #
-  # Usually, each AMQP client or server process has a single container for
-  # all of its connections and links.
+  # One or more threads can call {#run}, events generated by all the listeners and
+  # connections will be dispatched in the {#run} threads.
   class Container
     private
 
     def amqp_uri(s) Qpid::Proton::amqp_uri s; end
 
-    class ConnectionDriver < Qpid::Proton::ConnectionDriver
+    # Container driver applies options and adds container context to events
+    class ConnectionTask < Qpid::Proton::HandlerDriver
       def initialize container, io, opts, server=false
-        super io, opts, server
+        super io, opts[:handler]
+        transport.set_server if server
+        transport.apply opts
+        connection.apply opts
         @container = container
       end
 
-      def final() end
-      def pre_dispatch(event) event.container = @container; end
+      def event
+        # Add a container to the event
+        e = super()
+        e.container = @container if e
+        e
+      end
+    end
+
+    class ListenTask < Listener
+
+      def initialize(io, handler, container)
+        super
+        env = ENV['PN_TRACE_EVT']
+        if env && ["true", "1", "yes", "on"].include?(env.downcase)
+          @log_prefix = "[0x#{object_id.to_s(16)}](PN_LISTENER_"
+        end
+      end
+
+      def process
+        unless @closing
+          unless @open_dispatched
+            dispatch(:on_open);
+            @open_dispatched = true
+          end
+          begin
+            return @io.accept, dispatch(:on_accept)
+          rescue IO::WaitReadable, Errno::EINTR
+          rescue IOError, SystemCallError => e
+            close e
+          end
+        end
+        if @closing
+          @io.close rescue nil
+          @closing = false
+          @closed = true
+          dispatch(:on_error, @condition) if @condition
+          dispatch(:on_close)
+        end
+      end
+
+      def can_read?() !@closed; end
+      def can_write?() false; end
+      def finished?() @closed; end
+
+      def dispatch(method, *args)
+        STDERR.puts "#{@log_prefix}#{([method[3..-1].upcase]+args).join ', '})" if @log_prefix
+        @handler.__send__(method, self, *args) if @handler && @handler.respond_to?(method)
+      end
     end
 
     public
 
     # Create a new Container
+    # @overload initialize(id=nil)
+    #   @param id [String] A unique ID for this container, use random UUID if nil.
     #
-    # @param handler [MessagingHandler] Optional default handler for connections
+    # @overload initialize(handler=nil, id=nil)
+    #  @param id [String] A unique ID for this container, use random UUID if nil.
+    #  @param handler [MessagingHandler] Optional default handler for connections
     #   that do not have their own handler (see {#connect} and {#listen})
     #
-    #   @note For multi-threaded code, it is recommended to use a separate
-    #   handler instance for every connection, as a shared global handler can be
-    #   called concurrently for every connection that uses it.
-    # @param id [String] A unique ID for this container. Defaults to a random UUID.
+    #   *Note*: For multi-threaded code, it is recommended to use a separate
+    #   handler instance for each connection, as a shared handler may be called
+    #   concurrently.
     #
     def initialize(handler = nil, id = nil)
       # Allow ID as sole argument
       (handler, id = nil, handler.to_str) if (id.nil? && handler.respond_to?(:to_str))
-      raise TypeError, "Expected MessagingHandler, got #{handler.class}" if handler && !handler.is_a?(Qpid::Proton::Handler::MessagingHandler)
+      raise TypeError, "Expected MessagingHandler, got #{handler.class}" if
+        handler && !handler.is_a?(Qpid::Proton::Handler::MessagingHandler)
 
-      # TODO aconway 2017-11-08: allow handlers, opts for backwards compat?
+      # TODO aconway 2017-11-08: allow multiple handlers, opts for backwards compat?
       @handler = handler
-      @id = (id || SecureRandom.uuid).freeze
+      @id = ((id && id.to_s) || SecureRandom.uuid).freeze
+
+      # Implementation note:
+      #
+      # - #run threads take work from @work
+      # - Each driver and the Container itself is processed by at most one #run thread at a time
+      # - The Container thread does IO.select
+      # - nil on the @work queue makes a #run thread exit
+
       @work = Queue.new
-      @work.push self           # Let the first #run thread select
-      @wake = IO.pipe
-      @dummy = ""               # Dummy buffer for draining wake pipe
+      @work << self             # Let the first #run thread start selecting
+      @wake = IO.pipe           # Wakes #run thread in IO.select
+      @auto_stop = true         # Exit #run when @active drops to 0
+
+      # Following instance variables protected by lock
       @lock = Mutex.new
-      @selectables = Set.new    # ConnectionDrivers and Listeners
-      @auto_stop = true
-      @active = 0               # activity (connection, listener) counter for auto_stop
-      @running = 0              # concurrent calls to #run
+      @active = 0               # All active tasks, in @selectable, @work or being processed
+      @selectable = Set.new     # Tasks ready to block in IO.select
+      @running = 0              # Count of #run threads
+      @stopping = false         # #stop called, closing tasks
+      @stop_err = nil           # Optional error from #stop
     end
 
-    # @return [String] Unique identifier for this container
+    # @return [String] unique identifier for this container
     attr_reader :id
 
+    # Auto-stop flag.
+    #
+    # True (the default) means all calls to {#run} will return when the container
+    # transitions from active to inactive - all connections and listeners
+    # have closed.
+    #
+    # False means {#run} will not return unless {#stop} is called.
+    #
+    # @return [Bool] auto-stop state
+    attr_accessor :auto_stop
+
     # Open an AMQP connection.
     #
     # @param url [String, URI] Open a {TCPSocket} to url.host, url.port.
@@ -85,7 +161,6 @@ module Qpid::Proton
     # url.user, url.password are used as defaults if opts[:user], opts[:password] are nil
     # @option (see Connection#open)
     # @return [Connection] The new AMQP connection
-    #
     def connect(url, opts = {})
       url = amqp_uri(url)
       opts[:user] ||= url.user
@@ -100,16 +175,18 @@ module Qpid::Proton
     def connect_io(io, opts = {})
       cd = connection_driver(io, opts)
       cd.connection.open()
-      add(cd).connection
+      add(cd)
+      cd.connection
     end
 
     # Listen for incoming AMQP connections
     #
     # @param url [String,URI] Listen on host:port of the AMQP URL
-    # @param handler [ListenHandler] A {ListenHandler} object that will be called
+    # @param handler [Listener::Handler] A {Listener::Handler} object that will be called
     # with events for this listener and can generate a new set of options for each one.
     # @return [Listener] The AMQP listener.
-    def listen(url, handler=ListenHandler.new)
+    #
+    def listen(url, handler=Listener::Handler.new)
       url = amqp_uri(url)
       # TODO aconway 2017-11-01: amqps
       listen_io(TCPServer.new(url.host, url.port), handler)
@@ -117,134 +194,136 @@ module Qpid::Proton
 
     # Listen for incoming AMQP connections on an existing server socket.
     # @param io A server socket, for example a {TCPServer}
-    # @param handler [ListenHandler] Handler for events from this listener
-    def listen_io(io, handler=ListenHandler.new)
-      add(Listener.new(io, handler))
+    # @param handler [Listener::Handler] Handler for events from this listener
+    #
+    def listen_io(io, handler=Listener::Handler.new)
+      l = ListenTask.new(io, handler, self)
+      add(l)
+      l
     end
 
     # Run the container: wait for IO activity, dispatch events to handlers.
     #
     # More than one thread can call {#run} concurrently, the container will use
-    # all the {#run} ,threads as a pool to handle multiple connections
-    # concurrently.  The container ensures that handler methods for a single
-    # connection (or listener) instance are serialized, even if the container
-    # has multiple threads.
+    # all the {#run} threads as a thread pool. Calls to
+    # {Handler::MessagingHandler} methods are serialized for each connection or
+    # listener, even if the container has multiple threads.
     #
-    def run()
-      @lock.synchronize { @running += 1 }
-
-      unless @on_start
-        @on_start = true
+    def run
+      need_on_start = nil
+      @lock.synchronize do
+        @running += 1
+        need_on_start = !@on_start_called &&  @handler && @handler.respond_to?(:on_start)
+        @on_start_called = true
+      end
+      if need_on_start
         # TODO aconway 2017-10-28: proper synthesized event for on_start
         event = Class.new do
           def initialize(c) @container = c; end
           attr_reader :container
         end.new(self)
-        @handler.on_start(event) if @handler && @handler.respond_to?(:on_start)
+        @handler.on_start(event)
       end
 
-      while x = @work.pop
-        case x
-        when Container then
-          # Only one thread can select at a time
+      while task = @work.pop
+        case task
+        when Container
           r, w = [@wake[0]], []
           @lock.synchronize do
-            @selectables.each do |s|
+            @selectable.each do |s|
               r << s if s.send :can_read?
               w << s if s.send :can_write?
             end
           end
           r, w = IO.select(r, w)
           selected = Set.new(r).merge(w)
-          if selected.delete?(@wake[0]) # Drain the wake pipe
-            begin
-              @wake[0].read_nonblock(256, @dummy) while true
-            rescue Errno::EWOULDBLOCK, Errno::EAGAIN, Errno::EINTR
-            end
-          end
+          drain_wake if selected.delete?(@wake[0])
           @lock.synchronize do
-            if @stop_all
-              selected = @selectables
-              @selectables = Set.new
-              selected.each { |s| s.close }
-              @work << nil if selected.empty? # Already idle, initiate stop now
-              @stop_all = false
-            else
-              @selectables.subtract(selected)
+            if @stopping # close everything
+              selected += @selectable
+              selected.each { |s| s.close @stop_err }
             end
+            @selectable -= selected # Remove selected tasks
           end
-          # Move selected items to the work queue for serialized processing.
-          @lock.synchronize { @selectables.subtract(selected) }
-          selected.each { |s| @work << s } # Queue up all the work
+          selected.each { |s| @work << s } # Queue up tasks needing #process
           @work << self                    # Allow another thread to select()
-        when ConnectionDriver then
-          x.process
-          rearm x
+        when ConnectionTask then
+          task.close @stop_err if @lock.synchronize { @stopping }
+          task.process
+          rearm task
         when Listener then
-          io, opts = x.send :process
+          task.close @stop_err if @lock.synchronize { @stopping }
+          io, opts = task.process
           add(connection_driver(io, opts, true)) if io
-          rearm x
+          rearm task
         end
-        # TODO aconway 2017-10-26: scheduled tasks
+        # TODO aconway 2017-10-26: scheduled tasks, heartbeats
       end
     ensure
-      @running -= 1
-      if @running > 0 # Signal the next #run thread that we are stopping
-        @work << nil
-        wake
+      @lock.synchronize do
+        @stopping, @stop_err = nil if (@running -= 1).zero? # Last out, reset for next #run
       end
     end
 
-    # @!attribute auto_stop [rw]
-    #   @return [Bool] With auto_stop enabled, all calls to {#run} will return when the
-    #   container's last activity (connection, listener or scheduled event) is
-    #   closed/completed. With auto_stop disabled {#run} does not return.
-    def auto_stop=(enabled) @lock.synchronize { @auto_stop=enabled }; wake; end
-    def auto_stop() @lock.synchronize { @auto_stop }; end
-
-    # Enable {#auto_stop} and close all connections and listeners with error.
+    # Disconnect all listeners and connections without a polite AMQP close sequence.
     # {#stop} returns immediately, calls to {#run} will return when all activity is finished.
-    # @param error [Condition] If non-nil pass to {#handler}.on_error
-    # Note `error` can be any value accepted by [Condition##make]
+    # @param error [Condition] Optional transport/listener error condition
+    #
     def stop(error=nil)
       @lock.synchronize do
-        @auto_stop = true
-        @stop_all = true
-        wake
+        @stopping = true
+        @stop_err = Condition.make(error)
+        check_stop_lh
+        # NOTE: @stopping =>
+        # - no new run threads can join
+        # - no more select calls after next wakeup
+        # - once @active == 0, all threads will be stopped with nil
       end
+      wake
     end
 
     private
 
-    # Always wake when we add new work
-    def work(s) work << s; wake; end
+    def wake; @wake[1].write_nonblock('x') rescue nil; end
+
+    def work_wake(task) @work << task; wake; end
 
-    def wake()
-      @wake[1].write_nonblock('x') rescue nil
+    def drain_wake
+      begin
+        @wake[0].read_nonblock(256) while true
+      rescue Errno::EWOULDBLOCK, Errno::EAGAIN, Errno::EINTR
+      end
     end
 
     def connection_driver(io, opts, server=false)
+      opts ||= {}
       opts[:container_id] ||= @id
       opts[:handler] ||= @handler
-      ConnectionDriver.new(self, io, opts, server)
+      ConnectionTask.new(self, io, opts, server)
+    end
+
+    def add task
+      @lock.synchronize { @active += 1 }
+      work_wake task
     end
 
-    def add(s)
+    def rearm task
       @lock.synchronize do
-        @active += 1
+        if task.finished?
+          @active -= 1
+          check_stop_lh
+        else
+          @selectable << task
+        end
       end
-      @work << s
       wake
-      return s
     end
 
-    def rearm s
-      if s.send :finished?
-        @lock.synchronize { @work << nil if (@active -= 1).zero? && @auto_stop }
-      else
-        @lock.synchronize { @selectables << s }
+    def check_stop_lh
+      if @active.zero? && (@auto_stop || @stopping)
+        @running.times { @work << nil } # Signal all threads to stop
+        true
       end
-      wake
     end
   end
 end
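
Note on the shut-down signal above: check_stop_lh pushes one nil onto the work queue for every thread currently inside #run, so each thread pops exactly one nil and returns. The following is a minimal stand-alone sketch of that idea, not the Proton implementation. MiniPool and its methods are hypothetical names, and the sketch assumes all worker threads have entered #run before #stop is called.

```ruby
require 'thread'

# Hypothetical stand-in for the container's thread pool; not the Proton API.
class MiniPool
  def initialize
    @lock = Mutex.new
    @work = Queue.new
    @running = 0        # number of threads currently inside #run
    @stopping = false
  end

  # The calling thread processes tasks until it pops a nil.
  def run
    @lock.synchronize { @running += 1 }
    while (task = @work.pop)
      task.call
    end
  ensure
    @lock.synchronize do
      # Last thread out resets the flag so the pool can be run again.
      @stopping = false if (@running -= 1).zero?
    end
  end

  # Queue one nil per running thread so that every thread wakes and returns.
  def stop
    @lock.synchronize do
      @stopping = true
      @running.times { @work << nil }
    end
  end

  def running
    @lock.synchronize { @running }
  end
end

pool = MiniPool.new
threads = 3.times.map { Thread.new { pool.run } }
sleep 0.01 until pool.running == 3   # assumption: wait until every thread has joined
pool.stop
stopped = threads.map { |t| !t.join(2).nil? }
```

Counting the nils under the same lock that guards @running is what keeps the number of stop signals equal to the number of threads that need one.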

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/235f0a8a/proton-c/bindings/ruby/lib/core/listener.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/listener.rb b/proton-c/bindings/ruby/lib/core/listener.rb
index 85dc976..0584807 100644
--- a/proton-c/bindings/ruby/lib/core/listener.rb
+++ b/proton-c/bindings/ruby/lib/core/listener.rb
@@ -25,14 +25,49 @@ module Qpid::Proton
   # pass a {ListenerHandler} on creation.
   #
   class Listener
-    # The listener's container
+
+    # Class that handles listener events and provides options for accepted
+    # connections. This class simply returns a fixed set of options for every
+    # connection accepted, but you can subclass and override all of the on_
+    # methods to provide more interesting behaviour.
+    class Handler
+      # @param opts [Hash] Options to return from on_accept.
+      def initialize(opts={}) @opts = opts; end
+
+      # Called when the listener is ready to accept connections.
+      # @param listener [Listener] The listener
+      def on_open(listener) end
+
+      # Called if an error occurs.
+      # If there is an error while opening the listener, this method is
+      # called and {#on_open} is not
+      # @param listener [Listener]
+      # @param what [Condition] Information about the error.
+      def on_error(listener, what) end
+
+      # Called when a listener accepts a new connection.
+      # @param listener [Listener] The listener
+      # @return [Hash] Options to apply to the incoming connection, see {#connect}
+      def on_accept(listener) @opts; end
+
+      # Called when the listener closes.
+      # @param listener [Listener] The listener accepting the connection.
+      def on_close(listener) end
+    end
+
+    # @return [Container] The listener's container
     attr_reader :container
 
+    # @return [Condition] The error condition if there is one
+    attr_reader :condition
+
     # Close the listener
     # @param error [Condition] Optional error condition.
     def close(error=nil)
-      @closed ||= Condition.make(error) || true
+      @closing = true
+      @condition ||= Condition.make(error) if error
       @io.close_read rescue nil # Cause listener to wake out of IO.select
+      nil
     end
 
     # Get the {IO} server socket used by the listener
@@ -40,74 +75,9 @@ module Qpid::Proton
 
     private                     # Called by {Container}
 
-    def initialize(io, handler)
+    def initialize(io, handler, container)
       @io, @handler = io, handler
+      @container = container
     end
-
-    def process
-      unless @closed
-        unless @open_dispatched
-          dispatch(:on_open)
-          @open_dispatched = true
-        end
-        begin
-          return @io.accept, dispatch(:on_accept)
-        rescue IO::WaitReadable, Errno::EINTR
-        rescue IOError, SystemCallError => e
-          close e
-        end
-      end
-      if @closed
-        dispatch(:on_error, @closed) if @closed != true
-        dispatch(:on_close)
-        close @io unless @io.closed? rescue nil
-      end
-    end
-
-    def can_read?() true; end
-    def can_write?() false; end
-    def finished?() @closed; end
-
-    # TODO aconway 2017-11-06: logging strategy
-    TRUE = Set[:true, :"1", :yes, :on]
-    def log?()
-      enabled = ENV['PN_TRACE_EVT']
-      TRUE.include? enabled.downcase.to_sym if enabled
-    end
-
-    def dispatch(method, *args)
-      STDERR.puts "(Listener 0x#{object_id.to_s(16)})[#{method}]" if log?
-      @handler.send(method, self, *args) if @handler && @handler.respond_to?(method)
-    end
-  end
-
-
-  # Class that handles listener events and provides options for accepted
-  # connections. This class simply returns a fixed set of options for every
-  # connection accepted, but you can subclass and override all of the on_
-  # methods to provide more interesting behaviour.
-  class ListenHandler
-    # @param opts [Hash] Options to return from on_accept.
-    def initialize(opts={}) @opts = opts; end
-
-    # Called when the listener is ready to accept connections.
-    # @param listener [Listener] The listener
-    def on_open(listener) end
-
-    # Called if an error occurs.
-    # If there is an error while opening the listener, this method is
-    # called and {#on_open} is not
-    # @param listener [Listener]
-    # @param what [Condition] Information about the error.
-    def on_error(listener, what) end
-
-    # Called when a listener accepts a new connection.
-    # @param listener [Listener] The listener
-    # @return [Hash] Options to apply to the incoming connection, see {#connect}
-    def on_accept(listener) @opts; end
-
-    # Called when the listener closes.
-    # @param listener [Listener] The listener accepting the connection.
-    def on_close(listener) end
   end
 end
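
Note on Listener::Handler above: its comment says subclasses can override the on_ methods to return different options for each accepted connection. A minimal sketch of such a subclass follows. To stay runnable without Proton, it uses a stand-in Handler class copied from the diff; CountingHandler and the generated option values are hypothetical.

```ruby
# Stand-in copied from the Listener::Handler added in this commit,
# so the sketch runs without loading Qpid::Proton.
class Handler
  def initialize(opts={}) @opts = opts; end
  def on_open(listener) end
  def on_error(listener, what) end
  def on_accept(listener) @opts; end
  def on_close(listener) end
end

# Hypothetical subclass: counts accepts and gives each accepted
# connection its own :container_id on top of the fixed options.
class CountingHandler < Handler
  attr_reader :accepted

  def initialize(opts={})
    super
    @accepted = 0
  end

  def on_accept(listener)
    @accepted += 1
    @opts.merge(:container_id => "conn-#{@accepted}")
  end
end

handler = CountingHandler.new(:idle_timeout => 10)
first  = handler.on_accept(nil)   # a real listener would be passed here
second = handler.on_accept(nil)
```

With the real library, an instance of such a subclass would be passed as the handler argument of Container#listen or Container#listen_io.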

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/235f0a8a/proton-c/bindings/ruby/lib/util/condition.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/util/condition.rb b/proton-c/bindings/ruby/lib/util/condition.rb
index 37c41a0..1cb307a 100644
--- a/proton-c/bindings/ruby/lib/util/condition.rb
+++ b/proton-c/bindings/ruby/lib/util/condition.rb
@@ -29,9 +29,7 @@ module Qpid::Proton
       @info = info
     end
 
-    def to_s() "#{@name}: #{@description
     def to_s() "#{@name}: #{@description}"; end
-}"; end
 
     def inspect() "#{self.class.name}(#{@name.inspect}, #{@description.inspect}, #{@info.inspect})"; end
 
@@ -50,7 +48,6 @@ module Qpid::Proton
     # - when Exception return Condition(obj.class.name, obj.to_s)
     # - when nil then nil
     # - else return Condition(default_name, obj.to_s)
-    # If objey
     def self.make(obj, default_name="proton")
       case obj
       when Condition then obj
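
Note on Condition.make above: the visible comment lines describe a conversion by object type. The sketch below reproduces only the cases visible in this hunk, using a stand-in Struct in place of the real Condition class. Cond and make_condition are hypothetical names, and the real method may handle further cases that are not shown here.

```ruby
# Stand-in for Qpid::Proton::Condition, for illustration only.
Cond = Struct.new(:name, :description)

# Convert obj to a Cond following the cases listed in the comment above.
def make_condition(obj, default_name="proton")
  case obj
  when Cond then obj                                  # already a condition
  when Exception then Cond.new(obj.class.name, obj.to_s)
  when nil then nil                                   # no error, no condition
  else Cond.new(default_name, obj.to_s)
  end
end

from_error  = make_condition(RuntimeError.new("boom"))
from_string = make_condition("oops")
from_nil    = make_condition(nil)
```

The nil case matters for Container#stop in this commit: calling stop with no argument leaves @stop_err as nil, so connections and listeners are closed without an error condition.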

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/235f0a8a/proton-c/bindings/ruby/tests/test_connection_driver.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/test_connection_driver.rb b/proton-c/bindings/ruby/tests/test_connection_driver.rb
index 174e86d..8ce9fe8 100644
--- a/proton-c/bindings/ruby/tests/test_connection_driver.rb
+++ b/proton-c/bindings/ruby/tests/test_connection_driver.rb
@@ -20,7 +20,7 @@ require 'test_tools'
 
 include Qpid::Proton
 
-class ConnectionDriverTest < Minitest::Test
+class HandlerDriverTest < Minitest::Test
 
   def setup
     @sockets = Socket.pair(:LOCAL, :STREAM, 0)
@@ -39,18 +39,19 @@ class ConnectionDriverTest < Minitest::Test
       def on_message(event) @message = event.message; event.connection.close; end
     end
 
-    sender = ConnectionDriver.new(@sockets[0], {:handler => send_class.new})
+    sender = HandlerDriver.new(@sockets[0], send_class.new)
     sender.connection.open();
     sender.connection.open_sender()
-
-    receiver = ConnectionDriver.new(@sockets[1], {:handler => recv_class.new})
+    receiver = HandlerDriver.new(@sockets[1], recv_class.new)
     drivers = [sender, receiver]
+
     until drivers.all? { |d| d.finished? }
       rd = drivers.select {|d| d.can_read? }
       wr = drivers.select {|d| d.can_write? }
-      rs, ws = IO.select(rd, wr)
-      ws.each { |d| d.write; d.dispatch }
-      rs.each { |d| d.read; d.dispatch }
+      IO.select(rd, wr)
+      drivers.each do |d|
+        d.process
+      end
     end
     assert_equal(receiver.handler.message.body, "foo")
     assert(sender.handler.accepted)
@@ -60,10 +61,10 @@ class ConnectionDriverTest < Minitest::Test
     idle_class = Class.new(MessagingHandler) do
       def on_connection_bound(event) event.transport.idle_timeout = 10; end
     end
-    drivers = [ConnectionDriver.new(@sockets[0], {:handler => idle_class.new}), ConnectionDriver.new(@sockets[1])]
+    drivers = [HandlerDriver.new(@sockets[0], idle_class.new), HandlerDriver.new(@sockets[1], nil)]
     drivers[0].connection.open()
     now = Time.now
-    drivers.each { |d| d.process(true, true, now) } until drivers[0].connection.open?
+    drivers.each { |d| d.process(now) } until drivers[0].connection.open?
     assert_equal(10, drivers[0].transport.idle_timeout)
     assert_in_delta(10, (drivers[0].tick(now) - now)*1000, 1)
   end

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/235f0a8a/proton-c/bindings/ruby/tests/test_container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/test_container.rb b/proton-c/bindings/ruby/tests/test_container.rb
index 51b1760..9384d65 100644
--- a/proton-c/bindings/ruby/tests/test_container.rb
+++ b/proton-c/bindings/ruby/tests/test_container.rb
@@ -112,22 +112,34 @@ class ContainerTest < Minitest::Test
     assert t1.join(1)
   end
 
+  def test_stop_empty
+    c = Container.new
+    threads = 3.times.collect { Thread.new { c.run } }
+    assert_nil threads[0].join(0.001) # Not stopped
+    c.stop
+    threads.each { |t| assert t.join(1) }
+  end
+
   def test_stop
     c = Container.new
     c.auto_stop = false
+
     l = c.listen_io(TCPServer.new(0))
-    c.connect("amqp://:#{l.to_io.addr[1]}")
-    threads = 5.times.collect { Thread.new { c.run } }
-    assert_nil threads[0].join(0.001)
+    threads = 3.times.collect { Thread.new { c.run } }
+    l.close
+    assert_nil threads[0].join(0.001) # Not stopped, no auto_stop
+
+    l = c.listen_io(TCPServer.new(0)) # New listener
+    conn = c.connect("amqp://:#{l.to_io.addr[1]}")
     c.stop
     threads.each { |t| assert t.join(1) }
-    assert c.auto_stop          # Set by stop
+    assert_nil l.condition
+    assert_nil conn.condition
 
-    # Stop an empty container
+    # We should be able to run the container again once stopped.
     threads = 5.times.collect { Thread.new { c.run } }
-    assert_nil threads[0].join(0.001)
-    c.stop
-    threads.each { |t| assert t.join(1) }
+    assert_nil threads[0].join(0.01)
+
   end
 
 end

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/235f0a8a/proton-c/bindings/ruby/tests/test_tools.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/test_tools.rb b/proton-c/bindings/ruby/tests/test_tools.rb
index b45add2..3b89cd0 100644
--- a/proton-c/bindings/ruby/tests/test_tools.rb
+++ b/proton-c/bindings/ruby/tests/test_tools.rb
@@ -25,7 +25,7 @@ require 'thread'
 require 'socket'
 
 Container = Qpid::Proton::Container
-ListenHandler = Qpid::Proton::Listener
+ListenHandler = Qpid::Proton::Listener::Handler
 MessagingHandler = Qpid::Proton::Handler::MessagingHandler
 
 class TestError < Exception; end
@@ -113,7 +113,7 @@ end
 # ListenHandler that closes the Listener after first accept
 class ListenOnceHandler < ListenHandler
   def initialize(opts={}) @opts=opts; end
-  def on_error(l, e)  raise TestError, e; end
+  def on_error(l, e)  raise TestError, e.inspect; end
   def on_accept(l) l.close; return @opts; end
   attr_reader :opts
 end

