You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/11/27 21:10:40 UTC

[1/3] qpid-proton git commit: PROTON-1636: [ruby] fix race condition in ruby container tests

Repository: qpid-proton
Updated Branches:
  refs/heads/master 55afeabaf -> f9d8cc4ac


PROTON-1636: [ruby] fix race condition in ruby container tests

Made "stopped" a terminal state for the Container to avoid races during
shut-down, e.g. because threads have not even entered #run when the container
auto-stops.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/f9d8cc4a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/f9d8cc4a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/f9d8cc4a

Branch: refs/heads/master
Commit: f9d8cc4acdbe11ad58d3a3004abfa42b0e3dd098
Parents: 1bf4a32
Author: Alan Conway <ac...@redhat.com>
Authored: Mon Nov 27 13:56:19 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Nov 27 16:09:47 2017 -0500

----------------------------------------------------------------------
 .../bindings/ruby/lib/core/connection_driver.rb |  14 ++-
 proton-c/bindings/ruby/lib/core/container.rb    | 117 ++++++++++++-------
 proton-c/bindings/ruby/tests/test_container.rb  |  56 +++++----
 3 files changed, 121 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f9d8cc4a/proton-c/bindings/ruby/lib/core/connection_driver.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/connection_driver.rb b/proton-c/bindings/ruby/lib/core/connection_driver.rb
index 537e308..f757328 100644
--- a/proton-c/bindings/ruby/lib/core/connection_driver.rb
+++ b/proton-c/bindings/ruby/lib/core/connection_driver.rb
@@ -67,6 +67,13 @@ module Qpid
       # Get the next event to dispatch, nil if no events available
       def event() Event::Event.wrap(Cproton.pn_connection_driver_next_event(@impl)); end
 
+      # Iterator for all available events
+      def each_event()
+        while e = event
+          yield e
+        end
+      end
+
       # Non-blocking read from {#io}, generate events for {#event}
       # IO errors are returned as transport errors by {#event}, not raised
       def read
@@ -161,9 +168,7 @@ module Qpid
       # Dispatch all events available from {#event} to {#handler}
       # @param handlers [Enum<Handler::MessagingHandler>]
       def dispatch()
-        while e = event
-          e.dispatch @handler
-        end
+        each_event { |e| e.dispatch @handler }
       end
 
       # Do {#read}, {#tick}, {#write} and {#dispatch} without blocking.
@@ -176,8 +181,9 @@ module Qpid
       def process(now=Time.now)
         read
         next_tick = tick(now)
+        dispatch                # May generate more data to write
         write
-        dispatch
+        dispatch                # Make sure we consume all events
         return next_tick
       end
     end

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f9d8cc4a/proton-c/bindings/ruby/lib/core/container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/container.rb b/proton-c/bindings/ruby/lib/core/container.rb
index 38dcd74..39ac150 100644
--- a/proton-c/bindings/ruby/lib/core/container.rb
+++ b/proton-c/bindings/ruby/lib/core/container.rb
@@ -58,14 +58,12 @@ module Qpid::Proton
         if env && ["true", "1", "yes", "on"].include?(env.downcase)
           @log_prefix = "[0x#{object_id.to_s(16)}](PN_LISTENER_"
         end
+        dispatch(:on_open);
       end
 
       def process
+        return if @closed
         unless @closing
-          unless @open_dispatched
-            dispatch(:on_open);
-            @open_dispatched = true
-          end
           begin
             return @io.accept, dispatch(:on_accept)
           rescue IO::WaitReadable, Errno::EINTR
@@ -73,20 +71,21 @@ module Qpid::Proton
             close e
           end
         end
+      ensure
         if @closing
           @io.close rescue nil
-          @closing = false
           @closed = true
           dispatch(:on_error, @condition) if @condition
           dispatch(:on_close)
         end
       end
 
-      def can_read?() !@closed; end
+      def can_read?() !finished?; end
       def can_write?() false; end
       def finished?() @closed; end
 
       def dispatch(method, *args)
