You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by an...@apache.org on 2016/05/14 14:03:17 UTC

[18/42] jena git commit: Merge commit 'refs/pull/143/head' of github.com:apache/jena

http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractRdfReader.java
----------------------------------------------------------------------
diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractRdfReader.java
index d0ffed8,d0ffed8..030155f
--- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractRdfReader.java
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractRdfReader.java
@@@ -1,108 -1,108 +1,108 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- * 
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *     
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
--package org.apache.jena.hadoop.rdf.io.input.readers;
--
--import java.io.IOException;
--
--import org.apache.hadoop.fs.Path;
--import org.apache.hadoop.io.LongWritable;
--import org.apache.hadoop.mapreduce.InputSplit;
--import org.apache.hadoop.mapreduce.RecordReader;
--import org.apache.hadoop.mapreduce.TaskAttemptContext;
--import org.apache.hadoop.mapreduce.lib.input.FileSplit;
--import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable;
--import org.apache.jena.riot.Lang;
--import org.apache.jena.riot.RDFLanguages;
--import org.slf4j.Logger;
--import org.slf4j.LoggerFactory;
--
--/**
-- * An abstract record reader for arbitrary RDF which provides support for
-- * selecting the actual record reader to use based on detecting the RDF language
-- * from the file name
-- * 
-- * @param <TValue>
-- *            Tuple type
-- * @param <T>
-- *            Writable tuple type
-- */
--public abstract class AbstractRdfReader<TValue, T extends AbstractNodeTupleWritable<TValue>> extends
--        RecordReader<LongWritable, T> {
--    private static final Logger LOG = LoggerFactory.getLogger(AbstractRdfReader.class);
--
--    private RecordReader<LongWritable, T> reader;
--
--    @Override
--    public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException,
--            InterruptedException {
--        LOG.debug("initialize({}, {})", genericSplit, context);
--
--        // Assuming file split
--        if (!(genericSplit instanceof FileSplit))
--            throw new IOException("This record reader only supports FileSplit inputs");
--
--        // Find RDF language
--        FileSplit split = (FileSplit) genericSplit;
--        Path path = split.getPath();
--        Lang lang = RDFLanguages.filenameToLang(path.getName());
--        if (lang == null)
--            throw new IOException("There is no registered RDF language for the input file " + path.toString());
--
--        // Select the record reader and initialize
--        this.reader = this.selectRecordReader(lang);
--        this.reader.initialize(split, context);
--    }
--
--    /**
--     * Selects the appropriate record reader to use for the given RDF language
--     * 
--     * @param lang
--     *            RDF language
--     * @return Record reader
--     * @throws IOException
--     *             Should be thrown if no record reader can be selected
--     */
--    protected abstract RecordReader<LongWritable, T> selectRecordReader(Lang lang) throws IOException;
--
--    @Override
--    public final boolean nextKeyValue() throws IOException, InterruptedException {
--        return this.reader.nextKeyValue();
--    }
--
--    @Override
--    public final LongWritable getCurrentKey() throws IOException, InterruptedException {
--        return this.reader.getCurrentKey();
--    }
--
--    @Override
--    public final T getCurrentValue() throws IOException, InterruptedException {
--        return this.reader.getCurrentValue();
--    }
--
--    @Override
--    public final float getProgress() throws IOException, InterruptedException {
--        return this.reader.getProgress();
--    }
--
--    @Override
--    public final void close() throws IOException {
--        this.reader.close();
--    }
--
--}
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ * 
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *     
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.jena.hadoop.rdf.io.input.readers;
++
++import java.io.IOException;
++
++import org.apache.hadoop.fs.Path;
++import org.apache.hadoop.io.LongWritable;
++import org.apache.hadoop.mapreduce.InputSplit;
++import org.apache.hadoop.mapreduce.RecordReader;
++import org.apache.hadoop.mapreduce.TaskAttemptContext;
++import org.apache.hadoop.mapreduce.lib.input.FileSplit;
++import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable;
++import org.apache.jena.riot.Lang;
++import org.apache.jena.riot.RDFLanguages;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++/**
++ * An abstract record reader for arbitrary RDF which provides support for
++ * selecting the actual record reader to use based on detecting the RDF language
++ * from the file name
++ * 
++ * @param <TValue>
++ *            Tuple type
++ * @param <T>
++ *            Writable tuple type
++ */
++public abstract class AbstractRdfReader<TValue, T extends AbstractNodeTupleWritable<TValue>> extends
++        RecordReader<LongWritable, T> {
++    private static final Logger LOG = LoggerFactory.getLogger(AbstractRdfReader.class);
++
++    private RecordReader<LongWritable, T> reader;
++
++    @Override
++    public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException,
++            InterruptedException {
++        LOG.debug("initialize({}, {})", genericSplit, context);
++
++        // Assuming file split
++        if (!(genericSplit instanceof FileSplit))
++            throw new IOException("This record reader only supports FileSplit inputs");
++
++        // Find RDF language
++        FileSplit split = (FileSplit) genericSplit;
++        Path path = split.getPath();
++        Lang lang = RDFLanguages.filenameToLang(path.getName());
++        if (lang == null)
++            throw new IOException("There is no registered RDF language for the input file " + path.toString());
++
++        // Select the record reader and initialize
++        this.reader = this.selectRecordReader(lang);
++        this.reader.initialize(split, context);
++    }
++
++    /**
++     * Selects the appropriate record reader to use for the given RDF language
++     * 
++     * @param lang
++     *            RDF language
++     * @return Record reader
++     * @throws IOException
++     *             Should be thrown if no record reader can be selected
++     */
++    protected abstract RecordReader<LongWritable, T> selectRecordReader(Lang lang) throws IOException;
++
++    @Override
++    public final boolean nextKeyValue() throws IOException, InterruptedException {
++        return this.reader.nextKeyValue();
++    }
++
++    @Override
++    public final LongWritable getCurrentKey() throws IOException, InterruptedException {
++        return this.reader.getCurrentKey();
++    }
++
++    @Override
++    public final T getCurrentValue() throws IOException, InterruptedException {
++        return this.reader.getCurrentValue();
++    }
++
++    @Override
++    public final float getProgress() throws IOException, InterruptedException {
++        return this.reader.getProgress();
++    }
++
++    @Override
++    public final void close() throws IOException {
++        this.reader.close();
++    }
++
++}

