Posted to commits@jena.apache.org by rv...@apache.org on 2015/01/05 16:07:13 UTC

[05/52] [abbrv] jena git commit: Rebrand to Jena Elephas per community vote

http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/bnodes/AbstractBlankNodeTests.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/bnodes/AbstractBlankNodeTests.java b/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/bnodes/AbstractBlankNodeTests.java
new file mode 100644
index 0000000..4bb0939
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/bnodes/AbstractBlankNodeTests.java
@@ -0,0 +1,636 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jena.hadoop.rdf.io.input.bnodes;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.attribute.FileAttribute;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.jena.hadoop.rdf.io.RdfIOConstants;
+import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable;
+import org.apache.jena.riot.system.ParserProfile;
+import org.apache.log4j.BasicConfigurator;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.hp.hpl.jena.graph.Node;
+import com.hp.hpl.jena.graph.NodeFactory;
+
+/**
+ * Test case that embodies the scenario described in JENA-820
+ */
+@SuppressWarnings("unused")
+public abstract class AbstractBlankNodeTests<T, TValue extends AbstractNodeTupleWritable<T>> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractBlankNodeTests.class);
+
+    @BeforeClass
+    public static void setup() {
+        // Enable if you need to diagnose test failures
+        // Useful since it includes printing the file names of the temporary
+        // files being used
+        // BasicConfigurator.resetConfiguration();
+        // BasicConfigurator.configure();
+    }
+
+    /**
+     * Gets the extension for the initial input files
+     * 
+     * @return Extension including the {@code .}
+     */
+    protected abstract String getInitialInputExtension();
+
+    /**
+     * Creates a tuple
+     * 
+     * @param s
+     *            Subject
+     * @param p
+     *            Predicate
+     * @param o
+     *            Object
+     * @return Tuple
+     */
+    protected abstract T createTuple(Node s, Node p, Node o);
+
+    /**
+     * Writes out the given tuples to the given file
+     * 
+     * @param f
+     *            File
+     * @param tuples
+     *            Tuples
+     * @throws FileNotFoundException
+     */
+    protected abstract void writeTuples(File f, List<T> tuples) throws FileNotFoundException;
+
+    /**
+     * Creates the input format for reading the initial inputs
+     * 
+     * @return Input format
+     */
+    protected abstract InputFormat<LongWritable, TValue> createInitialInputFormat();
+
+    /**
+     * Creates the output format for writing the intermediate output
+     * 
+     * @return Output format
+     */
+    protected abstract OutputFormat<LongWritable, TValue> createIntermediateOutputFormat();
+
+    /**
+     * Creates the input format for reading the intermediate outputs back in
+     * 
+     * @return Input format
+     */
+    protected abstract InputFormat<LongWritable, TValue> createIntermediateInputFormat();
+
+    /**
+     * Gets the subject of the tuple
+     * 
+     * @param value
+     *            Tuple
+     * @return Subject
+     */
+    protected abstract Node getSubject(T value);
+
+    /**
+     * Gets whether the format being tested respects the RIOT
+     * {@link ParserProfile}
+     * 
+     * @return True if parser profile is respected, false otherwise
+     */
+    protected boolean respectsParserProfile() {
+        return true;
+    }
+
+    /**
+     * Gets whether the format being tested preserves blank node identity
+     * 
+     * @return True if identity is preserved, false otherwise
+     */
+    protected boolean preservesBlankNodeIdentity() {
+        return false;
+    }
+
+    /**
+     * Test that starts with two blank nodes with the same identity in a single
+     * file, splits them over two files and checks that we can workaround
+     * JENA-820 successfully by setting the
+     * {@link RdfIOConstants#GLOBAL_BNODE_IDENTITY} flag for our subsequent job
+     * 
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    @Test
+    public final void blank_node_divergence_01() throws IOException, InterruptedException {
+        Assume.assumeTrue("Requires ParserProfile be respected", this.respectsParserProfile());
+        Assume.assumeFalse("Requires that Blank Node identity not be preserved", this.preservesBlankNodeIdentity());
+        
+        // Temporary files
+        File a = File.createTempFile("bnode_divergence", getInitialInputExtension());
+        File intermediateOutputDir = Files.createTempDirectory("bnode_divergence", new FileAttribute[0]).toFile();
+
+        try {
+            // Prepare the input data
+            // Two mentions of the same blank node in the same file
+            List<T> tuples = new ArrayList<>();
+            Node bnode = NodeFactory.createAnon();
+            Node pred = NodeFactory.createURI("http://example.org/predicate");
+            tuples.add(createTuple(bnode, pred, NodeFactory.createLiteral("first")));
+            tuples.add(createTuple(bnode, pred, NodeFactory.createLiteral("second")));
+            writeTuples(a, tuples);
+
+            // Set up fake job which will process the file as a single split
+            Configuration config = new Configuration(true);
+            InputFormat<LongWritable, TValue> inputFormat = createInitialInputFormat();
+            Job job = Job.getInstance(config);
+            job.setInputFormatClass(inputFormat.getClass());
+            NLineInputFormat.setNumLinesPerSplit(job, 100);
+            FileInputFormat.setInputPaths(job, new Path(a.getAbsolutePath()));
+            FileOutputFormat.setOutputPath(job, new Path(intermediateOutputDir.getAbsolutePath()));
+            JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID());
+
+            // Get the splits
+            List<InputSplit> splits = inputFormat.getSplits(context);
+            Assert.assertEquals(1, splits.size());
+
+            for (InputSplit split : splits) {
+                // Initialize the input reading
+                TaskAttemptContext inputTaskContext = new TaskAttemptContextImpl(job.getConfiguration(),
+                        createAttemptID(1, 1, 1));
+                RecordReader<LongWritable, TValue> reader = inputFormat.createRecordReader(split, inputTaskContext);
+                reader.initialize(split, inputTaskContext);
+
+                // Copy the input to the output - each triple goes to a separate
+                // output file
+                // This is how we force multiple files to be produced
+                int taskID = 1;
+                while (reader.nextKeyValue()) {
+                    // Prepare the output writing
+                    OutputFormat<LongWritable, TValue> outputFormat = createIntermediateOutputFormat();
+                    TaskAttemptContext outputTaskContext = new TaskAttemptContextImpl(job.getConfiguration(),
+                            createAttemptID(1, ++taskID, 1));
+                    RecordWriter<LongWritable, TValue> writer = outputFormat.getRecordWriter(outputTaskContext);
+
+                    writer.write(reader.getCurrentKey(), reader.getCurrentValue());
+                    writer.close(outputTaskContext);
+                }
+            }
+
+            // Promote outputs from temporary status
+            promoteInputs(intermediateOutputDir);
+
+            // Now we need to create a subsequent job that reads the
+            // intermediate outputs
+            // As described in JENA-820, at this point the blank nodes are
+            // consistent; however, when we read them from different files they
+            // are by default treated as different nodes, so the blank nodes
+            // diverge, which is incorrect and undesirable behaviour in
+            // multi-stage pipelines
+            System.out.println(intermediateOutputDir.getAbsolutePath());
+            job = Job.getInstance(config);
+            inputFormat = createIntermediateInputFormat();
+            job.setInputFormatClass(inputFormat.getClass());
+            FileInputFormat.setInputPaths(job, new Path(intermediateOutputDir.getAbsolutePath()));
+
+            // Enabling this flag works around the JENA-820 issue
+            job.getConfiguration().setBoolean(RdfIOConstants.GLOBAL_BNODE_IDENTITY, true);
+            context = new JobContextImpl(job.getConfiguration(), job.getJobID());
+
+            // Get the splits
+            splits = inputFormat.getSplits(context);
+            Assert.assertEquals(2, splits.size());
+
+            // Expect to end up with a single blank node
+            Set<Node> nodes = new HashSet<Node>();
+            for (InputSplit split : splits) {
+                TaskAttemptContext inputTaskContext = new TaskAttemptContextImpl(job.getConfiguration(),
+                        new TaskAttemptID());
+                RecordReader<LongWritable, TValue> reader = inputFormat.createRecordReader(split, inputTaskContext);
+                reader.initialize(split, inputTaskContext);
+
+                while (reader.nextKeyValue()) {
+                    nodes.add(getSubject(reader.getCurrentValue().get()));
+                }
+            }
+            // Nodes should not have diverged
+            Assert.assertEquals(1, nodes.size());
+
+        } finally {
+            a.delete();
+            deleteDirectory(intermediateOutputDir);
+        }
+    }
+
+    /**
+     * Test that starts with two blank nodes with the same identity in a single
+     * file, splits them over two files and shows that they diverge in the
+     * subsequent job when the JENA-820 workaround is not enabled
+     * 
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    @Test
+    public void blank_node_divergence_02() throws IOException, InterruptedException {
+        Assume.assumeTrue("Requires ParserProfile be respected", this.respectsParserProfile());
+        Assume.assumeFalse("Requires that Blank Node identity not be preserved", this.preservesBlankNodeIdentity());
+        
+        // Temporary files
+        File a = File.createTempFile("bnode_divergence", getInitialInputExtension());
+        File intermediateOutputDir = Files.createTempDirectory("bnode_divergence", new FileAttribute[0]).toFile();
+
+        try {
+            // Prepare the input data
+            // Two mentions of the same blank node in the same file
+            List<T> tuples = new ArrayList<>();
+            Node bnode = NodeFactory.createAnon();
+            Node pred = NodeFactory.createURI("http://example.org/predicate");
+            tuples.add(createTuple(bnode, pred, NodeFactory.createLiteral("first")));
+            tuples.add(createTuple(bnode, pred, NodeFactory.createLiteral("second")));
+            writeTuples(a, tuples);
+
+            // Set up fake job which will process the file as a single split
+            Configuration config = new Configuration(true);
+            InputFormat<LongWritable, TValue> inputFormat = createInitialInputFormat();
+            Job job = Job.getInstance(config);
+            job.setInputFormatClass(inputFormat.getClass());
+            NLineInputFormat.setNumLinesPerSplit(job, 100);
+            FileInputFormat.setInputPaths(job, new Path(a.getAbsolutePath()));
+            FileOutputFormat.setOutputPath(job, new Path(intermediateOutputDir.getAbsolutePath()));
+            JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID());
+
+            // Get the splits
+            List<InputSplit> splits = inputFormat.getSplits(context);
+            Assert.assertEquals(1, splits.size());
+
+            for (InputSplit split : splits) {
+                // Initialize the input reading
+                TaskAttemptContext inputTaskContext = new TaskAttemptContextImpl(job.getConfiguration(),
+                        createAttemptID(1, 1, 1));
+                RecordReader<LongWritable, TValue> reader = inputFormat.createRecordReader(split, inputTaskContext);
+                reader.initialize(split, inputTaskContext);
+
+                // Copy the input to the output - each triple goes to a separate
+                // output file
+                // This is how we force multiple files to be produced
+                int taskID = 1;
+                while (reader.nextKeyValue()) {
+                    // Prepare the output writing
+                    OutputFormat<LongWritable, TValue> outputFormat = createIntermediateOutputFormat();
+                    TaskAttemptContext outputTaskContext = new TaskAttemptContextImpl(job.getConfiguration(),
+                            createAttemptID(1, ++taskID, 1));
+                    RecordWriter<LongWritable, TValue> writer = outputFormat.getRecordWriter(outputTaskContext);
+
+                    writer.write(reader.getCurrentKey(), reader.getCurrentValue());
+                    writer.close(outputTaskContext);
+                }
+            }
+
+            // Promote outputs from temporary status
+            promoteInputs(intermediateOutputDir);
+
+            // Now we need to create a subsequent job that reads the
+            // intermediate outputs
+            // As described in JENA-820, at this point the blank nodes are
+            // consistent; however, when we read them from different files they
+            // are by default treated as different nodes, so the blank nodes
+            // diverge, which is incorrect and undesirable in multi-stage
+            // pipelines. It is nevertheless the default behaviour because
+            // when we start from external inputs we want them to be file
+            // scoped.
+            System.out.println(intermediateOutputDir.getAbsolutePath());
+            job = Job.getInstance(config);
+            inputFormat = createIntermediateInputFormat();
+            job.setInputFormatClass(inputFormat.getClass());
+            FileInputFormat.setInputPaths(job, new Path(intermediateOutputDir.getAbsolutePath()));
+
+            // Make sure JENA-820 flag is disabled
+            job.getConfiguration().setBoolean(RdfIOConstants.GLOBAL_BNODE_IDENTITY, false);
+            context = new JobContextImpl(job.getConfiguration(), job.getJobID());
+
+            // Get the splits
+            splits = inputFormat.getSplits(context);
+            Assert.assertEquals(2, splits.size());
+
+            // Expect the blank node to have diverged into two distinct nodes
+            Set<Node> nodes = new HashSet<Node>();
+            for (InputSplit split : splits) {
+                TaskAttemptContext inputTaskContext = new TaskAttemptContextImpl(job.getConfiguration(),
+                        new TaskAttemptID());
+                RecordReader<LongWritable, TValue> reader = inputFormat.createRecordReader(split, inputTaskContext);
+                reader.initialize(split, inputTaskContext);
+
+                while (reader.nextKeyValue()) {
+                    nodes.add(getSubject(reader.getCurrentValue().get()));
+                }
+            }
+            // Nodes should have diverged
+            Assert.assertEquals(2, nodes.size());
+
+        } finally {
+            a.delete();
+            deleteDirectory(intermediateOutputDir);
+        }
+    }
+
+    /**
+     * Test that starts with two blank nodes in two different files and checks
+     * that writing them to a single file does not conflate them
+     * 
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    @Test
+    public void blank_node_identity_01() throws IOException, InterruptedException {
+        Assume.assumeTrue("Requires ParserProfile be respected", this.respectsParserProfile());
+        Assume.assumeFalse("Requires that Blank Node identity not be preserved", this.preservesBlankNodeIdentity());
+        
+        // Temporary files
+        File a = File.createTempFile("bnode_identity", getInitialInputExtension());
+        File b = File.createTempFile("bnode_identity", getInitialInputExtension());
+        File intermediateOutputDir = Files.createTempDirectory("bnode_identity", new FileAttribute[0]).toFile();
+
+        try {
+            // Prepare the input data
+            // Different blank nodes in different files
+            List<T> tuples = new ArrayList<>();
+            Node bnode1 = NodeFactory.createAnon();
+            Node bnode2 = NodeFactory.createAnon();
+            Node pred = NodeFactory.createURI("http://example.org/predicate");
+
+            tuples.add(createTuple(bnode1, pred, NodeFactory.createLiteral("first")));
+            writeTuples(a, tuples);
+
+            tuples.clear();
+            tuples.add(createTuple(bnode2, pred, NodeFactory.createLiteral("second")));
+            writeTuples(b, tuples);
+
+            // Set up fake job which will process the two files
+            Configuration config = new Configuration(true);
+            InputFormat<LongWritable, TValue> inputFormat = createInitialInputFormat();
+            Job job = Job.getInstance(config);
+            job.setInputFormatClass(inputFormat.getClass());
+            NLineInputFormat.setNumLinesPerSplit(job, 100);
+            FileInputFormat.setInputPaths(job, new Path(a.getAbsolutePath()), new Path(b.getAbsolutePath()));
+            FileOutputFormat.setOutputPath(job, new Path(intermediateOutputDir.getAbsolutePath()));
+            JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID());
+
+            // Get the splits
+            List<InputSplit> splits = inputFormat.getSplits(context);
+            Assert.assertEquals(2, splits.size());
+
+            // Prepare the output writing - putting all output to a single file
+            OutputFormat<LongWritable, TValue> outputFormat = createIntermediateOutputFormat();
+            TaskAttemptContext outputTaskContext = new TaskAttemptContextImpl(job.getConfiguration(), createAttemptID(
+                    1, 2, 1));
+            RecordWriter<LongWritable, TValue> writer = outputFormat.getRecordWriter(outputTaskContext);
+
+            for (InputSplit split : splits) {
+                // Initialize the input reading
+                TaskAttemptContext inputTaskContext = new TaskAttemptContextImpl(job.getConfiguration(),
+                        createAttemptID(1, 1, 1));
+                RecordReader<LongWritable, TValue> reader = inputFormat.createRecordReader(split, inputTaskContext);
+                reader.initialize(split, inputTaskContext);
+
+                // Copy the input to the output - all triples go to a single
+                // output
+                while (reader.nextKeyValue()) {
+                    writer.write(reader.getCurrentKey(), reader.getCurrentValue());
+                }
+            }
+            writer.close(outputTaskContext);
+
+            // Promote outputs from temporary status
+            promoteInputs(intermediateOutputDir);
+
+            // Now we need to create a subsequent job that reads the
+            // intermediate outputs
+            // The blank nodes should have been given separate identities, so
+            // we should not be conflating them; this is the opposite problem
+            // to the one described in JENA-820
+            System.out.println(intermediateOutputDir.getAbsolutePath());
+            job = Job.getInstance(config);
+            inputFormat = createIntermediateInputFormat();
+            job.setInputFormatClass(inputFormat.getClass());
+            NLineInputFormat.setNumLinesPerSplit(job, 100);
+            FileInputFormat.setInputPaths(job, new Path(intermediateOutputDir.getAbsolutePath()));
+            context = new JobContextImpl(job.getConfiguration(), job.getJobID());
+
+            // Get the splits
+            splits = inputFormat.getSplits(context);
+            Assert.assertEquals(1, splits.size());
+
+            // Expect to end up with two distinct blank nodes
+            Set<Node> nodes = new HashSet<Node>();
+            for (InputSplit split : splits) {
+                TaskAttemptContext inputTaskContext = new TaskAttemptContextImpl(job.getConfiguration(),
+                        new TaskAttemptID());
+                RecordReader<LongWritable, TValue> reader = inputFormat.createRecordReader(split, inputTaskContext);
+                reader.initialize(split, inputTaskContext);
+
+                while (reader.nextKeyValue()) {
+                    nodes.add(getSubject(reader.getCurrentValue().get()));
+                }
+            }
+            // Nodes must not have converged
+            Assert.assertEquals(2, nodes.size());
+
+        } finally {
+            a.delete();
+            b.delete();
+            deleteDirectory(intermediateOutputDir);
+        }
+    }
+
+    /**
+     * Test that starts with the same blank node in two different files and
+     * checks that writing it to a single file does not conflate the mentions
+     * 
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    @Test
+    public void blank_node_identity_02() throws IOException, InterruptedException {
+        Assume.assumeTrue("Requires ParserProfile be respected", this.respectsParserProfile());
+        Assume.assumeFalse("Requires that Blank Node identity not be preserved", this.preservesBlankNodeIdentity());
+        
+        // Temporary files
+        File a = File.createTempFile("bnode_identity", getInitialInputExtension());
+        File b = File.createTempFile("bnode_identity", getInitialInputExtension());
+        File intermediateOutputDir = Files.createTempDirectory("bnode_identity", new FileAttribute[0]).toFile();
+
+        try {
+            // Prepare the input data
+            // Same blank node but in different files so must be treated as
+            // different blank nodes and not converge
+            List<T> tuples = new ArrayList<>();
+            Node bnode = NodeFactory.createAnon();
+            Node pred = NodeFactory.createURI("http://example.org/predicate");
+
+            tuples.add(createTuple(bnode, pred, NodeFactory.createLiteral("first")));
+            writeTuples(a, tuples);
+
+            tuples.clear();
+            tuples.add(createTuple(bnode, pred, NodeFactory.createLiteral("second")));
+            writeTuples(b, tuples);
+
+            // Set up fake job which will process the two files
+            Configuration config = new Configuration(true);
+            InputFormat<LongWritable, TValue> inputFormat = createInitialInputFormat();
+            Job job = Job.getInstance(config);
+            job.setInputFormatClass(inputFormat.getClass());
+            NLineInputFormat.setNumLinesPerSplit(job, 100);
+            FileInputFormat.setInputPaths(job, new Path(a.getAbsolutePath()), new Path(b.getAbsolutePath()));
+            FileOutputFormat.setOutputPath(job, new Path(intermediateOutputDir.getAbsolutePath()));
+            JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID());
+
+            // Get the splits
+            List<InputSplit> splits = inputFormat.getSplits(context);
+            Assert.assertEquals(2, splits.size());
+
+            // Prepare the output writing - putting all output to a single file
+            OutputFormat<LongWritable, TValue> outputFormat = createIntermediateOutputFormat();
+            TaskAttemptContext outputTaskContext = new TaskAttemptContextImpl(job.getConfiguration(), createAttemptID(
+                    1, 2, 1));
+            RecordWriter<LongWritable, TValue> writer = outputFormat.getRecordWriter(outputTaskContext);
+
+            for (InputSplit split : splits) {
+                // Initialize the input reading
+                TaskAttemptContext inputTaskContext = new TaskAttemptContextImpl(job.getConfiguration(),
+                        createAttemptID(1, 1, 1));
+                RecordReader<LongWritable, TValue> reader = inputFormat.createRecordReader(split, inputTaskContext);
+                reader.initialize(split, inputTaskContext);
+
+                // Copy the input to the output - all triples go to a single
+                // output
+                while (reader.nextKeyValue()) {
+                    writer.write(reader.getCurrentKey(), reader.getCurrentValue());
+                }
+            }
+            writer.close(outputTaskContext);
+
+            // Promote outputs from temporary status
+            promoteInputs(intermediateOutputDir);
+
+            // Now we need to create a subsequent job that reads the
+            // intermediate outputs
+            // The blank nodes should have been given separate identities, so
+            // we should not be conflating them; this is the opposite problem
+            // to the one described in JENA-820
+            System.out.println(intermediateOutputDir.getAbsolutePath());
+            job = Job.getInstance(config);
+            inputFormat = createIntermediateInputFormat();
+            job.setInputFormatClass(inputFormat.getClass());
+            NLineInputFormat.setNumLinesPerSplit(job, 100);
+            FileInputFormat.setInputPaths(job, new Path(intermediateOutputDir.getAbsolutePath()));
+            context = new JobContextImpl(job.getConfiguration(), job.getJobID());
+
+            // Get the splits
+            splits = inputFormat.getSplits(context);
+            Assert.assertEquals(1, splits.size());
+
+            // Expect to end up with two distinct blank nodes
+            Set<Node> nodes = new HashSet<Node>();
+            for (InputSplit split : splits) {
+                TaskAttemptContext inputTaskContext = new TaskAttemptContextImpl(job.getConfiguration(),
+                        new TaskAttemptID());
+                RecordReader<LongWritable, TValue> reader = inputFormat.createRecordReader(split, inputTaskContext);
+                reader.initialize(split, inputTaskContext);
+
+                while (reader.nextKeyValue()) {
+                    nodes.add(getSubject(reader.getCurrentValue().get()));
+                }
+            }
+            // Nodes must not have converged
+            Assert.assertEquals(2, nodes.size());
+
+        } finally {
+            a.delete();
+            b.delete();
+            deleteDirectory(intermediateOutputDir);
+        }
+    }
+
+    private TaskAttemptID createAttemptID(int jobID, int taskID, int id) {
+        return new TaskAttemptID("outputTest", jobID, TaskType.MAP, taskID, id);
+    }
+
+    private void promoteInputs(File baseDir) throws IOException {
+        for (File f : baseDir.listFiles()) {
+            if (f.isDirectory()) {
+                promoteInputs(baseDir, f);
+            }
+        }
+    }
+
+    private void promoteInputs(File targetDir, File dir) throws IOException {
+        java.nio.file.Path target = Paths.get(targetDir.toURI());
+        for (File f : dir.listFiles()) {
+            if (f.isDirectory()) {
+                promoteInputs(targetDir, f);
+            } else {
+                LOGGER.debug("Moving {} to {}", f.getAbsolutePath(), target.resolve(f.getName()));
+                Files.move(Paths.get(f.toURI()), target.resolve(f.getName()), StandardCopyOption.REPLACE_EXISTING);
+            }
+        }
+
+        // Remove defunct sub-directory
+        dir.delete();
+    }
+
+    private void deleteDirectory(File dir) throws IOException {
+        for (File f : dir.listFiles()) {
+            if (f.isFile())
+                f.delete();
+            if (f.isDirectory())
+                deleteDirectory(f);
+        }
+        dir.delete();
+    }
+}
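
The workaround exercised by these tests is what a downstream job in a multi-stage pipeline would enable before reading intermediate RDF outputs. A minimal driver sketch, assuming a hypothetical job name and input path (imports, the input format and the flag are the ones used in the test file above):

    // Sketch of a downstream driver stage; the job name and input path are
    // hypothetical placeholders
    Configuration config = new Configuration(true);
    Job job = Job.getInstance(config, "downstream-stage");
    job.setInputFormatClass(NTriplesInputFormat.class);
    FileInputFormat.setInputPaths(job, new Path("/pipeline/intermediate"));
    // JENA-820 workaround: use globally scoped blank node identifiers so the
    // nodes read back from multiple intermediate files do not diverge
    job.getConfiguration().setBoolean(RdfIOConstants.GLOBAL_BNODE_IDENTITY, true);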

http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/bnodes/AbstractTripleBlankNodeTests.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/bnodes/AbstractTripleBlankNodeTests.java b/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/bnodes/AbstractTripleBlankNodeTests.java
new file mode 100644
index 0000000..bbd6742
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/bnodes/AbstractTripleBlankNodeTests.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jena.hadoop.rdf.io.input.bnodes;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.util.List;
+
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+import org.apache.jena.riot.Lang;
+import org.apache.jena.riot.RDFDataMgr;
+
+import com.hp.hpl.jena.graph.Graph;
+import com.hp.hpl.jena.graph.Node;
+import com.hp.hpl.jena.graph.Triple;
+import com.hp.hpl.jena.sparql.graph.GraphFactory;
+
+/**
+ * Abstract blank node tests for triple-based RDF formats
+ */
+public abstract class AbstractTripleBlankNodeTests extends AbstractBlankNodeTests<Triple, TripleWritable> {
+    
+    /**
+     * Gets the language to use
+     * 
+     * @return Language
+     */
+    protected abstract Lang getLanguage();
+
+    @Override
+    protected Triple createTuple(Node s, Node p, Node o) {
+        return new Triple(s, p, o);
+    }
+
+    @Override
+    protected void writeTuples(File f, List<Triple> tuples) throws FileNotFoundException {
+        Graph g = GraphFactory.createGraphMem();
+        for (Triple t : tuples) {
+            g.add(t);
+        }
+        RDFDataMgr.write(new FileOutputStream(f), g, getLanguage());
+    }
+
+    @Override
+    protected Node getSubject(Triple value) {
+        return value.getSubject();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/bnodes/JsonLdTripleBlankNodeTest.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/bnodes/JsonLdTripleBlankNodeTest.java b/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/bnodes/JsonLdTripleBlankNodeTest.java
new file mode 100644
index 0000000..f234127
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/bnodes/JsonLdTripleBlankNodeTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jena.hadoop.rdf.io.input.bnodes;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.jena.hadoop.rdf.io.input.jsonld.JsonLDTripleInputFormat;
+import org.apache.jena.hadoop.rdf.io.output.jsonld.JsonLDTripleOutputFormat;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+import org.apache.jena.riot.Lang;
+
+/**
+ * Tests blank node divergence when using the {@link JsonLDTripleInputFormat}
+ */
+public class JsonLdTripleBlankNodeTest extends AbstractTripleBlankNodeTests {
+
+    @Override
+    protected Lang getLanguage() {
+        return Lang.JSONLD;
+    }
+
+    @Override
+    protected String getInitialInputExtension() {
+        return ".jsonld";
+    }
+
+    @Override
+    protected InputFormat<LongWritable, TripleWritable> createInitialInputFormat() {
+        return new JsonLDTripleInputFormat();
+    }
+
+    @Override
+    protected OutputFormat<LongWritable, TripleWritable> createIntermediateOutputFormat() {
+        return new JsonLDTripleOutputFormat<>();
+    }
+
+    @Override
+    protected InputFormat<LongWritable, TripleWritable> createIntermediateInputFormat() {
+        return new JsonLDTripleInputFormat();
+    }
+
+    @Override
+    protected boolean respectsParserProfile() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/bnodes/NTriplesBlankNodeTest.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/bnodes/NTriplesBlankNodeTest.java b/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/bnodes/NTriplesBlankNodeTest.java
new file mode 100644
index 0000000..4c350c7
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/bnodes/NTriplesBlankNodeTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jena.hadoop.rdf.io.input.bnodes;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.jena.hadoop.rdf.io.input.ntriples.NTriplesInputFormat;
+import org.apache.jena.hadoop.rdf.io.output.ntriples.NTriplesOutputFormat;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+import org.apache.jena.riot.Lang;
+
+/**
+ * Tests blank node divergence when using the {@link NTriplesInputFormat}
+ */
+public class NTriplesBlankNodeTest extends AbstractTripleBlankNodeTests {
+
+    @Override
+    protected Lang getLanguage() {
+        return Lang.NTRIPLES;
+    }
+
+    @Override
+    protected String getInitialInputExtension() {
+        return ".nt";
+    }
+
+    @Override
+    protected InputFormat<LongWritable, TripleWritable> createInitialInputFormat() {
+        return new NTriplesInputFormat();
+    }
+
+    @Override
+    protected OutputFormat<LongWritable, TripleWritable> createIntermediateOutputFormat() {
+        return new NTriplesOutputFormat<>();
+    }
+
+    @Override
+    protected InputFormat<LongWritable, TripleWritable> createIntermediateInputFormat() {
+        return new NTriplesInputFormat();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/bnodes/RdfJsonBlankNodeTest.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/bnodes/RdfJsonBlankNodeTest.java b/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/bnodes/RdfJsonBlankNodeTest.java
new file mode 100644
index 0000000..2be1e0e
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/bnodes/RdfJsonBlankNodeTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jena.hadoop.rdf.io.input.bnodes;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.jena.hadoop.rdf.io.input.rdfjson.RdfJsonInputFormat;
+import org.apache.jena.hadoop.rdf.io.output.rdfjson.RdfJsonOutputFormat;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+import org.apache.jena.riot.Lang;
+
+/**
+ * Tests blank node divergence when using the {@link RdfJsonInputFormat}
+ */
+public class RdfJsonBlankNodeTest extends AbstractTripleBlankNodeTests {
+
+    @Override
+    protected Lang getLanguage() {
+        return Lang.RDFJSON;
+    }
+
+    @Override
+    protected String getInitialInputExtension() {
+        return ".rj";
+    }
+
+    @Override
+    protected InputFormat<LongWritable, TripleWritable> createInitialInputFormat() {
+        return new RdfJsonInputFormat();
+    }
+
+    @Override
+    protected OutputFormat<LongWritable, TripleWritable> createIntermediateOutputFormat() {
+        return new RdfJsonOutputFormat<>();
+    }
+
+    @Override
+    protected InputFormat<LongWritable, TripleWritable> createIntermediateInputFormat() {
+        return new RdfJsonInputFormat();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/bnodes/RdfThriftBlankNodeTest.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/bnodes/RdfThriftBlankNodeTest.java b/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/bnodes/RdfThriftBlankNodeTest.java
new file mode 100644
index 0000000..d6f32a2
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/bnodes/RdfThriftBlankNodeTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jena.hadoop.rdf.io.input.bnodes;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.jena.hadoop.rdf.io.input.thrift.ThriftTripleInputFormat;
+import org.apache.jena.hadoop.rdf.io.output.thrift.ThriftTripleOutputFormat;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+import org.apache.jena.riot.Lang;
+import org.apache.jena.riot.RDFLanguages;
+
+/**
+ * Tests blank node handling via the {@link ThriftTripleInputFormat}; RDF
+ * Thrift preserves blank node identity, so the divergence tests are skipped
+ */
+public class RdfThriftBlankNodeTest extends AbstractTripleBlankNodeTests {
+
+    @Override
+    protected Lang getLanguage() {
+        return RDFLanguages.THRIFT;
+    }
+
+    @Override
+    protected String getInitialInputExtension() {
+        return ".trdf";
+    }
+
+    @Override
+    protected InputFormat<LongWritable, TripleWritable> createInitialInputFormat() {
+        return new ThriftTripleInputFormat();
+    }
+
+    @Override
+    protected OutputFormat<LongWritable, TripleWritable> createIntermediateOutputFormat() {
+        return new ThriftTripleOutputFormat<>();
+    }
+
+    @Override
+    protected InputFormat<LongWritable, TripleWritable> createIntermediateInputFormat() {
+        return new ThriftTripleInputFormat();
+    }
+
+    @Override
+    protected boolean respectsParserProfile() {
+        return false;
+    }
+    
+    @Override
+    protected boolean preservesBlankNodeIdentity() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/bnodes/RdfXmlBlankNodeTest.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/bnodes/RdfXmlBlankNodeTest.java b/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/bnodes/RdfXmlBlankNodeTest.java
new file mode 100644
index 0000000..da70007
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/bnodes/RdfXmlBlankNodeTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jena.hadoop.rdf.io.input.bnodes;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.jena.hadoop.rdf.io.input.rdfxml.RdfXmlInputFormat;
+import org.apache.jena.hadoop.rdf.io.output.rdfxml.RdfXmlOutputFormat;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+import org.apache.jena.riot.Lang;
+
+/**
+ * Tests blank node divergence when using the {@link RdfXmlInputFormat}
+ */
+public class RdfXmlBlankNodeTest extends AbstractTripleBlankNodeTests {
+
+    @Override
+    protected Lang getLanguage() {
+        return Lang.RDFXML;
+    }
+
+    @Override
+    protected String getInitialInputExtension() {
+        return ".rdf";
+    }
+
+    @Override
+    protected InputFormat<LongWritable, TripleWritable> createInitialInputFormat() {
+        return new RdfXmlInputFormat();
+    }
+
+    @Override
+    protected OutputFormat<LongWritable, TripleWritable> createIntermediateOutputFormat() {
+        return new RdfXmlOutputFormat<>();
+    }
+
+    @Override
+    protected InputFormat<LongWritable, TripleWritable> createIntermediateInputFormat() {
+        return new RdfXmlInputFormat();
+    }
+
+    @Override
+    protected boolean respectsParserProfile() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/bnodes/TurtleBlankNodeTest.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/bnodes/TurtleBlankNodeTest.java b/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/bnodes/TurtleBlankNodeTest.java
new file mode 100644
index 0000000..146c836
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/bnodes/TurtleBlankNodeTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jena.hadoop.rdf.io.input.bnodes;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.jena.hadoop.rdf.io.input.turtle.TurtleInputFormat;
+import org.apache.jena.hadoop.rdf.io.output.turtle.TurtleOutputFormat;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+import org.apache.jena.riot.Lang;
+
+/**
+ * Tests blank node divergence when using the {@link TurtleInputFormat}
+ */
+public class TurtleBlankNodeTest extends AbstractTripleBlankNodeTests {
+
+    @Override
+    protected Lang getLanguage() {
+        return Lang.TURTLE;
+    }
+
+    @Override
+    protected String getInitialInputExtension() {
+        return ".ttl";
+    }
+
+    @Override
+    protected InputFormat<LongWritable, TripleWritable> createInitialInputFormat() {
+        return new TurtleInputFormat();
+    }
+
+    @Override
+    protected OutputFormat<LongWritable, TripleWritable> createIntermediateOutputFormat() {
+        return new TurtleOutputFormat<>();
+    }
+
+    @Override
+    protected InputFormat<LongWritable, TripleWritable> createIntermediateInputFormat() {
+        return new TurtleInputFormat();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedNodeTupleInputFormatTests.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedNodeTupleInputFormatTests.java b/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedNodeTupleInputFormatTests.java
new file mode 100644
index 0000000..1f18a95
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedNodeTupleInputFormatTests.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.compressed;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.jena.hadoop.rdf.io.HadoopIOConstants;
+import org.apache.jena.hadoop.rdf.io.input.AbstractNodeTupleInputFormatTests;
+import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable;
+
+
+/**
+ * Abstract tests for compressed node tuple input formats
+ * 
+ * @param <TValue>
+ *            Tuple value type
+ * @param <T>
+ *            Writable tuple type
+ */
+public abstract class AbstractCompressedNodeTupleInputFormatTests<TValue, T extends AbstractNodeTupleWritable<TValue>> extends
+        AbstractNodeTupleInputFormatTests<TValue, T> {
+    
+    @Override
+    protected Configuration prepareConfiguration() {
+        Configuration config = super.prepareConfiguration();
+        config.set(HadoopIOConstants.IO_COMPRESSION_CODECS, this.getCompressionCodec().getClass().getCanonicalName());
+        return config;
+    }
+
+    @Override
+    protected OutputStream getOutputStream(File f) throws IOException {
+        CompressionCodec codec = this.getCompressionCodec();
+        if (codec instanceof Configurable) {
+            ((Configurable) codec).setConf(this.prepareConfiguration());
+        }
+        FileOutputStream fileOutput = new FileOutputStream(f, false);
+        return codec.createOutputStream(fileOutput);
+    }
+
+    /**
+     * Gets the compression codec to use
+     * 
+     * @return Compression codec
+     */
+    protected abstract CompressionCodec getCompressionCodec();
+
+    /**
+     * Indicates whether inputs can be split, defaults to false for compressed
+     * input tests
+     */
+    @Override
+    protected boolean canSplitInputs() {
+        return false;
+    }
+}
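
A concrete subclass of these compressed tests only has to supply the codec. A minimal sketch, assuming Hadoop's built-in GzipCodec and a hypothetical class name; the remaining abstract members come from the wider test hierarchy (not shown in this diff):

    // Illustrative only: gzip-compressed variant of the triple input tests
    public class GZippedTriplesInputTest extends AbstractCompressedTriplesInputFormatTests {
        @Override
        protected CompressionCodec getCompressionCodec() {
            // org.apache.hadoop.io.compress.GzipCodec ships with Hadoop
            return new GzipCodec();
        }
        // The input format creation methods inherited from the test hierarchy
        // would also need to be implemented
    }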

http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedQuadsInputFormatTests.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedQuadsInputFormatTests.java b/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedQuadsInputFormatTests.java
new file mode 100644
index 0000000..312aae7
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedQuadsInputFormatTests.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.compressed;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+
+import com.hp.hpl.jena.sparql.core.Quad;
+
+/**
+ * Abstract tests for compressed quad input formats
+ */
+public abstract class AbstractCompressedQuadsInputFormatTests extends
+        AbstractCompressedNodeTupleInputFormatTests<Quad, QuadWritable> {
+    
+    private static final Charset utf8 = Charset.forName("utf-8");
+
+    @Override
+    protected void generateTuples(OutputStream output, int num) throws IOException {
+        for (int i = 0; i < num; i++) {
+            output.write(("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" <http://graphs/" + i + "> .\n").getBytes(utf8));
+        }
+        output.flush();
+        output.close();
+    }
+
+    @Override
+    protected void generateBadTuples(OutputStream output, int num) throws IOException {
+        for (int i = 0; i < num; i++) {
+            output.write("<http://broken\n".getBytes(utf8));
+        }
+        output.flush();
+        output.close();
+    }
+
+    @Override
+    protected void generateMixedTuples(OutputStream output, int num) throws IOException {
+        boolean bad = false;
+        for (int i = 0; i < num; i++, bad = !bad) {
+            if (bad) {
+                output.write("<http://broken\n".getBytes(utf8));
+            } else {
+                output.write(("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" <http://graphs/" + i + "> .\n").getBytes(utf8));
+            }
+        }
+        output.flush();
+        output.close();
+    }
+}
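
For reference, generateMixedTuples(output, 4) above alternates valid and truncated lines, so the compressed stream decompresses to:

    <http://subjects/0> <http://predicate> "0" <http://graphs/0> .
    <http://broken
    <http://subjects/2> <http://predicate> "2" <http://graphs/2> .
    <http://broken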

http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedTriplesInputFormatTests.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedTriplesInputFormatTests.java b/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedTriplesInputFormatTests.java
new file mode 100644
index 0000000..f0f0caf
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedTriplesInputFormatTests.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.compressed;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+
+import com.hp.hpl.jena.graph.Triple;
+
+/**
+ * Abstract tests for compressed Triple input formats
+ */
+public abstract class AbstractCompressedTriplesInputFormatTests extends
+        AbstractCompressedNodeTupleInputFormatTests<Triple, TripleWritable> {
+    
+    private static final Charset utf8 = Charset.forName("utf-8");
+
+    @Override
+    protected void generateTuples(OutputStream output, int num) throws IOException {
+        for (int i = 0; i < num; i++) {
+            output.write(("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" .\n").getBytes(utf8));
+        }
+        output.flush();
+        output.close();
+    }
+
+    @Override
+    protected void generateBadTuples(OutputStream output, int num) throws IOException {
+        for (int i = 0; i < num; i++) {
+            output.write("<http://broken\n".getBytes(utf8));
+        }
+        output.flush();
+        output.close();
+    }
+
+    @Override
+    protected void generateMixedTuples(OutputStream output, int num) throws IOException {
+        boolean bad = false;
+        for (int i = 0; i < num; i++, bad = !bad) {
+            if (bad) {
+                output.write("<http://broken\n".getBytes(utf8));
+            } else {
+                output.write(("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" .\n").getBytes(utf8));
+            }
+        }
+        output.flush();
+        output.close();
+    }
+}
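
A concrete test binds these generators to a particular codec, file extension and input format. A hypothetical sketch, assuming the module's line-based NTriplesInputFormat and the same accessor names the JSON-LD tests below override:

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapreduce.InputFormat;
    import org.apache.jena.hadoop.rdf.io.input.ntriples.NTriplesInputFormat;
    import org.apache.jena.hadoop.rdf.types.TripleWritable;

    public class GZippedNTriplesInputTest extends AbstractCompressedTriplesInputFormatTests {

        @Override
        protected String getFileExtension() {
            // Extension determines which codec Hadoop resolves for the input file
            return ".nt.gz";
        }

        @Override
        protected CompressionCodec getCompressionCodec() {
            return new GzipCodec();
        }

        @Override
        protected InputFormat<LongWritable, TripleWritable> getInputFormat() {
            return new NTriplesInputFormat();
        }
    }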

http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedWholeFileQuadInputFormatTests.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedWholeFileQuadInputFormatTests.java b/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedWholeFileQuadInputFormatTests.java
new file mode 100644
index 0000000..be2b1d7
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedWholeFileQuadInputFormatTests.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.compressed;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.jena.hadoop.rdf.io.HadoopIOConstants;
+import org.apache.jena.hadoop.rdf.io.input.AbstractNodeTupleInputFormatTests;
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+import org.apache.jena.riot.Lang;
+import org.apache.jena.riot.RDFDataMgr;
+import org.apache.jena.riot.RDFWriterRegistry;
+
+import com.hp.hpl.jena.query.Dataset;
+import com.hp.hpl.jena.query.DatasetFactory;
+import com.hp.hpl.jena.rdf.model.Model;
+import com.hp.hpl.jena.rdf.model.ModelFactory;
+import com.hp.hpl.jena.rdf.model.Property;
+import com.hp.hpl.jena.rdf.model.Resource;
+import com.hp.hpl.jena.sparql.core.Quad;
+
+/**
+ * Abstract tests for compressed whole-file quad input formats
+ */
+public abstract class AbstractCompressedWholeFileQuadInputFormatTests extends
+        AbstractNodeTupleInputFormatTests<Quad, QuadWritable> {
+    
+    private static final Charset utf8 = Charset.forName("utf-8");
+
+    @Override
+    protected Configuration prepareConfiguration() {
+        Configuration config = super.prepareConfiguration();
+        config.set(HadoopIOConstants.IO_COMPRESSION_CODECS, this.getCompressionCodec().getClass().getCanonicalName());
+        return config;
+    }
+
+    @Override
+    protected OutputStream getOutputStream(File f) throws IOException {
+        CompressionCodec codec = this.getCompressionCodec();
+        if (codec instanceof Configurable) {
+            ((Configurable) codec).setConf(this.prepareConfiguration());
+        }
+        FileOutputStream fileOutput = new FileOutputStream(f, false);
+        return codec.createOutputStream(fileOutput);
+    }
+
+    /**
+     * Gets the compression codec to use
+     * 
+     * @return Compression codec
+     */
+    protected abstract CompressionCodec getCompressionCodec();
+
+    /**
+     * Indicates whether inputs can be split; defaults to false for compressed
+     * input tests
+     */
+    @Override
+    protected boolean canSplitInputs() {
+        return false;
+    }
+
+    private void writeTuples(Dataset ds, OutputStream output) {
+        RDFDataMgr.write(output, ds, RDFWriterRegistry.defaultSerialization(this.getRdfLanguage()));
+    }
+
+    /**
+     * Gets the RDF language in which to write the generated tuples
+     * 
+     * @return RDF language
+     */
+    protected abstract Lang getRdfLanguage();
+
+    private void writeGoodTuples(OutputStream output, int num) throws IOException {
+        Dataset ds = DatasetFactory.createMem();
+        Model m = ModelFactory.createDefaultModel();
+        Resource currSubj = m.createResource("http://example.org/subjects/0");
+        Property predicate = m.createProperty("http://example.org/predicate");
+        for (int i = 0; i < num; i++) {
+            if (i % 100 == 0) {
+                ds.addNamedModel("http://example.org/graphs/" + (i / 100), m);
+                m = ModelFactory.createDefaultModel();
+            }
+            if (i % 10 == 0) {
+                currSubj = m.createResource("http://example.org/subjects/" + (i / 10));
+            }
+            m.add(currSubj, predicate, m.createTypedLiteral(i));
+        }
+        if (!m.isEmpty()) {
+            ds.addNamedModel("http://example.org/graphs/extra", m);
+        }
+        this.writeTuples(ds, output);
+    }
+
+    @Override
+    protected final void generateTuples(OutputStream output, int num) throws IOException {
+        this.writeGoodTuples(output, num);
+        output.close();
+    }
+
+    @Override
+    protected final void generateMixedTuples(OutputStream output, int num) throws IOException {
+        // Write good data
+        this.writeGoodTuples(output, num / 2);
+
+        // Write junk data
+        byte[] junk = "junk data\n".getBytes(utf8);
+        for (int i = 0; i < num / 2; i++) {
+            output.write(junk);
+        }
+
+        output.flush();
+        output.close();
+    }
+
+    @Override
+    protected final void generateBadTuples(OutputStream output, int num) throws IOException {
+        byte[] junk = "junk data\n".getBytes(utf8);
+        for (int i = 0; i < num; i++) {
+            output.write(junk);
+        }
+        output.flush();
+        output.close();
+    }
+}
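
The getOutputStream() override above is the heart of these tests: the generated data is compressed with the codec under test before the input format ever reads it. A self-contained sketch of the same wiring (the class name, file name and choice of GzipCodec are illustrative only):

    import java.io.FileOutputStream;
    import java.io.OutputStream;
    import java.nio.charset.Charset;

    import org.apache.hadoop.conf.Configurable;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.GzipCodec;

    public class CodecWiringSketch {
        public static void main(String[] args) throws Exception {
            CompressionCodec codec = new GzipCodec();
            // Some codecs need a Hadoop Configuration before they can be used
            if (codec instanceof Configurable) {
                ((Configurable) codec).setConf(new Configuration());
            }
            // Everything written to the wrapped stream is compressed on the fly
            OutputStream output = codec.createOutputStream(new FileOutputStream("example.nq.gz"));
            output.write("<http://s> <http://p> \"0\" <http://g> .\n".getBytes(Charset.forName("utf-8")));
            output.close();
        }
    }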

http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedWholeFileTripleInputFormatTests.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedWholeFileTripleInputFormatTests.java b/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedWholeFileTripleInputFormatTests.java
new file mode 100644
index 0000000..56dd8ca
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedWholeFileTripleInputFormatTests.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.compressed;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.jena.hadoop.rdf.io.HadoopIOConstants;
+import org.apache.jena.hadoop.rdf.io.input.AbstractNodeTupleInputFormatTests;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+import org.apache.jena.riot.Lang;
+import org.apache.jena.riot.RDFDataMgr;
+
+import com.hp.hpl.jena.graph.Triple;
+import com.hp.hpl.jena.rdf.model.Model;
+import com.hp.hpl.jena.rdf.model.ModelFactory;
+import com.hp.hpl.jena.rdf.model.Property;
+import com.hp.hpl.jena.rdf.model.Resource;
+
+/**
+ * Abstract tests for compressed whole-file triple input formats
+ */
+public abstract class AbstractCompressedWholeFileTripleInputFormatTests extends
+        AbstractNodeTupleInputFormatTests<Triple, TripleWritable> {
+
+    private static final Charset utf8 = Charset.forName("utf-8");
+
+    @Override
+    protected Configuration prepareConfiguration() {
+        Configuration config = super.prepareConfiguration();
+        config.set(HadoopIOConstants.IO_COMPRESSION_CODECS, this.getCompressionCodec().getClass().getCanonicalName());
+        return config;
+    }
+
+    @Override
+    protected OutputStream getOutputStream(File f) throws IOException {
+        CompressionCodec codec = this.getCompressionCodec();
+        if (codec instanceof Configurable) {
+            ((Configurable) codec).setConf(this.prepareConfiguration());
+        }
+        FileOutputStream fileOutput = new FileOutputStream(f, false);
+        return codec.createOutputStream(fileOutput);
+    }
+
+    /**
+     * Gets the compression codec to use
+     * 
+     * @return Compression codec
+     */
+    protected abstract CompressionCodec getCompressionCodec();
+
+    /**
+     * Indicates whether inputs can be split; defaults to false for compressed
+     * input tests
+     */
+    @Override
+    protected boolean canSplitInputs() {
+        return false;
+    }
+
+    private void writeTuples(Model m, OutputStream output) {
+        RDFDataMgr.write(output, m, this.getRdfLanguage());
+    }
+
+    /**
+     * Gets the RDF language in which to write the generated tuples
+     * 
+     * @return RDF language
+     */
+    protected abstract Lang getRdfLanguage();
+
+    @Override
+    protected final void generateTuples(OutputStream output, int num) throws IOException {
+        Model m = ModelFactory.createDefaultModel();
+        Resource currSubj = m.createResource("http://example.org/subjects/0");
+        Property predicate = m.createProperty("http://example.org/predicate");
+        for (int i = 0; i < num; i++) {
+            if (i % 10 == 0) {
+                currSubj = m.createResource("http://example.org/subjects/" + (i / 10));
+            }
+            m.add(currSubj, predicate, m.createTypedLiteral(i));
+        }
+        this.writeTuples(m, output);
+        output.close();
+    }
+
+    @Override
+    protected final void generateMixedTuples(OutputStream output, int num) throws IOException {
+        // Write good data
+        Model m = ModelFactory.createDefaultModel();
+        Resource currSubj = m.createResource("http://example.org/subjects/0");
+        Property predicate = m.createProperty("http://example.org/predicate");
+        for (int i = 0; i < num / 2; i++) {
+            if (i % 10 == 0) {
+                currSubj = m.createResource("http://example.org/subjects/" + (i / 10));
+            }
+            m.add(currSubj, predicate, m.createTypedLiteral(i));
+        }
+        this.writeTuples(m, output);
+
+        // Write junk data
+        byte[] junk = "junk data\n".getBytes(utf8);
+        for (int i = 0; i < num / 2; i++) {
+            output.write(junk);
+        }
+
+        output.flush();
+        output.close();
+    }
+
+    @Override
+    protected final void generateBadTuples(OutputStream output, int num) throws IOException {
+        byte[] junk = "junk data\n".getBytes(utf8);
+        for (int i = 0; i < num; i++) {
+            output.write(junk);
+        }
+        output.flush();
+        output.close();
+    }
+}
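
Whole-file formats additionally declare their serialization via getRdfLanguage(). A hypothetical concrete test, assuming a whole-file TurtleInputFormat elsewhere in this module:

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapreduce.InputFormat;
    import org.apache.jena.hadoop.rdf.io.input.turtle.TurtleInputFormat;
    import org.apache.jena.hadoop.rdf.types.TripleWritable;
    import org.apache.jena.riot.Lang;

    public class GZippedTurtleInputTest extends AbstractCompressedWholeFileTripleInputFormatTests {

        @Override
        protected String getFileExtension() {
            return ".ttl.gz";
        }

        @Override
        protected CompressionCodec getCompressionCodec() {
            return new GzipCodec();
        }

        @Override
        protected Lang getRdfLanguage() {
            // Serialization used when generating the test data
            return Lang.TURTLE;
        }

        @Override
        protected InputFormat<LongWritable, TripleWritable> getInputFormat() {
            return new TurtleInputFormat();
        }
    }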

http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/jsonld/AbstractCompressedJsonLDQuadInputFormatTests.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/jsonld/AbstractCompressedJsonLDQuadInputFormatTests.java b/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/jsonld/AbstractCompressedJsonLDQuadInputFormatTests.java
new file mode 100644
index 0000000..d118f29
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/jsonld/AbstractCompressedJsonLDQuadInputFormatTests.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.compressed.jsonld;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.jena.hadoop.rdf.io.input.compressed.AbstractCompressedWholeFileQuadInputFormatTests;
+import org.apache.jena.hadoop.rdf.io.input.jsonld.JsonLDQuadInputFormat;
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+import org.apache.jena.riot.Lang;
+
+/**
+ * Abstract tests for compressed JSON-LD quad input formats
+ */
+public abstract class AbstractCompressedJsonLDQuadInputFormatTests extends AbstractCompressedWholeFileQuadInputFormatTests {
+
+    private String ext;
+    private CompressionCodec codec;
+
+    /**
+     * Creates new tests
+     * 
+     * @param ext
+     *            File extension
+     * @param codec
+     *            Compression codec
+     */
+    public AbstractCompressedJsonLDQuadInputFormatTests(String ext, CompressionCodec codec) {
+        this.ext = ext;
+        this.codec = codec;
+    }
+
+    @Override
+    protected final String getFileExtension() {
+        return this.ext;
+    }
+
+    @Override
+    protected final CompressionCodec getCompressionCodec() {
+        return this.codec;
+    }
+
+    @Override
+    protected final Lang getRdfLanguage() {
+        return Lang.JSONLD;
+    }
+
+    @Override
+    protected final InputFormat<LongWritable, QuadWritable> getInputFormat() {
+        return new JsonLDQuadInputFormat();
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/jsonld/AbstractCompressedJsonLDTripleInputFormatTests.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/jsonld/AbstractCompressedJsonLDTripleInputFormatTests.java b/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/jsonld/AbstractCompressedJsonLDTripleInputFormatTests.java
new file mode 100644
index 0000000..acb9e08
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/jsonld/AbstractCompressedJsonLDTripleInputFormatTests.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.compressed.jsonld;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.jena.hadoop.rdf.io.input.compressed.AbstractCompressedWholeFileTripleInputFormatTests;
+import org.apache.jena.hadoop.rdf.io.input.jsonld.JsonLDTripleInputFormat;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+import org.apache.jena.riot.Lang;
+
+/**
+ * Abstract tests for compressed JSON-LD triple input formats
+ */
+public abstract class AbstractCompressedJsonLDTripleInputFormatTests extends AbstractCompressedWholeFileTripleInputFormatTests {
+
+    private String ext;
+    private CompressionCodec codec;
+
+    /**
+     * Creates new tests
+     * 
+     * @param ext
+     *            File extension
+     * @param codec
+     *            Compression codec
+     */
+    public AbstractCompressedJsonLDTripleInputFormatTests(String ext, CompressionCodec codec) {
+        this.ext = ext;
+        this.codec = codec;
+    }
+
+    @Override
+    protected final String getFileExtension() {
+        return this.ext;
+    }
+
+    @Override
+    protected final CompressionCodec getCompressionCodec() {
+        return this.codec;
+    }
+
+    @Override
+    protected final Lang getRdfLanguage() {
+        return Lang.JSONLD;
+    }
+
+    @Override
+    protected final InputFormat<LongWritable, TripleWritable> getInputFormat() {
+        return new JsonLDTripleInputFormat();
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/jsonld/BZippedJsonLDQuadInputTest.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/jsonld/BZippedJsonLDQuadInputTest.java b/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/jsonld/BZippedJsonLDQuadInputTest.java
new file mode 100644
index 0000000..e5e7066
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/jsonld/BZippedJsonLDQuadInputTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.compressed.jsonld;
+
+import org.apache.hadoop.io.compress.BZip2Codec;
+
+/**
+ * Tests for BZipped JSON-LD quad input
+ */
+public class BZippedJsonLDQuadInputTest extends AbstractCompressedJsonLDQuadInputFormatTests {
+
+    /**
+     * Creates new tests
+     */
+    public BZippedJsonLDQuadInputTest() {
+        super(".jsonld.bz2", new BZip2Codec());
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/jsonld/BZippedJsonLDTripleInputTest.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/jsonld/BZippedJsonLDTripleInputTest.java b/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/jsonld/BZippedJsonLDTripleInputTest.java
new file mode 100644
index 0000000..8d2e122
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/jsonld/BZippedJsonLDTripleInputTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.compressed.jsonld;
+
+import org.apache.hadoop.io.compress.BZip2Codec;
+
+/**
+ * Tests for BZipped JSON-LD triple input
+ */
+public class BZippedJsonLDTripleInputTest extends AbstractCompressedJsonLDTripleInputFormatTests {
+
+    /**
+     * Creates new tests
+     */
+    public BZippedJsonLDTripleInputTest() {
+        super(".jsonld.bz2", new BZip2Codec());
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/jsonld/DeflatedJsonLDQuadInputTest.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/jsonld/DeflatedJsonLDQuadInputTest.java b/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/jsonld/DeflatedJsonLDQuadInputTest.java
new file mode 100644
index 0000000..292b17f
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/jsonld/DeflatedJsonLDQuadInputTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.compressed.jsonld;
+
+import org.apache.hadoop.io.compress.DefaultCodec;
+
+/**
+ * Tests for Deflated JSON-LD quad input
+ */
+public class DeflatedJsonLDQuadInputTest extends AbstractCompressedJsonLDQuadInputFormatTests {
+
+    /**
+     * Creates new tests
+     */
+    public DeflatedJsonLDQuadInputTest() {
+        super(".jsonld.deflate", new DefaultCodec());
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/jsonld/DeflatedJsonLDTripleInputTest.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/jsonld/DeflatedJsonLDTripleInputTest.java b/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/jsonld/DeflatedJsonLDTripleInputTest.java
new file mode 100644
index 0000000..e5edd6a
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/jsonld/DeflatedJsonLDTripleInputTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.compressed.jsonld;
+
+import org.apache.hadoop.io.compress.DefaultCodec;
+
+/**
+ * Tests for Deflated JSON-LD triple input
+ */
+public class DeflatedJsonLDTripleInputTest extends AbstractCompressedJsonLDTripleInputFormatTests {
+
+    /**
+     * Creates new tests
+     */
+    public DeflatedJsonLDTripleInputTest() {
+        super(".jsonld.deflate", new DefaultCodec());
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/jsonld/GZippedJsonLDQuadInputTest.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/jsonld/GZippedJsonLDQuadInputTest.java b/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/jsonld/GZippedJsonLDQuadInputTest.java
new file mode 100644
index 0000000..0a4a240
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/jsonld/GZippedJsonLDQuadInputTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.compressed.jsonld;
+
+import org.apache.hadoop.io.compress.GzipCodec;
+
+/**
+ * Tests for GZipped JSON-LD quad input
+ */
+public class GZippedJsonLDQuadInputTest extends AbstractCompressedJsonLDQuadInputFormatTests {
+
+    /**
+     * Creates new tests
+     */
+    public GZippedJsonLDQuadInputTest() {
+        super(".jsonld.gz", new GzipCodec());
+    }
+}