You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2018/03/23 15:09:02 UTC

[8/8] qpid-proton git commit: PROTON-1778: [ruby] thread safe work_queue

PROTON-1778: [ruby] thread safe work_queue


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/aa8d3727
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/aa8d3727
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/aa8d3727

Branch: refs/heads/master
Commit: aa8d372723ec3fe8de0d16c165ed5944b65c40f0
Parents: 02f4955
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Mar 22 17:19:45 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Fri Mar 23 11:08:27 2018 -0400

----------------------------------------------------------------------
 proton-c/bindings/ruby/lib/core/connection.rb  |  4 ++
 proton-c/bindings/ruby/lib/core/container.rb   | 33 ++++++++----
 proton-c/bindings/ruby/lib/core/endpoint.rb    |  3 ++
 proton-c/bindings/ruby/lib/core/work_queue.rb  | 59 +++++++++++++++++++++
 proton-c/bindings/ruby/lib/util/schedule.rb    | 24 +++++++--
 proton-c/bindings/ruby/tests/test_container.rb | 27 +++++++++-
 6 files changed, 135 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aa8d3727/proton-c/bindings/ruby/lib/core/connection.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/connection.rb b/proton-c/bindings/ruby/lib/core/connection.rb
index 327be10..c0d161e 100644
--- a/proton-c/bindings/ruby/lib/core/connection.rb
+++ b/proton-c/bindings/ruby/lib/core/connection.rb
@@ -288,6 +288,10 @@ module Qpid::Proton
       @link_prefix + "/" +  (@link_count += 1).to_s(32)
     end
 
+    # @return [WorkQueue] work queue for code that should be run in the thread
+    # context for this connection
+    attr_reader :work_queue
+
     protected
 
     def _local_condition

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aa8d3727/proton-c/bindings/ruby/lib/core/container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/container.rb b/proton-c/bindings/ruby/lib/core/container.rb
index c581d34..78f8013 100644
--- a/proton-c/bindings/ruby/lib/core/container.rb
+++ b/proton-c/bindings/ruby/lib/core/container.rb
@@ -19,6 +19,7 @@
 require 'thread'
 require 'set'
 require_relative 'listener'
+require_relative 'work_queue'
 
 module Qpid::Proton
   public
@@ -193,7 +194,7 @@ module Qpid::Proton
         raise StoppedError if @stopped
       end
       while task = @work.pop
-        run_one task
+        run_one(task, Time.now)
       end
       raise @panic if @panic
     ensure
@@ -242,23 +243,36 @@ module Qpid::Proton
     # Schedule code to be executed after a delay.
     # @param delay [Numeric] delay in seconds, must be >= 0
     # @yield [ ] the block is invoked with no parameters in a {#run} thread after +delay+ has elapsed
-    def schedule(delay, &block)
-      delay >= 0.0 or raise ArgumentError, "delay=#{delay} must be >= 0"
-      block_given? or raise ArgumentError, "no block given"
+    # @return [void]
+    # @raise [ThreadError] if +non_block+ is true and the operation would block
+    def schedule(delay, non_block=false, &block)
       not_stopped
-      @lock.synchronize { @active += 1 } if @schedule.add(Time.now + delay, &block)
+      @lock.synchronize { @active += 1 } if @schedule.add(Time.now + delay, non_block, &block)
       @wake.wake
     end
 
     private
 
+    def wake() @wake.wake; end
+
     # Container driver applies options and adds container context to events
     class ConnectionTask < Qpid::Proton::HandlerDriver
+      include TimeCompare
+
       def initialize container, io, opts, server=false
         super io, opts[:handler]
         transport.set_server if server
         transport.apply opts
         connection.apply opts
+        @work_queue = WorkQueue.new container
+        connection.instance_variable_set(:@work_queue, @work_queue)
+      end
+      def next_tick() earliest(super, @work_queue.send(:next_tick)); end
+      def process(now) @work_queue.send(:process, now); super(); end
+
+      def dispatch              # Intercept dispatch to close work_queue
+        super
+        @work_queue.send(:close) if read_closed? && write_closed? 
       end
     end
 
@@ -341,7 +355,7 @@ module Qpid::Proton
     end
 
     # Handle a single item from the @work queue, this is the heart of the #run loop.
-    def run_one(task)
+    def run_one(task, now)
       case task
 
       when :start
@@ -359,10 +373,9 @@ module Qpid::Proton
           end
         end
 
-        now = Time.now
         timeout = ((next_tick > now) ? next_tick - now : 0) if next_tick
         r, w = IO.select(r, w, nil, timeout)
-        now = Time.now
+        now = Time.now unless timeout == 0
         @wake.reset if r && r.delete(@wake)
 
         # selected is a Set to eliminate duplicates between r, w and next_tick due.
@@ -387,7 +400,7 @@ module Qpid::Proton
         @work << :select        # Enable next select
 
       when ConnectionTask then
-        maybe_panic { task.process }
+        maybe_panic { task.process now }
         rearm task
 
       when ListenTask then
@@ -396,7 +409,7 @@ module Qpid::Proton
         rearm task
 
       when :schedule then
-        if maybe_panic { @schedule.process Time.now }
+        if maybe_panic { @schedule.process now }
           @lock.synchronize { @active -= 1; check_stop_lh }
         else
           @lock.synchronize { @schedule_working = false }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aa8d3727/proton-c/bindings/ruby/lib/core/endpoint.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/endpoint.rb b/proton-c/bindings/ruby/lib/core/endpoint.rb
index 86d8efe..77372a0 100644
--- a/proton-c/bindings/ruby/lib/core/endpoint.rb
+++ b/proton-c/bindings/ruby/lib/core/endpoint.rb
@@ -67,6 +67,9 @@ module Qpid::Proton
       self.connection.transport
     end
 
