You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/12/21 17:06:58 UTC
[3/5] activemq-artemis git commit: ARTEMIS-883 Fix OpenWire ProducerFlowControlTest
ARTEMIS-883 Fix OpenWire ProducerFlowControlTest
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ae90edfd
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ae90edfd
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ae90edfd
Branch: refs/heads/master
Commit: ae90edfdb624e1cdf4a4b36cbf82dbc7a1be8295
Parents: 0b131bd
Author: Howard Gao <ho...@gmail.com>
Authored: Tue Dec 20 21:13:38 2016 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Dec 21 12:04:06 2016 -0500
----------------------------------------------------------------------
.../protocol/openwire/OpenWireConnection.java | 18 ++
.../core/protocol/openwire/amq/AMQSession.java | 169 +++++++++----------
.../integration/openwire/BasicOpenWireTest.java | 11 +-
.../openwire/amq/ProducerBlockingTtlTest.java | 147 ++++++++++++++++
4 files changed, 255 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ae90edfd/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index f90c0b7..0bcff66 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -183,6 +183,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
private final Set<SimpleString> knownDestinations = new ConcurrentHashSet<>();
+ private AtomicBoolean disableTtl = new AtomicBoolean(false);
+
// TODO-NOW: check on why there are two connections created for every createConnection on the client.
public OpenWireConnection(Connection connection,
ActiveMQServer server,
@@ -776,6 +778,14 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
this.connectionEntry = connectionEntry;
}
+ @Override
+ public boolean checkDataReceived() { // reports "data received" unconditionally while the disableTtl flag is set
+ if (disableTtl.get()) {
+ return true;
+ }
+ return super.checkDataReceived(); // flag not set: defer to the superclass check
+ }
+
public void setUpTtl(final long inactivityDuration,
final long inactivityDurationInitialDelay,
final boolean useKeepAlive) {
@@ -818,6 +828,14 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
}
+ public void disableTtl() { // sets the flag that makes checkDataReceived() return true without consulting the superclass
+ disableTtl.set(true);
+ }
+
+ public void enableTtl() { // clears the flag so checkDataReceived() delegates to the superclass again
+ disableTtl.set(false);
+ }
+
class SlowConsumerDetection implements SlowConsumerDetectionListener {
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ae90edfd/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 9c592ca..006f05e 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -40,9 +40,7 @@ import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
-import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
-import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.IDGenerator;
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
@@ -53,12 +51,12 @@ import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.wireformat.WireFormat;
public class AMQSession implements SessionCallback {
-
// ConsumerID is generated inside the session, 0, 1, 2, ... as many consumers as you have on the session
protected final IDGenerator consumerIDGenerator = new SimpleIDGenerator(0);
@@ -303,108 +301,103 @@ public class AMQSession implements SessionCallback {
originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString());
}
- Runnable runnable;
-
- if (sendProducerAck) {
- runnable = new Runnable() {
- @Override
- public void run() {
- try {
- ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
- connection.dispatchSync(ack);
- } catch (Exception e) {
- ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
- connection.sendException(e);
- }
-
- }
- };
- } else {
- final Connection transportConnection = connection.getTransportConnection();
-
- if (transportConnection == null) {
- // I don't think this could happen, but just in case, avoiding races
- runnable = null;
- } else {
- runnable = new Runnable() {
- @Override
- public void run() {
- transportConnection.setAutoRead(true);
- }
- };
- }
- }
-
- internalSend(actualDestinations, originalCoreMsg, runnable);
- }
+ boolean shouldBlockProducer = producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired();
- private void internalSend(ActiveMQDestination[] actualDestinations,
- ServerMessage originalCoreMsg,
- final Runnable onComplete) throws Exception {
+ final AtomicInteger count = new AtomicInteger(actualDestinations.length);
- Runnable runToUse;
+ final Exception[] anyException = new Exception[] {null};
- if (actualDestinations.length <= 1 || onComplete == null) {
- // if onComplete is null, this will be null ;)
- runToUse = onComplete;
- } else {
- final AtomicInteger count = new AtomicInteger(actualDestinations.length);
- runToUse = new Runnable() {
- @Override
- public void run() {
- if (count.decrementAndGet() == 0) {
- onComplete.run();
- }
- }
- };
+ if (shouldBlockProducer) {
+ connection.getContext().setDontSendReponse(true);
}
- SimpleString[] addresses = new SimpleString[actualDestinations.length];
- PagingStore[] pagingStores = new PagingStore[actualDestinations.length];
-
- // We fillup addresses, pagingStores and we will throw failure if that's the case
for (int i = 0; i < actualDestinations.length; i++) {
ActiveMQDestination dest = actualDestinations[i];
- addresses[i] = new SimpleString(dest.getPhysicalName());
- pagingStores[i] = server.getPagingManager().getPageStore(addresses[i]);
- if (pagingStores[i].getAddressFullMessagePolicy() == AddressFullMessagePolicy.FAIL && pagingStores[i].isFull()) {
- throw new ResourceAllocationException("Queue is full");
- }
- }
-
- for (int i = 0; i < actualDestinations.length; i++) {
-
+ SimpleString address = new SimpleString(dest.getPhysicalName());
ServerMessage coreMsg = originalCoreMsg.copy();
-
- coreMsg.setAddress(addresses[i]);
-
- PagingStore store = pagingStores[i];
-
- if (store.isFull()) {
- connection.getTransportConnection().setAutoRead(false);
- }
+ coreMsg.setAddress(address);
if (actualDestinations[i].isQueue()) {
checkAutoCreateQueue(new SimpleString(actualDestinations[i].getPhysicalName()), actualDestinations[i].isTemporary());
- }
-
- if (actualDestinations[i].isQueue()) {
coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.ANYCAST.getType());
} else {
coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType());
}
- RoutingStatus result = getCoreSession().send(coreMsg, false, actualDestinations[i].isTemporary());
+ PagingStore store = server.getPagingManager().getPageStore(address);
- if (result == RoutingStatus.NO_BINDINGS && actualDestinations[i].isQueue()) {
- throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + actualDestinations[i]);
- }
- if (runToUse != null) {
- // if the timeout is >0, it will wait this much milliseconds
- // before running the the runToUse
- // this will eventually unblock blocked destinations
- // playing flow control
- store.checkMemory(runToUse);
+ this.connection.disableTtl();
+ if (shouldBlockProducer) {
+ if (!store.checkMemory(() -> {
+ try {
+ RoutingStatus result = getCoreSession().send(coreMsg, false, dest.isTemporary());
+
+ if (result == RoutingStatus.NO_BINDINGS && dest.isQueue()) {
+ throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + dest);
+ }
+ } catch (Exception e) {
+ if (anyException[0] == null) {
+ anyException[0] = e;
+ }
+ }
+ connection.enableTtl();
+ if (count.decrementAndGet() == 0) {
+ if (anyException[0] != null) {
+ this.connection.getContext().setDontSendReponse(false);
+ ActiveMQServerLogger.LOGGER.warn(anyException[0].getMessage(), anyException[0]);
+ connection.sendException(anyException[0]);
+ } else {
+ if (sendProducerAck) {
+ try {
+ ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
+ connection.dispatchAsync(ack);
+ } catch (Exception e) {
+ this.connection.getContext().setDontSendReponse(false);
+ ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+ connection.sendException(e);
+ }
+ } else {
+ connection.getContext().setDontSendReponse(false);
+ try {
+ Response response = new Response();
+ response.setCorrelationId(messageSend.getCommandId());
+ connection.dispatchAsync(response);
+ } catch (Exception e) {
+ ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+ connection.sendException(e);
+ }
+ }
+ }
+ }
+ })) {
+ this.connection.getContext().setDontSendReponse(false);
+ connection.enableTtl();
+ throw new ResourceAllocationException("Queue is full " + address);
+ }
+ } else {
+ //non-persistent messages go here; by default we stop reading from
+ //the transport
+ connection.getTransportConnection().setAutoRead(false);
+ if (!store.checkMemory(() -> {
+ connection.getTransportConnection().setAutoRead(true);
+ connection.enableTtl();
+ })) {
+ connection.getTransportConnection().setAutoRead(true);
+ connection.enableTtl();
+ throw new ResourceAllocationException("Queue is full " + address);
+ }
+
+ RoutingStatus result = getCoreSession().send(coreMsg, false, dest.isTemporary());
+ if (result == RoutingStatus.NO_BINDINGS && dest.isQueue()) {
+ throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + dest);
+ }
+
+ if (count.decrementAndGet() == 0) {
+ if (sendProducerAck) {
+ ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
+ connection.dispatchAsync(ack);
+ }
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ae90edfd/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java
index 6be92f8..d01e237 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java
@@ -45,8 +45,8 @@ public class BasicOpenWireTest extends OpenWireTestBase {
public TestName name = new TestName();
protected static final String urlString = "tcp://" + OWHOST + ":" + OWPORT + "?wireFormat.cacheEnabled=true";
- protected ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(urlString);
- protected ActiveMQXAConnectionFactory xaFactory = new ActiveMQXAConnectionFactory(urlString);
+ protected ActiveMQConnectionFactory factory;
+ protected ActiveMQXAConnectionFactory xaFactory;
protected ActiveMQConnection connection;
protected String topicName = "amqTestTopic1";
@@ -64,6 +64,9 @@ public class BasicOpenWireTest extends OpenWireTestBase {
@Before
public void setUp() throws Exception {
super.setUp();
+ System.setProperty("org.apache.activemq.transport.AbstractInactivityMonitor.keepAliveTime", "5");
+ factory = new ActiveMQConnectionFactory(getConnectionUrl());
+ xaFactory = new ActiveMQXAConnectionFactory(getConnectionUrl());
SimpleString coreQueue = new SimpleString(queueName);
this.server.createQueue(coreQueue, RoutingType.ANYCAST, coreQueue, null, false, false, -1, false, true);
testQueues.put(queueName, coreQueue);
@@ -81,6 +84,10 @@ public class BasicOpenWireTest extends OpenWireTestBase {
}
}
+ protected String getConnectionUrl() { // URL handed to both connection factories in setUp(); subclasses may override to append options
+ return urlString;
+ }
+
@Override
@After
public void tearDown() throws Exception {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ae90edfd/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerBlockingTtlTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerBlockingTtlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerBlockingTtlTest.java
new file mode 100644
index 0000000..8473d67
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerBlockingTtlTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.openwire.amq;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class ProducerBlockingTtlTest extends BasicOpenWireTest {
+ // Exercises a producer on an address capped at 1 byte with the BLOCK policy while a short inactivity duration is configured.
+ ActiveMQQueue queueA = new ActiveMQQueue("QUEUE.A"); // destination used by both the producer and the consumer
+ protected ActiveMQConnection flowControlConnection; // producer-side connection; also cleaned up in tearDown()
+
+ @Override
+ protected void extraServerConfig(Configuration serverConfig) { // limits the "#" address settings to 1 byte and selects the BLOCK full-policy
+ String match = "#";
+ Map<String, AddressSettings> asMap = serverConfig.getAddressesSettings();
+ asMap.get(match).setMaxSizeBytes(1).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
+ }
+
+ @Override
+ @Before
+ public void setUp() throws Exception { // base setup, then make sure the core queue behind QUEUE.A exists
+ super.setUp();
+ this.makeSureCoreQueueExist("QUEUE.A");
+ }
+
+ @Override
+ @After
+ public void tearDown() throws Exception { // stops and closes the producer connection if it is still set, then runs the base teardown
+ try {
+ if (flowControlConnection != null) {
+ TcpTransport t = flowControlConnection.getTransport().narrow(TcpTransport.class); // NOTE(review): t is never used below
+ try {
+ flowControlConnection.getTransport().stop();
+ flowControlConnection.close();
+ } catch (Throwable ignored) { // best-effort cleanup: failures while stopping/closing are dropped
+ }
+ }
+ } finally { // the base-class teardown always runs
+ super.tearDown();
+ }
+ }
+
+ //set the inactivity duration (ttl) and its initial delay to 1000
+ @Override
+ protected String getConnectionUrl() {
+ return urlString + "&wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000";
+ }
+
+ @Test
+ public void testProducerBlockWontGetTimeout() throws Exception { // sends from a background thread, waits past the ttl, then expects every message to be consumable
+
+ flowControlConnection = (ActiveMQConnection) factory.createConnection();
+ Connection consumerConnection = factory.createConnection();
+ Thread fillThread = null;
+ AtomicBoolean keepGoing = new AtomicBoolean(true); // cleared in the finally block to stop the fill thread's loop
+ try {
+ flowControlConnection.start();
+
+ final Session session = flowControlConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final MessageProducer producer = session.createProducer(queueA);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ final String text = "Hello World";
+ final int num = 10;
+
+ fillThread = new Thread("Fill thread.") { // sends up to num messages while keepGoing is true
+ @Override
+ public void run() {
+ try {
+ for (int i = 0; i < num && keepGoing.get(); i++) {
+ producer.send(session.createTextMessage(text + i));
+ }
+ } catch (JMSException e) { // NOTE(review): send failures are ignored; the loop simply ends
+ }
+ }
+ };
+
+ fillThread.start();
+
+ //sleep for longer than the TTL (1000)
+ Thread.sleep(4000);
+
+ //receive messages and unblock the producer
+ consumerConnection.start();
+ Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = consumerSession.createConsumer(queueA);
+
+ for (int i = 0; i < num; i++) { // messages must arrive in send order, each within 5000 of receive timeout
+ TextMessage m = (TextMessage) consumer.receive(5000);
+ assertNotNull(m);
+ assertEquals("Hello World" + i, m.getText());
+ }
+ assertNull(consumer.receive(3)); // no further message is expected
+
+ } catch (Exception e) { // NOTE(review): exceptions caught here are only printed, not rethrown
+ e.printStackTrace();
+ } finally {
+
+ if (fillThread != null) { // stop the sender: clear the flag, interrupt, and wait for the thread to finish
+ keepGoing.set(false);
+ fillThread.interrupt();
+ fillThread.join();
+ }
+ try {
+ flowControlConnection.close();
+ flowControlConnection = null; // nulled so tearDown() skips its own stop/close
+ } catch (Throwable t) { // close failures are ignored
+ }
+ try {
+ consumerConnection.close();
+ } catch (Throwable t) { // close failures are ignored
+ }
+ }
+ }
+}