You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2018/03/23 15:09:00 UTC

[6/8] qpid-proton git commit: NO-JIRA: [ruby] Re-organize Container methods public first.

NO-JIRA: [ruby] Re-organize Container methods public first.

Remove protected section, no longer needed


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/d65528c0
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/d65528c0
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/d65528c0

Branch: refs/heads/master
Commit: d65528c01cc3d3a82527fca4dd150cb10feaf220
Parents: d37c32c
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Mar 22 09:15:24 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Fri Mar 23 09:39:39 2018 -0400

----------------------------------------------------------------------
 proton-c/bindings/ruby/lib/core/container.rb | 316 +++++++++++-----------
 1 file changed, 156 insertions(+), 160 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d65528c0/proton-c/bindings/ruby/lib/core/container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/container.rb b/proton-c/bindings/ruby/lib/core/container.rb
index f8ff032..2d920b4 100644
--- a/proton-c/bindings/ruby/lib/core/container.rb
+++ b/proton-c/bindings/ruby/lib/core/container.rb
@@ -29,163 +29,6 @@ module Qpid::Proton
   # One or more threads can call {#run}, events generated by all the listeners and
   # connections will be dispatched in the {#run} threads.
   class Container
-    private
-
-    # Container driver applies options and adds container context to events
-    class ConnectionTask < Qpid::Proton::HandlerDriver
-      def initialize container, io, opts, server=false
-        super io, opts[:handler]
-        transport.set_server if server
-        transport.apply opts
-        connection.apply opts
-      end
-    end
-
-    class ListenTask < Listener
-
-      def initialize(io, handler, container)
-        super
-        @closing = @closed = nil
-        env = ENV['PN_TRACE_EVT']
-        if env && ["true", "1", "yes", "on"].include?(env.downcase)
-          @log_prefix = "[0x#{object_id.to_s(16)}](PN_LISTENER_"
-        else
-          @log_prefix = nil
-        end
-        dispatch(:on_open);
-      end
-
-      def process
-        return if @closed
-        unless @closing
-          begin
-            return @io.accept, dispatch(:on_accept)
-          rescue IO::WaitReadable, Errno::EINTR
-          rescue IOError, SystemCallError => e
-            close e
-          end
-        end
-      ensure
-        if @closing
-          @io.close rescue nil
-          @closed = true
-          dispatch(:on_error, @condition) if @condition
-          dispatch(:on_close)
-        end
-      end
-
-      def can_read?() !finished?; end
-      def can_write?() false; end
-      def finished?() @closed; end
-
-      def dispatch(method, *args)
-        # TODO aconway 2017-11-27: better logging
-        STDERR.puts "#{@log_prefix}#{([method[3..-1].upcase]+args).join ', '})" if @log_prefix
-        @handler.__send__(method, self, *args) if @handler && @handler.respond_to?(method)
-      end
-    end
-
-    # Selectable object that can be used to wake IO.select from another thread
-    class SelectWaker
-      def initialize
-        @rd, @wr = IO.pipe
-        @lock = Mutex.new
-        @set = false
-      end
-
-      def to_io() @rd; end
-
-      def wake
-        @lock.synchronize do
-          return if @set        # Don't write if already has data
-          @set = true
-          begin @wr.write_nonblock('x') rescue IO::WaitWritable end
-        end
-      end
-
-      def reset
-        @lock.synchronize do
-          return unless @set
-          begin @rd.read_nonblock(1) rescue IO::WaitReadable end
-          @set = false
-        end
-      end
-
-      def close
-        @rd.close
-        @wr.close
-      end
-    end
-
-    def next_tick_due(x, now)
-      nt = x.respond_to?(:next_tick) && x.next_tick
-      nt && (nt <= now)
-    end
-
-    def next_tick_min(x, t)
-      nt = x.respond_to?(:next_tick) && x.next_tick
-      nt if !t || (nt < t)
-    end
-
-    # Rescue any exception raised by the block and stop the container.
-    def maybe_panic
-      begin
-        yield
-      rescue Exception => e
-        stop(nil, e)
-      end
-    end
-
-    # Handle a single item from the @work queue, this is the heart of the #run loop.
-    def run_one(task)
-      case task
-
-      when :start
-        @adapter.on_container_start(self) if @adapter.respond_to? :on_container_start
-
-      when Container
-        r, w = [@wake], []
-        next_tick = nil
-        @lock.synchronize do
-          @selectable.each do |s|
-            r << s if s.send :can_read?
-            w << s if s.send :can_write?
-            next_tick = next_tick_min(s, next_tick)
-          end
-        end
-        now = Time.now
-        timeout = ((next_tick > now) ? next_tick - now : 0) if next_tick
-        r, w = IO.select(r, w, nil, timeout)
-        now = Time.now
-        selected = Set.new(r).delete(@wake)
-        selected.merge(w) if w
-        selected.merge(@selectable.select { |s| next_tick_due(s, now) })
-        @wake.reset
-        stop_select = nil
-        @lock.synchronize do
-          if stop_select = @stopped # close everything
-            selected += @selectable
-            selected.each { |s| s.close @stop_err }
-            @wake.close
-          end
-          @selectable -= selected # Remove selected tasks
-        end
-        selected.each { |s| @work << s } # Queue up tasks needing #process
-        @work << self unless stop_select
-
-      when ConnectionTask then
-        maybe_panic { task.process }
-        rearm task
-
-      when ListenTask then
-        io, opts = maybe_panic { task.process }
-        add(connection_driver(io, opts, true)) if io
-        rearm task
-      end
-    end
-
-    public
-
     # Error raised if the container is used after {#stop} has been called.
     class StoppedError < RuntimeError
       def initialize(*args) super("container has been stopped"); end
