You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2018/03/16 12:50:22 UTC

[2/2] qpid-proton git commit: PROTON-1782: [ruby] More efficient wake mechanism

PROTON-1782: [ruby] More efficient wake mechanism

Writing to a pipe on every wake is inefficient; use a Mutex to write only when needed.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/e9bc2125
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/e9bc2125
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/e9bc2125

Branch: refs/heads/master
Commit: e9bc2125f649c6de5c71bde615212fb147465701
Parents: 561dcf5
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Mar 15 11:00:30 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Mar 15 19:14:12 2018 -0400

----------------------------------------------------------------------
 proton-c/bindings/ruby/lib/core/container.rb | 56 +++++++++++++++++------
 1 file changed, 41 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e9bc2125/proton-c/bindings/ruby/lib/core/container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/container.rb b/proton-c/bindings/ruby/lib/core/container.rb
index f262ea1..3b5d23f 100644
--- a/proton-c/bindings/ruby/lib/core/container.rb
+++ b/proton-c/bindings/ruby/lib/core/container.rb
@@ -85,6 +85,38 @@ module Qpid::Proton
       end
     end
 
+    # Selectable object that can be used to wake IO.select from another thread
+    class SelectWaker
+      def initialize
+        @rd, @wr = IO.pipe
+        @lock = Mutex.new
+        @set = false
+      end
+
+      def to_io() @rd; end
+
+      def wake
+        @lock.synchronize do
+          return if @set        # Don't write if already has data
+          @set = true
+          begin @wr.write_nonblock('x') rescue IO::WaitWritable end
+        end
+      end
+
+      def reset
+        @lock.synchronize do
+          return unless @set
+          begin @rd.read_nonblock(1) rescue IO::WaitReadable end
+          @set = false
+        end
+      end
+
+      def close
+        @rd.close
+        @wr.close
+      end
+    end
+
     public
 
     # Error raised if the container is used after {#stop} has been called.
@@ -127,7 +159,7 @@ module Qpid::Proton
       @work = Queue.new
       @work << :start
       @work << self             # Issue start and start start selecting
-      @wake = IO.pipe           # Wakes #run thread in IO.select
+      @wake = SelectWaker.new   # Wakes #run thread in IO.select
       @auto_stop = true         # Stop when @active drops to 0
 
       # Following instance variables protected by lock
@@ -237,7 +269,7 @@ module Qpid::Proton
           @adapter.on_container_start(self) if @adapter.respond_to? :on_container_start
 
         when Container
-          r, w = [@wake[0]], []
+          r, w = [@wake], []
           @lock.synchronize do
             @selectable.each do |s|
               r << s if s.send :can_read?
@@ -246,13 +278,13 @@ module Qpid::Proton
           end
           r, w = IO.select(r, w)
           selected = Set.new(r).merge(w)
-          drain_wake if selected.delete?(@wake[0])
+          @wake.reset if selected.delete?(@wake)
           stop_select = nil
           @lock.synchronize do
             if stop_select = @stopped # close everything
               selected += @selectable
               selected.each { |s| s.close @stop_err }
-              @wake.each { |fd| fd.close() }
+              @wake.close
             end
             @selectable -= selected # Remove selected tasks
           end
@@ -304,22 +336,16 @@ module Qpid::Proton
         # - no more select calls after next wakeup
         # - once @active == 0, all threads will be stopped with nil
       end
-      wake
+      @wake.wake
     end
 
     protected
 
-    def wake; @wake[1].write_nonblock('x') rescue nil; end
-
     # Normally if we add work we need to set a wakeup to ensure a single #run
     # thread doesn't get stuck in select while there is other work on the queue.
-    def work_wake(task) @work << task; wake; end
-
-    def drain_wake
-      begin
-        @wake[0].read_nonblock(256) while true
-      rescue Errno::EWOULDBLOCK, Errno::EAGAIN, Errno::EINTR
-      end
+    def work_wake(task)
+      @work << task
+      @wake.wake
     end
 
     def connection_driver(io, opts=nil, server=false)
@@ -350,7 +376,7 @@ module Qpid::Proton
           @selectable << task
         end
       end
-      wake
+      @wake.wake
     end
 
     def check_stop_lh


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org