You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/08/20 19:50:11 UTC

svn commit: r1375140 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/store/kahadb/plist/PList.java test/java/org/apache/activemq/store/kahadb/plist/PListTest.java

Author: tabish
Date: Mon Aug 20 17:50:10 2012
New Revision: 1375140

URL: http://svn.apache.org/viewvc?rev=1375140&view=rev
Log:
fix and test for: https://issues.apache.org/jira/browse/AMQ-3982

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java?rev=1375140&r1=1375139&r2=1375140&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java Mon Aug 20 17:50:10 2012
@@ -25,6 +25,7 @@ import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+
 import org.apache.kahadb.index.ListIndex;
 import org.apache.kahadb.journal.Location;
 import org.apache.kahadb.page.Transaction;
@@ -201,7 +202,9 @@ public class PList extends ListIndex<Str
 
         @Override
         public boolean hasNext() {
-            return iterator.hasNext();
+            synchronized (indexLock) {
+                return iterator.hasNext();
+            }
         }
 
         @Override

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java?rev=1375140&r1=1375139&r2=1375140&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java Mon Aug 20 17:50:10 2012
@@ -47,7 +47,7 @@ public class PListTest {
     final String idSeed = new String("Seed" + new byte[1024]);
     final Vector<Throwable> exceptions = new Vector<Throwable>();
     ExecutorService executor;
-   
+
 
     @Test
     public void testAddLast() throws Exception {
@@ -97,8 +97,8 @@ public class PListTest {
     public void testRemove() throws IOException {
         doTestRemove(2000);
     }
-    
-    protected void doTestRemove(final int COUNT) throws IOException {            
+
+    protected void doTestRemove(final int COUNT) throws IOException {
         Map<String, ByteSequence> map = new LinkedHashMap<String, ByteSequence>();
         for (int i = 0; i < COUNT; i++) {
             String test = new String("test" + i);
@@ -122,7 +122,7 @@ public class PListTest {
         plist.destroy();
         assertEquals(0,plist.size());
     }
-    
+
     @Test
     public void testDestroyNonEmpty() throws Exception {
         final int COUNT = 1000;
@@ -136,17 +136,17 @@ public class PListTest {
         plist.destroy();
         assertEquals(0,plist.size());
     }
-    
+
     @Test
     public void testRemoveSecond() throws Exception {
         plist.addLast("First", new ByteSequence("A".getBytes()));
         plist.addLast("Second", new ByteSequence("B".getBytes()));
-        
+
         assertTrue(plist.remove("Second"));
         assertTrue(plist.remove("First"));
         assertFalse(plist.remove("doesNotExist"));
     }
-    
+
 
     @Test
     public void testRemoveSingleEntry() throws Exception {
@@ -154,7 +154,7 @@ public class PListTest {
 
         Iterator<PListEntry> iterator = plist.iterator();
         while (iterator.hasNext()) {
-            PListEntry v = iterator.next();
+            iterator.next();
             iterator.remove();
         }
     }
@@ -163,7 +163,7 @@ public class PListTest {
     public void testRemoveSecondPosition() throws Exception {
         plist.addLast("First", new ByteSequence("A".getBytes()));
         plist.addLast("Second", new ByteSequence("B".getBytes()));
-        
+
         assertTrue(plist.remove(1));
         assertTrue(plist.remove(0));
         assertFalse(plist.remove(0));
@@ -209,7 +209,7 @@ public class PListTest {
                         Thread.currentThread().setName("ALRF:"+candidate.getName());
                         synchronized (plistLocks(candidate)) {
                             candidate.addLast(String.valueOf(i), payload);
-                            PListEntry entry = candidate.getFirst();
+                            candidate.getFirst();
                             assertTrue(candidate.remove(String.valueOf(i)));
                         }
                     }
@@ -233,7 +233,7 @@ public class PListTest {
                         Thread.currentThread().setName("ALRF:"+candidate.getName());
                          synchronized (plistLocks(candidate)) {
                             candidate.addLast(String.valueOf(i), payload);
-                            PListEntry entry = candidate.getFirst();
+                            candidate.getFirst();
                             assertTrue(candidate.remove(String.valueOf(i)));
                          }
                     }
@@ -321,6 +321,7 @@ public class PListTest {
         store.setDirectory(directory);
         store.setJournalMaxFileLength(1024*5);
         store.setCleanupInterval(5000);
+        store.setIndexWriteBatchSize(500);
         store.start();
 
         final int iterations = 500;
@@ -441,6 +442,87 @@ public class PListTest {
         assertTrue("test did not  timeout ", shutdown);
     }
 
+    @Test
+    public void testSerialAddIterate() throws Exception {
+        File directory = store.getDirectory();
+        store.stop();
+        IOHelper.mkdirs(directory);
+        IOHelper.deleteChildren(directory);
+        store = new PListStore();
+        store.setIndexPageSize(512);
+        store.setJournalMaxFileLength(100*1024);
+        store.setDirectory(directory);
+        store.setCleanupInterval(-1);
+        store.setIndexEnablePageCaching(false);
+        store.setIndexWriteBatchSize(2000);
+        store.setEnableIndexWriteAsync(false);
+        store.start();
+
+        final int iterations = 1000;
+        final int numLists = 1;
+
+        LOG.info("create");
+        for (int i=0; i<numLists;i++) {
+            new Job(i, PListTest.TaskType.CREATE, iterations).run();
+        }
+
+        LOG.info("serial add and iterate");
+        for (int i=0; i<iterations; i++) {
+            new Job(0, TaskType.ADD, i).run();
+            new Job(0, TaskType.ITERATE, 0).run();
+        }
+
+        assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
+        LOG.info("Num dataFiles:" + store.getJournal().getFiles().size());
+    }
+
+    @Test
+    public void testConcurrentAddIterate() throws Exception {
+        File directory = store.getDirectory();
+        store.stop();
+        IOHelper.mkdirs(directory);
+        IOHelper.deleteChildren(directory);
+        store = new PListStore();
+        store.setIndexPageSize(2*1024);
+        store.setJournalMaxFileLength(1024*1024);
+        store.setDirectory(directory);
+        store.setCleanupInterval(-1);
+        store.setIndexEnablePageCaching(false);
+        store.setIndexWriteBatchSize(100);
+        store.start();
+
+        final int iterations = 250;
+        final int numLists = 10;
+
+        LOG.info("create");
+        for (int i=0; i<numLists;i++) {
+            new Job(i, PListTest.TaskType.CREATE, iterations).run();
+        }
+
+        LOG.info("parallel add and iterate");
+        // We want a lot of adds occurring so that new free pages get created along
+        // with overlapping seeks from the iterators so that we are likely to seek into
+        // some bad area in the page file.
+        executor = Executors.newFixedThreadPool(400);
+        final int numProducer = 300;
+        final int numConsumer = 100;
+        for (int i=0; i<numLists; i++) {
+            for (int j=0; j<numProducer; j++) {
+                executor.execute(new Job(i, PListTest.TaskType.ADD, iterations));
+            }
+            for (int k=0;k<numConsumer; k++) {
+                executor.execute(new Job(i, TaskType.ITERATE, iterations*2));
+            }
+        }
+
+        executor.shutdown();
+        LOG.info("wait for parallel work to complete");
+        boolean shutdown = executor.awaitTermination(60*60, TimeUnit.SECONDS);
+        assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
+        assertTrue("test did not  timeout ", shutdown);
+        LOG.info("Num dataFiles:" + store.getJournal().getFiles().size());
+    }
+
     enum TaskType {CREATE, DELETE, ADD, REMOVE, ITERATE, ITERATE_REMOVE}
 
     class Job implements Runnable {
@@ -476,10 +558,17 @@ public class PListTest {
 
                         for (int j = 0; j < iterations; j++) {
                             synchronized (plistLocks(plist)) {
-                                plist.addLast ("PL>"  + id + idSeed + "-" + j, payload);
+                                if (exceptions.isEmpty()) {
+                                    plist.addLast ("PL>"  + id + idSeed + "-" + j, payload);
+                                } else {
+                                    break;
+                                }
                             }
                         }
-                        LOG.info("Job-" + id + ", Add, done: " + iterations);
+
+                        if (exceptions.isEmpty()) {
+                            LOG.info("Job-" + id + ", Add, done: " + iterations);
+                        }
                         break;
                     case REMOVE:
                         Thread.currentThread().setName("R:"+id);
@@ -497,12 +586,20 @@ public class PListTest {
                     case ITERATE:
                         Thread.currentThread().setName("I:"+id);
                         plist = store.getPList(String.valueOf(id));
-
+                        int iterateCount = 0;
                         synchronized (plistLocks(plist)) {
-                            Iterator<PListEntry> iterator = plist.iterator();
-                            PListEntry element = null;
-                            while (iterator.hasNext()) {
-                                element = iterator.next();
+                            if (exceptions.isEmpty()) {
+                                Iterator<PListEntry> iterator = plist.iterator();
+                                while (iterator.hasNext() && exceptions.isEmpty()) {
+                                    iterator.next();
+                                    iterateCount++;
+                                }
+
+                                //LOG.info("Job-" + id + " Done iterate: it=" + iterator + ", count:" + iterateCount + ", size:" + plist.size());
+                                if (plist.size() != iterateCount) {
+                                    System.err.println("Count Wrong: " + iterator);
+                                }
+                                assertEquals("iterate got all " + id + " iterator:" + iterator , plist.size(), iterateCount);
                             }
                         }
                         break;
@@ -515,10 +612,9 @@ public class PListTest {
                         synchronized (plistLocks(plist)) {
 
                             Iterator<PListEntry> removeIterator = plist.iterator();
-                            PListEntry v = null;
 
                             while (removeIterator.hasNext()) {
-                                v = removeIterator.next();
+                                removeIterator.next();
                                 removeIterator.remove();
                                 if (removeCount++ > iterations) {
                                     break;
@@ -532,9 +628,12 @@ public class PListTest {
                 }
 
             } catch (Exception e) {
+                LOG.warn("Job["+id+"] caught exception: " + e.getMessage());
                 e.printStackTrace();
                 exceptions.add(e);
-                executor.shutdownNow();
+                if (executor != null) {
+                    executor.shutdownNow();
+                }
             } finally {
                 Thread.currentThread().setName(threadName);
             }
@@ -557,7 +656,7 @@ public class PListTest {
 
     @Before
     public void setUp() throws Exception {
-        File directory = new File("target/test/PlistDB");
+        File directory = new File("/tmp/target/test/PlistDB");
         IOHelper.mkdirs(directory);
         IOHelper.deleteChildren(directory);
         startStore(directory);