You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/11/21 17:51:36 UTC
qpid-proton git commit: PROTON-1636: Fix Container shut-down races
Repository: qpid-proton
Updated Branches:
refs/heads/master b7d5c9eb5 -> 235f0a8aa
PROTON-1636: Fix Container shut-down races
- Fixed races in Container shut-down via #stop or #auto_stop=true
- Refactor ConnectionDriver; decouple from MessagingHandler
- Fixes to example issues that showed up during the work
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/235f0a8a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/235f0a8a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/235f0a8a
Branch: refs/heads/master
Commit: 235f0a8aa6238431cd90d5d03d9cbfcb94074b20
Parents: b7d5c9e
Author: Alan Conway <ac...@redhat.com>
Authored: Tue Nov 21 12:37:28 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Tue Nov 21 12:50:59 2017 -0500
----------------------------------------------------------------------
examples/ruby/README.md | 19 --
examples/ruby/broker.rb | 4 +-
examples/ruby/direct_recv.rb | 12 +-
examples/ruby/direct_send.rb | 12 +-
examples/ruby/example_test.rb | 43 +--
examples/ruby/helloworld_direct.rb | 60 ----
examples/ruby/simple_recv.rb | 4 +-
examples/ruby/simple_send.rb | 3 +-
.../bindings/ruby/lib/core/connection_driver.rb | 150 +++++-----
proton-c/bindings/ruby/lib/core/container.rb | 285 ++++++++++++-------
proton-c/bindings/ruby/lib/core/listener.rb | 108 +++----
proton-c/bindings/ruby/lib/util/condition.rb | 3 -
.../ruby/tests/test_connection_driver.rb | 19 +-
proton-c/bindings/ruby/tests/test_container.rb | 28 +-
proton-c/bindings/ruby/tests/test_tools.rb | 4 +-
15 files changed, 365 insertions(+), 389 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/235f0a8a/examples/ruby/README.md
----------------------------------------------------------------------
diff --git a/examples/ruby/README.md b/examples/ruby/README.md
index 8526e29..938ef17 100644
--- a/examples/ruby/README.md
+++ b/examples/ruby/README.md
@@ -32,25 +32,6 @@ The following events occur while **helloworld.rb** runs:
* **on_sendable** - Fired when a message can be sent.
* **on_message** - Fired when a message is received.
-### Hello World Without A Broker required
-
-The next example we'll look at will send the classic "Hello world" message to itself directly,
-without going through a broker.
-
-To launch the example:
-
-```
- $ ruby helloworld_direct.rb //:9999
- Hello world!
-```
-
-Not very different from the example that uses the broker, which is what we'd expect from the outside. But let's take a look inside of the example and see how it's different at that level
-
-The direct version takes on the responsibility for listening to incoming connections as well as making an outgoing connection. So we see the following additional events occurring:
-
- * **on_accepted** - Fired when a message is received.
- * **on_connection_closed** - Fired when an endpoint closes its connection.
-
## More Complex Examples
Now that we've covered the basics with the archetypical hello world app, let's look at some more interesting examples.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/235f0a8a/examples/ruby/broker.rb
----------------------------------------------------------------------
diff --git a/examples/ruby/broker.rb b/examples/ruby/broker.rb
index bd461ed..111d41b 100644
--- a/examples/ruby/broker.rb
+++ b/examples/ruby/broker.rb
@@ -78,8 +78,8 @@ class Broker < Qpid::Proton::Handler::MessagingHandler
end
def on_start(event)
- @acceptor = event.container.listen(@url)
- print "Listening on #{@url}\n"
+ @listener = event.container.listen(@url)
+ STDOUT.puts "Listening on #{@url}"; STDOUT.flush
end
def queue(address)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/235f0a8a/examples/ruby/direct_recv.rb
----------------------------------------------------------------------
diff --git a/examples/ruby/direct_recv.rb b/examples/ruby/direct_recv.rb
index b2b0ba9..2e3295f 100644
--- a/examples/ruby/direct_recv.rb
+++ b/examples/ruby/direct_recv.rb
@@ -30,8 +30,13 @@ class DirectReceive < Qpid::Proton::Handler::MessagingHandler
@received = 0
end
+ class ListenOnce < Qpid::Proton::Listener::Handler
+ def on_open(l) STDOUT.puts "Listening\n"; STDOUT.flush; end
+ def on_accept(l) l.close; end
+ end
+
def on_start(event)
- event.container.listen(@url)
+ event.container.listen(@url, ListenOnce.new)
end
def on_message(event)
@@ -39,7 +44,7 @@ class DirectReceive < Qpid::Proton::Handler::MessagingHandler
puts "Received: #{event.message.body}"
@received = @received + 1
if @received == @expected
- event.container.stop
+ event.connection.close
end
end
end
@@ -51,5 +56,6 @@ Listen on URL and receive COUNT messages from ADDRESS"
return 1
end
url, address, count = ARGV
-Qpid::Proton::Container.new(DirectReceive.new(url, address, count || 10)).run
+count = Integer(count || 10)
+Qpid::Proton::Container.new(DirectReceive.new(url, address, count)).run
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/235f0a8a/examples/ruby/direct_send.rb
----------------------------------------------------------------------
diff --git a/examples/ruby/direct_send.rb b/examples/ruby/direct_send.rb
index e54e1ab..0ccfe38 100644
--- a/examples/ruby/direct_send.rb
+++ b/examples/ruby/direct_send.rb
@@ -31,8 +31,13 @@ class DirectSend < Qpid::Proton::Handler::MessagingHandler
@expected = expected
end
+ class ListenOnce < Qpid::Proton::Listener::Handler
+ def on_open(l) STDOUT.puts "Listening\n"; STDOUT.flush; end
+ def on_accept(l) l.close; end
+ end
+
def on_start(event)
-*co event.container.listen(@url)
+ event.container.listen(@url, ListenOnce.new)
end
def on_sendable(event)
@@ -47,7 +52,7 @@ class DirectSend < Qpid::Proton::Handler::MessagingHandler
@confirmed = @confirmed + 1
if @confirmed == @expected
puts "All #{@expected} messages confirmed!"
- event.container.stop
+ event.connection.close
end
end
end
@@ -58,4 +63,5 @@ Listen on URL and send COUNT messages to ADDRESS"
return 1
end
url, address, count = ARGV
-Qpid::Proton::Container.new(DirectSend.new(url, address, count || 10)).run
+count = Integer(count || 10)
+Qpid::Proton::Container.new(DirectSend.new(url, address, count)).run
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/235f0a8a/examples/ruby/example_test.rb
----------------------------------------------------------------------
diff --git a/examples/ruby/example_test.rb b/examples/ruby/example_test.rb
index bcb6aa6..19f1638 100755
--- a/examples/ruby/example_test.rb
+++ b/examples/ruby/example_test.rb
@@ -22,17 +22,6 @@ require 'minitest/autorun'
require 'qpid_proton'
require 'socket'
-# Wait for the broker to be listening
-def wait_for(url, timeout = 5)
- deadline = Time.now + 5
- begin
- TCPSocket.open("", URI(url).port).close
- rescue Errno::ECONNREFUSED
- retry if Time.now < deadline
- raise
- end
-end
-
# URL with an unused port
def test_url()
"amqp://:#{TCPServer.open(0) { |s| s.addr[1] }}"
@@ -46,8 +35,8 @@ class ExampleTest < MiniTest::Test
end
def assert_output(want, *args)
- out = run_script(*args)
- assert_equal(want, out.read.strip)
+ p = run_script(*args)
+ assert_equal(want, p.read.strip)
end
def test_helloworld
@@ -77,43 +66,29 @@ EOS
assert_output(want.strip, "simple_recv.rb", $url, __method__)
end
- def test_helloworld_direct
- url = test_url
- assert_output("Hello world!", "helloworld_direct.rb", url, __method__)
- end
-
def test_direct_recv
url = test_url
- p = run_script("direct_recv.rb", url, __method__)
- wait_for url
- assert_output("All 10 messages confirmed!", "simple_send.rb", url, __method__)
- want = (0..9).reduce("") { |x,y| x << "Received: sequence #{y}\n" }
- assert_equal(want.strip, p.read.strip)
+ p = run_script("direct_recv.rb", url, __method__)
+ p.readline # Wait till ready
+ assert_output("All 10 messages confirmed!", "simple_send.rb", url, __method__)
+ want = (0..9).reduce("") { |x,y| x << "Received: sequence #{y}\n" }
+ assert_equal(want.strip, p.read.strip)
end
def test_direct_send
url = test_url
p = run_script("direct_send.rb", url, __method__)
- wait_for url
+ p.readline # Wait till ready
want = (0..9).reduce("") { |x,y| x << "Received: sequence #{y}\n" }
assert_output(want.strip, "simple_recv.rb", url, __method__)
assert_equal("All 10 messages confirmed!", p.read.strip)
end
-
- def test_direct_send
- url = test_url
- p = run_script("direct_recv.rb", url, __method__)
- wait_for url
- assert_output("All 10 messages confirmed!", "simple_send.rb", url, __method__)
- want = (0..9).reduce("") { |x,y| x << "Received: sequence #{y}\n" }
- assert_equal(want.strip, p.read.strip)
- end
end
# Start the broker before all tests.
$url = test_url
$broker = IO.popen([RbConfig.ruby, 'broker.rb', $url])
-wait_for $url
+$broker.readline
# Kill the broker after all tests
MiniTest.after_run do
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/235f0a8a/examples/ruby/helloworld_direct.rb
----------------------------------------------------------------------
diff --git a/examples/ruby/helloworld_direct.rb b/examples/ruby/helloworld_direct.rb
deleted file mode 100644
index dab368b..0000000
--- a/examples/ruby/helloworld_direct.rb
+++ /dev/null
@@ -1,60 +0,0 @@
-#--
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#++
-
-require 'qpid_proton'
-require 'optparse'
-
-class HelloWorldDirect < Qpid::Proton::Handler::MessagingHandler
-
- include Qpid::Proton::Util::Wrapper
-
- def initialize(url, address)
- super()
- @url, @address = url, address
- end
-
- def on_start(event)
- event.container.listen(@url)
- c = event.container.connect(@url) # Connect to self!
- c.open_sender(@address)
- end
-
- def on_sendable(event)
- msg = Qpid::Proton::Message.new
- msg.body = "Hello world!"
- event.sender.send(msg)
- event.sender.close
- end
-
- def on_message(event)
- puts "#{event.message.body}"
- end
-
- def on_accepted(event)
- event.container.stop
- end
-end
-
-if ARGV.size != 2
- STDERR.puts "Usage: #{__FILE__} URL ADDRESS
-Listen on and connect to URL (connect to self), send a message to ADDRESS and receive it back"
- return 1
-end
-url, address = ARGV
-Qpid::Proton::Container.new(HelloWorldDirect.new(url, address)).run
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/235f0a8a/examples/ruby/simple_recv.rb
----------------------------------------------------------------------
diff --git a/examples/ruby/simple_recv.rb b/examples/ruby/simple_recv.rb
index d1a9607..446d7da 100644
--- a/examples/ruby/simple_recv.rb
+++ b/examples/ruby/simple_recv.rb
@@ -52,6 +52,6 @@ Connect to URL and receive COUNT messages from ADDRESS"
return 1
end
url, address, count = ARGV
-
-Qpid::Proton::Container.new(SimpleReceive.new(url, address, count || 10)).run
+count = Integer(count || 10)
+Qpid::Proton::Container.new(SimpleReceive.new(url, address, count)).run
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/235f0a8a/examples/ruby/simple_send.rb
----------------------------------------------------------------------
diff --git a/examples/ruby/simple_send.rb b/examples/ruby/simple_send.rb
index 25fc56c..be919d5 100644
--- a/examples/ruby/simple_send.rb
+++ b/examples/ruby/simple_send.rb
@@ -59,4 +59,5 @@ Connect to URL and send COUNT messages to ADDRESS"
return 1
end
url, address, count = ARGV
-Qpid::Proton::Container.new(SimpleSend.new(url, address, count || 10)).run
+count = Integer(count || 10)
+Qpid::Proton::Container.new(SimpleSend.new(url, address, count)).run
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/235f0a8a/proton-c/bindings/ruby/lib/core/connection_driver.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/connection_driver.rb b/proton-c/bindings/ruby/lib/core/connection_driver.rb
index 6c85659..537e308 100644
--- a/proton-c/bindings/ruby/lib/core/connection_driver.rb
+++ b/proton-c/bindings/ruby/lib/core/connection_driver.rb
@@ -20,37 +20,27 @@ require 'socket'
module Qpid
module Proton
- # Associate an AMQP {Connection} with an {IO} and a {MessagingHandler}
+ # Associate an AMQP {Connection} and {Transport} with an {IO}
#
- # - Read AMQP binary data from the {IO} (#read, #process)
- # - Call on_* methods on the {MessagingHandler} for AMQP events (#dispatch, #process)
- # - Write AMQP binary data to the {IO} (#write, #process)
+ # - {#read} reads AMQP binary data from the {IO} and generates events
+ # - {#tick} generates timing-related events
+ # - {#event} gets events to be dispatched to {Handler::MessagingHandler}s
+ # - {#write} writes AMQP binary data to the {IO}
#
# Thread safety: The {ConnectionDriver} is not thread safe but separate
# {ConnectionDriver} instances can be processed concurrently. The
# {Container} handles multiple connections concurrently in multiple threads.
#
class ConnectionDriver
-
- # Create a {Connection} and associate it with +io+ and +handler+
- #
- # @param io [#read_nonblock, #write_nonblock] An {IO} or {IO}-like object that responds
- # to #read_nonblock and #write_nonblock.
- # @param opts [Hash] See {Connection#open} - transport options are set here,
- # remaining options
- # @pram server [Bool] If true create a server (incoming) connection
- def initialize(io, opts = {}, server=false)
- @impl = Cproton.pni_connection_driver or raise RuntimeError, "cannot create connection driver"
+ # Create a {Connection} and {Transport} associated with +io+
+ # @param io [IO] An {IO} or {IO}-like object that responds
+ # to {IO#read_nonblock} and {IO#write_nonblock}
+ def initialize(io)
+ @impl = Cproton.pni_connection_driver or raise NoMemoryError
@io = io
- @handler = opts[:handler] || Handler::MessagingHandler.new # Default handler if missing
- @rbuf = "" # String to re-use as read buffer
- connection.apply opts
- transport.set_server if server
- transport.apply opts
+ @rbuf = "" # String for re-usable read buffer
end
- attr_reader :handler
-
# @return [Connection]
def connection()
@connection ||= Connection.wrap(Cproton.pni_connection_driver_connection(@impl))
@@ -74,18 +64,11 @@ module Qpid
# transport are closed and there are no events to dispatch.
def finished?() Cproton.pn_connection_driver_finished(@impl); end
- # Dispatch available events, call the relevant on_* methods on the {#handler}.
- def dispatch(extra_handlers = nil)
- extra_handlers ||= []
- while event = Event::Event.wrap(Cproton.pn_connection_driver_next_event(@impl))
- pre_dispatch(event)
- event.dispatch(@handler)
- extra_handlers.each { |h| event.dispatch h }
- end
- end
+ # Get the next event to dispatch, nil if no events available
+ def event() Event::Event.wrap(Cproton.pn_connection_driver_next_event(@impl)); end
- # Read from IO without blocking.
- # IO errors are not raised, they are passed to {#handler}.on_transport_error by {#dispatch}
+ # Non-blocking read from {#io}, generate events for {#event}
+ # IO errors are returned as transport errors by {#event}, not raised
def read
size = Cproton.pni_connection_driver_read_size(@impl)
return if size <= 0
@@ -95,12 +78,12 @@ module Qpid
# Try again later.
rescue EOFError # EOF is not an error
close_read
- rescue IOError, SystemCallError => e # is passed to the transport
+ rescue IOError, SystemCallError => e
close e
end
- # Write to IO without blocking.
- # IO errors are not raised, they are passed to {#handler}.on_transport_error by {#dispatch}
+ # Non-blocking write to {#io}
+ # IO errors are returned as transport errors by {#event}, not raised
def write
n = @io.write_nonblock(Cproton.pn_connection_driver_write_buffer(@impl))
Cproton.pn_connection_driver_write_done(@impl, n) if n > 0
@@ -110,13 +93,13 @@ module Qpid
close e
end
- # Generate timed events and IO, for example idle-timeout and heart-beat events.
- # May generate events for {#dispatch} and change the readable/writeable state.
+ # Handle time-related work, for example idle-timeout events.
+ # May generate events for {#event} and change {#can_read?}, {#can_write?}
#
# @param [Time] now the current time, defaults to {Time#now}.
#
# @return [Time] time of the next scheduled event, or nil if there are no
- # scheduled events. If non-nil, tick() must be called again no later than
+ # scheduled events. If non-nil you must call {#tick} again no later than
# this time.
def tick(now=Time.now)
transport = Cproton.pni_connection_driver_transport(@impl)
@@ -124,54 +107,79 @@ module Qpid
return ms.zero? ? nil : Time.at(ms.to_r / 1000);
end
- # Do read, tick, write and dispatch without blocking.
- # @param [Bool] io_readable true if the IO might be readable
- # @param [Bool] io_writable true if the IO might be writeable
- # @param [Time] now the current time
- # @return [Time] Latest time to call {#process} again for scheduled events,
- # or nil if there are no scheduled events
- def process(io_readable=true, io_writable=true, now=Time.now)
- read if io_readable
- next_tick = tick(now)
- if io_writable
- dispatch
- write
- end
- dispatch
- return next_tick
+ # Disconnect the write side of the transport, *without* sending an AMQP
+ # close frame. To close politely, you should use {Connection#close}, the
+ # transport will close itself once the protocol close is complete.
+ #
+ def close_write error=nil
+ return if Cproton.pn_connection_driver_write_closed(@impl)
+ set_error error if error
+ Cproton.pn_connection_driver_write_close(@impl)
+ @io.close_write
end
- # Close the read side of the transport
- def close_read
+ # Disconnect the read side of the transport, without waiting for an AMQP
+ # close frame. See comments on {#close_write}
+ def close_read error=nil
return if Cproton.pn_connection_driver_read_closed(@impl)
+ set_error error if error
Cproton.pn_connection_driver_read_close(@impl)
@io.close_read
end
- # Close the write side of the transport
- def close_write
- return if Cproton.pn_connection_driver_write_closed(@impl)
- Cproton.pn_connection_driver_write_close(@impl)
- @io.close_write
+ # Disconnect both sides of the transport sending/waiting for AMQP close
+ # frames. See comments on {#close_write}
+ def close error=nil
+ close_write error
+ close_read
end
- # Close both sides of the IO with optional error
- # @param error [Condition] If non-nil pass to {#handler}.on_transport_error on next {#dispatch}
- # Note `error` can be any value accepted by [Condition##make]
- def close(error=nil)
- if error
- cond = Condition.make(error, "proton:io")
+ private
+
+ def set_error e
+ if cond = Condition.make(e, "proton:io")
Cproton.pn_connection_driver_errorf(@impl, cond.name, "%s", cond.description)
end
- close_read
- close_write
end
+ end
- protected
+ # A {ConnectionDriver} that feeds events to a {Handler::MessagingHandler}
+ class HandlerDriver < ConnectionDriver
+ # Combine an {IO} with a {Handler::MessagingHandler} and provide
+ # a simplified way to run the driver via {#process}
+ #
+ # @param io [IO]
+ # @param handler [Handler::MessagingHandler] to receive events in
+ # {#dispatch} and {#process}
+ def initialize(io, handler)
+ super(io)
+ @handler = handler || Handler::MessagingHandler.new
+ end
+
+ attr_reader :handler
- # Override in subclass to add event context
- def pre_dispatch(event) event; end
+ # Dispatch all events available from {#event} to {#handler}
+ # @param handlers [Enum<Handler::MessagingHandler>]
+ def dispatch()
+ while e = event
+ e.dispatch @handler
+ end
+ end
+ # Do {#read}, {#tick}, {#write} and {#dispatch} without blocking.
+ #
+ # @param [Handle::MessagingHanlder] handler A handler to dispatch
+ # events to.
+ # @param [Time] now the current time
+ # @return [Time] Latest time to call {#process} again for scheduled events,
+ # or nil if there are no scheduled events
+ def process(now=Time.now)
+ read
+ next_tick = tick(now)
+ write
+ dispatch
+ return next_tick
+ end
end
end
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/235f0a8a/proton-c/bindings/ruby/lib/core/container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/container.rb b/proton-c/bindings/ruby/lib/core/container.rb
index 6e14d11..6fbdbb5 100644
--- a/proton-c/bindings/ruby/lib/core/container.rb
+++ b/proton-c/bindings/ruby/lib/core/container.rb
@@ -22,62 +22,138 @@ require 'set'
require_relative 'listener'
module Qpid::Proton
- # An AMQP container manages a set of {Connection}s which contain {#Sender} and
- # {#Receiver} links to transfer messages.
+ # An AMQP container manages a set of {Listener}s and {Connection}s which
+ # contain {#Sender} and {#Receiver} links to transfer messages. Usually, each
+ # AMQP client or server process has a single container for all of its
+ # connections and links.
#
- # TODO aconway 2017-10-26: documentthreading/dispatch role
- #
- # Usually, each AMQP client or server process has a single container for
- # all of its connections and links.
+ # One or more threads can call {#run}, events generated by all the listeners and
+ # connections will be dispatched in the {#run} threads.
class Container
private
def amqp_uri(s) Qpid::Proton::amqp_uri s; end
- class ConnectionDriver < Qpid::Proton::ConnectionDriver
+ # Container driver applies options and adds container context to events
+ class ConnectionTask < Qpid::Proton::HandlerDriver
def initialize container, io, opts, server=false
- super io, opts, server
+ super io, opts[:handler]
+ transport.set_server if server
+ transport.apply opts
+ connection.apply opts
@container = container
end
- def final() end
- def pre_dispatch(event) event.container = @container; end
+ def event
+ # Add a container to the event
+ e = super()
+ e.container = @container if e
+ e
+ end
+ end
+
+ class ListenTask < Listener
+
+ def initialize(io, handler, container)
+ super
+ env = ENV['PN_TRACE_EVT']
+ if env && ["true", "1", "yes", "on"].include?(env.downcase)
+ @log_prefix = "[0x#{object_id.to_s(16)}](PN_LISTENER_"
+ end
+ end
+
+ def process
+ unless @closing
+ unless @open_dispatched
+ dispatch(:on_open);
+ @open_dispatched = true
+ end
+ begin
+ return @io.accept, dispatch(:on_accept)
+ rescue IO::WaitReadable, Errno::EINTR
+ rescue IOError, SystemCallError => e
+ close e
+ end
+ end
+ if @closing
+ @io.close rescue nil
+ @closing = false
+ @closed = true
+ dispatch(:on_error, @condition) if @condition
+ dispatch(:on_close)
+ end
+ end
+
+ def can_read?() !@closed; end
+ def can_write?() false; end
+ def finished?() @closed; end
+
+ def dispatch(method, *args)
+ STDERR.puts "#{@log_prefix}#{([method[3..-1].upcase]+args).join ', '})" if @log_prefix
+ @handler.__send__(method, self, *args) if @handler && @handler.respond_to?(method)
+ end
end
public
# Create a new Container
+ # @overload initialize(id=nil)
+ # @param id [String] A unique ID for this container, use random UUID if nil.
#
- # @param handler [MessagingHandler] Optional default handler for connections
+ # @overload initialize(handler=nil, id=nil)
+ # @param id [String] A unique ID for this container, use random UUID if nil.
+ # @param handler [MessagingHandler] Optional default handler for connections
# that do not have their own handler (see {#connect} and {#listen})
#
- # @note For multi-threaded code, it is recommended to use a separate
- # handler instance for every connection, as a shared global handler can be
- # called concurrently for every connection that uses it.
- # @param id [String] A unique ID for this container. Defaults to a random UUID.
+ # *Note*: For multi-threaded code, it is recommended to use a separate
+ # handler instance for each connection, as a shared handler may be called
+ # concurrently.
#
def initialize(handler = nil, id = nil)
# Allow ID as sole argument
(handler, id = nil, handler.to_str) if (id.nil? && handler.respond_to?(:to_str))
- raise TypeError, "Expected MessagingHandler, got #{handler.class}" if handler && !handler.is_a?(Qpid::Proton::Handler::MessagingHandler)
+ raise TypeError, "Expected MessagingHandler, got #{handler.class}" if
+ handler && !handler.is_a?(Qpid::Proton::Handler::MessagingHandler)
- # TODO aconway 2017-11-08: allow handlers, opts for backwards compat?
+ # TODO aconway 2017-11-08: allow multiple handlers, opts for backwards compat?
@handler = handler
- @id = (id || SecureRandom.uuid).freeze
+ @id = ((id && id.to_s) || SecureRandom.uuid).freeze
+
+ # Implementation note:
+ #
+ # - #run threads take work from @work
+ # - Each driver and the Container itself is processed by at most one #run thread at a time
+ # - The Container thread does IO.select
+ # - nil on the @work queue makes a #run thread exit
+
@work = Queue.new
- @work.push self # Let the first #run thread select
- @wake = IO.pipe
- @dummy = "" # Dummy buffer for draining wake pipe
+ @work << self # Let the first #run thread start selecting
+ @wake = IO.pipe # Wakes #run thread in IO.select
+ @auto_stop = true # Exit #run when @active drops to 0
+
+ # Following instance variables protected by lock
@lock = Mutex.new
- @selectables = Set.new # ConnectionDrivers and Listeners
- @auto_stop = true
- @active = 0 # activity (connection, listener) counter for auto_stop
- @running = 0 # concurrent calls to #run
+ @active = 0 # All active tasks, in @selectable, @work or being processed
+ @selectable = Set.new # Tasks ready to block in IO.select
+ @running = 0 # Count of #run threads
+ @stopping = false # #stop called, closing tasks
+ @stop_err = nil # Optional error from #stop
end
- # @return [String] Unique identifier for this container
+ # @return [String] unique identifier for this container
attr_reader :id
+ # Auto-stop flag.
+ #
+ # True (the default) means all calls to {#run} will return when the container
+ # transitions from active to inactive - all connections and listeners
+ # have closed.
+ #
+ # False means {#run} will not return unless {#stop} is called.
+ #
+ # @return [Bool] auto-stop state
+ attr_accessor :auto_stop
+
# Open an AMQP connection.
#
# @param url [String, URI] Open a {TCPSocket} to url.host, url.port.
@@ -85,7 +161,6 @@ module Qpid::Proton
# url.user, url.password are used as defaults if opts[:user], opts[:password] are nil
# @option (see Connection#open)
# @return [Connection] The new AMQP connection
- #
def connect(url, opts = {})
url = amqp_uri(url)
opts[:user] ||= url.user
@@ -100,16 +175,18 @@ module Qpid::Proton
def connect_io(io, opts = {})
cd = connection_driver(io, opts)
cd.connection.open()
- add(cd).connection
+ add(cd)
+ cd.connection
end
# Listen for incoming AMQP connections
#
# @param url [String,URI] Listen on host:port of the AMQP URL
- # @param handler [ListenHandler] A {ListenHandler} object that will be called
+ # @param handler [Listener::Handler] A {Listener::Handler} object that will be called
# with events for this listener and can generate a new set of options for each one.
# @return [Listener] The AMQP listener.
- def listen(url, handler=ListenHandler.new)
+ #
+ def listen(url, handler=Listener::Handler.new)
url = amqp_uri(url)
# TODO aconway 2017-11-01: amqps
listen_io(TCPServer.new(url.host, url.port), handler)
@@ -117,134 +194,136 @@ module Qpid::Proton
# Listen for incoming AMQP connections on an existing server socket.
# @param io A server socket, for example a {TCPServer}
- # @param handler [ListenHandler] Handler for events from this listener
- def listen_io(io, handler=ListenHandler.new)
- add(Listener.new(io, handler))
+ # @param handler [Listener::Handler] Handler for events from this listener
+ #
+ def listen_io(io, handler=Listener::Handler.new)
+ l = ListenTask.new(io, handler, self)
+ add(l)
+ l
end
# Run the container: wait for IO activity, dispatch events to handlers.
#
# More than one thread can call {#run} concurrently, the container will use
- # all the {#run} ,threads as a pool to handle multiple connections
- # concurrently. The container ensures that handler methods for a single
- # connection (or listener) instance are serialized, even if the container
- # has multiple threads.
+ # all the {#run} threads as a thread pool. Calls to
+ # {Handler::MessagingHandler} methods are serialized for each connection or
+ # listener, even if the container has multiple threads.
#
- def run()
- @lock.synchronize { @running += 1 }
-
- unless @on_start
- @on_start = true
+ def run
+ need_on_start = nil
+ @lock.synchronize do
+ @running += 1
+ need_on_start = !@on_start_called && @handler && @handler.respond_to?(:on_start)
+ @on_start_called = true
+ end
+ if need_on_start
# TODO aconway 2017-10-28: proper synthesized event for on_start
event = Class.new do
def initialize(c) @container = c; end
attr_reader :container
end.new(self)
- @handler.on_start(event) if @handler && @handler.respond_to?(:on_start)
+ @handler.on_start(event)
end
- while x = @work.pop
- case x
- when Container then
- # Only one thread can select at a time
+ while task = @work.pop
+ case task
+ when Container
r, w = [@wake[0]], []
@lock.synchronize do
- @selectables.each do |s|
+ @selectable.each do |s|
r << s if s.send :can_read?
w << s if s.send :can_write?
end
end
r, w = IO.select(r, w)
selected = Set.new(r).merge(w)
- if selected.delete?(@wake[0]) # Drain the wake pipe
- begin
- @wake[0].read_nonblock(256, @dummy) while true
- rescue Errno::EWOULDBLOCK, Errno::EAGAIN, Errno::EINTR
- end
- end
+ drain_wake if selected.delete?(@wake[0])
@lock.synchronize do
- if @stop_all
- selected = @selectables
- @selectables = Set.new
- selected.each { |s| s.close }
- @work << nil if selected.empty? # Already idle, initiate stop now
- @stop_all = false
- else
- @selectables.subtract(selected)
+ if @stopping # close everything
+ selected += @selectable
+ selected.each { |s| s.close @stop_err }
end
+ @selectable -= selected # Remove selected tasks
end
- # Move selected items to the work queue for serialized processing.
- @lock.synchronize { @selectables.subtract(selected) }
- selected.each { |s| @work << s } # Queue up all the work
+ selected.each { |s| @work << s } # Queue up tasks needing #process
@work << self # Allow another thread to select()
- when ConnectionDriver then
- x.process
- rearm x
+ when ConnectionTask then
+ task.close @stop_err if @lock.synchronize { @stopping }
+ task.process
+ rearm task
when Listener then
- io, opts = x.send :process
+ task.close @stop_err if @lock.synchronize { @stopping }
+ io, opts = task.process
add(connection_driver(io, opts, true)) if io
- rearm x
+ rearm task
end
- # TODO aconway 2017-10-26: scheduled tasks
+ # TODO aconway 2017-10-26: scheduled tasks, heartbeats
end
ensure
- @running -= 1
- if @running > 0 # Signal the next #run thread that we are stopping
- @work << nil
- wake
+ @lock.synchronize do
+ @stopping, @stop_err = nil if (@running -= 1).zero? # Last out, reset for next #run
end
end
- # @!attribute auto_stop [rw]
- # @return [Bool] With auto_stop enabled, all calls to {#run} will return when the
- # container's last activity (connection, listener or scheduled event) is
- # closed/completed. With auto_stop disabled {#run} does not return.
- def auto_stop=(enabled) @lock.synchronize { @auto_stop=enabled }; wake; end
- def auto_stop() @lock.synchronize { @auto_stop }; end
-
- # Enable {#auto_stop} and close all connections and listeners with error.
+ # Disconnect all listeners and connections without a polite AMQP close sequence.
# {#stop} returns immediately, calls to {#run} will return when all activity is finished.
- # @param error [Condition] If non-nil pass to {#handler}.on_error
- # Note `error` can be any value accepted by [Condition##make]
+ # @param error [Condition] Optional transport/listener error condition
+ #
def stop(error=nil)
@lock.synchronize do
- @auto_stop = true
- @stop_all = true
- wake
+ @stopping = true
+ @stop_err = Condition.make(error)
+ check_stop_lh
+ # NOTE: @stopping =>
+ # - no new run threads can join
+ # - no more select calls after next wakeup
+ # - once @active == 0, all threads will be stopped with nil
end
+ wake
end
private
- # Always wake when we add new work
- def work(s) work << s; wake; end
+ def wake; @wake[1].write_nonblock('x') rescue nil; end
+
+ def work_wake(task) @work << task; wake; end
- def wake()
- @wake[1].write_nonblock('x') rescue nil
+ def drain_wake
+ begin
+ @wake[0].read_nonblock(256) while true
+ rescue Errno::EWOULDBLOCK, Errno::EAGAIN, Errno::EINTR
+ end
end
def connection_driver(io, opts, server=false)
+ opts ||= {}
opts[:container_id] ||= @id
opts[:handler] ||= @handler
- ConnectionDriver.new(self, io, opts, server)
+ ConnectionTask.new(self, io, opts, server)
+ end
+
+ def add task
+ @lock.synchronize { @active += 1 }
+ work_wake task
end
- def add(s)
+ def rearm task
@lock.synchronize do
- @active += 1
+ if task.finished?
+ @active -= 1
+ check_stop_lh
+ else
+ @selectable << task
+ end
end
- @work << s
wake
- return s
end
- def rearm s
- if s.send :finished?
- @lock.synchronize { @work << nil if (@active -= 1).zero? && @auto_stop }
- else
- @lock.synchronize { @selectables << s }
+ def check_stop_lh
+ if @active.zero? && (@auto_stop || @stopping)
+ @running.times { @work << nil } # Signal all threads to stop
+ true
end
- wake
end
end
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/235f0a8a/proton-c/bindings/ruby/lib/core/listener.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/listener.rb b/proton-c/bindings/ruby/lib/core/listener.rb
index 85dc976..0584807 100644
--- a/proton-c/bindings/ruby/lib/core/listener.rb
+++ b/proton-c/bindings/ruby/lib/core/listener.rb
@@ -25,14 +25,49 @@ module Qpid::Proton
# pass a {ListenerHandler} on creation.
#
class Listener
- # The listener's container
+
+ # Class that handles listener events and provides options for accepted
+ # connections. This class simply returns a fixed set of options for every
+ # connection accepted, but you can subclass and override all of the on_
+ # methods to provide more interesting behaviour.
+ class Handler
+ # @param opts [Hash] Options to return from on_accept.
+ def initialize(opts={}) @opts = opts; end
+
+ # Called when the listener is ready to accept connections.
+ # @param listener [Listener] The listener
+ def on_open(listener) end
+
+ # Called if an error occurs.
+ # If there is an error while opening the listener, this method is
+ # called and {#on_open} is not
+ # @param listener [Listener]
+ # @param what [Condition] Information about the error.
+ def on_error(listener, what) end
+
+ # Called when a listener accepts a new connection.
+ # @param listener [Listener] The listener
+ # @return [Hash] Options to apply to the incoming connection, see {#connect}
+ def on_accept(listener) @opts; end
+
+ # Called when the listener closes.
+ # @param listener [Listener] The listener accepting the connection.
+ def on_close(listener) end
+ end
+
+ # @return [Container] The listener's container
attr_reader :container
+ # @return [Condition] The error condition if there is one
+ attr_reader :condition
+
# Close the listener
# @param error [Condition] Optional error condition.
def close(error=nil)
- @closed ||= Condition.make(error) || true
+ @closing = true
+ @condition ||= Condition.make(error) if error
@io.close_read rescue nil # Cause listener to wake out of IO.select
+ nil
end
# Get the {IO} server socket used by the listener
@@ -40,74 +75,9 @@ module Qpid::Proton
private # Called by {Container}
- def initialize(io, handler)
+ def initialize(io, handler, container)
@io, @handler = io, handler
+ @container = container
end
-
- def process
- unless @closed
- unless @open_dispatched
- dispatch(:on_open)
- @open_dispatched = true
- end
- begin
- return @io.accept, dispatch(:on_accept)
- rescue IO::WaitReadable, Errno::EINTR
- rescue IOError, SystemCallError => e
- close e
- end
- end
- if @closed
- dispatch(:on_error, @closed) if @closed != true
- dispatch(:on_close)
- close @io unless @io.closed? rescue nil
- end
- end
-
- def can_read?() true; end
- def can_write?() false; end
- def finished?() @closed; end
-
- # TODO aconway 2017-11-06: logging strategy
- TRUE = Set[:true, :"1", :yes, :on]
- def log?()
- enabled = ENV['PN_TRACE_EVT']
- TRUE.include? enabled.downcase.to_sym if enabled
- end
-
- def dispatch(method, *args)
- STDERR.puts "(Listener 0x#{object_id.to_s(16)})[#{method}]" if log?
- @handler.send(method, self, *args) if @handler && @handler.respond_to?(method)
- end
- end
-
-
- # Class that handles listener events and provides options for accepted
- # connections. This class simply returns a fixed set of options for every
- # connection accepted, but you can subclass and override all of the on_
- # methods to provide more interesting behaviour.
- class ListenHandler
- # @param opts [Hash] Options to return from on_accept.
- def initialize(opts={}) @opts = opts; end
-
- # Called when the listener is ready to accept connections.
- # @param listener [Listener] The listener
- def on_open(listener) end
-
- # Called if an error occurs.
- # If there is an error while opening the listener, this method is
- # called and {#on_open} is not
- # @param listener [Listener]
- # @param what [Condition] Information about the error.
- def on_error(listener, what) end
-
- # Called when a listener accepts a new connection.
- # @param listener [Listener] The listener
- # @return [Hash] Options to apply to the incoming connection, see {#connect}
- def on_accept(listener) @opts; end
-
- # Called when the listener closes.
- # @param listener [Listener] The listener accepting the connection.
- def on_close(listener) end
end
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/235f0a8a/proton-c/bindings/ruby/lib/util/condition.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/util/condition.rb b/proton-c/bindings/ruby/lib/util/condition.rb
index 37c41a0..1cb307a 100644
--- a/proton-c/bindings/ruby/lib/util/condition.rb
+++ b/proton-c/bindings/ruby/lib/util/condition.rb
@@ -29,9 +29,7 @@ module Qpid::Proton
@info = info
end
- def to_s() "#{@name}: #{@description
def to_s() "#{@name}: #{@description}"; end
-}"; end
def inspect() "#{self.class.name}(#{@name.inspect}, #{@description.inspect}, #{@info.inspect})"; end
@@ -50,7 +48,6 @@ module Qpid::Proton
# - when Exception return Condition(obj.class.name, obj.to_s)
# - when nil then nil
# - else return Condition(default_name, obj.to_s)
- # If objey
def self.make(obj, default_name="proton")
case obj
when Condition then obj
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/235f0a8a/proton-c/bindings/ruby/tests/test_connection_driver.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/test_connection_driver.rb b/proton-c/bindings/ruby/tests/test_connection_driver.rb
index 174e86d..8ce9fe8 100644
--- a/proton-c/bindings/ruby/tests/test_connection_driver.rb
+++ b/proton-c/bindings/ruby/tests/test_connection_driver.rb
@@ -20,7 +20,7 @@ require 'test_tools'
include Qpid::Proton
-class ConnectionDriverTest < Minitest::Test
+class HandlerDriverTest < Minitest::Test
def setup
@sockets = Socket.pair(:LOCAL, :STREAM, 0)
@@ -39,18 +39,19 @@ class ConnectionDriverTest < Minitest::Test
def on_message(event) @message = event.message; event.connection.close; end
end
- sender = ConnectionDriver.new(@sockets[0], {:handler => send_class.new})
+ sender = HandlerDriver.new(@sockets[0], send_class.new)
sender.connection.open();
sender.connection.open_sender()
-
- receiver = ConnectionDriver.new(@sockets[1], {:handler => recv_class.new})
+ receiver = HandlerDriver.new(@sockets[1], recv_class.new)
drivers = [sender, receiver]
+
until drivers.all? { |d| d.finished? }
rd = drivers.select {|d| d.can_read? }
wr = drivers.select {|d| d.can_write? }
- rs, ws = IO.select(rd, wr)
- ws.each { |d| d.write; d.dispatch }
- rs.each { |d| d.read; d.dispatch }
+ IO.select(rd, wr)
+ drivers.each do |d|
+ d.process
+ end
end
assert_equal(receiver.handler.message.body, "foo")
assert(sender.handler.accepted)
@@ -60,10 +61,10 @@ class ConnectionDriverTest < Minitest::Test
idle_class = Class.new(MessagingHandler) do
def on_connection_bound(event) event.transport.idle_timeout = 10; end
end
- drivers = [ConnectionDriver.new(@sockets[0], {:handler => idle_class.new}), ConnectionDriver.new(@sockets[1])]
+ drivers = [HandlerDriver.new(@sockets[0], idle_class.new), HandlerDriver.new(@sockets[1], nil)]
drivers[0].connection.open()
now = Time.now
- drivers.each { |d| d.process(true, true, now) } until drivers[0].connection.open?
+ drivers.each { |d| d.process(now) } until drivers[0].connection.open?
assert_equal(10, drivers[0].transport.idle_timeout)
assert_in_delta(10, (drivers[0].tick(now) - now)*1000, 1)
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/235f0a8a/proton-c/bindings/ruby/tests/test_container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/test_container.rb b/proton-c/bindings/ruby/tests/test_container.rb
index 51b1760..9384d65 100644
--- a/proton-c/bindings/ruby/tests/test_container.rb
+++ b/proton-c/bindings/ruby/tests/test_container.rb
@@ -112,22 +112,34 @@ class ContainerTest < Minitest::Test
assert t1.join(1)
end
+ def test_stop_empty
+ c = Container.new
+ threads = 3.times.collect { Thread.new { c.run } }
+ assert_nil threads[0].join(0.001) # Not stopped
+ c.stop
+ threads.each { |t| assert t.join(1) }
+ end
+
def test_stop
c = Container.new
c.auto_stop = false
+
l = c.listen_io(TCPServer.new(0))
- c.connect("amqp://:#{l.to_io.addr[1]}")
- threads = 5.times.collect { Thread.new { c.run } }
- assert_nil threads[0].join(0.001)
+ threads = 3.times.collect { Thread.new { c.run } }
+ l.close
+ assert_nil threads[0].join(0.001) # Not stopped, no auto_stop
+
+ l = c.listen_io(TCPServer.new(0)) # New listener
+ conn = c.connect("amqp://:#{l.to_io.addr[1]}")
c.stop
threads.each { |t| assert t.join(1) }
- assert c.auto_stop # Set by stop
+ assert_nil l.condition
+ assert_nil conn.condition
- # Stop an empty container
+ # We should be able to run the container again once stopped.
threads = 5.times.collect { Thread.new { c.run } }
- assert_nil threads[0].join(0.001)
- c.stop
- threads.each { |t| assert t.join(1) }
+ assert_nil threads[0].join(0.01)
+
end
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/235f0a8a/proton-c/bindings/ruby/tests/test_tools.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/test_tools.rb b/proton-c/bindings/ruby/tests/test_tools.rb
index b45add2..3b89cd0 100644
--- a/proton-c/bindings/ruby/tests/test_tools.rb
+++ b/proton-c/bindings/ruby/tests/test_tools.rb
@@ -25,7 +25,7 @@ require 'thread'
require 'socket'
Container = Qpid::Proton::Container
-ListenHandler = Qpid::Proton::Listener
+ListenHandler = Qpid::Proton::Listener::Handler
MessagingHandler = Qpid::Proton::Handler::MessagingHandler
class TestError < Exception; end
@@ -113,7 +113,7 @@ end
# ListenHandler that closes the Listener after first accept
class ListenOnceHandler < ListenHandler
def initialize(opts={}) @opts=opts; end
- def on_error(l, e) raise TestError, e; end
+ def on_error(l, e) raise TestError, e.inspect; end
def on_accept(l) l.close; return @opts; end
attr_reader :opts
end
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org