You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2018/03/16 20:34:19 UTC

qpid-proton git commit: PROTON-1791: [ruby] Container does not clean up on handler exception

Repository: qpid-proton
Updated Branches:
  refs/heads/master 3c80d6f4c -> 828405a23


PROTON-1791: [ruby] Container does not clean up on handler exception

If a handler raised an exception it simply terminated that one Container thread
with no cleanup (closing sockets etc.)

Now an exception raised by a handler is equivalent to Container#stop(),
but also causes all threads in Container#run() to raise the exception.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/828405a2
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/828405a2
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/828405a2

Branch: refs/heads/master
Commit: 828405a23575cf9876b1ac14048e078f7f8c4cc3
Parents: 3c80d6f
Author: Alan Conway <ac...@redhat.com>
Authored: Fri Mar 16 14:15:26 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Fri Mar 16 16:33:33 2018 -0400

----------------------------------------------------------------------
 proton-c/bindings/ruby/lib/core/container.rb    | 139 ++++++++++++-------
 proton-c/bindings/ruby/lib/core/listener.rb     |   3 +
 .../bindings/ruby/lib/core/messaging_handler.rb |   3 +
 proton-c/bindings/ruby/tests/test_container.rb  | 108 +++++++-------
 proton-c/bindings/ruby/tests/test_tools.rb      |  22 ++-
 5 files changed, 171 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/828405a2/proton-c/bindings/ruby/lib/core/container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/container.rb b/proton-c/bindings/ruby/lib/core/container.rb
index 5d7beb4..d4fe5d1 100644
--- a/proton-c/bindings/ruby/lib/core/container.rb
+++ b/proton-c/bindings/ruby/lib/core/container.rb
@@ -127,6 +127,63 @@ module Qpid::Proton
       nt if !t || (nt < t)
     end
 
+    # Rescue any exception raised by the block and stop the container.
+    def maybe_panic
+      begin
+        yield
+      rescue Exception => e
+        stop(nil, e)
+      end
+    end
+
+    # Handle a single item from the @work queue, this is the heart of the #run loop.
+    def run_one(task)
+      case task
+
+      when :start
+        @adapter.on_container_start(self) if @adapter.respond_to? :on_container_start
+
+      when Container
+        r, w = [@wake], []
+        next_tick = nil
+        @lock.synchronize do
+          @selectable.each do |s|
+            r << s if s.send :can_read?
+            w << s if s.send :can_write?
+            next_tick = next_tick_min(s, next_tick)
+          end
+        end
+        now = Time.now
+        timeout = ((next_tick > now) ? next_tick - now : 0) if next_tick
+        r, w = IO.select(r, w, nil, timeout)
+        now = Time.now
+        selected = Set.new(r).delete(@wake)
+        selected.merge(w) if w
+        selected.merge(@selectable.select { |s| next_tick_due(s, now) })
+        @wake.reset
+        stop_select = nil
+        @lock.synchronize do
+          if stop_select = @stopped # close everything
+            selected += @selectable
+            selected.each { |s| s.close @stop_err }
+            @wake.close
+          end
+          @selectable -= selected # Remove selected tasks
+        end
+        selected.each { |s| @work << s } # Queue up tasks needing #process
+        @work << self unless stop_select
+
+      when ConnectionTask then
+        maybe_panic { task.process }
+        rearm task
+
+      when ListenTask then
+        io, opts = maybe_panic { task.process }
+        add(connection_driver(io, opts, true)) if io
+        rearm task
+      end
+    end
+
     public
 
     # Error raised if the container is used after {#stop} has been called.
@@ -262,10 +319,24 @@ module Qpid::Proton
 
     # Run the container: wait for IO activity, dispatch events to handlers.
     #
-    # More than one thread can call {#run} concurrently, the container will use
-    # all the {#run} threads as a thread pool. Calls to
-    # {Handler::MessagingHandler} methods are serialized for each connection or
-    # listener, even if the container has multiple threads.
+    # *Multiple threads* : More than one thread can call {#run} concurrently,
+    # the container will use all {#run} threads as a thread pool. Calls to
+    # {MessagingHandler} or {Listener::Handler} methods are serialized for each
+    # connection or listener, even if the container has multiple threads.
+    #
+    # *Exceptions*: If any handler method raises an exception it will stop the
+    # container, and the exception will be raised by all calls to {#run}. For
+    # single threaded code this is often desirable. Multi-threaded server
+    # applications should normally rescue exceptions in the handler and deal
+    # with them in another way: logging, closing the connection with an error
+    # condition, signalling another thread etc.
+    #
+    # @return [void] Returns when the container stops, see {#stop} and {#auto_stop}
+    #
+    # @raise [StoppedError] If the container has already been stopped when {#run} was called.
+    #
+    # @raise [Exception] If any {MessagingHandler} or {Listener::Handler} managed by
+    #   the container raises an exception, that exception will be raised by {#run}
     #
     def run
       @lock.synchronize do