+        # TODO aconway 2017-11-27: better logging
         STDERR.puts "#{@log_prefix}#{([method[3..-1].upcase]+args).join ', '})" if @log_prefix
         @handler.__send__(method, self, *args) if @handler && @handler.respond_to?(method)
       end
@@ -94,6 +93,11 @@ module Qpid::Proton
 
     public
 
+    # Error raised if the container is used after {#stop} has been called.
+    class StoppedError < RuntimeError
+      def initialize(*args) super("container has been stopped"); end
+    end
+
     # Create a new Container
     # @overload initialize(id=nil)
     #   @param id [String] A unique ID for this container, use random UUID if nil.
@@ -125,17 +129,17 @@ module Qpid::Proton
       # - nil on the @work queue makes a #run thread exit
 
       @work = Queue.new
-      @work << self             # Let the first #run thread start selecting
+      @work << :on_start << self # Issue on_start and start start selecting
       @wake = IO.pipe           # Wakes #run thread in IO.select
-      @auto_stop = true         # Exit #run when @active drops to 0
+      @auto_stop = true         # Stop when @active drops to 0
 
       # Following instance variables protected by lock
       @lock = Mutex.new
       @active = 0               # All active tasks, in @selectable, @work or being processed
       @selectable = Set.new     # Tasks ready to block in IO.select
       @running = 0              # Count of #run threads
-      @stopping = false         # #stop called, closing tasks
-      @stop_err = nil           # Optional error from #stop
+      @stopped = false          # #stop called
+      @stop_err = nil           # Optional error to pass to tasks, from #stop
     end
 
     # @return [String] unique identifier for this container
@@ -143,15 +147,22 @@ module Qpid::Proton
 
     # Auto-stop flag.
     #
-    # True (the default) means all calls to {#run} will return when the container
-    # transitions from active to inactive - all connections and listeners
-    # have closed.
+    # True (the default) means that the container will stop automatically, as if {#stop}
+    # had been called, when the last listener or connection closes.
     #
     # False means {#run} will not return unless {#stop} is called.
     #
     # @return [Bool] auto-stop state
     attr_accessor :auto_stop
 
+    # True if the container has been stopped and can no longer be used.
+    # @return [Bool] stopped state
+    attr_accessor :stopped
+
+    # Number of threads in {#run}
+    # @return [Bool] {#run} thread count
+    def running; @lock.synchronize { @running }; end
+
     # Open an AMQP connection.
     #
     # @param url [String, URI] Open a {TCPSocket} to url.host, url.port.
@@ -160,6 +171,7 @@ module Qpid::Proton
     # @option (see Connection#open)
     # @return [Connection] The new AMQP connection
     def connect(url, opts = {})
+      not_stopped
       url = Qpid::Proton::uri(url)
       opts[:user] ||= url.user
       opts[:password] ||= url.password
@@ -171,6 +183,7 @@ module Qpid::Proton
     # @param io [IO] An existing {IO} object, e.g. a {TCPSocket}
     # @option (see Connection#open)
     def connect_io(io, opts = {})
+      not_stopped
       cd = connection_driver(io, opts)
       cd.connection.open()
       add(cd)
@@ -185,6 +198,7 @@ module Qpid::Proton
     # @return [Listener] The AMQP listener.
     #
     def listen(url, handler=Listener::Handler.new)
+      not_stopped
       url = Qpid::Proton::uri(url)
       # TODO aconway 2017-11-01: amqps
       listen_io(TCPServer.new(url.host, url.port), handler)
@@ -195,6 +209,7 @@ module Qpid::Proton
     # @param handler [Listener::Handler] Handler for events from this listener
     #
     def listen_io(io, handler=Listener::Handler.new)
+      not_stopped
       l = ListenTask.new(io, handler, self)
       add(l)
       l
@@ -208,23 +223,21 @@ module Qpid::Proton
     # listener, even if the container has multiple threads.
     #
     def run
