You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2017/09/20 21:05:55 UTC

[1/2] activemq-artemis git commit: ARTEMIS-1424 Openwire not work with different tightEncoding options

Repository: activemq-artemis
Updated Branches:
  refs/heads/master bdb198423 -> e359f4bfd


ARTEMIS-1424 Openwire not work with different tightEncoding options

If message senders and receivers use different
wireformat.tightEncodingEnabled options, the broker will run into
marshalling problems. This is because when openwire messages are converted
to core messages, and these core messages are later converted back to
openwire messages, the broker uses the marshaller that comes with the
connection used to carry the messages.

For example, if a producer sends a message using the option
"wireformat.tightEncodingEnabled=false" and a receiver tries to receive it
using 'true' for the same option, the receiver will never get the message
because the broker will fail to use a "tight encoding" marshaller to
decode a 'loose encoded' message.

To fix the problem, we always use 'tight encoding' for internal
message converters.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f84d26eb
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f84d26eb
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f84d26eb

Branch: refs/heads/master
Commit: f84d26ebb26b0c424ae90f994fc1aa673bbad88f
Parents: bdb1984
Author: Howard Gao <ho...@gmail.com>
Authored: Tue Sep 19 07:47:51 2017 +0800
Committer: Howard Gao <ho...@gmail.com>
Committed: Tue Sep 19 07:50:24 2017 +0800

----------------------------------------------------------------------
 .../openwire/OpenWireMessageConverter.java      |  7 +-
 .../openwire/OpenWireProtocolManager.java       | 12 ++--
 .../core/protocol/openwire/amq/AMQConsumer.java |  7 +-
 .../core/protocol/openwire/amq/AMQSession.java  |  7 +-
 .../integration/openwire/BasicOpenWireTest.java |  3 +
 .../openwire/SimpleOpenWireTest.java            | 67 ++++++++++++++++++++
 6 files changed, 83 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f84d26eb/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 508bac9..88f90ee 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -423,10 +423,10 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
       }
    }
 
