You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2018/04/05 14:42:47 UTC

[1/2] qpid-proton git commit: PROTON-1820: [ruby] Container#schedule does not work if called from handler

Repository: qpid-proton
Updated Branches:
  refs/heads/master 49f4cb56f -> dab607e63


PROTON-1820: [ruby] Container#schedule does not work if called from handler


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/dab607e6
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/dab607e6
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/dab607e6

Branch: refs/heads/master
Commit: dab607e6393e9d939847f845d4ca322705efd717
Parents: d934593
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Apr 5 10:00:38 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Apr 5 10:10:53 2018 -0400

----------------------------------------------------------------------
 proton-c/bindings/ruby/lib/core/container.rb   | 131 ++++++--------------
 proton-c/bindings/ruby/lib/core/exceptions.rb  |   4 +
 proton-c/bindings/ruby/lib/core/work_queue.rb  |  87 ++++++++-----
 proton-c/bindings/ruby/lib/util/schedule.rb    |  58 ++++-----
 proton-c/bindings/ruby/tests/test_container.rb | 117 ++++++++++++-----
 proton-c/bindings/ruby/tests/test_tools.rb     |   6 +-
 proton-c/bindings/ruby/tests/test_utils.rb     |  63 ++++++++++
 7 files changed, 274 insertions(+), 192 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/dab607e6/proton-c/bindings/ruby/lib/core/container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/container.rb b/proton-c/bindings/ruby/lib/core/container.rb
index b683412..7d9f2cb 100644
--- a/proton-c/bindings/ruby/lib/core/container.rb
+++ b/proton-c/bindings/ruby/lib/core/container.rb
@@ -35,8 +35,8 @@ module Qpid::Proton
     include TimeCompare
 
     # Error raised if the container is used after {#stop} has been called.
-    class StoppedError < StateError
-      def initialize(*args) super("container has been stopped"); end
+    class StoppedError < Qpid::Proton::StoppedError
+      def initialize() super("container has been stopped"); end
     end
 
     # Create a new Container
@@ -66,19 +66,13 @@ module Qpid::Proton
       @adapter = Handler::Adapter.adapt(@handler) || Handler::MessagingAdapter.new(nil)
       @id = (@id || SecureRandom.uuid).freeze
 
-      # Implementation note:
-      #
-      # - #run threads take work items from @work, process them, and rearm them for select
-      # - work items are: ConnectionTask, ListenTask, :start, :select, :schedule
-      # - nil on the @work queue makes a #run thread exit
-
+      # Threading and implementation notes: see comment on #run_one
       @work = Queue.new
       @work << :start
       @work << :select
       @wake = SelectWaker.new   # Wakes #run thread in IO.select
       @auto_stop = true         # Stop when @active drops to 0
-      @schedule = Schedule.new
-      @schedule_working = false # True if :schedule is on the work queue
+      @work_queue = WorkQueue.new(self)  # work scheduled by other threads for :select context
 
       # Following instance variables protected by lock
       @lock = Mutex.new
@@ -242,25 +236,22 @@ module Qpid::Proton
         # - no more select calls after next wakeup
         # - once @active == 0, all threads will be stopped with nil
       end
-      @wake.wake
+      wake
     end
 
-    # Schedule code to be executed after a delay.
-    # @param delay [Numeric] delay in seconds, must be >= 0
-    # @yield [ ] the block is invoked with no parameters in a {#run} thread after +delay+ has elapsed
-    # @return [void]
-    # @raise [ThreadError] if +non_block+ is true and the operation would block
-    def schedule(delay, non_block=false, &block)
-      not_stopped
-      @lock.synchronize { @active += 1 } if @schedule.add(Time.now + delay, non_block, &block)
-      @wake.wake
-    end
+    # Get the {WorkQueue} that can be used to schedule code to be run by the container.
+    #
+    # Note: to run code that affects a {Connection} or its associated objects,
+    # use {Connection#work_queue}
+    def work_queue() @work_queue; end
+
+    # (see WorkQueue#schedule)
+    def schedule(at, &block) @work_queue.schedule(at, &block) end
 
     private
 
     def wake() @wake.wake; end
 