-      need_on_start = nil
       @lock.synchronize do
-        @running += 1
-        need_on_start = !@on_start_called &&  @handler && @handler.respond_to?(:on_start)
-        @on_start_called = true
+        @running += 1        # Note: ensure clause below will decrement @running
+        raise StoppedError if @stopped
       end
-      if need_on_start
-        # TODO aconway 2017-10-28: proper synthesized event for on_start
-        event = Class.new do
-          def initialize(c) @container = c; end
-          attr_reader :container
-        end.new(self)
-        @handler.on_start(event)
-      end
-
       while task = @work.pop
         case task
+
+        when :on_start
+          # TODO aconway 2017-11-27: proper syntesized events
+          event = Class.new do
+            def initialize(c) @container = c; end
+            attr_reader :container
+          end.new(self)
+          @handler.on_start(event) if @handler.respond_to? :on_start
+
         when Container
           r, w = [@wake[0]], []
           @lock.synchronize do
@@ -236,21 +249,23 @@ module Qpid::Proton
           r, w = IO.select(r, w)
           selected = Set.new(r).merge(w)
           drain_wake if selected.delete?(@wake[0])
+          stop_select = nil
           @lock.synchronize do
-            if @stopping # close everything
+            if stop_select = @stopped # close everything
               selected += @selectable
               selected.each { |s| s.close @stop_err }
+              @wake.each { |fd| fd.close() }
             end
             @selectable -= selected # Remove selected tasks
           end
           selected.each { |s| @work << s } # Queue up tasks needing #process
-          @work << self                    # Allow another thread to select()
+          @work << self unless stop_select
+
         when ConnectionTask then
-          task.close @stop_err if @lock.synchronize { @stopping }
           task.process
           rearm task
-        when Listener then
-          task.close @stop_err if @lock.synchronize { @stopping }
+
+        when ListenTask then
           io, opts = task.process
           add(connection_driver(io, opts, true)) if io
           rearm task
@@ -259,20 +274,31 @@ module Qpid::Proton
       end
     ensure
       @lock.synchronize do
-        @stopping, @stop_err = nil if (@running -= 1).zero? # Last out, reset for next #run
+        @running -= 1
+        work_wake nil if @running > 0         # Tell the next thread to exit
       end
     end
 
-    # Disconnect all listeners and connections without a polite AMQP close sequence.
-    # {#stop} returns immediately, calls to {#run} will return when all activity is finished.
+    # Stop the container.
+    #
+    # Close all listeners and abort all connections without doing AMQP protocol close.
+    #
+    # {#stop} returns immediately, calls to {#run} will return when all activity
+    # is finished.
+    #
+    # The container can no longer be used, using a stopped container raises
+    # {StoppedError} on attempting.  Create a new container if you want to
+    # resume activity.
+    #
     # @param error [Condition] Optional transport/listener error condition
     #
     def stop(error=nil)
       @lock.synchronize do
-        @stopping = true
+        raise StoppedError if @stopped
+        @stopped = true
         @stop_err = Condition.make(error)
         check_stop_lh
-        # NOTE: @stopping =>
+        # NOTE: @stopped =>
         # - no new run threads can join
         # - no more select calls after next wakeup
         # - once @active == 0, all threads will be stopped with nil
@@ -284,6 +310,8 @@ module Qpid::Proton
 
     def wake; @wake[1].write_nonblock('x') rescue nil; end
 
+    # Normally if we add work we need to set a wakeup to ensure a single #run
+    # thread doesn't get stuck in select while there is other work on the queue.
     def work_wake(task) @work << task; wake; end
 
     def drain_wake
@@ -300,8 +328,12 @@ module Qpid::Proton
       ConnectionTask.new(self, io, opts, server)
     end
 
+    # All new tasks are added here
     def add task
-      @lock.synchronize { @active += 1 }
+      @lock.synchronize do
+        @active += 1
+        task.close @stop_err if @stopped
+      end
       work_wake task
     end
 
