You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/03/29 20:38:19 UTC

[1/2] activemq-artemis git commit: ARTEMIS-1777 Adding Protocol specific into producer / consumer

Repository: activemq-artemis
Updated Branches:
  refs/heads/master b6a29a5b5 -> 309729bbd


ARTEMIS-1777 Adding Protocol specific into producer / consumer


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c2955af1
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c2955af1
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c2955af1

Branch: refs/heads/master
Commit: c2955af16415f1479a002c8238d78410fd27e36b
Parents: b6a29a5
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Mar 29 11:37:56 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Mar 29 15:21:38 2018 -0400

----------------------------------------------------------------------
 artemis-cli/pom.xml                             |  6 +++
 .../artemis/cli/commands/AbstractAction.java    |  4 +-
 .../artemis/cli/commands/messages/Browse.java   |  7 ++-
 .../commands/messages/ConnectionAbstract.java   | 45 +++++++++++++++++++-
 .../artemis/cli/commands/messages/Consumer.java |  8 ++--
 .../cli/commands/messages/ConsumerThread.java   |  8 +++-
 .../cli/commands/messages/DestAbstract.java     | 12 ++++++
 .../artemis/cli/commands/messages/Producer.java |  7 ++-
 artemis-distribution/pom.xml                    | 10 ++++-
 artemis-distribution/src/main/assembly/dep.xml  |  1 +
 10 files changed, 90 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c2955af1/artemis-cli/pom.xml
----------------------------------------------------------------------
diff --git a/artemis-cli/pom.xml b/artemis-cli/pom.xml
index 7870c59..88a48db 100644
--- a/artemis-cli/pom.xml
+++ b/artemis-cli/pom.xml
@@ -94,6 +94,12 @@
          <groupId>org.apache.geronimo.specs</groupId>
          <artifactId>geronimo-json_1.0_spec</artifactId>
       </dependency>
+      <!-- artemis producer and consumer can use amqp as the protocol -->
+      <dependency>
+         <groupId>org.apache.qpid</groupId>
+         <artifactId>qpid-jms-client</artifactId>
+         <version>${qpid.jms.version}</version>
+      </dependency>
       <dependency>
          <groupId>org.jboss.logging</groupId>
          <artifactId>jboss-logging-annotations</artifactId>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c2955af1/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/AbstractAction.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/AbstractAction.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/AbstractAction.java
