You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2018/03/23 15:08:55 UTC

[1/8] qpid-proton git commit: NO-JIRA: [cpp] remove out-of-date FIXME comment.

Repository: qpid-proton
Updated Branches:
  refs/heads/master fbfdeb784 -> aa8d37272


NO-JIRA: [cpp] remove out-of-date FIXME comment.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/a65a1751
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/a65a1751
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/a65a1751

Branch: refs/heads/master
Commit: a65a17517cadc62a629f004e6ba5a9ac60cf5f71
Parents: fbfdeb7
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Mar 22 13:19:25 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Fri Mar 23 09:27:41 2018 -0400

----------------------------------------------------------------------
 proton-c/bindings/cpp/src/container_test.cpp | 1 -
 1 file changed, 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a65a1751/proton-c/bindings/cpp/src/container_test.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/container_test.cpp b/proton-c/bindings/cpp/src/container_test.cpp
index 8f9075f..5bfaa28 100644
--- a/proton-c/bindings/cpp/src/container_test.cpp
+++ b/proton-c/bindings/cpp/src/container_test.cpp
@@ -364,7 +364,6 @@ int test_container_mt_stop() {
     return 0;
 }
 
-// FIXME aconway 2018-01-04: test busy stop from other thread
 #endif
 
 } // namespace


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/8] qpid-proton git commit: NO-JIRA: [ruby] Removed unused Timeout module.

Posted by ac...@apache.org.
NO-JIRA: [ruby] Removed unused Timeout module.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/1108c4e4
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/1108c4e4
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/1108c4e4

Branch: refs/heads/master
Commit: 1108c4e47a439f630d72127f6b00fdd9003cb9b7
Parents: d65528c
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Mar 22 14:46:06 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Fri Mar 23 09:39:39 2018 -0400

----------------------------------------------------------------------
 proton-c/bindings/ruby/lib/qpid_proton.rb       |  1 -
 proton-c/bindings/ruby/lib/util/timeout.rb      | 49 --------------------
 .../ruby/spec/exception_handling_spec.rb        |  2 +-
 3 files changed, 1 insertion(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1108c4e4/proton-c/bindings/ruby/lib/qpid_proton.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/qpid_proton.rb b/proton-c/bindings/ruby/lib/qpid_proton.rb
index c446a79..c52310b 100644
--- a/proton-c/bindings/ruby/lib/qpid_proton.rb
+++ b/proton-c/bindings/ruby/lib/qpid_proton.rb
@@ -45,7 +45,6 @@ require "util/deprecation"
 require "util/version"
 require "util/error_handler"
 require "util/wrapper"
-require "util/timeout"
 
 # Types
 require "types/type"

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1108c4e4/proton-c/bindings/ruby/lib/util/timeout.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/util/timeout.rb b/proton-c/bindings/ruby/lib/util/timeout.rb
deleted file mode 100644
index 5e299dd..0000000
--- a/proton-c/bindings/ruby/lib/util/timeout.rb
+++ /dev/null
@@ -1,49 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-
-module Qpid::Proton::Util
-
-  # Provides methods for converting between milliseconds, seconds
-  # and timeout values.
-  #
-  # @private
-  module Timeout
-
-    def sec_to_millis(s)
-      return (s * 1000).to_int
-    end
-
-    def millis_to_sec(ms)
-      return (ms.to_f / 1000.0).to_int
-    end
-
-    def timeout_to_millis(s)
-      return Cproton::PN_MILLIS_MAX if s.nil?
-
-      return sec_to_millis(s)
-    end
-
-    def millis_to_timeout(ms)
-      return nil if ms == Cproton::PN_MILLIS_MAX
-
-      return millis_to_sec(ms)
-    end
-
-  end
-
-end

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1108c4e4/proton-c/bindings/ruby/spec/exception_handling_spec.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/spec/exception_handling_spec.rb b/proton-c/bindings/ruby/spec/exception_handling_spec.rb
index acfb55e..3d65b40 100644
--- a/proton-c/bindings/ruby/spec/exception_handling_spec.rb
+++ b/proton-c/bindings/ruby/spec/exception_handling_spec.rb
@@ -70,7 +70,7 @@ module Qpid
         }.must_raise(Qpid::Proton::ArgumentError)
       end
 
-      it "raises Timeout on PN_TIMEOUT" do
+      it "raises TimeoutError on PN_TIMEOUT" do
         proc {
           @handler.check_for_error(Qpid::Proton::Error::TIMEOUT)
         }.must_raise(Qpid::Proton::TimeoutError)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[4/8] qpid-proton git commit: NO-JIRA: [ruby] move yard/options to standard location .yardopts

Posted by ac...@apache.org.
NO-JIRA: [ruby] move yard/options to standard location .yardopts


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/1f682a6c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/1f682a6c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/1f682a6c

Branch: refs/heads/master
Commit: 1f682a6ceff7d8cf6e7c21f7112019d3b79de995
Parents: 5a34fda
Author: Alan Conway <ac...@redhat.com>
Authored: Wed Mar 21 14:01:01 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Fri Mar 23 09:39:39 2018 -0400