@@ -310,6 +342,9 @@ module Qpid::Proton
         if task.finished?
           @active -= 1
           check_stop_lh
+        elsif @stopped
+          task.close @stop_err
+          work_wake task
         else
           @selectable << task
         end
@@ -318,10 +353,14 @@ module Qpid::Proton
     end
 
     def check_stop_lh
-      if @active.zero? && (@auto_stop || @stopping)
-        @running.times { @work << nil } # Signal all threads to stop
+      if @active.zero? && (@auto_stop || @stopped)
+        @stopped = true
+        work_wake nil          # Signal threads to stop
         true
       end
     end
+
+    def not_stopped; raise StoppedError if @lock.synchronize { @stopped }; end
+
   end
 end

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f9d8cc4a/proton-c/bindings/ruby/tests/test_container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/test_container.rb b/proton-c/bindings/ruby/tests/test_container.rb
index 9384d65..988acfb 100644
--- a/proton-c/bindings/ruby/tests/test_container.rb
+++ b/proton-c/bindings/ruby/tests/test_container.rb
@@ -25,6 +25,9 @@ Message = Qpid::Proton::Message
 SASL = Qpid::Proton::SASL
 Disposition = Qpid::Proton::Disposition
 
+# Easier debugging of thread problems
+Thread::abort_on_exception=true
+
 # Container that listens on a random port
 class TestContainer < Container
 
@@ -84,39 +87,46 @@ class ContainerTest < Minitest::Test
     def on_connection_closing(e) e.connection.close; end
   end
 
-  def test_auto_stop
-    c1 = Container.new
-    c2 = Container.new
-
+  def test_auto_stop_one
     # A listener and a connection
-    t1 = 3.times.collect { Thread.new { c1.run } }
-    l = c1.listen_io(TCPServer.new(0), ListenOnceHandler.new({ :handler => CloseOnOpenHandler.new}))
-    c1.connect("amqp://:#{l.to_io.addr[1]}", { :handler => CloseOnOpenHandler.new} )
-    t1.each { |t| assert t.join(1) }
+    c = Container.new
+    threads = 3.times.collect { Thread.new { c.run } }
+    sleep(0.01) while c.running < 3
+    l = c.listen_io(TCPServer.new(0), ListenOnceHandler.new({ :handler => CloseOnOpenHandler.new}))
+    c.connect("amqp://:#{l.to_io.addr[1]}", { :handler => CloseOnOpenHandler.new} )
+    threads.each { |t| assert t.join(1) }
+    assert_raises(Container::StoppedError) { c.run }
+  end
 
-    # Connect between different containers, c2 has only a connection
-    t1 = Thread.new { c1.run }
+  def test_auto_stop_two
+    # Connect between different containers
+    c1, c2 = Container.new, Container.new
+    threads = [ Thread.new {c1.run }, Thread.new {c2.run } ]
     l = c1.listen_io(TCPServer.new(0), ListenOnceHandler.new({ :handler => CloseOnOpenHandler.new}))
-    t2 = Thread.new {c2.run }
     c2.connect("amqp://:#{l.to_io.addr[1]}", { :handler => CloseOnOpenHandler.new} )
-    assert t2.join(1)
-    assert t1.join(1)
+    assert threads.each { |t| t.join(1) }
+    assert_raises(Container::StoppedError) { c1.run }
+    assert_raises(Container::StoppedError) { c2.connect("") }
   end
 
   def test_auto_stop_listener_only
-    c1 = Container.new
+    c = Container.new
     # Listener only, external close
-    t1 = Thread.new { c1.run }
-    l = c1.listen_io(TCPServer.new(0))
+    t = Thread.new { c.run }
+    l = c.listen_io(TCPServer.new(0))
     l.close
-    assert t1.join(1)
+    assert t.join(1)
   end
 
   def test_stop_empty
     c = Container.new
     threads = 3.times.collect { Thread.new { c.run } }
