You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2006/12/19 23:58:02 UTC

svn commit: r488847 [3/7] - in /incubator/qpid/branches/new_persistence/java/client: ./ src/old_test/ src/old_test/java/ src/old_test/java/org/ src/old_test/java/org/apache/ src/old_test/java/org/apache/qpid/ src/old_test/java/org/apache/qpid/IBMPerfTe...

Added: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java?view=auto&rev=488847
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java (added)
+++ incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java Tue Dec 19 14:57:56 2006
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.example.subscriber;
+
+import org.apache.log4j.BasicConfigurator;
+
+/**
+ * Allows you to simply start a monitored subscriber
+ */
+public class MonitoredSubscriptionWrapper {
+
+    // Assigned by main() and read by stop(). Declared volatile because stop() is
+    // presumably invoked from a different thread than the one running main()
+    // - TODO confirm against callers.
+    private static volatile MonitoredSubscriber _subscriber;
+
+    /**
+     * Create a monitored subscriber and start it
+     * @param args - no params required
+     */
+    public static void main(String[] args)
+    {
+        //switch on logging
+        BasicConfigurator.configure();
+
+        _subscriber = new MonitoredSubscriber();
+
+        _subscriber.subscribe();
+    }
+
+    /**
+     * Stop subscribing now. Does nothing if no subscriber has been created yet,
+     * i.e. if main() has not run.
+     */
+    public static void stop()
+    {
+        //read the field once so the null check and the call see the same instance
+        MonitoredSubscriber subscriber = _subscriber;
+        if (subscriber != null)
+        {
+            subscriber.stop();
+        }
+    }
+}

Propchange: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/example/subscriber/Subscriber.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/example/subscriber/Subscriber.java?view=auto&rev=488847
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/example/subscriber/Subscriber.java (added)
+++ incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/example/subscriber/Subscriber.java Tue Dec 19 14:57:56 2006
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.example.subscriber;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQConnectionFactory;
+
+import javax.jms.*;
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.naming.InitialContext;
+
+import org.apache.qpid.example.shared.InitialContextHelper;
+
+/**
+ * Subscriber which consumes messages from a queue
+ */
+
+public class Subscriber
+{
+    private static final Logger _log = Logger.getLogger(Subscriber.class);
+
+    protected static Connection _connection;
+
+    protected static MessageConsumer _consumer;
+
+    protected static InitialContextHelper _contextHelper;
+
+    protected static AMQConnectionFactory _connectionFactory;
+
+    protected String _destinationName;
+
+    public Subscriber()
+    {
+        try
+        {
+            //get an initial context from default properties
+            _contextHelper = new InitialContextHelper(null);
+            InitialContext ctx = _contextHelper.getInitialContext();
+
+            //then create a connection using the AMQConnectionFactory
+            _connectionFactory = (AMQConnectionFactory) ctx.lookup("local");
+
+            //lookup queue name
+            _destinationName = (String) ctx.lookup("MyQueue");
+
+        }
+        catch (Exception e)
+        {
+            e.printStackTrace();
+            _log.error(e);
+        }
+    }
+
+    /**
+     * Listener class that handles messages
+     */
+    public static class ExampleMessageListener implements MessageListener
+    {
+        private String _name;
+
+        public ExampleMessageListener(String name)
+        {
+            _name = name;
+
+        }
+
+        /**
+         * Listens for message callbacks, handles and then acknowledges them
+         * @param message - the message received
+         */
+        public void onMessage(javax.jms.Message message)
+        {
+            _log.info(_name + " got message '" + message + "'");
+
+            try
+            {
+                //NB: Handle your message appropriately for your application here
+                //do some stuff
+
+                _log.debug("Acknowledging recieved message");
+
+                //Now acknowledge the message to clear it from our queue
+                message.acknowledge();
+            }
+            catch(JMSException j)
+            {
+                _log.error("JMSException trying to acknowledge message receipt");
+                j.printStackTrace();
+            }
+            catch(Exception e)
+            {
+                _log.error("Unexpected exception trying to handle message");
+                e.printStackTrace();
+            }
+        }
+    }
+
+    /**
+     * Subscribes to example Queue and attaches listener
+     */
+    public void subscribe()
+    {
+        _log.info("Starting subscription ...");
+
+        try
+        {
+             _connection = _connectionFactory.createConnection();
+
+             //create a transactional session
+            Session session =  _connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+
+            //Queue is non-exclusive and not deleted when last consumer detaches
+            Destination destination = session.createQueue(_destinationName);
+
+            //Create a consumer with a destination of our queue which will use defaults for prefetch etc
+            _consumer = session.createConsumer(destination);
+
+            //give the message listener a name of it's own
+            _consumer.setMessageListener(new ExampleMessageListener("MessageListener " + System.currentTimeMillis()));
+
+            _connection.start();
+        }
+        catch (Throwable t)
+        {
+            _log.error("Fatal error: " + t);
+            t.printStackTrace();
+        }
+
+        _log.info("Waiting for messages ...");
+
+        //wait for messages and sleep to survive failover
+        try
+        {
+            while(true)
+            {
+                Thread.sleep(Long.MAX_VALUE);
+            }
+        }
+        catch (Exception e)
+        {
+            _log.warn("Exception while Subscriber sleeping",e);
+        }
+    }
+
+    /**
+     * Set destination (queue or topic) name
+     * @param name
+     */
+    public void setDestinationName(String name)
+    {
+        _destinationName = name;
+    }
+
+    /**
+     * Stop consuming and close connection
+     */
+    public void stop()
+    {
+        try
+        {
+            _consumer.close();
+            _consumer = null;
+            _connection.stop();
+            _connection.close();
+        }
+        catch(JMSException j)
+        {
+            _log.error("JMSException trying to Subscriber.stop: " + j.getStackTrace());
+        }
+    }
+
+}
+
+
+
+

