You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/10/31 16:47:11 UTC
activemq-artemis git commit: ARTEMIS-2159 OpenWire would allow one
extra send
Repository: activemq-artemis
Updated Branches:
refs/heads/2.6.x b5c862feb -> d54cce16c
ARTEMIS-2159 OpenWire would allow one extra send
Thanks to Otavio Piske collaborating a test change here.
(cherry picked from commit 02a6d5bb493d6e0eea1ed847157d4e6b57aacf7f)
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/d54cce16
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/d54cce16
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/d54cce16
Branch: refs/heads/2.6.x
Commit: d54cce16cb3c8a5169fe8d076b0e0a9a4e388561
Parents: b5c862f
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Oct 31 09:13:05 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Oct 31 12:46:57 2018 -0400
----------------------------------------------------------------------
.../core/protocol/openwire/amq/AMQSession.java | 100 +++++++++----------
.../core/paging/impl/PagingStoreImpl.java | 4 +-
.../openwire/OpenWireFlowControlFailTest.java | 30 ++++--
3 files changed, 76 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d54cce16/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 0250f1c..a107ba7 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -443,63 +443,63 @@ public class AMQSession implements SessionCallback {
final AtomicInteger count,
final org.apache.activemq.artemis.api.core.Message coreMsg,
final SimpleString address) throws ResourceAllocationException {
- if (!store.checkMemory(() -> {
- Exception exceptionToSend = null;
-
- try {
- getCoreSession().send(coreMsg, false, dest.isTemporary());
- } catch (Exception e) {
- logger.warn(e.getMessage(), e);
- exceptionToSend = e;
- }
+ if (!store.checkMemory(null)) {
+ this.connection.getContext().setDontSendReponse(false);
connection.enableTtl();
- if (count == null || count.decrementAndGet() == 0) {
- if (exceptionToSend != null) {
- this.connection.getContext().setDontSendReponse(false);
- connection.sendException(exceptionToSend);
- } else {
- server.getStorageManager().afterCompleteOperations(new IOCallback() {
- @Override
- public void done() {
- if (sendProducerAck) {
- try {
- ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
- connection.dispatchAsync(ack);
- } catch (Exception e) {
- connection.getContext().setDontSendReponse(false);
- ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
- connection.sendException(e);
- }
- } else {
+ throw new ResourceAllocationException("Queue is full " + address);
+ }
+
+ Exception exceptionToSend = null;
+
+ try {
+ getCoreSession().send(coreMsg, false, dest.isTemporary());
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ exceptionToSend = e;
+ }
+ connection.enableTtl();
+ if (count == null || count.decrementAndGet() == 0) {
+ if (exceptionToSend != null) {
+ this.connection.getContext().setDontSendReponse(false);
+ connection.sendException(exceptionToSend);
+ } else {
+ server.getStorageManager().afterCompleteOperations(new IOCallback() {
+ @Override
+ public void done() {
+ if (sendProducerAck) {
+ try {
+ ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
+ connection.dispatchAsync(ack);
+ } catch (Exception e) {
connection.getContext().setDontSendReponse(false);
- try {
- Response response = new Response();
- response.setCorrelationId(messageSend.getCommandId());
- connection.dispatchAsync(response);
- } catch (Exception e) {
- ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
- connection.sendException(e);
- }
+ ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+ connection.sendException(e);
}
- }
-
- @Override
- public void onError(int errorCode, String errorMessage) {
+ } else {
+ connection.getContext().setDontSendReponse(false);
try {
- final IOException e = new IOException(errorMessage);
- ActiveMQServerLogger.LOGGER.warn(errorMessage);
- connection.serviceException(e);
- } catch (Exception ex) {
- ActiveMQServerLogger.LOGGER.debug(ex);
+ Response response = new Response();
+ response.setCorrelationId(messageSend.getCommandId());
+ connection.dispatchAsync(response);
+ } catch (Exception e) {
+ ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+ connection.sendException(e);
}
}
- });
- }
+ }
+
+ @Override
+ public void onError(int errorCode, String errorMessage) {
+ try {
+ final IOException e = new IOException(errorMessage);
+ ActiveMQServerLogger.LOGGER.warn(errorMessage);
+ connection.serviceException(e);
+ } catch (Exception ex) {
+ ActiveMQServerLogger.LOGGER.debug(ex);
+ }
+ }
+ });
}
- })) {
- this.connection.getContext().setDontSendReponse(false);
- connection.enableTtl();
- throw new ResourceAllocationException("Queue is full " + address);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d54cce16/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index 00001cc..908ab9f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -693,7 +693,9 @@ public class PagingStoreImpl implements PagingStore {
}
}
- runWhenAvailable.run();
+ if (runWhenAvailable != null) {
+ runWhenAvailable.run();
+ }
return true;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d54cce16/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireFlowControlFailTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireFlowControlFailTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireFlowControlFailTest.java
index 341f920..a2685b0 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireFlowControlFailTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireFlowControlFailTest.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.integration.openwire;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
@@ -58,34 +59,49 @@ public class OpenWireFlowControlFailTest extends OpenWireTestBase {
textBody.append(" ");
}
ConnectionFactory factory = new ActiveMQConnectionFactory(urlString);
+ int numberOfMessage = 0;
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue = session.createQueue(addressInfo.getName().toString());
MessageProducer producer = session.createProducer(queue);
- int numberOfMessage = 0;
boolean failed = false;
try {
for (int i = 0; i < 1000; i++) {
- producer.send(session.createTextMessage(textBody.toString()));
+ TextMessage message = session.createTextMessage(textBody.toString());
+ message.setIntProperty("i", i);
+
+ producer.send(message);
numberOfMessage++;
}
} catch (Exception e) {
e.printStackTrace(System.out);
failed = true;
+ try {
+ producer.send(session.createTextMessage(textBody.toString()));
+ Assert.fail("Exception expected");
+ } catch (JMSException expected) {
+ expected.printStackTrace();
+
+ }
}
+ Assert.assertTrue(failed);
+ }
- System.out.println("Message failed with " + numberOfMessage);
+ factory = new ActiveMQConnectionFactory(urlString);
+ try (Connection connection2 = factory.createConnection()) {
+ Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ javax.jms.Queue queue = session2.createQueue(addressInfo.getName().toString());
- Assert.assertTrue(failed);
- MessageConsumer consumer = session.createConsumer(queue);
- connection.start();
+ MessageConsumer consumer = session2.createConsumer(queue);
+ connection2.start();
for (int i = 0; i < numberOfMessage; i++) {
TextMessage message = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(message);
Assert.assertEquals(textBody.toString(), message.getText());
}
- Assert.assertNull(consumer.receiveNoWait());
+ TextMessage msg = (TextMessage)consumer.receive(500);
+ Assert.assertNull(msg);
}
}
}