+    sleep(0.01) while c.running < 3
     assert_nil threads[0].join(0.001) # Not stopped
     c.stop
+    assert c.stopped
+    assert_raises(Container::StoppedError) { c.connect("") }
+    assert_raises(Container::StoppedError) { c.run }
     threads.each { |t| assert t.join(1) }
   end
 
@@ -126,22 +136,22 @@ class ContainerTest < Minitest::Test
 
     l = c.listen_io(TCPServer.new(0))
     threads = 3.times.collect { Thread.new { c.run } }
+    sleep(0.01) while c.running < 3
     l.close
     assert_nil threads[0].join(0.001) # Not stopped, no auto_stop
 
     l = c.listen_io(TCPServer.new(0)) # New listener
     conn = c.connect("amqp://:#{l.to_io.addr[1]}")
     c.stop
+    assert c.stopped
+
     threads.each { |t| assert t.join(1) }
+
+    assert_raises(Container::StoppedError) { c.run }
+    assert_equal 0, c.running
     assert_nil l.condition
     assert_nil conn.condition
-
-    # We should be able to run the container again once stopped.
-    threads = 5.times.collect { Thread.new { c.run } }
-    assert_nil threads[0].join(0.01)
-
   end
-
 end
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/3] qpid-proton git commit: PROTON-1699: [ruby] add messenger tests to backward compatibility suite

Posted by ac...@apache.org.
PROTON-1699: [ruby] add messenger tests to backward compatibility suite

This closes #114, and adds the direct_ examples to the old_examples test suite.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/1bf4a32c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/1bf4a32c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/1bf4a32c

Branch: refs/heads/master
Commit: 1bf4a32ccbadfe3cfe103661f8a87d7fdb4ad4a7
Parents: 4530774
Author: Alan Conway <ac...@redhat.com>
Authored: Mon Nov 27 09:24:44 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Nov 27 16:09:47 2017 -0500

----------------------------------------------------------------------
 examples/ruby/example_test.rb                   |  3 +-
 .../bindings/ruby/tests/old_examples/broker.rb  |  1 +
 .../ruby/tests/old_examples/direct_recv.rb      |  7 +-
 .../ruby/tests/old_examples/direct_send.rb      |  6 +-
 .../ruby/tests/old_examples/old_example_test.rb | 73 ++++++++++++--------
 .../bindings/ruby/tests/old_examples/recv.rb    | 23 ++++++
 .../bindings/ruby/tests/old_examples/send.rb    | 21 ++++++
 .../ruby/tests/old_examples/simple_recv.rb      |  2 +-
 .../ruby/tests/old_examples/simple_send.rb      |  4 +-
 9 files changed, 104 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1bf4a32c/examples/ruby/example_test.rb
----------------------------------------------------------------------
diff --git a/examples/ruby/example_test.rb b/examples/ruby/example_test.rb
index 19f1638..dc31a18 100755
--- a/examples/ruby/example_test.rb
+++ b/examples/ruby/example_test.rb
@@ -35,8 +35,7 @@ class ExampleTest < MiniTest::Test
   end
 
   def assert_output(want, *args)
-    p = run_script(*args)
-    assert_equal(want, p.read.strip)
+    assert_equal(want.strip, run_script(*args).read.strip)
   end
 
   def test_helloworld

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1bf4a32c/proton-c/bindings/ruby/tests/old_examples/broker.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/old_examples/broker.rb b/proton-c/bindings/ruby/tests/old_examples/broker.rb
index e1ababd..e7cc9b3 100644
--- a/proton-c/bindings/ruby/tests/old_examples/broker.rb
+++ b/proton-c/bindings/ruby/tests/old_examples/broker.rb
@@ -97,6 +97,7 @@ class Broker < Qpid::Proton::Handler::MessagingHandler
     debug("on_start event") if $options[:debug]
     @acceptor = event.container.listen(@url)
     print "Listening on #{@url}\n"
