You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/08/30 21:25:36 UTC

[2/2] activemq-artemis git commit: ARTEMIS-696 Broker fails when client sends messages in multiple transfer frames

ARTEMIS-696 Broker fails when client sends messages in multiple transfer frames


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/8fccd5df
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/8fccd5df
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/8fccd5df

Branch: refs/heads/master
Commit: 8fccd5df4265270566c5747bff6af253f62049b1
Parents: b51142a
Author: Howard Gao <ho...@gmail.com>
Authored: Wed Aug 24 21:54:08 2016 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Aug 30 17:24:31 2016 -0400

----------------------------------------------------------------------
 .../protocol/proton/ProtonProtocolManager.java  | 11 ++-
 .../server/ProtonServerReceiverContext.java     |  5 +-
 .../proton/ProtonMaxFrameSizeTest.java          | 97 ++++++++++++++++++++
 .../integration/proton/ProtonPubSubTest.java    | 25 +----
 .../tests/integration/proton/ProtonTest.java    | 24 +----
 .../integration/proton/ProtonTestBase.java      | 78 ++++++++++++++++
 6 files changed, 195 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8fccd5df/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java
index 163fc6b..3567307 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java
@@ -64,6 +64,8 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
    * */
    private String pubSubPrefix = ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX;
 
+   private int maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
+
    public ProtonProtocolManager(ProtonProtocolManagerFactory factory, ActiveMQServer server) {
       this.factory = factory;
       this.server = server;
@@ -111,7 +113,7 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
 
       String id = server.getConfiguration().getName();
       AMQPServerConnectionContext amqpConnection = ProtonServerConnectionContextFactory.getFactory().
-         createConnection(connectionCallback, id, (int) ttl, DEFAULT_MAX_FRAME_SIZE, DEFAULT_CHANNEL_MAX, server.getExecutorFactory().getExecutor(), server.getScheduledPool());
+         createConnection(connectionCallback, id, (int) ttl, getMaxFrameSize(), DEFAULT_CHANNEL_MAX, server.getExecutorFactory().getExecutor(), server.getScheduledPool());
 
       Executor executor = server.getExecutorFactory().getExecutor();
 
@@ -164,4 +166,11 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
    }
 
 