Propchange: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/example/subscriber/Subscriber.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java?view=auto&rev=488847
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java (added)
+++ incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java Tue Dec 19 14:57:56 2006
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.example.subscriber;
+
+import org.apache.log4j.BasicConfigurator;
+
+/**
+ * Allows you to simply start a subscriber
+ */
+public class SubscriptionWrapper {
+
+    // Assigned by main() and read by stop(). Subscriber.subscribe() blocks the thread
+    // that runs main(), so stop() has to be called from another thread; volatile makes
+    // the assignment visible to it.
+    private static volatile Subscriber _subscriber;
+
+    /**
+     * Create a subscriber and start it
+     * @param args - not used
+     */
+    public static void main(String[] args)
+    {
+        //switch on logging
+        BasicConfigurator.configure();
+
+        _subscriber = new Subscriber();
+
+        _subscriber.subscribe();
+    }
+
+    /**
+     * Stop subscribing now. Does nothing if no subscriber has been created yet,
+     * i.e. if main() has not run.
+     */
+    public static void stop()
+    {
+        //read the field once so the null check and the call see the same instance
+        Subscriber subscriber = _subscriber;
+        if (subscriber != null)
+        {
+            subscriber.stop();
+        }
+    }
+}

Propchange: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/flow/ChannelFlowTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/flow/ChannelFlowTest.java?view=auto&rev=488847
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/flow/ChannelFlowTest.java (added)
+++ incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/flow/ChannelFlowTest.java Tue Dec 19 14:57:56 2006
@@ -0,0 +1,110 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.flow;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+
+/**
+ * Publishes bursts of messages to a deliberately slow consumer on the same session
+ * and waits for them all to be received. The whole test runs inside the constructor.
+ */
+public class ChannelFlowTest implements MessageListener
+{
+    //number of messages published so far; only touched by the publishing thread
+    private int sent;
+    //number of messages delivered to onMessage; guarded by this object's monitor
+    private int received;
+
+    /**
+     * @param broker - broker address passed to AMQConnection, e.g. "localhost:5672"
+     */
+    ChannelFlowTest(String broker) throws Exception
+    {
+        this(new AMQConnection(broker, "guest", "guest", randomize("Client"), "/test_path"));
+    }
+
+    /**
+     * Runs the test against a freshly named queue.
+     * @param connection - connection to use; it is closed when the test finishes
+     */
+    ChannelFlowTest(AMQConnection connection) throws Exception
+    {
+        this(connection, new AMQQueue(randomize("ChannelFlowTest"), true));
+    }
+
+    /**
+     * Runs the test: sends 10 bursts of 100 messages and waits for all of them.
+     * @param connection - connection to use; it is closed when the test finishes or fails
+     * @param destination - destination that is both published to and consumed from
+     */
+    ChannelFlowTest(AMQConnection connection, AMQDestination destination) throws Exception
+    {
+        try
+        {
+            AMQSession session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE, 50,25);
+
+            //set up a slow consumer
+            session.createConsumer(destination).setMessageListener(this);
+            connection.start();
+
+            //create a publisher
+            MessageProducer producer = session.createProducer(destination);
+            Message msg = session.createTextMessage("Message");
+
+            //publish in bursts that are fast enough to cause channel flow control
+            for(int i = 0; i < 10; i++)
+            {
+                for(int j = 0; j < 100; j++)
+                {
+                    producer.send(msg);
+                    sent++;
+                }
+                //let the consumer nearly catch up before starting the next burst
+                waitUntilReceived(sent - 40);
+            }
+
+            waitUntilReceived(sent);
+
+            session.close();
+        }
+        finally
+        {
+            //release the connection even when publishing or waiting failed
+            connection.close();
+        }
+    }
+
+
+    /**
+     * Blocks until at least count messages have been received.
+     */
+    private synchronized void waitUntilReceived(int count) throws InterruptedException
+    {
+        while(received <count)
+        {
+            wait();
+        }
+    }
+
+    /**
+     * Simulates a slow consumer by sleeping 50ms per message, then records the delivery.
+     */
+    public void onMessage(Message message)
+    {
+        try
+        {
+            //sleep outside the monitor so waitUntilReceived is not held up by the delay
+            Thread.sleep(50);
+        }
+        catch (InterruptedException e)
+        {
+            System.err.println("Interrupted while simulating a slow consumer: " + e);
+            //restore the interrupt status for whoever owns this thread
+            Thread.currentThread().interrupt();
+        }
+
+        //the message was delivered either way, so always count it; otherwise
+        //waitUntilReceived could block forever on a message that is never counted
+        synchronized (this)
+        {
+            received++;
+            notifyAll();
+        }
+    }
+
+    /**
+     * Appends the current time in milliseconds to make names unique per run.
+     */
+    private static String randomize(String in)
+    {
+        return in + System.currentTimeMillis();
+    }
+
+    /**
+     * @param argv - optional broker address as the first argument; defaults to "localhost:5672"
+     */
+    public static void main(String[] argv) throws Exception
+    {
+        new ChannelFlowTest(argv.length == 0 ? "localhost:5672" : argv[0]);
+    }
+
+}

