You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@qpid.apache.org by "Darryl L. Pierce" <dp...@redhat.com> on 2012/07/25 20:05:58 UTC

[RESEND] Ruby bindings for Proton

This set of patches supercedes the previous set to provide the langauge
bindings. The patch with the actual swig components now produces a file
named libproton_ruby.so rather than cproton.so since that target caused
a conflict with the PHP bindings. We need to fix that so that eventually
it creates the properly named file.


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org


[PATCH 6/6] Ruby example for setting up a mail exchange server.

Posted by "Darryl L. Pierce" <dp...@redhat.com>.
From: "Darryl L. Pierce" <dp...@redhat.com>

 * server.rb - Acts as the intermediary, storing messages in mailboxes.
 * post.rb   - Submits messages for storing in mailboxes.
 * fetch.rb  - Retrieves messages from mailboxes.
---
 examples/ruby/EXAMPLES      |   4 +
 examples/ruby/fetch.rb      | 204 ++++++++++++++++++++++++++++
 examples/ruby/mailserver.rb | 322 ++++++++++++++++++++++++++++++++++++++++++++
 examples/ruby/post.rb       | 191 ++++++++++++++++++++++++++
 4 files changed, 721 insertions(+)
 create mode 100644 examples/ruby/fetch.rb
 create mode 100644 examples/ruby/mailserver.rb
 create mode 100644 examples/ruby/post.rb

diff --git a/examples/ruby/EXAMPLES b/examples/ruby/EXAMPLES
index f2eaa61..d88fb3d 100644
--- a/examples/ruby/EXAMPLES
+++ b/examples/ruby/EXAMPLES
@@ -4,3 +4,7 @@ EXAMPLES:
    Demonstrates the use of the messenger APIs for sending and receiving
    messages from point to point.
 
