You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/08/05 18:52:01 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-5918

Repository: activemq
Updated Branches:
  refs/heads/master 457dbd8b6 -> a79f317d3


https://issues.apache.org/jira/browse/AMQ-5918

Update the AMQP Java examples to use the Qpid JMS client v0.3.0

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a79f317d
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a79f317d
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a79f317d

Branch: refs/heads/master
Commit: a79f317d31ccee753b6097d85473488934221900
Parents: 457dbd8
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Aug 5 12:49:21 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Aug 5 12:51:52 2015 -0400

----------------------------------------------------------------------
 assembly/src/release/examples/amqp/java/pom.xml | 18 +++---
 .../java/src/main/java/example/Listener.java    | 58 ++++++++++++--------
 .../java/src/main/java/example/Publisher.java   | 47 +++++++++-------
 3 files changed, 70 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/a79f317d/assembly/src/release/examples/amqp/java/pom.xml
----------------------------------------------------------------------
diff --git a/assembly/src/release/examples/amqp/java/pom.xml b/assembly/src/release/examples/amqp/java/pom.xml
index d2b063b..f014ec4 100644
--- a/assembly/src/release/examples/amqp/java/pom.xml
+++ b/assembly/src/release/examples/amqp/java/pom.xml
@@ -6,9 +6,9 @@
   The ASF licenses this file to You under the Apache License, Version 2.0
   (the "License"); you may not use this file except in compliance with
   the License.  You may obtain a copy of the License at
-  
+
   http://www.apache.org/licenses/LICENSE-2.0
-  
+
   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,16 +18,16 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
 
   <modelVersion>4.0.0</modelVersion>
-  
+
   <groupId>example</groupId>
   <artifactId>amqp-example</artifactId>
   <version>0.1-SNAPSHOT</version>
-  
+
   <name>example</name>
   <description>ActiveMQ AMQP Java Examples</description>
 
   <dependencies>
-    
+
     <dependency>
       <groupId>org.apache.geronimo.specs</groupId>
       <artifactId>geronimo-jms_1.1_spec</artifactId>
@@ -35,8 +35,8 @@
     </dependency>
     <dependency>
       <groupId>org.apache.qpid</groupId>
-      <artifactId>qpid-amqp-1-0-client-jms</artifactId>
-      <version>${qpid-jms-version}</version>
+      <artifactId>qpid-jms-client</artifactId>
+      <version>0.3.0</version>
     </dependency>
 
   </dependencies>
@@ -65,9 +65,9 @@
           </execution>
         </executions>
       </plugin>
-      
+
     </plugins>
   </build>
-  
+
 </project>
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/a79f317d/assembly/src/release/examples/amqp/java/src/main/java/example/Listener.java
----------------------------------------------------------------------
diff --git a/assembly/src/release/examples/amqp/java/src/main/java/example/Listener.java b/assembly/src/release/examples/amqp/java/src/main/java/example/Listener.java
index bd8d198..4598d5b 100644
--- a/assembly/src/release/examples/amqp/java/src/main/java/example/Listener.java
+++ b/assembly/src/release/examples/amqp/java/src/main/java/example/Listener.java
@@ -16,73 +16,83 @@
  */
 package example;
 