+    # @return [WorkQueue] the work queue for work on this endpoint.
+    def work_queue() connection.work_queue; end
+
     # @private
     # @return [Bool] true if {#state} has all the bits of `mask` set
     def check_state(mask) (self.state & mask) == mask; end

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aa8d3727/proton-c/bindings/ruby/lib/core/work_queue.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/work_queue.rb b/proton-c/bindings/ruby/lib/core/work_queue.rb
new file mode 100644
index 0000000..d08d883
--- /dev/null
+++ b/proton-c/bindings/ruby/lib/core/work_queue.rb
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+module Qpid::Proton
+
+  # A queue of work items to be executed, possibly in a different thread.
+  class WorkQueue
+
+    # Add code to be executed by the WorkQueue immediately.
+    # @param non_block [Boolean] if true raise {ThreadError} if the operation would block.
+    # @yield [ ] the block will be invoked with no parameters in the {WorkQueue} context,
+    #  which may be a different thread.
+    # @return [void]
+    # @raise [ThreadError] if +non_block+ is true and the operation would block
+    # @raise [EOFError] if the queue is closed and cannot accept more work
+    def add(non_block=false, &block)
+      @schedule.add(Time.at(0), non_block, &block)
+      @container.send :wake
+    end
+
+    # Schedule work to be executed by the WorkQueue after a delay.
+    # Note that tasks scheduled after the WorkQueue closes will be silently dropped
+    #
+    # @param delay delay in seconds until the block is added to the queue.
+    # @param (see #add)
+    # @yield (see #add)
+    # @return [void]
+    # @raise (see #add)
+    def schedule(delay, non_block=false, &block)
+      @schedule.add(Time.now + delay, non_block, &block)
+      @container.send :wake
+    end
+
+    private
+
+    def initialize(container)
+      @schedule = Schedule.new
+      @container = container
+    end
+
+    def close() @schedule.close; end
+    def process(now) @schedule.process(now); end
+    def next_tick() @schedule.next_tick; end
+  end
+end

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aa8d3727/proton-c/bindings/ruby/lib/util/schedule.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/util/schedule.rb b/proton-c/bindings/ruby/lib/util/schedule.rb
index ef80c1b..f1bfb36 100644
--- a/proton-c/bindings/ruby/lib/util/schedule.rb
+++ b/proton-c/bindings/ruby/lib/util/schedule.rb
@@ -33,13 +33,23 @@ module Qpid::Proton
     include TimeCompare
     Item = Struct.new(:time, :proc)
 
-    def initialize() @lock = Mutex.new; @items = []; end
+    def initialize()
+      @lock = Mutex.new
+      @items = []
+      @closed = false
+    end
 
-    def next_tick() @lock.synchronize { @items.empty? ? nil : @items.first.time } end
+    def next_tick()
+      @lock.synchronize { @items.first.time unless @items.empty? }
+    end
 
     # @return true if the Schedule was previously empty
-    def add(time, &proc)
+    # @raise EOFError if schedule is closed
+    # @raise ThreadError if +non_block+ and operation would block
+    def add(time, non_block=false, &proc)
+      # non_block ignored for now, but we may implement a bounded schedule in future.
       @lock.synchronize do
+        raise EOFError if @closed
         if at = (0...@items.size).bsearch { |i| @items[i].time > time }
           @items.insert(at, Item.new(time, proc))
         else
@@ -53,11 +63,17 @@ module Qpid::Proton
     def process(now)
       due = []
       empty = @lock.synchronize do
-        due << @items.shift while !@items.empty? && before_eq(@items.first.time, now)
+        due << @items.shift while (!@items.empty? && before_eq(@items.first.time, now))
         @items.empty?
       end
       due.each { |i| i.proc.call() }
       return empty && !due.empty?
     end
+
+    # #add raises EOFError after #close.
+    # #process can still be called to drain the schedule.
+    def close()
+      @lock.synchronize { @closed = true }
+    end
   end
 end

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aa8d3727/proton-c/bindings/ruby/tests/test_container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/test_container.rb b/proton-c/bindings/ruby/tests/test_container.rb
index c9f54cc..7f89205 100644
--- a/proton-c/bindings/ruby/tests/test_container.rb
+++ b/proton-c/bindings/ruby/tests/test_container.rb
@@ -336,7 +336,32 @@ class ContainerTest < MiniTest::Test
     delays.sort.each do |d|
       x = a.shift
       assert_equal d, x[0]
-      assert_in_delta  start + d, x[1], 0.01, "#{d}"
+      assert_in_delta  start + d, x[1], 0.01
     end
   end
+
+  def test_work_queue
+    cont = ServerContainer.new(__method__, {}, 1)
+    c = cont.connect(cont.url)
+    t = Thread.new { cont.run }
+    q = Queue.new
+
+    start = Time.now
+    c.work_queue.schedule(0.02) { q << [3, Thread.current] }
+    c.work_queue.add { q << [1, Thread.current] }
+    c.work_queue.schedule(0.04) { q << [4, Thread.current] }
+    c.work_queue.add { q << [2, Thread.current] }
+
+    assert_equal [1, t], q.pop
+    assert_equal [2, t], q.pop
+    assert_in_delta  0.0, Time.now - start, 0.01
+    assert_equal [3, t], q.pop
+    assert_in_delta  0.02, Time.now - start, 0.01
+    assert_equal [4, t], q.pop
+    assert_in_delta  0.04, Time.now - start, 0.01
+
+    c.work_queue.add { c.close }
+    t.join
+    assert_raises(EOFError) { c.work_queue.add {  } }
+  end
 end


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org