-   public static MessageDispatch createMessageDispatch(MessageReference reference,
+   public MessageDispatch createMessageDispatch(MessageReference reference,
                                                        ICoreMessage message,
                                                        AMQConsumer consumer) throws IOException, JMSException {
-      ActiveMQMessage amqMessage = toAMQMessage(reference, message, consumer.getMarshaller(), consumer.getOpenwireDestination());
+      ActiveMQMessage amqMessage = toAMQMessage(reference, message, consumer.getOpenwireDestination());
 
       //we can use core message id for sequenceId
       amqMessage.getMessageId().setBrokerSequenceId(message.getMessageID());
@@ -441,9 +441,8 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
       return md;
    }
 
-   private static ActiveMQMessage toAMQMessage(MessageReference reference,
+   private ActiveMQMessage toAMQMessage(MessageReference reference,
                                                ICoreMessage coreMessage,
-                                               WireFormat marshaller,
                                                ActiveMQDestination actualDestination) throws IOException {
       ActiveMQMessage amqMsg = null;
       byte coreType = coreMessage.getType();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f84d26eb/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index 237789f..b552c35 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -119,7 +119,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
    private long maxInactivityDurationInitalDelay = 10 * 1000L;
    private boolean useKeepAlive = true;
 
-   private final OpenWireMessageConverter messageConverter;
+   private final OpenWireMessageConverter internalConverter;
 
    private final Map<SimpleString, RoutingType> prefixes = new HashMap<>();
 
@@ -131,7 +131,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
       wireFactory.setCacheEnabled(false);
       advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
       scheduledPool = server.getScheduledPool();
-      this.messageConverter = new OpenWireMessageConverter(wireFactory.createWireFormat());
+      this.internalConverter = new OpenWireMessageConverter(wireFactory.createWireFormat());
 
       final ClusterManager clusterManager = this.server.getClusterManager();
 
@@ -142,10 +142,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
       }
    }
 
-   public OpenWireFormat getNewWireFormat() {
-      return (OpenWireFormat) wireFactory.createWireFormat();
-   }
-
    @Override
    public void nodeUP(TopologyMember member, boolean last) {
       if (topologyMap.put(member.getNodeId(), member) == null) {
@@ -583,4 +579,8 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
       }
       return total;
    }
+
+   public OpenWireMessageConverter getInternalConverter() {
+      return internalConverter;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f84d26eb/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 969d9ae..57506a2 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -51,7 +51,6 @@ import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.MessagePull;
 import org.apache.activemq.command.RemoveInfo;
-import org.apache.activemq.wireformat.WireFormat;
 
 public class AMQConsumer {
    private AMQSession session;
@@ -186,10 +185,6 @@ public class AMQConsumer {
       return info.getConsumerId();
    }
 
-   public WireFormat getMarshaller() {
-      return this.session.getMarshaller();
-   }
-
    public void acquireCredit(int n) throws Exception {
       if (messagePullHandler != null) {
          //don't acquire any credits when the pull handler controls it!!
@@ -217,7 +212,7 @@ public class AMQConsumer {
             //so we need to remove this property too.
             message.removeProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME);
          }
-         dispatch = OpenWireMessageConverter.createMessageDispatch(reference, message, this);
+         dispatch = session.getConverter().createMessageDispatch(reference, message, this);
          int size = dispatch.getMessage().getSize();
          reference.setProtocolData(dispatch.getMessage().getMessageId());
          session.deliverMessage(dispatch);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f84d26eb/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 9b6670e..330ac35 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -54,7 +54,6 @@ import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.Response;
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.openwire.OpenWireFormat;
-import org.apache.activemq.wireformat.WireFormat;
 import org.jboss.logging.Logger;
 
 import static org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil.OPENWIRE_WILDCARD;
@@ -104,7 +103,7 @@ public class AMQSession implements SessionCallback {
    }
 
    public OpenWireMessageConverter getConverter() {
-      return converter;
+      return protocolManager.getInternalConverter();
    }
 
    public void initialize() {
@@ -436,11 +435,11 @@ public class AMQSession implements SessionCallback {
    public ActiveMQServer getCoreServer() {
       return this.server;
    }
-
+/*
    public WireFormat getMarshaller() {
       return this.connection.getMarshaller();
    }
-
+*/
    public ConnectionInfo getConnectionInfo() {
       return this.connInfo;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f84d26eb/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java
index e24b632..7e82764 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java
@@ -45,7 +45,9 @@ public class BasicOpenWireTest extends OpenWireTestBase {
    public TestName name = new TestName();
 
    protected static final String urlString = "tcp://" + OWHOST + ":" + OWPORT + "?wireFormat.cacheEnabled=true";
+   protected static final String urlStringLoose = "tcp://" + OWHOST + ":" + OWPORT + "?wireFormat.tightEncodingEnabled=false";
    protected ActiveMQConnectionFactory factory;
+   protected ActiveMQConnectionFactory looseFactory;
    protected ActiveMQXAConnectionFactory xaFactory;
 
    protected ActiveMQConnection connection;
@@ -85,6 +87,7 @@ public class BasicOpenWireTest extends OpenWireTestBase {
 
    protected void createFactories() {
       factory = new ActiveMQConnectionFactory(getConnectionUrl());
+      looseFactory = new ActiveMQConnectionFactory(urlStringLoose);
       xaFactory = new ActiveMQXAConnectionFactory(getConnectionUrl());
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f84d26eb/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
index 5521814..9e3af50 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
@@ -367,6 +367,73 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
       session.close();
    }
 
+   @Test
+   public void testSendReceiveDifferentEncoding() throws Exception {
+      connection.start();
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      System.out.println("creating queue: " + queueName);
+      Destination dest = new ActiveMQQueue(queueName);
+
+      System.out.println("creating producer...");
+      MessageProducer producer = session.createProducer(dest);
+
+      final int num = 10;
+      final String msgBase = "MfromAMQ-";
+      for (int i = 0; i < num; i++) {
+         TextMessage msg = session.createTextMessage(msgBase + i);
+         producer.send(msg);
+         System.out.println("sent: ");
+      }
+
+      //receive loose
+      ActiveMQConnection looseConn = (ActiveMQConnection) looseFactory.createConnection();
+      try {
+         looseConn.start();
+         Session looseSession = looseConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer looseConsumer = looseSession.createConsumer(dest);
+
+         System.out.println("receiving messages...");
+         for (int i = 0; i < num; i++) {
+            TextMessage msg = (TextMessage) looseConsumer.receive(5000);
+            System.out.println("received: " + msg);
+            String content = msg.getText();
+            System.out.println("content: " + content);
+            assertEquals(msgBase + i, content);
+         }
+
+         assertNull(looseConsumer.receive(1000));
+         looseConsumer.close();
+
+         //now reverse
+
+         MessageProducer looseProducer = looseSession.createProducer(dest);
+         for (int i = 0; i < num; i++) {
+            TextMessage msg = looseSession.createTextMessage(msgBase + i);
+            looseProducer.send(msg);
+            System.out.println("sent: ");
+         }
+
+         MessageConsumer consumer = session.createConsumer(dest);
+         System.out.println("receiving messages...");
+         for (int i = 0; i < num; i++) {
+            TextMessage msg = (TextMessage) consumer.receive(5000);
+            System.out.println("received: " + msg);
+            assertNotNull(msg);
+            String content = msg.getText();
+            System.out.println("content: " + content);
+            assertEquals(msgBase + i, content);
+         }
+
+         assertNull(consumer.receive(1000));
+
+         session.close();
+         looseSession.close();
+      } finally {
+         looseConn.close();
+      }
+   }
+
    //   @Test -- ignored for now
    public void testKeepAlive() throws Exception {
       connection.start();


[2/2] activemq-artemis git commit: This closes #1542

Posted by ta...@apache.org.
This closes #1542


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e359f4bf
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e359f4bf
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e359f4bf

Branch: refs/heads/master
Commit: e359f4bfd18d2fe8baac1ddd357f7e24b3151e38
Parents: bdb1984 f84d26e
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Sep 20 17:05:29 2017 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Sep 20 17:05:29 2017 -0400

----------------------------------------------------------------------
 .../openwire/OpenWireMessageConverter.java      |  7 +-
 .../openwire/OpenWireProtocolManager.java       | 12 ++--
 .../core/protocol/openwire/amq/AMQConsumer.java |  7 +-
 .../core/protocol/openwire/amq/AMQSession.java  |  7 +-
 .../integration/openwire/BasicOpenWireTest.java |  3 +
 .../openwire/SimpleOpenWireTest.java            | 67 ++++++++++++++++++++
 6 files changed, 83 insertions(+), 20 deletions(-)
----------------------------------------------------------------------