----------------------------------------------------------------------
 proton-c/bindings/ruby/.yardopts      | 8 ++++++++
 proton-c/bindings/ruby/CMakeLists.txt | 2 +-
 proton-c/bindings/ruby/yard/options   | 8 --------
 3 files changed, 9 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1f682a6c/proton-c/bindings/ruby/.yardopts
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/.yardopts b/proton-c/bindings/ruby/.yardopts
new file mode 100644
index 0000000..166989d
--- /dev/null
+++ b/proton-c/bindings/ruby/.yardopts
@@ -0,0 +1,8 @@
+--readme README.rdoc
+--quiet
+--no-progress
+--no-private
+--default-return ""
+--hide-void-return
+--template default
+--api qpid

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1f682a6c/proton-c/bindings/ruby/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/CMakeLists.txt b/proton-c/bindings/ruby/CMakeLists.txt
index 22b0be0..d8f9a44 100644
--- a/proton-c/bindings/ruby/CMakeLists.txt
+++ b/proton-c/bindings/ruby/CMakeLists.txt
@@ -144,7 +144,7 @@ if (YARD_EXE)
   add_custom_command(
     OUTPUT ${bin}/doc
     WORKING_DIRECTORY ${src}
-    COMMAND ${YARD_EXE} -o ${bin}/doc -b ${bin}/.yardoc --yardopts ${src}/yard/options -p ${src}/yard/templates
+    COMMAND ${YARD_EXE} -o ${bin}/doc -b ${bin}/.yardoc --yardopts ${src}/.yardopts -p ${src}/yard/templates
     DEPENDS ${RUBY_SRC}
     )
   add_custom_target(docs-ruby DEPENDS ${bin}/doc)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1f682a6c/proton-c/bindings/ruby/yard/options
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/yard/options b/proton-c/bindings/ruby/yard/options
deleted file mode 100644
index 166989d..0000000
--- a/proton-c/bindings/ruby/yard/options
+++ /dev/null
@@ -1,8 +0,0 @@
---readme README.rdoc
---quiet
---no-progress
---no-private
---default-return ""
---hide-void-return
---template default
---api qpid


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[3/8] qpid-proton git commit: NO-JIRA: [ruby] Clean up 'ruby -W2' warnings

Posted by ac...@apache.org.
NO-JIRA: [ruby] Clean up 'ruby -W2' warnings


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/5a34fda9
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/5a34fda9
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/5a34fda9

Branch: refs/heads/master
Commit: 5a34fda9d3844d27c8ef2349246e930e66299593
Parents: a65a175
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Mar 22 10:28:16 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Fri Mar 23 09:39:39 2018 -0400

----------------------------------------------------------------------
 proton-c/bindings/ruby/lib/core/container.rb   | 1 +
 proton-c/bindings/ruby/lib/core/transport.rb   | 1 +
 proton-c/bindings/ruby/lib/types/array.rb      | 1 +
 proton-c/bindings/ruby/tests/test_container.rb | 4 +++-
 proton-c/bindings/ruby/tests/test_tools.rb     | 1 -
 5 files changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5a34fda9/proton-c/bindings/ruby/lib/core/container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/container.rb b/proton-c/bindings/ruby/lib/core/container.rb
index d4fe5d1..f8ff032 100644
--- a/proton-c/bindings/ruby/lib/core/container.rb
+++ b/proton-c/bindings/ruby/lib/core/container.rb
@@ -205,6 +205,7 @@ module Qpid::Proton
     #   concurrently.
     #
     def initialize(*args)
+      @handler, @id, @panic = nil
       case args.size
       when 2 then @handler, @id = args
       when 1 then

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5a34fda9/proton-c/bindings/ruby/lib/core/transport.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/transport.rb b/proton-c/bindings/ruby/lib/core/transport.rb
index e629b0c..6a6c4f5 100644
--- a/proton-c/bindings/ruby/lib/core/transport.rb
+++ b/proton-c/bindings/ruby/lib/core/transport.rb
@@ -170,6 +170,7 @@ module Qpid::Proton
     # Creates a new transport instance.
     def initialize(impl = Cproton.pn_transport)
       @impl = impl
+      @ssl = nil
       self.class.store_instance(self, :pn_transport_attachments)
     end
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5a34fda9/proton-c/bindings/ruby/lib/types/array.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/types/array.rb b/proton-c/bindings/ruby/lib/types/array.rb
index d7e15e6..62d9274 100644
--- a/proton-c/bindings/ruby/lib/types/array.rb
+++ b/proton-c/bindings/ruby/lib/types/array.rb
@@ -49,6 +49,7 @@ module Qpid::Proton
       # @param descriptor [Object] Optional array descriptor
       def initialize(type, elements=nil, descriptor=nil)
         @type, @descriptor = type, descriptor
+        @proton_array_header = nil
         raise ArgumentError, "no type specified for array" if @type.nil?
         super elements if elements
       end

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5a34fda9/proton-c/bindings/ruby/tests/test_container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/test_container.rb b/proton-c/bindings/ruby/tests/test_container.rb
index a0c32ed..faa505a 100644
--- a/proton-c/bindings/ruby/tests/test_container.rb
+++ b/proton-c/bindings/ruby/tests/test_container.rb
@@ -32,6 +32,8 @@ class ContainerTest < MiniTest::Test
     send_handler = Class.new(ExceptionMessagingHandler) do
       attr_reader :accepted, :sent
 
+      def initialize() @sent, @accepted = nil; end
+
       def on_sendable(sender)
         unless @sent
           m = Message.new("hello")
@@ -152,7 +154,7 @@ class ContainerTest < MiniTest::Test
   def test_bad_host
     cont = Container.new(__method__)
     assert_raises (SocketError) { cont.listen("badlisten.example.com:999") }
-    assert_raises (SocketError) { c = cont.connect("badconnect.example.com:999") }
+    assert_raises (SocketError) { cont.connect("badconnect.example.com:999") }
   end
 
   # Verify that connection options are sent to the peer

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5a34fda9/proton-c/bindings/ruby/tests/test_tools.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/test_tools.rb b/proton-c/bindings/ruby/tests/test_tools.rb
index 911669b..091322d 100644
--- a/proton-c/bindings/ruby/tests/test_tools.rb
+++ b/proton-c/bindings/ruby/tests/test_tools.rb
@@ -105,7 +105,6 @@ end
 
 # Add port/url to Listener, assuming a TCP socket
 class Qpid::Proton::Listener
-  def port() to_io.addr[1]; end
   def url() "amqp://:#{port}"; end
 end
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[5/8] qpid-proton git commit: PROTON-1803: [ruby] Container support for scheduled tasks

Posted by ac...@apache.org.
PROTON-1803: [ruby] Container support for scheduled tasks


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/02f49551
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/02f49551
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/02f49551

Branch: refs/heads/master
Commit: 02f495510cb9f5e5308393fe8bb8e44df8ecebcd
Parents: 1108c4e
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Mar 22 16:01:05 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Fri Mar 23 09:39:39 2018 -0400