-    # Container driver applies options and adds container context to events
     class ConnectionTask < Qpid::Proton::HandlerDriver
       include TimeCompare
 
@@ -269,15 +260,15 @@ module Qpid::Proton
         transport.set_server if server
         transport.apply opts
         connection.apply opts
-        @work_queue = WorkQueue.new container
+        @work_queue = WorkQueue.new(container)
         connection.instance_variable_set(:@work_queue, @work_queue)
       end
-      def next_tick() earliest(super, @work_queue.send(:next_tick)); end
-      def process(now) @work_queue.send(:process, now); super(); end
+      def next_tick() earliest(super, @work_queue.next_tick); end
+      def process(now) @work_queue.process(now); super(); end
 
       def dispatch              # Intercept dispatch to close work_queue
         super
-        @work_queue.send(:close) if read_closed? && write_closed? 
+        @work_queue.close if read_closed? && write_closed?
       end
     end
 
@@ -360,6 +351,12 @@ module Qpid::Proton
     end
 
     # Handle a single item from the @work queue, this is the heart of the #run loop.
+    # Take one task from @work, process it, and rearm for select
+    # Tasks are: ConnectionTask, ListenTask, :start, :select
+    # - ConnectionTask/ListenTask have #can_read, #can_write, #next_tick to set up IO.select
+    #   and #process to run handlers and process relevant work_queue
+    # - nil means exit from the #run thread (handled by #run)
+    # - :select does IO.select and processes Container#work_queue
     def run_one(task, now)
       case task
 
@@ -369,40 +366,43 @@ module Qpid::Proton
       when :select
         # Compute read/write select sets and minimum next_tick for select timeout
         r, w = [@wake], []
-        next_tick = @schedule.next_tick
+        next_tick = @work_queue.next_tick
         @lock.synchronize do
           @selectable.each do |s|
-            r << s if s.send :can_read?
-            w << s if s.send :can_write?
+            r << s if s.can_read?
+            w << s if s.can_write?
             next_tick = earliest(s.next_tick, next_tick)
           end
         end
 
         timeout = ((next_tick > now) ? next_tick - now : 0) if next_tick
         r, w = IO.select(r, w, nil, timeout)
-        now = Time.now unless timeout == 0
         @wake.reset if r && r.delete(@wake)
+        now = Time.now unless timeout == 0 # Update now if we may have blocked
 
         # selected is a Set to eliminate duplicates between r, w and next_tick due.
         selected = Set.new
         selected.merge(r) if r
         selected.merge(w) if w
-        @lock.synchronize do
-          if @stopped # close everything
+        stopped = @lock.synchronize do
+          if @stopped           # close everything
             @selectable.each { |s| s.close @stop_err; @work << s }
             @selectable.clear
+            @work_queue.close
             @wake.close
-            return
-          end
-          if !@schedule_working && before_eq(@schedule.next_tick, now)
-            @schedule_working = true
-            @work << :schedule
+          else
+            @selectable -= selected # Remove already-selected tasks from @selectable
+            # Also select and remove items with next_tick before now
+            @selectable.delete_if { |s| before_eq(s.next_tick, now) and selected << s }
           end
-          selected.merge(@selectable.select { |s| before_eq(s.next_tick, now) })
-          @selectable -= selected # Remove selected tasks from @selectable
+          @stopped
         end
         selected.each { |s| @work << s } # Queue up tasks needing #process
-        @work << :select        # Enable next select
+        maybe_panic { @work_queue.process(now) } # Process current work queue items
+        @work_queue.clear if stopped
+        @lock.synchronize { check_stop_lh } if @work_queue.empty?
+
+        @work << :select  unless stopped # Enable next select
 
       when ConnectionTask then
         maybe_panic { task.process now }
