You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2016/04/25 15:13:45 UTC

[2/3] activemq-artemis git commit: ARTEMIS-503 - replace proton-jms with proton-jms from ActiveMQ

ARTEMIS-503 - replace proton-jms with proton-jms from ActiveMQ

https://issues.apache.org/jira/browse/ARTEMIS-503


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/9a17681f
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/9a17681f
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/9a17681f

Branch: refs/heads/master
Commit: 9a17681f83280fba31beaddb3359277512ebf673
Parents: 76d937f
Author: Andy Taylor <an...@gmail.com>
Authored: Mon Apr 25 09:38:03 2016 +0100
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Mon Apr 25 14:13:30 2016 +0100

----------------------------------------------------------------------
 artemis-distribution/pom.xml                    |  8 +--
 artemis-distribution/src/main/assembly/dep.xml  |  2 +-
 artemis-protocols/artemis-amqp-protocol/pom.xml |  8 +--
 .../AMQPNativeOutboundTransformer.java          | 56 ++++++++++++++++++++
 .../proton/converter/ActiveMQJMSVendor.java     |  9 +---
 .../converter/ProtonMessageConverter.java       | 51 +++++++++++++++---
 .../plug/ProtonSessionIntegrationCallback.java  |  2 +-
 .../core/protocol/proton/TestConversions.java   |  2 +-
 pom.xml                                         | 14 +++--
 tests/integration-tests/pom.xml                 |  4 --
 10 files changed, 123 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9a17681f/artemis-distribution/pom.xml
----------------------------------------------------------------------
diff --git a/artemis-distribution/pom.xml b/artemis-distribution/pom.xml
index 85fd398..6f462b4 100644
--- a/artemis-distribution/pom.xml
+++ b/artemis-distribution/pom.xml
@@ -147,10 +147,6 @@
          <groupId>org.jboss.logmanager</groupId>
          <artifactId>jboss-logmanager</artifactId>
       </dependency>
-      <dependency>
-         <groupId>org.apache.qpid</groupId>
-         <artifactId>proton-jms</artifactId>
-      </dependency>
        <dependency>
            <groupId>io.airlift</groupId>
            <artifactId>airline</artifactId>
@@ -190,6 +186,10 @@
          <groupId>io.netty</groupId>
          <artifactId>netty-codec-mqtt</artifactId>
       </dependency>
+      <dependency>
+         <groupId>org.apache.activemq</groupId>
+         <artifactId>activemq-amqp</artifactId>
+      </dependency>
    </dependencies>
 
    <build>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9a17681f/artemis-distribution/src/main/assembly/dep.xml
----------------------------------------------------------------------
diff --git a/artemis-distribution/src/main/assembly/dep.xml b/artemis-distribution/src/main/assembly/dep.xml
index 4adfaf8..d723bae 100644
--- a/artemis-distribution/src/main/assembly/dep.xml
+++ b/artemis-distribution/src/main/assembly/dep.xml
@@ -80,7 +80,7 @@
             <include>org.jboss.logging:jboss-logging</include>
             <include>io.netty:netty-all</include>
             <include>org.apache.qpid:proton-j</include>
-            <include>org.apache.qpid:proton-jms</include>
+            <include>org.apache.activemq:activemq-amqp</include>
             <include>org.apache.activemq:activemq-client</include>
             <include>org.slf4j:slf4j-api</include>
             <include>io.airlift:airline</include>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9a17681f/artemis-protocols/artemis-amqp-protocol/pom.xml
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/pom.xml b/artemis-protocols/artemis-amqp-protocol/pom.xml
index 98149c1..78e9c3b 100644
--- a/artemis-protocols/artemis-amqp-protocol/pom.xml
+++ b/artemis-protocols/artemis-amqp-protocol/pom.xml
@@ -42,6 +42,10 @@
          <version>${project.version}</version>
       </dependency>
       <dependency>
+         <groupId>org.apache.activemq</groupId>
+         <artifactId>activemq-amqp</artifactId>
+      </dependency>
+      <dependency>
          <groupId>org.jboss.logging</groupId>
          <artifactId>jboss-logging-processor</artifactId>
          <scope>provided</scope>
@@ -84,10 +88,6 @@
          <artifactId>proton-j</artifactId>
       </dependency>
       <dependency>
-         <groupId>org.apache.qpid</groupId>
-         <artifactId>proton-jms</artifactId>
-      </dependency>
-      <dependency>
          <groupId>org.apache.geronimo.specs</groupId>
          <artifactId>geronimo-jms_2.0_spec</artifactId>
          <scope>provided</scope>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9a17681f/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/AMQPNativeOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/AMQPNativeOutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/AMQPNativeOutboundTransformer.java
