You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@clerezza.apache.org by rw...@apache.org on 2013/03/18 08:33:13 UTC
svn commit: r1457660 - in /clerezza/trunk/rdf.jena.tdb.storage/src:
main/java/org/apache/clerezza/rdf/jena/tdb/storage/SingleTdbDatasetTcProvider.java
test/java/org/apache/clerezza/rdf/jena/tdb/storage/MultiThreadedSingleTdbDatasetTest.java
Author: rwesten
Date: Mon Mar 18 07:33:13 2013
New Revision: 1457660
URL: http://svn.apache.org/r1457660
Log:
CLEREZZA-745: Changes the SingleSingleTdbDatasetTcProvider to use a single ReadWrite lock for all Clerezza TripleCollections backed by the same Jena Dataset. Also adapts the MultiThreadedSingleTdbDatasetTest to explicitly validate the issue reported by this issue
Modified:
clerezza/trunk/rdf.jena.tdb.storage/src/main/java/org/apache/clerezza/rdf/jena/tdb/storage/SingleTdbDatasetTcProvider.java
clerezza/trunk/rdf.jena.tdb.storage/src/test/java/org/apache/clerezza/rdf/jena/tdb/storage/MultiThreadedSingleTdbDatasetTest.java
Modified: clerezza/trunk/rdf.jena.tdb.storage/src/main/java/org/apache/clerezza/rdf/jena/tdb/storage/SingleTdbDatasetTcProvider.java
URL: http://svn.apache.org/viewvc/clerezza/trunk/rdf.jena.tdb.storage/src/main/java/org/apache/clerezza/rdf/jena/tdb/storage/SingleTdbDatasetTcProvider.java?rev=1457660&r1=1457659&r2=1457660&view=diff
==============================================================================
--- clerezza/trunk/rdf.jena.tdb.storage/src/main/java/org/apache/clerezza/rdf/jena/tdb/storage/SingleTdbDatasetTcProvider.java (original)
+++ clerezza/trunk/rdf.jena.tdb.storage/src/main/java/org/apache/clerezza/rdf/jena/tdb/storage/SingleTdbDatasetTcProvider.java Mon Mar 18 07:33:13 2013
@@ -12,23 +12,32 @@ import java.math.BigDecimal;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.Charset;
+import java.util.Collection;
import java.util.Dictionary;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.clerezza.rdf.core.Graph;
import org.apache.clerezza.rdf.core.MGraph;
+import org.apache.clerezza.rdf.core.NonLiteral;
+import org.apache.clerezza.rdf.core.Resource;
+import org.apache.clerezza.rdf.core.Triple;
import org.apache.clerezza.rdf.core.TripleCollection;
import org.apache.clerezza.rdf.core.UriRef;
import org.apache.clerezza.rdf.core.access.EntityAlreadyExistsException;
import org.apache.clerezza.rdf.core.access.EntityUndeletableException;
+import org.apache.clerezza.rdf.core.access.LockableMGraph;
import org.apache.clerezza.rdf.core.access.LockableMGraphWrapper;
import org.apache.clerezza.rdf.core.access.NoSuchEntityException;
import org.apache.clerezza.rdf.core.access.TcProvider;
import org.apache.clerezza.rdf.core.access.WeightedTcProvider;
+import org.apache.clerezza.rdf.core.event.FilterTriple;
+import org.apache.clerezza.rdf.core.event.GraphListener;
import org.apache.clerezza.rdf.core.impl.SimpleGraph;
import org.apache.clerezza.rdf.core.impl.util.PrivilegedGraphWrapper;
import org.apache.clerezza.rdf.core.impl.util.PrivilegedMGraphWrapper;
@@ -40,7 +49,6 @@ import org.apache.felix.scr.annotations.
import org.apache.felix.scr.annotations.Properties;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Service;
-import org.openjena.atlas.lib.Tuple;
import org.osgi.framework.BundleContext;
import org.osgi.framework.Constants;
import org.osgi.service.cm.ConfigurationAdmin;
@@ -111,6 +119,7 @@ public class SingleTdbDatasetTcProvider
private SyncThread syncThread;
private Dataset dataset;
+ private final ReadWriteLock datasetLock = new ReentrantReadWriteLock();;
private File graphConfigFile;
private File mGraphConfigFile;
@@ -182,7 +191,7 @@ public class SingleTdbDatasetTcProvider
graph = new PrivilegedGraphWrapper(jenaAdapter.getGraph());
} else { //construct an MGraph
jenaAdapter = new JenaGraphAdaptor(model.getGraph());
- this.graph = new LockableMGraphWrapper(
+ this.graph = new DatasetLockedMGraph(
new PrivilegedMGraphWrapper(jenaAdapter));
}
}
@@ -275,12 +284,15 @@ public class SingleTdbDatasetTcProvider
interrupt();
}
if (!stopRequested) {
- synchronized (dataset) {
+ datasetLock.writeLock().lock();
+ try {
for(ModelGraph mg : initModels.values()){
if(mg.isReadWrite()){
mg.sync();
} //else we do not need to sync read-only models
}
+ } finally {
+ datasetLock.writeLock().unlock();
}
}
}
@@ -415,7 +427,8 @@ public class SingleTdbDatasetTcProvider
} //else exists and is a directory ... nothing to do
TDB.getContext().set(TDB.symUnionDefaultGraph, true);
dataset = TDBFactory.createDataset(dataDir.getAbsolutePath());
-
+ //init the read/write lock
+
//init the graph config (stores the graph and mgraph names in a config file)
initGraphConfigs(dataDir,config);
@@ -458,13 +471,16 @@ public class SingleTdbDatasetTcProvider
@Deactivate
protected void deactivate(ComponentContext ctx) {
if(dataset != null){ //avoid NPE on multiple calls
- synchronized (dataset) {
+ datasetLock.writeLock().lock();
+ try {
for(ModelGraph mg : initModels.values()){
mg.close(); //close also syncs!
}
TDB.sync(dataset);
dataset.close();
dataset = null;
+ } finally {
+ datasetLock.writeLock().unlock();
}
}
if(syncThread != null){
@@ -472,7 +488,6 @@ public class SingleTdbDatasetTcProvider
syncThread = null;
}
initModels = null;
- dataset = null;
graphConfigFile = null;
graphNames = null;
mGraphConfigFile = null;
@@ -495,7 +510,8 @@ public class SingleTdbDatasetTcProvider
*/
private ModelGraph getModelGraph(UriRef name, boolean readWrite,boolean create) throws NoSuchEntityException {
ModelGraph modelGraph;
- synchronized (dataset) {
+ datasetLock.readLock().lock();
+ try {
modelGraph = initModels.get(name);
if(modelGraph != null && create){
throw new EntityAlreadyExistsException(name);
@@ -506,6 +522,8 @@ public class SingleTdbDatasetTcProvider
dataset.getNamedModel(modelName),readWrite);
this.initModels.put(name, modelGraph);
}
+ } finally {
+ datasetLock.readLock().unlock();
}
return modelGraph;
}
@@ -518,12 +536,15 @@ public class SingleTdbDatasetTcProvider
if(name == null){
throw new IllegalArgumentException("The parsed Graph UriRef MUST NOT be NULL!");
}
- synchronized (dataset) {
+ datasetLock.readLock().lock();
+ try {
if(graphNames.contains(name) || name.equals(defaultGraphName)){
return getModelGraph(name,false,false).getGraph();
} else {
throw new NoSuchEntityException(name);
}
+ } finally {
+ datasetLock.readLock().unlock();
}
}
/*
@@ -535,12 +556,15 @@ public class SingleTdbDatasetTcProvider
if(name == null){
throw new IllegalArgumentException("The parsed Graph UriRef MUST NOT be NULL!");
}
- synchronized (dataset) {
+ datasetLock.readLock().lock();
+ try {
if(mGraphNames.contains(name)){
return getModelGraph(name,true,false).getMGraph();
} else {
throw new NoSuchEntityException(name);
}
+ } finally {
+ datasetLock.readLock().unlock();
}
}
/*
@@ -552,7 +576,8 @@ public class SingleTdbDatasetTcProvider
if(name == null){
throw new IllegalArgumentException("The parsed Graph UriRef MUST NOT be NULL!");
}
- synchronized (dataset) {
+ datasetLock.readLock().lock();
+ try {
if(graphNames.contains(name) || name.equals(defaultGraphName)){
return getGraph(name);
} else if(mGraphNames.contains(name)){
@@ -560,6 +585,8 @@ public class SingleTdbDatasetTcProvider
} else {
throw new NoSuchEntityException(name);
}
+ } finally {
+ datasetLock.readLock().unlock();
}
}
/*
@@ -594,10 +621,13 @@ public class SingleTdbDatasetTcProvider
@Override
public Set<UriRef> listTripleCollections() {
Set<UriRef> graphNames = new HashSet<UriRef>();
- synchronized (dataset) {
+ datasetLock.readLock().lock();
+ try {
for(Iterator<String> names = dataset.listNames();
names.hasNext();
graphNames.add(new UriRef(names.next())));
+ } finally {
+ datasetLock.readLock().unlock();
}
if(defaultGraphName != null){
graphNames.add(defaultGraphName);
@@ -614,7 +644,8 @@ public class SingleTdbDatasetTcProvider
if(name == null){
throw new IllegalArgumentException("The parsed MGrpah name MUST NOT be NULL!");
}
- synchronized (dataset) {
+ datasetLock.writeLock().lock();
+ try {
if(graphNames.contains(name) || mGraphNames.contains(name) || name.equals(defaultGraphName)){
throw new EntityAlreadyExistsException(name);
}
@@ -627,6 +658,8 @@ public class SingleTdbDatasetTcProvider
+ mGraphConfigFile+"'!",e);
}
return graph;
+ } finally {
+ datasetLock.writeLock().unlock();
}
}
/*
@@ -640,7 +673,8 @@ public class SingleTdbDatasetTcProvider
throw new IllegalArgumentException("The parsed Grpah name MUST NOT be NULL!");
}
ModelGraph mg;
- synchronized (dataset) {
+ datasetLock.writeLock().lock();
+ try {
if(graphNames.contains(name) || mGraphNames.contains(name) || name.equals(defaultGraphName)){
throw new EntityAlreadyExistsException(name);
}
@@ -652,13 +686,13 @@ public class SingleTdbDatasetTcProvider
throw new IllegalStateException("Unable to wirte GraphName config file '"
+ graphConfigFile+"'!",e);
}
- }
- //add the parsed data!
- if(triples != null) { //load the initial and final set of triples
- mg.getJenaAdapter().addAll(triples);
- synchronized(dataset){
- mg.sync();
+ //add the parsed data!
+ if(triples != null) { //load the initial and final set of triples
+ mg.getJenaAdapter().addAll(triples);
+ mg.sync();
}
+ } finally {
+ datasetLock.writeLock().unlock();
}
return mg.getGraph();
}
@@ -673,7 +707,8 @@ public class SingleTdbDatasetTcProvider
if(name == null){
throw new IllegalArgumentException("The parsed MGrpah name MUST NOT be NULL!");
}
- synchronized (dataset) {
+ datasetLock.writeLock().lock();
+ try {
if(mGraphNames.remove(name)){
try {
writeMGraphConfig();
@@ -699,6 +734,8 @@ public class SingleTdbDatasetTcProvider
}
//delete the graph from the initModels list
initModels.remove(name);
+ } finally {
+ datasetLock.writeLock().unlock();
}
}
/*
@@ -822,8 +859,13 @@ public class SingleTdbDatasetTcProvider
graphNames = new HashSet<UriRef>();
boolean configPresent = readGraphConfig(graphConfigFile, graphNames);
log.info("Present named Models");
- for(Iterator<String> it = dataset.listNames();it.hasNext();){
- log.info(" > {}",it.next());
+ datasetLock.readLock().lock();
+ try {
+ for(Iterator<String> it = dataset.listNames();it.hasNext();){
+ log.info(" > {}",it.next());
+ }
+ } finally {
+ datasetLock.readLock().unlock();
}
if(configPresent) {
//validate that all Graphs and MGraphs in the configFile also are
@@ -880,11 +922,14 @@ public class SingleTdbDatasetTcProvider
// }
// }
} else { //read pre-existing models in the dataset
- synchronized (dataset) {
+ datasetLock.readLock().lock();
+ try {
for(Iterator<String> it = dataset.listNames();it.hasNext();){
mGraphNames.add(new UriRef(it.next()));
}
writeMGraphConfig();
+ } finally {
+ datasetLock.readLock().unlock();
}
}
}
@@ -913,5 +958,206 @@ public class SingleTdbDatasetTcProvider
return false;
}
}
+ /**
+ * {@link LockableMGraph} wrapper that uses a single {@link ReadWriteLock} for
+ * the Jena TDB {@link SingleTdbDatasetTcProvider#dataset}
+ * @author Rupert Westenthaler
+ *
+ */
+ private class DatasetLockedMGraph implements LockableMGraph {
+
+ private final MGraph wrapped;
+
+ /**
+ * Constructs a LocalbleMGraph for an MGraph.
+ *
+ * @param providedMGraph a non-lockable mgraph
+ */
+ public DatasetLockedMGraph(final MGraph providedMGraph) {
+ this.wrapped = providedMGraph;
+ }
+
+ @Override
+ public ReadWriteLock getLock() {
+ return datasetLock;
+ }
+
+ @Override
+ public Graph getGraph() {
+ datasetLock.readLock().lock();
+ try {
+ return wrapped.getGraph();
+ } finally {
+ datasetLock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public Iterator<Triple> filter(NonLiteral subject, UriRef predicate, Resource object) {
+ //users will need to aquire a readlock while iterating
+ return wrapped.filter(subject, predicate, object);
+ }
+
+ @Override
+ public int size() {
+ datasetLock.readLock().lock();
+ try {
+ return wrapped.size();
+ } finally {
+ datasetLock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public boolean isEmpty() {
+ datasetLock.readLock().lock();
+ try {
+ return wrapped.isEmpty();
+ } finally {
+ datasetLock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public boolean contains(Object o) {
+ datasetLock.readLock().lock();
+ try {
+ return wrapped.contains(o);
+ } finally {
+ datasetLock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public Iterator<Triple> iterator() {
+ //users will need it acquire a read lock while iterating!
+ return wrapped.iterator();
+ }
+
+ @Override
+ public Object[] toArray() {
+ datasetLock.readLock().lock();
+ try {
+ return wrapped.toArray();
+ } finally {
+ datasetLock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public <T> T[] toArray(T[] a) {
+ datasetLock.readLock().lock();
+ try {
+ return wrapped.toArray(a);
+ } finally {
+ datasetLock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public boolean containsAll(Collection<?> c) {
+ datasetLock.readLock().lock();
+ try {
+ return wrapped.containsAll(c);
+ } finally {
+ datasetLock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public boolean add(Triple e) {
+ datasetLock.writeLock().lock();
+ try {
+ return wrapped.add(e);
+ } finally {
+ datasetLock.writeLock().unlock();
+ }
+ }
+
+ @Override
+ public boolean remove(Object o) {
+ datasetLock.writeLock().lock();
+ try {
+ return wrapped.remove(o);
+ } finally {
+ datasetLock.writeLock().unlock();
+ }
+ }
+
+ @Override
+ public boolean addAll(Collection<? extends Triple> c) {
+ datasetLock.writeLock().lock();
+ try {
+ return wrapped.addAll(c);
+ } finally {
+ datasetLock.writeLock().unlock();
+ }
+ }
+
+ @Override
+ public boolean removeAll(Collection<?> c) {
+ datasetLock.writeLock().lock();
+ try {
+ return wrapped.removeAll(c);
+ } finally {
+ datasetLock.writeLock().unlock();
+ }
+ }
+
+ @Override
+ public boolean retainAll(Collection<?> c) {
+ datasetLock.writeLock().lock();
+ try {
+ return wrapped.retainAll(c);
+ } finally {
+ datasetLock.writeLock().unlock();
+ }
+ }
+
+ @Override
+ public void clear() {
+ datasetLock.writeLock().lock();
+ try {
+ wrapped.clear();
+ } finally {
+ datasetLock.writeLock().unlock();
+ }
+ }
+
+ @Override
+ public void addGraphListener(GraphListener listener, FilterTriple filter, long delay) {
+ wrapped.addGraphListener(listener, filter, delay);
+ }
+
+ @Override
+ public void addGraphListener(GraphListener listener, FilterTriple filter) {
+ wrapped.addGraphListener(listener, filter);
+ }
+
+ @Override
+ public void removeGraphListener(GraphListener listener) {
+ wrapped.removeGraphListener(listener);
+ }
+
+ @Override
+ public int hashCode() {
+ return wrapped.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if(obj instanceof DatasetLockedMGraph){
+ DatasetLockedMGraph other = (DatasetLockedMGraph) obj;
+ return wrapped.equals(other.wrapped);
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return wrapped.toString();
+ }
+ }
}
Modified: clerezza/trunk/rdf.jena.tdb.storage/src/test/java/org/apache/clerezza/rdf/jena/tdb/storage/MultiThreadedSingleTdbDatasetTest.java
URL: http://svn.apache.org/viewvc/clerezza/trunk/rdf.jena.tdb.storage/src/test/java/org/apache/clerezza/rdf/jena/tdb/storage/MultiThreadedSingleTdbDatasetTest.java?rev=1457660&r1=1457659&r2=1457660&view=diff
==============================================================================
--- clerezza/trunk/rdf.jena.tdb.storage/src/test/java/org/apache/clerezza/rdf/jena/tdb/storage/MultiThreadedSingleTdbDatasetTest.java (original)
+++ clerezza/trunk/rdf.jena.tdb.storage/src/test/java/org/apache/clerezza/rdf/jena/tdb/storage/MultiThreadedSingleTdbDatasetTest.java Mon Mar 18 07:33:13 2013
@@ -23,11 +23,17 @@ package org.apache.clerezza.rdf.jena.tdb
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.Dictionary;
import java.util.HashSet;
import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import junit.framework.Assert;
@@ -37,6 +43,7 @@ import org.apache.clerezza.rdf.core.MGra
import org.apache.clerezza.rdf.core.Resource;
import org.apache.clerezza.rdf.core.Triple;
import org.apache.clerezza.rdf.core.UriRef;
+import org.apache.clerezza.rdf.core.access.LockableMGraph;
import org.apache.clerezza.rdf.core.impl.PlainLiteralImpl;
import org.apache.clerezza.rdf.core.impl.TripleImpl;
import org.apache.felix.scr.annotations.Activate;
@@ -44,35 +51,41 @@ import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.openjena.atlas.logging.Log;
import org.osgi.service.cm.ConfigurationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.wymiwyg.commons.util.Util;
public class MultiThreadedSingleTdbDatasetTest {
+ private Logger log = LoggerFactory.getLogger(MultiThreadedSingleTdbDatasetTest.class);
-
+ private static final String TEST_GRAPH_URI_PREFIX = "http://www.example.org/multiThreadTest";
+ private int[] graphNum = new int[]{0};
/**
* how many threads to start
*/
- private static final int THREAD_COUNT = 100;
+ private static final int TEST_THREAD_COUNT = 25;
+ private static final int VALIDATE_THREAD_COUNT = 2;
/**
* how many seconds to let them run
*/
- private static final int DELAY = 30;
+ private static final int DELAY = 15;
- private MGraph mGraph;
- private Set<Triple> testTriples = Collections.synchronizedSet(new HashSet<Triple>());
+ protected final List<MGraph> mGraphs = new ArrayList<MGraph>();
+ protected final List<Set<Triple>> testTriplesList = new ArrayList<Set<Triple>>();
+ private Random random = new Random();
class TestThread extends Thread {
- private final int id;
private boolean stopRequested;
private int addedTripleCount = 0;
public TestThread(final int id) {
- this.id = id;
+ setName("Test Thread "+id);
start();
}
@@ -83,9 +96,35 @@ public class MultiThreadedSingleTdbDatas
@Override
public void run() {
while (!stopRequested) {
+ float r;
+ synchronized (random) {
+ r = random.nextFloat();
+ }
+ MGraph graph;
+ Set<Triple> testTriples;
+ if(r > 0.995){
+ int num;
+ synchronized (graphNum) {
+ num = graphNum[0];
+ graphNum[0]++;
+ }
+ graph = provider.createMGraph(new UriRef(TEST_GRAPH_URI_PREFIX+num));
+ log.info(" ... creating the {}. Grpah", num+1);
+ testTriples = new HashSet<Triple>();
+ synchronized (mGraphs) {
+ mGraphs.add(graph);
+ testTriplesList.add(testTriples);
+ }
+ } else { //map the range [0..0.995] to the mGraphs
+ synchronized (mGraphs) {
+ int num = Math.round(r*(float)(mGraphs.size()-1)/0.995f);
+ graph = mGraphs.get(num);
+ testTriples = testTriplesList.get(num);
+ }
+ }
Literal randomLiteral = new PlainLiteralImpl(Util.createRandomString(22));
Triple triple = new TripleImpl(new BNode(), new UriRef("http://example.com/property"), randomLiteral);
- mGraph.add(triple);
+ graph.add(triple);
addedTripleCount++;
if ((addedTripleCount % 100) == 0) {
testTriples.add(triple);
@@ -98,6 +137,60 @@ public class MultiThreadedSingleTdbDatas
}
}
+ /**
+ * Iterates over max. the first 10 triples of a Graph
+ * while acquiring a read lock on the graph.
+ * @author westei
+ *
+ */
+ class ValidatorThread extends Thread {
+
+ boolean stopRequested = false;
+
+ public ValidatorThread(int id) {
+ setName("Validator Thread "+id);
+ start();
+ }
+
+ public void requestStop() {
+ stopRequested = true;
+ }
+
+ @Override
+ public void run() {
+ while (!stopRequested) {
+ float r;
+ synchronized (random) {
+ r = random.nextFloat();
+ }
+ int num = Math.round(r*(float)(mGraphs.size()-1));
+ LockableMGraph graph;
+ synchronized (mGraphs) {
+ graph = (LockableMGraph)mGraphs.get(num);
+ }
+ int elem = 0;
+ graph.getLock().readLock().lock();
+ try {
+ Iterator<Triple> it = graph.iterator();
+ while(it.hasNext() && elem < 10){
+ elem++;
+ it.next();
+ }
+ } finally {
+ graph.getLock().readLock().unlock();
+ }
+ //iterate inly every 200ms
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException e) {
+ //ignore
+ }
+ }
+ }
+
+
+
+ }
@@ -114,30 +207,52 @@ public class MultiThreadedSingleTdbDatas
provider = new SingleTdbDatasetTcProvider(config);
}
@Before
- public void createGraph(){
- this.mGraph = provider.createMGraph(new UriRef("http://www.example.org/multiThreadTest"));
+ public void createGraphs(){
+ mGraphs.add(provider.createMGraph(new UriRef(TEST_GRAPH_URI_PREFIX+graphNum[0])));
+ testTriplesList.add(new HashSet<Triple>());
+ graphNum[0]++;
+ mGraphs.add(provider.createMGraph(new UriRef(TEST_GRAPH_URI_PREFIX+graphNum[0])));
+ testTriplesList.add(new HashSet<Triple>());
+ graphNum[0]++;
}
@Test
public void perform() throws InterruptedException {
- TestThread[] threads = new TestThread[THREAD_COUNT];
+ TestThread[] threads = new TestThread[TEST_THREAD_COUNT];
for (int i = 0; i < threads.length; i++) {
threads[i] = new TestThread(i);
}
+ ValidatorThread[] validators = new ValidatorThread[VALIDATE_THREAD_COUNT];
+ for(int i = 0; i < validators.length; i++) {
+ validators [i] = new ValidatorThread(i);
+ }
Thread.sleep(DELAY*1000);
for (TestThread testThread : threads) {
testThread.requestStop();
}
+ for (ValidatorThread validator : validators) {
+ validator.requestStop();
+ }
for (TestThread testThread : threads) {
testThread.join();
}
+ for (ValidatorThread validator : validators) {
+ validator.join();
+ }
int addedTriples = 0;
for (TestThread testThread : threads) {
addedTriples += testThread.getAddedTripleCount();
}
- Assert.assertEquals(addedTriples, mGraph.size());
- for (Triple testTriple : testTriples) {
- Assert.assertTrue(mGraph.contains(testTriple));
+ int graphTriples = 0;
+ log.info("Test created {} graphs with {} triples", mGraphs.size(), addedTriples);
+ for(int i = 0;i < mGraphs.size(); i++){
+ MGraph graph = mGraphs.get(i);
+ graphTriples += graph.size();
+ log.info(" > Grpah {}: {} triples",i,graph.size());
+ for (Triple testTriple : testTriplesList.get(i)) {
+ Assert.assertTrue(graph.contains(testTriple));
+ }
}
+ Assert.assertEquals(addedTriples, graphTriples);
}
@AfterClass
public static void cleanUpDirectory() {