----------------------------------------------------------------------
 proton-c/bindings/ruby/lib/core/container.rb   | 115 ++++++++++++++++----
 proton-c/bindings/ruby/lib/qpid_proton.rb      |   1 +
 proton-c/bindings/ruby/lib/util/schedule.rb    |  63 +++++++++++
 proton-c/bindings/ruby/tests/test_container.rb |  40 ++++++-
 4 files changed, 194 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/02f49551/proton-c/bindings/ruby/lib/core/container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/container.rb b/proton-c/bindings/ruby/lib/core/container.rb
index 2d920b4..c581d34 100644
--- a/proton-c/bindings/ruby/lib/core/container.rb
+++ b/proton-c/bindings/ruby/lib/core/container.rb
@@ -21,6 +21,8 @@ require 'set'
 require_relative 'listener'
 
 module Qpid::Proton
+  public
+
   # An AMQP container manages a set of {Listener}s and {Connection}s which
   # contain {#Sender} and {#Receiver} links to transfer messages.  Usually, each
   # AMQP client or server process has a single container for all of its
@@ -29,6 +31,8 @@ module Qpid::Proton
   # One or more threads can call {#run}, events generated by all the listeners and
   # connections will be dispatched in the {#run} threads.
   class Container
+    include TimeCompare
+
     # Error raised if the container is used after {#stop} has been called.
     class StoppedError < RuntimeError
       def initialize(*args) super("container has been stopped"); end
@@ -62,16 +66,17 @@ module Qpid::Proton
 
       # Implementation note:
       #
-      # - #run threads take work from @work
-      # - Each driver and the Container itself is processed by at most one #run thread at a time
-      # - The Container thread does IO.select
+      # - #run threads take work items from @work, process them, and rearm them for select
+      # - work items are: ConnectionTask, ListenTask, :start, :select, :schedule
       # - nil on the @work queue makes a #run thread exit
 
       @work = Queue.new
       @work << :start
-      @work << self             # Issue start and start start selecting
+      @work << :select
       @wake = SelectWaker.new   # Wakes #run thread in IO.select
       @auto_stop = true         # Stop when @active drops to 0
+      @schedule = Schedule.new
+      @schedule_working = false # True if :schedule is on the work queue
 
       # Following instance variables protected by lock
       @lock = Mutex.new
@@ -234,6 +239,17 @@ module Qpid::Proton
       @wake.wake
     end
 
+    # Schedule code to be executed after a delay.
+    # @param delay [Numeric] delay in seconds, must be >= 0
+    # @yield [ ] the block is invoked with no parameters in a {#run} thread after +delay+ has elapsed
+    def schedule(delay, &block)
+      delay >= 0.0 or raise ArgumentError, "delay=#{delay} must be >= 0"
+      block_given? or raise ArgumentError, "no block given"
+      not_stopped
+      @lock.synchronize { @active += 1 } if @schedule.add(Time.now + delay, &block)
+      @wake.wake
+    end
+
     private
 
     # Container driver applies options and adds container context to events
@@ -288,6 +304,8 @@ module Qpid::Proton
         STDERR.puts "#{@log_prefix}#{([method[3..-1].upcase]+args).join ', '})" if @log_prefix
         @handler.__send__(method, self, *args) if @handler && @handler.respond_to?(method)
       end
+
+      def next_tick() nil; end
     end
 
     # Selectable object that can be used to wake IO.select from another thread
@@ -329,35 +347,44 @@ module Qpid::Proton
       when :start
         @adapter.on_container_start(self) if @adapter.respond_to? :on_container_start
 
-      when Container
+      when :select
+        # Compute read/write select sets and minimum next_tick for select timeout
         r, w = [@wake], []
-        next_tick = nil
+        next_tick = @schedule.next_tick
         @lock.synchronize do
           @selectable.each do |s|
             r << s if s.send :can_read?
             w << s if s.send :can_write?
-            next_tick = next_tick_min(s, next_tick)
+            next_tick = earliest(s.next_tick, next_tick)
           end
         end
+
         now = Time.now
         timeout = ((next_tick > now) ? next_tick - now : 0) if next_tick
         r, w = IO.select(r, w, nil, timeout)
         now = Time.now
-        selected = Set.new(r).delete(@wake)
+        @wake.reset if r && r.delete(@wake)
+
+        # selected is a Set to eliminate duplicates between r, w and next_tick due.
+        selected = Set.new
+        selected.merge(r) if r
         selected.merge(w) if w
-        selected.merge(@selectable.select { |s| next_tick_due(s, now) })
-        @wake.reset
-        stop_select = nil
         @lock.synchronize do
-          if stop_select = @stopped # close everything
-            selected += @selectable
-            selected.each { |s| s.close @stop_err }
+          if @stopped # close everything
+            @selectable.each { |s| s.close @stop_err; @work << s }
+            @selectable.clear
             @wake.close
+            return
           end
-          @selectable -= selected # Remove selected tasks
+          if !@schedule_working && before_eq(@schedule.next_tick, now)
+            @schedule_working = true
+            @work << :schedule
+          end
+          selected.merge(@selectable.select { |s| before_eq(s.next_tick, now) })
+          @selectable -= selected # Remove selected tasks from @selectable
         end
         selected.each { |s| @work << s } # Queue up tasks needing #process
-        @work << self unless stop_select
+        @work << :select        # Enable next select
 
       when ConnectionTask then
         maybe_panic { task.process }
@@ -367,17 +394,56 @@ module Qpid::Proton
         io, opts = maybe_panic { task.process }
         add(connection_driver(io, opts, true)) if io
         rearm task
-      end
-    end
 
-    def next_tick_due(x, now)
-      nt = x.respond_to?(:next_tick) && x.next_tick
-      nt && (nt <= now)
+      when :schedule then
+        if maybe_panic { @schedule.process Time.now }
+          @lock.synchronize { @active -= 1; check_stop_lh }
+        else
+          @lock.synchronize { @schedule_working = false }
+        end
+      end
     end
 