+ * server.rb, post.rb, fetch.rb:
+   Demonstrates setting up a mail exchange via the server app and then posting
+   and fetching messages to specific mailboxes via post and fetch,
+   respectively.
diff --git a/examples/ruby/fetch.rb b/examples/ruby/fetch.rb
new file mode 100644
index 0000000..6bff13f
--- /dev/null
+++ b/examples/ruby/fetch.rb
@@ -0,0 +1,204 @@
+#!/usr/bin/env ruby
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+require 'cproton'
+require 'optparse'
+
+$options = {
+  :hostname => "0.0.0.0",
+  :port => "5672",
+  :count => 1,
+  :verbose => false
+}
+messages = []
+
+OptionParser.new do |opts|
+  opts.banner = "Usage: #{File.basename $0} [options] <mailbox>"
+
+  opts.on("-s", "--server <address>", String, :REQUIRED,
+          "Address of the server") do |server|
+  end
+
+  opts.on("-p", "--port <port>", Integer, :REQUIRED,
+          "Port on the server") do |port|
+    $options[:port] = port
+  end
+
+  opts.on("-c", "--count <#>", Integer, :REQUIRED,
+          "Number of messages to read from the mailbox") do |count|
+    $options[:count] = count
+  end
+
+  opts.on("-v", "--verbose", :NONE,
+          "Turn on extra trace messages.") do |verbose|
+    $options[:verbose] = true
+  end
+
+  begin
+    ARGV << "-h" if ARGV.empty?
+    opts.parse!
+  rescue OptionParser::ParseError => error
+    STDERR.puts error.message, "\n", opts
+    exit 1
+  end
+
+  $options[:mailbox] = ARGV.first unless ARGV.empty?
+
+  abort("No mailbox specified.") if $options[:mailbox].nil?
+
+end
+
+def log(text, return_code = 0)
+  STDOUT.puts "#{Time.new}: #{text}" if $options[:verbose] || return_code.nonzero?
+
+  # if we were given a non-zero code, then exit
+  exit return_code unless return_code.zero?
+end
+
+class FetchClient
+  attr_reader :sasl
+  attr_reader :link
+  attr_reader :conn
+
+  def initialize(hostname, port, mailbox)
+    @hostname = hostname
+    @port = port
+    @mailbox = mailbox
+  end
+
+  def setup
+    # setup a driver connection to the server
+    log "Connecting to server host = #{@hostname}:#{@port}"
+    @driver = Cproton::pn_driver
+    @cxtr = Cproton::pn_connector(@driver, @hostname, @port, nil)
+
+    # configure SASL
+    @sasl = Cproton::pn_connector_sasl @cxtr
+    Cproton::pn_sasl_mechanisms @sasl, "ANONYMOUS"
+    Cproton::pn_sasl_client @sasl
+
+    # inform the engine about the connection, and link the driver to it.
+    @conn = Cproton::pn_connection
+    Cproton::pn_connector_set_connection @cxtr, @conn
+
+    # create a session and link for receiving from the mailbox
+    log "Fetching from the mailbox = #{@mailbox}"
+    @ssn = Cproton::pn_session @conn
+    @link = Cproton::pn_receiver @ssn, "receiver"
+    Cproton::pn_set_source @link, @mailbox
+
+    # now open all the engine endpoints
+    Cproton::pn_connection_open @conn
+    Cproton::pn_session_open @ssn
+    Cproton::pn_link_open @link
+  end
+
+  def wait
+    log "Waiting for events..."
+    Cproton::pn_connector_process @cxtr
+    Cproton::pn_driver_wait @driver, -1
+    Cproton::pn_connector_process @cxtr
+    log "...waiting done!"
+  end
+
+  def settle
+    # locally settle any remotely settled deliveries
+    d = Cproton::pn_unsettled_head @link
+    while d && !Cproton::pn_readable(d)
+      # delivery that has not yet been read
+      _next = Cproton::pn_unsettled_next d
+      Cproton::pn_settle(d) if Cproton::pn_remote_settled(d)
+      d = _next
+    end
+  end
+
+end
+
+if __FILE__ == $PROGRAM_NAME
+  receiver = FetchClient.new($options[:hostname],
+                             $options[:port],
+                             $options[:mailbox])
+
+  receiver.setup
+
+  # wait until we authenticate with the server
+  while ![Cproton::PN_SASL_PASS, Cproton::PN_SASL_FAIL].include? Cproton::pn_sasl_state(receiver.sasl)
+    receiver.wait
+  end
+
+  if Cproton::pn_sasl_state(receiver.sasl) == Cproton::PN_SASL_FAIL
+    log "ERROR: Authentication failure", -1
+  end
+
+  # wait until the server has opened the connection
+  while ((Cproton::pn_link_state(receiver.link) & Cproton::PN_REMOTE_ACTIVE) != Cproton::PN_REMOTE_ACTIVE)
+    receiver.wait
+  end
+
+  # check if the server recognizes the mailbox, fail if it does not
+  if Cproton::pn_remote_source(receiver.link) != $options[:mailbox]
+    log "ERROR: mailbox #{$options[:mailbox]} does not exist!", -2
+  end
+
+  # Allow the server to send the expected number of messages to the receiver
+  # by setting the credit to the expected count
+  Cproton::pn_flow(receiver.link, $options[:count])
+
+  # main loop: continue fetching messages until all the expected number of
+  # messages have been retrieved
+
+  while Cproton::pn_credit(receiver.link) > 0
+    # wait for some messages to arrive
+    receiver.wait if Cproton::pn_queued(receiver.link).zero?
+
+    # read all queued deliveries
+    while Cproton::pn_queued(receiver.link) > 0
+      delivery = Cproton::pn_current(receiver.link)
+
+      # read all bytes of message
+      (rc, msg) = Cproton::pn_recv(receiver.link, Cproton::pn_pending(delivery))
+      log "Received count/status=#{rc}"
+
+      log("ERROR: Receive failed (#{rc}), exiting...", -3) if rc < 0
+
+      puts "#{msg}"
+
+      # let the server know we accept the message
+      Cproton::pn_disposition(delivery, Cproton::PN_ACCEPTED)
+
+      # go to the next deliverable
+      Cproton::pn_advance(receiver.link)
+    end
+
+    receiver.settle
+  end
+
+  # block until any leftover deliveries are settled
+  while Cproton::pn_unsettled(receiver.link) > 0
+    receiver.wait
+    receiver.settle
+  end
+
+  # we're done,c lose and wait for the remote to close also
+  Cproton::pn_connection_close(receiver.conn)
+  while ((Cproton::pn_connection_state(receiver.conn) & Cproton::PN_REMOTE_CLOSED) != Cproton::PN_REMOTE_CLOSED)
+    receiver.wait
+  end
+
+end
diff --git a/examples/ruby/mailserver.rb b/examples/ruby/mailserver.rb
new file mode 100644
index 0000000..582d164
--- /dev/null
+++ b/examples/ruby/mailserver.rb
@@ -0,0 +1,322 @@
+#!/usr/bin/env ruby
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require 'cproton'
+require 'optparse'
+
+FAILED         = 0
+CONNECTION_UP  = 1
+AUTHENTICATING = 2
+
+$options  = {
+  :verbose => false,
+  :hostname => "0.0.0.0",
+  :port => "5672"
+}
+
+OptionParser.new do |opts|
+  opts.banner = "Usage: mailserver [options] <server-address> [<server-port>]"
+
+  opts.on("-v", "--verbose", :NONE,
+          "Print status messages to stdout") do |f|
+    $options[:verbose] = true
+  end
+
+  opts.parse!
+
+  $options[:hostname] = ARGV[0] if ARGV.length > 0
+  $options[:port] = ARGV[1] if ARGV.length > 1
+end
+
+def log(text)
+  STDOUT.puts "#{Time.new}: #{text}" if $options[:verbose]
+end
+
+class MailServer
+  def initialize(hostname, port, verbose)
+    @hostname = hostname
+    @port = port
+    @verbose = verbose
+    @counter = 0
+    @mailboxes = {}
+
+    @driver = nil
+  end
+
+  def setup
+    @driver = Cproton::pn_driver
+    @listener = Cproton::pn_listener @driver, @hostname, @port, nil
+    raise "Error: could not listen on #{@hostname}:#{@port}" if @listener.nil?
+  end
+
+  def wait
+    log "Driver sleep."
+    Cproton::pn_driver_wait @driver, -1
+    log "Driver wakeup."
+  end
+
+  def accept_connection_requests
+    l = Cproton::pn_driver_listener @driver
+
+    while !l.nil?
+      log "Accepting connection."
+      cxtr = Cproton::pn_listener_accept l
+      Cproton::pn_connector_set_context cxtr, AUTHENTICATING
+      l = Cproton::pn_driver_listener @driver
+    end
+  end
+
+  def process_connections
+    cxtr = Cproton::pn_driver_connector @driver
+
+    while cxtr
+      log "Process connector"
+
+      Cproton::pn_connector_process cxtr
+
+      state = Cproton::pn_connector_context cxtr
+      case state
+      when AUTHENTICATING
+        log "Authenticating..."
+        self.authenticate_connector cxtr
+
+      when CONNECTION_UP
+        log "Connection established..."
+        self.service_connector cxtr
+
+      else
+        raise "Unknown connection state #{state}"
+      end
+
+      Cproton::pn_connector_process cxtr
+
+      if Cproton::pn_connector_closed cxtr
+        log "Closing connector."
+        Cproton::pn_connector_free cxtr
+      end
+
+      cxtr = Cproton::pn_driver_connector @driver
+
+    end
+  end
+
+  def authenticate_connector(cxtr)
+    log "Authenticating..."
+
+    sasl = Cproton::pn_connector_sasl cxtr
+    state = Cproton::pn_sasl_state sasl
+    while [Cproton::PN_SASL_CONF, Cproton::PN_SASL_STEP].include? state
+
+      case state
+      when Cproton::PN_SASL_CONF
+        log "Authenticating-CONF..."
+        Cproton::pn_sasl_mechanisms sasl, "ANONYMOUS"
+        Cproton::pn_sasl_server sasl
+
+      when Cproton::PN_SASL_STEP
+        log "Authenticating-STEP..."
+        mech = Cproton::pn_sasl_remote_mechanisms sasl
+        if mech == "ANONYMOUS"
+          Cproton::pn_sasl_done sasl, Cproton::PN_SASL_OK
+        else
+          Cproton::pn_sasl_done sasl, Cproton::PN_SASL_AUTH
+        end
+      end
+
+      state = Cproton::pn_sasl_state sasl
+
+    end
+
+    case state
+    when Cproton::PN_SASL_PASS
+      Cproton::pn_connector_set_connection cxtr, Cproton::pn_connection
+      Cproton::pn_connector_set_context cxtr, CONNECTION_UP
+      log "Authentication-PASSED"
+
+    when Cproton::PN_SASL_FAIL
+      Cproton::pn_connector_set_context cxtr, FAILED
+      log "Authentication-FAILED"
+
+    else
+      log "Authentication-PENDING"
+    end
+
+  end
+
+  def service_connector(cxtr)
+    log "I/O processing starting."
+
+    conn = Cproton::pn_connector_connection cxtr
+    if ((Cproton::pn_connection_state(conn) & Cproton::PN_LOCAL_UNINIT) == Cproton::PN_LOCAL_UNINIT)
+      log "Connection opened."
+      Cproton::pn_connection_open(conn)
+    end
+
+    ssn = Cproton::pn_session_head conn, Cproton::PN_LOCAL_UNINIT
+    while ssn
+      Cproton::pn_session_open ssn
+      log "Session opened."
+      ssn = Cproton::pn_session_next ssn, Cproton::PN_LOCAL_UNINIT
+    end
+
+    link = Cproton::pn_link_head conn, Cproton::PN_LOCAL_UNINIT
+    while link
+      setup_link link
+      link = Cproton::pn_link_next link, Cproton::PN_LOCAL_UNINIT
+    end
+
+    delivery = Cproton::pn_work_head conn
+    while delivery
+      log "Process delivery #{Cproton::pn_delivery_tag delivery}"
+      if Cproton::pn_readable delivery
+        process_receive delivery
+      elsif Cproton::pn_writable delivery
+        send_message delivery
+      end
+
+      if Cproton::pn_updated delivery
+        log "Remote disposition for #{Cproton::pn_delivery_tag delivery}: #{Cproton::pn_remote_disposition delivery}"
+        Cproton::pn_settle(delivery) if Cproton::pn_remote_disposition delivery
+      end
+
+      delivery = Cproton::pn_work_next delivery
+    end
+
+    link = Cproton::pn_link_head conn, Cproton::PN_LOCAL_ACTIVE | Cproton::PN_REMOTE_CLOSED
+    while link
+      Cproton::pn_link_close link
+      log "Link closed."
+      link = Cproton::pn_link_next link, Cproton::PN_LOCAL_ACTIVE | Cproton::PN_REMOTE_CLOSED
+    end
+
+    ssn = Cproton::pn_session_head conn, Cproton::PN_LOCAL_ACTIVE | Cproton::PN_REMOTE_CLOSED
+    while ssn
+      Cproton::pn_session_close ssn
+      log "Session closed."
+      ssn = Cproton::pn_session_next ssn, Cproton::PN_LOCAL_ACTIVE | Cproton::PN_REMOTE_CLOSED
+    end
+
+    if Cproton::pn_connection_state(conn) == (Cproton::PN_LOCAL_ACTIVE | Cproton::PN_REMOTE_CLOSED)
+      log "Connection closed."
+      Cproton::pn_connection_close conn
+    end
+
+  end
+
+  def process_receive(delivery)
+    link = Cproton::pn_link delivery
+    mbox = Cproton::pn_remote_target link
+    (rc, msg) = Cproton::pn_recv link, 1024
+
+    log "Message received #{rc}"
+
+    while rc >= 0
+      unless @mailboxes.include? mbox
+        log "Error: cannot send to mailbox #{mbox} - dropping message."
+      else
+        @mailboxes[mbox] << msg
+        log "Mailbox #{mbox} contains: #{@mailboxes[mbox].size}"
+      end
+
+      (rc, msg) = Cproton::pn_recv link, 1024
+    end
+
+    log "Messages accepted."
+
+    Cproton::pn_disposition delivery, Cproton::PN_ACCEPTED
+    Cproton::pn_settle delivery
+    Cproton::pn_advance link
+
+    Cproton::pn_flow(link, 1) if Cproton::pn_credit(link).zero?
+  end
+
+  def send_message(delivery)
+    link = Cproton::pn_link delivery
+    mbox = Cproton::pn_remote_source link
+    log "Request for Mailbox=#{mbox}"
+
+    if @mailboxes.include?(mbox) && !@mailboxes[mbox].empty?
+      msg = @mailboxes[mbox].first
+      @mailboxes[mbox].delete_at 0
+      log "Fetching message #{msg}"
+    else
+      log "Warning: mailbox #{mbox} is empty, sending empty message"
+      msg = ""
+    end
+
+    sent = Cproton::pn_send link, msg
+    log "Message sent: #{sent}"
+
+    if Cproton::pn_advance link
+      Cproton::pn_delivery link, "server-delivery-#{@counter}"
+      @counter += 1
+    end
+  end
+
+  def setup_link(link)
+    r_tgt = Cproton::pn_remote_target link
+    r_src = Cproton::pn_remote_source link
+
+    if Cproton::pn_is_sender link
+      log "Opening link to read from mailbox: #{r_src}"
+
+      unless @mailboxes.include? r_src
+        log "Error: mailbox #{r_src} does not exist!"
+        r_src = nil
+      end
+    else
+      log "Opening link to write to mailbox: #{r_tgt}"
+
+      @mailboxes[r_tgt] = [] unless @mailboxes.include? r_tgt
+    end
+
+    Cproton::pn_set_target link, r_tgt
+    Cproton::pn_set_source link, r_src
+
+    if Cproton::pn_is_sender link
+      Cproton::pn_delivery link, "server-delivery-#{@counter}"
+      @counter += 1
+    else
+      Cproton::pn_flow link, 1
+    end
+
+    Cproton::pn_link_open link
+
+  end
+
+end
+
+#------------------
+# Begin entry point
+#------------------
+
+if __FILE__ == $PROGRAM_NAME
+  server = MailServer.new($options[:hostname],
+                          $options[:port],
+                          $options[:verbose])
+
+  server.setup
+  loop do
+    server.wait
+    server.accept_connection_requests
+    server.process_connections
+  end
+end
+
diff --git a/examples/ruby/post.rb b/examples/ruby/post.rb
new file mode 100644
index 0000000..dfff488
--- /dev/null
+++ b/examples/ruby/post.rb
@@ -0,0 +1,191 @@
+#!/usr/bin/env ruby
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require 'cproton'
+require 'optparse'
+
+
+$options  = {
+  :verbose => false,
+  :hostname => "0.0.0.0",
+  :port => "5672"
+}
+
+
+OptionParser.new do |opts|
+  opts.banner = "Usage: mailserver [options] <server-address> [<server-port>] <message-string> [<message-string> ...]"
+
+  opts.on("-s", "--server", String, :REQUIRED,
+          "The server hostname (def. #{$options[:hostname]})") do |hostname|
+    $options[:hostname] = hostname
+  end
+
+  opts.on("-p", "--port", String, :REQUIRED,
+          "The server port (def. #{$options[:port]})") do |port|
+    $options[:post] = port
+  end
+
+  opts.on("-m", "--mailbox", String, :REQUIRED,
+          "Name of the mailbox on the server") do |mailbox|
+    $options[:mailbox] = mailbox
+  end
+
+  opts.on("-v", "--verbose", :NONE,
+             "Enable verbose output (def. #{$options[:verbose]})") do
+    $options[:verbose] = true
+  end
+
+  begin
+    ARGV << "-h" if ARGV.empty?
+    opts.parse!(ARGV)
+  rescue OptionParser::ParseError => error
+    STDERR.puts error.message, "\n", opts
+    exit 1
+  end
+
+  $options[:messages] = ARGV
+
+  abort "No mailbox specified." if $options[:mailbox].nil?
+  abort "No messages specified." if $options[:messages].empty?
+
+end
+
+def log(text)
+  printf "#{Time.new}: #{text}\n" if $options[:verbose]
+end
+
+
+class PostClient
+
+  attr_reader :sasl
+  attr_reader :link
+  attr_reader :conn
+
+  def initialize(hostname, port, mailbox, verbose)
+    @hostname = hostname
+    @port = port
+    @mailbox = mailbox
+    @verbose = verbose
+  end
+
+  def setup
+    log "Connection to server host = #{@hostname}:#{@port}"
+    @driver = Cproton::pn_driver
+    @cxtr = Cproton::pn_connector(@driver, @hostname, @port, nil)
+
+    # configure SASL
+    @sasl = Cproton::pn_connector_sasl @cxtr
+    Cproton::pn_sasl_mechanisms @sasl, "ANONYMOUS"
+    Cproton::pn_sasl_client @sasl
+
+    # inform the engine about the connection, and link the driver to it.
+    @conn = Cproton::pn_connection
+    Cproton::pn_connector_set_connection @cxtr, @conn
+
+    # create a session and link for receiving from the mailbox
+    log "Posting to mailbox = #{@mailbox}"
+    @ssn = Cproton::pn_session @conn
+    @link = Cproton::pn_sender @ssn, "sender"
+    Cproton::pn_set_target @link, @mailbox
+
+    # now open all the engien end points
+    Cproton::pn_connection_open @conn
+    Cproton::pn_session_open @ssn
+    Cproton::pn_link_open @link
+  end
+
+  def wait
+    log "Waiting for events..."
+    Cproton::pn_connector_process @cxtr
+    Cproton::pn_driver_wait @driver, -1
+    Cproton::pn_connector_process @cxtr
+    log "..waiting done!"
+  end
+
+  def settle
+    d = Cproton::pn_unsettled_head @link
+    while d
+      _next = Cproton::pn_unsettled_next d
+      disp = Cproton::pn_remote_disposition d
+
+      if disp.nonzero? && disp != Cproton::PN_ACCEPTED
+        log "Warning: message was not accepted by the remote!"
+      end
+
+      if disp || Cproton::pn_remote_settled(disp)
+        Cproton::pn_settle(d)
+      end
+
+      d = _next
+    end
+  end
+
+end
+
+
+if __FILE__ == $PROGRAM_NAME
+  sender = PostClient.new($options[:hostname],
+                          $options[:port],
+                          $options[:mailbox],
+                          $options[:verbose])
+  sender.setup
+
+  sender.wait while ![Cproton::PN_SASL_PASS, Cproton::PN_SASL_FAIL].include? Cproton::pn_sasl_state(sender.sasl)
+
+  abort "Error: Authentication failure" if Cproton::pn_sasl_state(sender.sasl) == Cproton::PN_SASL_FAIL
+
+  while !$options[:messages].empty? do
+
+    while Cproton::pn_credit(sender.link).zero?
+      log "Waiting for credit"
+      sender.wait
+    end
+
+    while Cproton::pn_credit(sender.link) > 0
+      msg = $options[:messages].first
+      $options[:messages].delete_at 0
+      log "Sending #{msg}"
+      d = Cproton::pn_delivery sender.link, "post-deliver-#{$options[:messages].length}"
+      rc = Cproton::pn_send(sender.link, msg)
+
+      abort "Error: sending message: #{msg}" if rc < 0
+
+      fail unless rc == msg.length
+
+      Cproton::pn_advance sender.link
+    end
+
+    sender.settle
+  end
+
+  while Cproton::pn_unsettled(sender.link) > 0
+    log "Settling things with the server..."
+    sender.wait
+    sender.settle
+    log "Finished settling."
+  end
+
+  Cproton::pn_connection_close sender.conn
+  while ((Cproton::pn_connection_state(sender.conn) & Cproton::PN_REMOTE_CLOSED) != Cproton::PN_REMOTE_CLOSED)
+    log "Waiting for things to become closed..."
+    sender.wait
+  end
+
+end
-- 
1.7.11.2


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org


