You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by an...@apache.org on 2016/06/10 13:22:49 UTC

jena git commit: JENA-1191: Handle triples arriving via the quads route

Repository: jena
Updated Branches:
  refs/heads/master edba13136 -> 4ef9453ba


JENA-1191: Handle triples arriving via the quads route


Project: http://git-wip-us.apache.org/repos/asf/jena/repo
Commit: http://git-wip-us.apache.org/repos/asf/jena/commit/4ef9453b
Tree: http://git-wip-us.apache.org/repos/asf/jena/tree/4ef9453b
Diff: http://git-wip-us.apache.org/repos/asf/jena/diff/4ef9453b

Branch: refs/heads/master
Commit: 4ef9453ba1beec5fafeee3bb0095f6c7f79039f6
Parents: edba131
Author: Andy Seaborne <an...@apache.org>
Authored: Fri Jun 10 14:22:41 2016 +0100
Committer: Andy Seaborne <an...@apache.org>
Committed: Fri Jun 10 14:22:41 2016 +0100

----------------------------------------------------------------------
 .../jena/sdb/layout2/LoaderTuplesNodes.java     | 352 +++++++++----------
 1 file changed, 162 insertions(+), 190 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/jena/blob/4ef9453b/jena-sdb/src/main/java/org/apache/jena/sdb/layout2/LoaderTuplesNodes.java
----------------------------------------------------------------------
diff --git a/jena-sdb/src/main/java/org/apache/jena/sdb/layout2/LoaderTuplesNodes.java b/jena-sdb/src/main/java/org/apache/jena/sdb/layout2/LoaderTuplesNodes.java
index 0fee236..83e9d5d 100644
--- a/jena-sdb/src/main/java/org/apache/jena/sdb/layout2/LoaderTuplesNodes.java
+++ b/jena-sdb/src/main/java/org/apache/jena/sdb/layout2/LoaderTuplesNodes.java
@@ -35,27 +35,23 @@ import org.apache.jena.sdb.sql.SDBExceptionSQL ;
 import org.apache.jena.sdb.store.StoreLoaderPlus ;
 import org.apache.jena.sdb.store.TableDesc ;
 import org.apache.jena.sdb.store.TupleLoader ;
