You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@jena.apache.org by an...@apache.org on 2022/03/13 15:01:26 UTC

[jena] branch main updated: JENA-2308: Load triples from stdin into a named graph

This is an automated email from the ASF dual-hosted git repository.

andy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/jena.git


The following commit(s) were added to refs/heads/main by this push:
     new e075c56  JENA-2308: Load triples from stdin into a named graph
     new fd98950  Merge pull request #1223 from afs/tdbloader-graph
e075c56 is described below

commit e075c5642b618013ecc7b7469514201d3e5242f1
Author: Andy Seaborne <an...@apache.org>
AuthorDate: Fri Mar 11 22:14:08 2022 +0000

    JENA-2308: Load triples from stdin into a named graph
---
 jena-cmds/src/main/java/tdb2/tdbloader.java        | 43 +++++++++++++---------
 .../org/apache/jena/tdb2/loader/DataLoader.java    | 34 +++++++++++------
 .../apache/jena/tdb2/loader/base/LoaderBase.java   | 12 ++++++
 .../apache/jena/tdb2/loader/base/LoaderOps.java    | 18 ++++++++-
 4 files changed, 77 insertions(+), 30 deletions(-)

diff --git a/jena-cmds/src/main/java/tdb2/tdbloader.java b/jena-cmds/src/main/java/tdb2/tdbloader.java
index 9f6036f..09229cc 100644
--- a/jena-cmds/src/main/java/tdb2/tdbloader.java
+++ b/jena-cmds/src/main/java/tdb2/tdbloader.java
@@ -34,8 +34,6 @@ import org.apache.jena.graph.NodeFactory;
 import org.apache.jena.query.ARQ;
 import org.apache.jena.riot.Lang;
 import org.apache.jena.riot.RDFLanguages;
-import org.apache.jena.riot.RDFParser;
-import org.apache.jena.riot.system.StreamRDF;
 import org.apache.jena.sparql.core.DatasetGraph;
 import org.apache.jena.system.Txn;
 import org.apache.jena.system.progress.MonitorOutput;
@@ -148,7 +146,10 @@ public class tdbloader extends CmdTDBGraph {
             }
         }
 
-        loadTriples(graphName, urls);
+        if ( urls.size() == 0 )
+            loadTriplesStdin();
+        else
+            loadTriples(graphName, urls);
     }
 
     // Check files exists before starting.
@@ -169,34 +170,42 @@ public class tdbloader extends CmdTDBGraph {
         }
     }
 
-    private void loadTriples(String graphName, List<String> urls) {
-        execBulkLoad(super.getDatasetGraph(), graphName, urls, showProgress);
-    }
-
     private void loadQuads(List<String> urls) {
         // generateStats
         execBulkLoad(super.getDatasetGraph(), null, urls, showProgress);
     }
 
