You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/04/04 18:09:50 UTC
[41/42] activemq-artemis git commit: ARTEMIS-463 More simplifications
on the openwire head https://issues.apache.org/jira/browse/ARTEMIS-463
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java
deleted file mode 100644
index 2f9d0bc..0000000
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.openwire.amq;
-
-import java.util.List;
-
-import org.apache.activemq.artemis.core.filter.Filter;
-import org.apache.activemq.artemis.core.persistence.StorageManager;
-import org.apache.activemq.artemis.core.postoffice.QueueBinding;
-import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.server.MessageReference;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.impl.QueueImpl;
-import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
-import org.apache.activemq.artemis.core.server.management.ManagementService;
-import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
-
-public class AMQServerConsumer extends ServerConsumerImpl {
-
- // TODO-NOW: remove this once unified
- AMQConsumer amqConsumer;
-
- public AMQConsumer getAmqConsumer() {
- return amqConsumer;
- }
-
- /** TODO-NOW: remove this once unified */
- public void setAmqConsumer(AMQConsumer amqConsumer) {
- this.amqConsumer = amqConsumer;
- }
-
- public AMQServerConsumer(long consumerID,
- AMQServerSession serverSession,
- QueueBinding binding,
- Filter filter,
- boolean started,
- boolean browseOnly,
- StorageManager storageManager,
- SessionCallback callback,
- boolean preAcknowledge,
- boolean strictUpdateDeliveryCount,
- ManagementService managementService,
- boolean supportLargeMessage,
- Integer credits,
- final ActiveMQServer server) throws Exception {
- super(consumerID, serverSession, binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, server);
- }
-
- public void amqPutBackToDeliveringList(final List<MessageReference> refs) {
- synchronized (this.deliveringRefs) {
- for (MessageReference ref : refs) {
- ref.incrementDeliveryCount();
- deliveringRefs.add(ref);
- }
- //adjust the order. Suppose deliveringRefs has 2 existing
- //refs m1, m2, and refs has 3 m3, m4, m5
- //new order must be m3, m4, m5, m1, m2
- if (refs.size() > 0) {
- long first = refs.get(0).getMessage().getMessageID();
- MessageReference m = deliveringRefs.peek();
- while (m.getMessage().getMessageID() != first) {
- deliveringRefs.poll();
- deliveringRefs.add(m);
- m = deliveringRefs.peek();
- }
- }
- }
- }
-
- public void moveToDeadLetterAddress(long mid, Throwable cause) throws Exception {
- MessageReference ref = removeReferenceByID(mid);
-
- if (ref == null) {
- throw new IllegalStateException("Cannot find ref to ack " + mid);
- }
-
- ServerMessage coreMsg = ref.getMessage();
- coreMsg.putStringProperty(OpenWireMessageConverter.AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, cause.toString());
-
- QueueImpl queue = (QueueImpl) ref.getQueue();
- synchronized (queue) {
- queue.sendToDeadLetterAddress(ref);
- queue.decDelivering();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java
deleted file mode 100644
index 3f0259d..0000000
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java
+++ /dev/null
@@ -1,391 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.openwire.amq;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.activemq.artemis.api.core.Pair;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
-import org.apache.activemq.artemis.api.core.management.ManagementHelper;
-import org.apache.activemq.artemis.core.filter.Filter;
-import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
-import org.apache.activemq.artemis.core.persistence.OperationContext;
-import org.apache.activemq.artemis.core.persistence.StorageManager;
-import org.apache.activemq.artemis.core.postoffice.Binding;
-import org.apache.activemq.artemis.core.postoffice.BindingType;
-import org.apache.activemq.artemis.core.postoffice.PostOffice;
-import org.apache.activemq.artemis.core.postoffice.QueueBinding;
-import org.apache.activemq.artemis.core.protocol.openwire.AMQTransactionImpl;
-import org.apache.activemq.artemis.core.security.SecurityStore;
-import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
-import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.core.server.MessageReference;
-import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.QueueCreator;
-import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
-import org.apache.activemq.artemis.core.server.impl.RefsOperation;
-import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
-import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
-import org.apache.activemq.artemis.core.server.management.ManagementService;
-import org.apache.activemq.artemis.core.server.management.Notification;
-import org.apache.activemq.artemis.core.transaction.ResourceManager;
-import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
-import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
-import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
-import org.apache.activemq.artemis.utils.TypedProperties;
-import org.apache.activemq.artemis.utils.UUID;
-
-public class AMQServerSession extends ServerSessionImpl {
-
- private boolean internal;
-
- public AMQServerSession(String name,
- String username,
- String password,
- int minLargeMessageSize,
- boolean autoCommitSends,
- boolean autoCommitAcks,
- boolean preAcknowledge,
- boolean persistDeliveryCountBeforeDelivery,
- boolean xa,
- RemotingConnection connection,
- StorageManager storageManager,
- PostOffice postOffice,
- ResourceManager resourceManager,
- SecurityStore securityStore,
- ManagementService managementService,
- ActiveMQServerImpl activeMQServerImpl,
- SimpleString managementAddress,
- SimpleString simpleString,
- SessionCallback callback,
- QueueCreator queueCreator,
- OperationContext context) throws Exception {
- super(name, username, password, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, persistDeliveryCountBeforeDelivery, xa, connection, storageManager, postOffice, resourceManager, securityStore, managementService, activeMQServerImpl, managementAddress, simpleString, callback, context, new AMQTransactionFactory(), queueCreator);
- }
-
- @Override
- protected void doClose(final boolean failed) throws Exception {
- Set<ServerConsumer> consumersClone = new HashSet<>(consumers.values());
- for (ServerConsumer consumer : consumersClone) {
- AMQServerConsumer amqConsumer = (AMQServerConsumer)consumer;
- amqConsumer.setStarted(false);
- }
-
- synchronized (this) {
- if (tx != null && tx.getXid() == null) {
- ((AMQTransactionImpl) tx).setRollbackForClose();
- }
- }
- super.doClose(failed);
- }
-
- public AtomicInteger getConsumerCredits(final long consumerID) {
- ServerConsumer consumer = consumers.get(consumerID);
-
- if (consumer == null) {
- ActiveMQServerLogger.LOGGER.debug("There is no consumer with id " + consumerID);
-
- return null;
- }
-
- return ((ServerConsumerImpl) consumer).getAvailableCredits();
- }
-
- public void enableXA() throws Exception {
- if (!this.xa) {
- if (this.tx != null) {
- //that's not expected, maybe a warning.
- this.tx.rollback();
- this.tx = null;
- }
-
- this.autoCommitAcks = false;
- this.autoCommitSends = false;
-
- this.xa = true;
- }
- }
-
- public void enableTx() throws Exception {
- if (this.xa) {
- throw new IllegalStateException("Session is XA");
- }
-
- this.autoCommitAcks = false;
- this.autoCommitSends = false;
-
- if (this.tx != null) {
- //that's not expected, maybe a warning.
- this.tx.rollback();
- this.tx = null;
- }
-
- this.tx = newTransaction();
- }
-
- //amq specific behavior
-
- // TODO: move this to AMQSession
- public void amqRollback(Set<Long> acked) throws Exception {
- if (tx == null) {
- // Might be null if XA
-
- tx = newTransaction();
- }
-
- RefsOperation oper = (RefsOperation) tx.getProperty(TransactionPropertyIndexes.REFS_OPERATION);
-
- if (oper != null) {
- List<MessageReference> ackRefs = oper.getReferencesToAcknowledge();
- Map<Long, List<MessageReference>> toAcks = new HashMap<>();
- for (MessageReference ref : ackRefs) {
- Long consumerId = ref.getConsumerId();
-
- if (this.consumers.containsKey(consumerId)) {
- if (acked.contains(ref.getMessage().getMessageID())) {
- List<MessageReference> ackList = toAcks.get(consumerId);
- if (ackList == null) {
- ackList = new ArrayList<>();
- toAcks.put(consumerId, ackList);
- }
- ackList.add(ref);
- }
- }
- else {
- //consumer must have been closed, cancel to queue
- ref.getQueue().cancel(tx, ref);
- }
- }
- //iterate consumers
- if (toAcks.size() > 0) {
- Iterator<Entry<Long, List<MessageReference>>> iter = toAcks.entrySet().iterator();
- while (iter.hasNext()) {
- Entry<Long, List<MessageReference>> entry = iter.next();
- ServerConsumer consumer = consumers.get(entry.getKey());
- ((AMQServerConsumer) consumer).amqPutBackToDeliveringList(entry.getValue());
- }
- }
- }
-
- tx.rollback();
-
- if (xa) {
- tx = null;
- }
- else {
- tx = newTransaction();
- }
-
- }
-
- /**
- * The failed flag is used here to control delivery count.
- * If set to true the delivery count won't decrement.
- */
- public void amqCloseConsumer(long consumerID, boolean failed) throws Exception {
- final ServerConsumer consumer = consumers.get(consumerID);
-
- if (consumer != null) {
- consumer.close(failed);
- }
- else {
- ActiveMQServerLogger.LOGGER.cannotFindConsumer(consumerID);
- }
- }
-
- @Override
- public ServerConsumer createConsumer(final long consumerID,
- final SimpleString queueName,
- final SimpleString filterString,
- final boolean browseOnly,
- final boolean supportLargeMessage,
- final Integer credits) throws Exception {
- if (this.internal) {
- // Clebert TODO: PQP!!!!!!!!!!!!!!!!!!!!
-
- //internal sessions doesn't check security:: Why??? //// what's the reason for that? Where a link?
-
- Binding binding = postOffice.getBinding(queueName);
-
- if (binding == null || binding.getType() != BindingType.LOCAL_QUEUE) {
- throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(queueName);
- }
-
- Filter filter = FilterImpl.createFilter(filterString);
-
- ServerConsumer consumer = newConsumer(consumerID, this, (QueueBinding) binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits);
- consumers.put(consumer.getID(), consumer);
-
- if (!browseOnly) {
- TypedProperties props = new TypedProperties();
-
- props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress());
-
- props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME, binding.getClusterName());
-
- props.putSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME, binding.getRoutingName());
-
- props.putIntProperty(ManagementHelper.HDR_DISTANCE, binding.getDistance());
-
- Queue theQueue = (Queue) binding.getBindable();
-
- props.putIntProperty(ManagementHelper.HDR_CONSUMER_COUNT, theQueue.getConsumerCount());
-
- // HORNETQ-946
- props.putSimpleStringProperty(ManagementHelper.HDR_USER, SimpleString.toSimpleString(username));
-
- props.putSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS, SimpleString.toSimpleString(this.remotingConnection.getRemoteAddress()));
-
- props.putSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME, SimpleString.toSimpleString(name));
-
- if (filterString != null) {
- props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString);
- }
-
- Notification notification = new Notification(null, CoreNotificationType.CONSUMER_CREATED, props);
-
- if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
- ActiveMQServerLogger.LOGGER.debug("Session with user=" + username + ", connection=" + this.remotingConnection + " created a consumer on queue " + queueName + ", filter = " + filterString);
- }
-
- managementService.sendNotification(notification);
- }
-
- return consumer;
- }
- else {
- return super.createConsumer(consumerID, queueName, filterString, browseOnly, supportLargeMessage, credits);
- }
- }
-
- @Override
- public Queue createQueue(final SimpleString address,
- final SimpleString name,
- final SimpleString filterString,
- final boolean temporary,
- final boolean durable) throws Exception {
- if (!this.internal) {
- return super.createQueue(address, name, filterString, temporary, durable);
- }
-
- Queue queue = server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary);
-
- if (temporary) {
- // Temporary queue in core simply means the queue will be deleted if
- // the remoting connection
- // dies. It does not mean it will get deleted automatically when the
- // session is closed.
- // It is up to the user to delete the queue when finished with it
-
- TempQueueCleanerUpper cleaner = new TempQueueCleanerUpper(server, name);
-
- remotingConnection.addCloseListener(cleaner);
- remotingConnection.addFailureListener(cleaner);
-
- tempQueueCleannerUppers.put(name, cleaner);
- }
-
- if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
- ActiveMQServerLogger.LOGGER.debug("Queue " + name + " created on address " + name +
- " with filter=" + filterString + " temporary = " +
- temporary + " durable=" + durable + " on session user=" + this.username + ", connection=" + this.remotingConnection);
- }
-
- return queue;
- }
-
-
- // Clebert TODO: Get rid of these mthods
- @Override
- protected void doSend(final ServerMessage msg, final boolean direct) throws Exception {
- if (!this.internal) {
- super.doSend(msg, direct);
- return;
- }
-
- //bypass security check for internal sessions
- if (tx == null || autoCommitSends) {
- }
- else {
- routingContext.setTransaction(tx);
- }
-
- try {
- postOffice.route(msg, getQueueCreator(), routingContext, direct);
-
- Pair<UUID, AtomicLong> value = targetAddressInfos.get(msg.getAddress());
-
- if (value == null) {
- targetAddressInfos.put(msg.getAddress(), new Pair<>(msg.getUserID(), new AtomicLong(1)));
- }
- else {
- value.setA(msg.getUserID());
- value.getB().incrementAndGet();
- }
- }
- finally {
- routingContext.clear();
- }
- }
-
- @Override
- protected ServerConsumer newConsumer(long consumerID,
- ServerSessionImpl serverSessionImpl,
- QueueBinding binding,
- Filter filter,
- boolean started2,
- boolean browseOnly,
- StorageManager storageManager2,
- SessionCallback callback2,
- boolean preAcknowledge2,
- boolean strictUpdateDeliveryCount2,
- ManagementService managementService2,
- boolean supportLargeMessage,
- Integer credits) throws Exception {
- return new AMQServerConsumer(consumerID, this, binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, this.server);
- }
-
- public AMQServerConsumer getConsumer(long nativeId) {
- return (AMQServerConsumer) this.consumers.get(nativeId);
- }
-
- public void setInternal(boolean internal) {
- this.internal = internal;
- }
-
- public boolean isInternal() {
- return this.internal;
- }
-
- public void moveToDeadLetterAddress(long consumerId, long mid, Throwable cause) throws Exception {
- AMQServerConsumer consumer = getConsumer(consumerId);
- consumer.moveToDeadLetterAddress(mid, cause);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSessionFactory.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSessionFactory.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSessionFactory.java
deleted file mode 100644
index a6ca4a0..0000000
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSessionFactory.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.openwire.amq;
-
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.persistence.OperationContext;
-import org.apache.activemq.artemis.core.persistence.StorageManager;
-import org.apache.activemq.artemis.core.postoffice.PostOffice;
-import org.apache.activemq.artemis.core.security.SecurityStore;
-import org.apache.activemq.artemis.core.server.QueueCreator;
-import org.apache.activemq.artemis.core.server.ServerSessionFactory;
-import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
-import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
-import org.apache.activemq.artemis.core.server.management.ManagementService;
-import org.apache.activemq.artemis.core.transaction.ResourceManager;
-import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
-import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
-
-public class AMQServerSessionFactory implements ServerSessionFactory {
-
- private static final AMQServerSessionFactory singleInstance = new AMQServerSessionFactory();
-
- public static AMQServerSessionFactory getInstance() {
- return singleInstance;
- }
-
- private AMQServerSessionFactory() {
- }
-
- @Override
- public ServerSessionImpl createCoreSession(String name,
- String username,
- String password,
- int minLargeMessageSize,
- boolean autoCommitSends,
- boolean autoCommitAcks,
- boolean preAcknowledge,
- boolean persistDeliveryCountBeforeDelivery,
- boolean xa,
- RemotingConnection connection,
- StorageManager storageManager,
- PostOffice postOffice,
- ResourceManager resourceManager,
- SecurityStore securityStore,
- ManagementService managementService,
- ActiveMQServerImpl activeMQServerImpl,
- SimpleString managementAddress,
- SimpleString simpleString,
- SessionCallback callback,
- QueueCreator queueCreator,
- OperationContext context) throws Exception {
- return new AMQServerSession(name, username, password, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, persistDeliveryCountBeforeDelivery, xa, connection, storageManager, postOffice, resourceManager, securityStore, managementService, activeMQServerImpl, managementAddress, simpleString, callback, queueCreator, context);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 4675dca..74dd951 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -17,13 +17,7 @@
package org.apache.activemq.artemis.core.protocol.openwire.amq;
import javax.jms.ResourceAllocationException;
-import javax.transaction.xa.Xid;
-import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -32,15 +26,15 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
-import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager;
-import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil;
+import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
-import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
@@ -48,43 +42,30 @@ import org.apache.activemq.artemis.utils.IDGenerator;
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.SessionInfo;
-import org.apache.activemq.command.TransactionId;
-import org.apache.activemq.command.TransactionInfo;
-import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.wireformat.WireFormat;
public class AMQSession implements SessionCallback {
// ConsumerID is generated inside the session, 0, 1, 2, ... as many consumers as you have on the session
- protected final IDGenerator idGenerator = new SimpleIDGenerator(0);
+ protected final IDGenerator consumerIDGenerator = new SimpleIDGenerator(0);
private ConnectionInfo connInfo;
- private AMQServerSession coreSession;
+ private ServerSession coreSession;
private SessionInfo sessInfo;
private ActiveMQServer server;
private OpenWireConnection connection;
- private Map<Long, AMQConsumer> consumers = new ConcurrentHashMap<>();
-
private AtomicBoolean started = new AtomicBoolean(false);
- private TransactionId txId = null;
-
- private boolean isTx;
-
private final ScheduledExecutorService scheduledPool;
- private OpenWireProtocolManager manager;
-
// The sessionWireformat used by the session
// this object is meant to be used per thread / session
// so we make a new one per AMQSession
@@ -94,20 +75,22 @@ public class AMQSession implements SessionCallback {
SessionInfo sessInfo,
ActiveMQServer server,
OpenWireConnection connection,
- ScheduledExecutorService scheduledPool,
- OpenWireProtocolManager manager) {
+ ScheduledExecutorService scheduledPool) {
this.connInfo = connInfo;
this.sessInfo = sessInfo;
this.server = server;
this.connection = connection;
this.scheduledPool = scheduledPool;
- this.manager = manager;
OpenWireFormat marshaller = (OpenWireFormat) connection.getMarshaller();
this.converter = new OpenWireMessageConverter(marshaller.copy());
}
+ public boolean isClosed() {
+ return coreSession.isClosed();
+ }
+
public OpenWireMessageConverter getConverter() {
return converter;
}
@@ -122,7 +105,7 @@ public class AMQSession implements SessionCallback {
// now
try {
- coreSession = (AMQServerSession) server.createSession(name, username, password, minLargeMessageSize, connection, true, false, false, false, null, this, AMQServerSessionFactory.getInstance(), true);
+ coreSession = server.createSession(name, username, password, minLargeMessageSize, connection, true, false, false, false, null, this, true);
long sessionId = sessInfo.getSessionId().getValue();
if (sessionId == -1) {
@@ -136,8 +119,8 @@ public class AMQSession implements SessionCallback {
}
public List<AMQConsumer> createConsumer(ConsumerInfo info,
- AMQSession amqSession,
- SlowConsumerDetectionListener slowConsumerDetectionListener) throws Exception {
+ AMQSession amqSession,
+ SlowConsumerDetectionListener slowConsumerDetectionListener) throws Exception {
//check destination
ActiveMQDestination dest = info.getDestination();
ActiveMQDestination[] dests = null;
@@ -147,7 +130,7 @@ public class AMQSession implements SessionCallback {
else {
dests = new ActiveMQDestination[]{dest};
}
-// Map<ActiveMQDestination, AMQConsumer> consumerMap = new HashMap<>();
+
List<AMQConsumer> consumersList = new java.util.LinkedList<>();
for (ActiveMQDestination openWireDest : dests) {
@@ -157,9 +140,9 @@ public class AMQSession implements SessionCallback {
}
AMQConsumer consumer = new AMQConsumer(this, openWireDest, info, scheduledPool);
- consumer.init(slowConsumerDetectionListener, idGenerator.generateID());
+ long nativeID = consumerIDGenerator.generateID();
+ consumer.init(slowConsumerDetectionListener, nativeID);
consumersList.add(consumer);
- consumers.put(consumer.getNativeId(), consumer);
}
return consumersList;
@@ -180,7 +163,7 @@ public class AMQSession implements SessionCallback {
@Override
public void browserFinished(ServerConsumer consumer) {
- AMQConsumer theConsumer = ((AMQServerConsumer) consumer).getAmqConsumer();
+ AMQConsumer theConsumer = (AMQConsumer) consumer.getProtocolData();
if (theConsumer != null) {
theConsumer.browseFinished();
}
@@ -204,13 +187,20 @@ public class AMQSession implements SessionCallback {
}
@Override
- public int sendMessage(ServerMessage message, ServerConsumer consumerID, int deliveryCount) {
- AMQConsumer consumer = consumers.get(consumerID.getID());
- return consumer.handleDeliver(message, deliveryCount);
+ public int sendMessage(MessageReference reference,
+ ServerMessage message,
+ ServerConsumer consumer,
+ int deliveryCount) {
+ AMQConsumer theConsumer = (AMQConsumer) consumer.getProtocolData();
+ return theConsumer.handleDeliver(reference, message, deliveryCount);
}
@Override
- public int sendLargeMessage(ServerMessage message, ServerConsumer consumerID, long bodySize, int deliveryCount) {
+ public int sendLargeMessage(MessageReference reference,
+ ServerMessage message,
+ ServerConsumer consumerID,
+ long bodySize,
+ int deliveryCount) {
// TODO Auto-generated method stub
return 0;
}
@@ -231,16 +221,15 @@ public class AMQSession implements SessionCallback {
}
@Override
- public boolean hasCredits(ServerConsumer consumerID) {
+ public boolean hasCredits(ServerConsumer consumer) {
- AMQConsumer amqConsumer;
+ AMQConsumer amqConsumer = null;
- amqConsumer = consumers.get(consumerID.getID());
-
- if (amqConsumer != null) {
- return amqConsumer.hasCredits();
+ if (consumer.getProtocolData() != null) {
+ amqConsumer = (AMQConsumer) consumer.getProtocolData();
}
- return false;
+
+ return amqConsumer != null && amqConsumer.hasCredits();
}
@Override
@@ -252,11 +241,6 @@ public class AMQSession implements SessionCallback {
public void send(final ProducerInfo producerInfo,
final Message messageSend,
boolean sendProducerAck) throws Exception {
- TransactionId tid = messageSend.getTransactionId();
- if (tid != null) {
- resetSessionTx(tid);
- }
-
messageSend.setBrokerInTime(System.currentTimeMillis());
ActiveMQDestination destination = messageSend.getDestination();
@@ -376,7 +360,7 @@ public class AMQSession implements SessionCallback {
}
}
- public AMQServerSession getCoreSession() {
+ public ServerSession getCoreSession() {
return this.coreSession;
}
@@ -384,160 +368,16 @@ public class AMQSession implements SessionCallback {
return this.server;
}
- public void removeConsumer(long consumerId) throws Exception {
- boolean failed = !(this.txId != null || this.isTx);
-
- coreSession.amqCloseConsumer(consumerId, failed);
- consumers.remove(consumerId);
- }
-
public WireFormat getMarshaller() {
return this.connection.getMarshaller();
}
- public void acknowledge(MessageAck ack, AMQConsumer consumer) throws Exception {
- TransactionId tid = ack.getTransactionId();
- if (tid != null) {
- this.resetSessionTx(ack.getTransactionId());
- }
- consumer.acknowledge(ack);
-
- if (tid == null && ack.getAckType() == MessageAck.STANDARD_ACK_TYPE) {
- this.coreSession.commit();
- }
- }
-
- //AMQ session and transactions are create separately. Whether a session
- //is transactional or not is known only when a TransactionInfo command
- //comes in.
- public void resetSessionTx(TransactionId xid) throws Exception {
- if ((this.txId != null) && (!this.txId.equals(xid))) {
- throw new IllegalStateException("Session already associated with a tx");
- }
-
- this.isTx = true;
- if (this.txId == null) {
- //now reset session
- this.txId = xid;
-
- if (xid.isXATransaction()) {
- XATransactionId xaXid = (XATransactionId) xid;
- coreSession.enableXA();
- XidImpl coreXid = new XidImpl(xaXid.getBranchQualifier(), xaXid.getFormatId(), xaXid.getGlobalTransactionId());
- coreSession.xaStart(coreXid);
- }
- else {
- coreSession.enableTx();
- }
-
- this.manager.registerTx(this.txId, this);
- }
- }
-
- private void checkTx(TransactionId inId) {
- if (this.txId == null) {
- throw new IllegalStateException("Session has no transaction associated with it");
- }
-
- if (!this.txId.equals(inId)) {
- throw new IllegalStateException("Session already associated with another tx");
- }
-
- this.isTx = true;
- }
-
- public void endTransaction(TransactionInfo info) throws Exception {
- checkTx(info.getTransactionId());
-
- if (txId.isXATransaction()) {
- XATransactionId xid = (XATransactionId) txId;
- XidImpl coreXid = new XidImpl(xid.getBranchQualifier(), xid.getFormatId(), xid.getGlobalTransactionId());
- this.coreSession.xaEnd(coreXid);
- }
- }
-
- public void commitOnePhase(TransactionInfo info) throws Exception {
- checkTx(info.getTransactionId());
-
- if (txId.isXATransaction()) {
- XATransactionId xid = (XATransactionId) txId;
- XidImpl coreXid = new XidImpl(xid.getBranchQualifier(), xid.getFormatId(), xid.getGlobalTransactionId());
- this.coreSession.xaCommit(coreXid, true);
- }
- else {
- Iterator<AMQConsumer> iter = consumers.values().iterator();
- while (iter.hasNext()) {
- AMQConsumer consumer = iter.next();
- consumer.finishTx();
- }
- this.coreSession.commit();
- }
-
- this.txId = null;
- }
-
- public void prepareTransaction(XATransactionId xid) throws Exception {
- checkTx(xid);
- XidImpl coreXid = new XidImpl(xid.getBranchQualifier(), xid.getFormatId(), xid.getGlobalTransactionId());
- this.coreSession.xaPrepare(coreXid);
- }
-
- public void commitTwoPhase(XATransactionId xid) throws Exception {
- checkTx(xid);
- XidImpl coreXid = new XidImpl(xid.getBranchQualifier(), xid.getFormatId(), xid.getGlobalTransactionId());
- this.coreSession.xaCommit(coreXid, false);
-
- this.txId = null;
- }
-
- public void rollback(TransactionInfo info) throws Exception {
- checkTx(info.getTransactionId());
- if (this.txId.isXATransaction()) {
- XATransactionId xid = (XATransactionId) txId;
- XidImpl coreXid = new XidImpl(xid.getBranchQualifier(), xid.getFormatId(), xid.getGlobalTransactionId());
- this.coreSession.xaRollback(coreXid);
- }
- else {
- Iterator<AMQConsumer> iter = consumers.values().iterator();
- Set<Long> acked = new HashSet<>();
- while (iter.hasNext()) {
- AMQConsumer consumer = iter.next();
- consumer.rollbackTx(acked);
- }
- //on local rollback, amq broker doesn't do anything about the delivered
- //messages, which stay at clients until next time
- this.coreSession.amqRollback(acked);
- }
-
- this.txId = null;
- }
-
- public void recover(List<TransactionId> recovered) {
- List<Xid> xids = this.coreSession.xaGetInDoubtXids();
- for (Xid xid : xids) {
- XATransactionId amqXid = new XATransactionId(xid);
- recovered.add(amqXid);
- }
- }
-
- public void forget(final TransactionId tid) throws Exception {
- checkTx(tid);
- XATransactionId xid = (XATransactionId) tid;
- XidImpl coreXid = new XidImpl(xid.getBranchQualifier(), xid.getFormatId(), xid.getGlobalTransactionId());
- this.coreSession.xaForget(coreXid);
- this.txId = null;
- }
-
public ConnectionInfo getConnectionInfo() {
return this.connInfo;
}
- public void setInternal(boolean internal) {
- this.coreSession.setInternal(internal);
- }
-
- public boolean isInternal() {
- return this.coreSession.isInternal();
+ public void disableSecurity() {
+ this.coreSession.disableSecurity();
}
public void deliverMessage(MessageDispatch dispatch) {
@@ -548,20 +388,6 @@ public class AMQSession implements SessionCallback {
this.coreSession.close(false);
}
- public AMQConsumer getConsumer(Long coreConsumerId) {
- return consumers.get(coreConsumerId);
- }
-
- public void updateConsumerPrefetchSize(ConsumerId consumerId, int prefetch) {
- Iterator<AMQConsumer> iterator = consumers.values().iterator();
- while (iterator.hasNext()) {
- AMQConsumer consumer = iterator.next();
- if (consumer.getId().equals(consumerId)) {
- consumer.setPrefetchSize(prefetch);
- }
- }
- }
-
public OpenWireConnection getConnection() {
return connection;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSingleConsumerBrokerExchange.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSingleConsumerBrokerExchange.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSingleConsumerBrokerExchange.java
index b29c448..e02638e 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSingleConsumerBrokerExchange.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSingleConsumerBrokerExchange.java
@@ -40,6 +40,12 @@ public class AMQSingleConsumerBrokerExchange extends AMQConsumerBrokerExchange {
@Override
public void acknowledge(MessageAck ack) throws Exception {
- amqSession.acknowledge(ack, consumer);
+ consumer.acknowledge(ack);
}
+
+ @Override
+ public void updateConsumerPrefetchSize(int prefetch) {
+ consumer.setPrefetchSize(prefetch);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQTransactionFactory.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQTransactionFactory.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQTransactionFactory.java
deleted file mode 100644
index 3a47333..0000000
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQTransactionFactory.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.openwire.amq;
-
-import javax.transaction.xa.Xid;
-
-import org.apache.activemq.artemis.core.persistence.StorageManager;
-import org.apache.activemq.artemis.core.protocol.openwire.AMQTransactionImpl;
-import org.apache.activemq.artemis.core.transaction.Transaction;
-import org.apache.activemq.artemis.core.transaction.TransactionFactory;
-
-public class AMQTransactionFactory implements TransactionFactory {
-
- @Override
- public Transaction newTransaction(Xid xid, StorageManager storageManager, int timeoutSeconds) {
- return new AMQTransactionImpl(xid, storageManager, timeoutSeconds);
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/MessageInfo.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/MessageInfo.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/MessageInfo.java
deleted file mode 100644
index 005dd2e..0000000
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/MessageInfo.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.openwire.amq;
-
-import org.apache.activemq.command.MessageId;
-
-public class MessageInfo {
-
- public MessageId amqId;
- public long nativeId;
- public int size;
- //mark message that is acked within a local tx
- public boolean localAcked;
-
- public MessageInfo(MessageId amqId, long nativeId, int size) {
- this.amqId = amqId;
- this.nativeId = nativeId;
- this.size = size;
- }
-
- @Override
- public String toString() {
- return "native mid: " + this.nativeId + " amqId: " + amqId + " local acked: " + localAcked;
- }
-
- public void setLocalAcked(boolean ack) {
- localAcked = ack;
- }
-
- public boolean isLocalAcked() {
- return localAcked;
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
index 0cad259..b3e52af 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
@@ -226,7 +226,7 @@ class StompProtocolManager implements ProtocolManager<StompFrameInterceptor> {
if (stompSession == null) {
stompSession = new StompSession(connection, this, server.getStorageManager().newContext(server.getExecutorFactory().getExecutor()));
String name = UUIDGenerator.getInstance().generateStringUUID();
- ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, true, false, false, false, null, stompSession, null, true);
+ ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, true, false, false, false, null, stompSession, true);
stompSession.setServerSession(session);
sessions.put(connection.getID(), stompSession);
}
@@ -239,7 +239,7 @@ class StompProtocolManager implements ProtocolManager<StompFrameInterceptor> {
if (stompSession == null) {
stompSession = new StompSession(connection, this, server.getStorageManager().newContext(executor));
String name = UUIDGenerator.getInstance().generateStringUUID();
- ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, false, false, false, false, null, stompSession, null, true);
+ ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, false, false, false, false, null, stompSession, true);
stompSession.setServerSession(session);
transactedSessions.put(txID, stompSession);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index a6cbe71..9b5c70d 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -35,6 +35,7 @@ import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
+import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerMessage;
@@ -118,7 +119,7 @@ public class StompSession implements SessionCallback {
}
@Override
- public int sendMessage(ServerMessage serverMessage, final ServerConsumer consumer, int deliveryCount) {
+ public int sendMessage(MessageReference ref, ServerMessage serverMessage, final ServerConsumer consumer, int deliveryCount) {
LargeServerMessageImpl largeMessage = null;
ServerMessage newServerMessage = serverMessage;
try {
@@ -207,7 +208,7 @@ public class StompSession implements SessionCallback {
}
@Override
- public int sendLargeMessage(ServerMessage msg, ServerConsumer consumer, long bodySize, int deliveryCount) {
+ public int sendLargeMessage(MessageReference ref, ServerMessage msg, ServerConsumer consumer, long bodySize, int deliveryCount) {
return 0;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
index 1a9690f..de0a2fd 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
@@ -1301,7 +1301,6 @@ public class ConfigurationImpl implements Configuration, Serializable {
public TransportConfiguration[] getTransportConfigurations(final List<String> connectorNames) {
TransportConfiguration[] tcConfigs = (TransportConfiguration[]) Array.newInstance(TransportConfiguration.class, connectorNames.size());
int count = 0;
- System.out.println(debugConnectors());
for (String connectorName : connectorNames) {
TransportConfiguration connector = getConnectorConfigurations().get(connectorName);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
index 82b0e92..99e9160 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
@@ -25,6 +25,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.transaction.Transaction;
public class PagedReferenceImpl implements PagedReference {
@@ -48,6 +49,18 @@ public class PagedReferenceImpl implements PagedReference {
private boolean alreadyAcked;
+ private Object protocolData;
+
+ @Override
+ public Object getProtocolData() {
+ return protocolData;
+ }
+
+ @Override
+ public void setProtocolData(Object protocolData) {
+ this.protocolData = protocolData;
+ }
+
@Override
public ServerMessage getMessage() {
return getPagedMessage().getMessage();
@@ -199,9 +212,19 @@ public class PagedReferenceImpl implements PagedReference {
subscription.ack(this);
}
+ @Override
+ public void acknowledge(Transaction tx) throws Exception {
+ if (tx == null) {
+ getQueue().acknowledge(this);
+ }
+ else {
+ getQueue().acknowledge(tx, this);
+ }
+ }
+
/* (non-Javadoc)
- * @see java.lang.Object#toString()
- */
+ * @see java.lang.Object#toString()
+ */
@Override
public String toString() {
String msgToString;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
index 2de5adb..0e5cd2f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
@@ -149,7 +149,7 @@ public class ActiveMQPacketHandler implements ChannelHandler {
}
ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(),
- new CoreSessionCallback(request.getName(), protocolManager, channel, connection), null, true);
+ new CoreSessionCallback(request.getName(), protocolManager, channel, connection), true);
ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session, server.getStorageManager(), channel);
channel.setHandler(handler);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
index c05a288..9d6125b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
@@ -26,6 +26,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRec
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
@@ -56,7 +57,7 @@ public final class CoreSessionCallback implements SessionCallback {
}
@Override
- public int sendLargeMessage(ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) {
+ public int sendLargeMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) {
Packet packet = new SessionReceiveLargeMessage(consumer.getID(), message, bodySize, deliveryCount);
channel.send(packet);
@@ -79,7 +80,7 @@ public final class CoreSessionCallback implements SessionCallback {
}
@Override
- public int sendMessage(ServerMessage message, ServerConsumer consumer, int deliveryCount) {
+ public int sendMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, int deliveryCount) {
Packet packet = new SessionReceiveMessage(consumer.getID(), message, deliveryCount);
int size = 0;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index 64633bb..b47df20 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -139,7 +139,6 @@ public interface ActiveMQServer extends ActiveMQComponent {
boolean xa,
String defaultAddress,
SessionCallback callback,
- ServerSessionFactory sessionFactory,
boolean autoCreateQueues) throws Exception;
SecurityStore getSecurityStore();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
index 0ff55ac..b1e0dde 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
@@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.core.server;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+
/**
* A reference to a message.
*
@@ -35,6 +37,14 @@ public interface MessageReference {
*/
int getMessageMemoryEstimate();
+ /** Used to hold protocol-specific data during delivery.
+ * This is only valid while the message is on the delivering queue at the consumer. */
+ Object getProtocolData();
+
+ /** Used to hold protocol-specific data during delivery.
+ * This is only valid while the message is on the delivering queue at the consumer. */
+ void setProtocolData(Object data);
+
MessageReference copy(Queue queue);
/**
@@ -61,6 +71,8 @@ public interface MessageReference {
void acknowledge() throws Exception;
+ void acknowledge(Transaction tx) throws Exception;
+
void setConsumerId(Long consumerID);
Long getConsumerId();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index c92325a..ec9d4a3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -150,6 +150,8 @@ public interface Queue extends Bindable {
int sendMessagesToDeadLetterAddress(Filter filter) throws Exception;
+ void sendToDeadLetterAddress(final Transaction tx, final MessageReference ref) throws Exception;
+
boolean changeReferencePriority(long messageID, byte newPriority) throws Exception;
int changeReferencesPriority(Filter filter, byte newPriority) throws Exception;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
index d75efdd..d157a8c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
@@ -31,6 +31,12 @@ public interface ServerConsumer extends Consumer {
void fireSlowConsumer();
+ /** This is to be used for anything specific to a protocol head. */
+ Object getProtocolData();
+
+ /** This is to be used for anything specific to a protocol head. */
+ void setProtocolData(Object protocolData);
+
/**
* @param protocolContext
* @see #getProtocolContext()
@@ -68,6 +74,12 @@ public interface ServerConsumer extends Consumer {
MessageReference removeReferenceByID(long messageID) throws Exception;
+ /** Some protocols may choose to send the message back to the delivering list instead of redelivering it.
+ * For example, OpenWire will redeliver through the client, so messages will go back to the delivering list after rollback. */
+ void backToDelivering(MessageReference reference);
+
+ List<MessageReference> getDeliveringReferencesBasedOnProtocol(boolean remove, Object protocolDataStart, Object protocolDataEnd);
+
void acknowledge(Transaction tx, long messageID) throws Exception;
void individualAcknowledge(Transaction tx, long messageID) throws Exception;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
index 62bb3b5..b7a7c47 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
@@ -36,9 +36,19 @@ public interface ServerSession extends SecurityAuth {
Object getConnectionID();
+ /**
+ * Certain protocols may create an internal session that shouldn't go through security checks.
+ * Make sure you don't expose this property through any protocol layer, as that would be a security breach.
+ */
+ void enableSecurity();
+
+ void disableSecurity();
+
@Override
RemotingConnection getRemotingConnection();
+ Transaction newTransaction();
+
boolean removeConsumer(long consumerID) throws Exception;
void acknowledge(long consumerID, long messageID) throws Exception;
@@ -87,6 +97,11 @@ public interface ServerSession extends SecurityAuth {
void stop();
+ /**
+ * To be used by protocol heads that need to control the transaction outside the session context.
+ */
+ void resetTX(Transaction transaction);
+
Queue createQueue(SimpleString address,
SimpleString name,
SimpleString filterString,
@@ -100,6 +115,13 @@ public interface ServerSession extends SecurityAuth {
SimpleString filterString,
boolean browseOnly) throws Exception;
+ ServerConsumer createConsumer(final long consumerID,
+ final SimpleString queueName,
+ final SimpleString filterString,
+ final boolean browseOnly,
+ final boolean supportLargeMessage,
+ final Integer credits) throws Exception;
+
QueueQueryResult executeQueueQuery(SimpleString name) throws Exception;
BindingQueryResult executeBindingQuery(SimpleString address) throws Exception;
@@ -151,6 +173,10 @@ public interface ServerSession extends SecurityAuth {
Transaction getCurrentTransaction();
+ ServerConsumer locateConsumer(long consumerID) throws Exception;
+
+ boolean isClosed();
+
void createSharedQueue(SimpleString address,
SimpleString name,
boolean durable,
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSessionFactory.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSessionFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSessionFactory.java
deleted file mode 100644
index 6447daa..0000000
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSessionFactory.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.server;
-
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.persistence.OperationContext;
-import org.apache.activemq.artemis.core.persistence.StorageManager;
-import org.apache.activemq.artemis.core.postoffice.PostOffice;
-import org.apache.activemq.artemis.core.security.SecurityStore;
-import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
-import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
-import org.apache.activemq.artemis.core.server.management.ManagementService;
-import org.apache.activemq.artemis.core.transaction.ResourceManager;
-import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
-import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
-
-public interface ServerSessionFactory {
-
- ServerSessionImpl createCoreSession(String name,
- String username,
- String password,
- int minLargeMessageSize,
- boolean autoCommitSends,
- boolean autoCommitAcks,
- boolean preAcknowledge,
- boolean persistDeliveryCountBeforeDelivery,
- boolean xa,
- RemotingConnection connection,
- StorageManager storageManager,
- PostOffice postOffice,
- ResourceManager resourceManager,
- SecurityStore securityStore,
- ManagementService managementService,
- ActiveMQServerImpl activeMQServerImpl,
- SimpleString managementAddress,
- SimpleString simpleString,
- SessionCallback callback,
- QueueCreator queueCreator,
- OperationContext context) throws Exception;
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 13a1283..69d13bf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -112,7 +112,6 @@ import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.SecuritySettingPlugin;
import org.apache.activemq.artemis.core.server.ServerSession;
-import org.apache.activemq.artemis.core.server.ServerSessionFactory;
import org.apache.activemq.artemis.core.server.ServiceRegistry;
import org.apache.activemq.artemis.core.server.cluster.BackupManager;
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
@@ -1091,7 +1090,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final boolean xa,
final String defaultAddress,
final SessionCallback callback,
- final ServerSessionFactory sessionFactory,
final boolean autoCreateQueues) throws Exception {
if (securityStore != null) {
@@ -1105,7 +1103,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
checkSessionLimit(username);
final OperationContext context = storageManager.newContext(getExecutorFactory().getExecutor());
- final ServerSessionImpl session = internalCreateSession(name, username, password, minLargeMessageSize, connection, autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, context, sessionFactory, autoCreateQueues);
+ final ServerSessionImpl session = internalCreateSession(name, username, password, minLargeMessageSize, connection, autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, context, autoCreateQueues);
sessions.put(name, session);
@@ -1178,14 +1176,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
String defaultAddress,
SessionCallback callback,
OperationContext context,
- ServerSessionFactory sessionFactory,
boolean autoCreateJMSQueues) throws Exception {
- if (sessionFactory == null) {
- return new ServerSessionImpl(name, username, password, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, configuration.isPersistDeliveryCountBeforeDelivery(), xa, connection, storageManager, postOffice, resourceManager, securityStore, managementService, this, configuration.getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), callback, context, autoCreateJMSQueues ? jmsQueueCreator : null);
- }
- else {
- return sessionFactory.createCoreSession(name, username, password, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, configuration.isPersistDeliveryCountBeforeDelivery(), xa, connection, storageManager, postOffice, resourceManager, securityStore, managementService, this, configuration.getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), callback, jmsQueueCreator, context);
- }
+ return new ServerSessionImpl(name, username, password, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, configuration.isPersistDeliveryCountBeforeDelivery(), xa, connection, storageManager, postOffice, resourceManager, securityStore, managementService, this, configuration.getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), callback, context, autoCreateJMSQueues ? jmsQueueCreator : null);
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index 9feb60e..932c260 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -33,6 +33,7 @@ import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.transaction.Transaction;
/**
* A queue that will discard messages if a newer message with the same
@@ -188,6 +189,16 @@ public class LastValueQueue extends QueueImpl {
}
@Override
+ public Object getProtocolData() {
+ return ref.getProtocolData();
+ }
+
+ @Override
+ public void setProtocolData(Object data) {
+ ref.setProtocolData(data);
+ }
+
+ @Override
public void setAlreadyAcked() {
ref.setAlreadyAcked();
}
@@ -247,6 +258,11 @@ public class LastValueQueue extends QueueImpl {
}
@Override
+ public void acknowledge(Transaction tx) throws Exception {
+ ref.acknowledge(tx);
+ }
+
+ @Override
public void setPersistedCount(int count) {
ref.setPersistedCount(count);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
index fd04b6d..de4d5ae 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
@@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.MemorySize;
/**
@@ -42,6 +43,8 @@ public class MessageReferenceImpl implements MessageReference {
private boolean alreadyAcked;
+ private Object protocolData;
+
// Static --------------------------------------------------------
private static final int memoryOffset;
@@ -86,6 +89,16 @@ public class MessageReferenceImpl implements MessageReference {
// MessageReference implementation -------------------------------
+ @Override
+ public Object getProtocolData() {
+ return protocolData;
+ }
+
+ @Override
+ public void setProtocolData(Object protocolData) {
+ this.protocolData = protocolData;
+ }
+
/**
* @return the persistedCount
*/
@@ -174,7 +187,16 @@ public class MessageReferenceImpl implements MessageReference {
@Override
public void acknowledge() throws Exception {
- queue.acknowledge(this);
+ this.acknowledge(null);
+ }
+
+ public void acknowledge(Transaction tx) throws Exception {
+ if (tx == null) {
+ getQueue().acknowledge(this);
+ }
+ else {
+ getQueue().acknowledge(tx, this);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aedf273/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 86ca36c..75f0f98 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -1074,7 +1074,7 @@ public class QueueImpl implements Queue {
if (isTrace) {
ActiveMQServerLogger.LOGGER.trace("moving expired reference " + ref + " to address = " + expiryAddress + " from queue=" + this.getName());
}
- move(expiryAddress, ref, true, false);
+ move(null, expiryAddress, ref, true, false);
}
else {
if (isTrace) {
@@ -1461,7 +1461,7 @@ public class QueueImpl implements Queue {
MessageReference ref = iter.next();
if (ref.getMessage().getMessageID() == messageID) {
incDelivering();
- sendToDeadLetterAddress(ref);
+ sendToDeadLetterAddress(null, ref);
iter.remove();
refRemoved(ref);
return true;
@@ -1480,7 +1480,7 @@ public class QueueImpl implements Queue {
MessageReference ref = iter.next();
if (filter == null || filter.match(ref.getMessage())) {
incDelivering();
- sendToDeadLetterAddress(ref);
+ sendToDeadLetterAddress(null, ref);
iter.remove();
refRemoved(ref);
count++;
@@ -1507,7 +1507,7 @@ public class QueueImpl implements Queue {
refRemoved(ref);
incDelivering();
try {
- move(toAddress, ref, false, rejectDuplicate);
+ move(null, toAddress, ref, false, rejectDuplicate);
}
catch (Exception e) {
decDelivering();
@@ -2120,7 +2120,7 @@ public class QueueImpl implements Queue {
if (isTrace) {
ActiveMQServerLogger.LOGGER.trace("Sending reference " + reference + " to DLA = " + addressSettings.getDeadLetterAddress() + " since ref.getDeliveryCount=" + reference.getDeliveryCount() + "and maxDeliveries=" + maxDeliveries + " from queue=" + this.getName());
}
- sendToDeadLetterAddress(reference, addressSettings.getDeadLetterAddress());
+ sendToDeadLetterAddress(null, reference, addressSettings.getDeadLetterAddress());
return false;
}
@@ -2337,36 +2337,45 @@ public class QueueImpl implements Queue {
}
}
- public void sendToDeadLetterAddress(final MessageReference ref) throws Exception {
- sendToDeadLetterAddress(ref, addressSettingsRepository.getMatch(address.toString()).getDeadLetterAddress());
+ public void sendToDeadLetterAddress(final Transaction tx, final MessageReference ref) throws Exception {
+ sendToDeadLetterAddress(tx, ref, addressSettingsRepository.getMatch(address.toString()).getDeadLetterAddress());
}
- private void sendToDeadLetterAddress(final MessageReference ref,
+ private void sendToDeadLetterAddress(final Transaction tx, final MessageReference ref,
final SimpleString deadLetterAddress) throws Exception {
if (deadLetterAddress != null) {
Bindings bindingList = postOffice.getBindingsForAddress(deadLetterAddress);
if (bindingList.getBindings().isEmpty()) {
ActiveMQServerLogger.LOGGER.messageExceededMaxDelivery(ref, deadLetterAddress);
- acknowledge(ref);
+ ref.acknowledge(tx);
}
else {
ActiveMQServerLogger.LOGGER.messageExceededMaxDeliverySendtoDLA(ref, deadLetterAddress, name);
- move(deadLetterAddress, ref, false, false);
+ move(tx, deadLetterAddress, ref, false, false);
}
}
else {
ActiveMQServerLogger.LOGGER.messageExceededMaxDeliveryNoDLA(name);
- acknowledge(ref);
+ ref.acknowledge(tx);
}
}
- private void move(final SimpleString address,
+ private void move(final Transaction originalTX,
+ final SimpleString address,
final MessageReference ref,
final boolean expiry,
final boolean rejectDuplicate) throws Exception {
- Transaction tx = new TransactionImpl(storageManager);
+ Transaction tx;
+
+ if (originalTX != null) {
+ tx = originalTX;
+ }
+ else {
+ // if no TX we create a new one to commit at the end
+ tx = new TransactionImpl(storageManager);
+ }
ServerMessage copyMessage = makeCopy(ref, expiry);
@@ -2376,7 +2385,9 @@ public class QueueImpl implements Queue {
acknowledge(tx, ref);
- tx.commit();
+ if (originalTX == null) {
+ tx.commit();
+ }
}
/*