You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/08/01 14:13:48 UTC

svn commit: r681666 - in /incubator/qpid/trunk/qpid/java/tools: bin/ bin/qpid-bench src/main/java/org/apache/qpid/tools/QpidBench.java

Author: rhs
Date: Fri Aug  1 05:13:47 2008
New Revision: 681666

URL: http://svn.apache.org/viewvc?rev=681666&view=rev
Log:
added benchmark tool for java native + jms APIs

Added:
    incubator/qpid/trunk/qpid/java/tools/bin/
    incubator/qpid/trunk/qpid/java/tools/bin/qpid-bench
    incubator/qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java   (with props)

Added: incubator/qpid/trunk/qpid/java/tools/bin/qpid-bench
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/tools/bin/qpid-bench?rev=681666&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/tools/bin/qpid-bench (added)
+++ incubator/qpid/trunk/qpid/java/tools/bin/qpid-bench Fri Aug  1 05:13:47 2008
@@ -0,0 +1,36 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+if [ -z "$QPID_HOME" ]; then
+    export QPID_HOME=$(dirname $(dirname $(readlink -f $0)))
+    export PATH=${PATH}:${QPID_HOME}/bin
+fi
+
+# Set classpath to include Qpid jar with all required jars in manifest
+QPID_LIBS=$QPID_HOME/lib/qpid-incubating.jar
+
+# Set other variables used by the qpid-run script before calling
+export JAVA=java \
+       JAVA_VM=-server \
+       JAVA_MEM=-Xmx1024m \
+       JAVA_GC="-XX:+UseConcMarkSweepGC -XX:+HeapDumpOnOutOfMemoryError" \
+       QPID_CLASSPATH=$QPID_LIBS
+
+. qpid-run org.apache.qpid.tools.QpidBench "$@"

