You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2014/10/07 16:03:47 UTC

[1/3] git commit: https://issues.apache.org/jira/browse/AMQ-4485 - reenable test. concurrentStoreandDispatch case cannot be reconciled via setBatch, best we can do is trap duplicates from inflight messages as they occur. tagged async stores to support th

Repository: activemq
Updated Branches:
  refs/heads/trunk 838bbebee -> 140ce1bc8


https://issues.apache.org/jira/browse/AMQ-4485 - reenable test. concurrentStoreandDispatch case cannot be reconciled via setBatch, best we can do is trap duplicates from inflight messages as they occur. tagged async stores to support this. the revert of serialization with tx and non aysnc send brings perf back to where it was. https://issues.apache.org/jira/browse/AMQ-5266


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/140ce1bc
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/140ce1bc
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/140ce1bc

Branch: refs/heads/trunk
Commit: 140ce1bc8f447552c6a6dbfc1d3284da96386d2b
Parents: c1c82be
Author: gtully <ga...@gmail.com>
Authored: Tue Oct 7 14:49:58 2014 +0100
Committer: gtully <ga...@gmail.com>
Committed: Tue Oct 7 14:50:41 2014 +0100

----------------------------------------------------------------------
 .../region/cursors/AbstractStoreCursor.java     |  45 +++-----
 .../activemq/store/kahadb/KahaDBStore.java      | 114 +++++++------------
 .../activemq/store/kahadb/MessageDatabase.java  |  46 ++------
 activemq-unit-tests/pom.xml                     |   1 -
 .../activemq/bugs/AMQ4485LowLimitTest.java      |  16 ++-
 .../TwoBrokerQueueClientsReconnectTest.java     |   4 +
 6 files changed, 85 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/140ce1bc/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
index fad666c..bef6017 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
@@ -18,7 +18,6 @@ package org.apache.activemq.broker.region.cursors;
 
 import java.util.Iterator;
 import java.util.LinkedList;
-import java.util.concurrent.CancellationException;
 import java.util.concurrent.Future;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
@@ -100,13 +99,19 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
             clearIterator(true);
             recovered = true;
             storeHasMessages = true;
+        } else if (!cached) {
+            // a duplicate from the store (!cached) - needs to be removed/acked - otherwise it will get re dispatched on restart
+            if (message.isRecievedByDFBridge()) {
+                // expected for messages pending acks with kahadb.concurrentStoreAndDispatchQueues=true
+                LOG.trace("{} store replayed pending message due to concurrentStoreAndDispatchQueues {} seq: {}", this, message.getMessageId(), message.getMessageId().getFutureOrSequenceLong());
+            } else {
+                LOG.warn("{} - cursor got duplicate from store {} seq: {}", this, message.getMessageId(), message.getMessageId().getFutureOrSequenceLong());
+                duplicate(message);
+            }
         } else {
-            LOG.warn("{} - cursor got duplicate {} seq: {}", this, message.getMessageId(), message.getMessageId().getFutureOrSequenceLong());
-
-            // a duplicate from the store - needs to be removed/acked - otherwise it will get redispatched on restart
-            // jdbc store will store duplicates and will set entry locator to sequence long.
-            // REVISIT - this seems too hacky - see use case AMQ4952Test
-            if (!cached || message.getMessageId().getEntryLocator() instanceof Long) {
+            LOG.warn("{} - cursor got duplicate send {} seq: {}", this, message.getMessageId(), message.getMessageId().getFutureOrSequenceLong());
+            if (message.getMessageId().getEntryLocator() instanceof Long) {
+                // JDBC will store a duplicate (with new sequence id) - it needs an ack  (AMQ4952Test)
                 duplicate(message);
             }
         }
@@ -219,10 +224,11 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
             if (!pendingCachedIds.isEmpty() || lastCachedId != null) {
                 LOG.trace("{} - disabling cache. current Id: {} seq: {}, batchList size: {}",
                             new Object[]{this, node.getMessageId(), node.getMessageId().getFutureOrSequenceLong(), batchList.size()});
-                collapseLastCachedIds();
+                pruneLastCached();
                 if (lastCachedId != null) {
                     setBatch(lastCachedId);
                     lastCachedId = null;
+                    pendingCachedIds.clear();
                 }
             }
         }