[PATCH 5/6] Ruby examples for sending/receiving direct messages.

Posted by "Darryl L. Pierce" <dp...@redhat.com>.
From: "Darryl L. Pierce" <dp...@redhat.com>

The example apps are:

 * recv.rb - Listens on the default port and receives messages
 * send.rb - Sends messages that are processed by recv.rb
---
 examples/ruby/EXAMPLES |  5 ++++
 examples/ruby/recv.rb  | 70 ++++++++++++++++++++++++++++++++++++++++++++++++++
 examples/ruby/send.rb  | 65 ++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 140 insertions(+)
 create mode 100644 examples/ruby/recv.rb
 create mode 100644 examples/ruby/send.rb

diff --git a/examples/ruby/EXAMPLES b/examples/ruby/EXAMPLES
index 468f072..f2eaa61 100644
--- a/examples/ruby/EXAMPLES
+++ b/examples/ruby/EXAMPLES
@@ -1 +1,6 @@
 EXAMPLES:
+
+ * send.rb, recv.rb:
+   Demonstrates the use of the messenger APIs for sending and receiving
+   messages from point to point.
+
diff --git a/examples/ruby/recv.rb b/examples/ruby/recv.rb
new file mode 100644
index 0000000..55a97c0
--- /dev/null
+++ b/examples/ruby/recv.rb
@@ -0,0 +1,70 @@
+#!/usr/bin/env ruby
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+require 'cproton'
+require 'optparse'
+
+addresses = []
+
+OptionParser.new do |opts|
+  opts.banner = "Usage: recv.rb <addr1> ... <addrn>"
+  opts.parse!
+
+  addresses = ARGV
+end
+
+addresses = ["//~0.0.0.0"] if addresses.empty?
+
+mng = Cproton::pn_messenger nil
+
+if Cproton::pn_messenger_start(mng).nonzero?
+  puts "ERROR: #{Cproton::pn_messenger_error mng}"
+end
+
+addresses.each do |address|
+  if Cproton::pn_messenger_subscribe(mng, address).nonzero?
+    puts "ERROR: #{Cproton::pn_messenger_error(mng)}"
+    exit
+  end
+end
+
+msg = Cproton::pn_message
+
+loop do
+  if Cproton::pn_messenger_recv(mng, 10).nonzero?
+    puts "ERROR: #{Cproton::pn_messenger_error mng}"
+    exit
+  end
+
+  while Cproton::pn_messenger_incoming(mng).nonzero?
+    if Cproton::pn_messenger_get(mng, msg).nonzero?
+      puts "ERROR: #{Cproton::pn_messenger_error mng}"
+      exit
+    else
+      (cd, body) = Cproton::pn_message_save(msg, 1024)
+      puts "Address: #{Cproton::pn_message_get_address msg}"
+      subject = Cproton::pn_message_get_subject(msg) || "(no subject)"
+      puts "Subject: #{subject}"
+      puts "Content: #{body}"
+    end
+  end
+end
+
+Cproton::pn_messenger_stop mng
+Cproton::pn_messenger_free mng
diff --git a/examples/ruby/send.rb b/examples/ruby/send.rb
new file mode 100644
index 0000000..64c872f
--- /dev/null
+++ b/examples/ruby/send.rb
@@ -0,0 +1,65 @@
+#!/usr/bin/env ruby
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+require 'cproton'
+require 'optparse'
+
+options = {}
+messages = []
+
+OptionParser.new do |opts|
+  opts.banner = "Usage: send.rb [options] <msg1> ... <msgn>"
+  opts.on("-a", "--address [addr]", "The receiver's address (def. //0.0.0.0)") do |f|
+    options[:address] = f
+  end
+
+  opts.parse!
+
+  messages = ARGV
+end
+
+options[:address] = "//0.0.0.0" unless options[:address]
+messages << "Hello world!" if messages.empty?
+
+mng = Cproton::pn_messenger nil
+
+Cproton::pn_messenger_start mng
+
+msg = Cproton::pn_message
+
+messages.each do |message|
+  Cproton::pn_message_set_address msg, options[:address]
+#  Cproton::pn_message_set_subject msg, "Message sent on #{Time.new}"
+  Cproton::pn_message_load msg, message
+
+  if Cproton::pn_messenger_put(mng, msg).nonzero?
+    puts "ERROR: #{Cproton::pn_messenger_error mng}"
+    exit
+  end
+end
+
+if Cproton::pn_messenger_send(mng).nonzero?
+  puts "ERROR: #{Cproton::pn_messenger_error mng}"
+  exit
+else
+  puts "SENT: " + messages.join(",")
+end
+
+Cproton::pn_messenger_stop mng
+Cproton::pn_messenger_free mng
-- 
1.7.11.2


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org


[PATCH 4/6] Ruby language bindings for Proton.

Posted by "Darryl L. Pierce" <dp...@redhat.com>.
From: "Darryl L. Pierce" <dp...@redhat.com>

The library file that's generated is named "libcproton_ruby.so" which
needs to be renamed in order to be used.
---
 examples/ruby/EXAMPLES                |   1 +
 proton-c/bindings/CMakeLists.txt      |   6 ++
 proton-c/bindings/ruby/CMakeLists.txt |  29 +++++++
 proton-c/bindings/ruby/cproton.i      | 157 ++++++++++++++++++++++++++++++++++
 4 files changed, 193 insertions(+)
 create mode 100644 examples/ruby/EXAMPLES
 create mode 100644 proton-c/bindings/ruby/CMakeLists.txt
 create mode 100644 proton-c/bindings/ruby/cproton.i

diff --git a/examples/ruby/EXAMPLES b/examples/ruby/EXAMPLES
new file mode 100644
index 0000000..468f072
--- /dev/null
+++ b/examples/ruby/EXAMPLES
@@ -0,0 +1 @@
+EXAMPLES:
diff --git a/proton-c/bindings/CMakeLists.txt b/proton-c/bindings/CMakeLists.txt
index 459c5d6..9bb644c 100644
--- a/proton-c/bindings/CMakeLists.txt
+++ b/proton-c/bindings/CMakeLists.txt
@@ -18,11 +18,17 @@
 #
 
 include(UseSWIG)
+include(FindRuby)
 
 # Build wrapper for Python:
 # @todo: conditionalize on whether python is available!
 add_subdirectory(python)
 
+# Build wrapper for Ruby:
+if (RUBY_FOUND)
+   add_subdirectory(ruby)
+endif (RUBY_FOUND)
+
 # Build wrapper for PHP
 # For now, assume PHP support if the 'php-config' tool is present.
 # @todo: allow user to specify which php-config if multiple PHP sources installed!
