You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/11/04 15:55:32 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-6498
Repository: activemq
Updated Branches:
refs/heads/master a5aa30025 -> 7cf7fba7a
https://issues.apache.org/jira/browse/AMQ-6498
Include the already-received backlog when deciding whether to grant
additional credit, to avoid excessive backlogs of messages during
producer flow control.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7cf7fba7
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7cf7fba7
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7cf7fba7
Branch: refs/heads/master
Commit: 7cf7fba7aab05ea9c41bf009325c63a6798f6cc8
Parents: a5aa300
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Nov 4 11:55:12 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Nov 4 11:55:12 2016 -0400
----------------------------------------------------------------------
.../transport/amqp/protocol/AmqpReceiver.java | 93 ++++-----
.../amqp/interop/AmqpFlowControlTest.java | 111 +++++++++++
.../amqp/profile/JmsSendReceiveStressTest.java | 187 +++++++++++++++++++
3 files changed, 345 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/7cf7fba7/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
index 33c319e..9f45ed2 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -70,6 +70,8 @@ public class AmqpReceiver extends AmqpAbstractReceiver {
private InboundTransformer inboundTransformer;
+ private int sendsInFlight;
+
/**
* Create a new instance of an AmqpReceiver
*
@@ -204,58 +206,57 @@ public class AmqpReceiver extends AmqpAbstractReceiver {
}
message.onSend();
- if (!delivery.remotelySettled()) {
- sendToActiveMQ(message, new ResponseHandler() {
-
- @Override
- public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
- if (response.isException()) {
- ExceptionResponse error = (ExceptionResponse) response;
- Rejected rejected = new Rejected();
- ErrorCondition condition = new ErrorCondition();
-
- if (error.getException() instanceof SecurityException) {
- condition.setCondition(AmqpError.UNAUTHORIZED_ACCESS);
- } else if (error.getException() instanceof ResourceAllocationException) {
- condition.setCondition(AmqpError.RESOURCE_LIMIT_EXCEEDED);
- } else {
- condition.setCondition(Symbol.valueOf("failed"));
- }
-
- condition.setDescription(error.getException().getMessage());
- rejected.setError(condition);
- delivery.disposition(rejected);
+
+ sendsInFlight++;
+
+ sendToActiveMQ(message, createResponseHandler(delivery));
+ }
+ }
+
+ private ResponseHandler createResponseHandler(final Delivery delivery) {
+ return new ResponseHandler() {
+
+ @Override
+ public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+ if (!delivery.remotelySettled()) {
+ if (response.isException()) {
+ ExceptionResponse error = (ExceptionResponse) response;
+ Rejected rejected = new Rejected();
+ ErrorCondition condition = new ErrorCondition();
+
+ if (error.getException() instanceof SecurityException) {
+ condition.setCondition(AmqpError.UNAUTHORIZED_ACCESS);
+ } else if (error.getException() instanceof ResourceAllocationException) {
+ condition.setCondition(AmqpError.RESOURCE_LIMIT_EXCEEDED);
} else {
- if (getEndpoint().getCredit() <= (getConfiguredReceiverCredit() * .3)) {
- LOG.debug("Sending more credit ({}) to producer: {}", getConfiguredReceiverCredit() - getEndpoint().getCredit(), getProducerId());
- getEndpoint().flow(getConfiguredReceiverCredit() - getEndpoint().getCredit());
- }
-
- if (remoteState != null && remoteState instanceof TransactionalState) {
- TransactionalState txAccepted = new TransactionalState();
- txAccepted.setOutcome(Accepted.getInstance());
- txAccepted.setTxnId(((TransactionalState) remoteState).getTxnId());
-
- delivery.disposition(txAccepted);
- } else {
- delivery.disposition(Accepted.getInstance());
- }
+ condition.setCondition(Symbol.valueOf("failed"));
}
- delivery.settle();
- session.pumpProtonToSocket();
+ condition.setDescription(error.getException().getMessage());
+ rejected.setError(condition);
+ delivery.disposition(rejected);
+ } else {
+ final DeliveryState remoteState = delivery.getRemoteState();
+ if (remoteState != null && remoteState instanceof TransactionalState) {
+ TransactionalState txAccepted = new TransactionalState();
+ txAccepted.setOutcome(Accepted.getInstance());
+ txAccepted.setTxnId(((TransactionalState) remoteState).getTxnId());
+
+ delivery.disposition(txAccepted);
+ } else {
+ delivery.disposition(Accepted.getInstance());
+ }
}
- });
- } else {
- if (getEndpoint().getCredit() <= (getConfiguredReceiverCredit() * .3)) {
- LOG.debug("Sending more credit ({}) to producer: {}", getConfiguredReceiverCredit() - getEndpoint().getCredit(), getProducerId());
- getEndpoint().flow(getConfiguredReceiverCredit() - getEndpoint().getCredit());
- session.pumpProtonToSocket();
+ }
+
+ if (getEndpoint().getCredit() + --sendsInFlight <= (getConfiguredReceiverCredit() * .3)) {
+ LOG.trace("Sending more credit ({}) to producer: {}", getConfiguredReceiverCredit() * .7, getProducerId());
+ getEndpoint().flow((int) (getConfiguredReceiverCredit() * .7));
}
delivery.settle();
- sendToActiveMQ(message);
+ session.pumpProtonToSocket();
}
- }
+ };
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/7cf7fba7/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpFlowControlTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpFlowControlTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpFlowControlTest.java
new file mode 100644
index 0000000..3cbbd4b
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpFlowControlTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.interop;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
+import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.activemq.util.Wait;
+import org.junit.Test;
+
+public class AmqpFlowControlTest extends AmqpClientTestSupport {
+
+ @Override
+ protected void performAdditionalConfiguration(BrokerService brokerService) throws Exception {
+ // Set up a destination policy where the destination takes only 1 message at a time.
+ PolicyMap policyMap = new PolicyMap();
+ PolicyEntry policy = new PolicyEntry();
+ policy.setMemoryLimit(1);
+ policy.setPendingSubscriberPolicy(new VMPendingSubscriberMessageStoragePolicy());
+ policy.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
+ policy.setProducerFlowControl(true);
+ policyMap.setDefaultEntry(policy);
+
+ brokerService.setDestinationPolicy(policyMap);
+ }
+
+ @Test(timeout = 60000)
+ public void testCreditNotGrantedUntilBacklogClears() throws Exception {
+ final int MSG_COUNT = 1000;
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = trackConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+ AmqpSender sender = session.createSender("queue://" + getTestName(), true);
+
+ for (int i = 1; i <= MSG_COUNT; ++i) {
+ AmqpMessage message = new AmqpMessage();
+ message.setText("Test-Message: " + i);
+ sender.send(message);
+
+ if (i % 1000 == 0) {
+ LOG.info("Sent message: {}", i);
+ }
+ }
+
+ // Should only accept one message
+ final QueueViewMBean queue = getProxyToQueue(getTestName());
+ assertTrue("All messages should arrive", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return queue.getQueueSize() == 1;
+ }
+ }));
+
+ assertEquals(0, sender.getEndpoint().getRemoteCredit());
+
+ receiver.flow(1);
+ AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(received);
+ received.accept();
+
+ // Should not grant any credit until backlog starts to clear
+ assertEquals(0, sender.getEndpoint().getRemoteCredit());
+
+ receiver.flow(MSG_COUNT - 1);
+ for (int i = 0; i < MSG_COUNT - 1; ++i) {
+ received = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(received);
+ received.accept();
+ }
+
+ // Should have been granted credit once backlog was cleared.
+ assertTrue(sender.getEndpoint().getRemoteCredit() > 0);
+
+ sender.close();
+ connection.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/7cf7fba7/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/profile/JmsSendReceiveStressTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/profile/JmsSendReceiveStressTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/profile/JmsSendReceiveStressTest.java
new file mode 100644
index 0000000..69536ec
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/profile/JmsSendReceiveStressTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.profile;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Vector;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
+import org.apache.activemq.transport.amqp.JMSClientTestSupport;
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Ignore("Use for profiling and memory testing")
+public class JmsSendReceiveStressTest extends JMSClientTestSupport {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JmsSendReceiveStressTest.class);
+
+ public static final int PAYLOAD_SIZE = 64 * 1024;
+
+ private final byte[] payload = new byte[PAYLOAD_SIZE];
+ private final int parallelProducer = 1;
+ private final int parallelConsumer = 1;
+ private final Vector<Throwable> exceptions = new Vector<Throwable>();
+ private JmsConnectionFactory factory;
+
+ private final long NUM_SENDS = 1000000;
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+
+ for (int i = 0; i < PAYLOAD_SIZE; ++i) {
+ payload[i] = (byte) (i % 255);
+ }
+ }
+
+ @Test
+ public void testProduceConsume() throws Exception {
+ factory = new JmsConnectionFactory(getAmqpURI(getAmqpConnectionURIOptions()));
+ factory.setForceAsyncAcks(true);
+ factory.setForceAsyncSend(false);
+ factory.setForceSyncSend(false);
+
+ final AtomicLong sharedSendCount = new AtomicLong(NUM_SENDS);
+ final AtomicLong sharedReceiveCount = new AtomicLong(NUM_SENDS);
+
+ Thread.sleep(2000);
+
+ long start = System.currentTimeMillis();
+ ExecutorService executorService = Executors.newFixedThreadPool(parallelConsumer + parallelProducer);
+
+ for (int i = 0; i < parallelConsumer; i++) {
+ executorService.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ consumeMessages(sharedReceiveCount);
+ } catch (Throwable e) {
+ exceptions.add(e);
+ }
+ }
+ });
+ }
+ for (int i = 0; i < parallelProducer; i++) {
+ executorService.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ publishMessages(sharedSendCount);
+ } catch (Throwable e) {
+ exceptions.add(e);
+ }
+ }
+ });
+ }
+
+ executorService.shutdown();
+ executorService.awaitTermination(30, TimeUnit.MINUTES);
+ assertTrue("Producers done in time", executorService.isTerminated());
+ assertTrue("No exceptions: " + exceptions, exceptions.isEmpty());
+
+ double duration = System.currentTimeMillis() - start;
+ LOG.info("Duration: " + duration + "ms");
+ LOG.info("Rate: " + (NUM_SENDS * 1000 / duration) + "m/s");
+ }
+
+ private void consumeMessages(AtomicLong count) throws Exception {
+ JmsConnection connection = (JmsConnection) factory.createConnection();
+ connection.start();
+ Session session = connection.createSession(false, ActiveMQSession.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(getDestinationName());
+ MessageConsumer consumer = session.createConsumer(queue);
+ long v;
+ while ((v = count.decrementAndGet()) > 0) {
+ if ((count.get() % 10000) == 0) {
+ LOG.info("Received message: {}", NUM_SENDS - count.get());
+ }
+ assertNotNull("got message " + v, consumer.receive(15000));
+ }
+ LOG.info("Received message: {}", NUM_SENDS);
+
+ consumer.close();
+ }
+
+ private void publishMessages(AtomicLong count) throws Exception {
+ JmsConnection connection = (JmsConnection) factory.createConnection();
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(getDestinationName());
+
+ MessageProducer producer = session.createProducer(queue);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ while (count.getAndDecrement() > 0) {
+ BytesMessage message = session.createBytesMessage();
+ message.writeBytes(payload);
+ producer.send(message);
+ if ((count.get() % 10000) == 0) {
+ LOG.info("Sent message: {}", NUM_SENDS - count.get());
+ }
+ }
+ producer.close();
+ connection.close();
+ }
+
+ @Override
+ protected void performAdditionalConfiguration(BrokerService brokerService) throws Exception {
+ PolicyEntry policyEntry = new PolicyEntry();
+ policyEntry.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
+ policyEntry.setPrioritizedMessages(false);
+ policyEntry.setExpireMessagesPeriod(0);
+ policyEntry.setEnableAudit(false);
+ policyEntry.setOptimizedDispatch(true);
+ policyEntry.setQueuePrefetch(1); // ensure no contention on add with
+ // matched producer/consumer
+
+ PolicyMap policyMap = new PolicyMap();
+ policyMap.setDefaultEntry(policyEntry);
+ brokerService.setDestinationPolicy(policyMap);
+ }
+
+ @Override
+ protected String getAmqpTransformer() {
+ return "jms";
+ }
+
+ private String getAmqpConnectionURIOptions() {
+ return "jms.presettlePolicy.presettleAll=false";
+ }
+}