@@ -240,6 +246,8 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
                 Future future = (Future) futureOrLong;
                 if (future.isCancelled()) {
                     it.remove();
+                } else {
+                    break;
                 }
             } else {
                 // store complete - track via lastCachedId
@@ -249,25 +257,8 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
         }
     }
 
-    private void collapseLastCachedIds() throws Exception {
-        for (MessageId candidate : pendingCachedIds) {
-            final Object futureOrLong = candidate.getFutureOrSequenceLong();
-            if (futureOrLong instanceof Future) {
-                Future future = (Future) futureOrLong;
-                try {
-                    future.get();
-                    // future should be replaced with sequence by this time
-                } catch (CancellationException ignored) {
-                    continue;
-                }
-            }
-            setLastCachedId(candidate);
-        }
-        pendingCachedIds.clear();
-    }
-
     private void setLastCachedId(MessageId candidate) {
-        if (lastCachedId == null) {
+        if (lastCachedId == null || lastCachedId.getFutureOrSequenceLong() == null) {  // possibly null for topics
             lastCachedId = candidate;
         } else if (Long.compare(((Long) candidate.getFutureOrSequenceLong()), ((Long) lastCachedId.getFutureOrSequenceLong())) > 0) {
             lastCachedId = candidate;
@@ -360,7 +351,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
     public String toString() {
         return super.toString() + ":" + regionDestination.getActiveMQDestination().getPhysicalName() + ",batchResetNeeded=" + batchResetNeeded
                     + ",storeHasMessages=" + this.storeHasMessages + ",size=" + this.size + ",cacheEnabled=" + isCacheEnabled()
-                    + ",maxBatchSize:" + maxBatchSize + ",hasSpace:" + hasSpace() + ",pendingCachedIds.size:" + pendingCachedIds.size() + ",lastCachedId:" + lastCachedId;
+                    + ",maxBatchSize:" + maxBatchSize + ",hasSpace:" + hasSpace() + ",pendingCachedIds.size:" + pendingCachedIds.size() + ",lastCachedId:" + lastCachedId + ",lastCachedId-seq:" + (lastCachedId != null ? lastCachedId.getFutureOrSequenceLong() : "null");
     }
     
     protected abstract void doFillBatch() throws Exception;

http://git-wip-us.apache.org/repos/asf/activemq/blob/140ce1bc/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index 54cfd7d..abdf4bf 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -30,7 +30,6 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -38,12 +37,11 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.broker.scheduler.JobSchedulerStore;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
@@ -86,7 +84,7 @@ import org.apache.activemq.wireformat.WireFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class KahaDBStore extends MessageDatabase implements PersistenceAdapter, MessageDatabase.SerialExecution<Location> {
+public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
     static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class);
     private static final int MAX_ASYNC_JOBS = 10000;
 
@@ -124,7 +122,6 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
                 return txid;
             }
         };
-        serialExecutor = this;
     }
 
     @Override
@@ -347,17 +344,6 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
         this.forceRecoverIndex = forceRecoverIndex;
     }
 
