You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2010/01/20 18:42:01 UTC

svn commit: r901300 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/cursors/ main/java/org/apache/activemq/store/ main/java/org/apache/activemq/store/kahadb/ test/java/org/apache/activemq/bugs/

Author: rajdavies
Date: Wed Jan 20 17:42:00 2010
New Revision: 901300

URL: http://svn.apache.org/viewvc?rev=901300&view=rev
Log:
Improvement for https://issues.apache.org/activemq/browse/AMQ-2512

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2512Test.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=901300&r1=901299&r2=901300&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java Wed Jan 20 17:42:00 2010
@@ -19,7 +19,6 @@
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map.Entry;
-
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.command.Message;
@@ -47,6 +46,7 @@
         this.regionDestination=destination;
     }
     
+    @Override
     public final synchronized void start() throws Exception{
         if (!isStarted()) {
             super.start();
@@ -60,6 +60,7 @@
         } 
     }
     
+    @Override
     public final synchronized void stop() throws Exception {
         resetBatch();
         super.stop();
@@ -91,6 +92,7 @@
         return recovered;
     }
     
+    @Override
     public final void reset() {
         if (batchList.isEmpty()) {
             try {
@@ -104,6 +106,7 @@
         size();
     }
     
+    @Override
     public synchronized void release() {
         clearIterator(false);
     }
@@ -127,6 +130,7 @@
     public final void finished() {
     }
         
+    @Override
     public final synchronized boolean hasNext() {
         if (batchList.isEmpty()) {
             try {
@@ -140,6 +144,7 @@
         return this.iterator.hasNext();
     }
     
+    @Override
     public final synchronized MessageReference next() {
         MessageReference result = null;
         if (!this.batchList.isEmpty()&&this.iterator.hasNext()) {
@@ -149,6 +154,7 @@
         return result;
     }
     
+    @Override
     public final synchronized void addMessageLast(MessageReference node) throws Exception {
         if (cacheEnabled && hasSpace()) {
             recoverMessage(node.getMessage(),true);
@@ -171,11 +177,13 @@
     protected void setBatch(MessageId messageId) throws Exception {
     }
 
+    @Override
     public final synchronized void addMessageFirst(MessageReference node) throws Exception {
         cacheEnabled=false;
         size++;
     }
 
+    @Override
     public final synchronized void remove() {
         size--;
         if (iterator!=null) {
@@ -184,7 +192,7 @@
         if (last != null) {
             last.decrementReferenceCount();
         }
-        if (size==0 && isStarted() && useCache && hasSpace() && getStoreSize() == 0) {
+        if (size==0 && isStarted() && useCache && hasSpace() && isStoreEmpty()) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName() + " enabling cache on last remove");
             }
@@ -192,16 +200,19 @@
         }
     }
 
+    @Override
     public final synchronized void remove(MessageReference node) {
         size--;
         cacheEnabled=false;
         batchList.remove(node.getMessageId());
     }
     
+    @Override
     public final synchronized void clear() {
         gc();
     }
     
+    @Override
     public final synchronized void gc() {
         for (Message msg : batchList.values()) {
             rollback(msg.getMessageId());
@@ -218,6 +229,7 @@
         }
     }
     
+    @Override
     protected final synchronized void fillBatch() {
         if (batchResetNeeded) {
             resetBatch();
@@ -237,15 +249,18 @@
         }
     }
     
+    @Override
     public final synchronized boolean isEmpty() {
         // negative means more messages added to store through queue.send since last reset
         return size == 0;
     }
 
+    @Override
     public final synchronized boolean hasMessagesBufferedToDeliver() {
         return !batchList.isEmpty();
     }
 
+    @Override
     public final synchronized int size() {
         if (size < 0) {
             this.size = getStoreSize();
@@ -259,4 +274,6 @@
     protected abstract void resetBatch();
     
     protected abstract int getStoreSize();
+    
+    protected abstract boolean isStoreEmpty();
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java?rev=901300&r1=901299&r2=901300&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java Wed Jan 20 17:42:00 2010
@@ -17,7 +17,6 @@
 package org.apache.activemq.broker.region.cursors;
 
 import java.io.IOException;
-
 import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageId;
@@ -33,7 +32,7 @@
  */
 class QueueStorePrefetch extends AbstractStoreCursor {
     private static final Log LOG = LogFactory.getLog(QueueStorePrefetch.class);
-    private MessageStore store;
+    private final MessageStore store;
    
     /**
      * Construct it
@@ -41,7 +40,7 @@
      */
     public QueueStorePrefetch(Queue queue) {
         super(queue);
-        this.store = (MessageStore)queue.getMessageStore();
+        this.store = queue.getMessageStore();
 
     }
 
@@ -58,29 +57,47 @@
 
    
         
+    @Override
     protected synchronized int getStoreSize() {
         try {
-            return this.store.getMessageCount();
+            int result = this.store.getMessageCount();
+            return result;
+            
         } catch (IOException e) {
             LOG.error("Failed to get message count", e);
             throw new RuntimeException(e);
         }
     }
     
+    @Override
+    protected synchronized boolean isStoreEmpty() {
+        try {
+            return this.store.isEmpty();
+            
+        } catch (Exception e) {
+            LOG.error("Failed to get message count", e);
+            throw new RuntimeException(e);
+        }
+    }
+    
+    @Override
     protected void resetBatch() {
         this.store.resetBatching();
     }
     
+    @Override
     protected void setBatch(MessageId messageId) throws Exception {
         store.setBatch(messageId);
         batchResetNeeded = false;
     }
 
     
+    @Override
     protected void doFillBatch() throws Exception {
         this.store.recoverNextMessages(this.maxBatchSize, this);
     }
 
+    @Override
     public String toString() {
         return "QueueStorePrefetch" + System.identityHashCode(this);
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?rev=901300&r1=901299&r2=901300&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java Wed Jan 20 17:42:00 2010
@@ -17,7 +17,6 @@
 package org.apache.activemq.broker.region.cursors;
 
 import java.io.IOException;
-
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.command.Message;
@@ -36,10 +35,10 @@
  */
 class TopicStorePrefetch extends AbstractStoreCursor {
     private static final Log LOG = LogFactory.getLog(TopicStorePrefetch.class);
-    private TopicMessageStore store;
-    private String clientId;
-    private String subscriberName;
-    private Subscription subscription;
+    private final TopicMessageStore store;
+    private final String clientId;
+    private final String subscriberName;
+    private final Subscription subscription;
     
     /**
      * @param topic
@@ -62,6 +61,7 @@
     }
     
         
+    @Override
     public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception {
         MessageEvaluationContext messageEvaluationContext = new NonCachedMessageEvaluationContext();
         messageEvaluationContext.setMessageReference(message);
@@ -73,6 +73,7 @@
     }
 
    
+    @Override
     protected synchronized int getStoreSize() {
         try {
             return store.getMessageCount(clientId, subscriberName);
@@ -81,17 +82,31 @@
             throw new RuntimeException(e);
         }
     }
+    
+    @Override
+    protected synchronized boolean isStoreEmpty() {
+        try {
+            return this.store.isEmpty();
+            
+        } catch (Exception e) {
+            LOG.error("Failed to get message count", e);
+            throw new RuntimeException(e);
+        }
+    }
 
             
+    @Override
     protected void resetBatch() {
         this.store.resetBatching(clientId, subscriberName);
     }
     
+    @Override
     protected void doFillBatch() throws Exception {
         this.store.recoverNextMessages(clientId, subscriberName,
                 maxBatchSize, this);
     }
 
+    @Override
     public String toString() {
         return "TopicStorePrefetch" + System.identityHashCode(this) + "(" + clientId + "," + subscriberName + ")";
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java?rev=901300&r1=901299&r2=901300&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java Wed Jan 20 17:42:00 2010
@@ -17,10 +17,9 @@
 package org.apache.activemq.store;
 
 import java.io.IOException;
-
+import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.MessageId;
-import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.usage.MemoryUsage;
 
 abstract public class AbstractMessageStore implements MessageStore {
@@ -48,4 +47,13 @@
     
     public void setBatch(MessageId messageId) throws IOException, Exception {
     }
+    
+    /**
+     * flag to indicate if the store is empty
+     * @return true if the message count is 0
+     * @throws Exception 
+     */
+     public boolean isEmpty() throws Exception{
+         return getMessageCount()==0;
+     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java?rev=901300&r1=901299&r2=901300&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java Wed Jan 20 17:42:00 2010
@@ -17,7 +17,6 @@
 package org.apache.activemq.store;
 
 import java.io.IOException;
-
 import org.apache.activemq.Service;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -25,7 +24,6 @@
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.usage.MemoryUsage;
-import org.apache.activemq.usage.SystemUsage;
 
 /**
  * Represents a message store which is used by the persistent implementations
@@ -114,7 +112,15 @@
     /**
      * allow caching cursors to set the current batch offset when cache is exhausted
      * @param messageId
+     * @throws Exception 
      */
     void setBatch(MessageId messageId) throws Exception;
     
+    /**
+     * flag to indicate if the store is empty
+     * @return true if the message count is 0
+     * @throws Exception 
+     */
+    boolean isEmpty() throws Exception;
+    
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java?rev=901300&r1=901299&r2=901300&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java Wed Jan 20 17:42:00 2010
@@ -96,4 +96,8 @@
     public void setBatch(MessageId messageId) throws Exception {
         delegate.setBatch(messageId);
     }
+
+    public boolean isEmpty() throws Exception {
+       return delegate.isEmpty();
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java?rev=901300&r1=901299&r2=901300&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java Wed Jan 20 17:42:00 2010
@@ -17,7 +17,6 @@
 package org.apache.activemq.store;
 
 import java.io.IOException;
-
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
@@ -25,7 +24,6 @@
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.usage.MemoryUsage;
-import org.apache.activemq.usage.SystemUsage;
 
 /**
  * A simple proxy that delegates to another MessageStore.
@@ -138,4 +136,8 @@
     public void setBatch(MessageId messageId) throws Exception {
         delegate.setBatch(messageId);
     }
+    
+    public boolean isEmpty() throws Exception {
+        return delegate.isEmpty();
+     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=901300&r1=901299&r2=901300&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Wed Jan 20 17:42:00 2010
@@ -24,7 +24,6 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.Map.Entry;
-
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
@@ -69,7 +68,7 @@
 
 public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
 
-    private WireFormat wireFormat = new OpenWireFormat();
+    private final WireFormat wireFormat = new OpenWireFormat();
 
     public void setBrokerName(String brokerName) {
     }
@@ -128,6 +127,7 @@
             this.dest = convert( destination );
         }
 
+        @Override
         public ActiveMQDestination getDestination() {
             return destination;
         }
@@ -200,6 +200,25 @@
                 });
             }
         }
+        
+        public boolean isEmpty() throws IOException {
+            synchronized(indexMutex) {
+                return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>(){
+                    public Boolean execute(Transaction tx) throws IOException {
+                        // Iterate through all index entries to get a count of messages in the destination.
+                        StoredDestination sd = getStoredDestination(dest, tx);
+                        boolean result = true;
+                        for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) {
+                            iterator.next();
+                            result = false;
+                            break;
+                        }
+                        return result;
+                    }
+                });
+            }
+        }
+
 
         public void recover(final MessageRecoveryListener listener) throws Exception {
             synchronized(indexMutex) {
@@ -266,10 +285,13 @@
             
         }
 
+        @Override
         public void setMemoryUsage(MemoryUsage memoeyUSage) {
         }
+        @Override
         public void start() throws Exception {
         }
+        @Override
         public void stop() throws Exception {
         }
         

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2512Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2512Test.java?rev=901300&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2512Test.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2512Test.java Wed Jan 20 17:42:00 2010
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.kahadb.KahaDBStore;
+import org.apache.activemq.util.IOHelper;
+
+public class AMQ2512Test extends EmbeddedBrokerTestSupport {
+    private static Connection connection;
+    private final static String QUEUE_NAME = "dee.q";
+    private final static int INITIAL_MESSAGES_CNT = 1000;
+    private final static int WORKER_INTERNAL_ITERATIONS = 100;
+    private final static int TOTAL_MESSAGES_CNT = INITIAL_MESSAGES_CNT * WORKER_INTERNAL_ITERATIONS
+            + INITIAL_MESSAGES_CNT;
+    private final static byte[] payload = new byte[5 * 1024];
+    private final static String TEXT = new String(payload);
+
+    private final static String PRP_INITIAL_ID = "initial-id";
+    private final static String PRP_WORKER_ID = "worker-id";
+
+    private final static CountDownLatch LATCH = new CountDownLatch(TOTAL_MESSAGES_CNT);
+
+    private final static AtomicInteger ON_MSG_COUNTER = new AtomicInteger();
+
+    public void testKahaDBFailure() throws Exception {
+        final ConnectionFactory fac = new ActiveMQConnectionFactory(this.bindAddress);
+        connection = fac.createConnection();
+        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Queue queue = session.createQueue(QUEUE_NAME);
+        final MessageProducer producer = session.createProducer(queue);
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+        connection.start();
+
+        final long startTime = System.nanoTime();
+
+        final List<Consumer> consumers = new ArrayList<Consumer>();
+        for (int i = 0; i < 20; i++) {
+            consumers.add(new Consumer("worker-" + i));
+        }
+
+        for (int i = 0; i < INITIAL_MESSAGES_CNT; i++) {
+            final TextMessage msg = session.createTextMessage(TEXT);
+            msg.setStringProperty(PRP_INITIAL_ID, "initial-" + i);
+            producer.send(msg);
+        }
+
+        LATCH.await();
+        final long endTime = System.nanoTime();
+        System.out.println("Total execution time = "
+                + TimeUnit.MILLISECONDS.convert(endTime - startTime, TimeUnit.NANOSECONDS) + " [ms].");
+        System.out.println("Rate = " + TOTAL_MESSAGES_CNT
+                / TimeUnit.SECONDS.convert(endTime - startTime, TimeUnit.NANOSECONDS) + " [msg/s].");
+
+        for (Consumer c : consumers) {
+            c.close();
+        }
+        connection.close();
+    }
+
+    private final static class Consumer implements MessageListener {
+        private final String name;
+        private final Session session;
+        private final MessageProducer producer;
+
+        private Consumer(String name) {
+            this.name = name;
+            try {
+                session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+                final Queue queue = session.createQueue(QUEUE_NAME + "?consumer.prefetchSize=10");
+                producer = session.createProducer(queue);
+                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+                final MessageConsumer consumer = session.createConsumer(queue);
+                consumer.setMessageListener(this);
+            } catch (JMSException e) {
+                e.printStackTrace();
+                throw new RuntimeException(e);
+            }
+        }
+
+        public void onMessage(Message message) {
+            final TextMessage msg = (TextMessage) message;
+            try {
+                if (!msg.propertyExists(PRP_WORKER_ID)) {
+                    for (int i = 0; i < WORKER_INTERNAL_ITERATIONS; i++) {
+                        final TextMessage newMsg = session.createTextMessage(msg.getText());
+                        newMsg.setStringProperty(PRP_WORKER_ID, name + "-" + i);
+                        newMsg.setStringProperty(PRP_INITIAL_ID, msg.getStringProperty(PRP_INITIAL_ID));
+                        producer.send(newMsg);
+                    }
+                }
+                msg.acknowledge();
+
+            } catch (JMSException e) {
+                e.printStackTrace();
+                throw new RuntimeException(e);
+            } finally {
+                final int onMsgCounter = ON_MSG_COUNTER.getAndIncrement();
+                if (onMsgCounter % 1000 == 0) {
+                    System.out.println("message received: " + onMsgCounter);
+                }
+                LATCH.countDown();
+            }
+        }
+
+        private void close() {
+            if (session != null) {
+                try {
+                    session.close();
+                } catch (JMSException e) {
+                    e.printStackTrace();
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        bindAddress = "tcp://0.0.0.0:61617";
+        super.setUp();
+    }
+
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        File dataFileDir = new File("target/test-amq-2512/datadb");
+        IOHelper.mkdirs(dataFileDir);
+        IOHelper.deleteChildren(dataFileDir);
+        KahaDBStore kaha = new KahaDBStore();
+        kaha.setDirectory(dataFileDir); 
+        BrokerService answer = new BrokerService();
+        answer.setPersistenceAdapter(kaha);
+      
+        kaha.setEnableJournalDiskSyncs(false);
+        //kaha.setIndexCacheSize(10);
+        answer.setDataDirectoryFile(dataFileDir);
+        answer.setUseJmx(false);
+        answer.addConnector(bindAddress);
+        return answer;
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2512Test.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2512Test.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain