You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/11/27 21:10:40 UTC
[1/3] qpid-proton git commit: PROTON-1636: [ruby] fix race condition
in ruby container tests
Repository: qpid-proton
Updated Branches:
refs/heads/master 55afeabaf -> f9d8cc4ac
PROTON-1636: [ruby] fix race condition in ruby container tests
Made "stopped" a terminal state for the Container to avoid races during
shut-down, e.g. because threads have not even entered #run when the container
auto-stops.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/f9d8cc4a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/f9d8cc4a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/f9d8cc4a
Branch: refs/heads/master
Commit: f9d8cc4acdbe11ad58d3a3004abfa42b0e3dd098
Parents: 1bf4a32
Author: Alan Conway <ac...@redhat.com>
Authored: Mon Nov 27 13:56:19 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Nov 27 16:09:47 2017 -0500
----------------------------------------------------------------------
.../bindings/ruby/lib/core/connection_driver.rb | 14 ++-
proton-c/bindings/ruby/lib/core/container.rb | 117 ++++++++++++-------
proton-c/bindings/ruby/tests/test_container.rb | 56 +++++----
3 files changed, 121 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f9d8cc4a/proton-c/bindings/ruby/lib/core/connection_driver.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/connection_driver.rb b/proton-c/bindings/ruby/lib/core/connection_driver.rb
index 537e308..f757328 100644
--- a/proton-c/bindings/ruby/lib/core/connection_driver.rb
+++ b/proton-c/bindings/ruby/lib/core/connection_driver.rb
@@ -67,6 +67,13 @@ module Qpid
# Get the next event to dispatch, nil if no events available
def event() Event::Event.wrap(Cproton.pn_connection_driver_next_event(@impl)); end
+ # Iterator for all available events
+ def each_event()
+ while e = event
+ yield e
+ end
+ end
+
# Non-blocking read from {#io}, generate events for {#event}
# IO errors are returned as transport errors by {#event}, not raised
def read
@@ -161,9 +168,7 @@ module Qpid
# Dispatch all events available from {#event} to {#handler}
# @param handlers [Enum<Handler::MessagingHandler>]
def dispatch()
- while e = event
- e.dispatch @handler
- end
+ each_event { |e| e.dispatch @handler }
end
# Do {#read}, {#tick}, {#write} and {#dispatch} without blocking.
@@ -176,8 +181,9 @@ module Qpid
def process(now=Time.now)
read
next_tick = tick(now)
+ dispatch # May generate more data to write
write
- dispatch
+ dispatch # Make sure we consume all events
return next_tick
end
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f9d8cc4a/proton-c/bindings/ruby/lib/core/container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/container.rb b/proton-c/bindings/ruby/lib/core/container.rb
index 38dcd74..39ac150 100644
--- a/proton-c/bindings/ruby/lib/core/container.rb
+++ b/proton-c/bindings/ruby/lib/core/container.rb
@@ -58,14 +58,12 @@ module Qpid::Proton
if env && ["true", "1", "yes", "on"].include?(env.downcase)
@log_prefix = "[0x#{object_id.to_s(16)}](PN_LISTENER_"
end
+ dispatch(:on_open);
end
def process
+ return if @closed
unless @closing
- unless @open_dispatched
- dispatch(:on_open);
- @open_dispatched = true
- end
begin
return @io.accept, dispatch(:on_accept)
rescue IO::WaitReadable, Errno::EINTR
@@ -73,20 +71,21 @@ module Qpid::Proton
close e
end
end
+ ensure
if @closing
@io.close rescue nil
- @closing = false
@closed = true
dispatch(:on_error, @condition) if @condition
dispatch(:on_close)
end
end
- def can_read?() !@closed; end
+ def can_read?() !finished?; end
def can_write?() false; end
def finished?() @closed; end
def dispatch(method, *args)
+ # TODO aconway 2017-11-27: better logging
STDERR.puts "#{@log_prefix}#{([method[3..-1].upcase]+args).join ', '})" if @log_prefix
@handler.__send__(method, self, *args) if @handler && @handler.respond_to?(method)
end
@@ -94,6 +93,11 @@ module Qpid::Proton
public
+ # Error raised if the container is used after {#stop} has been called.
+ class StoppedError < RuntimeError
+ def initialize(*args) super("container has been stopped"); end
+ end
+
# Create a new Container
# @overload initialize(id=nil)
# @param id [String] A unique ID for this container, use random UUID if nil.
@@ -125,17 +129,17 @@ module Qpid::Proton
# - nil on the @work queue makes a #run thread exit
@work = Queue.new
- @work << self # Let the first #run thread start selecting
+ @work << :on_start << self # Issue on_start and start selecting
@wake = IO.pipe # Wakes #run thread in IO.select
- @auto_stop = true # Exit #run when @active drops to 0
+ @auto_stop = true # Stop when @active drops to 0
# Following instance variables protected by lock
@lock = Mutex.new
@active = 0 # All active tasks, in @selectable, @work or being processed
@selectable = Set.new # Tasks ready to block in IO.select
@running = 0 # Count of #run threads
- @stopping = false # #stop called, closing tasks
- @stop_err = nil # Optional error from #stop
+ @stopped = false # #stop called
+ @stop_err = nil # Optional error to pass to tasks, from #stop
end
# @return [String] unique identifier for this container
@@ -143,15 +147,22 @@ module Qpid::Proton
# Auto-stop flag.
#
- # True (the default) means all calls to {#run} will return when the container
- # transitions from active to inactive - all connections and listeners
- # have closed.
+ # True (the default) means that the container will stop automatically, as if {#stop}
+ # had been called, when the last listener or connection closes.
#
# False means {#run} will not return unless {#stop} is called.
#
# @return [Bool] auto-stop state
attr_accessor :auto_stop
+ # True if the container has been stopped and can no longer be used.
+ # @return [Bool] stopped state
+ attr_accessor :stopped
+
+ # Number of threads in {#run}
+ # @return [Integer] {#run} thread count
+ def running; @lock.synchronize { @running }; end
+
# Open an AMQP connection.
#
# @param url [String, URI] Open a {TCPSocket} to url.host, url.port.
@@ -160,6 +171,7 @@ module Qpid::Proton
# @option (see Connection#open)
# @return [Connection] The new AMQP connection
def connect(url, opts = {})
+ not_stopped
url = Qpid::Proton::uri(url)
opts[:user] ||= url.user
opts[:password] ||= url.password
@@ -171,6 +183,7 @@ module Qpid::Proton
# @param io [IO] An existing {IO} object, e.g. a {TCPSocket}
# @option (see Connection#open)
def connect_io(io, opts = {})
+ not_stopped
cd = connection_driver(io, opts)
cd.connection.open()
add(cd)
@@ -185,6 +198,7 @@ module Qpid::Proton
# @return [Listener] The AMQP listener.
#
def listen(url, handler=Listener::Handler.new)
+ not_stopped
url = Qpid::Proton::uri(url)
# TODO aconway 2017-11-01: amqps
listen_io(TCPServer.new(url.host, url.port), handler)
@@ -195,6 +209,7 @@ module Qpid::Proton
# @param handler [Listener::Handler] Handler for events from this listener
#
def listen_io(io, handler=Listener::Handler.new)
+ not_stopped
l = ListenTask.new(io, handler, self)
add(l)
l
@@ -208,23 +223,21 @@ module Qpid::Proton
# listener, even if the container has multiple threads.
#
def run
- need_on_start = nil
@lock.synchronize do
- @running += 1
- need_on_start = !@on_start_called && @handler && @handler.respond_to?(:on_start)
- @on_start_called = true
+ @running += 1 # Note: ensure clause below will decrement @running
+ raise StoppedError if @stopped
end
- if need_on_start
- # TODO aconway 2017-10-28: proper synthesized event for on_start
- event = Class.new do
- def initialize(c) @container = c; end
- attr_reader :container
- end.new(self)
- @handler.on_start(event)
- end
-
while task = @work.pop
case task
+
+ when :on_start
+ # TODO aconway 2017-11-27: proper synthesized events
+ event = Class.new do
+ def initialize(c) @container = c; end
+ attr_reader :container
+ end.new(self)
+ @handler.on_start(event) if @handler.respond_to? :on_start
+
when Container
r, w = [@wake[0]], []
@lock.synchronize do
@@ -236,21 +249,23 @@ module Qpid::Proton
r, w = IO.select(r, w)
selected = Set.new(r).merge(w)
drain_wake if selected.delete?(@wake[0])
+ stop_select = nil
@lock.synchronize do
- if @stopping # close everything
+ if stop_select = @stopped # close everything
selected += @selectable
selected.each { |s| s.close @stop_err }
+ @wake.each { |fd| fd.close() }
end
@selectable -= selected # Remove selected tasks
end
selected.each { |s| @work << s } # Queue up tasks needing #process
- @work << self # Allow another thread to select()
+ @work << self unless stop_select
+
when ConnectionTask then
- task.close @stop_err if @lock.synchronize { @stopping }
task.process
rearm task
- when Listener then
- task.close @stop_err if @lock.synchronize { @stopping }
+
+ when ListenTask then
io, opts = task.process
add(connection_driver(io, opts, true)) if io
rearm task
@@ -259,20 +274,31 @@ module Qpid::Proton
end
ensure
@lock.synchronize do
- @stopping, @stop_err = nil if (@running -= 1).zero? # Last out, reset for next #run
+ @running -= 1
+ work_wake nil if @running > 0 # Tell the next thread to exit
end
end
- # Disconnect all listeners and connections without a polite AMQP close sequence.
- # {#stop} returns immediately, calls to {#run} will return when all activity is finished.
+ # Stop the container.
+ #
+ # Close all listeners and abort all connections without doing AMQP protocol close.
+ #
+ # {#stop} returns immediately, calls to {#run} will return when all activity
+ # is finished.
+ #
+ # The container can no longer be used; any attempt to use a stopped
+ # container raises {StoppedError}. Create a new container if you want to
+ # resume activity.
+ #
# @param error [Condition] Optional transport/listener error condition
#
def stop(error=nil)
@lock.synchronize do
- @stopping = true
+ raise StoppedError if @stopped
+ @stopped = true
@stop_err = Condition.make(error)
check_stop_lh
- # NOTE: @stopping =>
+ # NOTE: @stopped =>
# - no new run threads can join
# - no more select calls after next wakeup
# - once @active == 0, all threads will be stopped with nil
@@ -284,6 +310,8 @@ module Qpid::Proton
def wake; @wake[1].write_nonblock('x') rescue nil; end
+ # Normally if we add work we need to set a wakeup to ensure a single #run
+ # thread doesn't get stuck in select while there is other work on the queue.
def work_wake(task) @work << task; wake; end
def drain_wake
@@ -300,8 +328,12 @@ module Qpid::Proton
ConnectionTask.new(self, io, opts, server)
end
+ # All new tasks are added here
def add task
- @lock.synchronize { @active += 1 }
+ @lock.synchronize do
+ @active += 1
+ task.close @stop_err if @stopped
+ end
work_wake task
end
@@ -310,6 +342,9 @@ module Qpid::Proton
if task.finished?
@active -= 1
check_stop_lh
+ elsif @stopped
+ task.close @stop_err
+ work_wake task
else
@selectable << task
end
@@ -318,10 +353,14 @@ module Qpid::Proton
end
def check_stop_lh
- if @active.zero? && (@auto_stop || @stopping)
- @running.times { @work << nil } # Signal all threads to stop
+ if @active.zero? && (@auto_stop || @stopped)
+ @stopped = true
+ work_wake nil # Signal threads to stop
true
end
end
+
+ def not_stopped; raise StoppedError if @lock.synchronize { @stopped }; end
+
end
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f9d8cc4a/proton-c/bindings/ruby/tests/test_container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/test_container.rb b/proton-c/bindings/ruby/tests/test_container.rb
index 9384d65..988acfb 100644
--- a/proton-c/bindings/ruby/tests/test_container.rb
+++ b/proton-c/bindings/ruby/tests/test_container.rb
@@ -25,6 +25,9 @@ Message = Qpid::Proton::Message
SASL = Qpid::Proton::SASL
Disposition = Qpid::Proton::Disposition
+# Easier debugging of thread problems
+Thread::abort_on_exception=true
+
# Container that listens on a random port
class TestContainer < Container
@@ -84,39 +87,46 @@ class ContainerTest < Minitest::Test
def on_connection_closing(e) e.connection.close; end
end
- def test_auto_stop
- c1 = Container.new
- c2 = Container.new
-
+ def test_auto_stop_one
# A listener and a connection
- t1 = 3.times.collect { Thread.new { c1.run } }
- l = c1.listen_io(TCPServer.new(0), ListenOnceHandler.new({ :handler => CloseOnOpenHandler.new}))
- c1.connect("amqp://:#{l.to_io.addr[1]}", { :handler => CloseOnOpenHandler.new} )
- t1.each { |t| assert t.join(1) }
+ c = Container.new
+ threads = 3.times.collect { Thread.new { c.run } }
+ sleep(0.01) while c.running < 3
+ l = c.listen_io(TCPServer.new(0), ListenOnceHandler.new({ :handler => CloseOnOpenHandler.new}))
+ c.connect("amqp://:#{l.to_io.addr[1]}", { :handler => CloseOnOpenHandler.new} )
+ threads.each { |t| assert t.join(1) }
+ assert_raises(Container::StoppedError) { c.run }
+ end
- # Connect between different containers, c2 has only a connection
- t1 = Thread.new { c1.run }
+ def test_auto_stop_two
+ # Connect between different containers
+ c1, c2 = Container.new, Container.new
+ threads = [ Thread.new {c1.run }, Thread.new {c2.run } ]
l = c1.listen_io(TCPServer.new(0), ListenOnceHandler.new({ :handler => CloseOnOpenHandler.new}))
- t2 = Thread.new {c2.run }
c2.connect("amqp://:#{l.to_io.addr[1]}", { :handler => CloseOnOpenHandler.new} )
- assert t2.join(1)
- assert t1.join(1)
+ assert threads.each { |t| t.join(1) }
+ assert_raises(Container::StoppedError) { c1.run }
+ assert_raises(Container::StoppedError) { c2.connect("") }
end
def test_auto_stop_listener_only
- c1 = Container.new
+ c = Container.new
# Listener only, external close
- t1 = Thread.new { c1.run }
- l = c1.listen_io(TCPServer.new(0))
+ t = Thread.new { c.run }
+ l = c.listen_io(TCPServer.new(0))
l.close
- assert t1.join(1)
+ assert t.join(1)
end
def test_stop_empty
c = Container.new
threads = 3.times.collect { Thread.new { c.run } }
+ sleep(0.01) while c.running < 3
assert_nil threads[0].join(0.001) # Not stopped
c.stop
+ assert c.stopped
+ assert_raises(Container::StoppedError) { c.connect("") }
+ assert_raises(Container::StoppedError) { c.run }
threads.each { |t| assert t.join(1) }
end
@@ -126,22 +136,22 @@ class ContainerTest < Minitest::Test
l = c.listen_io(TCPServer.new(0))
threads = 3.times.collect { Thread.new { c.run } }
+ sleep(0.01) while c.running < 3
l.close
assert_nil threads[0].join(0.001) # Not stopped, no auto_stop
l = c.listen_io(TCPServer.new(0)) # New listener
conn = c.connect("amqp://:#{l.to_io.addr[1]}")
c.stop
+ assert c.stopped
+
threads.each { |t| assert t.join(1) }
+
+ assert_raises(Container::StoppedError) { c.run }
+ assert_equal 0, c.running
assert_nil l.condition
assert_nil conn.condition
-
- # We should be able to run the container again once stopped.
- threads = 5.times.collect { Thread.new { c.run } }
- assert_nil threads[0].join(0.01)
-
end
-
end
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/3] qpid-proton git commit: PROTON-1699: [ruby] add messenger tests
to backward compatibility suite
Posted by ac...@apache.org.
PROTON-1699: [ruby] add messenger tests to backward compatibility suite
This closes #114, and adds the direct_ examples to the old_examples test suite.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/1bf4a32c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/1bf4a32c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/1bf4a32c
Branch: refs/heads/master
Commit: 1bf4a32ccbadfe3cfe103661f8a87d7fdb4ad4a7
Parents: 4530774
Author: Alan Conway <ac...@redhat.com>
Authored: Mon Nov 27 09:24:44 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Nov 27 16:09:47 2017 -0500
----------------------------------------------------------------------
examples/ruby/example_test.rb | 3 +-
.../bindings/ruby/tests/old_examples/broker.rb | 1 +
.../ruby/tests/old_examples/direct_recv.rb | 7 +-
.../ruby/tests/old_examples/direct_send.rb | 6 +-
.../ruby/tests/old_examples/old_example_test.rb | 73 ++++++++++++--------
.../bindings/ruby/tests/old_examples/recv.rb | 23 ++++++
.../bindings/ruby/tests/old_examples/send.rb | 21 ++++++
.../ruby/tests/old_examples/simple_recv.rb | 2 +-
.../ruby/tests/old_examples/simple_send.rb | 4 +-
9 files changed, 104 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1bf4a32c/examples/ruby/example_test.rb
----------------------------------------------------------------------
diff --git a/examples/ruby/example_test.rb b/examples/ruby/example_test.rb
index 19f1638..dc31a18 100755
--- a/examples/ruby/example_test.rb
+++ b/examples/ruby/example_test.rb
@@ -35,8 +35,7 @@ class ExampleTest < MiniTest::Test
end
def assert_output(want, *args)
- p = run_script(*args)
- assert_equal(want, p.read.strip)
+ assert_equal(want.strip, run_script(*args).read.strip)
end
def test_helloworld
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1bf4a32c/proton-c/bindings/ruby/tests/old_examples/broker.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/old_examples/broker.rb b/proton-c/bindings/ruby/tests/old_examples/broker.rb
index e1ababd..e7cc9b3 100644
--- a/proton-c/bindings/ruby/tests/old_examples/broker.rb
+++ b/proton-c/bindings/ruby/tests/old_examples/broker.rb
@@ -97,6 +97,7 @@ class Broker < Qpid::Proton::Handler::MessagingHandler
debug("on_start event") if $options[:debug]
@acceptor = event.container.listen(@url)
print "Listening on #{@url}\n"
+ STDOUT.flush
end
def queue(address)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1bf4a32c/proton-c/bindings/ruby/tests/old_examples/direct_recv.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/old_examples/direct_recv.rb b/proton-c/bindings/ruby/tests/old_examples/direct_recv.rb
index e8b52f3..ff3366a 100644
--- a/proton-c/bindings/ruby/tests/old_examples/direct_recv.rb
+++ b/proton-c/bindings/ruby/tests/old_examples/direct_recv.rb
@@ -30,6 +30,11 @@ class DirectReceive < ExampleReceive
def on_start(event)
@acceptor = event.container.listen(self.url)
+ puts "Listening"; STDOUT.flush
+ end
+
+ def on_connection_opening(event)
+ @acceptor.close
end
def on_message(event)
@@ -41,7 +46,7 @@ end
options = {
:address => "localhost:5672/examples",
- :messages => 100,
+ :messages => 10,
}
OptionParser.new do |opts|
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1bf4a32c/proton-c/bindings/ruby/tests/old_examples/direct_send.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/old_examples/direct_send.rb b/proton-c/bindings/ruby/tests/old_examples/direct_send.rb
index 2164304..50543b2 100644
--- a/proton-c/bindings/ruby/tests/old_examples/direct_send.rb
+++ b/proton-c/bindings/ruby/tests/old_examples/direct_send.rb
@@ -24,7 +24,7 @@ require_relative 'lib/send_and_receive'
options = {
:address => "localhost:5672/examples",
- :messages => 100,
+ :messages => 10,
}
class SimpleSend < ExampleSend
@@ -35,8 +35,12 @@ class SimpleSend < ExampleSend
def on_start(event)
@acceptor = event.container.listen(url)
+ puts "Listening"; STDOUT.flush
end
+ def on_connection_opening(event)
+ @acceptor.close
+ end
end
OptionParser.new do |opts|
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1bf4a32c/proton-c/bindings/ruby/tests/old_examples/old_example_test.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/old_examples/old_example_test.rb b/proton-c/bindings/ruby/tests/old_examples/old_example_test.rb
index 16c390d..0ade719 100755
--- a/proton-c/bindings/ruby/tests/old_examples/old_example_test.rb
+++ b/proton-c/bindings/ruby/tests/old_examples/old_example_test.rb
@@ -22,29 +22,35 @@ require 'minitest/autorun'
require 'qpid_proton'
require 'socket'
-class ExampleTest < MiniTest::Test
+def unused_port; TCPServer.open(0) { |s| s.addr[1] } end
+def make_url(port, path) "amqp://:#{port}/${path}"; end # Make a proton pseudo-url
- def run_script(script, port)
- assert File.exist? script
- cmd = [RbConfig.ruby, script]
- cmd += ["-a", ":#{port}/examples"] if port
- return IO.popen(cmd)
- end
+class OldExampleTest < MiniTest::Test
+ def run_script(*args)
+ IO.popen [RbConfig.ruby, "-W0", *args];
+ end
- def assert_output(script, want, port=nil)
- out = run_script(script, port)
- assert_equal want, out.read.strip
+ def assert_output(want, args)
+ assert_equal want.strip, run_script(*args).read.strip
end
def test_helloworld
- assert_output("helloworld.rb", "Hello world!", $port)
+ assert_output "Hello world!", ["helloworld.rb", "-a", make_url($port, __method__)]
end
def test_send_recv
- assert_output("simple_send.rb", "All 100 messages confirmed!", $port)
- want = (0..99).reduce("") { |x,y| x << "Received: sequence #{y}\n" }
- assert_output("simple_recv.rb", want.strip, $port)
+ assert_output "All 10 messages confirmed!", ["simple_send.rb", "-a", make_url($port, __method__)]
+ want = (0..9).reduce("") { |x,y| x << "Received: sequence #{y}\n" }
+ assert_output want, ["simple_recv.rb", "-a", make_url($port, __method__)]
+ end
+
+ def test_smoke
+ url = "127.0.0.1:#{unused_port}"
+ recv = run_script("recv.rb", "~#{url}")
+ recv.readline # Wait for "Listening"
+ assert_output("Status: ACCEPTED", ["send.rb", url])
+ assert_equal "Got: Hello World!", recv.read.strip
end
def test_client_server
@@ -58,28 +64,37 @@ class ExampleTest < MiniTest::Test
-> And the mome raths outgrabe.
<- AND THE MOME RATHS OUTGRABE.
EOS
- srv = run_script("server.rb", $port)
- assert_output("client.rb", want.strip, $port)
-
+ srv = run_script("server.rb", "-a", make_url($port, __method__))
+ assert_output(want, ["client.rb", "-a", make_url($port, __method__)])
ensure
Process.kill :TERM, srv.pid if srv
end
-end
-# Start the broker before all tests.
-$port = TCPServer.open(0) do |s| s.addr[1]; end # find an unused port
-$broker = spawn("#{RbConfig.ruby} broker.rb -a :#{$port}")
+ def test_direct_recv
+ url = make_url unused_port, __method__
+ p = run_script("direct_recv.rb", "-a", url)
+ p.readline # Wait till ready
+ assert_output("All 10 messages confirmed!", ["simple_send.rb", "-a", url])
+ want = (0..9).reduce("") { |x,y| x << "Received: sequence #{y}\n" }
+ assert_equal(want.strip, p.read.strip)
+ end
-# Wait for the broker to be listening
-deadline = Time.now + 5
-begin
- TCPSocket.open("", $port).close
-rescue Errno::ECONNREFUSED
- retry if Time.now < deadline
- raise
+ def test_direct_send
+ url = make_url unused_port, __method__
+ p = run_script("direct_send.rb", "-a", url)
+ p.readline # Wait till ready
+ want = (0..9).reduce("") { |x,y| x << "Received: sequence #{y}\n" }
+ assert_output(want, ["simple_recv.rb", "-a", url])
+ assert_equal("All 10 messages confirmed!", p.read.strip)
+ end
end
+# Start the broker before all tests.
+$port = unused_port
+$broker = IO.popen [RbConfig.ruby, "-W0", "broker.rb", "-a", ":#{$port}"]
+$broker.readline # Wait for "Listening"
+
# Kill the broker after all tests
MiniTest.after_run do
- Process.kill(:TERM, $broker) if $broker
+ Process.kill(:TERM, $broker.pid) if $broker
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1bf4a32c/proton-c/bindings/ruby/tests/old_examples/recv.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/old_examples/recv.rb b/proton-c/bindings/ruby/tests/old_examples/recv.rb
new file mode 100755
index 0000000..a29f123
--- /dev/null
+++ b/proton-c/bindings/ruby/tests/old_examples/recv.rb
@@ -0,0 +1,23 @@
+#!/usr/bin/env ruby
+
+require 'qpid_proton.rb'
+
+messenger = Qpid::Proton::Messenger::Messenger.new()
+messenger.incoming_window = 1
+message = Qpid::Proton::Message.new()
+
+address = ARGV[0]
+if not address then
+ address = "~0.0.0.0"
+end
+messenger.subscribe(address)
+
+messenger.start()
+
+puts "Listening"; STDOUT.flush
+messenger.receive()
+messenger.get(message)
+puts "Got: #{message.body}"
+messenger.accept()
+
+messenger.stop()
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1bf4a32c/proton-c/bindings/ruby/tests/old_examples/send.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/old_examples/send.rb b/proton-c/bindings/ruby/tests/old_examples/send.rb
new file mode 100755
index 0000000..73016d0
--- /dev/null
+++ b/proton-c/bindings/ruby/tests/old_examples/send.rb
@@ -0,0 +1,21 @@
+#!/usr/bin/env ruby
+
+require 'qpid_proton.rb'
+
+messenger = Qpid::Proton::Messenger::Messenger.new()
+messenger.outgoing_window = 10
+message = Qpid::Proton::Message.new()
+
+address = ARGV[0]
+if not address then
+ address = "0.0.0.0"
+end
+
+message.address = address
+message.body = "Hello World!"
+
+messenger.start()
+tracker = messenger.put(message)
+messenger.send()
+print "Status: ", messenger.status(tracker), "\n"
+messenger.stop()
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1bf4a32c/proton-c/bindings/ruby/tests/old_examples/simple_recv.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/old_examples/simple_recv.rb b/proton-c/bindings/ruby/tests/old_examples/simple_recv.rb
index 91cb30c..136e332 100644
--- a/proton-c/bindings/ruby/tests/old_examples/simple_recv.rb
+++ b/proton-c/bindings/ruby/tests/old_examples/simple_recv.rb
@@ -36,7 +36,7 @@ end
options = {
:address => "localhost:5672/examples",
- :messages => 100,
+ :messages => 10,
}
OptionParser.new do |opts|
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1bf4a32c/proton-c/bindings/ruby/tests/old_examples/simple_send.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/old_examples/simple_send.rb b/proton-c/bindings/ruby/tests/old_examples/simple_send.rb
index 13e40f0..5e5d0a6 100644
--- a/proton-c/bindings/ruby/tests/old_examples/simple_send.rb
+++ b/proton-c/bindings/ruby/tests/old_examples/simple_send.rb
@@ -24,7 +24,7 @@ require_relative 'lib/send_and_receive'
options = {
:address => "localhost:5672/examples",
- :messages => 100,
+ :messages => 10,
}
class SimpleSend < ExampleSend
@@ -47,7 +47,7 @@ OptionParser.new do |opts|
end
opts.on("-m", "--messages=COUNT", "The number of messages to send (def. #{options[:messages]}",
- OptionParser::DecimalInteger) do |messages|
+ OptionParser::DecimalInteger) do |messages|
options[:messages] = messages
end
end.parse!
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[3/3] qpid-proton git commit: NO-JIRA: [go] Fix race condition in
authentication test
Posted by ac...@apache.org.
NO-JIRA: [go] Fix race condition in authentication test
TestBadPass was failing sporadically due to racy checks on asynchronous errors.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/45307744
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/45307744
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/45307744
Branch: refs/heads/master
Commit: 45307744b9729a5a1b29949fb5e4f8dc34c9d486
Parents: 55afeab
Author: Alan Conway <ac...@redhat.com>
Authored: Mon Nov 27 10:29:46 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Nov 27 16:09:47 2017 -0500
----------------------------------------------------------------------
.../go/src/qpid.apache.org/electron/electron_test.go | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/45307744/proton-c/bindings/go/src/qpid.apache.org/electron/electron_test.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/electron_test.go b/proton-c/bindings/go/src/qpid.apache.org/electron/electron_test.go
index 4cd8453..d8862b9 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/electron_test.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/electron_test.go
@@ -76,10 +76,10 @@ func newServer(t *testing.T, cont Container, opts ...ConnectionOption) (net.Addr
func newClient(t *testing.T, cont Container, addr net.Addr, opts ...ConnectionOption) Session {
conn, err := net.Dial(addr.Network(), addr.String())
fatalIf(t, err)
- c, err := cont.Connection(conn, opts...)
- fatalIf(t, err)
- sn, err := c.Session()
- fatalIf(t, err)
+ // Don't bother checking error here, it's an async error so it's racy to do so anyway.
+ // Let caller use Sync() or catch it on first use.
+ c, _ := cont.Connection(conn, opts...)
+ sn, _ := c.Session()
return sn
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org