+    STDOUT.flush
   end
 
   def queue(address)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1bf4a32c/proton-c/bindings/ruby/tests/old_examples/direct_recv.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/old_examples/direct_recv.rb b/proton-c/bindings/ruby/tests/old_examples/direct_recv.rb
index e8b52f3..ff3366a 100644
--- a/proton-c/bindings/ruby/tests/old_examples/direct_recv.rb
+++ b/proton-c/bindings/ruby/tests/old_examples/direct_recv.rb
@@ -30,6 +30,11 @@ class DirectReceive < ExampleReceive
 
   def on_start(event)
     @acceptor = event.container.listen(self.url)
+    puts "Listening"; STDOUT.flush
+  end
+
+  def on_connection_opening(event)
+    @acceptor.close
   end
 
   def on_message(event)
@@ -41,7 +46,7 @@ end
 
 options = {
   :address => "localhost:5672/examples",
-  :messages => 100,
+  :messages => 10,
 }
 
 OptionParser.new do |opts|

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1bf4a32c/proton-c/bindings/ruby/tests/old_examples/direct_send.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/old_examples/direct_send.rb b/proton-c/bindings/ruby/tests/old_examples/direct_send.rb
index 2164304..50543b2 100644
--- a/proton-c/bindings/ruby/tests/old_examples/direct_send.rb
+++ b/proton-c/bindings/ruby/tests/old_examples/direct_send.rb
@@ -24,7 +24,7 @@ require_relative 'lib/send_and_receive'
 
 options = {
   :address => "localhost:5672/examples",
-  :messages => 100,
+  :messages => 10,
 }
 
 class SimpleSend < ExampleSend
@@ -35,8 +35,12 @@ class SimpleSend < ExampleSend
 
   def on_start(event)
     @acceptor = event.container.listen(url)
+    puts "Listening"; STDOUT.flush
   end
 
+  def on_connection_opening(event)
+    @acceptor.close
+  end
 end
 
 OptionParser.new do |opts|

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1bf4a32c/proton-c/bindings/ruby/tests/old_examples/old_example_test.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/old_examples/old_example_test.rb b/proton-c/bindings/ruby/tests/old_examples/old_example_test.rb
index 16c390d..0ade719 100755
--- a/proton-c/bindings/ruby/tests/old_examples/old_example_test.rb
+++ b/proton-c/bindings/ruby/tests/old_examples/old_example_test.rb
@@ -22,29 +22,35 @@ require 'minitest/autorun'
 require 'qpid_proton'
 require 'socket'
 
-class ExampleTest < MiniTest::Test
+def unused_port; TCPServer.open(0) { |s| s.addr[1] } end
+def make_url(port, path) "amqp://:#{port}/${path}"; end # Make a proton pseudo-url
 
-  def run_script(script, port)
-    assert File.exist? script
-    cmd = [RbConfig.ruby, script]
-    cmd += ["-a", ":#{port}/examples"] if port
-    return IO.popen(cmd)
-  end
+class OldExampleTest < MiniTest::Test
 
+  def run_script(*args)
+    IO.popen [RbConfig.ruby, "-W0", *args];
+  end
 
-  def assert_output(script, want, port=nil)
-    out = run_script(script, port)
-    assert_equal want, out.read.strip
+  def assert_output(want, args)
+    assert_equal want.strip, run_script(*args).read.strip
   end
 
   def test_helloworld
-    assert_output("helloworld.rb", "Hello world!", $port)
+    assert_output "Hello world!", ["helloworld.rb", "-a", make_url($port, __method__)]
   end
 
   def test_send_recv
