You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/08/30 21:25:36 UTC
[2/2] activemq-artemis git commit: ARTEMIS-696 Broker fails when client sends messages in multiple transfer frames
ARTEMIS-696 Broker fails when client sends messages in multiple transfer frames
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/8fccd5df
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/8fccd5df
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/8fccd5df
Branch: refs/heads/master
Commit: 8fccd5df4265270566c5747bff6af253f62049b1
Parents: b51142a
Author: Howard Gao <ho...@gmail.com>
Authored: Wed Aug 24 21:54:08 2016 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Aug 30 17:24:31 2016 -0400
----------------------------------------------------------------------
.../protocol/proton/ProtonProtocolManager.java | 11 ++-
.../server/ProtonServerReceiverContext.java | 5 +-
.../proton/ProtonMaxFrameSizeTest.java | 97 ++++++++++++++++++++
.../integration/proton/ProtonPubSubTest.java | 25 +----
.../tests/integration/proton/ProtonTest.java | 24 +----
.../integration/proton/ProtonTestBase.java | 78 ++++++++++++++++
6 files changed, 195 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8fccd5df/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java
index 163fc6b..3567307 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java
@@ -64,6 +64,8 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
* */
private String pubSubPrefix = ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX;
+ private int maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
+
public ProtonProtocolManager(ProtonProtocolManagerFactory factory, ActiveMQServer server) {
this.factory = factory;
this.server = server;
@@ -111,7 +113,7 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
String id = server.getConfiguration().getName();
AMQPServerConnectionContext amqpConnection = ProtonServerConnectionContextFactory.getFactory().
- createConnection(connectionCallback, id, (int) ttl, DEFAULT_MAX_FRAME_SIZE, DEFAULT_CHANNEL_MAX, server.getExecutorFactory().getExecutor(), server.getScheduledPool());
+ createConnection(connectionCallback, id, (int) ttl, getMaxFrameSize(), DEFAULT_CHANNEL_MAX, server.getExecutorFactory().getExecutor(), server.getScheduledPool());
Executor executor = server.getExecutorFactory().getExecutor();
@@ -164,4 +166,11 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
}
+ public int getMaxFrameSize() {
+ return maxFrameSize;
+ }
+
+ public void setMaxFrameSize(int maxFrameSize) {
+ this.maxFrameSize = maxFrameSize;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8fccd5df/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java
index 0bbe8ca..c564a9e 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java
@@ -116,7 +116,10 @@ public class ProtonServerReceiverContext extends AbstractProtonReceiverContext {
receiver = ((Receiver) delivery.getLink());
if (!delivery.isReadable()) {
- System.err.println("!!!!! Readable!!!!!!!");
+ return;
+ }
+
+ if (delivery.isPartial()) {
return;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8fccd5df/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonMaxFrameSizeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonMaxFrameSizeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonMaxFrameSizeTest.java
new file mode 100644
index 0000000..658ea81
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonMaxFrameSizeTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.proton;
+
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.message.impl.MessageImpl;
+import org.junit.Test;
+
+import java.net.URI;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class ProtonMaxFrameSizeTest extends ProtonTestBase {
+
+ private static final int FRAME_SIZE = 512;
+
+ protected void configureAmqp(Map<String, Object> params) {
+ params.put("maxFrameSize", FRAME_SIZE);
+ }
+
+ @Test
+ public void testMultipleTransfers() throws Exception {
+
+ String testQueueName = "ConnectionFrameSize";
+ int nMsgs = 200;
+
+ AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
+
+
+ AmqpConnection amqpConnection = client.createConnection();
+
+ try {
+ amqpConnection.connect();
+
+ AmqpSession session = amqpConnection.createSession();
+ AmqpSender sender = session.createSender("jms.queue." + testQueueName);
+
+ final int payload = FRAME_SIZE * 16;
+
+ for (int i = 0; i < nMsgs; ++i) {
+ AmqpMessage message = createAmqpMessage((byte) 'A', payload);
+ sender.send(message);
+ }
+
+ int count = getMessageCount(server.getPostOffice(), "jms.queue." + testQueueName);
+ assertEquals(nMsgs, count);
+
+ AmqpReceiver receiver = session.createReceiver("jms.queue." + testQueueName);
+ receiver.flow(nMsgs);
+
+ for (int i = 0; i < nMsgs; ++i) {
+ AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull("failed at " + i, message);
+ MessageImpl wrapped = (MessageImpl) message.getWrappedMessage();
+ Data data = (Data) wrapped.getBody();
+ System.out.println("received : message: " + data.getValue().getLength());
+ assertEquals(payload, data.getValue().getLength());
+ message.accept();
+ }
+
+ }
+ finally {
+ amqpConnection.close();
+ }
+ }
+
+ private AmqpMessage createAmqpMessage(byte value, int payloadSize) {
+ AmqpMessage message = new AmqpMessage();
+ byte[] payload = new byte[payloadSize];
+ for (int i = 0; i < payload.length; i++) {
+ payload[i] = value;
+ }
+ message.setBytes(payload);
+ return message;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8fccd5df/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonPubSubTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonPubSubTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonPubSubTest.java
index bf4e38c..a0fe626 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonPubSubTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonPubSubTest.java
@@ -17,10 +17,6 @@
package org.apache.activemq.artemis.tests.integration.proton;
import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.api.core.TransportConfiguration;
-import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.message.ProtonJMessage;
@@ -45,36 +41,27 @@ import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
-import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
-public class ProtonPubSubTest extends ActiveMQTestBase {
+public class ProtonPubSubTest extends ProtonTestBase {
private final String prefix = "foo.bar.";
private final String pubAddress = "pubAddress";
private final String prefixedPubAddress = prefix + "pubAddress";
private final SimpleString ssPubAddress = new SimpleString(pubAddress);
private final SimpleString ssprefixedPubAddress = new SimpleString(prefixedPubAddress);
- private ActiveMQServer server;
private Connection connection;
private JmsConnectionFactory factory;
+ protected void configureAmqp(Map<String, Object> params) {
+ params.put("pubSubPrefix", prefix);
+ }
@Override
@Before
public void setUp() throws Exception {
super.setUp();
- disableCheckThread();
- server = this.createServer(true, true);
- HashMap<String, Object> params = new HashMap<>();
- params.put(TransportConstants.PORT_PROP_NAME, "5672");
- params.put(TransportConstants.PROTOCOLS_PROP_NAME, "AMQP");
- HashMap<String, Object> extraParams = new HashMap<>();
- extraParams.put("pubSubPrefix", prefix);
- TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params, "foo", extraParams);
-
- server.getConfiguration().getAcceptorConfigurations().add(transportConfiguration);
- server.start();
server.createQueue(ssPubAddress, ssPubAddress, new SimpleString("foo=bar"), false, true);
server.createQueue(ssprefixedPubAddress, ssprefixedPubAddress, new SimpleString("foo=bar"), false, true);
factory = new JmsConnectionFactory("amqp://localhost:5672");
@@ -97,8 +84,6 @@ public class ProtonPubSubTest extends ActiveMQTestBase {
if (connection != null) {
connection.close();
}
-
- server.stop();
}
finally {
super.tearDown();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8fccd5df/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
index b197bc6..711f6ff 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
@@ -51,13 +51,10 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
-import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
-import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.VersionLoader;
import org.apache.activemq.transport.amqp.client.AmqpClient;
@@ -88,7 +85,7 @@ import org.proton.plug.test.Constants;
import org.proton.plug.test.minimalclient.SimpleAMQPConnector;
@RunWith(Parameterized.class)
-public class ProtonTest extends ActiveMQTestBase {
+public class ProtonTest extends ProtonTestBase {
private static final String amqpConnectionUri = "amqp://localhost:5672";
@@ -131,7 +128,6 @@ public class ProtonTest extends ActiveMQTestBase {
}
}
- private ActiveMQServer server;
private final String coreAddress;
private final String address;
private Connection connection;
@@ -140,23 +136,7 @@ public class ProtonTest extends ActiveMQTestBase {
@Before
public void setUp() throws Exception {
super.setUp();
- disableCheckThread();
- server = this.createServer(true, true);
- HashMap<String, Object> params = new HashMap<>();
- params.put(TransportConstants.PORT_PROP_NAME, "5672");
- params.put(TransportConstants.PROTOCOLS_PROP_NAME, "AMQP");
- TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
-
- server.getConfiguration().getAcceptorConfigurations().add(transportConfiguration);
- server.getConfiguration().setName(brokerName);
-
- // Default Page
- AddressSettings addressSettings = new AddressSettings();
- addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
- server.getConfiguration().getAddressesSettings().put("#", addressSettings);
-
- server.start();
server.createQueue(new SimpleString(coreAddress), new SimpleString(coreAddress), null, true, false);
server.createQueue(new SimpleString(coreAddress + "1"), new SimpleString(coreAddress + "1"), null, true, false);
server.createQueue(new SimpleString(coreAddress + "2"), new SimpleString(coreAddress + "2"), null, true, false);
@@ -191,8 +171,6 @@ public class ProtonTest extends ActiveMQTestBase {
if (connection != null) {
connection.close();
}
-
- server.stop();
}
finally {
super.tearDown();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8fccd5df/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTestBase.java
new file mode 100644
index 0000000..0acd4ae
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTestBase.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.proton;
+
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.junit.After;
+import org.junit.Before;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ProtonTestBase extends ActiveMQTestBase {
+
+ protected String brokerName = "my-broker";
+ protected ActiveMQServer server;
+
+ protected String tcpAmqpConnectionUri = "tcp://localhost:5672";
+ protected String userName = "guest";
+ protected String password = "guest";
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ disableCheckThread();
+
+ server = this.createServer(true, true);
+ HashMap<String, Object> params = new HashMap<>();
+ params.put(TransportConstants.PORT_PROP_NAME, "5672");
+ params.put(TransportConstants.PROTOCOLS_PROP_NAME, "AMQP");
+ HashMap<String, Object> amqpParams = new HashMap<>();
+ configureAmqp(amqpParams);
+ TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params, "amqp-acceptor", amqpParams);
+
+ server.getConfiguration().getAcceptorConfigurations().add(transportConfiguration);
+ server.getConfiguration().setName(brokerName);
+
+ // Default Page
+ AddressSettings addressSettings = new AddressSettings();
+ addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ server.getConfiguration().getAddressesSettings().put("#", addressSettings);
+
+ server.start();
+ }
+
+ protected void configureAmqp(Map<String, Object> params) {
+ }
+
+ @Override
+ @After
+ public void tearDown() throws Exception {
+ try {
+ server.stop();
+ }
+ finally {
+ super.tearDown();
+ }
+ }
+}