@@ -273,51 +344,9 @@ module Qpid::Proton
         raise StoppedError if @stopped
       end
       while task = @work.pop
-        case task
-
-        when :start
-          @adapter.on_container_start(self) if @adapter.respond_to? :on_container_start
-
-        when Container
-          r, w = [@wake], []
-          next_tick = nil
-          @lock.synchronize do
-            @selectable.each do |s|
-              r << s if s.send :can_read?
-              w << s if s.send :can_write?
-              next_tick = next_tick_min(s, next_tick)
-            end
-          end
-          now = Time.now
-          timeout = ((next_tick > now) ? next_tick - now : 0) if next_tick
-          r, w = IO.select(r, w, nil, timeout)
-          now = Time.now
-          selected = Set.new(r).delete(@wake)
-          selected.merge(w) if w
-          selected.merge(@selectable.select { |s| next_tick_due(s, now) })
-          @wake.reset
-          stop_select = nil
-          @lock.synchronize do
-            if stop_select = @stopped # close everything
-              selected += @selectable
-              selected.each { |s| s.close @stop_err }
-              @wake.close
-            end
-            @selectable -= selected # Remove selected tasks
-          end
-          selected.each { |s| @work << s } # Queue up tasks needing #process
-          @work << self unless stop_select
-
-        when ConnectionTask then
-          task.process
-          rearm task
-
-        when ListenTask then
-          io, opts = task.process
-          add(connection_driver(io, opts, true)) if io
-          rearm task
-        end
+        run_one task
       end
+      raise @panic if @panic
     ensure
       @lock.synchronize do
         if (@running -= 1) > 0
@@ -339,13 +368,19 @@ module Qpid::Proton
     # {StoppedError} on attempting.  Create a new container if you want to
     # resume activity.
     #
-    # @param error [Condition] Optional transport/listener error condition
+    # @param error [Condition] Optional error condition passed to
+    #  {MessagingHandler#on_transport_error} for each connection and
+    #  {Listener::Handler::on_error} for each listener.
+    #
+    # @param panic [Exception] Optional exception raised by all concurrent calls
+    # to run()
     #
-    def stop(error=nil)
+    def stop(error=nil, panic=nil)
       @lock.synchronize do
-        raise StoppedError if @stopped
-        @stopped = true
+        return if @stopped
         @stop_err = Condition.convert(error)
+        @panic = panic
+        @stopped = true
         check_stop_lh
         # NOTE: @stopped =>
         # - no new run threads can join

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/828405a2/proton-c/bindings/ruby/lib/core/listener.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/listener.rb b/proton-c/bindings/ruby/lib/core/listener.rb
index f5ff9e5..82c711a 100644
--- a/proton-c/bindings/ruby/lib/core/listener.rb
+++ b/proton-c/bindings/ruby/lib/core/listener.rb
@@ -29,6 +29,9 @@ module Qpid::Proton
     # connections. This class simply returns a fixed set of options for every
     # connection accepted, but you can subclass and override all of the on_
     # methods to provide more interesting behaviour.
+    #
+    # *Note*: If a {Listener} method raises an exception, it will stop the {Container}
+    # that the handler is running in. See {Container#run}
     class Handler
       # @param opts [Hash] Options to return from on_accept.
       def initialize(opts=nil) @opts = opts || {}; end

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/828405a2/proton-c/bindings/ruby/lib/core/messaging_handler.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/messaging_handler.rb b/proton-c/bindings/ruby/lib/core/messaging_handler.rb
index 66d9ce2..475f1c3 100644
--- a/proton-c/bindings/ruby/lib/core/messaging_handler.rb
+++ b/proton-c/bindings/ruby/lib/core/messaging_handler.rb
@@ -28,6 +28,9 @@ module Qpid::Proton
   # {StopAutoResponse} from +#on_xxx_open+ or +#on_xxx_close+. The application becomes responsible
   # for calling +#open/#close+ at a later point.
   #