@@ -412,58 +412,9 @@ module Qpid::Proton
         io, opts = maybe_panic { task.process }
         add(connection_driver(io, opts, true)) if io
         rearm task
-
-      when :schedule then
-        if maybe_panic { @schedule.process now }
-          @lock.synchronize { @active -= 1; check_stop_lh }
-        else
-          @lock.synchronize { @schedule_working = false }
-        end
       end
     end
 
-    def do_select
-      # Compute the sets to select for read and write, and the minimum next_tick for the timeout
-      r, w = [@wake], []
-      next_tick = nil
-      @lock.synchronize do
-        @selectable.each do |s|
-          r << s if s.can_read?
-          w << s if s.can_write?
-          next_tick = earliest(s.next_tick, next_tick)
-        end
-      end
-      next_tick = earliest(@schedule.next_tick, next_tick)
-
-      # Do the select and queue up all resulting work
-      now = Time.now
-      timeout = next_tick - now if next_tick
-      r, w = (timeout.nil? || timeout > 0) && IO.select(r, w, nil, timeout)
-      @wake.reset
-      selected = Set.new
-      @lock.synchronize do
-        if @stopped
-          @selectable.each { |s| s.close @stop_err; @work << s }
-          @wake.close
-          return
-        end
-        # Check if schedule has items due and is not already working
-        if !@schedule_working && before_eq(@schedule.next_tick, now)
-          @work << :schedule
-          @schedule_working = true
-        end
-        # Eliminate duplicates between r, w and next_tick due.
-        selected.merge(r) if r
-        selected.delete(@wake)
-        selected.merge(w) if w
-        @selectable -= selected
-        selected.merge(@selectable.select { |s| before_eq(s.next_tick, now) })
-        @selectable -= selected
-      end
-      selected.each { |s| @work << s } # Queue up tasks needing #process
-      @work << :select
-    end
-
     # Rescue any exception raised by the block and stop the container.
     def maybe_panic
       begin
@@ -513,7 +464,7 @@ module Qpid::Proton
     end
 
     def check_stop_lh
-      if @active.zero? && (@auto_stop || @stopped)
+      if @active.zero? && (@auto_stop || @stopped) && @work_queue.empty?
         @stopped = true
         work_wake nil          # Signal threads to stop
         true

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/dab607e6/proton-c/bindings/ruby/lib/core/exceptions.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/exceptions.rb b/proton-c/bindings/ruby/lib/core/exceptions.rb
index 1f74085..443472e 100644
--- a/proton-c/bindings/ruby/lib/core/exceptions.rb
+++ b/proton-c/bindings/ruby/lib/core/exceptions.rb
@@ -126,4 +126,8 @@ module Qpid::Proton
   # so that the application can delay completing the open/close to a later time.
   class StopAutoResponse < ProtonError
   end
+
+  # Raised when a method is called on an object that has been stopped.
+  class StoppedError < StateError
+  end
 end

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/dab607e6/proton-c/bindings/ruby/lib/core/work_queue.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/work_queue.rb b/proton-c/bindings/ruby/lib/core/work_queue.rb
index d205a13..3a4fad6 100644
--- a/proton-c/bindings/ruby/lib/core/work_queue.rb
+++ b/proton-c/bindings/ruby/lib/core/work_queue.rb
@@ -19,58 +19,79 @@ module Qpid::Proton
 
   # A thread-safe queue of work for multi-threaded programs.
   #
-  # Instances of {Connection} and objects associated with it ({Session}, {Sender},
-  # {Receiver}, {Delivery}, {Tracker}) are not thread-safe and must be
-  # used correctly when multiple threads call {Container#run}
+  # A {Container} can have multiple threads calling {Container#run}.
+  # The container ensures that work associated with a single {Connection} or
+  # {Listener} is _serialized_ - two threads will never concurrently call
+  # handlers associated with the same object.
   #
