You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by rv...@apache.org on 2014/11/11 13:59:50 UTC

[2/2] jena git commit: Support RDF Thrift as an input format

Support RDF Thrift as an input format

Supported for both triples and quads.

Refactors input format tests to prepare test data using OutputStream
instead of Writer.


Project: http://git-wip-us.apache.org/repos/asf/jena/repo
Commit: http://git-wip-us.apache.org/repos/asf/jena/commit/f08fff2a
Tree: http://git-wip-us.apache.org/repos/asf/jena/tree/f08fff2a
Diff: http://git-wip-us.apache.org/repos/asf/jena/diff/f08fff2a

Branch: refs/heads/hadoop-rdf
Commit: f08fff2a98db7f97d05b0ad1181afe1851848dac
Parents: 3ccab77
Author: Rob Vesse <rv...@apache.org>
Authored: Tue Nov 11 12:59:00 2014 +0000
Committer: Rob Vesse <rv...@apache.org>
Committed: Tue Nov 11 12:59:00 2014 +0000

----------------------------------------------------------------------
 .../input/readers/thrift/ThriftQuadReader.java  |   32 +
 .../readers/thrift/ThriftTripleReader.java      |   30 +
 .../io/input/thrift/ThriftQuadInputFormat.java  |   39 +
 .../input/thrift/ThriftTripleInputFormat.java   |   39 +
 .../AbstractNodeTupleInputFormatTests.java      | 1174 +++++++++---------
 .../io/input/AbstractQuadsInputFormatTests.java |   33 +-
 .../input/AbstractTriplesInputFormatTests.java  |   34 +-
 .../AbstractWholeFileQuadInputFormatTests.java  |   44 +-
 ...AbstractWholeFileTripleInputFormatTests.java |   46 +-
 ...ractCompressedNodeTupleInputFormatTests.java |   12 +-
 ...AbstractCompressedQuadsInputFormatTests.java |   33 +-
 ...stractCompressedTriplesInputFormatTests.java |   33 +-
 ...CompressedWholeFileQuadInputFormatTests.java |   53 +-
 ...mpressedWholeFileTripleInputFormatTests.java |  245 ++--
 .../io/input/thrift/ThriftQuadInputTest.java    |   51 +
 .../io/input/thrift/ThriftTripleInputTest.java  |   51 +
 16 files changed, 1108 insertions(+), 841 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/jena/blob/f08fff2a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/thrift/ThriftQuadReader.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/thrift/ThriftQuadReader.java b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/thrift/ThriftQuadReader.java
new file mode 100644
index 0000000..084b1ec
--- /dev/null
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/thrift/ThriftQuadReader.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.readers.thrift;
+
+import org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileQuadReader;
+import org.apache.jena.riot.Lang;
+import org.apache.jena.riot.RDFLanguages;
+/** Quad reader for RDF Thrift input; it only tells {@link AbstractWholeFileQuadReader} which language to use. */
+public class ThriftQuadReader extends AbstractWholeFileQuadReader {
+    /** Reports {@link RDFLanguages#THRIFT} as this reader's RDF language (presumably consumed by the base class). */
+    @Override
+    protected Lang getRdfLanguage() {
+        return RDFLanguages.THRIFT;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/f08fff2a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/thrift/ThriftTripleReader.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/thrift/ThriftTripleReader.java b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/thrift/ThriftTripleReader.java
new file mode 100644
index 0000000..713bfa7
--- /dev/null
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/thrift/ThriftTripleReader.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.readers.thrift;
+
+import org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileTripleReader;
+import org.apache.jena.riot.Lang;
+import org.apache.jena.riot.RDFLanguages;
+/** Triple reader for RDF Thrift input; it only tells {@link AbstractWholeFileTripleReader} which language to use. */
+public class ThriftTripleReader extends AbstractWholeFileTripleReader {
+    @Override
+    protected Lang getRdfLanguage() {
+        return RDFLanguages.THRIFT;
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/f08fff2a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/thrift/ThriftQuadInputFormat.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/thrift/ThriftQuadInputFormat.java b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/thrift/ThriftQuadInputFormat.java
new file mode 100644
index 0000000..f75542a
--- /dev/null
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/thrift/ThriftQuadInputFormat.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.thrift;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat;
+import org.apache.jena.hadoop.rdf.io.input.readers.thrift.ThriftQuadReader;
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+/** Input format for RDF Thrift quads, producing {@link LongWritable} keys and {@link QuadWritable} values. */
+public class ThriftQuadInputFormat extends AbstractWholeFileInputFormat<LongWritable, QuadWritable> {
+    /** Creates a new {@link ThriftQuadReader}; the split and context arguments are not used here. */
+    @Override
+    public RecordReader<LongWritable, QuadWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+            throws IOException, InterruptedException {
+        return new ThriftQuadReader();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/f08fff2a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/thrift/ThriftTripleInputFormat.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/thrift/ThriftTripleInputFormat.java b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/thrift/ThriftTripleInputFormat.java
new file mode 100644
index 0000000..b60380d
--- /dev/null
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/thrift/ThriftTripleInputFormat.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.thrift;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat;
+import org.apache.jena.hadoop.rdf.io.input.readers.thrift.ThriftTripleReader;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+/** Input format for RDF Thrift triples, producing {@link LongWritable} keys and {@link TripleWritable} values. */
+public class ThriftTripleInputFormat extends AbstractWholeFileInputFormat<LongWritable, TripleWritable> {
+    /** Creates a new {@link ThriftTripleReader}; the split and context arguments are not used here. */
+    @Override
+    public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+            throws IOException, InterruptedException {
+        return new ThriftTripleReader();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/f08fff2a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractNodeTupleInputFormatTests.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractNodeTupleInputFormatTests.java b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractNodeTupleInputFormatTests.java
index ef1b8d3..e22650f 100644
--- a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractNodeTupleInputFormatTests.java
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractNodeTupleInputFormatTests.java
@@ -16,591 +16,597 @@
  * limitations under the License.
  */
 
-package org.apache.jena.hadoop.rdf.io.input;
-
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.Writer;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
-import org.apache.hadoop.mapreduce.task.JobContextImpl;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+package org.apache.jena.hadoop.rdf.io.input;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.jena.hadoop.rdf.io.HadoopIOConstants;
 import org.apache.jena.hadoop.rdf.io.RdfIOConstants;
 import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Assume;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Abstract node tuple input format tests
- * 
- * 
- * 
- * @param <TValue>
- * @param <T>
- */
-public abstract class AbstractNodeTupleInputFormatTests<TValue, T extends AbstractNodeTupleWritable<TValue>> {
-
-    private static final Logger LOG = LoggerFactory.getLogger(AbstractNodeTupleInputFormatTests.class);
-
-    protected static final int EMPTY_SIZE = 0, SMALL_SIZE = 100, LARGE_SIZE = 10000, BAD_SIZE = 100, MIXED_SIZE = 100;
-    protected static final String EMPTY = "empty";
-    protected static final String SMALL = "small";
-    protected static final String LARGE = "large";
-    protected static final String BAD = "bad";
-    protected static final String MIXED = "mixed";
-
-    /**
-     * Temporary folder for the tests
-     */
-    @Rule
-    public TemporaryFolder folder = new TemporaryFolder();
-
-    protected File empty, small, large, bad, mixed;
-
-    /**
-     * Prepares the inputs for the tests
-     * 
-     * @throws IOException
-     */
-    @Before
-    public void beforeTest() throws IOException {
-        this.prepareInputs();
-    }
-
-    /**
-     * Cleans up the inputs after each test
-     */
-    @After
-    public void afterTest() {
-        // Should be unnecessary since JUnit will clean up the temporary folder
-        // anyway but best to do this regardless
-        empty.delete();
-        small.delete();
-        large.delete();
-        bad.delete();
-        mixed.delete();
-    }
-
-    /**
-     * Prepares a fresh configuration
-     * 
-     * @return Configuration
-     */
-    protected Configuration prepareConfiguration() {
-        Configuration config = new Configuration(true);
-        // Nothing else to do
-        return config;
-    }
-
-    /**
-     * Prepares the inputs
-     * 
-     * @throws IOException
-     */
-    protected void prepareInputs() throws IOException {
-        String ext = this.getFileExtension();
-        empty = folder.newFile(EMPTY + ext);
-        this.generateTuples(empty, EMPTY_SIZE);
-        small = folder.newFile(SMALL + ext);
-        this.generateTuples(small, SMALL_SIZE);
-        large = folder.newFile(LARGE + ext);
-        this.generateTuples(large, LARGE_SIZE);
-        bad = folder.newFile(BAD + ext);
-        this.generateBadTuples(bad, BAD_SIZE);
-        mixed = folder.newFile(MIXED + ext);
-        this.generateMixedTuples(mixed, MIXED_SIZE);
-    }
-
-    /**
-     * Gets the extra file extension to add to the filenames
-     * 
-     * @return File extension
-     */
-    protected abstract String getFileExtension();
-
-    /**
-     * Generates tuples used for tests
-     * 
-     * @param f
-     *            File
-     * @param num
-     *            Number of tuples to generate
-     * @throws IOException
-     */
-    protected final void generateTuples(File f, int num) throws IOException {
-        this.generateTuples(this.getWriter(f), num);
-    }
-
-    /**
-     * Gets the writer to use for generating tuples
-     * 
-     * @param f
-     *            File
-     * @return Writer
-     * @throws IOException
-     */
-    protected Writer getWriter(File f) throws IOException {
-        return new FileWriter(f, false);
-    }
-
-    /**
-     * Generates tuples used for tests
-     * 
-     * @param writer
-     *            Writer to write to
-     * @param num
-     *            Number of tuples to generate
-     * @throws IOException
-     */
-    protected abstract void generateTuples(Writer writer, int num) throws IOException;
-
-    /**
-     * Generates bad tuples used for tests
-     * 
-     * @param f
-     *            File
-     * @param num
-     *            Number of bad tuples to generate
-     * @throws IOException
-     */
-    protected final void generateBadTuples(File f, int num) throws IOException {
-        this.generateBadTuples(this.getWriter(f), num);
-    }
-
-    /**
-     * Generates bad tuples used for tests
-     * 
-     * @param writer
-     *            Writer to write to
-     * @param num
-     *            Number of bad tuples to generate
-     * @throws IOException
-     */
-    protected abstract void generateBadTuples(Writer writer, int num) throws IOException;
-
-    /**
-     * Generates a mixture of good and bad tuples used for tests
-     * 
-     * @param f
-     *            File
-     * @param num
-     *            Number of tuples to generate, they should be a 50/50 mix of
-     *            good and bad tuples
-     * @throws IOException
-     */
-    protected final void generateMixedTuples(File f, int num) throws IOException {
-        this.generateMixedTuples(this.getWriter(f), num);
-    }
-
-    /**
-     * Generates a mixture of good and bad tuples used for tests
-     * 
-     * @param writer
-     *            Writer to write to
-     * @param num
-     *            Number of tuples to generate, they should be a 50/50 mix of
-     *            good and bad tuples
-     * @throws IOException
-     */
-    protected abstract void generateMixedTuples(Writer write, int num) throws IOException;
-
-    /**
-     * Adds an input path to the job configuration
-     * 
-     * @param f
-     *            File
-     * @param config
-     *            Configuration
-     * @param job
-     *            Job
-     * @throws IOException
-     */
-    protected void addInputPath(File f, Configuration config, Job job) throws IOException {
-        FileSystem fs = FileSystem.getLocal(config);
-        Path inputPath = fs.makeQualified(new Path(f.getAbsolutePath()));
-        FileInputFormat.addInputPath(job, inputPath);
-    }
-
-    protected final int countTuples(RecordReader<LongWritable, T> reader) throws IOException, InterruptedException {
-        int count = 0;
-
-        // Check initial progress
-        LOG.info(String.format("Initial Reported Progress %f", reader.getProgress()));
-        float progress = reader.getProgress();
-        if (Float.compare(0.0f, progress) == 0) {
-            Assert.assertEquals(0.0d, reader.getProgress(), 0.0d);
-        } else if (Float.compare(1.0f, progress) == 0) {
-            // If reader is reported 1.0 straight away then we expect there to
-            // be no key values
-            Assert.assertEquals(1.0d, reader.getProgress(), 0.0d);
-            Assert.assertFalse(reader.nextKeyValue());
-        } else {
-            Assert.fail(String.format(
-                    "Expected progress of 0.0 or 1.0 before reader has been accessed for first time but got %f", progress));
-        }
-
-        // Count tuples
-        boolean debug = LOG.isDebugEnabled();
-        while (reader.nextKeyValue()) {
-            count++;
-            progress = reader.getProgress();
-            if (debug)
-                LOG.debug(String.format("Current Reported Progress %f", progress));
-            Assert.assertTrue(String.format("Progress should be in the range 0.0 < p <= 1.0 but got %f", progress),
-                    progress > 0.0f && progress <= 1.0f);
-        }
-        reader.close();
-        LOG.info(String.format("Got %d tuples from this record reader", count));
-
-        // Check final progress
-        LOG.info(String.format("Final Reported Progress %f", reader.getProgress()));
-        Assert.assertEquals(1.0d, reader.getProgress(), 0.0d);
-
-        return count;
-    }
-
-    protected final void checkTuples(RecordReader<LongWritable, T> reader, int expected) throws IOException, InterruptedException {
-        Assert.assertEquals(expected, this.countTuples(reader));
-    }
-
-    /**
-     * Runs a test with a single input
-     * 
-     * @param input
-     *            Input
-     * @param expectedTuples
-     *            Expected tuples
-     * @throws IOException
-     * @throws InterruptedException
-     */
-    protected final void testSingleInput(File input, int expectedSplits, int expectedTuples) throws IOException,
-            InterruptedException {
-        // Prepare configuration
-        Configuration config = this.prepareConfiguration();
-        this.testSingleInput(config, input, expectedSplits, expectedTuples);
-    }
-
-    /**
-     * Runs a test with a single input
-     * 
-     * @param config
-     *            Configuration
-     * @param input
-     *            Input
-     * @param expectedTuples
-     *            Expected tuples
-     * @throws IOException
-     * @throws InterruptedException
-     */
-    protected final void testSingleInput(Configuration config, File input, int expectedSplits, int expectedTuples)
-            throws IOException, InterruptedException {
-        // Set up fake job
-        InputFormat<LongWritable, T> inputFormat = this.getInputFormat();
-        Job job = Job.getInstance(config);
-        job.setInputFormatClass(inputFormat.getClass());
-        this.addInputPath(input, job.getConfiguration(), job);
-        JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID());
-        Assert.assertEquals(1, FileInputFormat.getInputPaths(context).length);
-        NLineInputFormat.setNumLinesPerSplit(job, LARGE_SIZE);
-
-        // Check splits
-        List<InputSplit> splits = inputFormat.getSplits(context);
-        Assert.assertEquals(expectedSplits, splits.size());
-
-        // Check tuples
-        for (InputSplit split : splits) {
-            TaskAttemptContext taskContext = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
-            RecordReader<LongWritable, T> reader = inputFormat.createRecordReader(split, taskContext);
-            reader.initialize(split, taskContext);
-            this.checkTuples(reader, expectedTuples);
-        }
-    }
-
-    protected abstract InputFormat<LongWritable, T> getInputFormat();
-
-    /**
-     * Basic tuples input test
-     * 
-     * @throws IOException
-     * @throws ClassNotFoundException
-     * @throws InterruptedException
-     */
-    @Test
-    public final void single_input_01() throws IOException, InterruptedException, ClassNotFoundException {
-        testSingleInput(empty, this.canSplitInputs() ? 0 : 1, EMPTY_SIZE);
-    }
-
-    /**
-     * Basic tuples input test
-     * 
-     * @throws IOException
-     * @throws ClassNotFoundException
-     * @throws InterruptedException
-     */
-    @Test
-    public final void single_input_02() throws IOException, InterruptedException, ClassNotFoundException {
-        testSingleInput(small, 1, SMALL_SIZE);
-    }
-
-    /**
-     * Basic tuples input test
-     * 
-     * @throws IOException
-     * @throws ClassNotFoundException
-     * @throws InterruptedException
-     */
-    @Test
-    public final void single_input_03() throws IOException, InterruptedException, ClassNotFoundException {
-        testSingleInput(large, 1, LARGE_SIZE);
-    }
-
-    /**
-     * Basic tuples input test
-     * 
-     * @throws IOException
-     * @throws ClassNotFoundException
-     * @throws InterruptedException
-     */
-    @Test
-    public final void single_input_04() throws IOException, InterruptedException, ClassNotFoundException {
-        testSingleInput(bad, 1, 0);
-    }
-
-    /**
-     * Basic tuples input test
-     * 
-     * @throws IOException
-     * @throws ClassNotFoundException
-     * @throws InterruptedException
-     */
-    @Test
-    public final void single_input_05() throws IOException, InterruptedException, ClassNotFoundException {
-        testSingleInput(mixed, 1, MIXED_SIZE / 2);
-    }
-
-    /**
-     * Tests behaviour when ignoring bad tuples is disabled
-     * 
-     * @throws InterruptedException
-     * @throws IOException
-     */
-    @Test(expected = IOException.class)
-    public final void fail_on_bad_input_01() throws IOException, InterruptedException {
-        Configuration config = this.prepareConfiguration();
-        config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
-        Assert.assertFalse(config.getBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, true));
-        testSingleInput(config, bad, 1, 0);
-    }
-
-    /**
-     * Tests behaviour when ignoring bad tuples is disabled
-     * 
-     * @throws InterruptedException
-     * @throws IOException
-     */
-    @Test(expected = IOException.class)
-    public final void fail_on_bad_input_02() throws IOException, InterruptedException {
-        Configuration config = this.prepareConfiguration();
-        config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
-        Assert.assertFalse(config.getBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, true));
-        testSingleInput(config, mixed, 1, MIXED_SIZE / 2);
-    }
-
-    /**
-     * Runs a multiple input test
-     * 
-     * @param inputs
-     *            Inputs
-     * @param expectedSplits
-     *            Number of splits expected
-     * @param expectedTuples
-     *            Number of tuples expected
-     * @throws IOException
-     * @throws InterruptedException
-     */
-    protected final void testMultipleInputs(File[] inputs, int expectedSplits, int expectedTuples) throws IOException,
-            InterruptedException {
-        // Prepare configuration and inputs
-        Configuration config = this.prepareConfiguration();
-
-        // Set up fake job
-        InputFormat<LongWritable, T> inputFormat = this.getInputFormat();
-        Job job = Job.getInstance(config);
-        job.setInputFormatClass(inputFormat.getClass());
-        for (File input : inputs) {
-            this.addInputPath(input, job.getConfiguration(), job);
-        }
-        JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID());
-        Assert.assertEquals(inputs.length, FileInputFormat.getInputPaths(context).length);
-        NLineInputFormat.setNumLinesPerSplit(job, expectedTuples);
-
-        // Check splits
-        List<InputSplit> splits = inputFormat.getSplits(context);
-        Assert.assertEquals(expectedSplits, splits.size());
-
-        // Check tuples
-        int count = 0;
-        for (InputSplit split : splits) {
-            TaskAttemptContext taskContext = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
-            RecordReader<LongWritable, T> reader = inputFormat.createRecordReader(split, taskContext);
-            reader.initialize(split, taskContext);
-            count += this.countTuples(reader);
-        }
-        Assert.assertEquals(expectedTuples, count);
-    }
-
-    /**
-     * tuples test with multiple inputs
-     * 
-     * @throws IOException
-     * @throws ClassNotFoundException
-     * @throws InterruptedException
-     */
-    @Test
-    public final void multiple_inputs_01() throws IOException, InterruptedException, ClassNotFoundException {
-        testMultipleInputs(new File[] { empty, small, large }, this.canSplitInputs() ? 2 : 3, EMPTY_SIZE + SMALL_SIZE
-                + LARGE_SIZE);
-    }
-
-    /**
-     * tuples test with multiple inputs
-     * 
-     * @throws IOException
-     * @throws ClassNotFoundException
-     * @throws InterruptedException
-     */
-    @Test
-    public final void multiple_inputs_02() throws IOException, InterruptedException, ClassNotFoundException {
-        testMultipleInputs(new File[] { folder.getRoot() }, this.canSplitInputs() ? 4 : 5, EMPTY_SIZE + SMALL_SIZE + LARGE_SIZE
-                + (MIXED_SIZE / 2));
-    }
-
-    protected final void testSplitInputs(Configuration config, File[] inputs, int expectedSplits, int expectedTuples)
-            throws IOException, InterruptedException {
-        // Set up fake job
-        InputFormat<LongWritable, T> inputFormat = this.getInputFormat();
-        Job job = Job.getInstance(config);
-        job.setInputFormatClass(inputFormat.getClass());
-        for (File input : inputs) {
-            this.addInputPath(input, job.getConfiguration(), job);
-        }
-        JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID());
-        Assert.assertEquals(inputs.length, FileInputFormat.getInputPaths(context).length);
-
-        // Check splits
-        List<InputSplit> splits = inputFormat.getSplits(context);
-        Assert.assertEquals(expectedSplits, splits.size());
-
-        // Check tuples
-        int count = 0;
-        for (InputSplit split : splits) {
-            // Validate split
-            Assert.assertTrue(this.isValidSplit(split, config));
-
-            // Read split
-            TaskAttemptContext taskContext = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
-            RecordReader<LongWritable, T> reader = inputFormat.createRecordReader(split, taskContext);
-            reader.initialize(split, taskContext);
-            count += this.countTuples(reader);
-        }
-        Assert.assertEquals(expectedTuples, count);
-    }
-
-    /**
-     * Determines whether an input split is valid
-     * 
-     * @param split
-     *            Input split
-     * @return True if a valid split, false otherwise
-     * @throws IOException 
-     */
-    protected boolean isValidSplit(InputSplit split, Configuration config) throws IOException {
-        return split instanceof FileSplit;
-    }
-
-    /**
-     * Indicates whether inputs can be split, defaults to true
-     * 
-     * @return Whether inputs can be split
-     */
-    protected boolean canSplitInputs() {
-        return true;
-    }
-
-    /**
-     * Tests for input splitting
-     * 
-     * @throws IOException
-     * @throws InterruptedException
-     * @throws ClassNotFoundException
-     */
-    @Test
-    public final void split_input_01() throws IOException, InterruptedException, ClassNotFoundException {
-        Assume.assumeTrue(this.canSplitInputs());
-
-        Configuration config = this.prepareConfiguration();
-        config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
-        Assert.assertEquals(Integer.MAX_VALUE, config.getInt(HadoopIOConstants.MAX_LINE_LENGTH, Integer.MAX_VALUE));
-        this.testSplitInputs(config, new File[] { small }, 100, SMALL_SIZE);
-    }
-
-    /**
-     * Tests for input splitting
-     * 
-     * @throws IOException
-     * @throws InterruptedException
-     * @throws ClassNotFoundException
-     */
-    @Test
-    public final void split_input_02() throws IOException, InterruptedException, ClassNotFoundException {
-        Assume.assumeTrue(this.canSplitInputs());
-
-        Configuration config = this.prepareConfiguration();
-        config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
-        config.setLong(NLineInputFormat.LINES_PER_MAP, 10);
-        Assert.assertEquals(Integer.MAX_VALUE, config.getInt(HadoopIOConstants.MAX_LINE_LENGTH, Integer.MAX_VALUE));
-        this.testSplitInputs(config, new File[] { small }, 10, SMALL_SIZE);
-    }
-
-    /**
-     * Tests for input splitting
-     * 
-     * @throws IOException
-     * @throws InterruptedException
-     * @throws ClassNotFoundException
-     */
-    @Test
-    public final void split_input_03() throws IOException, InterruptedException, ClassNotFoundException {
-        Assume.assumeTrue(this.canSplitInputs());
-
-        Configuration config = this.prepareConfiguration();
-        config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
-        config.setLong(NLineInputFormat.LINES_PER_MAP, 100);
-        Assert.assertEquals(Integer.MAX_VALUE, config.getInt(HadoopIOConstants.MAX_LINE_LENGTH, Integer.MAX_VALUE));
-        this.testSplitInputs(config, new File[] { large }, 100, LARGE_SIZE);
-    }
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
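+/**
+ * Abstract node tuple input format tests
+ * <p>
+ * Concrete subclasses supply the input format under test and generate the
+ * test data that it reads
+ * 
+ * @param <TValue>
+ *            Tuple type
+ * @param <T>
+ *            Writable tuple type that wraps the tuple type
+ */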
+public abstract class AbstractNodeTupleInputFormatTests<TValue, T extends AbstractNodeTupleWritable<TValue>> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractNodeTupleInputFormatTests.class);
+
+    /**
+     * Numbers of tuples requested when generating each of the test inputs
+     */
+    protected static final int EMPTY_SIZE = 0, SMALL_SIZE = 100, LARGE_SIZE = 10000, BAD_SIZE = 100, MIXED_SIZE = 100;
+    /**
+     * Base file names for the test inputs, the file extension is appended to
+     * these
+     */
+    protected static final String EMPTY = "empty";
+    protected static final String SMALL = "small";
+    protected static final String LARGE = "large";
+    protected static final String BAD = "bad";
+    protected static final String MIXED = "mixed";
+
+    /**
+     * Temporary folder for the tests, all test inputs are created inside it
+     */
+    @Rule
+    public TemporaryFolder folder = new TemporaryFolder();
+
+    /**
+     * Test input files, populated when the inputs are prepared
+     */
+    protected File empty, small, large, bad, mixed;
+
+    /**
+     * Prepares the inputs for the tests, runs before every test so that each
+     * test starts from freshly generated input files
+     * 
+     * @throws IOException
+     *             Thrown if preparing the inputs fails
+     */
+    @Before
+    public void beforeTest() throws IOException {
+        this.prepareInputs();
+    }
+
+    /**
+     * Cleans up the inputs after each test by deleting every input file that
+     * was created, the outcome of each deletion is ignored
+     */
+    @After
+    public void afterTest() {
+        // Should be unnecessary since JUnit will clean up the temporary folder
+        // anyway but best to do this regardless
+        if (empty != null)
+            empty.delete();
+        if (small != null)
+            small.delete();
+        if (large != null)
+            large.delete();
+        if (bad != null)
+            bad.delete();
+        if (mixed != null)
+            mixed.delete();
+    }
+
+    /**
+     * Prepares a fresh configuration, may be overridden when the input format
+     * under test needs extra settings
+     * 
+     * @return Configuration
+     */
+    protected Configuration prepareConfiguration() {
+        Configuration config = new Configuration(true);
+        // Nothing else to do
+        return config;
+    }
+
+    /**
+     * Prepares the inputs, creates the empty, small, large, bad and mixed
+     * input files in the temporary folder and fills each with generated data
+     * 
+     * @throws IOException
+     *             Thrown if a file cannot be created or written
+     */
+    protected void prepareInputs() throws IOException {
+        String ext = this.getFileExtension();
+        empty = folder.newFile(EMPTY + ext);
+        this.generateTuples(empty, EMPTY_SIZE);
+        small = folder.newFile(SMALL + ext);
+        this.generateTuples(small, SMALL_SIZE);
+        large = folder.newFile(LARGE + ext);
+        this.generateTuples(large, LARGE_SIZE);
+        bad = folder.newFile(BAD + ext);
+        this.generateBadTuples(bad, BAD_SIZE);
+        mixed = folder.newFile(MIXED + ext);
+        this.generateMixedTuples(mixed, MIXED_SIZE);
+    }
+
+    /**
+     * Gets the extra file extension to add to the filenames, this is appended
+     * to the base file name as is so must include any separator that is needed
+     * 
+     * @return File extension
+     */
+    protected abstract String getFileExtension();
+
+    /**
+     * Generates tuples used for tests, the output stream for the file is
+     * obtained from {@link #getOutputStream(File)} and is not closed by this
+     * method
+     * 
+     * @param f
+     *            File
+     * @param num
+     *            Number of tuples to generate
+     * @throws IOException
+     */
+    protected final void generateTuples(File f, int num) throws IOException {
+        this.generateTuples(this.getOutputStream(f), num);
+    }
+
+    /**
+     * Gets the output stream to use for generating tuples, by default a plain
+     * file output stream that overwrites rather than appends to the file
+     * 
+     * @param f
+     *            File
+     * @return Output Stream
+     * @throws IOException
+     *             Thrown if the stream cannot be opened
+     */
+    protected OutputStream getOutputStream(File f) throws IOException {
+        return new FileOutputStream(f, false);
+    }
+
+    /**
+     * Generates tuples used for tests
+     * <p>
+     * The file based overload does not close the stream it passes in, so
+     * implementations are expected to close it once they have finished writing
+     * 
+     * @param output
+     *            Output Stream to write to
+     * @param num
+     *            Number of tuples to generate
+     * @throws IOException
+     */
+    protected abstract void generateTuples(OutputStream output, int num) throws IOException;
+
+    /**
+     * Generates bad tuples used for tests, the output stream for the file is
+     * obtained from {@link #getOutputStream(File)} and is not closed by this
+     * method
+     * 
+     * @param f
+     *            File
+     * @param num
+     *            Number of bad tuples to generate
+     * @throws IOException
+     */
+    protected final void generateBadTuples(File f, int num) throws IOException {
+        this.generateBadTuples(this.getOutputStream(f), num);
+    }
+
+    /**
+     * Generates bad tuples used for tests
+     * <p>
+     * The file based overload does not close the stream it passes in, so
+     * implementations are expected to close it once they have finished writing
+     * 
+     * @param output
+     *            Output Stream to write to
+     * @param num
+     *            Number of bad tuples to generate
+     * @throws IOException
+     */
+    protected abstract void generateBadTuples(OutputStream output, int num) throws IOException;
+
+    /**
+     * Generates a mixture of good and bad tuples used for tests, the output
+     * stream for the file is obtained from {@link #getOutputStream(File)} and
+     * is not closed by this method
+     * 
+     * @param f
+     *            File
+     * @param num
+     *            Number of tuples to generate, they should be a 50/50 mix of
+     *            good and bad tuples
+     * @throws IOException
+     */
+    protected final void generateMixedTuples(File f, int num) throws IOException {
+        this.generateMixedTuples(this.getOutputStream(f), num);
+    }
+
+    /**
+     * Generates a mixture of good and bad tuples used for tests
+     * <p>
+     * The file based overload does not close the stream it passes in, so
+     * implementations are expected to close it once they have finished writing
+     * 
+     * @param output
+     *            Output Stream to write to
+     * @param num
+     *            Number of tuples to generate, they should be a 50/50 mix of
+     *            good and bad tuples
+     * @throws IOException
+     */
+    protected abstract void generateMixedTuples(OutputStream output, int num) throws IOException;
+
+    /**
+     * Adds an input path to the job configuration, the file is turned into a
+     * qualified path on the local file system before it is added to the job
+     * 
+     * @param f
+     *            File
+     * @param config
+     *            Configuration, used to obtain the local file system
+     * @param job
+     *            Job to add the input path to
+     * @throws IOException
+     */
+    protected void addInputPath(File f, Configuration config, Job job) throws IOException {
+        FileSystem fs = FileSystem.getLocal(config);
+        Path inputPath = fs.makeQualified(new Path(f.getAbsolutePath()));
+        FileInputFormat.addInputPath(job, inputPath);
+    }
+
+    /**
+     * Counts the tuples produced by a record reader, checking the progress
+     * that the reader reports before, during and after reading
+     * <p>
+     * The reader is always closed, even if reading or one of the progress
+     * checks fails. The final progress is deliberately checked after the
+     * reader has been closed.
+     * 
+     * @param reader
+     *            Initialized record reader
+     * @return Number of tuples read
+     * @throws IOException
+     *             Thrown if the reader fails while reading or closing
+     * @throws InterruptedException
+     *             Thrown if reading is interrupted
+     */
+    protected final int countTuples(RecordReader<LongWritable, T> reader) throws IOException, InterruptedException {
+        int count = 0;
+
+        try {
+            // Check initial progress
+            LOG.info(String.format("Initial Reported Progress %f", reader.getProgress()));
+            float progress = reader.getProgress();
+            if (Float.compare(0.0f, progress) == 0) {
+                Assert.assertEquals(0.0d, reader.getProgress(), 0.0d);
+            } else if (Float.compare(1.0f, progress) == 0) {
+                // If reader is reported 1.0 straight away then we expect there to
+                // be no key values
+                Assert.assertEquals(1.0d, reader.getProgress(), 0.0d);
+                Assert.assertFalse(reader.nextKeyValue());
+            } else {
+                Assert.fail(String.format(
+                        "Expected progress of 0.0 or 1.0 before reader has been accessed for first time but got %f",
+                        progress));
+            }
+
+            // Count tuples
+            boolean debug = LOG.isDebugEnabled();
+            while (reader.nextKeyValue()) {
+                count++;
+                progress = reader.getProgress();
+                if (debug)
+                    LOG.debug(String.format("Current Reported Progress %f", progress));
+                Assert.assertTrue(String.format("Progress should be in the range 0.0 < p <= 1.0 but got %f", progress),
+                        progress > 0.0f && progress <= 1.0f);
+            }
+        } finally {
+            // Always release the reader, previously it was leaked whenever
+            // reading threw or a progress assertion failed
+            reader.close();
+        }
+        LOG.info(String.format("Got %d tuples from this record reader", count));
+
+        // Check final progress, a closed reader should report completion
+        LOG.info(String.format("Final Reported Progress %f", reader.getProgress()));
+        Assert.assertEquals(1.0d, reader.getProgress(), 0.0d);
+
+        return count;
+    }
+
+    protected final void checkTuples(RecordReader<LongWritable, T> reader, int expected) throws IOException,
+            InterruptedException {
+        Assert.assertEquals(expected, this.countTuples(reader));
+    }
+
+    /**
+     * Runs a test with a single input using a freshly prepared configuration
+     * 
+     * @param input
+     *            Input
+     * @param expectedSplits
+     *            Number of splits expected
+     * @param expectedTuples
+     *            Expected tuples
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    protected final void testSingleInput(File input, int expectedSplits, int expectedTuples) throws IOException,
+            InterruptedException {
+        // Prepare configuration
+        Configuration config = this.prepareConfiguration();
+        this.testSingleInput(config, input, expectedSplits, expectedTuples);
+    }
+
+    /**
+     * Runs a test with a single input, sets up a fake job for the input,
+     * checks the number of splits produced and then checks that every split
+     * yields the expected number of tuples
+     * 
+     * @param config
+     *            Configuration
+     * @param input
+     *            Input
+     * @param expectedSplits
+     *            Number of splits expected
+     * @param expectedTuples
+     *            Expected tuples, checked against each split individually
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    protected final void testSingleInput(Configuration config, File input, int expectedSplits, int expectedTuples)
+            throws IOException, InterruptedException {
+        // Set up fake job
+        InputFormat<LongWritable, T> inputFormat = this.getInputFormat();
+        Job job = Job.getInstance(config);
+        job.setInputFormatClass(inputFormat.getClass());
+        this.addInputPath(input, job.getConfiguration(), job);
+        JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID());
+        Assert.assertEquals(1, FileInputFormat.getInputPaths(context).length);
+        NLineInputFormat.setNumLinesPerSplit(job, LARGE_SIZE);
+
+        // Check splits
+        List<InputSplit> splits = inputFormat.getSplits(context);
+        Assert.assertEquals(expectedSplits, splits.size());
+
+        // Check tuples
+        for (InputSplit split : splits) {
+            TaskAttemptContext taskContext = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
+            RecordReader<LongWritable, T> reader = inputFormat.createRecordReader(split, taskContext);
+            reader.initialize(split, taskContext);
+            this.checkTuples(reader, expectedTuples);
+        }
+    }
+
+    /**
+     * Gets the input format to test
+     * 
+     * @return Input format
+     */
+    protected abstract InputFormat<LongWritable, T> getInputFormat();
+
+    /**
+     * Tests reading the empty input, expects no tuples and either no splits
+     * when inputs can be split or a single split otherwise
+     * 
+     * @throws IOException
+     * @throws ClassNotFoundException
+     * @throws InterruptedException
+     */
+    @Test
+    public final void single_input_01() throws IOException, InterruptedException, ClassNotFoundException {
+        testSingleInput(empty, this.canSplitInputs() ? 0 : 1, EMPTY_SIZE);
+    }
+
+    /**
+     * Tests reading the small input, expects a single split that yields every
+     * tuple in the input
+     * 
+     * @throws IOException
+     * @throws ClassNotFoundException
+     * @throws InterruptedException
+     */
+    @Test
+    public final void single_input_02() throws IOException, InterruptedException, ClassNotFoundException {
+        testSingleInput(small, 1, SMALL_SIZE);
+    }
+
+    /**
+     * Tests reading the large input, expects a single split that yields every
+     * tuple in the input
+     * 
+     * @throws IOException
+     * @throws ClassNotFoundException
+     * @throws InterruptedException
+     */
+    @Test
+    public final void single_input_03() throws IOException, InterruptedException, ClassNotFoundException {
+        testSingleInput(large, 1, LARGE_SIZE);
+    }
+
+    /**
+     * Tests reading the bad input with the prepared configuration left as is,
+     * expects a single split that yields no tuples
+     * 
+     * @throws IOException
+     * @throws ClassNotFoundException
+     * @throws InterruptedException
+     */
+    @Test
+    public final void single_input_04() throws IOException, InterruptedException, ClassNotFoundException {
+        testSingleInput(bad, 1, 0);
+    }
+
+    /**
+     * Tests reading the mixed input with the prepared configuration left as
+     * is, expects a single split that yields only the good half of the tuples
+     * 
+     * @throws IOException
+     * @throws ClassNotFoundException
+     * @throws InterruptedException
+     */
+    @Test
+    public final void single_input_05() throws IOException, InterruptedException, ClassNotFoundException {
+        testSingleInput(mixed, 1, MIXED_SIZE / 2);
+    }
+
+    /**
+     * Tests behaviour when ignoring bad tuples is disabled, reading the bad
+     * input is expected to fail with an {@link IOException}
+     * 
+     * @throws InterruptedException
+     * @throws IOException
+     */
+    @Test(expected = IOException.class)
+    public final void fail_on_bad_input_01() throws IOException, InterruptedException {
+        Configuration config = this.prepareConfiguration();
+        config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
+        Assert.assertFalse(config.getBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, true));
+        testSingleInput(config, bad, 1, 0);
+    }
+
+    /**
+     * Tests behaviour when ignoring bad tuples is disabled, reading the mixed
+     * input is expected to fail with an {@link IOException}
+     * 
+     * @throws InterruptedException
+     * @throws IOException
+     */
+    @Test(expected = IOException.class)
+    public final void fail_on_bad_input_02() throws IOException, InterruptedException {
+        Configuration config = this.prepareConfiguration();
+        config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
+        Assert.assertFalse(config.getBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, true));
+        testSingleInput(config, mixed, 1, MIXED_SIZE / 2);
+    }
+
+    /**
+     * Runs a multiple input test, every input is added to a single fake job
+     * and the tuples read are totalled across all of the resulting splits
+     * 
+     * @param inputs
+     *            Inputs
+     * @param expectedSplits
+     *            Number of splits expected
+     * @param expectedTuples
+     *            Number of tuples expected in total, also used as the number
+     *            of lines per split
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    protected final void testMultipleInputs(File[] inputs, int expectedSplits, int expectedTuples) throws IOException,
+            InterruptedException {
+        // Prepare configuration and inputs
+        Configuration config = this.prepareConfiguration();
+
+        // Set up fake job
+        InputFormat<LongWritable, T> inputFormat = this.getInputFormat();
+        Job job = Job.getInstance(config);
+        job.setInputFormatClass(inputFormat.getClass());
+        for (File input : inputs) {
+            this.addInputPath(input, job.getConfiguration(), job);
+        }
+        JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID());
+        Assert.assertEquals(inputs.length, FileInputFormat.getInputPaths(context).length);
+        NLineInputFormat.setNumLinesPerSplit(job, expectedTuples);
+
+        // Check splits
+        List<InputSplit> splits = inputFormat.getSplits(context);
+        Assert.assertEquals(expectedSplits, splits.size());
+
+        // Check tuples
+        int count = 0;
+        for (InputSplit split : splits) {
+            TaskAttemptContext taskContext = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
+            RecordReader<LongWritable, T> reader = inputFormat.createRecordReader(split, taskContext);
+            reader.initialize(split, taskContext);
+            count += this.countTuples(reader);
+        }
+        Assert.assertEquals(expectedTuples, count);
+    }
+
+    /**
+     * Tests reading the empty, small and large inputs together, expects two
+     * splits when inputs can be split (three otherwise) and every tuple from
+     * the three inputs
+     * 
+     * @throws IOException
+     * @throws ClassNotFoundException
+     * @throws InterruptedException
+     */
+    @Test
+    public final void multiple_inputs_01() throws IOException, InterruptedException, ClassNotFoundException {
+        testMultipleInputs(new File[] { empty, small, large }, this.canSplitInputs() ? 2 : 3, EMPTY_SIZE + SMALL_SIZE
+                + LARGE_SIZE);
+    }
+
+    /**
+     * Tests using the temporary folder itself as the only input path, expects
+     * four splits when inputs can be split (five otherwise); the expected
+     * tuple count allows nothing for the bad input and only half of the mixed
+     * input
+     * 
+     * @throws IOException
+     * @throws ClassNotFoundException
+     * @throws InterruptedException
+     */
+    @Test
+    public final void multiple_inputs_02() throws IOException, InterruptedException, ClassNotFoundException {
+        testMultipleInputs(new File[] { folder.getRoot() }, this.canSplitInputs() ? 4 : 5, EMPTY_SIZE + SMALL_SIZE
+                + LARGE_SIZE + (MIXED_SIZE / 2));
+    }
+
+    /**
+     * Runs a test of how inputs are divided into splits, every split is
+     * validated with {@link #isValidSplit(InputSplit, Configuration)} before
+     * it is read and the tuples read are totalled across all of the splits
+     * 
+     * @param config
+     *            Configuration
+     * @param inputs
+     *            Inputs
+     * @param expectedSplits
+     *            Number of splits expected
+     * @param expectedTuples
+     *            Number of tuples expected in total
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    protected final void testSplitInputs(Configuration config, File[] inputs, int expectedSplits, int expectedTuples)
+            throws IOException, InterruptedException {
+        // Set up fake job
+        InputFormat<LongWritable, T> inputFormat = this.getInputFormat();
+        Job job = Job.getInstance(config);
+        job.setInputFormatClass(inputFormat.getClass());
+        for (File input : inputs) {
+            this.addInputPath(input, job.getConfiguration(), job);
+        }
+        JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID());
+        Assert.assertEquals(inputs.length, FileInputFormat.getInputPaths(context).length);
+
+        // Check splits
+        List<InputSplit> splits = inputFormat.getSplits(context);
+        Assert.assertEquals(expectedSplits, splits.size());
+
+        // Check tuples
+        int count = 0;
+        for (InputSplit split : splits) {
+            // Validate split
+            Assert.assertTrue(this.isValidSplit(split, config));
+
+            // Read split
+            TaskAttemptContext taskContext = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
+            RecordReader<LongWritable, T> reader = inputFormat.createRecordReader(split, taskContext);
+            reader.initialize(split, taskContext);
+            count += this.countTuples(reader);
+        }
+        Assert.assertEquals(expectedTuples, count);
+    }
+
+    /**
+     * Determines whether an input split is valid, by default any file split
+     * is considered valid
+     * 
+     * @param split
+     *            Input split
+     * @param config
+     *            Configuration, not used by the default implementation
+     * @return True if a valid split, false otherwise
+     * @throws IOException
+     */
+    protected boolean isValidSplit(InputSplit split, Configuration config) throws IOException {
+        return split instanceof FileSplit;
+    }
+
+    /**
+     * Indicates whether inputs can be split, defaults to true
+     * <p>
+     * The splitting tests are skipped when this returns false and the expected
+     * split counts of some other tests depend on it
+     * 
+     * @return Whether inputs can be split
+     */
+    protected boolean canSplitInputs() {
+        return true;
+    }
+
+    /**
+     * Tests for input splitting, reads the small input without setting the
+     * number of lines per split and expects one split per tuple
+     * <p>
+     * Skipped when inputs cannot be split
+     * 
+     * @throws IOException
+     * @throws InterruptedException
+     * @throws ClassNotFoundException
+     */
+    @Test
+    public final void split_input_01() throws IOException, InterruptedException, ClassNotFoundException {
+        Assume.assumeTrue(this.canSplitInputs());
+
+        Configuration config = this.prepareConfiguration();
+        config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
+        Assert.assertEquals(Integer.MAX_VALUE, config.getInt(HadoopIOConstants.MAX_LINE_LENGTH, Integer.MAX_VALUE));
+        this.testSplitInputs(config, new File[] { small }, 100, SMALL_SIZE);
+    }
+
+    /**
+     * Tests for input splitting, reads the small input with 10 lines per
+     * split and expects 10 splits
+     * <p>
+     * Skipped when inputs cannot be split
+     * 
+     * @throws IOException
+     * @throws InterruptedException
+     * @throws ClassNotFoundException
+     */
+    @Test
+    public final void split_input_02() throws IOException, InterruptedException, ClassNotFoundException {
+        Assume.assumeTrue(this.canSplitInputs());
+
+        Configuration config = this.prepareConfiguration();
+        config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
+        config.setLong(NLineInputFormat.LINES_PER_MAP, 10);
+        Assert.assertEquals(Integer.MAX_VALUE, config.getInt(HadoopIOConstants.MAX_LINE_LENGTH, Integer.MAX_VALUE));
+        this.testSplitInputs(config, new File[] { small }, 10, SMALL_SIZE);
+    }
+
+    /**
+     * Tests for input splitting, reads the large input with 100 lines per
+     * split and expects 100 splits
+     * <p>
+     * Skipped when inputs cannot be split
+     * 
+     * @throws IOException
+     * @throws InterruptedException
+     * @throws ClassNotFoundException
+     */
+    @Test
+    public final void split_input_03() throws IOException, InterruptedException, ClassNotFoundException {
+        Assume.assumeTrue(this.canSplitInputs());
+
+        Configuration config = this.prepareConfiguration();
+        config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
+        config.setLong(NLineInputFormat.LINES_PER_MAP, 100);
+        Assert.assertEquals(Integer.MAX_VALUE, config.getInt(HadoopIOConstants.MAX_LINE_LENGTH, Integer.MAX_VALUE));
+        this.testSplitInputs(config, new File[] { large }, 100, LARGE_SIZE);
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/jena/blob/f08fff2a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractQuadsInputFormatTests.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractQuadsInputFormatTests.java b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractQuadsInputFormatTests.java
index 4dd396b..78d7f33 100644
--- a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractQuadsInputFormatTests.java
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractQuadsInputFormatTests.java
@@ -19,7 +19,8 @@
 package org.apache.jena.hadoop.rdf.io.input;
 
 import java.io.IOException;
-import java.io.Writer;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
 
 import org.apache.jena.hadoop.rdf.types.QuadWritable;
 
@@ -30,38 +31,40 @@ import com.hp.hpl.jena.sparql.core.Quad;
  * 
  *
  */
-public abstract class AbstractQuadsInputFormatTests extends AbstractNodeTupleInputFormatTests<Quad, QuadWritable> {
+public abstract class AbstractQuadsInputFormatTests extends AbstractNodeTupleInputFormatTests<Quad, QuadWritable> {
+    
+    private static final Charset utf8 = Charset.forName("utf-8");
 
     @Override
-    protected void generateTuples(Writer writer, int num) throws IOException {
+    protected void generateTuples(OutputStream output, int num) throws IOException {
         for (int i = 0; i < num; i++) {
-            writer.write("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" <http://graphs/" + i + "> .\n");
+            output.write(("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" <http://graphs/" + i + "> .\n").getBytes(utf8));
         }
-        writer.flush();
-        writer.close();
+        output.flush();
+        output.close();
     }
     
     @Override
-    protected void generateBadTuples(Writer writer, int num) throws IOException {
+    protected void generateBadTuples(OutputStream output, int num) throws IOException {
         for (int i = 0; i < num; i++) {
-            writer.write("<http://broken\n");
+            output.write("<http://broken\n".getBytes(utf8));
         }
-        writer.flush();
-        writer.close();
+        output.flush();
+        output.close();
     }
 
     @Override
-    protected void generateMixedTuples(Writer writer, int num) throws IOException {
+    protected void generateMixedTuples(OutputStream output, int num) throws IOException {
         boolean bad = false;
         for (int i = 0; i < num; i++, bad = !bad) {
             if (bad) {
-                writer.write("<http://broken\n");
+                output.write("<http://broken\n".getBytes(utf8));
             } else {
-                writer.write("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" <http://graphs/" + i + "> .\n");
+                output.write(("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" <http://graphs/" + i + "> .\n").getBytes(utf8));
             }
         }
-        writer.flush();
-        writer.close();
+        output.flush();
+        output.close();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/jena/blob/f08fff2a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractTriplesInputFormatTests.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractTriplesInputFormatTests.java b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractTriplesInputFormatTests.java
index 04572d3..65a9889 100644
--- a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractTriplesInputFormatTests.java
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractTriplesInputFormatTests.java
@@ -19,7 +19,8 @@
 package org.apache.jena.hadoop.rdf.io.input;
 
 import java.io.IOException;
-import java.io.Writer;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
 
 import org.apache.jena.hadoop.rdf.types.TripleWritable;
 
@@ -31,38 +32,41 @@ import com.hp.hpl.jena.graph.Triple;
  * 
  * 
  */
-public abstract class AbstractTriplesInputFormatTests extends AbstractNodeTupleInputFormatTests<Triple, TripleWritable> {
+public abstract class AbstractTriplesInputFormatTests extends AbstractNodeTupleInputFormatTests<Triple, TripleWritable> {
+    
+    private static final Charset utf8 = Charset.forName("utf-8");
 
     @Override
-    protected void generateTuples(Writer writer, int num) throws IOException {
+    protected void generateTuples(OutputStream output, int num) throws IOException {
         for (int i = 0; i < num; i++) {
-            writer.write("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" .\n");
+            output.write(("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" .\n").getBytes(utf8));
         }
-        writer.flush();
-        writer.close();
+        output.flush();
+        output.close();
     }
 
     @Override
-    protected void generateBadTuples(Writer writer, int num) throws IOException {
+    protected void generateBadTuples(OutputStream output, int num) throws IOException {
+        byte[] junk = "<http://broken\n".getBytes(utf8);
         for (int i = 0; i < num; i++) {
-            writer.write("<http://broken\n");
+            output.write(junk);
         }
-        writer.flush();
-        writer.close();
+        output.flush();
+        output.close();
     }
 
     @Override
-    protected void generateMixedTuples(Writer writer, int num) throws IOException {
+    protected void generateMixedTuples(OutputStream output, int num) throws IOException {
         boolean bad = false;
         for (int i = 0; i < num; i++, bad = !bad) {
             if (bad) {
-                writer.write("<http://broken\n");
+                output.write("<http://broken\n".getBytes(utf8));
             } else {
-                writer.write("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" .\n");
+                output.write(("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" .\n").getBytes(utf8));
             }
         }
-        writer.flush();
-        writer.close();
+        output.flush();
+        output.close();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/jena/blob/f08fff2a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileQuadInputFormatTests.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileQuadInputFormatTests.java b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileQuadInputFormatTests.java
index 3f2c8d2..0b6cfde 100644
--- a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileQuadInputFormatTests.java
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileQuadInputFormatTests.java
@@ -19,7 +19,8 @@
 package org.apache.jena.hadoop.rdf.io.input;
 
 import java.io.IOException;
-import java.io.Writer;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
 
 import org.apache.jena.hadoop.rdf.types.QuadWritable;
 import org.apache.jena.riot.Lang;
@@ -40,16 +41,17 @@ import com.hp.hpl.jena.sparql.core.Quad;
  * 
  * 
  */
-public abstract class AbstractWholeFileQuadInputFormatTests extends AbstractNodeTupleInputFormatTests<Quad, QuadWritable> {
+public abstract class AbstractWholeFileQuadInputFormatTests extends AbstractNodeTupleInputFormatTests<Quad, QuadWritable> {
+    
+    private static final Charset utf8 = Charset.forName("utf-8");
 
     @Override
     protected boolean canSplitInputs() {
         return false;
     }
 
-    @SuppressWarnings("deprecation")
-    private void writeTuples(Dataset ds, Writer writer) {
-        RDFDataMgr.write(writer, ds, RDFWriterRegistry.defaultSerialization(this.getRdfLanguage()));
+    private void writeTuples(Dataset ds, OutputStream output) {
+        RDFDataMgr.write(output, ds, RDFWriterRegistry.defaultSerialization(this.getRdfLanguage()));
     }
 
     /**
@@ -59,7 +61,7 @@ public abstract class AbstractWholeFileQuadInputFormatTests extends AbstractNode
      */
     protected abstract Lang getRdfLanguage();
 
-    private void writeGoodTuples(Writer writer, int num) throws IOException {
+    private void writeGoodTuples(OutputStream output, int num) throws IOException {
         Dataset ds = DatasetFactory.createMem();
         Model m = ModelFactory.createDefaultModel();
         Resource currSubj = m.createResource("http://example.org/subjects/0");
@@ -77,35 +79,37 @@ public abstract class AbstractWholeFileQuadInputFormatTests extends AbstractNode
         if (!m.isEmpty()) {
             ds.addNamedModel("http://example.org/graphs/extra", m);
         }
-        this.writeTuples(ds, writer);
+        this.writeTuples(ds, output);
     }
 
     @Override
-    protected final void generateTuples(Writer writer, int num) throws IOException {
-        this.writeGoodTuples(writer, num);
-        writer.close();
+    protected final void generateTuples(OutputStream output, int num) throws IOException {
+        this.writeGoodTuples(output, num);
+        output.close();
     }
 
     @Override
-    protected final void generateMixedTuples(Writer writer, int num) throws IOException {
+    protected final void generateMixedTuples(OutputStream output, int num) throws IOException {
         // Write good data
-        this.writeGoodTuples(writer, num / 2);
+        this.writeGoodTuples(output, num / 2);
 
-        // Write junk data
+        // Write junk data
+        byte[] junk = "junk data\n".getBytes(utf8);
         for (int i = 0; i < num / 2; i++) {
-            writer.write("junk data\n");
+            output.write(junk);
         }
 
-        writer.flush();
-        writer.close();
+        output.flush();
+        output.close();
     }
 
     @Override
-    protected final void generateBadTuples(Writer writer, int num) throws IOException {
+    protected final void generateBadTuples(OutputStream output, int num) throws IOException {
+        byte[] junk = "junk data\n".getBytes(utf8);
         for (int i = 0; i < num; i++) {
-            writer.write("junk data\n");
+            output.write(junk);
         }
-        writer.flush();
-        writer.close();
+        output.flush();
+        output.close();
     }
 }

http://git-wip-us.apache.org/repos/asf/jena/blob/f08fff2a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileTripleInputFormatTests.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileTripleInputFormatTests.java b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileTripleInputFormatTests.java
index bacd7ba..b68d662 100644
--- a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileTripleInputFormatTests.java
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileTripleInputFormatTests.java
@@ -19,7 +19,8 @@
 package org.apache.jena.hadoop.rdf.io.input;
 
 import java.io.IOException;
-import java.io.Writer;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
 
 import org.apache.jena.hadoop.rdf.types.TripleWritable;
 import org.apache.jena.riot.Lang;
@@ -37,18 +38,19 @@ import com.hp.hpl.jena.rdf.model.Resource;
  * 
  * 
  */
-public abstract class AbstractWholeFileTripleInputFormatTests extends AbstractNodeTupleInputFormatTests<Triple, TripleWritable> {
+public abstract class AbstractWholeFileTripleInputFormatTests extends AbstractNodeTupleInputFormatTests<Triple, TripleWritable> {
+    
+    private static final Charset utf8 = Charset.forName("utf-8");
 
     @Override
     protected boolean canSplitInputs() {
         return false;
     }
     
-    @SuppressWarnings("deprecation")
-    private void writeTuples(Model m, Writer writer) {
-        RDFDataMgr.write(writer, m, this.getRdfLanguage());
-    }
-    
+    private void writeTuples(Model m, OutputStream output) {
+        RDFDataMgr.write(output, m, this.getRdfLanguage());
+    }
+        
     /**
      * Gets the RDF language to write out generate tuples in
      * @return RDF language
@@ -56,7 +58,7 @@ public abstract class AbstractWholeFileTripleInputFormatTests extends AbstractNo
     protected abstract Lang getRdfLanguage();
     
     @Override
-    protected final void generateTuples(Writer writer, int num) throws IOException {
+    protected final void generateTuples(OutputStream output, int num) throws IOException {
         Model m = ModelFactory.createDefaultModel();
         Resource currSubj = m.createResource("http://example.org/subjects/0");
         Property predicate = m.createProperty("http://example.org/predicate");
@@ -66,12 +68,12 @@ public abstract class AbstractWholeFileTripleInputFormatTests extends AbstractNo
             }
             m.add(currSubj, predicate, m.createTypedLiteral(i));
         }
-        this.writeTuples(m, writer);
-        writer.close();
-    }
+        this.writeTuples(m, output);
+        output.close();
+    }
     
     @Override
-    protected final void generateMixedTuples(Writer writer, int num) throws IOException {
+    protected final void generateMixedTuples(OutputStream output, int num) throws IOException {
         // Write good data
         Model m = ModelFactory.createDefaultModel();
         Resource currSubj = m.createResource("http://example.org/subjects/0");
@@ -82,23 +84,25 @@ public abstract class AbstractWholeFileTripleInputFormatTests extends AbstractNo
             }
             m.add(currSubj, predicate, m.createTypedLiteral(i));
         }
-        this.writeTuples(m, writer);
+        this.writeTuples(m, output);
         
-        // Write junk data
+        // Write junk data
+        byte[] junk = "junk data\n".getBytes(utf8);
         for (int i = 0; i < num / 2; i++) {
-            writer.write("junk data\n");
+            output.write(junk);
         }
         
-        writer.flush();
-        writer.close();
+        output.flush();
+        output.close();
     }
 
     @Override
-    protected final void generateBadTuples(Writer writer, int num) throws IOException {
+    protected final void generateBadTuples(OutputStream output, int num) throws IOException {
+        byte[] junk = "junk data\n".getBytes(utf8);
         for (int i = 0; i < num; i++) {
-            writer.write("junk data\n");
+            output.write(junk);
         }
-        writer.flush();
-        writer.close();
+        output.flush();
+        output.close();
     }
 }

http://git-wip-us.apache.org/repos/asf/jena/blob/f08fff2a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedNodeTupleInputFormatTests.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedNodeTupleInputFormatTests.java b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedNodeTupleInputFormatTests.java
index 5725cf7..1f18a95 100644
--- a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedNodeTupleInputFormatTests.java
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedNodeTupleInputFormatTests.java
@@ -22,9 +22,6 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -40,8 +37,8 @@ import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable;
  * @param <T>
  */
 public abstract class AbstractCompressedNodeTupleInputFormatTests<TValue, T extends AbstractNodeTupleWritable<TValue>> extends
-        AbstractNodeTupleInputFormatTests<TValue, T> {
-
+        AbstractNodeTupleInputFormatTests<TValue, T> {
+    
     @Override
     protected Configuration prepareConfiguration() {
         Configuration config = super.prepareConfiguration();
@@ -50,14 +47,13 @@ public abstract class AbstractCompressedNodeTupleInputFormatTests<TValue, T exte
     }
 
     @Override
-    protected Writer getWriter(File f) throws IOException {
+    protected OutputStream getOutputStream(File f) throws IOException {
         CompressionCodec codec = this.getCompressionCodec();
         if (codec instanceof Configurable) {
             ((Configurable) codec).setConf(this.prepareConfiguration());
         }
         FileOutputStream fileOutput = new FileOutputStream(f, false);
-        OutputStream output = codec.createOutputStream(fileOutput);
-        return new OutputStreamWriter(output);
+        return codec.createOutputStream(fileOutput);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/jena/blob/f08fff2a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedQuadsInputFormatTests.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedQuadsInputFormatTests.java b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedQuadsInputFormatTests.java
index 386e772..312aae7 100644
--- a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedQuadsInputFormatTests.java
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedQuadsInputFormatTests.java
@@ -19,7 +19,8 @@
 package org.apache.jena.hadoop.rdf.io.input.compressed;
 
 import java.io.IOException;
-import java.io.Writer;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
 
 import org.apache.jena.hadoop.rdf.types.QuadWritable;
 
@@ -32,37 +33,39 @@ import com.hp.hpl.jena.sparql.core.Quad;
  * 
  */
 public abstract class AbstractCompressedQuadsInputFormatTests extends
-        AbstractCompressedNodeTupleInputFormatTests<Quad, QuadWritable> {
+        AbstractCompressedNodeTupleInputFormatTests<Quad, QuadWritable> {
+    
+    private static final Charset utf8 = Charset.forName("utf-8");
 
     @Override
-    protected void generateTuples(Writer writer, int num) throws IOException {
+    protected void generateTuples(OutputStream output, int num) throws IOException {
         for (int i = 0; i < num; i++) {
-            writer.write("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" <http://graphs/" + i + "> .\n");
+            output.write(("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" <http://graphs/" + i + "> .\n").getBytes(utf8));
         }
-        writer.flush();
-        writer.close();
+        output.flush();
+        output.close();
     }
 
     @Override
-    protected void generateBadTuples(Writer writer, int num) throws IOException {
+    protected void generateBadTuples(OutputStream output, int num) throws IOException {
         for (int i = 0; i < num; i++) {
-            writer.write("<http://broken\n");
+            output.write("<http://broken\n".getBytes(utf8));
         }
-        writer.flush();
-        writer.close();
+        output.flush();
+        output.close();
     }
 
     @Override
-    protected void generateMixedTuples(Writer writer, int num) throws IOException {
+    protected void generateMixedTuples(OutputStream output, int num) throws IOException {
         boolean bad = false;
         for (int i = 0; i < num; i++, bad = !bad) {
             if (bad) {
-                writer.write("<http://broken\n");
+                output.write("<http://broken\n".getBytes(utf8));
             } else {
-                writer.write("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" <http://graphs/" + i + "> .\n");
+                output.write(("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" <http://graphs/" + i + "> .\n").getBytes(utf8));
             }
         }
-        writer.flush();
-        writer.close();
+        output.flush();
+        output.close();
     }
 }

http://git-wip-us.apache.org/repos/asf/jena/blob/f08fff2a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedTriplesInputFormatTests.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedTriplesInputFormatTests.java b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedTriplesInputFormatTests.java
index 5312b9e..f0f0caf 100644
--- a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedTriplesInputFormatTests.java
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedTriplesInputFormatTests.java
@@ -19,7 +19,8 @@
 package org.apache.jena.hadoop.rdf.io.input.compressed;
 
 import java.io.IOException;
-import java.io.Writer;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
 
 import org.apache.jena.hadoop.rdf.types.TripleWritable;
 
@@ -32,37 +33,39 @@ import com.hp.hpl.jena.graph.Triple;
  * 
  */
 public abstract class AbstractCompressedTriplesInputFormatTests extends
-        AbstractCompressedNodeTupleInputFormatTests<Triple, TripleWritable> {
+        AbstractCompressedNodeTupleInputFormatTests<Triple, TripleWritable> {
+    
+    private static final Charset utf8 = Charset.forName("utf-8");
 
     @Override
-    protected void generateTuples(Writer writer, int num) throws IOException {
+    protected void generateTuples(OutputStream output, int num) throws IOException {
         for (int i = 0; i < num; i++) {
-            writer.write("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" .\n");
+            output.write(("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" .\n").getBytes(utf8));
         }
-        writer.flush();
-        writer.close();
+        output.flush();
+        output.close();
     }
 
     @Override
-    protected void generateBadTuples(Writer writer, int num) throws IOException {
+    protected void generateBadTuples(OutputStream output, int num) throws IOException {
         for (int i = 0; i < num; i++) {
-            writer.write("<http://broken\n");
+            output.write("<http://broken\n".getBytes(utf8));
         }
-        writer.flush();
-        writer.close();
+        output.flush();
+        output.close();
     }
 
     @Override
-    protected void generateMixedTuples(Writer writer, int num) throws IOException {
+    protected void generateMixedTuples(OutputStream output, int num) throws IOException {
         boolean bad = false;
         for (int i = 0; i < num; i++, bad = !bad) {
             if (bad) {
-                writer.write("<http://broken\n");
+                output.write("<http://broken\n".getBytes(utf8));
             } else {
-                writer.write("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" .\n");
+                output.write(("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" .\n").getBytes(utf8));
             }
         }
-        writer.flush();
-        writer.close();
+        output.flush();
+        output.close();
     }
 }

http://git-wip-us.apache.org/repos/asf/jena/blob/f08fff2a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedWholeFileQuadInputFormatTests.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedWholeFileQuadInputFormatTests.java b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedWholeFileQuadInputFormatTests.java
index 3dd0bd0..be2b1d7 100644
--- a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedWholeFileQuadInputFormatTests.java
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedWholeFileQuadInputFormatTests.java
@@ -22,8 +22,7 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
+import java.nio.charset.Charset;
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
@@ -33,8 +32,8 @@ import org.apache.jena.hadoop.rdf.io.input.AbstractNodeTupleInputFormatTests;
 import org.apache.jena.hadoop.rdf.types.QuadWritable;
 import org.apache.jena.riot.Lang;
 import org.apache.jena.riot.RDFDataMgr;
-import org.apache.jena.riot.RDFWriterRegistry;
-
+import org.apache.jena.riot.RDFWriterRegistry;
+
 import com.hp.hpl.jena.query.Dataset;
 import com.hp.hpl.jena.query.DatasetFactory;
 import com.hp.hpl.jena.rdf.model.Model;
@@ -49,7 +48,9 @@ import com.hp.hpl.jena.sparql.core.Quad;
  * 
  */
 public abstract class AbstractCompressedWholeFileQuadInputFormatTests extends
-        AbstractNodeTupleInputFormatTests<Quad, QuadWritable> {
+        AbstractNodeTupleInputFormatTests<Quad, QuadWritable> {
+    
+    private static final Charset utf8 = Charset.forName("utf-8");
 
     @Override
     protected Configuration prepareConfiguration() {
@@ -59,14 +60,13 @@ public abstract class AbstractCompressedWholeFileQuadInputFormatTests extends
     }
 
     @Override
-    protected Writer getWriter(File f) throws IOException {
+    protected OutputStream getOutputStream(File f) throws IOException {
         CompressionCodec codec = this.getCompressionCodec();
         if (codec instanceof Configurable) {
             ((Configurable) codec).setConf(this.prepareConfiguration());
         }
         FileOutputStream fileOutput = new FileOutputStream(f, false);
-        OutputStream output = codec.createOutputStream(fileOutput);
-        return new OutputStreamWriter(output);
+        return codec.createOutputStream(fileOutput);
     }
 
     /**
@@ -85,9 +85,8 @@ public abstract class AbstractCompressedWholeFileQuadInputFormatTests extends
         return false;
     }
 
-    @SuppressWarnings("deprecation")
-    private void writeTuples(Dataset ds, Writer writer) {
-        RDFDataMgr.write(writer, ds, RDFWriterRegistry.defaultSerialization(this.getRdfLanguage()));
+    private void writeTuples(Dataset ds, OutputStream output) {
+        RDFDataMgr.write(output, ds, RDFWriterRegistry.defaultSerialization(this.getRdfLanguage()));
     }
 
     /**
@@ -97,7 +96,7 @@ public abstract class AbstractCompressedWholeFileQuadInputFormatTests extends
      */
     protected abstract Lang getRdfLanguage();
 
-    private void writeGoodTuples(Writer writer, int num) throws IOException {
+    private void writeGoodTuples(OutputStream output, int num) throws IOException {
         Dataset ds = DatasetFactory.createMem();
         Model m = ModelFactory.createDefaultModel();
         Resource currSubj = m.createResource("http://example.org/subjects/0");
@@ -115,35 +114,37 @@ public abstract class AbstractCompressedWholeFileQuadInputFormatTests extends
         if (!m.isEmpty()) {
             ds.addNamedModel("http://example.org/graphs/extra", m);
         }
-        this.writeTuples(ds, writer);
+        this.writeTuples(ds, output);
     }
 
     @Override
-    protected final void generateTuples(Writer writer, int num) throws IOException {
-        this.writeGoodTuples(writer, num);
-        writer.close();
+    protected final void generateTuples(OutputStream output, int num) throws IOException {
+        this.writeGoodTuples(output, num);
+        output.close();
     }
 
     @Override
-    protected final void generateMixedTuples(Writer writer, int num) throws IOException {
+    protected final void generateMixedTuples(OutputStream output, int num) throws IOException {
         // Write good data
-        this.writeGoodTuples(writer, num / 2);
+        this.writeGoodTuples(output, num / 2);
 
-        // Write junk data
+        // Write junk data
+        byte[] junk = "junk data\n".getBytes(utf8);
         for (int i = 0; i < num / 2; i++) {
-            writer.write("junk data\n");
+            output.write(junk);
         }
 
-        writer.flush();
-        writer.close();
+        output.flush();
+        output.close();
     }
 
     @Override
-    protected final void generateBadTuples(Writer writer, int num) throws IOException {
+    protected final void generateBadTuples(OutputStream output, int num) throws IOException {
+        byte[] junk = "junk data\n".getBytes(utf8);
         for (int i = 0; i < num; i++) {
-            writer.write("junk data\n");
+            output.write(junk);
         }
-        writer.flush();
-        writer.close();
+        output.flush();
+        output.close();
     }
 }