You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2018/01/05 16:35:44 UTC
[20/50] [abbrv] qpid-proton git commit: PROTON-1537: [ruby] idle time-out conversion
PROTON-1537: [ruby] idle time-out conversion
Idiomatic ruby expresses intervals in seconds, which can be Float or Rational
for sub-second intervals.
Use seconds in the :idle_timeout connection options and Connection#idle_timeout.
Transport remains in milliseconds but is no longer a user-facing API.
Conversion is done using Rational to preserve millisecond accuracy, although
the user is free to use Float if rounding is not a concern.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/4678e74b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/4678e74b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/4678e74b
Branch: refs/heads/go1
Commit: 4678e74bce48ea7b2457ba801128365c7be8b5bd
Parents: cf4a3f6
Author: Alan Conway <ac...@redhat.com>
Authored: Mon Dec 11 11:24:43 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Wed Dec 13 13:16:48 2017 -0500
----------------------------------------------------------------------
proton-c/bindings/ruby/lib/core/connection.rb | 16 +++++
.../bindings/ruby/lib/core/connection_driver.rb | 20 ++++---
proton-c/bindings/ruby/lib/core/transport.rb | 18 +++++-
proton-c/bindings/ruby/tests/test_adapter.rb | 3 +-
.../ruby/tests/test_connection_driver.rb | 62 ++++++++++----------
proton-c/bindings/ruby/tests/test_tools.rb | 33 +++++++++--
6 files changed, 103 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4678e74b/proton-c/bindings/ruby/lib/core/connection.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/connection.rb b/proton-c/bindings/ruby/lib/core/connection.rb
index ed82dc8..55873dd 100644
--- a/proton-c/bindings/ruby/lib/core/connection.rb
+++ b/proton-c/bindings/ruby/lib/core/connection.rb
@@ -133,6 +133,17 @@ module Qpid::Proton
Codec::Data.from_object(Cproton.pn_connection_properties(@impl), opts[:properties])
end
+ # Idle-timeout advertised by the remote peer, in seconds.
+ # Set by {Connection#open} with the +:idle_timeout+ option.
+ # @return [Numeric] Idle-timeout advertised by the remote peer, in seconds.
+ # @return [nil] if the peer does not advertise an idle time-out
+ # @option :idle_timeout (see {#open})
+ def idle_timeout()
+ if transport && (t = transport.remote_idle_timeout)
+ Rational(t, 1000) # More precise than Float
+ end
+ end
+
# @private Generate a unique link name, internal use only.
def link_name()
@link_prefix + "/" + (@link_count += 1).to_s(16)
@@ -259,6 +270,11 @@ module Qpid::Proton
Cproton.pn_error_code(Cproton.pn_connection_error(@impl))
end
+ # @private Generate a unique link name, internal use only.
+ def link_name()
+ @link_prefix + "/" + (@link_count += 1).to_s(16)
+ end
+
protected
def _local_condition
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4678e74b/proton-c/bindings/ruby/lib/core/connection_driver.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/connection_driver.rb b/proton-c/bindings/ruby/lib/core/connection_driver.rb
index 29bd299..b796d4d 100644
--- a/proton-c/bindings/ruby/lib/core/connection_driver.rb
+++ b/proton-c/bindings/ruby/lib/core/connection_driver.rb
@@ -97,7 +97,9 @@ module Qpid::Proton
# Non-blocking write to {#io}
# IO errors are returned as transport errors by {#event}, not raised
def write
- n = @io.write_nonblock(Cproton.pn_connection_driver_write_buffer(@impl))
+ data = Cproton.pn_connection_driver_write_buffer(@impl)
+ return unless data && data.size > 0
+ n = @io.write_nonblock(data)
Cproton.pn_connection_driver_write_done(@impl, n) if n > 0
rescue Errno::EWOULDBLOCK, Errno::EAGAIN, Errno::EINTR
# Try again later.
@@ -175,8 +177,13 @@ module Qpid::Proton
attr_reader :handler
# Dispatch all events available from {#event} to {#handler}
- def dispatch() each_event do |e|
- e.dispatch self # See private on_transport_ methods below
+ def dispatch()
+ each_event do |e|
+ case e.method # Events that affect the driver
+ when :on_transport_tail_closed then close_read
+ when :on_transport_head_closed then close_write
+ when :on_transport_authenticated then connection.user = transport.user
+ end
e.dispatch @adapter
end
end
@@ -190,13 +197,8 @@ module Qpid::Proton
next_tick = tick(now)
dispatch # Generate data for write
write
- dispatch # Consume all events
+ dispatch # Consume events generated by write
return next_tick
end
-
- private
- def on_transport_tail_closed(event) close_read; end
- def on_transport_head_closed(event) close_write; end
- def on_transport_authenticated(event) connection.user = transport.user; end
end
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4678e74b/proton-c/bindings/ruby/lib/core/transport.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/transport.rb b/proton-c/bindings/ruby/lib/core/transport.rb
index c1593a0..f840f3f 100644
--- a/proton-c/bindings/ruby/lib/core/transport.rb
+++ b/proton-c/bindings/ruby/lib/core/transport.rb
@@ -103,11 +103,19 @@ module Qpid::Proton
# @!attribute idle_timeout
#
- # @return [Integer] The idle timeout.
+ # @deprecated use {Connection#open} with the +:idle_timeout+ option to set
+ # the timeout, and {Connection#idle_timeout} to query the remote timeout.
+ #
+ # The Connection timeout values are in *seconds* and are automatically
+ # converted.
+ #
+ # @return [Integer] The idle timeout in *milliseconds*.
#
proton_set_get :idle_timeout
- # @!attribute [r] remote_idle_timeout
+ # @!attribute [r] remote_idle_timeout in milliseconds
+ #
+ # @deprecated Use {Connection#idle_timeout} to query the remote timeout.
#
# @return [Integer] The idle timeout for the transport's remote peer.
#
@@ -397,7 +405,11 @@ module Qpid::Proton
end
self.channel_max= opts[:channel_max] if opts.include? :channel_max
self.max_frame_size= opts[:max_frame_size] if opts.include? :max_frame_size
- self.idle_timeout= opts[:idle_timeout] if opts.include? :idle_timeout
+ # NOTE: The idle_timeout option is in Numeric *seconds*, can be Integer, Float or Rational.
+ # This is consistent with idiomatic ruby.
+ # The transport #idle_timeout property is in *milliseconds* passed direct to C.
+ # Direct use of the transport is deprecated.
+ self.idle_timeout= (opts[:idle_timeout]*1000).round if opts.include? :idle_timeout
end
end
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4678e74b/proton-c/bindings/ruby/tests/test_adapter.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/test_adapter.rb b/proton-c/bindings/ruby/tests/test_adapter.rb
index 50f46c3..62ef109 100644
--- a/proton-c/bindings/ruby/tests/test_adapter.rb
+++ b/proton-c/bindings/ruby/tests/test_adapter.rb
@@ -77,7 +77,8 @@ class TestOldHandler < Minitest::Test
assert_equal [:on_session_opening], @sh.names
assert_equal [], @ch.names
clear
- @d.client.connection.close; @d.run
+ @d.client.connection.close;
+ 3.times { @d.process }
assert_equal [:on_connection_closing], @sh.names
assert_equal [], @ch.names
@d.server.connection.close; @d.run
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4678e74b/proton-c/bindings/ruby/tests/test_connection_driver.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/test_connection_driver.rb b/proton-c/bindings/ruby/tests/test_connection_driver.rb
index 3617dce..a9982b5 100644
--- a/proton-c/bindings/ruby/tests/test_connection_driver.rb
+++ b/proton-c/bindings/ruby/tests/test_connection_driver.rb
@@ -21,10 +21,6 @@ include Qpid::Proton
class HandlerDriverTest < Minitest::Test
- def setup
- @sockets = Socket.pair(:LOCAL, :STREAM, 0)
- end
-
def test_send_recv
send_class = Class.new(MessagingHandler) do
attr_reader :accepted
@@ -38,35 +34,39 @@ class HandlerDriverTest < Minitest::Test
def on_message(event) @message = event.message; event.connection.close; end
end
- sender = HandlerDriver.new(@sockets[0], send_class.new)
- sender.connection.open(:container_id => "sender");
- sender.connection.open_sender()
- receiver = HandlerDriver.new(@sockets[1], recv_class.new)
- drivers = [sender, receiver]
-
- until drivers.all? { |d| d.finished? }
- rd = drivers.select {|d| d.can_read? }
- wr = drivers.select {|d| d.can_write? }
- IO.select(rd, wr)
- drivers.each do |d|
- d.process
- end
- end
- assert_equal(receiver.handler.message.body, "foo")
- assert(sender.handler.accepted)
+ d = DriverPair.new(send_class.new, recv_class.new)
+ d.client.connection.open(:container_id => "sender");
+ d.client.connection.open_sender()
+ d.run
+ assert_equal(d.server.handler.message.body, "foo")
+ assert(d.client.handler.accepted)
end
def test_idle
- drivers = [HandlerDriver.new(@sockets[0], nil), HandlerDriver.new(@sockets[1], nil)]
- opts = {:idle_timeout=>10}
- drivers[0].transport.apply(opts)
- assert_equal 10, drivers[0].transport.idle_timeout
- drivers[0].connection.open(opts)
- drivers[1].transport.set_server
- now = Time.now
- drivers.each { |d| d.process(now) } until drivers[0].connection.open?
- assert_equal(10, drivers[0].transport.idle_timeout)
- assert_equal(5, drivers[1].transport.remote_idle_timeout) # proton changes the value
- assert_in_delta(10, (drivers[0].tick(now) - now)*1000, 1)
+
+ d = DriverPair.new(UnhandledHandler.new, UnhandledHandler.new)
+ ms = 444
+ secs = Rational(ms, 1000) # Use rationals to keep it accurate
+ opts = {:idle_timeout => secs}
+ d.client.transport.apply(opts)
+ assert_equal(ms, d.client.transport.idle_timeout) # Transport converts to ms
+ d.server.transport.set_server
+ d.client.connection.open(opts)
+
+ start = Time.at(1) # Dummy timeline
+ tick = d.run start # Process all IO events
+ assert_equal(secs/4, tick - start)
+ assert_equal [:on_connection_opened], d.client.handler.calls
+ assert_equal [:on_connection_opening, :on_connection_opened], d.server.handler.calls
+ assert_equal (ms), d.client.transport.idle_timeout
+ assert_equal (ms/2), d.server.transport.remote_idle_timeout # proton changes the value
+ assert_equal (secs/2), d.server.connection.idle_timeout
+
+ # Now update the time till we get connections closing
+ d.each { |x| x.handler.calls.clear }
+ d.run(start + secs - 0.001) # Should do nothing, timeout not reached
+ assert_equal [[],[]], d.collect { |x| x.handler.calls }
+ d.run(start + secs*2) # After 2x timeout, connections should close
+ assert_equal [[:on_transport_error, :on_transport_closed], [:on_connection_error, :on_connection_closed, :on_transport_closed]], d.collect { |x| x.handler.calls }
end
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4678e74b/proton-c/bindings/ruby/tests/test_tools.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/test_tools.rb b/proton-c/bindings/ruby/tests/test_tools.rb
index bc9c1d9..ad4a7cb 100644
--- a/proton-c/bindings/ruby/tests/test_tools.rb
+++ b/proton-c/bindings/ruby/tests/test_tools.rb
@@ -108,13 +108,20 @@ class DriverPair < Array
alias client first
alias server last
- # Run till there is nothing to do
- def run
- begin
- each { |d| d.process }
- end while (IO.select(self, [], [], 0) rescue nil)
+ # Process each driver once, return time of next timed event
+ def process(now = Time.now, max_time=nil)
+ t = collect { |d| d.process(now) }.compact.min
+ t = max_time if max_time && t > max_time
+ t
end
+ # Run until there is no IO activity. Does not wait for timed events, but
+ # passes +now+ to #process and returns the earliest next timed-event time.
+ def run(now=Time.now)
+ t = process(now) # Generate initial IO activity and get initial next-time
+ t = process(now, t) while (IO.select(self, [], [], 0) rescue nil)
+ t = process(now, t) # Final gulp to finish off events
+ end
end
# Container that listens on a random port for a single connection
@@ -129,3 +136,19 @@ class TestContainer < Container
def port() @server.addr[1]; end
def url() "amqp://:#{port}"; end
end
+
+# Raw handler to record on_xxx calls via on_unhandled.
+# Handy as a base for raw test handlers
+class UnhandledHandler
+ def initialize() @calls =[]; end
+ def on_unhandled(name, args) @calls << name; end
+ attr_reader :calls
+
+ # Ruby mechanics to capture on_xxx calls
+
+ def method_missing(name, *args)
+ if respond_to_missing?(name) then on_unhandled(name, *args) else super end;
+ end
+ def respond_to_missing?(name, private=false); (/^on_/ =~ name); end
+ def respond_to?(name, all=false) super || respond_to_missing?(name); end # For ruby < 1.9.2
+end
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org