-  # Calls to {MessagingHandler} methods by the {Container} are automatically
-  # serialized for each connection instance. Other threads may have code
-  # similarly serialized by adding it to the {Connection#work_queue} for the
-  # connection.  Each object related to a {Connection} also provides a
-  # +work_queue+ method.
+  # To have your own code serialized in the same way, add a block to the connection's
+  # {WorkQueue}. The block will be invoked as soon as it is safe to do so.
+  #
+  # A {Connection} and the objects associated with it ({Session}, {Sender},
+  # {Receiver}, {Delivery}, {Tracker}) are not thread safe, so if you have
+  # multiple threads calling {Container#run} or if you want to affect objects
+  # managed by the container from non-container threads you need to use the
+  # {WorkQueue}
   #
   class WorkQueue
 
-    # Add code to be executed in series with other {Container} operations on the
-    # work queue's owner. The code will be executed as soon as possible.
+    # Error raised if work is added after the queue has been stopped.
+    class StoppedError < Qpid::Proton::StoppedError
+      def initialize() super("WorkQueue has been stopped"); end
+    end
+
+    # Add a block of code to be invoked in sequence.
     #
+    # @yield [ ] the block will be invoked with no parameters in the appropriate thread context
     # @note Thread Safe: may be called in any thread.
-    # @param non_block [Boolean] if true raise {ThreadError} if the operation would block.
-    # @yield [ ] the block will be invoked with no parameters in the {WorkQueue} context,
-    #  which may be a different thread.
     # @return [void]
-    # @raise [ThreadError] if +non_block+ is true and the operation would block
-    # @raise [EOFError] if the queue is closed and cannot accept more work
-    def add(non_block=false, &block)
-      @schedule.add(Time.at(0), non_block, &block)
-      @container.send :wake
+    # @raise [StoppedError] if the queue is closed and cannot accept more work
+    def add(&block)
+      schedule(0, &block)
     end
 
-    # Schedule code to be executed after +delay+ seconds in series with other
-    # {Container} operations on the work queue's owner.
-    #
-    # Work scheduled for after the {WorkQueue} has closed will be silently dropped.
+    # Schedule a block to be invoked at a certain time.
     #
+    # @param at [Time] Invoke block as soon as possible after Time +at+
+    # @param at [Numeric] Invoke block after a delay of +at+ seconds from now
+    # @yield [ ] (see #add)
     # @note (see #add)
-    # @param delay delay in seconds until the block is added to the queue.
-    # @param (see #add)
-    # @yield (see #add)
-    # @return [void]
+    # @return (see #add)
     # @raise (see #add)
-    def schedule(delay, non_block=false, &block)
-      @schedule.add(Time.now + delay, non_block, &block)
+    def schedule(at, &block)
+      raise ArgumentError, "no block" unless block_given?
+      @lock.synchronize do
+        raise @closed if @closed
+        @schedule.insert(at, block)
+      end
       @container.send :wake
     end
 
-    private
-
+    # @private
     def initialize(container)
+      @lock = Mutex.new
       @schedule = Schedule.new
       @container = container
+      @closed = nil
+    end
+
+    # @private
+    def close() @lock.synchronize { @closed = StoppedError.new } end
+
+    # @private
+    def process(now)
+      while p = @lock.synchronize { @schedule.pop(now) }
+        p.call
+      end
     end
 
-    def close() @schedule.close; end
-    def process(now) @schedule.process(now); end
-    def next_tick() @schedule.next_tick; end
+    # @private
+    def next_tick() @lock.synchronize { @schedule.next_tick } end
+
+    # @private
+    def empty?() @lock.synchronize { @schedule.empty? } end
+
+    # @private
+    def clear() @lock.synchronize { @schedule.clear } end
   end
 end

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/dab607e6/proton-c/bindings/ruby/lib/util/schedule.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/util/schedule.rb b/proton-c/bindings/ruby/lib/util/schedule.rb
index f1bfb36..cd2f0de 100644
--- a/proton-c/bindings/ruby/lib/util/schedule.rb
+++ b/proton-c/bindings/ruby/lib/util/schedule.rb
@@ -27,53 +27,37 @@ module Qpid::Proton
   end
 
   # @private
