You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2013/09/19 11:47:01 UTC

svn commit: r1524673 - in /qpid/trunk/qpid/python/examples/api: drain hello hello_xml server spout statistics.py

Author: gsim
Date: Thu Sep 19 09:47:01 2013
New Revision: 1524673

URL: http://svn.apache.org/r1524673
Log:
QPID-4924: restore examples to the pure python component

Added:
    qpid/trunk/qpid/python/examples/api/drain   (with props)
    qpid/trunk/qpid/python/examples/api/hello   (with props)
    qpid/trunk/qpid/python/examples/api/hello_xml   (with props)
    qpid/trunk/qpid/python/examples/api/server   (with props)
    qpid/trunk/qpid/python/examples/api/spout   (with props)
    qpid/trunk/qpid/python/examples/api/statistics.py

Added: qpid/trunk/qpid/python/examples/api/drain
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/examples/api/drain?rev=1524673&view=auto
==============================================================================
--- qpid/trunk/qpid/python/examples/api/drain (added)
+++ qpid/trunk/qpid/python/examples/api/drain Thu Sep 19 09:47:01 2013
@@ -0,0 +1,97 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import optparse
+from qpid.messaging import *
+from qpid.util import URL
+from qpid.log import enable, DEBUG, WARN
+
+# Command-line interface: one required positional ADDRESS plus the options below.
+parser = optparse.OptionParser(usage="usage: %prog [options] ADDRESS ...",
+                               description="Drain messages from the supplied address.")
+parser.add_option("-b", "--broker", default="localhost",
+                  help="connect to specified BROKER (default %default)")
+parser.add_option("-c", "--count", type="int",
+                  help="number of messages to drain")
+parser.add_option("-f", "--forever", action="store_true",
+                  help="ignore timeout and wait forever")
+parser.add_option("-r", "--reconnect", action="store_true",
+                  help="enable auto reconnect")
+parser.add_option("-i", "--reconnect-interval", type="float", default=3,
+                  help="interval between reconnect attempts")
+parser.add_option("-l", "--reconnect-limit", type="int",
+                  help="maximum number of reconnect attempts")
+parser.add_option("-t", "--timeout", type="float", default=0,
+                  help="timeout in seconds to wait before exiting (default %default)")
+# The format string is applied with "%" to a Formatter, so every %(...)s key
+# is evaluated as an expression with M, P and C bound to the message.
+parser.add_option("-p", "--print", dest="format", default="%(M)s",
+                  help="format string for printing messages (default %default)")
+parser.add_option("-v", dest="verbose", action="store_true",
+                  help="enable logging")
+
+opts, args = parser.parse_args()
+
+# Configure the "qpid" log category: DEBUG with -v, WARN otherwise.
+if opts.verbose:
+  enable("qpid", DEBUG)
+else:
+  enable("qpid", WARN)
+
+# The first positional argument is the address to drain; any further
+# positional arguments stay in args and are not used by this script.
+if args:
+  addr = args.pop(0)
+else:
+  parser.error("address is required")
+# --forever overrides --timeout: timeout becomes None, and that value is what
+# gets passed to fetch() in the receive loop.
+if opts.forever:
+  timeout = None
+else:
+  timeout = opts.timeout
+
+class Formatter:
+  """Mapping-style adaptor that lets "%(expr)s" format keys be expressions.
+
+  Each key looked up on the instance is evaluated with eval() in a
+  namespace where M is the message, P its properties and C its content.
+  The keys come from the --print option, i.e. from the person running
+  the script.
+  """
+
+  def __init__(self, message):
+    self.message = message
+    # names visible to the expressions embedded in the format string
+    self.environ = dict(M=message,
+                        P=message.properties,
+                        C=message.content)
+
+  def __getitem__(self, st):
+    # st is the text between "%(" and ")" in the format string
+    namespace = self.environ
+    return eval(st, namespace)
+
+# Connection settings come straight from the command-line options.
+conn = Connection(opts.broker,
+                  reconnect=opts.reconnect,
+                  reconnect_interval=opts.reconnect_interval,
+                  reconnect_limit=opts.reconnect_limit)
+try:
+  conn.open()
+  ssn = conn.session()
+  rcv = ssn.receiver(addr)
+
+  count = 0
+  # without --count (or with --count 0) keep going until fetch() reports Empty
+  while not opts.count or count < opts.count:
+    try:
+      msg = rcv.fetch(timeout=timeout)
+      print opts.format % Formatter(msg)
+      count += 1
+      ssn.acknowledge()
+    except Empty:
+      # nothing arrived within the timeout: the address is drained
+      break
+except ReceiverError, e:
+  print e
+except KeyboardInterrupt:
+  pass
+finally:
+  # close the connection on every exit path, including errors that are not
+  # handled above and therefore propagate out of the script
+  conn.close()

