You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2018/03/23 15:08:59 UTC

[5/8] qpid-proton git commit: PROTON-1803: [ruby] Container support for scheduled tasks

PROTON-1803: [ruby] Container support for scheduled tasks


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/02f49551
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/02f49551
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/02f49551

Branch: refs/heads/master
Commit: 02f495510cb9f5e5308393fe8bb8e44df8ecebcd
Parents: 1108c4e
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Mar 22 16:01:05 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Fri Mar 23 09:39:39 2018 -0400

----------------------------------------------------------------------
 proton-c/bindings/ruby/lib/core/container.rb   | 115 ++++++++++++++++----
 proton-c/bindings/ruby/lib/qpid_proton.rb      |   1 +
 proton-c/bindings/ruby/lib/util/schedule.rb    |  63 +++++++++++
 proton-c/bindings/ruby/tests/test_container.rb |  40 ++++++-
 4 files changed, 194 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/02f49551/proton-c/bindings/ruby/lib/core/container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/container.rb b/proton-c/bindings/ruby/lib/core/container.rb
index 2d920b4..c581d34 100644
--- a/proton-c/bindings/ruby/lib/core/container.rb
+++ b/proton-c/bindings/ruby/lib/core/container.rb
@@ -21,6 +21,8 @@ require 'set'
 require_relative 'listener'
 
 module Qpid::Proton
+  public
+
   # An AMQP container manages a set of {Listener}s and {Connection}s which
   # contain {#Sender} and {#Receiver} links to transfer messages.  Usually, each
   # AMQP client or server process has a single container for all of its
@@ -29,6 +31,8 @@ module Qpid::Proton
   # One or more threads can call {#run}, events generated by all the listeners and
   # connections will be dispatched in the {#run} threads.
   class Container
+    include TimeCompare
+
     # Error raised if the container is used after {#stop} has been called.
     class StoppedError < RuntimeError
       def initialize(*args) super("container has been stopped"); end
@@ -62,16 +66,17 @@ module Qpid::Proton
 
       # Implementation note:
       #
-      # - #run threads take work from @work
-      # - Each driver and the Container itself is processed by at most one #run thread at a time
-      # - The Container thread does IO.select
+      # - #run threads take work items from @work, process them, and rearm them for select
+      # - work items are: ConnectionTask, ListenTask, :start, :select, :schedule
       # - nil on the @work queue makes a #run thread exit
 
       @work = Queue.new
       @work << :start
-      @work << self             # Issue start and start start selecting
+      @work << :select
       @wake = SelectWaker.new   # Wakes #run thread in IO.select
       @auto_stop = true         # Stop when @active drops to 0
+      @schedule = Schedule.new
+      @schedule_working = false # True if :schedule is on the work queue
 
       # Following instance variables protected by lock
       @lock = Mutex.new
@@ -234,6 +239,17 @@ module Qpid::Proton
       @wake.wake
     end
 
+    # Schedule code to be executed after a delay.
+    # @param delay [Numeric] delay in seconds, must be >= 0
+    # @yield [ ] the block is invoked with no parameters in a {#run} thread after +delay+ has elapsed
+    def schedule(delay, &block)
+      delay >= 0.0 or raise ArgumentError, "delay=#{delay} must be >= 0"
+      block_given? or raise ArgumentError, "no block given"
+      not_stopped
+      @lock.synchronize { @active += 1 } if @schedule.add(Time.now + delay, &block)
+      @wake.wake
+    end
+
     private
 
     # Container driver applies options and adds container context to events
@@ -288,6 +304,8 @@ module Qpid::Proton
         STDERR.puts "#{@log_prefix}#{([method[3..-1].upcase]+args).join ', '})" if @log_prefix
         @handler.__send__(method, self, *args) if @handler && @handler.respond_to?(method)
       end
+
+      def next_tick() nil; end
     end
 
     # Selectable object that can be used to wake IO.select from another thread
@@ -329,35 +347,44 @@ module Qpid::Proton
       when :start
         @adapter.on_container_start(self) if @adapter.respond_to? :on_container_start
 
-      when Container
+      when :select
+        # Compute read/write select sets and minimum next_tick for select timeout
         r, w = [@wake], []
-        next_tick = nil
+        next_tick = @schedule.next_tick
         @lock.synchronize do
           @selectable.each do |s|
             r << s if s.send :can_read?
             w << s if s.send :can_write?