diff --git a/proton-c/bindings/ruby/CMakeLists.txt b/proton-c/bindings/ruby/CMakeLists.txt
new file mode 100644
index 0000000..99ed765
--- /dev/null
+++ b/proton-c/bindings/ruby/CMakeLists.txt
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+include_directories (${RUBY_INCLUDE_PATH})
+
+SWIG_ADD_MODULE(cproton_ruby ruby cproton.i)
+
+SWIG_LINK_LIBRARIES(cproton_ruby qpidproton ${LINK_DEPS} ${RUBY_LIBRARY})
+
+# TODO
+# * Fix shared library name to be simply "cproton.so" since the
+#   "lib" prefix doesn't match the Ruby module name.
+
diff --git a/proton-c/bindings/ruby/cproton.i b/proton-c/bindings/ruby/cproton.i
new file mode 100644
index 0000000..2bf4e36
--- /dev/null
+++ b/proton-c/bindings/ruby/cproton.i
@@ -0,0 +1,157 @@
+%module cproton
+
+%{
+#include <proton/engine.h>
+#include <proton/message.h>
+#include <proton/sasl.h>
+#include <proton/driver.h>
+#include <proton/messenger.h>
+%}
+
+typedef unsigned int size_t;
+typedef signed int ssize_t;
+typedef unsigned char uint8_t;
+typedef unsigned int uint32_t;
+typedef unsigned long int uint64_t;
+typedef int int32_t;
+
+%include <cstring.i>
+
+%cstring_output_withsize(char *OUTPUT, size_t *OUTPUT_SIZE)
+%cstring_output_allocate_size(char **ALLOC_OUTPUT, size_t *ALLOC_SIZE, free(*$1));
+
+%{
+#if !defined(RSTRING_LEN)
+#  define RSTRING_LEN(x) (RSTRING(X)->len)
+#  define RSTRING_PTR(x) (RSTRING(x)->ptr)
+#endif
+%}
+
+%typemap(in) pn_bytes_t {
+  if ($input == Qnil) {
+    $1.start = NULL;
+    $1.size = 0;
+  } else {
+    $1.start = RSTRING_PTR($input);
+    if (!$1.start) {
+      return NULL;
+    }
+    $1.size = RSTRING_LEN($input);
+  }
+}
+
+%typemap(out) pn_bytes_t {
+  $result = rb_str_new($1.start, $1.size);
+}
+
+%typemap (in) void *
+{
+  $1 = (void *) $input;
+}
+
+%typemap (out) void *
+{
+  $result = (VALUE) $1;
+}
+
+int pn_message_load(pn_message_t *msg, char *STRING, size_t LENGTH);
+%ignore pn_message_load;
+
+int pn_message_load_data(pn_message_t *msg, char *STRING, size_t LENGTH);
+%ignore pn_message_load_data;
+
+int pn_message_load_text(pn_message_t *msg, char *STRING, size_t LENGTH);
+%ignore pn_message_load_text;
+
+int pn_message_load_amqp(pn_message_t *msg, char *STRING, size_t LENGTH);
+%ignore pn_message_load_amqp;
+
+int pn_message_load_json(pn_message_t *msg, char *STRING, size_t LENGTH);
+%ignore pn_message_load_json;
+
+int pn_message_encode(pn_message_t *msg, char *OUTPUT, size_t *OUTPUT_SIZE);
+%ignore pn_message_encode;
+
+int pn_message_save(pn_message_t *msg, char *OUTPUT, size_t *OUTPUT_SIZE);
+%ignore pn_message_save;
+
+int pn_message_save_data(pn_message_t *msg, char *OUTPUT, size_t *OUTPUT_SIZE);
+%ignore pn_message_save_data;
+
+int pn_message_save_text(pn_message_t *msg, char *OUTPUT, size_t *OUTPUT_SIZE);
+%ignore pn_message_save_text;
+
+int pn_message_save_amqp(pn_message_t *msg, char *OUTPUT, size_t *OUTPUT_SIZE);
+%ignore pn_message_save_amqp;
+
+int pn_message_save_json(pn_message_t *msg, char *OUTPUT, size_t *OUTPUT_SIZE);
+%ignore pn_message_save_json;
+
+ssize_t pn_send(pn_link_t *transport, char *STRING, size_t LENGTH);
+%ignore pn_send;
+
+%rename(pn_recv) wrap_pn_recv;
+%inline %{
+  int wrap_pn_recv(pn_link_t *link, char *OUTPUT, size_t *OUTPUT_SIZE) {
+    ssize_t sz = pn_recv(link, OUTPUT, *OUTPUT_SIZE);
+    if (sz >= 0) {
+      *OUTPUT_SIZE = sz;
+    } else {
+      *OUTPUT_SIZE = 0;
+    }
+    return sz;
+  }
+%}
+%ignore pn_recv;
+
+ssize_t pn_input(pn_transport_t *transport, char *STRING, size_t LENGTH);
+%ignore pn_input;
+
+%rename(pn_output) wrap_pn_output;
+%inline %{
+  int wrap_pn_output(pn_transport_t *transport, char *OUTPUT, size_t *OUTPUT_SIZE) {
+    ssize_t sz = pn_output(transport, OUTPUT, *OUTPUT_SIZE);
+    if (sz >= 0) {
+      *OUTPUT_SIZE = sz;
+    } else {
+      *OUTPUT_SIZE = 0;
+    }
+    return sz;
+  }
+%}
+%ignore pn_output;
+
+%rename(pn_delivery) wrap_pn_delivery;
+%inline %{
+  pn_delivery_t *wrap_pn_delivery(pn_link_t *link, char *STRING, size_t LENGTH) {
+    return pn_delivery(link, pn_dtag(STRING, LENGTH));
+  }
+%}
+%ignore pn_delivery;
+
+%rename(pn_delivery_tag) wrap_pn_delivery_tag;
+%inline %{
+  void wrap_pn_delivery_tag(pn_delivery_t *delivery, char **ALLOC_OUTPUT, size_t *ALLOC_SIZE) {
+    pn_delivery_tag_t tag = pn_delivery_tag(delivery);
+    *ALLOC_OUTPUT = malloc(tag.size);
+    *ALLOC_SIZE = tag.size;
+    memcpy(*ALLOC_OUTPUT, tag.bytes, tag.size);
+  }
+%}
+%ignore pn_delivery_tag;
+
+%rename(pn_message_data) wrap_pn_message_data;
+%inline %{
+  int wrap_pn_message_data(char *STRING, size_t LENGTH, char *OUTPUT, size_t *OUTPUT_SIZE) {
+    ssize_t sz = pn_message_data(OUTPUT, *OUTPUT_SIZE, STRING, LENGTH);
+    if (sz >= 0) {
+      *OUTPUT_SIZE = sz;
+    } else {
+      *OUTPUT_SIZE = 0;
+    }
+    return sz;
+  }
+%}
+%ignore pn_message_data;
+
+%include "../cproton.i"
-- 
1.7.11.2


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org


[PATCH 3/6] Updated the header protection macros, removed the leading underscore.

Posted by "Darryl L. Pierce" <dp...@redhat.com>.
From: "Darryl L. Pierce" <dp...@redhat.com>

The underscores were causing the Ruby language bindings to output
warning errors concerning their names.
---
 proton-c/include/proton/buffer.h    | 4 ++--
 proton-c/include/proton/codec.h     | 4 ++--
 proton-c/include/proton/driver.h    | 4 ++--
 proton-c/include/proton/engine.h    | 4 ++--
 proton-c/include/proton/error.h     | 4 ++--
 proton-c/include/proton/framing.h   | 4 ++--
 proton-c/include/proton/message.h   | 4 ++--
 proton-c/include/proton/messenger.h | 4 ++--
 proton-c/include/proton/parser.h    | 4 ++--
 proton-c/include/proton/sasl.h      | 4 ++--
 proton-c/include/proton/scanner.h   | 4 ++--
 proton-c/include/proton/types.h     | 4 ++--
 proton-c/include/proton/util.h      | 4 ++--
 13 files changed, 26 insertions(+), 26 deletions(-)

