You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2008/04/25 18:15:21 UTC

svn commit: r651637 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/kaha/impl/async/ test/java/org/apache/activemq/kaha/impl/async/ test/java/org/apache/activemq/perf/ test/resources/org/apache/activemq/perf/

Author: chirino
Date: Fri Apr 25 09:15:20 2008
New Revision: 651637

URL: http://svn.apache.org/viewvc?rev=651637&view=rev
Log:
Applying a modified version of the patch attached to https://issues.apache.org/activemq/browse/AMQ-1689
 - This reduces the number of disk syncs that the broker does on some workloads, which increases performance.
 - Fixed an issue where the last write to disk might get lost on broker shutdown.


Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/DataFileAppenderTest.java   (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/NioDataFileAppenderTest.java   (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/NetworkedSyncTest.java   (with props)
    activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/perf/networkSync.xml   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java?rev=651637&r1=651636&r2=651637&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java Fri Apr 25 09:15:20 2008
@@ -36,7 +36,7 @@
 class DataFileAppender {
 
     protected static final byte[] RESERVED_SPACE = new byte[AsyncDataManager.ITEM_HEAD_RESERVED_SPACE];
-    protected static final String SHUTDOWN_COMMAND = "SHUTDOWN";
+    protected static final int DEFAULT_MAX_BATCH_SIZE = 1024 * 1024 * 4;
 
     protected final AsyncDataManager dataManager;
     protected final Map<WriteKey, WriteCommand> inflightWrites;
@@ -46,7 +46,7 @@
     protected boolean shutdown;
     protected IOException firstAsyncException;
     protected final CountDownLatch shutdownDone = new CountDownLatch(1);
-    protected int maxWriteBatchSize = 1024 * 1024 * 4;
+    protected int maxWriteBatchSize = DEFAULT_MAX_BATCH_SIZE;
 
     private boolean running;
     private Thread thread;
@@ -311,24 +311,19 @@
                 // Block till we get a command.
                 synchronized (enqueueMutex) {
                     while (true) {
-                        if (shutdown) {
-                            o = SHUTDOWN_COMMAND;
-                            break;
-                        }
                         if (nextWriteBatch != null) {
                             o = nextWriteBatch;
                             nextWriteBatch = null;
                             break;
                         }
+                        if (shutdown) {
+                            return;
+                        }
                         enqueueMutex.wait();
                     }
                     enqueueMutex.notify();
                 }
 
-                if (o == SHUTDOWN_COMMAND) {
-                    break;
-                }
-
                 WriteBatch wb = (WriteBatch)o;
                 if (dataFile != wb.dataFile) {
                     if (file != null) {
@@ -345,10 +340,14 @@
                 // are in sequence.
                 file.seek(write.location.getOffset());
 
+                
+                boolean forceToDisk=false;
+                
                 // 
                 // is it just 1 big write?
                 if (wb.size == write.location.getSize()) {
-
+                    forceToDisk = write.sync | write.onComplete!=null;
+                    
                     // Just write it directly..
                     file.writeInt(write.location.getSize());
                     file.writeByte(write.location.getType());
@@ -361,6 +360,7 @@
 
                     // Combine the smaller writes into 1 big buffer
                     while (write != null) {
+                        forceToDisk |= write.sync | write.onComplete!=null;
 
                         buff.writeInt(write.location.getSize());
                         buff.writeByte(write.location.getType());
@@ -378,14 +378,13 @@
                     buff.reset();
                 }
 
-                file.getFD().sync();                
+                if( forceToDisk ) {
+                    file.getFD().sync();
+                }
                 
                 WriteCommand lastWrite = (WriteCommand)wb.first.getTailNode();
                 dataManager.setLastAppendLocation(lastWrite.location);
 
-                // Signal any waiting threads that the write is on disk.
-                wb.latch.countDown();
-
                 // Now that the data is on disk, remove the writes from the in
                 // flight
                 // cache.
@@ -403,8 +402,10 @@
                     }
                     write = (WriteCommand)write.getNext();
                 }
+                
+                // Signal any waiting threads that the write is on disk.
+                wb.latch.countDown();
             }
-            buff.close();
         } catch (IOException e) {
             synchronized (enqueueMutex) {
                 firstAsyncException = e;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java?rev=651637&r1=651636&r2=651637&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java Fri Apr 25 09:15:20 2008
@@ -68,24 +68,19 @@
                 // Block till we get a command.
                 synchronized (enqueueMutex) {
                     while (true) {
-                        if (shutdown) {
-                            o = SHUTDOWN_COMMAND;
-                            break;
-                        }
                         if (nextWriteBatch != null) {
                             o = nextWriteBatch;
                             nextWriteBatch = null;
                             break;
                         }
+                        if (shutdown) {
+                            return;
+                        }
                         enqueueMutex.wait();
                     }
                     enqueueMutex.notify();
                 }
 
-                if (o == SHUTDOWN_COMMAND) {
-                    break;
-                }
-
                 WriteBatch wb = (WriteBatch)o;
                 if (dataFile != wb.dataFile) {
                     if (file != null) {
@@ -103,10 +98,14 @@
                 // are in sequence.
                 file.seek(write.location.getOffset());
 
+                
+                boolean forceToDisk=false;
+                
                 // 
                 // is it just 1 big write?
                 if (wb.size == write.location.getSize()) {
-
+                    forceToDisk = write.sync | write.onComplete!=null;
+                    
                     header.clear();
                     header.putInt(write.location.getSize());
                     header.put(write.location.getType());
@@ -122,7 +121,8 @@
 
                     // Combine the smaller writes into 1 big buffer
                     while (write != null) {
-
+                        forceToDisk |= write.sync | write.onComplete!=null;
+                        
                         header.clear();
                         header.putInt(write.location.getSize());
                         header.put(write.location.getType());
@@ -148,16 +148,13 @@
                     buffer.clear();
                 }
 
-                file.getChannel().force(false);
+                if( forceToDisk ) {
+                    file.getChannel().force(false);
+                }
 
                 WriteCommand lastWrite = (WriteCommand)wb.first.getTailNode();
                 dataManager.setLastAppendLocation(lastWrite.location);
 
-                // Signal any waiting threads that the write is on disk.
-                if (wb.latch != null) {
-                    wb.latch.countDown();
-                }
-
                 // Now that the data is on disk, remove the writes from the in
                 // flight
                 // cache.
@@ -175,6 +172,9 @@
 					}
                     write = (WriteCommand)write.getNext();
                 }
+                
+                // Signal any waiting threads that the write is on disk.
+                wb.latch.countDown();
             }
 
         } catch (IOException e) {

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/DataFileAppenderTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/DataFileAppenderTest.java?rev=651637&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/DataFileAppenderTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/DataFileAppenderTest.java Fri Apr 25 09:15:20 2008
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.async;
+
+import java.io.File;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.util.ByteSequence;
+
+public class DataFileAppenderTest extends TestCase {
+    AsyncDataManager dataManager;
+    File dir;
+    
+    @Override
+    public void setUp() throws Exception {
+        dir = new File("target/tests/DataFileAppenderTest");
+        dir.mkdirs();
+        dataManager = new AsyncDataManager();
+        dataManager.setDirectory(dir);
+        configure(dataManager);
+        dataManager.start();
+    }
+    
+    protected void configure(AsyncDataManager dataManager) {
+        dataManager.setUseNio(false);
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        dataManager.close();
+        deleteFilesInDirectory(dir);
+        dir.delete();
+    }
+
+    private void deleteFilesInDirectory(File directory) {
+        File[] files = directory.listFiles();
+        for (int i=0; i<files.length; i++) {
+            File f = files[i];
+            if (f.isDirectory()) {
+                deleteFilesInDirectory(f);
+            }   
+            f.delete();
+        }  
+    }
+    
+    public void testBatchWriteCompleteAfterTimeout() throws Exception {
+        ByteSequence data = new ByteSequence("DATA".getBytes());
+        final int iterations = 10;
+        for (int i=0; i<iterations; i++) {
+            dataManager.write(data, false);
+        }
+        assertTrue("writes are queued up", dataManager.getInflightWrites().size() >= iterations);
+        Thread.sleep(1000);
+        assertTrue("queued data is written", dataManager.getInflightWrites().isEmpty());
+    }
+
+
+    public void testBatchWriteCallbackCompleteAfterTimeout() throws Exception {
+        final int iterations = 10;
+        final CountDownLatch latch = new CountDownLatch(iterations);
+        ByteSequence data = new ByteSequence("DATA".getBytes());
+        for (int i=0; i<iterations; i++) {
+            dataManager.write(data, new Runnable() {
+                public void run() {
+                    latch.countDown();                 
+                }
+            });
+        }
+        assertTrue("writes are queued up", dataManager.getInflightWrites().size() >= iterations);
+        assertEquals("none written", iterations, latch.getCount());
+        Thread.sleep(1000);
+        assertTrue("queued data is written", dataManager.getInflightWrites().isEmpty());
+        assertEquals("none written", 0, latch.getCount());
+    }
+
+    public void testBatchWriteCallbackCompleteAfterClose() throws Exception {
+        final int iterations = 10;
+        final CountDownLatch latch = new CountDownLatch(iterations);
+        ByteSequence data = new ByteSequence("DATA".getBytes());
+        for (int i=0; i<iterations; i++) {
+            dataManager.write(data, new Runnable() {
+                public void run() {
+                    latch.countDown();                 
+                }
+            });
+        }
+        assertTrue("writes are queued up", dataManager.getInflightWrites().size() >= iterations);
+        assertEquals("none written", iterations, latch.getCount());
+        dataManager.close();
+        assertTrue("queued data is written", dataManager.getInflightWrites().isEmpty());
+        assertEquals("none written", 0, latch.getCount());
+    }
+    
+    public void testBatchWriteCompleteAfterClose() throws Exception {
+        ByteSequence data = new ByteSequence("DATA".getBytes());
+        final int iterations = 10;
+        for (int i=0; i<iterations; i++) {
+            dataManager.write(data, false);
+        }
+        dataManager.close();
+        assertTrue("queued data is written:" + dataManager.getInflightWrites().size(), dataManager.getInflightWrites().isEmpty());
+    }
+    
+    public void testBatchWriteToMaxMessageSize() throws Exception {
+        final int iterations = 4;
+        final CountDownLatch latch = new CountDownLatch(iterations);
+        Runnable done = new Runnable() {
+            public void run() {
+                latch.countDown();                 
+            }
+        };
+        int messageSize = DataFileAppender.DEFAULT_MAX_BATCH_SIZE / iterations;
+        byte[] message = new byte[messageSize];
+        ByteSequence data = new ByteSequence(message);
+        
+        for (int i=0; i< iterations - 1; i++) {
+            dataManager.write(data, done);
+        }
+        assertEquals("all writes are queued", iterations, latch.getCount());
+        dataManager.write(data, done);
+        latch.await(10, TimeUnit.SECONDS); // write may take some time
+        assertEquals("all callbacks complete", 0, latch.getCount());
+    }
+    
+    public void testNoBatchWriteWithSync() throws Exception {
+        ByteSequence data = new ByteSequence("DATA".getBytes());
+        final int iterations = 10;
+        for (int i=0; i<iterations; i++) {
+            dataManager.write(data, true);
+            assertTrue("queued data is written", dataManager.getInflightWrites().isEmpty());
+        }
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/DataFileAppenderTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/DataFileAppenderTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/NioDataFileAppenderTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/NioDataFileAppenderTest.java?rev=651637&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/NioDataFileAppenderTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/NioDataFileAppenderTest.java Fri Apr 25 09:15:20 2008
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.async;
+
+public class NioDataFileAppenderTest extends DataFileAppenderTest {
+
+    @Override
+    protected void configure(AsyncDataManager dataManager) {
+        dataManager.setUseNio(true);
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/NioDataFileAppenderTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/NioDataFileAppenderTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/NetworkedSyncTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/NetworkedSyncTest.java?rev=651637&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/NetworkedSyncTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/NetworkedSyncTest.java Fri Apr 25 09:15:20 2008
@@ -0,0 +1,310 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.perf;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import junit.framework.TestCase;
+import junit.textui.TestRunner;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.xbean.BrokerFactoryBean;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
+import org.springframework.core.io.ClassPathResource;
+
+
+public class NetworkedSyncTest extends TestCase {
+
+    // constants
+    public static final int MESSAGE_COUNT = 10000; //100000;
+    public final static String config = "org/apache/activemq/perf/networkSync.xml";
+    public final static String broker1URL = "tcp://localhost:61616";
+    public final static String broker2URL = "tcp://localhost:62616";
+    private final String networkConnectorURL = "static://(" + broker2URL + ")";
+    private static final Log LOG = LogFactory.getLog(NetworkedSyncTest.class);
+    BrokerService broker1 = null;
+    BrokerService broker2 = null;
+    NetworkConnector connector = null;
+
+    /**
+     * @param name
+     */
+    public NetworkedSyncTest(String name) {
+        super(name);
+        LOG.info("Testcase started.");
+    }
+    
+   public static void main(String args[]) {
+       TestRunner.run(NetworkedSyncTest.class);
+   }
+
+    /**
+     * @throws java.lang.Exception
+     */
+    protected void setUp() throws Exception {
+        LOG.info("setUp() called.");
+        ClassPathXmlApplicationContext context1 = null;
+        BrokerFactoryBean brokerFactory = new BrokerFactoryBean(new ClassPathResource(config));
+
+        /* start up first broker instance */
+        try {
+            // resolve broker1
+            Thread.currentThread().setContextClassLoader(
+                    NetworkedSyncTest.class.getClassLoader());
+            context1 = new ClassPathXmlApplicationContext(config);
+            broker1 = (BrokerService) context1.getBean("broker1");
+
+            // start the broker
+            if (!broker1.isStarted()) {
+                LOG.info("Broker broker1 not yet started. Kicking it off now.");
+                broker1.start();
+            } else {
+                LOG.info("Broker broker1 already started. Not kicking it off a second time.");
+                broker1.waitUntilStopped();
+            }
+        } catch (Exception e) {
+            LOG.fatal("Error: " + e.getMessage());
+            throw e;
+            // brokerService.stop();
+        }
+
+        /* start up second broker instance */
+        try {
+            Thread.currentThread().setContextClassLoader(
+                    NetworkedSyncTest.class.getClassLoader());
+            context1 = new ClassPathXmlApplicationContext(config);
+            broker2 = (BrokerService) context1.getBean("broker2");
+
+            // start the broker
+            if (!broker2.isStarted()) {
+                LOG.info("Broker broker2 not yet started. Kicking it off now.");
+                broker2.start();
+            } else {
+                LOG.info("Broker broker2 already started. Not kicking it off a second time.");
+                broker2.waitUntilStopped();
+            }
+        } catch (Exception e) {
+            LOG.fatal("Error: " + e.getMessage());
+            throw e;
+        }
+
+        // setup network connector from broker1 to broker2
+        connector = broker1.addNetworkConnector(networkConnectorURL);
+        connector.setBrokerName(broker1.getBrokerName());
+        connector.setDuplex(true);
+        connector.start();
+        LOG.info("Network connector created.");
+    }
+
+    /**
+     * @throws java.lang.Exception
+     */
+    protected void tearDown() throws Exception {
+
+        LOG.info("tearDown() called.");
+
+        if (broker1 != null && broker1.isStarted()) {
+            LOG.info("Broker1 still running, stopping it now.");
+            broker1.stop();
+        } else {
+            LOG.info("Broker1 not running, nothing to shutdown.");
+        }
+        if (broker2 != null && broker2.isStarted()) {
+            LOG.info("Broker2 still running, stopping it now.");
+            broker2.stop();
+        } else {
+            LOG.info("Broker2 not running, nothing to shutdown.");
+        }
+
+    }
+
+    public void testMessageExchange() throws Exception {
+        LOG.info("testMessageExchange() called.");
+
+        long start = System.currentTimeMillis();
+        
+        // create producer and consumer threads
+        Thread producer = new Thread(new Producer());
+        Thread consumer = new Thread(new Consumer());
+        // start threads
+        consumer.start();
+        Thread.sleep(2000);
+        producer.start();
+        
+
+        // wait for threads to finish
+        producer.join();
+        consumer.join();
+        long end = System.currentTimeMillis();
+        
+        System.out.println("Duration: "+(end-start));
+    }
+}
+
+/**
+ * Message producer running as a separate thread, connecting to broker1
+ * 
+ * @author tmielke
+ * 
+ */
+class Producer implements Runnable {
+
+    private static final Log LOG = LogFactory.getLog(Producer.class);
+
+    /**
+     * connect to broker and constantly send messages
+     */
+    public void run() {
+
+        Connection connection = null;
+        Session session = null;
+        MessageProducer producer = null;
+
+        try {
+            ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory(
+                    NetworkedSyncTest.broker1URL);
+            connection = amq.createConnection();
+
+            connection.setExceptionListener(new javax.jms.ExceptionListener() {
+                public void onException(javax.jms.JMSException e) {
+                    e.printStackTrace();
+                }
+            });
+
+            connection.start();
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Topic destination = session.createTopic("TEST.FOO");
+
+            producer = session.createProducer(destination);
+            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+            long counter = 0;
+
+            // Create and send message
+            for (int i = 0; i < NetworkedSyncTest.MESSAGE_COUNT; i++) {
+
+                String text = "Hello world! From: "
+                        + Thread.currentThread().getName() + " : "
+                        + this.hashCode() + ":" + counter;
+                TextMessage message = session.createTextMessage(text);
+                producer.send(message);
+                counter++;
+
+                if ((counter % 1000) == 0)
+                    LOG.info("sent " + counter + " messages");
+
+            }
+        } catch (Exception ex) {
+            LOG.error(ex);
+            return;
+        } finally {
+            try {
+                if (producer != null)
+                    producer.close();
+                if (session != null)
+                    session.close();
+                if (connection != null)
+                    connection.close();
+            } catch (Exception e) {
+                LOG.error("Problem closing down JMS objects: " + e);
+            }
+        }
+    }
+}
+
+/*
+ * * Message consumer running as a separate thread, connecting to broker2
+ * @author tmielke
+ * 
+ */
+class Consumer implements Runnable {
+
+    private static final Log LOG = LogFactory.getLog(Consumer.class);;
+
+    
+    /**
+     * connect to broker and receive messages
+     */
+    public void run() {
+        Connection connection = null;
+        Session session = null;
+        MessageConsumer consumer = null;
+
+        try {
+            ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory(
+                    NetworkedSyncTest.broker2URL);
+            connection = amq.createConnection();
+            // need to set clientID when using durable subscription.
+            connection.setClientID("tmielke");
+
+            connection.setExceptionListener(new javax.jms.ExceptionListener() {
+                public void onException(javax.jms.JMSException e) {
+                    e.printStackTrace();
+                }
+            });
+
+            connection.start();
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Destination destination = session.createTopic("TEST.FOO");
+            consumer = session.createDurableSubscriber((Topic) destination,"tmielke");
+
+            long counter = 0;
+            // Wait for a message
+            for (int i = 0; i < NetworkedSyncTest.MESSAGE_COUNT; i++) { 
+                Message message2 = consumer.receive();
+                if (message2 instanceof TextMessage) {
+                    TextMessage textMessage = (TextMessage) message2;
+                    String text = textMessage.getText();
+                    // logger.info("Received: " + text);
+                } else {
+                    LOG.error("Received message of unsupported type. Expecting TextMessage. "+ message2);
+                }
+                counter++;
+                if ((counter % 1000) == 0)
+                    LOG.info("received " + counter + " messages");
+
+                
+            }
+        } catch (Exception e) {
+            LOG.error("Error in Consumer: " + e);
+            return;
+        } finally {
+            try {
+                if (consumer != null)
+                    consumer.close();
+                if (session != null)
+                    session.close();
+                if (connection != null)
+                    connection.close();
+            } catch (Exception ex) {
+                LOG.error("Error closing down JMS objects: " + ex);
+            }
+        }
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/NetworkedSyncTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/NetworkedSyncTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/perf/networkSync.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/perf/networkSync.xml?rev=651637&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/perf/networkSync.xml (added)
+++ activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/perf/networkSync.xml Fri Apr 25 09:15:20 2008
@@ -0,0 +1,33 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans 
+  xmlns="http://www.springframework.org/schema/beans" 
+  xmlns:amq="http://activemq.org/config/1.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+  http://activemq.org/config/1.0 http://activemq.apache.org/snapshot-schema/activemq-core-5.0-SNAPSHOT.xsd">
+ 
+  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
+  </bean>
+ 
+  <!-- Broker1 -->
+  <broker brokerName="broker1" id="broker1" useJmx="true" persistent="true" deleteAllMessagesOnStartup="true" start="false" xmlns="http://activemq.org/config/1.0">
+    <transportConnectors>
+      <transportConnector uri="tcp://localhost:61616" />
+    </transportConnectors>
+ 
+    <persistenceAdapter>
+      <amqPersistenceAdapter directory="target/Broker1-data/activemq-data" syncOnWrite="true"  indexPageSize="16kb" indexBinSize="100" maxReferenceFileLength="8192"/>
+    </persistenceAdapter>
+  </broker>
+ 
+ 
+  <!-- Broker2 -->
+  <broker brokerName="broker2" id="broker2" useJmx="true" persistent="false" deleteAllMessagesOnStartup="true" start="false" xmlns="http://activemq.org/config/1.0">
+    <transportConnectors>
+      <transportConnector uri="tcp://localhost:62616" />
+    </transportConnectors>
+     <persistenceAdapter>
+      <amqPersistenceAdapter directory="target/Broker2-data/activemq-data" syncOnWrite="true"  indexPageSize="16kb" indexBinSize="100" maxReferenceFileLength="8192"/>
+    </persistenceAdapter>
+  </broker>
+</beans>

Propchange: activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/perf/networkSync.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/perf/networkSync.xml
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Propchange: activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/perf/networkSync.xml
------------------------------------------------------------------------------
    svn:mime-type = text/xml