You are viewing a plain text version of this content. The canonical link for it is here.
Posted to by on 2016/05/14 14:03:14 UTC

[15/42] jena git commit: Merge commit 'refs/pull/143/head' of
diff --cc jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/
index 155d10a,155d10a..bfaff01
--- a/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/
+++ b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/
@@@ -1,611 -1,611 +1,611 @@@
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- * 
-- *
-- *     
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--import java.util.List;
--import org.apache.hadoop.conf.Configuration;
--import org.apache.hadoop.fs.FileSystem;
--import org.apache.hadoop.fs.Path;
--import org.apache.hadoop.mapreduce.InputFormat;
--import org.apache.hadoop.mapreduce.InputSplit;
--import org.apache.hadoop.mapreduce.Job;
--import org.apache.hadoop.mapreduce.JobContext;
--import org.apache.hadoop.mapreduce.RecordReader;
--import org.apache.hadoop.mapreduce.TaskAttemptContext;
--import org.apache.hadoop.mapreduce.TaskAttemptID;
--import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
--import org.apache.hadoop.mapreduce.lib.input.FileSplit;
--import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
--import org.apache.hadoop.mapreduce.task.JobContextImpl;
--import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
--import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable;
--import org.junit.* ;
--import org.junit.rules.TemporaryFolder;
--import org.slf4j.Logger;
--import org.slf4j.LoggerFactory;
-- * Abstract node tuple input format tests
-- * 
-- * 
-- * 
-- * @param <TValue>
-- * @param <T>
-- */
--public abstract class AbstractNodeTupleInputFormatTests<TValue, T extends AbstractNodeTupleWritable<TValue>> {
--    private static final Logger LOG = LoggerFactory.getLogger(AbstractNodeTupleInputFormatTests.class);
--    protected static final int EMPTY_SIZE = 0, SMALL_SIZE = 100, LARGE_SIZE = 10000, BAD_SIZE = 100, MIXED_SIZE = 100;
--    protected static final String EMPTY = "empty";
--    protected static final String SMALL = "small";
--    protected static final String LARGE = "large";
--    protected static final String BAD = "bad";
--    protected static final String MIXED = "mixed";
--    /**
--     * Temporary folder for the tests
--     */
--    @Rule
--    public TemporaryFolder folder = new TemporaryFolder();
--    protected File empty, small, large, bad, mixed;
--    /**
--     * Prepares the inputs for the tests
--     * 
--     * @throws IOException
--     */
--    @Before
--    public void beforeTest() throws IOException {
--        this.prepareInputs();
--    }
--    /**
--     * Cleans up the inputs after each test
--     */
--    @After
--    public void afterTest() {
--        // Should be unnecessary since JUnit will clean up the temporary folder
--        // anyway but best to do this regardless
--        if (empty != null)
--            empty.delete();
--        if (small != null)
--            small.delete();
--        if (large != null)
--            large.delete();
--        if (bad != null)
--            bad.delete();
--        if (mixed != null)
--            mixed.delete();
--    }
--    /**
--     * Prepares a fresh configuration
--     * 
--     * @return Configuration
--     */
--    protected Configuration prepareConfiguration() {
--        Configuration config = new Configuration(true);
--        // Nothing else to do
--        return config;
--    }
--    /**
--     * Prepares the inputs
--     * 
--     * @throws IOException
--     */
--    protected void prepareInputs() throws IOException {
--        String ext = this.getFileExtension();
--        empty = folder.newFile(EMPTY + ext);
--        this.generateTuples(empty, EMPTY_SIZE);
--        small = folder.newFile(SMALL + ext);
--        this.generateTuples(small, SMALL_SIZE);
--        large = folder.newFile(LARGE + ext);
--        this.generateTuples(large, LARGE_SIZE);
--        bad = folder.newFile(BAD + ext);
--        this.generateBadTuples(bad, BAD_SIZE);
--        mixed = folder.newFile(MIXED + ext);
--        this.generateMixedTuples(mixed, MIXED_SIZE);
--    }
--    /**
--     * Gets the extra file extension to add to the filenames
--     * 
--     * @return File extension
--     */
--    protected abstract String getFileExtension();
--    /**
--     * Generates tuples used for tests
--     * 
--     * @param f
--     *            File
--     * @param num
--     *            Number of tuples to generate
--     * @throws IOException
--     */
--    protected final void generateTuples(File f, int num) throws IOException {
--        this.generateTuples(this.getOutputStream(f), num);
--    }
--    /**
--     * Gets the output stream to use for generating tuples
--     * 
--     * @param f
--     *            File
--     * @return Output Stream
--     * @throws IOException
--     */
--    protected OutputStream getOutputStream(File f) throws IOException {
--        return new FileOutputStream(f, false);
--    }
--    /**
--     * Generates tuples used for tests
--     * 
--     * @param output
--     *            Output Stream to write to
--     * @param num
--     *            Number of tuples to generate
--     * @throws IOException
--     */
--    protected abstract void generateTuples(OutputStream output, int num) throws IOException;
--    /**
--     * Generates bad tuples used for tests
--     * 
--     * @param f
--     *            File
--     * @param num
--     *            Number of bad tuples to generate
--     * @throws IOException
--     */
--    protected final void generateBadTuples(File f, int num) throws IOException {
--        this.generateBadTuples(this.getOutputStream(f), num);
--    }
--    /**
--     * Generates bad tuples used for tests
--     * 
--     * @param output
--     *            Output Stream to write to
--     * @param num
--     *            Number of bad tuples to generate
--     * @throws IOException
--     */
--    protected abstract void generateBadTuples(OutputStream output, int num) throws IOException;
--    /**
--     * Generates a mixture of good and bad tuples used for tests
--     * 
--     * @param f
--     *            File
--     * @param num
--     *            Number of tuples to generate, they should be a 50/50 mix of
--     *            good and bad tuples
--     * @throws IOException
--     */
--    protected final void generateMixedTuples(File f, int num) throws IOException {
--        this.generateMixedTuples(this.getOutputStream(f), num);
--    }
--    /**
--     * Generates a mixture of good and bad tuples used for tests
--     * 
--     * @param output
--     *            Output Stream to write to
--     * @param num
--     *            Number of tuples to generate, they should be a 50/50 mix of
--     *            good and bad tuples
--     * @throws IOException
--     */
--    protected abstract void generateMixedTuples(OutputStream output, int num) throws IOException;
--    /**
--     * Adds an input path to the job configuration
--     * 
--     * @param f
--     *            File
--     * @param config
--     *            Configuration
--     * @param job
--     *            Job
--     * @throws IOException
--     */
--    protected void addInputPath(File f, Configuration config, Job job) throws IOException {
--        FileSystem fs = FileSystem.getLocal(config);
--        Path inputPath = fs.makeQualified(new Path(f.getAbsolutePath()));
--        FileInputFormat.addInputPath(job, inputPath);
--    }
--    protected final int countTuples(RecordReader<LongWritable, T> reader) throws IOException, InterruptedException {
--        int count = 0;
--        // Check initial progress
--"Initial Reported Progress %f", reader.getProgress()));
--        float progress = reader.getProgress();
--        if (, progress) == 0) {
--            Assert.assertEquals(0.0d, reader.getProgress(), 0.0d);
--        } else if (, progress) == 0) {
--            // If reader is reported 1.0 straight away then we expect there to
--            // be no key values
--            Assert.assertEquals(1.0d, reader.getProgress(), 0.0d);
--            Assert.assertFalse(reader.nextKeyValue());
--        } else {
--                    "Expected progress of 0.0 or 1.0 before reader has been accessed for first time but got %f",
--                    progress));
--        }
--        // Count tuples
--        boolean debug = LOG.isDebugEnabled();
--        while (reader.nextKeyValue()) {
--            count++;
--            progress = reader.getProgress();
--            if (debug)
--                LOG.debug(String.format("Current Reported Progress %f", progress));
--            Assert.assertTrue(String.format("Progress should be in the range 0.0 < p <= 1.0 but got %f", progress),
--                    progress > 0.0f && progress <= 1.0f);
--        }
--        reader.close();
--"Got %d tuples from this record reader", count));
--        // Check final progress
--"Final Reported Progress %f", reader.getProgress()));
--        Assert.assertEquals(1.0d, reader.getProgress(), 0.0d);
--        return count;
--    }
--    protected final void checkTuples(RecordReader<LongWritable, T> reader, int expected) throws IOException,
--            InterruptedException {
--        Assert.assertEquals(expected, this.countTuples(reader));
--    }
--    /**
--     * Runs a test with a single input
--     * 
--     * @param input
--     *            Input
--     * @param expectedTuples
--     *            Expected tuples
--     * @throws IOException
--     * @throws InterruptedException
--     */
--    protected final void testSingleInput(File input, int expectedSplits, int expectedTuples) throws IOException,
--            InterruptedException {
--        // Prepare configuration
--        Configuration config = this.prepareConfiguration();
--        this.testSingleInput(config, input, expectedSplits, expectedTuples);
--    }
--    /**
--     * Runs a test with a single input
--     * 
--     * @param config
--     *            Configuration
--     * @param input
--     *            Input
--     * @param expectedTuples
--     *            Expected tuples
--     * @throws IOException
--     * @throws InterruptedException
--     */
--    protected final void testSingleInput(Configuration config, File input, int expectedSplits, int expectedTuples)
--            throws IOException, InterruptedException {
--        // Set up fake job
--        InputFormat<LongWritable, T> inputFormat = this.getInputFormat();
--        Job job = Job.getInstance(config);
--        job.setInputFormatClass(inputFormat.getClass());
--        this.addInputPath(input, job.getConfiguration(), job);
--        JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID());
--        Assert.assertEquals(1, FileInputFormat.getInputPaths(context).length);
--        NLineInputFormat.setNumLinesPerSplit(job, LARGE_SIZE);
--        // Check splits
--        List<InputSplit> splits = inputFormat.getSplits(context);
--        Assert.assertEquals(expectedSplits, splits.size());
--        // Check tuples
--        for (InputSplit split : splits) {
--            TaskAttemptContext taskContext = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
--            RecordReader<LongWritable, T> reader = inputFormat.createRecordReader(split, taskContext);
--            reader.initialize(split, taskContext);
--            this.checkTuples(reader, expectedTuples);
--        }
--    }
--    protected abstract InputFormat<LongWritable, T> getInputFormat();
--    /**
--     * Basic tuples input test
--     * 
--     * @throws IOException
--     * @throws InterruptedException
--     */
--    @Test
--    public final void single_input_01() throws IOException, InterruptedException {
--        testSingleInput(empty, this.canSplitInputs() ? 0 : 1, EMPTY_SIZE);
--    }
--    /**
--     * Basic tuples input test
--     * 
--     * @throws IOException
--     * @throws InterruptedException
--     */
--    @Test
--    public final void single_input_02() throws IOException, InterruptedException {
--        testSingleInput(small, 1, SMALL_SIZE);
--    }
--    /**
--     * Basic tuples input test
--     * 
--     * @throws IOException
--     * @throws InterruptedException
--     */
--    @Test
--    public final void single_input_03() throws IOException, InterruptedException {
--        testSingleInput(large, 1, LARGE_SIZE);
--    }
--    /**
--     * Basic tuples input test
--     * 
--     * @throws IOException
--     * @throws InterruptedException
--     */
--    @Test
--    public final void single_input_04() throws IOException, InterruptedException {
--        testSingleInput(bad, 1, 0);
--    }
--    /**
--     * Basic tuples input test
--     * 
--     * @throws IOException
--     * @throws InterruptedException
--     */
--    @Test
--    public final void single_input_05() throws IOException, InterruptedException {
--        // JSON-LD overrides this because in JSON-LD parsing a bad document gives no triples. 
--        int x = single_input_05_expected() ;
--        testSingleInput(mixed, 1, x);
--    }
--    /** Results exected for test single_input_05 */ 
--    protected int single_input_05_expected() {
--        return MIXED_SIZE / 2 ;
--    }
--    /**
--     * Tests behaviour when ignoring bad tuples is disabled
--     * 
--     * @throws InterruptedException
--     * @throws IOException
--     */
--    @Test(expected = IOException.class)
--    public final void fail_on_bad_input_01() throws IOException, InterruptedException {
--        Configuration config = this.prepareConfiguration();
--        config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
--        Assert.assertFalse(config.getBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, true));
--        testSingleInput(config, bad, 1, 0);
--    }
--    /**
--     * Tests behaviour when ignoring bad tuples is disabled
--     * 
--     * @throws InterruptedException
--     * @throws IOException
--     */
--    @Test(expected = IOException.class)
--    public final void fail_on_bad_input_02() throws IOException, InterruptedException {
--        Configuration config = this.prepareConfiguration();
--        config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
--        Assert.assertFalse(config.getBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, true));
--        testSingleInput(config, mixed, 1, MIXED_SIZE / 2);
--    }
--    /**
--     * Runs a multiple input test
--     * 
--     * @param inputs
--     *            Inputs
--     * @param expectedSplits
--     *            Number of splits expected
--     * @param expectedTuples
--     *            Number of tuples expected
--     * @throws IOException
--     * @throws InterruptedException
--     */
--    protected final void testMultipleInputs(File[] inputs, int expectedSplits, int expectedTuples) throws IOException,
--            InterruptedException {
--        // Prepare configuration and inputs
--        Configuration config = this.prepareConfiguration();
--        // Set up fake job
--        InputFormat<LongWritable, T> inputFormat = this.getInputFormat();
--        Job job = Job.getInstance(config);
--        job.setInputFormatClass(inputFormat.getClass());
--        for (File input : inputs) {
--            this.addInputPath(input, job.getConfiguration(), job);
--        }
--        JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID());
--        Assert.assertEquals(inputs.length, FileInputFormat.getInputPaths(context).length);
--        NLineInputFormat.setNumLinesPerSplit(job, expectedTuples);
--        // Check splits
--        List<InputSplit> splits = inputFormat.getSplits(context);
--        Assert.assertEquals(expectedSplits, splits.size());
--        // Check tuples
--        int count = 0;
--        for (InputSplit split : splits) {
--            TaskAttemptContext taskContext = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
--            RecordReader<LongWritable, T> reader = inputFormat.createRecordReader(split, taskContext);
--            reader.initialize(split, taskContext);
--            count += this.countTuples(reader);
--        }
--        Assert.assertEquals(expectedTuples, count);
--    }
--    /**
--     * tuples test with multiple inputs
--     * 
--     * @throws IOException
--     * @throws InterruptedException
--     */
--    @Test
--    public final void multiple_inputs_01() throws IOException, InterruptedException {
--        testMultipleInputs(new File[] { empty, small, large }, this.canSplitInputs() ? 2 : 3, EMPTY_SIZE + SMALL_SIZE
--                + LARGE_SIZE);
--    }
--    /**
--     * tuples test with multiple inputs
--     * 
--     * @throws IOException
--     * @throws InterruptedException
--     */
--    @Test
--    public final void multiple_inputs_02() throws IOException, InterruptedException {
--        int expectedTriples = multiple_inputs_02_expected() ; 
--        testMultipleInputs(new File[] { folder.getRoot() }, this.canSplitInputs() ? 4 : 5, expectedTriples);
--    }
--    /** Results exected for test multiple_inputs_02.
--     * JSON_LD has different characteristics on bad documents.
--     * See {@link #single_input_05}.
--     */ 
--    protected int multiple_inputs_02_expected() {
--        return EMPTY_SIZE + SMALL_SIZE + LARGE_SIZE + (MIXED_SIZE / 2) ;
--    }
--    protected final void testSplitInputs(Configuration config, File[] inputs, int expectedSplits, int expectedTuples)
--            throws IOException, InterruptedException {
--        // Set up fake job
--        InputFormat<LongWritable, T> inputFormat = this.getInputFormat();
--        Job job = Job.getInstance(config);
--        job.setInputFormatClass(inputFormat.getClass());
--        for (File input : inputs) {
--            this.addInputPath(input, job.getConfiguration(), job);
--        }
--        JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID());
--        Assert.assertEquals(inputs.length, FileInputFormat.getInputPaths(context).length);
--        // Check splits
--        List<InputSplit> splits = inputFormat.getSplits(context);
--        Assert.assertEquals(expectedSplits, splits.size());
--        // Check tuples
--        int count = 0;
--        for (InputSplit split : splits) {
--            // Validate split
--            Assert.assertTrue(this.isValidSplit(split, config));
--            // Read split
--            TaskAttemptContext taskContext = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
--            RecordReader<LongWritable, T> reader = inputFormat.createRecordReader(split, taskContext);
--            reader.initialize(split, taskContext);
--            count += this.countTuples(reader);
--        }
--        Assert.assertEquals(expectedTuples, count);
--    }
--    /**
--     * Determines whether an input split is valid
--     * 
--     * @param split
--     *            Input split
--     * @return True if a valid split, false otherwise
--     */
--    protected boolean isValidSplit(InputSplit split, Configuration config) {
--        return split instanceof FileSplit;
--    }
--    /**
--     * Indicates whether inputs can be split, defaults to true
--     * 
--     * @return Whether inputs can be split
--     */
--    protected boolean canSplitInputs() {
--        return true;
--    }
--    /**
--     * Tests for input splitting
--     * 
--     * @throws IOException
--     * @throws InterruptedException
--     */
--    @Test
--    public final void split_input_01() throws IOException, InterruptedException {
--        Assume.assumeTrue(this.canSplitInputs());
--        Configuration config = this.prepareConfiguration();
--        config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
--        Assert.assertEquals(Integer.MAX_VALUE, config.getInt(HadoopIOConstants.MAX_LINE_LENGTH, Integer.MAX_VALUE));
--        this.testSplitInputs(config, new File[] { small }, 100, SMALL_SIZE);
--    }
--    /**
--     * Tests for input splitting
--     * 
--     * @throws IOException
--     * @throws InterruptedException
--     */
--    @Test
--    public final void split_input_02() throws IOException, InterruptedException {
--        Assume.assumeTrue(this.canSplitInputs());
--        Configuration config = this.prepareConfiguration();
--        config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
--        config.setLong(NLineInputFormat.LINES_PER_MAP, 10);
--        Assert.assertEquals(Integer.MAX_VALUE, config.getInt(HadoopIOConstants.MAX_LINE_LENGTH, Integer.MAX_VALUE));
--        this.testSplitInputs(config, new File[] { small }, 10, SMALL_SIZE);
--    }
--    /**
--     * Tests for input splitting
--     * 
--     * @throws IOException
--     * @throws InterruptedException
--     */
--    @Test
--    public final void split_input_03() throws IOException, InterruptedException {
--        Assume.assumeTrue(this.canSplitInputs());
--        Configuration config = this.prepareConfiguration();
--        config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
--        config.setLong(NLineInputFormat.LINES_PER_MAP, 100);
--        Assert.assertEquals(Integer.MAX_VALUE, config.getInt(HadoopIOConstants.MAX_LINE_LENGTH, Integer.MAX_VALUE));
--        this.testSplitInputs(config, new File[] { large }, 100, LARGE_SIZE);
--    }
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ * 
++ *
++ *     
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++import java.util.List;
++import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.fs.FileSystem;
++import org.apache.hadoop.fs.Path;
++import org.apache.hadoop.mapreduce.InputFormat;
++import org.apache.hadoop.mapreduce.InputSplit;
++import org.apache.hadoop.mapreduce.Job;
++import org.apache.hadoop.mapreduce.JobContext;
++import org.apache.hadoop.mapreduce.RecordReader;
++import org.apache.hadoop.mapreduce.TaskAttemptContext;
++import org.apache.hadoop.mapreduce.TaskAttemptID;
++import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
++import org.apache.hadoop.mapreduce.lib.input.FileSplit;
++import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
++import org.apache.hadoop.mapreduce.task.JobContextImpl;
++import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
++import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable;
++import org.junit.* ;
++import org.junit.rules.TemporaryFolder;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++ * Abstract node tuple input format tests
++ * 
++ * 
++ * 
++ * @param <TValue>
++ * @param <T>
++ */
++public abstract class AbstractNodeTupleInputFormatTests<TValue, T extends AbstractNodeTupleWritable<TValue>> {
++    private static final Logger LOG = LoggerFactory.getLogger(AbstractNodeTupleInputFormatTests.class);
++    protected static final int EMPTY_SIZE = 0, SMALL_SIZE = 100, LARGE_SIZE = 10000, BAD_SIZE = 100, MIXED_SIZE = 100;
++    protected static final String EMPTY = "empty";
++    protected static final String SMALL = "small";
++    protected static final String LARGE = "large";
++    protected static final String BAD = "bad";
++    protected static final String MIXED = "mixed";
++    /**
++     * Temporary folder for the tests
++     */
++    @Rule
++    public TemporaryFolder folder = new TemporaryFolder();
++    protected File empty, small, large, bad, mixed;
++    /**
++     * Prepares the inputs for the tests
++     * 
++     * @throws IOException
++     */
++    @Before
++    public void beforeTest() throws IOException {
++        this.prepareInputs();
++    }
++    /**
++     * Cleans up the inputs after each test
++     */
++    @After
++    public void afterTest() {
++        // Should be unnecessary since JUnit will clean up the temporary folder
++        // anyway but best to do this regardless
++        if (empty != null)
++            empty.delete();
++        if (small != null)
++            small.delete();
++        if (large != null)
++            large.delete();
++        if (bad != null)
++            bad.delete();
++        if (mixed != null)
++            mixed.delete();
++    }
++    /**
++     * Prepares a fresh configuration
++     * 
++     * @return Configuration
++     */
++    protected Configuration prepareConfiguration() {
++        Configuration config = new Configuration(true);
++        // Nothing else to do
++        return config;
++    }
++    /**
++     * Prepares the inputs
++     * 
++     * @throws IOException
++     */
++    protected void prepareInputs() throws IOException {
++        String ext = this.getFileExtension();
++        empty = folder.newFile(EMPTY + ext);
++        this.generateTuples(empty, EMPTY_SIZE);
++        small = folder.newFile(SMALL + ext);
++        this.generateTuples(small, SMALL_SIZE);
++        large = folder.newFile(LARGE + ext);
++        this.generateTuples(large, LARGE_SIZE);
++        bad = folder.newFile(BAD + ext);
++        this.generateBadTuples(bad, BAD_SIZE);
++        mixed = folder.newFile(MIXED + ext);
++        this.generateMixedTuples(mixed, MIXED_SIZE);
++    }
++    /**
++     * Gets the extra file extension to add to the filenames
++     * 
++     * @return File extension
++     */
++    protected abstract String getFileExtension();
++    /**
++     * Generates tuples used for tests
++     * 
++     * @param f
++     *            File
++     * @param num
++     *            Number of tuples to generate
++     * @throws IOException
++     */
++    protected final void generateTuples(File f, int num) throws IOException {
++        this.generateTuples(this.getOutputStream(f), num);
++    }
++    /**
++     * Gets the output stream to use for generating tuples
++     * 
++     * @param f
++     *            File
++     * @return Output Stream
++     * @throws IOException
++     */
++    protected OutputStream getOutputStream(File f) throws IOException {
++        return new FileOutputStream(f, false);
++    }
++    /**
++     * Generates tuples used for tests
++     * 
++     * @param output
++     *            Output Stream to write to
++     * @param num
++     *            Number of tuples to generate
++     * @throws IOException
++     */
++    protected abstract void generateTuples(OutputStream output, int num) throws IOException;
++    /**
++     * Generates bad tuples used for tests
++     * 
++     * @param f
++     *            File
++     * @param num
++     *            Number of bad tuples to generate
++     * @throws IOException
++     */
++    protected final void generateBadTuples(File f, int num) throws IOException {
++        this.generateBadTuples(this.getOutputStream(f), num);
++    }
++    /**
++     * Generates bad tuples used for tests
++     * 
++     * @param output
++     *            Output Stream to write to
++     * @param num
++     *            Number of bad tuples to generate
++     * @throws IOException
++     */
++    protected abstract void generateBadTuples(OutputStream output, int num) throws IOException;
++    /**
++     * Generates a mixture of good and bad tuples used for tests
++     * 
++     * @param f
++     *            File
++     * @param num
++     *            Number of tuples to generate, they should be a 50/50 mix of
++     *            good and bad tuples
++     * @throws IOException
++     */
++    protected final void generateMixedTuples(File f, int num) throws IOException {
++        this.generateMixedTuples(this.getOutputStream(f), num);
++    }
++    /**
++     * Generates a mixture of good and bad tuples used for tests
++     * 
++     * @param output
++     *            Output Stream to write to
++     * @param num
++     *            Number of tuples to generate, they should be a 50/50 mix of
++     *            good and bad tuples
++     * @throws IOException
++     */
++    protected abstract void generateMixedTuples(OutputStream output, int num) throws IOException;
++    /**
++     * Adds an input path to the job configuration
++     * 
++     * @param f
++     *            File
++     * @param config
++     *            Configuration
++     * @param job
++     *            Job
++     * @throws IOException
++     */
++    protected void addInputPath(File f, Configuration config, Job job) throws IOException {
++        FileSystem fs = FileSystem.getLocal(config);
++        Path inputPath = fs.makeQualified(new Path(f.getAbsolutePath()));
++        FileInputFormat.addInputPath(job, inputPath);
++    }
++    protected final int countTuples(RecordReader<LongWritable, T> reader) throws IOException, InterruptedException {
++        int count = 0;
++        // Check initial progress
++"Initial Reported Progress %f", reader.getProgress()));
++        float progress = reader.getProgress();
++        if (, progress) == 0) {
++            Assert.assertEquals(0.0d, reader.getProgress(), 0.0d);
++        } else if (, progress) == 0) {
++            // If reader is reported 1.0 straight away then we expect there to
++            // be no key values
++            Assert.assertEquals(1.0d, reader.getProgress(), 0.0d);
++            Assert.assertFalse(reader.nextKeyValue());
++        } else {
++                    "Expected progress of 0.0 or 1.0 before reader has been accessed for first time but got %f",
++                    progress));
++        }
++        // Count tuples
++        boolean debug = LOG.isDebugEnabled();
++        while (reader.nextKeyValue()) {
++            count++;
++            progress = reader.getProgress();
++            if (debug)
++                LOG.debug(String.format("Current Reported Progress %f", progress));
++            Assert.assertTrue(String.format("Progress should be in the range 0.0 < p <= 1.0 but got %f", progress),
++                    progress > 0.0f && progress <= 1.0f);
++        }
++        reader.close();
++"Got %d tuples from this record reader", count));
++        // Check final progress
++"Final Reported Progress %f", reader.getProgress()));
++        Assert.assertEquals(1.0d, reader.getProgress(), 0.0d);
++        return count;
++    }
++    protected final void checkTuples(RecordReader<LongWritable, T> reader, int expected) throws IOException,
++            InterruptedException {
++        Assert.assertEquals(expected, this.countTuples(reader));
++    }
++    /**
++     * Runs a test with a single input
++     * 
++     * @param input
++     *            Input
++     * @param expectedTuples
++     *            Expected tuples
++     * @throws IOException
++     * @throws InterruptedException
++     */
++    protected final void testSingleInput(File input, int expectedSplits, int expectedTuples) throws IOException,
++            InterruptedException {
++        // Prepare configuration
++        Configuration config = this.prepareConfiguration();
++        this.testSingleInput(config, input, expectedSplits, expectedTuples);
++    }
++    /**
++     * Runs a test with a single input
++     * 
++     * @param config
++     *            Configuration
++     * @param input
++     *            Input
++     * @param expectedTuples
++     *            Expected tuples
++     * @throws IOException
++     * @throws InterruptedException
++     */
++    protected final void testSingleInput(Configuration config, File input, int expectedSplits, int expectedTuples)
++            throws IOException, InterruptedException {
++        // Set up fake job
++        InputFormat<LongWritable, T> inputFormat = this.getInputFormat();
++        Job job = Job.getInstance(config);
++        job.setInputFormatClass(inputFormat.getClass());
++        this.addInputPath(input, job.getConfiguration(), job);
++        JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID());
++        Assert.assertEquals(1, FileInputFormat.getInputPaths(context).length);
++        NLineInputFormat.setNumLinesPerSplit(job, LARGE_SIZE);
++        // Check splits
++        List<InputSplit> splits = inputFormat.getSplits(context);
++        Assert.assertEquals(expectedSplits, splits.size());
++        // Check tuples
++        for (InputSplit split : splits) {
++            TaskAttemptContext taskContext = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
++            RecordReader<LongWritable, T> reader = inputFormat.createRecordReader(split, taskContext);
++            reader.initialize(split, taskContext);
++            this.checkTuples(reader, expectedTuples);
++        }
++    }
++    protected abstract InputFormat<LongWritable, T> getInputFormat();
++    /**
++     * Basic tuples input test
++     * 
++     * @throws IOException
++     * @throws InterruptedException
++     */
++    @Test
++    public final void single_input_01() throws IOException, InterruptedException {
++        testSingleInput(empty, this.canSplitInputs() ? 0 : 1, EMPTY_SIZE);
++    }
++    /**
++     * Basic tuples input test
++     * 
++     * @throws IOException
++     * @throws InterruptedException
++     */
++    @Test
++    public final void single_input_02() throws IOException, InterruptedException {
++        testSingleInput(small, 1, SMALL_SIZE);
++    }
++    /**
++     * Basic tuples input test
++     * 
++     * @throws IOException
++     * @throws InterruptedException
++     */
++    @Test
++    public final void single_input_03() throws IOException, InterruptedException {
++        testSingleInput(large, 1, LARGE_SIZE);
++    }
++    /**
++     * Basic tuples input test
++     * 
++     * @throws IOException
++     * @throws InterruptedException
++     */
++    @Test
++    public final void single_input_04() throws IOException, InterruptedException {
++        testSingleInput(bad, 1, 0);
++    }
++    /**
++     * Basic tuples input test
++     * 
++     * @throws IOException
++     * @throws InterruptedException
++     */
++    @Test
++    public final void single_input_05() throws IOException, InterruptedException {
++        // JSON-LD overrides this because in JSON-LD parsing a bad document gives no triples. 
++        int x = single_input_05_expected() ;
++        testSingleInput(mixed, 1, x);
++    }
++    /** Results exected for test single_input_05 */ 
++    protected int single_input_05_expected() {
++        return MIXED_SIZE / 2 ;
++    }
++    /**
++     * Tests behaviour when ignoring bad tuples is disabled
++     * 
++     * @throws InterruptedException
++     * @throws IOException
++     */
++    @Test(expected = IOException.class)
++    public final void fail_on_bad_input_01() throws IOException, InterruptedException {
++        Configuration config = this.prepareConfiguration();
++        config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
++        Assert.assertFalse(config.getBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, true));
++        testSingleInput(config, bad, 1, 0);
++    }
++    /**
++     * Tests behaviour when ignoring bad tuples is disabled
++     * 
++     * @throws InterruptedException
++     * @throws IOException
++     */
++    @Test(expected = IOException.class)
++    public final void fail_on_bad_input_02() throws IOException, InterruptedException {
++        Configuration config = this.prepareConfiguration();
++        config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
++        Assert.assertFalse(config.getBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, true));
++        testSingleInput(config, mixed, 1, MIXED_SIZE / 2);
++    }
++    /**
++     * Runs a multiple input test
++     * 
++     * @param inputs
++     *            Inputs
++     * @param expectedSplits
++     *            Number of splits expected
++     * @param expectedTuples
++     *            Number of tuples expected
++     * @throws IOException
++     * @throws InterruptedException
++     */
++    protected final void testMultipleInputs(File[] inputs, int expectedSplits, int expectedTuples) throws IOException,
++            InterruptedException {
++        // Prepare configuration and inputs
++        Configuration config = this.prepareConfiguration();
++        // Set up fake job
++        InputFormat<LongWritable, T> inputFormat = this.getInputFormat();
++        Job job = Job.getInstance(config);
++        job.setInputFormatClass(inputFormat.getClass());
++        for (File input : inputs) {
++            this.addInputPath(input, job.getConfiguration(), job);
++        }
++        JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID());
++        Assert.assertEquals(inputs.length, FileInputFormat.getInputPaths(context).length);
++        NLineInputFormat.setNumLinesPerSplit(job, expectedTuples);
++        // Check splits
++        List<InputSplit> splits = inputFormat.getSplits(context);
++        Assert.assertEquals(expectedSplits, splits.size());
++        // Check tuples
++        int count = 0;
++        for (InputSplit split : splits) {
++            TaskAttemptContext taskContext = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
++            RecordReader<LongWritable, T> reader = inputFormat.createRecordReader(split, taskContext);
++            reader.initialize(split, taskContext);
++            count += this.countTuples(reader);
++        }
++        Assert.assertEquals(expectedTuples, count);
++    }
++    /**
++     * tuples test with multiple inputs
++     * 
++     * @throws IOException
++     * @throws InterruptedException
++     */
++    @Test
++    public final void multiple_inputs_01() throws IOException, InterruptedException {
++        testMultipleInputs(new File[] { empty, small, large }, this.canSplitInputs() ? 2 : 3, EMPTY_SIZE + SMALL_SIZE
++                + LARGE_SIZE);
++    }
++    /**
++     * tuples test with multiple inputs
++     * 
++     * @throws IOException
++     * @throws InterruptedException
++     */
++    @Test
++    public final void multiple_inputs_02() throws IOException, InterruptedException {
++        int expectedTriples = multiple_inputs_02_expected() ; 
++        testMultipleInputs(new File[] { folder.getRoot() }, this.canSplitInputs() ? 4 : 5, expectedTriples);
++    }
++    /** Results exected for test multiple_inputs_02.
++     * JSON_LD has different characteristics on bad documents.
++     * See {@link #single_input_05}.
++     */ 
++    protected int multiple_inputs_02_expected() {
++        return EMPTY_SIZE + SMALL_SIZE + LARGE_SIZE + (MIXED_SIZE / 2) ;
++    }
++    protected final void testSplitInputs(Configuration config, File[] inputs, int expectedSplits, int expectedTuples)
++            throws IOException, InterruptedException {
++        // Set up fake job
++        InputFormat<LongWritable, T> inputFormat = this.getInputFormat();
++        Job job = Job.getInstance(config);
++        job.setInputFormatClass(inputFormat.getClass());
++        for (File input : inputs) {
++            this.addInputPath(input, job.getConfiguration(), job);
++        }
++        JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID());
++        Assert.assertEquals(inputs.length, FileInputFormat.getInputPaths(context).length);
++        // Check splits
++        List<InputSplit> splits = inputFormat.getSplits(context);
++        Assert.assertEquals(expectedSplits, splits.size());
++        // Check tuples
++        int count = 0;
++        for (InputSplit split : splits) {
++            // Validate split
++            Assert.assertTrue(this.isValidSplit(split, config));
++            // Read split
++            TaskAttemptContext taskContext = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
++            RecordReader<LongWritable, T> reader = inputFormat.createRecordReader(split, taskContext);
++            reader.initialize(split, taskContext);
++            count += this.countTuples(reader);
++        }
++        Assert.assertEquals(expectedTuples, count);
++    }
++    /**
++     * Determines whether an input split is valid
++     * 
++     * @param split
++     *            Input split
++     * @return True if a valid split, false otherwise
++     */
++    protected boolean isValidSplit(InputSplit split, Configuration config) {
++        return split instanceof FileSplit;
++    }
++    /**
++     * Indicates whether inputs can be split, defaults to true
++     * 
++     * @return Whether inputs can be split
++     */
++    protected boolean canSplitInputs() {
++        return true;
++    }
++    /**
++     * Tests for input splitting
++     * 
++     * @throws IOException
++     * @throws InterruptedException
++     */
++    @Test
++    public final void split_input_01() throws IOException, InterruptedException {
++        Assume.assumeTrue(this.canSplitInputs());
++        Configuration config = this.prepareConfiguration();
++        config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
++        Assert.assertEquals(Integer.MAX_VALUE, config.getInt(HadoopIOConstants.MAX_LINE_LENGTH, Integer.MAX_VALUE));
++        this.testSplitInputs(config, new File[] { small }, 100, SMALL_SIZE);
++    }
++    /**
++     * Tests for input splitting
++     * 
++     * @throws IOException
++     * @throws InterruptedException
++     */
++    @Test
++    public final void split_input_02() throws IOException, InterruptedException {
++        Assume.assumeTrue(this.canSplitInputs());
++        Configuration config = this.prepareConfiguration();
++        config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
++        config.setLong(NLineInputFormat.LINES_PER_MAP, 10);
++        Assert.assertEquals(Integer.MAX_VALUE, config.getInt(HadoopIOConstants.MAX_LINE_LENGTH, Integer.MAX_VALUE));
++        this.testSplitInputs(config, new File[] { small }, 10, SMALL_SIZE);
++    }
++    /**
++     * Tests for input splitting
++     * 
++     * @throws IOException
++     * @throws InterruptedException
++     */
++    @Test
++    public final void split_input_03() throws IOException, InterruptedException {
++        Assume.assumeTrue(this.canSplitInputs());
++        Configuration config = this.prepareConfiguration();
++        config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
++        config.setLong(NLineInputFormat.LINES_PER_MAP, 100);
++        Assert.assertEquals(Integer.MAX_VALUE, config.getInt(HadoopIOConstants.MAX_LINE_LENGTH, Integer.MAX_VALUE));
++        this.testSplitInputs(config, new File[] { large }, 100, LARGE_SIZE);
++    }
diff --cc jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/
index ec2a40d,ec2a40d..febed20
--- a/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/
+++ b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/
@@@ -1,37 -1,37 +1,37 @@@
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- * 
-- *
-- *     
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ * 
++ *
++ *     
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
--import java.nio.charset.Charset;
--import org.apache.jena.hadoop.rdf.types.QuadWritable;
--import org.apache.jena.sparql.core.Quad ;
++import java.nio.charset.Charset;
++import org.apache.jena.hadoop.rdf.types.QuadWritable;
++import org.apache.jena.sparql.core.Quad ;
   * Abstract tests for Quad input formats
