You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2006/09/20 00:07:25 UTC
svn commit: r447994 [29/46] - in /incubator/qpid/trunk/qpid: ./ cpp/
cpp/bin/ cpp/broker/ cpp/broker/inc/ cpp/broker/src/ cpp/broker/test/
cpp/client/ cpp/client/inc/ cpp/client/src/ cpp/client/test/ cpp/common/
cpp/common/concurrent/ cpp/common/concur...
Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/framing/content.txt
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/headers/Listener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/headers/Listener.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/headers/Listener.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/headers/Listener.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,114 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.headers;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.jms.Session;
+import org.apache.qpid.testutil.Config;
+
+import javax.jms.MessageListener;
+import javax.jms.Message;
+import javax.jms.Destination;
+import javax.jms.MessageProducer;
+import javax.jms.JMSException;
+
+public class Listener implements MessageListener
+{
+ private final AMQConnection _connection;
+ private final MessageProducer _controller;
+ private final AMQSession _session;
+ private final MessageFactory _factory;
+ private int count;
+ private long start;
+
+ Listener(AMQConnection connection, Destination exchange) throws Exception
+ {
+ _connection = connection;
+ _session = (AMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _factory = new MessageFactory(_session, 0, 19);
+
+ //register for events
+ _factory.createConsumer(exchange).setMessageListener(this);
+ _connection.start();
+
+ _controller = _session.createProducer(exchange);
+ }
+
+ private void shutdown()
+ {
+ try
+ {
+ _session.close();
+ _connection.stop();
+ _connection.close();
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace(System.out);
+ }
+ }
+
+ private void report()
+ {
+ try
+ {
+ String msg = getReport();
+ _controller.send(_factory.createReportResponseMessage(msg));
+ System.out.println("Sent report: " + msg);
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace(System.out);
+ }
+ }
+
+ private String getReport() throws JMSException
+ {
+ long time = (System.currentTimeMillis() - start);
+ return "Received " + count + " in " + time + "ms";
+ }
+
+ public void onMessage(Message message)
+ {
+ if(count == 0) start = System.currentTimeMillis();
+
+ if(_factory.isShutdown(message))
+ {
+ shutdown();
+ }
+ else if(_factory.isReport(message))
+ {
+ //send a report:
+ report();
+ }
+ else if (++count % 100 == 0)
+ {
+ System.out.println("Received " + count + " messages.");
+ }
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ Config config = new Config();
+ config.setType(Config.HEADERS);
+ config.setName("test_headers_exchange");
+ config.setOptions(argv);
+ new Listener((AMQConnection) config.getConnection(), config.getDestination());
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/headers/Listener.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/headers/MessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/headers/MessageFactory.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/headers/MessageFactory.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/headers/MessageFactory.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,166 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.headers;
+
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.client.AMQSession;
+
+import javax.jms.*;
+
+/**
+ */
+class MessageFactory
+{
+ private static final char[] DATA = "abcdefghijklmnopqrstuvwxyz".toCharArray();
+
+ private final AMQSession _session;
+ private final byte[] _payload;
+
+ private String[] _headerNames;
+
+ MessageFactory(AMQSession session)
+ {
+ this(session, Integer.getInteger("amqj.test.message_size", 256).intValue(), 5);
+ }
+
+ MessageFactory(AMQSession session, int payloadSize, int headerCount)
+ {
+ if (headerCount < 1)
+ {
+ throw new IllegalArgumentException("Header count must be positive");
+ }
+ _session = session;
+ _payload = new byte[payloadSize];
+ for(int i = 0; i < _payload.length; i++)
+ {
+ _payload[i] = (byte) DATA[i % DATA.length];
+ }
+ _headerNames = new String[headerCount];
+ // note that with the standard encoding the headers get prefixed with an S to indicate their type
+ for (int i = 0; i < _headerNames.length; i++)
+ {
+ if (i < 10)
+ {
+ _headerNames[i] = "F000" + i;
+ }
+ else if (i >= 10 && i < 100)
+ {
+ _headerNames[i] = "F00" + i;
+ }
+ else
+ {
+ _headerNames[i] = "F0" + i;
+ }
+ }
+ }
+
+ Message createEventMessage() throws JMSException
+ {
+ BytesMessage msg = _session.createBytesMessage();
+ if (_payload.length != 0)
+ {
+ msg.writeBytes(_payload);
+ }
+ return setHeaders(msg, _headerNames);
+ }
+
+ Message createShutdownMessage() throws JMSException
+ {
+ return setHeaders(_session.createMessage(), new String[]{"F0000", "SHUTDOWN"});
+ }
+
+ Message createReportRequestMessage() throws JMSException
+ {
+ return setHeaders(_session.createMessage(), new String[]{"F0000", "REPORT"});
+ }
+
+ Message createReportResponseMessage(String msg) throws JMSException
+ {
+ return setHeaders(_session.createTextMessage(msg), new String[]{"CONTROL", "REPORT"});
+ }
+
+ boolean isShutdown(Message m)
+ {
+ return checkPresent(m, "SHUTDOWN");
+ }
+
+ boolean isReport(Message m)
+ {
+ return checkPresent(m, "REPORT");
+ }
+
+ Object getReport(Message m)
+ {
+ try
+ {
+ return ((TextMessage) m).getText();
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace(System.out);
+ return e.toString();
+ }
+ }
+
+ FieldTable getConsumerBinding()
+ {
+ FieldTable binding = new FieldTable();
+ binding.put("SF0000", "value");
+ return binding;
+ }
+
+ FieldTable getControllerBinding()
+ {
+ FieldTable binding = new FieldTable();
+ binding.put("SCONTROL", "value");
+ return binding;
+ }
+
+ MessageConsumer createConsumer(Destination source) throws Exception
+ {
+ return _session.createConsumer(source, 0, false, true, null, getConsumerBinding());
+ }
+
+ MessageConsumer createController(Destination source) throws Exception
+ {
+ return _session.createConsumer(source, 0, false, true, null, getControllerBinding());
+ }
+
+ private static boolean checkPresent(Message m, String s)
+ {
+ try
+ {
+ return m.getStringProperty(s) != null;
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace(System.out);
+ return false;
+ }
+ }
+
+ private static Message setHeaders(Message m, String[] headers) throws JMSException
+ {
+ for(int i = 0; i < headers.length; i++)
+ {
+ // the value in GRM is 5 bytes
+ m.setStringProperty(headers[i], "value");
+ }
+ return m;
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/headers/MessageFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/headers/Publisher.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/headers/Publisher.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/headers/Publisher.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/headers/Publisher.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,130 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.headers;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.testutil.Config;
+
+import javax.jms.*;
+
+public class Publisher implements MessageListener
+{
+ private final Object _lock = new Object();
+ private final AMQConnection _connection;
+ private final AMQSession _session;
+ private final Destination _exchange;
+ private final MessageFactory _factory;
+ private final MessageProducer _publisher;
+ private int _count;
+
+ Publisher(AMQConnection connection, Destination exchange) throws Exception
+ {
+ _connection = connection;
+ _exchange = exchange;
+ _session = (AMQSession) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _factory = new MessageFactory(_session, 0, 19);
+ _publisher = _session.createProducer(_exchange);
+ }
+
+ Publisher(Config config) throws Exception
+ {
+ this((AMQConnection) config.getConnection(), config.getDestination());
+ }
+
+ private void test(int msgCount, int consumerCount) throws Exception
+ {
+ _count = consumerCount;
+ _factory.createController(_exchange).setMessageListener(this);
+ _connection.start();
+ long start = System.currentTimeMillis();
+ publish(msgCount);
+ waitForCompletion(consumerCount);
+ long end = System.currentTimeMillis();
+
+ System.out.println("Completed in " + (end - start) + " ms.");
+
+ //request shutdown
+ _publisher.send(_factory.createShutdownMessage());
+
+ _connection.stop();
+ _connection.close();
+ }
+
+ private void publish(int count) throws Exception
+ {
+
+ //send events
+ for (int i = 0; i < count; i++)
+ {
+ _publisher.send(_factory.createEventMessage());
+ if ((i + 1) % 100 == 0)
+ {
+ System.out.println("Sent " + (i + 1) + " messages");
+ }
+ }
+
+ //request report
+ _publisher.send(_factory.createReportRequestMessage());
+ }
+
+ private void waitForCompletion(int consumers) throws Exception
+ {
+ System.out.println("Waiting for completion...");
+ synchronized (_lock)
+ {
+ while (_count > 0)
+ {
+ _lock.wait();
+ }
+ }
+ }
+
+
+ public void onMessage(Message message)
+ {
+ System.out.println("Received report " + _factory.getReport(message) + " " + --_count + " remaining");
+ if (_count == 0)
+ {
+ synchronized (_lock)
+ {
+ _lock.notify();
+ }
+ }
+ }
+
+
+ public static void main(String[] argv) throws Exception
+ {
+ if (argv.length >= 2)
+ {
+ int msgCount = Integer.parseInt(argv[argv.length - 2]);
+ int consumerCount = Integer.parseInt(argv[argv.length - 1]);
+
+ Config config = new Config();
+ config.setType(Config.HEADERS);
+ config.setName("test_headers_exchange");
+ String[] options = new String[argv.length - 2];
+ System.arraycopy(argv, 0, options, 0, options.length);
+ config.setOptions(options);
+
+ new Publisher(config).test(msgCount, consumerCount);
+ }
+
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/headers/Publisher.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/jndi/referenceable/Bind.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/jndi/referenceable/Bind.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/jndi/referenceable/Bind.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/jndi/referenceable/Bind.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,270 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.jndi.referenceable;
+
+import org.apache.qpid.client.*;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.url.URLSyntaxException;
+
+import javax.jms.*;
+import javax.naming.*;
+
+import java.util.Properties;
+import java.io.InputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+
+/**
+ * Binds a reference from a JNDI source.
+ * Given a properties file with the JNDI information and a binding string.
+ */
+class Bind
+{
+ private static final String USAGE="USAGE: java bind <JNDI Properties file> -cf <url> <binding> | -c <url> <binding> [-t <topic Name> <binding>] [-q <queue Name> <binding>]";
+ public Bind(String propertiesFile, String bindingURL, Referenceable reference) throws NameAlreadyBoundException, NoInitialContextException
+ {
+ // Set up the environment for creating the initial context
+ String qpid_home = System.getProperty("QPID_HOME");
+
+ if (qpid_home == null || qpid_home.equals(""))
+ {
+ System.out.println("QPID_HOME is not set");
+ System.exit(1);
+ }
+
+ if (qpid_home.charAt(qpid_home.length() - 1) != '/')
+ {
+ qpid_home += "/";
+ }
+
+ try
+ {
+ InputStream inputStream = new FileInputStream(qpid_home + propertiesFile);
+ Properties properties = new Properties();
+ properties.load(inputStream);
+
+ // Create the initial context
+ Context ctx = new InitialContext(properties);
+
+ // Perform the binds
+ ctx.bind(bindingURL, reference);
+
+ // Close the context when we're done
+ ctx.close();
+ }
+ catch (IOException ioe)
+ {
+ System.out.println("Unable to access properties file:" + propertiesFile + " Due to:" + ioe);
+ }
+ catch (NamingException e)
+ {
+ System.out.println("Operation failed: " + e);
+ if (e instanceof NameAlreadyBoundException)
+ {
+ throw (NameAlreadyBoundException) e;
+ }
+
+ if (e instanceof NoInitialContextException)
+ {
+ throw (NoInitialContextException) e;
+ }
+ }
+
+ }
+
+ private static String parse(String[] args, int index, String what, String type)
+ {
+ try
+ {
+ return args[index];
+ }
+ catch (IndexOutOfBoundsException ioobe)
+ {
+ System.out.println("ERROR: No " + what + " specified for " + type + ".");
+ System.out.println(USAGE);
+ System.exit(1);
+ }
+
+ // The path is either return normally or exception.. which calls system exit so keep the compiler happy
+ return "Never going to happen";
+ }
+
+
+ public static void main(String[] args) throws NameAlreadyBoundException, NoInitialContextException, URLSyntaxException, AMQException, JMSException
+ {
+
+
+ org.apache.log4j.Logger.getRootLogger().setLevel(org.apache.log4j.Level.OFF);
+
+// org.apache.log4j.Logger _logger = org.apache.log4j.Logger.getLogger(AMQConnection.class);
+// _logger.setLevel(org.apache.log4j.Level.OFF);
+
+ boolean exit = false;
+
+ String qpid_home = System.getProperty("QPID_HOME");
+
+ if (qpid_home == null || qpid_home.equals(""))
+ {
+ System.out.println("QPID_HOME is not set");
+ exit = true;
+ }
+
+ if (args.length <= 2)
+ {
+ System.out.println("At least a connection or connection factory must be requested to be bound.");
+ exit = true;
+ }
+ else
+ {
+ if ((args.length - 1) % 3 != 0)
+ {
+ System.out.println("Not all values have full details");
+ exit = true;
+ }
+ }
+ if (exit)
+ {
+ System.out.println(USAGE);
+ System.exit(1);
+ }
+
+ if (qpid_home.charAt(qpid_home.length() - 1) != '/')
+
+ {
+ qpid_home += "/";
+ }
+
+ AMQConnectionFactory cf = null;
+ AMQConnection c = null;
+ AMQSession session = null;
+ Referenceable reference = null;
+
+ for (int index = 1; index < args.length; index ++)
+ {
+ String obj = args[index];
+
+ String what = "Invalid";
+ String binding;
+
+ if (obj.startsWith("-c"))
+ {
+ boolean isFactory = obj.contains("f");
+
+
+ if (isFactory)
+ {
+ what = "ConnectionFactory";
+ }
+ else
+ {
+ what = "Factory";
+ }
+
+ String url = parse(args, ++index, "url", what);
+
+ if (isFactory)
+ {
+
+ cf = new AMQConnectionFactory(url);
+ reference = cf;
+ }
+ else
+ {
+ c = new AMQConnection(url);
+ reference = c;
+ }
+
+ }
+
+ if (obj.equals("-t") || obj.equals("-q"))
+ {
+ if (c == null)
+ {
+ c = (AMQConnection) cf.createConnection();
+ }
+
+ if (session == null)
+ {
+ session = (AMQSession) c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ }
+
+ if (obj.equals("-t"))
+ {
+
+ String topicName = parse(args, ++index, "Topic Name", "Topic");
+ reference = (AMQTopic) session.createTopic(topicName);
+ what = "Topic";
+ }
+ else
+ {
+ if (obj.equals("-q"))
+ {
+ String topicName = parse(args, ++index, "Queue Name", "Queue");
+ reference = (AMQQueue) session.createQueue(topicName);
+ what = "Queue";
+ }
+ }
+
+ binding = parse(args, ++index, "binding", what);
+ if (binding == null)
+ {
+ System.out.println(obj + " is not a known Object to bind.");
+ System.exit(1);
+ }
+ else
+ {
+ System.out.print("Binding:" + reference + " to " + binding);
+ try
+ {
+ new Bind(args[0], binding, reference);
+ System.out.println(" ..Successful");
+
+ }
+ catch (NameAlreadyBoundException nabe)
+ {
+ System.out.println("");
+ if (!obj.startsWith("-c") || index == args.length - 1)
+ {
+ throw nabe;
+ }
+ else
+ {
+ System.out.println("Continuing with other bindings using the same connection details");
+ }
+ }
+ finally
+ {
+ if (!obj.startsWith("-c") || index == args.length - 1)
+ {
+ if (c != null)
+ {
+ c.close();
+ }
+ }
+ }
+ }
+ }
+
+ if (c != null)
+ {
+ c.close();
+ }
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/jndi/referenceable/Bind.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/jndi/referenceable/Lookup.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/jndi/referenceable/Lookup.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/jndi/referenceable/Lookup.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/jndi/referenceable/Lookup.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,176 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.jndi.referenceable;
+
+import javax.naming.*;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+
+import java.util.Properties;
+import java.io.InputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+
+/**
+ * Looksup a reference from a JNDI source.
+ * Given a properties file with the JNDI information and a binding string.
+ */
+class Lookup
+{
+ private static final String USAGE = "USAGE: java lookup <JNDI Properties file> -b <binding>";
+
+ public Lookup(String propertiesFile, String bindingValue) throws NamingException
+ {
+ // Set up the environment for creating the initial context
+ String qpid_home = System.getProperty("QPID_HOME");
+
+ if (qpid_home == null || qpid_home.equals(""))
+ {
+ System.out.println("QPID_HOME is not set");
+ System.exit(1);
+ }
+
+ if (qpid_home.charAt(qpid_home.length() - 1) != '/')
+ {
+ qpid_home += "/";
+ }
+
+ try
+ {
+ InputStream inputStream = new FileInputStream(qpid_home + propertiesFile);
+ Properties properties = new Properties();
+ properties.load(inputStream);
+
+ // Create the initial context
+ Context ctx = new InitialContext(properties);
+
+ // Perform the binds
+ Object obj = ctx.lookup(bindingValue);
+
+ if (obj instanceof Connection)
+ {
+ try
+ {
+ ((Connection) obj).close();
+ }
+ catch (JMSException jmse)
+ {
+ ;
+ }
+ }
+
+ System.out.println(bindingValue + " bound to " + obj);
+
+ // Close the context when we're done
+ ctx.close();
+ }
+ catch (IOException ioe)
+ {
+ System.out.println("Unable to access properties file:" + propertiesFile + " Due to:" + ioe);
+ }
+ }
+
+ private static String parse(String[] args, int index, String what)
+ {
+ try
+ {
+ return args[index];
+ }
+ catch (IndexOutOfBoundsException ioobe)
+ {
+ System.out.println("ERROR: No " + what + " specified.");
+ System.out.println(USAGE);
+ System.exit(1);
+ }
+
+ // The path is either return normally or exception.. which calls system exit so keep the compiler happy
+ return "Never going to happen";
+ }
+
+
+ public static void main(String[] args) throws NamingException
+ {
+ boolean exit = false;
+
+ String qpid_home = System.getProperty("QPID_HOME");
+
+ if (qpid_home == null || qpid_home.equals(""))
+ {
+ System.out.println("QPID_HOME is not set");
+ exit = true;
+ }
+
+ if (args.length <= 2)
+ {
+ System.out.println("At least a connection or connection factory must be requested to be bound.");
+ exit = true;
+ }
+ else
+ {
+ if ((args.length - 1) % 2 != 0)
+ {
+ System.out.println("Not all values have full details");
+ exit = true;
+ }
+ }
+ if (exit)
+ {
+ System.out.println(USAGE);
+ System.exit(1);
+ }
+
+ if (qpid_home.charAt(qpid_home.length() - 1) != '/')
+
+ {
+ qpid_home += "/";
+ }
+
+ for (int index = 1; index < args.length; index ++)
+ {
+ String obj = args[index];
+
+
+ if (obj.equals("-b"))
+ {
+ String binding = parse(args, ++index, "binding");
+
+ if (binding == null)
+ {
+ System.out.println("Binding not specified.");
+ System.exit(1);
+ }
+ else
+ {
+ System.out.print("Looking up:" + binding);
+ try
+ {
+ new Lookup(args[0], binding);
+ }
+ catch (NamingException nabe)
+ {
+ System.out.println("Problem unbinding " + binding + " continuing with other values.");
+ }
+ }
+ }// if -b
+ else
+ {
+ System.out.println("Continuing with other bindings option not known:" + obj);
+ }
+ }//for
+ }//main
+}//class
Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/jndi/referenceable/Lookup.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/jndi/referenceable/Unbind.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/jndi/referenceable/Unbind.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/jndi/referenceable/Unbind.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/jndi/referenceable/Unbind.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,163 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.jndi.referenceable;
+
+import javax.naming.*;
+
+import java.util.Properties;
+import java.io.InputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+
+/**
+ * Unbinds a reference from a JNDI source.
+ * Given a properties file with the JNDI information and a binding string.
+ */
+class UnBind
+{
+ private static final String USAGE = "USAGE: java unbind <JNDI Properties file> -b <binding>";
+
+ public UnBind(String propertiesFile, String bindingValue) throws NamingException
+ {
+ // Set up the environment for creating the initial context
+ String qpid_home = System.getProperty("QPID_HOME");
+
+ if (qpid_home == null || qpid_home.equals(""))
+ {
+ System.out.println("QPID_HOME is not set");
+ System.exit(1);
+ }
+
+ if (qpid_home.charAt(qpid_home.length() - 1) != '/')
+ {
+ qpid_home += "/";
+ }
+
+ try
+ {
+ InputStream inputStream = new FileInputStream(qpid_home + propertiesFile);
+ Properties properties = new Properties();
+ properties.load(inputStream);
+
+ // Create the initial context
+ Context ctx = new InitialContext(properties);
+
+ // Perform the binds
+ ctx.unbind(bindingValue);
+
+ // Close the context when we're done
+ ctx.close();
+ }
+ catch (IOException ioe)
+ {
+ System.out.println("Unable to access properties file:" + propertiesFile + " Due to:" + ioe);
+ }
+ }
+
+ private static String parse(String[] args, int index, String what)
+ {
+ try
+ {
+ return args[index];
+ }
+ catch (IndexOutOfBoundsException ioobe)
+ {
+ System.out.println("ERROR: No " + what + " specified.");
+ System.out.println(USAGE);
+ System.exit(1);
+ }
+
+ // The path is either return normally or exception.. which calls system exit so keep the compiler happy
+ return "Never going to happen";
+ }
+
+
+ public static void main(String[] args) throws NamingException
+ {
+ boolean exit = false;
+
+ String qpid_home = System.getProperty("QPID_HOME");
+
+ if (qpid_home == null || qpid_home.equals(""))
+ {
+ System.out.println("QPID_HOME is not set");
+ exit = true;
+ }
+
+ if (args.length <= 2)
+ {
+ System.out.println("At least a connection or connection factory must be requested to be bound.");
+ exit = true;
+ }
+ else
+ {
+ if ((args.length - 1) % 2 != 0)
+ {
+ System.out.println("Not all values have full details");
+ exit = true;
+ }
+ }
+ if (exit)
+ {
+ System.out.println(USAGE);
+ System.exit(1);
+ }
+
+ if (qpid_home.charAt(qpid_home.length() - 1) != '/')
+
+ {
+ qpid_home += "/";
+ }
+
+ for (int index = 1; index < args.length; index ++)
+ {
+ String obj = args[index];
+
+
+ if (obj.equals("-b"))
+ {
+ String binding = parse(args, ++index, "binding");
+
+ if (binding == null)
+ {
+ System.out.println("Binding not specified.");
+ System.exit(1);
+ }
+ else
+ {
+ System.out.print("UnBinding:" + binding);
+ try
+ {
+ new UnBind(args[0], binding);
+ System.out.println(" ..Successful");
+ }
+ catch (NamingException nabe)
+ {
+ System.out.println("");
+
+ System.out.println("Problem unbinding " + binding + " continuing with other values.");
+ }
+ }
+ }// if -b
+ else
+ {
+ System.out.println("Continuing with other bindings option not known:" + obj);
+ }
+ }//for
+ }//main
+}//class
Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/jndi/referenceable/Unbind.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/jndi/referenceabletest/Bind.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/jndi/referenceabletest/Bind.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/jndi/referenceabletest/Bind.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/jndi/referenceabletest/Bind.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,168 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.jndi.referenceabletest;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.url.URLSyntaxException;
+import org.junit.Assert;
+
+import javax.jms.*;
+import javax.naming.*;
+
+import java.util.Hashtable;
+
+/**
+ * Usage: To run these you need to have the sun JNDI SPI for the FileSystem.
+ * This can be downloaded from sun here:
+ * http://java.sun.com/products/jndi/downloads/index.html
+ * Click : Download JNDI 1.2.1 & More button
+ * Download: File System Service Provider, 1.2 Beta 3
+ * and add the two jars in the lib dir to your class path.
+ * <p/>
+ * Also you need to create the directory /temp/qpid-jndi-test
+ */
+class Bind
+{
+
+ String _connectionFactoryString = "";
+
+ String _connectionString = "amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'";
+
+ Topic _topic = null;
+
+ boolean _bound = false;
+
+ public Bind() throws NameAlreadyBoundException, NoInitialContextException
+ {
+ this(false);
+ }
+ public Bind(boolean output) throws NameAlreadyBoundException, NoInitialContextException
+ {
+ // Set up the environment for creating the initial context
+ Hashtable env = new Hashtable(11);
+ env.put(Context.INITIAL_CONTEXT_FACTORY,
+ "com.sun.jndi.fscontext.RefFSContextFactory");
+ env.put(Context.PROVIDER_URL, "file:/temp/qpid-jndi-test");
+
+ try
+ {
+ // Create the initial context
+ Context ctx = new InitialContext(env);
+
+ // Create the connection factory to be bound
+ ConnectionFactory connectionFactory = null;
+ // Create the Connection to be bound
+ Connection connection = null;
+
+ try
+ {
+ connectionFactory = new AMQConnectionFactory(_connectionString);
+ connection = connectionFactory.createConnection();
+
+ _connectionFactoryString = ((AMQConnectionFactory) connectionFactory).getConnectionURL().getURL();
+ }
+ catch (JMSException jmsqe)
+ {
+ Assert.fail("Unable to create Connection:" + jmsqe);
+ }
+ catch (URLSyntaxException urlse)
+ {
+ Assert.fail("Unable to create Connection:" + urlse);
+ }
+
+ try
+ {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ _topic = session.createTopic("Fruity");
+
+ }
+ catch (JMSException jmse)
+ {
+
+ }
+ // Perform the binds
+ ctx.bind("ConnectionFactory", connectionFactory);
+ if (output)
+ {
+ System.out.println("Bound factory\n" + ((AMQConnectionFactory) connectionFactory).getConnectionURL());
+ }
+ ctx.bind("Connection", connection);
+ if (output)
+ {
+ System.out.println("Bound Connection\n" + ((AMQConnection) connection).toURL());
+ }
+ ctx.bind("Topic", _topic);
+ if (output)
+ {
+ System.out.println("Bound Topic:\n" + ((AMQTopic) _topic).toURL());
+ }
+ _bound = true;
+
+ // Check that it is bound
+ //Object obj = ctx.lookup("Connection");
+ //System.out.println(((AMQConnection)obj).toURL());
+
+ // Close the context when we're done
+ ctx.close();
+ }
+ catch (NamingException e)
+ {
+ System.out.println("Operation failed: " + e);
+ if (e instanceof NameAlreadyBoundException)
+ {
+ throw (NameAlreadyBoundException) e;
+ }
+
+ if (e instanceof NoInitialContextException)
+ {
+ throw (NoInitialContextException) e;
+ }
+ }
+
+ }
+
+ public String connectionFactoryValue()
+ {
+ return _connectionFactoryString;
+ }
+
+ public String connectionValue()
+ {
+ return _connectionString;
+ }
+
+ public String topicValue()
+ {
+ return ((AMQTopic) _topic).toURL();
+ }
+
+ public boolean bound()
+ {
+ return _bound;
+ }
+
+ public static void main(String[] args) throws NameAlreadyBoundException, NoInitialContextException
+ {
+ new Bind(true);
+ }
+}
+
Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/jndi/referenceabletest/Bind.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/jndi/referenceabletest/JNDIReferenceableTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/jndi/referenceabletest/JNDIReferenceableTest.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/jndi/referenceabletest/JNDIReferenceableTest.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/jndi/referenceabletest/JNDIReferenceableTest.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,108 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.jndi.referenceabletest;
+
+import org.junit.Test;
+import org.junit.Assert;
+import junit.framework.JUnit4TestAdapter;
+
+import javax.naming.NamingException;
+import javax.naming.NameAlreadyBoundException;
+import javax.naming.NoInitialContextException;
+
+
+/**
+ * Usage: To run these you need to have the sun JNDI SPI for the FileSystem.
+ * This can be downloaded from sun here:
+ * http://java.sun.com/products/jndi/downloads/index.html
+ * Click : Download JNDI 1.2.1 & More button
+ * Download: File System Service Provider, 1.2 Beta 3
+ * and add the two jars in the lib dir to your class path.
+ * <p/>
+ * Also you need to create the directory /temp/qpid-jndi-test
+ */
+public class JNDIReferenceableTest
+{
+
+ @Test
+ public void referenceable()
+ {
+ Bind b = null;
+ try
+ {
+
+
+ try
+ {
+ b = new Bind();
+
+ }
+ catch (NameAlreadyBoundException e)
+ {
+ if (new Unbind().unbound())
+ {
+ try
+ {
+ b = new Bind();
+ }
+ catch (NameAlreadyBoundException ee)
+ {
+ Assert.fail("Unable to clear bound objects for test.");
+ }
+ }
+ else
+ {
+ Assert.fail("Unable to clear bound objects for test.");
+ }
+
+ }
+ }
+ catch (NoInitialContextException e)
+ {
+ Assert.fail("You don't have the File System SPI on you class path.\n" +
+ "This can be downloaded from sun here:\n" +
+ "http://java.sun.com/products/jndi/downloads/index.html\n" +
+ "Click : Download JNDI 1.2.1 & More button\n" +
+ "Download: File System Service Provider, 1.2 Beta 3\n" +
+ "and add the two jars in the lib dir to your class path.");
+ }
+
+ Assert.assertTrue(b.bound());
+
+ Lookup l = new Lookup();
+
+ Assert.assertTrue(l.connectionFactoryValue().equals(b.connectionFactoryValue()));
+
+ Assert.assertTrue(l.connectionValue().equals(b.connectionValue()));
+
+ Assert.assertTrue(l.topicValue().equals(b.topicValue()));
+
+
+ Unbind u = new Unbind();
+
+ Assert.assertTrue(u.unbound());
+
+ }
+
+ public static junit.framework.Test suite
+ ()
+ {
+ return new JUnit4TestAdapter(JNDIReferenceableTest.class);
+ }
+
+}
Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/jndi/referenceabletest/JNDIReferenceableTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/jndi/referenceabletest/Lookup.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/jndi/referenceabletest/Lookup.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/jndi/referenceabletest/Lookup.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/jndi/referenceabletest/Lookup.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,95 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.jndi.referenceabletest;
+
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQConnectionFactory;
+
+import javax.naming.*;
+import java.util.Hashtable;
+
+
+/**
+ * Usage: To run these you need to have the sun JNDI SPI for the FileSystem.
+ * This can be downloaded from sun here:
+ * http://java.sun.com/products/jndi/downloads/index.html
+ * Click : Download JNDI 1.2.1 & More button
+ * Download: File System Service Provider, 1.2 Beta 3
+ * and add the two jars in the lib dir to your class path.
+ * <p/>
+ * Also you need to create the directory /temp/qpid-jndi-test
+ */
+class Lookup
+{
+ AMQTopic _topic = null;
+ AMQConnection _connection = null;
+ AMQConnectionFactory _connectionFactory = null;
+
+ public Lookup()
+ {
+ // Set up the environment for creating the initial context
+ Hashtable env = new Hashtable(11);
+ env.put(Context.INITIAL_CONTEXT_FACTORY,
+ "com.sun.jndi.fscontext.RefFSContextFactory");
+ env.put(Context.PROVIDER_URL, "file:/temp/qpid-jndi-test");
+
+ try
+ {
+ // Create the initial context
+ Context ctx = new InitialContext(env);
+
+ _topic = (AMQTopic) ctx.lookup("Topic");
+
+ _connection = (AMQConnection) ctx.lookup("Connection");
+
+ _connectionFactory = (AMQConnectionFactory) ctx.lookup("ConnectionFactory");
+ //System.out.println(topic);
+
+ // Close the context when we're done
+ ctx.close();
+ }
+ catch (NamingException e)
+ {
+ System.out.println("Operation failed: " + e);
+ }
+ }
+
+
+ public String connectionFactoryValue()
+ {
+ return ((AMQConnection) _connectionFactory.getConnectionURL()).toURL();
+ }
+
+ public String connectionValue()
+ {
+ return _connection.toURL();
+ }
+
+ public String topicValue()
+ {
+ return _topic.toURL();
+ }
+
+
+ public static void main(String[] args)
+ {
+ new Lookup();
+ }
+}
+
Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/jndi/referenceabletest/Lookup.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/jndi/referenceabletest/Unbind.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/jndi/referenceabletest/Unbind.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/jndi/referenceabletest/Unbind.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/jndi/referenceabletest/Unbind.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,124 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.jndi.referenceabletest;
+
+import javax.naming.*;
+import java.io.File;
+import java.util.Hashtable;
+
+/**
+ * Usage: To run these you need to have the sun JNDI SPI for the FileSystem.
+ * This can be downloaded from sun here:
+ * http://java.sun.com/products/jndi/downloads/index.html
+ * Click : Download JNDI 1.2.1 & More button
+ * Download: File System Service Provider, 1.2 Beta 3
+ * and add the two jars in the lib dir to your class path.
+ * <p/>
+ * Also you need to create the directory /temp/qpid-jndi-test
+ */
+class Unbind
+{
+
+
+ boolean _unbound = false;
+
+ public Unbind()
+ {
+ this(false);
+ }
+
+ public Unbind(boolean output)
+ {
+
+ // Set up the environment for creating the initial context
+ Hashtable env = new Hashtable(11);
+ env.put(Context.INITIAL_CONTEXT_FACTORY,
+ "com.sun.jndi.fscontext.RefFSContextFactory");
+ env.put(Context.PROVIDER_URL, "file:/temp/qpid-jndi-test");
+
+ try
+ {
+ // Create the initial context
+ Context ctx = new InitialContext(env);
+
+ // Remove the binding
+ ctx.unbind("ConnectionFactory");
+ ctx.unbind("Connection");
+ ctx.unbind("Topic");
+
+ // Check that it is gone
+ Object obj = null;
+ try
+ {
+ obj = ctx.lookup("ConnectionFactory");
+ }
+ catch (NameNotFoundException ne)
+ {
+ if (output)
+ {
+ System.out.println("unbind ConnectionFactory successful");
+ }
+ try
+ {
+ obj = ctx.lookup("Connection");
+ }
+ catch (NameNotFoundException ne2)
+ {
+ if (output)
+ {
+ System.out.println("unbind Connection successful");
+ }
+
+ try
+ {
+ obj = ctx.lookup("Topic");
+ }
+ catch (NameNotFoundException ne3)
+ {
+ if (output)
+ {
+ System.out.println("unbind Topic successful");
+ }
+ _unbound = true;
+ }
+ }
+ }
+
+ //System.out.println("unbind failed; object still there: " + obj);
+
+ // Close the context when we're done
+ ctx.close();
+ }
+ catch (NamingException e)
+ {
+ System.out.println("Operation failed: " + e);
+ }
+ }
+
+ public boolean unbound()
+ {
+ return _unbound;
+ }
+
+ public static void main(String[] args)
+ {
+
+ new Unbind(true);
+ }
+}
+
Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/jndi/referenceabletest/Unbind.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/jndi/referenceabletest/UnitTests.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/jndi/referenceabletest/UnitTests.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/jndi/referenceabletest/UnitTests.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/jndi/referenceabletest/UnitTests.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,33 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.jndi.referenceabletest;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+import org.apache.qpid.ack.*;
+import junit.framework.JUnit4TestAdapter;
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses({JNDIReferenceableTest.class})
+public class UnitTests
+{
+ public static junit.framework.Test suite()
+ {
+ return new JUnit4TestAdapter(org.apache.qpid.destinationurl.UnitTests.class);
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/jndi/referenceabletest/UnitTests.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/latency/LatencyTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/latency/LatencyTest.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/latency/LatencyTest.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/latency/LatencyTest.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,148 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.latency;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+
+import javax.jms.MessageProducer;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.JMSException;
+import javax.jms.TextMessage;
+import javax.jms.BytesMessage;
+
+public class LatencyTest implements MessageListener
+{
+ private volatile boolean waiting;
+ private int sent;
+ private int received;
+
+ private final byte[] data;
+
+ private long min = Long.MAX_VALUE;
+ private long max = 0;
+ private long total = 0;
+
+ LatencyTest(String broker, int count, int delay, int length) throws Exception
+ {
+ this(new AMQConnection(broker, "guest", "guest", randomize("Client"), "/test_path"), count, delay, length);
+ }
+
+ LatencyTest(AMQConnection connection, int count, int delay, int length) throws Exception
+ {
+ this(connection, new AMQQueue(randomize("LatencyTest"), true), count, delay, length);
+ }
+
+ LatencyTest(AMQConnection connection, AMQDestination destination, int count, int delay, int length) throws Exception
+ {
+ AMQSession session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+
+ data = new byte[length];
+ for(int i = 0; i < data.length; i++)
+ {
+ data[i] = (byte) (i % 100);
+ }
+
+ //set up a consumer
+ session.createConsumer(destination).setMessageListener(this);
+ connection.start();
+
+ //create a publisher
+ MessageProducer producer = session.createProducer(destination, false, false, true);
+
+ //publish at a low volume
+ for(int i = 0; i < count; i++)
+ {
+ BytesMessage msg = session.createBytesMessage();
+ msg.writeBytes(data);
+ msg.setStringProperty("sent-at", Long.toString(System.nanoTime()));
+ producer.send(msg);
+ Thread.sleep(delay);
+ if(++sent % 100 == 0)
+ {
+ System.out.println("Sent " + sent + " of " + count);
+ }
+ }
+
+ waitUntilReceived(sent);
+
+ session.close();
+ connection.close();
+
+ System.out.println("Latency (in nanoseconds): avg=" + (total/sent) + ", min=" + min + ", max=" + max
+ + ", avg(discarding min and max)=" + ((total - min - max) / (sent - 2)));
+ }
+
+
+ private synchronized void waitUntilReceived(int count) throws InterruptedException
+ {
+ waiting = true;
+ while(received < count)
+ {
+ wait();
+ }
+ waiting = false;
+ }
+
+ public void onMessage(Message message)
+ {
+ received++;
+ try
+ {
+ long sent = Long.parseLong(message.getStringProperty("sent-at"));
+ long time = System.nanoTime() - sent;
+ total += time;
+ min = Math.min(min, time);
+ max = Math.max(max, time);
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace();
+ }
+
+ if(waiting){
+ synchronized(this)
+ {
+ notify();
+ }
+ }
+ }
+
+ private static String randomize(String in)
+ {
+ return in + System.currentTimeMillis();
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ String host = argv.length > 0 ? argv[0] : "localhost:5672";
+ if("-help".equals(host))
+ {
+ System.out.println("Usage: <broker> <message count> <delay between messages> <message size>");
+ }
+ int count = argv.length > 1 ? Integer.parseInt(argv[1]) : 1000;
+ int delay = argv.length > 2 ? Integer.parseInt(argv[2]) : 1000;
+ int size = argv.length > 3 ? Integer.parseInt(argv[3]) : 512;
+ new LatencyTest(host, count, delay, size);
+ }
+
+
+}
Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/latency/LatencyTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/mina/AcceptorTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/mina/AcceptorTest.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/mina/AcceptorTest.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/mina/AcceptorTest.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,107 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.mina;
+
+import junit.framework.JUnit4TestAdapter;
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.transport.socket.nio.SocketAcceptor;
+import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
+import org.apache.mina.transport.socket.nio.SocketSessionConfig;
+import org.junit.Test;
+import org.apache.qpid.pool.ReadWriteThreadModel;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+/**
+ * Tests MINA socket performance. This acceptor simply reads data from the network and writes it back again.
+ *
+ */
+public class AcceptorTest
+{
+ private static final Logger _logger = Logger.getLogger(AcceptorTest.class);
+
+ public static int PORT = 9999;
+
+ private static class TestHandler extends IoHandlerAdapter
+ {
+ private int _sentCount;
+
+ private int _bytesSent;
+
+ public void messageReceived(IoSession session, Object message) throws Exception
+ {
+ ((ByteBuffer) message).acquire();
+ session.write(message);
+ _logger.debug("Sent response " + ++_sentCount);
+ _bytesSent += ((ByteBuffer)message).remaining();
+ _logger.debug("Bytes sent: " + _bytesSent);
+ }
+
+ public void messageSent(IoSession session, Object message) throws Exception
+ {
+ //((ByteBuffer) message).release();
+ }
+
+ public void exceptionCaught(IoSession session, Throwable cause) throws Exception
+ {
+ _logger.error("Error: " + cause, cause);
+ }
+ }
+
+ @Test
+ public void startAcceptor() throws IOException
+ {
+ boolean useMultithreadedIoProcessor = Boolean.getBoolean("qpid.io.multithreaded");
+ IoAcceptor acceptor = null;
+ if (useMultithreadedIoProcessor)
+ {
+ acceptor = new org.apache.qpid.nio.SocketAcceptor();
+ }
+ else
+ {
+ acceptor = new SocketAcceptor();
+ }
+ SocketAcceptorConfig config = (SocketAcceptorConfig) acceptor.getDefaultConfig();
+ SocketSessionConfig sc = (SocketSessionConfig) config.getSessionConfig();
+ sc.setTcpNoDelay(true);
+ sc.setSendBufferSize(32768);
+ sc.setReceiveBufferSize(32768);
+
+ config.setThreadModel(new ReadWriteThreadModel());
+
+ acceptor.bind(new InetSocketAddress(PORT),
+ new TestHandler());
+ _logger.info("Bound on port " + PORT);
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new JUnit4TestAdapter(AcceptorTest.class);
+ }
+
+ public static void main(String[] args) throws IOException
+ {
+ AcceptorTest a = new AcceptorTest();
+ a.startAcceptor();
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/mina/AcceptorTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/mina/BlockingAcceptorTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/mina/BlockingAcceptorTest.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/mina/BlockingAcceptorTest.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/mina/BlockingAcceptorTest.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,91 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.mina;
+
+import junit.framework.JUnit4TestAdapter;
+import org.apache.log4j.Logger;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+
+public class BlockingAcceptorTest
+{
+ private static final Logger _logger = Logger.getLogger(BlockingAcceptorTest.class);
+
+ public static int PORT = 9999;
+
+ public static junit.framework.Test suite()
+ {
+ return new JUnit4TestAdapter(AcceptorTest.class);
+ }
+
+ @Test
+ public void startAcceptor() throws IOException
+ {
+
+ ServerSocket sock = new ServerSocket(PORT);
+
+ sock.setReuseAddress(true);
+ sock.setReceiveBufferSize(32768);
+ _logger.info("Bound on port " + PORT);
+
+ while (true)
+ {
+ final Socket s = sock.accept();
+ _logger.info("Received connection from " + s.getRemoteSocketAddress());
+ s.setReceiveBufferSize(32768);
+ s.setSendBufferSize(32768);
+ s.setTcpNoDelay(true);
+ new Thread(new Runnable()
+ {
+ public void run()
+ {
+ byte[] chunk = new byte[32768];
+ try
+ {
+ InputStream is = s.getInputStream();
+ OutputStream os = s.getOutputStream();
+
+ while (true)
+ {
+ int count = is.read(chunk, 0, chunk.length);
+ if (count > 0)
+ {
+ os.write(chunk, 0, count);
+ }
+ }
+ }
+ catch (IOException e)
+ {
+ _logger.error("Error - closing connection: " + e, e);
+ }
+ }
+ }, "SocketReaderWriter").start();
+ }
+ }
+
+ public static void main(String[] args) throws IOException
+ {
+ BlockingAcceptorTest a = new BlockingAcceptorTest();
+ a.startAcceptor();
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/mina/BlockingAcceptorTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/mina/WriterTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/mina/WriterTest.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/mina/WriterTest.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/mina/WriterTest.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,272 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.mina;
+
+import junit.framework.JUnit4TestAdapter;
+import org.apache.log4j.Logger;
+import org.apache.mina.common.*;
+import org.apache.mina.transport.socket.nio.SocketConnector;
+import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
+import org.apache.mina.transport.socket.nio.SocketSessionConfig;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CountDownLatch;
+
+public class WriterTest implements Runnable
+{
+ private static final Logger _logger = Logger.getLogger(WriterTest.class);
+
+ private IoSession _session;
+
+ private long _startTime;
+
+ private long[] _chunkTimes;
+
+ private int _chunkCount = 500000;
+
+ private int _chunkSize = 1024;
+
+ private CountDownLatch _notifier;
+
+ public void run()
+ {
+ _startTime = System.currentTimeMillis();
+ _notifier = new CountDownLatch(1);
+ for (int i = 0; i < _chunkCount; i++)
+ {
+ ByteBuffer buf = ByteBuffer.allocate(_chunkSize, false);
+ byte check = (byte) (i % 128);
+ buf.put(check);
+ buf.fill((byte)88, buf.remaining());
+ buf.flip();
+ _session.write(buf);
+ }
+
+ try
+ {
+ _logger.info("All buffers sent; waiting for receipt from server");
+ _notifier.await();
+ }
+ catch (InterruptedException e)
+ {
+ }
+ _logger.info("Completed");
+ long totalTime = System.currentTimeMillis() - _startTime;
+ _logger.info("Total time: " + totalTime);
+ _logger.info("MB per second: " + (_chunkSize * _chunkCount)/totalTime);
+ long lastChunkTime = _startTime;
+ double average = 0;
+ for (int i = 0; i < _chunkTimes.length; i++)
+ {
+ if (i == 0)
+ {
+ average = _chunkTimes[i] - _startTime;
+ }
+ else
+ {
+ long delta = _chunkTimes[i] - lastChunkTime;
+ if (delta != 0)
+ {
+ average = (average + delta)/2;
+ }
+ }
+ lastChunkTime = _chunkTimes[i];
+ }
+ _logger.info("Average chunk time: " + average + "ms");
+ CloseFuture cf = _session.close();
+ cf.join();
+ }
+
+ private class WriterHandler extends IoHandlerAdapter
+ {
+ private int _chunksReceived = 0;
+
+ private int _partialBytesRead = 0;
+
+ private byte _partialCheckNumber;
+
+ private int _totalBytesReceived = 0;
+
+ public void messageReceived(IoSession session, Object message) throws Exception
+ {
+ ByteBuffer result = (ByteBuffer) message;
+ _totalBytesReceived += result.remaining();
+ int size = result.remaining();
+ long now = System.currentTimeMillis();
+ if (_partialBytesRead > 0)
+ {
+ int offset = _chunkSize - _partialBytesRead;
+ if (size >= offset)
+ {
+ _chunkTimes[_chunksReceived++] = now;
+ result.position(offset);
+ }
+ else
+ {
+ // have not read even one chunk, including the previous partial bytes
+ _partialBytesRead += size;
+ return;
+ }
+ }
+
+ int chunkCount = result.remaining()/_chunkSize;
+
+ for (int i = 0; i < chunkCount; i++)
+ {
+ _chunkTimes[_chunksReceived++] = now;
+ byte check = result.get();
+ _logger.debug("Check number " + check + " read");
+ if (check != (byte)((_chunksReceived - 1)%128))
+ {
+ _logger.error("Check number " + check + " read when expected " + (_chunksReceived%128));
+ }
+ _logger.debug("Chunk times recorded");
+
+ try
+ {
+ result.skip(_chunkSize - 1);
+ }
+ catch (IllegalArgumentException e)
+ {
+ _logger.error("Position was: " + result.position());
+ _logger.error("Tried to skip to: " + (_chunkSize * i));
+ _logger.error("limit was; " + result.limit());
+ }
+ }
+ _logger.debug("Chunks received now " + _chunksReceived);
+ _logger.debug("Bytes received: " + _totalBytesReceived);
+ _partialBytesRead = result.remaining();
+
+ if (_partialBytesRead > 0)
+ {
+ _partialCheckNumber = result.get();
+ }
+
+ if (_chunksReceived >= _chunkCount)
+ {
+ _notifier.countDown();
+ }
+
+ }
+
+ public void exceptionCaught(IoSession session, Throwable cause) throws Exception
+ {
+ _logger.error("Error: " + cause, cause);
+ }
+ }
+
+ public void startWriter() throws IOException, InterruptedException
+ {
+ IoConnector ioConnector = null;
+ boolean useMultithreadedIoProcessor = Boolean.getBoolean("qpid.io.multithreaded");
+ if (useMultithreadedIoProcessor)
+ {
+ ioConnector = new org.apache.qpid.nio.SocketConnector();
+ }
+ else
+ {
+ ioConnector = new SocketConnector();
+ }
+ SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig();
+ cfg.setThreadModel(ThreadModel.MANUAL);
+ SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig();
+ scfg.setTcpNoDelay(true);
+ scfg.setSendBufferSize(32768);
+ scfg.setReceiveBufferSize(32768);
+
+ final InetSocketAddress address = new InetSocketAddress("localhost", AcceptorTest.PORT);
+ _logger.info("Attempting connection to " + address);
+ ConnectFuture future = ioConnector.connect(address, new WriterHandler());
+ // wait for connection to complete
+ future.join();
+ _logger.info("Connection completed");
+ // we call getSession which throws an IOException if there has been an error connecting
+ _session = future.getSession();
+ _chunkTimes = new long[_chunkCount];
+ Thread t = new Thread(this);
+ t.start();
+ t.join();
+ _logger.info("Test completed");
+ }
+
+ @Test
+ public void test1k() throws IOException, InterruptedException
+ {
+ _logger.info("Starting 1k test");
+ _chunkSize = 1024;
+ startWriter();
+ }
+
+ @Test
+ public void test2k() throws IOException, InterruptedException
+ {
+ _logger.info("Starting 2k test");
+ _chunkSize = 2048;
+ startWriter();
+ }
+
+ @Test
+ public void test4k() throws IOException, InterruptedException
+ {
+ _logger.info("Starting 4k test");
+ _chunkSize = 4096;
+ startWriter();
+ }
+
+ @Test
+ public void test8k() throws IOException, InterruptedException
+ {
+ _logger.info("Starting 8k test");
+ _chunkSize = 8192;
+ startWriter();
+ }
+
+ @Test
+ public void test16k() throws IOException, InterruptedException
+ {
+ _logger.info("Starting 16k test");
+ _chunkSize = 16384;
+ startWriter();
+ }
+
+ @Test
+ public void test32k() throws IOException, InterruptedException
+ {
+ _logger.info("Starting 32k test");
+ _chunkSize = 32768;
+ startWriter();
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new JUnit4TestAdapter(WriterTest.class);
+ }
+
+ public static void main(String[] args) throws IOException, InterruptedException
+ {
+ WriterTest w = new WriterTest();
+ //w.test1k();
+ //w.test2k();
+ //w.test4k();
+ w.test8k();
+ //w.test16k();
+ //w.test32k();
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/mina/WriterTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/multiconsumer/AMQTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/multiconsumer/AMQTest.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/multiconsumer/AMQTest.java (added)
+++ incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/multiconsumer/AMQTest.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,264 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.multiconsumer;
+
+import java.io.ByteArrayOutputStream;
+import java.util.zip.Deflater;
+import java.util.zip.Inflater;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQTopic;
+
+/**
+ * Test AMQ.
+ */
+public class AMQTest extends TestCase implements ExceptionListener
+{
+
+ private final static String COMPRESSION_PROPNAME = "_MSGAPI_COMP";
+ private final static String UTF8 = "UTF-8";
+ private static final String SUBJECT = "test.amq";
+ private static final String DUMMYCONTENT = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
+ private static final String HUGECONTENT;
+
+ private Connection connect = null;
+ private Session pubSession = null;
+ private Session subSession = null;
+ private Topic topic = null;
+
+ static
+ {
+ StringBuilder sb = new StringBuilder(DUMMYCONTENT.length() * 115);
+ for (int i = 0; i < 100; i++)
+ {
+ sb.append(DUMMYCONTENT);
+ }
+ HUGECONTENT = sb.toString();
+ }
+
+ private void setup() throws Exception
+ {
+ connect = new AMQConnection("localhost", 5672, "guest", "guest", "client1", "/");
+ connect.setExceptionListener(this);
+ pubSession = connect.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
+ subSession = connect.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
+ topic = new AMQTopic(SUBJECT);
+
+ connect.start();
+ }
+
+ public void testMultipleListeners() throws Exception
+ {
+ setup();
+ try
+ {
+ // Create 5 listeners
+ MsgHandler[] listeners = new MsgHandler[5];
+ for (int i = 0; i < listeners.length; i++)
+ {
+ listeners[i] = new MsgHandler();
+ MessageConsumer subscriber = subSession.createConsumer(topic);
+ subscriber.setMessageListener(listeners[i]);
+ }
+ MessageProducer publisher = pubSession.createProducer(topic);
+ // Send a single message
+ TextMessage msg = pubSession.createTextMessage();
+ msg.setText(DUMMYCONTENT);
+ publisher.send(msg);
+ Thread.sleep(5000);
+ // Check listeners to ensure they all got it
+ for (int i = 0; i < listeners.length; i++)
+ {
+ if (listeners[i].isGotIt())
+ {
+ System.out.println("Got callback for listener " + i);
+ }
+ else
+ {
+ TestCase.fail("Listener " + i + " did not get callback");
+ }
+ }
+ }
+ catch (Throwable e)
+ {
+ System.err.println("Error: " + e);
+ e.printStackTrace(System.err);
+ }
+ finally
+ {
+ close();
+ }
+ }
+
+ public void testCompression() throws Exception
+ {
+ setup();
+ String comp = this.compressString(HUGECONTENT);
+ try
+ {
+ MsgHandler listener = new MsgHandler();
+ MessageConsumer subscriber = subSession.createConsumer(topic);
+ subscriber.setMessageListener(listener);
+ MessageProducer publisher = pubSession.createProducer(topic);
+
+ // Send a single message
+ TextMessage msg = pubSession.createTextMessage();
+ // Set the compressed text
+ msg.setText(comp);
+ msg.setBooleanProperty(COMPRESSION_PROPNAME, true);
+ publisher.send(msg);
+ Thread.sleep(1000);
+ // Check listeners to ensure we got it
+ if (listener.isGotIt())
+ {
+ System.out.println("Got callback for listener");
+ }
+ else
+ {
+ TestCase.fail("Listener did not get callback");
+ }
+ }
+ finally
+ {
+ close();
+ }
+ }
+
+ private void close() throws Exception
+ {
+ if (connect != null)
+ {
+ connect.close();
+ }
+ }
+
+ private class MsgHandler implements MessageListener
+ {
+ private boolean gotIt = false;
+
+ public void onMessage(Message msg)
+ {
+ try
+ {
+ TextMessage textMessage = (TextMessage) msg;
+ String string = textMessage.getText();
+ if (string != null && string.length() > 0)
+ {
+ gotIt = true;
+ }
+ if (textMessage.getBooleanProperty(COMPRESSION_PROPNAME))
+ {
+ string = inflateString(string);
+ }
+ System.out.println("Got callback of size " + (string==null?0:string.length()));
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ public boolean isGotIt()
+ {
+ return this.gotIt;
+ }
+ }
+
+ private String compressString(String string) throws Exception
+ {
+ long start = System.currentTimeMillis();
+ byte[] input = string.getBytes();
+ Deflater compressor = new Deflater(Deflater.BEST_COMPRESSION);
+ compressor.setInput(input);
+ compressor.finish();
+
+ // Get byte array from output of compressor
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(input.length);
+ byte[] buf = new byte[1024];
+ while (!compressor.finished())
+ {
+ int cnt = compressor.deflate(buf);
+ baos.write(buf, 0, cnt);
+ }
+ baos.close();
+ byte[] output = baos.toByteArray();
+
+ // Convert byte array into String
+ byte[] base64 = Base64.encodeBase64(output);
+ String sComp = new String(base64, UTF8);
+
+ long diff = System.currentTimeMillis() - start;
+ System.out.println("Compressed text from " + input.length + " to "
+ + sComp.getBytes().length + " in " + diff + " ms");
+ System.out.println("Compressed text = '" + sComp + "'");
+
+ return sComp;
+ }
+
+ private String inflateString(String string) throws Exception
+ {
+ byte[] input = string.getBytes();
+
+ // First convert Base64 string back to binary array
+ byte[] bytes = Base64.decodeBase64(input);
+
+ // Set string as input data for decompressor
+ Inflater decompressor = new Inflater();
+ decompressor.setInput(bytes);
+
+ // Decompress the data
+ ByteArrayOutputStream bos = new ByteArrayOutputStream(input.length);
+ byte[] buf = new byte[1024];
+ while (!decompressor.finished())
+ {
+ int count = decompressor.inflate(buf);
+ bos.write(buf, 0, count);
+ }
+ bos.close();
+ byte[] output = bos.toByteArray();
+
+ // Get the decompressed data
+ return new String(output, UTF8);
+ }
+
+ /**
+ * @see javax.jms.ExceptionListener#onException(javax.jms.JMSException)
+ */
+ public void onException(JMSException e)
+ {
+ System.err.println(e.getMessage());
+ e.printStackTrace(System.err);
+ }
+
+
+}
Propchange: incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/multiconsumer/AMQTest.java
------------------------------------------------------------------------------
svn:eol-style = native