You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2018/03/16 20:34:19 UTC
qpid-proton git commit: PROTON-1791: [ruby] Container does not clean
up on handler exception
Repository: qpid-proton
Updated Branches:
refs/heads/master 3c80d6f4c -> 828405a23
PROTON-1791: [ruby] Container does not clean up on handler exception
If a handler raised an exception it simply terminated that one Container thread
with no cleanup (closing sockets etc.)
Now an exception raised by a handler causes is equivalent to Container#stop(),
but also causes all threads in Container#run() to raise the exception.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/828405a2
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/828405a2
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/828405a2
Branch: refs/heads/master
Commit: 828405a23575cf9876b1ac14048e078f7f8c4cc3
Parents: 3c80d6f
Author: Alan Conway <ac...@redhat.com>
Authored: Fri Mar 16 14:15:26 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Fri Mar 16 16:33:33 2018 -0400
----------------------------------------------------------------------
proton-c/bindings/ruby/lib/core/container.rb | 139 ++++++++++++-------
proton-c/bindings/ruby/lib/core/listener.rb | 3 +
.../bindings/ruby/lib/core/messaging_handler.rb | 3 +
proton-c/bindings/ruby/tests/test_container.rb | 108 +++++++-------
proton-c/bindings/ruby/tests/test_tools.rb | 22 ++-
5 files changed, 171 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/828405a2/proton-c/bindings/ruby/lib/core/container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/container.rb b/proton-c/bindings/ruby/lib/core/container.rb
index 5d7beb4..d4fe5d1 100644
--- a/proton-c/bindings/ruby/lib/core/container.rb
+++ b/proton-c/bindings/ruby/lib/core/container.rb
@@ -127,6 +127,63 @@ module Qpid::Proton
nt if !t || (nt < t)
end
+ # Rescue any exception raised by the block and stop the container.
+ def maybe_panic
+ begin
+ yield
+ rescue Exception => e
+ stop(nil, e)
+ end
+ end
+
+ # Handle a single item from the @work queue, this is the heart of the #run loop.
+ def run_one(task)
+ case task
+
+ when :start
+ @adapter.on_container_start(self) if @adapter.respond_to? :on_container_start
+
+ when Container
+ r, w = [@wake], []
+ next_tick = nil
+ @lock.synchronize do
+ @selectable.each do |s|
+ r << s if s.send :can_read?
+ w << s if s.send :can_write?
+ next_tick = next_tick_min(s, next_tick)
+ end
+ end
+ now = Time.now
+ timeout = ((next_tick > now) ? next_tick - now : 0) if next_tick
+ r, w = IO.select(r, w, nil, timeout)
+ now = Time.now
+ selected = Set.new(r).delete(@wake)
+ selected.merge(w) if w
+ selected.merge(@selectable.select { |s| next_tick_due(s, now) })
+ @wake.reset
+ stop_select = nil
+ @lock.synchronize do
+ if stop_select = @stopped # close everything
+ selected += @selectable
+ selected.each { |s| s.close @stop_err }
+ @wake.close
+ end
+ @selectable -= selected # Remove selected tasks
+ end
+ selected.each { |s| @work << s } # Queue up tasks needing #process
+ @work << self unless stop_select
+
+ when ConnectionTask then
+ maybe_panic { task.process }
+ rearm task
+
+ when ListenTask then
+ io, opts = maybe_panic { task.process }
+ add(connection_driver(io, opts, true)) if io
+ rearm task
+ end
+ end
+
public
# Error raised if the container is used after {#stop} has been called.
@@ -262,10 +319,24 @@ module Qpid::Proton
# Run the container: wait for IO activity, dispatch events to handlers.
#
- # More than one thread can call {#run} concurrently, the container will use
- # all the {#run} threads as a thread pool. Calls to
- # {Handler::MessagingHandler} methods are serialized for each connection or
- # listener, even if the container has multiple threads.
+ # *Multiple threads* : More than one thread can call {#run} concurrently,
+ # the container will use all {#run} threads as a thread pool. Calls to
+ # {MessagingHandler} or {Listener::Handler} methods are serialized for each
+ # connection or listener, even if the container has multiple threads.
+ #
+ # *Exceptions*: If any handler method raises an exception it will stop the
+ # container, and the exception will be raised by all calls to {#run}. For
+ # single threaded code this is often desirable. Multi-threaded server
+ # applications should normally rescue exceptions in the handler and deal
+ # with them in another way: logging, closing the connection with an error
+ # condition, signalling another thread etc.
+ #
+ # @return [void] Returns when the container stops, see {#stop} and {#auto_stop}
+ #
+ # @raise [StoppedError] If the container has already been stopped when {#run} was called.
+ #
+ # @raise [Exception] If any {MessagingHandler} or {Listener::Handler} managed by
+ # the container raises an exception, that exception will be raised by {#run}
#
def run
@lock.synchronize do
@@ -273,51 +344,9 @@ module Qpid::Proton
raise StoppedError if @stopped
end
while task = @work.pop
- case task
-
- when :start
- @adapter.on_container_start(self) if @adapter.respond_to? :on_container_start
-
- when Container
- r, w = [@wake], []
- next_tick = nil
- @lock.synchronize do
- @selectable.each do |s|
- r << s if s.send :can_read?
- w << s if s.send :can_write?
- next_tick = next_tick_min(s, next_tick)
- end
- end
- now = Time.now
- timeout = ((next_tick > now) ? next_tick - now : 0) if next_tick
- r, w = IO.select(r, w, nil, timeout)
- now = Time.now
- selected = Set.new(r).delete(@wake)
- selected.merge(w) if w
- selected.merge(@selectable.select { |s| next_tick_due(s, now) })
- @wake.reset
- stop_select = nil
- @lock.synchronize do
- if stop_select = @stopped # close everything
- selected += @selectable
- selected.each { |s| s.close @stop_err }
- @wake.close
- end
- @selectable -= selected # Remove selected tasks
- end
- selected.each { |s| @work << s } # Queue up tasks needing #process
- @work << self unless stop_select
-
- when ConnectionTask then
- task.process
- rearm task
-
- when ListenTask then
- io, opts = task.process
- add(connection_driver(io, opts, true)) if io
- rearm task
- end
+ run_one task
end
+ raise @panic if @panic
ensure
@lock.synchronize do
if (@running -= 1) > 0
@@ -339,13 +368,19 @@ module Qpid::Proton
# {StoppedError} on attempting. Create a new container if you want to
# resume activity.
#
- # @param error [Condition] Optional transport/listener error condition
+ # @param error [Condition] Optional error condition passed to
+ # {MessagingHandler#on_transport_error} for each connection and
+ # {Listener::Handler::on_error} for each listener.
+ #
+ # @param panic [Exception] Optional exception raised by all concurrent calls
+ # to run()
#
- def stop(error=nil)
+ def stop(error=nil, panic=nil)
@lock.synchronize do
- raise StoppedError if @stopped
- @stopped = true
+ return if @stopped
@stop_err = Condition.convert(error)
+ @panic = panic
+ @stopped = true
check_stop_lh
# NOTE: @stopped =>
# - no new run threads can join
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/828405a2/proton-c/bindings/ruby/lib/core/listener.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/listener.rb b/proton-c/bindings/ruby/lib/core/listener.rb
index f5ff9e5..82c711a 100644
--- a/proton-c/bindings/ruby/lib/core/listener.rb
+++ b/proton-c/bindings/ruby/lib/core/listener.rb
@@ -29,6 +29,9 @@ module Qpid::Proton
# connections. This class simply returns a fixed set of options for every
# connection accepted, but you can subclass and override all of the on_
# methods to provide more interesting behaviour.
+ #
+ # *Note*: If a {Listener} method raises an exception, it will stop the {Container}
+ # that the handler is running in. See {Container#run}
class Handler
# @param opts [Hash] Options to return from on_accept.
def initialize(opts=nil) @opts = opts || {}; end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/828405a2/proton-c/bindings/ruby/lib/core/messaging_handler.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/messaging_handler.rb b/proton-c/bindings/ruby/lib/core/messaging_handler.rb
index 66d9ce2..475f1c3 100644
--- a/proton-c/bindings/ruby/lib/core/messaging_handler.rb
+++ b/proton-c/bindings/ruby/lib/core/messaging_handler.rb
@@ -28,6 +28,9 @@ module Qpid::Proton
# {StopAutoResponse} from +#on_xxx_open+ or +#on_xxx_close+. The application becomes responsible
# for calling +#open/#close+ at a later point.
#
+ # *Note*: If a {MessagingHandler} method raises an exception, it will stop the {Container}
+ # that the handler is running in. See {Container#run}
+ #
class MessagingHandler
# @return [Hash] handler options, see {#initialize}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/828405a2/proton-c/bindings/ruby/tests/test_container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/test_container.rb b/proton-c/bindings/ruby/tests/test_container.rb
index 0279c19..bed810c 100644
--- a/proton-c/bindings/ruby/tests/test_container.rb
+++ b/proton-c/bindings/ruby/tests/test_container.rb
@@ -20,23 +20,10 @@ require 'test_tools'
require 'minitest/unit'
require 'socket'
-# Container that listens on a random port
-class TestContainer < Qpid::Proton::Container
-
- def initialize(handler, lopts=nil, id=nil)
- super handler, id
- @listener = listen_io(TCPServer.open(0), ListenOnceHandler.new(lopts))
- end
-
- def port() @listener.to_io.addr[1]; end
- def url() "amqp://:#{port}"; end#
-end
-
# MessagingHandler that raises in on_error to catch unexpected errors
class ExceptionMessagingHandler
def on_error(e) raise e; end
end
-Thread.abort_on_exception = true
class ContainerTest < MiniTest::Test
include Qpid::Proton
@@ -70,9 +57,9 @@ class ContainerTest < MiniTest::Test
end
end.new
- c = TestContainer.new(receive_handler, {}, __method__)
+ c = ServerContainer.new(__method__, {:handler => receive_handler})
c.connect(c.url, {:handler => send_handler}).open_sender({:name => "testlink"})
- c.run
+ c.wait
assert send_handler.accepted
assert_equal "testlink", receive_handler.link.name
@@ -162,41 +149,42 @@ class ContainerTest < MiniTest::Test
assert_raises (SocketError) { c = cont.connect("badconnect.example.com:999") }
end
- # Verify that connection options are sent to the peer and available as Connection methods
+ # Verify that connection options are sent to the peer
def test_connection_options
# Note: user, password and sasl_xxx options are tested by ContainerSASLTest below
server_handler = Class.new(ExceptionMessagingHandler) do
def on_connection_open(c)
@connection = c
c.open({
- :virtual_host => "server.to.client",
- :properties => { :server => :client },
- :offered_capabilities => [ :s1 ],
- :desired_capabilities => [ :s2 ],
- :container_id => "box",
- })
+ :virtual_host => "server.to.client",
+ :properties => { :server => :client },
+ :offered_capabilities => [ :s1 ],
+ :desired_capabilities => [ :s2 ],
+ :container_id => "box",
+ })
c.close
end
attr_reader :connection
end.new
- # Transport options must be provided to the listener, by Connection#open it is too late
- cont = TestContainer.new(nil, {
- :handler => server_handler,
- :idle_timeout => 88,
- :max_sessions =>1000,
- :max_frame_size => 8888,
- })
+ # Transport options set by listener, by Connection#open it is too late
+ cont = ServerContainer.new(__method__, {
+ :handler => server_handler,
+ :idle_timeout => 88,
+ :max_sessions =>1000,
+ :max_frame_size => 8888,
+ })
client = cont.connect(cont.url,
- {:virtual_host => "client.to.server",
- :properties => { :foo => :bar, "str" => "str" },
- :offered_capabilities => [:c1 ],
- :desired_capabilities => ["c2" ],
- :idle_timeout => 42,
- :max_sessions =>100,
- :max_frame_size => 4096,
- :container_id => "bowl"
- })
- cont.run
+ {:virtual_host => "client.to.server",
+ :properties => { :foo => :bar, "str" => "str" },
+ :offered_capabilities => [:c1 ],
+ :desired_capabilities => ["c2" ],
+ :idle_timeout => 42,
+ :max_sessions =>100,
+ :max_frame_size => 4096,
+ :container_id => "bowl"
+ })
+ cont.wait
+
c = server_handler.connection
assert_equal "client.to.server", c.virtual_host
assert_equal({ :foo => :bar, :str => "str" }, c.properties)
@@ -231,24 +219,44 @@ class ContainerTest < MiniTest::Test
# Test for time out on unresponsive client
def test_idle_timeout_client
- server = TestContainer.new(nil, {:idle_timeout => 0.1}, "#{__method__}.server")
- server_thread = Thread.new { server.run }
-
+ server = ServerContainer.new("#{__method__}.server", {:idle_timeout => 0.1})
client_handler = Class.new(ExceptionMessagingHandler) do
- def initialize() @signal = Queue.new; end
- attr_reader :signal
- def on_connection_open(c) @signal.pop; end # Jam the client to get a timeout
+ def initialize() @ready, @block = Queue.new, Queue.new; end
+ attr_reader :ready, :block
+ def on_connection_open(c)
+ @ready.push nil # Tell the main thread we are now open
+ @block.pop # Block the client so the server will time it out
+ end
end.new
client = Container.new(nil, "#{__method__}.client")
client.connect(server.url, {:handler => client_handler})
client_thread = Thread.new { client.run }
-
- server_thread.join # Exits when the connection closes from idle-timeout
- client_handler.signal.push true # Unblock the client
-
+ client_handler.ready.pop # Wait till the client has connected
+ server.wait # Exits when the connection closes from idle-timeout
+ client_handler.block.push nil # Unblock the client
ex = assert_raises(Qpid::Proton::Condition) { client_thread.join }
assert_match(/resource-limit-exceeded/, ex.to_s)
end
-end
+ # Make sure we stop and clean up if an aborted connection causes a handler to raise.
+ # https://issues.apache.org/jira/browse/PROTON-1791
+ def test_handler_raise
+ cont = ServerContainer.new(__method__)
+ client_handler = Class.new(MessagingHandler) do
+ # TestException is < Exception so not handled by default rescue clause
+ def on_connection_open(c) raise TestException.new("Bad Dog"); end
+ end.new
+ threads = 3.times.collect { Thread.new { cont.run } }
+ sleep 0.01 while cont.running < 3 # Wait for all threads to be running
+ sockets = 2.times.collect { TCPSocket.new("", cont.port) }
+ cont.connect_io(sockets[1]) # No exception
+ cont.connect_io(sockets[0], {:handler => client_handler}) # Should stop container
+
+ threads.each { |t| assert_equal("Bad Dog", assert_raises(TestException) {t.join}.message) }
+ sockets.each { |s| assert s.closed? }
+ assert cont.listener.to_io.closed?
+ assert_raises(Container::StoppedError) { cont.run }
+ assert_raises(Container::StoppedError) { cont.listen "" }
+ end
+end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/828405a2/proton-c/bindings/ruby/tests/test_tools.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/test_tools.rb b/proton-c/bindings/ruby/tests/test_tools.rb
index a8fe1de..07df5e7 100644
--- a/proton-c/bindings/ruby/tests/test_tools.rb
+++ b/proton-c/bindings/ruby/tests/test_tools.rb
@@ -28,7 +28,8 @@ rescue NameError # For older versions of MiniTest
MiniTest::Test = MiniTest::Unit::TestCase
end
-class TestError < Exception; end
+class TestError < RuntimeError; end # Normal error
+class TestException < Exception; end # Not caught by default rescue
def wait_port(port, timeout=5)
deadline = Time.now + timeout
@@ -97,7 +98,7 @@ end
# ListenHandler that closes the Listener after first accept
class ListenOnceHandler < Qpid::Proton::Listener::Handler
- def on_error(l, e) raise TestError, e.inspect; end
+ def on_error(l, e) raise e; end
def on_accept(l) l.close; super; end
end
@@ -145,3 +146,20 @@ DriverPair = Struct.new(:client, :server) do
end
end
+# Container that listens on a random port and runs itself
+class ServerContainer < Qpid::Proton::Container
+ include Qpid::Proton
+
+ def initialize(id=nil, listener_opts=nil)
+ super id
+ @listener = listen_io(TCPServer.open(0), Listener::Handler.new(listener_opts))
+ @thread = Thread.new { run }
+ end
+
+ attr_reader :listener
+
+ def port() @listener.port; end
+ def url() "amqp://:#{port}"; end
+ def wait() @listener.close; @thread.join; end
+end
+
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org