-  # A sorted, thread-safe list of scheduled Proc.
-  # Note: calls to #process are always serialized, but calls to #add may be concurrent.
+  # A time-sorted list of objects. Thread unsafe.
   class Schedule
     include TimeCompare
-    Item = Struct.new(:time, :proc)
+    Entry = Struct.new(:time, :item)
 
-    def initialize()
-      @lock = Mutex.new
-      @items = []
-      @closed = false
-    end
+    def initialize() @entries = []; end
+
+    def empty?() @entries.empty?; end
 
     def next_tick()
-      @lock.synchronize { @items.first.time unless @items.empty? }
+      @entries.first.time unless @entries.empty?
     end
 
-    # @return true if the Schedule was previously empty
-    # @raise EOFError if schedule is closed
-    # @raise ThreadError if +non_block+ and operation would block
-    def add(time, non_block=false, &proc)
-      # non_block ignored for now, but we may implement a bounded schedule in future.
-      @lock.synchronize do
-        raise EOFError if @closed
-        if at = (0...@items.size).bsearch { |i| @items[i].time > time }
-          @items.insert(at, Item.new(time, proc))
-        else
-          @items << Item.new(time, proc)
-        end
-        return @items.size == 1
-      end
+    # @param at [Time] Insert item at time +at+
+    # @param at [Numeric] Insert item at +Time.now \+ at+
+    # @param at [0] Insert item at Time.at(0)
+    def insert(at, item)
+      time = case at
+             when 0 then Time.at(0) # Avoid call to Time.now for immediate tasks
+             when Numeric then Time.now + at
+             else at
+             end
+      index = time && ((0...@entries.size).bsearch { |i| @entries[i].time > time })
+      @entries.insert(index || -1, Entry.new(time, item))
     end
 
-    # @return true if the Schedule became empty as a result of this call
-    def process(now)
-      due = []
-      empty = @lock.synchronize do
-        due << @items.shift while (!@items.empty? && before_eq(@items.first.time, now))
-        @items.empty?
-      end
-      due.each { |i| i.proc.call() }
-      return empty && !due.empty?
+    # Return next item due at or before time, else nil
+    def pop(time)
+      @entries.shift.item if !@entries.empty? && before_eq(@entries.first.time, time)
     end
 
-    # #add raises EOFError after #close.
-    # #process can still be called to drain the schedule.
-    def close()
-      @lock.synchronize { @closed = true }
-    end
+    def clear() @entries.clear; end
   end
 end

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/dab607e6/proton-c/bindings/ruby/tests/test_container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/test_container.rb b/proton-c/bindings/ruby/tests/test_container.rb
index 1026d46..2c460aa 100644
--- a/proton-c/bindings/ruby/tests/test_container.rb
+++ b/proton-c/bindings/ruby/tests/test_container.rb
@@ -330,34 +330,10 @@ class ContainerTest < MiniTest::Test
     assert_raises(Container::StoppedError) { cont.listen "" }
   end
 
-  # Make sure Container::Scheduler puts tasks in proper order.
-  def test_scheduler
-    a = []
-    s = Schedule.new
-
-    assert_equal true,  s.add(Time.at 3) { a << 3 }
-    assert_equal false, s.process(Time.at 2)      # Should not run
-    assert_equal [], a
-    assert_equal true, s.process(Time.at 3)      # Should run
-    assert_equal [3], a
-
-    a = []
-    assert_equal true, s.add(Time.at 3) { a << 3 }
-    assert_equal false, s.add(Time.at 5) { a << 5 }
-    assert_equal false, s.add(Time.at 1) { a << 1 }
-    assert_equal false, s.add(Time.at 4) { a << 4 }
-    assert_equal false, s.add(Time.at 4) { a << 4.1 }
-    assert_equal false, s.add(Time.at 4) { a << 4.2 }
-    assert_equal false, s.process(Time.at 4)
-    assert_equal [1, 3, 4, 4.1, 4.2], a
-    a = []
-    assert_equal true, s.process(Time.at 5)
-    assert_equal [5], a
-  end
-
-  def test_container_schedule
+  # Test that the container stops only when scheduled work is done
+  def test_container_work_queue
     c = Container.new __method__