http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractWholeFileNodeTupleReader.java
----------------------------------------------------------------------
diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractWholeFileNodeTupleReader.java
index dd738d6,dd738d6..b0327f6
--- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractWholeFileNodeTupleReader.java
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractWholeFileNodeTupleReader.java
@@@ -1,328 -1,328 +1,328 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- * 
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *     
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
--package org.apache.jena.hadoop.rdf.io.input.readers;
--
--import java.io.IOException;
--import java.io.InputStream;
--
--import org.apache.hadoop.conf.Configuration;
--import org.apache.hadoop.fs.FSDataInputStream;
--import org.apache.hadoop.fs.FileSystem;
--import org.apache.hadoop.fs.Path;
--import org.apache.hadoop.io.LongWritable;
--import org.apache.hadoop.io.compress.CompressionCodec;
--import org.apache.hadoop.io.compress.CompressionCodecFactory;
--import org.apache.hadoop.mapreduce.InputSplit;
--import org.apache.hadoop.mapreduce.RecordReader;
--import org.apache.hadoop.mapreduce.TaskAttemptContext;
--import org.apache.hadoop.mapreduce.lib.input.FileSplit;
--import org.apache.jena.hadoop.rdf.io.RdfIOConstants;
--import org.apache.jena.hadoop.rdf.io.input.util.RdfIOUtils;
--import org.apache.jena.hadoop.rdf.io.input.util.TrackableInputStream;
--import org.apache.jena.hadoop.rdf.io.input.util.TrackedInputStream;
--import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedRDFStream;
--import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable;
--import org.apache.jena.riot.Lang;
--import org.apache.jena.riot.RDFDataMgr;
--import org.apache.jena.riot.ReaderRIOT;
--import org.apache.jena.riot.lang.PipedRDFIterator;
--import org.apache.jena.riot.lang.PipedRDFStream;
--import org.apache.jena.riot.system.ParserProfile;
--import org.slf4j.Logger;
--import org.slf4j.LoggerFactory;
--
--/**
-- * An abstract implementation for a record reader that reads records from whole
-- * files i.e. the whole file must be kept together to allow tuples to be
-- * successfully read. This only supports reading from file splits currently.
-- * <p>
-- * The keys produced are the approximate position in the file at which a tuple
-- * was found and the values will be node tuples. Positions are approximate
-- * because they are recorded after the point at which the most recent tuple was
-- * parsed from the input thus they reflect the approximate position in the
-- * stream immediately after which the triple was found.
-- * </p>
-- * <p>
-- * You should also be aware that with whole file formats syntax compressions in
-- * the format may mean that there are multiple triples produced with the same
-- * position and thus key.
-- * </p>
-- * 
-- * 
-- * 
-- * @param <TValue>
-- *            Value type
-- * @param <T>
-- *            Tuple type
-- */
--public abstract class AbstractWholeFileNodeTupleReader<TValue, T extends AbstractNodeTupleWritable<TValue>> extends RecordReader<LongWritable, T> {
--
--    private static final Logger LOG = LoggerFactory.getLogger(AbstractLineBasedNodeTupleReader.class);
--    private CompressionCodec compressionCodecs;
--    private TrackedInputStream input;
--    private LongWritable key;
--    private long length;
--    private T tuple;
--    private TrackedPipedRDFStream<TValue> stream;
--    private PipedRDFIterator<TValue> iter;
--    private Thread parserThread;
--    private boolean finished = false;
--    private boolean ignoreBadTuples = true;
--    private boolean parserFinished = false;
--    private Throwable parserError = null;
--
--    @Override
--    public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
--        LOG.debug("initialize({}, {})", genericSplit, context);
--
--        // Assuming file split
--        if (!(genericSplit instanceof FileSplit))
--            throw new IOException("This record reader only supports FileSplit inputs");
--        FileSplit split = (FileSplit) genericSplit;
--
--        // Configuration
--        Configuration config = context.getConfiguration();
--        this.ignoreBadTuples = config.getBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, true);
--        if (this.ignoreBadTuples)
--            LOG.warn(
--                    "Configured to ignore bad tuples, parsing errors will be logged and further parsing aborted but no user visible errors will be thrown.  Consider setting {} to false to disable this behaviour",
--                    RdfIOConstants.INPUT_IGNORE_BAD_TUPLES);
--
--        // Figure out what portion of the file to read
--        if (split.getStart() > 0)
--            throw new IOException("This record reader requires a file split which covers the entire file");
--        final Path file = split.getPath();
--        long totalLength = file.getFileSystem(context.getConfiguration()).getFileStatus(file).getLen();
--        CompressionCodecFactory factory = new CompressionCodecFactory(config);
--        this.compressionCodecs = factory.getCodec(file);
--
--        LOG.info(String.format("Got split with start %d and length %d for file with total length of %d", new Object[] { split.getStart(), split.getLength(),
--                totalLength }));
--
--        if (totalLength > split.getLength())
--            throw new IOException("This record reader requires a file split which covers the entire file");
--
--        // Open the file and prepare the input stream
--        FileSystem fs = file.getFileSystem(config);
--        FSDataInputStream fileIn = fs.open(file);
--        this.length = split.getLength();
--        if (this.compressionCodecs != null) {
--            // Compressed input
--            input = new TrackedInputStream(this.compressionCodecs.createInputStream(fileIn));
--        } else {
--            // Uncompressed input
--            input = new TrackedInputStream(fileIn);
--        }
--
--        // Set up background thread for parser
--        iter = this.getPipedIterator();
--        this.stream = this.getPipedStream(iter, this.input);
--        ParserProfile profile = RdfIOUtils.createParserProfile(context, file);
--        Runnable parserRunnable = this.createRunnable(this, this.input, stream, this.getRdfLanguage(), profile);
--        this.parserThread = new Thread(parserRunnable);
--        this.parserThread.setDaemon(true);
--        this.parserThread.start();
--    }
--
--    /**
--     * Gets the RDF iterator to use
--     * 
--     * @return Iterator
--     */
--    protected abstract PipedRDFIterator<TValue> getPipedIterator();
--
--    /**
--     * Gets the RDF stream to parse to
--     * 
--     * @param iterator
--     *            Iterator
--     * @return RDF stream
--     */
--    protected abstract TrackedPipedRDFStream<TValue> getPipedStream(PipedRDFIterator<TValue> iterator, TrackableInputStream input);
--
--    /**
--     * Gets the RDF language to use for parsing
--     * 
--     * @return
--     */
--    protected abstract Lang getRdfLanguage();
--
--    /**
--     * Creates the runnable upon which the parsing will run
--     * 
--     * @param input
--     *            Input
--     * @param stream
--     *            Stream
--     * @param lang
--     *            Language to use for parsing
--     * @return Parser runnable
--     */
--    private Runnable createRunnable(@SuppressWarnings("rawtypes") final AbstractWholeFileNodeTupleReader reader, final InputStream input,
--            final PipedRDFStream<TValue> stream, final Lang lang, final ParserProfile profile) {
--        return new Runnable() {
--            @Override
--            public void run() {
--                try {
--                    ReaderRIOT riotReader = RDFDataMgr.createReader(lang);
--                    riotReader.setParserProfile(profile);
--                    riotReader.read(input, null, lang.getContentType(), stream, null);
--                    reader.setParserFinished(null);
--                } catch (Throwable e) {
--                    reader.setParserFinished(e);
--                }
--            }
--        };
--    }
--
--    /**
--     * Sets the parser thread finished state
--     * 
--     * @param e
--     *            Error (if any)
--     */
--    private void setParserFinished(Throwable e) {
--        synchronized (this.parserThread) {
--            this.parserError = e;
--            this.parserFinished = true;
--        }
--    }
--
--    /**
--     * Waits for the parser thread to have reported as finished
--     * 
--     * @throws InterruptedException
--     */
--    private void waitForParserFinished() throws InterruptedException {
--        do {
--            synchronized (this.parserThread) {
--                if (this.parserFinished)
--                    return;
--            }
--            Thread.sleep(50);
--        } while (true);
--    }
--
--    /**
--     * Creates an instance of a writable tuple from the given tuple value
--     * 
--     * @param tuple
--     *            Tuple value
--     * @return Writable tuple
--     */
--    protected abstract T createInstance(TValue tuple);
--
--    @Override
--    public boolean nextKeyValue() throws IOException {
--        // Reuse key for efficiency
--        if (key == null) {
--            key = new LongWritable();
--        }
--
--        if (this.finished)
--            return false;
--
--        try {
--            if (this.iter.hasNext()) {
--                Long l = this.stream.getPosition();
--                if (l != null) {
--                    this.key.set(l);
--                    // For compressed input the actual length from which we
--                    // calculate progress is likely less than the actual
--                    // uncompressed length so we may need to increment the
--                    // length as we go along
--                    // We always add 1 more than the current length because we
--                    // don't want to report 100% progress until we really have
--                    // finished
--                    if (this.compressionCodecs != null && l > this.length)
--                        this.length = l + 1;
--                }
--                this.tuple = this.createInstance(this.iter.next());
--                return true;
--            } else {
--                // Need to ensure that the parser thread has finished in order
--                // to determine whether we finished without error
--                this.waitForParserFinished();
--                if (this.parserError != null) {
--                    LOG.error("Error parsing whole file, aborting further parsing", this.parserError);
--                    if (!this.ignoreBadTuples)
--                        throw new IOException("Error parsing whole file at position " + this.input.getBytesRead() + ", aborting further parsing",
--                                this.parserError);
--
--                }
--
--                this.key = null;
--                this.tuple = null;
--                this.finished = true;
--                // This is necessary so that when compressed input is used we
--                // report 100% progress once we've reached the genuine end of
--                // the stream
--                if (this.compressionCodecs != null)
--                    this.length--;
--                return false;
--            }
--        } catch (Throwable e) {
--            // Failed to read the tuple on this line
--            LOG.error("Error parsing whole file, aborting further parsing", e);
--            if (!this.ignoreBadTuples) {
--                this.iter.close();
--                throw new IOException("Error parsing whole file at position " + this.input.getBytesRead() + ", aborting further parsing", e);
--            }
--            this.key = null;
--            this.tuple = null;
--            this.finished = true;
--            return false;
--        }
--    }
--
--    @Override
--    public LongWritable getCurrentKey() {
--        return this.key;
--    }
--
--    @Override
--    public T getCurrentValue() {
--        return this.tuple;
--    }
--
--    @Override
--    public float getProgress() {
--        float progress = 0.0f;
--        if (this.key == null) {
--            // We've either not started or we've finished
--            progress = (this.finished ? 1.0f : 0.0f);
--        } else if (this.key.get() == Long.MIN_VALUE) {
--            // We don't have a position so we've either in-progress or finished
--            progress = (this.finished ? 1.0f : 0.5f);
--        } else {
--            // We're some way through the file
--            progress = this.key.get() / (float) this.length;
--        }
--        LOG.debug("getProgress() --> {}", progress);
--        return progress;
--    }
--
--    @Override
--    public void close() throws IOException {
--        this.iter.close();
--        this.input.close();
--        this.finished = true;
--    }
--
--}
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ * 
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *     
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.jena.hadoop.rdf.io.input.readers;
++
++import java.io.IOException;
++import java.io.InputStream;
++
++import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.fs.FSDataInputStream;
++import org.apache.hadoop.fs.FileSystem;
++import org.apache.hadoop.fs.Path;
++import org.apache.hadoop.io.LongWritable;
++import org.apache.hadoop.io.compress.CompressionCodec;
++import org.apache.hadoop.io.compress.CompressionCodecFactory;
++import org.apache.hadoop.mapreduce.InputSplit;
++import org.apache.hadoop.mapreduce.RecordReader;
++import org.apache.hadoop.mapreduce.TaskAttemptContext;
++import org.apache.hadoop.mapreduce.lib.input.FileSplit;
++import org.apache.jena.hadoop.rdf.io.RdfIOConstants;
++import org.apache.jena.hadoop.rdf.io.input.util.RdfIOUtils;
++import org.apache.jena.hadoop.rdf.io.input.util.TrackableInputStream;
++import org.apache.jena.hadoop.rdf.io.input.util.TrackedInputStream;
++import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedRDFStream;
++import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable;
++import org.apache.jena.riot.Lang;
++import org.apache.jena.riot.RDFDataMgr;
++import org.apache.jena.riot.ReaderRIOT;
++import org.apache.jena.riot.lang.PipedRDFIterator;
++import org.apache.jena.riot.lang.PipedRDFStream;
++import org.apache.jena.riot.system.ParserProfile;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++/**
++ * An abstract implementation for a record reader that reads records from whole
++ * files i.e. the whole file must be kept together to allow tuples to be
++ * successfully read. This only supports reading from file splits currently.
++ * <p>
++ * The keys produced are the approximate position in the file at which a tuple
++ * was found and the values will be node tuples. Positions are approximate
++ * because they are recorded after the point at which the most recent tuple was
++ * parsed from the input thus they reflect the approximate position in the
++ * stream immediately after which the triple was found.
++ * </p>
++ * <p>
++ * You should also be aware that with whole file formats syntax compressions in
++ * the format may mean that there are multiple triples produced with the same
++ * position and thus key.
++ * </p>
++ * 
++ * 
++ * 
++ * @param <TValue>
++ *            Value type
++ * @param <T>
++ *            Tuple type
++ */
++public abstract class AbstractWholeFileNodeTupleReader<TValue, T extends AbstractNodeTupleWritable<TValue>> extends RecordReader<LongWritable, T> {
++
++    private static final Logger LOG = LoggerFactory.getLogger(AbstractLineBasedNodeTupleReader.class);
++    private CompressionCodec compressionCodecs;
++    private TrackedInputStream input;
++    private LongWritable key;
++    private long length;
++    private T tuple;
++    private TrackedPipedRDFStream<TValue> stream;
++    private PipedRDFIterator<TValue> iter;
++    private Thread parserThread;
++    private boolean finished = false;
++    private boolean ignoreBadTuples = true;
++    private boolean parserFinished = false;
++    private Throwable parserError = null;
++
++    @Override
++    public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
++        LOG.debug("initialize({}, {})", genericSplit, context);
++
++        // Assuming file split
++        if (!(genericSplit instanceof FileSplit))
++            throw new IOException("This record reader only supports FileSplit inputs");
++        FileSplit split = (FileSplit) genericSplit;
++
++        // Configuration
++        Configuration config = context.getConfiguration();
++        this.ignoreBadTuples = config.getBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, true);
++        if (this.ignoreBadTuples)
++            LOG.warn(
++                    "Configured to ignore bad tuples, parsing errors will be logged and further parsing aborted but no user visible errors will be thrown.  Consider setting {} to false to disable this behaviour",
++                    RdfIOConstants.INPUT_IGNORE_BAD_TUPLES);
++
++        // Figure out what portion of the file to read
++        if (split.getStart() > 0)
++            throw new IOException("This record reader requires a file split which covers the entire file");
++        final Path file = split.getPath();
++        long totalLength = file.getFileSystem(context.getConfiguration()).getFileStatus(file).getLen();
++        CompressionCodecFactory factory = new CompressionCodecFactory(config);
++        this.compressionCodecs = factory.getCodec(file);
++
++        LOG.info(String.format("Got split with start %d and length %d for file with total length of %d", new Object[] { split.getStart(), split.getLength(),
++                totalLength }));
++
++        if (totalLength > split.getLength())
++            throw new IOException("This record reader requires a file split which covers the entire file");
++
++        // Open the file and prepare the input stream
++        FileSystem fs = file.getFileSystem(config);
++        FSDataInputStream fileIn = fs.open(file);
++        this.length = split.getLength();
++        if (this.compressionCodecs != null) {
++            // Compressed input
++            input = new TrackedInputStream(this.compressionCodecs.createInputStream(fileIn));
++        } else {
++            // Uncompressed input
++            input = new TrackedInputStream(fileIn);
++        }
++
++        // Set up background thread for parser
++        iter = this.getPipedIterator();
++        this.stream = this.getPipedStream(iter, this.input);
++        ParserProfile profile = RdfIOUtils.createParserProfile(context, file);
++        Runnable parserRunnable = this.createRunnable(this, this.input, stream, this.getRdfLanguage(), profile);
++        this.parserThread = new Thread(parserRunnable);
++        this.parserThread.setDaemon(true);
++        this.parserThread.start();
++    }
++
++    /**
++     * Gets the RDF iterator to use
++     * 
++     * @return Iterator
++     */
++    protected abstract PipedRDFIterator<TValue> getPipedIterator();
++
++    /**
++     * Gets the RDF stream to parse to
++     * 
++     * @param iterator
++     *            Iterator
++     * @return RDF stream
++     */
++    protected abstract TrackedPipedRDFStream<TValue> getPipedStream(PipedRDFIterator<TValue> iterator, TrackableInputStream input);
++
++    /**
++     * Gets the RDF language to use for parsing
++     * 
++     * @return
++     */
++    protected abstract Lang getRdfLanguage();
++
++    /**
++     * Creates the runnable upon which the parsing will run
++     * 
++     * @param input
++     *            Input
++     * @param stream
++     *            Stream
++     * @param lang
++     *            Language to use for parsing
++     * @return Parser runnable
++     */
++    private Runnable createRunnable(@SuppressWarnings("rawtypes") final AbstractWholeFileNodeTupleReader reader, final InputStream input,
++            final PipedRDFStream<TValue> stream, final Lang lang, final ParserProfile profile) {
++        return new Runnable() {
++            @Override
++            public void run() {
++                try {
++                    ReaderRIOT riotReader = RDFDataMgr.createReader(lang);
++                    riotReader.setParserProfile(profile);
++                    riotReader.read(input, null, lang.getContentType(), stream, null);
++                    reader.setParserFinished(null);
++                } catch (Throwable e) {
++                    reader.setParserFinished(e);
++                }
++            }
++        };
++    }
++
++    /**
++     * Sets the parser thread finished state
++     * 
++     * @param e
++     *            Error (if any)
++     */
++    private void setParserFinished(Throwable e) {
++        synchronized (this.parserThread) {
++            this.parserError = e;
++            this.parserFinished = true;
++        }
++    }
++
++    /**
++     * Blocks until the parser thread has reported as finished, checking the
++     * finished flag under the parser thread lock and sleeping for 50
++     * milliseconds between checks
++     * 
++     * @throws InterruptedException
++     *             Thrown if the calling thread is interrupted while sleeping
++     *             between checks
++     */
++    private void waitForParserFinished() throws InterruptedException {
++        while (true) {
++            synchronized (this.parserThread) {
++                if (this.parserFinished) {
++                    return;
++                }
++            }
++            Thread.sleep(50);
++        }
++    }
++
++    /**
++     * Creates an instance of a writable tuple from the given tuple value
++     * 
++     * @param tuple
++     *            Tuple value, as returned by the parser iterator
++     * @return Writable tuple for the given value
++     */
++    protected abstract T createInstance(TValue tuple);
++
++    /**
++     * Advances to the next tuple produced by the parser
++     * <p>
++     * When the iterator is exhausted this waits for the parser thread to
++     * report completion so that a parser error can be detected. Errors are
++     * logged and, unless bad tuples are being ignored, rethrown wrapped in a
++     * single {@link IOException}.
++     * </p>
++     * 
++     * @return True if a tuple was read, false if there are no further tuples
++     * @throws IOException
++     *             Thrown if reading or parsing fails and bad tuples are not
++     *             being ignored
++     */
++    @Override
++    public boolean nextKeyValue() throws IOException {
++        // Check for completion before touching the key, otherwise a call made
++        // after the end of the input would allocate a fresh key and cause
++        // getProgress() to stop reporting 100%
++        if (this.finished)
++            return false;
++
++        // Reuse key for efficiency
++        if (key == null) {
++            key = new LongWritable();
++        }
++
++        try {
++            if (this.iter.hasNext()) {
++                Long l = this.stream.getPosition();
++                if (l != null) {
++                    this.key.set(l);
++                    // For compressed input the actual length from which we
++                    // calculate progress is likely less than the actual
++                    // uncompressed length so we may need to increment the
++                    // length as we go along
++                    // We always add 1 more than the current length because we
++                    // don't want to report 100% progress until we really have
++                    // finished
++                    if (this.compressionCodecs != null && l > this.length)
++                        this.length = l + 1;
++                }
++                this.tuple = this.createInstance(this.iter.next());
++                return true;
++            }
++
++            // Need to ensure that the parser thread has finished in order
++            // to determine whether we finished without error
++            this.waitForParserFinished();
++        } catch (Throwable e) {
++            if (e instanceof InterruptedException) {
++                // Restore the interrupt status so that it is not lost to
++                // code further up the call stack
++                Thread.currentThread().interrupt();
++            }
++            // Failed to read the tuple on this line
++            LOG.error("Error parsing whole file, aborting further parsing", e);
++            if (!this.ignoreBadTuples) {
++                this.iter.close();
++                throw new IOException("Error parsing whole file at position " + this.input.getBytesRead() + ", aborting further parsing", e);
++            }
++            this.key = null;
++            this.tuple = null;
++            this.finished = true;
++            return false;
++        }
++
++        // The parser error is handled outside of the try block so that the
++        // exception raised here is not caught, logged a second time and
++        // wrapped inside another IOException
++        if (this.parserError != null) {
++            LOG.error("Error parsing whole file, aborting further parsing", this.parserError);
++            if (!this.ignoreBadTuples) {
++                this.iter.close();
++                throw new IOException("Error parsing whole file at position " + this.input.getBytesRead() + ", aborting further parsing",
++                        this.parserError);
++            }
++        }
++
++        this.key = null;
++        this.tuple = null;
++        this.finished = true;
++        // This is necessary so that when compressed input is used we
++        // report 100% progress once we've reached the genuine end of
++        // the stream
++        if (this.compressionCodecs != null)
++            this.length--;
++        return false;
++    }
++
++    /**
++     * Gets the key for the current tuple
++     * 
++     * @return Current key, may be {@code null} once reading has finished
++     */
++    @Override
++    public LongWritable getCurrentKey() {
++        return key;
++    }
++
++    /**
++     * Gets the current tuple
++     * 
++     * @return Current tuple, may be {@code null} once reading has finished
++     */
++    @Override
++    public T getCurrentValue() {
++        return tuple;
++    }
++
++    /**
++     * Gets the progress through the input
++     * 
++     * @return 0 or 1 when there is no key (not finished / finished), 0.5 or 1
++     *         when the key holds no position (not finished / finished),
++     *         otherwise the key position divided by the length
++     */
++    @Override
++    public float getProgress() {
++        final float progress;
++        if (this.key == null) {
++            // We've either not started or we've finished
++            progress = this.finished ? 1.0f : 0.0f;
++        } else {
++            long position = this.key.get();
++            if (position == Long.MIN_VALUE) {
++                // No position available so we're either in-progress or
++                // finished
++                progress = this.finished ? 1.0f : 0.5f;
++            } else {
++                // We're some way through the file
++                progress = position / (float) this.length;
++            }
++        }
++        LOG.debug("getProgress() --> {}", progress);
++        return progress;
++    }
++
++    /**
++     * Closes the iterator and the underlying input and marks this reader as
++     * finished
++     * <p>
++     * The input is closed even if closing the iterator fails, and resources
++     * that were never created are skipped so that calling this on a reader
++     * that was not fully set up does not fail with a
++     * {@link NullPointerException}.
++     * </p>
++     * 
++     * @throws IOException
++     *             Thrown if closing the input fails
++     */
++    @Override
++    public void close() throws IOException {
++        try {
++            if (this.iter != null)
++                this.iter.close();
++        } finally {
++            try {
++                if (this.input != null)
++                    this.input.close();
++            } finally {
++                this.finished = true;
++            }
++        }
++    }
++
++}

