You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/08/20 19:50:11 UTC
svn commit: r1375140 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/store/kahadb/plist/PList.java
test/java/org/apache/activemq/store/kahadb/plist/PListTest.java
Author: tabish
Date: Mon Aug 20 17:50:10 2012
New Revision: 1375140
URL: http://svn.apache.org/viewvc?rev=1375140&view=rev
Log:
fix and test for: https://issues.apache.org/jira/browse/AMQ-3982
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java?rev=1375140&r1=1375139&r2=1375140&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java Mon Aug 20 17:50:10 2012
@@ -25,6 +25,7 @@ import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+
import org.apache.kahadb.index.ListIndex;
import org.apache.kahadb.journal.Location;
import org.apache.kahadb.page.Transaction;
@@ -201,7 +202,9 @@ public class PList extends ListIndex<Str
@Override
public boolean hasNext() {
- return iterator.hasNext();
+ synchronized (indexLock) {
+ return iterator.hasNext();
+ }
}
@Override
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java?rev=1375140&r1=1375139&r2=1375140&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java Mon Aug 20 17:50:10 2012
@@ -47,7 +47,7 @@ public class PListTest {
final String idSeed = new String("Seed" + new byte[1024]);
final Vector<Throwable> exceptions = new Vector<Throwable>();
ExecutorService executor;
-
+
@Test
public void testAddLast() throws Exception {
@@ -97,8 +97,8 @@ public class PListTest {
public void testRemove() throws IOException {
doTestRemove(2000);
}
-
- protected void doTestRemove(final int COUNT) throws IOException {
+
+ protected void doTestRemove(final int COUNT) throws IOException {
Map<String, ByteSequence> map = new LinkedHashMap<String, ByteSequence>();
for (int i = 0; i < COUNT; i++) {
String test = new String("test" + i);
@@ -122,7 +122,7 @@ public class PListTest {
plist.destroy();
assertEquals(0,plist.size());
}
-
+
@Test
public void testDestroyNonEmpty() throws Exception {
final int COUNT = 1000;
@@ -136,17 +136,17 @@ public class PListTest {
plist.destroy();
assertEquals(0,plist.size());
}
-
+
@Test
public void testRemoveSecond() throws Exception {
plist.addLast("First", new ByteSequence("A".getBytes()));
plist.addLast("Second", new ByteSequence("B".getBytes()));
-
+
assertTrue(plist.remove("Second"));
assertTrue(plist.remove("First"));
assertFalse(plist.remove("doesNotExist"));
}
-
+
@Test
public void testRemoveSingleEntry() throws Exception {
@@ -154,7 +154,7 @@ public class PListTest {
Iterator<PListEntry> iterator = plist.iterator();
while (iterator.hasNext()) {
- PListEntry v = iterator.next();
+ iterator.next();
iterator.remove();
}
}
@@ -163,7 +163,7 @@ public class PListTest {
public void testRemoveSecondPosition() throws Exception {
plist.addLast("First", new ByteSequence("A".getBytes()));
plist.addLast("Second", new ByteSequence("B".getBytes()));
-
+
assertTrue(plist.remove(1));
assertTrue(plist.remove(0));
assertFalse(plist.remove(0));
@@ -209,7 +209,7 @@ public class PListTest {
Thread.currentThread().setName("ALRF:"+candidate.getName());
synchronized (plistLocks(candidate)) {
candidate.addLast(String.valueOf(i), payload);
- PListEntry entry = candidate.getFirst();
+ candidate.getFirst();
assertTrue(candidate.remove(String.valueOf(i)));
}
}
@@ -233,7 +233,7 @@ public class PListTest {
Thread.currentThread().setName("ALRF:"+candidate.getName());
synchronized (plistLocks(candidate)) {
candidate.addLast(String.valueOf(i), payload);
- PListEntry entry = candidate.getFirst();
+ candidate.getFirst();
assertTrue(candidate.remove(String.valueOf(i)));
}
}
@@ -321,6 +321,7 @@ public class PListTest {
store.setDirectory(directory);
store.setJournalMaxFileLength(1024*5);
store.setCleanupInterval(5000);
+ store.setIndexWriteBatchSize(500);
store.start();
final int iterations = 500;
@@ -441,6 +442,87 @@ public class PListTest {
assertTrue("test did not timeout ", shutdown);
}
+ @Test
+ public void testSerialAddIterate() throws Exception {
+ File directory = store.getDirectory();
+ store.stop();
+ IOHelper.mkdirs(directory);
+ IOHelper.deleteChildren(directory);
+ store = new PListStore();
+ store.setIndexPageSize(512);
+ store.setJournalMaxFileLength(100*1024);
+ store.setDirectory(directory);
+ store.setCleanupInterval(-1);
+ store.setIndexEnablePageCaching(false);
+ store.setIndexWriteBatchSize(2000);
+ store.setEnableIndexWriteAsync(false);
+ store.start();
+
+ final int iterations = 1000;
+ final int numLists = 1;
+
+ LOG.info("create");
+ for (int i=0; i<numLists;i++) {
+ new Job(i, PListTest.TaskType.CREATE, iterations).run();
+ }
+
+ LOG.info("serial add and iterate");
+ for (int i=0; i<iterations; i++) {
+ new Job(0, TaskType.ADD, i).run();
+ new Job(0, TaskType.ITERATE, 0).run();
+ }
+
+ assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
+ LOG.info("Num dataFiles:" + store.getJournal().getFiles().size());
+ }
+
+ @Test
+ public void testConcurrentAddIterate() throws Exception {
+ File directory = store.getDirectory();
+ store.stop();
+ IOHelper.mkdirs(directory);
+ IOHelper.deleteChildren(directory);
+ store = new PListStore();
+ store.setIndexPageSize(2*1024);
+ store.setJournalMaxFileLength(1024*1024);
+ store.setDirectory(directory);
+ store.setCleanupInterval(-1);
+ store.setIndexEnablePageCaching(false);
+ store.setIndexWriteBatchSize(100);
+ store.start();
+
+ final int iterations = 250;
+ final int numLists = 10;
+
+ LOG.info("create");
+ for (int i=0; i<numLists;i++) {
+ new Job(i, PListTest.TaskType.CREATE, iterations).run();
+ }
+
+ LOG.info("parallel add and iterate");
+ // We want a lot of adds occurring so that new free pages get created along
+ // with overlapping seeks from the iterators so that we are likely to seek into
+ // some bad area in the page file.
+ executor = Executors.newFixedThreadPool(400);
+ final int numProducer = 300;
+ final int numConsumer = 100;
+ for (int i=0; i<numLists; i++) {
+ for (int j=0; j<numProducer; j++) {
+ executor.execute(new Job(i, PListTest.TaskType.ADD, iterations));
+ }
+ for (int k=0;k<numConsumer; k++) {
+ executor.execute(new Job(i, TaskType.ITERATE, iterations*2));
+ }
+ }
+
+ executor.shutdown();
+ LOG.info("wait for parallel work to complete");
+ boolean shutdown = executor.awaitTermination(60*60, TimeUnit.SECONDS);
+ assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
+ assertTrue("test did not timeout ", shutdown);
+ LOG.info("Num dataFiles:" + store.getJournal().getFiles().size());
+ }
+
enum TaskType {CREATE, DELETE, ADD, REMOVE, ITERATE, ITERATE_REMOVE}
class Job implements Runnable {
@@ -476,10 +558,17 @@ public class PListTest {
for (int j = 0; j < iterations; j++) {
synchronized (plistLocks(plist)) {
- plist.addLast ("PL>" + id + idSeed + "-" + j, payload);
+ if (exceptions.isEmpty()) {
+ plist.addLast ("PL>" + id + idSeed + "-" + j, payload);
+ } else {
+ break;
+ }
}
}
- LOG.info("Job-" + id + ", Add, done: " + iterations);
+
+ if (exceptions.isEmpty()) {
+ LOG.info("Job-" + id + ", Add, done: " + iterations);
+ }
break;
case REMOVE:
Thread.currentThread().setName("R:"+id);
@@ -497,12 +586,20 @@ public class PListTest {
case ITERATE:
Thread.currentThread().setName("I:"+id);
plist = store.getPList(String.valueOf(id));
-
+ int iterateCount = 0;
synchronized (plistLocks(plist)) {
- Iterator<PListEntry> iterator = plist.iterator();
- PListEntry element = null;
- while (iterator.hasNext()) {
- element = iterator.next();
+ if (exceptions.isEmpty()) {
+ Iterator<PListEntry> iterator = plist.iterator();
+ while (iterator.hasNext() && exceptions.isEmpty()) {
+ iterator.next();
+ iterateCount++;
+ }
+
+ //LOG.info("Job-" + id + " Done iterate: it=" + iterator + ", count:" + iterateCount + ", size:" + plist.size());
+ if (plist.size() != iterateCount) {
+ System.err.println("Count Wrong: " + iterator);
+ }
+ assertEquals("iterate got all " + id + " iterator:" + iterator , plist.size(), iterateCount);
}
}
break;
@@ -515,10 +612,9 @@ public class PListTest {
synchronized (plistLocks(plist)) {
Iterator<PListEntry> removeIterator = plist.iterator();
- PListEntry v = null;
while (removeIterator.hasNext()) {
- v = removeIterator.next();
+ removeIterator.next();
removeIterator.remove();
if (removeCount++ > iterations) {
break;
@@ -532,9 +628,12 @@ public class PListTest {
}
} catch (Exception e) {
+ LOG.warn("Job["+id+"] caught exception: " + e.getMessage());
e.printStackTrace();
exceptions.add(e);
- executor.shutdownNow();
+ if (executor != null) {
+ executor.shutdownNow();
+ }
} finally {
Thread.currentThread().setName(threadName);
}
@@ -557,7 +656,7 @@ public class PListTest {
@Before
public void setUp() throws Exception {
- File directory = new File("target/test/PlistDB");
+ File directory = new File("/tmp/target/test/PlistDB");
IOHelper.mkdirs(directory);
IOHelper.deleteChildren(directory);
startStore(directory);