You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/04/04 18:09:51 UTC
[42/42] activemq-artemis git commit: ARTEMIS-463 More simplifications
on the openwire head https://issues.apache.org/jira/browse/ARTEMIS-463
ARTEMIS-463 More simplifications on the openwire head
https://issues.apache.org/jira/browse/ARTEMIS-463
This will have some extra refactoring on the protocol head, transferring responsibility to the broker classes in a lot of cases
and removing some duplicated code
This was a team effort from Clebert Suconic and Howard Gao
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3aedf273
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3aedf273
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3aedf273
Branch: refs/heads/master
Commit: 3aedf27386975a3c40cd5d13083da7912cfb34e8
Parents: 6ddf486
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Apr 1 16:58:50 2016 -0400
Committer: jbertram <jb...@apache.org>
Committed: Mon Apr 4 11:08:43 2016 -0500
----------------------------------------------------------------------
.../jms/server/impl/JMSServerManagerImpl.java | 38 +-
.../plug/ProtonSessionIntegrationCallback.java | 15 +-
.../protocol/mqtt/MQTTConnectionManager.java | 6 +-
.../core/protocol/mqtt/MQTTSessionCallback.java | 7 +-
.../protocol/openwire/AMQTransactionImpl.java | 59 --
.../protocol/openwire/OpenWireConnection.java | 417 ++++++++++----
.../openwire/OpenWireMessageConverter.java | 1 +
.../openwire/OpenWireProtocolManager.java | 123 +---
.../core/protocol/openwire/OpenWireUtil.java | 73 ---
.../amq/AMQCompositeConsumerBrokerExchange.java | 9 +-
.../core/protocol/openwire/amq/AMQConsumer.java | 247 +++-----
.../openwire/amq/AMQConsumerBrokerExchange.java | 2 +
.../openwire/amq/AMQServerConsumer.java | 102 ----
.../protocol/openwire/amq/AMQServerSession.java | 391 -------------
.../openwire/amq/AMQServerSessionFactory.java | 69 ---
.../core/protocol/openwire/amq/AMQSession.java | 248 ++------
.../amq/AMQSingleConsumerBrokerExchange.java | 8 +-
.../openwire/amq/AMQTransactionFactory.java | 32 --
.../core/protocol/openwire/amq/MessageInfo.java | 47 --
.../protocol/stomp/StompProtocolManager.java | 4 +-
.../core/protocol/stomp/StompSession.java | 5 +-
.../core/config/impl/ConfigurationImpl.java | 1 -
.../core/paging/cursor/PagedReferenceImpl.java | 27 +-
.../core/impl/ActiveMQPacketHandler.java | 2 +-
.../protocol/core/impl/CoreSessionCallback.java | 5 +-
.../artemis/core/server/ActiveMQServer.java | 1 -
.../artemis/core/server/MessageReference.java | 12 +
.../activemq/artemis/core/server/Queue.java | 2 +
.../artemis/core/server/ServerConsumer.java | 12 +
.../artemis/core/server/ServerSession.java | 26 +
.../core/server/ServerSessionFactory.java | 55 --
.../core/server/impl/ActiveMQServerImpl.java | 12 +-
.../core/server/impl/LastValueQueue.java | 16 +
.../core/server/impl/MessageReferenceImpl.java | 24 +-
.../artemis/core/server/impl/QueueImpl.java | 39 +-
.../core/server/impl/ServerConsumerImpl.java | 84 ++-
.../core/server/impl/ServerSessionImpl.java | 146 ++---
.../core/transaction/ResourceManager.java | 3 +
.../artemis/core/transaction/Transaction.java | 6 +
.../core/transaction/TransactionFactory.java | 26 -
.../transaction/impl/ResourceManagerImpl.java | 14 +-
.../core/transaction/impl/TransactionImpl.java | 12 +
.../spi/core/protocol/SessionCallback.java | 11 +-
.../impl/ScheduledDeliveryHandlerTest.java | 4 +
.../artemis/tests/util/ThreadLeakCheckRule.java | 27 +-
.../activemq/JmsRollbackRedeliveryTest.java | 2 +-
.../transport/SoWriteTimeoutClientTest.java | 4 +-
.../FailoverConsumerOutstandingCommitTest.java | 24 +-
.../failover/FailoverTransactionTest.java | 70 ++-
.../integration/client/HangConsumerTest.java | 11 +-
.../integration/openwire/BasicSecurityTest.java | 9 +-
.../integration/openwire/OpenWireUtilTest.java | 2 +-
.../openwire/SimpleOpenWireTest.java | 572 ++++++++++++++++++-
.../core/postoffice/impl/BindingsImplTest.java | 10 +
.../unit/core/postoffice/impl/FakeQueue.java | 5 +
55 files changed, 1462 insertions(+), 1717 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java
index 9872d0f..d888a8c 100644
--- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java
+++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java
@@ -1044,35 +1044,31 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
}
}
- private boolean internalCreateQueue(final String queueName,
+ private synchronized boolean internalCreateQueue(final String queueName,
final String selectorString,
final boolean durable) throws Exception {
- // TODO: there was an openwire test failng because of this
- // is this really needed for FailoverClusterTest ?
- synchronized (queues) {
- if (queues.get(queueName) != null) {
- return false;
- }
- else {
- ActiveMQQueue activeMQQueue = ActiveMQDestination.createQueue(queueName);
+ if (queues.get(queueName) != null) {
+ return false;
+ }
+ else {
+ ActiveMQQueue activeMQQueue = ActiveMQDestination.createQueue(queueName);
- // Convert from JMS selector to core filter
- String coreFilterString = null;
+ // Convert from JMS selector to core filter
+ String coreFilterString = null;
- if (selectorString != null) {
- coreFilterString = SelectorTranslator.convertToActiveMQFilterString(selectorString);
- }
+ if (selectorString != null) {
+ coreFilterString = SelectorTranslator.convertToActiveMQFilterString(selectorString);
+ }
- Queue queue = server.deployQueue(SimpleString.toSimpleString(activeMQQueue.getAddress()), SimpleString.toSimpleString(activeMQQueue.getAddress()), SimpleString.toSimpleString(coreFilterString), durable, false);
+ Queue queue = server.deployQueue(SimpleString.toSimpleString(activeMQQueue.getAddress()), SimpleString.toSimpleString(activeMQQueue.getAddress()), SimpleString.toSimpleString(coreFilterString), durable, false);
- queues.put(queueName, activeMQQueue);
+ queues.put(queueName, activeMQQueue);
- this.recoverregistryBindings(queueName, PersistedType.Queue);
+ this.recoverregistryBindings(queueName, PersistedType.Queue);
- jmsManagementService.registerQueue(activeMQQueue, queue);
+ jmsManagementService.registerQueue(activeMQQueue, queue);
- return true;
- }
+ return true;
}
}
@@ -1084,7 +1080,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
* @return
* @throws Exception
*/
- private boolean internalCreateTopic(final String topicName) throws Exception {
+ private synchronized boolean internalCreateTopic(final String topicName) throws Exception {
if (topics.get(topicName) != null) {
return false;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
index 5d6af2a..55bade9 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
@@ -20,6 +20,8 @@ import java.util.concurrent.Executor;
import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.qpid.proton.amqp.Binary;
@@ -117,7 +119,7 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
false, // boolean autoCommitAcks,
false, // boolean preAcknowledge,
true, //boolean xa,
- (String) null, this, null, true);
+ (String) null, this, true);
}
@Override
@@ -214,7 +216,12 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
@Override
public Binary getCurrentTXID() {
- return new Binary(ByteUtil.longToBytes(serverSession.getCurrentTransaction().getID()));
+ Transaction tx = serverSession.getCurrentTransaction();
+ if (tx == null) {
+ tx = serverSession.newTransaction();
+ serverSession.resetTX(tx);
+ }
+ return new Binary(ByteUtil.longToBytes(tx.getID()));
}
@Override
@@ -341,7 +348,7 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
}
@Override
- public int sendMessage(ServerMessage message, ServerConsumer consumer, int deliveryCount) {
+ public int sendMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, int deliveryCount) {
ProtonPlugSender plugSender = (ProtonPlugSender) consumer.getProtocolContext();
@@ -359,7 +366,7 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
}
@Override
- public int sendLargeMessage(ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) {
+ public int sendLargeMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) {
return 0;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
index 9d60513..a3b8b78 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
@@ -96,7 +96,11 @@ public class MQTTConnectionManager {
String id = UUIDGenerator.getInstance().generateStringUUID();
ActiveMQServer server = session.getServer();
- ServerSession serverSession = server.createSession(id, username, password, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, session.getConnection(), MQTTUtil.SESSION_AUTO_COMMIT_SENDS, MQTTUtil.SESSION_AUTO_COMMIT_ACKS, MQTTUtil.SESSION_PREACKNOWLEDGE, MQTTUtil.SESSION_XA, null, session.getSessionCallback(), null, // Session factory
+ ServerSession serverSession = server.createSession(id, username, password,
+ ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ session.getConnection(), MQTTUtil.SESSION_AUTO_COMMIT_SENDS,
+ MQTTUtil.SESSION_AUTO_COMMIT_ACKS, MQTTUtil.SESSION_PREACKNOWLEDGE,
+ MQTTUtil.SESSION_XA, null, session.getSessionCallback(),
MQTTUtil.SESSION_AUTO_CREATE_QUEUE);
return (ServerSessionImpl) serverSession;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
index 28d86b8..82b1ed6 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
@@ -18,6 +18,7 @@
package org.apache.activemq.artemis.core.protocol.mqtt;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
@@ -41,7 +42,7 @@ public class MQTTSessionCallback implements SessionCallback {
}
@Override
- public int sendMessage(ServerMessage message, ServerConsumer consumer, int deliveryCount) {
+ public int sendMessage(MessageReference referece, ServerMessage message, ServerConsumer consumer, int deliveryCount) {
try {
session.getMqttPublishManager().sendMessage(message, consumer, deliveryCount);
}
@@ -62,8 +63,8 @@ public class MQTTSessionCallback implements SessionCallback {
}
@Override
- public int sendLargeMessage(ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) {
- return sendMessage(message, consumer, deliveryCount);
+ public int sendLargeMessage(MessageReference reference, ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) {
+ return sendMessage(reference, message, consumer, deliveryCount);
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/AMQTransactionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/AMQTransactionImpl.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/AMQTransactionImpl.java
deleted file mode 100644
index bbd7e95..0000000
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/AMQTransactionImpl.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.openwire;
-
-import org.apache.activemq.artemis.core.persistence.StorageManager;
-import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.impl.RefsOperation;
-import org.apache.activemq.artemis.core.transaction.Transaction;
-import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
-
-import javax.transaction.xa.Xid;
-
-public class AMQTransactionImpl extends TransactionImpl {
-
- private boolean rollbackForClose = false;
-
- public AMQTransactionImpl(Xid xid, StorageManager storageManager, int timeoutSeconds) {
- super(xid, storageManager, timeoutSeconds);
- }
-
- @Override
- public RefsOperation createRefsOperation(Queue queue) {
- return new AMQrefsOperation(queue, storageManager);
- }
-
- public class AMQrefsOperation extends RefsOperation {
-
- public AMQrefsOperation(Queue queue, StorageManager storageManager) {
- super(queue, storageManager);
- }
-
-
- // This is because the Rollbacks happen through the consumer, not through the server's
- @Override
- public void afterRollback(Transaction tx) {
- if (rollbackForClose) {
- super.afterRollback(tx);
- }
- }
- }
-
- public void setRollbackForClose() {
- this.rollbackForClose = true;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index e8259c3..f9e8838 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -16,15 +16,18 @@
*/
package org.apache.activemq.artemis.core.protocol.openwire;
+import javax.jms.IllegalStateException;
import javax.jms.InvalidClientIDException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSSecurityException;
+import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -45,22 +48,30 @@ import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionConte
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumerBrokerExchange;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQServerConsumer;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSingleConsumerBrokerExchange;
+import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.BindingQueryResult;
+import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
+import org.apache.activemq.artemis.core.server.impl.RefsOperation;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
+import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
+import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
@@ -96,6 +107,7 @@ import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.state.CommandVisitor;
import org.apache.activemq.state.ConnectionState;
@@ -130,29 +142,42 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
// Clebert: Artemis session has meta-data support, perhaps we could reuse it here
private Map<String, SessionId> sessionIdMap = new ConcurrentHashMap<>();
-
- private final Map<ConsumerId, AMQConsumerBrokerExchange> consumerExchanges = new HashMap<>();
- private final Map<ProducerId, AMQProducerBrokerExchange> producerExchanges = new HashMap<>();
+ private final Map<ConsumerId, AMQConsumerBrokerExchange> consumerExchanges = new ConcurrentHashMap<>();
+ private final Map<ProducerId, AMQProducerBrokerExchange> producerExchanges = new ConcurrentHashMap<>();
// Clebert TODO: Artemis already stores the Session. Why do we need a different one here
private Map<SessionId, AMQSession> sessions = new ConcurrentHashMap<>();
-
-
private ConnectionState state;
private final Set<ActiveMQDestination> tempQueues = new ConcurrentHashSet<>();
- private Map<TransactionId, TransactionInfo> txMap = new ConcurrentHashMap<>();
+ /**
+ * Openwire doesn't sen transactions associated with any sessions.
+ * It will however send beingTX / endTX as it would be doing it with XA Transactions.
+ * But always without any association with Sessions.
+ * This collection will hold nonXA transactions. Hopefully while they are in transit only.
+ */
+ private Map<TransactionId, Transaction> txMap = new ConcurrentHashMap<>();
private volatile AMQSession advisorySession;
+ private final ActiveMQServer server;
+
+ /**
+ * This is to be used with connection operations that don't have a session.
+ * Such as TM operations.
+ */
+ private ServerSession internalSession;
+
// TODO-NOW: check on why there are two connections created for every createConnection on the client.
public OpenWireConnection(Connection connection,
+ ActiveMQServer server,
Executor executor,
OpenWireProtocolManager openWireProtocolManager,
OpenWireFormat wf) {
super(connection, executor);
+ this.server = server;
this.protocolManager = openWireProtocolManager;
this.wireFormat = wf;
}
@@ -206,7 +231,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
boolean responseRequired = command.isResponseRequired();
int commandId = command.getCommandId();
-
// TODO-NOW: the server should send packets to the client based on the requested times
// need to look at what Andy did on AMQP
@@ -227,8 +251,10 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
response = command.visit(commandProcessorInstance);
}
catch (Exception e) {
+ // TODO: logging
+ e.printStackTrace();
if (responseRequired) {
- response = new ExceptionResponse(e);
+ response = convertException(e);
}
}
finally {
@@ -276,6 +302,16 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
public void sendException(Exception e) {
+ Response resp = convertException(e);
+ try {
+ dispatch(resp);
+ }
+ catch (IOException e2) {
+ ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e2);
+ }
+ }
+
+ private Response convertException(Exception e) {
Response resp;
if (e instanceof ActiveMQSecurityException) {
resp = new ExceptionResponse(new JMSSecurityException(e.getMessage()));
@@ -286,12 +322,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
else {
resp = new ExceptionResponse(e);
}
- try {
- dispatch(resp);
- }
- catch (IOException e2) {
- ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e2);
- }
+ return resp;
}
private void setLastCommand(Command command) {
@@ -426,9 +457,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
}
- private void addConsumerBrokerExchange(ConsumerId id,
- AMQSession amqSession,
- List<AMQConsumer> consumerList) {
+ private void addConsumerBrokerExchange(ConsumerId id, AMQSession amqSession, List<AMQConsumer> consumerList) {
AMQConsumerBrokerExchange result = consumerExchanges.get(id);
if (result == null) {
if (consumerList.size() == 1) {
@@ -471,12 +500,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
return result;
}
- private void removeConsumerBrokerExchange(ConsumerId id) {
- synchronized (consumerExchanges) {
- consumerExchanges.remove(id);
- }
- }
-
public void deliverMessage(MessageDispatch dispatch) {
Message m = dispatch.getMessage();
if (m != null) {
@@ -504,7 +527,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
}
- private void disconnect(ActiveMQException me, String reason, boolean fail) {
+ private void disconnect(ActiveMQException me, String reason, boolean fail) {
if (context == null || destroyed) {
return;
@@ -576,7 +599,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
}
- public AMQConnectionContext initContext(ConnectionInfo info) {
+ public AMQConnectionContext initContext(ConnectionInfo info) throws Exception {
WireFormatInfo wireFormatInfo = wireFormat.getPreferedWireFormatInfo();
// Older clients should have been defaulting this field to true.. but
// they were not.
@@ -608,9 +631,15 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
info.setClientIp(getRemoteAddress());
}
+ createInternalSession(info);
+
return context;
}
+ private void createInternalSession(ConnectionInfo info) throws Exception {
+ internalSession = server.createSession(UUIDGenerator.getInstance().generateStringUUID(), context.getUserName(), info.getPassword(), -1, this, true, false, false, false, null, null, true);
+ }
+
//raise the refCount of context
public void reconnect(AMQConnectionContext existingContext, ConnectionInfo info) {
this.context = existingContext;
@@ -663,17 +692,17 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
ActiveMQDestination dest = info.getDestination();
if (dest.isQueue()) {
SimpleString qName = OpenWireUtil.toCoreAddress(dest);
- QueueBinding binding = (QueueBinding) protocolManager.getServer().getPostOffice().getBinding(qName);
+ QueueBinding binding = (QueueBinding) server.getPostOffice().getBinding(qName);
if (binding == null) {
if (getState().getInfo() != null) {
CheckType checkType = dest.isTemporary() ? CheckType.CREATE_NON_DURABLE_QUEUE : CheckType.CREATE_DURABLE_QUEUE;
- protocolManager.getServer().getSecurityStore().check(qName, checkType, this);
+ server.getSecurityStore().check(qName, checkType, this);
- protocolManager.getServer().checkQueueCreationLimit(getUsername());
+ server.checkQueueCreationLimit(getUsername());
}
ConnectionInfo connInfo = getState().getInfo();
- protocolManager.getServer().createQueue(qName, qName, null, connInfo == null ? null : SimpleString.toSimpleString(connInfo.getUserName()), false, dest.isTemporary());
+ server.createQueue(qName, qName, null, connInfo == null ? null : SimpleString.toSimpleString(connInfo.getUserName()), false, dest.isTemporary());
}
if (dest.isTemporary()) {
@@ -690,11 +719,12 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
}
-
public void updateConsumer(ConsumerControl consumerControl) {
- SessionId sessionId = consumerControl.getConsumerId().getParentId();
- AMQSession amqSession = sessions.get(sessionId);
- amqSession.updateConsumerPrefetchSize(consumerControl.getConsumerId(), consumerControl.getPrefetch());
+ ConsumerId consumerId = consumerControl.getConsumerId();
+ AMQConsumerBrokerExchange exchange = this.consumerExchanges.get(consumerId);
+ if (exchange != null) {
+ exchange.updateConsumerPrefetchSize(consumerControl.getPrefetch());
+ }
}
public void addConsumer(ConsumerInfo info) throws Exception {
@@ -707,7 +737,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
SessionState ss = cs.getSessionState(sessionId);
if (ss == null) {
- throw new IllegalStateException(protocolManager.getServer() + " Cannot add a consumer to a session that had not been registered: " + sessionId);
+ throw new IllegalStateException(server + " Cannot add a consumer to a session that had not been registered: " + sessionId);
}
// Avoid replaying dup commands
if (!ss.getConsumerIds().contains(info.getConsumerId())) {
@@ -729,13 +759,13 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
@Override
public void onSlowConsumer(ServerConsumer consumer) {
- if (consumer instanceof AMQServerConsumer) {
- AMQServerConsumer serverConsumer = (AMQServerConsumer)consumer;
- ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(serverConsumer.getAmqConsumer().getOpenwireDestination());
+ if (consumer.getProtocolData() != null && consumer.getProtocolData() instanceof AMQConsumer) {
+ AMQConsumer amqConsumer = (AMQConsumer) consumer.getProtocolData();
+ ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(amqConsumer.getOpenwireDestination());
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
try {
- advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, serverConsumer.getAmqConsumer().getId().toString());
- protocolManager.fireAdvisory(context, topic, advisoryMessage, serverConsumer.getAmqConsumer().getId());
+ advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, amqConsumer.getId().toString());
+ protocolManager.fireAdvisory(context, topic, advisoryMessage, amqConsumer.getId());
}
catch (Exception e) {
// TODO-NOW: LOGGING
@@ -758,9 +788,13 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
public AMQSession addSession(SessionInfo ss, boolean internal) {
- AMQSession amqSession = new AMQSession(getState().getInfo(), ss, protocolManager.getServer(), this, protocolManager.getScheduledPool(), protocolManager);
+ AMQSession amqSession = new AMQSession(getState().getInfo(), ss, server, this, protocolManager.getScheduledPool());
amqSession.initialize();
- amqSession.setInternal(internal);
+
+ if (internal) {
+ amqSession.disableSecurity();
+ }
+
sessions.put(ss.getSessionId(), amqSession);
sessionIdMap.put(amqSession.getCoreSession().getName(), ss.getSessionId());
return amqSession;
@@ -780,10 +814,10 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
public void removeDestination(ActiveMQDestination dest) throws Exception {
if (dest.isQueue()) {
SimpleString qName = new SimpleString("jms.queue." + dest.getPhysicalName());
- protocolManager.getServer().destroyQueue(qName);
+ server.destroyQueue(qName);
}
else {
- Bindings bindings = protocolManager.getServer().getPostOffice().getBindingsForAddress(SimpleString.toSimpleString("jms.topic." + dest.getPhysicalName()));
+ Bindings bindings = server.getPostOffice().getBindingsForAddress(SimpleString.toSimpleString("jms.topic." + dest.getPhysicalName()));
Iterator<Binding> iterator = bindings.getBindings().iterator();
while (iterator.hasNext()) {
@@ -815,17 +849,15 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
private void validateDestination(ActiveMQDestination destination) throws Exception {
if (destination.isQueue()) {
SimpleString physicalName = OpenWireUtil.toCoreAddress(destination);
- BindingQueryResult result = protocolManager.getServer().bindingQuery(physicalName);
+ BindingQueryResult result = server.bindingQuery(physicalName);
if (!result.isExists() && !result.isAutoCreateJmsQueues()) {
throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(physicalName);
}
}
}
-
CommandProcessor commandProcessorInstance = new CommandProcessor();
-
// This will listen for commands throught the protocolmanager
public class CommandProcessor implements CommandVisitor {
@@ -934,18 +966,71 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
@Override
public Response processRemoveSubscription(RemoveSubscriptionInfo subInfo) throws Exception {
- protocolManager.removeSubscription(subInfo);
+ SimpleString subQueueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForDurableSubscription(true, subInfo.getClientId(), subInfo.getSubscriptionName()));
+ server.destroyQueue(subQueueName);
+
return null;
}
@Override
public Response processRollbackTransaction(TransactionInfo info) throws Exception {
- protocolManager.rollbackTransaction(info);
- TransactionId txId = info.getTransactionId();
- txMap.remove(txId);
+ Transaction tx = lookupTX(info.getTransactionId(), null);
+ if (info.getTransactionId().isXATransaction() && tx == null) {
+ throw newXAException("Transaction '" + info.getTransactionId() + "' has not been started.", XAException.XAER_NOTA);
+ }
+ else if (tx != null) {
+
+ AMQSession amqSession = (AMQSession) tx.getProtocolData();
+
+ if (amqSession != null) {
+ amqSession.getCoreSession().resetTX(tx);
+
+ try {
+ returnReferences(tx, amqSession);
+ }
+ finally {
+ amqSession.getCoreSession().resetTX(null);
+ }
+ }
+ tx.rollback();
+ }
+
return null;
}
+ /**
+ * Openwire will redeliver rolled back references.
+ * We need to return those here.
+ */
+ private void returnReferences(Transaction tx, AMQSession session) throws Exception {
+ if (session == null || session.isClosed()) {
+ return;
+ }
+
+ RefsOperation oper = (RefsOperation) tx.getProperty(TransactionPropertyIndexes.REFS_OPERATION);
+
+ if (oper != null) {
+ List<MessageReference> ackRefs = oper.getReferencesToAcknowledge();
+
+ for (ListIterator<MessageReference> referenceIterator = ackRefs.listIterator(ackRefs.size()); referenceIterator.hasPrevious(); ) {
+ MessageReference ref = referenceIterator.previous();
+
+ Long consumerID = ref.getConsumerId();
+
+ ServerConsumer consumer = null;
+ if (consumerID != null) {
+ consumer = session.getCoreSession().locateConsumer(consumerID);
+ }
+
+ if (consumer != null) {
+ referenceIterator.remove();
+ ref.incrementDeliveryCount();
+ consumer.backToDelivering(ref);
+ }
+ }
+ }
+ }
+
@Override
public Response processShutdown(ShutdownInfo info) throws Exception {
OpenWireConnection.this.shutdown(false);
@@ -989,44 +1074,135 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
@Override
public Response processBeginTransaction(TransactionInfo info) throws Exception {
- TransactionId txId = info.getTransactionId();
+ final TransactionId txID = info.getTransactionId();
- if (!txMap.containsKey(txId)) {
- txMap.put(txId, info);
+ try {
+ internalSession.resetTX(null);
+ if (txID.isXATransaction()) {
+ Xid xid = OpenWireUtil.toXID(txID);
+ internalSession.xaStart(xid);
+ }
+ else {
+ Transaction transaction = internalSession.newTransaction();
+ txMap.put(txID, transaction);
+ transaction.addOperation(new TransactionOperationAbstract() {
+ @Override
+ public void afterCommit(Transaction tx) {
+ txMap.remove(txID);
+ }
+ });
+ }
+ }
+ finally {
+ internalSession.resetTX(null);
}
return null;
}
@Override
- public Response processBrokerInfo(BrokerInfo arg0) throws Exception {
- throw new IllegalStateException("not implemented! ");
+ public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
+ return processCommit(info, true);
+ }
+
+ private Response processCommit(TransactionInfo info, boolean onePhase) throws Exception {
+ TransactionId txID = info.getTransactionId();
+
+ Transaction tx = lookupTX(txID, null);
+
+ AMQSession session = (AMQSession) tx.getProtocolData();
+
+ tx.commit(onePhase);
+
+ return null;
}
@Override
- public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
- try {
- protocolManager.commitTransactionOnePhase(info);
- TransactionId txId = info.getTransactionId();
- txMap.remove(txId);
+ public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
+ return processCommit(info, false);
+ }
+
+ @Override
+ public Response processForgetTransaction(TransactionInfo info) throws Exception {
+ TransactionId txID = info.getTransactionId();
+
+ if (txID.isXATransaction()) {
+ try {
+ Xid xid = OpenWireUtil.toXID(info.getTransactionId());
+ internalSession.xaForget(xid);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
}
- catch (Exception e) {
- e.printStackTrace();
- throw e;
+ else {
+ txMap.remove(txID);
}
return null;
}
@Override
- public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
- protocolManager.commitTransactionTwoPhase(info);
- TransactionId txId = info.getTransactionId();
- txMap.remove(txId);
+ public Response processPrepareTransaction(TransactionInfo info) throws Exception {
+ TransactionId txID = info.getTransactionId();
+
+ try {
+ if (txID.isXATransaction()) {
+ try {
+ Xid xid = OpenWireUtil.toXID(info.getTransactionId());
+ internalSession.xaPrepare(xid);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
+ else {
+ Transaction tx = lookupTX(txID, null);
+ tx.prepare();
+ }
+ }
+ finally {
+ internalSession.resetTX(null);
+ }
+
+ return new IntegerResponse(XAResource.XA_RDONLY);
+ }
+
+ @Override
+ public Response processEndTransaction(TransactionInfo info) throws Exception {
+ TransactionId txID = info.getTransactionId();
+
+ if (txID.isXATransaction()) {
+ try {
+ Transaction tx = lookupTX(txID, null);
+ internalSession.resetTX(tx);
+ try {
+ Xid xid = OpenWireUtil.toXID(info.getTransactionId());
+ internalSession.xaEnd(xid);
+ }
+ finally {
+ internalSession.resetTX(null);
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
+ else {
+ txMap.remove(info);
+ }
return null;
}
@Override
+ public Response processBrokerInfo(BrokerInfo arg0) throws Exception {
+ throw new IllegalStateException("not implemented! ");
+ }
+
+ @Override
public Response processConnectionControl(ConnectionControl connectionControl) throws Exception {
//activemq5 keeps a var to remember only the faultTolerant flag
//this can be sent over a reconnected transport as the first command
@@ -1058,31 +1234,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
@Override
- public Response processEndTransaction(TransactionInfo info) throws Exception {
- protocolManager.endTransaction(info);
- TransactionId txId = info.getTransactionId();
-
- if (!txMap.containsKey(txId)) {
- txMap.put(txId, info);
- }
- return null;
- }
-
- @Override
public Response processFlush(FlushCommand arg0) throws Exception {
throw new IllegalStateException("not implemented! ");
}
@Override
- public Response processForgetTransaction(TransactionInfo info) throws Exception {
- TransactionId txId = info.getTransactionId();
- txMap.remove(txId);
-
- protocolManager.forgetTransaction(info.getTransactionId());
- return null;
- }
-
- @Override
public Response processKeepAlive(KeepAliveInfo arg0) throws Exception {
throw new IllegalStateException("not implemented! ");
}
@@ -1097,15 +1253,32 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
AMQSession session = getSession(producerId.getParentId());
- session.send(producerInfo, messageSend, sendProducerAck);
+ Transaction tx = lookupTX(messageSend.getTransactionId(), session);
+
+ session.getCoreSession().resetTX(tx);
+ try {
+ session.send(producerInfo, messageSend, sendProducerAck);
+ }
+ finally {
+ session.getCoreSession().resetTX(null);
+ }
+
return null;
}
-
@Override
public Response processMessageAck(MessageAck ack) throws Exception {
- AMQConsumerBrokerExchange consumerBrokerExchange = consumerExchanges.get(ack.getConsumerId());
- consumerBrokerExchange.acknowledge(ack);
+ AMQSession session = getSession(ack.getConsumerId().getParentId());
+ Transaction tx = lookupTX(ack.getTransactionId(), session);
+ session.getCoreSession().resetTX(tx);
+
+ try {
+ AMQConsumerBrokerExchange consumerBrokerExchange = consumerExchanges.get(ack.getConsumerId());
+ consumerBrokerExchange.acknowledge(ack);
+ }
+ finally {
+ session.getCoreSession().resetTX(null);
+ }
return null;
}
@@ -1130,13 +1303,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
@Override
- public Response processPrepareTransaction(TransactionInfo info) throws Exception {
- protocolManager.prepareTransaction(info);
- //activemq needs a rdonly response
- return new IntegerResponse(XAResource.XA_RDONLY);
- }
-
- @Override
public Response processProducerAck(ProducerAck arg0) throws Exception {
// a broker doesn't do producers.. this shouldn't happen
return null;
@@ -1144,20 +1310,13 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
@Override
public Response processRecoverTransactions(TransactionInfo info) throws Exception {
- Set<SessionId> sIds = state.getSessionIds();
-
-
+ List<Xid> xids = server.getResourceManager().getInDoubtTransactions();
List<TransactionId> recovered = new ArrayList<>();
- if (sIds != null) {
- for (SessionId sid : sIds) {
- AMQSession s = sessions.get(sid);
- if (s != null) {
- s.recover(recovered);
- }
- }
+ for (Xid xid : xids) {
+ XATransactionId amqXid = new XATransactionId(xid);
+ recovered.add(amqXid);
}
-
- return new DataArrayResponse(recovered.toArray(new TransactionId[0]));
+ return new DataArrayResponse(recovered.toArray(new TransactionId[recovered.size()]));
}
@Override
@@ -1186,15 +1345,45 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
ConsumerInfo info = consumerState.getInfo();
info.setLastDeliveredSequenceId(lastDeliveredSequenceId);
- AMQConsumerBrokerExchange consumerBrokerExchange = consumerExchanges.get(id);
+ AMQConsumerBrokerExchange consumerBrokerExchange = consumerExchanges.remove(id);
consumerBrokerExchange.removeConsumer();
- removeConsumerBrokerExchange(id);
+ return null;
+ }
+
+ }
+ private Transaction lookupTX(TransactionId txID, AMQSession session) throws IllegalStateException {
+ if (txID == null) {
return null;
}
+ Xid xid = null;
+ Transaction transaction;
+ if (txID.isXATransaction()) {
+ xid = OpenWireUtil.toXID(txID);
+ transaction = server.getResourceManager().getTransaction(xid);
+ }
+ else {
+ transaction = txMap.get(txID);
+ }
+
+ if (transaction == null) {
+ throw new IllegalStateException("cannot find transactionInfo::" + txID + " xid=" + xid);
+ }
+
+ if (session != null && transaction.getProtocolData() != session) {
+ transaction.setProtocolData(session);
+ }
+
+ return transaction;
+ }
+
+ public static XAException newXAException(String s, int errorCode) {
+ XAException xaException = new XAException(s + " " + "xaErrorCode:" + errorCode);
+ xaException.errorCode = errorCode;
+ return xaException;
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 89f71ed..b0a6d46 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -38,6 +38,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
+import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index bbbb696..7d9c25a 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -17,7 +17,6 @@
package org.apache.activemq.artemis.core.protocol.openwire;
import javax.jms.InvalidClientIDException;
-import javax.transaction.xa.XAException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -33,7 +32,6 @@ import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.api.core.Interceptor;
-import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
import org.apache.activemq.artemis.api.core.client.TopologyMember;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
@@ -65,11 +63,7 @@ import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.RemoveSubscriptionInfo;
-import org.apache.activemq.command.TransactionId;
-import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.WireFormatInfo;
-import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.openwire.OpenWireFormatFactory;
import org.apache.activemq.state.ProducerState;
@@ -96,15 +90,10 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
private final CopyOnWriteArrayList<OpenWireConnection> connections = new CopyOnWriteArrayList<>();
- // TODO-NOW: this can probably go away
private final Map<String, AMQConnectionContext> clientIdSet = new HashMap<String, AMQConnectionContext>();
private String brokerName;
- // Clebert: Artemis already has a Resource Manager. Need to remove this..
- // The TransactionID extends XATransactionID, so all we need is to convert the XID here
- private Map<TransactionId, AMQSession> transactions = new ConcurrentHashMap<>();
-
private final Map<String, TopologyMember> topologyMap = new ConcurrentHashMap<>();
private final LinkedList<TopologyMember> members = new LinkedList<>();
@@ -140,7 +129,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
}
public OpenWireFormat getNewWireFormat() {
- return (OpenWireFormat)wireFactory.createWireFormat();
+ return (OpenWireFormat) wireFactory.createWireFormat();
}
@Override
@@ -156,9 +145,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
}
}
-
- public void removeConnection(ConnectionInfo info,
- Throwable error) throws InvalidClientIDException {
+ public void removeConnection(ConnectionInfo info, Throwable error) throws InvalidClientIDException {
synchronized (clientIdSet) {
String clientId = info.getClientId();
if (clientId != null) {
@@ -176,7 +163,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
}
}
-
public ScheduledExecutorService getScheduledPool() {
return scheduledPool;
}
@@ -223,7 +209,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
@Override
public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection connection) {
OpenWireFormat wf = (OpenWireFormat) wireFactory.createWireFormat();
- OpenWireConnection owConn = new OpenWireConnection(connection, server.getExecutorFactory().getExecutor(), this, wf);
+ OpenWireConnection owConn = new OpenWireConnection(connection, server, server.getExecutorFactory().getExecutor(), this, wf);
owConn.sendHandshake();
// TODO CLEBERT What is this constant here? we should get it from TTL initial pings
@@ -323,7 +309,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
fireAdvisory(context, topic, copy);
// init the conn
- context.getConnection().addSessions( context.getConnectionState().getSessionIds());
+ context.getConnection().addSessions(context.getConnectionState().getSessionIds());
}
}
@@ -343,9 +329,9 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
* See AdvisoryBroker.fireAdvisory()
*/
public void fireAdvisory(AMQConnectionContext context,
- ActiveMQTopic topic,
- Command command,
- ConsumerId targetConsumerId) throws Exception {
+ ActiveMQTopic topic,
+ Command command,
+ ConsumerId targetConsumerId) throws Exception {
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME, getBrokerName());
String id = getBrokerId() != null ? getBrokerId().getValue() : "NOT_SET";
@@ -448,55 +434,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
public boolean isStopping() {
return false;
}
- public void endTransaction(TransactionInfo info) throws Exception {
- AMQSession txSession = transactions.get(info.getTransactionId());
-
- if (txSession != null) {
- txSession.endTransaction(info);
- }
- }
-
- public void commitTransactionOnePhase(TransactionInfo info) throws Exception {
- AMQSession txSession = transactions.get(info.getTransactionId());
-
- if (txSession != null) {
- txSession.commitOnePhase(info);
- }
- transactions.remove(info.getTransactionId());
- }
-
- public void prepareTransaction(TransactionInfo info) throws Exception {
- XATransactionId xid = (XATransactionId) info.getTransactionId();
- AMQSession txSession = transactions.get(xid);
- if (txSession != null) {
- txSession.prepareTransaction(xid);
- }
- }
-
- public void commitTransactionTwoPhase(TransactionInfo info) throws Exception {
- XATransactionId xid = (XATransactionId) info.getTransactionId();
- AMQSession txSession = transactions.get(xid);
- if (txSession != null) {
- txSession.commitTwoPhase(xid);
- }
- transactions.remove(xid);
- }
-
- public void rollbackTransaction(TransactionInfo info) throws Exception {
- AMQSession txSession = transactions.get(info.getTransactionId());
- if (txSession != null) {
- txSession.rollback(info);
- }
- else if (info.getTransactionId().isLocalTransaction()) {
- //during a broker restart, recovered local transaction may not be registered
- //in that case we ignore and let the tx removed silently by connection.
- //see AMQ1925Test.testAMQ1925_TXBegin
- }
- else {
- throw newXAException("Transaction '" + info.getTransactionId() + "' has not been started.", XAException.XAER_NOTA);
- }
- transactions.remove(info.getTransactionId());
- }
public boolean validateUser(String login, String passcode) {
boolean validated = true;
@@ -510,26 +447,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
return validated;
}
- public void forgetTransaction(TransactionId xid) throws Exception {
- AMQSession txSession = transactions.get(xid);
- if (txSession != null) {
- txSession.forget(xid);
- }
- transactions.remove(xid);
- }
-
- /**
- * TODO: remove this, use the regular ResourceManager from the Server's
- */
- public void registerTx(TransactionId txId, AMQSession amqSession) {
- transactions.put(txId, amqSession);
- }
-
- public void removeSubscription(RemoveSubscriptionInfo subInfo) throws Exception {
- SimpleString subQueueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForDurableSubscription(true, subInfo.getClientId(), subInfo.getSubscriptionName()));
- server.destroyQueue(subQueueName);
- }
-
public void sendBrokerInfo(OpenWireConnection connection) throws Exception {
BrokerInfo brokerInfo = new BrokerInfo();
brokerInfo.setBrokerName(getBrokerName());
@@ -543,14 +460,26 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
connection.dispatch(brokerInfo);
}
+ /**
+ * URI property
+ */
+ @SuppressWarnings("unused")
public void setRebalanceClusterClients(boolean rebalance) {
this.rebalanceClusterClients = rebalance;
}
+ /**
+ * URI property
+ */
+ @SuppressWarnings("unused")
public boolean isRebalanceClusterClients() {
return this.rebalanceClusterClients;
}
+ /**
+ * URI property
+ */
+ @SuppressWarnings("unused")
public void setUpdateClusterClients(boolean updateClusterClients) {
this.updateClusterClients = updateClusterClients;
}
@@ -559,10 +488,18 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
return this.updateClusterClients;
}
+ /**
+ * URI property
+ */
+ @SuppressWarnings("unused")
public void setUpdateClusterClientsOnRemove(boolean updateClusterClientsOnRemove) {
this.updateClusterClientsOnRemove = updateClusterClientsOnRemove;
}
+ /**
+ * URI property
+ */
+ @SuppressWarnings("unused")
public boolean isUpdateClusterClientsOnRemove() {
return this.updateClusterClientsOnRemove;
}
@@ -571,10 +508,4 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
this.brokerName = name;
}
- public static XAException newXAException(String s, int errorCode) {
- XAException xaException = new XAException(s + " " + "xaErrorCode:" + errorCode);
- xaException.errorCode = errorCode;
- return xaException;
- }
-
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireUtil.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireUtil.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireUtil.java
deleted file mode 100644
index 4513eb3..0000000
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireUtil.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.openwire;
-
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.util.ByteSequence;
-
-public class OpenWireUtil {
-
- public static ActiveMQBuffer toActiveMQBuffer(ByteSequence bytes) {
- ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(bytes.length);
-
- buffer.writeBytes(bytes.data, bytes.offset, bytes.length);
- return buffer;
- }
-
- public static SimpleString toCoreAddress(ActiveMQDestination dest) {
- if (dest.isQueue()) {
- return new SimpleString("jms.queue." + dest.getPhysicalName());
- }
- else {
- return new SimpleString("jms.topic." + dest.getPhysicalName());
- }
- }
-
- /**
- * We convert the core address to an ActiveMQ Destination. We use the actual address on the message rather than the
- * destination set on the consumer because it maybe different and the JMS spec says that it should be what ever was
- * set on publish/send so a divert or wildcard may mean thats its different to the destination subscribed to by the
- * consumer
- */
- public static ActiveMQDestination toAMQAddress(ServerMessage message, ActiveMQDestination actualDestination) {
- String address = message.getAddress().toString();
- String strippedAddress = address.replace("jms.queue.", "").replace("jms.topic.", "");
- if (actualDestination.isQueue()) {
- return new ActiveMQQueue(strippedAddress);
- }
- else {
- return new ActiveMQTopic(strippedAddress);
- }
- }
-
- /*
- *This util converts amq wildcards to compatible core wildcards
- *The conversion is like this:
- *AMQ * wildcard --> Core * wildcard (no conversion)
- *AMQ > wildcard --> Core # wildcard
- */
- public static String convertWildcard(String physicalName) {
- return physicalName.replaceAll("(\\.>)+", ".#");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java
index 56b4b6d..5b9d72e 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java
@@ -48,7 +48,7 @@ public class AMQCompositeConsumerBrokerExchange extends AMQConsumerBrokerExchang
public void acknowledge(MessageAck ack) throws Exception {
AMQConsumer amqConsumer = consumerMap.get(ack.getDestination());
if (amqConsumer != null) {
- amqSession.acknowledge(ack, amqConsumer);
+ amqConsumer.acknowledge(ack);
}
}
@@ -58,4 +58,11 @@ public class AMQCompositeConsumerBrokerExchange extends AMQConsumerBrokerExchang
amqConsumer.removeConsumer();
}
}
+
+ @Override
+ public void updateConsumerPrefetchSize(int prefetch) {
+ for (AMQConsumer amqConsumer : consumerMap.values()) {
+ amqConsumer.setPrefetchSize(prefetch);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index ef9b2a8..7a06c73 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -17,10 +17,8 @@
package org.apache.activemq.artemis.core.protocol.openwire.amq;
import java.io.IOException;
-import java.util.Iterator;
-import java.util.Set;
+import java.util.List;
import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -30,11 +28,14 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
-import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil;
+import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
+import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
@@ -42,7 +43,6 @@ import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.MessagePull;
-import org.apache.activemq.command.TransactionId;
import org.apache.activemq.wireformat.WireFormat;
public class AMQConsumer {
@@ -50,11 +50,10 @@ public class AMQConsumer {
private org.apache.activemq.command.ActiveMQDestination openwireDestination;
private ConsumerInfo info;
private final ScheduledExecutorService scheduledPool;
- private long nativeId = -1;
+ private ServerConsumer serverConsumer;
private int prefetchSize;
- private AtomicInteger windowAvailable;
- private final java.util.Queue<MessageInfo> deliveringRefs = new ConcurrentLinkedQueue<>();
+ private AtomicInteger currentWindow;
private long messagePullSequence = 0;
private MessagePullHandler messagePullHandler;
@@ -67,20 +66,13 @@ public class AMQConsumer {
this.info = info;
this.scheduledPool = scheduledPool;
this.prefetchSize = info.getPrefetchSize();
- this.windowAvailable = new AtomicInteger(prefetchSize);
+ this.currentWindow = new AtomicInteger(prefetchSize);
if (prefetchSize == 0) {
messagePullHandler = new MessagePullHandler();
}
}
public void init(SlowConsumerDetectionListener slowConsumerDetectionListener, long nativeId) throws Exception {
- this.nativeId = nativeId;
- AMQServerConsumer serverConsumer = createServerConsumer(info, slowConsumerDetectionListener);
- serverConsumer.setAmqConsumer(this);
- }
-
-
- private AMQServerConsumer createServerConsumer(ConsumerInfo info, SlowConsumerDetectionListener slowConsumerDetectionListener) throws Exception {
SimpleString selector = info.getSelector() == null ? null : new SimpleString(info.getSelector());
@@ -93,13 +85,13 @@ public class AMQConsumer {
SimpleString queueName = createTopicSubscription(info.isDurable(), info.getClientId(), physicalName, info.getSubscriptionName(), selector, address);
- AMQServerConsumer serverConsumer = (AMQServerConsumer) session.getCoreSession().createConsumer(nativeId, queueName, null, info.isBrowser(), false, -1);
+ serverConsumer = session.getCoreSession().createConsumer(nativeId, queueName, null, info.isBrowser(), false, -1);
serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener);
- return serverConsumer;
}
else {
- SimpleString queueName = new SimpleString("jms.queue." + physicalName);
- AMQServerConsumer serverConsumer = (AMQServerConsumer) session.getCoreSession().createConsumer(nativeId, queueName, selector, info.isBrowser(), false, -1);
+ SimpleString queueName = OpenWireUtil.toCoreAddress(openwireDestination);
+ session.getCoreServer().getJMSQueueCreator().create(queueName);
+ serverConsumer = session.getCoreSession().createConsumer(nativeId, queueName, selector, info.isBrowser(), false, -1);
serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener);
AddressSettings addrSettings = session.getCoreServer().getAddressSettingsRepository().getMatch(queueName.toString());
if (addrSettings != null) {
@@ -113,10 +105,10 @@ public class AMQConsumer {
}
}
- return serverConsumer;
-
}
+ serverConsumer.setProtocolData(this);
+
}
private SimpleString createTopicSubscription(boolean isDurable,
@@ -167,12 +159,6 @@ public class AMQConsumer {
return queueName;
}
-
-
- public long getNativeId() {
- return this.nativeId;
- }
-
public ConsumerId getId() {
return info.getConsumerId();
}
@@ -182,16 +168,17 @@ public class AMQConsumer {
}
public void acquireCredit(int n) throws Exception {
- boolean promptDelivery = windowAvailable.get() == 0;
- if (windowAvailable.get() < prefetchSize) {
- this.windowAvailable.addAndGet(n);
- }
+ int oldwindow = currentWindow.getAndAdd(n);
+
+ boolean promptDelivery = oldwindow < prefetchSize;
+
if (promptDelivery) {
- session.getCoreSession().promptDelivery(nativeId);
+ serverConsumer.promptDelivery();
}
+
}
- public int handleDeliver(ServerMessage message, int deliveryCount) {
+ public int handleDeliver(MessageReference reference, ServerMessage message, int deliveryCount) {
MessageDispatch dispatch;
try {
if (messagePullHandler != null && !messagePullHandler.checkForcedConsumer(message)) {
@@ -200,9 +187,9 @@ public class AMQConsumer {
//decrement deliveryCount as AMQ client tends to add 1.
dispatch = OpenWireMessageConverter.createMessageDispatch(message, deliveryCount - 1, this);
int size = dispatch.getMessage().getSize();
- this.deliveringRefs.add(new MessageInfo(dispatch.getMessage().getMessageId(), message.getMessageID(), size));
+ reference.setProtocolData(dispatch.getMessage().getMessageId());
session.deliverMessage(dispatch);
- windowAvailable.decrementAndGet();
+ currentWindow.decrementAndGet();
return size;
}
catch (IOException e) {
@@ -218,114 +205,59 @@ public class AMQConsumer {
md.setConsumerId(getId());
md.setDestination(openwireDestination);
session.deliverMessage(md);
- windowAvailable.decrementAndGet();
}
+ /** The acknowledgement in openwire is done based on intervals.
+ * We will iterate through the list of delivering messages at {@link ServerConsumer#getDeliveringReferencesBasedOnProtocol(boolean, Object, Object)}
+ * and add those to the Transaction.
+ * Notice that we will start a new transaction on the cases where there is no transaction. */
public void acknowledge(MessageAck ack) throws Exception {
+
+
MessageId first = ack.getFirstMessageId();
- MessageId lastm = ack.getLastMessageId();
- TransactionId tid = ack.getTransactionId();
- boolean isLocalTx = (tid != null) && tid.isLocalTransaction();
- boolean single = lastm.equals(first);
-
- MessageInfo mi = null;
- int n = 0;
-
- if (ack.isIndividualAck()) {
- Iterator<MessageInfo> iter = deliveringRefs.iterator();
- while (iter.hasNext()) {
- mi = iter.next();
- if (mi.amqId.equals(lastm)) {
- n++;
- if (!isLocalTx) {
- iter.remove();
- session.getCoreSession().individualAcknowledge(nativeId, mi.nativeId);
- }
- else {
- mi.setLocalAcked(true);
- }
- if (tid == null) {
- session.getCoreSession().commit();
- }
- break;
- }
- }
+ MessageId last = ack.getLastMessageId();
+
+ if (first == null) {
+ first = last;
}
- else if (ack.isRedeliveredAck()) {
- //client tells that this message is for redlivery.
- //do nothing until poisoned.
- n = ack.getMessageCount();
+
+ boolean removeReferences = !serverConsumer.isBrowseOnly(); // if it's browse only, nothing to be acked, we just remove the lists
+
+ if (ack.isRedeliveredAck() || ack.isDeliveredAck() || ack.isExpiredAck()) {
+ removeReferences = false;
}
- else if (ack.isPoisonAck()) {
- //send to dlq
- Iterator<MessageInfo> iter = deliveringRefs.iterator();
- boolean firstFound = false;
- while (iter.hasNext()) {
- mi = iter.next();
- if (mi.amqId.equals(first)) {
- n++;
- iter.remove();
- session.getCoreSession().moveToDeadLetterAddress(nativeId, mi.nativeId, ack.getPoisonCause());
- session.getCoreSession().commit();
- if (single) {
- break;
- }
- firstFound = true;
- }
- else if (firstFound || first == null) {
- n++;
- iter.remove();
- session.getCoreSession().moveToDeadLetterAddress(nativeId, mi.nativeId, ack.getPoisonCause());
- session.getCoreSession().commit();
- if (mi.amqId.equals(lastm)) {
- break;
- }
- }
+
+ List<MessageReference> ackList = serverConsumer.getDeliveringReferencesBasedOnProtocol(removeReferences, first, last);
+
+ acquireCredit(ack.getMessageCount());
+
+ if (removeReferences) {
+
+ Transaction originalTX = session.getCoreSession().getCurrentTransaction();
+ Transaction transaction;
+
+ if (originalTX == null) {
+ transaction = session.getCoreSession().newTransaction();
}
- }
- else if (ack.isDeliveredAck() || ack.isExpiredAck()) {
- //ToDo: implement with tests
- n = ack.getMessageCount();
- }
- else {
- Iterator<MessageInfo> iter = deliveringRefs.iterator();
- boolean firstFound = false;
- while (iter.hasNext()) {
- MessageInfo ami = iter.next();
- if (ami.amqId.equals(first)) {
- n++;
- if (!isLocalTx) {
- iter.remove();
- }
- else {
- ami.setLocalAcked(true);
- }
- if (single) {
- mi = ami;
- break;
- }
- firstFound = true;
+ else {
+ transaction = originalTX;
+ }
+
+ if (ack.isIndividualAck() || ack.isStandardAck()) {
+ for (MessageReference ref : ackList) {
+ ref.acknowledge(transaction);
}
- else if (firstFound || first == null) {
- n++;
- if (!isLocalTx) {
- iter.remove();
- }
- else {
- ami.setLocalAcked(true);
- }
- if (ami.amqId.equals(lastm)) {
- mi = ami;
- break;
- }
+ }
+ else if (ack.isPoisonAck()) {
+ for (MessageReference ref : ackList) {
+ ref.getQueue().sendToDeadLetterAddress(transaction, ref);
}
}
- if (mi != null && !isLocalTx) {
- session.getCoreSession().acknowledge(nativeId, mi.nativeId);
+
+ if (originalTX == null) {
+ transaction.commit(true);
}
}
-
- acquireCredit(n);
}
public void browseFinished() {
@@ -337,61 +269,23 @@ public class AMQConsumer {
session.deliverMessage(md);
}
- //this is called before session commit a local tx
- public void finishTx() throws Exception {
- MessageInfo lastMi = null;
-
- MessageInfo mi = null;
- Iterator<MessageInfo> iter = deliveringRefs.iterator();
- while (iter.hasNext()) {
- mi = iter.next();
- if (mi.isLocalAcked()) {
- iter.remove();
- lastMi = mi;
- }
- }
-
- if (lastMi != null) {
- session.getCoreSession().acknowledge(nativeId, lastMi.nativeId);
- }
- }
-
- public void rollbackTx(Set<Long> acked) throws Exception {
- MessageInfo lastMi = null;
-
- MessageInfo mi = null;
- Iterator<MessageInfo> iter = deliveringRefs.iterator();
- while (iter.hasNext()) {
- mi = iter.next();
- if (mi.isLocalAcked()) {
- acked.add(mi.nativeId);
- lastMi = mi;
- }
- }
-
- if (lastMi != null) {
- session.getCoreSession().acknowledge(nativeId, lastMi.nativeId);
- }
- }
-
public ConsumerInfo getInfo() {
return info;
}
public boolean hasCredits() {
- return windowAvailable.get() > 0;
+ return currentWindow.get() > 0;
}
public void processMessagePull(MessagePull messagePull) throws Exception {
- windowAvailable.incrementAndGet();
-
+ currentWindow.incrementAndGet();
if (messagePullHandler != null) {
messagePullHandler.nextSequence(messagePullSequence++, messagePull.getTimeout());
}
}
public void removeConsumer() throws Exception {
- session.removeConsumer(nativeId);
+ serverConsumer.close(false);
}
public org.apache.activemq.command.ActiveMQDestination getOpenwireDestination() {
@@ -400,10 +294,10 @@ public class AMQConsumer {
public void setPrefetchSize(int prefetchSize) {
this.prefetchSize = prefetchSize;
- this.windowAvailable.set(prefetchSize);
+ this.currentWindow.set(prefetchSize);
this.info.setPrefetchSize(prefetchSize);
if (this.prefetchSize > 0) {
- session.getCoreSession().promptDelivery(nativeId);
+ serverConsumer.promptDelivery();
}
}
@@ -421,7 +315,7 @@ public class AMQConsumer {
this.next = next;
this.timeout = timeout;
latch = new CountDownLatch(1);
- session.getCoreSession().forceConsumerDelivery(nativeId, messagePullSequence);
+ serverConsumer.forceDelivery(messagePullSequence);
//if we are 0 timeout or less we need to wait to get either the forced message or a real message.
if (timeout <= 0) {
latch.await(10, TimeUnit.SECONDS);
@@ -434,7 +328,6 @@ public class AMQConsumer {
public boolean checkForcedConsumer(ServerMessage message) {
if (message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) {
- System.out.println("MessagePullHandler.checkForcedConsumer");
if (next >= 0) {
if (timeout <= 0) {
latch.countDown();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java
index 21a45b1..0132465 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java
@@ -32,4 +32,6 @@ public abstract class AMQConsumerBrokerExchange {
public abstract void processMessagePull(MessagePull messagePull) throws Exception;
public abstract void removeConsumer() throws Exception;
+
+ public abstract void updateConsumerPrefetchSize(int prefetch);
}