-    assert_output("simple_send.rb", "All 100 messages confirmed!", $port)
-    want = (0..99).reduce("") { |x,y| x << "Received: sequence #{y}\n" }
-    assert_output("simple_recv.rb", want.strip, $port)
+    assert_output "All 10 messages confirmed!", ["simple_send.rb", "-a", make_url($port, __method__)]
+    want = (0..9).reduce("") { |x,y| x << "Received: sequence #{y}\n" }
+    assert_output want, ["simple_recv.rb", "-a", make_url($port, __method__)]
+  end
+
+  def test_smoke
+    url = "127.0.0.1:#{unused_port}"
+    recv = run_script("recv.rb", "~#{url}")
+    recv.readline               # Wait for "Listening"
+    assert_output("Status: ACCEPTED", ["send.rb", url])
+    assert_equal "Got: Hello World!", recv.read.strip
   end
 
   def test_client_server
@@ -58,28 +64,37 @@ class ExampleTest < MiniTest::Test
 -> And the mome raths outgrabe.
 <- AND THE MOME RATHS OUTGRABE.
 EOS
-    srv = run_script("server.rb", $port)
-    assert_output("client.rb", want.strip, $port)
-
+    srv = run_script("server.rb", "-a", make_url($port, __method__))
+    assert_output(want, ["client.rb", "-a", make_url($port, __method__)])
   ensure
     Process.kill :TERM, srv.pid if srv
   end
-end
 
-# Start the broker before all tests.
-$port = TCPServer.open(0) do |s| s.addr[1]; end # find an unused port
-$broker = spawn("#{RbConfig.ruby} broker.rb -a :#{$port}")
+  def test_direct_recv
+    url = make_url unused_port, __method__
+    p = run_script("direct_recv.rb", "-a", url)
+    p.readline                # Wait till ready
+    assert_output("All 10 messages confirmed!", ["simple_send.rb", "-a", url])
+    want = (0..9).reduce("") { |x,y| x << "Received: sequence #{y}\n" }
+    assert_equal(want.strip, p.read.strip)
+  end
 
-# Wait for the broker to be listening
-deadline = Time.now + 5
-begin
-  TCPSocket.open("", $port).close
-rescue Errno::ECONNREFUSED
-  retry if Time.now < deadline
-  raise
+  def test_direct_send
+    url = make_url unused_port, __method__
+    p = run_script("direct_send.rb", "-a", url)
+    p.readline                # Wait till ready
+    want = (0..9).reduce("") { |x,y| x << "Received: sequence #{y}\n" }
+    assert_output(want, ["simple_recv.rb", "-a", url])
+    assert_equal("All 10 messages confirmed!", p.read.strip)
+  end
 end
 
+# Start the broker before all tests.
+$port = unused_port
+$broker = IO.popen [RbConfig.ruby, "-W0", "broker.rb", "-a", ":#{$port}"]
+$broker.readline                # Wait for "Listening"
+
 # Kill the broker after all tests
 MiniTest.after_run do