Propchange: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/flow/ChannelFlowTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargePublisher.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargePublisher.java?view=auto&rev=488847
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargePublisher.java (added)
+++ incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargePublisher.java Tue Dec 19 14:57:56 2006
@@ -0,0 +1,193 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.fragmentation;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.jms.MessageProducer;
+import org.apache.qpid.jms.Session;
+import org.apache.log4j.Logger;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+/**
+ * A client that behaves as follows:
+ * <ul><li>Connects to a queue, whose name is specified as a cmd-line argument</li>
+ * <li>Creates a temporary queue</li>
+ * <li>Creates messages containing a property that is the name of the temporary queue</li>
+ * <li>Fires off a message on the original queue and waits for a response on the temporary queue</li>
+ * </ul>
+ */
+public class TestLargePublisher
+{
+    private static final Logger _log = Logger.getLogger(TestLargePublisher.class);
+
+    private AMQConnection _connection;
+
+    private Session _session;
+
+    private class CallbackHandler implements MessageListener
+    {
+        private int _expectedMessageCount;
+
+        private int _actualMessageCount;
+
+        private long _startTime;
+
+        public CallbackHandler(int expectedMessageCount, long startTime)
+        {
+            _expectedMessageCount = expectedMessageCount;
+            _startTime = startTime;
+        }
+
+        public void onMessage(Message m)
+        {
+            if (_log.isDebugEnabled())
+            {
+                _log.debug("Message received: " + m);
+            }
+            _actualMessageCount++;
+            if (_actualMessageCount%1000 == 0)
+            {
+                _log.info("Received message count: " + _actualMessageCount);
+            }
+            /*if (!"henson".equals(m.toString()))
+           {
+               _log.error("AbstractJMSMessage response not correct: expected 'henson' but got " + m.toString());
+           }
+           else
+           {
+               if (_log.isDebugEnabled())
+               {
+                   _log.debug("AbstractJMSMessage " + m + " received");
+               }
+               else
+               {
+                   _log.info("AbstractJMSMessage received");
+               }
+           } */
+
+            if (_actualMessageCount == _expectedMessageCount)
+            {
+                long timeTaken = System.currentTimeMillis() - _startTime;
+                System.out.println("Total time taken to receive " + _expectedMessageCount+ " messages was " +
+                                   timeTaken + "ms, equivalent to " +
+                                   (_expectedMessageCount/(timeTaken/1000.0)) + " messages per second");
+            }
+        }
+    }
+
+    public TestLargePublisher(String host, int port, String clientID,
+                              final int messageCount) throws AMQException,URLSyntaxException
+    {
+        try
+        {
+            createConnection(host, port, clientID);
+            
+            _session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            AMQTopic destination = new AMQTopic("large");
+            MessageProducer producer = (MessageProducer) _session.createProducer(destination);
+
+            _connection.start();
+            //TextMessage msg = _session.createTextMessage(tempDestination.getQueueName() + "/Presented to in conjunction with Mahnah Mahnah and the Snowths");
+            final long startTime = System.currentTimeMillis();
+
+            for (int i = 0; i < messageCount; i++)
+            {
+                BytesMessage msg = _session.createBytesMessage();
+                populateMessage(msg);
+                producer.send(msg);
+            }
+            _log.info("Finished sending " + messageCount + " messages");
+        }
+        catch (JMSException e)
+        {
+            e.printStackTrace();
+        }
+    }
+
+    private void createConnection(String host, int port, String clientID) throws AMQException , URLSyntaxException
+    {
+        _connection = new AMQConnection(host, port, "guest", "guest",
+                                        clientID, "/test");
+    }
+
+    private void populateMessage(BytesMessage msg) throws JMSException
+    {
+        int size = 1024 * 187; // 187k
+        byte[] data = new byte[size];
+        for (int i = 0; i < data.length; i++)
+        {
+            data[i] = (byte)(i%25);
+        }
+        msg.writeBytes(data);
+    }
+
+    /**
+     *
+     * @param args argument 1 if present specifies the name of the temporary queue to create. Leaving it blank
+     * means the server will allocate a name.
+     */
+    public static void main(String[] args) throws URLSyntaxException
+    {
+        final String host;
+        final int port;
+        final int numMessages;
+        if (args.length == 0)
+        {
+            host = "localhost";
+            port = 5672;
+            numMessages = 100;
+//            System.err.println("Usage: TestLargePublisher <host> <port> <number of messages>");
+        }
+        else
+        {
+            host = args[0];
+            port = Integer.parseInt(args[1]);
+            numMessages = Integer.parseInt(args[2]);
+        }
+
+        try
+        {
+            InetAddress address = InetAddress.getLocalHost();
+            String clientID = address.getHostName() + System.currentTimeMillis();
+            TestLargePublisher client = new TestLargePublisher(host, port, clientID, numMessages);
+        }
+        catch (UnknownHostException e)
+        {
+            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+        }
+        catch (AMQException e)
+        {
+            System.err.println("Error in client: " + e);
+            e.printStackTrace();
+        }
+
+        //System.exit(0);
+    }
+}

