You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by an...@apache.org on 2016/06/10 13:22:49 UTC
jena git commit: JENA-1191: Handle triples arriving via the quads
route
Repository: jena
Updated Branches:
refs/heads/master edba13136 -> 4ef9453ba
JENA-1191: Handle triples arriving via the quads route
Project: http://git-wip-us.apache.org/repos/asf/jena/repo
Commit: http://git-wip-us.apache.org/repos/asf/jena/commit/4ef9453b
Tree: http://git-wip-us.apache.org/repos/asf/jena/tree/4ef9453b
Diff: http://git-wip-us.apache.org/repos/asf/jena/diff/4ef9453b
Branch: refs/heads/master
Commit: 4ef9453ba1beec5fafeee3bb0095f6c7f79039f6
Parents: edba131
Author: Andy Seaborne <an...@apache.org>
Authored: Fri Jun 10 14:22:41 2016 +0100
Committer: Andy Seaborne <an...@apache.org>
Committed: Fri Jun 10 14:22:41 2016 +0100
----------------------------------------------------------------------
.../jena/sdb/layout2/LoaderTuplesNodes.java | 352 +++++++++----------
1 file changed, 162 insertions(+), 190 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/jena/blob/4ef9453b/jena-sdb/src/main/java/org/apache/jena/sdb/layout2/LoaderTuplesNodes.java
----------------------------------------------------------------------
diff --git a/jena-sdb/src/main/java/org/apache/jena/sdb/layout2/LoaderTuplesNodes.java b/jena-sdb/src/main/java/org/apache/jena/sdb/layout2/LoaderTuplesNodes.java
index 0fee236..83e9d5d 100644
--- a/jena-sdb/src/main/java/org/apache/jena/sdb/layout2/LoaderTuplesNodes.java
+++ b/jena-sdb/src/main/java/org/apache/jena/sdb/layout2/LoaderTuplesNodes.java
@@ -35,27 +35,23 @@ import org.apache.jena.sdb.sql.SDBExceptionSQL ;
import org.apache.jena.sdb.store.StoreLoaderPlus ;
import org.apache.jena.sdb.store.TableDesc ;
import org.apache.jena.sdb.store.TupleLoader ;
+import org.apache.jena.sparql.core.Quad ;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-public class LoaderTuplesNodes
- extends SDBConnectionHolder
- implements StoreLoaderPlus
+public class LoaderTuplesNodes extends SDBConnectionHolder implements StoreLoaderPlus
{
private static Logger log = LoggerFactory.getLogger(LoaderTuplesNodes.class);
- //private static final String classShortName = Utils.classShortName(LoaderTriplesNodes.class) ;
-
// Delayed initialization until first bulk load.
private boolean initialized = false ;
- boolean threading = true; // Do we want to thread?
- Thread commitThread = null ; // The loader thread
- final static TupleChange flushSignal = new TupleChange(); // Signal to thread to commit
- final static TupleChange finishSignal = new TupleChange(); // Signal to thread to finish
- ArrayBlockingQueue<TupleChange> queue ; // Pipeline to loader thread
- AtomicReference<Throwable> threadException ; // Placeholder for problems thrown in the thread
- Object threadFlushing = new Object(); // We lock on this when flushing
+ boolean threading = true; // Do we want to thread?
+ Thread commitThread = null ; // The loader thread
+ final static TupleChange flushSignal = new TupleChange(); // Signal to thread to commit
+ final static TupleChange finishSignal = new TupleChange(); // Signal to thread to finish
+ ArrayBlockingQueue<TupleChange> queue ; // Pipeline to loader thread
+ AtomicReference<Throwable> threadException ; // Placeholder for problems thrown in the thread
+ Object threadFlushing = new Object(); // We lock on this when flushing
Map<String, TupleLoader> tupleLoaders;
TupleLoader currentLoader;
@@ -67,8 +63,7 @@ public class LoaderTuplesNodes
private Store store;
- public LoaderTuplesNodes(SDBConnection connection, Class<? extends TupleLoader> tupleLoaderClass)
- {
+ public LoaderTuplesNodes(SDBConnection connection, Class<? extends TupleLoader> tupleLoaderClass) {
super(connection) ;
this.tupleLoaderClass = tupleLoaderClass ;
}
@@ -78,69 +73,63 @@ public class LoaderTuplesNodes
}
@Override
- public void startBulkUpdate()
- {
- init() ;
- }
+ public void startBulkUpdate() {
+ init() ;
+ }
- @Override
- public void finishBulkUpdate()
- {
- flushTriples() ;
- }
+ @Override
+ public void finishBulkUpdate() {
+ flushTriples() ;
+ }
/**
* Close this loader and finish the thread (if required)
*
*/
@Override
- public void close()
- {
- if (!initialized) return;
-
- try
- {
- if (threading && commitThread.isAlive())
- {
- queue.put(finishSignal);
- commitThread.join();
- }
- else
- {
- flushTriples();
- }
- }
- catch (Exception e)
- {
- log.error("Problem closing loader: " + e.getMessage());
- throw new SDBException("Problem closing loader", e);
- }
- finally
- {
- for (TupleLoader loader: this.tupleLoaders.values()) loader.close();
- this.initialized = false;
- this.commitThread = null;
- this.queue = null;
- this.tupleLoaderClass = null;
- this.tupleLoaders = null;
- }
+ public void close() {
+ if ( !initialized )
+ return ;
+
+ try {
+ if ( threading && commitThread.isAlive() ) {
+ queue.put(finishSignal) ;
+ commitThread.join() ;
+ } else {
+ flushTriples() ;
+ }
+ }
+ catch (Exception e) {
+ log.error("Problem closing loader: " + e.getMessage()) ;
+ throw new SDBException("Problem closing loader", e) ;
+ }
+ finally {
+ for ( TupleLoader loader : this.tupleLoaders.values() )
+ loader.close() ;
+ this.initialized = false ;
+ this.commitThread = null ;
+ this.queue = null ;
+ this.tupleLoaderClass = null ;
+ this.tupleLoaders = null ;
+ }
}
-
+
@Override
- public void addTriple(Triple triple)
- {
+ public void addTriple(Triple triple) {
updateStore(new TupleChange(true, store.getTripleTableDesc(), triple.getSubject(), triple.getPredicate(), triple.getObject()));
}
@Override
- public void deleteTriple(Triple triple)
- {
+ public void deleteTriple(Triple triple) {
updateStore(new TupleChange(false, store.getTripleTableDesc(), triple.getSubject(), triple.getPredicate(), triple.getObject()));
}
@Override
public void addQuad(Node g, Node s, Node p, Node o) {
- updateStore(new TupleChange(true, store.getQuadTableDesc(), g, s, p, o));
+ if ( g == Quad.tripleInQuad || Quad.isDefaultGraph(o) )
+ updateStore(new TupleChange(true, store.getTripleTableDesc(), s, p, o));
+ else
+ updateStore(new TupleChange(true, store.getQuadTableDesc(), g, s, p, o));
}
@Override
@@ -150,7 +139,10 @@ public class LoaderTuplesNodes
@Override
public void deleteQuad(Node g, Node s, Node p, Node o) {
- updateStore(new TupleChange(false, store.getQuadTableDesc(), g, s, p, o));
+ if ( g == Quad.tripleInQuad || Quad.isDefaultGraph(o) )
+ updateStore(new TupleChange(false, store.getTripleTableDesc(), s, p, o));
+ else
+ updateStore(new TupleChange(false, store.getQuadTableDesc(), g, s, p, o));
}
@Override
@@ -186,97 +178,83 @@ public class LoaderTuplesNodes
}
}
- private void updateStore(TupleChange tuple)
- {
- if (threading)
- {
- checkThreadStatus();
- try
- {
- queue.put(tuple);
- }
- catch (InterruptedException e)
- {
- log.error("Issue adding to queue: " + e.getMessage());
- throw new SDBException("Issue adding to queue" + e.getMessage(), e);
- }
- }
- else
- {
- updateOneTuple(tuple);
- }
- }
+ private void updateStore(TupleChange tuple) {
+ if ( threading ) {
+ checkThreadStatus() ;
+ try {
+ queue.put(tuple) ;
+ }
+ catch (InterruptedException e) {
+ log.error("Issue adding to queue: " + e.getMessage()) ;
+ throw new SDBException("Issue adding to queue" + e.getMessage(), e) ;
+ }
+ } else {
+ updateOneTuple(tuple) ;
+ }
+ }
/**
* Flush remain triples in queue to database. If threading this blocks until flush is complete.
*/
- private void flushTriples()
- {
- if (threading)
- {
- if (!commitThread.isAlive()) throw new SDBException("Thread has died");
- // finish up threaded load
- try {
- synchronized (threadFlushing) {
- queue.put(flushSignal);
- threadFlushing.wait();
- }
- }
- catch (InterruptedException e)
- {
- log.error("Problem sending flush signal: " + e.getMessage());
- throw new SDBException("Problem sending flush signal", e);
- }
- checkThreadStatus();
- }
- else
- {
- commitTuples();
- }
- }
+ private void flushTriples() {
+ if ( threading ) {
+ if ( !commitThread.isAlive() )
+ throw new SDBException("Thread has died") ;
+ // finish up threaded load
+ try {
+ synchronized (threadFlushing) {
+ queue.put(flushSignal) ;
+ threadFlushing.wait() ;
+ }
+ }
+ catch (InterruptedException e) {
+ log.error("Problem sending flush signal: " + e.getMessage()) ;
+ throw new SDBException("Problem sending flush signal", e) ;
+ }
+ checkThreadStatus() ;
+ } else {
+ commitTuples() ;
+ }
+ }
- private void init()
- {
- if ( initialized ) return ;
-
- tupleLoaders = new HashMap<String, TupleLoader>();
- currentLoader = null;
-
- count = 0;
-
- if (threading)
- {
- queue = new ArrayBlockingQueue<TupleChange>(chunkSize);
- threadException = new AtomicReference<Throwable>();
- threadFlushing = new AtomicBoolean();
- commitThread = new Thread(new Commiter());
- commitThread.setDaemon(true);
- commitThread.start();
- log.debug("Threading started");
- }
-
- initialized = true;
- }
+ private void init() {
+ if ( initialized )
+ return ;
+
+ tupleLoaders = new HashMap<String, TupleLoader>() ;
+ currentLoader = null ;
- private void checkThreadStatus()
- {
- Throwable e = threadException.getAndSet(null);
- if (e != null)
- {
- if (e instanceof SQLException)
- throw new SDBExceptionSQL("Loader thread exception", (SQLException) e);
- else if (e instanceof RuntimeException)
- throw (RuntimeException) e;
- else
- throw new SDBException("Loader thread exception", e);
+ count = 0 ;
+
+ if ( threading ) {
+ queue = new ArrayBlockingQueue<TupleChange>(chunkSize) ;
+ threadException = new AtomicReference<Throwable>() ;
+ threadFlushing = new AtomicBoolean() ;
+ commitThread = new Thread(new Commiter()) ;
+ commitThread.setDaemon(true) ;
+ commitThread.start() ;
+ log.debug("Threading started") ;
+ }
+
+ initialized = true ;
+ }
+
+ private void checkThreadStatus() {
+ Throwable e = threadException.getAndSet(null) ;
+ if ( e != null ) {
+ if ( e instanceof SQLException )
+ throw new SDBExceptionSQL("Loader thread exception", (SQLException)e) ;
+ else if ( e instanceof RuntimeException )
+ throw (RuntimeException)e ;
+ else
+ throw new SDBException("Loader thread exception", e) ;
}
- if (!commitThread.isAlive())
- throw new SDBException("Thread has died");
+ if ( !commitThread.isAlive() )
+ throw new SDBException("Thread has died") ;
}
// Queue up a triple, committing if we have enough chunks
- private void updateOneTuple(TupleChange tuple)
- {
+ private void updateOneTuple(TupleChange tuple) {
if (currentLoader == null || !currentLoader.getTableDesc().getTableName().equals(tuple.table.getTableName())) {
commitTuples(); // mode is changing, so commit
@@ -298,11 +276,10 @@ public class LoaderTuplesNodes
else currentLoader.unload(tuple.tuple);
}
- private void commitTuples()
- {
- if (currentLoader != null) {
- currentLoader.finish();
- }
+ private void commitTuples() {
+ if ( currentLoader != null ) {
+ currentLoader.finish() ;
+ }
}
@Override
@@ -324,52 +301,47 @@ public class LoaderTuplesNodes
* The (very minimal) thread code
*/
- class Commiter implements Runnable
- {
-
+ class Commiter implements Runnable {
@Override
- public void run()
- {
- log.debug("Running loader thread");
- threadException.set(null);
- while (true)
- {
- try
- {
- TupleChange tuple = queue.take();
- if (tuple == flushSignal)
- {
- synchronized (threadFlushing) {
- try {
- commitTuples();
- } catch (Throwable e) { handleIssue(e); }
-
- threadFlushing.notify();
- }
- }
- else if (tuple == finishSignal)
- {
- try {
- commitTuples(); // force commit
- } catch (Throwable e) { handleIssue(e); }
-
- break;
- }
- else
- {
- updateOneTuple(tuple);
- }
- }
- catch (Throwable e)
- {
- handleIssue(e);
- }
+ public void run() {
+ log.debug("Running loader thread") ;
+ threadException.set(null) ;
+ while (true) {
+ try {
+ TupleChange tuple = queue.take() ;
+ if ( tuple == flushSignal ) {
+ synchronized (threadFlushing) {
+ try {
+ commitTuples() ;
+ }
+ catch (Throwable e) {
+ handleIssue(e) ;
+ }
+
+ threadFlushing.notify() ;
+ }
+ } else if ( tuple == finishSignal ) {
+ try {
+ commitTuples() ; // force commit
+ }
+ catch (Throwable e) {
+ handleIssue(e) ;
+ }
+
+ break ;
+ } else {
+ updateOneTuple(tuple) ;
+ }
+ }
+ catch (Throwable e) {
+ handleIssue(e) ;
+ }
}
}
- private void handleIssue(Throwable e) {
- log.error("Error in thread: " + e.getMessage(), e);
- threadException.set(e);
- }
+ private void handleIssue(Throwable e) {
+ log.error("Error in thread: " + e.getMessage(), e) ;
+ threadException.set(e) ;
+ }
}
}