You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/11/07 03:00:34 UTC
[1/2] activemq-artemis git commit: ARTEMIS-1850
QueueControl.listDeliveringMessages returns empty result
Repository: activemq-artemis
Updated Branches:
refs/heads/master c0a40a161 -> 7c5470548
ARTEMIS-1850 QueueControl.listDeliveringMessages returns empty result
With the AMQP protocol, when some messages are received in a transaction,
calling JMX QueueControl.listDeliveringMessages() returns an empty list
before the transaction is committed.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/72eadb20
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/72eadb20
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/72eadb20
Branch: refs/heads/master
Commit: 72eadb201d870b097c8659497823f27bf2401d6f
Parents: c0a40a1
Author: Howard Gao <ho...@gmail.com>
Authored: Mon May 7 14:33:16 2018 +0800
Committer: Howard Gao <ho...@gmail.com>
Committed: Tue Nov 6 20:23:32 2018 +0800
----------------------------------------------------------------------
.../amqp/broker/AMQPSessionCallback.java | 19 ++++
.../transaction/ProtonTransactionHandler.java | 11 ++-
.../core/server/impl/ServerSessionImpl.java | 10 +++
.../spi/core/protocol/SessionCallback.java | 5 ++
.../integration/amqp/JMXManagementTest.java | 92 ++++++++++++++++++++
5 files changed, 136 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/72eadb20/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 61816af..14c1042 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -56,6 +56,7 @@ import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
+import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionHandler;
import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASLResult;
import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
@@ -109,6 +110,8 @@ public class AMQPSessionCallback implements SessionCallback {
private final AddressQueryCache<AddressQueryResult> addressQueryCache = new AddressQueryCache<>();
+ private ProtonTransactionHandler transactionHandler;
+
public AMQPSessionCallback(AMQPConnectionCallback protonSPI,
ProtonProtocolManager manager,
AMQPConnectionContext connection,
@@ -690,6 +693,14 @@ public class AMQPSessionCallback implements SessionCallback {
}
}
+ @Override
+ public Transaction getCurrentTransaction() {
+ if (this.transactionHandler != null) {
+ return this.transactionHandler.getCurrentTransaction();
+ }
+ return null;
+ }
+
public Transaction getTransaction(Binary txid, boolean remove) throws ActiveMQAMQPException {
return protonSPI.getTransaction(txid, remove);
}
@@ -740,6 +751,14 @@ public class AMQPSessionCallback implements SessionCallback {
serverSession.removeProducer(name);
}
+ public void setTransactionHandler(ProtonTransactionHandler transactionHandler) {
+ this.transactionHandler = transactionHandler;
+ }
+
+ public ProtonTransactionHandler getTransactionHandler() {
+ return this.transactionHandler;
+ }
+
class AddressQueryCache<T> {
SimpleString address;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/72eadb20/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
index 9ccc196..78a5b33 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.protocol.amqp.proton.transaction;
import java.nio.ByteBuffer;
import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
@@ -47,6 +48,7 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
private final int amqpCredit;
private final int amqpLowMark;
+ private Transaction currentTx;
final AMQPSessionCallback sessionSPI;
final AMQPConnectionContext connection;
@@ -58,6 +60,7 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
this.connection = connection;
this.amqpCredit = connection.getAmqpCredits();
this.amqpLowMark = connection.getAmqpLowCredits();
+ this.sessionSPI.setTransactionHandler(this);
}
@Override
@@ -100,6 +103,7 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
Binary txID = sessionSPI.newTransaction();
Declared declared = new Declared();
declared.setTxnId(txID);
+ currentTx = sessionSPI.getTransaction(txID, false);
IOCallback ioAction = new IOCallback() {
@Override
public void done() {
@@ -115,7 +119,7 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
@Override
public void onError(int errorCode, String errorMessage) {
-
+ currentTx = null;
}
};
sessionSPI.afterIO(ioAction);
@@ -133,6 +137,7 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
try {
delivery.settle();
delivery.disposition(new Accepted());
+ currentTx = null;
} finally {
connection.unlock();
connection.flush();
@@ -192,4 +197,8 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
message.decode(encoded);
return message;
}
+
+ public Transaction getCurrentTransaction() {
+ return currentTx;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/72eadb20/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index d13cd76..7ab353a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -1884,6 +1884,16 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
return oper.getListOnConsumer(consumerId);
}
} else {
+ //amqp handles the transaction in callback
+ if (callback != null) {
+ Transaction transaction = callback.getCurrentTransaction();
+ if (transaction != null) {
+ RefsOperation operation = (RefsOperation) transaction.getProperty(TransactionPropertyIndexes.REFS_OPERATION);
+ if (operation != null) {
+ return operation.getListOnConsumer(consumerId);
+ }
+ }
+ }
return Collections.emptyList();
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/72eadb20/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
index c4a2dbe..5577522 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
@@ -20,6 +20,7 @@ import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
public interface SessionCallback {
@@ -93,4 +94,8 @@ public interface SessionCallback {
default void close(boolean failed) {
}
+
+ default Transaction getCurrentTransaction() {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/72eadb20/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMXManagementTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMXManagementTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMXManagementTest.java
new file mode 100644
index 0000000..3be3e88
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMXManagementTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.management.QueueControl;
+import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
+import org.junit.Test;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.Map;
+
+public class JMXManagementTest extends JMSClientTestSupport {
+
+ @Test
+ public void testListDeliveringMessages() throws Exception {
+ SimpleString queue = new SimpleString(getQueueName());
+
+ Connection connection1 = createConnection();
+ Connection connection2 = createConnection();
+ Session prodSession = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session consSession = connection2.createSession(true, Session.SESSION_TRANSACTED);
+
+ javax.jms.Queue jmsQueue = prodSession.createQueue(queue.toString());
+
+ QueueControl queueControl = createManagementControl(queue, queue);
+
+ MessageProducer producer = prodSession.createProducer(jmsQueue);
+ final int num = 20;
+
+ for (int i = 0; i < num; i++) {
+ TextMessage message = prodSession.createTextMessage("hello" + i);
+ producer.send(message);
+ }
+
+ connection2.start();
+ MessageConsumer consumer = consSession.createConsumer(jmsQueue);
+
+ for (int i = 0; i < num; i++) {
+ TextMessage msgRec = (TextMessage) consumer.receive(5000);
+ assertNotNull(msgRec);
+ assertEquals(msgRec.getText(), "hello" + i);
+ }
+
+ //before commit
+ assertEquals(num, queueControl.getDeliveringCount());
+
+ Map<String, Map<String, Object>[]> result = queueControl.listDeliveringMessages();
+ assertEquals(1, result.size());
+
+ Map<String, Object>[] msgMaps = result.entrySet().iterator().next().getValue();
+
+ assertEquals(num, msgMaps.length);
+
+ consSession.commit();
+ result = queueControl.listDeliveringMessages();
+
+ assertEquals(0, result.size());
+
+ consSession.close();
+ prodSession.close();
+
+ connection1.close();
+ connection2.close();
+ }
+
+ protected QueueControl createManagementControl(final SimpleString address,
+ final SimpleString queue) throws Exception {
+ QueueControl queueControl = ManagementControlHelper.createQueueControl(address, queue, RoutingType.ANYCAST, this.mBeanServer);
+
+ return queueControl;
+ }
+}
[2/2] activemq-artemis git commit: This closes #2074
Posted by cl...@apache.org.
This closes #2074
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7c547054
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7c547054
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7c547054
Branch: refs/heads/master
Commit: 7c5470548a98405b2ca97ee4749e6258641daec1
Parents: c0a40a1 72eadb2
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Nov 6 22:00:26 2018 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Nov 6 22:00:26 2018 -0500
----------------------------------------------------------------------
.../amqp/broker/AMQPSessionCallback.java | 19 ++++
.../transaction/ProtonTransactionHandler.java | 11 ++-
.../core/server/impl/ServerSessionImpl.java | 10 +++
.../spi/core/protocol/SessionCallback.java | 5 ++
.../integration/amqp/JMXManagementTest.java | 92 ++++++++++++++++++++
5 files changed, 136 insertions(+), 1 deletion(-)
----------------------------------------------------------------------