You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by rv...@apache.org on 2014/11/27 18:28:13 UTC

[2/2] jena git commit: Initial workaround for JENA-820

Initial workaround for JENA-820

Currently this only works correctly when the output is line based. More
test cases are needed, along with further research into how to implement
this for block and whole file based modes.


Project: http://git-wip-us.apache.org/repos/asf/jena/repo
Commit: http://git-wip-us.apache.org/repos/asf/jena/commit/b65a26f4
Tree: http://git-wip-us.apache.org/repos/asf/jena/tree/b65a26f4
Diff: http://git-wip-us.apache.org/repos/asf/jena/diff/b65a26f4

Branch: refs/heads/hadoop-rdf
Commit: b65a26f445ef8c840c150ce19855e17f3e7ca5a6
Parents: 22995bb
Author: Rob Vesse <rv...@apache.org>
Authored: Thu Nov 27 17:27:11 2014 +0000
Committer: Rob Vesse <rv...@apache.org>
Committed: Thu Nov 27 17:27:11 2014 +0000

----------------------------------------------------------------------
 .../jena/hadoop/rdf/io/RdfIOConstants.java      | 101 +++++++-----
 .../hadoop/rdf/io/input/util/RdfIOUtils.java    | 154 ++++++++++---------
 .../hadoop/rdf/io/TestBlankNodeDivergence.java  |   8 +-
 3 files changed, 151 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/jena/blob/b65a26f4/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/RdfIOConstants.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/RdfIOConstants.java b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/RdfIOConstants.java
index 3b062c2..27c2bb2 100644
--- a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/RdfIOConstants.java
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/RdfIOConstants.java
@@ -16,41 +16,66 @@
  * limitations under the License.
  */
 
