You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/05/25 23:20:44 UTC

svn commit: r948209 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/store/kahadb/ test/java/org/apache/activemq/usecases/

Author: gtully
Date: Tue May 25 21:20:40 2010
New Revision: 948209

URL: http://svn.apache.org/viewvc?rev=948209&view=rev
Log:
Resolve regression in ThreeBrokerVirtualTopicNetworkTest - async tasks need to use the destination in the key, as the message id is not unique with virtual topics. Also, on a failed cancel, we must wait for the write to complete so the ack/remove does not lag the write, leaving an outstanding message. Consequence of fixes for https://issues.apache.org/activemq/browse/AMQ-2620 and
https://issues.apache.org/activemq/browse/AMQ-2568

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerVirtualTopicNetworkTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=948209&r1=948208&r2=948209&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Tue May 25 21:20:40 2010
@@ -37,6 +37,7 @@ import java.util.concurrent.ThreadPoolEx
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
@@ -72,19 +73,21 @@ import org.apache.activemq.store.kahadb.
 import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
 import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
 import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
-import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.usage.MemoryUsage;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.ServiceStopper;
 import org.apache.activemq.wireformat.WireFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.kahadb.journal.Location;
 import org.apache.kahadb.page.Transaction;
 
 public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
+    private static final Log LOG = LogFactory.getLog(KahaDBStore.class);
     private static final int MAX_ASYNC_JOBS = 10000;
     protected ExecutorService queueExecutor;
     protected ExecutorService topicExecutor;
-    protected final Map<MessageId, StoreQueueTask> asyncQueueMap = new HashMap<MessageId, StoreQueueTask>();
+    protected final Map<AsyncJobKey, StoreQueueTask> asyncQueueMap = new HashMap<AsyncJobKey, StoreQueueTask>();
     protected final Map<MessageId, StoreTopicTask> asyncTopicMap = new HashMap<MessageId, StoreTopicTask>();
     private final WireFormat wireFormat = new OpenWireFormat();
     private SystemUsage usageManager;
@@ -95,11 +98,10 @@ public class KahaDBStore extends Message
     private boolean concurrentStoreAndDispatchQueues = true;
     private boolean concurrentStoreAndDispatchTopics = true;
     private int maxAsyncJobs = MAX_ASYNC_JOBS;
-    private Scheduler scheduler;
-
+ 
     public KahaDBStore() {
-
     }
+    
     public void setBrokerName(String brokerName) {
     }
 
@@ -197,8 +199,8 @@ public class KahaDBStore extends Message
         super.doStop(stopper);
     }
 