@@ -261,7 +104,7 @@ module Qpid::Proton
 
     # Number of threads in {#run}
     # @return [Bool] {#run} thread count
-    def running; @lock.synchronize { @running }; end
+    def running() @lock.synchronize { @running }; end
 
     # Open an AMQP connection.
     #
@@ -391,7 +234,160 @@ module Qpid::Proton
       @wake.wake
     end
 
-    protected
+    private
+
+    # Container driver applies options and adds container context to events
+    class ConnectionTask < Qpid::Proton::HandlerDriver
+      def initialize container, io, opts, server=false
+        super io, opts[:handler]
+        transport.set_server if server
+        transport.apply opts
+        connection.apply opts
+      end
+    end
+
+    class ListenTask < Listener
+
+      def initialize(io, handler, container)
+        super
+        @closing = @closed = nil
+        env = ENV['PN_TRACE_EVT']
+        if env && ["true", "1", "yes", "on"].include?(env.downcase)
+          @log_prefix = "[0x#{object_id.to_s(16)}](PN_LISTENER_"
+        else
+          @log_prefix = nil
+        end
+        dispatch(:on_open);
+      end
+
+      def process
+        return if @closed
+        unless @closing
+          begin
+            return @io.accept, dispatch(:on_accept)
+          rescue IO::WaitReadable, Errno::EINTR
+          rescue IOError, SystemCallError => e
+            close e
+          end
+        end
+      ensure
+        if @closing
+          @io.close rescue nil
+          @closed = true
+          dispatch(:on_error, @condition) if @condition
+          dispatch(:on_close)
+        end
+      end
+
+      def can_read?() !finished?; end
+      def can_write?() false; end
+      def finished?() @closed; end
+
+      def dispatch(method, *args)
+        # TODO aconway 2017-11-27: better logging
+        STDERR.puts "#{@log_prefix}#{([method[3..-1].upcase]+args).join ', '})" if @log_prefix
+        @handler.__send__(method, self, *args) if @handler && @handler.respond_to?(method)
+      end
+    end
+
+    # Selectable object that can be used to wake IO.select from another thread
+    class SelectWaker
+      def initialize
+        @rd, @wr = IO.pipe
+        @lock = Mutex.new
+        @set = false
+      end
+
+      def to_io() @rd; end
+
+      def wake
+        @lock.synchronize do
+          return if @set        # Don't write if already has data
+          @set = true
+          begin @wr.write_nonblock('x') rescue IO::WaitWritable end
+        end
+      end
+
+      def reset
+        @lock.synchronize do
+          return unless @set
+          begin @rd.read_nonblock(1) rescue IO::WaitReadable end
+          @set = false
+        end
+      end
+
+      def close
+        @rd.close
+        @wr.close
+      end
+    end
+
+    # Handle a single item from the @work queue, this is the heart of the #run loop.
+    def run_one(task)
+      case task
+
+      when :start
+        @adapter.on_container_start(self) if @adapter.respond_to? :on_container_start
+
+      when Container
+        r, w = [@wake], []
+        next_tick = nil
+        @lock.synchronize do
+          @selectable.each do |s|
+            r << s if s.send :can_read?
+            w << s if s.send :can_write?
+            next_tick = next_tick_min(s, next_tick)
+          end
+        end
+        now = Time.now
+        timeout = ((next_tick > now) ? next_tick - now : 0) if next_tick
+        r, w = IO.select(r, w, nil, timeout)
+        now = Time.now
+        selected = Set.new(r).delete(@wake)
+        selected.merge(w) if w
+        selected.merge(@selectable.select { |s| next_tick_due(s, now) })
+        @wake.reset
+        stop_select = nil
+        @lock.synchronize do
+          if stop_select = @stopped # close everything
+            selected += @selectable
+            selected.each { |s| s.close @stop_err }
+            @wake.close
+          end
+          @selectable -= selected # Remove selected tasks
+        end
+        selected.each { |s| @work << s } # Queue up tasks needing #process
+        @work << self unless stop_select
+
+      when ConnectionTask then
+        maybe_panic { task.process }
+        rearm task
+
+      when ListenTask then
+        io, opts = maybe_panic { task.process }
+        add(connection_driver(io, opts, true)) if io
+        rearm task
+      end
+    end
+
+    def next_tick_due(x, now)
+      nt = x.respond_to?(:next_tick) && x.next_tick
+      nt && (nt <= now)
+    end
+
+    def next_tick_min(x, t)
+      nt = x.respond_to?(:next_tick) && x.next_tick
+      nt if !t || (nt < t)
+    end
+
+    # Rescue any exception raised by the block and stop the container.
+    def maybe_panic
+      begin
+        yield
+      rescue Exception => e
+        stop(nil, e)
+      end
+    end
 
     # Normally if we add work we need to set a wakeup to ensure a single #run
     # thread doesn't get stuck in select while there is other work on the queue.
@@ -439,7 +435,7 @@ module Qpid::Proton
       end
     end
 
-    def not_stopped; raise StoppedError if @lock.synchronize { @stopped }; end
+    def not_stopped() raise StoppedError if @lock.synchronize { @stopped }; end
 
   end
 end


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org