You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2018/03/23 15:08:59 UTC
[5/8] qpid-proton git commit: PROTON-1803: [ruby] Container support
for scheduled tasks
PROTON-1803: [ruby] Container support for scheduled tasks
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/02f49551
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/02f49551
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/02f49551
Branch: refs/heads/master
Commit: 02f495510cb9f5e5308393fe8bb8e44df8ecebcd
Parents: 1108c4e
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Mar 22 16:01:05 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Fri Mar 23 09:39:39 2018 -0400
----------------------------------------------------------------------
proton-c/bindings/ruby/lib/core/container.rb | 115 ++++++++++++++++----
proton-c/bindings/ruby/lib/qpid_proton.rb | 1 +
proton-c/bindings/ruby/lib/util/schedule.rb | 63 +++++++++++
proton-c/bindings/ruby/tests/test_container.rb | 40 ++++++-
4 files changed, 194 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/02f49551/proton-c/bindings/ruby/lib/core/container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/container.rb b/proton-c/bindings/ruby/lib/core/container.rb
index 2d920b4..c581d34 100644
--- a/proton-c/bindings/ruby/lib/core/container.rb
+++ b/proton-c/bindings/ruby/lib/core/container.rb
@@ -21,6 +21,8 @@ require 'set'
require_relative 'listener'
module Qpid::Proton
+ public
+
# An AMQP container manages a set of {Listener}s and {Connection}s which
# contain {#Sender} and {#Receiver} links to transfer messages. Usually, each
# AMQP client or server process has a single container for all of its
@@ -29,6 +31,8 @@ module Qpid::Proton
# One or more threads can call {#run}, events generated by all the listeners and
# connections will be dispatched in the {#run} threads.
class Container
+ include TimeCompare
+
# Error raised if the container is used after {#stop} has been called.
class StoppedError < RuntimeError
def initialize(*args) super("container has been stopped"); end
@@ -62,16 +66,17 @@ module Qpid::Proton
# Implementation note:
#
- # - #run threads take work from @work
- # - Each driver and the Container itself is processed by at most one #run thread at a time
- # - The Container thread does IO.select
+ # - #run threads take work items from @work, process them, and rearm them for select
+ # - work items are: ConnectionTask, ListenTask, :start, :select, :schedule
# - nil on the @work queue makes a #run thread exit
@work = Queue.new
@work << :start
- @work << self # Issue start and start start selecting
+ @work << :select
@wake = SelectWaker.new # Wakes #run thread in IO.select
@auto_stop = true # Stop when @active drops to 0
+ @schedule = Schedule.new
+ @schedule_working = false # True if :schedule is on the work queue
# Following instance variables protected by lock
@lock = Mutex.new
@@ -234,6 +239,17 @@ module Qpid::Proton
@wake.wake
end
+ # Schedule code to be executed once after a delay; raises ArgumentError if +delay+ is negative or no block is given.
+ # @param delay [Numeric] delay in seconds, must be >= 0
+ # @yield [ ] the block is invoked with no parameters in a {#run} thread after +delay+ has elapsed
+ def schedule(delay, &block)
+ delay >= 0.0 or raise ArgumentError, "delay=#{delay} must be >= 0"
+ block_given? or raise ArgumentError, "no block given"
+ not_stopped
+ @lock.synchronize { @active += 1 } if @schedule.add(Time.now + delay, &block) # add is true only if the schedule was empty, so a non-empty schedule is counted once in @active
+ @wake.wake # wake the thread in IO.select so the select timeout can account for the new task
+ end
+
private
# Container driver applies options and adds container context to events
@@ -288,6 +304,8 @@ module Qpid::Proton
STDERR.puts "#{@log_prefix}#{([method[3..-1].upcase]+args).join ', '})" if @log_prefix
@handler.__send__(method, self, *args) if @handler && @handler.respond_to?(method)
end
+
+ def next_tick() nil; end # no timer deadline: the select code treats a nil next_tick as "distant future"
end
# Selectable object that can be used to wake IO.select from another thread
@@ -329,35 +347,44 @@ module Qpid::Proton
when :start
@adapter.on_container_start(self) if @adapter.respond_to? :on_container_start
- when Container
+ when :select
+ # Compute read/write select sets and minimum next_tick for select timeout
r, w = [@wake], []
- next_tick = nil
+ next_tick = @schedule.next_tick
@lock.synchronize do
@selectable.each do |s|
r << s if s.send :can_read?
w << s if s.send :can_write?
- next_tick = next_tick_min(s, next_tick)
+ next_tick = earliest(s.next_tick, next_tick)
end
end
+
now = Time.now
timeout = ((next_tick > now) ? next_tick - now : 0) if next_tick
r, w = IO.select(r, w, nil, timeout)
now = Time.now
- selected = Set.new(r).delete(@wake)
+ @wake.reset if r && r.delete(@wake)
+
+ # selected is a Set to eliminate duplicates between r, w and next_tick due.
+ selected = Set.new
+ selected.merge(r) if r
selected.merge(w) if w
- selected.merge(@selectable.select { |s| next_tick_due(s, now) })
- @wake.reset
- stop_select = nil
@lock.synchronize do
- if stop_select = @stopped # close everything
- selected += @selectable
- selected.each { |s| s.close @stop_err }
+ if @stopped # close everything
+ @selectable.each { |s| s.close @stop_err; @work << s }
+ @selectable.clear
@wake.close
+ return
end
- @selectable -= selected # Remove selected tasks
+ if !@schedule_working && before_eq(@schedule.next_tick, now)
+ @schedule_working = true
+ @work << :schedule
+ end
+ selected.merge(@selectable.select { |s| before_eq(s.next_tick, now) })
+ @selectable -= selected # Remove selected tasks from @selectable
end
selected.each { |s| @work << s } # Queue up tasks needing #process
- @work << self unless stop_select
+ @work << :select # Enable next select
when ConnectionTask then
maybe_panic { task.process }
@@ -367,17 +394,56 @@ module Qpid::Proton
io, opts = maybe_panic { task.process }
add(connection_driver(io, opts, true)) if io
rearm task
- end
- end
- def next_tick_due(x, now)
- nt = x.respond_to?(:next_tick) && x.next_tick
- nt && (nt <= now)
+ when :schedule then
+ if maybe_panic { @schedule.process Time.now }
+ @lock.synchronize { @active -= 1; check_stop_lh }
+ else
+ @lock.synchronize { @schedule_working = false }
+ end
+ end
end
- def next_tick_min(x, t)
- nt = x.respond_to?(:next_tick) && x.next_tick
- nt if !t || (nt < t)
+ def do_select # One select pass: wait for IO or the next deadline, queue the resulting work, then re-queue :select
+ # Compute the sets to select for read and write, and the minimum next_tick for the timeout
+ r, w = [@wake], []
+ next_tick = nil
+ @lock.synchronize do
+ @selectable.each do |s|
+ r << s if s.can_read?
+ w << s if s.can_write?
+ next_tick = earliest(s.next_tick, next_tick)
+ end
+ end
+ next_tick = earliest(@schedule.next_tick, next_tick)
+
+ # Do the select and queue up all resulting work
+ now = Time.now # NOTE(review): not refreshed after the select, so deadlines that expire while selecting are only seen on the next pass - confirm this is intended
+ timeout = next_tick - now if next_tick # stays nil when there is no deadline at all, in which case IO.select has no timeout
+ r, w = (timeout.nil? || timeout > 0) && IO.select(r, w, nil, timeout) # already overdue (timeout <= 0): skip the select, leaving r = false and w = nil
+ @wake.reset
+ selected = Set.new
+ @lock.synchronize do
+ if @stopped
+ @selectable.each { |s| s.close @stop_err; @work << s } # NOTE(review): the :select branch also calls @selectable.clear at this point - confirm whether it is needed here
+ @wake.close
+ return # stopping: :select is not re-queued
+ end
+ # Check if schedule has items due and is not already working
+ if !@schedule_working && before_eq(@schedule.next_tick, now)
+ @work << :schedule
+ @schedule_working = true
+ end
+ # Eliminate duplicates between r, w and next_tick due (selected is a Set).
+ selected.merge(r) if r
+ selected.delete(@wake)
+ selected.merge(w) if w
+ @selectable -= selected
+ selected.merge(@selectable.select { |s| before_eq(s.next_tick, now) })
+ @selectable -= selected
+ end
+ selected.each { |s| @work << s } # Queue up tasks needing #process
+ @work << :select # Enable the next select pass
end
# Rescue any exception raised by the block and stop the container.
@@ -386,6 +452,7 @@ module Qpid::Proton
yield
rescue Exception => e
stop(nil, e)
+ nil
end
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/02f49551/proton-c/bindings/ruby/lib/qpid_proton.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/qpid_proton.rb b/proton-c/bindings/ruby/lib/qpid_proton.rb
index c52310b..21a15b9 100644
--- a/proton-c/bindings/ruby/lib/qpid_proton.rb
+++ b/proton-c/bindings/ruby/lib/qpid_proton.rb
@@ -44,6 +44,7 @@ require "core/exceptions"
require "util/deprecation"
require "util/version"
require "util/error_handler"
+require "util/schedule"
require "util/wrapper"
# Types
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/02f49551/proton-c/bindings/ruby/lib/util/schedule.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/util/schedule.rb b/proton-c/bindings/ruby/lib/util/schedule.rb
new file mode 100644
index 0000000..ef80c1b
--- /dev/null
+++ b/proton-c/bindings/ruby/lib/util/schedule.rb
@@ -0,0 +1,63 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+module Qpid::Proton
+
+ # @private
+ module TimeCompare
+ # Truthy iff t1 <= t2, where nil is treated as "distant future"; a nil t1 is never before_eq, not even to a nil t2
+ def before_eq(t1, t2) (t1 && t2) ? (t1 <= t2) : t1; end # when only t2 is nil the result is t1 itself (truthy), not true
+
+ # min(t1, t2) where nil is treated as "distant future"; returns t1 on a tie, and nil only when both are nil
+ def earliest(t1, t2) before_eq(t1, t2) ? t1 : t2; end
+ end
+
+ # @private
+ # A thread-safe list of scheduled Proc, kept sorted by time with the earliest first.
+ # Note: calls to #process are always serialized, but calls to #add may be concurrent.
+ class Schedule
+ include TimeCompare
+ Item = Struct.new(:time, :proc)
+
+ def initialize() @lock = Mutex.new; @items = []; end
+
+ def next_tick() @lock.synchronize { @items.empty? ? nil : @items.first.time } end # time of the earliest item, or nil if there are none
+
+ # @return true if the Schedule was previously empty
+ def add(time, &proc)
+ @lock.synchronize do
+ if at = (0...@items.size).bsearch { |i| @items[i].time > time } # first index with a strictly later time, so equal times keep insertion order
+ @items.insert(at, Item.new(time, proc))
+ else
+ @items << Item.new(time, proc) # nothing is later than +time+: append at the end
+ end
+ return @items.size == 1
+ end
+ end
+
+ # @return true if the Schedule became empty as a result of this call (false if it was already empty or nothing was due)
+ def process(now)
+ due = []
+ empty = @lock.synchronize do
+ due << @items.shift while !@items.empty? && before_eq(@items.first.time, now) # remove every item whose time is <= now
+ @items.empty?
+ end
+ due.each { |i| i.proc.call() } # procs run after the lock is released, so a proc can call #add without deadlocking
+ return empty && !due.empty?
+ end
+ end
+end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/02f49551/proton-c/bindings/ruby/tests/test_container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/test_container.rb b/proton-c/bindings/ruby/tests/test_container.rb
index faa505a..c9f54cc 100644
--- a/proton-c/bindings/ruby/tests/test_container.rb
+++ b/proton-c/bindings/ruby/tests/test_container.rb
@@ -300,5 +300,43 @@ class ContainerTest < MiniTest::Test
assert_raises(Container::StoppedError) { cont.run }
assert_raises(Container::StoppedError) { cont.listen "" }
end
-end
+ # Make sure Schedule runs tasks in time order, keeping insertion order for equal times.
+ def test_scheduler
+ a = []
+ s = Schedule.new
+
+ assert_equal true, s.add(Time.at 3) { a << 3 }
+ assert_equal false, s.process(Time.at 2) # Should not run
+ assert_equal [], a
+ assert_equal true, s.process(Time.at 3) # Should run
+ assert_equal [3], a
+
+ a = []
+ assert_equal true, s.add(Time.at 3) { a << 3 }
+ assert_equal false, s.add(Time.at 5) { a << 5 }
+ assert_equal false, s.add(Time.at 1) { a << 1 }
+ assert_equal false, s.add(Time.at 4) { a << 4 }
+ assert_equal false, s.add(Time.at 4) { a << 4.1 }
+ assert_equal false, s.add(Time.at 4) { a << 4.2 }
+ assert_equal false, s.process(Time.at 4) # The item at Time.at 5 is still pending, so not empty
+ assert_equal [1, 3, 4, 4.1, 4.2], a
+ a = []
+ assert_equal true, s.process(Time.at 5) # Last item runs and the schedule becomes empty
+ assert_equal [5], a
+ end
+
+ def test_container_schedule
+ c = Container.new __method__
+ delays = [0.1, 0.03, 0.02, 0.04] # deliberately not in time order
+ a = []
+ delays.each { |d| c.schedule(d) { a << [d, Time.now] } } # each task records its delay and the time it actually ran
+ start = Time.now
+ c.run # presumably returns once every scheduled task has run (container auto-stop) - TODO confirm
+ delays.sort.each do |d|
+ x = a.shift
+ assert_equal d, x[0] # tasks must have run in order of increasing delay
+ assert_in_delta start + d, x[1], 0.01, "#{d}" # and within 10ms of start + delay
+ end
+ end
+end
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org