Posted to commits@qpid.apache.org by ac...@apache.org on 2017/11/07 19:26:51 UTC

[09/10] qpid-proton git commit: PROTON-1064: [ruby] New Container with native ruby IO

PROTON-1064: [ruby] New Container with native ruby IO

Updated all examples and tests to use new container, old Reactor::Container
still in place, will be removed or re-implemented over the new container.

- Based on ConnectionDriver, wrapper for pn_connection_driver_t
- Native polling with standard ruby IO.select()
- Thread-friendly: no blocking IO in C code, all in ruby
- connect/listen use standard TCPSocket/TCPServer
- connect_with/listen_with can use any type of IO socket object
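
A minimal sketch of the polling style described above, for illustration only.
It uses nothing but the standard ruby socket library: a TCPServer and a
TCPSocket on the loopback interface are polled with IO.select() and read with
non-blocking calls. The helper name select_echo_once is hypothetical; this is
not the Container code and makes no Proton calls.

```ruby
require 'socket'

# Illustration only: a tiny select() loop over plain ruby sockets.
# Accepts one connection, echoes its first chunk upper-cased, then returns
# the bytes that were read (or nil if nothing happened before the timeout).
def select_echo_once(server, timeout = 5)
  client = nil
  loop do
    readable, = IO.select([server, client].compact, nil, nil, timeout)
    return nil unless readable            # timed out, nothing to do
    readable.each do |io|
      if io == server
        client = server.accept_nonblock   # a connection is pending, so this does not block
      else
        data = io.read_nonblock(1024)     # the bytes a connection driver would consume
        io.write(data.upcase)
        io.close
        return data
      end
    end
  end
end

server = TCPServer.new("127.0.0.1", 0)    # port 0: let the OS pick an unused port
port = server.addr[1]
peer = TCPSocket.new("127.0.0.1", port)
peer.write("hello")
received = select_echo_once(server)
reply = peer.read                         # reads until the echoing side closes
peer.close
server.close
```

Because all waiting happens in IO.select(), other ruby threads can run while
the loop is idle, which is the point of the "thread-friendly" item above.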


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/51cda9d5
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/51cda9d5
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/51cda9d5

Branch: refs/heads/master
Commit: 51cda9d5d50059d1cfe7af57ec4bfb2e2ab1e70e
Parents: 86de6b5
Author: Alan Conway <ac...@redhat.com>
Authored: Tue Oct 31 17:16:08 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Tue Nov 7 13:31:51 2017 -0500

----------------------------------------------------------------------
 examples/ruby/README.md                         |  54 ++--
 examples/ruby/broker.rb                         |  63 +----
 examples/ruby/client.rb                         |  25 +-
 examples/ruby/direct_recv.rb                    |  35 +--
 examples/ruby/direct_send.rb                    |  37 +--
 examples/ruby/example_test.rb                   | 125 +++++-----
 examples/ruby/helloworld.rb                     |  38 +--
 examples/ruby/helloworld_direct.rb              |  38 +--
 examples/ruby/server.rb                         |  39 ++-
 examples/ruby/simple_recv.rb                    |  36 +--
 examples/ruby/simple_send.rb                    |  33 +--
 proton-c/bindings/ruby/CMakeLists.txt           |   4 +-
 proton-c/bindings/ruby/lib/core/connection.rb   |  67 +++--
 .../bindings/ruby/lib/core/connection_driver.rb |  99 ++++----
 proton-c/bindings/ruby/lib/core/container.rb    | 249 +++++++++++++++++++
 proton-c/bindings/ruby/lib/core/endpoint.rb     |  11 +-
 proton-c/bindings/ruby/lib/core/listener.rb     | 110 ++++++++
 proton-c/bindings/ruby/lib/core/session.rb      |   2 +-
 proton-c/bindings/ruby/lib/core/transport.rb    |  32 +--
 proton-c/bindings/ruby/lib/core/uri.rb          |  50 ++++
 proton-c/bindings/ruby/lib/core/url.rb          |   5 +
 proton-c/bindings/ruby/lib/event/event.rb       |   7 +-
 .../ruby/lib/handler/endpoint_state_handler.rb  |   6 +-
 proton-c/bindings/ruby/lib/qpid_proton.rb       |   8 +
 proton-c/bindings/ruby/lib/reactor/reactor.rb   |   1 +
 proton-c/bindings/ruby/lib/util/condition.rb    |  35 ++-
 proton-c/bindings/ruby/lib/util/engine.rb       |   4 +-
 proton-c/bindings/ruby/lib/util/uri.rb          |  27 --
 .../ruby/tests/test_connection_driver.rb        |   6 +-
 proton-c/bindings/ruby/tests/test_container.rb  | 155 +++++++++---
 proton-c/bindings/ruby/tests/test_tools.rb      |  38 ++-
 31 files changed, 903 insertions(+), 536 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/examples/ruby/README.md
----------------------------------------------------------------------
diff --git a/examples/ruby/README.md b/examples/ruby/README.md
index 38cc6ba..8526e29 100644
--- a/examples/ruby/README.md
+++ b/examples/ruby/README.md
@@ -1,32 +1,24 @@
-## What Is The Reactor?
-
-A little outside of the scope of this document, but the reactor is an event source for letting an application know about events in the Proton messaging system. With this set of APIs an application can be register handlers that are notified when a connection is created, a message received, or a session closes.
-
-### Handlers
-
-An application creates **handlers**, objects which provide methods through which the reactor notifies the application's components of events and allows them each to handle the ones in which they are interested (see the Chain Of Responsibility design pattern for more on this idea). There are some pre-defined handlers for responding to incoming message events, outgoing message events, data flow and managing the AMQP endpoints. Look in the **Qpid::Proton::Handlers** package for more details on these classes.
-
-## Simple Reactor Examples
+## Simple Examples
 
 ### The Broker
 
-The reactor examples come with a sample broker which can be used by other examples and which also works as an example itself. For now we'll just start up the broker example and tell it to listen on port 8888:
+The examples come with a sample broker which can be used by other examples and which also works as an example itself. For now we'll just start up the broker example and tell it to listen on port 8888:
 
 ````
-$ ruby ../examples/ruby/reactor/broker.rb  --address=0.0.0.0:8888
-Listening on 0.0.0.0:8888
+$ ruby broker.rb amqp://:8888
+Listening on amqp://:8888
 ````
 
 This example broker will receive messages, create queues as needed, and deliver messages to endpoints.
 
 ### Hello World Using A Broker
 
-Our first example creates an endpoint that sends messages to a queue to which it is subscribed. So it both sends and receives its message in one pass.
+Our first example creates an endpoint that sends messages to a queue to which it is subscribed. So it both sends and receives a message.
 
 To start it, simply run:
 
 ```
-$ ruby ../examples/ruby/reactor/helloworld.rb --address=0.0.0.0:8888 --queue=examples
+$ ruby helloworld.rb //:8888
 Hello world!
 ```
 
@@ -42,12 +34,13 @@ The following events occur while **helloworld.rb** runs:
 
 ### Hello World Without A Broker required
 
-The next example we'll look at will send the classic "Hello world" message to itself directly. This example shows some very fundamental elements of the reactor APIs that you should understand.
+The next example we'll look at will send the classic "Hello world" message to itself directly,
+without going through a broker.
 
 To launch the example:
 
 ```
- $ ruby helloworld_direct.rb --address=0.0.0.0:8888/examples
+ $ ruby helloworld_direct.rb //:9999
  Hello world!
 ```
 
@@ -58,26 +51,25 @@ The direct version takes on the responsibility for listening to incoming connect
  * **on_accepted** - Fired when a message is received.
  * **on_connection_closed** - Fired when an endpoint closes its connection.
 
-## More Complex Reactor Examples
+## More Complex Examples
 
 Now that we've covered the basics with the archetypical hello world app, let's look at some more interesting examples.
 
-There are four example applications that demonstrate how to send and receive messages both directly and through an intermediary, such as a broker:
+The following two client examples send and receive messages to an external broker or server:
 
- * **simple_send.rb** - sends messages to a receiver at a specific address and receives responses via an intermediary,
- * **simple_recv.rb** - receives messages from via an intermediary,
- * **direct_send.rb** - sends messages directly to a receiver and listens for responses itself, and
- * **direct_recv.rb** - receives messages directly.
+ * **simple_send.rb** - connect to a server, send messages to an address
+ * **simple_recv.rb** - connect to a server, receive messages from an address
+
+For example: start `broker.rb`; run `simple_send.rb` to send messages to a
+broker queue; then `simple_recv.rb` to receive the messages from the broker.
 
- Simple send and direct send may, at first, seem to be so similar that you wonder why they're not just the same applciation. And I know for me I was wonder as I wrote the list above why there were two examples. The reason is that **simple_send.rb** uses the intermediary transfer responses to the messages it sends, while **direct_send.rb** uses an *Acceptor* to listen for an process responses.
+The following two examples are *servers* that can be connected to directly, without a broker:
 
- You can use the examples in the follow ways:
+ * **direct_send.rb** - sends messages directly to a receiver and listens for responses itself, and
+ * **direct_recv.rb** - receives messages directly.
 
- ```
- simple_send.rb -> broker <- simple_recv.rb
- simple_send.rb -> direct_recv.rb
- direct_send.rb -> simple_recv.rb
- ```
+For example, if you start `direct_recv.rb`, you can connect to it directly with
+`simple_send.rb`, and vice versa with `direct_send.rb` and `simple_recv.rb`.
 
 In this set of examples we see the following event occurring, in addition to what we've seen before:
 
@@ -91,11 +83,11 @@ The way the broker works is to listen to incoming connections, examine the compo
 
 The components of the broker example include:
  * **Broker** - A class that extends the MessagingHandler class. It accepts incoming connections, manages subscribing them to exchanges, and transfers messages between them.
- * **Exchange** - A class that represents a message queue, tracking what endpoints are subscribed to it.
+ * **MessageQueue** - Distributes messages to subscriptions.
 
 The Broker manages a map connecting a queue address to the instance of Exchange that holds references to the endpoints of interest.
 
-The broker application demonstrates a new set of reactor events:
+The broker application demonstrates a new set of events:
 
  * **on_link_opening** - Fired when a remote link is opened but the local end is not yet open. From this event the broker grabs the address and subscribes the link to an exchange for that address.
  * **on_link_closing** - Fired when a remote link is closed but the local end is still open. From this event the broker grabs the address and unsubscribes the link from that exchange.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/examples/ruby/broker.rb
----------------------------------------------------------------------
diff --git a/examples/ruby/broker.rb b/examples/ruby/broker.rb
index 2c37a39..bd461ed 100644
--- a/examples/ruby/broker.rb
+++ b/examples/ruby/broker.rb
@@ -21,11 +21,7 @@ require 'qpid_proton'
 require 'optparse'
 require 'pathname'
 
-def debug(text)
-  print "[#{Time.now.strftime('%s')}] #{text}\n" if $options[:debug]
-end
-
-class Exchange
+class MessageQueue
 
   def initialize(dynamic = false)
     @dynamic = dynamic
@@ -34,30 +30,22 @@ class Exchange
   end
 
   def subscribe(consumer)
-    debug("subscribing #{consumer}")
     @consumers << (consumer)
-    debug(" there are #{@consumers.size} consumers")
   end
 
   def unsubscribe(consumer)
-    debug("unsubscribing #{consumer}")
     if @consumers.include?(consumer)
       @consumers.delete(consumer)
-    else
-      debug(" consumer doesn't exist")
     end
-    debug("  there are #{@consumers.size} consumers")
     @consumers.empty? && (@dynamic || @queue.empty?)
   end
 
   def publish(message)
-    debug("queueing message: #{message.body}")
     @queue << message
     self.dispatch
   end
 
   def dispatch(consumer = nil)
-    debug("dispatching: consumer=#{consumer}")
     if consumer
       c = [consumer]
     else
@@ -69,10 +57,8 @@ class Exchange
   end
 
   def deliver_to(consumers)
-    debug("delivering to #{consumers.size} consumer(s)")
     result = false
     consumers.each do |consumer|
-      debug(" current consumer=#{consumer} credit=#{consumer.credit}")
       if consumer.credit > 0 && !@queue.empty?
         consumer.send(@queue.pop(true))
         result = true
@@ -92,32 +78,23 @@ class Broker < Qpid::Proton::Handler::MessagingHandler
   end
 
   def on_start(event)
-    debug("on_start event")
     @acceptor = event.container.listen(@url)
     print "Listening on #{@url}\n"
   end
 
   def queue(address)
-    debug("fetching queue for #{address}: (there are #{@queues.size} queues)")
     unless @queues.has_key?(address)
-      debug(" creating new queue")
-      @queues[address] = Exchange.new
-    else
-      debug(" using existing queue")
+      @queues[address] = MessageQueue.new
     end
-    result = @queues[address]
-    debug(" returning #{result}")
-    return result
+    @queues[address]
   end
 
   def on_link_opening(event)
-    debug("processing on_link_opening")
-    debug("link is#{event.link.sender? ? '' : ' not'} a sender")
     if event.link.sender?
       if event.link.remote_source.dynamic?
         address = SecureRandom.uuid
         event.link.source.address = address
-        q = Exchange.new(true)
+        q = MessageQueue.new(true)
         @queues[address] = q
         q.subscribe(event.link)
       elsif event.link.remote_source.address
@@ -130,7 +107,6 @@ class Broker < Qpid::Proton::Handler::MessagingHandler
   end
 
   def unsubscribe(link)
-    debug("unsubscribing #{link.source.address}")
     if @queues.has_key?(link.source.address)
       if @queues[link.source.address].unsubscribe(link)
         @queues.delete(link.source.address)
@@ -159,40 +135,21 @@ class Broker < Qpid::Proton::Handler::MessagingHandler
   end
 
   def on_sendable(event)
-    debug("on_sendable event")
     q = self.queue(event.link.source.address)
-    debug(" dispatching #{event.message} to #{q}")
     q.dispatch(event.link)
   end
 
   def on_message(event)
-    debug("on_message event")
     q = self.queue(event.link.target.address)
-    debug(" dispatching #{event.message} to #{q}")
     q.publish(event.message)
   end
 
 end
 
-$options = {
-  :address => "localhost:5672",
-  :debug => false
-}
-
-OptionParser.new do |opts|
-  opts.banner = "Usage: #{Pathname.new(__FILE__).basename} [$options]"
-
-  opts.on("-a", "--address=ADDRESS", "Send messages to ADDRESS (def. #{$options[:address]}).") do |address|
-    $options[:address] = address
-  end
-
-  opts.on("-d", "--debug", "Enable debugging output (def. #{$options[:debug]})") do
-    $options[:debug] = true
-  end
-
-end.parse!
-
-begin
-  Qpid::Proton::Reactor::Container.new(Broker.new($options[:address])).run
-rescue Interrupt
+if ARGV.size != 1
+  STDERR.puts "Usage: #{__FILE__} URL
+Start an example broker listening on URL"
+  return 1
 end
+url, = ARGV
+Qpid::Proton::Container.new(Broker.new(url)).run

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/examples/ruby/client.rb
----------------------------------------------------------------------
diff --git a/examples/ruby/client.rb b/examples/ruby/client.rb
index 8c38f38..46c1251 100644
--- a/examples/ruby/client.rb
+++ b/examples/ruby/client.rb
@@ -22,15 +22,17 @@ require 'optparse'
 
 class Client < Qpid::Proton::Handler::MessagingHandler
 
-  def initialize(url, requests)
+  def initialize(url, address, requests)
     super()
     @url = url
+    @address = address
     @requests = requests
   end
 
   def on_start(event)
-    @sender = event.container.create_sender(@url)
-    @receiver = event.container.create_receiver(@sender.connection, :dynamic => true)
+    c = event.container.connect(@url)
+    @sender = c.open_sender(@address)
+    @receiver = c.open_receiver({:dynamic => true})
   end
 
   def next_request
@@ -70,13 +72,10 @@ REQUESTS = ["Twas brillig, and the slithy toves",
             "All mimsy were the borogroves,",
             "And the mome raths outgrabe."]
 
-options = {
-  :address => "localhost:5672/examples",
-}
-
-OptionParser.new do |opts|
-  opts.banner = "Usage: client.rb [options]"
-  opts.on("-a", "--address=ADDRESS", "Send messages to ADDRESS (def. #{options[:address]}).") { |address| options[:address] = address }
-end.parse!
-
-Qpid::Proton::Reactor::Container.new(Client.new(options[:address], REQUESTS)).run
+if ARGV.size != 2
+  STDERR.puts "Usage: #{__FILE__} URL ADDRESS
+Connect to URL and send messages to ADDRESS"
+  return 1
+end
+url, address = ARGV
+Qpid::Proton::Container.new(Client.new(url, address, REQUESTS)).run

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/examples/ruby/direct_recv.rb
----------------------------------------------------------------------
diff --git a/examples/ruby/direct_recv.rb b/examples/ruby/direct_recv.rb
index 411efba..b2b0ba9 100644
--- a/examples/ruby/direct_recv.rb
+++ b/examples/ruby/direct_recv.rb
@@ -22,15 +22,16 @@ require 'optparse'
 
 class DirectReceive < Qpid::Proton::Handler::MessagingHandler
 
-  def initialize(url, expected)
+  def initialize(url, address, count)
     super()
     @url = url
-    @expected = expected
+    @address = address
+    @expected = count
     @received = 0
   end
 
   def on_start(event)
-    @acceptor = event.container.listen(@url)
+    event.container.listen(@url)
   end
 
   def on_message(event)
@@ -38,29 +39,17 @@ class DirectReceive < Qpid::Proton::Handler::MessagingHandler
       puts "Received: #{event.message.body}"
       @received = @received + 1
       if @received == @expected
-        event.connection.close
-        @acceptor.close
+        event.container.stop
       end
     end
   end
 end
 
-options = {
-  :address => "localhost:5672/examples",
-  :messages => 100,
-}
-
-OptionParser.new do |opts|
-  opts.banner = "Usage: simple_send.rb [options]"
-
-  opts.on("-a", "--address=ADDRESS", "Send messages to ADDRESS (def. #{options[:address]}).") do |address|
-    options[:address] = address
-  end
-
-  opts.on("-m", "--messages=COUNT", "The number of messages to send (def. #{options[:messages]}",
-    OptionParser::DecimalInteger) do |messages|
-    options[:messages] = messages
-  end
-end.parse!
+unless (2..3).include? ARGV.size
+  STDERR.puts "Usage: #{__FILE__} URL ADDRESS [COUNT]
+Listen on URL and receive COUNT messages from ADDRESS"
+  return 1
+end
+url, address, count = ARGV
+Qpid::Proton::Container.new(DirectReceive.new(url, address, (count || 10).to_i)).run
 
-Qpid::Proton::Reactor::Container.new(DirectReceive.new(options[:address], options[:messages])).run

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/examples/ruby/direct_send.rb
----------------------------------------------------------------------
diff --git a/examples/ruby/direct_send.rb b/examples/ruby/direct_send.rb
index ed679aa..e54e1ab 100644
--- a/examples/ruby/direct_send.rb
+++ b/examples/ruby/direct_send.rb
@@ -20,23 +20,19 @@
 require 'qpid_proton'
 require 'optparse'
 
-options = {
-  :address => "localhost:5672/examples",
-  :messages => 100,
-}
+class DirectSend < Qpid::Proton::Handler::MessagingHandler
 
-class SimpleSend < Qpid::Proton::Handler::MessagingHandler
-
-  def initialize(url, expected)
+  def initialize(url, address, expected)
     super()
     @url = url
+    @address = address
     @sent = 0
     @confirmed = 0
     @expected = expected
   end
 
   def on_start(event)
-    @acceptor = event.container.listen(@url)
+    event.container.listen(@url)
   end
 
   def on_sendable(event)
@@ -51,26 +47,15 @@ class SimpleSend < Qpid::Proton::Handler::MessagingHandler
     @confirmed = @confirmed + 1
     if @confirmed == @expected
       puts "All #{@expected} messages confirmed!"
-      event.connection.close
+      event.container.stop
     end
   end
 end
 
-OptionParser.new do |opts|
-  opts.banner = "Usage: simple_send.rb [options]"
-
-  opts.on("-a", "--address=ADDRESS", "Send messages to ADDRESS (def. #{options[:address]}).") do |address|
-    options[:address] = address
-  end
-
-  opts.on("-m", "--messages=COUNT", "The number of messages to send (def. #{options[:messages]}",
-    OptionParser::DecimalInteger) do |messages|
-    options[:messages] = messages
-  end
-end.parse!
-
-begin
-  Qpid::Proton::Reactor::Container.new(SimpleSend.new(options[:address], options[:messages])).run
-rescue Interrupt => error
-  puts "ERROR: #{error}"
+unless (2..3).include? ARGV.size
+  STDERR.puts "Usage: #{__FILE__} URL ADDRESS [COUNT]
+Listen on URL and send COUNT messages to ADDRESS"
+  return 1
 end
+url, address, count = ARGV
+Qpid::Proton::Container.new(DirectSend.new(url, address, (count || 10).to_i)).run

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/examples/ruby/example_test.rb
----------------------------------------------------------------------
diff --git a/examples/ruby/example_test.rb b/examples/ruby/example_test.rb
index 12ab784..bcb6aa6 100755
--- a/examples/ruby/example_test.rb
+++ b/examples/ruby/example_test.rb
@@ -1,4 +1,4 @@
-#!/usr/bin/enc ruby
+#!/usr/bin/env ruby
 #
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
@@ -22,56 +22,36 @@ require 'minitest/autorun'
 require 'qpid_proton'
 require 'socket'
 
-class ExampleTest < MiniTest::Test
-
-  def run_script(script, port)
-    cmd = [RbConfig.ruby, script]
-    cmd += ["-a", ":#{port}/examples"] if port
-    return IO.popen(cmd)
+# Wait for the broker to be listening
+def wait_for(url, timeout = 5)
+  deadline = Time.now + timeout
+  begin
+    TCPSocket.open("", URI(url).port).close
+  rescue Errno::ECONNREFUSED
+    retry if Time.now < deadline
+    raise
   end
+end
 
-  def assert_output(script, want, port=nil)
-    out = run_script(script, port)
-    assert_equal want, out.read.strip
-  end
+# URL with an unused port
+def test_url()
+  "amqp://:#{TCPServer.open(0) { |s| s.addr[1] }}"
+end
 
-  def test_helloworld
-    assert_output("helloworld.rb", "Hello world!", $port)
-  end
 
-  def test_send_recv
-    assert_output("simple_send.rb", "All 100 messages confirmed!", $port)
-    want = (0..99).reduce("") { |x,y| x << "Received: sequence #{y}\n" }
-    assert_output("simple_recv.rb", want.strip, $port)
-  end
+class ExampleTest < MiniTest::Test
 
-  def test_direct_recv
-    TestPort.new do |tp|
-      p = run_script("direct_recv.rb", tp.port)
-      wait_port tp.port
-      assert_output("simple_send.rb", "All 100 messages confirmed!", tp.port)
-      want = (0..99).reduce("") { |x,y| x << "Received: sequence #{y}\n" }
-      assert_equal(want.strip, p.read.strip)
-    end
+  def run_script(*args)
+    return IO.popen([ RbConfig.ruby ] + args.map { |a| a.to_s })
   end
 
-  def test_direct_send
-    TestPort.new do |tp|
-      p = run_script("direct_send.rb", tp.port)
-      wait_port tp.port
-      want = (0..99).reduce("") { |x,y| x << "Received: sequence #{y}\n" }
-      assert_output("simple_recv.rb", want.strip, $port)
-      assert_equal("All 100 messages confirmed!", p.read.strip)
-    end
+  def assert_output(want, *args)
+    out = run_script(*args)
+    assert_equal(want, out.read.strip)
   end
 
-  def test_direct_send
-    TestPort.start_wait do |port|
-      p = run_script("direct_recv.rb", port)
-      assert_output("simple_send.rb", "All 100 messages confirmed!", port)
-      want = (0..99).reduce("") { |x,y| x << "Received: sequence #{y}\n" }
-      assert_equal(want.strip, p.read.strip)
-    end
+  def test_helloworld
+    assert_output("Hello world!", "helloworld.rb", $url, __method__)
   end
 
   def test_client_server
@@ -85,28 +65,57 @@ class ExampleTest < MiniTest::Test
 -> And the mome raths outgrabe.
 <- AND THE MOME RATHS OUTGRABE.
 EOS
-    srv = run_script("server.rb", $port)
-    assert_output("client.rb", want.strip, $port)
-
+    server = run_script("server.rb", $url, __method__)
+    assert_output(want.strip, "client.rb", $url, __method__)
   ensure
-    Process.kill :TERM, srv.pid if srv
+    Process.kill :TERM, server.pid if server
   end
-end
 
-# Start the broker before all tests.
-$port = TCPServer.open(0) do |s| s.addr[1]; end # find an unused port
-$broker = spawn("#{RbConfig.ruby} reactor/broker.rb -a :#{$port}")
+  def test_send_recv
+    assert_output("All 10 messages confirmed!", "simple_send.rb", $url, __method__)
+    want = (0..9).reduce("") { |x,y| x << "Received: sequence #{y}\n" }
+    assert_output(want.strip, "simple_recv.rb", $url, __method__)
+  end
 
-# Wait for the broker to be listening
-deadline = Time.now + 5
-begin
-  TCPSocket.open("", $port).close
-rescue Errno::ECONNREFUSED
-  retry if Time.now < deadline
-  raise
+  def test_helloworld_direct
+    url = test_url
+    assert_output("Hello world!", "helloworld_direct.rb", url, __method__)
+  end
+
+  def test_direct_recv
+    url = test_url
+    p = run_script("direct_recv.rb", url, __method__)
+    wait_for url
+    assert_output("All 10 messages confirmed!", "simple_send.rb", url, __method__)
+    want = (0..9).reduce("") { |x,y| x << "Received: sequence #{y}\n" }
+    assert_equal(want.strip, p.read.strip)
+  end
+
+  def test_direct_send
+    url = test_url
+    p = run_script("direct_send.rb", url, __method__)
+    wait_for url
+    want = (0..9).reduce("") { |x,y| x << "Received: sequence #{y}\n" }
+    assert_output(want.strip, "simple_recv.rb", url, __method__)
+    assert_equal("All 10 messages confirmed!", p.read.strip)
+  end
+
+  def test_direct_send
+    url = test_url
+    p = run_script("direct_recv.rb", url, __method__)
+    wait_for url
+    assert_output("All 10 messages confirmed!", "simple_send.rb", url, __method__)
+    want = (0..9).reduce("") { |x,y| x << "Received: sequence #{y}\n" }
+    assert_equal(want.strip, p.read.strip)
+  end
 end
 
+# Start the broker before all tests.
+$url = test_url
+$broker = IO.popen([RbConfig.ruby, 'broker.rb', $url])
+wait_for $url
+
 # Kill the broker after all tests
 MiniTest.after_run do
-  Process.kill(:TERM, $broker) if $broker
+  Process.kill(:TERM, $broker.pid) if $broker
 end

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/examples/ruby/helloworld.rb
----------------------------------------------------------------------
diff --git a/examples/ruby/helloworld.rb b/examples/ruby/helloworld.rb
index 9b02e8a..2b2d4f5 100644
--- a/examples/ruby/helloworld.rb
+++ b/examples/ruby/helloworld.rb
@@ -22,16 +22,15 @@ require 'optparse'
 
 class HelloWorld < Qpid::Proton::Handler::MessagingHandler
 
-  def initialize(server, address)
+  def initialize(url, address)
     super()
-    @server = server
-    @address = address
+    @url, @address = url, address
   end
 
   def on_start(event)
-    conn = event.container.connect(:address => @server)
-    event.container.create_sender(conn, :target => @address)
-    event.container.create_receiver(conn, :source => @address)
+    conn = event.container.connect(@url)
+    conn.open_sender(@address)
+    conn.open_receiver(@address)
   end
 
   def on_sendable(event)
@@ -51,23 +50,10 @@ class HelloWorld < Qpid::Proton::Handler::MessagingHandler
   end
 end
 
-options = {
-  :address => "localhost:5672",
-  :queue => "examples"
-}
-
-OptionParser.new do |opts|
-  opts.banner = "Usage: helloworld_direct.rb [options]"
-
-  opts.on("-a", "--address=ADDRESS", "Send messages to ADDRESS (def. #{options[:address]}).") do |address|
-    options[:address] = address
-  end
-
-  opts.on("-q", "--queue=QUEUE", "Send messages to QUEUE (def. #{options[:queue]})") do |queue|
-    options[:queue] = queue
-  end
-
-end.parse!
-
-hw = HelloWorld.new(options[:address], "examples")
-Qpid::Proton::Reactor::Container.new(hw).run
+if ARGV.size != 2
+  STDERR.puts "Usage: #{__FILE__} URL ADDRESS
+Connect to URL, send a message to ADDRESS and receive it back"
+  return 1
+end
+url, address = ARGV
+Qpid::Proton::Container.new(HelloWorld.new(url, address)).run

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/examples/ruby/helloworld_direct.rb
----------------------------------------------------------------------
diff --git a/examples/ruby/helloworld_direct.rb b/examples/ruby/helloworld_direct.rb
index e98cc1f..dab368b 100644
--- a/examples/ruby/helloworld_direct.rb
+++ b/examples/ruby/helloworld_direct.rb
@@ -20,22 +20,19 @@
 require 'qpid_proton'
 require 'optparse'
 
-options = {
-  :address => "localhost:5672/examples",
-}
-
 class HelloWorldDirect < Qpid::Proton::Handler::MessagingHandler
 
   include Qpid::Proton::Util::Wrapper
 
-  def initialize(url)
+  def initialize(url, address)
     super()
-    @url = url
+    @url, @address = url, address
   end
 
   def on_start(event)
-    @acceptor = event.container.listen(@url)
-    event.container.create_sender(@url)
+    event.container.listen(@url)
+    c = event.container.connect(@url) # Connect to self!
+    c.open_sender(@address)
   end
 
   def on_sendable(event)
@@ -50,25 +47,14 @@ class HelloWorldDirect < Qpid::Proton::Handler::MessagingHandler
   end
 
   def on_accepted(event)
-    event.connection.close
-  end
-
-  def on_connection_closed(event)
-    @acceptor.close
+    event.container.stop
   end
-
 end
 
-OptionParser.new do |opts|
-  opts.banner = "Usage: helloworld_direct.rb [options]"
-
-  opts.on("-a", "--address=ADDRESS", "Send messages to ADDRESS (def. #{options[:address]}).") do |address|
-    options[:address] = address
-  end
-
-end.parse!
-
-begin
-  Qpid::Proton::Reactor::Container.new(HelloWorldDirect.new(options[:address])).run
-rescue Interrupt => error
+if ARGV.size != 2
+  STDERR.puts "Usage: #{__FILE__} URL ADDRESS
+Listen on and connect to URL (connect to self), send a message to ADDRESS and receive it back"
+  return 1
 end
+url, address = ARGV
+Qpid::Proton::Container.new(HelloWorldDirect.new(url, address)).run

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/examples/ruby/server.rb
----------------------------------------------------------------------
diff --git a/examples/ruby/server.rb b/examples/ruby/server.rb
index 9373272..87eba99 100644
--- a/examples/ruby/server.rb
+++ b/examples/ruby/server.rb
@@ -22,35 +22,31 @@ require 'optparse'
 
 class Server < Qpid::Proton::Handler::MessagingHandler
 
-  def initialize(url)
+  def initialize(url, address)
     super()
-    @url = Qpid::Proton::URL.new url
-    @address = @url.path
+    @url = url
+    @address = address
     @senders = {}
   end
 
   def on_start(event)
-    @container = event.container
-    @conn = @container.connect(:url => @url)
-    @receiver = @container.create_receiver(@conn, :source => @address)
+    c = event.container.connect(@url)
+    c.open_receiver(@address)
     @relay = nil
   end
 
   def on_connection_opened(event)
     if event.connection.remote_offered_capabilities &&
-      event.connection.remote_offered_capabilities.contain?("ANONYMOUS-RELAY")
-      @relay = @container.create_sender(@conn, nil)
+        event.connection.remote_offered_capabilities.contain?("ANONYMOUS-RELAY")
+      @relay = event.connection.open_sender({:target => nil})
     end
   end
 
   def on_message(event)
     msg = event.message
+    return unless msg.reply_to  # Not a request message
     puts "<- #{msg.body}"
-    sender = @relay || @senders[msg.reply_to]
-    if sender.nil?
-      sender = @container.create_sender(@conn, :target => msg.reply_to)
-      @senders[msg.reply_to] = sender
-    end
+    sender = @relay || (@senders[msg.reply_to] ||= event.connection.open_sender(msg.reply_to))
     reply = Qpid::Proton::Message.new
     reply.address = msg.reply_to
     reply.body = msg.body.upcase
@@ -64,13 +60,10 @@ class Server < Qpid::Proton::Handler::MessagingHandler
   end
 end
 
-options = {
-  :address => "localhost:5672/examples",
-}
-
-OptionParser.new do |opts|
-  opts.banner = "Usage: server.rb [options]"
-  opts.on("-a", "--address=ADDRESS", "Send messages to ADDRESS (def. #{options[:address]}).") { |address| options[:address] = address }
-end.parse!
-
-Qpid::Proton::Reactor::Container.new(Server.new(options[:address])).run()
+if ARGV.size != 2
+  STDERR.puts "Usage: #{__FILE__} URL ADDRESS
+Connect to URL and reply to messages sent to ADDRESS"
+  return 1
+end
+url, address = ARGV
+Qpid::Proton::Container.new(Server.new(url, address)).run

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/examples/ruby/simple_recv.rb
----------------------------------------------------------------------
diff --git a/examples/ruby/simple_recv.rb b/examples/ruby/simple_recv.rb
index 47b21ed..d1a9607 100644
--- a/examples/ruby/simple_recv.rb
+++ b/examples/ruby/simple_recv.rb
@@ -20,17 +20,19 @@
 require 'qpid_proton'
 require 'optparse'
 
-class Receiver < Qpid::Proton::Handler::MessagingHandler
+class SimpleReceive < Qpid::Proton::Handler::MessagingHandler
 
-  def initialize(url, count)
+  def initialize(url, address, count)
     super()
     @url = url
+    @address = address
     @expected = count
     @received = 0
   end
 
   def on_start(event)
-    event.container.create_receiver(@url)
+    c = event.container.connect(@url)
+    c.open_receiver(@address)
   end
 
   def on_message(event)
@@ -42,28 +44,14 @@ class Receiver < Qpid::Proton::Handler::MessagingHandler
       end
     end
   end
-
 end
 
-options = {
-  :address => "localhost:5672/examples",
-  :messages => 100,
-}
-
-OptionParser.new do |opts|
-  opts.banner = "Usage: simple_send.rb [options]"
-
-  opts.on("-a", "--address=ADDRESS", "Send messages to ADDRESS (def. #{options[:address]}).") do |address|
-    options[:address] = address
-  end
+unless (2..3).include? ARGV.size
+  STDERR.puts "Usage: #{__FILE__} URL ADDRESS [COUNT]
+Connect to URL and receive COUNT messages from ADDRESS"
+  return 1
+end
+url, address, count = ARGV
 
-  opts.on("-m", "--messages=COUNT", "The number of messages to send (def. #{options[:messages]}",
-    OptionParser::DecimalInteger) do |messages|
-    options[:messages] = messages
-  end
-end.parse!
+Qpid::Proton::Container.new(SimpleReceive.new(url, address, count || 10)).run
 
-begin
-  Qpid::Proton::Reactor::Container.new(Receiver.new(options[:address], options[:messages])).run
-rescue Interrupt
-end

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/examples/ruby/simple_send.rb
----------------------------------------------------------------------
diff --git a/examples/ruby/simple_send.rb b/examples/ruby/simple_send.rb
index 38857f3..25fc56c 100644
--- a/examples/ruby/simple_send.rb
+++ b/examples/ruby/simple_send.rb
@@ -20,23 +20,20 @@
 require 'qpid_proton'
 require 'optparse'
 
-options = {
-  :address => "localhost:5672/examples",
-  :messages => 100,
-}
-
 class SimpleSend < Qpid::Proton::Handler::MessagingHandler
 
-  def initialize(url, expected)
+  def initialize(url, address, expected)
     super()
     @url = url
+    @address = address
     @sent = 0
     @confirmed = 0
     @expected = expected
   end
 
   def on_start(event)
-    event.container.create_sender(@url)
+    c = event.container.connect(@url)
+    c.open_sender(@address)
   end
 
   def on_sendable(event)
@@ -54,20 +51,12 @@ class SimpleSend < Qpid::Proton::Handler::MessagingHandler
       event.connection.close
     end
   end
-
 end
 
-OptionParser.new do |opts|
-  opts.banner = "Usage: simple_send.rb [options]"
-
-  opts.on("-a", "--address=ADDRESS", "Send messages to ADDRESS (def. #{options[:address]}).") do |address|
-    options[:address] = address
-  end
-
-  opts.on("-m", "--messages=COUNT", "The number of messages to send (def. #{options[:messages]}",
-    OptionParser::DecimalInteger) do |messages|
-    options[:messages] = messages
-  end
-end.parse!
-
-Qpid::Proton::Reactor::Container.new(SimpleSend.new(options[:address], options[:messages])).run
+unless (2..3).include? ARGV.size
+  STDERR.puts "Usage: #{__FILE__} URL ADDRESS [COUNT]
+Connect to URL and send COUNT messages to ADDRESS"
+  return 1
+end
+url, address, count = ARGV
+Qpid::Proton::Container.new(SimpleSend.new(url, address, count || 10)).run
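
The examples now take positional arguments straight from ARGV. Elements of ARGV are always strings, so a COUNT given on the command line arrives as a String while the default of 10 is an Integer. The sketch below shows one way to validate the argument count and normalize COUNT to an Integer; `parse_args` is a hypothetical helper, not something the commit defines.

```ruby
# Hypothetical helper: returns [url, address, count], or nil when the
# number of arguments is wrong. COUNT is converted to an Integer.
def parse_args(argv, default_count = 10)
  return nil unless (2..3).include?(argv.size)
  url, address, count = argv
  [url, address, count ? Integer(count) : default_count]
end

with_default = parse_args(["amqp://localhost", "examples"])
with_count   = parse_args(["amqp://localhost", "examples", "25"])
bad          = parse_args(["amqp://localhost"])
```

`Integer(...)` raises ArgumentError for non-numeric input, which may be preferable to silently comparing a String against a message counter.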

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/proton-c/bindings/ruby/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/CMakeLists.txt b/proton-c/bindings/ruby/CMakeLists.txt
index 46b7ba8..c00fd3d 100644
--- a/proton-c/bindings/ruby/CMakeLists.txt
+++ b/proton-c/bindings/ruby/CMakeLists.txt
@@ -45,7 +45,7 @@ set_target_properties(cproton-ruby
 
 ##  Make a gem
 
-file(GLOB_RECURSE RUBY_SRC RELATIVE . *.rb *.rdoc)
+file(GLOB_RECURSE RUBY_SRC RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} *.rb *.rdoc)
 
 find_program(GEM_EXE gem DOC "Program to build and install ruby gem packages")
 mark_as_advanced(GEM_EXE)
@@ -133,7 +133,7 @@ if (YARD_EXE)
   add_custom_command(
     OUTPUT ${bin}/doc
     WORKING_DIRECTORY ${src}
-    COMMAND ${YARD_EXE} -q -o ${bin}/doc -b ${bin}/.yardoc --no-private -r README.rdoc
+    COMMAND ${YARD_EXE} -q -o ${bin}/doc -b ${bin}/.yardoc --no-progress --no-private -r README.rdoc
     DEPENDS ${RUBY_SRC}
     )
   add_custom_target(docs-ruby DEPENDS ${bin}/doc)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/proton-c/bindings/ruby/lib/core/connection.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/connection.rb b/proton-c/bindings/ruby/lib/core/connection.rb
index 6caf589..0c878d4 100644
--- a/proton-c/bindings/ruby/lib/core/connection.rb
+++ b/proton-c/bindings/ruby/lib/core/connection.rb
@@ -23,21 +23,17 @@ module Qpid::Proton
   class Connection < Endpoint
 
     protected
-    PROTON_METHOD_PREFIX = "pn_connection"
     include Util::SwigHelper
+    PROTON_METHOD_PREFIX = "pn_connection"
 
     public
 
     # @!attribute hostname
-    #
-    # @return [String] The AMQP hostname for the connection.
-    #
+    #   @return [String] The AMQP hostname for the connection.
     proton_accessor :hostname
 
     # @!attribute user
-    #   The user name for authentication.
-    #
-    #   @return [String] the user name
+    #   @return [String] User name used for authentication (outgoing connection) or the authenticated user name (incoming connection)
     proton_accessor :user
 
     # @private
@@ -45,6 +41,7 @@ module Qpid::Proton
 
     # @private
     attr_accessor :overrides
+    # @private
     attr_accessor :session_policy
 
     # @private
@@ -68,12 +65,11 @@ module Qpid::Proton
     def initialize(impl = Cproton.pn_connection)
       super()
       @impl = impl
-      @offered_capabilities = nil
-      @desired_capabilities = nil
-      @properties = nil
       @overrides = nil
       @collector = nil
       @session_policy = nil
+      @link_count = 0
+      @link_prefix = ""
       self.class.store_instance(self, :pn_connection_attachments)
     end
 
@@ -195,18 +191,34 @@ module Qpid::Proton
 
     # Open the local end of the connection.
     #
-    # @option options [String] :container_id Unique AMQP container ID, defaults to a UUID
-    # @option options [String] :link_prefix Prefix for generated link names, default is container_id
-    #
-    def open(options={})
-      object_to_data(@offered_capabilities, Cproton.pn_connection_offered_capabilities(@impl))
-      object_to_data(@desired_capabilities, Cproton.pn_connection_desired_capabilities(@impl))
-      object_to_data(@properties, Cproton.pn_connection_properties(@impl))
-      cid = options[:container_id] || SecureRandom.uuid
-      Cproton.pn_connection_set_container(@impl, cid)
-      @link_prefix = options[:link_prefix] || cid
-      @link_prefix = SecureRandom.uuid if !@link_prefix || @link_prefix.empty?
-      @link_count = 0
+    # @option opts [MessagingHandler] :handler handler for events related to this connection.
+    # @option opts [String] :user user-name for authentication.
+    # @option opts [String] :password password for authentication.
+    # @option opts [Numeric] :idle_timeout seconds before closing an idle connection
+    # @option opts [Boolean] :sasl_enabled Enable or disable SASL.
+    # @option opts [Boolean] :sasl_allow_insecure_mechs Allow mechanisms that disclose clear text
+    #   passwords, even over an insecure connection.
+    # @option opts [String] :sasl_allowed_mechs the allowed SASL mechanisms for use on the connection.
+    # @option opts [String] :container_id AMQP container ID, normally provided by {Container}
+    #
+    def open(opts={})
+      return if local_active?
+      apply opts
+      Cproton.pn_connection_open(@impl)
+    end
+
+    # @private
+    def apply opts
+      # NOTE: Only connection options are set here. Transport options are set
+      # with {Transport#apply} from the connection_driver (or in
+      # on_connection_bound if not using a connection_driver)
+      Cproton.pn_connection_set_container(@impl, opts[:container_id] || SecureRandom.uuid)
+      Cproton.pn_connection_set_user(@impl, opts[:user]) if opts[:user]
+      Cproton.pn_connection_set_password(@impl, opts[:password]) if opts[:password]
+      @link_prefix = opts[:link_prefix] || container_id
+      object_to_data(opts[:offered_capabilities], Cproton.pn_connection_offered_capabilities(@impl))
+      object_to_data(opts[:desired_capabilities], Cproton.pn_connection_desired_capabilities(@impl))
+      object_to_data(opts[:properties], Cproton.pn_connection_properties(@impl))
       Cproton.pn_connection_open(@impl)
     end
 
@@ -220,8 +232,11 @@ module Qpid::Proton
     # Once this operation has completed, the #LOCAL_CLOSED state flag will be
     # set.
     #
-    def close
-      self._update_condition
+    def close(error = nil)
+      if error
+        @condition = Condition.make error
+        self._update_condition
+      end
       Cproton.pn_connection_close(@impl)
     end
 
@@ -257,10 +272,10 @@ module Qpid::Proton
     end
 
     # Open a sender on the default_session
-    def open_sender(*args, &block) default_session.open_sender(*args, &block) end
+    def open_sender(opts = {}) default_session.open_sender(opts) end
 
     # Open a receiver on the default_session
-    def open_receiver(*args, &block) default_session.open_receiver(*args, &block) end
+    def open_receiver(opts = {}) default_session.open_receiver(opts) end
 
     # Returns the first session from the connection that matches the specified
     # state mask.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/proton-c/bindings/ruby/lib/core/connection_driver.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/connection_driver.rb b/proton-c/bindings/ruby/lib/core/connection_driver.rb
index b5b38ac..6c85659 100644
--- a/proton-c/bindings/ruby/lib/core/connection_driver.rb
+++ b/proton-c/bindings/ruby/lib/core/connection_driver.rb
@@ -36,23 +36,30 @@ module Qpid
       #
       # @param io [#read_nonblock, #write_nonblock] An {IO} or {IO}-like object that responds
       #   to #read_nonblock and #write_nonblock.
-      # @param handler [MessagingHandler] The handler to be invoked for AMQP events
-      #
-      def initialize io, handler=nil
+      # @param opts [Hash] See {Connection#open}. Transport options are applied here,
+      #   remaining options are applied to the connection.
+      # @param server [Bool] If true create a server (incoming) connection
+      def initialize(io, opts = {}, server=false)
         @impl = Cproton.pni_connection_driver or raise RuntimeError, "cannot create connection driver"
         @io = io
-        @handler = handler || Handler::MessagingHandler.new # Default handler for default behaviour
-        @rbuf = ""                                          # String to re-use as read buffer
+        @handler = opts[:handler] || Handler::MessagingHandler.new # Default handler if missing
+        @rbuf = ""              # String to re-use as read buffer
+        connection.apply opts
+        transport.set_server if server
+        transport.apply opts
       end
 
-      # @return [MessagingHandler]
       attr_reader :handler
 
       # @return [Connection]
-      def connection() Connection.wrap(Cproton.pni_connection_driver_connection(@impl)); end
+      def connection()
+        @connection ||= Connection.wrap(Cproton.pni_connection_driver_connection(@impl))
+      end
 
       # @return [Transport]
-      def transport() Transport.wrap(Cproton.pni_connection_driver_transport(@impl)); end
+      def transport()
+        @transport ||= Transport.wrap(Cproton.pni_connection_driver_transport(@impl))
+      end
 
       # @return [IO] Allows ConnectionDriver to be passed directly to {IO#select}
       def to_io() @io; end
@@ -63,21 +70,15 @@ module Qpid
       # @return [Bool] True if the driver has data to write
       def can_write?() Cproton.pni_connection_driver_write_size(@impl) > 0; end
 
-      # True if read and write sides of the IO are closed. Note this does not imply
-      # {#finished?} since there may still be events to dispatch.
-      def closed?
-        Cproton.pn_connection_driver_read_closed(@impl) &&
-          Cproton.pn_connection_driver_read_closed(@impl)
-      end
-
-      # True if the ConnectionDriver has nothing left to do: {#closed?} and
-      # there are no more events to dispatch.
+      # True if the ConnectionDriver has nothing left to do: both sides of the
+      # transport are closed and there are no events to dispatch.
       def finished?() Cproton.pn_connection_driver_finished(@impl); end
 
       # Dispatch available events, call the relevant on_* methods on the {#handler}.
       def dispatch(extra_handlers = nil)
         extra_handlers ||= []
         while event = Event::Event.wrap(Cproton.pn_connection_driver_next_event(@impl))
+          pre_dispatch(event)
           event.dispatch(@handler)
           extra_handlers.each { |h| event.dispatch h }
         end
@@ -90,13 +91,12 @@ module Qpid
         return if size <= 0
         @io.read_nonblock(size, @rbuf) # Use the same string rbuf for reading each time
         Cproton.pni_connection_driver_read_copy(@impl, @rbuf) unless @rbuf.empty?
-        rescue Errno::EWOULDBLOCK, Errno::EAGAIN, Errno::EINTR
-          # Try again later.
-        rescue EOFError         # EOF is not an error
-          Cproton.pn_connection_driver_read_close(@impl)
-        rescue IOError => e     # IOError is passed to the transport
-          error "read: #{e}"
-          Cproton.pn_connection_driver_read_close(@impl)
+      rescue Errno::EWOULDBLOCK, Errno::EAGAIN, Errno::EINTR
+        # Try again later.
+      rescue EOFError         # EOF is not an error
+        close_read
+      rescue IOError, SystemCallError => e     # Error is passed to the transport
+        close e
       end
 
       # Write to IO without blocking.
@@ -106,9 +106,8 @@ module Qpid
         Cproton.pn_connection_driver_write_done(@impl, n) if n > 0
       rescue Errno::EWOULDBLOCK, Errno::EAGAIN, Errno::EINTR
         # Try again later.
-      rescue IOError => e
-        error "write: #{e}"
-        Cproton.pn_connection_driver_write_close(@impl)
+      rescue IOError, SystemCallError => e
+        close e
       end
 
       # Generate timed events and IO, for example idle-timeout and heart-beat events.
@@ -142,41 +141,37 @@ module Qpid
         return next_tick
       end
 
-      # Close the read side of the IO with optional error.
-      # @param e [#to_s] Non-nil error will call {#handler}.on_transport_error on next {#dispatch}
-      def close_read(e=nil)
-          @io.close_read
-          error(e)
-          Cproton.pn_connection_driver_read_close(@impl)
+      # Close the read side of the transport
+      def close_read
+        return if Cproton.pn_connection_driver_read_closed(@impl)
+        Cproton.pn_connection_driver_read_close(@impl)
+        @io.close_read
       end
 
-      # Close the write side of the IO with optional error
-      # @param e [#to_s] Non-nil error will call {#handler}.on_transport_error on next {#dispatch}
-      def close_write(e=nil)
-          @io.close_write
-          error(e)
-          Cproton.pn_connection_driver_write_close(@impl)
+      # Close the write side of the transport
+      def close_write
+        return if Cproton.pn_connection_driver_write_closed(@impl)
+        Cproton.pn_connection_driver_write_close(@impl)
+        @io.close_write
       end
 
       # Close both sides of the IO with optional error
-      # @param e [#to_s] Non-nil error will call {#handler}.on_transport_error on next {#dispatch}
-      def close(e=nil)
-        if !closed?
-          close_read(e)
-          close_write(e)
+      # @param error [Condition] If non-nil pass to {#handler}.on_transport_error on next {#dispatch}
+      # Note `error` can be any value accepted by {Condition.make}
+      def close(error=nil)
+        if error
+          cond = Condition.make(error, "proton:io")
+          Cproton.pn_connection_driver_errorf(@impl, cond.name, "%s", cond.description)
         end
+        close_read
+        close_write
       end
 
-      def to_s
-        transport = Cproton.pni_connection_driver_tranport(@impl)
-        return "#<#{self.class.name}[#{transport}]:#{@io}>"
-      end
+      protected
 
-      private
+      # Override in subclass to add event context
+      def pre_dispatch(event) event; end
 
-      def error(e)
-        Cproton.pn_connection_driver_errorf(@impl, "proton:io", "%s", e.to_s) if e
-      end
     end
   end
 end
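
The driver's read path relies on `read_nonblock` with a reusable buffer and treats "would block" and end-of-file as normal outcomes, not errors. The following sketch shows the same pattern on a plain `IO.pipe`, with no Proton code involved; `try_read` and its `:again`/`:eof` return values are hypothetical names chosen for illustration.

```ruby
# Read up to size bytes without blocking.
# Returns the data, :again if nothing is ready yet, or :eof at end of stream.
def try_read(io, size, buf)
  io.read_nonblock(size, buf)
rescue IO::WaitReadable, Errno::EINTR
  :again
rescue EOFError
  :eof
end

r, w = IO.pipe
buf = String.new                      # reused as the read buffer on every call
empty = try_read(r, 16, buf)          # nothing has been written yet
w.write("hello")
data = try_read(r, 16, buf).dup       # copy, because buf is reused by later reads
w.close
done = try_read(r, 16, buf)           # writer closed and pipe drained
r.close
```

Any other IOError or SystemCallError is deliberately not rescued here; in the driver those are passed to `close` so they surface as a transport error.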

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/proton-c/bindings/ruby/lib/core/container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/container.rb b/proton-c/bindings/ruby/lib/core/container.rb
new file mode 100644
index 0000000..29df51b
--- /dev/null
+++ b/proton-c/bindings/ruby/lib/core/container.rb
@@ -0,0 +1,249 @@
+#--
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#++
+
+require 'thread'
+require 'set'
+require_relative 'listener'
+
+module Qpid::Proton
+
+  # An AMQP container manages a set of {Connection}s which contain {Sender} and
+  # {Receiver} links to transfer messages.
+  #
+  # TODO aconway 2017-10-26: document threading/dispatch role
+  #
+  # Usually, each AMQP client or server process has a single container for
+  # all of its connections and links.
+  class Container
+    private
+
+    def amqp_uri(s) Qpid::Proton::amqp_uri s; end
+
+    class ConnectionDriver < Qpid::Proton::ConnectionDriver
+      def initialize container, io, opts, server=false
+        super io, opts, server
+        @container = container
+      end
+
+      def final() end
+      def pre_dispatch(event) event.container = @container; end
+    end
+
+    public
+
+    # Create a new Container
+    #
+    # @param opts [Hash] Options
+    # @option opts [String] :id A unique ID for this container. Defaults to a random UUID.
+    # @option opts [MessagingHandler] :handler Default handler for connections
+    #   that do not have their own handler (see {#connect} and {#listen})
+    #
+    #   @note In a multi-threaded, multi-connection container the default
+    #   handler can be called concurrently for different connections. An
+    #   individual handler attached to a single connection is never called
+    #   concurrently, so that is the recommended approach for multi-threading.
+    def initialize(opts = {})
+      opts = { :id => opts } if opts.is_a? String        # Allow ID as only parameter
+      opts = { :handler => opts } unless opts.is_a? Hash # Allow handler as only parameter
+      @handler = opts[:handler]
+      @id = String.new(opts[:id] || SecureRandom.uuid).freeze
+      @work = Queue.new
+      @work.push self           # Let the first #run thread select
+      @wake = IO.pipe
+      @dummy = ""               # Dummy buffer for draining wake pipe
+      @lock = Mutex.new
+      @selectables = Set.new    # ConnectionDrivers and Listeners
+      @auto_stop = true
+      @active = 0               # activity (connection, listener) counter for auto_stop
+      @running = 0              # concurrent calls to #run
+    end
+
+    # @return [String] Unique identifier for this container
+    attr_reader :id
+
+    # Open an AMQP connection.
+    #
+    # @param url [String, URI] Open a {TCPSocket} to url.host, url.port.
+    # url.scheme must be "amqp" or "amqps", url.scheme.nil? is treated as "amqp"
+    # url.user, url.password are used as defaults if opts[:user], opts[:password] are nil
+    # @option (see Connection#open)
+    # @return [Connection] The new AMQP connection
+    #
+    def connect(url, opts = {})
+      url = amqp_uri(url)
+      opts[:user] ||= url.user
+      opts[:password] ||= url.password
+      # TODO aconway 2017-10-26: Use SSL for amqps URLs
+      connect_io(TCPSocket.new(url.host, url.port), opts)
+    end
+
+    # Open an AMQP protocol connection on an existing {IO} object
+    # @param io [IO] An existing {IO} object, e.g. a {TCPSocket}
+    # @option (see Connection#open)
+    def connect_io(io, opts = {})
+      cd = connection_driver(io, opts)
+      cd.connection.open()
+      add(cd).connection
+    end
+
+    # Listen for incoming AMQP connections
+    #
+    # @param url [String,URI] Listen on host:port of the AMQP URL
+    # @param handler [ListenHandler] A {ListenHandler} object that will be called
+    # with events for this listener and can generate a new set of options for each one.
+    # @return [Listener] The AMQP listener.
+    def listen(url, handler=ListenHandler.new)
+      url = amqp_uri(url)
+      # TODO aconway 2017-11-01: amqps
+      listen_io(TCPServer.new(url.host, url.port), handler)
+    end
+
+    # Listen for incoming AMQP connections on an existing server socket.
+    # @param io A server socket, for example a {TCPServer}
+    # @param handler [ListenHandler] Handler for events from this listener
+    def listen_io(io, handler=ListenHandler.new)
+      add(Listener.new(io, handler))
+    end
+
+    # Run the container: wait for IO activity, dispatch events to handlers.
+    #
+    # More than one thread can call {#run} concurrently; the container uses
+    # all the {#run} threads as a pool to handle multiple connections
+    # concurrently.  The container ensures that handler methods for a single
+    # connection (or listener) instance are serialized, even if the container
+    # has multiple threads.
+    #
+    def run()
+      @lock.synchronize { @running += 1 }
+
+      unless @on_start
+        @on_start = true
+        # TODO aconway 2017-10-28: proper synthesized event for on_start
+        event = Class.new do
+          def initialize(c) @container = c; end
+          attr_reader :container
+        end.new(self)
+        @handler.on_start(event) if @handler && @handler.respond_to?(:on_start)
+      end
+
+      while x = @work.pop
+        case x
+        when Container then
+          # Only one thread can select at a time
+          r, w = [@wake[0]], []
+          @lock.synchronize do
+            @selectables.each do |s|
+              r << s if s.send :can_read?
+              w << s if s.send :can_write?
+            end
+          end
+          r, w = IO.select(r, w)
+          selected = Set.new(r).merge(w)
+          if selected.delete?(@wake[0]) # Drain the wake pipe
+            begin
+              @wake[0].read_nonblock(256, @dummy) while true
+            rescue Errno::EWOULDBLOCK, Errno::EAGAIN, Errno::EINTR
+            end
+          end
+          @lock.synchronize do
+            if @stop_all
+              selected = @selectables
+              @selectables = Set.new
+              selected.each { |s| s.close }
+              @work << nil if selected.empty? # Already idle, initiate stop now
+              @stop_all = false
+            else
+              @selectables.subtract(selected)
+            end
+          end
+          # Move selected items to the work queue for serialized processing.
+          @lock.synchronize { @selectables.subtract(selected) }
+          selected.each { |s| @work << s } # Queue up all the work
+          @work << self                    # Allow another thread to select()
+        when ConnectionDriver then
+          x.process
+          rearm x
+        when Listener then
+          io, opts = x.send :process
+          add(connection_driver(io, opts, true)) if io
+          rearm x
+        end
+        # TODO aconway 2017-10-26: scheduled tasks
+      end
+    ensure
+      @running -= 1
+      if @running > 0 # Signal the next #run thread that we are stopping
+        @work << nil
+        wake
+      end
+    end
+
+    # @!attribute auto_stop [rw]
+    #   @return [Bool] With auto_stop enabled, all calls to {#run} will return when the
+    #   container's last activity (connection, listener or scheduled event) is
+    #   closed/completed. With auto_stop disabled {#run} does not return.
+    def auto_stop=(enabled) @lock.synchronize { @auto_stop=enabled }; wake; end
+    def auto_stop() @lock.synchronize { @auto_stop }; end
+
+    # Enable {#auto_stop} and close all connections and listeners with error.
+    # {#stop} returns immediately, calls to {#run} will return when all activity is finished.
+    # @param error [Condition] If non-nil pass to {#handler}.on_error
+    # Note `error` can be any value accepted by {Condition.make}
+    def stop(error=nil)
+      @lock.synchronize do
+        @auto_stop = true
+        @stop_all = true
+        wake
+      end
+    end
+
+    private
+
+    # Always wake when we add new work
+    def work(s) @work << s; wake; end
+
+    def wake()
+      @wake[1].write_nonblock('x') rescue nil
+    end
+
+    def connection_driver(io, opts, server=false)
+      opts[:container_id] ||= @id
+      opts[:handler] ||= @handler
+      ConnectionDriver.new(self, io, opts, server)
+    end
+
+    def add(s)
+      @lock.synchronize do
+        @active += 1
+      end
+      @work << s
+      wake
+      return s
+    end
+
+    def rearm s
+      if s.send :finished?
+        @lock.synchronize { @work << nil if (@active -= 1).zero? && @auto_stop }
+      else
+        @lock.synchronize { @selectables << s }
+      end
+      wake
+    end
+  end
+end
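
The container interrupts a thread blocked in `IO.select` by writing a byte to an internal pipe whose read end is always in the select set, then drains that pipe before selecting again. A minimal sketch of this wake-pipe technique using only the standard library is shown below; the names `wake`, `drain` and `waiter` are illustrative and not taken from the commit.

```ruby
wake_r, wake_w = IO.pipe

# Wake a thread blocked in IO.select; ignore the error if the pipe is full.
wake = lambda { wake_w.write_nonblock("x") rescue nil }

# Drain every pending wake byte so the next select blocks again.
drain = lambda do
  begin
    loop { wake_r.read_nonblock(256) }
  rescue IO::WaitReadable, EOFError
    # Pipe is empty (or closed); nothing more to drain.
  end
end

waiter = Thread.new do
  ready, = IO.select([wake_r])   # blocks until another thread calls wake
  drain.call
  ready.include?(wake_r)
end

wake.call
woken = waiter.value                      # true once the waiter has been woken
idle = IO.select([wake_r], nil, nil, 0)   # nil when nothing is left to read
```

It does not matter whether `wake` runs before or after the waiter enters `IO.select`: the byte stays in the pipe until it is drained, so the wake-up cannot be lost.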

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/proton-c/bindings/ruby/lib/core/endpoint.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/endpoint.rb b/proton-c/bindings/ruby/lib/core/endpoint.rb
index 7c6f0a3..04551eb 100644
--- a/proton-c/bindings/ruby/lib/core/endpoint.rb
+++ b/proton-c/bindings/ruby/lib/core/endpoint.rb
@@ -69,6 +69,9 @@ module Qpid::Proton
       object_to_condition(@condition, self._local_condition)
     end
 
+    def condition
+      condition_to_object(_local_condition) || remote_condition; end
+
     # @private
     def remote_condition
       condition_to_object(self._remote_condition)
@@ -82,6 +85,10 @@ module Qpid::Proton
       self.connection.transport
     end
 
+    # @private
+    # @return [Bool] true if {#state} has all the bits of `mask` set
+    def check_state(mask) (self.state & mask) == mask; end
+
     # @return [Bool] true if endpoint has sent and received a CLOSE frame
     def closed?() check_state(LOCAL_CLOSED | REMOTE_CLOSED); end
 
@@ -137,10 +144,6 @@ module Qpid::Proton
       Cproton.pn_decref(impl)
     end
 
-    private
-
-    def check_state(mask) (self.state & mask) == mask; end
-
   end
 
 end
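
`check_state` tests whether every bit of a mask is set, which is why `closed?` is true only when both the local and remote ends are closed. The sketch below illustrates the check with made-up flag values; the real constants come from the Proton C library and may differ.

```ruby
# Illustrative flag values only (assumed, not the library's actual values).
LOCAL_ACTIVE  = 0b0001
LOCAL_CLOSED  = 0b0010
REMOTE_ACTIVE = 0b0100
REMOTE_CLOSED = 0b1000

# True only if every bit in mask is also set in state.
def all_set?(state, mask)
  (state & mask) == mask
end

half_closed  = LOCAL_CLOSED | REMOTE_ACTIVE
fully_closed = LOCAL_CLOSED | REMOTE_CLOSED
closed_mask  = LOCAL_CLOSED | REMOTE_CLOSED
```

Comparing against `mask` instead of testing for a non-zero result is what distinguishes "all of these bits" from "any of these bits".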

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/proton-c/bindings/ruby/lib/core/listener.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/listener.rb b/proton-c/bindings/ruby/lib/core/listener.rb
new file mode 100644
index 0000000..e5f5c0d
--- /dev/null
+++ b/proton-c/bindings/ruby/lib/core/listener.rb
@@ -0,0 +1,110 @@
+#--
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#++
+
+module Qpid::Proton
+  # A listener for incoming connections.
+  #
+  # Create with {Container#listen} or {Container#listen_io}
+  class Listener
+    # The listener's container
+    attr_reader :container
+
+    # Close the listener
+    # @param error [Condition] Optional error condition.
+    def close(error=nil)
+      @closed ||= Condition.make(error) || true
+      @io.close_read rescue nil # Cause listener to wake out of IO.select
+    end
+
+    # Get the {IO} server socket used by the listener
+    def to_io() @io; end
+
+    private                     # Called by {Container}
+
+    def initialize(io, handler)
+      @io, @handler = io, handler
+    end
+
+    def process
+      unless @closed
+        unless @open_dispatched
+          dispatch(:on_open)
+          @open_dispatched = true
+        end
+        begin
+          return @io.accept, dispatch(:on_accept)
+        rescue IO::WaitReadable, Errno::EINTR
+        rescue IOError, SystemCallError => e
+          close e
+        end
+      end
+      if @closed
+        dispatch(:on_error, @closed) if @closed != true
+        dispatch(:on_close)
+        @io.close unless @io.closed? rescue nil
+      end
+    end
+
+    def can_read?() true; end
+    def can_write?() false; end
+    def finished?() @closed; end
+
+    # TODO aconway 2017-11-06: logging strategy
+    TRUE = Set[:true, :"1", :yes, :on]
+    def log?()
+      enabled = ENV['PN_TRACE_EVT']
+      TRUE.include? enabled.downcase.to_sym if enabled
+    end
+
+    def dispatch(method, *args)
+      STDERR.puts "(Listener 0x#{object_id.to_s(16)})[#{method}]" if log?
+      @handler.send(method, self, *args) if @handler && @handler.respond_to?(method)
+    end
+  end
+
+
+  # Class that handles listener events and provides options for accepted
+  # connections. This class simply returns a fixed set of options for every
+  # connection accepted, but you can subclass and override all of the on_
+  # methods to provide more interesting behaviour.
+  class ListenHandler
+    # @param opts [Hash] Options to return from on_accept.
+    def initialize(opts={}) @opts = opts; end
+
+    # Called when the listener is ready to accept connections.
+    # @param listener [Listener] The listener
+    def on_open(listener) end
+
+    # Called if an error occurs.
+    # If there is an error while opening the listener, this method is
+    # called and {#on_open} is not
+    # @param listener [Listener]
+    # @param what [Condition] Information about the error.
+    def on_error(listener, what) end
+
+    # Called when a listener accepts a new connection.
+    # @param listener [Listener] The listener
+    # @return [Hash] Options to apply to the incoming connection, see {Container#connect}
+    def on_accept(listener) @opts; end
+
+    # Called when the listener closes.
+    # @param listener [Listener] The listener accepting the connection.
+    def on_close(listener) end
+  end
+end
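
`Listener#dispatch` calls a handler method only if the handler responds to it, so a handler may implement any subset of the `on_` callbacks. A stand-alone sketch of that optional-callback dispatch follows; `CountingHandler` and the free-standing `dispatch` function are hypothetical and exist only for this illustration.

```ruby
# Hypothetical handler that implements only two of the callbacks.
class CountingHandler
  attr_reader :calls

  def initialize
    @calls = []
  end

  def on_open(listener)
    @calls << [:on_open, listener]
  end

  def on_accept(listener)
    { :sasl_enabled => false }   # options for the accepted connection
  end
end

# Call method on handler if it is implemented; otherwise return nil.
def dispatch(handler, method, *args)
  handler.send(method, *args) if handler && handler.respond_to?(method)
end

handler = CountingHandler.new
dispatch(handler, :on_open, :listener)
accept_opts = dispatch(handler, :on_accept, :listener)
missing     = dispatch(handler, :on_close, :listener)   # not implemented
no_handler  = dispatch(nil, :on_open, :listener)
```

The return value of the callback is passed through, which is how `on_accept` can supply the options hash for each accepted connection.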

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/proton-c/bindings/ruby/lib/core/session.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/session.rb b/proton-c/bindings/ruby/lib/core/session.rb
index 81135eb..3120d3f 100644
--- a/proton-c/bindings/ruby/lib/core/session.rb
+++ b/proton-c/bindings/ruby/lib/core/session.rb
@@ -126,7 +126,7 @@ module Qpid::Proton
     # @deprecated use {#open_receiver}
     def receiver(name) Receiver.new(Cproton.pn_receiver(@impl, name)); end
 
-    # TODO aconway 2016-01-04: doc options or target param
+    # TODO aconway 2016-01-04: doc options or target param, move option handling to Link.
     def open_receiver(options = {})
       options = { :source => options } if options.is_a? String
       receiver = Receiver.new Cproton.pn_receiver(@impl, options[:name] || connection.link_name)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/proton-c/bindings/ruby/lib/core/transport.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/transport.rb b/proton-c/bindings/ruby/lib/core/transport.rb
index 61767fd..8804c8a 100644
--- a/proton-c/bindings/ruby/lib/core/transport.rb
+++ b/proton-c/bindings/ruby/lib/core/transport.rb
@@ -77,11 +77,6 @@ module Qpid::Proton
     TRACE_DRV = Cproton::PN_TRACE_DRV
 
     # @private
-    CLIENT = 1
-    # @private
-    SERVER = 2
-
-    # @private
     include Util::SwigHelper
 
     # @private
@@ -213,26 +208,19 @@ module Qpid::Proton
     def self.wrap(impl)
       return nil if impl.nil?
 
-      self.fetch_instance(impl, :pn_transport_attachments) || Transport.new(nil, impl)
+      self.fetch_instance(impl, :pn_transport_attachments) || Transport.new(impl)
     end
 
     # Creates a new transport instance.
-    #
-    # @param mode [Integer] The transport mode, either CLIENT or SERVER
-    # @param impl [pn_transport_t] Should not be used.
-    #
-    # @raise [TransportError] If the mode is invalid.
-    #
-    def initialize(mode = nil, impl = Cproton.pn_transport)
+    def initialize(impl = Cproton.pn_transport)
       @impl = impl
-      if mode == SERVER
-        Cproton.pn_transport_set_server(@impl)
-      elsif (!mode.nil? && mode != CLIENT)
-        raise TransportError.new("cannot create transport for mode: #{mode}")
-      end
       self.class.store_instance(self, :pn_transport_attachments)
     end
 
+    # Set server mode for this transport - enables protocol detection
+    # and server-side authentication for incoming connections
+    def set_server() Cproton.pn_transport_set_server(@impl); end
+
     # Returns whether the transport has any buffered data.
     #
     # @return [Boolean] True if the transport has no buffered data.
@@ -418,6 +406,12 @@ module Qpid::Proton
       !@ssl.nil?
     end
 
+    # @private
+    def apply opts
+      if opts[:sasl_enabled] != false # SASL is not disabled.
+        sasl.allow_insecure_mechs = opts[:sasl_allow_insecure_mechs] if opts[:sasl_allow_insecure_mechs]
+        sasl.allowed_mechs = opts[:sasl_allowed_mechs] if opts[:sasl_allowed_mechs]
+      end
+    end
   end
-
 end
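
The guard logic in Transport#apply can be exercised in isolation. The sketch below copies the conditionals from the hunk above onto stand-in objects; SketchSASL and SketchTransport are hypothetical names, and the real SASL object is a wrapper around the C library rather than a Struct.

```ruby
# Hypothetical stand-in for the transport's SASL layer.
SketchSASL = Struct.new(:allow_insecure_mechs, :allowed_mechs)

class SketchTransport
  attr_reader :sasl

  def initialize() @sasl = SketchSASL.new; end

  # Same conditionals as Transport#apply in the diff.
  def apply(opts)
    if opts[:sasl_enabled] != false # SASL is not disabled.
      sasl.allow_insecure_mechs = opts[:sasl_allow_insecure_mechs] if opts[:sasl_allow_insecure_mechs]
      sasl.allowed_mechs = opts[:sasl_allowed_mechs] if opts[:sasl_allowed_mechs]
    end
  end
end

t = SketchTransport.new
t.apply(:sasl_allowed_mechs => "PLAIN", :sasl_allow_insecure_mechs => true)

disabled = SketchTransport.new
disabled.apply(:sasl_enabled => false, :sasl_allowed_mechs => "PLAIN")
# disabled.sasl is left untouched because :sasl_enabled is explicitly false.
```

Note that the truthiness guards mean an explicit :sasl_allow_insecure_mechs => false is never written through; the setting keeps whatever value it already had.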

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/proton-c/bindings/ruby/lib/core/uri.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/uri.rb b/proton-c/bindings/ruby/lib/core/uri.rb
new file mode 100644
index 0000000..b29a719
--- /dev/null
+++ b/proton-c/bindings/ruby/lib/core/uri.rb
@@ -0,0 +1,50 @@
+#--
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#++
+
+require 'uri'
+
+module URI
+  # AMQP URI scheme for the AMQP protocol
+  class AMQP < Generic
+    DEFAULT_PORT = 5672
+  end
+  @@schemes['AMQP'] = AMQP
+
+  # AMQPS URI scheme for the AMQP protocol over TLS
+  class AMQPS < AMQP
+    DEFAULT_PORT = 5671
+  end
+  @@schemes['AMQPS'] = AMQPS
+end
+
+module Qpid::Proton
+  # Convert s to an {URI::AMQP} or {URI::AMQPS}
+  # @param s [String,URI] If s has no scheme, use the {URI::AMQP} scheme
+  # @return [URI::AMQP]
+  # @raise [BadURIError] If s has a scheme that is not "amqp" or "amqps"
+  def self.amqp_uri(s)
+    u = URI(s)
+    u.host ||= ""               # Behaves badly with nil host
+    return u if u.is_a? URI::AMQP
+    raise URI::BadURIError, "Not an AMQP URI: '#{u}'" if u.scheme
+    u.scheme = "amqp" unless u.scheme
+    u = URI::parse(u.to_s)      # Re-parse with amqp scheme
+    return u
+  end
+end
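
A standalone sketch of how the scheme classes and amqp_uri behave, assuming only the Ruby standard library. sketch_amqp_uri is a hypothetical top-level copy of Qpid::Proton.amqp_uri. The URI.register_scheme branch is an addition for newer Ruby versions, where that method replaces direct access to @@schemes; it is not in the commit.

```ruby
require 'uri'

module URI
  class AMQP < Generic
    DEFAULT_PORT = 5672
  end

  class AMQPS < AMQP
    DEFAULT_PORT = 5671
  end

  if respond_to?(:register_scheme)
    # Newer Ruby versions: register through the public API.
    register_scheme 'AMQP', AMQP
    register_scheme 'AMQPS', AMQPS
  else
    # Older Ruby versions: same registration as in the diff.
    @@schemes['AMQP'] = AMQP
    @@schemes['AMQPS'] = AMQPS
  end
end

# Hypothetical stand-alone copy of Qpid::Proton.amqp_uri.
def sketch_amqp_uri(s)
  u = URI(s)
  u.host ||= ""               # Same nil-host workaround as the diff
  return u if u.is_a? URI::AMQP
  raise URI::BadURIError, "Not an AMQP URI: '#{u}'" if u.scheme
  u.scheme = "amqp"
  URI.parse(u.to_s)           # Re-parse so the AMQP class and default port apply
end

plain   = sketch_amqp_uri("amqp://example.com")    # uses the AMQP default port
secure  = sketch_amqp_uri("amqps://example.com")   # uses the AMQPS default port
implied = sketch_amqp_uri("//example.com:1234")    # no scheme, so amqp is assumed
```

Since AMQPS subclasses AMQP, a single is_a? URI::AMQP check accepts both schemes. A string such as "example.com:5672" parses with "example.com" as its scheme, so it is rejected; a scheme-less address needs the leading "//".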

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/proton-c/bindings/ruby/lib/core/url.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/url.rb b/proton-c/bindings/ruby/lib/core/url.rb
index 30d2d87..b54df66 100644
--- a/proton-c/bindings/ruby/lib/core/url.rb
+++ b/proton-c/bindings/ruby/lib/core/url.rb
@@ -23,6 +23,7 @@ module Qpid::Proton
 
     attr_reader :scheme
     attr_reader :username
+    alias :user :username
     attr_reader :password
     attr_reader :host
     attr_reader :port
@@ -60,10 +61,14 @@ module Qpid::Proton
       Cproton.pn_url_get_port(@url).to_i
     end
 
+    # @return [String] Convert to string
     def to_s
       "#{@scheme}://#{@username.nil? ? '' : @username}#{@password.nil? ? '' : '@' + @password + ':'}#{@host}:#{@port}/#{@path}"
     end
 
+    # @return [String] Allow implicit conversion by {String#try_convert}
+    alias :to_str :to_s
+
     private
 
     def defaults
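
A short illustration of what the to_str alias buys, assuming nothing beyond core Ruby. SketchURL is a hypothetical class with a simplified to_s; it is not the Qpid::Proton::URL implementation.

```ruby
# Hypothetical URL-like class; to_s is simplified for the example.
class SketchURL
  def initialize(scheme, host, port)
    @scheme, @host, @port = scheme, host, port
  end

  def to_s() "#{@scheme}://#{@host}:#{@port}"; end

  # Defining to_str opts the object in to implicit String conversion.
  alias :to_str :to_s
end

url = SketchURL.new("amqp", "example.com", 5672)

converted = String.try_convert(url)   # calls to_str and returns a String
joined    = "connect to " + url       # String#+ accepts the object directly
```

Without to_str, String.try_convert would return nil and the concatenation would raise a TypeError, so callers would have to write url.to_s explicitly.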

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/proton-c/bindings/ruby/lib/event/event.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/event/event.rb b/proton-c/bindings/ruby/lib/event/event.rb
index e839f63..67d5e92 100644
--- a/proton-c/bindings/ruby/lib/event/event.rb
+++ b/proton-c/bindings/ruby/lib/event/event.rb
@@ -210,7 +210,7 @@ module Qpid::Proton
           Cproton.pn_handler_dispatch(handler.impl, @impl, type.number)
         else
           result = Qpid::Proton::Event.dispatch(handler, type.method, self)
-          if (result != "DELEGATED") && handler.respond_to?(:handlers)
+          if (result != "DELEGATED") && handler.respond_to?(:handlers) && handler.handlers
             handler.handlers.each do |hndlr|
               self.dispatch(hndlr)
             end
@@ -228,10 +228,11 @@ module Qpid::Proton
       end
 
       def container
-        impl = Cproton.pn_event_reactor(@impl)
-        Qpid::Proton::Util::ClassWrapper::WRAPPERS["pn_reactor"].call(impl)
+        @container || Util::ClassWrapper::WRAPPERS["pn_reactor"].call(Cproton.pn_event_reactor(@impl))
       end
 
+      def container=(c); @container = c; end
+
       # Returns the transport for this event.
       #
       # @return [Transport, nil] The transport.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/proton-c/bindings/ruby/lib/handler/endpoint_state_handler.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/handler/endpoint_state_handler.rb b/proton-c/bindings/ruby/lib/handler/endpoint_state_handler.rb
index 11e970a..8a1a16e 100644
--- a/proton-c/bindings/ruby/lib/handler/endpoint_state_handler.rb
+++ b/proton-c/bindings/ruby/lib/handler/endpoint_state_handler.rb
@@ -84,11 +84,11 @@ module Qpid::Proton::Handler
     end
 
     def on_connection_remote_open(event)
-      if !(event.connection.state & Qpid::Proton::Endpoint::LOCAL_ACTIVE).zero?
+      if event.connection.local_active?
         self.on_connection_opened(event)
       elsif event.connection.local_uninit?
         self.on_connection_opening(event)
-        event.connection.open
+        event.connection.open unless event.connection.local_active?
       end
     end
 
@@ -110,7 +110,7 @@ module Qpid::Proton::Handler
     end
 
     def on_link_remote_open(event)
-      if !(event.link.state & Qpid::Proton::Endpoint::LOCAL_ACTIVE).zero?
+      if event.link.local_active?
         self.on_link_opened(event)
       elsif event.link.local_uninit?
         self.on_link_opening(event)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/proton-c/bindings/ruby/lib/qpid_proton.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/qpid_proton.rb b/proton-c/bindings/ruby/lib/qpid_proton.rb
index dae9b5f..43b8071 100644
--- a/proton-c/bindings/ruby/lib/qpid_proton.rb
+++ b/proton-c/bindings/ruby/lib/qpid_proton.rb
@@ -27,6 +27,9 @@ else
   require "securerandom"
 end
 
+DEPRECATION = "[DEPRECATION]"
+def deprecated(old, new) warn "#{DEPRECATION} #{old} is deprecated, use #{new}"; end
+
 # Exception classes
 require "core/exceptions"
 
@@ -61,6 +64,7 @@ require "event/collector"
 
 # Main Proton classes
 require "core/selectable"
+require "core/uri"
 require "core/message"
 require "core/endpoint"
 require "core/session"
@@ -109,6 +113,10 @@ require "reactor/session_per_connection"
 require "reactor/container"
 require "reactor/link_option"
 
+# Core classes that depend on handlers and events
+require "core/container"
+require "core/connection_driver"
+
 module Qpid::Proton
   # @private
   def self.registry

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/proton-c/bindings/ruby/lib/reactor/reactor.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/reactor/reactor.rb b/proton-c/bindings/ruby/lib/reactor/reactor.rb
index a84a716..88d062e 100644
--- a/proton-c/bindings/ruby/lib/reactor/reactor.rb
+++ b/proton-c/bindings/ruby/lib/reactor/reactor.rb
@@ -51,6 +51,7 @@ module Qpid::Proton::Reactor
     end
 
     def initialize(handlers, options = {})
+      deprecated(self.class, "Qpid::Proton::Container")
       @impl = options[:impl]
       if @impl.nil?
         @impl = Cproton.pn_reactor

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/proton-c/bindings/ruby/lib/util/condition.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/util/condition.rb b/proton-c/bindings/ruby/lib/util/condition.rb
index ad49595..37c41a0 100644
--- a/proton-c/bindings/ruby/lib/util/condition.rb
+++ b/proton-c/bindings/ruby/lib/util/condition.rb
@@ -17,7 +17,7 @@
 # under the License.
 #++
 
-module Qpid::Proton::Util
+module Qpid::Proton
 
   class Condition
 
@@ -29,19 +29,40 @@ module Qpid::Proton::Util
       @info = info
     end
 
-    # @private
-    def to_s
-      "Condition(#{@name}, #{@description}, #{@info})"
-    end
+    def to_s() "#{@name}: #{@description}"; end
+
+    def inspect() "#{self.class.name}(#{@name.inspect}, #{@description.inspect}, #{@info.inspect})"; end
 
-    # @private
     def ==(other)
-      ((other.class = self.class) &&
+      ((other.is_a? Condition) &&
        (other.name == self.name) && 
        (other.description == self.description) &&
        (other.info == self.info))
     end
 
+    # Make a condition.
+    # @param obj the object to turn into a condition
+    # @param default_name condition name to use if obj does not imply a name
+    # @return
+    # - when Condition return obj unchanged
+    # - when Exception return Condition(obj.class.name, obj.to_s)
+    # - when nil then nil
+    # - else return Condition(default_name, obj.to_s)
+    # If objey
+    def self.make(obj, default_name="proton")
+      case obj
+      when Condition then obj
+      when Exception then Condition.new(obj.class.name, obj.to_s)
+      when nil then nil
+      else Condition.new(default_name, obj.to_s)
+      end
+    end
+
   end
 
+  module Util                   #TODO aconway 2017-10-28: backwards compat
+    Condition = Qpid::Proton::Condition
+  end
 end
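
The conversion rules documented for Condition.make can be tried without the binding. SketchCondition below is a hypothetical stand-alone class that copies the constructor shape, to_s, == and make from the diff; the real class lives in Qpid::Proton.

```ruby
# Hypothetical stand-alone copy of the Condition behaviour in the diff.
class SketchCondition
  attr_reader :name, :description, :info

  def initialize(name, description = nil, info = nil)
    @name, @description, @info = name, description, info
  end

  def to_s() "#{@name}: #{@description}"; end

  def ==(other)
    other.is_a?(SketchCondition) &&
      other.name == name &&
      other.description == description &&
      other.info == info
  end

  # Same case analysis as Condition.make.
  def self.make(obj, default_name = "proton")
    case obj
    when SketchCondition then obj
    when Exception then new(obj.class.name, obj.to_s)
    when nil then nil
    else new(default_name, obj.to_s)
    end
  end
end

from_error  = SketchCondition.make(ArgumentError.new("bad option"))
from_string = SketchCondition.make("link refused")
from_nil    = SketchCondition.make(nil)
```

An exception maps to a condition named after its class, any other object is stringified under the default name, and nil passes through so callers can treat "no error" uniformly.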

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/proton-c/bindings/ruby/lib/util/engine.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/util/engine.rb b/proton-c/bindings/ruby/lib/util/engine.rb
index e34faaa..fa5c038 100644
--- a/proton-c/bindings/ruby/lib/util/engine.rb
+++ b/proton-c/bindings/ruby/lib/util/engine.rb
@@ -70,8 +70,8 @@ module Qpid::Proton::Util
       unless object.nil?
         Cproton.pn_condition_set_name(condition, object.name)
         Cproton.pn_condition_set_description(condition, object.description)
-        info = Data.new(Cproton.pn_condition_info(condition))
-        if object.info?
+        if !object.info.nil?
+          info = Data.new(Cproton.pn_condition_info(condition))
           info.object = object.info
         end
       end

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/proton-c/bindings/ruby/lib/util/uri.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/util/uri.rb b/proton-c/bindings/ruby/lib/util/uri.rb
deleted file mode 100644
index 0820746..0000000
--- a/proton-c/bindings/ruby/lib/util/uri.rb
+++ /dev/null
@@ -1,27 +0,0 @@
-#--
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#++
-
-require 'uri'
-
-module URI
-  class AMQP < Generic; DEFAULT_PORT = 5672; end
-  @@schemes['AMQP'] = AMQP
-  class AMQPS < Generic; DEFAULT_PORT = 5671; end
-  @@schemes['AMQPS'] = AMQPS
-end

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/proton-c/bindings/ruby/tests/test_connection_driver.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/test_connection_driver.rb b/proton-c/bindings/ruby/tests/test_connection_driver.rb
index 2ddc8ef..174e86d 100644
--- a/proton-c/bindings/ruby/tests/test_connection_driver.rb
+++ b/proton-c/bindings/ruby/tests/test_connection_driver.rb
@@ -39,11 +39,11 @@ class ConnectionDriverTest < Minitest::Test
       def on_message(event) @message = event.message; event.connection.close; end
     end
 
-    sender = ConnectionDriver.new(@sockets[0], send_class.new)
+    sender = ConnectionDriver.new(@sockets[0], {:handler => send_class.new})
     sender.connection.open();
     sender.connection.open_sender()
 
-    receiver = ConnectionDriver.new(@sockets[1], recv_class.new)
+    receiver = ConnectionDriver.new(@sockets[1], {:handler => recv_class.new})
     drivers = [sender, receiver]
     until drivers.all? { |d| d.finished? }
       rd = drivers.select {|d| d.can_read? }
@@ -60,7 +60,7 @@ class ConnectionDriverTest < Minitest::Test
     idle_class = Class.new(MessagingHandler) do
       def on_connection_bound(event) event.transport.idle_timeout = 10; end
     end
-    drivers = [ConnectionDriver.new(@sockets[0], idle_class.new), ConnectionDriver.new(@sockets[1])]
+    drivers = [ConnectionDriver.new(@sockets[0], {:handler => idle_class.new}), ConnectionDriver.new(@sockets[1])]
     drivers[0].connection.open()
     now = Time.now
     drivers.each { |d| d.process(true, true, now) } until drivers[0].connection.open?

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/proton-c/bindings/ruby/tests/test_container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/test_container.rb b/proton-c/bindings/ruby/tests/test_container.rb
index 9c1d46c..41fa133 100644
--- a/proton-c/bindings/ruby/tests/test_container.rb
+++ b/proton-c/bindings/ruby/tests/test_container.rb
@@ -19,63 +19,137 @@
 
 require 'test_tools'
 require 'minitest/unit'
+require 'socket'
 
 Message = Qpid::Proton::Message
 SASL = Qpid::Proton::SASL
-URL = Qpid::Proton::URL
+Disposition = Qpid::Proton::Disposition
 
-class ContainerTest < Minitest::Test
+# Container that listens on a random port
+class TestContainer < Container
 
-  def test_simple()
+  def initialize(opts = {}, lopts = {})
+    super opts
+    @server = TCPServer.open(0)
+    @listener = listen_io(@server, ListenOnceHandler.new(lopts))
+  end
 
-    hc = Class.new(TestServer) do
-      attr_reader :accepted
+  def port() @server.addr[1]; end
+  def url() "amqp://:#{port}"; end
+end
 
-      def on_start(event)
-        super
-        event.container.create_sender("amqp://#{addr}", {:name => "testlink"})
+class ContainerTest < Minitest::Test
+
+  def test_simple()
+    sh = Class.new(MessagingHandler) do
+      attr_reader :accepted, :sent
+      def on_sendable(e)
+        e.link.send Message.new("foo") unless @sent
+        @sent = true
       end
 
-      def on_sendable(event)
-        if @sent.nil? && event.sender.credit > 0
-          event.sender.send(Message.new("testmessage"))
-          @sent = true
-        end
+      def on_accepted(e)
+        @accepted = true
+        e.connection.close
+      end
+    end.new
+
+    rh = Class.new(MessagingHandler) do
+      attr_reader :message, :link
+      def on_link_opening(e)
+        @link = e.link
+        e.link.open
+        e.link.flow(1)
       end
 
-      def on_accepted(event)
-        @accepted = event
-        event.container.stop
+      def on_message(e)
+        @message = e.message;
+        e.delivery.update Disposition::ACCEPTED
+        e.delivery.settle
       end
-    end
-    h = hc.new
-    Container.new(h).run
-    assert_instance_of(Qpid::Proton::Event::Event, h.accepted)
-    assert_equal "testlink", h.links.first.name
-    assert_equal "testmessage", h.messages.first.body
+    end.new
+
+    c = TestContainer.new({:id => __method__.to_s, :handler => rh})
+    c.connect(c.url, {:handler => sh}).open_sender({:name => "testlink"})
+    c.run
+
+    assert sh.accepted
+    assert_equal "testlink", rh.link.name
+    assert_equal "foo", rh.message.body
+    assert_equal "test_simple", rh.link.connection.container_id
   end
+
+  class CloseOnOpenHandler < TestHandler
+    def on_connection_opened(e) e.connection.close; end
+    def on_connection_closing(e) e.connection.close; end
+  end
+
+  def test_auto_stop
+    c1 = Container.new "#{__method__}1"
+    c2 = Container.new "#{__method__}2"
+
+    # A listener and a connection
+    t1 = 3.times.collect { Thread.new { c1.run } }
+    l = c1.listen_io(TCPServer.new(0), ListenOnceHandler.new({ :handler => CloseOnOpenHandler.new}))
+    c1.connect("amqp://:#{l.to_io.addr[1]}", { :handler => CloseOnOpenHandler.new} )
+    t1.each { |t| assert t.join(1) }
+
+    # Connect between different containers, c2 has only a connection
+    t1 = Thread.new { c1.run }
+    l = c1.listen_io(TCPServer.new(0), ListenOnceHandler.new({ :handler => CloseOnOpenHandler.new}))
+    t2 = Thread.new {c2.run }
+    c2.connect("amqp://:#{l.to_io.addr[1]}", { :handler => CloseOnOpenHandler.new} )
+    assert t2.join(1)
+    assert t1.join(1)
+  end
+
+  def test_auto_stop_listener_only
+    c1 = Container.new "#{__method__}1"
+    # Listener only, external close
+    t1 = Thread.new { c1.run }
+    l = c1.listen_io(TCPServer.new(0))
+    l.close
+    assert t1.join(1)
+  end
+
+  def test_stop
+    c = Container.new __method__
+    c.auto_stop = false
+    l = c.listen_io(TCPServer.new(0))
+    c.connect("amqp://:#{l.to_io.addr[1]}")
+    threads = 5.times.collect { Thread.new { c.run } }
+    assert_nil threads[0].join(0.001)
+    c.stop
+    threads.each { |t| assert t.join(1) }
+    assert c.auto_stop          # Set by stop
+
+    # Stop an empty container
+    threads = 5.times.collect { Thread.new { c.run } }
+    assert_nil threads[0].join(0.001)
+    c.stop
+    threads.each { |t| assert t.join(1) }
+  end
+
 end
 
+
 class ContainerSASLTest < Minitest::Test
 
   # Handler for test client/server that sets up server and client SASL options
-  class SASLHandler < TestServer
+  class SASLHandler < TestHandler
 
-    attr_accessor :url
-
-    def initialize(opts={}, mechanisms=nil, insecure=nil, realm=nil)
+    def initialize(url="amqp://", opts={}, mechanisms=nil, insecure=nil, realm=nil)
       super()
-      @opts, @mechanisms, @insecure, @realm = opts, mechanisms, insecure, realm
+      @url, @opts, @mechanisms, @insecure, @realm = url, opts, mechanisms, insecure, realm
     end
 
     def on_start(e)
       super
-      @client = e.container.connect(@url || "amqp://#{addr}", @opts)
+      @client = e.container.connect("#{@url}:#{e.container.port}", @opts)
     end
 
     def on_connection_bound(e)
       if e.connection != @client # Incoming server connection
-        @listener.close
         sasl = e.transport.sasl
         sasl.allow_insecure_mechs = @insecure unless @insecure.nil?
         sasl.allowed_mechs = @mechanisms unless @mechanisms.nil?
@@ -86,6 +160,7 @@ class ContainerSASLTest < Minitest::Test
     end
 
     attr_reader :auth_user
+
     def on_connection_opened(e)
       super
       if e.connection == @client
@@ -135,26 +210,28 @@ mech_list: EXTERNAL DIGEST-MD5 SCRAM-SHA-1 CRAM-MD5 PLAIN ANONYMOUS
   end
 
   def test_sasl_anonymous()
-    s = SASLHandler.new({:sasl_allowed_mechs => "ANONYMOUS"}, "ANONYMOUS")
-    Container.new(s).run
+    s = SASLHandler.new("amqp://",  {:sasl_allowed_mechs => "ANONYMOUS"})
+    TestContainer.new({:id => __method__.to_s, :handler => s}, {:sasl_allowed_mechs => "ANONYMOUS"}).run
     assert_nil(s.connections[0].user)
   end
 
   def test_sasl_plain_url()
+    skip unless SASL.extended?
     # Use default realm with URL, should authenticate with "default_password"
-    s = SASLHandler.new({:sasl_allowed_mechs => "PLAIN", :sasl_allow_insecure_mechs => true}, "PLAIN", true)
-    s.url = ("amqp://user:default_password@#{s.addr}")
-    Container.new(s).run
+    opts = {:sasl_allowed_mechs => "PLAIN", :sasl_allow_insecure_mechs => true}
+    s = SASLHandler.new("amqp://user:default_password@",  opts)
+    TestContainer.new({:id => __method__.to_s, :handler => s}, opts).run
     assert_equal(2, s.connections.size)
     assert_equal("user", s.auth_user)
   end
 
   def test_sasl_plain_options()
+    skip unless SASL.extended?
     # Use default realm with connection options, should authenticate with "default_password"
     opts = {:sasl_allowed_mechs => "PLAIN",:sasl_allow_insecure_mechs => true,
             :user => 'user', :password => 'default_password' }
-    s = SASLHandler.new(opts, "PLAIN", true)
-    Container.new(s).run
+    s = SASLHandler.new("amqp://", opts)
+    TestContainer.new({:id => __method__.to_s, :handler => s}, {:sasl_allowed_mechs => "PLAIN",:sasl_allow_insecure_mechs => true}).run
     assert_equal(2, s.connections.size)
     assert_equal("user", s.auth_user)
   end
@@ -162,9 +239,9 @@ mech_list: EXTERNAL DIGEST-MD5 SCRAM-SHA-1 CRAM-MD5 PLAIN ANONYMOUS
   # Ensure we don't allow PLAIN if allow_insecure_mechs = true is not explicitly set
   def test_disallow_insecure()
     # Don't set allow_insecure_mechs, but try to use PLAIN
-    s = SASLHandler.new({:sasl_allowed_mechs => "PLAIN", :sasl_allow_insecure_mechs => true}, "PLAIN")
-    s.url = "amqp://user:password@#{s.addr}"
-    e = assert_raises(TestError) { Container.new(s).run }
+    s = SASLHandler.new("amqp://user:password@", {:sasl_allowed_mechs => "PLAIN", :sasl_allow_insecure_mechs => true})
+    e = assert_raises(TestError) { TestContainer.new({:id => __method__.to_s, :handler => s}, {:sasl_allowed_mechs => "PLAIN"}).run }
     assert_match(/PN_TRANSPORT_ERROR.*unauthorized-access/, e.to_s)
   end
 end
+
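
TestContainer and the auto-stop tests rely on binding a TCPServer to port 0 so the operating system picks a free port, then reading the chosen port back from addr[1]. A minimal sketch of that technique using only the socket library follows. The method names are hypothetical, and the server is bound to 127.0.0.1 explicitly, which is an assumption made for the sketch (the tests above bind with TCPServer.new(0) and no host).

```ruby
require 'socket'

# Bind to port 0 so the OS assigns an unused port, then report it.
def sketch_listen_on_free_port
  server = TCPServer.new("127.0.0.1", 0)
  [server, server.addr[1]]   # addr is [family, port, hostname, address]
end

# Connect to the assigned port and pass a few bytes through.
def sketch_round_trip
  server, port = sketch_listen_on_free_port
  client = TCPSocket.new("127.0.0.1", port)
  accepted = server.accept
  client.write("ping")
  client.close               # closing lets the read below reach end of file
  data = accepted.read
  [port, data]
ensure
  [accepted, server].each { |io| io.close if io && !io.closed? }
end

port, data = sketch_round_trip
url = "amqp://:#{port}"      # same URL shape as TestContainer#url
```

Letting the OS choose the port avoids clashes when several tests, or several test runs, listen at the same time.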