-    def next_tick_min(x, t)
-      nt = x.respond_to?(:next_tick) && x.next_tick
-      nt if !t || (nt < t)
+    def do_select
+      # Compute the sets to select for read and write, and the minimum next_tick for the timeout
+      r, w = [@wake], []
+      next_tick = nil
+      @lock.synchronize do
+        @selectable.each do |s|
+          r << s if s.can_read?
+          w << s if s.can_write?
+          next_tick = earliest(s.next_tick, next_tick)
+        end
+      end
+      next_tick = earliest(@schedule.next_tick, next_tick)
+
+      # Do the select and queue up all resulting work
+      now = Time.now
+      timeout = next_tick - now if next_tick
+      r, w = (timeout.nil? || timeout > 0) && IO.select(r, w, nil, timeout)
+      @wake.reset
+      selected = Set.new
+      @lock.synchronize do
+        if @stopped
+          @selectable.each { |s| s.close @stop_err; @work << s }
+          @wake.close
+          return
+        end
+        # Check if schedule has items due and is not already working
+        if !@schedule_working && before_eq(@schedule.next_tick, now)
+          @work << :schedule
+          @schedule_working = true
+        end
+        # Eliminate duplicates between r, w and next_tick due.
+        selected.merge(r) if r
+        selected.delete(@wake)
+        selected.merge(w) if w
+        @selectable -= selected
+        selected.merge(@selectable.select { |s| before_eq(s.next_tick, now) })
+        @selectable -= selected
+      end
+      selected.each { |s| @work << s } # Queue up tasks needing #process
+      @work << :select
     end
 
     # Rescue any exception raised by the block and stop the container.
@@ -386,6 +452,7 @@ module Qpid::Proton
         yield
       rescue Exception => e
         stop(nil, e)
+        nil
       end
     end
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/02f49551/proton-c/bindings/ruby/lib/qpid_proton.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/qpid_proton.rb b/proton-c/bindings/ruby/lib/qpid_proton.rb
index c52310b..21a15b9 100644
--- a/proton-c/bindings/ruby/lib/qpid_proton.rb
+++ b/proton-c/bindings/ruby/lib/qpid_proton.rb
@@ -44,6 +44,7 @@ require "core/exceptions"
 require "util/deprecation"
 require "util/version"
 require "util/error_handler"
+require "util/schedule"
 require "util/wrapper"
 
 # Types

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/02f49551/proton-c/bindings/ruby/lib/util/schedule.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/util/schedule.rb b/proton-c/bindings/ruby/lib/util/schedule.rb
new file mode 100644
index 0000000..ef80c1b
--- /dev/null
+++ b/proton-c/bindings/ruby/lib/util/schedule.rb
@@ -0,0 +1,63 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+module Qpid::Proton
+
+  # @private
+  module TimeCompare
+    # t1 <= t2, where nil is treated as "distant future"
+    def before_eq(t1, t2) (t1 && t2) ? (t1 <= t2) : t1; end
+
+    # min(t1, t2) where nil is treated as "distant future"
+    def earliest(t1, t2) before_eq(t1, t2) ? t1 :  t2; end
+  end
+
+  # @private
+  # A sorted, thread-safe list of scheduled Procs.
+  # Note: calls to #process are always serialized, but calls to #add may be concurrent.
+  class Schedule
+    include TimeCompare
+    Item = Struct.new(:time, :proc)
+
+    def initialize() @lock = Mutex.new; @items = []; end
+
+    def next_tick() @lock.synchronize { @items.empty? ? nil : @items.first.time } end
+
+    # @return true if the Schedule was previously empty
+    def add(time, &proc)
+      @lock.synchronize do
+        if at = (0...@items.size).bsearch { |i| @items[i].time > time }
+          @items.insert(at, Item.new(time, proc))
+        else
+          @items << Item.new(time, proc)
+        end
+        return @items.size == 1
+      end
+    end
+
+    # @return true if the Schedule became empty as a result of this call
+    def process(now)
+      due = []
+      empty = @lock.synchronize do
+        due << @items.shift while !@items.empty? && before_eq(@items.first.time, now)
+        @items.empty?
+      end
+      due.each { |i| i.proc.call() }
+      return empty && !due.empty?
+    end
+  end
+end

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/02f49551/proton-c/bindings/ruby/tests/test_container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/test_container.rb b/proton-c/bindings/ruby/tests/test_container.rb
index faa505a..c9f54cc 100644
--- a/proton-c/bindings/ruby/tests/test_container.rb
+++ b/proton-c/bindings/ruby/tests/test_container.rb
@@ -300,5 +300,43 @@ class ContainerTest < MiniTest::Test
     assert_raises(Container::StoppedError) { cont.run }
     assert_raises(Container::StoppedError) { cont.listen "" }
   end
-end
 
+  # Make sure Schedule puts tasks in proper order.
+  def test_scheduler
+    a = []
+    s = Schedule.new
+
+    assert_equal true,  s.add(Time.at 3) { a << 3 }
+    assert_equal false, s.process(Time.at 2)      # Should not run
+    assert_equal [], a
+    assert_equal true, s.process(Time.at 3)      # Should run
+    assert_equal [3], a
+
+    a = []
+    assert_equal true, s.add(Time.at 3) { a << 3 }
+    assert_equal false, s.add(Time.at 5) { a << 5 }
+    assert_equal false, s.add(Time.at 1) { a << 1 }
+    assert_equal false, s.add(Time.at 4) { a << 4 }
+    assert_equal false, s.add(Time.at 4) { a << 4.1 }
+    assert_equal false, s.add(Time.at 4) { a << 4.2 }
+    assert_equal false, s.process(Time.at 4)
+    assert_equal [1, 3, 4, 4.1, 4.2], a
+    a = []
+    assert_equal true, s.process(Time.at 5)
+    assert_equal [5], a
+  end
+
+  def test_container_schedule
+    c = Container.new __method__
+    delays = [0.1, 0.03, 0.02, 0.04]
+    a = []
+    delays.each { |d| c.schedule(d) { a << [d, Time.now] } }
+    start = Time.now
+    c.run
+    delays.sort.each do |d|
+      x = a.shift
+      assert_equal d, x[0]
+      assert_in_delta  start + d, x[1], 0.01, "#{d}"
+    end
+  end
+end


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[7/8] qpid-proton git commit: NO-JIRA: [ruby] Put external RUBYLIB at end of path for tests