http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractWholeFileQuadReader.java
----------------------------------------------------------------------
diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractWholeFileQuadReader.java
index 9e62c1e,9e62c1e..8097d52
--- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractWholeFileQuadReader.java
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractWholeFileQuadReader.java
@@@ -1,29 -1,29 +1,29 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- * 
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *     
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ * 
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *     
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
  package org.apache.jena.hadoop.rdf.io.input.readers;
  
--import org.apache.jena.hadoop.rdf.io.input.util.TrackableInputStream;
--import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedQuadsStream;
--import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedRDFStream;
--import org.apache.jena.hadoop.rdf.types.QuadWritable;
++import org.apache.jena.hadoop.rdf.io.input.util.TrackableInputStream;
++import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedQuadsStream;
++import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedRDFStream;
++import org.apache.jena.hadoop.rdf.types.QuadWritable;
  import org.apache.jena.riot.lang.PipedRDFIterator;
--import org.apache.jena.sparql.core.Quad ;
++import org.apache.jena.sparql.core.Quad ;
  
  /**
   * An abstract record reader for whole file triple formats

http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractWholeFileTripleReader.java
----------------------------------------------------------------------
diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractWholeFileTripleReader.java
index 384f456,384f456..1f56b07
--- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractWholeFileTripleReader.java
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractWholeFileTripleReader.java
@@@ -1,28 -1,28 +1,28 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- * 
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *     
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ * 
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *     
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
  package org.apache.jena.hadoop.rdf.io.input.readers;
  
--import org.apache.jena.graph.Triple ;
--import org.apache.jena.hadoop.rdf.io.input.util.TrackableInputStream;
--import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedRDFStream;
--import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedTriplesStream;
--import org.apache.jena.hadoop.rdf.types.TripleWritable;
++import org.apache.jena.graph.Triple ;
++import org.apache.jena.hadoop.rdf.io.input.util.TrackableInputStream;
++import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedRDFStream;
++import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedTriplesStream;
++import org.apache.jena.hadoop.rdf.types.TripleWritable;
  import org.apache.jena.riot.lang.PipedRDFIterator;
  
  /**

http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/QuadsReader.java
----------------------------------------------------------------------
diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/QuadsReader.java
index 61f2236,61f2236..ecd930a
--- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/QuadsReader.java
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/QuadsReader.java
@@@ -1,48 -1,48 +1,48 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- * 
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *     
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
--package org.apache.jena.hadoop.rdf.io.input.readers;
--
--import java.io.IOException;
--
--import org.apache.hadoop.io.LongWritable;
--import org.apache.hadoop.mapreduce.RecordReader;
--import org.apache.jena.hadoop.rdf.io.registry.HadoopRdfIORegistry;
--import org.apache.jena.hadoop.rdf.types.QuadWritable;
--import org.apache.jena.riot.Lang;
--import org.apache.jena.riot.RDFLanguages;
--import org.apache.jena.sparql.core.Quad ;
--
--/**
-- * A record reader that reads triples from any RDF quads format
-- */
--public class QuadsReader extends AbstractRdfReader<Quad, QuadWritable> {
--
--    @Override
--    protected RecordReader<LongWritable, QuadWritable> selectRecordReader(Lang lang) throws IOException {
--        if (!RDFLanguages.isQuads(lang))
--            throw new IOException(
--                    lang.getLabel()
--                            + " is not a RDF quads format, perhaps you wanted TriplesInputFormat or TriplesOrQuadsInputFormat instead?");
--
--        // This will throw an appropriate error if the language does not support
--        // triples
--        return HadoopRdfIORegistry.createQuadReader(lang);
--    }
--
--}
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ * 
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *     
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.jena.hadoop.rdf.io.input.readers;
++
++import java.io.IOException;
++
++import org.apache.hadoop.io.LongWritable;
++import org.apache.hadoop.mapreduce.RecordReader;
++import org.apache.jena.hadoop.rdf.io.registry.HadoopRdfIORegistry;
++import org.apache.jena.hadoop.rdf.types.QuadWritable;
++import org.apache.jena.riot.Lang;
++import org.apache.jena.riot.RDFLanguages;
++import org.apache.jena.sparql.core.Quad ;
++
++/**
++ * A record reader that reads triples from any RDF quads format
++ */
++public class QuadsReader extends AbstractRdfReader<Quad, QuadWritable> {
++
++    @Override
++    protected RecordReader<LongWritable, QuadWritable> selectRecordReader(Lang lang) throws IOException {
++        if (!RDFLanguages.isQuads(lang))
++            throw new IOException(
++                    lang.getLabel()
++                            + " is not a RDF quads format, perhaps you wanted TriplesInputFormat or TriplesOrQuadsInputFormat instead?");
++
++        // This will throw an appropriate error if the language does not support
++        // triples
++        return HadoopRdfIORegistry.createQuadReader(lang);
++    }
++
++}

