You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2018/03/23 15:09:02 UTC
[8/8] qpid-proton git commit: PROTON-1778: [ruby] thread safe work_queue
PROTON-1778: [ruby] thread safe work_queue
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/aa8d3727
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/aa8d3727
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/aa8d3727
Branch: refs/heads/master
Commit: aa8d372723ec3fe8de0d16c165ed5944b65c40f0
Parents: 02f4955
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Mar 22 17:19:45 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Fri Mar 23 11:08:27 2018 -0400
----------------------------------------------------------------------
proton-c/bindings/ruby/lib/core/connection.rb | 4 ++
proton-c/bindings/ruby/lib/core/container.rb | 33 ++++++++----
proton-c/bindings/ruby/lib/core/endpoint.rb | 3 ++
proton-c/bindings/ruby/lib/core/work_queue.rb | 59 +++++++++++++++++++++
proton-c/bindings/ruby/lib/util/schedule.rb | 24 +++++++--
proton-c/bindings/ruby/tests/test_container.rb | 27 +++++++++-
6 files changed, 135 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aa8d3727/proton-c/bindings/ruby/lib/core/connection.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/connection.rb b/proton-c/bindings/ruby/lib/core/connection.rb
index 327be10..c0d161e 100644
--- a/proton-c/bindings/ruby/lib/core/connection.rb
+++ b/proton-c/bindings/ruby/lib/core/connection.rb
@@ -288,6 +288,10 @@ module Qpid::Proton
@link_prefix + "/" + (@link_count += 1).to_s(32)
end
+ # @return [WorkQueue] work queue for code that should be run in the thread
+ # context for this connection
+ attr_reader :work_queue
+
protected
def _local_condition
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aa8d3727/proton-c/bindings/ruby/lib/core/container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/container.rb b/proton-c/bindings/ruby/lib/core/container.rb
index c581d34..78f8013 100644
--- a/proton-c/bindings/ruby/lib/core/container.rb
+++ b/proton-c/bindings/ruby/lib/core/container.rb
@@ -19,6 +19,7 @@
require 'thread'
require 'set'
require_relative 'listener'
+require_relative 'work_queue'
module Qpid::Proton
public
@@ -193,7 +194,7 @@ module Qpid::Proton
raise StoppedError if @stopped
end
while task = @work.pop
- run_one task
+ run_one(task, Time.now)
end
raise @panic if @panic
ensure
@@ -242,23 +243,36 @@ module Qpid::Proton
# Schedule code to be executed after a delay.
# @param delay [Numeric] delay in seconds, must be >= 0
# @yield [ ] the block is invoked with no parameters in a {#run} thread after +delay+ has elapsed
- def schedule(delay, &block)
- delay >= 0.0 or raise ArgumentError, "delay=#{delay} must be >= 0"
- block_given? or raise ArgumentError, "no block given"
+ # @return [void]
+ # @raise [ThreadError] if +non_block+ is true and the operation would block
+ def schedule(delay, non_block=false, &block)
not_stopped
- @lock.synchronize { @active += 1 } if @schedule.add(Time.now + delay, &block)
+ @lock.synchronize { @active += 1 } if @schedule.add(Time.now + delay, non_block, &block)
@wake.wake
end
private
+ def wake() @wake.wake; end
+
# Container driver applies options and adds container context to events
class ConnectionTask < Qpid::Proton::HandlerDriver
+ include TimeCompare
+
def initialize container, io, opts, server=false
super io, opts[:handler]
transport.set_server if server
transport.apply opts
connection.apply opts
+ @work_queue = WorkQueue.new container
+ connection.instance_variable_set(:@work_queue, @work_queue)
+ end
+ def next_tick() earliest(super, @work_queue.send(:next_tick)); end
+ def process(now) @work_queue.send(:process, now); super(); end
+
+ def dispatch # Intercept dispatch to close work_queue
+ super
+ @work_queue.send(:close) if read_closed? && write_closed?
end
end
@@ -341,7 +355,7 @@ module Qpid::Proton
end
# Handle a single item from the @work queue, this is the heart of the #run loop.
- def run_one(task)
+ def run_one(task, now)
case task
when :start
@@ -359,10 +373,9 @@ module Qpid::Proton
end
end
- now = Time.now
timeout = ((next_tick > now) ? next_tick - now : 0) if next_tick
r, w = IO.select(r, w, nil, timeout)
- now = Time.now
+ now = Time.now unless timeout == 0
@wake.reset if r && r.delete(@wake)
# selected is a Set to eliminate duplicates between r, w and next_tick due.
@@ -387,7 +400,7 @@ module Qpid::Proton
@work << :select # Enable next select
when ConnectionTask then
- maybe_panic { task.process }
+ maybe_panic { task.process now }
rearm task
when ListenTask then
@@ -396,7 +409,7 @@ module Qpid::Proton
rearm task
when :schedule then
- if maybe_panic { @schedule.process Time.now }
+ if maybe_panic { @schedule.process now }
@lock.synchronize { @active -= 1; check_stop_lh }
else
@lock.synchronize { @schedule_working = false }
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aa8d3727/proton-c/bindings/ruby/lib/core/endpoint.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/endpoint.rb b/proton-c/bindings/ruby/lib/core/endpoint.rb
index 86d8efe..77372a0 100644
--- a/proton-c/bindings/ruby/lib/core/endpoint.rb
+++ b/proton-c/bindings/ruby/lib/core/endpoint.rb
@@ -67,6 +67,9 @@ module Qpid::Proton
self.connection.transport
end
+ # @return [WorkQueue] the work queue for work on this endpoint.
+ def work_queue() connection.work_queue; end
+
# @private
# @return [Bool] true if {#state} has all the bits of `mask` set
def check_state(mask) (self.state & mask) == mask; end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aa8d3727/proton-c/bindings/ruby/lib/core/work_queue.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/work_queue.rb b/proton-c/bindings/ruby/lib/core/work_queue.rb
new file mode 100644
index 0000000..d08d883
--- /dev/null
+++ b/proton-c/bindings/ruby/lib/core/work_queue.rb
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+module Qpid::Proton
+
+ # A queue of work items to be executed, possibly in a different thread.
+ class WorkQueue
+
+ # Add code to be executed by the WorkQueue immediately.
+ # @param non_block [Boolean] if true raise {ThreadError} if the operation would block.
+ # @yield [ ] the block will be invoked with no parameters in the {WorkQueue} context,
+ # which may be a different thread.
+ # @return [void]
+ # @raise [ThreadError] if +non_block+ is true and the operation would block
+ # @raise [EOFError] if the queue is closed and cannot accept more work
+ def add(non_block=false, &block)
+ @schedule.add(Time.at(0), non_block, &block)
+ @container.send :wake
+ end
+
+ # Schedule work to be executed by the WorkQueue after a delay.
+ # Note that tasks scheduled after the WorkQueue closes will be silently dropped
+ #
+ # @param delay delay in seconds until the block is added to the queue.
+ # @param (see #add)
+ # @yield (see #add)
+ # @return [void]
+ # @raise (see #add)
+ def schedule(delay, non_block=false, &block)
+ @schedule.add(Time.now + delay, non_block, &block)
+ @container.send :wake
+ end
+
+ private
+
+ def initialize(container)
+ @schedule = Schedule.new
+ @container = container
+ end
+
+ def close() @schedule.close; end
+ def process(now) @schedule.process(now); end
+ def next_tick() @schedule.next_tick; end
+ end
+end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aa8d3727/proton-c/bindings/ruby/lib/util/schedule.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/util/schedule.rb b/proton-c/bindings/ruby/lib/util/schedule.rb
index ef80c1b..f1bfb36 100644
--- a/proton-c/bindings/ruby/lib/util/schedule.rb
+++ b/proton-c/bindings/ruby/lib/util/schedule.rb
@@ -33,13 +33,23 @@ module Qpid::Proton
include TimeCompare
Item = Struct.new(:time, :proc)
- def initialize() @lock = Mutex.new; @items = []; end
+ def initialize()
+ @lock = Mutex.new
+ @items = []
+ @closed = false
+ end
- def next_tick() @lock.synchronize { @items.empty? ? nil : @items.first.time } end
+ def next_tick()
+ @lock.synchronize { @items.first.time unless @items.empty? }
+ end
# @return true if the Schedule was previously empty
- def add(time, &proc)
+ # @raise EOFError if schedule is closed
+ # @raise ThreadError if +non_block+ and operation would block
+ def add(time, non_block=false, &proc)
+ # non_block ignored for now, but we may implement a bounded schedule in future.
@lock.synchronize do
+ raise EOFError if @closed
if at = (0...@items.size).bsearch { |i| @items[i].time > time }
@items.insert(at, Item.new(time, proc))
else
@@ -53,11 +63,17 @@ module Qpid::Proton
def process(now)
due = []
empty = @lock.synchronize do
- due << @items.shift while !@items.empty? && before_eq(@items.first.time, now)
+ due << @items.shift while (!@items.empty? && before_eq(@items.first.time, now))
@items.empty?
end
due.each { |i| i.proc.call() }
return empty && !due.empty?
end
+
+ # #add raises EOFError after #close.
+ # #process can still be called to drain the schedule.
+ def close()
+ @lock.synchronize { @closed = true }
+ end
end
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aa8d3727/proton-c/bindings/ruby/tests/test_container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/test_container.rb b/proton-c/bindings/ruby/tests/test_container.rb
index c9f54cc..7f89205 100644
--- a/proton-c/bindings/ruby/tests/test_container.rb
+++ b/proton-c/bindings/ruby/tests/test_container.rb
@@ -336,7 +336,32 @@ class ContainerTest < MiniTest::Test
delays.sort.each do |d|
x = a.shift
assert_equal d, x[0]
- assert_in_delta start + d, x[1], 0.01, "#{d}"
+ assert_in_delta start + d, x[1], 0.01
end
end
+
+ def test_work_queue
+ cont = ServerContainer.new(__method__, {}, 1)
+ c = cont.connect(cont.url)
+ t = Thread.new { cont.run }
+ q = Queue.new
+
+ start = Time.now
+ c.work_queue.schedule(0.02) { q << [3, Thread.current] }
+ c.work_queue.add { q << [1, Thread.current] }
+ c.work_queue.schedule(0.04) { q << [4, Thread.current] }
+ c.work_queue.add { q << [2, Thread.current] }
+
+ assert_equal [1, t], q.pop
+ assert_equal [2, t], q.pop
+ assert_in_delta 0.0, Time.now - start, 0.01
+ assert_equal [3, t], q.pop
+ assert_in_delta 0.02, Time.now - start, 0.01
+ assert_equal [4, t], q.pop
+ assert_in_delta 0.04, Time.now - start, 0.01
+
+ c.work_queue.add { c.close }
+ t.join
+ assert_raises(EOFError) { c.work_queue.add { } }
+ end
end
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org