You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2018/04/10 21:17:33 UTC
[39/55] [abbrv] qpid-proton git commit: NO-JIRA: [ruby] Re-organize
Container methods public first.
NO-JIRA: [ruby] Re-organize Container methods public first.
Remove protected section, no longer needed
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/d65528c0
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/d65528c0
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/d65528c0
Branch: refs/heads/go1
Commit: d65528c01cc3d3a82527fca4dd150cb10feaf220
Parents: d37c32c
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Mar 22 09:15:24 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Fri Mar 23 09:39:39 2018 -0400
----------------------------------------------------------------------
proton-c/bindings/ruby/lib/core/container.rb | 316 +++++++++++-----------
1 file changed, 156 insertions(+), 160 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d65528c0/proton-c/bindings/ruby/lib/core/container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/container.rb b/proton-c/bindings/ruby/lib/core/container.rb
index f8ff032..2d920b4 100644
--- a/proton-c/bindings/ruby/lib/core/container.rb
+++ b/proton-c/bindings/ruby/lib/core/container.rb
@@ -29,163 +29,6 @@ module Qpid::Proton
# One or more threads can call {#run}, events generated by all the listeners and
# connections will be dispatched in the {#run} threads.
class Container
- private
-
- # Container driver applies options and adds container context to events
- class ConnectionTask < Qpid::Proton::HandlerDriver
- def initialize container, io, opts, server=false
- super io, opts[:handler]
- transport.set_server if server
- transport.apply opts
- connection.apply opts
- end
- end
-
- class ListenTask < Listener
-
- def initialize(io, handler, container)
- super
- @closing = @closed = nil
- env = ENV['PN_TRACE_EVT']
- if env && ["true", "1", "yes", "on"].include?(env.downcase)
- @log_prefix = "[0x#{object_id.to_s(16)}](PN_LISTENER_"
- else
- @log_prefix = nil
- end
- dispatch(:on_open);
- end
-
- def process
- return if @closed
- unless @closing
- begin
- return @io.accept, dispatch(:on_accept)
- rescue IO::WaitReadable, Errno::EINTR
- rescue IOError, SystemCallError => e
- close e
- end
- end
- ensure
- if @closing
- @io.close rescue nil
- @closed = true
- dispatch(:on_error, @condition) if @condition
- dispatch(:on_close)
- end
- end
-
- def can_read?() !finished?; end
- def can_write?() false; end
- def finished?() @closed; end
-
- def dispatch(method, *args)
- # TODO aconway 2017-11-27: better logging
- STDERR.puts "#{@log_prefix}#{([method[3..-1].upcase]+args).join ', '})" if @log_prefix
- @handler.__send__(method, self, *args) if @handler && @handler.respond_to?(method)
- end
- end
-
- # Selectable object that can be used to wake IO.select from another thread
- class SelectWaker
- def initialize
- @rd, @wr = IO.pipe
- @lock = Mutex.new
- @set = false
- end
-
- def to_io() @rd; end
-
- def wake
- @lock.synchronize do
- return if @set # Don't write if already has data
- @set = true
- begin @wr.write_nonblock('x') rescue IO::WaitWritable end
- end
- end
-
- def reset
- @lock.synchronize do
- return unless @set
- begin @rd.read_nonblock(1) rescue IO::WaitReadable end
- @set = false
- end
- end
-
- def close
- @rd.close
- @wr.close
- end
- end
-
- def next_tick_due(x, now)
- nt = x.respond_to?(:next_tick) && x.next_tick
- nt && (nt <= now)
- end
-
- def next_tick_min(x, t)
- nt = x.respond_to?(:next_tick) && x.next_tick
- nt if !t || (nt < t)
- end
-
- # Rescue any exception raised by the block and stop the container.
- def maybe_panic
- begin
- yield
- rescue Exception => e
- stop(nil, e)
- end
- end
-
- # Handle a single item from the @work queue, this is the heart of the #run loop.
- def run_one(task)
- case task
-
- when :start
- @adapter.on_container_start(self) if @adapter.respond_to? :on_container_start
-
- when Container
- r, w = [@wake], []
- next_tick = nil
- @lock.synchronize do
- @selectable.each do |s|
- r << s if s.send :can_read?
- w << s if s.send :can_write?
- next_tick = next_tick_min(s, next_tick)
- end
- end
- now = Time.now
- timeout = ((next_tick > now) ? next_tick - now : 0) if next_tick
- r, w = IO.select(r, w, nil, timeout)
- now = Time.now
- selected = Set.new(r).delete(@wake)
- selected.merge(w) if w
- selected.merge(@selectable.select { |s| next_tick_due(s, now) })
- @wake.reset
- stop_select = nil
- @lock.synchronize do
- if stop_select = @stopped # close everything
- selected += @selectable
- selected.each { |s| s.close @stop_err }
- @wake.close
- end
- @selectable -= selected # Remove selected tasks
- end
- selected.each { |s| @work << s } # Queue up tasks needing #process
- @work << self unless stop_select
-
- when ConnectionTask then
- maybe_panic { task.process }
- rearm task
-
- when ListenTask then
- io, opts = maybe_panic { task.process }
- add(connection_driver(io, opts, true)) if io
- rearm task
- end
- end
-
- public
-
# Error raised if the container is used after {#stop} has been called.
class StoppedError < RuntimeError
def initialize(*args) super("container has been stopped"); end
@@ -261,7 +104,7 @@ module Qpid::Proton
# Number of threads in {#run}
# @return [Bool] {#run} thread count
- def running; @lock.synchronize { @running }; end
+ def running() @lock.synchronize { @running }; end
# Open an AMQP connection.
#
@@ -391,7 +234,160 @@ module Qpid::Proton
@wake.wake
end
- protected
+ private
+
+ # Container driver applies options and adds container context to events
+ class ConnectionTask < Qpid::Proton::HandlerDriver
+ def initialize container, io, opts, server=false
+ super io, opts[:handler]
+ transport.set_server if server
+ transport.apply opts
+ connection.apply opts
+ end
+ end
+
+ class ListenTask < Listener
+
+ def initialize(io, handler, container)
+ super
+ @closing = @closed = nil
+ env = ENV['PN_TRACE_EVT']
+ if env && ["true", "1", "yes", "on"].include?(env.downcase)
+ @log_prefix = "[0x#{object_id.to_s(16)}](PN_LISTENER_"
+ else
+ @log_prefix = nil
+ end
+ dispatch(:on_open);
+ end
+
+ def process
+ return if @closed
+ unless @closing
+ begin
+ return @io.accept, dispatch(:on_accept)
+ rescue IO::WaitReadable, Errno::EINTR
+ rescue IOError, SystemCallError => e
+ close e
+ end
+ end
+ ensure
+ if @closing
+ @io.close rescue nil
+ @closed = true
+ dispatch(:on_error, @condition) if @condition
+ dispatch(:on_close)
+ end
+ end
+
+ def can_read?() !finished?; end
+ def can_write?() false; end
+ def finished?() @closed; end
+
+ def dispatch(method, *args)
+ # TODO aconway 2017-11-27: better logging
+ STDERR.puts "#{@log_prefix}#{([method[3..-1].upcase]+args).join ', '})" if @log_prefix
+ @handler.__send__(method, self, *args) if @handler && @handler.respond_to?(method)
+ end
+ end
+
+ # Selectable object that can be used to wake IO.select from another thread
+ class SelectWaker
+ def initialize
+ @rd, @wr = IO.pipe
+ @lock = Mutex.new
+ @set = false
+ end
+
+ def to_io() @rd; end
+
+ def wake
+ @lock.synchronize do
+ return if @set # Don't write if already has data
+ @set = true
+ begin @wr.write_nonblock('x') rescue IO::WaitWritable end
+ end
+ end
+
+ def reset
+ @lock.synchronize do
+ return unless @set
+ begin @rd.read_nonblock(1) rescue IO::WaitReadable end
+ @set = false
+ end
+ end
+
+ def close
+ @rd.close
+ @wr.close
+ end
+ end
+
+ # Handle a single item from the @work queue, this is the heart of the #run loop.
+ def run_one(task)
+ case task
+
+ when :start
+ @adapter.on_container_start(self) if @adapter.respond_to? :on_container_start
+
+ when Container
+ r, w = [@wake], []
+ next_tick = nil
+ @lock.synchronize do
+ @selectable.each do |s|
+ r << s if s.send :can_read?
+ w << s if s.send :can_write?
+ next_tick = next_tick_min(s, next_tick)
+ end
+ end
+ now = Time.now
+ timeout = ((next_tick > now) ? next_tick - now : 0) if next_tick
+ r, w = IO.select(r, w, nil, timeout)
+ now = Time.now
+ selected = Set.new(r).delete(@wake)
+ selected.merge(w) if w
+ selected.merge(@selectable.select { |s| next_tick_due(s, now) })
+ @wake.reset
+ stop_select = nil
+ @lock.synchronize do
+ if stop_select = @stopped # close everything
+ selected += @selectable
+ selected.each { |s| s.close @stop_err }
+ @wake.close
+ end
+ @selectable -= selected # Remove selected tasks
+ end
+ selected.each { |s| @work << s } # Queue up tasks needing #process
+ @work << self unless stop_select
+
+ when ConnectionTask then
+ maybe_panic { task.process }
+ rearm task
+
+ when ListenTask then
+ io, opts = maybe_panic { task.process }
+ add(connection_driver(io, opts, true)) if io
+ rearm task
+ end
+ end
+
+ def next_tick_due(x, now)
+ nt = x.respond_to?(:next_tick) && x.next_tick
+ nt && (nt <= now)
+ end
+
+ def next_tick_min(x, t)
+ nt = x.respond_to?(:next_tick) && x.next_tick
+ nt if !t || (nt < t)
+ end
+
+ # Rescue any exception raised by the block and stop the container.
+ def maybe_panic
+ begin
+ yield
+ rescue Exception => e
+ stop(nil, e)
+ end
+ end
# Normally if we add work we need to set a wakeup to ensure a single #run
# thread doesn't get stuck in select while there is other work on the queue.
@@ -439,7 +435,7 @@ module Qpid::Proton
end
end
- def not_stopped; raise StoppedError if @lock.synchronize { @stopped }; end
+ def not_stopped() raise StoppedError if @lock.synchronize { @stopped }; end
end
end
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org