Posted by ac...@apache.org.
NO-JIRA: [ruby] Put external RUBYLIB at end of path for tests

Avoid picking up installed proton packages when testing.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/d37c32c1
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/d37c32c1
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/d37c32c1

Branch: refs/heads/master
Commit: d37c32c1d631a36574f07800e8f4443e91f1a9d5
Parents: 1f682a6
Author: Alan Conway <ac...@redhat.com>
Authored: Wed Mar 21 14:17:15 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Fri Mar 23 09:39:39 2018 -0400

----------------------------------------------------------------------
 proton-c/bindings/ruby/CMakeLists.txt | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d37c32c1/proton-c/bindings/ruby/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/CMakeLists.txt b/proton-c/bindings/ruby/CMakeLists.txt
index d8f9a44..f6f95f7 100644
--- a/proton-c/bindings/ruby/CMakeLists.txt
+++ b/proton-c/bindings/ruby/CMakeLists.txt
@@ -105,8 +105,8 @@ install(DIRECTORY lib/ DESTINATION ${RUBY_ARCHLIB_DIR} COMPONENT Ruby)
 
 ## Tests
 
-to_native_path("$ENV{RUBYLIB};${src}/lib;${src}/tests;${src}/spec;${bin};${c_lib_dir}" RUBYLIB)
-to_native_path("$ENV{PATH};${bin};${c_lib_dir}" PATH)
+to_native_path("${src}/lib;${src}/tests;${src}/spec;${bin};${c_lib_dir};$ENV{RUBYLIB}" RUBYLIB)
+to_native_path("${bin};${c_lib_dir};$ENV{PATH}" PATH)
 
 execute_process(COMMAND ${RUBY_EXECUTABLE} -r minitest -e ""
   RESULT_VARIABLE result OUTPUT_QUIET ERROR_QUIET)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[8/8] qpid-proton git commit: PROTON-1778: [ruby] thread safe work_queue

Posted by ac...@apache.org.
PROTON-1778: [ruby] thread safe work_queue


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/aa8d3727
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/aa8d3727
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/aa8d3727

Branch: refs/heads/master
Commit: aa8d372723ec3fe8de0d16c165ed5944b65c40f0
Parents: 02f4955
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Mar 22 17:19:45 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Fri Mar 23 11:08:27 2018 -0400

----------------------------------------------------------------------
 proton-c/bindings/ruby/lib/core/connection.rb  |  4 ++
 proton-c/bindings/ruby/lib/core/container.rb   | 33 ++++++++----
 proton-c/bindings/ruby/lib/core/endpoint.rb    |  3 ++
 proton-c/bindings/ruby/lib/core/work_queue.rb  | 59 +++++++++++++++++++++
 proton-c/bindings/ruby/lib/util/schedule.rb    | 24 +++++++--
 proton-c/bindings/ruby/tests/test_container.rb | 27 +++++++++-
 6 files changed, 135 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aa8d3727/proton-c/bindings/ruby/lib/core/connection.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/connection.rb b/proton-c/bindings/ruby/lib/core/connection.rb
index 327be10..c0d161e 100644
--- a/proton-c/bindings/ruby/lib/core/connection.rb
+++ b/proton-c/bindings/ruby/lib/core/connection.rb
@@ -288,6 +288,10 @@ module Qpid::Proton
       @link_prefix + "/" +  (@link_count += 1).to_s(32)
     end
 
+    # @return [WorkQueue] work queue for code that should be run in the thread
+    # context for this connection
+    attr_reader :work_queue
+
     protected
 
     def _local_condition

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aa8d3727/proton-c/bindings/ruby/lib/core/container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/container.rb b/proton-c/bindings/ruby/lib/core/container.rb
index c581d34..78f8013 100644
--- a/proton-c/bindings/ruby/lib/core/container.rb
+++ b/proton-c/bindings/ruby/lib/core/container.rb
@@ -19,6 +19,7 @@
 require 'thread'
 require 'set'
 require_relative 'listener'
+require_relative 'work_queue'
 
 module Qpid::Proton
   public
@@ -193,7 +194,7 @@ module Qpid::Proton
         raise StoppedError if @stopped
       end
       while task = @work.pop
-        run_one task
+        run_one(task, Time.now)
       end
       raise @panic if @panic
     ensure
@@ -242,23 +243,36 @@ module Qpid::Proton
     # Schedule code to be executed after a delay.
     # @param delay [Numeric] delay in seconds, must be >= 0
     # @yield [ ] the block is invoked with no parameters in a {#run} thread after +delay+ has elapsed
-    def schedule(delay, &block)
-      delay >= 0.0 or raise ArgumentError, "delay=#{delay} must be >= 0"
-      block_given? or raise ArgumentError, "no block given"
+    # @return [void]
+    # @raise [ThreadError] if +non_block+ is true and the operation would block
+    def schedule(delay, non_block=false, &block)
       not_stopped
-      @lock.synchronize { @active += 1 } if @schedule.add(Time.now + delay, &block)
+      @lock.synchronize { @active += 1 } if @schedule.add(Time.now + delay, non_block, &block)
       @wake.wake
     end
 
     private
 
+    def wake() @wake.wake; end
+
     # Container driver applies options and adds container context to events
     class ConnectionTask < Qpid::Proton::HandlerDriver
+      include TimeCompare
+
       def initialize container, io, opts, server=false
         super io, opts[:handler]
         transport.set_server if server
         transport.apply opts
         connection.apply opts
+        @work_queue = WorkQueue.new container
+        connection.instance_variable_set(:@work_queue, @work_queue)
+      end
+      def next_tick() earliest(super, @work_queue.send(:next_tick)); end
+      def process(now) @work_queue.send(:process, now); super(); end
+
+      def dispatch              # Intercept dispatch to close work_queue
+        super
+        @work_queue.send(:close) if read_closed? && write_closed? 
       end
     end
 