diff --git a/proton-c/include/proton/buffer.h b/proton-c/include/proton/buffer.h
index 007d5f0..aa6f3d0 100644
--- a/proton-c/include/proton/buffer.h
+++ b/proton-c/include/proton/buffer.h
@@ -1,5 +1,5 @@
-#ifndef _PROTON_BUFFER_H
-#define _PROTON_BUFFER_H 1
+#ifndef PROTON_BUFFER_H
+#define PROTON_BUFFER_H 1
 
 /*
  *
diff --git a/proton-c/include/proton/codec.h b/proton-c/include/proton/codec.h
index b1df55b..3738ef1 100644
--- a/proton-c/include/proton/codec.h
+++ b/proton-c/include/proton/codec.h
@@ -1,5 +1,5 @@
-#ifndef _PROTON_CODEC_H
-#define _PROTON_CODEC_H 1
+#ifndef PROTON_CODEC_H
+#define PROTON_CODEC_H 1
 
 /*
  *
diff --git a/proton-c/include/proton/driver.h b/proton-c/include/proton/driver.h
index e4eea4c..5ae18ab 100644
--- a/proton-c/include/proton/driver.h
+++ b/proton-c/include/proton/driver.h
@@ -1,5 +1,5 @@
-#ifndef _PROTON_DRIVER_H
-#define _PROTON_DRIVER_H 1
+#ifndef PROTON_DRIVER_H
+#define PROTON_DRIVER_H 1
 
 /*
  *
diff --git a/proton-c/include/proton/engine.h b/proton-c/include/proton/engine.h
index 610ab47..ad0c7c1 100644
--- a/proton-c/include/proton/engine.h
+++ b/proton-c/include/proton/engine.h
@@ -1,5 +1,5 @@
-#ifndef _PROTON_ENGINE_H
-#define _PROTON_ENGINE_H 1
+#ifndef PROTON_ENGINE_H
+#define PROTON_ENGINE_H 1
 
 /*
  *
diff --git a/proton-c/include/proton/error.h b/proton-c/include/proton/error.h
index ee317ab..34aa956 100644
--- a/proton-c/include/proton/error.h
+++ b/proton-c/include/proton/error.h
@@ -1,5 +1,5 @@
-#ifndef _PROTON_ERROR_H
-#define _PROTON_ERROR_H 1
+#ifndef PROTON_ERROR_H
+#define PROTON_ERROR_H 1
 
 /*
  *
diff --git a/proton-c/include/proton/framing.h b/proton-c/include/proton/framing.h
index 9f8a182..95259db 100644
--- a/proton-c/include/proton/framing.h
+++ b/proton-c/include/proton/framing.h
@@ -1,5 +1,5 @@
-#ifndef _PROTON_FRAMING_H
-#define _PROTON_FRAMING_H 1
+#ifndef PROTON_FRAMING_H
+#define PROTON_FRAMING_H 1
 
 /*
  *
diff --git a/proton-c/include/proton/message.h b/proton-c/include/proton/message.h
index 8543dc1..0354c1f 100644
--- a/proton-c/include/proton/message.h
+++ b/proton-c/include/proton/message.h
@@ -1,5 +1,5 @@
-#ifndef _PROTON_MESSAGE_H
-#define _PROTON_MESSAGE_H 1
+#ifndef PROTON_MESSAGE_H
+#define PROTON_MESSAGE_H 1
 
 /*
  *
diff --git a/proton-c/include/proton/messenger.h b/proton-c/include/proton/messenger.h
index c53bc4a..97c2dd9 100644
--- a/proton-c/include/proton/messenger.h
+++ b/proton-c/include/proton/messenger.h
@@ -1,5 +1,5 @@
-#ifndef _PROTON_MESSENGER_H
-#define _PROTON_MESSENGER_H 1
+#ifndef PROTON_MESSENGER_H
+#define PROTON_MESSENGER_H 1
 
 /*
  *
diff --git a/proton-c/include/proton/parser.h b/proton-c/include/proton/parser.h
index 03fbacc..68bd13c 100644
--- a/proton-c/include/proton/parser.h
+++ b/proton-c/include/proton/parser.h
@@ -1,5 +1,5 @@
-#ifndef _PROTON_PARSER_H
-#define _PROTON_PARSER_H 1
+#ifndef PROTON_PARSER_H
+#define PROTON_PARSER_H 1
 
 /*
  *
diff --git a/proton-c/include/proton/sasl.h b/proton-c/include/proton/sasl.h
index c128496..ce294ce 100644
--- a/proton-c/include/proton/sasl.h
+++ b/proton-c/include/proton/sasl.h
@@ -1,5 +1,5 @@
-#ifndef _PROTON_SASL_H
-#define _PROTON_SASL_H 1
+#ifndef PROTON_SASL_H
+#define PROTON_SASL_H 1
 
 /*
  *
diff --git a/proton-c/include/proton/scanner.h b/proton-c/include/proton/scanner.h
index 21d431d..0563cfb 100644
--- a/proton-c/include/proton/scanner.h
+++ b/proton-c/include/proton/scanner.h
@@ -1,5 +1,5 @@
-#ifndef _PROTON_SCANNER_H
-#define _PROTON_SCANNER_H 1
+#ifndef PROTON_SCANNER_H
+#define PROTON_SCANNER_H 1
 
 /*
  *
diff --git a/proton-c/include/proton/types.h b/proton-c/include/proton/types.h
index 9268225..647bb53 100644
--- a/proton-c/include/proton/types.h
+++ b/proton-c/include/proton/types.h
@@ -1,5 +1,5 @@
-#ifndef _PROTON_TYPES_H
-#define _PROTON_TYPES_H 1
+#ifndef PROTON_TYPES_H
+#define PROTON_TYPES_H 1
 
 /*
  *
diff --git a/proton-c/include/proton/util.h b/proton-c/include/proton/util.h
index 3d24b61..e616556 100644
--- a/proton-c/include/proton/util.h
+++ b/proton-c/include/proton/util.h
@@ -1,5 +1,5 @@
-#ifndef _PROTON_UTIL_H
-#define _PROTON_UTIL_H 1
+#ifndef PROTON_UTIL_H
+#define PROTON_UTIL_H 1
 
 /*
  *
-- 
1.7.11.2


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org


[PATCH 2/6] Added a gitignore file.

Posted by "Darryl L. Pierce" <dp...@redhat.com>.
From: "Darryl L. Pierce" <dp...@redhat.com>

The first entry ignores temporary files from Emacs and similar editors.
---
 .gitignore | 1 +
 1 file changed, 1 insertion(+)
 create mode 100644 .gitignore

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..b25c15b
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+*~
-- 
1.7.11.2


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org


Re: [PATCH 1/6] added timeouts to messenger API; added messenger test suite; tweaked logging to identify connection; added PN_TRACE_DRV log flag

Posted by Rafael Schloming <ra...@redhat.com>.
This looks like a patch for a commit I made on trunk a while back. Did
something go awry with creating the patch set?

--Rafael

On Wed, 2012-07-25 at 14:05 -0400, Darryl L. Pierce wrote:
> From: rhs <rh...@13f79535-47bb-0310-9956-ffa450edef68>
> 
> git-svn-id: https://svn.apache.org/repos/asf/qpid/proton/trunk@1356935 13f79535-47bb-0310-9956-ffa450edef68
> ---
>  proton-c/include/proton/engine.h     |   1 +
>  proton-c/include/proton/error.h      |   1 +
>  proton-c/include/proton/messenger.h  |  18 +++++
>  proton-c/src/dispatcher/dispatcher.c |   4 +-
>  proton-c/src/driver.c                |  25 ++++---
>  proton-c/src/messenger.c             | 129 +++++++++++++++++++++++------------
>  tests/proton_tests/__init__.py       |   1 +
>  tests/proton_tests/messenger.py      |  88 ++++++++++++++++++++++++
>  8 files changed, 212 insertions(+), 55 deletions(-)
>  create mode 100644 tests/proton_tests/messenger.py
> 
> diff --git a/proton-c/include/proton/engine.h b/proton-c/include/proton/engine.h
> index b8873bb..610ab47 100644
> --- a/proton-c/include/proton/engine.h
> +++ b/proton-c/include/proton/engine.h
> @@ -76,6 +76,7 @@ typedef int pn_trace_t;
>  #define PN_TRACE_OFF (0)
>  #define PN_TRACE_RAW (1)
>  #define PN_TRACE_FRM (2)
> +#define PN_TRACE_DRV (4)
>  
>  #define PN_SESSION_WINDOW (1024)
>  
> diff --git a/proton-c/include/proton/error.h b/proton-c/include/proton/error.h
> index 86115c1..ee317ab 100644
> --- a/proton-c/include/proton/error.h
> +++ b/proton-c/include/proton/error.h
> @@ -32,6 +32,7 @@ typedef struct pn_error_t pn_error_t;
>  #define PN_UNDERFLOW (-4)
>  #define PN_STATE_ERR (-5)
>  #define PN_ARG_ERR (-6)
> +#define PN_TIMEOUT (-7)
>  
>  const char *pn_code(int code);
>  
> diff --git a/proton-c/include/proton/messenger.h b/proton-c/include/proton/messenger.h
> index 42d8cc4..c53bc4a 100644
> --- a/proton-c/include/proton/messenger.h
> +++ b/proton-c/include/proton/messenger.h
> @@ -48,6 +48,24 @@ pn_messenger_t *pn_messenger(const char *name);
>   */
>  const char *pn_messenger_name(pn_messenger_t *messenger);
>  
> +/** Sets the timeout for a Messenger. A negative timeout means
> + * infinite.
> + *
> + * @param[in] messenger the messenger
> + * @param[timeout] the new timeout for the messenger, in milliseconds
> + *
> + * @return an error code or zero if there is no error
> + */
> +int pn_messenger_set_timeout(pn_messenger_t *messenger, int timeout);
> +
> +/** Retrieves the timeout for a Messenger.
> + *
> + * @param[in] messenger the messenger
> + *
> + * @return the timeout for the messenger, in milliseconds
> + */
> +int pn_messenger_get_timeout(pn_messenger_t *messenger);
> +
>  /** Frees a Messenger.
>   *
>   * @param[in] messenger the messenger to free, no longer valid on
> diff --git a/proton-c/src/dispatcher/dispatcher.c b/proton-c/src/dispatcher/dispatcher.c
> index a9733ae..671a791 100644
> --- a/proton-c/src/dispatcher/dispatcher.c
> +++ b/proton-c/src/dispatcher/dispatcher.c
> @@ -82,8 +82,8 @@ static void pn_do_trace(pn_dispatcher_t *disp, uint16_t ch, pn_dir_t dir,
>      uint8_t code = scanned ? code64 : 0;
>      size_t n = SCRATCH;
>      pn_data_format(args, disp->scratch, &n);
> -    fprintf(stderr, "[%u] %s %s %s", ch, dir == OUT ? "->" : "<-",
> -            disp->names[code], disp->scratch);
> +    fprintf(stderr, "[%p:%u] %s %s %s", (void *) disp, ch,
> +            dir == OUT ? "->" : "<-", disp->names[code], disp->scratch);
>      if (size) {
>        size_t capacity = 4*size + 1;
>        char buf[capacity];
> diff --git a/proton-c/src/driver.c b/proton-c/src/driver.c
> index af99fe4..dda169b 100644
> --- a/proton-c/src/driver.c
> +++ b/proton-c/src/driver.c
> @@ -68,11 +68,13 @@ struct pn_listener_t {
>  };
>  
>  #define IO_BUF_SIZE (4*1024)
> +#define NAME_MAX (256)
>  
>  struct pn_connector_t {
>    pn_driver_t *driver;
>    pn_connector_t *connector_next;
>    pn_connector_t *connector_prev;
> +  char name[256];
>    int idx;
>    bool pending_tick;
>    bool pending_read;
> @@ -165,7 +167,8 @@ pn_listener_t *pn_listener(pn_driver_t *driver, const char *host,
>  
>    pn_listener_t *l = pn_listener_fd(driver, sock, context);
>  
> -  printf("Listening on %s:%s\n", host, port);
> +  if (driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV))
> +    printf("Listening on %s:%s\n", host, port);
>    return l;
>  }
>  
> @@ -226,9 +229,10 @@ pn_connector_t *pn_listener_accept(pn_listener_t *l)
>        return NULL;
>      } else {
>        pn_configure_sock(sock);
> -      if (l->driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW))
> -        printf("accepted from %s:%s\n", host, serv);
> +      if (l->driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV))
> +        fprintf(stderr, "Accepted from %s:%s\n", host, serv);
>        pn_connector_t *c = pn_connector_fd(l->driver, sock, NULL);
> +      snprintf(c->name, NAME_MAX, "%s:%s", host, serv);
>        c->listener = l;
>        return c;
>      }
> @@ -303,7 +307,9 @@ pn_connector_t *pn_connector(pn_driver_t *driver, const char *host,
>    freeaddrinfo(addr);
>  
>    pn_connector_t *c = pn_connector_fd(driver, sock, context);
> -  printf("Connected to %s:%s\n", host, port);
> +  snprintf(c->name, NAME_MAX, "%s:%s", host, port);
> +  if (driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV))
> +    fprintf(stderr, "Connected to %s\n", c->name);
>    return c;
>  }
>  
> @@ -332,6 +338,7 @@ pn_connector_t *pn_connector_fd(pn_driver_t *driver, int fd, void *context)
>    c->pending_tick = false;
>    c->pending_read = false;
>    c->pending_write = false;
> +  c->name[0] = '\0';
>    c->idx = 0;
>    c->fd = fd;
>    c->status = PN_SEL_RD | PN_SEL_WR;
> @@ -460,7 +467,7 @@ static void pn_connector_process_input(pn_connector_t *ctor)
>        if (n == PN_EOS) {
>          pn_connector_consume(ctor, ctor->input_size);
>        } else {
> -        printf("error in process_input: %s\n", pn_code(n));
> +        fprintf(stderr, "error in process_input: %s\n", pn_code(n));
>        }
>        ctor->input_done = true;
>        break;
> @@ -658,8 +665,9 @@ void pn_connector_process(pn_connector_t *c) {
>        c->pending_write = false;
>      }
>      if (c->output_size == 0 && c->input_done && c->output_done) {
> -      if (c->trace & (PN_TRACE_FRM | PN_TRACE_RAW))
> -        fprintf(stderr, "closed\n");
> +      if (c->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV)) {
> +        fprintf(stderr, "Closed %s\n", c->name);
> +      }
>        pn_connector_close(c);
>      }
>    }
> @@ -686,7 +694,8 @@ pn_driver_t *pn_driver()
>    d->ctrl[0] = 0;
>    d->ctrl[1] = 0;
>    d->trace = ((pn_env_bool("PN_TRACE_RAW") ? PN_TRACE_RAW : PN_TRACE_OFF) |
> -              (pn_env_bool("PN_TRACE_FRM") ? PN_TRACE_FRM : PN_TRACE_OFF));
> +              (pn_env_bool("PN_TRACE_FRM") ? PN_TRACE_FRM : PN_TRACE_OFF) |
> +              (pn_env_bool("PN_TRACE_DRV") ? PN_TRACE_DRV : PN_TRACE_OFF));
>  
>    // XXX
>    if (pipe(d->ctrl)) {
> diff --git a/proton-c/src/messenger.c b/proton-c/src/messenger.c
> index a12025a..56da348 100644
> --- a/proton-c/src/messenger.c
> +++ b/proton-c/src/messenger.c
> @@ -21,6 +21,7 @@
>  
>  #include <proton/messenger.h>
>  #include <proton/driver.h>
> +#include <proton/util.h>
>  #include <stdlib.h>
>  #include <string.h>
>  #include <stdio.h>
> @@ -29,6 +30,7 @@
>  
>  struct pn_messenger_t {
>    char *name;
> +  int timeout;
>    pn_driver_t *driver;
>    pn_connector_t *connectors[1024];
>    size_t size;
> @@ -57,6 +59,7 @@ pn_messenger_t *pn_messenger(const char *name)
>  
>    if (m) {
>      m->name = build_name(name);
> +    m->timeout = -1;
>      m->driver = pn_driver();
>      m->size = 0;
>      m->listeners = 0;
> @@ -73,6 +76,18 @@ const char *pn_messenger_name(pn_messenger_t *messenger)
>    return messenger->name;
>  }
>  
> +int pn_messenger_set_timeout(pn_messenger_t *messenger, int timeout)
> +{
> +  if (!messenger) return PN_ARG_ERR;
> +  messenger->timeout = timeout;
> +  return 0;
> +}
> +
> +int pn_messenger_get_timeout(pn_messenger_t *messenger)
> +{
> +  return messenger ? messenger->timeout : 0;
> +}
> +
>  void pn_messenger_free(pn_messenger_t *messenger)
>  {
>    if (messenger) {
> @@ -171,14 +186,28 @@ void pn_messenger_reclaim(pn_messenger_t *messenger, pn_connection_t *conn)
>    }
>  }
>  
> -int pn_messenger_sync(pn_messenger_t *messenger, bool (*predicate)(pn_messenger_t *))
> +long int millis(struct timeval tv)
> +{
> +  return tv.tv_sec * 1000 + tv.tv_usec/1000;
> +}
> +
> +int pn_messenger_tsync(pn_messenger_t *messenger, bool (*predicate)(pn_messenger_t *), int timeout)
>  {
>    for (int i = 0; i < messenger->size; i++) {
>      pn_connector_process(messenger->connectors[i]);
>    }
>  
> -  while (!predicate(messenger)) {
> -    pn_driver_wait(messenger->driver, -1);
> +  struct timeval now;
> +  if (gettimeofday(&now, NULL)) pn_fatal("gettimeofday failed\n");
> +  long int deadline = millis(now) + timeout;
> +  bool pred;
> +
> +  while (true) {
> +    pred = predicate(messenger);
> +    int remaining = deadline - millis(now);
> +    if (pred || (timeout >= 0 && remaining < 0)) break;
> +
> +    pn_driver_wait(messenger->driver, remaining);
>  
>      pn_listener_t *l;
>      while ((l = pn_driver_listener(messenger->driver))) {
> @@ -214,47 +243,30 @@ int pn_messenger_sync(pn_messenger_t *messenger, bool (*predicate)(pn_messenger_
>          pn_connector_process(c);
>        }
>      }
> +
> +    if (timeout >= 0) {
> +      if (gettimeofday(&now, NULL)) pn_fatal("gettimeofday failed\n");
> +    }
>    }
>  
> -  return 0;
> +  return pred ? 0 : PN_TIMEOUT;
>  }
>  
> -bool pn_messenger_linked(pn_messenger_t *messenger)
> +int pn_messenger_sync(pn_messenger_t *messenger, bool (*predicate)(pn_messenger_t *))
>  {
> -  for (int i = 0; i < messenger->size; i++) {
> -    pn_connector_t *ctor = messenger->connectors[i];
> -    pn_connection_t *conn = pn_connector_connection(ctor);
> -    pn_state_t state = pn_connection_state(conn);
> -    if ((state == (PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT)) ||
> -        (state == (PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE))) {
> -      return false;
> -    }
> -
> -    if (pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT) ||
> -        pn_link_head(conn, PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE)) {
> -      return false;
> -    }
> -  }
> -
> -  return true;
> +  return pn_messenger_tsync(messenger, predicate, messenger->timeout);
>  }
>  
>  int pn_messenger_start(pn_messenger_t *messenger)
>  {
>    if (!messenger) return PN_ARG_ERR;
> -  return pn_messenger_sync(messenger, pn_messenger_linked);
> +  // right now this is a noop
> +  return 0;
>  }
>  
> -bool pn_messenger_unlinked(pn_messenger_t *messenger)
> +bool pn_messenger_stopped(pn_messenger_t *messenger)
>  {
> -  for (int i = 0; i < messenger->size; i++) {
> -    pn_connector_t *ctor = messenger->connectors[i];
> -    pn_connection_t *conn = pn_connector_connection(ctor);
> -    pn_state_t state = pn_connection_state(conn);
> -    if (state != (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED))
> -      return false;
> -  }
> -  return true;
> +  return messenger->size == 0;
>  }
>  
>  int pn_messenger_stop(pn_messenger_t *messenger)
> @@ -272,7 +284,7 @@ int pn_messenger_stop(pn_messenger_t *messenger)
>      pn_connection_close(conn);
>    }
>  
> -  return pn_messenger_sync(messenger, pn_messenger_unlinked);
> +  return pn_messenger_sync(messenger, pn_messenger_stopped);
>  }
>  
>  static void parse_address(char *address, char **domain, char **name)
> @@ -301,6 +313,18 @@ bool pn_streq(const char *a, const char *b)
>  
>  pn_connection_t *pn_messenger_domain(pn_messenger_t *messenger, const char *domain)
>  {
> +  char buf[strlen(domain) + 1];
> +  if (domain) {
> +    strcpy(buf, domain);
> +  } else {
> +    buf[0] = '\0';
> +  }
> +  char *user = NULL;
> +  char *pass = NULL;
> +  char *host = "0.0.0.0";
> +  char *port = "5672";
> +  parse_url(buf, &user, &pass, &host, &port);
> +
>    for (int i = 0; i < messenger->size; i++) {
>      pn_connection_t *connection = pn_connector_connection(messenger->connectors[i]);
>      const char *container = pn_connection_remote_container(connection);
> @@ -309,12 +333,16 @@ pn_connection_t *pn_messenger_domain(pn_messenger_t *messenger, const char *doma
>        return connection;
>    }
>  
> -  pn_connector_t *connector = pn_connector(messenger->driver, domain, "5672", NULL);
> +  pn_connector_t *connector = pn_connector(messenger->driver, host, port, NULL);
>    if (!connector) return NULL;
>    messenger->connectors[messenger->size++] = connector;
>    pn_sasl_t *sasl = pn_connector_sasl(connector);
> -  pn_sasl_mechanisms(sasl, "ANONYMOUS");
> -  pn_sasl_client(sasl);
> +  if (user) {
> +    pn_sasl_plain(sasl, user, pass);
> +  } else {
> +    pn_sasl_mechanisms(sasl, "ANONYMOUS");
> +    pn_sasl_client(sasl);
> +  }
>    pn_connection_t *connection = pn_connection();
>    pn_connection_set_container(connection, messenger->name);
>    pn_connection_set_hostname(connection, domain);
> @@ -378,11 +406,15 @@ pn_listener_t *pn_messenger_isource(pn_messenger_t *messenger, const char *sourc
>  {
>    char buf[strlen(source) + 1];
>    strcpy(buf, source);
> -  char *domain;
> -  char *name;
> +  char *domain, *name;
>    parse_address(buf, &domain, &name);
> +  char *user = NULL;
> +  char *pass = NULL;
> +  char *host = "0.0.0.0";
> +  char *port = "5672";
> +  parse_url(domain + 1, &user, &pass, &host, &port);
>  
> -  pn_listener_t *listener = pn_listener(messenger->driver, domain + 1, "5672", NULL);
> +  pn_listener_t *listener = pn_listener(messenger->driver, host, port, NULL);
>    if (listener) {
>      messenger->listeners++;
>    }
> @@ -428,6 +460,8 @@ static void outward_munge(pn_messenger_t *mng, pn_message_t *msg)
>    }
>  }
>  
> +bool false_pred(pn_messenger_t *messenger) { return false; }
> +
>  int pn_messenger_put(pn_messenger_t *messenger, pn_message_t *msg)
>  {
>    if (!messenger) return PN_ARG_ERR;
> @@ -459,6 +493,7 @@ int pn_messenger_put(pn_messenger_t *messenger, pn_message_t *msg)
>          return n;
>        } else {
>          pn_advance(sender);
> +        pn_messenger_tsync(messenger, false_pred, 0);
>          return 0;
>        }
>      }
> @@ -469,8 +504,6 @@ int pn_messenger_put(pn_messenger_t *messenger, pn_message_t *msg)
>  
>  bool pn_messenger_sent(pn_messenger_t *messenger)
>  {
> -  //  if (!pn_messenger_linked(messenger)) return false;
> -
>    for (int i = 0; i < messenger->size; i++) {
>      pn_connector_t *ctor = messenger->connectors[i];
>      pn_connection_t *conn = pn_connector_connection(ctor);
> @@ -497,8 +530,6 @@ bool pn_messenger_sent(pn_messenger_t *messenger)
>  
>  bool pn_messenger_rcvd(pn_messenger_t *messenger)
>  {
> -  //  if (!pn_messenger_linked(messenger)) return false;
> -
>    for (int i = 0; i < messenger->size; i++) {
>      pn_connector_t *ctor = messenger->connectors[i];
>      pn_connection_t *conn = pn_connector_connection(ctor);
> @@ -532,6 +563,8 @@ int pn_messenger_recv(pn_messenger_t *messenger, int n)
>  
>  int pn_messenger_get(pn_messenger_t *messenger, pn_message_t *msg)
>  {
> +  if (!messenger) return PN_ARG_ERR;
> +
>    for (int i = 0; i < messenger->size; i++) {
>      pn_connector_t *ctor = messenger->connectors[i];
>      pn_connection_t *conn = pn_connector_connection(ctor);
> @@ -545,10 +578,14 @@ int pn_messenger_get(pn_messenger_t *messenger, pn_message_t *msg)
>          ssize_t n = pn_recv(l, buf, 1024);
>          pn_settle(d);
>          if (n < 0) return n;
> -        int err = pn_message_decode(msg, buf, n);
> -        if (err) {
> -          return pn_error_format(messenger->error, err, "error decoding message: %s",
> +        if (msg) {
> +          int err = pn_message_decode(msg, buf, n);
> +          if (err) {
> +            return pn_error_format(messenger->error, err, "error decoding message: %s",
>                                   pn_message_error(msg));
> +          } else {
> +            return 0;
> +          }
>          } else {
>            return 0;
>          }
> @@ -564,6 +601,8 @@ int pn_messenger_get(pn_messenger_t *messenger, pn_message_t *msg)
>  
>  int pn_messenger_queued(pn_messenger_t *messenger, bool sender)
>  {
> +  if (!messenger) return 0;
> +
>    int result = 0;
>  
>    for (int i = 0; i < messenger->size; i++) {
> diff --git a/tests/proton_tests/__init__.py b/tests/proton_tests/__init__.py
> index a8a4d52..b467cf5 100644
> --- a/tests/proton_tests/__init__.py
> +++ b/tests/proton_tests/__init__.py
> @@ -19,3 +19,4 @@
>  
>  import proton_tests.engine
>  import proton_tests.message
> +import proton_tests.messenger
> diff --git a/tests/proton_tests/messenger.py b/tests/proton_tests/messenger.py
> new file mode 100644
> index 0000000..17161cd
> --- /dev/null
> +++ b/tests/proton_tests/messenger.py
> @@ -0,0 +1,88 @@
> +#
> +# Licensed to the Apache Software Foundation (ASF) under one
> +# or more contributor license agreements.  See the NOTICE file
> +# distributed with this work for additional information
> +# regarding copyright ownership.  The ASF licenses this file
> +# to you under the Apache License, Version 2.0 (the
> +# "License"); you may not use this file except in compliance
> +# with the License.  You may obtain a copy of the License at
> +# 
> +#   http://www.apache.org/licenses/LICENSE-2.0
> +# 
> +# Unless required by applicable law or agreed to in writing,
> +# software distributed under the License is distributed on an
> +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> +# KIND, either express or implied.  See the License for the
> +# specific language governing permissions and limitations
> +# under the License.
> +#
> +
> +import os, common, xproton
> +from xproton import *
> +from threading import Thread
> +
> +class Test(common.Test):
> +
> +  def setup(self):
> +    self.server = pn_messenger("server")
> +    pn_messenger_set_timeout(self.server, 10000)
> +    pn_messenger_start(self.server)
> +    pn_messenger_subscribe(self.server, "//~0.0.0.0:12345")
> +    self.thread = Thread(target=self.run)
> +    self.running = True
> +    self.thread.start()
> +
> +    self.client = pn_messenger("client")
> +    pn_messenger_set_timeout(self.client, 10000)
> +    pn_messenger_start(self.client)
> +
> +  def teardown(self):
> +    self.running = False
> +    msg = pn_message()
> +    pn_message_set_address(msg, "//0.0.0.0:12345")
> +    pn_messenger_put(self.client, msg)
> +    pn_messenger_send(self.client)
> +    pn_messenger_stop(self.client)
> +    self.thread.join()
> +    pn_messenger_free(self.client)
> +    pn_messenger_free(self.server)
> +    self.client = None
> +    self.server = None
> +
> +class MessengerTest(Test):
> +
> +  def run(self):
> +    msg = pn_message()
> +    while self.running:
> +      pn_messenger_recv(self.server, 10)
> +      while pn_messenger_incoming(self.server):
> +        if pn_messenger_get(self.server, msg):
> +          print pn_messenger_error(self.server)
> +        else:
> +          reply_to = pn_message_get_reply_to(msg)
> +          if reply_to:
> +            pn_message_set_address(msg, reply_to)
> +            pn_messenger_put(self.server, msg)
> +    pn_messenger_stop(self.server)
> +
> +  def testSendReceive(self):
> +    msg = pn_message()
> +    pn_message_set_address(msg, "//0.0.0.0:12345")
> +    pn_message_set_subject(msg, "Hello World!")
> +    body = "First the world, then the galaxy!"
> +    pn_message_load(msg, body)
> +    pn_messenger_put(self.client, msg)
> +    pn_messenger_send(self.client)
> +
> +    reply = pn_message()
> +    assert not pn_messenger_recv(self.client, 1)
> +    assert pn_messenger_incoming(self.client) == 1
> +    assert not pn_messenger_get(self.client, reply)
> +
> +    assert pn_message_get_subject(reply) == "Hello World!"
> +    cd, rbod = pn_message_save(reply, 1024)
> +    assert not cd
> +    assert rbod == body
> +
> +    pn_message_free(msg)
> +    pn_message_free(reply)



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org


[PATCH 1/6] added timeouts to messenger API; added messenger test suite; tweaked logging to identify connection; added PN_TRACE_DRV log flag

Posted by "Darryl L. Pierce" <dp...@redhat.com>.
From: rhs <rh...@13f79535-47bb-0310-9956-ffa450edef68>

git-svn-id: https://svn.apache.org/repos/asf/qpid/proton/trunk@1356935 13f79535-47bb-0310-9956-ffa450edef68
---
 proton-c/include/proton/engine.h     |   1 +
 proton-c/include/proton/error.h      |   1 +
 proton-c/include/proton/messenger.h  |  18 +++++
 proton-c/src/dispatcher/dispatcher.c |   4 +-
 proton-c/src/driver.c                |  25 ++++---
 proton-c/src/messenger.c             | 129 +++++++++++++++++++++++------------
 tests/proton_tests/__init__.py       |   1 +
 tests/proton_tests/messenger.py      |  88 ++++++++++++++++++++++++
 8 files changed, 212 insertions(+), 55 deletions(-)
 create mode 100644 tests/proton_tests/messenger.py

diff --git a/proton-c/include/proton/engine.h b/proton-c/include/proton/engine.h
index b8873bb..610ab47 100644
--- a/proton-c/include/proton/engine.h
+++ b/proton-c/include/proton/engine.h
@@ -76,6 +76,7 @@ typedef int pn_trace_t;
 #define PN_TRACE_OFF (0)
 #define PN_TRACE_RAW (1)
 #define PN_TRACE_FRM (2)
+#define PN_TRACE_DRV (4)
 
 #define PN_SESSION_WINDOW (1024)
 
diff --git a/proton-c/include/proton/error.h b/proton-c/include/proton/error.h
index 86115c1..ee317ab 100644
--- a/proton-c/include/proton/error.h
+++ b/proton-c/include/proton/error.h
@@ -32,6 +32,7 @@ typedef struct pn_error_t pn_error_t;
 #define PN_UNDERFLOW (-4)
 #define PN_STATE_ERR (-5)
 #define PN_ARG_ERR (-6)
+#define PN_TIMEOUT (-7)
 
 const char *pn_code(int code);
 
diff --git a/proton-c/include/proton/messenger.h b/proton-c/include/proton/messenger.h
index 42d8cc4..c53bc4a 100644
--- a/proton-c/include/proton/messenger.h
+++ b/proton-c/include/proton/messenger.h
@@ -48,6 +48,24 @@ pn_messenger_t *pn_messenger(const char *name);
  */
 const char *pn_messenger_name(pn_messenger_t *messenger);
 