Propchange: qpid/trunk/qpid/python/examples/api/drain
------------------------------------------------------------------------------
    svn:executable = *

Added: qpid/trunk/qpid/python/examples/api/hello
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/examples/api/hello?rev=1524673&view=auto
==============================================================================
--- qpid/trunk/qpid/python/examples/api/hello (added)
+++ qpid/trunk/qpid/python/examples/api/hello Thu Sep 19 09:47:01 2013
@@ -0,0 +1,52 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import sys
+from qpid.messaging import *
+
+# Optional positional arguments: BROKER first, then ADDRESS.
+if len(sys.argv) < 2:
+  broker = "localhost:5672"
+else:
+  broker = sys.argv[1]
+
+if len(sys.argv) < 3:
+  address = "amq.topic"
+else:
+  address = sys.argv[2]
+
+connection = Connection(broker)
+
+try:
+  connection.open()
+  session = connection.session()
+
+  # send to and receive from the same address so the message comes back to us
+  sender = session.sender(address)
+  receiver = session.receiver(address)
+
+  sender.send(Message("Hello world!"))
+
+  message = receiver.fetch()
+  print message.content
+  session.acknowledge()
+
+except MessagingError, m:
+  print m
+finally:
+  # close the connection on every exit path, not only after a clean run
+  connection.close()

Propchange: qpid/trunk/qpid/python/examples/api/hello
------------------------------------------------------------------------------
    svn:executable = *

Added: qpid/trunk/qpid/python/examples/api/hello_xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/examples/api/hello_xml?rev=1524673&view=auto
==============================================================================
--- qpid/trunk/qpid/python/examples/api/hello_xml (added)
+++ qpid/trunk/qpid/python/examples/api/hello_xml Thu Sep 19 09:47:01 2013
@@ -0,0 +1,77 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import sys
+from qpid.messaging import *
+
+# This example always connects to a broker on the local host.
+broker = "localhost:5672"
+connection = Connection(broker)
+
+try:
+  connection.open()
+  session = connection.session()
+
+  # Set up the receiver.  The XQuery below is handed to the broker as the
+  # binding's "xquery" argument (presumably it selects which documents are
+  # delivered to the queue -- confirm against the broker documentation).
+  query = """
+   let $w := ./weather
+   return $w/station = 'Raleigh-Durham International Airport (KRDU)'
+      and $w/temperature_f > 50
+      and $w/temperature_f - $w/dewpoint > 5
+      and $w/wind_speed_mph > 7
+      and $w/wind_speed_mph < 20   """
+
+  #  query="./weather"
+
+  # %r embeds the query in the address string as a quoted literal
+  address = """
+    xml; {
+       create: always, 
+       node:{ type: queue }, 
+       link: { 
+         x-bindings: [{ exchange: xml, key: weather, arguments: { xquery: %r} }] 
+       } 
+    }
+    """ % query
+
+  receiver = session.receiver(address)
+
+  # Send an observation
+
+  observations = """
+      <weather>
+         <station>Raleigh-Durham International Airport (KRDU)</station>
+         <wind_speed_mph>16</wind_speed_mph>
+         <temperature_f>70</temperature_f>
+         <dewpoint>35</dewpoint>
+      </weather>  """
+
+  message = Message(subject="weather", content=observations)
+  sender = session.sender("xml")
+  sender.send(message)
+
+  # Retrieve matching message from the receiver and print it
+
+  message = receiver.fetch(timeout=1)
+  print message.content
+  session.acknowledge()
+
+except MessagingError, m:
+  print m
+finally:
+  # close the connection on every exit path, not only after a clean run
+  connection.close()

