You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2018/04/05 14:42:47 UTC
[1/2] qpid-proton git commit: PROTON-1820: [ruby] Container#schedule
does not work if called from handler
Repository: qpid-proton
Updated Branches:
refs/heads/master 49f4cb56f -> dab607e63
PROTON-1820: [ruby] Container#schedule does not work if called from handler
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/dab607e6
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/dab607e6
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/dab607e6
Branch: refs/heads/master
Commit: dab607e6393e9d939847f845d4ca322705efd717
Parents: d934593
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Apr 5 10:00:38 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Apr 5 10:10:53 2018 -0400
----------------------------------------------------------------------
proton-c/bindings/ruby/lib/core/container.rb | 131 ++++++--------------
proton-c/bindings/ruby/lib/core/exceptions.rb | 4 +
proton-c/bindings/ruby/lib/core/work_queue.rb | 87 ++++++++-----
proton-c/bindings/ruby/lib/util/schedule.rb | 58 ++++-----
proton-c/bindings/ruby/tests/test_container.rb | 117 ++++++++++++-----
proton-c/bindings/ruby/tests/test_tools.rb | 6 +-
proton-c/bindings/ruby/tests/test_utils.rb | 63 ++++++++++
7 files changed, 274 insertions(+), 192 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/dab607e6/proton-c/bindings/ruby/lib/core/container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/container.rb b/proton-c/bindings/ruby/lib/core/container.rb
index b683412..7d9f2cb 100644
--- a/proton-c/bindings/ruby/lib/core/container.rb
+++ b/proton-c/bindings/ruby/lib/core/container.rb
@@ -35,8 +35,8 @@ module Qpid::Proton
include TimeCompare
# Error raised if the container is used after {#stop} has been called.
- class StoppedError < StateError
- def initialize(*args) super("container has been stopped"); end
+ class StoppedError < Qpid::Proton::StoppedError
+ def initialize() super("container has been stopped"); end
end
# Create a new Container
@@ -66,19 +66,13 @@ module Qpid::Proton
@adapter = Handler::Adapter.adapt(@handler) || Handler::MessagingAdapter.new(nil)
@id = (@id || SecureRandom.uuid).freeze
- # Implementation note:
- #
- # - #run threads take work items from @work, process them, and rearm them for select
- # - work items are: ConnectionTask, ListenTask, :start, :select, :schedule
- # - nil on the @work queue makes a #run thread exit
-
+ # Threading and implementation notes: see comment on #run_one
@work = Queue.new
@work << :start
@work << :select
@wake = SelectWaker.new # Wakes #run thread in IO.select
@auto_stop = true # Stop when @active drops to 0
- @schedule = Schedule.new
- @schedule_working = false # True if :schedule is on the work queue
+ @work_queue = WorkQueue.new(self) # work scheduled by other threads for :select context
# Following instance variables protected by lock
@lock = Mutex.new
@@ -242,25 +236,22 @@ module Qpid::Proton
# - no more select calls after next wakeup
# - once @active == 0, all threads will be stopped with nil
end
- @wake.wake
+ wake
end
- # Schedule code to be executed after a delay.
- # @param delay [Numeric] delay in seconds, must be >= 0
- # @yield [ ] the block is invoked with no parameters in a {#run} thread after +delay+ has elapsed
- # @return [void]
- # @raise [ThreadError] if +non_block+ is true and the operation would block
- def schedule(delay, non_block=false, &block)
- not_stopped
- @lock.synchronize { @active += 1 } if @schedule.add(Time.now + delay, non_block, &block)
- @wake.wake
- end
+ # Get the {WorkQueue} that can be used to schedule code to be run by the container.
+ #
+ # Note: to run code that affects a {Connection} or it's associated objects,
+ # use {Connection#work_queue}
+ def work_queue() @work_queue; end
+
+ # (see WorkQueue#schedule)
+ def schedule(at, &block) @work_queue.schedule(at, &block) end
private
def wake() @wake.wake; end
- # Container driver applies options and adds container context to events
class ConnectionTask < Qpid::Proton::HandlerDriver
include TimeCompare
@@ -269,15 +260,15 @@ module Qpid::Proton
transport.set_server if server
transport.apply opts
connection.apply opts
- @work_queue = WorkQueue.new container
+ @work_queue = WorkQueue.new(container)
connection.instance_variable_set(:@work_queue, @work_queue)
end
- def next_tick() earliest(super, @work_queue.send(:next_tick)); end
- def process(now) @work_queue.send(:process, now); super(); end
+ def next_tick() earliest(super, @work_queue.next_tick); end
+ def process(now) @work_queue.process(now); super(); end
def dispatch # Intercept dispatch to close work_queue
super
- @work_queue.send(:close) if read_closed? && write_closed?
+ @work_queue.close if read_closed? && write_closed?
end
end
@@ -360,6 +351,12 @@ module Qpid::Proton
end
# Handle a single item from the @work queue, this is the heart of the #run loop.
+ # Take one task from @work, process it, and rearm for select
+ # Tasks are: ConnectionTask, ListenTask, :start, :select
+ # - ConnectionTask/ListenTask have #can_read, #can_write, #next_tick to set up IO.select
+ # and #process to run handlers and process relevant work_queue
+ # - nil means exit from the #run thread exit (handled by #run)
+ # - :select does IO.select and processes Container#work_queue
def run_one(task, now)
case task
@@ -369,40 +366,43 @@ module Qpid::Proton
when :select
# Compute read/write select sets and minimum next_tick for select timeout
r, w = [@wake], []
- next_tick = @schedule.next_tick
+ next_tick = @work_queue.next_tick
@lock.synchronize do
@selectable.each do |s|
- r << s if s.send :can_read?
- w << s if s.send :can_write?
+ r << s if s.can_read?
+ w << s if s.can_write?
next_tick = earliest(s.next_tick, next_tick)
end
end
timeout = ((next_tick > now) ? next_tick - now : 0) if next_tick
r, w = IO.select(r, w, nil, timeout)
- now = Time.now unless timeout == 0
@wake.reset if r && r.delete(@wake)
+ now = Time.now unless timeout == 0 # Update now if we may have blocked
# selected is a Set to eliminate duplicates between r, w and next_tick due.
selected = Set.new
selected.merge(r) if r
selected.merge(w) if w
- @lock.synchronize do
- if @stopped # close everything
+ stopped = @lock.synchronize do
+ if @stopped # close everything
@selectable.each { |s| s.close @stop_err; @work << s }
@selectable.clear
+ @work_queue.close
@wake.close
- return
- end
- if !@schedule_working && before_eq(@schedule.next_tick, now)
- @schedule_working = true
- @work << :schedule
+ else
+ @selectable -= selected # Remove already-selected tasks from @selectable
+ # Also select and remove items with next_tick before now
+ @selectable.delete_if { |s| before_eq(s.next_tick, now) and selected << s }
end
- selected.merge(@selectable.select { |s| before_eq(s.next_tick, now) })
- @selectable -= selected # Remove selected tasks from @selectable
+ @stopped
end
selected.each { |s| @work << s } # Queue up tasks needing #process
- @work << :select # Enable next select
+ maybe_panic { @work_queue.process(now) } # Process current work queue items
+ @work_queue.clear if stopped
+ @lock.synchronize { check_stop_lh } if @work_queue.empty?
+
+ @work << :select unless stopped # Enable next select
when ConnectionTask then
maybe_panic { task.process now }
@@ -412,58 +412,9 @@ module Qpid::Proton
io, opts = maybe_panic { task.process }
add(connection_driver(io, opts, true)) if io
rearm task
-
- when :schedule then
- if maybe_panic { @schedule.process now }
- @lock.synchronize { @active -= 1; check_stop_lh }
- else
- @lock.synchronize { @schedule_working = false }
- end
end
end
- def do_select
- # Compute the sets to select for read and write, and the minimum next_tick for the timeout
- r, w = [@wake], []
- next_tick = nil
- @lock.synchronize do
- @selectable.each do |s|
- r << s if s.can_read?
- w << s if s.can_write?
- next_tick = earliest(s.next_tick, next_tick)
- end
- end
- next_tick = earliest(@schedule.next_tick, next_tick)
-
- # Do the select and queue up all resulting work
- now = Time.now
- timeout = next_tick - now if next_tick
- r, w = (timeout.nil? || timeout > 0) && IO.select(r, w, nil, timeout)
- @wake.reset
- selected = Set.new
- @lock.synchronize do
- if @stopped
- @selectable.each { |s| s.close @stop_err; @work << s }
- @wake.close
- return
- end
- # Check if schedule has items due and is not already working
- if !@schedule_working && before_eq(@schedule.next_tick, now)
- @work << :schedule
- @schedule_working = true
- end
- # Eliminate duplicates between r, w and next_tick due.
- selected.merge(r) if r
- selected.delete(@wake)
- selected.merge(w) if w
- @selectable -= selected
- selected.merge(@selectable.select { |s| before_eq(s.next_tick, now) })
- @selectable -= selected
- end
- selected.each { |s| @work << s } # Queue up tasks needing #process
- @work << :select
- end
-
# Rescue any exception raised by the block and stop the container.
def maybe_panic
begin
@@ -513,7 +464,7 @@ module Qpid::Proton
end
def check_stop_lh
- if @active.zero? && (@auto_stop || @stopped)
+ if @active.zero? && (@auto_stop || @stopped) && @work_queue.empty?
@stopped = true
work_wake nil # Signal threads to stop
true
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/dab607e6/proton-c/bindings/ruby/lib/core/exceptions.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/exceptions.rb b/proton-c/bindings/ruby/lib/core/exceptions.rb
index 1f74085..443472e 100644
--- a/proton-c/bindings/ruby/lib/core/exceptions.rb
+++ b/proton-c/bindings/ruby/lib/core/exceptions.rb
@@ -126,4 +126,8 @@ module Qpid::Proton
# so that the application can delay completing the open/close to a later time.
class StopAutoResponse < ProtonError
end
+
+ # Raised when a method is called on an object that has been stopped.
+ class StoppedError < StateError
+ end
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/dab607e6/proton-c/bindings/ruby/lib/core/work_queue.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/work_queue.rb b/proton-c/bindings/ruby/lib/core/work_queue.rb
index d205a13..3a4fad6 100644
--- a/proton-c/bindings/ruby/lib/core/work_queue.rb
+++ b/proton-c/bindings/ruby/lib/core/work_queue.rb
@@ -19,58 +19,79 @@ module Qpid::Proton
# A thread-safe queue of work for multi-threaded programs.
#
- # Instances of {Connection} and objects associated with it ({Session}, {Sender},
- # {Receiver}, {Delivery}, {Tracker}) are not thread-safe and must be
- # used correctly when multiple threads call {Container#run}
+ # A {Container} can have multiple threads calling {Container#run}
+ # The container ensures that work associated with a single {Connection} or
+ # {Listener} is _serialized_ - two threads will never concurrently call
+ # handlers associated with the same object.
#
- # Calls to {MessagingHandler} methods by the {Container} are automatically
- # serialized for each connection instance. Other threads may have code
- # similarly serialized by adding it to the {Connection#work_queue} for the
- # connection. Each object related to a {Connection} also provides a
- # +work_queue+ method.
+ # To have your own code serialized in the same, add a block to the connection's
+ # {WorkQueue}. The block will be invoked as soon as it is safe to do so.
+ #
+ # A {Connection} and the objects associated with it ({Session}, {Sender},
+ # {Receiver}, {Delivery}, {Tracker}) are not thread safe, so if you have
+ # multiple threads calling {Container#run} or if you want to affect objects
+ # managed by the container from non-container threads you need to use the
+ # {WorkQueue}
#
class WorkQueue
- # Add code to be executed in series with other {Container} operations on the
- # work queue's owner. The code will be executed as soon as possible.
+ # Error raised if work is added after the queue has been stopped.
+ class StoppedError < Qpid::Proton::StoppedError
+ def initialize() super("WorkQueue has been stopped"); end
+ end
+
+ # Add a block of code to be invoked in sequence.
#
+ # @yield [ ] the block will be invoked with no parameters in the appropriate thread context
# @note Thread Safe: may be called in any thread.
- # @param non_block [Boolean] if true raise {ThreadError} if the operation would block.
- # @yield [ ] the block will be invoked with no parameters in the {WorkQueue} context,
- # which may be a different thread.
# @return [void]
- # @raise [ThreadError] if +non_block+ is true and the operation would block
- # @raise [EOFError] if the queue is closed and cannot accept more work
- def add(non_block=false, &block)
- @schedule.add(Time.at(0), non_block, &block)
- @container.send :wake
+ # @raise [StoppedError] if the queue is closed and cannot accept more work
+ def add(&block)
+ schedule(0, &block)
end
- # Schedule code to be executed after +delay+ seconds in series with other
- # {Container} operations on the work queue's owner.
- #
- # Work scheduled for after the {WorkQueue} has closed will be silently dropped.
+ # Schedule a block to be invoked at a certain time.
#
+ # @param at [Time] Invoke block as soon as possible after Time +at+
+ # @param at [Numeric] Invoke block after a delay of +at+ seconds from now
+ # @yield [ ] (see #add)
# @note (see #add)
- # @param delay delay in seconds until the block is added to the queue.
- # @param (see #add)
- # @yield (see #add)
- # @return [void]
+ # @return (see #add)
# @raise (see #add)
- def schedule(delay, non_block=false, &block)
- @schedule.add(Time.now + delay, non_block, &block)
+ def schedule(at, &block)
+ raise ArgumentError, "no block" unless block_given?
+ @lock.synchronize do
+ raise @closed if @closed
+ @schedule.insert(at, block)
+ end
@container.send :wake
end
- private
-
+ # @private
def initialize(container)
+ @lock = Mutex.new
@schedule = Schedule.new
@container = container
+ @closed = nil
+ end
+
+ # @private
+ def close() @lock.synchronize { @closed = StoppedError.new } end
+
+ # @private
+ def process(now)
+ while p = @lock.synchronize { @schedule.pop(now) }
+ p.call
+ end
end
- def close() @schedule.close; end
- def process(now) @schedule.process(now); end
- def next_tick() @schedule.next_tick; end
+ # @private
+ def next_tick() @lock.synchronize { @schedule.next_tick } end
+
+ # @private
+ def empty?() @lock.synchronize { @schedule.empty? } end
+
+ # @private
+ def clear() @lock.synchronize { @schedule.clear } end
end
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/dab607e6/proton-c/bindings/ruby/lib/util/schedule.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/util/schedule.rb b/proton-c/bindings/ruby/lib/util/schedule.rb
index f1bfb36..cd2f0de 100644
--- a/proton-c/bindings/ruby/lib/util/schedule.rb
+++ b/proton-c/bindings/ruby/lib/util/schedule.rb
@@ -27,53 +27,37 @@ module Qpid::Proton
end
# @private
- # A sorted, thread-safe list of scheduled Proc.
- # Note: calls to #process are always serialized, but calls to #add may be concurrent.
+ # A time-sorted list of objects. Thread unsafe.
class Schedule
include TimeCompare
- Item = Struct.new(:time, :proc)
+ Entry = Struct.new(:time, :item)
- def initialize()
- @lock = Mutex.new
- @items = []
- @closed = false
- end
+ def initialize() @entries = []; end
+
+ def empty?() @entries.empty?; end
def next_tick()
- @lock.synchronize { @items.first.time unless @items.empty? }
+ @entries.first.time unless @entries.empty?
end
- # @return true if the Schedule was previously empty
- # @raise EOFError if schedule is closed
- # @raise ThreadError if +non_block+ and operation would block
- def add(time, non_block=false, &proc)
- # non_block ignored for now, but we may implement a bounded schedule in future.
- @lock.synchronize do
- raise EOFError if @closed
- if at = (0...@items.size).bsearch { |i| @items[i].time > time }
- @items.insert(at, Item.new(time, proc))
- else
- @items << Item.new(time, proc)
- end
- return @items.size == 1
- end
+ # @param at [Time] Insert item at time +at+
+ # @param at [Numeric] Insert item at +Time.now \+ at+
+ # @param at [0] Insert item at Time.at(0)
+ def insert(at, item)
+ time = case at
+ when 0 then Time.at(0) # Avoid call to Time.now for immediate tasks
+ when Numeric then Time.now + at
+ else at
+ end
+ index = time && ((0...@entries.size).bsearch { |i| @entries[i].time > time })
+ @entries.insert(index || -1, Entry.new(time, item))
end
- # @return true if the Schedule became empty as a result of this call
- def process(now)
- due = []
- empty = @lock.synchronize do
- due << @items.shift while (!@items.empty? && before_eq(@items.first.time, now))
- @items.empty?
- end
- due.each { |i| i.proc.call() }
- return empty && !due.empty?
+ # Return next item due at or before time, else nil
+ def pop(time)
+ @entries.shift.item if !@entries.empty? && before_eq(@entries.first.time, time)
end
- # #add raises EOFError after #close.
- # #process can still be called to drain the schedule.
- def close()
- @lock.synchronize { @closed = true }
- end
+ def clear() @entries.clear; end
end
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/dab607e6/proton-c/bindings/ruby/tests/test_container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/test_container.rb b/proton-c/bindings/ruby/tests/test_container.rb
index 1026d46..2c460aa 100644
--- a/proton-c/bindings/ruby/tests/test_container.rb
+++ b/proton-c/bindings/ruby/tests/test_container.rb
@@ -330,34 +330,10 @@ class ContainerTest < MiniTest::Test
assert_raises(Container::StoppedError) { cont.listen "" }
end
- # Make sure Container::Scheduler puts tasks in proper order.
- def test_scheduler
- a = []
- s = Schedule.new
-
- assert_equal true, s.add(Time.at 3) { a << 3 }
- assert_equal false, s.process(Time.at 2) # Should not run
- assert_equal [], a
- assert_equal true, s.process(Time.at 3) # Should run
- assert_equal [3], a
-
- a = []
- assert_equal true, s.add(Time.at 3) { a << 3 }
- assert_equal false, s.add(Time.at 5) { a << 5 }
- assert_equal false, s.add(Time.at 1) { a << 1 }
- assert_equal false, s.add(Time.at 4) { a << 4 }
- assert_equal false, s.add(Time.at 4) { a << 4.1 }
- assert_equal false, s.add(Time.at 4) { a << 4.2 }
- assert_equal false, s.process(Time.at 4)
- assert_equal [1, 3, 4, 4.1, 4.2], a
- a = []
- assert_equal true, s.process(Time.at 5)
- assert_equal [5], a
- end
-
- def test_container_schedule
+ # Test container doesn't stops only when schedule work is done
+ def test_container_work_queue
c = Container.new __method__
- delays = [0.1, 0.03, 0.02, 0.04]
+ delays = [0.1, 0.03, 0.02]
a = []
delays.each { |d| c.schedule(d) { a << [d, Time.now] } }
start = Time.now
@@ -369,7 +345,82 @@ class ContainerTest < MiniTest::Test
end
end
- def test_work_queue
+ # Test container work queue finishes due tasks on external stop, drops future tasks
+ def test_container_work_queue_stop
+ q = Queue.new
+ c = Container.new __method__
+ t = Thread.new { c.run }
+ [0.1, 0.2, 0.2, 0.2, 1.0].each { |d| c.schedule(d) { q << d } }
+ assert_equal 0.1, q.pop
+ assert_equal 0.2, q.pop
+ c.stop
+ t.join
+ assert_equal 0.2, q.pop
+ assert_equal 0.2, q.pop
+ assert_empty q
+ end
+
+ # Chain schedule calls from other schedule calls
+ def test_container_schedule_chain
+ c = Container.new(__method__)
+ delays = [0.05, 0.02, 0.04]
+ i = delays.each
+ a = []
+ p = Proc.new { c.schedule(i.next) { a << Time.now; p.call } rescue nil }
+ p.call # Schedule the first, which schedules the second etc.
+ start = Time.now
+ c.run
+ assert_equal 3, a.size
+ delays.inject(0) do |d,sum|
+ x = a.shift
+ assert_in_delta start + d + sum, x, 0.01
+ sum + d
+ end
+ end
+
+ # Schedule calls from handlers
+ def test_container_schedule_handler
+ h = Class.new() do
+ def initialize() @got = []; end
+ attr_reader :got
+ def record(m) @got << m; end
+ def on_container_start(c) c.schedule(0) {record __method__}; end
+ def on_connection_open(c) c.close; c.container.schedule(0) {record __method__}; end
+ def on_connection_close(c) c.container.schedule(0) {record __method__}; end
+ end.new
+ t = ServerContainerThread.new(__method__, nil, 1, h)
+ t.connect(t.url)
+ t.join
+ assert_equal [:on_container_start, :on_connection_open, :on_connection_open, :on_connection_close, :on_connection_close], h.got
+ end
+
+ # Raising from container handler should stop container
+ def test_container_handler_raise
+ h = Class.new() do
+ def on_container_start(c) raise "BROKEN"; end
+ end.new
+ c = Container.new(h, __method__)
+ assert_equal("BROKEN", (assert_raises(RuntimeError) { c.run }).to_s)
+ end
+
+ # Raising from connection handler should stop container
+ def test_connection_handler_raise
+ h = Class.new() do
+ def on_connection_open(c) raise "BROKEN"; end
+ end.new
+ c = ServerContainer.new(__method__, nil, 1, h)
+ c.connect(c.url)
+ assert_equal("BROKEN", (assert_raises(RuntimeError) { c.run }).to_s)
+ end
+
+ # Raising from container schedule should stop container
+ def test_container_schedule_raise
+ c = Container.new(__method__)
+ c.schedule(0) { raise "BROKEN" }
+ assert_equal("BROKEN", (assert_raises(RuntimeError) { c.run }).to_s)
+ end
+
+ def test_connection_work_queue
cont = ServerContainer.new(__method__, {}, 1)
c = cont.connect(cont.url)
t = Thread.new { cont.run }
@@ -391,6 +442,14 @@ class ContainerTest < MiniTest::Test
c.work_queue.add { c.close }
t.join
- assert_raises(EOFError) { c.work_queue.add { } }
+ assert_raises(WorkQueue::StoppedError) { c.work_queue.add { } }
+ end
+
+ # Raising from connection schedule should stop container
+ def test_connection_work_queue_raise
+ c = ServerContainer.new(__method__)
+ c.connect(c.url)
+ c.work_queue.add { raise "BROKEN" }
+ assert_equal("BROKEN", (assert_raises(RuntimeError) { c.run }).to_s)
end
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/dab607e6/proton-c/bindings/ruby/tests/test_tools.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/test_tools.rb b/proton-c/bindings/ruby/tests/test_tools.rb
index 091322d..171e887 100644
--- a/proton-c/bindings/ruby/tests/test_tools.rb
+++ b/proton-c/bindings/ruby/tests/test_tools.rb
@@ -150,8 +150,8 @@ end
class ServerContainer < Qpid::Proton::Container
include Qpid::Proton
- def initialize(id=nil, listener_opts=nil, n=1)
- super id
+ def initialize(id=nil, listener_opts=nil, n=1, handler=nil)
+ super handler, id
@listener = listen_io(TCPServer.open(0), ListenOnceHandler.new(listener_opts, n))
end
@@ -162,7 +162,7 @@ class ServerContainer < Qpid::Proton::Container
end
class ServerContainerThread < ServerContainer
- def initialize(id=nil, listener_opts=nil, n=1)
+ def initialize(*args)
super
@thread = Thread.new { run }
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/dab607e6/proton-c/bindings/ruby/tests/test_utils.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/test_utils.rb b/proton-c/bindings/ruby/tests/test_utils.rb
new file mode 100644
index 0000000..30e5e80
--- /dev/null
+++ b/proton-c/bindings/ruby/tests/test_utils.rb
@@ -0,0 +1,63 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+require 'test_tools'
+require 'minitest/unit'
+
+class UtilsTest < MiniTest::Test
+ include Qpid::Proton
+
+ # Make sure Schedule puts tasks in proper order.
+ def test_schedule_empty
+ s = Schedule.new
+ assert_empty s
+ assert_nil s.next_tick
+ assert_nil s.pop(nil)
+ end
+
+ def test_schedule_insert_pop
+ s = Schedule.new
+ [3,5,4].each { |i| s.insert(Time.at(i), i) }
+ assert_equal Time.at(3), s.next_tick
+ assert_nil s.pop(Time.at(2))
+ assert_equal [3,4], [s.pop(Time.at(4)), s.pop(Time.at(4))]
+ refute_empty s
+ assert_nil s.pop(Time.at(4.9))
+ assert_equal Time.at(5), s.next_tick
+ assert_equal 5, s.pop(Time.at(5))
+ assert_empty s
+ end
+
+ # Make sure we sort by time and don't change order if same time
+ def test_schedule_sort
+ s = Schedule.new
+ [4.0, 3, 5, 1, 4.1, 4.2 ].each { |n| s.insert(Time.at(n.to_i), n) }
+ [1, 3, 4.0, 4.1, 4.2].each { |n| assert_equal n, s.pop(Time.at(4)) }
+ refute_empty s # Not empty but nothing due until time 5
+ assert_equal Time.at(5), s.next_tick
+ assert_nil s.pop(Time.at(4))
+ assert_equal 5, s.pop(Time.at(5))
+ end
+
+ def test_schedule_clear
+ s = Schedule.new
+ [3, 5].each { |n| s.insert(Time.at(n.to_i), n) }
+ refute_empty s
+ s.clear
+ assert_empty s
+ end
+end
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/2] qpid-proton git commit: PROTON-1820: [ruby] Improve error
handling in container
Posted by ac...@apache.org.
PROTON-1820: [ruby] Improve error handling in container
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/d9345931
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/d9345931
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/d9345931
Branch: refs/heads/master
Commit: d93459311c4d5b35f64ce7e443dbf2c161b1b1bc
Parents: 49f4cb5
Author: Alan Conway <ac...@redhat.com>
Authored: Wed Apr 4 20:45:16 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Apr 5 10:10:53 2018 -0400
----------------------------------------------------------------------
proton-c/bindings/ruby/lib/core/container.rb | 23 +++++++++++++----------
1 file changed, 13 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d9345931/proton-c/bindings/ruby/lib/core/container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/container.rb b/proton-c/bindings/ruby/lib/core/container.rb
index 85dbe69..b683412 100644
--- a/proton-c/bindings/ruby/lib/core/container.rb
+++ b/proton-c/bindings/ruby/lib/core/container.rb
@@ -35,7 +35,7 @@ module Qpid::Proton
include TimeCompare
# Error raised if the container is used after {#stop} has been called.
- class StoppedError < RuntimeError
+ class StoppedError < StateError
def initialize(*args) super("container has been stopped"); end
end
@@ -53,7 +53,7 @@ module Qpid::Proton
# concurrently.
#
def initialize(*args)
- @handler, @id, @panic = nil
+ @handler, @id = nil
case args.size
when 2 then @handler, @id = args
when 1 then
@@ -87,6 +87,7 @@ module Qpid::Proton
@running = 0 # Count of #run threads
@stopped = false # #stop called
@stop_err = nil # Optional error to pass to tasks, from #stop
+ @panic = nil # Exception caught in a run thread, to be raised by all run threads
end
# @return [MessagingHandler] The container-wide handler
@@ -95,6 +96,9 @@ module Qpid::Proton
# @return [String] unique identifier for this container
attr_reader :id
+ def to_s() "#<#{self.class} id=#{id.inspect}>"; end
+ def inspect() to_s; end
+
# Auto-stop flag.
#
# True (the default) means that the container will stop automatically, as if {#stop}
@@ -198,12 +202,13 @@ module Qpid::Proton
while task = @work.pop
run_one(task, Time.now)
end
- raise @panic if @panic
+ @lock.synchronize { raise @panic if @panic }
ensure
@lock.synchronize do
if (@running -= 1) > 0
work_wake nil # Signal the next thread
else
+ # This is the last thread, no need to do maybe_panic around this final handler call.
@adapter.on_container_stop(self) if @adapter.respond_to? :on_container_stop
end
end
@@ -217,15 +222,13 @@ module Qpid::Proton
# is finished.
#
# The container can no longer be used, using a stopped container raises
- # {StoppedError} on attempting. Create a new container if you want to
- # resume activity.
+ # {StoppedError}. Create a new container if you want to resume activity.
#
# @param error [Condition] Optional error condition passed to
# {MessagingHandler#on_transport_error} for each connection and
# {Listener::Handler::on_error} for each listener.
#
- # @param panic [Exception] Optional exception raised by all concurrent calls
- # to run()
+ # @param panic [Exception] Optional exception to raise from all calls to run()
#
def stop(error=nil, panic=nil)
@lock.synchronize do
@@ -338,14 +341,14 @@ module Qpid::Proton
@lock.synchronize do
return if @set # Don't write if already has data
@set = true
- begin @wr.write_nonblock('x') rescue IO::WaitWritable end
+ @wr.write_nonblock('x') rescue nil
end
end
def reset
@lock.synchronize do
return unless @set
- begin @rd.read_nonblock(1) rescue IO::WaitReadable end
+ @rd.read_nonblock(1) rescue nil
@set = false
end
end
@@ -361,7 +364,7 @@ module Qpid::Proton
case task
when :start
- @adapter.on_container_start(self) if @adapter.respond_to? :on_container_start
+ maybe_panic { @adapter.on_container_start(self) } if @adapter.respond_to? :on_container_start
when :select
# Compute read/write select sets and minimum next_tick for select timeout
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org