You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2018/03/16 12:50:21 UTC

[1/2] qpid-proton git commit: PROTON-1782: [ruby] implement for idle_timeout heartbeats

Repository: qpid-proton
Updated Branches:
  refs/heads/master 561dcf574 -> 3c80d6f4c


PROTON-1782: [ruby] implement for idle_timeout heartbeats

Container tracks transport ticks and ensures that select wakes in time to
service them.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/3c80d6f4
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/3c80d6f4
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/3c80d6f4

Branch: refs/heads/master
Commit: 3c80d6f4ca3a24bff198465bf00307aeae60e048
Parents: e9bc212
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Mar 15 19:03:26 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Mar 15 19:14:12 2018 -0400

----------------------------------------------------------------------
 .../bindings/ruby/lib/core/connection_driver.rb | 15 +++++--
 proton-c/bindings/ruby/lib/core/container.rb    | 24 +++++++++--
 proton-c/bindings/ruby/tests/test_container.rb  | 45 ++++++++++++++++++--
 3 files changed, 73 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3c80d6f4/proton-c/bindings/ruby/lib/core/connection_driver.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/connection_driver.rb b/proton-c/bindings/ruby/lib/core/connection_driver.rb
index 5a5ad06..11efa3a 100644
--- a/proton-c/bindings/ruby/lib/core/connection_driver.rb
+++ b/proton-c/bindings/ruby/lib/core/connection_driver.rb
@@ -118,9 +118,17 @@ module Qpid::Proton
     def tick(now=Time.now)
       transport = Cproton.pni_connection_driver_transport(@impl)
       ms = Cproton.pn_transport_tick(transport, (now.to_r * 1000).to_i)
-      return ms.zero? ? nil : Time.at(ms.to_r / 1000);
+      @next_tick = ms.zero? ? nil : Time.at(ms.to_r / 1000);
+      unless @next_tick
+        idle = Cproton.pn_transport_get_idle_timeout(transport);
+        @next_tick = now + (idle.to_r / 1000) unless idle.zero?
+      end
+      @next_tick
     end
 
+    # Time returned by the last call to {#tick}
+    attr_accessor :next_tick
+
     # Disconnect the write side of the transport, *without* sending an AMQP
     # close frame. To close politely, you should use {Connection#close}, the
     # transport will close itself once the protocol close is complete.
@@ -193,10 +201,11 @@ module Qpid::Proton
     #   or nil if there are no scheduled events
     def process(now=Time.now)
       read
+      dispatch
       next_tick = tick(now)
-      dispatch                # Generate data for write
+      dispatch
       write
-      dispatch                # Consume events generated by write
+      dispatch
       return next_tick
     end
   end

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3c80d6f4/proton-c/bindings/ruby/lib/core/container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/container.rb b/proton-c/bindings/ruby/lib/core/container.rb
index 3b5d23f..5d7beb4 100644
--- a/proton-c/bindings/ruby/lib/core/container.rb
+++ b/proton-c/bindings/ruby/lib/core/container.rb
@@ -117,6 +117,16 @@ module Qpid::Proton
       end
     end
 
+    def next_tick_due(x, now)
+      nt = x.respond_to?(:next_tick) && x.next_tick
+      nt && (nt <= now)
+    end
+
+    def next_tick_min(x, t)
+      nt = x.respond_to?(:next_tick) && x.next_tick
+      nt if !t || (nt < t)
+    end
+
     public
 
     # Error raised if the container is used after {#stop} has been called.
@@ -270,15 +280,22 @@ module Qpid::Proton
 
         when Container
           r, w = [@wake], []
+          next_tick = nil
           @lock.synchronize do
             @selectable.each do |s|
               r << s if s.send :can_read?
               w << s if s.send :can_write?
+              next_tick = next_tick_min(s, next_tick)
             end
           end