-package org.apache.jena.hadoop.rdf.io;
-
-import java.io.IOException;
-
-/**
- * RDF IO related constants
- * 
- * 
- * 
- */
-public class RdfIOConstants {
-
-    /**
-     * Private constructor prevents instantiation
-     */
-    private RdfIOConstants() {
-    }
-
-    /**
-     * Configuration key used to set whether bad tuples are ignored. This is the
-     * default behaviour, when explicitly set to {@code false} bad tuples will
-     * result in {@link IOException} being thrown by the relevant record
-     * readers.
-     */
-    public static final String INPUT_IGNORE_BAD_TUPLES = "rdf.io.input.ignore-bad-tuples";
-
-    /**
-     * Configuration key used to set the batch size used for RDF output formats
-     * that take a batched writing approach. Default value is given by the
-     * constant {@link #DEFAULT_OUTPUT_BATCH_SIZE}.
-     */
-    public static final String OUTPUT_BATCH_SIZE = "rdf.io.output.batch-size";
-
-    /**
-     * Default batch size for batched output formats
-     */
-    public static final long DEFAULT_OUTPUT_BATCH_SIZE = 10000;
-}
+package org.apache.jena.hadoop.rdf.io;
+
+import java.io.IOException;
+
+/**
+ * RDF IO related constants
+ * 
+ * 
+ * 
+ */
+public class RdfIOConstants {
+
+    /**
+     * Private constructor prevents instantiation
+     */
+    private RdfIOConstants() {
+    }
+
+    /**
+     * Configuration key used to set whether bad tuples are ignored. This is the
+     * default behaviour, when explicitly set to {@code false} bad tuples will
+     * result in {@link IOException} being thrown by the relevant record
+     * readers.
+     */
+    public static final String INPUT_IGNORE_BAD_TUPLES = "rdf.io.input.ignore-bad-tuples";
+
+    /**
+     * Configuration key used to set the batch size used for RDF output formats
+     * that take a batched writing approach. Default value is given by the
+     * constant {@link #DEFAULT_OUTPUT_BATCH_SIZE}.
+     */
+    public static final String OUTPUT_BATCH_SIZE = "rdf.io.output.batch-size";
+
+    /**
+     * Default batch size for batched output formats
+     */
+    public static final long DEFAULT_OUTPUT_BATCH_SIZE = 10000;
+
+    /**
+     * Configuration key used to control behaviour with regards to how blank
+     * nodes are handled.
+     * <p>
+     * The default behaviour is that blank nodes are file scoped which is what
+     * the RDF specifications require.
+     * </p>
+     * <p>
+     * However in the case of a multi-stage pipeline this behaviour can cause
+     * blank nodes to diverge over several jobs and introduce spurious blank
+     * nodes over time. This is described in <a
+     * href="https://issues.apache.org/jira/browse/JENA-820">JENA-820</a> and
+     * enabling this flag for jobs in your pipeline allows you to work around
+     * this problem.
+     * </p>
+     * <h3>Warning</h3> You should only enable this flag for jobs that take in
+     * RDF output originating from previous jobs since our normal blank node
+     * allocation policy ensures that blank nodes will be file scoped and unique
+     * over all files (barring unfortunate hashing collisions). If you enable
+     * this for jobs that take in RDF originating from other sources you may
+     * incorrectly conflate blank nodes that are supposed to be distinct and
+     * separate nodes.
+     */
+    public static final String GLOBAL_BNODE_IDENTITY = "rdf.io.input.bnodes.global-identity";
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/b65a26f4/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/RdfIOUtils.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/RdfIOUtils.java b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/RdfIOUtils.java
index 06567e5..372b22c 100644
--- a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/RdfIOUtils.java
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/RdfIOUtils.java
@@ -16,74 +16,86 @@
  * limitations under the License.
  */
 
-package org.apache.jena.hadoop.rdf.io.input.util;
-
-import java.util.UUID;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.jena.riot.lang.LabelToNode;
-import org.apache.jena.riot.system.ErrorHandlerFactory;
-import org.apache.jena.riot.system.IRIResolver;
-import org.apache.jena.riot.system.ParserProfile;
-import org.apache.jena.riot.system.ParserProfileBase;
-import org.apache.jena.riot.system.Prologue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * RDF IO utility functions
- * 
- * 
- * 
- */
-public class RdfIOUtils {
-    private static final Logger LOGGER = LoggerFactory.getLogger(RdfIOUtils.class);
-
-    /**
-     * Private constructor prevents instantiation
-     */
-    private RdfIOUtils() {
-    }
-
-    /**
-     * Creates a parser profile for the given job context
-     * 
-     * @param context
-     *            Context
-     * @param path
-     *            File path
-     * @return Parser profile
-     */
-    public static ParserProfile createParserProfile(JobContext context, Path path) {
-        Prologue prologue = new Prologue(null, IRIResolver.createNoResolve());
-        UUID seed = RdfIOUtils.getSeed(context, path);
-        LabelToNode labelMapping = LabelToNode.createScopeByDocumentHash(seed);
-        return new ParserProfileBase(prologue, ErrorHandlerFactory.errorHandlerStd, labelMapping);
-    }
-
-    /**
-     * Selects a seed for use in generating blank node identifiers
-     * 
-     * @param context
-     *            Job Context
-     * @param path
-     *            File path
-     * @return Seed
-     */
-    public static UUID getSeed(JobContext context, Path path) {
-        // This is to ensure that blank node allocation policy is constant when
-        // subsequent MapReduce jobs need that
-        String jobId = context.getJobID().toString();
-        if (jobId == null) {
-            jobId = String.valueOf(System.currentTimeMillis());
-            LOGGER.warn(
-                    "Job ID was not set, using current milliseconds of {}. Sequence of MapReduce jobs must carefully handle blank nodes.",
-                    jobId);
-        }
-        LOGGER.debug("Generating Blank Node Seed from Job Details (ID={}, Input Path={})", jobId, path);
-
-        // Form a reproducible seed for the run
-        return new UUID(jobId.hashCode(), path.hashCode());
-    }
-}
+package org.apache.jena.hadoop.rdf.io.input.util;
+
+import java.util.UUID;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.jena.hadoop.rdf.io.RdfIOConstants;
+import org.apache.jena.riot.lang.LabelToNode;
+import org.apache.jena.riot.system.ErrorHandlerFactory;
+import org.apache.jena.riot.system.IRIResolver;
+import org.apache.jena.riot.system.ParserProfile;
+import org.apache.jena.riot.system.ParserProfileBase;
+import org.apache.jena.riot.system.Prologue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * RDF IO utility functions
+ * 
+ * 
+ * 
+ */
+public class RdfIOUtils {
+    private static final Logger LOGGER = LoggerFactory.getLogger(RdfIOUtils.class);
+
+    /**
+     * Private constructor prevents instantiation
+     */
+    private RdfIOUtils() {
+    }
+
+    /**
+     * Creates a parser profile for the given job context
+     * 
+     * @param context
+     *            Context
+     * @param path
+     *            File path
+     * @return Parser profile
+     */
+    public static ParserProfile createParserProfile(JobContext context, Path path) {
+        Prologue prologue = new Prologue(null, IRIResolver.createNoResolve());
+        UUID seed = RdfIOUtils.getSeed(context, path);
+        LabelToNode labelMapping = LabelToNode.createScopeByDocumentHash(seed);
+        return new ParserProfileBase(prologue, ErrorHandlerFactory.errorHandlerStd, labelMapping);
+    }
+
+    /**
+     * Selects a seed for use in generating blank node identifiers
+     * 
+     * @param context
+     *            Job Context
+     * @param path
+     *            File path
+     * @return Seed
+     */
+    public static UUID getSeed(JobContext context, Path path) {
+        // This is to ensure that blank node allocation policy is constant when
+        // subsequent MapReduce jobs need that
+        String jobId = context.getJobID().toString();
+        if (jobId == null) {
+            jobId = String.valueOf(System.currentTimeMillis());
+            LOGGER.warn(
+                    "Job ID was not set, using current milliseconds of {}. Sequence of MapReduce jobs must carefully handle blank nodes.",
+                    jobId);
+        }
+
+        if (!context.getConfiguration().getBoolean(RdfIOConstants.GLOBAL_BNODE_IDENTITY, false)) {
+            // Using normal file scoped blank node allocation
+            LOGGER.debug("Generating Blank Node Seed from Job Details (ID={}, Input Path={})", jobId, path);
+
+            // Form a reproducible seed for the run
+            return new UUID(jobId.hashCode(), path.hashCode());
+        } else {
+            // Using globally scoped blank node allocation
+            LOGGER.warn(
+                    "Using globally scoped blank node allocation policy from Job Details (ID={}) - this is unsafe if your RDF inputs did not originate from a previous job",
+                    jobId);
+            
+            return new UUID(jobId.hashCode(), 0);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/b65a26f4/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/TestBlankNodeDivergence.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/TestBlankNodeDivergence.java b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/TestBlankNodeDivergence.java
index c3d2720..11898dd 100644
--- a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/TestBlankNodeDivergence.java
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/TestBlankNodeDivergence.java
@@ -71,7 +71,7 @@ public class TestBlankNodeDivergence {
     }
 
     @Test
-    @Ignore
+    //@Ignore
     // Ignored due to JENA-820, serves as a test case that demonstrates the
     // issue, once a workaround is available this test can be enabled and should
     // pass
@@ -127,12 +127,14 @@ public class TestBlankNodeDivergence {
             // intermediate outputs
             // As described in JENA-820 at this point the blank nodes are
             // consistent, however when we read them from different files they
-            // get treated as different nodes and so the blank nodes diverge
-            // which is incorrect behaviour
+            // by default get treated as different nodes and so the blank nodes
+            // diverge which is incorrect and undesirable behaviour in
+            // multi-stage pipelines
             System.out.println(intermediateOutputDir.getAbsolutePath());
             job = Job.getInstance(config);
             job.setInputFormatClass(NTriplesInputFormat.class);
             FileInputFormat.setInputPaths(job, new Path(intermediateOutputDir.getAbsolutePath()));
+            job.getConfiguration().setBoolean(RdfIOConstants.GLOBAL_BNODE_IDENTITY, true); // JENA-820 workaround
             context = new JobContextImpl(job.getConfiguration(), job.getJobID());
 
             // Get the splits