Propchange: qpid/trunk/qpid/python/examples/api/hello_xml
------------------------------------------------------------------------------
    svn:executable = *

Added: qpid/trunk/qpid/python/examples/api/server
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/examples/api/server?rev=1524673&view=auto
==============================================================================
--- qpid/trunk/qpid/python/examples/api/server (added)
+++ qpid/trunk/qpid/python/examples/api/server Thu Sep 19 09:47:01 2013
@@ -0,0 +1,95 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import optparse, sys, traceback
+from qpid.messaging import *
+from qpid.util import URL
+from subprocess import Popen, STDOUT, PIPE
+from qpid.log import enable, DEBUG, WARN
+
+# Command-line interface: one required positional ADDRESS plus the options below.
+parser = optparse.OptionParser(usage="usage: %prog [options] ADDRESS ...",
+                               description="handle requests from the supplied address.")
+parser.add_option("-b", "--broker", default="localhost",
+                  help="connect to specified BROKER (default %default)")
+parser.add_option("-r", "--reconnect", action="store_true",
+                  help="enable auto reconnect")
+parser.add_option("-i", "--reconnect-interval", type="float", default=3,
+                  help="interval between reconnect attempts")
+parser.add_option("-l", "--reconnect-limit", type="int",
+                  help="maximum number of reconnect attempts")
+parser.add_option("-v", dest="verbose", action="store_true",
+                  help="enable logging")
+
+opts, args = parser.parse_args()
+
+# Configure the "qpid" log category: DEBUG with -v, WARN otherwise.
+if opts.verbose:
+  enable("qpid", DEBUG)
+else:
+  enable("qpid", WARN)
+
+# The first positional argument is the address requests are fetched from.
+if args:
+  addr = args.pop(0)
+else:
+  parser.error("address is required")
+
+# The connection is only constructed here; it is opened in the main loop below.
+conn = Connection(opts.broker,
+                  reconnect=opts.reconnect,
+                  reconnect_interval=opts.reconnect_interval,
+                  reconnect_limit=opts.reconnect_limit)
+def dispatch(msg):
+  """Build the reply Message for one request message.
+
+  The request's "type" property selects the handler:
+    "shell" -- run msg.content as a shell command; the reply carries the
+               combined stdout/stderr and an "exit" property holding the
+               process return code.
+    "eval"  -- evaluate msg.content as a Python expression; the reply
+               carries the result, or the formatted traceback on failure.
+  Any other (or missing) type yields an "unrecognized message type" reply.
+
+  SECURITY: both handlers execute whatever the sender supplies, with the
+  privileges of this process.  That is the point of the example, but it
+  must never be exposed to senders that are not fully trusted.
+  """
+  msg_type = msg.properties.get("type")
+  if msg_type == "shell":
+    # SECURITY: shell=True on message content is arbitrary command execution
+    proc = Popen(msg.content, shell=True, stderr=STDOUT, stdin=PIPE, stdout=PIPE)
+    output, _ = proc.communicate()
+    result = Message(output)
+    result.properties["exit"] = proc.returncode
+  elif msg_type == "eval":
+    try:
+      # SECURITY: eval() on message content is arbitrary code execution
+      content = eval(msg.content)
+    except KeyboardInterrupt:
+      # Ctrl-C must stop the server, not be reported back to the client
+      raise
+    except:
+      # deliberately broad: any failure of the expression becomes the reply
+      content = traceback.format_exc()
+    result = Message(content)
+  else:
+    result = Message("unrecognized message type: %s" % msg_type)
+  return result
+
+# Main loop: fetch a request, build the reply, send it to the request's
+# reply_to address, then acknowledge the request.
+try:
+  conn.open()
+  ssn = conn.session()
+  rcv = ssn.receiver(addr)
+
+  while True:
+    msg = rcv.fetch()
+    response = dispatch(msg)
+    snd = None
+    try:
+      snd = ssn.sender(msg.reply_to)
+      snd.send(response)
+    except SendError, e:
+      print e
+    finally:
+      # a sender is created per reply, so release it whether or not send worked
+      if snd is not None:
+        snd.close()
+    ssn.acknowledge()
+except ReceiverError, e:
+  print e
+except KeyboardInterrupt:
+  pass
+finally:
+  # close the connection on every exit path, including errors that are not
+  # handled above and therefore propagate out of the script
+  conn.close()