+import org.apache.jena.sparql.core.Quad ;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
-public class LoaderTuplesNodes
-    extends SDBConnectionHolder
-    implements StoreLoaderPlus
+public class LoaderTuplesNodes extends SDBConnectionHolder implements StoreLoaderPlus
 {
     private static Logger log = LoggerFactory.getLogger(LoaderTuplesNodes.class);
-    //private static final String classShortName = Utils.classShortName(LoaderTriplesNodes.class)  ;
-    
     // Delayed initialization until first bulk load.
     private boolean initialized = false ;
     
-    boolean threading = true; // Do we want to thread?
-    Thread commitThread = null ; // The loader thread
-    final static TupleChange flushSignal = new TupleChange(); // Signal to thread to commit
-    final static TupleChange finishSignal = new TupleChange(); // Signal to thread to finish
-    ArrayBlockingQueue<TupleChange> queue ; // Pipeline to loader thread
-    AtomicReference<Throwable> threadException ; // Placeholder for problems thrown in the thread
-    Object threadFlushing = new Object(); // We lock on this when flushing
+    boolean threading = true;                                   // Do we want to thread?
+    Thread commitThread = null ;                                // The loader thread
+    final static TupleChange flushSignal = new TupleChange();   // Signal to thread to commit
+    final static TupleChange finishSignal = new TupleChange();  // Signal to thread to finish
+    ArrayBlockingQueue<TupleChange> queue ;                     // Pipeline to loader thread
+    AtomicReference<Throwable> threadException ;                // Placeholder for problems thrown in the thread
+    Object threadFlushing = new Object();                       // We lock on this when flushing
     
     Map<String, TupleLoader> tupleLoaders;
     TupleLoader currentLoader;
@@ -67,8 +63,7 @@ public class LoaderTuplesNodes
 
 	private Store store;
 	
-    public LoaderTuplesNodes(SDBConnection connection, Class<? extends TupleLoader> tupleLoaderClass)
-    {
+    public LoaderTuplesNodes(SDBConnection connection, Class<? extends TupleLoader> tupleLoaderClass) {
         super(connection) ;
         this.tupleLoaderClass = tupleLoaderClass ;
     }
@@ -78,69 +73,63 @@ public class LoaderTuplesNodes
     }
     
     @Override
-    public void startBulkUpdate()
-	{
-    	init() ;
-	}
+    public void startBulkUpdate() {
+        init() ;
+    }
 
-	@Override
-    public void finishBulkUpdate()
-	{
-		flushTriples() ;
-	}
+    @Override
+    public void finishBulkUpdate() {
+        flushTriples() ;
+    }
 
 	/**
      * Close this loader and finish the thread (if required)
      *
      */
     @Override
-    public void close()
-    {
-    	if (!initialized) return;
-    	
-    	try
-    	{
-    		if (threading && commitThread.isAlive())
-    		{
-    			queue.put(finishSignal);
-    			commitThread.join();
-    		}
-    		else
-    		{
-    			flushTriples();
-    		}
-    	}
-    	catch (Exception e)
-    	{
-    		log.error("Problem closing loader: " + e.getMessage());
-    		throw new SDBException("Problem closing loader", e);
-    	}
-    	finally
-    	{
-    		for (TupleLoader loader: this.tupleLoaders.values()) loader.close();
-    		this.initialized = false;
-    		this.commitThread = null;
-    		this.queue = null;
-    		this.tupleLoaderClass = null;
-    		this.tupleLoaders = null;
-    	}
+    public void close() {
+        if ( !initialized )
+            return ;
+
+        try {
+            if ( threading && commitThread.isAlive() ) {
+                queue.put(finishSignal) ;
+                commitThread.join() ;
+            } else {
+                flushTriples() ;
+            }
+        }
+        catch (Exception e) {
+            log.error("Problem closing loader: " + e.getMessage()) ;
+            throw new SDBException("Problem closing loader", e) ;
+        }
+        finally {
+            for ( TupleLoader loader : this.tupleLoaders.values() )
+                loader.close() ;
+            this.initialized = false ;
+            this.commitThread = null ;
+            this.queue = null ;
+            this.tupleLoaderClass = null ;
+            this.tupleLoaders = null ;
+        }
     }
-    
+
     @Override
-    public void addTriple(Triple triple)
-	{
+    public void addTriple(Triple triple) {
     	updateStore(new TupleChange(true, store.getTripleTableDesc(), triple.getSubject(), triple.getPredicate(), triple.getObject()));
 	}
     
     @Override
-    public void deleteTriple(Triple triple) 
-    {
+    public void deleteTriple(Triple triple) {
     	updateStore(new TupleChange(false, store.getTripleTableDesc(), triple.getSubject(), triple.getPredicate(), triple.getObject()));
     }
     
 	@Override
     public void addQuad(Node g, Node s, Node p, Node o) {
-		updateStore(new TupleChange(true, store.getQuadTableDesc(), g, s, p, o));		
+	    if ( g == Quad.tripleInQuad || Quad.isDefaultGraph(o) )
+	        updateStore(new TupleChange(true, store.getTripleTableDesc(), s, p, o));
+	    else
+	        updateStore(new TupleChange(true, store.getQuadTableDesc(), g, s, p, o));		
 	}
 
 	@Override
@@ -150,7 +139,10 @@ public class LoaderTuplesNodes
 
 	@Override
     public void deleteQuad(Node g, Node s, Node p, Node o) {
-		updateStore(new TupleChange(false, store.getQuadTableDesc(), g, s, p, o));
+	    if ( g == Quad.tripleInQuad || Quad.isDefaultGraph(o) )
+            updateStore(new TupleChange(false, store.getTripleTableDesc(), s, p, o));
+        else
+            updateStore(new TupleChange(false, store.getQuadTableDesc(), g, s, p, o));
 	}
 
 	@Override
@@ -186,97 +178,83 @@ public class LoaderTuplesNodes
     	}
     }
     
-    private void updateStore(TupleChange tuple)
-    {   
-	    if (threading)
-	    {
-	        checkThreadStatus();
-	        try
-	        {
-	        	queue.put(tuple);
-	        }
-	        catch (InterruptedException e)
-	        {
-	        	log.error("Issue adding to queue: " + e.getMessage());
-	        	throw new SDBException("Issue adding to queue" + e.getMessage(), e);
-	        }
-	    }
-	    else
-	    {
-			updateOneTuple(tuple);
-	    }
-	}
+    private void updateStore(TupleChange tuple) {
+        if ( threading ) {
+            checkThreadStatus() ;
+            try {
+                queue.put(tuple) ;
+            }
+            catch (InterruptedException e) {
+                log.error("Issue adding to queue: " + e.getMessage()) ;
+                throw new SDBException("Issue adding to queue" + e.getMessage(), e) ;
+            }
+        } else {
+            updateOneTuple(tuple) ;
+        }
+    }
 
 	/**
 	 * Flush remain triples in queue to database. If threading this blocks until flush is complete.
 	 */
-	private void flushTriples()
-	{
-		if (threading)
-	    {
-			if (!commitThread.isAlive()) throw new SDBException("Thread has died");
-	    	// finish up threaded load
-	    	try {
-	    		synchronized (threadFlushing) {
-	    			queue.put(flushSignal);
-	    			threadFlushing.wait();
-	    		}
-	    	}
-	    	catch (InterruptedException e)
-	    	{
-	    		log.error("Problem sending flush signal: " + e.getMessage());
-	    		throw new SDBException("Problem sending flush signal", e);
-	    	}
-	    	checkThreadStatus();
-	    }
-		else
-		{
-			commitTuples();
-		}
-	}
+    private void flushTriples() {
+        if ( threading ) {
+            if ( !commitThread.isAlive() )
+                throw new SDBException("Thread has died") ;
+            // finish up threaded load
+            try {
+                synchronized (threadFlushing) {
+                    queue.put(flushSignal) ;
+                    threadFlushing.wait() ;
+                }
+            }
+            catch (InterruptedException e) {
+                log.error("Problem sending flush signal: " + e.getMessage()) ;
+                throw new SDBException("Problem sending flush signal", e) ;
+            }
+            checkThreadStatus() ;
+        } else {
+            commitTuples() ;
+        }
+    }
 
-	private void init()
-	{
-	    if ( initialized ) return ;
-	    
-	    tupleLoaders = new HashMap<String, TupleLoader>();
-	    currentLoader = null;
-	    
-	    count = 0;
-	    
-	    if (threading)
-	    {
-	        queue = new ArrayBlockingQueue<TupleChange>(chunkSize);
-	        threadException = new AtomicReference<Throwable>();
-	        threadFlushing = new AtomicBoolean();
-	        commitThread = new Thread(new Commiter());
-	        commitThread.setDaemon(true);
-	        commitThread.start();
-	        log.debug("Threading started");
-	    }
-	    
-	    initialized = true;
-	}
+    private void init() {
+        if ( initialized )
+            return ;
+
+        tupleLoaders = new HashMap<String, TupleLoader>() ;
+        currentLoader = null ;
 
-	private void checkThreadStatus()
-    {
-		Throwable e = threadException.getAndSet(null);
-    	if (e != null)
-        {
-        	if (e instanceof SQLException)
-        		throw new SDBExceptionSQL("Loader thread exception", (SQLException) e);
-        	else if (e instanceof RuntimeException)
-        		throw (RuntimeException) e;
-        	else
-        		throw new SDBException("Loader thread exception", e);
+        count = 0 ;
+
+        if ( threading ) {
+            queue = new ArrayBlockingQueue<TupleChange>(chunkSize) ;
+            threadException = new AtomicReference<Throwable>() ;
+            threadFlushing = new AtomicBoolean() ;
+            commitThread = new Thread(new Commiter()) ;
+            commitThread.setDaemon(true) ;
+            commitThread.start() ;
+            log.debug("Threading started") ;
+        }
+
+        initialized = true ;
+    }
+
+    private void checkThreadStatus() {
+        Throwable e = threadException.getAndSet(null) ;
+        if ( e != null ) {
+            if ( e instanceof SQLException )
+                throw new SDBExceptionSQL("Loader thread exception", (SQLException)e) ;
+            else if ( e instanceof RuntimeException )
+                throw (RuntimeException)e ;
+            else
+                throw new SDBException("Loader thread exception", e) ;
         }
-    	if (!commitThread.isAlive())
-    		throw new SDBException("Thread has died");
+        if ( !commitThread.isAlive() )
+            throw new SDBException("Thread has died") ;
     }
     
     // Queue up a triple, committing if we have enough chunks
-    private void updateOneTuple(TupleChange tuple)
-    {
+    private void updateOneTuple(TupleChange tuple) {
     	if (currentLoader == null || !currentLoader.getTableDesc().getTableName().equals(tuple.table.getTableName())) {
     		
     		commitTuples(); // mode is changing, so commit
@@ -298,11 +276,10 @@ public class LoaderTuplesNodes
     	else currentLoader.unload(tuple.tuple);
     }
     
-    private void commitTuples()
-    {
-    	if (currentLoader != null) {
-    		currentLoader.finish();
-    	}
+    private void commitTuples() {
+        if ( currentLoader != null ) {
+            currentLoader.finish() ;
+        }
     }
     
     @Override
@@ -324,52 +301,47 @@ public class LoaderTuplesNodes
      * The (very minimal) thread code
      */
 
-    class Commiter implements Runnable
-    {
-
+    class Commiter implements Runnable {
         @Override
-        public void run()
-        {
-            log.debug("Running loader thread");
-            threadException.set(null);
-            while (true)
-            {
-            	try
-            	{
-            		TupleChange tuple = queue.take();
-            		if (tuple == flushSignal)
-            		{
-            			synchronized (threadFlushing) {
-            				try {
-            					commitTuples();
-            				} catch (Throwable e) { handleIssue(e); }
-            				
-            				threadFlushing.notify();
-            			}
-            		}
-            		else if (tuple == finishSignal)
-            		{
-            			try {
-            				commitTuples(); // force commit
-            			} catch (Throwable e) { handleIssue(e); }
-            			
-            			break;
-            		}
-            		else
-            		{
-            			updateOneTuple(tuple);
-            		}
-            	}
-            	catch (Throwable e)
-            	{
-            		handleIssue(e);
-            	}
+        public void run() {
+            log.debug("Running loader thread") ;
+            threadException.set(null) ;
+            while (true) {
+                try {
+                    TupleChange tuple = queue.take() ;
+                    if ( tuple == flushSignal ) {
+                        synchronized (threadFlushing) {
+                            try {
+                                commitTuples() ;
+                            }
+                            catch (Throwable e) {
+                                handleIssue(e) ;
+                            }
+
+                            threadFlushing.notify() ;
+                        }
+                    } else if ( tuple == finishSignal ) {
+                        try {
+                            commitTuples() ; // force commit
+                        }
+                        catch (Throwable e) {
+                            handleIssue(e) ;
+                        }
+
+                        break ;
+                    } else {
+                        updateOneTuple(tuple) ;
+                    }
+                }
+                catch (Throwable e) {
+                    handleIssue(e) ;
+                }
             }
         }
 
-		private void handleIssue(Throwable e) {
-    		log.error("Error in thread: " + e.getMessage(), e);
-    		threadException.set(e);
-		}
+        private void handleIssue(Throwable e) {
+            log.error("Error in thread: " + e.getMessage(), e) ;
+            threadException.set(e) ;
+        }
     }
 }