@@ -341,7 +355,7 @@ module Qpid::Proton
     end
 
     # Handle a single item from the @work queue, this is the heart of the #run loop.
-    def run_one(task)
+    def run_one(task, now)
       case task
 
       when :start
@@ -359,10 +373,9 @@ module Qpid::Proton
           end
         end
 
-        now = Time.now
         timeout = ((next_tick > now) ? next_tick - now : 0) if next_tick
         r, w = IO.select(r, w, nil, timeout)
-        now = Time.now
+        now = Time.now unless timeout == 0
         @wake.reset if r && r.delete(@wake)
 
         # selected is a Set to eliminate duplicates between r, w and next_tick due.
@@ -387,7 +400,7 @@ module Qpid::Proton
         @work << :select        # Enable next select
 
       when ConnectionTask then
-        maybe_panic { task.process }
+        maybe_panic { task.process now }
         rearm task
 
       when ListenTask then
@@ -396,7 +409,7 @@ module Qpid::Proton
         rearm task
 
       when :schedule then
-        if maybe_panic { @schedule.process Time.now }
+        if maybe_panic { @schedule.process now }
           @lock.synchronize { @active -= 1; check_stop_lh }
         else
           @lock.synchronize { @schedule_working = false }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aa8d3727/proton-c/bindings/ruby/lib/core/endpoint.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/endpoint.rb b/proton-c/bindings/ruby/lib/core/endpoint.rb
index 86d8efe..77372a0 100644
--- a/proton-c/bindings/ruby/lib/core/endpoint.rb
+++ b/proton-c/bindings/ruby/lib/core/endpoint.rb
@@ -67,6 +67,9 @@ module Qpid::Proton
       self.connection.transport
     end
 
+    # @return [WorkQueue] the work queue for work on this endpoint.
+    def work_queue() connection.work_queue; end
+
     # @private
     # @return [Bool] true if {#state} has all the bits of `mask` set
     def check_state(mask) (self.state & mask) == mask; end

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aa8d3727/proton-c/bindings/ruby/lib/core/work_queue.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/work_queue.rb b/proton-c/bindings/ruby/lib/core/work_queue.rb
new file mode 100644
index 0000000..d08d883
--- /dev/null
+++ b/proton-c/bindings/ruby/lib/core/work_queue.rb
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+module Qpid::Proton
+
+  # A queue of work items to be executed, possibly in a different thread.
+  class WorkQueue
+
+    # Add code to be executed by the WorkQueue immediately.
+    # @param non_block [Boolean] if true raise {ThreadError} if the operation would block.
+    # @yield [ ] the block will be invoked with no parameters in the {WorkQueue} context,
+    #  which may be a different thread.
+    # @return [void]
+    # @raise [ThreadError] if +non_block+ is true and the operation would block
+    # @raise [EOFError] if the queue is closed and cannot accept more work
+    def add(non_block=false, &block)
+      @schedule.add(Time.at(0), non_block, &block)
+      @container.send :wake
+    end
+
+    # Schedule work to be executed by the WorkQueue after a delay.
+    # Note that tasks scheduled after the WorkQueue closes will be silently dropped
+    #
+    # @param delay delay in seconds until the block is added to the queue.
+    # @param (see #add)
+    # @yield (see #add)
+    # @return [void]
+    # @raise (see #add)
+    def schedule(delay, non_block=false, &block)
+      @schedule.add(Time.now + delay, non_block, &block)
+      @container.send :wake
+    end
+
+    private
+
+    def initialize(container)
+      @schedule = Schedule.new
+      @container = container
+    end
+
+    def close() @schedule.close; end
+    def process(now) @schedule.process(now); end
+    def next_tick() @schedule.next_tick; end
+  end
+end

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aa8d3727/proton-c/bindings/ruby/lib/util/schedule.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/util/schedule.rb b/proton-c/bindings/ruby/lib/util/schedule.rb
index ef80c1b..f1bfb36 100644
--- a/proton-c/bindings/ruby/lib/util/schedule.rb
+++ b/proton-c/bindings/ruby/lib/util/schedule.rb
@@ -33,13 +33,23 @@ module Qpid::Proton
     include TimeCompare
     Item = Struct.new(:time, :proc)
 
-    def initialize() @lock = Mutex.new; @items = []; end
+    def initialize()
+      @lock = Mutex.new
+      @items = []
+      @closed = false
+    end
 
-    def next_tick() @lock.synchronize { @items.empty? ? nil : @items.first.time } end
+    def next_tick()
+      @lock.synchronize { @items.first.time unless @items.empty? }
+    end
 
     # @return true if the Schedule was previously empty
-    def add(time, &proc)
+    # @raise EOFError if schedule is closed
+    # @raise ThreadError if +non_block+ and operation would block
+    def add(time, non_block=false, &proc)
+      # non_block ignored for now, but we may implement a bounded schedule in future.
       @lock.synchronize do
+        raise EOFError if @closed
         if at = (0...@items.size).bsearch { |i| @items[i].time > time }
           @items.insert(at, Item.new(time, proc))
         else
@@ -53,11 +63,17 @@ module Qpid::Proton
     def process(now)
       due = []
       empty = @lock.synchronize do
-        due << @items.shift while !@items.empty? && before_eq(@items.first.time, now)
+        due << @items.shift while (!@items.empty? && before_eq(@items.first.time, now))
         @items.empty?
       end
       due.each { |i| i.proc.call() }
       return empty && !due.empty?
     end
+
+    # #add raises EOFError after #close.
+    # #process can still be called to drain the schedule.
+    def close()
+      @lock.synchronize { @closed = true }
+    end
   end
 end

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aa8d3727/proton-c/bindings/ruby/tests/test_container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/test_container.rb b/proton-c/bindings/ruby/tests/test_container.rb
index c9f54cc..7f89205 100644
--- a/proton-c/bindings/ruby/tests/test_container.rb
+++ b/proton-c/bindings/ruby/tests/test_container.rb
@@ -336,7 +336,32 @@ class ContainerTest < MiniTest::Test
     delays.sort.each do |d|
       x = a.shift
       assert_equal d, x[0]
