You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2006/12/19 23:58:02 UTC
svn commit: r488847 [3/7] - in
/incubator/qpid/branches/new_persistence/java/client: ./ src/old_test/
src/old_test/java/ src/old_test/java/org/ src/old_test/java/org/apache/
src/old_test/java/org/apache/qpid/
src/old_test/java/org/apache/qpid/IBMPerfTe...
Added: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java?view=auto&rev=488847
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java (added)
+++ incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java Tue Dec 19 14:57:56 2006
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.example.subscriber;
+
+import org.apache.log4j.BasicConfigurator;
+
+/**
+ * Command-line entry point that creates a {@link MonitoredSubscriber} and starts it.
+ */
+public class MonitoredSubscriptionWrapper {
+
+    // Assigned by main(); stays null until main() has constructed the subscriber.
+    private static MonitoredSubscriber _subscriber;
+
+    /**
+     * Create a monitored subscriber and start it
+     * @param args - no params required
+     */
+    public static void main(String args[])
+    {
+        //switch on logging
+        BasicConfigurator.configure();
+
+        _subscriber = new MonitoredSubscriber();
+
+        _subscriber.subscribe();
+    }
+
+    /**
+     * Stop subscribing now. This is a no-op when main() has not yet created
+     * the subscriber, rather than failing with a NullPointerException.
+     */
+    public static void stop()
+    {
+        // Read the field once so the null check and the call see the same reference.
+        MonitoredSubscriber subscriber = _subscriber;
+        if (subscriber != null)
+        {
+            subscriber.stop();
+        }
+    }
+}
Propchange: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/example/subscriber/Subscriber.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/example/subscriber/Subscriber.java?view=auto&rev=488847
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/example/subscriber/Subscriber.java (added)
+++ incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/example/subscriber/Subscriber.java Tue Dec 19 14:57:56 2006
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.example.subscriber;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQConnectionFactory;
+
+import javax.jms.*;
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.naming.InitialContext;
+
+import org.apache.qpid.example.shared.InitialContextHelper;
+
+/**
+ * Subscriber which consumes messages from a queue
+ */
+
+public class Subscriber
+{
+ private static final Logger _log = Logger.getLogger(Subscriber.class);
+
+ protected static Connection _connection;
+
+ protected static MessageConsumer _consumer;
+
+ protected static InitialContextHelper _contextHelper;
+
+ protected static AMQConnectionFactory _connectionFactory;
+
+ protected String _destinationName;
+
+ public Subscriber()
+ {
+ try
+ {
+ //get an initial context from default properties
+ _contextHelper = new InitialContextHelper(null);
+ InitialContext ctx = _contextHelper.getInitialContext();
+
+ //then create a connection using the AMQConnectionFactory
+ _connectionFactory = (AMQConnectionFactory) ctx.lookup("local");
+
+ //lookup queue name
+ _destinationName = (String) ctx.lookup("MyQueue");
+
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ _log.error(e);
+ }
+ }
+
+ /**
+ * Listener class that handles messages
+ */
+ public static class ExampleMessageListener implements MessageListener
+ {
+ private String _name;
+
+ public ExampleMessageListener(String name)
+ {
+ _name = name;
+
+ }
+
+ /**
+ * Listens for message callbacks, handles and then acknowledges them
+ * @param message - the message received
+ */
+ public void onMessage(javax.jms.Message message)
+ {
+ _log.info(_name + " got message '" + message + "'");
+
+ try
+ {
+ //NB: Handle your message appropriately for your application here
+ //do some stuff
+
+ _log.debug("Acknowledging recieved message");
+
+ //Now acknowledge the message to clear it from our queue
+ message.acknowledge();
+ }
+ catch(JMSException j)
+ {
+ _log.error("JMSException trying to acknowledge message receipt");
+ j.printStackTrace();
+ }
+ catch(Exception e)
+ {
+ _log.error("Unexpected exception trying to handle message");
+ e.printStackTrace();
+ }
+ }
+ }
+
+ /**
+ * Subscribes to example Queue and attaches listener
+ */
+ public void subscribe()
+ {
+ _log.info("Starting subscription ...");
+
+ try
+ {
+ _connection = _connectionFactory.createConnection();
+
+ //create a transactional session
+ Session session = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+
+ //Queue is non-exclusive and not deleted when last consumer detaches
+ Destination destination = session.createQueue(_destinationName);
+
+ //Create a consumer with a destination of our queue which will use defaults for prefetch etc
+ _consumer = session.createConsumer(destination);
+
+ //give the message listener a name of it's own
+ _consumer.setMessageListener(new ExampleMessageListener("MessageListener " + System.currentTimeMillis()));
+
+ _connection.start();
+ }
+ catch (Throwable t)
+ {
+ _log.error("Fatal error: " + t);
+ t.printStackTrace();
+ }
+
+ _log.info("Waiting for messages ...");
+
+ //wait for messages and sleep to survive failover
+ try
+ {
+ while(true)
+ {
+ Thread.sleep(Long.MAX_VALUE);
+ }
+ }
+ catch (Exception e)
+ {
+ _log.warn("Exception while Subscriber sleeping",e);
+ }
+ }
+
+ /**
+ * Set destination (queue or topic) name
+ * @param name
+ */
+ public void setDestinationName(String name)
+ {
+ _destinationName = name;
+ }
+
+ /**
+ * Stop consuming and close connection
+ */
+ public void stop()
+ {
+ try
+ {
+ _consumer.close();
+ _consumer = null;
+ _connection.stop();
+ _connection.close();
+ }
+ catch(JMSException j)
+ {
+ _log.error("JMSException trying to Subscriber.stop: " + j.getStackTrace());
+ }
+ }
+
+}
+
+
+
+
Propchange: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/example/subscriber/Subscriber.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java?view=auto&rev=488847
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java (added)
+++ incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java Tue Dec 19 14:57:56 2006
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.example.subscriber;
+
+import org.apache.log4j.BasicConfigurator;
+
+/**
+ * Command-line entry point that creates a {@link Subscriber} and starts it.
+ */
+public class SubscriptionWrapper {
+
+    // Assigned by main(); stays null until main() has constructed the subscriber.
+    private static Subscriber _subscriber;
+
+    /**
+     * Create a subscriber and start it
+     * @param args - not used
+     */
+    public static void main(String args[])
+    {
+        //switch on logging
+        BasicConfigurator.configure();
+
+        _subscriber = new Subscriber();
+
+        _subscriber.subscribe();
+    }
+
+    /**
+     * Stop subscribing now. This is a no-op when main() has not yet created
+     * the subscriber, rather than failing with a NullPointerException.
+     */
+    public static void stop()
+    {
+        // Read the field once so the null check and the call see the same reference.
+        Subscriber subscriber = _subscriber;
+        if (subscriber != null)
+        {
+            subscriber.stop();
+        }
+    }
+}
Propchange: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/flow/ChannelFlowTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/flow/ChannelFlowTest.java?view=auto&rev=488847
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/flow/ChannelFlowTest.java (added)
+++ incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/flow/ChannelFlowTest.java Tue Dec 19 14:57:56 2006
@@ -0,0 +1,110 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.flow;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+
+/**
+ * Manual test that publishes bursts of messages to a deliberately slow consumer (this class,
+ * which sleeps for 50ms per message) on the same session, and waits for all of them to arrive.
+ * The whole test runs inside the constructor.
+ */
+public class ChannelFlowTest implements MessageListener
+{
+    // Only touched by the thread running the constructor.
+    private int sent;
+    // Guarded by this object's monitor: written in onMessage(), read in waitUntilReceived().
+    private int received;
+
+    ChannelFlowTest(String broker) throws Exception
+    {
+        this(new AMQConnection(broker, "guest", "guest", randomize("Client"), "/test_path"));
+    }
+
+    ChannelFlowTest(AMQConnection connection) throws Exception
+    {
+        this(connection, new AMQQueue(randomize("ChannelFlowTest"), true));
+    }
+
+    /**
+     * Runs the test against the given destination and closes the connection afterwards,
+     * whether or not the test body succeeds.
+     */
+    ChannelFlowTest(AMQConnection connection, AMQDestination destination) throws Exception
+    {
+        try
+        {
+            // NOTE(review): 50 and 25 are presumably the prefetch high/low marks - confirm
+            // against AMQConnection.createSession.
+            AMQSession session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE, 50,25);
+
+            //set up a slow consumer
+            session.createConsumer(destination).setMessageListener(this);
+            connection.start();
+
+            //create a publisher
+            MessageProducer producer = session.createProducer(destination);
+            Message msg = session.createTextMessage("Message");
+
+            //publish in bursts that are fast enough to cause channel flow control
+            for(int i = 0; i < 10; i++)
+            {
+                for(int j = 0; j < 100; j++)
+                {
+                    producer.send(msg);
+                    sent++;
+                }
+                // Let the consumer catch up to within 40 messages before the next burst.
+                waitUntilReceived(sent - 40);
+            }
+
+            waitUntilReceived(sent);
+
+            session.close();
+        }
+        finally
+        {
+            // Close the connection even when the test body throws.
+            connection.close();
+        }
+    }
+
+
+    /**
+     * Blocks until at least <code>count</code> messages have been received.
+     */
+    private synchronized void waitUntilReceived(int count) throws InterruptedException
+    {
+        while(received <count)
+        {
+            wait();
+        }
+    }
+
+    /**
+     * Simulates a slow consumer by sleeping before counting the message. The message is
+     * counted even if the sleep is interrupted, so the publisher is never left waiting for
+     * a message that did arrive.
+     */
+    public synchronized void onMessage(Message message)
+    {
+        try
+        {
+            Thread.sleep(50);
+        }
+        catch (InterruptedException e)
+        {
+            // Keep the interrupt status visible to the delivering thread.
+            Thread.currentThread().interrupt();
+        }
+
+        received++;
+        notifyAll();
+    }
+
+    /** Appends the current time in milliseconds so names differ between runs. */
+    private static String randomize(String in)
+    {
+        return in + System.currentTimeMillis();
+    }
+
+    /**
+     * @param argv optional broker address as the first argument; defaults to "localhost:5672"
+     */
+    public static void main(String[] argv) throws Exception
+    {
+        new ChannelFlowTest(argv.length == 0 ? "localhost:5672" : argv[0]);
+    }
+
+}
Propchange: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/flow/ChannelFlowTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargePublisher.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargePublisher.java?view=auto&rev=488847
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargePublisher.java (added)
+++ incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargePublisher.java Tue Dec 19 14:57:56 2006
@@ -0,0 +1,193 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.fragmentation;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.jms.MessageProducer;
+import org.apache.qpid.jms.Session;
+import org.apache.log4j.Logger;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+/**
+ * A client that publishes a number of large messages:
+ * <ul><li>Connects to the broker at the given host and port as "guest" on virtual path "/test"</li>
+ * <li>Creates a producer on the topic "large"</li>
+ * <li>Sends the requested number of bytes messages, each 187k in size and filled with a
+ * repeating pattern of the values 0..24</li>
+ * </ul>
+ */
+public class TestLargePublisher
+{
+    private static final Logger _log = Logger.getLogger(TestLargePublisher.class);
+
+    private AMQConnection _connection;
+
+    private Session _session;
+
+    /**
+     * Listener that counts messages and prints the receive rate once the expected number has
+     * arrived. It is not registered anywhere in this class at present.
+     */
+    private static class CallbackHandler implements MessageListener
+    {
+        private int _expectedMessageCount;
+
+        private int _actualMessageCount;
+
+        private long _startTime;
+
+        public CallbackHandler(int expectedMessageCount, long startTime)
+        {
+            _expectedMessageCount = expectedMessageCount;
+            _startTime = startTime;
+        }
+
+        public void onMessage(Message m)
+        {
+            if (_log.isDebugEnabled())
+            {
+                _log.debug("Message received: " + m);
+            }
+            _actualMessageCount++;
+            if (_actualMessageCount%1000 == 0)
+            {
+                _log.info("Received message count: " + _actualMessageCount);
+            }
+
+            if (_actualMessageCount == _expectedMessageCount)
+            {
+                long timeTaken = System.currentTimeMillis() - _startTime;
+                System.out.println("Total time taken to receive " + _expectedMessageCount+ " messages was " +
+                                   timeTaken + "ms, equivalent to " +
+                                   (_expectedMessageCount/(timeTaken/1000.0)) + " messages per second");
+            }
+        }
+    }
+
+    /**
+     * Connects to the broker and sends <code>messageCount</code> large messages to the topic
+     * "large". A JMSException raised while sending is printed and not rethrown.
+     *
+     * @param host         broker host
+     * @param port         broker port
+     * @param clientID     client identifier used for the connection
+     * @param messageCount number of messages to send
+     */
+    public TestLargePublisher(String host, int port, String clientID,
+                              final int messageCount) throws AMQException,URLSyntaxException
+    {
+        try
+        {
+            createConnection(host, port, clientID);
+
+            _session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            AMQTopic destination = new AMQTopic("large");
+            MessageProducer producer = (MessageProducer) _session.createProducer(destination);
+
+            _connection.start();
+
+            for (int i = 0; i < messageCount; i++)
+            {
+                BytesMessage msg = _session.createBytesMessage();
+                populateMessage(msg);
+                producer.send(msg);
+            }
+            _log.info("Finished sending " + messageCount + " messages");
+        }
+        catch (JMSException e)
+        {
+            e.printStackTrace();
+        }
+    }
+
+    private void createConnection(String host, int port, String clientID) throws AMQException , URLSyntaxException
+    {
+        _connection = new AMQConnection(host, port, "guest", "guest",
+                                        clientID, "/test");
+    }
+
+    /**
+     * Fills the message with 187k of data where byte <code>i</code> has the value
+     * <code>i % 25</code>.
+     */
+    private void populateMessage(BytesMessage msg) throws JMSException
+    {
+        int size = 1024 * 187; // 187k
+        byte[] data = new byte[size];
+        for (int i = 0; i < data.length; i++)
+        {
+            data[i] = (byte)(i%25);
+        }
+        msg.writeBytes(data);
+    }
+
+    /**
+     * Runs the publisher.
+     *
+     * @param args either empty, in which case localhost, port 5672 and 100 messages are used,
+     *             or host, port and number of messages in that order. Any other argument
+     *             count below three prints a usage message and exits with status 1.
+     */
+    public static void main(String[] args) throws URLSyntaxException
+    {
+        final String host;
+        final int port;
+        final int numMessages;
+        if (args.length == 0)
+        {
+            host = "localhost";
+            port = 5672;
+            numMessages = 100;
+        }
+        else if (args.length >= 3)
+        {
+            host = args[0];
+            port = Integer.parseInt(args[1]);
+            numMessages = Integer.parseInt(args[2]);
+        }
+        else
+        {
+            // One or two arguments used to fail with ArrayIndexOutOfBoundsException.
+            System.err.println("Usage: TestLargePublisher <host> <port> <number of messages>");
+            System.exit(1);
+            return;
+        }
+
+        try
+        {
+            InetAddress address = InetAddress.getLocalHost();
+            String clientID = address.getHostName() + System.currentTimeMillis();
+            // All of the work happens in the constructor.
+            new TestLargePublisher(host, port, clientID, numMessages);
+        }
+        catch (UnknownHostException e)
+        {
+            e.printStackTrace();
+        }
+        catch (AMQException e)
+        {
+            System.err.println("Error in client: " + e);
+            e.printStackTrace();
+        }
+
+        // NOTE(review): the connection is never closed and there is no System.exit here -
+        // confirm whether the process is meant to stay up after sending.
+    }
+}
Propchange: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargePublisher.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargeSubscriber.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargeSubscriber.java?view=auto&rev=488847
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargeSubscriber.java (added)
+++ incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargeSubscriber.java Tue Dec 19 14:57:56 2006
@@ -0,0 +1,163 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.fragmentation;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.jms.Session;
+import org.apache.log4j.Logger;
+
+import javax.jms.*;
+import java.net.InetAddress;
+
+/**
+ * A client that consumes messages from the topic "large" and checks that each one is a
+ * bytes message of 187k whose byte <code>i</code> has the value <code>i % 25</code>. Once the
+ * expected number of messages has arrived it logs the total time taken and the receive rate.
+ */
+public class TestLargeSubscriber
+{
+    private static final Logger _logger = Logger.getLogger(TestLargeSubscriber.class);
+
+    /**
+     * Runs the subscriber.
+     *
+     * @param args either empty, in which case localhost, port 5672, guest/guest, virtual path
+     *             "/test" and 100 expected messages are used, or exactly six arguments:
+     *             host, port, username, password, virtual path and expected message count.
+     *             Any other argument count prints a usage message and exits with status 1.
+     */
+    public static void main(String[] args)
+    {
+        _logger.info("Starting...");
+
+        final String host;
+        final int port;
+        final String username;
+        final String password;
+        final String virtualPath;
+        final int numExpectedMessages;
+        if (args.length == 0)
+        {
+            host = "localhost";
+            port = 5672;
+            username = "guest";
+            password = "guest";
+            virtualPath = "/test";
+            numExpectedMessages = 100;
+        }
+        else if (args.length == 6)
+        {
+            host = args[0];
+            port = Integer.parseInt(args[1]);
+            username = args[2];
+            password = args[3];
+            virtualPath = args[4];
+            numExpectedMessages = Integer.parseInt(args[5]);
+        }
+        else
+        {
+            System.out.println("Usage: host port username password virtual-path expectedMessageCount");
+            System.exit(1);
+            // Only here so the compiler accepts the final locals above as definitely assigned.
+            throw new RuntimeException("cannot be reached");
+        }
+
+        try
+        {
+            InetAddress address = InetAddress.getLocalHost();
+            AMQConnection con = new AMQConnection(host, port, username, password,
+                                                  address.getHostName(), virtualPath);
+            final Session session = (Session) con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            final int expectedMessageCount = numExpectedMessages;
+
+            // NOTE(review): the extra arguments are presumably prefetch, no-local, exclusive
+            // and selector - confirm against org.apache.qpid.jms.Session.
+            MessageConsumer consumer = session.createConsumer(new AMQTopic("large"),
+                                                              100, true, false, null);
+
+            consumer.setMessageListener(new MessageListener()
+            {
+                private int _messageCount;
+
+                private long _startTime = 0;
+
+                public void onMessage(Message message)
+                {
+                    validateMessage(message);
+                    // Timing starts when the first message arrives.
+                    if (_messageCount++ == 0)
+                    {
+                        _startTime = System.currentTimeMillis();
+                    }
+                    if (_logger.isInfoEnabled())
+                    {
+                        _logger.info("Got message '" + message + "'");
+                    }
+                    if (_messageCount == expectedMessageCount)
+                    {
+                        long totalTime = System.currentTimeMillis() - _startTime;
+                        _logger.error("Total time to receive " + _messageCount + " messages was " +
+                                      totalTime + "ms. Rate is " + (_messageCount/(totalTime/1000.0)));
+                    }
+                }
+
+                /**
+                 * Checks type, length and content of the message and logs every problem found.
+                 * Success is only logged when no problem was found.
+                 */
+                private void validateMessage(Message message)
+                {
+                    if (!(message instanceof BytesMessage))
+                    {
+                        _logger.error("Message is not of correct type - should be BytesMessage and is " +
+                                      message.getClass());
+                        // Nothing more can be checked; casting would throw ClassCastException.
+                        return;
+                    }
+                    BytesMessage bm = (BytesMessage) message;
+                    final int expectedSize = 1024 * 187; // 187k
+                    boolean valid = true;
+                    try
+                    {
+                        final long bodyLength = bm.getBodyLength();
+                        if (bodyLength != expectedSize)
+                        {
+                            _logger.error("Message is not correct length - should be " + expectedSize + " and is " +
+                                          bodyLength);
+                            valid = false;
+                        }
+
+                        byte[] data = new byte[(int) bodyLength];
+                        bm.readBytes(data);
+                        for (int i = 0; i < data.length; i++)
+                        {
+                            if (data[i] != (byte)(i%25))
+                            {
+                                _logger.error("byte " + i + " of message is wrong - should be " + i%25 + " but is " +
+                                              data[i]);
+                                valid = false;
+                            }
+                        }
+
+                        if (valid)
+                        {
+                            _logger.info("***** Validated message successfully");
+                        }
+                    }
+                    catch (JMSException e)
+                    {
+                        _logger.error("Failed to validate message: " + e, e);
+                    }
+                }
+            });
+            con.start();
+        }
+        catch (Throwable t)
+        {
+            System.err.println("Fatal error: " + t);
+            t.printStackTrace();
+        }
+
+        System.out.println("Waiting...");
+    }
+}
+
Propchange: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargeSubscriber.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/framing/FieldTableTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/framing/FieldTableTest.java?view=auto&rev=488847
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/framing/FieldTableTest.java (added)
+++ incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/framing/FieldTableTest.java Tue Dec 19 14:57:56 2006
@@ -0,0 +1,187 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.util.Enumeration;
+import java.util.Properties;
+
+import junit.framework.TestCase;
+
+/**
+ * Tests for {@link FieldTable}: checks the encoded size after adding entries and decodes a
+ * base64 dump read from the classpath resource "content.txt". Also holds helpers for
+ * round-tripping a table through a content header and for loading tables from property files.
+ */
+public class FieldTableTest extends TestCase
+{
+    // Upper bound on the number of leading bytes that testDataDump() echoes to stdout.
+    private static final int PREVIEW_LENGTH = 100;
+
+    /**
+     * Adds a string entry and then an integer entry, checking the encoded size after each.
+     */
+    public void testEncoding()
+    {
+        FieldTable table = FieldTableFactory.newFieldTable();
+
+        String key = "String";
+        String value = "Hello";
+        table.put(key, value);
+
+        //Add one for the type encoding
+        int size = EncodingUtils.encodedShortStringLength(key) + 1 +
+                   EncodingUtils.encodedLongStringLength(value);
+
+        // JUnit expects (expected, actual); the other order gives misleading failure messages.
+        assertEquals(size, table.getEncodedSize());
+
+        key = "Integer";
+        Integer number = Integer.valueOf(60);
+        table.put(key, number);
+
+        //Add one for the type encoding, plus 4 bytes for the integer value
+        size += EncodingUtils.encodedShortStringLength(key) + 1 + 4;
+
+        assertEquals(size, table.getEncodedSize());
+    }
+
+
+    /**
+     * Decodes the base64 content of "content.txt" into a field table. The test passes as long
+     * as decoding does not throw.
+     */
+    public void testDataDump() throws IOException, AMQFrameDecodingException
+    {
+        byte[] data = readBase64("content.txt");
+        System.out.println("Got " + data.length + " bytes of data");
+        // Never read past the end of a dump that is shorter than the preview.
+        final int preview = Math.min(PREVIEW_LENGTH, data.length);
+        for (int i = 0; i < preview; i++)
+        {
+            System.out.print((char) data[i]);
+        }
+        System.out.println();
+        int size = 4194304;
+        ByteBuffer buffer = ByteBuffer.allocate(data.length);
+        buffer.put(data);
+        buffer.flip();
+        FieldTableFactory.newFieldTable(buffer, size);
+    }
+
+    /*
+    public void testCase1() throws AMQFrameDecodingException, IOException
+    {
+        doTestEncoding(load("FieldTableTest.properties"));
+    }
+
+    public void testCase2() throws AMQFrameDecodingException, IOException
+    {
+        doTestEncoding(load("FieldTableTest2.properties"));
+    }
+    */
+
+    /** Asserts that the table survives an encode/decode round trip. */
+    void doTestEncoding(FieldTable table) throws AMQFrameDecodingException
+    {
+        assertEquivalent(table, encodeThenDecode(table));
+    }
+
+    /**
+     * Asserts that every key of <code>table1</code> maps to an equal value in
+     * <code>table2</code>. Keys that are only present in <code>table2</code> are not checked.
+     */
+    public void assertEquivalent(FieldTable table1, FieldTable table2)
+    {
+        for (Object o : table1.keySet())
+        {
+            String key = (String) o;
+            assertEquals("Values for " + key + " did not match", table1.get(key), table2.get(key));
+        }
+    }
+
+    /**
+     * Writes the table as the headers of a content header body into a buffer and reads it
+     * back from that buffer.
+     */
+    FieldTable encodeThenDecode(FieldTable table) throws AMQFrameDecodingException
+    {
+        ContentHeaderBody header = new ContentHeaderBody();
+        // NOTE(review): 6 is presumably the class id the decoder maps to basic content
+        // header properties - confirm.
+        header.classId = 6;
+        BasicContentHeaderProperties properties = new BasicContentHeaderProperties();
+        header.properties = properties;
+
+        properties.setHeaders(table);
+        int size = header.getSize();
+
+        //encode
+        ByteBuffer buffer = ByteBuffer.allocate(size);
+        header.writePayload(buffer);
+
+        //decode
+        buffer.flip();
+
+        header = new ContentHeaderBody();
+        header.populateFromBuffer(buffer, size);
+
+        return ((BasicContentHeaderProperties) header.properties).getHeaders();
+    }
+
+    /**
+     * Reads the named classpath resource and base64-decodes its content.
+     *
+     * @throws IOException if the resource is missing or cannot be read
+     */
+    byte[] readBase64(String name) throws IOException
+    {
+        java.io.InputStream in = openResource(name);
+        try
+        {
+            String content = read(new InputStreamReader(in));
+
+            return org.apache.commons.codec.binary.Base64.decodeBase64(content.getBytes());
+        }
+        finally
+        {
+            in.close();
+        }
+    }
+
+    /** Builds a new field table from the named properties resource. */
+    FieldTable load(String name) throws IOException
+    {
+        return populate(FieldTableFactory.newFieldTable(), read(name));
+    }
+
+    /**
+     * Loads the named classpath resource as a properties file.
+     *
+     * @throws IOException if the resource is missing or cannot be read
+     */
+    Properties read(String name) throws IOException
+    {
+        Properties p = new Properties();
+        java.io.InputStream in = openResource(name);
+        try
+        {
+            p.load(in);
+        }
+        finally
+        {
+            in.close();
+        }
+        return p;
+    }
+
+    /**
+     * Copies every property into the table. Values that parse as an int are stored as a long,
+     * everything else is stored as a string.
+     */
+    FieldTable populate(FieldTable table, Properties properties)
+    {
+        for (Enumeration<?> i = properties.propertyNames(); i.hasMoreElements();)
+        {
+            String key = (String) i.nextElement();
+            String value = properties.getProperty(key);
+            try
+            {
+                int ival = Integer.parseInt(value);
+                table.put(key, (long) ival);
+            }
+            catch (NumberFormatException e)
+            {
+                // Not a number: store the raw text instead.
+                table.put(key, value);
+            }
+        }
+        return table;
+    }
+
+    /** Reads everything from the reader, wrapping it in a BufferedReader if necessary. */
+    static String read(Reader in) throws IOException
+    {
+        return read(in instanceof BufferedReader ? (BufferedReader) in : new BufferedReader(in));
+    }
+
+    /**
+     * Reads all lines and joins them, appending a single space after each line in place of
+     * the line terminator. The reader is not closed.
+     */
+    static String read(BufferedReader in) throws IOException
+    {
+        StringBuilder buffer = new StringBuilder();
+        String line = in.readLine();
+        while (line != null)
+        {
+            buffer.append(line).append(" ");
+            line = in.readLine();
+        }
+        return buffer.toString();
+    }
+
+    public static junit.framework.Test suite()
+    {
+        return new junit.framework.TestSuite(FieldTableTest.class);
+    }
+
+    /**
+     * Opens the named resource relative to this class, failing with a descriptive exception
+     * instead of a NullPointerException when it is not on the classpath.
+     */
+    private java.io.InputStream openResource(String name) throws IOException
+    {
+        java.io.InputStream in = getClass().getResourceAsStream(name);
+        if (in == null)
+        {
+            throw new java.io.FileNotFoundException("Resource not found on classpath: " + name);
+        }
+        return in;
+    }
+}
Propchange: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/framing/FieldTableTest.java
------------------------------------------------------------------------------
svn:eol-style = native