-          r, w = IO.select(r, w)
-          selected = Set.new(r).merge(w)
-          @wake.reset if selected.delete?(@wake)
+          now = Time.now
+          timeout = ((next_tick > now) ? next_tick - now : 0) if next_tick
+          r, w = IO.select(r, w, nil, timeout)
+          now = Time.now
+          selected = Set.new(r).delete(@wake)
+          selected.merge(w) if w
+          selected.merge(@selectable.select { |s| next_tick_due(s, now) })
+          @wake.reset
           stop_select = nil
           @lock.synchronize do
             if stop_select = @stopped # close everything
@@ -300,7 +317,6 @@ module Qpid::Proton
           add(connection_driver(io, opts, true)) if io
           rearm task
         end
-        # TODO aconway 2017-10-26: scheduled tasks, heartbeats
       end
     ensure
       @lock.synchronize do

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3c80d6f4/proton-c/bindings/ruby/tests/test_container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/test_container.rb b/proton-c/bindings/ruby/tests/test_container.rb
index 5fece3b..0279c19 100644
--- a/proton-c/bindings/ruby/tests/test_container.rb
+++ b/proton-c/bindings/ruby/tests/test_container.rb
@@ -32,11 +32,17 @@ class TestContainer < Qpid::Proton::Container
   def url() "amqp://:#{port}"; end#
 end
 
+# MessagingHandler that raises in on_error to catch unexpected errors
+class ExceptionMessagingHandler
+  def on_error(e) raise e; end
+end
+Thread.abort_on_exception = true
+
 class ContainerTest < MiniTest::Test
   include Qpid::Proton
 
   def test_simple()
-    send_handler = Class.new(MessagingHandler) do
+    send_handler = Class.new(ExceptionMessagingHandler) do
       attr_reader :accepted, :sent
       def on_sendable(sender)
         sender.send Message.new("foo") unless @sent
@@ -49,7 +55,7 @@ class ContainerTest < MiniTest::Test
       end
     end.new
 
-    receive_handler = Class.new(MessagingHandler) do
+    receive_handler = Class.new(ExceptionMessagingHandler) do
       attr_reader :message, :link
       def on_receiver_open(link)
         @link = link
@@ -159,8 +165,7 @@ class ContainerTest < MiniTest::Test
   # Verify that connection options are sent to the peer and available as Connection methods
   def test_connection_options
     # Note: user, password and sasl_xxx options are tested by ContainerSASLTest below