-      assert_in_delta  start + d, x[1], 0.01, "#{d}"
+      assert_in_delta  start + d, x[1], 0.01
     end
   end
+
+  def test_work_queue
+    cont = ServerContainer.new(__method__, {}, 1)
+    c = cont.connect(cont.url)
+    t = Thread.new { cont.run }
+    q = Queue.new
+
+    start = Time.now
+    c.work_queue.schedule(0.02) { q << [3, Thread.current] }
+    c.work_queue.add { q << [1, Thread.current] }
+    c.work_queue.schedule(0.04) { q << [4, Thread.current] }
+    c.work_queue.add { q << [2, Thread.current] }
+
+    assert_equal [1, t], q.pop
+    assert_equal [2, t], q.pop
+    assert_in_delta  0.0, Time.now - start, 0.01
+    assert_equal [3, t], q.pop
+    assert_in_delta  0.02, Time.now - start, 0.01
+    assert_equal [4, t], q.pop
+    assert_in_delta  0.04, Time.now - start, 0.01
+
+    c.work_queue.add { c.close }
+    t.join
+    assert_raises(EOFError) { c.work_queue.add {  } }
+  end
 end


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[6/8] qpid-proton git commit: NO-JIRA: [ruby] Re-organize Container methods public first.

Posted by ac...@apache.org.
NO-JIRA: [ruby] Re-organize Container methods public first.

Remove protected section, no longer needed


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/d65528c0
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/d65528c0
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/d65528c0

Branch: refs/heads/master
Commit: d65528c01cc3d3a82527fca4dd150cb10feaf220
Parents: d37c32c
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Mar 22 09:15:24 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Fri Mar 23 09:39:39 2018 -0400

----------------------------------------------------------------------
 proton-c/bindings/ruby/lib/core/container.rb | 316 +++++++++++-----------
 1 file changed, 156 insertions(+), 160 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d65528c0/proton-c/bindings/ruby/lib/core/container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/container.rb b/proton-c/bindings/ruby/lib/core/container.rb
index f8ff032..2d920b4 100644
--- a/proton-c/bindings/ruby/lib/core/container.rb
+++ b/proton-c/bindings/ruby/lib/core/container.rb
@@ -29,163 +29,6 @@ module Qpid::Proton
   # One or more threads can call {#run}, events generated by all the listeners and
   # connections will be dispatched in the {#run} threads.
   class Container
-    private
-
-    # Container driver applies options and adds container context to events
-    class ConnectionTask < Qpid::Proton::HandlerDriver
-      def initialize container, io, opts, server=false
-        super io, opts[:handler]
-        transport.set_server if server
-        transport.apply opts
-        connection.apply opts
-      end
-    end
-
-    class ListenTask < Listener
-
-      def initialize(io, handler, container)
-        super
-        @closing = @closed = nil
-        env = ENV['PN_TRACE_EVT']
-        if env && ["true", "1", "yes", "on"].include?(env.downcase)
-          @log_prefix = "[0x#{object_id.to_s(16)}](PN_LISTENER_"
-        else
-          @log_prefix = nil
-        end
-        dispatch(:on_open);
-      end
-
-      def process
-        return if @closed
-        unless @closing
-          begin
-            return @io.accept, dispatch(:on_accept)
-          rescue IO::WaitReadable, Errno::EINTR
-          rescue IOError, SystemCallError => e
-            close e
-          end
-        end
-      ensure
-        if @closing
-          @io.close rescue nil
-          @closed = true
-          dispatch(:on_error, @condition) if @condition
-          dispatch(:on_close)
-        end
-      end
-
-      def can_read?() !finished?; end
-      def can_write?() false; end
-      def finished?() @closed; end
-
-      def dispatch(method, *args)
-        # TODO aconway 2017-11-27: better logging
-        STDERR.puts "#{@log_prefix}#{([method[3..-1].upcase]+args).join ', '})" if @log_prefix
-        @handler.__send__(method, self, *args) if @handler && @handler.respond_to?(method)
-      end
-    end
-
-    # Selectable object that can be used to wake IO.select from another thread
-    class SelectWaker
-      def initialize
-        @rd, @wr = IO.pipe
-        @lock = Mutex.new
-        @set = false
-      end
-
-      def to_io() @rd; end
-
-      def wake
-        @lock.synchronize do
-          return if @set        # Don't write if already has data
-          @set = true
-          begin @wr.write_nonblock('x') rescue IO::WaitWritable end
-        end
-      end
-
-      def reset
-        @lock.synchronize do
-          return unless @set
-          begin @rd.read_nonblock(1) rescue IO::WaitReadable end
-          @set = false
-        end
-      end
-
-      def close
-        @rd.close
-        @wr.close
-      end
-    end
-
-    def next_tick_due(x, now)
-      nt = x.respond_to?(:next_tick) && x.next_tick
-      nt && (nt <= now)
-    end
-
-    def next_tick_min(x, t)
-      nt = x.respond_to?(:next_tick) && x.next_tick
-      nt if !t || (nt < t)
-    end
-
-    # Rescue any exception raised by the block and stop the container.
-    def maybe_panic
-      begin
-        yield
-      rescue Exception => e
-        stop(nil, e)
-      end
-    end
-
-    # Handle a single item from the @work queue, this is the heart of the #run loop.
-    def run_one(task)
-      case task
-
-      when :start
-        @adapter.on_container_start(self) if @adapter.respond_to? :on_container_start
-
-      when Container
-        r, w = [@wake], []
-        next_tick = nil
-        @lock.synchronize do
-          @selectable.each do |s|
-            r << s if s.send :can_read?
-            w << s if s.send :can_write?
-            next_tick = next_tick_min(s, next_tick)
-          end
-        end
-        now = Time.now
-        timeout = ((next_tick > now) ? next_tick - now : 0) if next_tick
-        r, w = IO.select(r, w, nil, timeout)
-        now = Time.now
-        selected = Set.new(r).delete(@wake)
-        selected.merge(w) if w
-        selected.merge(@selectable.select { |s| next_tick_due(s, now) })
-        @wake.reset
-        stop_select = nil
-        @lock.synchronize do
-          if stop_select = @stopped # close everything
-            selected += @selectable
-            selected.each { |s| s.close @stop_err }
-            @wake.close
-          end
-          @selectable -= selected # Remove selected tasks
-        end
-        selected.each { |s| @work << s } # Queue up tasks needing #process
-        @work << self unless stop_select
-
-      when ConnectionTask then
-        maybe_panic { task.process }
-        rearm task
-
-      when ListenTask then
-        io, opts = maybe_panic { task.process }
-        add(connection_driver(io, opts, true)) if io
-        rearm task
-      end
-    end
-
-    public
-
     # Error raised if the container is used after {#stop} has been called.
     class StoppedError < RuntimeError
       def initialize(*args) super("container has been stopped"); end