http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/TriplesOrQuadsReader.java
----------------------------------------------------------------------
diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/TriplesOrQuadsReader.java
index 308d915,308d915..9d78266
--- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/TriplesOrQuadsReader.java
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/TriplesOrQuadsReader.java
@@@ -1,71 -1,71 +1,71 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- * 
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *     
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
--package org.apache.jena.hadoop.rdf.io.input.readers;
--
--import java.io.IOException;
--
--import org.apache.hadoop.io.LongWritable;
--import org.apache.hadoop.mapreduce.RecordReader;
--import org.apache.jena.graph.Node ;
--import org.apache.jena.hadoop.rdf.io.registry.HadoopRdfIORegistry;
--import org.apache.jena.hadoop.rdf.types.QuadWritable;
--import org.apache.jena.riot.Lang;
--import org.apache.jena.riot.RDFLanguages;
--import org.apache.jena.sparql.core.Quad ;
--
--/**
-- * A record reader that reads RDF from any triples/quads format. Triples are
-- * converted into quads in the default graph. This behaviour can be changed by
-- * deriving from this class and overriding the {@link #getGraphNode()} method
-- * 
-- * 
-- * 
-- */
--@SuppressWarnings("javadoc")
--public class TriplesOrQuadsReader extends AbstractRdfReader<Quad, QuadWritable> {
--
--    @Override
--    protected RecordReader<LongWritable, QuadWritable> selectRecordReader(Lang lang) throws IOException {
--        if (!RDFLanguages.isQuads(lang) && !RDFLanguages.isTriples(lang))
--            throw new IOException(lang.getLabel() + " is not a RDF triples/quads format");
--
--        if (HadoopRdfIORegistry.hasQuadReader(lang)) {
--            // Supports quads directly
--            return HadoopRdfIORegistry.createQuadReader(lang);
--        } else {
--            // Try to create a triples reader and wrap upwards into quads
--            // This will throw an error if a triple reader is not available
--            return new TriplesToQuadsReader(HadoopRdfIORegistry.createTripleReader(lang));
--        }
--    }
--
--    /**
--     * Gets the graph node which represents the graph into which triples will be
--     * indicated to belong to when they are converting into quads.
--     * <p>
--     * Defaults to {@link Quad#defaultGraphNodeGenerated} which represents the
--     * default graph
--     * </p>
--     * 
--     * @return Graph node
--     */
--    protected Node getGraphNode() {
--        return Quad.defaultGraphNodeGenerated;
--    }
--}
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ * 
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *     
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.jena.hadoop.rdf.io.input.readers;
++
++import java.io.IOException;
++
++import org.apache.hadoop.io.LongWritable;
++import org.apache.hadoop.mapreduce.RecordReader;
++import org.apache.jena.graph.Node ;
++import org.apache.jena.hadoop.rdf.io.registry.HadoopRdfIORegistry;
++import org.apache.jena.hadoop.rdf.types.QuadWritable;
++import org.apache.jena.riot.Lang;
++import org.apache.jena.riot.RDFLanguages;
++import org.apache.jena.sparql.core.Quad ;
++
++/**
++ * A record reader that reads RDF from any triples/quads format. Triples are
++ * converted into quads in the default graph. This behaviour can be changed by
++ * deriving from this class and overriding the {@link #getGraphNode()} method
++ * 
++ * 
++ * 
++ */
++@SuppressWarnings("javadoc")
++public class TriplesOrQuadsReader extends AbstractRdfReader<Quad, QuadWritable> {
++
++    @Override
++    protected RecordReader<LongWritable, QuadWritable> selectRecordReader(Lang lang) throws IOException {
++        if (!RDFLanguages.isQuads(lang) && !RDFLanguages.isTriples(lang))
++            throw new IOException(lang.getLabel() + " is not a RDF triples/quads format");
++
++        if (HadoopRdfIORegistry.hasQuadReader(lang)) {
++            // Supports quads directly
++            return HadoopRdfIORegistry.createQuadReader(lang);
++        } else {
++            // Try to create a triples reader and wrap upwards into quads
++            // This will throw an error if a triple reader is not available
++            return new TriplesToQuadsReader(HadoopRdfIORegistry.createTripleReader(lang));
++        }
++    }
++
++    /**
++     * Gets the graph node which represents the graph into which triples will be
++     * indicated to belong to when they are converting into quads.
++     * <p>
++     * Defaults to {@link Quad#defaultGraphNodeGenerated} which represents the
++     * default graph
++     * </p>
++     * 
++     * @return Graph node
++     */
++    protected Node getGraphNode() {
++        return Quad.defaultGraphNodeGenerated;
++    }
++}