+  # *Note*: If a {MessagingHandler} method raises an exception, it will stop the {Container}
+  # that the handler is running in. See {Container#run}
+  #
   class MessagingHandler
 
     # @return [Hash] handler options, see {#initialize}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/828405a2/proton-c/bindings/ruby/tests/test_container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/test_container.rb b/proton-c/bindings/ruby/tests/test_container.rb
index 0279c19..bed810c 100644
--- a/proton-c/bindings/ruby/tests/test_container.rb
+++ b/proton-c/bindings/ruby/tests/test_container.rb
@@ -20,23 +20,10 @@ require 'test_tools'
 require 'minitest/unit'
 require 'socket'
 
-# Container that listens on a random port
-class TestContainer < Qpid::Proton::Container
-
-  def initialize(handler, lopts=nil, id=nil)
-    super handler, id
-    @listener = listen_io(TCPServer.open(0), ListenOnceHandler.new(lopts))
-  end
-
-  def port() @listener.to_io.addr[1]; end
-  def url() "amqp://:#{port}"; end#
-end
-
 # MessagingHandler that raises in on_error to catch unexpected errors
 class ExceptionMessagingHandler
   def on_error(e) raise e; end
 end
-Thread.abort_on_exception = true
 
 class ContainerTest < MiniTest::Test
   include Qpid::Proton
@@ -70,9 +57,9 @@ class ContainerTest < MiniTest::Test
       end
     end.new
 
-    c = TestContainer.new(receive_handler, {}, __method__)
+    c = ServerContainer.new(__method__, {:handler => receive_handler})
     c.connect(c.url, {:handler => send_handler}).open_sender({:name => "testlink"})
-    c.run
+    c.wait
 
     assert send_handler.accepted
     assert_equal "testlink", receive_handler.link.name
@@ -162,41 +149,42 @@ class ContainerTest < MiniTest::Test
     assert_raises (SocketError) { c = cont.connect("badconnect.example.com:999") }
   end
 
-  # Verify that connection options are sent to the peer and available as Connection methods
+  # Verify that connection options are sent to the peer
   def test_connection_options
     # Note: user, password and sasl_xxx options are tested by ContainerSASLTest below
     server_handler = Class.new(ExceptionMessagingHandler) do
       def on_connection_open(c)
         @connection = c
         c.open({
-                :virtual_host => "server.to.client",
-                :properties => { :server => :client },
-                :offered_capabilities => [ :s1 ],
-                :desired_capabilities => [ :s2 ],
-                :container_id => "box",
-               })
+          :virtual_host => "server.to.client",
+          :properties => { :server => :client },
+          :offered_capabilities => [ :s1 ],
+          :desired_capabilities => [ :s2 ],
+          :container_id => "box",
+        })
         c.close
       end
       attr_reader :connection
     end.new
-    # Transport options must be provided to the listener, by Connection#open it is too late
-    cont = TestContainer.new(nil, {
-                                   :handler => server_handler,
-                                   :idle_timeout => 88,
-                                   :max_sessions =>1000,
-                                   :max_frame_size => 8888,
-                                  })
+    # Transport options set by listener, by Connection#open it is too late
+    cont = ServerContainer.new(__method__, {
+      :handler => server_handler,
+      :idle_timeout => 88,
+      :max_sessions =>1000,
+      :max_frame_size => 8888,
+    })
     client = cont.connect(cont.url,
-                          {:virtual_host => "client.to.server",
-                           :properties => { :foo => :bar, "str" => "str" },
-                           :offered_capabilities => [:c1 ],
-                           :desired_capabilities => ["c2" ],
-                           :idle_timeout => 42,
-                           :max_sessions =>100,
-                           :max_frame_size => 4096,
-                           :container_id => "bowl"
-                          })
-    cont.run
+      {:virtual_host => "client.to.server",
+        :properties => { :foo => :bar, "str" => "str" },
+        :offered_capabilities => [:c1 ],
+        :desired_capabilities => ["c2" ],
+        :idle_timeout => 42,
+        :max_sessions =>100,
+        :max_frame_size => 4096,
+        :container_id => "bowl"
+      })
+    cont.wait
+
     c = server_handler.connection
     assert_equal "client.to.server", c.virtual_host
     assert_equal({ :foo => :bar, :str => "str" }, c.properties)