Propchange: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargePublisher.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargeSubscriber.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargeSubscriber.java?view=auto&rev=488847
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargeSubscriber.java (added)
+++ incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargeSubscriber.java Tue Dec 19 14:57:56 2006
@@ -0,0 +1,163 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.fragmentation;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.jms.Session;
+import org.apache.log4j.Logger;
+
+import javax.jms.*;
+import java.net.InetAddress;
+
+/**
+ * Subscribes to the topic "large", validates each received message as a 187k
+ * bytes message whose byte i has the value i%25, and logs the receive rate once
+ * the expected number of messages has arrived.
+ */
+public class TestLargeSubscriber
+{
+    private static final Logger _logger = Logger.getLogger(TestLargeSubscriber.class);
+
+    private static MessageProducer _destinationProducer;
+
+    private static String _destinationName;
+
+    /**
+     * @param args either empty, in which case localhost:5672, guest/guest, "/test" and 100
+     * messages are used, or exactly: host port username password virtual-path expectedMessageCount
+     */
+    public static void main(String[] args)
+    {
+        _logger.info("Starting...");
+
+        final String host;
+        final int port;
+        final String username;
+        final String password;
+        final String virtualPath;
+        final int numExpectedMessages;
+        if (args.length == 0)
+        {
+            host = "localhost";
+            port = 5672;
+            username = "guest";
+            password = "guest";
+            virtualPath = "/test";
+            numExpectedMessages = 100;
+        }
+        else if (args.length == 6)
+        {
+            host = args[0];
+            port  = Integer.parseInt(args[1]);
+            username = args[2];
+            password = args[3];
+            virtualPath = args[4];
+            numExpectedMessages = Integer.parseInt(args[5]);
+        }
+        else
+        {
+            System.out.println("Usage: host port username password virtual-path expectedMessageCount");
+            System.exit(1);
+            //only here so the compiler treats the final locals as definitely assigned
+            throw new RuntimeException("cannot be reached");
+        }
+
+        try
+        {
+            InetAddress address = InetAddress.getLocalHost();
+            AMQConnection con = new AMQConnection(host, port, username, password,
+                                                  address.getHostName(), virtualPath);
+            final Session session = (Session) con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            final int expectedMessageCount = numExpectedMessages;
+
+            MessageConsumer consumer = session.createConsumer(new AMQTopic("large"),
+                                                              100, true, false, null);
+
+            consumer.setMessageListener(new MessageListener()
+            {
+                private int _messageCount;
+
+                private long _startTime = 0;
+
+                public void onMessage(Message message)
+                {
+                    validateMessage(message);
+                    //the clock starts at the first message, so the rate excludes idle time before it
+                    if (_messageCount++ == 0)
+                    {
+                        _startTime = System.currentTimeMillis();
+                    }
+                    if (_logger.isInfoEnabled())
+                    {
+                        _logger.info("Got message '" + message + "'");
+                    }
+                    if (_messageCount == expectedMessageCount)
+                    {
+                        long totalTime = System.currentTimeMillis() - _startTime;
+                        _logger.error("Total time to receive " + _messageCount + " messages was " +
+                                      totalTime + "ms. Rate is " + (_messageCount/(totalTime/1000.0)));
+                    }
+                }
+
+                /**
+                 * Checks type, length and content of the message and logs every problem found.
+                 * Success is only logged when no problem was reported.
+                 */
+                private void validateMessage(Message message)
+                {
+                    if (!(message instanceof BytesMessage))
+                    {
+                        _logger.error("Message is not of correct type - should be BytesMessage and is " +
+                                      message.getClass());
+                        //nothing else can be checked; casting would throw ClassCastException
+                        return;
+                    }
+                    BytesMessage bm = (BytesMessage) message;
+                    final int expectedSize = 1024 * 187; // 187k
+                    boolean valid = true;
+                    try
+                    {
+                        long bodyLength = bm.getBodyLength();
+                        if (bodyLength != expectedSize)
+                        {
+                            _logger.error("Message is not correct length - should be  " + expectedSize + " and is " +
+                                          bodyLength);
+                            valid = false;
+                        }
+
+                        byte[] data = new byte[(int)bodyLength];
+                        bm.readBytes(data);
+                        for (int i = 0; i < data.length; i++)
+                        {
+                            if (data[i] != (byte)(i%25))
+                            {
+                                _logger.error("byte " + i + " of message is wrong - should be " + i%25 + " but is " +
+                                              data[i]);
+                                valid = false;
+                            }
+                        }
+
+                        if (valid)
+                        {
+                            _logger.info("***** Validated message successfully");
+                        }
+                    }
+                    catch (JMSException e)
+                    {
+                        _logger.error("Failed to validate message: " + e, e);
+                    }
+                }
+            });
+            con.start();
+        }
+        catch (Throwable t)
+        {
+            System.err.println("Fatal error: " + t);
+            t.printStackTrace();
+        }
+
+        System.out.println("Waiting...");
+    }
+}
+