-    @Override
-    public Location execute(Callable<Location> c) throws Exception {
-        if (isConcurrentStoreAndDispatchQueues()) {
-            FutureTask<Location> future = new FutureTask<>(c);
-            this.queueExecutor.execute(future);
-            return future.get();
-        } else {
-            return c.call();
-        }
-    }
-
     public class KahaDBMessageStore extends AbstractMessageStore {
         protected final Map<AsyncJobKey, StoreTask> asyncTaskMap = new HashMap<AsyncJobKey, StoreTask>();
         protected KahaDestination dest;
@@ -383,25 +369,14 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
                 throws IOException {
             if (isConcurrentStoreAndDispatchQueues()) {
                 StoreQueueTask result = new StoreQueueTask(this, context, message);
+                ListenableFuture<Object> future = result.getFuture();
+                message.getMessageId().setFutureOrSequenceLong(future);
+                message.setRecievedByDFBridge(true); // flag message as concurrentStoreAndDispatch
                 result.aquireLocks();
                 addQueueTask(this, result);
-                final ListenableFuture<Object> future = result.getFuture();
                 if (indexListener != null) {
                     // allow concurrent dispatch by setting entry locator,
-                    // wait for add completion to remove potential pending addition
-                    message.getMessageId().setFutureOrSequenceLong(future);
-                    indexListener.onAdd(new IndexListener.MessageContext(context, message, new Runnable() {
-                        @Override
-                        public void run() {
-                            try {
-                                future.get();
-                                trackPendingAddComplete(dest, (Long) message.getMessageId().getFutureOrSequenceLong());
-                            } catch (CancellationException okNothingToTrack) {
-                            } catch (Exception e) {
-                                LOG.warn("{} unexpected exception tracking completion of async add of {}", this, message.getMessageId(), e);
-                            }
-                        }
-                    }));
+                    indexListener.onAdd(new IndexListener.MessageContext(context, message, null));
                 }
                 return future;
             } else {
@@ -442,7 +417,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
 
         @Override
         public void addMessage(final ConnectionContext context, final Message message) throws IOException {
-            KahaAddMessageCommand command = new KahaAddMessageCommand();
+            final KahaAddMessageCommand command = new KahaAddMessageCommand();
             command.setDestination(dest);
             command.setMessageId(message.getMessageId().toProducerKey());
             command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(message.getTransactionId())));
@@ -450,25 +425,25 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
             command.setPrioritySupported(isPrioritizedMessages());
             org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
             command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
-            store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), new IndexAware() {
-                @Override
-                public void sequenceAssignedWithIndexLocked(final long sequence) {
-                    final Object possibleFuture = message.getMessageId().getFutureOrSequenceLong();
-                    message.getMessageId().setFutureOrSequenceLong(sequence);
-                    if (indexListener != null) {
-                        trackPendingAdd(dest, sequence);
-                        if (possibleFuture == null) {
-                            // sync add (for async future present from getFutureOrSequenceLong)
-                            indexListener.onAdd(new IndexListener.MessageContext(context, message, new Runnable() {
-                                @Override
-                                public void run() {
-                                    trackPendingAddComplete(dest, sequence);
-                                }
-                            }));
+                store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), new IndexAware() {
+                    // sync add? (for async, future present from getFutureOrSequenceLong)
+                    Object possibleFuture = message.getMessageId().getFutureOrSequenceLong();
+
+                    public void sequenceAssignedWithIndexLocked(final long sequence) {
+                        message.getMessageId().setFutureOrSequenceLong(sequence);
+                        if (indexListener != null) {
+                            if (possibleFuture == null) {
+                                trackPendingAdd(dest, sequence);
+                                indexListener.onAdd(new IndexListener.MessageContext(context, message, new Runnable() {
+                                    @Override
+                                    public void run() {
+                                        trackPendingAddComplete(dest, sequence);
+                                    }
+                                }));
+                            }
                         }
                     }
-                }
-            }, null);
+                }, null);
         }
 
         @Override