@@ -231,24 +219,44 @@ class ContainerTest < MiniTest::Test
 
   # Test for time out on unresponsive client
   def test_idle_timeout_client
-    server = TestContainer.new(nil, {:idle_timeout => 0.1}, "#{__method__}.server")
-    server_thread = Thread.new { server.run }
-
+    server = ServerContainer.new("#{__method__}.server", {:idle_timeout => 0.1})
     client_handler = Class.new(ExceptionMessagingHandler) do
-      def initialize() @signal = Queue.new; end
-      attr_reader :signal
-      def on_connection_open(c) @signal.pop; end # Jam the client to get a timeout
+      def initialize() @ready, @block = Queue.new, Queue.new; end
+      attr_reader :ready, :block
+      def on_connection_open(c)
+        @ready.push nil        # Tell the main thread we are now open
+        @block.pop             # Block the client so the server will time it out
+      end
     end.new
     client = Container.new(nil, "#{__method__}.client")
     client.connect(server.url, {:handler => client_handler})
     client_thread = Thread.new { client.run }
-
-    server_thread.join          # Exits when the connection closes from idle-timeout
-    client_handler.signal.push true # Unblock the client
-
+    client_handler.ready.pop    # Wait till the client has connected
+    server.wait                 # Exits when the connection closes from idle-timeout
+    client_handler.block.push nil   # Unblock the client
     ex = assert_raises(Qpid::Proton::Condition) { client_thread.join }
     assert_match(/resource-limit-exceeded/, ex.to_s)
   end
-end
 
+  # Make sure we stop and clean up if an aborted connection causes a handler to raise.
+  # https://issues.apache.org/jira/browse/PROTON-1791
+  def test_handler_raise
+    cont = ServerContainer.new(__method__)
+    client_handler = Class.new(MessagingHandler) do
+      # TestException is < Exception so not handled by default rescue clause
+      def on_connection_open(c) raise TestException.new("Bad Dog"); end
+    end.new
+    threads = 3.times.collect { Thread.new { cont.run } }
+    sleep 0.01 while cont.running < 3 # Wait for all threads to be running
+    sockets = 2.times.collect { TCPSocket.new("", cont.port) }
+    cont.connect_io(sockets[1]) # No exception
+    cont.connect_io(sockets[0], {:handler => client_handler}) # Should stop container
+
+    threads.each { |t| assert_equal("Bad Dog", assert_raises(TestException) {t.join}.message) }
+    sockets.each { |s| assert s.closed? }
+    assert cont.listener.to_io.closed?
+    assert_raises(Container::StoppedError) { cont.run }
+    assert_raises(Container::StoppedError) { cont.listen "" }
+  end
+end
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/828405a2/proton-c/bindings/ruby/tests/test_tools.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/test_tools.rb b/proton-c/bindings/ruby/tests/test_tools.rb
index a8fe1de..07df5e7 100644
--- a/proton-c/bindings/ruby/tests/test_tools.rb
+++ b/proton-c/bindings/ruby/tests/test_tools.rb
@@ -28,7 +28,8 @@ rescue NameError                # For older versions of MiniTest
   MiniTest::Test = MiniTest::Unit::TestCase
 end
 
-class TestError < Exception; end
+class TestError < RuntimeError; end  # Normal error
+class TestException < Exception; end # Not caught by default rescue
 
 def wait_port(port, timeout=5)
   deadline = Time.now + timeout
@@ -97,7 +98,7 @@ end
 
 # ListenHandler that closes the Listener after first accept
 class ListenOnceHandler < Qpid::Proton::Listener::Handler
-  def on_error(l, e)  raise TestError, e.inspect; end
+  def on_error(l, e) raise e; end
   def on_accept(l) l.close; super; end
 end
 
@@ -145,3 +146,20 @@ DriverPair = Struct.new(:client, :server) do
   end
 end
 
+# Container that listens on a random port and runs itself
+class ServerContainer < Qpid::Proton::Container
+  include Qpid::Proton
+
+  def initialize(id=nil, listener_opts=nil)
+    super id
+    @listener = listen_io(TCPServer.open(0), Listener::Handler.new(listener_opts))
+    @thread = Thread.new { run }
+  end
+
+  attr_reader :listener
+
+  def port() @listener.port; end
+  def url() "amqp://:#{port}"; end
+  def wait() @listener.close; @thread.join; end
+end
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org