Propchange: qpid/trunk/qpid/python/examples/api/server
------------------------------------------------------------------------------
    svn:executable = *

Added: qpid/trunk/qpid/python/examples/api/spout
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/examples/api/spout?rev=1524673&view=auto
==============================================================================
--- qpid/trunk/qpid/python/examples/api/spout (added)
+++ qpid/trunk/qpid/python/examples/api/spout Thu Sep 19 09:47:01 2013
@@ -0,0 +1,133 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import optparse, time
+from qpid.messaging import *
+from qpid.util import URL
+from qpid.log import enable, DEBUG, WARN
+
+def nameval(st):
+  """Split a "NAME=VALUE" string at its first "=".
+
+  Returns a (name, value) pair.  When st contains no "=", the whole
+  string is the name and the value is None.
+  """
+  name, separator, value = st.partition("=")
+  if not separator:
+    # no "=" present: distinguish "no value given" from an empty value
+    return st, None
+  return name, value
+
+# Command-line interface: a required ADDRESS, optional CONTENT words, and the
+# options below.
+parser = optparse.OptionParser(usage="usage: %prog [options] ADDRESS [ CONTENT ... ]",
+                               description="Send messages to the supplied address.")
+parser.add_option("-b", "--broker", default="localhost",
+                  help="connect to specified BROKER (default %default)")
+parser.add_option("-r", "--reconnect", action="store_true",
+                  help="enable auto reconnect")
+parser.add_option("-i", "--reconnect-interval", type="float", default=3,
+                  help="interval between reconnect attempts")
+parser.add_option("-l", "--reconnect-limit", type="int",
+                  help="maximum number of reconnect attempts")
+parser.add_option("-c", "--count", type="int", default=1,
+                  help="stop after count messages have been sent, zero disables (default %default)")
+parser.add_option("-d", "--durable", action="store_true",
+                  help="make the message persistent")
+parser.add_option("-t", "--timeout", type="float", default=None,
+                  help="exit after the specified time")
+parser.add_option("-I", "--id", help="use the supplied id instead of generating one")
+parser.add_option("-S", "--subject", help="specify a subject")
+parser.add_option("-R", "--reply-to", help="specify reply-to address")
+# -P and -M may be repeated; each occurrence is appended to a list and later
+# split into name and value with nameval().
+parser.add_option("-P", "--property", dest="properties", action="append", default=[],
+                  metavar="NAME=VALUE", help="specify message property")
+parser.add_option("-M", "--map", dest="entries", action="append", default=[],
+                  metavar="KEY=VALUE",
+                  help="specify map entry for message body")
+parser.add_option("-v", dest="verbose", action="store_true",
+                  help="enable logging")
+
+opts, args = parser.parse_args()
+
+# Configure the "qpid" log category: DEBUG with -v, WARN otherwise.
+if opts.verbose:
+  enable("qpid", DEBUG)
+else:
+  enable("qpid", WARN)
+
+# spout_id becomes the prefix of each message's "spout-id" property.
+if opts.id is None:
+  spout_id = str(uuid4())
+else:
+  spout_id = opts.id
+# The first positional argument is the target address.
+if args:
+  addr = args.pop(0)
+else:
+  parser.error("address is required")
+
+content = None
+content_type = None
+
+# Any remaining positional arguments are joined into the message text.
+if args:
+  text = " ".join(args)
+else:
+  text = None
+
+# With -M entries the body is a map (the text, if any, is stored under the
+# "text" key); otherwise the body is the plain text itself.
+if opts.entries:
+  content = {}
+  if text:
+    content["text"] = text
+  for e in opts.entries:
+    name, val = nameval(e)
+    content[name] = val
+else:
+  content = text
+  # no entries were supplied, so assume text/plain for
+  # compatibility with java (and other) clients
+  content_type = "text/plain"
+
+# The connection is only constructed here; it is opened in the send loop below.
+conn = Connection(opts.broker,
+                  reconnect=opts.reconnect,
+                  reconnect_interval=opts.reconnect_interval,
+                  reconnect_limit=opts.reconnect_limit)
+try:
+  conn.open()
+  ssn = conn.session()
+  snd = ssn.sender(addr)
+
+  count = 0
+  start = time.time()
+  # keep sending until the message budget (--count, 0 = unlimited) or the
+  # time budget (--timeout, None = unlimited) is used up
+  while (opts.count == 0 or count < opts.count) and \
+        (opts.timeout is None or time.time() - start < opts.timeout):
+    msg = Message(subject=opts.subject,
+                  reply_to=opts.reply_to,
+                  content=content)
+    if opts.durable:
+      msg.durable = True
+    if content_type is not None:
+      msg.content_type = content_type
+    # tag each message with the spout id and its sequence number
+    msg.properties["spout-id"] = "%s:%s" % (spout_id, count)
+    for p in opts.properties:
+      name, val = nameval(p)
+      msg.properties[name] = val
+
+    snd.send(msg)
+    count += 1
+    print msg
+except SendError, e:
+  print e
+except KeyboardInterrupt:
+  pass
+finally:
+  # close the connection on every exit path, including errors that are not
+  # handled above and therefore propagate out of the script
+  conn.close()