+   public int getMaxFrameSize() {
+      return maxFrameSize;
+   }
+
+   public void setMaxFrameSize(int maxFrameSize) {
+      this.maxFrameSize = maxFrameSize;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8fccd5df/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java
index 0bbe8ca..c564a9e 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java
@@ -116,7 +116,10 @@ public class ProtonServerReceiverContext extends AbstractProtonReceiverContext {
          receiver = ((Receiver) delivery.getLink());
 
          if (!delivery.isReadable()) {
-            System.err.println("!!!!! Readable!!!!!!!");
+            return;
+         }
+
+         if (delivery.isPartial()) {
             return;
          }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8fccd5df/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonMaxFrameSizeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonMaxFrameSizeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonMaxFrameSizeTest.java
new file mode 100644
index 0000000..658ea81
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonMaxFrameSizeTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.proton;
+
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.message.impl.MessageImpl;
+import org.junit.Test;
+
+import java.net.URI;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class ProtonMaxFrameSizeTest extends ProtonTestBase {
+
+   private static final int FRAME_SIZE = 512;
+
+   protected void configureAmqp(Map<String, Object> params) {
+      params.put("maxFrameSize", FRAME_SIZE);
+   }
+
+   @Test
+   public void testMultipleTransfers() throws Exception {
+
+      String testQueueName = "ConnectionFrameSize";
+      int nMsgs = 200;
+
+      AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
+
+
+      AmqpConnection amqpConnection = client.createConnection();
+
+      try {
+         amqpConnection.connect();
+
+         AmqpSession session = amqpConnection.createSession();
+         AmqpSender sender = session.createSender("jms.queue." + testQueueName);
+
+         final int payload = FRAME_SIZE * 16;
+
+         for (int i = 0; i < nMsgs; ++i) {
+            AmqpMessage message = createAmqpMessage((byte) 'A', payload);
+            sender.send(message);
+         }
+
+         int count = getMessageCount(server.getPostOffice(), "jms.queue." + testQueueName);
+         assertEquals(nMsgs, count);
+
+         AmqpReceiver receiver = session.createReceiver("jms.queue." + testQueueName);
+         receiver.flow(nMsgs);
+
+         for (int i = 0; i < nMsgs; ++i) {
+            AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+            assertNotNull("failed at " + i, message);
+            MessageImpl wrapped = (MessageImpl) message.getWrappedMessage();
+            Data data = (Data) wrapped.getBody();
+            System.out.println("received : message: " + data.getValue().getLength());
+            assertEquals(payload, data.getValue().getLength());
+            message.accept();
+         }
+
+      }
+      finally {
+         amqpConnection.close();
+      }
+   }
+
+   private AmqpMessage createAmqpMessage(byte value, int payloadSize) {
+      AmqpMessage message = new AmqpMessage();
+      byte[] payload = new byte[payloadSize];
+      for (int i = 0; i < payload.length; i++) {
+         payload[i] = value;
+      }
+      message.setBytes(payload);
+      return message;
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8fccd5df/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonPubSubTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonPubSubTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonPubSubTest.java
index bf4e38c..a0fe626 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonPubSubTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonPubSubTest.java
@@ -17,10 +17,6 @@
 package org.apache.activemq.artemis.tests.integration.proton;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.api.core.TransportConfiguration;
-import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.qpid.jms.JmsConnectionFactory;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 import org.apache.qpid.proton.message.ProtonJMessage;
@@ -45,36 +41,27 @@ import javax.jms.Topic;
 import javax.jms.TopicConnection;
 import javax.jms.TopicSession;
 import javax.jms.TopicSubscriber;
-import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 
-public class ProtonPubSubTest extends ActiveMQTestBase {
+public class ProtonPubSubTest extends ProtonTestBase {
    private final String prefix = "foo.bar.";
    private final String pubAddress = "pubAddress";
    private final String prefixedPubAddress = prefix + "pubAddress";
    private final SimpleString ssPubAddress = new SimpleString(pubAddress);
    private final SimpleString ssprefixedPubAddress = new SimpleString(prefixedPubAddress);
-   private ActiveMQServer server;
    private Connection connection;
    private JmsConnectionFactory factory;
 
+   protected void configureAmqp(Map<String, Object> params) {
+      params.put("pubSubPrefix", prefix);
+   }
 
    @Override
    @Before
    public void setUp() throws Exception {
       super.setUp();
-      disableCheckThread();
-      server = this.createServer(true, true);
-      HashMap<String, Object> params = new HashMap<>();
-      params.put(TransportConstants.PORT_PROP_NAME, "5672");
-      params.put(TransportConstants.PROTOCOLS_PROP_NAME, "AMQP");
-      HashMap<String, Object> extraParams = new HashMap<>();
-      extraParams.put("pubSubPrefix", prefix);
-      TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params, "foo", extraParams);
-
-      server.getConfiguration().getAcceptorConfigurations().add(transportConfiguration);
-      server.start();
       server.createQueue(ssPubAddress, ssPubAddress, new SimpleString("foo=bar"), false, true);
       server.createQueue(ssprefixedPubAddress, ssprefixedPubAddress, new SimpleString("foo=bar"), false, true);
       factory = new JmsConnectionFactory("amqp://localhost:5672");
@@ -97,8 +84,6 @@ public class ProtonPubSubTest extends ActiveMQTestBase {
          if (connection != null) {
             connection.close();
          }
-
-         server.stop();
       }
       finally {
          super.tearDown();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8fccd5df/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
index b197bc6..711f6ff 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
@@ -51,13 +51,10 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
-import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
-import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.utils.ByteUtil;
 import org.apache.activemq.artemis.utils.VersionLoader;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
@@ -88,7 +85,7 @@ import org.proton.plug.test.Constants;
 import org.proton.plug.test.minimalclient.SimpleAMQPConnector;
 
 @RunWith(Parameterized.class)
-public class ProtonTest extends ActiveMQTestBase {
+public class ProtonTest extends ProtonTestBase {
 
    private static final String amqpConnectionUri = "amqp://localhost:5672";
 
@@ -131,7 +128,6 @@ public class ProtonTest extends ActiveMQTestBase {
       }
    }
 
-   private ActiveMQServer server;
    private final String coreAddress;
    private final String address;
    private Connection connection;
@@ -140,23 +136,7 @@ public class ProtonTest extends ActiveMQTestBase {
    @Before
    public void setUp() throws Exception {
       super.setUp();
-      disableCheckThread();
 
-      server = this.createServer(true, true);
-      HashMap<String, Object> params = new HashMap<>();
-      params.put(TransportConstants.PORT_PROP_NAME, "5672");
-      params.put(TransportConstants.PROTOCOLS_PROP_NAME, "AMQP");
-      TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
-
-      server.getConfiguration().getAcceptorConfigurations().add(transportConfiguration);
-      server.getConfiguration().setName(brokerName);
-
-      // Default Page
-      AddressSettings addressSettings = new AddressSettings();
-      addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
-      server.getConfiguration().getAddressesSettings().put("#", addressSettings);
-
-      server.start();
       server.createQueue(new SimpleString(coreAddress), new SimpleString(coreAddress), null, true, false);
       server.createQueue(new SimpleString(coreAddress + "1"), new SimpleString(coreAddress + "1"), null, true, false);
       server.createQueue(new SimpleString(coreAddress + "2"), new SimpleString(coreAddress + "2"), null, true, false);
@@ -191,8 +171,6 @@ public class ProtonTest extends ActiveMQTestBase {
          if (connection != null) {
             connection.close();
          }
-
-         server.stop();
       }
       finally {
          super.tearDown();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8fccd5df/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTestBase.java
new file mode 100644
index 0000000..0acd4ae
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTestBase.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.proton;
+
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.junit.After;
+import org.junit.Before;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ProtonTestBase extends ActiveMQTestBase {
+
+   protected String brokerName = "my-broker";
+   protected ActiveMQServer server;
+
+   protected String tcpAmqpConnectionUri = "tcp://localhost:5672";
+   protected String userName = "guest";
+   protected String password = "guest";
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      disableCheckThread();
+
+      server = this.createServer(true, true);
+      HashMap<String, Object> params = new HashMap<>();
+      params.put(TransportConstants.PORT_PROP_NAME, "5672");
+      params.put(TransportConstants.PROTOCOLS_PROP_NAME, "AMQP");
+      HashMap<String, Object> amqpParams = new HashMap<>();
+      configureAmqp(amqpParams);
+      TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params, "amqp-acceptor", amqpParams);
+
+      server.getConfiguration().getAcceptorConfigurations().add(transportConfiguration);
+      server.getConfiguration().setName(brokerName);
+
+      // Default Page
+      AddressSettings addressSettings = new AddressSettings();
+      addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+      server.getConfiguration().getAddressesSettings().put("#", addressSettings);
+
+      server.start();
+   }
+
+   protected void configureAmqp(Map<String, Object> params) {
+   }
+
+   @Override
+   @After
+   public void tearDown() throws Exception {
+      try {
+         server.stop();
+      }
+      finally {
+         super.tearDown();
+      }
+   }
+}