index 3619ed7..37f08c3 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/AbstractAction.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/AbstractAction.java
@@ -30,8 +30,8 @@ public abstract class AbstractAction extends ConnectionAbstract {
 
    public void performCoreManagement(ManagementCallback<ClientMessage> cb) throws Exception {
 
-      try (ActiveMQConnectionFactory factory = createConnectionFactory();
-         ServerLocator locator = factory.getServerLocator();
+      try (ActiveMQConnectionFactory factory = createCoreConnectionFactory();
+           ServerLocator locator = factory.getServerLocator();
            ClientSessionFactory sessionFactory = locator.createSessionFactory();
            ClientSession session = sessionFactory.createSession(user, password, false, true, true, false, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE)) {
          session.start();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c2955af1/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Browse.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Browse.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Browse.java
index 9562b59..e249cbf 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Browse.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Browse.java
@@ -18,14 +18,13 @@
 package org.apache.activemq.artemis.cli.commands.messages;
 
 import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
 import javax.jms.Session;
 
 import io.airlift.airline.Command;
 import io.airlift.airline.Option;
 import org.apache.activemq.artemis.cli.commands.ActionContext;
-import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
-import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
 
 @Command(name = "browser", description = "It will browse messages on an instance")
 public class Browse extends DestAbstract {
@@ -39,9 +38,8 @@ public class Browse extends DestAbstract {
 
       System.out.println("Consumer:: filter = " + filter);
 
-      ActiveMQConnectionFactory factory = createConnectionFactory();
+      ConnectionFactory factory = createConnectionFactory();
 
-      Destination dest = ActiveMQDestination.createDestination(this.destination, ActiveMQDestination.TYPE.QUEUE);
       try (Connection connection = factory.createConnection()) {
          ConsumerThread[] threadsArray = new ConsumerThread[threads];
          for (int i = 0; i < threads; i++) {
@@ -51,6 +49,7 @@ public class Browse extends DestAbstract {
             } else {
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
             }
+            Destination dest = lookupDestination(session);
             threadsArray[i] = new ConsumerThread(session, dest, i);
 
             threadsArray[i].setVerbose(verbose).setSleep(sleep).setMessageCount(messageCount).setFilter(filter).setBrowse(true);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c2955af1/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConnectionAbstract.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConnectionAbstract.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConnectionAbstract.java
index 41443c4..90882e6 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConnectionAbstract.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConnectionAbstract.java
@@ -18,12 +18,14 @@
 package org.apache.activemq.artemis.cli.commands.messages;
 
 import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
 import javax.jms.JMSException;
 import javax.jms.JMSSecurityException;
 
 import io.airlift.airline.Option;
 import org.apache.activemq.artemis.cli.commands.InputAbstract;
 import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.qpid.jms.JmsConnectionFactory;
 
 public class ConnectionAbstract extends InputAbstract {
 
@@ -39,7 +41,48 @@ public class ConnectionAbstract extends InputAbstract {
    @Option(name = "--clientID", description = "ClientID to be associated with connection")
    String clientID;
 
-   protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+   @Option(name = "--protocol", description = "Protocol used. Valid values are amqp or core. Default=core.")
+   String protocol = "core";
+
+   protected ConnectionFactory createConnectionFactory() throws Exception {
+      if (protocol.equals("core")) {
+         return createCoreConnectionFactory();
+      } else if (protocol.equals("amqp")) {
+         return createAMQPConnectionFactory();
+      } else {
+         throw new IllegalStateException("protocol " + protocol + " not supported");
+      }
+   }
+
+   private ConnectionFactory createAMQPConnectionFactory() {
+      if (brokerURL.startsWith("tcp://")) {
+         // replacing tcp:// by amqp://
+         brokerURL = "amqp" + brokerURL.substring(3);
+      }
+      JmsConnectionFactory cf = new JmsConnectionFactory(user, password, brokerURL);
+      if (clientID != null) {
+         cf.setClientID(clientID);
+      }
+
+      try {
+         Connection connection = cf.createConnection();
+         connection.close();
+         return cf;
+      } catch (JMSSecurityException e) {
+         // if a security exception will get the user and password through an input
+         context.err.println("Connection failed::" + e.getMessage());
+         userPassword();
+         return new JmsConnectionFactory(user, password, brokerURL);
+      } catch (JMSException e) {
+         // if a connection exception will ask for the URL, user and password
+         context.err.println("Connection failed::" + e.getMessage());
+         brokerURL = input("--url", "Type in the broker URL for a retry (e.g. tcp://localhost:61616)", brokerURL);
+         userPassword();
+         return new JmsConnectionFactory(user, password, brokerURL);
+      }
+   }
+
+   protected ActiveMQConnectionFactory createCoreConnectionFactory() {
       ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerURL, user, password);
 
       if (clientID != null) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c2955af1/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java
index c58f792..ee15a66 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java
@@ -18,14 +18,13 @@
 package org.apache.activemq.artemis.cli.commands.messages;
 
 import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
 import javax.jms.Session;
 
 import io.airlift.airline.Command;
 import io.airlift.airline.Option;
 import org.apache.activemq.artemis.cli.commands.ActionContext;
-import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
-import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
 
 @Command(name = "consumer", description = "It will consume messages from an instance")
 public class Consumer extends DestAbstract {
@@ -48,9 +47,8 @@ public class Consumer extends DestAbstract {
 
       System.out.println("Consumer:: filter = " + filter);
 
-      ActiveMQConnectionFactory factory = createConnectionFactory();
+      ConnectionFactory factory = createConnectionFactory();
 
-      Destination dest = ActiveMQDestination.createDestination(this.destination, ActiveMQDestination.TYPE.QUEUE);
       try (Connection connection = factory.createConnection()) {
          ConsumerThread[] threadsArray = new ConsumerThread[threads];
          for (int i = 0; i < threads; i++) {
@@ -60,6 +58,7 @@ public class Consumer extends DestAbstract {
             } else {
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
             }
+            Destination dest = lookupDestination(session);
             threadsArray[i] = new ConsumerThread(session, dest, i);
 
             threadsArray[i].setVerbose(verbose).setSleep(sleep).setDurable(durable).setBatchSize(txBatchSize).setBreakOnNull(breakOnNull).setMessageCount(messageCount).setReceiveTimeOut(receiveTimeout).setFilter(filter).setBrowse(false);
@@ -82,4 +81,5 @@ public class Consumer extends DestAbstract {
       }
    }
 
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c2955af1/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java
index 7883e58..ecffa34 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java
@@ -146,12 +146,16 @@ public class ConsumerThread extends Thread {
                consumer = session.createConsumer(destination);
             }
          }
+         int count = 0;
          while (running && received < messageCount) {
             Message msg = consumer.receive(receiveTimeOut);
             if (msg != null) {
-               System.out.println(threadName + " Received " + (msg instanceof TextMessage ? ((TextMessage) msg).getText() : msg.getJMSMessageID()));
                if (verbose) {
-                  System.out.println("..." + msg);
+                  System.out.println(threadName + " Received " + (msg instanceof TextMessage ? ((TextMessage) msg).getText() : msg.getJMSMessageID()));
+               } else {
+                  if (++count % 1000 == 0) {
+                     System.out.println("Received " + count);
+                  }
                }
                if (bytesAsText && (msg instanceof BytesMessage)) {
                   long length = ((BytesMessage) msg).getBodyLength();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c2955af1/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/DestAbstract.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/DestAbstract.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/DestAbstract.java
index acf0473..2f4a34c 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/DestAbstract.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/DestAbstract.java
@@ -17,7 +17,11 @@
 
 package org.apache.activemq.artemis.cli.commands.messages;
 
+import javax.jms.Destination;
+import javax.jms.Session;
+
 import io.airlift.airline.Option;
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
 
 public class DestAbstract extends ConnectionAbstract {
 
@@ -36,4 +40,12 @@ public class DestAbstract extends ConnectionAbstract {
    @Option(name = "--threads", description = "Number of Threads to be used (Default: 1)")
    int threads = 1;
 
+   protected Destination lookupDestination(Session session) throws Exception {
+      if (protocol.equals("AMQP")) {
+         return session.createQueue(destination);
+      } else {
+         return ActiveMQDestination.createDestination(this.destination, ActiveMQDestination.TYPE.QUEUE);
+      }
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c2955af1/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Producer.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Producer.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Producer.java
index 3ed4b57..e077fb0 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Producer.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Producer.java
@@ -18,14 +18,13 @@
 package org.apache.activemq.artemis.cli.commands.messages;
 
 import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
 import javax.jms.Session;
 
 import io.airlift.airline.Command;
 import io.airlift.airline.Option;
 import org.apache.activemq.artemis.cli.commands.ActionContext;
-import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
-import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
 
 @Command(name = "producer", description = "It will send messages to an instance")
 public class Producer extends DestAbstract {
@@ -49,9 +48,8 @@ public class Producer extends DestAbstract {
    public Object execute(ActionContext context) throws Exception {
       super.execute(context);
 
-      ActiveMQConnectionFactory factory = createConnectionFactory();
+      ConnectionFactory factory = createConnectionFactory();
 
-      Destination dest = ActiveMQDestination.createDestination(this.destination, ActiveMQDestination.TYPE.QUEUE);
       try (Connection connection = factory.createConnection()) {
          ProducerThread[] threadsArray = new ProducerThread[threads];
          for (int i = 0; i < threads; i++) {
@@ -61,6 +59,7 @@ public class Producer extends DestAbstract {
             } else {
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
             }
+            Destination dest = lookupDestination(session);
             threadsArray[i] = new ProducerThread(session, dest, i);
 
             threadsArray[i].setVerbose(verbose).setSleep(sleep).setPersistent(!nonpersistent).

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c2955af1/artemis-distribution/pom.xml
----------------------------------------------------------------------
diff --git a/artemis-distribution/pom.xml b/artemis-distribution/pom.xml
index 84116a4..6565430 100644
--- a/artemis-distribution/pom.xml
+++ b/artemis-distribution/pom.xml
@@ -170,7 +170,15 @@
            <artifactId>tomcat-servlet-api</artifactId>
        </dependency>
 
-       <!-- Management Console Dependencies -->
+      <!-- for artemis cli producer/consumer -->
+      <dependency>
+         <groupId>org.apache.qpid</groupId>
+         <artifactId>qpid-jms-client</artifactId>
+         <version>${qpid.jms.version}</version>
+      </dependency>
+
+
+      <!-- Management Console Dependencies -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>artemis-console</artifactId>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c2955af1/artemis-distribution/src/main/assembly/dep.xml
----------------------------------------------------------------------
diff --git a/artemis-distribution/src/main/assembly/dep.xml b/artemis-distribution/src/main/assembly/dep.xml
index f809713..aad4ab1 100644
--- a/artemis-distribution/src/main/assembly/dep.xml
+++ b/artemis-distribution/src/main/assembly/dep.xml
@@ -69,6 +69,7 @@
             <include>org.apache.activemq:artemis-service-extensions</include>
             <include>org.apache.activemq:artemis-web</include>
             <include>org.apache.activemq.rest:artemis-rest</include>
+            <include>org.apache.qpid:qpid-jms-client</include>
 
             <!-- dependencies -->
             <include>org.apache.geronimo.specs:geronimo-jms_2.0_spec</include>


[2/2] activemq-artemis git commit: This closes #1980

Posted by cl...@apache.org.
This closes #1980


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/309729bb
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/309729bb
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/309729bb

Branch: refs/heads/master
Commit: 309729bbdb4e7160a4e41d25521ad54f8d680c3f
Parents: b6a29a5 c2955af
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Mar 29 16:38:12 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Mar 29 16:38:12 2018 -0400

----------------------------------------------------------------------
 artemis-cli/pom.xml                             |  6 +++
 .../artemis/cli/commands/AbstractAction.java    |  4 +-
 .../artemis/cli/commands/messages/Browse.java   |  7 ++-
 .../commands/messages/ConnectionAbstract.java   | 45 +++++++++++++++++++-
 .../artemis/cli/commands/messages/Consumer.java |  8 ++--
 .../cli/commands/messages/ConsumerThread.java   |  8 +++-
 .../cli/commands/messages/DestAbstract.java     | 12 ++++++
 .../artemis/cli/commands/messages/Producer.java |  7 ++-
 artemis-distribution/pom.xml                    | 10 ++++-
 artemis-distribution/src/main/assembly/dep.xml  |  1 +
 10 files changed, 90 insertions(+), 18 deletions(-)
----------------------------------------------------------------------