You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2018/03/16 12:50:21 UTC
[1/2] qpid-proton git commit: PROTON-1782: [ruby] implement for
idle_timeout heartbeats
Repository: qpid-proton
Updated Branches:
refs/heads/master 561dcf574 -> 3c80d6f4c
PROTON-1782: [ruby] implement for idle_timeout heartbeats
Container tracks transport ticks and ensures that select wakes in time to
service them.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/3c80d6f4
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/3c80d6f4
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/3c80d6f4
Branch: refs/heads/master
Commit: 3c80d6f4ca3a24bff198465bf00307aeae60e048
Parents: e9bc212
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Mar 15 19:03:26 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Mar 15 19:14:12 2018 -0400
----------------------------------------------------------------------
.../bindings/ruby/lib/core/connection_driver.rb | 15 +++++--
proton-c/bindings/ruby/lib/core/container.rb | 24 +++++++++--
proton-c/bindings/ruby/tests/test_container.rb | 45 ++++++++++++++++++--
3 files changed, 73 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3c80d6f4/proton-c/bindings/ruby/lib/core/connection_driver.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/connection_driver.rb b/proton-c/bindings/ruby/lib/core/connection_driver.rb
index 5a5ad06..11efa3a 100644
--- a/proton-c/bindings/ruby/lib/core/connection_driver.rb
+++ b/proton-c/bindings/ruby/lib/core/connection_driver.rb
@@ -118,9 +118,17 @@ module Qpid::Proton
def tick(now=Time.now)
transport = Cproton.pni_connection_driver_transport(@impl)
ms = Cproton.pn_transport_tick(transport, (now.to_r * 1000).to_i)
- return ms.zero? ? nil : Time.at(ms.to_r / 1000);
+ @next_tick = ms.zero? ? nil : Time.at(ms.to_r / 1000);
+ unless @next_tick
+ idle = Cproton.pn_transport_get_idle_timeout(transport);
+ @next_tick = now + (idle.to_r / 1000) unless idle.zero?
+ end
+ @next_tick
end
+ # Time returned by the last call to {#tick}
+ attr_accessor :next_tick
+
# Disconnect the write side of the transport, *without* sending an AMQP
# close frame. To close politely, you should use {Connection#close}, the
# transport will close itself once the protocol close is complete.
@@ -193,10 +201,11 @@ module Qpid::Proton
# or nil if there are no scheduled events
def process(now=Time.now)
read
+ dispatch
next_tick = tick(now)
- dispatch # Generate data for write
+ dispatch
write
- dispatch # Consume events generated by write
+ dispatch
return next_tick
end
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3c80d6f4/proton-c/bindings/ruby/lib/core/container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/container.rb b/proton-c/bindings/ruby/lib/core/container.rb
index 3b5d23f..5d7beb4 100644
--- a/proton-c/bindings/ruby/lib/core/container.rb
+++ b/proton-c/bindings/ruby/lib/core/container.rb
@@ -117,6 +117,16 @@ module Qpid::Proton
end
end
+ def next_tick_due(x, now)
+ nt = x.respond_to?(:next_tick) && x.next_tick
+ nt && (nt <= now)
+ end
+
+ def next_tick_min(x, t)
+ nt = x.respond_to?(:next_tick) && x.next_tick
+ nt if !t || (nt < t)
+ end
+
public
# Error raised if the container is used after {#stop} has been called.
@@ -270,15 +280,22 @@ module Qpid::Proton
when Container
r, w = [@wake], []
+ next_tick = nil
@lock.synchronize do
@selectable.each do |s|
r << s if s.send :can_read?
w << s if s.send :can_write?
+ next_tick = next_tick_min(s, next_tick)
end
end
- r, w = IO.select(r, w)
- selected = Set.new(r).merge(w)
- @wake.reset if selected.delete?(@wake)
+ now = Time.now
+ timeout = ((next_tick > now) ? next_tick - now : 0) if next_tick
+ r, w = IO.select(r, w, nil, timeout)
+ now = Time.now
+ selected = Set.new(r).delete(@wake)
+ selected.merge(w) if w
+ selected.merge(@selectable.select { |s| next_tick_due(s, now) })
+ @wake.reset
stop_select = nil
@lock.synchronize do
if stop_select = @stopped # close everything
@@ -300,7 +317,6 @@ module Qpid::Proton
add(connection_driver(io, opts, true)) if io
rearm task
end
- # TODO aconway 2017-10-26: scheduled tasks, heartbeats
end
ensure
@lock.synchronize do
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3c80d6f4/proton-c/bindings/ruby/tests/test_container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/test_container.rb b/proton-c/bindings/ruby/tests/test_container.rb
index 5fece3b..0279c19 100644
--- a/proton-c/bindings/ruby/tests/test_container.rb
+++ b/proton-c/bindings/ruby/tests/test_container.rb
@@ -32,11 +32,17 @@ class TestContainer < Qpid::Proton::Container
def url() "amqp://:#{port}"; end#
end
+# MessagingHandler that raises in on_error to catch unexpected errors
+class ExceptionMessagingHandler
+ def on_error(e) raise e; end
+end
+Thread.abort_on_exception = true
+
class ContainerTest < MiniTest::Test
include Qpid::Proton
def test_simple()
- send_handler = Class.new(MessagingHandler) do
+ send_handler = Class.new(ExceptionMessagingHandler) do
attr_reader :accepted, :sent
def on_sendable(sender)
sender.send Message.new("foo") unless @sent
@@ -49,7 +55,7 @@ class ContainerTest < MiniTest::Test
end
end.new
- receive_handler = Class.new(MessagingHandler) do
+ receive_handler = Class.new(ExceptionMessagingHandler) do
attr_reader :message, :link
def on_receiver_open(link)
@link = link
@@ -159,8 +165,7 @@ class ContainerTest < MiniTest::Test
# Verify that connection options are sent to the peer and available as Connection methods
def test_connection_options
# Note: user, password and sasl_xxx options are tested by ContainerSASLTest below
- server_handler = Class.new(MessagingHandler) do
- def on_error(e) raise e.inspect; end
+ server_handler = Class.new(ExceptionMessagingHandler) do
def on_connection_open(c)
@connection = c
c.open({
@@ -212,6 +217,38 @@ class ContainerTest < MiniTest::Test
assert_equal 44, c.idle_timeout # Proton divides by 2
assert_equal 100, c.max_sessions
end
+
+ # Test for time out on connecting to an unresponsive server
+ def test_idle_timeout_server_no_open
+ s = TCPServer.new(0)
+ cont = Container.new(__method__)
+ cont.connect(":#{s.addr[1]}", {:idle_timeout => 0.1, :handler => ExceptionMessagingHandler.new })
+ ex = assert_raises(Qpid::Proton::Condition) { cont.run }
+ assert_match(/resource-limit-exceeded/, ex.to_s)
+ ensure
+ s.close if s
+ end
+
+ # Test for time out on unresponsive client
+ def test_idle_timeout_client
+ server = TestContainer.new(nil, {:idle_timeout => 0.1}, "#{__method__}.server")
+ server_thread = Thread.new { server.run }
+
+ client_handler = Class.new(ExceptionMessagingHandler) do
+ def initialize() @signal = Queue.new; end
+ attr_reader :signal
+ def on_connection_open(c) @signal.pop; end # Jam the client to get a timeout
+ end.new
+ client = Container.new(nil, "#{__method__}.client")
+ client.connect(server.url, {:handler => client_handler})
+ client_thread = Thread.new { client.run }
+
+ server_thread.join # Exits when the connection closes from idle-timeout
+ client_handler.signal.push true # Unblock the client
+
+ ex = assert_raises(Qpid::Proton::Condition) { client_thread.join }
+ assert_match(/resource-limit-exceeded/, ex.to_s)
+ end
end
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/2] qpid-proton git commit: PROTON-1782: [ruby] More efficient wake
mechanism
Posted by ac...@apache.org.
PROTON-1782: [ruby] More efficient wake mechanism
Writing a pipe on every wake is inefficient, use a Mutex to write only when needed.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/e9bc2125
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/e9bc2125
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/e9bc2125
Branch: refs/heads/master
Commit: e9bc2125f649c6de5c71bde615212fb147465701
Parents: 561dcf5
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Mar 15 11:00:30 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Mar 15 19:14:12 2018 -0400
----------------------------------------------------------------------
proton-c/bindings/ruby/lib/core/container.rb | 56 +++++++++++++++++------
1 file changed, 41 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e9bc2125/proton-c/bindings/ruby/lib/core/container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/container.rb b/proton-c/bindings/ruby/lib/core/container.rb
index f262ea1..3b5d23f 100644
--- a/proton-c/bindings/ruby/lib/core/container.rb
+++ b/proton-c/bindings/ruby/lib/core/container.rb
@@ -85,6 +85,38 @@ module Qpid::Proton
end
end
+ # Selectable object that can be used to wake IO.select from another thread
+ class SelectWaker
+ def initialize
+ @rd, @wr = IO.pipe
+ @lock = Mutex.new
+ @set = false
+ end
+
+ def to_io() @rd; end
+
+ def wake
+ @lock.synchronize do
+ return if @set # Don't write if already has data
+ @set = true
+ begin @wr.write_nonblock('x') rescue IO::WaitWritable end
+ end
+ end
+
+ def reset
+ @lock.synchronize do
+ return unless @set
+ begin @rd.read_nonblock(1) rescue IO::WaitReadable end
+ @set = false
+ end
+ end
+
+ def close
+ @rd.close
+ @wr.close
+ end
+ end
+
public
# Error raised if the container is used after {#stop} has been called.
@@ -127,7 +159,7 @@ module Qpid::Proton
@work = Queue.new
@work << :start
@work << self # Issue start and start start selecting
- @wake = IO.pipe # Wakes #run thread in IO.select
+ @wake = SelectWaker.new # Wakes #run thread in IO.select
@auto_stop = true # Stop when @active drops to 0
# Following instance variables protected by lock
@@ -237,7 +269,7 @@ module Qpid::Proton
@adapter.on_container_start(self) if @adapter.respond_to? :on_container_start
when Container
- r, w = [@wake[0]], []
+ r, w = [@wake], []
@lock.synchronize do
@selectable.each do |s|
r << s if s.send :can_read?
@@ -246,13 +278,13 @@ module Qpid::Proton
end
r, w = IO.select(r, w)
selected = Set.new(r).merge(w)
- drain_wake if selected.delete?(@wake[0])
+ @wake.reset if selected.delete?(@wake)
stop_select = nil
@lock.synchronize do
if stop_select = @stopped # close everything
selected += @selectable
selected.each { |s| s.close @stop_err }
- @wake.each { |fd| fd.close() }
+ @wake.close
end
@selectable -= selected # Remove selected tasks
end
@@ -304,22 +336,16 @@ module Qpid::Proton
# - no more select calls after next wakeup
# - once @active == 0, all threads will be stopped with nil
end
- wake
+ @wake.wake
end
protected
- def wake; @wake[1].write_nonblock('x') rescue nil; end
-
# Normally if we add work we need to set a wakeup to ensure a single #run
# thread doesn't get stuck in select while there is other work on the queue.
- def work_wake(task) @work << task; wake; end
-
- def drain_wake
- begin
- @wake[0].read_nonblock(256) while true
- rescue Errno::EWOULDBLOCK, Errno::EAGAIN, Errno::EINTR
- end
+ def work_wake(task)
+ @work << task
+ @wake.wake
end
def connection_driver(io, opts=nil, server=false)
@@ -350,7 +376,7 @@ module Qpid::Proton
@selectable << task
end
end
- wake
+ @wake.wake
end
def check_stop_lh
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org