@@ -261,7 +104,7 @@ module Qpid::Proton
 
     # Number of threads in {#run}
     # @return [Bool] {#run} thread count
-    def running; @lock.synchronize { @running }; end
+    def running() @lock.synchronize { @running }; end
 
     # Open an AMQP connection.
     #
@@ -391,7 +234,160 @@ module Qpid::Proton
       @wake.wake
     end
 
-    protected
+    private
+
+    # Container driver applies options and adds container context to events
+    class ConnectionTask < Qpid::Proton::HandlerDriver
+      def initialize container, io, opts, server=false
+        super io, opts[:handler]
+        transport.set_server if server
+        transport.apply opts
+        connection.apply opts
+      end
+    end
+
+    class ListenTask < Listener
+
+      def initialize(io, handler, container)
+        super
+        @closing = @closed = nil
+        env = ENV['PN_TRACE_EVT']
+        if env && ["true", "1", "yes", "on"].include?(env.downcase)
+          @log_prefix = "[0x#{object_id.to_s(16)}](PN_LISTENER_"
+        else
+          @log_prefix = nil
+        end
+        dispatch(:on_open);
+      end
+
+      def process
+        return if @closed
+        unless @closing
+          begin
+            return @io.accept, dispatch(:on_accept)
+          rescue IO::WaitReadable, Errno::EINTR
+          rescue IOError, SystemCallError => e
+            close e
+          end
+        end
+      ensure
+        if @closing
+          @io.close rescue nil
+          @closed = true
+          dispatch(:on_error, @condition) if @condition
+          dispatch(:on_close)
+        end
+      end
+
+      def can_read?() !finished?; end
+      def can_write?() false; end
+      def finished?() @closed; end
+
+      def dispatch(method, *args)
+        # TODO aconway 2017-11-27: better logging
+        STDERR.puts "#{@log_prefix}#{([method[3..-1].upcase]+args).join ', '})" if @log_prefix
+        @handler.__send__(method, self, *args) if @handler && @handler.respond_to?(method)
+      end
+    end
+
+    # Selectable object that can be used to wake IO.select from another thread
+    class SelectWaker
+      def initialize
+        @rd, @wr = IO.pipe
+        @lock = Mutex.new
+        @set = false
+      end
+
+      def to_io() @rd; end
+
+      def wake
+        @lock.synchronize do
+          return if @set        # Don't write if already has data
+          @set = true
+          begin @wr.write_nonblock('x') rescue IO::WaitWritable end
+        end
+      end
+
+      def reset
+        @lock.synchronize do
+          return unless @set
+          begin @rd.read_nonblock(1) rescue IO::WaitReadable end
+          @set = false
+        end
+      end
+
+      def close
+        @rd.close
+        @wr.close
+      end
+    end
+
+    # Handle a single item from the @work queue, this is the heart of the #run loop.
+    def run_one(task)
+      case task
+
+      when :start
+        @adapter.on_container_start(self) if @adapter.respond_to? :on_container_start
+
+      when Container
+        r, w = [@wake], []
+        next_tick = nil
+        @lock.synchronize do
+          @selectable.each do |s|
+            r << s if s.send :can_read?
+            w << s if s.send :can_write?
+            next_tick = next_tick_min(s, next_tick)
+          end
+        end
+        now = Time.now
+        timeout = ((next_tick > now) ? next_tick - now : 0) if next_tick
+        r, w = IO.select(r, w, nil, timeout)
+        now = Time.now
+        selected = Set.new(r).delete(@wake)
+        selected.merge(w) if w
+        selected.merge(@selectable.select { |s| next_tick_due(s, now) })
+        @wake.reset
+        stop_select = nil
+        @lock.synchronize do
+          if stop_select = @stopped # close everything
+            selected += @selectable
+            selected.each { |s| s.close @stop_err }
+            @wake.close
+          end
+          @selectable -= selected # Remove selected tasks
+        end
+        selected.each { |s| @work << s } # Queue up tasks needing #process
+        @work << self unless stop_select
+
+      when ConnectionTask then
+        maybe_panic { task.process }
+        rearm task
+
+      when ListenTask then
+        io, opts = maybe_panic { task.process }
+        add(connection_driver(io, opts, true)) if io
+        rearm task
+      end
+    end
+
+    def next_tick_due(x, now)
+      nt = x.respond_to?(:next_tick) && x.next_tick
+      nt && (nt <= now)
+    end
+
+    def next_tick_min(x, t)
+      nt = x.respond_to?(:next_tick) && x.next_tick
+      nt if !t || (nt < t)
+    end
+
+    # Rescue any exception raised by the block and stop the container.
+    def maybe_panic
+      begin
+        yield
+      rescue Exception => e
+        stop(nil, e)
+      end
+    end
 
     # Normally if we add work we need to set a wakeup to ensure a single #run
     # thread doesn't get stuck in select while there is other work on the queue.
@@ -439,7 +435,7 @@ module Qpid::Proton
       end
     end
 
-    def not_stopped; raise StoppedError if @lock.synchronize { @stopped }; end
+    def not_stopped() raise StoppedError if @lock.synchronize { @stopped }; end
 
   end
 end


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org