-    delays = [0.1, 0.03, 0.02, 0.04]
+    delays = [0.1, 0.03, 0.02]
     a = []
     delays.each { |d| c.schedule(d) { a << [d, Time.now] } }
     start = Time.now
@@ -369,7 +345,82 @@ class ContainerTest < MiniTest::Test
     end
   end
 
-  def test_work_queue
+  # Test container work queue finishes due tasks on external stop, drops future tasks
+  def test_container_work_queue_stop
+    q = Queue.new
+    c = Container.new __method__
+    t = Thread.new { c.run }
+    [0.1, 0.2, 0.2, 0.2, 1.0].each { |d| c.schedule(d) { q << d } }
+    assert_equal 0.1, q.pop
+    assert_equal 0.2, q.pop
+    c.stop
+    t.join
+    assert_equal 0.2, q.pop
+    assert_equal 0.2, q.pop
+    assert_empty q
+  end
+
+  # Chain schedule calls from other schedule calls
+  def test_container_schedule_chain
+    c = Container.new(__method__)
+    delays = [0.05, 0.02, 0.04]
+    i = delays.each
+    a = []
+    p = Proc.new { c.schedule(i.next) { a << Time.now; p.call } rescue nil }
+    p.call                 # Schedule the first, which schedules the second etc.
+    start = Time.now
+    c.run
+    assert_equal 3, a.size
+    delays.inject(0) do |d,sum|
+      x = a.shift
+      assert_in_delta  start + d + sum, x, 0.01
+      sum + d
+    end
+  end
+
+  # Schedule calls from handlers
+  def test_container_schedule_handler
+    h = Class.new() do
+      def initialize() @got = []; end
+      attr_reader :got
+      def record(m) @got << m; end
+      def on_container_start(c) c.schedule(0) {record __method__}; end
+      def on_connection_open(c) c.close; c.container.schedule(0) {record __method__}; end
+      def on_connection_close(c) c.container.schedule(0) {record __method__}; end
+    end.new
+    t = ServerContainerThread.new(__method__, nil, 1, h)
+    t.connect(t.url)
+    t.join
+    assert_equal [:on_container_start, :on_connection_open, :on_connection_open, :on_connection_close, :on_connection_close], h.got
+  end
+
+  # Raising from container handler should stop container
+  def test_container_handler_raise
+    h = Class.new() do
+      def on_container_start(c) raise "BROKEN"; end
+    end.new
+    c = Container.new(h, __method__)
+    assert_equal("BROKEN", (assert_raises(RuntimeError) { c.run }).to_s)
+  end
+
+  # Raising from connection handler should stop container
+  def test_connection_handler_raise
+    h = Class.new() do
+      def on_connection_open(c) raise "BROKEN"; end
+    end.new
+    c = ServerContainer.new(__method__, nil, 1, h)
+    c.connect(c.url)
+    assert_equal("BROKEN", (assert_raises(RuntimeError) { c.run }).to_s)
+  end
+
+  # Raising from container schedule should stop container
+  def test_container_schedule_raise
+    c = Container.new(__method__)
+    c.schedule(0) { raise "BROKEN" }
+    assert_equal("BROKEN", (assert_raises(RuntimeError) { c.run }).to_s)
+  end
+
+  def test_connection_work_queue
     cont = ServerContainer.new(__method__, {}, 1)
     c = cont.connect(cont.url)
     t = Thread.new { cont.run }
@@ -391,6 +442,14 @@ class ContainerTest < MiniTest::Test
 
     c.work_queue.add { c.close }
     t.join
