You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by an...@apache.org on 2012/12/27 14:48:16 UTC

svn commit: r1426205 - in /jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb: TDBLoader.java store/bulkloader/BulkLoader.java store/bulkloader/Destination.java

Author: andy
Date: Thu Dec 27 13:48:15 2012
New Revision: 1426205

URL: http://svn.apache.org/viewvc?rev=1426205&view=rev
Log:
Connect the bulk loader directly to the output of RIOT parsing.

Removed:
    jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/store/bulkloader/Destination.java
Modified:
    jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/TDBLoader.java
    jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/store/bulkloader/BulkLoader.java

Modified: jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/TDBLoader.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/TDBLoader.java?rev=1426205&r1=1426204&r2=1426205&view=diff
==============================================================================
--- jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/TDBLoader.java (original)
+++ jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/TDBLoader.java Thu Dec 27 13:48:15 2012
@@ -39,8 +39,8 @@ import com.hp.hpl.jena.tdb.store.bulkloa
 public class TDBLoader
 {
     /** Load the contents of URL into a dataset.  URL must name a quads format file (NQuads or TriG - NTriples is also accepted).
-     *  To a triples format, use @link{#load(GraphTDB, String)}
-     *  or @link{#loadTriples(DatasetGraphTDB, List<String>, boolean)}
+     *  To load a triples format, use {@link #load(GraphTDB, String)}
+     *  or {@link #load(DatasetGraphTDB, List, boolean)}
     */
     public static void load(DatasetGraphTDB dataset, String url)
     {
@@ -48,8 +48,8 @@ public class TDBLoader
     }
 
     /** Load the contents of URL into a dataset.  URL must name a quads format file (NQuads or TriG - NTriples is also accepted).
-     *  To a triples format, use @link{#load(GraphTDB, String, boolean)} 
-     *  or @link{#loadTriples(DatasetGraphTDB, List<String>, boolean)}
+     *  To load a triples format, use {@link #load(GraphTDB, String, boolean)} 
+     *  or {@link #load(DatasetGraphTDB, List, boolean)}
     */
     public static void load(DatasetGraphTDB dataset, String url, boolean showProgress)
     {
@@ -57,8 +57,8 @@ public class TDBLoader
     }
 
     /** Load the contents of URL into a dataset.  URL must name a quads format file (NQuads or TriG - NTriples is also accepted).
-     *  To load a triples format, use @link{#load(GraphTDB, List<String>, boolean)} 
-     *  or @link{#loadTriples(DatasetGraphTDB, List<String>, boolean)} 
+     *  To load a triples format, use {@link #load(GraphTDB, List, boolean)} 
+     *  or {@link #load(DatasetGraphTDB, List, boolean)} 
     */
     public static void load(DatasetGraphTDB dataset, List<String> urls)
     {
@@ -66,8 +66,8 @@ public class TDBLoader
     }
     
     /** Load the contents of URL into a dataset.  URL must name a quads format file (NQuads or TriG - NTriples is also accepted).
-     *  To load a triples format, use @link{#load(GraphTDB, List<String>, boolean)} 
-     *  or @link{#loadTriples(DatasetGraphTDB, List<String>, boolean)} 
+     *  To load a triples format, use {@link #load(GraphTDB, List, boolean)} 
+     *  or {@link #load(DatasetGraphTDB, List, boolean)} 
     */
     public static void load(DatasetGraphTDB dataset, List<String> urls, boolean showProgress)
     {

Modified: jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/store/bulkloader/BulkLoader.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/store/bulkloader/BulkLoader.java?rev=1426205&r1=1426204&r2=1426205&view=diff
==============================================================================
--- jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/store/bulkloader/BulkLoader.java (original)
+++ jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/store/bulkloader/BulkLoader.java Thu Dec 27 13:48:15 2012
@@ -22,8 +22,10 @@ import java.io.InputStream ;
 import java.util.List ;
 
 import org.apache.jena.atlas.event.EventType ;
+import org.apache.jena.atlas.lib.Tuple ;
 import org.apache.jena.riot.RDFLanguages ;
 import org.apache.jena.riot.RiotReader ;
+import org.apache.jena.riot.lang.RDFParserOutput ;
 import org.slf4j.Logger ;
 
 import com.hp.hpl.jena.graph.Node ;
@@ -31,6 +33,7 @@ import com.hp.hpl.jena.graph.Triple ;
 import com.hp.hpl.jena.sparql.core.Quad ;
 import com.hp.hpl.jena.sparql.util.Utils ;
 import com.hp.hpl.jena.tdb.TDB ;
+import com.hp.hpl.jena.tdb.TDBException ;
 import com.hp.hpl.jena.tdb.nodetable.NodeTupleTable ;
 import com.hp.hpl.jena.tdb.nodetable.NodeTupleTableView ;
 import com.hp.hpl.jena.tdb.solver.stats.Stats ;
@@ -81,102 +84,102 @@ public class BulkLoader
     /** Load into default graph */
     public static void loadDefaultGraph(DatasetGraphTDB dsg, List<String> urls, boolean showProgress)
     {
-        Destination<Triple> dest = destinationDefaultGraph(dsg, showProgress) ;
+        Destination dest = destinationDefaultGraph(dsg, showProgress) ;
         loadTriples$(dest, urls) ;
     }
 
     /** Load into default graph */
     public static void loadDefaultGraph(DatasetGraphTDB dsg, InputStream input, boolean showProgress)
     {
-        Destination<Triple> dest = destinationDefaultGraph(dsg, showProgress) ;
+        Destination dest = destinationDefaultGraph(dsg, showProgress) ;
         loadTriples$(dest, input) ;
     }
 
-    private static Destination<Triple> destinationDefaultGraph(DatasetGraphTDB dsg, boolean showProgress)
+    private static Destination destinationDefaultGraph(DatasetGraphTDB dsg, boolean showProgress)
     {
         NodeTupleTable ntt = dsg.getTripleTable().getNodeTupleTable() ;
-        return destination(dsg, ntt, showProgress) ;
+        return destinationGraph(dsg, ntt, showProgress) ;
     }
 
     /** Load into named graph */
     public static void loadNamedGraph(DatasetGraphTDB dsg, Node graphNode, List<String> urls, boolean showProgress)
     {
-        Destination<Triple> dest = destinationNamedGraph(dsg, graphNode, showProgress) ;
+        Destination dest = destinationNamedGraph(dsg, graphNode, showProgress) ;
         loadTriples$(dest, urls) ;
     }
     
     /** Load into named graph */
     public static void loadNamedGraph(DatasetGraphTDB dsg, Node graphNode, InputStream input, boolean showProgress)
     {
-        Destination<Triple> dest = destinationNamedGraph(dsg, graphNode, showProgress) ;
+        Destination dest = destinationNamedGraph(dsg, graphNode, showProgress) ;
         loadTriples$(dest, input) ;
     }
 
     /** Load into a dataset */
     public static void loadDataset(DatasetGraphTDB dsg, List<String> urls, boolean showProgress)
     {
-        Destination<Quad> dest = destinationDataset(dsg, showProgress) ;
+        Destination dest = destinationDataset(dsg, showProgress) ;
         loadQuads$(dest, urls) ;
     }
     
     /** Load into a dataset */
     public static void loadDataset(DatasetGraphTDB dsg, InputStream input, boolean showProgress)
     {
-        Destination<Quad> dest = destinationDataset(dsg, showProgress) ;
+        Destination dest = destinationDataset(dsg, showProgress) ;
         loadQuads$(dest, input) ;
     }
     
 
     /** Load into a graph */
-    private static void loadTriples$(Destination<Triple> dest, List<String> urls)
+    private static void loadTriples$(Destination dest, List<String> urls)
     {
-        dest.start() ;
+        dest.startBulk() ;
         for ( String url : urls )
         {
             loadLogger.info("Load: "+url+" -- "+Utils.nowAsString()) ;
-            RiotReader.parseTriples(url, dest) ;
+            RiotReader.parse(url, dest) ;
         }            
-        dest.finish() ;
+        dest.finishBulk() ;
     }
 
     /** Load into a graph */
-    private static void loadTriples$(Destination<Triple> dest, InputStream input)
+    private static void loadTriples$(Destination dest, InputStream input)
     {
         loadLogger.info("Load: from input stream -- "+Utils.nowAsString()) ;
-        dest.start() ;
-        RiotReader.parseTriples(input, RDFLanguages.NTriples, null, dest) ;
-        dest.finish() ;
+        dest.startBulk() ;
+        RiotReader.parse(input, RDFLanguages.NTriples, null, dest) ;
+        dest.finishBulk() ;
     }
     
     /** Load quads into a dataset */
-    private static void loadQuads$(Destination<Quad> dest, List<String> urls)
+    private static void loadQuads$(Destination dest, List<String> urls)
     {
-        dest.start() ;
+        dest.startBulk() ;
         for ( String url : urls )
         {
             loadLogger.info("Load: "+url+" -- "+Utils.nowAsString()) ;
-            RiotReader.parseQuads(url, dest) ;
+            RiotReader.parse(url, dest) ;
         }
-        dest.finish() ;
+        dest.finishBulk() ;
     }
 
     /** Load quads into a dataset */
-    private static void loadQuads$(Destination<Quad> dest, InputStream input)
+    private static void loadQuads$(Destination dest, InputStream input)
     {
         loadLogger.info("Load: from input stream -- "+Utils.nowAsString()) ;
-        dest.start() ;
-        RiotReader.parseQuads(input, RDFLanguages.NQuads, null, dest) ;
-        dest.finish() ;
+        dest.startBulk() ;
+        RiotReader.parse(input, RDFLanguages.NQuads, null, dest) ;
+        dest.finishBulk() ;
     }
     
-    private static Destination<Triple> destinationNamedGraph(DatasetGraphTDB dsg, Node graphName, boolean showProgress)
+    private static Destination destinationNamedGraph(DatasetGraphTDB dsg, Node graphName, boolean showProgress)
     {
         if ( graphName == null )
             return destinationDefaultGraph(dsg,showProgress) ;
         
         NodeTupleTable ntt = dsg.getQuadTable().getNodeTupleTable() ;
         NodeTupleTable ntt2 = new NodeTupleTableView(ntt, graphName) ;
-        return destination(dsg, ntt2, showProgress) ;
+        return destinationGraph(dsg, ntt2, showProgress) ;
     }
 
     public static LoadMonitor createLoadMonitor(DatasetGraphTDB dsg, String itemName, boolean showProgress)
@@ -186,141 +189,193 @@ public class BulkLoader
         else
             return new LoadMonitor(dsg, null, itemName, DataTickPoint, IndexTickPoint) ; 
     }
+
+    interface Destination extends RDFParserOutput
+    {
+        public void startBulk() ;
+        public void finishBulk() ;
+    }
+
+    private static Destination destinationDataset(DatasetGraphTDB dsg, boolean showProgress)
+    {
+        return new DestinationDSG(dsg, showProgress) ;
+    }
     
-    private static Destination<Triple> destination(final DatasetGraphTDB dsg, NodeTupleTable nodeTupleTable, final boolean showProgress)
+    private static Destination destinationGraph(DatasetGraphTDB dsg, NodeTupleTable nodeTupleTable, boolean showProgress)
+    {
+        return new DestinationGraph(dsg, nodeTupleTable, showProgress) ;
+    }
+
+    // Load triples and quads into a dataset.
+    private static final class DestinationDSG implements Destination
     {
-        LoadMonitor monitor = createLoadMonitor(dsg, "triples", showProgress) ;
-        final LoaderNodeTupleTable loaderTriples = new LoaderNodeTupleTable(nodeTupleTable, "triples", monitor) ;
+        final private DatasetGraphTDB dsg ; 
+        final private boolean startedEmpty ;
+        final private LoadMonitor monitor1 ; 
+        final private LoadMonitor monitor2 ;
+        final private LoaderNodeTupleTable loaderTriples ;
+        final private LoaderNodeTupleTable loaderQuads ;
+        final private boolean showProgress ;
+        private long count = 0 ;
+        private StatsCollector stats ;
+    
+        DestinationDSG(final DatasetGraphTDB dsg, boolean showProgress)
+        {
+            this.dsg = dsg ;
+            startedEmpty = dsg.isEmpty() ;
+            monitor1 = createLoadMonitor(dsg, "triples", showProgress) ;
+            monitor2 = createLoadMonitor(dsg, "quads", showProgress) ;
+    
+            loaderTriples = new LoaderNodeTupleTable(dsg.getTripleTable().getNodeTupleTable(), "triples", monitor1) ;
+            loaderQuads = new LoaderNodeTupleTable(dsg.getQuadTable().getNodeTupleTable(), "quads", monitor2) ;
+            this.showProgress = showProgress ;
+        }
         
-        Destination<Triple> sink = new Destination<Triple>() {
-            long count = 0 ;
-            private StatsCollector stats ;
-            private boolean startedEmpty = dsg.isEmpty() ;
-            
-            @Override
-            final public void start()
-            {
-                loaderTriples.loadStart() ;
-                loaderTriples.loadDataStart() ;
-                
-                this.stats = new StatsCollector() ;
-            }
-            @Override
-            final public void send(Triple triple)
+        @Override
+        final public void startBulk()
+        {
+            loaderTriples.loadStart() ;
+            loaderQuads.loadStart() ;
+    
+            loaderTriples.loadDataStart() ;
+            loaderQuads.loadDataStart() ;
+            this.stats = new StatsCollector() ;
+        }
+    
+        @Override
+        public void triple(Triple triple)
+        {
+            Node s = triple.getSubject() ;
+            Node p = triple.getPredicate() ;
+            Node o = triple.getObject() ;
+            process(Quad.tripleInQuad, s, p, o ) ;
+        }
+    
+        @Override
+        public void quad(Quad quad)
+        {
+            Node s = quad.getSubject() ;
+            Node p = quad.getPredicate() ;
+            Node o = quad.getObject() ;
+            Node g = null ;
+            // Union graph?!
+            if ( ! quad.isTriple() && ! quad.isDefaultGraph() )
+                g = quad.getGraph() ;
+            process(g,s,p,o) ;
+        }
+    
+        private void process(Node g, Node s, Node p, Node o)
+        {
+            if ( g == null ) 
+                loaderTriples.load(s, p, o) ;
+            else
+                loaderQuads.load(g, s, p, o) ;
+            count++ ;
+            stats.record(g, s, p, o) ; 
+        }
+    
+        @Override
+        public void finishBulk()
+        {
+            loaderTriples.loadDataFinish() ;
+            loaderQuads.loadDataFinish() ;
+    
+            loaderTriples.loadIndexStart() ;
+            loaderQuads.loadIndexStart() ;
+    
+            loaderTriples.loadIndexFinish() ;
+            loaderQuads.loadIndexFinish() ;
+    
+            loaderTriples.loadFinish() ;
+            loaderQuads.loadFinish() ;
+            if ( ! dsg.getLocation().isMem() && startedEmpty )
             {
-                Node s = triple.getSubject() ;
-                Node p = triple.getPredicate() ;
-                Node o = triple.getObject() ;
-                
-                loaderTriples.load(s, p, o)  ;
-                stats.record(null, s, p, o) ; 
-                
-                count++ ;
+                String filename = dsg.getLocation().getPath(Names.optStats) ;
+                Stats.write(filename, stats) ;
             }
+            forceSync(dsg) ;
+        }
+    
+        @Override
+        public void start()                     {}
+        @Override
+        public void tuple(Tuple<Node> tuple)    { throw new TDBException("Tuple encountered while loading a dataset") ; }
+        @Override
+        public void base(String base)           {}
+        @Override
+        public void prefix(String prefix, String iri)   {} // TODO
+        @Override
+        public void finish()                    {}
+    }
+
+    // Load triples into a specific NodeTupleTable
+    private static final class DestinationGraph implements Destination
+    {
+        final private DatasetGraphTDB dsg ;
+        final private LoadMonitor monitor ;
+        final private LoaderNodeTupleTable loaderTriples ;
+        final private boolean startedEmpty ;
+        private long count = 0 ;
+        private StatsCollector stats ;
 
-            @Override
-            final public void flush() { }
-            @Override
-            public void close() { }
+        DestinationGraph(final DatasetGraphTDB dsg, NodeTupleTable nodeTupleTable, boolean showProgress)
+        {
+            this.dsg = dsg ;
+            startedEmpty = dsg.isEmpty() ;
+            monitor = createLoadMonitor(dsg, "triples", showProgress) ;
+            loaderTriples = new LoaderNodeTupleTable(nodeTupleTable, "triples", monitor) ;
+        }
 
-            @Override
-            final public void finish()
-            {
-                loaderTriples.loadDataFinish() ;
-                loaderTriples.loadIndexStart() ;
-                loaderTriples.loadIndexFinish() ;
-                loaderTriples.loadFinish() ;
-                
-                if ( ! dsg.getLocation().isMem() && startedEmpty )
-                {
-                    String filename = dsg.getLocation().getPath(Names.optStats) ;
-                    Stats.write(filename, stats) ;
-                }
-                
-                forceSync(dsg) ;
-            }
-        } ;
-        return sink ;
-    }
+        @Override
+        final public void startBulk()
+        {
+            loaderTriples.loadStart() ;
+            loaderTriples.loadDataStart() ;
 
-    private static Destination<Quad> destinationDataset(final DatasetGraphTDB dsg, boolean showProgress)
-    {
-        LoadMonitor monitor1 = createLoadMonitor(dsg, "triples", showProgress) ;
-        LoadMonitor monitor2 = createLoadMonitor(dsg, "quads", showProgress) ;
-        
-        final LoaderNodeTupleTable loaderTriples = new LoaderNodeTupleTable(
-                                                                dsg.getTripleTable().getNodeTupleTable(),
-                                                                "triples",
-                                                                monitor1) ;
-        final LoaderNodeTupleTable loaderQuads = new LoaderNodeTupleTable( 
-                                                                 dsg.getQuadTable().getNodeTupleTable(),
-                                                                 "quads",
-                                                                 monitor2) ;
-        Destination<Quad> sink = new Destination<Quad>() {
-            long count = 0 ;
-            private StatsCollector stats ;
-            private boolean startedEmpty = dsg.isEmpty() ;
-            
-            @Override
-            final public void start()
-            {
-                loaderTriples.loadStart() ;
-                loaderQuads.loadStart() ;
+            this.stats = new StatsCollector() ;
+        }
+        @Override
+        final public void triple(Triple triple)
+        {
+            Node s = triple.getSubject() ;
+            Node p = triple.getPredicate() ;
+            Node o = triple.getObject() ;
+
+            loaderTriples.load(s, p, o)  ;
+            stats.record(null, s, p, o) ; 
+            count++ ;
+        }
 
-                loaderTriples.loadDataStart() ;
-                loaderQuads.loadDataStart() ;
-                this.stats = new StatsCollector() ;
-            }
-            
-            @Override
-            final public void send(Quad quad)
-            {
-                Node s = quad.getSubject() ;
-                Node p = quad.getPredicate() ;
-                Node o = quad.getObject() ;
-                Node g = null ;
-                // Union graph?!
-                if ( ! quad.isTriple() && ! quad.isDefaultGraph() )
-                    g = quad.getGraph() ;
-                
-                if ( g == null ) 
-                    loaderTriples.load(s, p, o) ;
-                else
-                    loaderQuads.load(g, s, p, o) ;
-                count++ ;
-                stats.record(g, s, p, o) ; 
-            }
+        @Override
+        final public void finishBulk()
+        {
+            loaderTriples.loadDataFinish() ;
+            loaderTriples.loadIndexStart() ;
+            loaderTriples.loadIndexFinish() ;
+            loaderTriples.loadFinish() ;
 
-            @Override
-            final public void finish()
+            if ( ! dsg.getLocation().isMem() && startedEmpty )
             {
-                loaderTriples.loadDataFinish() ;
-                loaderQuads.loadDataFinish() ;
-                
-                loaderTriples.loadIndexStart() ;
-                loaderQuads.loadIndexStart() ;
-
-                loaderTriples.loadIndexFinish() ;
-                loaderQuads.loadIndexFinish() ;
-
-                loaderTriples.loadFinish() ;
-                loaderQuads.loadFinish() ;
-                if ( ! dsg.getLocation().isMem() && startedEmpty )
-                {
-                    String filename = dsg.getLocation().getPath(Names.optStats) ;
-                    Stats.write(filename, stats) ;
-                }
-                forceSync(dsg) ;
+                String filename = dsg.getLocation().getPath(Names.optStats) ;
+                Stats.write(filename, stats) ;
             }
-            
-            @Override
-            final public void flush() { }
-            @Override
-            final public void close() { }
-        } ;
-        return sink ;
+            forceSync(dsg) ;
+        }
+
+        @Override
+        public void start()                     {}
+        @Override
+        public void quad(Quad quad)             { throw new TDBException("Quad encountered while loading a single graph") ; }
+        @Override
+        public void tuple(Tuple<Node> tuple)    { throw new TDBException("Tuple encountered while loading a single graph") ; }
+        @Override
+        public void base(String base)           { }
+        @Override
+        public void prefix(String prefix, String iri)  { } // TODO
+        @Override
+        public void finish()                    {}
     }
-    
+
     static void forceSync(DatasetGraphTDB dsg)
     {
         // Force sync - we have been bypassing DSG tables.
@@ -329,8 +384,7 @@ public class BulkLoader
         dsg.getQuadTable().getNodeTupleTable().getNodeTable().sync();
         dsg.getQuadTable().getNodeTupleTable().getNodeTable().sync();
         dsg.getPrefixes().getNodeTupleTable().getNodeTable().sync();                
-
-        // This is not enough -- modules whether sync needed.
+        // This is not enough -- modules check whether sync needed.
         dsg.sync() ;
         
     }