-import org.apache.qpid.amqp_1_0.jms.impl.*;
+import org.apache.qpid.jms.*;
 import javax.jms.*;
 
 class Listener {
 
-    public static void main(String []args) throws JMSException {
+    public static void main(String[] args) throws JMSException {
+
+        final String TOPIC_PREFIX = "topic://";
 
         String user = env("ACTIVEMQ_USER", "admin");
         String password = env("ACTIVEMQ_PASSWORD", "password");
         String host = env("ACTIVEMQ_HOST", "localhost");
         int port = Integer.parseInt(env("ACTIVEMQ_PORT", "5672"));
-        String destination = arg(args, 0, "topic://event");
 
-        ConnectionFactoryImpl factory = new ConnectionFactoryImpl(host, port, user, password);
-        Destination dest = null;
-        if( destination.startsWith("topic://") ) {
-            dest = new TopicImpl(destination);
-        } else {
-            dest = new QueueImpl(destination);
-        }
+        String connectionURI = "amqp://" + host + ":" + port;
+        String destinationName = arg(args, 0, "topic://event");
+
+        JmsConnectionFactory factory = new JmsConnectionFactory(connectionURI);
 
         Connection connection = factory.createConnection(user, password);
         connection.start();
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer = session.createConsumer(dest);
+
+        Destination destination = null;
+        if (destinationName.startsWith(TOPIC_PREFIX)) {
+            destination = session.createTopic(destinationName.substring(TOPIC_PREFIX.length()));
+        } else {
+            destination = session.createQueue(destinationName);
+        }
+
+        MessageConsumer consumer = session.createConsumer(destination);
         long start = System.currentTimeMillis();
         long count = 1;
         System.out.println("Waiting for messages...");
-        while(true) {
+        while (true) {
             Message msg = consumer.receive();
-            if( msg instanceof  TextMessage ) {
+            if (msg instanceof TextMessage) {
                 String body = ((TextMessage) msg).getText();
-                if( "SHUTDOWN".equals(body)) {
+                if ("SHUTDOWN".equals(body)) {
                     long diff = System.currentTimeMillis() - start;
-                    System.out.println(String.format("Received %d in %.2f seconds", count, (1.0*diff/1000.0)));
+                    System.out.println(String.format("Received %d in %.2f seconds", count, (1.0 * diff / 1000.0)));
                     connection.close();
+                    try {
+                        Thread.sleep(10);
+                    } catch (Exception e) {}
                     System.exit(1);
                 } else {
                     try {
-                        if( count != msg.getIntProperty("id") ) {
-                            System.out.println("mismatch: "+count+"!="+msg.getIntProperty("id"));
+                        if (count != msg.getIntProperty("id")) {
+                            System.out.println("mismatch: " + count + "!=" + msg.getIntProperty("id"));
                         }
                     } catch (NumberFormatException ignore) {
                     }
-                    if( count == 1 ) {
+
+                    if (count == 1) {
                         start = System.currentTimeMillis();
-                    } else if( count % 1000 == 0 ) {
+                    } else if (count % 1000 == 0) {
                         System.out.println(String.format("Received %d messages.", count));
                     }
-                    count ++;
+                    count++;
                 }
 
             } else {
-                System.out.println("Unexpected message type: "+msg.getClass());
+                System.out.println("Unexpected message type: " + msg.getClass());
             }
         }
     }
 
     private static String env(String key, String defaultValue) {
         String rc = System.getenv(key);
-        if( rc== null )
+        if (rc == null)
             return defaultValue;
         return rc;
     }
 
-    private static String arg(String []args, int index, String defaultValue) {
-        if( index < args.length )
+    private static String arg(String[] args, int index, String defaultValue) {
+        if (index < args.length)
             return args[index];
         else
             return defaultValue;

http://git-wip-us.apache.org/repos/asf/activemq/blob/a79f317d/assembly/src/release/examples/amqp/java/src/main/java/example/Publisher.java
----------------------------------------------------------------------
diff --git a/assembly/src/release/examples/amqp/java/src/main/java/example/Publisher.java b/assembly/src/release/examples/amqp/java/src/main/java/example/Publisher.java
index 70c46bd..ec0fe1e 100644
--- a/assembly/src/release/examples/amqp/java/src/main/java/example/Publisher.java
+++ b/assembly/src/release/examples/amqp/java/src/main/java/example/Publisher.java
@@ -16,66 +16,73 @@
  */
 package example;
 
-import org.apache.qpid.amqp_1_0.jms.impl.*;
+import org.apache.qpid.jms.*;
 import javax.jms.*;
 
 class Publisher {
 
-    public static void main(String []args) throws Exception {
+    public static void main(String[] args) throws Exception {
+
+        final String TOPIC_PREFIX = "topic://";
 
         String user = env("ACTIVEMQ_USER", "admin");
         String password = env("ACTIVEMQ_PASSWORD", "password");
         String host = env("ACTIVEMQ_HOST", "localhost");
         int port = Integer.parseInt(env("ACTIVEMQ_PORT", "5672"));
-        String destination = arg(args, 0, "topic://event");
+
+        String connectionURI = "amqp://" + host + ":" + port;
+        String destinationName = arg(args, 0, "topic://event");
 
         int messages = 10000;
         int size = 256;
 
         String DATA = "abcdefghijklmnopqrstuvwxyz";
         String body = "";
-        for( int i=0; i < size; i ++) {
-            body += DATA.charAt(i%DATA.length());
+        for (int i = 0; i < size; i++) {
+            body += DATA.charAt(i % DATA.length());
         }
 
-        ConnectionFactoryImpl factory = new ConnectionFactoryImpl(host, port, user, password);
-        Destination dest = null;
-        if( destination.startsWith("topic://") ) {
-            dest = new TopicImpl(destination);
-        } else {
-            dest = new QueueImpl(destination);
-        }
+        JmsConnectionFactory factory = new JmsConnectionFactory(connectionURI);
 
         Connection connection = factory.createConnection(user, password);
         connection.start();
+
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageProducer producer = session.createProducer(dest);
+
+        Destination destination = null;
+        if (destinationName.startsWith(TOPIC_PREFIX)) {
+            destination = session.createTopic(destinationName.substring(TOPIC_PREFIX.length()));
+        } else {
+            destination = session.createQueue(destinationName);
+        }
+
+        MessageProducer producer = session.createProducer(destination);
         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
 
-        for( int i=1; i <= messages; i ++) {
-            TextMessage msg = session.createTextMessage("#:"+i);
+        for (int i = 1; i <= messages; i++) {
+            TextMessage msg = session.createTextMessage("#:" + i);
             msg.setIntProperty("id", i);
             producer.send(msg);
-            if( (i % 1000) == 0) {
+            if ((i % 1000) == 0) {
                 System.out.println(String.format("Sent %d messages", i));
             }
         }
 
         producer.send(session.createTextMessage("SHUTDOWN"));
-        Thread.sleep(1000*3);
+        Thread.sleep(1000 * 3);
         connection.close();
         System.exit(0);
     }
 
     private static String env(String key, String defaultValue) {
         String rc = System.getenv(key);
-        if( rc== null )
+        if (rc == null)
             return defaultValue;
         return rc;
     }
 
-    private static String arg(String []args, int index, String defaultValue) {
-        if( index < args.length )
+    private static String arg(String[] args, int index, String defaultValue) {
+        if (index < args.length)
             return args[index];
         else
             return defaultValue;