@@ -680,32 +655,23 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
 
         @Override
         public void setBatch(final MessageId identity) throws IOException {
+            final String key = identity.toProducerKey();
+            indexLock.writeLock().lock();
             try {
-                final String key = identity.toProducerKey();
-                lockAsyncJobQueue();
-
-                // Hopefully one day the page file supports concurrent read
-                // operations... but for now we must externally synchronize...
-
-                indexLock.writeLock().lock();
-                try {
-                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
-                        @Override
-                        public void execute(Transaction tx) throws IOException {
-                            StoredDestination sd = getStoredDestination(dest, tx);
-                            Long location = sd.messageIdIndex.get(tx, key);
-                            if (location != null) {
-                                sd.orderIndex.setBatch(tx, location);
-                            } else {
-                                LOG.warn("{} Location {} not found in order index for {}", this, identity.getFutureOrSequenceLong(), identity);
-                            }
+                pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                    @Override
+                    public void execute(Transaction tx) throws IOException {
+                        StoredDestination sd = getStoredDestination(dest, tx);
+                        Long location = sd.messageIdIndex.get(tx, key);
+                        if (location != null) {
+                            sd.orderIndex.setBatch(tx, location);
+                        } else {
+                            LOG.warn("{} {} setBatch failed, location for {} not found in messageId index for {}", this, dest.getName(), identity.getFutureOrSequenceLong(), identity);
                         }
-                    });
-                } finally {
-                    indexLock.writeLock().unlock();
-                }
+                    }
+                });
             } finally {
-                unlockAsyncJobQueue();
+                indexLock.writeLock().unlock();
             }
         }
 
@@ -723,7 +689,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
 
         protected void lockAsyncJobQueue() {
             try {
-                this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
+                if (!this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS)) {
+                    throw new TimeoutException(this +" timeout waiting for localDestSem:" + this.localDestinationSemaphore);
+                }
             } catch (Exception e) {
                 LOG.error("Failed to lock async jobs for " + this.destination, e);
             }

http://git-wip-us.apache.org/repos/asf/activemq/blob/140ce1bc/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index c8bc2b8..554f1d3 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -256,11 +256,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
     private boolean enableIndexPageCaching = true;
     ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock();
 
-    interface SerialExecution<V> {
-        public V execute(Callable<V> c) throws Exception;
-    }
-    SerialExecution<Location> serialExecutor;
-
     @Override
     public void doStart() throws Exception {
         load();
@@ -962,20 +957,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         return store(data, sync, before, after, null);
     }
 
-    public Location store(final KahaCommitCommand data, final boolean sync, final IndexAware before, final Runnable after) throws IOException {
-        try {
-            return serialExecutor.execute(new Callable<Location>() {
-                @Override
-                public Location call() throws Exception {
-                    return store(data, sync, before, after, null);
-                }
-            });
-        } catch (Exception e) {
-            LOG.error("Failed to execute commit", e);
-            throw new IOException(e);
-        }
-    }
-
     /**
      * All updated are are funneled through this method. The updates are converted
      * to a JournalMessage which is logged to the journal and then the data from
@@ -1259,24 +1240,19 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         }
 
         final List<Operation> messagingTx = inflightTx;
+        indexLock.writeLock().lock();
         try {
-            indexLock.writeLock().lock();
-            try {
-                pageFile.tx().execute(new Transaction.Closure<IOException>() {
-                    @Override
-                    public void execute(Transaction tx) throws IOException {
-                        for (Operation op : messagingTx) {
-                            op.execute(tx);
-                        }
+            pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                @Override
+                public void execute(Transaction tx) throws IOException {
+                    for (Operation op : messagingTx) {
+                        op.execute(tx);
                     }
-                });
-                metadata.lastUpdate = location;
-            } finally {
-                indexLock.writeLock().unlock();
-            }
-        } catch (Exception e) {
-            LOG.error("serial execution of commit failed", e);
-            throw new IOException(e);
+                }
+            });
+            metadata.lastUpdate = location;
+        } finally {
+            indexLock.writeLock().unlock();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/140ce1bc/activemq-unit-tests/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/pom.xml b/activemq-unit-tests/pom.xml
index 3a49bda..1333412 100755
--- a/activemq-unit-tests/pom.xml
+++ b/activemq-unit-tests/pom.xml
@@ -557,7 +557,6 @@
             <artifactId>maven-surefire-plugin</artifactId>
             <configuration>
               <excludes combine.children="append">
-                <exclude>**/AMQ4485LowLimitTest.*</exclude>
                 <!-- http://jira.activemq.org/jira/browse/AMQ-626 -->
                 <exclude>**/MultipleTestsWithSpringFactoryBeanTest.*</exclude>
                 <exclude>**/MultipleTestsWithXBeanFactoryBeanTest.*</exclude>