-            next_tick = next_tick_min(s, next_tick)
+            next_tick = earliest(s.next_tick, next_tick)
           end
         end
+
         now = Time.now
         timeout = ((next_tick > now) ? next_tick - now : 0) if next_tick
         r, w = IO.select(r, w, nil, timeout)
         now = Time.now
-        selected = Set.new(r).delete(@wake)
+        @wake.reset if r && r.delete(@wake)
+
+        # selected is a Set to eliminate duplicates between r, w and next_tick due.
+        selected = Set.new
+        selected.merge(r) if r
         selected.merge(w) if w
-        selected.merge(@selectable.select { |s| next_tick_due(s, now) })
-        @wake.reset
-        stop_select = nil
         @lock.synchronize do
-          if stop_select = @stopped # close everything
-            selected += @selectable
-            selected.each { |s| s.close @stop_err }
+          if @stopped # close everything
+            @selectable.each { |s| s.close @stop_err; @work << s }
+            @selectable.clear
             @wake.close
+            return
           end
-          @selectable -= selected # Remove selected tasks
+          if !@schedule_working && before_eq(@schedule.next_tick, now)
+            @schedule_working = true
+            @work << :schedule
+          end
+          selected.merge(@selectable.select { |s| before_eq(s.next_tick, now) })
+          @selectable -= selected # Remove selected tasks from @selectable
         end
         selected.each { |s| @work << s } # Queue up tasks needing #process
-        @work << self unless stop_select
+        @work << :select        # Enable next select
 
       when ConnectionTask then
         maybe_panic { task.process }
@@ -367,17 +394,56 @@ module Qpid::Proton
         io, opts = maybe_panic { task.process }
         add(connection_driver(io, opts, true)) if io
         rearm task
-      end
-    end
 
-    def next_tick_due(x, now)
-      nt = x.respond_to?(:next_tick) && x.next_tick
-      nt && (nt <= now)
+      when :schedule then
+        if maybe_panic { @schedule.process Time.now }
+          @lock.synchronize { @active -= 1; check_stop_lh }
+        else
+          @lock.synchronize { @schedule_working = false }
+        end
+      end
     end
 
-    def next_tick_min(x, t)
-      nt = x.respond_to?(:next_tick) && x.next_tick
-      nt if !t || (nt < t)
+    def do_select
+      # Compute the sets to select for read and write, and the minimum next_tick for the timeout
+      r, w = [@wake], []
+      next_tick = nil
+      @lock.synchronize do
+        @selectable.each do |s|
+          r << s if s.can_read?
+          w << s if s.can_write?
+          next_tick = earliest(s.next_tick, next_tick)
+        end
+      end
+      next_tick = earliest(@schedule.next_tick, next_tick)
+
+      # Do the select and queue up all resulting work
+      now = Time.now
+      timeout = next_tick - now if next_tick
+      r, w = (timeout.nil? || timeout > 0) && IO.select(r, w, nil, timeout)
+      @wake.reset
+      selected = Set.new
+      @lock.synchronize do
+        if @stopped
+          @selectable.each { |s| s.close @stop_err; @work << s }
+          @wake.close
+          return
+        end
+        # Check if schedule has items due and is not already working
+        if !@schedule_working && before_eq(@schedule.next_tick, now)
+          @work << :schedule
+          @schedule_working = true
+        end
+        # Eliminate duplicates between r, w and next_tick due.
+        selected.merge(r) if r
+        selected.delete(@wake)
+        selected.merge(w) if w
+        @selectable -= selected
+        selected.merge(@selectable.select { |s| before_eq(s.next_tick, now) })
+        @selectable -= selected
+      end
+      selected.each { |s| @work << s } # Queue up tasks needing #process
+      @work << :select
     end
 
     # Rescue any exception raised by the block and stop the container.
@@ -386,6 +452,7 @@ module Qpid::Proton
         yield
       rescue Exception => e
         stop(nil, e)
+        nil
       end
     end
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/02f49551/proton-c/bindings/ruby/lib/qpid_proton.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/qpid_proton.rb b/proton-c/bindings/ruby/lib/qpid_proton.rb
index c52310b..21a15b9 100644
--- a/proton-c/bindings/ruby/lib/qpid_proton.rb
+++ b/proton-c/bindings/ruby/lib/qpid_proton.rb
@@ -44,6 +44,7 @@ require "core/exceptions"
 require "util/deprecation"
 require "util/version"
 require "util/error_handler"