-    assert_raises(EOFError) { c.work_queue.add {  } }
+    assert_raises(WorkQueue::StoppedError) { c.work_queue.add {  } }
+  end
+
+  # Raising from connection schedule should stop container
+  def test_connection_work_queue_raise
+    c = ServerContainer.new(__method__)
+    c.connect(c.url)
+    c.work_queue.add { raise "BROKEN" }
+    assert_equal("BROKEN", (assert_raises(RuntimeError) { c.run }).to_s)
   end
 end

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/dab607e6/proton-c/bindings/ruby/tests/test_tools.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/test_tools.rb b/proton-c/bindings/ruby/tests/test_tools.rb
index 091322d..171e887 100644
--- a/proton-c/bindings/ruby/tests/test_tools.rb
+++ b/proton-c/bindings/ruby/tests/test_tools.rb
@@ -150,8 +150,8 @@ end
 class ServerContainer < Qpid::Proton::Container
   include Qpid::Proton
 
-  def initialize(id=nil, listener_opts=nil, n=1)
-    super id
+  def initialize(id=nil, listener_opts=nil, n=1, handler=nil)
+    super handler, id
     @listener = listen_io(TCPServer.open(0), ListenOnceHandler.new(listener_opts, n))
   end
 
@@ -162,7 +162,7 @@ class ServerContainer < Qpid::Proton::Container
 end
 
 class ServerContainerThread < ServerContainer
-  def initialize(id=nil, listener_opts=nil, n=1)
+  def initialize(*args)
     super
     @thread = Thread.new { run }
   end

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/dab607e6/proton-c/bindings/ruby/tests/test_utils.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/test_utils.rb b/proton-c/bindings/ruby/tests/test_utils.rb
new file mode 100644
index 0000000..30e5e80
--- /dev/null
+++ b/proton-c/bindings/ruby/tests/test_utils.rb
@@ -0,0 +1,63 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+require 'test_tools'
+require 'minitest/unit'
+
+class UtilsTest < MiniTest::Test
+  include Qpid::Proton
+
+  # Make sure an empty Schedule has no next_tick and nothing to pop.
+  def test_schedule_empty
+    s = Schedule.new
+    assert_empty s
+    assert_nil s.next_tick
+    assert_nil s.pop(nil)
+  end
+
+  def test_schedule_insert_pop
+    s = Schedule.new
+    [3,5,4].each { |i| s.insert(Time.at(i), i) }
+    assert_equal Time.at(3), s.next_tick
+    assert_nil s.pop(Time.at(2))
+    assert_equal [3,4], [s.pop(Time.at(4)), s.pop(Time.at(4))]
+    refute_empty s
+    assert_nil s.pop(Time.at(4.9))
+    assert_equal Time.at(5), s.next_tick
+    assert_equal 5, s.pop(Time.at(5))
+    assert_empty s
+  end
+
+  # Make sure we sort by time and don't change order if same time
+  def test_schedule_sort
+    s = Schedule.new
+    [4.0, 3, 5, 1, 4.1, 4.2 ].each { |n| s.insert(Time.at(n.to_i), n) }
+    [1, 3, 4.0, 4.1, 4.2].each { |n| assert_equal n, s.pop(Time.at(4)) }
+    refute_empty s              # Not empty but nothing due until time 5
+    assert_equal Time.at(5), s.next_tick
+    assert_nil s.pop(Time.at(4))
+    assert_equal 5, s.pop(Time.at(5))
+  end
+
+  def test_schedule_clear
+    s = Schedule.new
+    [3, 5].each { |n| s.insert(Time.at(n.to_i), n) }
+    refute_empty s
+    s.clear
+    assert_empty s
+  end
+end


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/2] qpid-proton git commit: PROTON-1820: [ruby] Improve error handling in container

Posted by ac...@apache.org.
PROTON-1820: [ruby] Improve error handling in container


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/d9345931
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/d9345931
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/d9345931

Branch: refs/heads/master
Commit: d93459311c4d5b35f64ce7e443dbf2c161b1b1bc
Parents: 49f4cb5
Author: Alan Conway <ac...@redhat.com>
Authored: Wed Apr 4 20:45:16 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Apr 5 10:10:53 2018 -0400