Added: incubator/qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java?rev=681666&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java (added)
+++ incubator/qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java Fri Aug  1 05:13:47 2008
@@ -0,0 +1,884 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tools;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import javax.jms.*;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.transport.*;
+import org.apache.qpid.transport.network.io.IoTransport;
+
+import static org.apache.qpid.tools.QpidBench.Mode.*;
+
+/**
+ * QpidBench
+ *
+ */
+
+public class QpidBench
+{
+
+    static enum Mode
+    {
+        PUBLISH, CONSUME, BOTH
+    }
+
+    private static class Options
+    {
+        private StringBuilder usage = new StringBuilder("qpid-bench <options>");
+
+        void usage(String name, String description, Object def)
+        {
+            String defval = "";
+            if (def != null)
+            {
+                defval = String.format(" (%s)", def);
+            }
+            usage.append(String.format("\n  %-15s%-15s %s", name, defval, description));
+        }
+
+        public String broker = "localhost";
+        public int port = 5672;
+        public long count = 1000000;
+        public long window = 100000;
+        public long sample = window;
+        public int size = 1024;
+        public Mode mode = BOTH;
+        public boolean timestamp = false;
+        public boolean message_id = false;
+        public boolean message_cache = false;
+        public boolean persistent = false;
+        public boolean jms_publish = false;
+        public boolean jms_consume = false;
+        public boolean help = false;
+
+        {
+            usage("-b, --broker", "the broker hostname", broker);
+        }
+
+        public void parse__broker(String b)
+        {
+            this.broker = b;
+        }
+
+        public void parse_b(String b)
+        {
+            parse__broker(b);
+        }
+
+        {
+            usage("-p, --port", "the broker port", port);
+        }
+
+        public void parse__port(String p)
+        {
+            this.port = Integer.parseInt(p);
+        }
+
+        public void parse_p(String p)
+        {
+            parse__port(p);
+        }
+
+        {
+            usage("-c, --count", "the number of messages to send and/or receive", count);
+        }
+
+        public void parse__count(String c)
+        {
+            this.count = Long.parseLong(c);
+        }
+
+        public void parse_c(String c)
+        {
+            parse__count(c);
+        }
+
+        {
+            usage("-w, --window", "the number of messages to send before blocking", window);
+        }
+
+        public void parse__window(String w)
+        {
+            this.window = Long.parseLong(w);
+        }
+
+        public void parse_w(String w)
+        {
+            parse__window(w);
+        }
+
+        {
+            usage("--sample", "print stats after this many messages", sample);
+        }
+
+        public void parse__sample(String s)
+        {
+            this.sample = Long.parseLong(s);
+        }
+
+        {
+            usage("-i, --interval", "sets both --window and --sample", window);
+        }
+
+        public void parse__interval(String i)
+        {
+            this.window = Long.parseLong(i);
+            this.sample = window;
+        }
+
+        public void parse_i(String i)
+        {
+            parse__interval(i);
+        }
+
+        {
+            usage("-s, --size", "the message size", size);
+        }
+
+        public void parse__size(String s)
+        {
+            this.size = Integer.parseInt(s);
+        }
+
+        public void parse_s(String s)
+        {
+            parse__size(s);
+        }
+
+        {
+            usage("-m, --mode", "one of publish, consume, or both", mode);
+        }
+
+        public void parse__mode(String m)
+        {
+            if (m.equalsIgnoreCase("publish"))
+            {
+                this.mode = PUBLISH;
+            }
+            else if (m.equalsIgnoreCase("consume"))
+            {
+                this.mode = CONSUME;
+            }
+            else if (m.equalsIgnoreCase("both"))
+            {
+                this.mode = BOTH;
+            }
+            else
+            {
+                throw new IllegalArgumentException
+                    ("must be one of 'publish', 'consume', or 'both'");
+            }
+        }
+
+        public void parse_m(String m)
+        {
+            parse__mode(m);
+        }
+
+        {
+            usage("--timestamp", "set timestamps on each message if true", timestamp);
+        }
+
+        public void parse__timestamp(String t)
+        {
+            this.timestamp = Boolean.parseBoolean(t);
+        }
+
+        {
+            usage("--mesage-id", "set the message-id on each message if true", message_id);
+        }
+
+        public void parse__message_id(String m)
+        {
+            this.message_id = Boolean.parseBoolean(m);
+        }
+
+        {
+            usage("--message-cache", "reuse the same for each send if true", message_cache);
+        }
+
+        public void parse__message_cache(String c)
+        {
+            this.message_cache = Boolean.parseBoolean(c);
+        }
+
+        {
+            usage("--persistent", "set the delivery-mode to persistent if true", persistent);
+        }
+
+        public void parse__persistent(String p)
+        {
+            this.persistent = Boolean.parseBoolean(p);
+        }
+
+        {
+            usage("--jms-publish", "use the jms client for publish", jms_publish);
+        }
+
+        public void parse__jms_publish(String jp)
+        {
+            this.jms_publish = Boolean.parseBoolean(jp);
+        }
+
+        {
+            usage("--jms-consume", "use the jms client for consume", jms_consume);
+        }
+
+        public void parse__jms_consume(String jc)
+        {
+            this.jms_consume = Boolean.parseBoolean(jc);
+        }
+
+        {
+            usage("--jms", "sets both --jms-publish and --jms-consume", false);
+        }
+
+        public void parse__jms(String j)
+        {
+            this.jms_publish = this.jms_consume = Boolean.parseBoolean(j);
+        }
+
+        {
+            usage("-h, --help", "prints this message", null);
+        }
+
+        public void parse__help()
+        {
+            this.help = true;
+        }
+
+        public void parse_h()
+        {
+            parse__help();
+        }
+
+        public String parse(String ... args)
+        {
+            Class klass = getClass();
+            List<String> arguments = new ArrayList<String>();
+            for (int i = 0; i < args.length; i++)
+            {
+                String option = args[i];
+
+                if (!option.startsWith("-"))
+                {
+                    arguments.add(option);
+                    continue;
+                }
+
+                String method = "parse" + option.replace('-', '_');
+                try
+                {
+                    try
+                    {
+                        Method parser = klass.getMethod(method);
+                        parser.invoke(this);
+                    }
+                    catch (NoSuchMethodException e)
+                    {
+                        try
+                        {
+                            Method parser = klass.getMethod(method, String.class);
+
+                            String value = null;
+                            if (i + 1 < args.length)
+                            {
+                                value = args[i+1];
+                                i++;
+                            }
+                            else
+                            {
+                                return option + " requires a value";
+                            }
+
+                            parser.invoke(this, value);
+                        }
+                        catch (NoSuchMethodException e2)
+                        {
+                            return "no such option: " + option;
+                        }
+                    }
+                }
+                catch (InvocationTargetException e)
+                {
+                    Throwable t = e.getCause();
+                    return String.format
+                        ("error parsing %s: %s: %s", option, t.getClass().getName(),
+                         t.getMessage());
+                }
+                catch (IllegalAccessException e)
+                {
+                    throw new RuntimeException
+                        ("unable to access parse method: " + option, e);
+                }
+            }
+
+            return parseArguments(arguments);
+        }
+
+        public String parseArguments(List<String> arguments)
+        {
+            if (arguments.size() > 0)
+            {
+                String args = arguments.toString();
+                return "unrecognized arguments: " + args.substring(1, args.length() - 1);
+            }
+            else
+            {
+                return null;
+            }
+        }
+
+        public String toString()
+        {
+            Class klass = getClass();
+            Field[] fields = klass.getFields();
+            StringBuilder str = new StringBuilder();
+            for (int i = 0; i < fields.length; i++)
+            {
+                if (i > 0)
+                {
+                    str.append("\n");
+                }
+
+                String name = fields[i].getName();
+                str.append(name);
+                str.append(" = ");
+                Object value;
+                try
+                {
+                    value = fields[i].get(this);
+                }
+                catch (IllegalAccessException e)
+                {
+                    throw new RuntimeException
+                        ("unable to access field: " + name, e);
+                }
+                str.append(value);
+            }
+
+            return str.toString();
+        }
+    }
+
+    public static final void main(String[] args) throws Exception
+    {
+        final Options opts = new Options();
+        String error = opts.parse(args);
+        if (error != null)
+        {
+            System.err.println(error);
+            System.exit(-1);
+            return;
+        }
+
+        if (opts.help)
+        {
+            System.out.println(opts.usage);
+            return;
+        }
+
+        System.out.println(opts);
+
+        switch (opts.mode)
+        {
+        case CONSUME:
+        case BOTH:
+            new Thread()
+            {
+                public void run()
+                {
+                    try
+                    {
+                        if (opts.jms_consume)
+                        {
+                            jms_consumer(opts);
+                        }
+                        else
+                        {
+                            native_consumer(opts);
+                        }
+                    }
+                    catch (Exception e)
+                    {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }.start();
+            break;
+        }
+
+        switch (opts.mode)
+        {
+        case PUBLISH:
+        case BOTH:
+            new Thread()
+            {
+                public void run()
+                {
+                    try
+                    {
+                        if (opts.jms_publish)
+                        {
+                            jms_publisher(opts);
+                        }
+                        else
+                        {
+                            native_publisher(opts);
+                        }
+                    }
+                    catch (Exception e)
+                    {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }.start();
+            break;
+        }
+    }
+
+    private static enum Column
+    {
+        LEFT, RIGHT
+    }
+
+    private static final void sample(Options opts, Column col, String name, long count,
+                                     long start, long time, long lastTime)
+    {
+        String pfx = "";
+        String sfx = "";
+        if (opts.mode == BOTH)
+        {
+            if (col == Column.RIGHT)
+            {
+                pfx = "               --                   ";
+            }
+            else
+            {
+                sfx = "               --";
+            }
+        }
+
+        if (count == 0)
+        {
+            String stats = String.format("%s: %tc", name, start);
+            System.out.println(String.format("%s%-36s%s", pfx, stats, sfx));
+            return;
+        }
+
+        double cumulative = 1000 * (double) count / (double) (time - start);
+        double interval = 1000 * ((double) opts.sample / (double) (time - lastTime));
+
+        String stats = String.format
+            ("%s: %d %.2f %.2f", name, count, cumulative, interval);
+        System.out.println(String.format("%s%-36s%s", pfx, stats, sfx));
+    }
+
+    private static final javax.jms.Connection getJMSConnection(Options opts) throws Exception
+    {
+        String url = String.format
+            ("amqp://guest:guest@clientid/test?brokerlist='tcp://%s:%d'",
+             opts.broker, opts.port);
+        return new AMQConnection(url);
+    }
+
+    private static final void jms_publisher(Options opts) throws Exception
+    {
+        javax.jms.Connection conn = getJMSConnection(opts);
+
+        javax.jms.Session ssn = conn.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
+        Destination dest = ssn.createQueue("test-queue");
+        Destination echo_dest = ssn.createQueue("echo-queue");
+        MessageProducer prod = ssn.createProducer(dest);
+        MessageConsumer cons = ssn.createConsumer(echo_dest);
+        prod.setDisableMessageID(!opts.message_id);
+        prod.setDisableMessageTimestamp(!opts.timestamp);
+        prod.setDeliveryMode(opts.persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+        StringBuilder str = new StringBuilder();
+        for (int i = 0; i < opts.size; i++)
+        {
+            str.append((char) i);
+        }
+
+        String body = str.toString();
+
+        TextMessage cached = ssn.createTextMessage();
+        cached.setText(body);
+
+        conn.start();
+
+        long count = 0;
+        long lastTime = 0;
+        long start = System.currentTimeMillis();
+        while (opts.count == 0 || count < opts.count)
+        {
+            if (opts.window > 0 && (count % opts.window) == 0 && count > 0)
+            {
+                Message echo = cons.receive();
+            }
+
+            if (opts.sample > 0 && (count % opts.sample) == 0)
+            {
+                long time = System.currentTimeMillis();
+                sample(opts, Column.LEFT, "JP", count, start, time, lastTime);
+                lastTime = time;
+            }
+
+            TextMessage m;
+            if (opts.message_cache)
+            {
+                m = cached;
+            }
+            else
+            {
+                m = ssn.createTextMessage();
+                m.setText(body);
+            }
+
+            prod.send(m);
+            count++;
+        }
+
+        conn.close();
+    }
+
+    private static final void jms_consumer(final Options opts) throws Exception
+    {
+        final javax.jms.Connection conn = getJMSConnection(opts);
+        javax.jms.Session ssn = conn.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
+        Destination dest = ssn.createQueue("test-queue");
+        Destination echo_dest = ssn.createQueue("echo-queue");
+        MessageConsumer cons = ssn.createConsumer(dest);
+        final MessageProducer prod = ssn.createProducer(echo_dest);
+        prod.setDisableMessageID(true);
+        prod.setDisableMessageTimestamp(true);
+        prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+        final TextMessage echo = ssn.createTextMessage();
+        echo.setText("ECHO");
+
+        final Object done = new Object();
+        cons.setMessageListener(new MessageListener()
+        {
+            private long count = 0;
+            private long lastTime = 0;
+            private long start;
+
+            public void onMessage(Message m)
+            {
+                if (count == 0)
+                {
+                    start = System.currentTimeMillis();
+                }
+
+                try
+                {
+                    boolean sample = opts.sample > 0 && (count % opts.sample) == 0;
+                    long time = sample ? System.currentTimeMillis() : 0;
+
+                    if (opts.window > 0 && (count % opts.window) == 0)
+                    {
+                        prod.send(echo);
+                    }
+
+                    if (sample)
+                    {
+                        sample(opts, Column.RIGHT, "JC", count, start, time, lastTime);
+                        lastTime = time;
+                    }
+                }
+                catch (JMSException e)
+                {
+                    throw new RuntimeException(e);
+                }
+                count++;
+
+                if (opts.count > 0 && count >= opts.count)
+                {
+                    synchronized (done)
+                    {
+                        done.notify();
+                    }
+                }
+            }
+        });
+
+        conn.start();
+        synchronized (done)
+        {
+            done.wait();
+        }
+        conn.close();
+    }
+
+    private static final org.apache.qpid.transport.Connection getConnection
+        (Options opts, final SessionDelegate delegate)
+    {
+        final Object lock = new Object();
+        org.apache.qpid.transport.Connection conn =
+            IoTransport.connect(opts.broker, opts.port,
+                                new ClientDelegate()
+                                {
+                                    public SessionDelegate getSessionDelegate()
+                                    {
+                                        return delegate;
+                                    }
+                                    public void exception(Throwable t)
+                                    {
+                                        t.printStackTrace();
+                                    }
+                                    public void closed() {}
+                                    @Override public void connectionOpenOk(Channel ch,
+                                                                           ConnectionOpenOk ok)
+                                    {
+                                        synchronized (lock)
+                                        {
+                                            lock.notify();
+                                        }
+                                    }
+                                });
+        conn.send(new ProtocolHeader(1, 0, 10));
+
+        synchronized (lock)
+        {
+            try
+            {
+                lock.wait();
+            }
+            catch (InterruptedException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+
+        return conn;
+    }
+
+    private static final void native_publisher(Options opts) throws Exception
+    {
+        final long[] echos = { 0 };
+        org.apache.qpid.transport.Connection conn = getConnection
+            (opts,
+             new SessionDelegate() {
+                 @Override public void messageTransfer
+                     (org.apache.qpid.transport.Session ssn,
+                      MessageTransfer mt)
+                 {
+                     synchronized (echos)
+                     {
+                         echos[0]++;
+                         echos.notify();
+                     }
+                     ssn.processed(mt);
+                 }
+             });
+
+        Channel ch = conn.getChannel(0);
+        org.apache.qpid.transport.Session ssn = new org.apache.qpid.transport.Session("spam-session".getBytes());
+        ssn.attach(ch);
+        ssn.sessionAttach(ssn.getName());
+
+        ssn.invoke(new QueueDeclare().queue("test-queue").durable(false));
+        ssn.invoke(new QueueDeclare().queue("echo-queue").durable(false));
+        ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("test-queue").bindingKey("test-queue"));
+        ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("echo-queue").bindingKey("echo-queue"));
+
+        MessageProperties cached_mp = new MessageProperties();
+        DeliveryProperties cached_dp = new DeliveryProperties();
+        cached_dp.setRoutingKey("test-queue");
+        cached_dp.setDeliveryMode
+            (opts.persistent ? MessageDeliveryMode.PERSISTENT : MessageDeliveryMode.NON_PERSISTENT);
+
+        int size = opts.size;
+        ByteBuffer body = ByteBuffer.allocate(size);
+        for (int i = 0; i < size; i++)
+        {
+            body.put((byte) i);
+        }
+        body.flip();
+
+        ssn.invoke(new MessageSubscribe()
+                   .queue("echo-queue")
+                   .destination("echo-queue")
+                   .acceptMode(MessageAcceptMode.NONE)
+                   .acquireMode(MessageAcquireMode.PRE_ACQUIRED));
+        ssn.messageSetFlowMode("echo-queue", MessageFlowMode.WINDOW);
+        ssn.messageFlow("echo-queue", MessageCreditUnit.MESSAGE, 0xFFFFFFFF);
+        ssn.messageFlow("echo-queue", MessageCreditUnit.BYTE, 0xFFFFFFFF);
+
+        long count = 0;
+        long lastTime = 0;
+        long start = System.currentTimeMillis();
+        while (opts.count == 0 || count < opts.count)
+        {
+            if (opts.window > 0 && (count % opts.window) == 0 && count > 0)
+            {
+                synchronized (echos)
+                {
+                    while (echos[0] < (count/opts.window))
+                    {
+                        echos.wait();
+                    }
+                }
+            }
+
+            if (opts.sample > 0 && (count % opts.sample) == 0)
+            {
+                long time = System.currentTimeMillis();
+                sample(opts, Column.LEFT, "NP", count, start, time, lastTime);
+                lastTime = time;
+            }
+
+            MessageProperties mp;
+            DeliveryProperties dp;
+            if (opts.message_cache)
+            {
+                mp = cached_mp;
+                dp = cached_dp;
+            }
+            else
+            {
+                mp = new MessageProperties();
+                dp = new DeliveryProperties();
+                dp.setRoutingKey("test-queue");
+                dp.setDeliveryMode
+                    (opts.persistent ? MessageDeliveryMode.PERSISTENT : MessageDeliveryMode.NON_PERSISTENT);
+
+            }
+
+            if (opts.message_id)
+            {
+                mp.setMessageId(UUID.randomUUID());
+            }
+
+            if (opts.timestamp)
+            {
+                dp.setTimestamp(System.currentTimeMillis());
+            }
+
+            ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED);
+            ssn.header(dp, mp);
+            ssn.data(body.slice());
+            ssn.endData();
+            count++;
+        }
+
+        ssn.messageCancel("echo-queue");
+
+        ssn.sync();
+        conn.close();
+    }
+
+    private static final void native_consumer(final Options opts) throws Exception
+    {
+        final DeliveryProperties dp = new DeliveryProperties();
+        final byte[] echo = new byte[0];
+        dp.setRoutingKey("echo-queue");
+        dp.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
+        final MessageProperties mp = new MessageProperties();
+        final Object done = new Object();
+        org.apache.qpid.transport.Connection conn = getConnection
+            (opts,
+             new SessionDelegate() {
+
+                 private long count = 0;
+                 private long lastTime = 0;
+                 private long start;
+
+                 @Override public void messageTransfer
+                     (org.apache.qpid.transport.Session ssn,
+                      MessageTransfer mt)
+                 {
+                     if (count == 0)
+                     {
+                         start = System.currentTimeMillis();
+                     }
+
+                     boolean sample = opts.sample > 0 && (count % opts.sample) == 0;
+                     long time = sample ? System.currentTimeMillis() : 0;
+
+                     if (opts.window > 0 && (count % opts.window) == 0)
+                     {
+                         ssn.messageTransfer("amq.direct",
+                                             MessageAcceptMode.NONE,
+                                             MessageAcquireMode.PRE_ACQUIRED);
+                         ssn.header(dp, mp);
+                         ssn.data(echo);
+                         ssn.endData();
+                     }
+
+                     if (sample)
+                     {
+                         sample(opts, Column.RIGHT, "NC", count, start, time, lastTime);
+                         lastTime = time;
+                     }
+                     ssn.processed(mt);
+                     count++;
+
+                     if (opts.count > 0 && count >= opts.count)
+                     {
+                         synchronized (done)
+                         {
+                             done.notify();
+                         }
+                     }
+                 }
+             });
+
+        Channel ch = conn.getChannel(0);
+        org.apache.qpid.transport.Session ssn = new org.apache.qpid.transport.Session("listener-session".getBytes());
+        ssn.attach(ch);
+        ssn.sessionAttach(ssn.getName());
+
+        ssn.invoke(new QueueDeclare().queue("test-queue").durable(false));
+        ssn.invoke(new QueueDeclare().queue("echo-queue").durable(false));
+        ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("test-queue").bindingKey("test-queue"));
+        ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("echo-queue").bindingKey("echo-queue"));
+
+        ssn.invoke(new MessageSubscribe()
+                   .queue("test-queue")
+                   .destination("test-queue")
+                   .acceptMode(MessageAcceptMode.NONE)
+                   .acquireMode(MessageAcquireMode.PRE_ACQUIRED));
+        ssn.messageSetFlowMode("test-queue", MessageFlowMode.WINDOW);
+        ssn.messageFlow("test-queue", MessageCreditUnit.MESSAGE, 0xFFFFFFFF);
+        ssn.messageFlow("test-queue", MessageCreditUnit.BYTE, 0xFFFFFFFF);
+
+        synchronized (done)
+        {
+            done.wait();
+        }
+
+        ssn.messageCancel("test-queue");
+
+        ssn.sync();
+        conn.close();
+    }
+
+}

Propchange: incubator/qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java
------------------------------------------------------------------------------
    svn:eol-style = native