You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/01/17 18:30:13 UTC

git commit: https://issues.apache.org/jira/browse/AMQ-4977

Updated Branches:
  refs/heads/trunk 0918430dc -> e7703f70e


https://issues.apache.org/jira/browse/AMQ-4977

Don't increase the cache size for repeated pull commands for the same
destination + consumer combo since we only keep one instance in the map
at any given time.  

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e7703f70
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e7703f70
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e7703f70

Branch: refs/heads/trunk
Commit: e7703f70e0f679d0534379be26aa3de612747f93
Parents: 0918430
Author: Timothy Bish <ta...@gmai.com>
Authored: Fri Jan 17 12:29:55 2014 -0500
Committer: Timothy Bish <ta...@gmai.com>
Committed: Fri Jan 17 12:29:55 2014 -0500

----------------------------------------------------------------------
 .../apache/activemq/command/MessagePull.java    |  18 +++-
 .../activemq/state/ConnectionStateTracker.java  | 106 ++++++++++++++-----
 .../state/ConnectionStateTrackerTest.java       | 100 +++++++++++++++++
 3 files changed, 192 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/e7703f70/activemq-client/src/main/java/org/apache/activemq/command/MessagePull.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/MessagePull.java b/activemq-client/src/main/java/org/apache/activemq/command/MessagePull.java