http://git-wip-us.apache.org/repos/asf/activemq/blob/140ce1bc/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitTest.java
index 5af1124..eb46f8f 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitTest.java
@@ -242,7 +242,7 @@ public class AMQ4485LowLimitTest extends JmsMultipleBrokersTestSupport {
                     if (tally.accumulator.get() != expected) {
                         LOG.info("Tally for: " + tally.brokerName + ", dest: " + tally.destination + " - " + tally.accumulator.get() + " != " + expected + ", " + tally.expected);
                         if (tally.accumulator.get() > expected - 50) {
-                            dumpQueueStat(tally.destination);
+                            dumpQueueStat(null);
                         }
                         if (tally.expected.size() == 1) {
                             startConsumer(tally.brokerName, tally.destination);
@@ -260,6 +260,9 @@ public class AMQ4485LowLimitTest extends JmsMultipleBrokersTestSupport {
         LOG.info("done");
         long duration = System.currentTimeMillis() - startTime;
         LOG.info("Duration:" + TimeUtils.printDuration(duration));
+
+        assertEquals("nothing in the dlq's", 0, dumpQueueStat(new ActiveMQQueue("ActiveMQ.DLQ")));
+
     }
 
     private void startConsumer(String brokerName, ActiveMQDestination destination) throws Exception {
@@ -273,17 +276,20 @@ public class AMQ4485LowLimitTest extends JmsMultipleBrokersTestSupport {
         queueConnection.close();
     }
 
-    private void dumpQueueStat(ActiveMQDestination destination) throws Exception {
+    private long dumpQueueStat(ActiveMQDestination destination) throws Exception {
+        long sumTotal = 0;
         Collection<BrokerItem> brokerList = brokers.values();
         for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext(); ) {
             BrokerService brokerService = i.next().broker;
             for (ObjectName objectName : brokerService.getAdminView().getQueues()) {
-                //if (objectName.toString().contains(destination.getQualifiedName())) {
+                if (destination != null && objectName.toString().contains(destination.getPhysicalName())) {
                     QueueViewMBean qViewMBean = (QueueViewMBean) brokerService.getManagementContext().newProxyInstance(objectName, QueueViewMBean.class, false);
-                    LOG.info(brokerService.getBrokerName() + ", " + qViewMBean.getName() + " Size: " + qViewMBean.getEnqueueCount());
-                //}
+                    LOG.info(brokerService.getBrokerName() + ", " + qViewMBean.getName() + ", Enqueue:"  + qViewMBean.getEnqueueCount() + ", Size: " + qViewMBean.getQueueSize());
+                    sumTotal += qViewMBean.getQueueSize();
+                }
             }
         }
+        return sumTotal;
     }
 
     private void startAllGWFanoutConsumers(int nBrokers) throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq/blob/140ce1bc/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java
index bfc797f..9adc2a3 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java
@@ -42,6 +42,7 @@ import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.network.ConditionalNetworkBridgeFilterFactory;
 import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
 import org.apache.activemq.util.Wait;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -456,6 +457,9 @@ public class TwoBrokerQueueClientsReconnectTest extends JmsMultipleBrokersTestSu
         BrokerService brokerService = brokers.get(broker2).broker;
         brokerService.setPersistent(true);
         brokerService.setDeleteAllMessagesOnStartup(true);
+        // disable concurrent dispatch otherwise store duplicate suppression will be skipped b/c cursor audit is already
+        // disabled so verification of stats will fail - ie: duplicate will be dispatched
+        ((KahaDBPersistenceAdapter)brokerService.getPersistenceAdapter()).setConcurrentStoreAndDispatchQueues(false);
         brokerService.setPlugins(new BrokerPlugin[]{
                 new BrokerPluginSupport() {
                     @Override


[3/3] git commit: remove npe when no jars on classpath, in ide

Posted by gt...@apache.org.
remove npe when no jars on classpath, in ide


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/57fc29b6
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/57fc29b6
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/57fc29b6

Branch: refs/heads/trunk
Commit: 57fc29b6ce5b80930021733c313862b817e6b537
Parents: 838bbeb
Author: gtully <ga...@gmail.com>
Authored: Tue Oct 7 14:07:49 2014 +0100
Committer: gtully <ga...@gmail.com>
Committed: Tue Oct 7 14:50:41 2014 +0100

----------------------------------------------------------------------
 .../org/apache/activemq/ActiveMQConnectionMetaData.java | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/57fc29b6/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionMetaData.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionMetaData.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionMetaData.java
index b6d6c87..74b1039 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionMetaData.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionMetaData.java
@@ -43,11 +43,13 @@ public final class ActiveMQConnectionMetaData implements ConnectionMetaData {
             Package p = Package.getPackage("org.apache.activemq");
             if (p != null) {
                 version = p.getImplementationVersion();
-                Pattern pattern = Pattern.compile("(\\d+)\\.(\\d+).*");
-                Matcher m = pattern.matcher(version);
-                if (m.matches()) {
-                    major = Integer.parseInt(m.group(1));
-                    minor = Integer.parseInt(m.group(2));
+                if (version != null) {
+                    Pattern pattern = Pattern.compile("(\\d+)\\.(\\d+).*");
+                    Matcher m = pattern.matcher(version);
+                    if (m.matches()) {
+                        major = Integer.parseInt(m.group(1));
+                        minor = Integer.parseInt(m.group(2));
+                    }
                 }
             }
         } catch (Throwable e) {


[2/3] git commit: fix deadlock that blocks remote broker start in duplex case with durable sub recreation on restart - dynamicOnly=true works around

Posted by gt...@apache.org.
fix deadlock that blocks remote broker start in duplex case with durable sub recreation on restart - dynamicOnly=true works around


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/c1c82beb
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/c1c82beb
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/c1c82beb

Branch: refs/heads/trunk
Commit: c1c82beb2d6cea17eb5a8164a42dde1bba2ef77c
Parents: 57fc29b
Author: gtully <ga...@gmail.com>
Authored: Tue Oct 7 14:38:20 2014 +0100
Committer: gtully <ga...@gmail.com>
Committed: Tue Oct 7 14:50:41 2014 +0100

----------------------------------------------------------------------
 .../network/DemandForwardingBridgeSupport.java  |  24 +-
 ...DurableSubscriberWithNetworkRestartTest.java | 231 +++++++++++++++++++
 2 files changed, 244 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/c1c82beb/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 7d334ac..83eea31 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -427,6 +427,15 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
             startRemoteBridge();
         } catch (Throwable e) {
             serviceRemoteException(e);
+            return;
+        }
+
+        try {
+            if (safeWaitUntilStarted()) {
+                setupStaticDestinations();
+            }
+        } catch (Throwable e) {
+            serviceLocalException(e);
         }
     }
 
@@ -509,14 +518,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
                 startedLatch.countDown();
                 localStartedLatch.countDown();
             }
-
-            if (!disposed.get()) {
-                setupStaticDestinations();
-            } else {
-                LOG.warn("Network connection between {} and {} ({}) was interrupted during establishment.", new Object[]{
-                        localBroker, remoteBroker, remoteBrokerName
-                });
-            }
         }
     }
 
@@ -841,7 +842,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
     }
 
     public void serviceLocalException(MessageDispatch messageDispatch, Throwable error) {
-
+        LOG.trace("serviceLocalException: disposed {} ex", disposed.get(), error);
         if (!disposed.get()) {
             if (error instanceof DestinationDoesNotExistException && ((DestinationDoesNotExistException) error).isTemporary()) {
                 // not a reason to terminate the bridge - temps can disappear with
@@ -1398,12 +1399,13 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
      * Performs a timed wait on the started latch and then checks for disposed
      * before performing another wait each time the the started wait times out.
      */
-    protected void safeWaitUntilStarted() throws InterruptedException {
+    protected boolean safeWaitUntilStarted() throws InterruptedException {
         while (!disposed.get()) {
             if (startedLatch.await(1, TimeUnit.SECONDS)) {
-                return;
+                break;
             }
         }
+        return !disposed.get();
     }
 
     protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException {

http://git-wip-us.apache.org/repos/asf/activemq/blob/c1c82beb/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriberWithNetworkRestartTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriberWithNetworkRestartTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriberWithNetworkRestartTest.java
new file mode 100644
index 0000000..3799c6c
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriberWithNetworkRestartTest.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.util.Set;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.management.ObjectName;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.util.Wait;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+import static org.junit.Assume.assumeNotNull;
+
+
+public class DurableSubscriberWithNetworkRestartTest extends JmsMultipleBrokersTestSupport {
+    private static final Log LOG = LogFactory.getLog(DurableSubscriberWithNetworkRestartTest.class);
+    private static final String HUB = "HubBroker";
+    private static final String SPOKE = "SpokeBroker";
+    protected static final int MESSAGE_COUNT = 10;
+    public boolean dynamicOnly = false;
+
+    public void testSendOnAReceiveOnBWithTransportDisconnectDynamicOnly() throws Exception {
+        dynamicOnly = true;
+        try {
+            testSendOnAReceiveOnBWithTransportDisconnect();
+        } finally {
+            dynamicOnly = false;
+        }
+    }
+
+    public void testSendOnAReceiveOnBWithTransportDisconnect() throws Exception {
+        bridge(SPOKE, HUB);
+        startAllBrokers();
+
+        verifyDuplexBridgeMbean();
+
+        // Setup connection
+        URI hubURI = brokers.get(HUB).broker.getTransportConnectors().get(0).getPublishableConnectURI();
+        URI spokeURI = brokers.get(SPOKE).broker.getTransportConnectors().get(0).getPublishableConnectURI();
+        ActiveMQConnectionFactory facHub = new ActiveMQConnectionFactory(hubURI);
+        ActiveMQConnectionFactory facSpoke = new ActiveMQConnectionFactory(spokeURI);
+        Connection conHub = facHub.createConnection();
+        Connection conSpoke = facSpoke.createConnection();
+        conHub.setClientID("clientHUB");
+        conSpoke.setClientID("clientSPOKE");
+        conHub.start();
+        conSpoke.start();
+        Session sesHub = conHub.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Session sesSpoke = conSpoke.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        ActiveMQTopic topic = new ActiveMQTopic("TEST.FOO");
+        String consumerName = "consumerName";
+
+        // Setup consumers
+        MessageConsumer remoteConsumer = sesHub.createDurableSubscriber(topic, consumerName);
+        sleep(1000);
+        remoteConsumer.close();
+
+        // Setup producer
+        MessageProducer localProducer = sesSpoke.createProducer(topic);
+        localProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+        final String payloadString = new String(new byte[10*1024]);
+        // Send messages
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            Message test = sesSpoke.createTextMessage("test-" + i);
+            test.setStringProperty("payload", payloadString);
+            localProducer.send(test);
+        }
+        localProducer.close();
+
+        final String options = "?persistent=true&useJmx=true&deleteAllMessagesOnStartup=false";
+        for (int i=0;i<2;i++) {
+            brokers.get(SPOKE).broker.stop();
+            sleep(1000);
+            createBroker(new URI("broker:(tcp://localhost:61616)/" + SPOKE + options));
+            bridge(SPOKE, HUB);
+            brokers.get(SPOKE).broker.start();
+            LOG.info("restarted spoke..:" + i);
+
+            assertTrue("got mbeans on restart", Wait.waitFor(new Wait.Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return countMbeans( brokers.get(HUB).broker, "networkBridge", 20000) == (dynamicOnly ? 1 : 2);
+                }
+            }));
+        }
+    }
+
+    private void verifyDuplexBridgeMbean() throws Exception {
+        assertEquals(1, countMbeans( brokers.get(HUB).broker, "networkBridge", 5000));
+    }
+
+    private int countMbeans(BrokerService broker, String type, int timeout) throws Exception {
+        final long expiryTime = System.currentTimeMillis() + timeout;
+
+        if (!type.contains("=")) {
+            type = type + "=*";
+        }
+
+        final ObjectName beanName = new ObjectName("org.apache.activemq:type=Broker,brokerName="
+                + broker.getBrokerName() + "," + type +",*");
+        Set<ObjectName> mbeans = null;
+        int count = 0;
+        do {
+            if (timeout > 0) {
+                Thread.sleep(100);
+            }
+
+            mbeans = broker.getManagementContext().queryNames(beanName, null);
+            if (mbeans != null) {
+                count = mbeans.size();
+                LOG.info("Found: " + count + ", matching type: " +type);
+                for (ObjectName objectName : mbeans) {
+                    LOG.info("" + objectName);
+                }
+                //} else {
+                //logAllMbeans(broker);
+            }
+        } while ((mbeans == null || mbeans.isEmpty()) && expiryTime > System.currentTimeMillis());
+
+        // If port 1099 is in use when the Broker starts, starting the jmx connector
+        // will fail.  So, if we have no mbsc to query, skip the test.
+        if (timeout > 0) {
+            assumeNotNull(mbeans);
+        }
+
+        return count;
+
+    }
+
+    private void logAllMbeans(BrokerService broker) throws MalformedURLException {
+        try {
+            // trace all existing MBeans
+            Set<?> all = broker.getManagementContext().queryNames(null, null);
+            LOG.info("Total MBean count=" + all.size());
+            for (Object o : all) {
+                //ObjectInstance bean = (ObjectInstance)o;
+                LOG.info(o);
+            }
+        } catch (Exception ignored) {
+            LOG.warn("getMBeanServer ex: " + ignored);
+        }
+    }
+
+    public NetworkConnector bridge(String from, String to) throws Exception {
+        NetworkConnector networkConnector = bridgeBrokers(from, to, dynamicOnly, -1, true);
+        networkConnector.setSuppressDuplicateQueueSubscriptions(true);
+        networkConnector.setDecreaseNetworkConsumerPriority(true);
+        networkConnector.setConsumerTTL(1);
+        networkConnector.setDuplex(true);
+        return networkConnector;
+    }
+
+    @Override
+    protected void startAllBrokers() throws Exception {
+        // Ensure HUB is started first so bridge will be active from the get go
+        BrokerItem brokerItem = brokers.get(HUB);
+        brokerItem.broker.start();
+        brokerItem = brokers.get(SPOKE);
+        brokerItem.broker.start();
+        sleep(600);
+    }
+
+    public void setUp() throws Exception {
+        super.setAutoFail(false);
+        super.setUp();
+        createBrokers(true);
+    }
+
+    private void createBrokers(boolean del) throws Exception {
+        final String options = "?persistent=true&useJmx=true&deleteAllMessagesOnStartup=" + del;
+        createBroker(new URI("broker:(tcp://localhost:61617)/" + HUB + options));
+        createBroker(new URI("broker:(tcp://localhost:61616)/" + SPOKE + options));
+    }
+
+    protected void configureBroker(BrokerService broker) {
+        broker.setKeepDurableSubsActive(false);
+        broker.getManagementContext().setCreateConnector(false);
+        PolicyMap defaultPolcyMap = new PolicyMap();
+        PolicyEntry defaultPolicy = new PolicyEntry();
+        //defaultPolicy.setUseCache(false);
+        if (broker.getBrokerName().equals(HUB)) {
+            defaultPolicy.setStoreUsageHighWaterMark(2);
+            broker.getSystemUsage().getStoreUsage().setLimit(1*1024*1024);
+        }
+        defaultPolcyMap.setDefaultEntry(defaultPolicy);
+        broker.setDestinationPolicy(defaultPolcyMap);
+        broker.getSystemUsage().getMemoryUsage().setLimit(100*1024*1024);
+    }
+
+    public void tearDown() throws Exception {
+        super.tearDown();
+    }
+
+    private void sleep(int milliSecondTime) {
+        try {
+            Thread.sleep(milliSecondTime);
+        } catch (InterruptedException igonred) {
+        }
+    }
+}