-    protected StoreQueueTask removeQueueTask(MessageId id) {
-        StoreQueueTask task = this.asyncQueueMap.remove(id);
+    protected StoreQueueTask removeQueueTask(ActiveMQDestination activeMQDestination, MessageId id) {
+        StoreQueueTask task = this.asyncQueueMap.remove(new AsyncJobKey(id, activeMQDestination));
         if (task != null) {
             task.getMessage().decrementReferenceCount();
             this.queueSemaphore.release();
@@ -206,14 +208,14 @@ public class KahaDBStore extends Message
         return task;
     }
 
-    protected void addQueueTask(StoreQueueTask task) throws IOException {
+    protected void addQueueTask(ActiveMQDestination activeMQDestination, StoreQueueTask task) throws IOException {
         try {
             this.queueSemaphore.acquire();
 
         } catch (InterruptedException e) {
             throw new InterruptedIOException(e.getMessage());
         }
-        this.asyncQueueMap.put(task.getMessage().getMessageId(), task);
+        this.asyncQueueMap.put(new AsyncJobKey(task.getMessage().getMessageId(), activeMQDestination), task);
         task.getMessage().incrementReferenceCount();
         this.queueExecutor.execute(task);
     }
@@ -302,7 +304,7 @@ public class KahaDBStore extends Message
                 throws IOException {
             if (isConcurrentStoreAndDispatchQueues()) {
                 StoreQueueTask result = new StoreQueueTask(this, context, message);
-                addQueueTask(result);
+                addQueueTask(destination, result);
                 return result.getFuture();
             } else {
                 return super.asyncAddQueueMessage(context, message);
@@ -312,9 +314,16 @@ public class KahaDBStore extends Message
         @Override
         public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
             if (isConcurrentStoreAndDispatchQueues()) {
-                StoreQueueTask task = removeQueueTask(ack.getLastMessageId());
+                StoreQueueTask task = removeQueueTask(destination, ack.getLastMessageId());
                 if (task != null) {
                     if (!task.cancel()) {
+                        try {
+                            task.future.get();
+                        } catch (InterruptedException e) {
+                            throw new InterruptedIOException(e.toString());
+                        } catch (Exception ignored) {
+                            LOG.debug("removeAsync: cannot cancel, waiting for add resulted in ex", ignored);
+                        }
                         removeMessage(context, ack);
                     }
                 } else {
@@ -334,7 +343,6 @@ public class KahaDBStore extends Message
             org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
             command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
             store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), null);
-
         }
 
         public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
@@ -894,6 +902,35 @@ public class KahaDBStore extends Message
         }
     }
 
+    static class AsyncJobKey {
+        MessageId id;
+        ActiveMQDestination destination;
+        
+        AsyncJobKey(MessageId id, ActiveMQDestination destination) {
+            this.id = id;
+            this.destination = destination;
+        }
+        
+        @Override
+        public boolean equals(Object obj) {
+            if (obj == this) {
+                return true;
+            }
+            return obj instanceof AsyncJobKey &&
+                id.equals(((AsyncJobKey)obj).id) &&
+                destination.equals(((AsyncJobKey)obj).destination);
+        }
+
+        @Override
+        public int hashCode() {
+            return id.hashCode() + destination.hashCode();
+        }
+
+        public String toString() {
+            return destination.getPhysicalName() + "-" + id; 
+        }
+    }
+    
     class StoreQueueTask implements Runnable {
         protected final Message message;
         protected final ConnectionContext context;
@@ -915,8 +952,7 @@ public class KahaDBStore extends Message
 
         public boolean cancel() {
             if (this.done.compareAndSet(false, true)) {
-                this.future.cancel(false);
-                return true;
+                return this.future.cancel(false);
             }
             return false;
         }
@@ -925,7 +961,7 @@ public class KahaDBStore extends Message
             try {
                 if (this.done.compareAndSet(false, true)) {
                     this.store.addMessage(context, message);
-                    removeQueueTask(this.message.getMessageId());
+                    removeQueueTask(this.store.getDestination(), this.message.getMessageId());
                     this.future.complete();
                 }
             } catch (Exception e) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=948209&r1=948208&r2=948209&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Tue May 25 21:20:40 2010
@@ -744,7 +744,6 @@ public class MessageDatabase extends Ser
             synchronized (indexMutex) {
                 ArrayList<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
                 inflightTx.add(new AddOpperation(command, location));
-                TransactionId key = key(command.getTransactionInfo());
             }
         } else {
             synchronized (indexMutex) {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerVirtualTopicNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerVirtualTopicNetworkTest.java?rev=948209&r1=948208&r2=948209&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerVirtualTopicNetworkTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerVirtualTopicNetworkTest.java Tue May 25 21:20:40 2010
@@ -114,6 +114,7 @@ public class ThreeBrokerVirtualTopicNetw
         // ensure we don't get any more messages
         Thread.sleep(2000);
         
+        LOG.info("MessagesA: " + msgsA.getMessageIds());
         assertEquals(10, msgsA.getMessageCount());
         assertEquals(11, msgsB.getMessageCount());
         assertEquals(11, msgsC.getMessageCount());        
@@ -141,6 +142,7 @@ public class ThreeBrokerVirtualTopicNetw
         // ensure we don't get any more messages
         Thread.sleep(5000);
         
+        LOG.info("Extra MessagesA: " + msgsA.getMessageIds());
         assertEquals(0, msgsA.getMessageCount());
         assertEquals(11, msgsB.getMessageCount());
         assertEquals(11, msgsC.getMessageCount());