+/** Sets the timeout for a Messenger. A negative timeout means
+ * infinite.
+ *
+ * @param[in] messenger the messenger
+ * @param[timeout] the new timeout for the messenger, in milliseconds
+ *
+ * @return an error code or zero if there is no error
+ */
+int pn_messenger_set_timeout(pn_messenger_t *messenger, int timeout);
+
+/** Retrieves the timeout for a Messenger.
+ *
+ * @param[in] messenger the messenger
+ *
+ * @return the timeout for the messenger, in milliseconds
+ */
+int pn_messenger_get_timeout(pn_messenger_t *messenger);
+
 /** Frees a Messenger.
  *
  * @param[in] messenger the messenger to free, no longer valid on
diff --git a/proton-c/src/dispatcher/dispatcher.c b/proton-c/src/dispatcher/dispatcher.c
index a9733ae..671a791 100644
--- a/proton-c/src/dispatcher/dispatcher.c
+++ b/proton-c/src/dispatcher/dispatcher.c
@@ -82,8 +82,8 @@ static void pn_do_trace(pn_dispatcher_t *disp, uint16_t ch, pn_dir_t dir,
     uint8_t code = scanned ? code64 : 0;
     size_t n = SCRATCH;
     pn_data_format(args, disp->scratch, &n);
-    fprintf(stderr, "[%u] %s %s %s", ch, dir == OUT ? "->" : "<-",
-            disp->names[code], disp->scratch);
+    fprintf(stderr, "[%p:%u] %s %s %s", (void *) disp, ch,
+            dir == OUT ? "->" : "<-", disp->names[code], disp->scratch);
     if (size) {
       size_t capacity = 4*size + 1;
       char buf[capacity];
diff --git a/proton-c/src/driver.c b/proton-c/src/driver.c
index af99fe4..dda169b 100644
--- a/proton-c/src/driver.c
+++ b/proton-c/src/driver.c
@@ -68,11 +68,13 @@ struct pn_listener_t {
 };
 
 #define IO_BUF_SIZE (4*1024)
+#define NAME_MAX (256)
 
 struct pn_connector_t {
   pn_driver_t *driver;
   pn_connector_t *connector_next;
   pn_connector_t *connector_prev;
+  char name[256];
   int idx;
   bool pending_tick;
   bool pending_read;
@@ -165,7 +167,8 @@ pn_listener_t *pn_listener(pn_driver_t *driver, const char *host,
 
   pn_listener_t *l = pn_listener_fd(driver, sock, context);
 
-  printf("Listening on %s:%s\n", host, port);
+  if (driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV))
+    printf("Listening on %s:%s\n", host, port);
   return l;
 }
 
@@ -226,9 +229,10 @@ pn_connector_t *pn_listener_accept(pn_listener_t *l)
       return NULL;
     } else {
       pn_configure_sock(sock);
-      if (l->driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW))
-        printf("accepted from %s:%s\n", host, serv);
+      if (l->driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV))
+        fprintf(stderr, "Accepted from %s:%s\n", host, serv);
       pn_connector_t *c = pn_connector_fd(l->driver, sock, NULL);
+      snprintf(c->name, NAME_MAX, "%s:%s", host, serv);
       c->listener = l;
       return c;
     }
@@ -303,7 +307,9 @@ pn_connector_t *pn_connector(pn_driver_t *driver, const char *host,
   freeaddrinfo(addr);
 
   pn_connector_t *c = pn_connector_fd(driver, sock, context);
-  printf("Connected to %s:%s\n", host, port);
+  snprintf(c->name, NAME_MAX, "%s:%s", host, port);
+  if (driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV))
+    fprintf(stderr, "Connected to %s\n", c->name);
   return c;
 }
 
@@ -332,6 +338,7 @@ pn_connector_t *pn_connector_fd(pn_driver_t *driver, int fd, void *context)
   c->pending_tick = false;
   c->pending_read = false;
   c->pending_write = false;
+  c->name[0] = '\0';
   c->idx = 0;
   c->fd = fd;
   c->status = PN_SEL_RD | PN_SEL_WR;
@@ -460,7 +467,7 @@ static void pn_connector_process_input(pn_connector_t *ctor)
       if (n == PN_EOS) {
         pn_connector_consume(ctor, ctor->input_size);
       } else {
-        printf("error in process_input: %s\n", pn_code(n));
+        fprintf(stderr, "error in process_input: %s\n", pn_code(n));
       }
       ctor->input_done = true;
       break;
@@ -658,8 +665,9 @@ void pn_connector_process(pn_connector_t *c) {
       c->pending_write = false;
     }
     if (c->output_size == 0 && c->input_done && c->output_done) {
-      if (c->trace & (PN_TRACE_FRM | PN_TRACE_RAW))
-        fprintf(stderr, "closed\n");
+      if (c->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV)) {
+        fprintf(stderr, "Closed %s\n", c->name);
+      }
       pn_connector_close(c);
     }
   }
@@ -686,7 +694,8 @@ pn_driver_t *pn_driver()
   d->ctrl[0] = 0;
   d->ctrl[1] = 0;
   d->trace = ((pn_env_bool("PN_TRACE_RAW") ? PN_TRACE_RAW : PN_TRACE_OFF) |
-              (pn_env_bool("PN_TRACE_FRM") ? PN_TRACE_FRM : PN_TRACE_OFF));
+              (pn_env_bool("PN_TRACE_FRM") ? PN_TRACE_FRM : PN_TRACE_OFF) |
+              (pn_env_bool("PN_TRACE_DRV") ? PN_TRACE_DRV : PN_TRACE_OFF));
 
   // XXX
   if (pipe(d->ctrl)) {
diff --git a/proton-c/src/messenger.c b/proton-c/src/messenger.c
index a12025a..56da348 100644
--- a/proton-c/src/messenger.c
+++ b/proton-c/src/messenger.c
@@ -21,6 +21,7 @@
 
 #include <proton/messenger.h>
 #include <proton/driver.h>
+#include <proton/util.h>
 #include <stdlib.h>
 #include <string.h>
 #include <stdio.h>
@@ -29,6 +30,7 @@
 
 struct pn_messenger_t {
   char *name;
+  int timeout;
   pn_driver_t *driver;
   pn_connector_t *connectors[1024];
   size_t size;
@@ -57,6 +59,7 @@ pn_messenger_t *pn_messenger(const char *name)
 
   if (m) {
     m->name = build_name(name);
+    m->timeout = -1;
     m->driver = pn_driver();
     m->size = 0;
     m->listeners = 0;
@@ -73,6 +76,18 @@ const char *pn_messenger_name(pn_messenger_t *messenger)
   return messenger->name;
 }
 
+int pn_messenger_set_timeout(pn_messenger_t *messenger, int timeout)
+{
+  if (!messenger) return PN_ARG_ERR;
+  messenger->timeout = timeout;
+  return 0;
+}
+
+int pn_messenger_get_timeout(pn_messenger_t *messenger)
+{
+  return messenger ? messenger->timeout : 0;
+}
+
 void pn_messenger_free(pn_messenger_t *messenger)
 {
   if (messenger) {
@@ -171,14 +186,28 @@ void pn_messenger_reclaim(pn_messenger_t *messenger, pn_connection_t *conn)
   }
 }
 
-int pn_messenger_sync(pn_messenger_t *messenger, bool (*predicate)(pn_messenger_t *))
+long int millis(struct timeval tv)
+{
+  return tv.tv_sec * 1000 + tv.tv_usec/1000;
+}
+
+int pn_messenger_tsync(pn_messenger_t *messenger, bool (*predicate)(pn_messenger_t *), int timeout)
 {
   for (int i = 0; i < messenger->size; i++) {
     pn_connector_process(messenger->connectors[i]);
   }
 
-  while (!predicate(messenger)) {
-    pn_driver_wait(messenger->driver, -1);
+  struct timeval now;
+  if (gettimeofday(&now, NULL)) pn_fatal("gettimeofday failed\n");
+  long int deadline = millis(now) + timeout;
+  bool pred;
+
+  while (true) {
+    pred = predicate(messenger);
+    int remaining = deadline - millis(now);
+    if (pred || (timeout >= 0 && remaining < 0)) break;
+
+    pn_driver_wait(messenger->driver, remaining);
 
     pn_listener_t *l;
     while ((l = pn_driver_listener(messenger->driver))) {
@@ -214,47 +243,30 @@ int pn_messenger_sync(pn_messenger_t *messenger, bool (*predicate)(pn_messenger_
         pn_connector_process(c);
       }
     }
+
+    if (timeout >= 0) {
+      if (gettimeofday(&now, NULL)) pn_fatal("gettimeofday failed\n");
+    }
   }
 
-  return 0;
+  return pred ? 0 : PN_TIMEOUT;
 }
 
-bool pn_messenger_linked(pn_messenger_t *messenger)
+int pn_messenger_sync(pn_messenger_t *messenger, bool (*predicate)(pn_messenger_t *))
 {
-  for (int i = 0; i < messenger->size; i++) {
-    pn_connector_t *ctor = messenger->connectors[i];
-    pn_connection_t *conn = pn_connector_connection(ctor);
-    pn_state_t state = pn_connection_state(conn);
-    if ((state == (PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT)) ||
-        (state == (PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE))) {
-      return false;
-    }
-
-    if (pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT) ||
-        pn_link_head(conn, PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE)) {
-      return false;
-    }
-  }
-
-  return true;
+  return pn_messenger_tsync(messenger, predicate, messenger->timeout);
 }
 
 int pn_messenger_start(pn_messenger_t *messenger)
 {
   if (!messenger) return PN_ARG_ERR;
-  return pn_messenger_sync(messenger, pn_messenger_linked);
+  // right now this is a noop
+  return 0;
 }
 
-bool pn_messenger_unlinked(pn_messenger_t *messenger)
+bool pn_messenger_stopped(pn_messenger_t *messenger)
 {
-  for (int i = 0; i < messenger->size; i++) {
-    pn_connector_t *ctor = messenger->connectors[i];
-    pn_connection_t *conn = pn_connector_connection(ctor);
-    pn_state_t state = pn_connection_state(conn);
-    if (state != (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED))
-      return false;
-  }
-  return true;
+  return messenger->size == 0;
 }
 
 int pn_messenger_stop(pn_messenger_t *messenger)
@@ -272,7 +284,7 @@ int pn_messenger_stop(pn_messenger_t *messenger)
     pn_connection_close(conn);
   }
 
-  return pn_messenger_sync(messenger, pn_messenger_unlinked);
+  return pn_messenger_sync(messenger, pn_messenger_stopped);
 }
 
 static void parse_address(char *address, char **domain, char **name)
@@ -301,6 +313,18 @@ bool pn_streq(const char *a, const char *b)
 
 pn_connection_t *pn_messenger_domain(pn_messenger_t *messenger, const char *domain)
 {
+  char buf[strlen(domain) + 1];
+  if (domain) {
+    strcpy(buf, domain);
+  } else {
+    buf[0] = '\0';
+  }
+  char *user = NULL;
+  char *pass = NULL;
+  char *host = "0.0.0.0";
+  char *port = "5672";
+  parse_url(buf, &user, &pass, &host, &port);
+
   for (int i = 0; i < messenger->size; i++) {
     pn_connection_t *connection = pn_connector_connection(messenger->connectors[i]);
     const char *container = pn_connection_remote_container(connection);
@@ -309,12 +333,16 @@ pn_connection_t *pn_messenger_domain(pn_messenger_t *messenger, const char *doma
       return connection;
   }
 
-  pn_connector_t *connector = pn_connector(messenger->driver, domain, "5672", NULL);
+  pn_connector_t *connector = pn_connector(messenger->driver, host, port, NULL);
   if (!connector) return NULL;
   messenger->connectors[messenger->size++] = connector;
   pn_sasl_t *sasl = pn_connector_sasl(connector);
-  pn_sasl_mechanisms(sasl, "ANONYMOUS");
-  pn_sasl_client(sasl);
+  if (user) {
+    pn_sasl_plain(sasl, user, pass);
+  } else {
+    pn_sasl_mechanisms(sasl, "ANONYMOUS");
+    pn_sasl_client(sasl);
+  }
   pn_connection_t *connection = pn_connection();
   pn_connection_set_container(connection, messenger->name);
   pn_connection_set_hostname(connection, domain);
@@ -378,11 +406,15 @@ pn_listener_t *pn_messenger_isource(pn_messenger_t *messenger, const char *sourc
 {
   char buf[strlen(source) + 1];
   strcpy(buf, source);
-  char *domain;
-  char *name;
+  char *domain, *name;
   parse_address(buf, &domain, &name);
+  char *user = NULL;
+  char *pass = NULL;
+  char *host = "0.0.0.0";
+  char *port = "5672";
+  parse_url(domain + 1, &user, &pass, &host, &port);
 
-  pn_listener_t *listener = pn_listener(messenger->driver, domain + 1, "5672", NULL);
+  pn_listener_t *listener = pn_listener(messenger->driver, host, port, NULL);
   if (listener) {
     messenger->listeners++;
   }
@@ -428,6 +460,8 @@ static void outward_munge(pn_messenger_t *mng, pn_message_t *msg)
   }
 }
 
+bool false_pred(pn_messenger_t *messenger) { return false; }
+
 int pn_messenger_put(pn_messenger_t *messenger, pn_message_t *msg)
 {
   if (!messenger) return PN_ARG_ERR;
@@ -459,6 +493,7 @@ int pn_messenger_put(pn_messenger_t *messenger, pn_message_t *msg)
         return n;
       } else {
         pn_advance(sender);
+        pn_messenger_tsync(messenger, false_pred, 0);
         return 0;
       }
     }
@@ -469,8 +504,6 @@ int pn_messenger_put(pn_messenger_t *messenger, pn_message_t *msg)
 
 bool pn_messenger_sent(pn_messenger_t *messenger)
 {
-  //  if (!pn_messenger_linked(messenger)) return false;
-
   for (int i = 0; i < messenger->size; i++) {
     pn_connector_t *ctor = messenger->connectors[i];
     pn_connection_t *conn = pn_connector_connection(ctor);
@@ -497,8 +530,6 @@ bool pn_messenger_sent(pn_messenger_t *messenger)
 
 bool pn_messenger_rcvd(pn_messenger_t *messenger)
 {
-  //  if (!pn_messenger_linked(messenger)) return false;
-
   for (int i = 0; i < messenger->size; i++) {
     pn_connector_t *ctor = messenger->connectors[i];
     pn_connection_t *conn = pn_connector_connection(ctor);
@@ -532,6 +563,8 @@ int pn_messenger_recv(pn_messenger_t *messenger, int n)
 
 int pn_messenger_get(pn_messenger_t *messenger, pn_message_t *msg)
 {
+  if (!messenger) return PN_ARG_ERR;
+
   for (int i = 0; i < messenger->size; i++) {
     pn_connector_t *ctor = messenger->connectors[i];
     pn_connection_t *conn = pn_connector_connection(ctor);
@@ -545,10 +578,14 @@ int pn_messenger_get(pn_messenger_t *messenger, pn_message_t *msg)
         ssize_t n = pn_recv(l, buf, 1024);
         pn_settle(d);
         if (n < 0) return n;
-        int err = pn_message_decode(msg, buf, n);
-        if (err) {
-          return pn_error_format(messenger->error, err, "error decoding message: %s",
+        if (msg) {
+          int err = pn_message_decode(msg, buf, n);
+          if (err) {
+            return pn_error_format(messenger->error, err, "error decoding message: %s",
                                  pn_message_error(msg));
+          } else {
+            return 0;
+          }
         } else {
           return 0;
         }
@@ -564,6 +601,8 @@ int pn_messenger_get(pn_messenger_t *messenger, pn_message_t *msg)
 
 int pn_messenger_queued(pn_messenger_t *messenger, bool sender)
 {
+  if (!messenger) return 0;
+
   int result = 0;
 
   for (int i = 0; i < messenger->size; i++) {
diff --git a/tests/proton_tests/__init__.py b/tests/proton_tests/__init__.py
index a8a4d52..b467cf5 100644
--- a/tests/proton_tests/__init__.py
+++ b/tests/proton_tests/__init__.py
@@ -19,3 +19,4 @@
 
 import proton_tests.engine
 import proton_tests.message
+import proton_tests.messenger
diff --git a/tests/proton_tests/messenger.py b/tests/proton_tests/messenger.py
new file mode 100644
index 0000000..17161cd
--- /dev/null
+++ b/tests/proton_tests/messenger.py
@@ -0,0 +1,88 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os, common, xproton
+from xproton import *
+from threading import Thread
+
+class Test(common.Test):
+
+  def setup(self):
+    self.server = pn_messenger("server")
+    pn_messenger_set_timeout(self.server, 10000)
+    pn_messenger_start(self.server)
+    pn_messenger_subscribe(self.server, "//~0.0.0.0:12345")
+    self.thread = Thread(target=self.run)
+    self.running = True
+    self.thread.start()
+
+    self.client = pn_messenger("client")
+    pn_messenger_set_timeout(self.client, 10000)
+    pn_messenger_start(self.client)
+
+  def teardown(self):
+    self.running = False
+    msg = pn_message()
+    pn_message_set_address(msg, "//0.0.0.0:12345")
+    pn_messenger_put(self.client, msg)
+    pn_messenger_send(self.client)
+    pn_messenger_stop(self.client)
+    self.thread.join()
+    pn_messenger_free(self.client)
+    pn_messenger_free(self.server)
+    self.client = None
+    self.server = None
+
+class MessengerTest(Test):
+
+  def run(self):
+    msg = pn_message()
+    while self.running:
+      pn_messenger_recv(self.server, 10)
+      while pn_messenger_incoming(self.server):
+        if pn_messenger_get(self.server, msg):
+          print pn_messenger_error(self.server)
+        else:
+          reply_to = pn_message_get_reply_to(msg)
+          if reply_to:
+            pn_message_set_address(msg, reply_to)
+            pn_messenger_put(self.server, msg)
+    pn_messenger_stop(self.server)
+
+  def testSendReceive(self):
+    msg = pn_message()
+    pn_message_set_address(msg, "//0.0.0.0:12345")
+    pn_message_set_subject(msg, "Hello World!")
+    body = "First the world, then the galaxy!"
+    pn_message_load(msg, body)
+    pn_messenger_put(self.client, msg)
+    pn_messenger_send(self.client)
+
+    reply = pn_message()
+    assert not pn_messenger_recv(self.client, 1)
+    assert pn_messenger_incoming(self.client) == 1
+    assert not pn_messenger_get(self.client, reply)
+
+    assert pn_message_get_subject(reply) == "Hello World!"
+    cd, rbod = pn_message_save(reply, 1024)
+    assert not cd
+    assert rbod == body
+
+    pn_message_free(msg)
+    pn_message_free(reply)
-- 
1.7.11.2


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org