--public abstract class AbstractQuadsInputFormatTests extends AbstractNodeTupleInputFormatTests<Quad, QuadWritable> {
++public abstract class AbstractQuadsInputFormatTests extends AbstractNodeTupleInputFormatTests<Quad, QuadWritable> {
      private static final Charset utf8 = Charset.forName("utf-8");
diff --cc jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/
index 1f1f652,1f1f652..edd99c7
--- a/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/
+++ b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/
@@@ -1,29 -1,29 +1,29 @@@
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- * 
-- *
-- *     
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ * 
++ *
++ *     
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
--import java.nio.charset.Charset;
--import org.apache.jena.graph.Triple ;
--import org.apache.jena.hadoop.rdf.types.TripleWritable;
++import java.nio.charset.Charset;
++import org.apache.jena.graph.Triple ;
++import org.apache.jena.hadoop.rdf.types.TripleWritable;
   * Abstract tests for Triple input formats
@@@ -31,8 -31,8 +31,8 @@@
--public abstract class AbstractTriplesInputFormatTests extends AbstractNodeTupleInputFormatTests<Triple, TripleWritable> {
++public abstract class AbstractTriplesInputFormatTests extends AbstractNodeTupleInputFormatTests<Triple, TripleWritable> {
      private static final Charset utf8 = Charset.forName("utf-8");
@@@ -45,7 -45,7 +45,7 @@@
--    protected void generateBadTuples(OutputStream output, int num) throws IOException {
++    protected void generateBadTuples(OutputStream output, int num) throws IOException {
          byte[] junk = "<http://broken\n".getBytes(utf8);
          for (int i = 0; i < num; i++) {
diff --cc jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/
index 50a468b,50a468b..8181148
--- a/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/
+++ b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/
@@@ -1,38 -1,38 +1,38 @@@
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- * 
-- *
-- *     
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ * 
++ *
++ *     
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
--import java.nio.charset.Charset;
--import org.apache.jena.hadoop.rdf.types.QuadWritable;
--import org.apache.jena.query.Dataset ;
--import org.apache.jena.query.DatasetFactory ;
--import org.apache.jena.rdf.model.Model ;
--import org.apache.jena.rdf.model.ModelFactory ;
--import org.apache.jena.rdf.model.Property ;
--import org.apache.jena.rdf.model.Resource ;
++import java.nio.charset.Charset;
++import org.apache.jena.hadoop.rdf.types.QuadWritable;
++import org.apache.jena.query.Dataset ;
++import org.apache.jena.query.DatasetFactory ;
++import org.apache.jena.rdf.model.Model ;
++import org.apache.jena.rdf.model.ModelFactory ;
++import org.apache.jena.rdf.model.Property ;
++import org.apache.jena.rdf.model.Resource ;
  import org.apache.jena.riot.Lang;
  import org.apache.jena.riot.RDFDataMgr;
  import org.apache.jena.riot.RDFWriterRegistry;
--import org.apache.jena.sparql.core.Quad ;
++import org.apache.jena.sparql.core.Quad ;
   * Abstract tests for Quad input formats
@@@ -40,8 -40,8 +40,8 @@@
--public abstract class AbstractWholeFileQuadInputFormatTests extends AbstractNodeTupleInputFormatTests<Quad, QuadWritable> {
++public abstract class AbstractWholeFileQuadInputFormatTests extends AbstractNodeTupleInputFormatTests<Quad, QuadWritable> {
      private static final Charset utf8 = Charset.forName("utf-8");
@@@ -92,7 -92,7 +92,7 @@@
          // Write good data
          this.writeGoodTuples(output, num / 2);
--        // Write junk data
++        // Write junk data
          byte[] junk = "junk data\n".getBytes(utf8);
          for (int i = 0; i < num / 2; i++) {
@@@ -103,7 -103,7 +103,7 @@@
--    protected final void generateBadTuples(OutputStream output, int num) throws IOException {
++    protected final void generateBadTuples(OutputStream output, int num) throws IOException {
          byte[] junk = "junk data\n".getBytes(utf8);
          for (int i = 0; i < num; i++) {
diff --cc jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/
index da3789a,da3789a..0a9d0f9
--- a/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/
+++ b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/
@@@ -1,33 -1,33 +1,33 @@@
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- * 
-- *
-- *     
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ * 
++ *
++ *     
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
--import java.nio.charset.Charset;
--import org.apache.jena.graph.Triple ;
--import org.apache.jena.hadoop.rdf.types.TripleWritable;
--import org.apache.jena.rdf.model.Model ;
--import org.apache.jena.rdf.model.ModelFactory ;
--import org.apache.jena.rdf.model.Property ;
--import org.apache.jena.rdf.model.Resource ;
++import java.nio.charset.Charset;
++import org.apache.jena.graph.Triple ;
++import org.apache.jena.hadoop.rdf.types.TripleWritable;
++import org.apache.jena.rdf.model.Model ;
++import org.apache.jena.rdf.model.ModelFactory ;
++import org.apache.jena.rdf.model.Property ;
++import org.apache.jena.rdf.model.Resource ;
  import org.apache.jena.riot.Lang;
  import org.apache.jena.riot.RDFDataMgr;
@@@ -37,8 -37,8 +37,8 @@@
--public abstract class AbstractWholeFileTripleInputFormatTests extends AbstractNodeTupleInputFormatTests<Triple, TripleWritable> {
++public abstract class AbstractWholeFileTripleInputFormatTests extends AbstractNodeTupleInputFormatTests<Triple, TripleWritable> {
      private static final Charset utf8 = Charset.forName("utf-8");
@@@ -48,7 -48,7 +48,7 @@@
      private void writeTuples(Model m, OutputStream output) {
          RDFDataMgr.write(output, m, this.getRdfLanguage());
--    }
++    }
       * Gets the RDF language to write out generate tuples in
@@@ -69,7 -69,7 +69,7 @@@
          this.writeTuples(m, output);
--    }
++    }
      protected final void generateMixedTuples(OutputStream output, int num) throws IOException {
@@@ -85,7 -85,7 +85,7 @@@
          this.writeTuples(m, output);
--        // Write junk data
++        // Write junk data
          byte[] junk = "junk data\n".getBytes(utf8);
          for (int i = 0; i < num / 2; i++) {
@@@ -96,7 -96,7 +96,7 @@@
--    protected final void generateBadTuples(OutputStream output, int num) throws IOException {
++    protected final void generateBadTuples(OutputStream output, int num) throws IOException {
          byte[] junk = "junk data\n".getBytes(utf8);
          for (int i = 0; i < num; i++) {
diff --cc jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/
index 1f18a95,1f18a95..b34aa74
--- a/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/
+++ b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/
@@@ -1,21 -1,21 +1,21 @@@
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- * 
-- *
-- *     
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ * 
++ *
++ *     
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
@@@ -25,10 -25,10 +25,10 @@@ import
  import org.apache.hadoop.conf.Configurable;
  import org.apache.hadoop.conf.Configuration;
--import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable;
++import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable;
@@@ -37,8 -37,8 +37,8 @@@
   * @param <T>
  public abstract class AbstractCompressedNodeTupleInputFormatTests<TValue, T extends AbstractNodeTupleWritable<TValue>> extends
--        AbstractNodeTupleInputFormatTests<TValue, T> {
++        AbstractNodeTupleInputFormatTests<TValue, T> {
      protected Configuration prepareConfiguration() {
          Configuration config = super.prepareConfiguration();
diff --cc jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/
index 40f3733,40f3733..06bdbf4
--- a/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/
+++ b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/
@@@ -1,29 -1,29 +1,29 @@@
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- * 
-- *
-- *     
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ * 
++ *
++ *     
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
--import java.nio.charset.Charset;
--import org.apache.jena.hadoop.rdf.types.QuadWritable;
--import org.apache.jena.sparql.core.Quad ;
++import java.nio.charset.Charset;
++import org.apache.jena.hadoop.rdf.types.QuadWritable;
++import org.apache.jena.sparql.core.Quad ;
   * Abstract tests for Quad input formats
@@@ -32,8 -32,8 +32,8 @@@
  public abstract class AbstractCompressedQuadsInputFormatTests extends
--        AbstractCompressedNodeTupleInputFormatTests<Quad, QuadWritable> {
++        AbstractCompressedNodeTupleInputFormatTests<Quad, QuadWritable> {
      private static final Charset utf8 = Charset.forName("utf-8");
diff --cc jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/
index f5e3d5a,f5e3d5a..675ccab
--- a/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/
+++ b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/
@@@ -1,29 -1,29 +1,29 @@@
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- * 
-- *
-- *     
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ * 
++ *
++ *     
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
--import java.nio.charset.Charset;
--import org.apache.jena.graph.Triple ;
--import org.apache.jena.hadoop.rdf.types.TripleWritable;
++import java.nio.charset.Charset;
++import org.apache.jena.graph.Triple ;
++import org.apache.jena.hadoop.rdf.types.TripleWritable;
   * Abstract tests for Triple input formats
@@@ -32,8 -32,8 +32,8 @@@
  public abstract class AbstractCompressedTriplesInputFormatTests extends
--        AbstractCompressedNodeTupleInputFormatTests<Triple, TripleWritable> {
++        AbstractCompressedNodeTupleInputFormatTests<Triple, TripleWritable> {
      private static final Charset utf8 = Charset.forName("utf-8");
diff --cc jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/
index 029203b,029203b..ecd4616
--- a/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/
+++ b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/
@@@ -1,45 -1,45 +1,45 @@@
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- * 
-- *
-- *     
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ * 
++ *
++ *     
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
--import java.nio.charset.Charset;
++import java.nio.charset.Charset;
  import org.apache.hadoop.conf.Configurable;
  import org.apache.hadoop.conf.Configuration;
--import org.apache.jena.hadoop.rdf.types.QuadWritable;
--import org.apache.jena.query.Dataset ;
--import org.apache.jena.query.DatasetFactory ;
--import org.apache.jena.rdf.model.Model ;
--import org.apache.jena.rdf.model.ModelFactory ;
--import org.apache.jena.rdf.model.Property ;
--import org.apache.jena.rdf.model.Resource ;
++import org.apache.jena.hadoop.rdf.types.QuadWritable;
++import org.apache.jena.query.Dataset ;
++import org.apache.jena.query.DatasetFactory ;
++import org.apache.jena.rdf.model.Model ;
++import org.apache.jena.rdf.model.ModelFactory ;
++import org.apache.jena.rdf.model.Property ;
++import org.apache.jena.rdf.model.Resource ;
  import org.apache.jena.riot.Lang;
  import org.apache.jena.riot.RDFDataMgr;
--import org.apache.jena.riot.RDFWriterRegistry;
--import org.apache.jena.sparql.core.Quad ;
++import org.apache.jena.riot.RDFWriterRegistry;
++import org.apache.jena.sparql.core.Quad ;
   * Abstract tests for compressed whole file quad formats
@@@ -47,8 -47,8 +47,8 @@@
  public abstract class AbstractCompressedWholeFileQuadInputFormatTests extends
--        AbstractNodeTupleInputFormatTests<Quad, QuadWritable> {
++        AbstractNodeTupleInputFormatTests<Quad, QuadWritable> {
      private static final Charset utf8 = Charset.forName("utf-8");
@@@ -127,7 -127,7 +127,7 @@@
          // Write good data
          this.writeGoodTuples(output, num / 2);
--        // Write junk data
++        // Write junk data
          byte[] junk = "junk data\n".getBytes(utf8);
          for (int i = 0; i < num / 2; i++) {
@@@ -138,7 -138,7 +138,7 @@@
--    protected final void generateBadTuples(OutputStream output, int num) throws IOException {
++    protected final void generateBadTuples(OutputStream output, int num) throws IOException {
          byte[] junk = "junk data\n".getBytes(utf8);
          for (int i = 0; i < num; i++) {
diff --cc jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/
index f6454ea,f6454ea..a55729c
--- a/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/
+++ b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/
@@@ -1,143 -1,143 +1,143 @@@
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- * 
-- *
-- *     
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--import java.nio.charset.Charset;
--import org.apache.hadoop.conf.Configurable;
--import org.apache.hadoop.conf.Configuration;
--import org.apache.jena.graph.Triple ;
--import org.apache.jena.hadoop.rdf.types.TripleWritable;
--import org.apache.jena.rdf.model.Model ;
--import org.apache.jena.rdf.model.ModelFactory ;
--import org.apache.jena.rdf.model.Property ;
--import org.apache.jena.rdf.model.Resource ;
--import org.apache.jena.riot.Lang;
--import org.apache.jena.riot.RDFDataMgr;
-- * Abstract tests for compressed whole file triple formats
-- * 
-- * 
-- */
--public abstract class AbstractCompressedWholeFileTripleInputFormatTests extends
--        AbstractNodeTupleInputFormatTests<Triple, TripleWritable> {
--    private static final Charset utf8 = Charset.forName("utf-8");
--    @Override
--    protected Configuration prepareConfiguration() {
--        Configuration config = super.prepareConfiguration();
--        config.set(HadoopIOConstants.IO_COMPRESSION_CODECS, this.getCompressionCodec().getClass().getCanonicalName());
--        return config;
--    }
--    @Override
--    protected OutputStream getOutputStream(File f) throws IOException {
--        CompressionCodec codec = this.getCompressionCodec();
--        if (codec instanceof Configurable) {
--            ((Configurable) codec).setConf(this.prepareConfiguration());
--        }
--        FileOutputStream fileOutput = new FileOutputStream(f, false);
--        return codec.createOutputStream(fileOutput);
--    }
--    /**
--     * Gets the compression codec to use
--     * 
--     * @return Compression codec
--     */
--    protected abstract CompressionCodec getCompressionCodec();
--    /**
--     * Indicates whether inputs can be split, defaults to false for compressed
--     * input tests
--     */
--    @Override
--    protected boolean canSplitInputs() {
--        return false;
--    }
--    private void writeTuples(Model m, OutputStream output) {
--        RDFDataMgr.write(output, m, this.getRdfLanguage());
--    }
--    /**
--     * Gets the RDF language to write out generated tuples in
--     * 
--     * @return RDF language
--     */
--    protected abstract Lang getRdfLanguage();
--    @Override
--    protected final void generateTuples(OutputStream output, int num) throws IOException {
--        Model m = ModelFactory.createDefaultModel();
--        Resource currSubj = m.createResource("");
--        Property predicate = m.createProperty("");
--        for (int i = 0; i < num; i++) {
--            if (i % 10 == 0) {
--                currSubj = m.createResource("" + (i / 10));
--            }
--            m.add(currSubj, predicate, m.createTypedLiteral(i));
--        }
--        this.writeTuples(m, output);
--        output.close();
--    }
--    @Override
--    protected final void generateMixedTuples(OutputStream output, int num) throws IOException {
--        // Write good data
--        Model m = ModelFactory.createDefaultModel();
--        Resource currSubj = m.createResource("");
--        Property predicate = m.createProperty("");
--        for (int i = 0; i < num / 2; i++) {
--            if (i % 10 == 0) {
--                currSubj = m.createResource("" + (i / 10));
--            }
--            m.add(currSubj, predicate, m.createTypedLiteral(i));
--        }
--        this.writeTuples(m, output);
--        // Write junk data
--        byte[] junk = "junk data\n".getBytes(utf8);
--        for (int i = 0; i < num / 2; i++) {
--            output.write(junk);
--        }
--        output.flush();
--        output.close();
--    }
--    @Override
--    protected final void generateBadTuples(OutputStream output, int num) throws IOException {
--        byte[] junk = "junk data\n".getBytes(utf8);
--        for (int i = 0; i < num; i++) {
--            output.write(junk);
--        }
--        output.flush();
--        output.close();
--    }
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ * 
++ *
++ *     
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++import java.nio.charset.Charset;
++import org.apache.hadoop.conf.Configurable;
++import org.apache.hadoop.conf.Configuration;
++import org.apache.jena.graph.Triple ;
++import org.apache.jena.hadoop.rdf.types.TripleWritable;
++import org.apache.jena.rdf.model.Model ;
++import org.apache.jena.rdf.model.ModelFactory ;
++import org.apache.jena.rdf.model.Property ;
++import org.apache.jena.rdf.model.Resource ;
++import org.apache.jena.riot.Lang;
++import org.apache.jena.riot.RDFDataMgr;
++ * Abstract tests for compressed whole file triple formats
++ * 
++ * 
++ */
++public abstract class AbstractCompressedWholeFileTripleInputFormatTests extends
++        AbstractNodeTupleInputFormatTests<Triple, TripleWritable> {
++    private static final Charset utf8 = Charset.forName("utf-8");
++    @Override
++    protected Configuration prepareConfiguration() {
++        Configuration config = super.prepareConfiguration();
++        config.set(HadoopIOConstants.IO_COMPRESSION_CODECS, this.getCompressionCodec().getClass().getCanonicalName());
++        return config;
++    }
++    @Override
++    protected OutputStream getOutputStream(File f) throws IOException {
++        CompressionCodec codec = this.getCompressionCodec();
++        if (codec instanceof Configurable) {
++            ((Configurable) codec).setConf(this.prepareConfiguration());
++        }
++        FileOutputStream fileOutput = new FileOutputStream(f, false);
++        return codec.createOutputStream(fileOutput);
++    }
++    /**
++     * Gets the compression codec to use
++     * 
++     * @return Compression codec
++     */
++    protected abstract CompressionCodec getCompressionCodec();
++    /**
++     * Indicates whether inputs can be split, defaults to false for compressed
++     * input tests
++     */
++    @Override
++    protected boolean canSplitInputs() {
++        return false;
++    }
++    private void writeTuples(Model m, OutputStream output) {
++        RDFDataMgr.write(output, m, this.getRdfLanguage());
++    }
++    /**
++     * Gets the RDF language to write out generated tuples in
++     * 
++     * @return RDF language
++     */
++    protected abstract Lang getRdfLanguage();
++    @Override
++    protected final void generateTuples(OutputStream output, int num) throws IOException {
++        Model m = ModelFactory.createDefaultModel();
++        Resource currSubj = m.createResource("");
++        Property predicate = m.createProperty("");
++        for (int i = 0; i < num; i++) {
++            if (i % 10 == 0) {
++                currSubj = m.createResource("" + (i / 10));
++            }
++            m.add(currSubj, predicate, m.createTypedLiteral(i));
++        }
++        this.writeTuples(m, output);
++        output.close();
++    }
++    @Override
++    protected final void generateMixedTuples(OutputStream output, int num) throws IOException {
++        // Write good data
++        Model m = ModelFactory.createDefaultModel();
++        Resource currSubj = m.createResource("");
++        Property predicate = m.createProperty("");
++        for (int i = 0; i < num / 2; i++) {
++            if (i % 10 == 0) {
++                currSubj = m.createResource("" + (i / 10));
++            }
++            m.add(currSubj, predicate, m.createTypedLiteral(i));
++        }
++        this.writeTuples(m, output);
++        // Write junk data
++        byte[] junk = "junk data\n".getBytes(utf8);
++        for (int i = 0; i < num / 2; i++) {
++            output.write(junk);
++        }
++        output.flush();
++        output.close();
++    }
++    @Override
++    protected final void generateBadTuples(OutputStream output, int num) throws IOException {
++        byte[] junk = "junk data\n".getBytes(utf8);
++        for (int i = 0; i < num; i++) {
++            output.write(junk);
++        }
++        output.flush();
++        output.close();
++    }