You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/01/17 18:30:13 UTC
git commit: https://issues.apache.org/jira/browse/AMQ-4977
Updated Branches:
refs/heads/trunk 0918430dc -> e7703f70e
https://issues.apache.org/jira/browse/AMQ-4977
Don't increase the cache size for repeated pull commands for the same
destination + consumer combo since we only keep one instance in the map
at any given time.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e7703f70
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e7703f70
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e7703f70
Branch: refs/heads/trunk
Commit: e7703f70e0f679d0534379be26aa3de612747f93
Parents: 0918430
Author: Timothy Bish <ta...@gmai.com>
Authored: Fri Jan 17 12:29:55 2014 -0500
Committer: Timothy Bish <ta...@gmai.com>
Committed: Fri Jan 17 12:29:55 2014 -0500
----------------------------------------------------------------------
.../apache/activemq/command/MessagePull.java | 18 +++-
.../activemq/state/ConnectionStateTracker.java | 106 ++++++++++++++-----
.../state/ConnectionStateTrackerTest.java | 100 +++++++++++++++++
3 files changed, 192 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/e7703f70/activemq-client/src/main/java/org/apache/activemq/command/MessagePull.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/MessagePull.java b/activemq-client/src/main/java/org/apache/activemq/command/MessagePull.java
index 0ae58c4..e39aeae 100644
--- a/activemq-client/src/main/java/org/apache/activemq/command/MessagePull.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/MessagePull.java
@@ -20,10 +20,10 @@ import org.apache.activemq.state.CommandVisitor;
/**
* Used to pull messages on demand.
- *
+ *
* @openwire:marshaller code="20"
- *
- *
+ *
+ *
*/
public class MessagePull extends BaseCommand {
@@ -35,10 +35,14 @@ public class MessagePull extends BaseCommand {
private MessageId messageId;
private String correlationId;
+ private transient boolean tracked = false;
+
+ @Override
public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE;
}
+ @Override
public Response visit(CommandVisitor visitor) throws Exception {
return visitor.processMessagePull(this);
}
@@ -112,4 +116,12 @@ public class MessagePull extends BaseCommand {
public void setMessageId(MessageId messageId) {
this.messageId = messageId;
}
+
+ public void setTracked(boolean tracked) {
+ this.tracked = tracked;
+ }
+
+ public boolean isTracked() {
+ return this.tracked;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/e7703f70/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java b/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
index 5e05a48..41b4577 100755
--- a/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
+++ b/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
@@ -20,8 +20,8 @@ import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
-import java.util.Vector;
import java.util.Map.Entry;
+import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import javax.jms.TransactionRolledBackException;
@@ -52,15 +52,15 @@ import org.slf4j.LoggerFactory;
/**
* Tracks the state of a connection so a newly established transport can be
* re-initialized to the state that was tracked.
- *
- *
+ *
+ *
*/
public class ConnectionStateTracker extends CommandVisitorAdapter {
private static final Logger LOG = LoggerFactory.getLogger(ConnectionStateTracker.class);
private static final Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
private static final int MESSAGE_PULL_SIZE = 400;
- protected final ConcurrentHashMap<ConnectionId, ConnectionState> connectionStates = new ConcurrentHashMap<ConnectionId, ConnectionState>();
+ protected final ConcurrentHashMap<ConnectionId, ConnectionState> connectionStates = new ConcurrentHashMap<ConnectionId, ConnectionState>();
private boolean trackTransactions;
private boolean restoreSessions = true;
@@ -70,8 +70,10 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
private boolean trackMessages = true;
private boolean trackTransactionProducers = true;
private int maxCacheSize = 128 * 1024;
- private int currentCacheSize;
- private Map<Object,Command> messageCache = new LinkedHashMap<Object,Command>(){
+ private long currentCacheSize; // use long to prevent overflow for folks who set high max.
+
+ private final Map<Object,Command> messageCache = new LinkedHashMap<Object,Command>(){
+ @Override
protected boolean removeEldestEntry(Map.Entry<Object,Command> eldest) {
boolean result = currentCacheSize > maxCacheSize;
if (result) {
@@ -87,7 +89,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
return result;
}
};
-
+
private class RemoveTransactionAction implements ResponseHandler {
private final TransactionInfo info;
@@ -95,6 +97,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
this.info = info;
}
+ @Override
public void onResponse(Command response) {
ConnectionId connectionId = info.getConnectionId();
ConnectionState cs = connectionStates.get(connectionId);
@@ -103,13 +106,13 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
}
}
}
-
- private class PrepareReadonlyTransactionAction extends RemoveTransactionAction {
+ private class PrepareReadonlyTransactionAction extends RemoveTransactionAction {
public PrepareReadonlyTransactionAction(TransactionInfo info) {
super(info);
}
+ @Override
public void onResponse(Command command) {
if (command instanceof IntegerResponse) {
IntegerResponse response = (IntegerResponse) command;
@@ -122,11 +125,16 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
}
/**
- *
- *
+ * Entry point for all tracked commands in the tracker. Commands should be tracked before
+ * there is an attempt to send them on the wire. Upon a successful send of a command it is
+ * necessary to call the trackBack method to complete the tracking of the given command.
+ *
* @param command
+ * The command that is to be tracked by this tracker.
+ *
* @return null if the command is not state tracked.
- * @throws IOException
+ *
+ * @throws IOException if an error occurs during setup of the tracking operation.
*/
public Tracked track(Command command) throws IOException {
try {
@@ -137,7 +145,15 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
throw IOExceptionSupport.create(e);
}
}
-
+
+ /**
+ * Completes the two-phase tracking operation for a command that is sent on the wire. Call
+ * this once the command has been sent successfully to complete the tracking operation or
+ * otherwise update the state of the tracker.
+ *
+ * @param command
+ * The command that was previously provided to the track method.
+ */
public void trackBack(Command command) {
if (command != null) {
if (trackMessages && command.isMessage()) {
@@ -146,8 +162,12 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
currentCacheSize = currentCacheSize + message.getSize();
}
} else if (command instanceof MessagePull) {
- // just needs to be a rough estimate of size, ~4 identifiers
- currentCacheSize += MESSAGE_PULL_SIZE;
+ // We only track one MessagePull per consumer so only add to cache size
+ // when the command has been marked as tracked.
+ if (((MessagePull)command).isTracked()) {
+ // just needs to be a rough estimate of size, ~4 identifiers
+ currentCacheSize += MESSAGE_PULL_SIZE;
+ }
}
}
}
@@ -171,8 +191,9 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
restoreTransactions(transport, connectionState);
}
}
- //now flush messages
- for (Command msg:messageCache.values()) {
+
+ // now flush messages and MessagePull commands.
+ for (Command msg : messageCache.values()) {
if (LOG.isDebugEnabled()) {
LOG.debug("command: " + (msg.isMessage() ? ((Message) msg).getMessageId() : msg));
}
@@ -186,7 +207,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
if (LOG.isDebugEnabled()) {
LOG.debug("tx: " + transactionState.getId());
}
-
+
// rollback any completed transactions - no way to know if commit got there
// or if reply went missing
//
@@ -203,7 +224,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
}
}
}
-
+
// replay short lived producers that may have been involved in the transaction
for (ProducerState producerState : transactionState.getProducerStates().values()) {
if (LOG.isDebugEnabled()) {
@@ -211,14 +232,14 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
}
transport.oneway(producerState.getInfo());
}
-
+
for (Command command : transactionState.getCommands()) {
if (LOG.isDebugEnabled()) {
LOG.debug("tx replay: " + command);
}
transport.oneway(command);
}
-
+
for (ProducerState producerState : transactionState.getProducerStates().values()) {
if (LOG.isDebugEnabled()) {
LOG.debug("tx remove replayed producer :" + producerState.getInfo());
@@ -226,7 +247,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
transport.oneway(producerState.getInfo().createRemoveCommand());
}
}
-
+
for (TransactionInfo command: toRollback) {
// respond to the outstanding commit
ExceptionResponse response = new ExceptionResponse();
@@ -269,7 +290,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
// Restore the session's consumers but possibly in pull only (prefetch 0 state) till recovery complete
final ConnectionState connectionState = connectionStates.get(sessionState.getInfo().getSessionId().getParentId());
final boolean connectionInterruptionProcessingComplete = connectionState.isConnectionInterruptProcessingComplete();
- for (ConsumerState consumerState : sessionState.getConsumerStates()) {
+ for (ConsumerState consumerState : sessionState.getConsumerStates()) {
ConsumerInfo infoToSend = consumerState.getInfo();
if (!connectionInterruptionProcessingComplete && infoToSend.getPrefetchSize() > 0) {
infoToSend = consumerState.getInfo().copy();
@@ -319,6 +340,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
}
}
+ @Override
public Response processAddDestination(DestinationInfo info) {
if (info != null) {
ConnectionState cs = connectionStates.get(info.getConnectionId());
@@ -329,6 +351,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
return TRACKED_RESPONSE_MARKER;
}
+ @Override
public Response processRemoveDestination(DestinationInfo info) {
if (info != null) {
ConnectionState cs = connectionStates.get(info.getConnectionId());
@@ -339,6 +362,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
return TRACKED_RESPONSE_MARKER;
}
+ @Override
public Response processAddProducer(ProducerInfo info) {
if (info != null && info.getProducerId() != null) {
SessionId sessionId = info.getProducerId().getParentId();
@@ -358,6 +382,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
return TRACKED_RESPONSE_MARKER;
}
+ @Override
public Response processRemoveProducer(ProducerId id) {
if (id != null) {
SessionId sessionId = id.getParentId();
@@ -377,6 +402,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
return TRACKED_RESPONSE_MARKER;
}
+ @Override
public Response processAddConsumer(ConsumerInfo info) {
if (info != null) {
SessionId sessionId = info.getConsumerId().getParentId();
@@ -396,6 +422,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
return TRACKED_RESPONSE_MARKER;
}
+ @Override
public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) {
if (id != null) {
SessionId sessionId = id.getParentId();
@@ -416,6 +443,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
return TRACKED_RESPONSE_MARKER;
}
+ @Override
public Response processAddSession(SessionInfo info) {
if (info != null) {
ConnectionId connectionId = info.getSessionId().getParentId();
@@ -429,6 +457,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
return TRACKED_RESPONSE_MARKER;
}
+ @Override
public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) {
if (id != null) {
ConnectionId connectionId = id.getParentId();
@@ -442,6 +471,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
return TRACKED_RESPONSE_MARKER;
}
+ @Override
public Response processAddConnection(ConnectionInfo info) {
if (info != null) {
connectionStates.put(info.getConnectionId(), new ConnectionState(info));
@@ -449,6 +479,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
return TRACKED_RESPONSE_MARKER;
}
+ @Override
public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception {
if (id != null) {
connectionStates.remove(id);
@@ -456,6 +487,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
return TRACKED_RESPONSE_MARKER;
}
+ @Override
public Response processMessage(Message send) throws Exception {
if (send != null) {
if (trackTransactions && send.getTransactionId() != null) {
@@ -467,13 +499,13 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
TransactionState transactionState = cs.getTransactionState(send.getTransactionId());
if (transactionState != null) {
transactionState.addCommand(send);
-
+
if (trackTransactionProducers) {
// for jmstemplate, track the producer in case it is closed before commit
// and needs to be replayed
SessionState ss = cs.getSessionState(producerId.getParentId());
ProducerState producerState = ss.getProducerState(producerId);
- producerState.setTransactionState(transactionState);
+ producerState.setTransactionState(transactionState);
}
}
}
@@ -486,6 +518,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
return null;
}
+ @Override
public Response processBeginTransaction(TransactionInfo info) {
if (trackTransactions && info != null && info.getTransactionId() != null) {
ConnectionId connectionId = info.getConnectionId();
@@ -502,6 +535,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
return null;
}
+ @Override
public Response processPrepareTransaction(TransactionInfo info) throws Exception {
if (trackTransactions && info != null) {
ConnectionId connectionId = info.getConnectionId();
@@ -519,6 +553,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
return null;
}
+ @Override
public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
if (trackTransactions && info != null) {
ConnectionId connectionId = info.getConnectionId();
@@ -536,6 +571,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
return null;
}
+ @Override
public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
if (trackTransactions && info != null) {
ConnectionId connectionId = info.getConnectionId();
@@ -553,6 +589,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
return null;
}
+ @Override
public Response processRollbackTransaction(TransactionInfo info) throws Exception {
if (trackTransactions && info != null) {
ConnectionId connectionId = info.getConnectionId();
@@ -570,6 +607,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
return null;
}
+ @Override
public Response processEndTransaction(TransactionInfo info) throws Exception {
if (trackTransactions && info != null) {
ConnectionId connectionId = info.getConnectionId();
@@ -592,7 +630,10 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
if (pull != null) {
// leave a single instance in the cache
final String id = pull.getDestination() + "::" + pull.getConsumerId();
- messageCache.put(id.intern(), pull);
+ if (messageCache.put(id.intern(), pull) == null) {
+ // Only mark as tracked when no pull for this destination + consumer was already cached.
+ pull.setTracked(true);
+ }
}
return null;
}
@@ -628,7 +669,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
public void setTrackTransactions(boolean trackTransactions) {
this.trackTransactions = trackTransactions;
}
-
+
public boolean isTrackTransactionProducers() {
return this.trackTransactionProducers;
}
@@ -636,7 +677,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
public void setTrackTransactionProducers(boolean trackTransactionProducers) {
this.trackTransactionProducers = trackTransactionProducers;
}
-
+
public boolean isRestoreTransaction() {
return restoreTransaction;
}
@@ -661,6 +702,13 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
this.maxCacheSize = maxCacheSize;
}
+ /**
+ * @return the current cache size for the Message and MessagePull Command cache.
+ */
+ public long getCurrentCacheSize() {
+ return this.currentCacheSize;
+ }
+
public void connectionInterruptProcessingComplete(Transport transport, ConnectionId connectionId) {
ConnectionState connectionState = connectionStates.get(connectionId);
if (connectionState != null) {
@@ -675,7 +723,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
if (LOG.isDebugEnabled()) {
LOG.debug("restored recovering consumer: " + control.getConsumerId() + " with: " + control.getPrefetch());
}
- transport.oneway(control);
+ transport.oneway(control);
} catch (Exception ex) {
if (LOG.isDebugEnabled()) {
LOG.debug("Failed to submit control for consumer: " + control.getConsumerId()
http://git-wip-us.apache.org/repos/asf/activemq/blob/e7703f70/activemq-client/src/test/java/org/apache/activemq/state/ConnectionStateTrackerTest.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/test/java/org/apache/activemq/state/ConnectionStateTrackerTest.java b/activemq-client/src/test/java/org/apache/activemq/state/ConnectionStateTrackerTest.java
new file mode 100644
index 0000000..1a46757
--- /dev/null
+++ b/activemq-client/src/test/java/org/apache/activemq/state/ConnectionStateTrackerTest.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.state;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.MessagePull;
+import org.apache.activemq.command.SessionId;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ConnectionStateTrackerTest {
+
+ private final ActiveMQQueue queue = new ActiveMQQueue("Test");
+ private ConnectionId testConnectionId;
+ private SessionId testSessionId;
+
+ private int connectionId = 0;
+ private int sessionId = 0;
+ private int consumerId = 0;
+
+ @Before
+ public void setUp() throws Exception {
+ testConnectionId = createConnectionId();
+ testSessionId = createSessionId(testConnectionId);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ @Test
+ public void testCacheSizeWithMessagePulls() throws IOException {
+
+ final ConsumerId consumer1 = createConsumerId(testSessionId);
+
+ ConnectionStateTracker tracker = new ConnectionStateTracker();
+
+ assertEquals(0, tracker.getCurrentCacheSize());
+
+ MessagePull pullCommand = createPullCommand(consumer1);
+ tracker.track(pullCommand);
+
+ assertEquals(0, tracker.getCurrentCacheSize());
+
+ tracker.trackBack(pullCommand);
+ long currentSize = tracker.getCurrentCacheSize();
+
+ assertTrue(currentSize > 0);
+
+ pullCommand = createPullCommand(consumer1);
+ tracker.track(pullCommand);
+ tracker.trackBack(pullCommand);
+
+ assertEquals(currentSize, tracker.getCurrentCacheSize());
+ }
+
+ private MessagePull createPullCommand(ConsumerId id) {
+ MessagePull pullCommand = new MessagePull();
+ pullCommand.setDestination(queue);
+ pullCommand.setConsumerId(id);
+ return pullCommand;
+ }
+
+ private ConnectionId createConnectionId() {
+ ConnectionId id = new ConnectionId();
+ id.setValue(UUID.randomUUID() + ":" + connectionId++);
+ return id;
+ }
+
+ private SessionId createSessionId(ConnectionId connectionId) {
+ return new SessionId(connectionId, sessionId++);
+ }
+
+ private ConsumerId createConsumerId(SessionId sessionId) {
+ return new ConsumerId(sessionId, consumerId++);
+ }
+}