You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by mc...@apache.org on 2015/06/03 22:31:03 UTC

[26/34] qpid-proton git commit: PROTON-799: Added the engine_send and engine_recv examples to Ruby.

PROTON-799: Added the engine_send and engine_recv examples to Ruby.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/f5f94310
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/f5f94310
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/f5f94310

Branch: refs/heads/master
Commit: f5f943107f7cd76fcb23634f83cb516975be6dfb
Parents: 740b05e
Author: Darryl L. Pierce <mc...@gmail.com>
Authored: Tue Jan 13 16:27:57 2015 -0500
Committer: Darryl L. Pierce <mc...@gmail.com>
Committed: Wed Jun 3 16:29:24 2015 -0400

----------------------------------------------------------------------
 examples/ruby/engine_recv.rb       | 158 ++++++++++++++++++++++++++++++++
 examples/ruby/engine_send.rb       | 143 +++++++++++++++++++++++++++++
 examples/ruby/lib/driver.rb        |  69 ++++++++++++++
 examples/ruby/lib/qpid_examples.rb |  28 ++++++
 examples/ruby/lib/selectable.rb    | 120 ++++++++++++++++++++++++
 5 files changed, 518 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f5f94310/examples/ruby/engine_recv.rb
----------------------------------------------------------------------
diff --git a/examples/ruby/engine_recv.rb b/examples/ruby/engine_recv.rb
new file mode 100644
index 0000000..1529964
--- /dev/null
+++ b/examples/ruby/engine_recv.rb
@@ -0,0 +1,158 @@
+#--
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#++
+
+require "qpid_examples"
+require "optparse"
+
+DEFAULT_PORT = 5672
+# Runtime settings; the command-line flags below override these defaults.
+options = {
+  :port => DEFAULT_PORT,
+  :debug => false,
+  :verbose => false,
+}
+
+OptionParser.new do |opts|
+  opts.banner = "Usage: engine_recv.rb [options]"
+
+  # "[port]" makes the value optional, so the block receives nil when -p is
+  # given without one; keep the default instead of storing nil.
+  opts.on("-p [port]", "--port [port]",
+          "The port to use  (def. #{DEFAULT_PORT})") do |port|
+    options[:port] = port unless port.nil?
+  end
+
+  opts.on("-v", "--verbose", "Enable verbose output") do
+    options[:verbose] = true
+  end
+
+  opts.on("-d", "--debug", "Enable debugging") do
+    options[:debug] = true
+  end
+
+  opts.parse!
+end
+
+server = TCPServer.new('localhost', options[:port])
+
+message_count = 0
+driver = Driver.new
+collector = Qpid::Proton::Event::Collector.new
+
+# Endpoint states are integer bitmasks and 0 is truthy in Ruby, so a bare
+# "state & FLAG" is always true; compare the masked value against zero.
+flag_set = lambda { |state, flag| (state & flag) != 0 }
+
+loop do
+  # Poll for a new client; having nothing to accept yet is the normal case.
+  begin
+    client = server.accept_nonblock
+  rescue Errno::EAGAIN, Errno::ECONNABORTED, Errno::EINTR, Errno::EWOULDBLOCK
+    client = nil
+  end
+
+  unless client.nil?
+    puts "Connection from #{client.peeraddr.last}"
+    connection = Qpid::Proton::Connection.new
+    connection.collect(collector)
+    transport = Qpid::Proton::Transport.new(Qpid::Proton::Transport::SERVER)
+    transport.bind(connection)
+    driver.add(Selectable.new(transport, client))
+  end
+
+  # let the driver process data
+  driver.process
+
+  event = collector.peek
+  while !event.nil?
+    puts "EVENT: #{event}" if options[:debug]
+
+    case event.type
+    when Qpid::Proton::Event::CONNECTION_INIT
+      conn = event.connection
+      if flag_set.call(conn.state, Qpid::Proton::Endpoint::REMOTE_UNINIT)
+        conn.transport.sasl.done(Qpid::Proton::SASL::OK)
+      end
+
+    when Qpid::Proton::Event::CONNECTION_BOUND
+      conn = event.connection
+      if flag_set.call(conn.state, Qpid::Proton::Endpoint::LOCAL_UNINIT)
+        conn.open
+      end
+
+    when Qpid::Proton::Event::CONNECTION_REMOTE_CLOSE
+      # Mirror the peer's close unless this side has already closed.
+      conn = event.context
+      unless flag_set.call(conn.state, Qpid::Proton::Endpoint::LOCAL_CLOSED)
+        conn.close
+      end
+
+    when Qpid::Proton::Event::SESSION_REMOTE_OPEN
+      session = event.session
+      if flag_set.call(session.state, Qpid::Proton::Endpoint::LOCAL_UNINIT)
+        session.incoming_capacity = 1000000
+        session.open
+      end
+
+    when Qpid::Proton::Event::SESSION_REMOTE_CLOSE
+      session = event.session
+      unless flag_set.call(session.state, Qpid::Proton::Endpoint::LOCAL_CLOSED)
+        session.close
+      end
+
+    when Qpid::Proton::Event::LINK_REMOTE_OPEN
+      link = event.link
+      if flag_set.call(link.state, Qpid::Proton::Endpoint::LOCAL_UNINIT)
+        link.open
+        link.flow 400
+      end
+
+    when Qpid::Proton::Event::LINK_REMOTE_CLOSE
+      link = event.context
+      unless flag_set.call(link.state, Qpid::Proton::Endpoint::LOCAL_CLOSED)
+        link.close
+      end
+
+    when Qpid::Proton::Event::DELIVERY
+      link = event.link
+      delivery = event.delivery
+      if delivery.readable? && !delivery.partial?
+        # decode the message and display it
+        msg = Qpid::Proton::Util::Engine.receive_message(delivery)
+        message_count += 1
+        puts "Received:"
+        puts " Count=#{message_count}" if options[:verbose]
+        puts " From=#{msg.id}" if msg.id
+        puts " Reply to=#{msg.reply_to}" if msg.reply_to
+        puts " Subject=#{msg.subject}" if msg.subject
+        puts " Body=#{msg.body}" if msg.body
+        puts ""
+        delivery.settle
+        # Grant 200 more credits once the link is down to 200 or fewer.
+        link.flow(200) if link.credit <= 200
+      end
+
+    when Qpid::Proton::Event::TRANSPORT
+      driver.process
+    end
+
+    collector.pop
+    event = collector.peek
+  end
+end

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f5f94310/examples/ruby/engine_send.rb
----------------------------------------------------------------------
diff --git a/examples/ruby/engine_send.rb b/examples/ruby/engine_send.rb
new file mode 100644
index 0000000..189c7fd
--- /dev/null
+++ b/examples/ruby/engine_send.rb
@@ -0,0 +1,143 @@
+#--
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#++
+
+require 'qpid_examples'
+require "optparse"
+
+DEFAULT_ADDRESS = "0.0.0.0:5672"
+
+options = {
+  :address => DEFAULT_ADDRESS,
+  :debug => false,
+  :verbose => false,
+  :count => 1,
+  :content => "This message was sent #{Time.new}"
+}
+
+OptionParser.new do |opts|
+  opts.banner = "Usage: engine_send.rb [options]"
+
+  # Every value below is optional ("[...]"), so a bare flag yields nil; in
+  # that case the default from the options hash is left untouched.
+  opts.on("-a [address]", "--address [address]",
+          "The target address (def. #{DEFAULT_ADDRESS})") do |address|
+    options[:address] = address unless address.nil?
+  end
+
+  opts.on("-C [content]", "--content [content]",
+          "The message content") do |content|
+    options[:content] = content unless content.nil?
+  end
+
+  opts.on("-c [count]", "--count [count]",
+          "The number of messages to send (def. 1)") do |count|
+    options[:count] = count.to_i unless count.nil?
+  end
+
+  opts.on("-v", "--verbose", "Enable verbose output") do
+    options[:verbose] = true
+  end
+
+  opts.on("-d", "--debug", "Enable debugging") do
+    options[:debug] = true
+  end
+
+  opts.parse!
+end
+
+
+driver = Driver.new
+
+conn = Qpid::Proton::Connection.new
+collector = Qpid::Proton::Event::Collector.new
+conn.collect(collector)
+
+session = conn.session
+conn.open
+session.open
+
+sender = session.sender("tvc_15_1")
+sender.target.address = "queue"
+sender.open
+
+transport = Qpid::Proton::Transport.new
+transport.bind(conn)
+
+# options[:address] is split into host and port at the ":".
+address, port = options[:address].split(":")
+
+socket = TCPSocket.new(address, port)
+selectable = Selectable.new(transport, socket)
+driver.add(selectable)
+
+# Number of messages handed to the sender so far.
+sent_count = 0
+
+loop do
+  # let the driver process
+  driver.process
+
+  # Handle at most one queued event per pass so the driver runs in between.
+  event = collector.peek
+
+  unless event.nil?
+
+    print "EVENT: #{event}\n" if options[:debug]
+
+    case event.type
+
+    when Qpid::Proton::Event::LINK_FLOW
+      sender = event.sender
+
+      if sender.credit > 0 && sent_count < options[:count]
+        sent_count = sent_count.next
+        # Build the message only when one is actually going to be sent.
+        message = Qpid::Proton::Message.new
+        message.address = options[:address]
+        message.subject = "Message #{sent_count}..."
+        message.body = options[:content]
+
+        delivery = sender.delivery("#{sent_count}")
+        sender.send(message.encode)
+        delivery.settle
+        sender.advance
+      else
+        # No credit left or the requested count is sent: close the sender.
+        sender.close
+      end
+
+    when Qpid::Proton::Event::LINK_LOCAL_CLOSE
+      # The link is closing locally, so close the session it belongs to too.
+      link = event.link
+      link.close
+      link.session.close
+
+    when Qpid::Proton::Event::SESSION_LOCAL_CLOSE
+      session = event.session
+      session.connection.close
+
+    when Qpid::Proton::Event::CONNECTION_LOCAL_CLOSE
+      # Leaves the enclosing loop, which ends the script.
+      break
+
+    end
+
+    collector.pop
+  end
+end

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f5f94310/examples/ruby/lib/driver.rb
----------------------------------------------------------------------
diff --git a/examples/ruby/lib/driver.rb b/examples/ruby/lib/driver.rb
new file mode 100644
index 0000000..4e223d0
--- /dev/null
+++ b/examples/ruby/lib/driver.rb
@@ -0,0 +1,69 @@
+#--
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#++
+
+# Minimal poll-style loop over a set of selectables. Each selectable must
+# respond to fileno, closed?, reading?, writing?, to_io, readable, writable.
+class Driver
+  def initialize
+    # Registered selectables, keyed by the file number they had when added.
+    @selectables = {}
+  end
+
+  # Registers +selectable+ under its current file number.
+  def add(selectable)
+    @selectables[selectable.fileno] = selectable
+  end
+
+  # Runs one non-blocking pass: drops closed selectables, selects on the
+  # rest, and invokes readable/writable on those that are ready.
+  def process
+    # Remove by entry, not by key: fileno is nil once a selectable is closed,
+    # so looking the key up via sel.fileno would never find it.
+    @selectables.delete_if { |_fd, sel| sel.closed? || sel.fileno.nil? }
+
+    reading = []
+    writing = []
+    @selectables.each_value do |sel|
+      begin
+        reading << sel.to_io if sel.reading?
+        writing << sel.to_io if sel.writing?
+      rescue StandardError => error
+        puts "Error: #{error}"
+        puts error.backtrace.join("\n")
+      end
+    end
+
+    # Timeout 0 makes select return at once (nil when nothing is ready).
+    read_from, write_to = IO.select(reading, writing, [], 0)
+
+    unless read_from.nil?
+      read_from.each do |r|
+        sel = @selectables[r.fileno]
+        sel.readable unless sel.nil? || sel.closed?
+      end
+    end
+
+    unless write_to.nil?
+      write_to.each do |w|
+        sel = @selectables[w.fileno]
+        sel.writable unless sel.nil? || sel.closed?
+      end
+    end
+  end
+end

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f5f94310/examples/ruby/lib/qpid_examples.rb
----------------------------------------------------------------------
diff --git a/examples/ruby/lib/qpid_examples.rb b/examples/ruby/lib/qpid_examples.rb
new file mode 100644
index 0000000..8503fbe
--- /dev/null
+++ b/examples/ruby/lib/qpid_examples.rb
@@ -0,0 +1,28 @@
+#--
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#++
+
+require "qpid_proton"
+
+require "selectable"
+require "driver"
+require "socket"
+require "monitor"
+
+include Socket::Constants
+

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f5f94310/examples/ruby/lib/selectable.rb
----------------------------------------------------------------------
diff --git a/examples/ruby/lib/selectable.rb b/examples/ruby/lib/selectable.rb
new file mode 100644
index 0000000..779ea24
--- /dev/null
+++ b/examples/ruby/lib/selectable.rb
@@ -0,0 +1,120 @@
+#--
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#++
+
+# Pairs a Proton transport with its socket behind a polling interface.
+class Selectable
+  attr_reader :transport
+
+  # transport:: the transport whose input/output is moved over the socket
+  # socket::    the connected socket; autoclose is switched on for it
+  def initialize(transport, socket)
+    @transport = transport
+    @socket = socket
+    @socket.autoclose = true
+    @write_done = false
+    @read_done = false
+  end
+
+  # True when the socket is closed. NOTE: this also closes the socket as
+  # soon as either direction has been marked done.
+  def closed?
+    return true if @socket.closed?
+    return false if !@read_done && !@write_done
+    @socket.close
+    true
+  end
+
+  # The socket's file number, or nil once the socket is closed.
+  def fileno
+    @socket.fileno unless @socket.closed?
+  end
+
+  # The underlying socket, for use with IO.select.
+  def to_io
+    @socket
+  end
+
+  # True when the transport has capacity for input; a negative capacity
+  # marks the read side as done.
+  def reading?
+    return false if @read_done
+    capacity = @transport.capacity
+    @read_done = true if capacity < 0
+    capacity > 0
+  end
+
+  # True when the transport has output pending; a negative count or a
+  # transport error marks the write side as done.
+  def writing?
+    return false if @write_done
+    pending = @transport.pending
+    @write_done = true if pending < 0
+    pending > 0
+  rescue Qpid::Proton::TransportError
+    @write_done = true
+    false
+  end
+
+  # Pushes up to the transport's capacity of socket input into the transport;
+  # end of stream or a read error closes the transport's tail.
+  def readable
+    capacity = @transport.capacity
+    if capacity > 0
+      begin
+        data = @socket.recv(capacity)
+        # At end of stream recv returns an empty string (nil on newer
+        # Rubies); "" is truthy in Ruby, so test for it explicitly.
+        if data.nil? || data.empty?
+          @transport.close_tail
+        else
+          @transport.push(data)
+        end
+      rescue StandardError => error
+        puts "read error; #{error}"
+        @transport.close_tail
+        @read_done = true
+      end
+    elsif capacity < 0
+      @read_done = true
+    end
+  end
+
+  # Writes the transport's pending output to the socket and pops however
+  # many bytes the socket accepted; a write error closes the transport head.
+  def writable
+    pending = @transport.pending
+    if pending > 0
+      data = @transport.peek(pending)
+      n = @socket.send(data, 0)
+      @transport.pop(n)
+    elsif pending < 0
+      @write_done = true
+    end
+  rescue StandardError => error
+    puts "write error: #{error}"
+    puts error.backtrace.join("\n")
+    @transport.close_head
+    @write_done = true
+  end
+
+  # Forwards a tick for time +now+ to the transport.
+  def tick(now)
+    @transport.tick(now)
+  end
+end


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org