new file mode 100644
index 0000000..c187ad0
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/AMQPNativeOutboundTransformer.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.proton.converter;
+
+import org.apache.activemq.transport.amqp.message.OutboundTransformer;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.apache.qpid.proton.amqp.messaging.Header;
+import org.apache.qpid.proton.message.ProtonJMessage;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+
+public class AMQPNativeOutboundTransformer {
+   static ProtonJMessage transform(OutboundTransformer options, BytesMessage msg) throws JMSException {
+      byte[] data = new byte[(int) msg.getBodyLength()];
+      msg.readBytes(data);
+      msg.reset();
+      int count = msg.getIntProperty("JMSXDeliveryCount");
+
+      // decode...
+      ProtonJMessage amqp = (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create();
+      int offset = 0;
+      int len = data.length;
+      while (len > 0) {
+         final int decoded = amqp.decode(data, offset, len);
+         assert decoded > 0 : "Make progress decoding the message";
+         offset += decoded;
+         len -= decoded;
+      }
+
+      // Update the DeliveryCount header...
+      // The AMQP delivery-count field only includes prior failed delivery attempts,
+      // whereas JMSXDeliveryCount includes the first/current delivery attempt. Subtract 1.
+      if (amqp.getHeader() == null) {
+         amqp.setHeader(new Header());
+      }
+
+      amqp.getHeader().setDeliveryCount(new UnsignedInteger(count - 1));
+
+      return amqp;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9a17681f/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ActiveMQJMSVendor.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ActiveMQJMSVendor.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ActiveMQJMSVendor.java
index 639b390..ba6b9be 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ActiveMQJMSVendor.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ActiveMQJMSVendor.java
@@ -26,7 +26,6 @@ import javax.jms.TextMessage;
 
 import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerDestination;
 import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
-import org.apache.qpid.proton.jms.JMSVendor;
 import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer;
 import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMSBytesMessage;
 import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMSMapMessage;
@@ -36,8 +35,9 @@ import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMST
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
 import org.apache.activemq.artemis.utils.IDGenerator;
+import org.apache.activemq.transport.amqp.message.JMSVendor;
 
-public class ActiveMQJMSVendor extends JMSVendor {
+public class ActiveMQJMSVendor implements JMSVendor {
 
    private final IDGenerator serverGenerator;
 
@@ -86,11 +86,6 @@ public class ActiveMQJMSVendor extends JMSVendor {
    }
 
    @Override
-   public <T extends Destination> T createDestination(String name, Class<T> kind) {
-      return super.createDestination(name, kind);
-   }
-
-   @Override
    public void setJMSXGroupID(Message message, String s) {
 
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9a17681f/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ProtonMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ProtonMessageConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ProtonMessageConverter.java
index 4de2357..da99e68 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ProtonMessageConverter.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ProtonMessageConverter.java
@@ -16,23 +16,30 @@
  */
 package org.apache.activemq.artemis.core.protocol.proton.converter;
 
-import org.apache.qpid.proton.jms.EncodedMessage;
-import org.apache.qpid.proton.jms.InboundTransformer;
-import org.apache.qpid.proton.jms.JMSMappingInboundTransformer;
-import org.apache.qpid.proton.jms.JMSMappingOutboundTransformer;
+import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
+import org.apache.activemq.transport.amqp.message.EncodedMessage;
+import org.apache.activemq.transport.amqp.message.InboundTransformer;
+import org.apache.activemq.transport.amqp.message.JMSMappingInboundTransformer;
+import org.apache.activemq.transport.amqp.message.JMSMappingOutboundTransformer;
 import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMSMessage;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
 import org.apache.activemq.artemis.utils.IDGenerator;
 
+import javax.jms.BytesMessage;
+import java.io.IOException;
+
 public class ProtonMessageConverter implements MessageConverter {
 
    ActiveMQJMSVendor activeMQJMSVendor;
 
+   private final String prefixVendor;
+
    public ProtonMessageConverter(IDGenerator idGenerator) {
       activeMQJMSVendor = new ActiveMQJMSVendor(idGenerator);
       inboundTransformer = new JMSMappingInboundTransformer(activeMQJMSVendor);
       outboundTransformer = new JMSMappingOutboundTransformer(activeMQJMSVendor);
+      prefixVendor = outboundTransformer.getPrefixVendor();
    }
 
    private final InboundTransformer inboundTransformer;
@@ -50,11 +57,30 @@ public class ProtonMessageConverter implements MessageConverter {
     *
     * @param messageSource
     * @return
-    * @throws Exception
+    * @throws Exception                    https://issues.jboss.org/browse/ENTMQ-1560
     */
    public ServerJMSMessage inboundJMSType(EncodedMessage messageSource) throws Exception {
       EncodedMessage encodedMessageSource = messageSource;
-      ServerJMSMessage transformedMessage = (ServerJMSMessage) inboundTransformer.transform(encodedMessageSource);
+      ServerJMSMessage transformedMessage = null;
+
+      InboundTransformer transformer = inboundTransformer;
+
+      while (transformer != null) {
+         try {
+            transformedMessage = (ServerJMSMessage) transformer.transform(encodedMessageSource);
+            break;
+         }
+         catch (Exception e) {
+            ActiveMQClientLogger.LOGGER.debug("Transform of message using [{}] transformer, failed" + inboundTransformer.getTransformerName());
+            ActiveMQClientLogger.LOGGER.trace("Transformation error:", e);
+
+            transformer = transformer.getFallbackTransformer();
+         }
+      }
+
+      if (transformedMessage == null) {
+         throw new IOException("Failed to transform incoming delivery, skipping.");
+      }
 
       transformedMessage.encode();
 
@@ -64,8 +90,19 @@ public class ProtonMessageConverter implements MessageConverter {
    @Override
    public Object outbound(ServerMessage messageOutbound, int deliveryCount) throws Exception {
       ServerJMSMessage jmsMessage = activeMQJMSVendor.wrapMessage(messageOutbound.getType(), messageOutbound, deliveryCount);
+
       jmsMessage.decode();
 
-      return outboundTransformer.convert(jmsMessage);
+      if (jmsMessage.getBooleanProperty(prefixVendor + "NATIVE")) {
+         if (jmsMessage instanceof BytesMessage) {
+            return AMQPNativeOutboundTransformer.transform(outboundTransformer, (BytesMessage) jmsMessage);
+         }
+         else {
+            return null;
+         }
+      }
+      else {
+         return outboundTransformer.convert(jmsMessage);
+      }
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9a17681f/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
index 2dccc30..ccd2b7e 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
@@ -24,13 +24,13 @@ import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
+import org.apache.activemq.transport.amqp.message.EncodedMessage;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.transport.AmqpError;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Link;
 import org.apache.qpid.proton.engine.Receiver;
-import org.apache.qpid.proton.jms.EncodedMessage;
 import org.apache.qpid.proton.message.ProtonJMessage;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9a17681f/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/proton/TestConversions.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/proton/TestConversions.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/proton/TestConversions.java
index de514bb..fc9fe2c 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/proton/TestConversions.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/proton/TestConversions.java
@@ -26,12 +26,12 @@ import java.util.Map;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.transport.amqp.message.EncodedMessage;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
 import org.apache.qpid.proton.amqp.messaging.Data;
-import org.apache.qpid.proton.jms.EncodedMessage;
 import org.apache.qpid.proton.message.Message;
 import org.apache.qpid.proton.message.ProtonJMessage;
 import org.apache.qpid.proton.message.impl.MessageImpl;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9a17681f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 292aa14..47b58e4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -408,15 +408,21 @@
             <!-- License: Apache 2.0 -->
          </dependency>
          <dependency>
-            <groupId>org.apache.qpid</groupId>
-            <artifactId>proton-jms</artifactId>
-            <version>${proton.version}</version>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>activemq-client</artifactId>
+            <version>${activemq5-version}</version>
             <!-- License: Apache 2.0 -->
          </dependency>
          <dependency>
             <groupId>org.apache.activemq</groupId>
-            <artifactId>activemq-client</artifactId>
+            <artifactId>activemq-amqp</artifactId>
             <version>${activemq5-version}</version>
+            <exclusions>
+               <exclusion>
+                  <groupId>org.apache.geronimo.specs</groupId>
+                  <artifactId>geronimo-jms_1.1_spec</artifactId>
+               </exclusion>
+            </exclusions>
             <!-- License: Apache 2.0 -->
          </dependency>
          <dependency>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9a17681f/tests/integration-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tests/integration-tests/pom.xml b/tests/integration-tests/pom.xml
index 6c38b7c..f0e1d14 100644
--- a/tests/integration-tests/pom.xml
+++ b/tests/integration-tests/pom.xml
@@ -194,10 +194,6 @@
           <groupId>org.apache.qpid</groupId>
           <artifactId>proton-j</artifactId>
        </dependency>
-       <dependency>
-          <groupId>org.apache.qpid</groupId>
-          <artifactId>proton-jms</artifactId>
-       </dependency>
       <dependency>
          <groupId>org.apache.qpid</groupId>
          <artifactId>qpid-client</artifactId>