http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/TriplesReader.java
----------------------------------------------------------------------
diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/TriplesReader.java
index 9fe930c,9fe930c..0467b5c
--- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/TriplesReader.java
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/TriplesReader.java
@@@ -1,48 -1,48 +1,48 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- * 
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *     
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
--package org.apache.jena.hadoop.rdf.io.input.readers;
--
--import java.io.IOException;
--
--import org.apache.hadoop.io.LongWritable;
--import org.apache.hadoop.mapreduce.RecordReader;
--import org.apache.jena.graph.Triple ;
--import org.apache.jena.hadoop.rdf.io.registry.HadoopRdfIORegistry;
--import org.apache.jena.hadoop.rdf.types.TripleWritable;
--import org.apache.jena.riot.Lang;
--import org.apache.jena.riot.RDFLanguages;
--
--/**
-- * A record reader that reads triples from any RDF triples format
-- */
--public class TriplesReader extends AbstractRdfReader<Triple, TripleWritable> {
--
--    @Override
--    protected RecordReader<LongWritable, TripleWritable> selectRecordReader(Lang lang) throws IOException {
--        if (!RDFLanguages.isTriples(lang))
--            throw new IOException(
--                    lang.getLabel()
--                            + " is not a RDF triples format, perhaps you wanted QuadsInputFormat or TriplesOrQuadsInputFormat instead?");
--
--        // This will throw an appropriate error if the language does not support
--        // triples
--        return HadoopRdfIORegistry.createTripleReader(lang);
--    }
--
--}
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ * 
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *     
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.jena.hadoop.rdf.io.input.readers;
++
++import java.io.IOException;
++
++import org.apache.hadoop.io.LongWritable;
++import org.apache.hadoop.mapreduce.RecordReader;
++import org.apache.jena.graph.Triple ;
++import org.apache.jena.hadoop.rdf.io.registry.HadoopRdfIORegistry;
++import org.apache.jena.hadoop.rdf.types.TripleWritable;
++import org.apache.jena.riot.Lang;
++import org.apache.jena.riot.RDFLanguages;
++
++/**
++ * A record reader that reads triples from any RDF triples format
++ */
++public class TriplesReader extends AbstractRdfReader<Triple, TripleWritable> {
++
++    @Override
++    protected RecordReader<LongWritable, TripleWritable> selectRecordReader(Lang lang) throws IOException {
++        if (!RDFLanguages.isTriples(lang))
++            throw new IOException(
++                    lang.getLabel()
++                            + " is not a RDF triples format, perhaps you wanted QuadsInputFormat or TriplesOrQuadsInputFormat instead?");
++
++        // This will throw an appropriate error if the language does not support
++        // triples
++        return HadoopRdfIORegistry.createTripleReader(lang);
++    }
++
++}