Propchange: qpid/trunk/qpid/python/examples/api/spout
------------------------------------------------------------------------------
    svn:executable = *

Added: qpid/trunk/qpid/python/examples/api/statistics.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/examples/api/statistics.py?rev=1524673&view=auto
==============================================================================
--- qpid/trunk/qpid/python/examples/api/statistics.py (added)
+++ qpid/trunk/qpid/python/examples/api/statistics.py Thu Sep 19 09:47:01 2013
@@ -0,0 +1,139 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import time
+
+# Name of the message property that carries the sender's timestamp.
+TS = "ts"
+# Divisor applied to the "ts" value before it is compared with time.time()
+# (so the value is presumably in nanoseconds -- confirm against the sender).
+TIME_SEC = 1000000000
+# Multiplier that turns a difference in seconds into milliseconds.
+MILLISECOND = 1000
+
+class Statistic:
+    """Do-nothing statistic: the interface the concrete statistics implement."""
+
+    def message(self, msg):
+        """Account for one message; the base class ignores it."""
+        return None
+
+    def header(self):
+        """Return the column header for report(); empty in the base class."""
+        return ""
+
+    def report(self):
+        """Return the current figures as text; empty in the base class."""
+        return ""
+
+
+class Throughput(Statistic):
+    """Count messages and report the rate in messages per second.
+
+    The clock starts when the first message is seen, not when the object
+    is created.
+    """
+    def __init__(self):
+        self.messages = 0
+        self.started = False
+
+    def message(self, m):
+        """Count one message, starting the clock on the first one."""
+        self.messages += 1
+        if not self.started:
+            self.start = time.time()
+            self.started = True
+
+    def header(self):
+        """Return the column header for report()."""
+        return "tp(m/s)"
+
+    def report(self):
+        """Return the whole-number message rate as a string.
+
+        Returns "0" when no message has been seen yet, and also when no
+        positive amount of time has elapsed since the first message (the
+        rate cannot be computed then, and dividing would raise
+        ZeroDivisionError).
+        """
+        if not self.started:
+            return "0"
+        elapsed = time.time() - self.start
+        if elapsed <= 0:
+            return "0"
+        return str(int(self.messages/elapsed))
+
+
+class ThroughputAndLatency(Throughput):
+    """Throughput plus minimum, maximum and average latency.
+
+    Latency is measured only for messages that carry the TS property and
+    is expressed in milliseconds.
+    """
+    def __init__(self):
+        Throughput.__init__(self)
+        self.samples = 0
+        self.total = 0.0
+        self.min = float('inf')
+        self.max = -float('inf')
+
+    def message(self, m):
+        """Count the message and, if it is timestamped, record its latency."""
+        Throughput.message(self, m)
+        if TS not in m.properties:
+            return
+        # every timestamped message counts as a sample, even if its latency
+        # turns out not to be positive and is left out of the totals
+        self.samples += 1
+        sent = float(m.properties[TS])/TIME_SEC
+        latency = MILLISECOND * (time.time() - sent)
+        if latency > 0:
+            self.total += latency
+            self.min = min(self.min, latency)
+            self.max = max(self.max, latency)
+
+    def header(self):
+        """Return the throughput header followed by the latency columns."""
+        return Throughput.header(self) + "\tl-min\tl-max\tl-avg"
+
+    def report(self):
+        """Return the throughput figure, plus latency figures once sampled."""
+        columns = [Throughput.report(self)]
+        if self.samples > 0:
+            average = self.total/self.samples
+            columns.append("%.2f\t%.2f\t%.2f" % (self.min, self.max, average))
+        return "\t".join(columns)
+
+
+# Report batch and overall statistics
+class ReporterBase:
+    """Feed messages into an overall statistic and, optionally, a batch one.
+
+    Subclasses must override create() to return a fresh statistic object
+    providing message(m), report() and header(); the base implementation
+    returns None and is not usable on its own.
+
+    batch      -- number of messages per batch report; a false value
+                  (0 or None) disables batch reporting.
+    wantHeader -- when true, the statistic's header is printed once,
+                  before the first report.
+    """
+    def __init__(self, batch, wantHeader):
+        self.batchSize = batch
+        self.batchCount = 0
+        # treat the header as already printed when none is wanted
+        self.headerPrinted = not wantHeader
+        # both statistics are created lazily, on first use
+        self.overall = None
+        self.batch = None
+
+    def create(self):
+        """Return a new, empty statistic object (overridden by subclasses)."""
+        return None
+
+    # Count message in the statistics
+    def message(self, m):
+        """Account for message m and print a batch report when one is due."""
+        if self.overall is None:
+            self.overall = self.create()
+        self.overall.message(m)
+        if self.batchSize:
+            if self.batch is None:
+                self.batch = self.create()
+            self.batch.message(m)
+            self.batchCount += 1
+            if self.batchCount == self.batchSize:
+                self.header()
+                # single-argument print(...) behaves the same with or
+                # without print_function
+                print(self.batch.report())
+                # start a fresh statistic so the next report covers only
+                # the next batch instead of accumulating across batches
+                self.batch = self.create()
+                self.batchCount = 0
+
+    # Print overall report.
+    def report(self):
+        """Print the header (if still pending) and the overall report."""
+        if self.overall is None:
+            self.overall = self.create()
+        self.header()
+        print(self.overall.report())
+
+    def header(self):
+        """Print the statistic's header, at most once per reporter."""
+        if not self.headerPrinted:
+            if self.overall is None:
+                self.overall = self.create()
+            print(self.overall.header())
+            self.headerPrinted = True
+
+
+class Reporter(ReporterBase):
+    """ReporterBase whose statistics are modelled on a sample object.
+
+    Stats is a statistic object; create() calls its class with no
+    arguments to make each new statistic.
+    """
+    def __init__(self, batchSize, wantHeader, Stats):
+        self.__stats = Stats
+        ReporterBase.__init__(self, batchSize, wantHeader)
+
+    def create(self):
+        """Return a new instance of the same class as the sample object."""
+        return self.__stats.__class__()



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org