-    server_handler = Class.new(MessagingHandler) do
-      def on_error(e) raise e.inspect; end
+    server_handler = Class.new(ExceptionMessagingHandler) do
       def on_connection_open(c)
         @connection = c
         c.open({
@@ -212,6 +217,38 @@ class ContainerTest < MiniTest::Test
     assert_equal 44, c.idle_timeout # Proton divides by 2
     assert_equal 100, c.max_sessions
   end
+
+  # Test for time out on connecting to an unresponsive server
+  def test_idle_timeout_server_no_open
+    s = TCPServer.new(0)
+    cont = Container.new(__method__)
+    cont.connect(":#{s.addr[1]}", {:idle_timeout => 0.1, :handler => ExceptionMessagingHandler.new })
+    ex = assert_raises(Qpid::Proton::Condition) { cont.run }
+    assert_match(/resource-limit-exceeded/, ex.to_s)
+  ensure
+    s.close if s
+  end
+
+  # Test for time out on unresponsive client
+  def test_idle_timeout_client
+    server = TestContainer.new(nil, {:idle_timeout => 0.1}, "#{__method__}.server")
+    server_thread = Thread.new { server.run }
+
+    client_handler = Class.new(ExceptionMessagingHandler) do
+      def initialize() @signal = Queue.new; end
+      attr_reader :signal
+      def on_connection_open(c) @signal.pop; end # Jam the client to get a timeout
+    end.new
+    client = Container.new(nil, "#{__method__}.client")
+    client.connect(server.url, {:handler => client_handler})
+    client_thread = Thread.new { client.run }
+
+    server_thread.join          # Exits when the connection closes from idle-timeout
+    client_handler.signal.push true # Unblock the client
+
+    ex = assert_raises(Qpid::Proton::Condition) { client_thread.join }
+    assert_match(/resource-limit-exceeded/, ex.to_s)
+  end
 end
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/2] qpid-proton git commit: PROTON-1782: [ruby] More efficient wake mechanism

Posted by ac...@apache.org.
PROTON-1782: [ruby] More efficient wake mechanism

Writing a pipe on every wake is inefficient, use a Mutex to write only when needed.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/e9bc2125
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/e9bc2125
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/e9bc2125

Branch: refs/heads/master
Commit: e9bc2125f649c6de5c71bde615212fb147465701
Parents: 561dcf5
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Mar 15 11:00:30 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Mar 15 19:14:12 2018 -0400

----------------------------------------------------------------------
 proton-c/bindings/ruby/lib/core/container.rb | 56 +++++++++++++++++------
 1 file changed, 41 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e9bc2125/proton-c/bindings/ruby/lib/core/container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/container.rb b/proton-c/bindings/ruby/lib/core/container.rb
index f262ea1..3b5d23f 100644
--- a/proton-c/bindings/ruby/lib/core/container.rb
+++ b/proton-c/bindings/ruby/lib/core/container.rb
@@ -85,6 +85,38 @@ module Qpid::Proton
       end
     end
 
+    # Selectable object that can be used to wake IO.select from another thread
+    class SelectWaker
+      def initialize
+        @rd, @wr = IO.pipe
+        @lock = Mutex.new
+        @set = false
+      end
+
+      def to_io() @rd; end
+
+      def wake
+        @lock.synchronize do
+          return if @set        # Don't write if already has data
+          @set = true
+          begin @wr.write_nonblock('x') rescue IO::WaitWritable end
+        end
+      end
+
+      def reset
+        @lock.synchronize do
+          return unless @set
+          begin @rd.read_nonblock(1) rescue IO::WaitReadable end
+          @set = false
+        end
+      end
+
+      def close
+        @rd.close
+        @wr.close
+      end
+    end
+
     public
 
     # Error raised if the container is used after {#stop} has been called.
@@ -127,7 +159,7 @@ module Qpid::Proton
       @work = Queue.new
       @work << :start
       @work << self             # Issue start and start start selecting
-      @wake = IO.pipe           # Wakes #run thread in IO.select
+      @wake = SelectWaker.new   # Wakes #run thread in IO.select
       @auto_stop = true         # Stop when @active drops to 0
 
       # Following instance variables protected by lock
@@ -237,7 +269,7 @@ module Qpid::Proton
           @adapter.on_container_start(self) if @adapter.respond_to? :on_container_start
 
         when Container
-          r, w = [@wake[0]], []
+          r, w = [@wake], []
           @lock.synchronize do
             @selectable.each do |s|
               r << s if s.send :can_read?
@@ -246,13 +278,13 @@ module Qpid::Proton
           end
           r, w = IO.select(r, w)
           selected = Set.new(r).merge(w)
-          drain_wake if selected.delete?(@wake[0])
+          @wake.reset if selected.delete?(@wake)
           stop_select = nil
           @lock.synchronize do
             if stop_select = @stopped # close everything
               selected += @selectable
               selected.each { |s| s.close @stop_err }
-              @wake.each { |fd| fd.close() }
+              @wake.close
             end
             @selectable -= selected # Remove selected tasks
           end
@@ -304,22 +336,16 @@ module Qpid::Proton
         # - no more select calls after next wakeup
         # - once @active == 0, all threads will be stopped with nil
       end
-      wake
+      @wake.wake
     end
 
     protected
 
-    def wake; @wake[1].write_nonblock('x') rescue nil; end
-
     # Normally if we add work we need to set a wakeup to ensure a single #run
     # thread doesn't get stuck in select while there is other work on the queue.
-    def work_wake(task) @work << task; wake; end
-
-    def drain_wake
-      begin
-        @wake[0].read_nonblock(256) while true
-      rescue Errno::EWOULDBLOCK, Errno::EAGAIN, Errno::EINTR
-      end
+    def work_wake(task)
+      @work << task
+      @wake.wake
     end
 
     def connection_driver(io, opts=nil, server=false)
@@ -350,7 +376,7 @@ module Qpid::Proton
           @selectable << task
         end
       end
-      wake
+      @wake.wake
     end
 
     def check_stop_lh


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org