http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/TriplesToQuadsReader.java
----------------------------------------------------------------------
diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/TriplesToQuadsReader.java
index 14d6c6e,14d6c6e..e4de126
--- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/TriplesToQuadsReader.java
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/TriplesToQuadsReader.java
@@@ -1,33 -1,33 +1,33 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- * 
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *     
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ * 
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *     
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
  package org.apache.jena.hadoop.rdf.io.input.readers;
  
  import java.io.IOException;
--
++
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.mapreduce.InputSplit;
  import org.apache.hadoop.mapreduce.RecordReader;
  import org.apache.hadoop.mapreduce.TaskAttemptContext;
--import org.apache.jena.graph.Node ;
--import org.apache.jena.hadoop.rdf.types.QuadWritable;
--import org.apache.jena.hadoop.rdf.types.TripleWritable;
--import org.apache.jena.sparql.core.Quad ;
++import org.apache.jena.graph.Node ;
++import org.apache.jena.hadoop.rdf.types.QuadWritable;
++import org.apache.jena.hadoop.rdf.types.TripleWritable;
++import org.apache.jena.sparql.core.Quad ;
  
  /**
   * A record reader that converts triples into quads by wrapping a

http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/nquads/BlockedNQuadsReader.java
----------------------------------------------------------------------
diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/nquads/BlockedNQuadsReader.java
index cef8ef1,cef8ef1..ad46cde
--- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/nquads/BlockedNQuadsReader.java
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/nquads/BlockedNQuadsReader.java
@@@ -1,24 -1,24 +1,24 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- * 
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *     
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ * 
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *     
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
  package org.apache.jena.hadoop.rdf.io.input.readers.nquads;
  
--import org.apache.jena.hadoop.rdf.io.input.readers.AbstractBlockBasedQuadReader;
++import org.apache.jena.hadoop.rdf.io.input.readers.AbstractBlockBasedQuadReader;
  import org.apache.jena.riot.Lang;
  
  /**

http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/nquads/NQuadsReader.java
----------------------------------------------------------------------
diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/nquads/NQuadsReader.java
index 437b43f,437b43f..356cd94
--- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/nquads/NQuadsReader.java
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/nquads/NQuadsReader.java
@@@ -1,31 -1,31 +1,31 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- * 
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *     
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ * 
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *     
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
  package org.apache.jena.hadoop.rdf.io.input.readers.nquads;
  
  import java.util.Iterator;
--
--import org.apache.jena.hadoop.rdf.io.input.readers.AbstractLineBasedQuadReader;
++
++import org.apache.jena.hadoop.rdf.io.input.readers.AbstractLineBasedQuadReader;
  import org.apache.jena.riot.lang.LangNQuads;
  import org.apache.jena.riot.system.ParserProfile;
  import org.apache.jena.riot.tokens.Tokenizer;
  import org.apache.jena.riot.tokens.TokenizerFactory;
--import org.apache.jena.sparql.core.Quad ;
++import org.apache.jena.sparql.core.Quad ;
  
  /**
   * A record reader for NQuads

http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/nquads/WholeFileNQuadsReader.java
----------------------------------------------------------------------
diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/nquads/WholeFileNQuadsReader.java
index 96e6f80,96e6f80..b96d458
--- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/nquads/WholeFileNQuadsReader.java
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/nquads/WholeFileNQuadsReader.java
@@@ -1,24 -1,24 +1,24 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- * 
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *     
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ * 
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *     
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
  package org.apache.jena.hadoop.rdf.io.input.readers.nquads;
  
--import org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileQuadReader;
++import org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileQuadReader;
  import org.apache.jena.riot.Lang;
  
  /**

http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/ntriples/BlockedNTriplesReader.java
----------------------------------------------------------------------
diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/ntriples/BlockedNTriplesReader.java
index 7268d5a,7268d5a..ee6ee22
--- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/ntriples/BlockedNTriplesReader.java
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/ntriples/BlockedNTriplesReader.java
@@@ -1,24 -1,24 +1,24 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- * 
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *     
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ * 
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *     
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
  package org.apache.jena.hadoop.rdf.io.input.readers.ntriples;
  
--import org.apache.jena.hadoop.rdf.io.input.readers.AbstractBlockBasedTripleReader;
++import org.apache.jena.hadoop.rdf.io.input.readers.AbstractBlockBasedTripleReader;
  import org.apache.jena.riot.Lang;
  
  /**

http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/ntriples/NTriplesReader.java
----------------------------------------------------------------------
diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/ntriples/NTriplesReader.java
index 97a9bf4,97a9bf4..e38166f
--- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/ntriples/NTriplesReader.java
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/ntriples/NTriplesReader.java
@@@ -1,27 -1,27 +1,27 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- * 
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *     
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ * 
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *     
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
  package org.apache.jena.hadoop.rdf.io.input.readers.ntriples;
  
  import java.util.Iterator;
--
--import org.apache.jena.graph.Triple ;
--import org.apache.jena.hadoop.rdf.io.input.readers.AbstractLineBasedTripleReader;
++
++import org.apache.jena.graph.Triple ;
++import org.apache.jena.hadoop.rdf.io.input.readers.AbstractLineBasedTripleReader;
  import org.apache.jena.riot.lang.LangNTriples;
  import org.apache.jena.riot.system.ParserProfile;
  import org.apache.jena.riot.tokens.Tokenizer;

http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/ntriples/WholeFileNTriplesReader.java
----------------------------------------------------------------------
diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/ntriples/WholeFileNTriplesReader.java
index c200d93,c200d93..b01c3df
--- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/ntriples/WholeFileNTriplesReader.java
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/ntriples/WholeFileNTriplesReader.java
@@@ -1,24 -1,24 +1,24 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- * 
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *     
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ * 
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *     
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
  package org.apache.jena.hadoop.rdf.io.input.readers.ntriples;
  
--import org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileTripleReader;
++import org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileTripleReader;
  import org.apache.jena.riot.Lang;
  
  /**

http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/rdfjson/RdfJsonReader.java
----------------------------------------------------------------------
diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/rdfjson/RdfJsonReader.java
index 009024b,009024b..6cab094
--- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/rdfjson/RdfJsonReader.java
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/rdfjson/RdfJsonReader.java
@@@ -1,24 -1,24 +1,24 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- * 
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *     
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ * 
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *     
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
  package org.apache.jena.hadoop.rdf.io.input.readers.rdfjson;
  
--import org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileTripleReader;
++import org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileTripleReader;
  import org.apache.jena.riot.Lang;
  
  /**

http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/rdfxml/RdfXmlReader.java
----------------------------------------------------------------------
diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/rdfxml/RdfXmlReader.java
index 9c374c6,9c374c6..b5e943f
--- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/rdfxml/RdfXmlReader.java
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/rdfxml/RdfXmlReader.java
@@@ -1,24 -1,24 +1,24 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- * 
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *     
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ * 
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *     
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
  package org.apache.jena.hadoop.rdf.io.input.readers.rdfxml;
  
--import org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileTripleReader;
++import org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileTripleReader;
  import org.apache.jena.riot.Lang;
  
  /**

http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/trig/TriGReader.java
----------------------------------------------------------------------
diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/trig/TriGReader.java
index b1b0c3c,b1b0c3c..237c9c1
--- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/trig/TriGReader.java
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/trig/TriGReader.java
@@@ -1,24 -1,24 +1,24 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- * 
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *     
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ * 
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *     
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
  package org.apache.jena.hadoop.rdf.io.input.readers.trig;
  
--import org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileQuadReader;
++import org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileQuadReader;
  import org.apache.jena.riot.Lang;
  
  /**

http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/trix/TriXReader.java
----------------------------------------------------------------------
diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/trix/TriXReader.java
index 6873c64,6873c64..5087370
--- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/trix/TriXReader.java
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/trix/TriXReader.java
@@@ -1,24 -1,24 +1,24 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- * 
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *     
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ * 
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *     
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
  package org.apache.jena.hadoop.rdf.io.input.readers.trix;
  
--import org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileQuadReader;
++import org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileQuadReader;
  import org.apache.jena.riot.Lang;
  
  /**