You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2010/01/20 18:42:01 UTC
svn commit: r901300 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/region/cursors/
main/java/org/apache/activemq/store/
main/java/org/apache/activemq/store/kahadb/
test/java/org/apache/activemq/bugs/
Author: rajdavies
Date: Wed Jan 20 17:42:00 2010
New Revision: 901300
URL: http://svn.apache.org/viewvc?rev=901300&view=rev
Log:
Improvement for https://issues.apache.org/activemq/browse/AMQ-2512
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2512Test.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=901300&r1=901299&r2=901300&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java Wed Jan 20 17:42:00 2010
@@ -19,7 +19,6 @@
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map.Entry;
-
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.command.Message;
@@ -47,6 +46,7 @@
this.regionDestination=destination;
}
+ @Override
public final synchronized void start() throws Exception{
if (!isStarted()) {
super.start();
@@ -60,6 +60,7 @@
}
}
+ @Override
public final synchronized void stop() throws Exception {
resetBatch();
super.stop();
@@ -91,6 +92,7 @@
return recovered;
}
+ @Override
public final void reset() {
if (batchList.isEmpty()) {
try {
@@ -104,6 +106,7 @@
size();
}
+ @Override
public synchronized void release() {
clearIterator(false);
}
@@ -127,6 +130,7 @@
public final void finished() {
}
+ @Override
public final synchronized boolean hasNext() {
if (batchList.isEmpty()) {
try {
@@ -140,6 +144,7 @@
return this.iterator.hasNext();
}
+ @Override
public final synchronized MessageReference next() {
MessageReference result = null;
if (!this.batchList.isEmpty()&&this.iterator.hasNext()) {
@@ -149,6 +154,7 @@
return result;
}
+ @Override
public final synchronized void addMessageLast(MessageReference node) throws Exception {
if (cacheEnabled && hasSpace()) {
recoverMessage(node.getMessage(),true);
@@ -171,11 +177,13 @@
protected void setBatch(MessageId messageId) throws Exception {
}
+ @Override
public final synchronized void addMessageFirst(MessageReference node) throws Exception {
cacheEnabled=false;
size++;
}
+ @Override
public final synchronized void remove() {
size--;
if (iterator!=null) {
@@ -184,7 +192,7 @@
if (last != null) {
last.decrementReferenceCount();
}
- if (size==0 && isStarted() && useCache && hasSpace() && getStoreSize() == 0) {
+ if (size==0 && isStarted() && useCache && hasSpace() && isStoreEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName() + " enabling cache on last remove");
}
@@ -192,16 +200,19 @@
}
}
+ @Override
public final synchronized void remove(MessageReference node) {
size--;
cacheEnabled=false;
batchList.remove(node.getMessageId());
}
+ @Override
public final synchronized void clear() {
gc();
}
+ @Override
public final synchronized void gc() {
for (Message msg : batchList.values()) {
rollback(msg.getMessageId());
@@ -218,6 +229,7 @@
}
}
+ @Override
protected final synchronized void fillBatch() {
if (batchResetNeeded) {
resetBatch();
@@ -237,15 +249,18 @@
}
}
+ @Override
public final synchronized boolean isEmpty() {
// negative means more messages added to store through queue.send since last reset
return size == 0;
}
+ @Override
public final synchronized boolean hasMessagesBufferedToDeliver() {
return !batchList.isEmpty();
}
+ @Override
public final synchronized int size() {
if (size < 0) {
this.size = getStoreSize();
@@ -259,4 +274,6 @@
protected abstract void resetBatch();
protected abstract int getStoreSize();
+
+ /**
+  * Reports whether the backing store currently holds no messages.
+  * remove() consults this (in place of comparing getStoreSize() to 0)
+  * when deciding whether to re-enable the cache after the last removal.
+  *
+  * @return true if the store is empty
+  */
+ protected abstract boolean isStoreEmpty();
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java?rev=901300&r1=901299&r2=901300&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java Wed Jan 20 17:42:00 2010
@@ -17,7 +17,6 @@
package org.apache.activemq.broker.region.cursors;
import java.io.IOException;
-
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
@@ -33,7 +32,7 @@
*/
class QueueStorePrefetch extends AbstractStoreCursor {
private static final Log LOG = LogFactory.getLog(QueueStorePrefetch.class);
- private MessageStore store;
+ private final MessageStore store;
/**
* Construct it
@@ -41,7 +40,7 @@
*/
public QueueStorePrefetch(Queue queue) {
super(queue);
- this.store = (MessageStore)queue.getMessageStore();
+ this.store = queue.getMessageStore();
}
@@ -58,29 +57,47 @@
+ @Override
protected synchronized int getStoreSize() {
try {
- return this.store.getMessageCount();
+ int result = this.store.getMessageCount();
+ return result;
+
} catch (IOException e) {
LOG.error("Failed to get message count", e);
throw new RuntimeException(e);
}
}
+ /**
+  * Reports whether the queue's message store is empty.
+  *
+  * @return the result of the store's own isEmpty() check
+  * @throws RuntimeException wrapping any failure raised by the store (also logged)
+  */
+ @Override
+ protected synchronized boolean isStoreEmpty() {
+     try {
+         return this.store.isEmpty();
+     } catch (Exception e) {
+         // This is the emptiness probe, not the count lookup in getStoreSize(),
+         // so the log must name the operation that actually failed.
+         LOG.error("Failed to check if store is empty", e);
+         throw new RuntimeException(e);
+     }
+ }
+
+ @Override
protected void resetBatch() {
this.store.resetBatching();
}
+ @Override
protected void setBatch(MessageId messageId) throws Exception {
store.setBatch(messageId);
batchResetNeeded = false;
}
+ @Override
protected void doFillBatch() throws Exception {
this.store.recoverNextMessages(this.maxBatchSize, this);
}
+ @Override
public String toString() {
return "QueueStorePrefetch" + System.identityHashCode(this);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?rev=901300&r1=901299&r2=901300&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java Wed Jan 20 17:42:00 2010
@@ -17,7 +17,6 @@
package org.apache.activemq.broker.region.cursors;
import java.io.IOException;
-
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.Message;
@@ -36,10 +35,10 @@
*/
class TopicStorePrefetch extends AbstractStoreCursor {
private static final Log LOG = LogFactory.getLog(TopicStorePrefetch.class);
- private TopicMessageStore store;
- private String clientId;
- private String subscriberName;
- private Subscription subscription;
+ private final TopicMessageStore store;
+ private final String clientId;
+ private final String subscriberName;
+ private final Subscription subscription;
/**
* @param topic
@@ -62,6 +61,7 @@
}
+ @Override
public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception {
MessageEvaluationContext messageEvaluationContext = new NonCachedMessageEvaluationContext();
messageEvaluationContext.setMessageReference(message);
@@ -73,6 +73,7 @@
}
+ @Override
protected synchronized int getStoreSize() {
try {
return store.getMessageCount(clientId, subscriberName);
@@ -81,17 +82,31 @@
throw new RuntimeException(e);
}
}
+
+ /**
+  * Reports whether the topic's message store is empty.
+  *
+  * @return the result of the store's own isEmpty() check
+  * @throws RuntimeException wrapping any failure raised by the store (also logged)
+  */
+ @Override
+ protected synchronized boolean isStoreEmpty() {
+     try {
+         // NOTE(review): this asks the whole topic store, whereas getStoreSize()
+         // counts per (clientId, subscriberName) - confirm the wider scope is intended.
+         return this.store.isEmpty();
+     } catch (Exception e) {
+         // This is the emptiness probe, not the count lookup in getStoreSize(),
+         // so the log must name the operation that actually failed.
+         LOG.error("Failed to check if store is empty", e);
+         throw new RuntimeException(e);
+     }
+ }
+ @Override
protected void resetBatch() {
this.store.resetBatching(clientId, subscriberName);
}
+ @Override
protected void doFillBatch() throws Exception {
this.store.recoverNextMessages(clientId, subscriberName,
maxBatchSize, this);
}
+ @Override
public String toString() {
return "TopicStorePrefetch" + System.identityHashCode(this) + "(" + clientId + "," + subscriberName + ")";
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java?rev=901300&r1=901299&r2=901300&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java Wed Jan 20 17:42:00 2010
@@ -17,10 +17,9 @@
package org.apache.activemq.store;
import java.io.IOException;
-
+import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.MessageId;
-import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.usage.MemoryUsage;
abstract public class AbstractMessageStore implements MessageStore {
@@ -48,4 +47,13 @@
public void setBatch(MessageId messageId) throws IOException, Exception {
}
+
+ /**
+  * Tells whether this store currently contains no messages.
+  *
+  * @return true when getMessageCount() reports zero
+  * @throws Exception propagated from getMessageCount()
+  */
+ public boolean isEmpty() throws Exception {
+     final int messageCount = getMessageCount();
+     return messageCount == 0;
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java?rev=901300&r1=901299&r2=901300&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java Wed Jan 20 17:42:00 2010
@@ -17,7 +17,6 @@
package org.apache.activemq.store;
import java.io.IOException;
-
import org.apache.activemq.Service;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
@@ -25,7 +24,6 @@
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.usage.MemoryUsage;
-import org.apache.activemq.usage.SystemUsage;
/**
* Represents a message store which is used by the persistent implementations
@@ -114,7 +112,15 @@
/**
* allow caching cursors to set the current batch offset when cache is exhausted
* @param messageId
+ * @throws Exception
*/
void setBatch(MessageId messageId) throws Exception;
+ /**
+  * Indicates whether the store is empty.
+  *
+  * @return true if the store holds no messages (message count is 0)
+  * @throws Exception implementation-specific failure while querying the store
+  */
+ boolean isEmpty() throws Exception;
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java?rev=901300&r1=901299&r2=901300&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java Wed Jan 20 17:42:00 2010
@@ -96,4 +96,8 @@
public void setBatch(MessageId messageId) throws Exception {
delegate.setBatch(messageId);
}
+
+ /** Forwards the emptiness check to the wrapped delegate store. */
+ public boolean isEmpty() throws Exception {
+ return delegate.isEmpty();
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java?rev=901300&r1=901299&r2=901300&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java Wed Jan 20 17:42:00 2010
@@ -17,7 +17,6 @@
package org.apache.activemq.store;
import java.io.IOException;
-
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
@@ -25,7 +24,6 @@
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.usage.MemoryUsage;
-import org.apache.activemq.usage.SystemUsage;
/**
* A simple proxy that delegates to another MessageStore.
@@ -138,4 +136,8 @@
public void setBatch(MessageId messageId) throws Exception {
delegate.setBatch(messageId);
}
+
+ /** Forwards the emptiness check to the wrapped delegate store. */
+ public boolean isEmpty() throws Exception {
+ return delegate.isEmpty();
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=901300&r1=901299&r2=901300&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Wed Jan 20 17:42:00 2010
@@ -24,7 +24,6 @@
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
-
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
@@ -69,7 +68,7 @@
public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
- private WireFormat wireFormat = new OpenWireFormat();
+ private final WireFormat wireFormat = new OpenWireFormat();
public void setBrokerName(String brokerName) {
}
@@ -128,6 +127,7 @@
this.dest = convert( destination );
}
+ @Override
public ActiveMQDestination getDestination() {
return destination;
}
@@ -200,6 +200,25 @@
});
}
}
+
+ /**
+  * Checks whether the stored destination has any message entries.
+  * Runs inside a page-file transaction while holding indexMutex.
+  *
+  * @return true if the destination's location index has no entries
+  * @throws IOException propagated from the index transaction
+  */
+ public boolean isEmpty() throws IOException {
+     synchronized(indexMutex) {
+         return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>(){
+             public Boolean execute(Transaction tx) throws IOException {
+                 // Only the existence of a first index entry matters, so probe the
+                 // iterator once instead of walking (or counting) the index.
+                 StoredDestination sd = getStoredDestination(dest, tx);
+                 return !sd.locationIndex.iterator(tx).hasNext();
+             }
+         });
+     }
+ }
+
public void recover(final MessageRecoveryListener listener) throws Exception {
synchronized(indexMutex) {
@@ -266,10 +285,13 @@
}
+ @Override
public void setMemoryUsage(MemoryUsage memoeyUSage) {
}
+ @Override
public void start() throws Exception {
}
+ @Override
public void stop() throws Exception {
}
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2512Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2512Test.java?rev=901300&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2512Test.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2512Test.java Wed Jan 20 17:42:00 2010
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.kahadb.KahaDBStore;
+import org.apache.activemq.util.IOHelper;
+
+/**
+ * Load test for AMQ-2512 against a KahaDB-backed embedded broker.
+ * <p>
+ * A producer sends INITIAL_MESSAGES_CNT persistent messages to one queue. Each of
+ * the CONSUMER_CNT listeners answers every initial message (one without the
+ * worker-id property) by sending WORKER_INTERNAL_ITERATIONS follow-up messages to
+ * the same queue. The test passes once TOTAL_MESSAGES_CNT deliveries were counted.
+ */
+public class AMQ2512Test extends EmbeddedBrokerTestSupport {
+    private final static String QUEUE_NAME = "dee.q";
+    private final static int CONSUMER_CNT = 20;
+    private final static int INITIAL_MESSAGES_CNT = 1000;
+    private final static int WORKER_INTERNAL_ITERATIONS = 100;
+    private final static int TOTAL_MESSAGES_CNT = INITIAL_MESSAGES_CNT * WORKER_INTERNAL_ITERATIONS
+            + INITIAL_MESSAGES_CNT;
+    // Upper bound for the whole run, so a stalled broker fails the test instead of hanging it.
+    private final static long MAX_WAIT_SECONDS = 30 * 60;
+    private final static byte[] payload = new byte[5 * 1024];
+    private final static String TEXT = new String(payload);
+
+    private final static String PRP_INITIAL_ID = "initial-id";
+    private final static String PRP_WORKER_ID = "worker-id";
+
+    // Counts every onMessage() invocation, successful or not.
+    private final static CountDownLatch LATCH = new CountDownLatch(TOTAL_MESSAGES_CNT);
+
+    private final static AtomicInteger ON_MSG_COUNTER = new AtomicInteger();
+
+    /**
+     * Sends the initial messages, waits until all expected deliveries were counted
+     * and prints the elapsed time and throughput.
+     *
+     * @throws Exception on any JMS failure, or AssertionError if the deliveries do
+     *         not complete within MAX_WAIT_SECONDS
+     */
+    public void testKahaDBFailure() throws Exception {
+        final ConnectionFactory fac = new ActiveMQConnectionFactory(this.bindAddress);
+        final Connection connection = fac.createConnection();
+        final List<Consumer> consumers = new ArrayList<Consumer>();
+        try {
+            final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            final Queue queue = session.createQueue(QUEUE_NAME);
+            final MessageProducer producer = session.createProducer(queue);
+            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+            connection.start();
+
+            final long startTime = System.nanoTime();
+
+            for (int i = 0; i < CONSUMER_CNT; i++) {
+                consumers.add(new Consumer(connection, "worker-" + i));
+            }
+
+            for (int i = 0; i < INITIAL_MESSAGES_CNT; i++) {
+                final TextMessage msg = session.createTextMessage(TEXT);
+                msg.setStringProperty(PRP_INITIAL_ID, "initial-" + i);
+                producer.send(msg);
+            }
+
+            if (!LATCH.await(MAX_WAIT_SECONDS, TimeUnit.SECONDS)) {
+                throw new AssertionError("Timed out after " + MAX_WAIT_SECONDS + " s with " + LATCH.getCount()
+                        + " of " + TOTAL_MESSAGES_CNT + " deliveries still outstanding");
+            }
+            final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
+            System.out.println("Total execution time = " + elapsedMillis + " [ms].");
+            // Divide by milliseconds (clamped to 1) so a sub-second run cannot divide by zero.
+            System.out.println("Rate = " + (TOTAL_MESSAGES_CNT * 1000L / Math.max(1L, elapsedMillis))
+                    + " [msg/s].");
+        } finally {
+            // Release JMS resources even when the test body fails.
+            try {
+                for (Consumer c : consumers) {
+                    c.close();
+                }
+            } finally {
+                connection.close();
+            }
+        }
+    }
+
+    /**
+     * Queue listener with its own CLIENT_ACKNOWLEDGE session and a producer on the
+     * same queue (consumer prefetch size 10).
+     */
+    private final static class Consumer implements MessageListener {
+        private final String name;
+        private final Session session;
+        private final MessageProducer producer;
+
+        /**
+         * @param connection connection on which the session is created
+         * @param name worker name, used as prefix of the worker-id property
+         */
+        private Consumer(Connection connection, String name) {
+            this.name = name;
+            try {
+                session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+                final Queue queue = session.createQueue(QUEUE_NAME + "?consumer.prefetchSize=10");
+                producer = session.createProducer(queue);
+                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+                final MessageConsumer consumer = session.createConsumer(queue);
+                consumer.setMessageListener(this);
+            } catch (JMSException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        /**
+         * Fans an initial message out into WORKER_INTERNAL_ITERATIONS follow-up
+         * messages, acknowledges the received message and counts the delivery.
+         */
+        public void onMessage(Message message) {
+            final TextMessage msg = (TextMessage) message;
+            try {
+                // Follow-up messages carry the worker-id property and are not fanned out again.
+                if (!msg.propertyExists(PRP_WORKER_ID)) {
+                    for (int i = 0; i < WORKER_INTERNAL_ITERATIONS; i++) {
+                        final TextMessage newMsg = session.createTextMessage(msg.getText());
+                        newMsg.setStringProperty(PRP_WORKER_ID, name + "-" + i);
+                        newMsg.setStringProperty(PRP_INITIAL_ID, msg.getStringProperty(PRP_INITIAL_ID));
+                        producer.send(newMsg);
+                    }
+                }
+                msg.acknowledge();
+
+            } catch (JMSException e) {
+                // The rethrown exception goes to the JMS provider's delivery thread,
+                // so print it here as well to keep the failure visible in the test output.
+                e.printStackTrace();
+                throw new RuntimeException(e);
+            } finally {
+                final int onMsgCounter = ON_MSG_COUNTER.getAndIncrement();
+                if (onMsgCounter % 1000 == 0) {
+                    System.out.println("message received: " + onMsgCounter);
+                }
+                LATCH.countDown();
+            }
+        }
+
+        /** Closes this worker's session. */
+        private void close() {
+            try {
+                session.close();
+            } catch (JMSException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /** Points the embedded broker at a fixed TCP address before the standard set-up runs. */
+    @Override
+    protected void setUp() throws Exception {
+        bindAddress = "tcp://0.0.0.0:61617";
+        super.setUp();
+    }
+
+    /**
+     * Builds a broker persisting to a freshly emptied KahaDB directory, with
+     * journal disk syncs and JMX switched off.
+     */
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        File dataFileDir = new File("target/test-amq-2512/datadb");
+        IOHelper.mkdirs(dataFileDir);
+        IOHelper.deleteChildren(dataFileDir);
+        KahaDBStore kaha = new KahaDBStore();
+        kaha.setDirectory(dataFileDir);
+        BrokerService answer = new BrokerService();
+        answer.setPersistenceAdapter(kaha);
+
+        kaha.setEnableJournalDiskSyncs(false);
+        //kaha.setIndexCacheSize(10);
+        answer.setDataDirectoryFile(dataFileDir);
+        answer.setUseJmx(false);
+        answer.addConnector(bindAddress);
+        return answer;
+    }
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2512Test.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2512Test.java
------------------------------------------------------------------------------
svn:mime-type = text/plain