index 0ae58c4..e39aeae 100644
--- a/activemq-client/src/main/java/org/apache/activemq/command/MessagePull.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/MessagePull.java
@@ -20,10 +20,10 @@ import org.apache.activemq.state.CommandVisitor;
 
 /**
  * Used to pull messages on demand.
- * 
+ *
  * @openwire:marshaller code="20"
- * 
- * 
+ *
+ *
  */
 public class MessagePull extends BaseCommand {
 
@@ -35,10 +35,14 @@ public class MessagePull extends BaseCommand {
     private MessageId messageId;
     private String correlationId;
 
+    private transient boolean tracked = false;
+
+    @Override
     public byte getDataStructureType() {
         return DATA_STRUCTURE_TYPE;
     }
 
+    @Override
     public Response visit(CommandVisitor visitor) throws Exception {
         return visitor.processMessagePull(this);
     }
@@ -112,4 +116,12 @@ public class MessagePull extends BaseCommand {
     public void setMessageId(MessageId messageId) {
         this.messageId = messageId;
     }
+
+    public void setTracked(boolean tracked) {
+        this.tracked = tracked;
+    }
+
+    public boolean isTracked() {
+        return this.tracked;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/e7703f70/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java b/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
index 5e05a48..41b4577 100755
--- a/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
+++ b/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
@@ -20,8 +20,8 @@ import java.io.IOException;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.Vector;
 import java.util.Map.Entry;
+import java.util.Vector;
 import java.util.concurrent.ConcurrentHashMap;
 
 import javax.jms.TransactionRolledBackException;
@@ -52,15 +52,15 @@ import org.slf4j.LoggerFactory;
 /**
  * Tracks the state of a connection so a newly established transport can be
  * re-initialized to the state that was tracked.
- * 
- * 
+ *
+ *
  */
 public class ConnectionStateTracker extends CommandVisitorAdapter {
     private static final Logger LOG = LoggerFactory.getLogger(ConnectionStateTracker.class);
 
     private static final Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
     private static final int MESSAGE_PULL_SIZE = 400;
-    protected final ConcurrentHashMap<ConnectionId, ConnectionState> connectionStates = new ConcurrentHashMap<ConnectionId, ConnectionState>(); 
+    protected final ConcurrentHashMap<ConnectionId, ConnectionState> connectionStates = new ConcurrentHashMap<ConnectionId, ConnectionState>();
 
     private boolean trackTransactions;
     private boolean restoreSessions = true;
@@ -70,8 +70,10 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
     private boolean trackMessages = true;
     private boolean trackTransactionProducers = true;
     private int maxCacheSize = 128 * 1024;
-    private int currentCacheSize;
-    private Map<Object,Command> messageCache = new LinkedHashMap<Object,Command>(){
+    private long currentCacheSize;  // use long to prevent overflow for folks who set high max.
+
+    private final Map<Object,Command> messageCache = new LinkedHashMap<Object,Command>(){
+        @Override
         protected boolean removeEldestEntry(Map.Entry<Object,Command> eldest) {
             boolean result = currentCacheSize > maxCacheSize;
             if (result) {
@@ -87,7 +89,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
             return result;
         }
     };
-    
+
     private class RemoveTransactionAction implements ResponseHandler {
         private final TransactionInfo info;
 
@@ -95,6 +97,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
             this.info = info;
         }
 
+        @Override
         public void onResponse(Command response) {
             ConnectionId connectionId = info.getConnectionId();
             ConnectionState cs = connectionStates.get(connectionId);
@@ -103,13 +106,13 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
             }
         }
     }
-    
-    private class PrepareReadonlyTransactionAction extends RemoveTransactionAction {
 
+    private class PrepareReadonlyTransactionAction extends RemoveTransactionAction {
         public PrepareReadonlyTransactionAction(TransactionInfo info) {
             super(info);
         }
 
+        @Override
         public void onResponse(Command command) {
             if (command instanceof IntegerResponse) {
                 IntegerResponse response = (IntegerResponse) command;
@@ -122,11 +125,16 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
     }
 
     /**
-     * 
-     * 
+     * Entry point for all tracked commands in the tracker.  Commands should be tracked before
+     * there is an attempt to send them on the wire.  Upon a successful send of a command it is
+     * necessary to call the trackBack method to complete the tracking of the given command.
+     *
      * @param command
+     *      The command that is to be tracked by this tracker.
+     *
      * @return null if the command is not state tracked.
-     * @throws IOException
+     *
+     * @throws IOException if an error occurs during setup of the tracking operation.
      */
     public Tracked track(Command command) throws IOException {
         try {
@@ -137,7 +145,15 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
             throw IOExceptionSupport.create(e);
         }
     }
-    
+
+    /**
+     * Completes the two phase tracking operation for a command that is sent on the wire.  Once
+     * the command is sent successfully to complete the tracking operation or otherwise update
+     * the state of the tracker.
+     *
+     * @param command
+     *      The command that was previously provided to the track method.
+     */
     public void trackBack(Command command) {
         if (command != null) {
             if (trackMessages && command.isMessage()) {
@@ -146,8 +162,12 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
                     currentCacheSize = currentCacheSize +  message.getSize();
                 }
             } else if (command instanceof MessagePull) {
-                // just needs to be a rough estimate of size, ~4 identifiers
-                currentCacheSize += MESSAGE_PULL_SIZE;
+                // We only track one MessagePull per consumer so only add to cache size
+                // when the command has been marked as tracked.
+                if (((MessagePull)command).isTracked()) {
+                    // just needs to be a rough estimate of size, ~4 identifiers
+                    currentCacheSize += MESSAGE_PULL_SIZE;
+                }
             }
         }
     }
@@ -171,8 +191,9 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
                 restoreTransactions(transport, connectionState);
             }
         }
-        //now flush messages
-        for (Command msg:messageCache.values()) {
+
+        // now flush messages and MessagePull commands.
+        for (Command msg : messageCache.values()) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("command: " + (msg.isMessage() ? ((Message) msg).getMessageId() : msg));
             }
@@ -186,7 +207,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("tx: " + transactionState.getId());
             }
-            
+
             // rollback any completed transactions - no way to know if commit got there
             // or if reply went missing
             //
@@ -203,7 +224,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
                     }
                 }
             }
-            
+
             // replay short lived producers that may have been involved in the transaction
             for (ProducerState producerState : transactionState.getProducerStates().values()) {
                 if (LOG.isDebugEnabled()) {
@@ -211,14 +232,14 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
                 }
                 transport.oneway(producerState.getInfo());
             }
-            
+
             for (Command command : transactionState.getCommands()) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("tx replay: " + command);
                 }
                 transport.oneway(command);
             }
-            
+
             for (ProducerState producerState : transactionState.getProducerStates().values()) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("tx remove replayed producer :" + producerState.getInfo());
@@ -226,7 +247,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
                 transport.oneway(producerState.getInfo().createRemoveCommand());
             }
         }