-    private void loadQuadsStdin() {
-        DataLoader loader = chooseLoader(super.getDatasetGraph(), graphName);
-        StreamRDF dest = loader.stream();
-        if ( lang == null )
-            lang = Lang.NQUADS;
-        RDFParser parser = RDFParser.create().lang(lang).source(System.in).build();
+    private void loadTriples(String graphName, List<String> urls) {
+        execBulkLoad(super.getDatasetGraph(), graphName, urls, showProgress);
+    }
+
+    private long execBulkLoad(DatasetGraph dsg, String graphName, List<String> urls, boolean showProgress) {
+        DataLoader loader = chooseLoader(dsg, graphName);
         long elapsed = Timer.time(()->{
                     loader.startBulk();
-                    parser.parse(dest);
+                    loader.load(urls);
                     loader.finishBulk();
         });
-        //return elapsed;
+        return elapsed;
     }
 
-    private long execBulkLoad(DatasetGraph dsg, String graphName, List<String> urls, boolean showProgress) {
+    private long loadQuadsStdin() {
+        Lang parseLang = ( lang != null ) ? lang : Lang.NQUADS;
+        long elapsed = execBulkLoadStdin(super.getDatasetGraph(), null, parseLang, showProgress);
+        return elapsed;
+    }
+
+    private long loadTriplesStdin() {
+        Lang parseLang = ( lang != null ) ? lang : Lang.NTRIPLES;
+        long elapsed = execBulkLoadStdin(super.getDatasetGraph(), graphName, parseLang, showProgress);
+        return elapsed;
+    }
+
+    private long execBulkLoadStdin(DatasetGraph dsg, String graphName, Lang syntax, boolean showProgress) {
         DataLoader loader = chooseLoader(dsg, graphName);
         long elapsed = Timer.time(()->{
                     loader.startBulk();
-                    loader.load(urls);
+                    loader.loadFromInputStream("(stdin)", System.in, syntax);
                     loader.finishBulk();
         });
         return elapsed;
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/DataLoader.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/DataLoader.java
index 3f31ca8..b231645 100644
--- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/DataLoader.java
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/DataLoader.java
@@ -18,19 +18,21 @@
 
 package org.apache.jena.tdb2.loader;
 
+import java.io.InputStream;
 import java.util.Arrays;
 import java.util.List;
 
 import org.apache.jena.dboe.base.block.FileMode;
+import org.apache.jena.riot.Lang;
 import org.apache.jena.riot.system.StreamRDF;
 
 /**
  * Bulk loaders improve the loading of data into datasets. Each bulk loader has
- * consequences in achiving its improvements, including in some cases locking out all
- * other access to the daatset while the loading is underway.
+ * consequences in achieving its improvements, including in some cases locking out all
+ * other access to the dataset while the loading is underway.
  * <p>
  * Finding the best loader to use takes experimentation.
- * Loading speed depends on hardware, partcularly for the parallel bulk loader.
+ * Loading speed depends on hardware, particularly for the parallel bulk loader.
  * <p>
  * Giving a loader more heap space does <em>not</em> improve performance, and will likely decrease it.
  * All loaders use OS file system caching, not in JVM caches (except when run in {@link FileMode#direct direct file mode}s
@@ -38,7 +40,7 @@ import org.apache.jena.riot.system.StreamRDF;
  *
  * <h4>basic</h4>
  * The basic loader is full transactional and good for incrementally adding data up to a few million triples/quads
- * to large datasets and doe sssot max up the hardware so it is suitable for runtime operation at larger scales.
+ * to large datasets and does not max out the hardware so it is suitable for runtime operation at larger scales.
  *
  * <h4>sequential</h4>
  * A fully transactional loader that loads the primary indexes then does multiple passes to load the secondary indexes.
@@ -47,7 +49,7 @@ import org.apache.jena.riot.system.StreamRDF;
  *
  * <h4>phased</h4>
  * The phased loader use some multiple threads to process data and to index the {@code DatasetGraph}.
- * It proceeds by loading data into theprimaryindexes, then, separately, builds the other indexes.
+ * It proceeds by loading data into the primary indexes, then, separately, builds the other indexes.
  * Loading is not fully transaction-safe in the presence of persistent
  * storage problems or a JVM/machine crash when finishing writing.
  * Otherwise it is transactional.
@@ -57,8 +59,8 @@ import org.apache.jena.riot.system.StreamRDF;
  * Loading is not fully transaction-safe in the presence of persistent
  * storage problems or a JVM/machine crash when finishing writing.
  * Otherwise it is transactional.
- * Because it uses many threads to write to peristsne storage,
- * it can interfer with performance of other applications on the machine it is run on.
+ * Because it uses many threads to write to persistent storage,
+ * it can interfere with performance of other applications on the machine it is run on.
  *
  * <h4>{@code DataLoader} API</h4>
  *
@@ -93,21 +95,31 @@ public interface DataLoader {
     public void finishBulk();
 
     /**
-     * Alternative finish in case soekthgin went wrong.
+     * Alternative finish in case something went wrong.
      * This operation attempts to clear up and abort the changes.
      * If there was a file system problem with the {@code DatasetGraph} being
      * loaded, then recovery may not have been possible.
-     * The ability of loaders to clearup is implementation specific.
+     * The ability of loaders to cleanup is implementation specific.
      */
     public void finishException(Exception ex);
 
-    /** Load files with syntax given by the file name extension,
+    /**
+     * Load files with syntax given by the file name extension,
      * or URLs, with content negotiation.
      * @param filenames
      */
     public void load(List<String> filenames);
 
-    /** Load files with syntax given by the file name extension,
+    /**
+     * Load from an {@link InputStream} with the given syntax.
+     * @param label Label for progress monitor
+     * @param input
+     * @param syntax
+     */
+    public void loadFromInputStream(String label, InputStream input, Lang syntax);
+
+    /**
+     * Load files with syntax given by the file name extension,
      * or URLs, with content negotiation.
      * @param filenames
      */
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/LoaderBase.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/LoaderBase.java
index 762c51e..4a847c4 100644
--- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/LoaderBase.java
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/LoaderBase.java
@@ -18,11 +18,13 @@
 
 package org.apache.jena.tdb2.loader.base;
 
+import java.io.InputStream;
 import java.util.List;
 
 import org.apache.jena.atlas.lib.Timer;
 import org.apache.jena.graph.Node;
 import org.apache.jena.query.TxnType;
+import org.apache.jena.riot.Lang;
 import org.apache.jena.sparql.core.DatasetGraph;
 import org.apache.jena.system.progress.MonitorOutput;
 import org.apache.jena.system.progress.ProgressMonitor;
@@ -116,6 +118,16 @@ public abstract class LoaderBase implements DataLoader {
         }
     }
 
+    @Override
+    public void loadFromInputStream(String label, InputStream input, Lang syntax) {
+        ProgressMonitor monitor = createProgressMonitor(output);
+        monitor.startMessage("Start: "+label);
+        monitor.start();
+        LoaderOps.inputStream(stream(), input, syntax, monitor);
+        monitor.finish();
+        monitor.finishMessage("Finished: "+label);
+    }
+
     protected abstract ProgressMonitor createProgressMonitor(MonitorOutput output);
 
     /** Subclasses must provide a setting. */
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/LoaderOps.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/LoaderOps.java
index a9d0a2e..499d9f2 100644
--- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/LoaderOps.java
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/LoaderOps.java
@@ -18,6 +18,7 @@
 
 package org.apache.jena.tdb2.loader.base;
 
+import java.io.InputStream;
 import java.util.Iterator;
 import java.util.Objects;
 
@@ -30,6 +31,7 @@ import org.apache.jena.dboe.trans.bplustree.BPlusTree;
 import org.apache.jena.dboe.trans.data.TransBinaryDataFile;
 import org.apache.jena.graph.Node;
 import org.apache.jena.graph.Triple;
+import org.apache.jena.riot.Lang;
 import org.apache.jena.riot.RDFParser;
 import org.apache.jena.riot.system.StreamRDF;
 import org.apache.jena.riot.system.StreamRDFWrapper;
@@ -99,14 +101,26 @@ public class LoaderOps {
      * "no output".
      */
     public static void inputFile(StreamRDF sink, String source, ProgressMonitor monitor) {
-        if ( monitor != null ) {
+        if ( monitor != null )
             sink = new ProgressStreamRDF(sink, monitor);
-        }
         sink.start();
         RDFParser.source(source).parse(sink);
         sink.finish();
     }
 
+    /**
+     * Parse one file, with an optional progress monitor. Pass null to {@code monitor} for
+     * "no output".
+     * @param syntax
+     */
+    public static void inputStream(StreamRDF sink, InputStream input, Lang syntax, ProgressMonitor monitor) {
+        if ( monitor != null )
+            sink = new ProgressStreamRDF(sink, monitor);
+        sink.start();
+        RDFParser.source(input).lang(syntax).parse(sink);
+        sink.finish();
+    }
+
     /** Copy a stream to several indexes (sequential version) */
     public static void copyIndex(Iterator<Tuple<NodeId>> srcIter, TupleIndex[] destIndexes, ProgressMonitor monitor) {
         long counter = 0;