Propchange: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargeSubscriber.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/framing/FieldTableTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/framing/FieldTableTest.java?view=auto&rev=488847
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/framing/FieldTableTest.java (added)
+++ incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/framing/FieldTableTest.java Tue Dec 19 14:57:56 2006
@@ -0,0 +1,187 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.util.Enumeration;
+import java.util.Properties;
+
+import junit.framework.TestCase;
+
+public class FieldTableTest extends TestCase
+{
+
+    public void testEncoding()
+    {
+        FieldTable table = FieldTableFactory.newFieldTable();
+
+        String key = "String";
+        String value = "Hello";
+        table.put(key, value);
+
+        //Add one for the type encoding
+        int size = EncodingUtils.encodedShortStringLength(key) + 1 +
+                   EncodingUtils.encodedLongStringLength(value);
+
+        assertEquals(table.getEncodedSize(), size);
+        
+        key = "Integer";
+        Integer number = new Integer(60);
+        table.put(key, number);
+
+        //Add one for the type encoding
+        size += EncodingUtils.encodedShortStringLength(key) + 1 + 4;
+
+
+        assertEquals(table.getEncodedSize(), size);
+    }
+
+
+    public void testDataDump() throws IOException, AMQFrameDecodingException
+    {
+        byte[] data = readBase64("content.txt");
+        System.out.println("Got " + data.length + " bytes of data");
+        for (int i = 0; i < 100; i++)
+        {
+            System.out.print((char) data[i]);
+        }
+        System.out.println();
+        int size = 4194304;
+        ByteBuffer buffer = ByteBuffer.allocate(data.length);
+        buffer.put(data);
+        buffer.flip();
+        FieldTable table = FieldTableFactory.newFieldTable(buffer, size);
+    }
+
+    /*
+    public void testCase1() throws AMQFrameDecodingException, IOException
+    {
+        doTestEncoding(load("FieldTableTest.properties"));
+    }
+
+    public void testCase2() throws AMQFrameDecodingException, IOException
+    {
+        doTestEncoding(load("FieldTableTest2.properties"));
+    }
+    */
+    void doTestEncoding(FieldTable table) throws AMQFrameDecodingException
+    {
+        assertEquivalent(table, encodeThenDecode(table));
+    }
+
+    public void assertEquivalent(FieldTable table1, FieldTable table2)
+    {
+        for (Object o : table1.keySet())
+        {
+            String key = (String) o;
+            assertEquals("Values for " + key + " did not match", table1.get(key), table2.get(key));
+            //System.out.println("Values for " + key + " matched (" + table1.get(key) + ")");
+        }
+    }
+
+    FieldTable encodeThenDecode(FieldTable table) throws AMQFrameDecodingException
+    {
+        ContentHeaderBody header = new ContentHeaderBody();
+        header.classId = 6;
+        BasicContentHeaderProperties properties = new BasicContentHeaderProperties();
+        header.properties = properties;
+
+        properties.setHeaders(table);
+        int size = header.getSize();
+
+        //encode
+        ByteBuffer buffer = ByteBuffer.allocate(size);
+        header.writePayload(buffer);
+
+        //decode
+        buffer.flip();
+
+        header = new ContentHeaderBody();
+        header.populateFromBuffer(buffer, size);
+
+        return ((BasicContentHeaderProperties) header.properties).getHeaders();
+    }
+
+    byte[] readBase64(String name) throws IOException
+    {
+        String content = read(new InputStreamReader(getClass().getResourceAsStream(name)));
+
+        return org.apache.commons.codec.binary.Base64.decodeBase64(content.getBytes());
+    }
+
+    FieldTable load(String name) throws IOException
+    {
+        return populate(FieldTableFactory.newFieldTable(), read(name));
+    }
+
+    Properties read(String name) throws IOException
+    {
+        Properties p = new Properties();
+        p.load(getClass().getResourceAsStream(name));
+        return p;
+    }
+
+    FieldTable populate(FieldTable table, Properties properties)
+    {
+        for (Enumeration i = properties.propertyNames(); i.hasMoreElements();)
+        {
+            String key = (String) i.nextElement();
+            String value = properties.getProperty(key);
+            try
+            {
+                int ival = Integer.parseInt(value);
+                table.put(key, (long) ival);
+            }
+            catch (NumberFormatException e)
+            {
+                table.put(key, value);
+            }
+        }
+        return table;
+    }
+
+    static String read(Reader in) throws IOException
+    {
+        return read(in instanceof BufferedReader ? (BufferedReader) in : new BufferedReader(in));
+    }
+
+    static String read(BufferedReader in) throws IOException
+    {
+        StringBuffer buffer = new StringBuffer();
+        String line = in.readLine();
+        while (line != null)
+        {
+            buffer.append(line).append(" ");
+            line = in.readLine();
+        }
+        return buffer.toString();
+    }
+
+    public static junit.framework.Test suite()
+    {
+        return new junit.framework.TestSuite(FieldTableTest.class);
+    }
+}

Propchange: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/framing/FieldTableTest.java
------------------------------------------------------------------------------
    svn:eol-style = native