----------------------------------------------------------------------
 proton-c/bindings/ruby/lib/core/container.rb | 23 +++++++++++++----------
 1 file changed, 13 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d9345931/proton-c/bindings/ruby/lib/core/container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/container.rb b/proton-c/bindings/ruby/lib/core/container.rb
index 85dbe69..b683412 100644
--- a/proton-c/bindings/ruby/lib/core/container.rb
+++ b/proton-c/bindings/ruby/lib/core/container.rb
@@ -35,7 +35,7 @@ module Qpid::Proton
     include TimeCompare
 
     # Error raised if the container is used after {#stop} has been called.
-    class StoppedError < RuntimeError
+    class StoppedError < StateError
       def initialize(*args) super("container has been stopped"); end
     end
 
@@ -53,7 +53,7 @@ module Qpid::Proton
     #   concurrently.
     #
     def initialize(*args)
-      @handler, @id, @panic = nil
+      @handler, @id = nil
       case args.size
       when 2 then @handler, @id = args
       when 1 then
@@ -87,6 +87,7 @@ module Qpid::Proton
       @running = 0              # Count of #run threads
       @stopped = false          # #stop called
       @stop_err = nil           # Optional error to pass to tasks, from #stop
+      @panic = nil              # Exception caught in a run thread, to be raised by all run threads
     end
 
     # @return [MessagingHandler] The container-wide handler
@@ -95,6 +96,9 @@ module Qpid::Proton
     # @return [String] unique identifier for this container
     attr_reader :id
 
+    def to_s() "#<#{self.class} id=#{id.inspect}>"; end
+    def inspect() to_s; end
+
     # Auto-stop flag.
     #
     # True (the default) means that the container will stop automatically, as if {#stop}
@@ -198,12 +202,13 @@ module Qpid::Proton
       while task = @work.pop
         run_one(task, Time.now)
       end
-      raise @panic if @panic
+      @lock.synchronize { raise @panic if @panic }
     ensure
       @lock.synchronize do
         if (@running -= 1) > 0
           work_wake nil         # Signal the next thread
         else
+          # This is the last thread, no need to do maybe_panic around this final handler call.
           @adapter.on_container_stop(self) if @adapter.respond_to? :on_container_stop
         end
       end
@@ -217,15 +222,13 @@ module Qpid::Proton
     # is finished.
     #
     # The container can no longer be used, using a stopped container raises
-    # {StoppedError} on attempting.  Create a new container if you want to
-    # resume activity.
+    # {StoppedError}.  Create a new container if you want to resume activity.
     #
     # @param error [Condition] Optional error condition passed to
     #  {MessagingHandler#on_transport_error} for each connection and
     #  {Listener::Handler::on_error} for each listener.
     #
-    # @param panic [Exception] Optional exception raised by all concurrent calls
-    # to run()
+    # @param panic [Exception] Optional exception to raise from all calls to run()
     #
     def stop(error=nil, panic=nil)
       @lock.synchronize do
@@ -338,14 +341,14 @@ module Qpid::Proton
         @lock.synchronize do
           return if @set        # Don't write if already has data
           @set = true
-          begin @wr.write_nonblock('x') rescue IO::WaitWritable end
+          @wr.write_nonblock('x') rescue nil
         end
       end
 
       def reset
         @lock.synchronize do
           return unless @set
-          begin @rd.read_nonblock(1) rescue IO::WaitReadable end
+          @rd.read_nonblock(1) rescue nil
           @set = false
         end
       end
@@ -361,7 +364,7 @@ module Qpid::Proton
       case task
 
       when :start
-        @adapter.on_container_start(self) if @adapter.respond_to? :on_container_start
+        maybe_panic { @adapter.on_container_start(self) } if @adapter.respond_to? :on_container_start
 
       when :select
         # Compute read/write select sets and minimum next_tick for select timeout


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org