-        
+
         for (TransactionInfo command: toRollback) {
             // respond to the outstanding commit
             ExceptionResponse response = new ExceptionResponse();
@@ -269,7 +290,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
         // Restore the session's consumers but possibly in pull only (prefetch 0 state) till recovery complete
         final ConnectionState connectionState = connectionStates.get(sessionState.getInfo().getSessionId().getParentId());
         final boolean connectionInterruptionProcessingComplete = connectionState.isConnectionInterruptProcessingComplete();
-        for (ConsumerState consumerState : sessionState.getConsumerStates()) {   
+        for (ConsumerState consumerState : sessionState.getConsumerStates()) {
             ConsumerInfo infoToSend = consumerState.getInfo();
             if (!connectionInterruptionProcessingComplete && infoToSend.getPrefetchSize() > 0) {
                 infoToSend = consumerState.getInfo().copy();
@@ -319,6 +340,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
         }
     }
 
+    @Override
     public Response processAddDestination(DestinationInfo info) {
         if (info != null) {
             ConnectionState cs = connectionStates.get(info.getConnectionId());
@@ -329,6 +351,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
         return TRACKED_RESPONSE_MARKER;
     }
 
+    @Override
     public Response processRemoveDestination(DestinationInfo info) {
         if (info != null) {
             ConnectionState cs = connectionStates.get(info.getConnectionId());
@@ -339,6 +362,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
         return TRACKED_RESPONSE_MARKER;
     }
 
+    @Override
     public Response processAddProducer(ProducerInfo info) {
         if (info != null && info.getProducerId() != null) {
             SessionId sessionId = info.getProducerId().getParentId();
@@ -358,6 +382,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
         return TRACKED_RESPONSE_MARKER;
     }
 
+    @Override
     public Response processRemoveProducer(ProducerId id) {
         if (id != null) {
             SessionId sessionId = id.getParentId();
@@ -377,6 +402,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
         return TRACKED_RESPONSE_MARKER;
     }
 
+    @Override
     public Response processAddConsumer(ConsumerInfo info) {
         if (info != null) {
             SessionId sessionId = info.getConsumerId().getParentId();
@@ -396,6 +422,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
         return TRACKED_RESPONSE_MARKER;
     }
 
+    @Override
     public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) {
         if (id != null) {
             SessionId sessionId = id.getParentId();
@@ -416,6 +443,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
         return TRACKED_RESPONSE_MARKER;
     }
 
+    @Override
     public Response processAddSession(SessionInfo info) {
         if (info != null) {
             ConnectionId connectionId = info.getSessionId().getParentId();
@@ -429,6 +457,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
         return TRACKED_RESPONSE_MARKER;
     }
 
+    @Override
     public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) {
         if (id != null) {
             ConnectionId connectionId = id.getParentId();
@@ -442,6 +471,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
         return TRACKED_RESPONSE_MARKER;
     }
 
+    @Override
     public Response processAddConnection(ConnectionInfo info) {
         if (info != null) {
             connectionStates.put(info.getConnectionId(), new ConnectionState(info));
@@ -449,6 +479,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
         return TRACKED_RESPONSE_MARKER;
     }
 
+    @Override
     public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception {
         if (id != null) {
             connectionStates.remove(id);
@@ -456,6 +487,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
         return TRACKED_RESPONSE_MARKER;
     }
 
+    @Override
     public Response processMessage(Message send) throws Exception {
         if (send != null) {
             if (trackTransactions && send.getTransactionId() != null) {
@@ -467,13 +499,13 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
                         TransactionState transactionState = cs.getTransactionState(send.getTransactionId());
                         if (transactionState != null) {
                             transactionState.addCommand(send);
-                            
+
                             if (trackTransactionProducers) {
                                 // for jmstemplate, track the producer in case it is closed before commit
                                 // and needs to be replayed
                                 SessionState ss = cs.getSessionState(producerId.getParentId());
                                 ProducerState producerState = ss.getProducerState(producerId);
-                                producerState.setTransactionState(transactionState);            
+                                producerState.setTransactionState(transactionState);
                             }
                         }
                     }
@@ -486,6 +518,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
         return null;
     }
 
+    @Override
     public Response processBeginTransaction(TransactionInfo info) {
         if (trackTransactions && info != null && info.getTransactionId() != null) {
             ConnectionId connectionId = info.getConnectionId();
@@ -502,6 +535,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
         return null;
     }
 
+    @Override
     public Response processPrepareTransaction(TransactionInfo info) throws Exception {
         if (trackTransactions && info != null) {
             ConnectionId connectionId = info.getConnectionId();
@@ -519,6 +553,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
         return null;
     }
 
+    @Override
     public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
         if (trackTransactions && info != null) {
             ConnectionId connectionId = info.getConnectionId();
@@ -536,6 +571,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
         return null;
     }
 
+    @Override
     public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
         if (trackTransactions && info != null) {
             ConnectionId connectionId = info.getConnectionId();
@@ -553,6 +589,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
         return null;
     }
 
+    @Override
     public Response processRollbackTransaction(TransactionInfo info) throws Exception {
         if (trackTransactions && info != null) {
             ConnectionId connectionId = info.getConnectionId();
@@ -570,6 +607,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
         return null;
     }
 
+    @Override
     public Response processEndTransaction(TransactionInfo info) throws Exception {
         if (trackTransactions && info != null) {
             ConnectionId connectionId = info.getConnectionId();
@@ -592,7 +630,10 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
         if (pull != null) {
             // leave a single instance in the cache
             final String id = pull.getDestination() + "::" + pull.getConsumerId();
-            messageCache.put(id.intern(), pull);
+            if (messageCache.put(id.intern(), pull) == null) {
+                // Only marked as tracked if this is the first request we've seen.
+                pull.setTracked(true);
+            }
         }
         return null;
     }
@@ -628,7 +669,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
     public void setTrackTransactions(boolean trackTransactions) {
         this.trackTransactions = trackTransactions;
     }
-    
+
     public boolean isTrackTransactionProducers() {
         return this.trackTransactionProducers;
     }
@@ -636,7 +677,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
     public void setTrackTransactionProducers(boolean trackTransactionProducers) {
         this.trackTransactionProducers = trackTransactionProducers;
     }
-    
+
     public boolean isRestoreTransaction() {
         return restoreTransaction;
     }
@@ -661,6 +702,13 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
         this.maxCacheSize = maxCacheSize;
     }
 
+    /**
+     * @return the current cache size for the Message and MessagePull Command cache.
+     */
+    public long getCurrentCacheSize() {
+        return this.currentCacheSize;
+    }
+
     public void connectionInterruptProcessingComplete(Transport transport, ConnectionId connectionId) {
         ConnectionState connectionState = connectionStates.get(connectionId);
         if (connectionState != null) {
@@ -675,7 +723,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("restored recovering consumer: " + control.getConsumerId() + " with: " + control.getPrefetch());
                     }
-                    transport.oneway(control);  
+                    transport.oneway(control);
                 } catch (Exception ex) {
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("Failed to submit control for consumer: " + control.getConsumerId()

http://git-wip-us.apache.org/repos/asf/activemq/blob/e7703f70/activemq-client/src/test/java/org/apache/activemq/state/ConnectionStateTrackerTest.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/test/java/org/apache/activemq/state/ConnectionStateTrackerTest.java b/activemq-client/src/test/java/org/apache/activemq/state/ConnectionStateTrackerTest.java
new file mode 100644
index 0000000..1a46757
--- /dev/null
+++ b/activemq-client/src/test/java/org/apache/activemq/state/ConnectionStateTrackerTest.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.state;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.MessagePull;
+import org.apache.activemq.command.SessionId;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ConnectionStateTrackerTest {
+
+    private final ActiveMQQueue queue = new ActiveMQQueue("Test");
+    private ConnectionId testConnectionId;
+    private SessionId testSessionId;
+
+    private int connectionId = 0;
+    private int sessionId = 0;
+    private int consumerId = 0;
+
+    @Before
+    public void setUp() throws Exception {
+        testConnectionId = createConnectionId();
+        testSessionId = createSessionId(testConnectionId);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+    }
+
+    @Test
+    public void testCacheSizeWithMessagePulls() throws IOException {
+
+        final ConsumerId consumer1 = createConsumerId(testSessionId);
+
+        ConnectionStateTracker tracker = new ConnectionStateTracker();
+
+        assertEquals(0, tracker.getCurrentCacheSize());
+
+        MessagePull pullCommand = createPullCommand(consumer1);
+        tracker.track(pullCommand);
+
+        assertEquals(0, tracker.getCurrentCacheSize());
+
+        tracker.trackBack(pullCommand);
+        long currentSize = tracker.getCurrentCacheSize();
+
+        assertTrue(currentSize > 0);
+
+        pullCommand = createPullCommand(consumer1);
+        tracker.track(pullCommand);
+        tracker.trackBack(pullCommand);
+
+        assertEquals(currentSize, tracker.getCurrentCacheSize());
+    }
+
+    private MessagePull createPullCommand(ConsumerId id) {
+        MessagePull pullCommand = new MessagePull();
+        pullCommand.setDestination(queue);
+        pullCommand.setConsumerId(id);
+        return pullCommand;
+    }
+
+    private ConnectionId createConnectionId() {
+        ConnectionId id = new ConnectionId();
+        id.setValue(UUID.randomUUID() + ":" + connectionId++);
+        return id;
+    }
+
+    private SessionId createSessionId(ConnectionId connectionId) {
+        return new SessionId(connectionId, sessionId++);
+    }
+
+    private ConsumerId createConsumerId(SessionId sessionId) {
+        return new ConsumerId(sessionId, consumerId++);
+    }
+}