+require "util/schedule"
 require "util/wrapper"
 
 # Types

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/02f49551/proton-c/bindings/ruby/lib/util/schedule.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/util/schedule.rb b/proton-c/bindings/ruby/lib/util/schedule.rb
new file mode 100644
index 0000000..ef80c1b
--- /dev/null
+++ b/proton-c/bindings/ruby/lib/util/schedule.rb
@@ -0,0 +1,63 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+module Qpid::Proton
+
+  # @private
+  module TimeCompare
+    # t1 <= t2, where nil is treated as "distant future"
+    def before_eq(t1, t2) (t1 && t2) ? (t1 <= t2) : t1; end
+
+    # min(t1, t2) where nil is treated as "distant future"
+    def earliest(t1, t2) before_eq(t1, t2) ? t1 :  t2; end
+  end
+
+  # @private
+  # A sorted, thread-safe list of scheduled Proc.
+  # Note: calls to #process are always serialized, but calls to #add may be concurrent.
+  class Schedule
+    include TimeCompare
+    Item = Struct.new(:time, :proc)
+
+    def initialize() @lock = Mutex.new; @items = []; end
+
+    def next_tick() @lock.synchronize { @items.empty? ? nil : @items.first.time } end
+
+    # @return true if the Schedule was previously empty
+    def add(time, &proc)
+      @lock.synchronize do
+        if at = (0...@items.size).bsearch { |i| @items[i].time > time }
+          @items.insert(at, Item.new(time, proc))
+        else
+          @items << Item.new(time, proc)
+        end
+        return @items.size == 1
+      end
+    end
+
+    # @return true if the Schedule became empty as a result of this call
+    def process(now)
+      due = []
+      empty = @lock.synchronize do
+        due << @items.shift while !@items.empty? && before_eq(@items.first.time, now)
+        @items.empty?
+      end
+      due.each { |i| i.proc.call() }
+      return empty && !due.empty?
+    end
+  end
+end

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/02f49551/proton-c/bindings/ruby/tests/test_container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/test_container.rb b/proton-c/bindings/ruby/tests/test_container.rb
index faa505a..c9f54cc 100644
--- a/proton-c/bindings/ruby/tests/test_container.rb
+++ b/proton-c/bindings/ruby/tests/test_container.rb
@@ -300,5 +300,43 @@ class ContainerTest < MiniTest::Test
     assert_raises(Container::StoppedError) { cont.run }
     assert_raises(Container::StoppedError) { cont.listen "" }
   end
-end
 
+  # Make sure Container::Scheduler puts tasks in proper order.
+  def test_scheduler
+    a = []
+    s = Schedule.new
+
+    assert_equal true,  s.add(Time.at 3) { a << 3 }
+    assert_equal false, s.process(Time.at 2)      # Should not run
+    assert_equal [], a
+    assert_equal true, s.process(Time.at 3)      # Should run
+    assert_equal [3], a
+
+    a = []
+    assert_equal true, s.add(Time.at 3) { a << 3 }
+    assert_equal false, s.add(Time.at 5) { a << 5 }
+    assert_equal false, s.add(Time.at 1) { a << 1 }
+    assert_equal false, s.add(Time.at 4) { a << 4 }
+    assert_equal false, s.add(Time.at 4) { a << 4.1 }
+    assert_equal false, s.add(Time.at 4) { a << 4.2 }
+    assert_equal false, s.process(Time.at 4)
+    assert_equal [1, 3, 4, 4.1, 4.2], a
+    a = []
+    assert_equal true, s.process(Time.at 5)
+    assert_equal [5], a
+  end
+
+  def test_container_schedule
+    c = Container.new __method__
+    delays = [0.1, 0.03, 0.02, 0.04]
+    a = []
+    delays.each { |d| c.schedule(d) { a << [d, Time.now] } }
+    start = Time.now
+    c.run
+    delays.sort.each do |d|
+      x = a.shift
+      assert_equal d, x[0]
+      assert_in_delta  start + d, x[1], 0.01, "#{d}"
+    end
+  end
+end


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org