You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by an...@apache.org on 2012/12/27 14:48:16 UTC
svn commit: r1426205 - in
/jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb: TDBLoader.java
store/bulkloader/BulkLoader.java store/bulkloader/Destination.java
Author: andy
Date: Thu Dec 27 13:48:15 2012
New Revision: 1426205
URL: http://svn.apache.org/viewvc?rev=1426205&view=rev
Log:
Connect the bulk loader directly to the output of RIOT parsing.
Removed:
jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/store/bulkloader/Destination.java
Modified:
jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/TDBLoader.java
jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/store/bulkloader/BulkLoader.java
Modified: jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/TDBLoader.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/TDBLoader.java?rev=1426205&r1=1426204&r2=1426205&view=diff
==============================================================================
--- jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/TDBLoader.java (original)
+++ jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/TDBLoader.java Thu Dec 27 13:48:15 2012
@@ -39,8 +39,8 @@ import com.hp.hpl.jena.tdb.store.bulkloa
public class TDBLoader
{
/** Load the contents of URL into a dataset. URL must name a quads format file (NQuads or TriG - NTriples is also accepted).
- * To a triples format, use @link{#load(GraphTDB, String)}
- * or @link{#loadTriples(DatasetGraphTDB, List<String>, boolean)}
+ * To load a triples format, use {@link #load(GraphTDB, String)}
+ * or {@link #load(DatasetGraphTDB, List, boolean)}
*/
public static void load(DatasetGraphTDB dataset, String url)
{
@@ -48,8 +48,8 @@ public class TDBLoader
}
/** Load the contents of URL into a dataset. URL must name a quads format file (NQuads or TriG - NTriples is also accepted).
- * To a triples format, use @link{#load(GraphTDB, String, boolean)}
- * or @link{#loadTriples(DatasetGraphTDB, List<String>, boolean)}
+ * To load a triples format, use {@link #load(GraphTDB, String, boolean)}
+ * or {@link #load(DatasetGraphTDB, List, boolean)}
*/
public static void load(DatasetGraphTDB dataset, String url, boolean showProgress)
{
@@ -57,8 +57,8 @@ public class TDBLoader
}
/** Load the contents of URL into a dataset. URL must name a quads format file (NQuads or TriG - NTriples is also accepted).
- * To load a triples format, use @link{#load(GraphTDB, List<String>, boolean)}
- * or @link{#loadTriples(DatasetGraphTDB, List<String>, boolean)}
+ * To load a triples format, use {@link #load(GraphTDB, List, boolean)}
+ * or {@link #load(DatasetGraphTDB, List, boolean)}
*/
public static void load(DatasetGraphTDB dataset, List<String> urls)
{
@@ -66,8 +66,8 @@ public class TDBLoader
}
/** Load the contents of URL into a dataset. URL must name a quads format file (NQuads or TriG - NTriples is also accepted).
- * To load a triples format, use @link{#load(GraphTDB, List<String>, boolean)}
- * or @link{#loadTriples(DatasetGraphTDB, List<String>, boolean)}
+ * To load a triples format, use {@link #load(GraphTDB, List, boolean)}
+ * or {@link #load(DatasetGraphTDB, List, boolean)}
*/
public static void load(DatasetGraphTDB dataset, List<String> urls, boolean showProgress)
{
Modified: jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/store/bulkloader/BulkLoader.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/store/bulkloader/BulkLoader.java?rev=1426205&r1=1426204&r2=1426205&view=diff
==============================================================================
--- jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/store/bulkloader/BulkLoader.java (original)
+++ jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/store/bulkloader/BulkLoader.java Thu Dec 27 13:48:15 2012
@@ -22,8 +22,10 @@ import java.io.InputStream ;
import java.util.List ;
import org.apache.jena.atlas.event.EventType ;
+import org.apache.jena.atlas.lib.Tuple ;
import org.apache.jena.riot.RDFLanguages ;
import org.apache.jena.riot.RiotReader ;
+import org.apache.jena.riot.lang.RDFParserOutput ;
import org.slf4j.Logger ;
import com.hp.hpl.jena.graph.Node ;
@@ -31,6 +33,7 @@ import com.hp.hpl.jena.graph.Triple ;
import com.hp.hpl.jena.sparql.core.Quad ;
import com.hp.hpl.jena.sparql.util.Utils ;
import com.hp.hpl.jena.tdb.TDB ;
+import com.hp.hpl.jena.tdb.TDBException ;
import com.hp.hpl.jena.tdb.nodetable.NodeTupleTable ;
import com.hp.hpl.jena.tdb.nodetable.NodeTupleTableView ;
import com.hp.hpl.jena.tdb.solver.stats.Stats ;
@@ -81,102 +84,102 @@ public class BulkLoader
/** Load into default graph */
public static void loadDefaultGraph(DatasetGraphTDB dsg, List<String> urls, boolean showProgress)
{
- Destination<Triple> dest = destinationDefaultGraph(dsg, showProgress) ;
+ Destination dest = destinationDefaultGraph(dsg, showProgress) ;
loadTriples$(dest, urls) ;
}
/** Load into default graph */
public static void loadDefaultGraph(DatasetGraphTDB dsg, InputStream input, boolean showProgress)
{
- Destination<Triple> dest = destinationDefaultGraph(dsg, showProgress) ;
+ Destination dest = destinationDefaultGraph(dsg, showProgress) ;
loadTriples$(dest, input) ;
}
- private static Destination<Triple> destinationDefaultGraph(DatasetGraphTDB dsg, boolean showProgress)
+ private static Destination destinationDefaultGraph(DatasetGraphTDB dsg, boolean showProgress)
{
NodeTupleTable ntt = dsg.getTripleTable().getNodeTupleTable() ;
- return destination(dsg, ntt, showProgress) ;
+ return destinationGraph(dsg, ntt, showProgress) ;
}
/** Load into named graph */
public static void loadNamedGraph(DatasetGraphTDB dsg, Node graphNode, List<String> urls, boolean showProgress)
{
- Destination<Triple> dest = destinationNamedGraph(dsg, graphNode, showProgress) ;
+ Destination dest = destinationNamedGraph(dsg, graphNode, showProgress) ;
loadTriples$(dest, urls) ;
}
/** Load into named graph */
public static void loadNamedGraph(DatasetGraphTDB dsg, Node graphNode, InputStream input, boolean showProgress)
{
- Destination<Triple> dest = destinationNamedGraph(dsg, graphNode, showProgress) ;
+ Destination dest = destinationNamedGraph(dsg, graphNode, showProgress) ;
loadTriples$(dest, input) ;
}
/** Load into a dataset */
public static void loadDataset(DatasetGraphTDB dsg, List<String> urls, boolean showProgress)
{
- Destination<Quad> dest = destinationDataset(dsg, showProgress) ;
+ Destination dest = destinationDataset(dsg, showProgress) ;
loadQuads$(dest, urls) ;
}
/** Load into a dataset */
public static void loadDataset(DatasetGraphTDB dsg, InputStream input, boolean showProgress)
{
- Destination<Quad> dest = destinationDataset(dsg, showProgress) ;
+ Destination dest = destinationDataset(dsg, showProgress) ;
loadQuads$(dest, input) ;
}
/** Load into a graph */
- private static void loadTriples$(Destination<Triple> dest, List<String> urls)
+ private static void loadTriples$(Destination dest, List<String> urls)
{
- dest.start() ;
+ dest.startBulk() ;
for ( String url : urls )
{
loadLogger.info("Load: "+url+" -- "+Utils.nowAsString()) ;
- RiotReader.parseTriples(url, dest) ;
+ RiotReader.parse(url, dest) ;
}
- dest.finish() ;
+ dest.finishBulk() ;
}
/** Load into a graph */
- private static void loadTriples$(Destination<Triple> dest, InputStream input)
+ private static void loadTriples$(Destination dest, InputStream input)
{
loadLogger.info("Load: from input stream -- "+Utils.nowAsString()) ;
- dest.start() ;
- RiotReader.parseTriples(input, RDFLanguages.NTriples, null, dest) ;
- dest.finish() ;
+ dest.startBulk() ;
+ RiotReader.parse(input, RDFLanguages.NTriples, null, dest) ;
+ dest.finishBulk() ;
}
/** Load quads into a dataset */
- private static void loadQuads$(Destination<Quad> dest, List<String> urls)
+ private static void loadQuads$(Destination dest, List<String> urls)
{
- dest.start() ;
+ dest.startBulk() ;
for ( String url : urls )
{
loadLogger.info("Load: "+url+" -- "+Utils.nowAsString()) ;
- RiotReader.parseQuads(url, dest) ;
+ RiotReader.parse(url, dest) ;
}
- dest.finish() ;
+ dest.finishBulk() ;
}
/** Load quads into a dataset */
- private static void loadQuads$(Destination<Quad> dest, InputStream input)
+ private static void loadQuads$(Destination dest, InputStream input)
{
loadLogger.info("Load: from input stream -- "+Utils.nowAsString()) ;
- dest.start() ;
- RiotReader.parseQuads(input, RDFLanguages.NQuads, null, dest) ;
- dest.finish() ;
+ dest.startBulk() ;
+ RiotReader.parse(input, RDFLanguages.NQuads, null, dest) ;
+ dest.finishBulk() ;
}
- private static Destination<Triple> destinationNamedGraph(DatasetGraphTDB dsg, Node graphName, boolean showProgress)
+ private static Destination destinationNamedGraph(DatasetGraphTDB dsg, Node graphName, boolean showProgress)
{
if ( graphName == null )
return destinationDefaultGraph(dsg,showProgress) ;
NodeTupleTable ntt = dsg.getQuadTable().getNodeTupleTable() ;
NodeTupleTable ntt2 = new NodeTupleTableView(ntt, graphName) ;
- return destination(dsg, ntt2, showProgress) ;
+ return destinationGraph(dsg, ntt2, showProgress) ;
}
public static LoadMonitor createLoadMonitor(DatasetGraphTDB dsg, String itemName, boolean showProgress)
@@ -186,141 +189,193 @@ public class BulkLoader
else
return new LoadMonitor(dsg, null, itemName, DataTickPoint, IndexTickPoint) ;
}
+
+ interface Destination extends RDFParserOutput
+ {
+ public void startBulk() ;
+ public void finishBulk() ;
+ }
+
+ private static Destination destinationDataset(DatasetGraphTDB dsg, boolean showProgress)
+ {
+ return new DestinationDSG(dsg, showProgress) ;
+ }
- private static Destination<Triple> destination(final DatasetGraphTDB dsg, NodeTupleTable nodeTupleTable, final boolean showProgress)
+ private static Destination destinationGraph(DatasetGraphTDB dsg, NodeTupleTable nodeTupleTable, boolean showProgress)
+ {
+ return new DestinationGraph(dsg, nodeTupleTable, showProgress) ;
+ }
+
+ // Load triples and quads into a dataset.
+ private static final class DestinationDSG implements Destination
{
- LoadMonitor monitor = createLoadMonitor(dsg, "triples", showProgress) ;
- final LoaderNodeTupleTable loaderTriples = new LoaderNodeTupleTable(nodeTupleTable, "triples", monitor) ;
+ final private DatasetGraphTDB dsg ;
+ final private boolean startedEmpty ;
+ final private LoadMonitor monitor1 ;
+ final private LoadMonitor monitor2 ;
+ final private LoaderNodeTupleTable loaderTriples ;
+ final private LoaderNodeTupleTable loaderQuads ;
+ final private boolean showProgress ;
+ private long count = 0 ;
+ private StatsCollector stats ;
+
+ DestinationDSG(final DatasetGraphTDB dsg, boolean showProgress)
+ {
+ this.dsg = dsg ;
+ startedEmpty = dsg.isEmpty() ;
+ monitor1 = createLoadMonitor(dsg, "triples", showProgress) ;
+ monitor2 = createLoadMonitor(dsg, "quads", showProgress) ;
+
+ loaderTriples = new LoaderNodeTupleTable(dsg.getTripleTable().getNodeTupleTable(), "triples", monitor1) ;
+ loaderQuads = new LoaderNodeTupleTable(dsg.getQuadTable().getNodeTupleTable(), "quads", monitor2) ;
+ this.showProgress = showProgress ;
+ }
- Destination<Triple> sink = new Destination<Triple>() {
- long count = 0 ;
- private StatsCollector stats ;
- private boolean startedEmpty = dsg.isEmpty() ;
-
- @Override
- final public void start()
- {
- loaderTriples.loadStart() ;
- loaderTriples.loadDataStart() ;
-
- this.stats = new StatsCollector() ;
- }
- @Override
- final public void send(Triple triple)
+ @Override
+ final public void startBulk()
+ {
+ loaderTriples.loadStart() ;
+ loaderQuads.loadStart() ;
+
+ loaderTriples.loadDataStart() ;
+ loaderQuads.loadDataStart() ;
+ this.stats = new StatsCollector() ;
+ }
+
+ @Override
+ public void triple(Triple triple)
+ {
+ Node s = triple.getSubject() ;
+ Node p = triple.getPredicate() ;
+ Node o = triple.getObject() ;
+ process(Quad.tripleInQuad, s, p, o ) ;
+ }
+
+ @Override
+ public void quad(Quad quad)
+ {
+ Node s = quad.getSubject() ;
+ Node p = quad.getPredicate() ;
+ Node o = quad.getObject() ;
+ Node g = null ;
+ // Union graph?!
+ if ( ! quad.isTriple() && ! quad.isDefaultGraph() )
+ g = quad.getGraph() ;
+ process(g,s,p,o) ;
+ }
+
+ private void process(Node g, Node s, Node p, Node o)
+ {
+ if ( g == null )
+ loaderTriples.load(s, p, o) ;
+ else
+ loaderQuads.load(g, s, p, o) ;
+ count++ ;
+ stats.record(g, s, p, o) ;
+ }
+
+ @Override
+ public void finishBulk()
+ {
+ loaderTriples.loadDataFinish() ;
+ loaderQuads.loadDataFinish() ;
+
+ loaderTriples.loadIndexStart() ;
+ loaderQuads.loadIndexStart() ;
+
+ loaderTriples.loadIndexFinish() ;
+ loaderQuads.loadIndexFinish() ;
+
+ loaderTriples.loadFinish() ;
+ loaderQuads.loadFinish() ;
+ if ( ! dsg.getLocation().isMem() && startedEmpty )
{
- Node s = triple.getSubject() ;
- Node p = triple.getPredicate() ;
- Node o = triple.getObject() ;
-
- loaderTriples.load(s, p, o) ;
- stats.record(null, s, p, o) ;
-
- count++ ;
+ String filename = dsg.getLocation().getPath(Names.optStats) ;
+ Stats.write(filename, stats) ;
}
+ forceSync(dsg) ;
+ }
+
+ @Override
+ public void start() {}
+ @Override
+ public void tuple(Tuple<Node> tuple) { throw new TDBException("Tuple encountered while loading a dataset") ; }
+ @Override
+ public void base(String base) {}
+ @Override
+ public void prefix(String prefix, String iri) {} // TODO
+ @Override
+ public void finish() {}
+ }
+
+ // Load triples into a specific NodeTupleTable
+ private static final class DestinationGraph implements Destination
+ {
+ final private DatasetGraphTDB dsg ;
+ final private LoadMonitor monitor ;
+ final private LoaderNodeTupleTable loaderTriples ;
+ final private boolean startedEmpty ;
+ private long count = 0 ;
+ private StatsCollector stats ;
- @Override
- final public void flush() { }
- @Override
- public void close() { }
+ DestinationGraph(final DatasetGraphTDB dsg, NodeTupleTable nodeTupleTable, boolean showProgress)
+ {
+ this.dsg = dsg ;
+ startedEmpty = dsg.isEmpty() ;
+ monitor = createLoadMonitor(dsg, "triples", showProgress) ;
+ loaderTriples = new LoaderNodeTupleTable(nodeTupleTable, "triples", monitor) ;
+ }
- @Override
- final public void finish()
- {
- loaderTriples.loadDataFinish() ;
- loaderTriples.loadIndexStart() ;
- loaderTriples.loadIndexFinish() ;
- loaderTriples.loadFinish() ;
-
- if ( ! dsg.getLocation().isMem() && startedEmpty )
- {
- String filename = dsg.getLocation().getPath(Names.optStats) ;
- Stats.write(filename, stats) ;
- }
-
- forceSync(dsg) ;
- }
- } ;
- return sink ;
- }
+ @Override
+ final public void startBulk()
+ {
+ loaderTriples.loadStart() ;
+ loaderTriples.loadDataStart() ;
- private static Destination<Quad> destinationDataset(final DatasetGraphTDB dsg, boolean showProgress)
- {
- LoadMonitor monitor1 = createLoadMonitor(dsg, "triples", showProgress) ;
- LoadMonitor monitor2 = createLoadMonitor(dsg, "quads", showProgress) ;
-
- final LoaderNodeTupleTable loaderTriples = new LoaderNodeTupleTable(
- dsg.getTripleTable().getNodeTupleTable(),
- "triples",
- monitor1) ;
- final LoaderNodeTupleTable loaderQuads = new LoaderNodeTupleTable(
- dsg.getQuadTable().getNodeTupleTable(),
- "quads",
- monitor2) ;
- Destination<Quad> sink = new Destination<Quad>() {
- long count = 0 ;
- private StatsCollector stats ;
- private boolean startedEmpty = dsg.isEmpty() ;
-
- @Override
- final public void start()
- {
- loaderTriples.loadStart() ;
- loaderQuads.loadStart() ;
+ this.stats = new StatsCollector() ;
+ }
+ @Override
+ final public void triple(Triple triple)
+ {
+ Node s = triple.getSubject() ;
+ Node p = triple.getPredicate() ;
+ Node o = triple.getObject() ;
+
+ loaderTriples.load(s, p, o) ;
+ stats.record(null, s, p, o) ;
+ count++ ;
+ }
- loaderTriples.loadDataStart() ;
- loaderQuads.loadDataStart() ;
- this.stats = new StatsCollector() ;
- }
-
- @Override
- final public void send(Quad quad)
- {
- Node s = quad.getSubject() ;
- Node p = quad.getPredicate() ;
- Node o = quad.getObject() ;
- Node g = null ;
- // Union graph?!
- if ( ! quad.isTriple() && ! quad.isDefaultGraph() )
- g = quad.getGraph() ;
-
- if ( g == null )
- loaderTriples.load(s, p, o) ;
- else
- loaderQuads.load(g, s, p, o) ;
- count++ ;
- stats.record(g, s, p, o) ;
- }
+ @Override
+ final public void finishBulk()
+ {
+ loaderTriples.loadDataFinish() ;
+ loaderTriples.loadIndexStart() ;
+ loaderTriples.loadIndexFinish() ;
+ loaderTriples.loadFinish() ;
- @Override
- final public void finish()
+ if ( ! dsg.getLocation().isMem() && startedEmpty )
{
- loaderTriples.loadDataFinish() ;
- loaderQuads.loadDataFinish() ;
-
- loaderTriples.loadIndexStart() ;
- loaderQuads.loadIndexStart() ;
-
- loaderTriples.loadIndexFinish() ;
- loaderQuads.loadIndexFinish() ;
-
- loaderTriples.loadFinish() ;
- loaderQuads.loadFinish() ;
- if ( ! dsg.getLocation().isMem() && startedEmpty )
- {
- String filename = dsg.getLocation().getPath(Names.optStats) ;
- Stats.write(filename, stats) ;
- }
- forceSync(dsg) ;
+ String filename = dsg.getLocation().getPath(Names.optStats) ;
+ Stats.write(filename, stats) ;
}
-
- @Override
- final public void flush() { }
- @Override
- final public void close() { }
- } ;
- return sink ;
+ forceSync(dsg) ;
+ }
+
+ @Override
+ public void start() {}
+ @Override
+ public void quad(Quad quad) { throw new TDBException("Quad encountered while loading a single graph") ; }
+ @Override
+ public void tuple(Tuple<Node> tuple) { throw new TDBException("Tuple encountered while loading a single graph") ; }
+ @Override
+ public void base(String base) { }
+ @Override
+ public void prefix(String prefix, String iri) { } // TODO
+ @Override
+ public void finish() {}
}
-
+
static void forceSync(DatasetGraphTDB dsg)
{
// Force sync - we have been bypassing DSG tables.
@@ -329,8 +384,7 @@ public class BulkLoader
dsg.getQuadTable().getNodeTupleTable().getNodeTable().sync();
dsg.getQuadTable().getNodeTupleTable().getNodeTable().sync();
dsg.getPrefixes().getNodeTupleTable().getNodeTable().sync();
-
- // This is not enough -- modules whether sync needed.
+ // This is not enough -- modules check whether sync needed.
dsg.sync() ;
}