-  Process.kill(:TERM, $broker) if $broker
+  Process.kill(:TERM, $broker.pid) if $broker
 end

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1bf4a32c/proton-c/bindings/ruby/tests/old_examples/recv.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/old_examples/recv.rb b/proton-c/bindings/ruby/tests/old_examples/recv.rb
new file mode 100755
index 0000000..a29f123
--- /dev/null
+++ b/proton-c/bindings/ruby/tests/old_examples/recv.rb
@@ -0,0 +1,23 @@
+#!/usr/bin/env ruby
+
+require 'qpid_proton.rb'
+
+messenger = Qpid::Proton::Messenger::Messenger.new()
+messenger.incoming_window = 1
+message = Qpid::Proton::Message.new()
+
+address = ARGV[0]
+if not address then
+  address = "~0.0.0.0"
+end
+messenger.subscribe(address)
+
+messenger.start()
+
+puts "Listening"; STDOUT.flush
+messenger.receive()
+messenger.get(message)
+puts "Got: #{message.body}"
+messenger.accept()
+
+messenger.stop()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1bf4a32c/proton-c/bindings/ruby/tests/old_examples/send.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/old_examples/send.rb b/proton-c/bindings/ruby/tests/old_examples/send.rb
new file mode 100755
index 0000000..73016d0
--- /dev/null
+++ b/proton-c/bindings/ruby/tests/old_examples/send.rb
@@ -0,0 +1,21 @@
+#!/usr/bin/env ruby
+
+require 'qpid_proton.rb'
+
+messenger = Qpid::Proton::Messenger::Messenger.new()
+messenger.outgoing_window = 10
+message = Qpid::Proton::Message.new()
+
+address = ARGV[0]
+if not address then
+  address = "0.0.0.0"
+end
+
+message.address = address
+message.body = "Hello World!"
+
+messenger.start()
+tracker = messenger.put(message)
+messenger.send()
+print "Status: ", messenger.status(tracker), "\n"
+messenger.stop()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1bf4a32c/proton-c/bindings/ruby/tests/old_examples/simple_recv.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/old_examples/simple_recv.rb b/proton-c/bindings/ruby/tests/old_examples/simple_recv.rb
index 91cb30c..136e332 100644
--- a/proton-c/bindings/ruby/tests/old_examples/simple_recv.rb
+++ b/proton-c/bindings/ruby/tests/old_examples/simple_recv.rb
@@ -36,7 +36,7 @@ end
 
 options = {
   :address => "localhost:5672/examples",
-  :messages => 100,
+  :messages => 10,
 }
 
 OptionParser.new do |opts|

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1bf4a32c/proton-c/bindings/ruby/tests/old_examples/simple_send.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/old_examples/simple_send.rb b/proton-c/bindings/ruby/tests/old_examples/simple_send.rb
index 13e40f0..5e5d0a6 100644
--- a/proton-c/bindings/ruby/tests/old_examples/simple_send.rb
+++ b/proton-c/bindings/ruby/tests/old_examples/simple_send.rb
@@ -24,7 +24,7 @@ require_relative 'lib/send_and_receive'
 
 options = {
   :address => "localhost:5672/examples",
-  :messages => 100,
+  :messages => 10,
 }
 
 class SimpleSend < ExampleSend
@@ -47,7 +47,7 @@ OptionParser.new do |opts|
   end
 
   opts.on("-m", "--messages=COUNT", "The number of messages to send (def. #{options[:messages]}",
-    OptionParser::DecimalInteger) do |messages|
+     OptionParser::DecimalInteger) do |messages|
     options[:messages] = messages
   end
 end.parse!


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[3/3] qpid-proton git commit: NO-JIRA: [go] Fix race condition in authentication test

Posted by ac...@apache.org.
NO-JIRA: [go] Fix race condition in authentication test

TestBadPass was failing sporadically due to racy checks on asynchronous errors.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/45307744
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/45307744
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/45307744

Branch: refs/heads/master
Commit: 45307744b9729a5a1b29949fb5e4f8dc34c9d486
Parents: 55afeab
Author: Alan Conway <ac...@redhat.com>
Authored: Mon Nov 27 10:29:46 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Nov 27 16:09:47 2017 -0500

----------------------------------------------------------------------
 .../go/src/qpid.apache.org/electron/electron_test.go         | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/45307744/proton-c/bindings/go/src/qpid.apache.org/electron/electron_test.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/electron_test.go b/proton-c/bindings/go/src/qpid.apache.org/electron/electron_test.go
index 4cd8453..d8862b9 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/electron_test.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/electron_test.go
@@ -76,10 +76,10 @@ func newServer(t *testing.T, cont Container, opts ...ConnectionOption) (net.Addr
 func newClient(t *testing.T, cont Container, addr net.Addr, opts ...ConnectionOption) Session {
 	conn, err := net.Dial(addr.Network(), addr.String())
 	fatalIf(t, err)
-	c, err := cont.Connection(conn, opts...)
-	fatalIf(t, err)
-	sn, err := c.Session()
-	fatalIf(t, err)
+	// Don't  bother checking error here, it's an async error so it's racy to do so anyway.
+	// Let caller use Sync() or catch it on first use.
+	c, _ := cont.Connection(conn, opts...)
+	sn, _ := c.Session()
 	return sn
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org