You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by an...@apache.org on 2016/05/14 14:03:17 UTC
[18/42] jena git commit: Merge commit 'refs/pull/143/head' of
github.com:apache/jena
http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractRdfReader.java
----------------------------------------------------------------------
diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractRdfReader.java
index d0ffed8,d0ffed8..030155f
--- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractRdfReader.java
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractRdfReader.java
@@@ -1,108 -1,108 +1,108 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements. See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership. The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License. You may obtain a copy of the License at
-- *
-- * http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
--package org.apache.jena.hadoop.rdf.io.input.readers;
--
--import java.io.IOException;
--
--import org.apache.hadoop.fs.Path;
--import org.apache.hadoop.io.LongWritable;
--import org.apache.hadoop.mapreduce.InputSplit;
--import org.apache.hadoop.mapreduce.RecordReader;
--import org.apache.hadoop.mapreduce.TaskAttemptContext;
--import org.apache.hadoop.mapreduce.lib.input.FileSplit;
--import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable;
--import org.apache.jena.riot.Lang;
--import org.apache.jena.riot.RDFLanguages;
--import org.slf4j.Logger;
--import org.slf4j.LoggerFactory;
--
--/**
-- * An abstract record reader for arbitrary RDF which provides support for
-- * selecting the actual record reader to use based on detecting the RDF language
-- * from the file name
-- *
-- * @param <TValue>
-- * Tuple type
-- * @param <T>
-- * Writable tuple type
-- */
--public abstract class AbstractRdfReader<TValue, T extends AbstractNodeTupleWritable<TValue>> extends
-- RecordReader<LongWritable, T> {
-- private static final Logger LOG = LoggerFactory.getLogger(AbstractRdfReader.class);
--
-- private RecordReader<LongWritable, T> reader;
--
-- @Override
-- public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException,
-- InterruptedException {
-- LOG.debug("initialize({}, {})", genericSplit, context);
--
-- // Assuming file split
-- if (!(genericSplit instanceof FileSplit))
-- throw new IOException("This record reader only supports FileSplit inputs");
--
-- // Find RDF language
-- FileSplit split = (FileSplit) genericSplit;
-- Path path = split.getPath();
-- Lang lang = RDFLanguages.filenameToLang(path.getName());
-- if (lang == null)
-- throw new IOException("There is no registered RDF language for the input file " + path.toString());
--
-- // Select the record reader and initialize
-- this.reader = this.selectRecordReader(lang);
-- this.reader.initialize(split, context);
-- }
--
-- /**
-- * Selects the appropriate record reader to use for the given RDF language
-- *
-- * @param lang
-- * RDF language
-- * @return Record reader
-- * @throws IOException
-- * Should be thrown if no record reader can be selected
-- */
-- protected abstract RecordReader<LongWritable, T> selectRecordReader(Lang lang) throws IOException;
--
-- @Override
-- public final boolean nextKeyValue() throws IOException, InterruptedException {
-- return this.reader.nextKeyValue();
-- }
--
-- @Override
-- public final LongWritable getCurrentKey() throws IOException, InterruptedException {
-- return this.reader.getCurrentKey();
-- }
--
-- @Override
-- public final T getCurrentValue() throws IOException, InterruptedException {
-- return this.reader.getCurrentValue();
-- }
--
-- @Override
-- public final float getProgress() throws IOException, InterruptedException {
-- return this.reader.getProgress();
-- }
--
-- @Override
-- public final void close() throws IOException {
-- this.reader.close();
-- }
--
--}
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.jena.hadoop.rdf.io.input.readers;
++
++import java.io.IOException;
++
++import org.apache.hadoop.fs.Path;
++import org.apache.hadoop.io.LongWritable;
++import org.apache.hadoop.mapreduce.InputSplit;
++import org.apache.hadoop.mapreduce.RecordReader;
++import org.apache.hadoop.mapreduce.TaskAttemptContext;
++import org.apache.hadoop.mapreduce.lib.input.FileSplit;
++import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable;
++import org.apache.jena.riot.Lang;
++import org.apache.jena.riot.RDFLanguages;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++/**
++ * An abstract record reader for arbitrary RDF which provides support for
++ * selecting the actual record reader to use based on detecting the RDF language
++ * from the file name
++ *
++ * @param <TValue>
++ * Tuple type
++ * @param <T>
++ * Writable tuple type
++ */
++public abstract class AbstractRdfReader<TValue, T extends AbstractNodeTupleWritable<TValue>> extends
++ RecordReader<LongWritable, T> {
++ private static final Logger LOG = LoggerFactory.getLogger(AbstractRdfReader.class);
++
++ private RecordReader<LongWritable, T> reader;
++
++ @Override
++ public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException,
++ InterruptedException {
++ LOG.debug("initialize({}, {})", genericSplit, context);
++
++ // Assuming file split
++ if (!(genericSplit instanceof FileSplit))
++ throw new IOException("This record reader only supports FileSplit inputs");
++
++ // Find RDF language
++ FileSplit split = (FileSplit) genericSplit;
++ Path path = split.getPath();
++ Lang lang = RDFLanguages.filenameToLang(path.getName());
++ if (lang == null)
++ throw new IOException("There is no registered RDF language for the input file " + path.toString());
++
++ // Select the record reader and initialize
++ this.reader = this.selectRecordReader(lang);
++ this.reader.initialize(split, context);
++ }
++
++ /**
++ * Selects the appropriate record reader to use for the given RDF language
++ *
++ * @param lang
++ * RDF language
++ * @return Record reader
++ * @throws IOException
++ * Should be thrown if no record reader can be selected
++ */
++ protected abstract RecordReader<LongWritable, T> selectRecordReader(Lang lang) throws IOException;
++
++ @Override
++ public final boolean nextKeyValue() throws IOException, InterruptedException {
++ return this.reader.nextKeyValue();
++ }
++
++ @Override
++ public final LongWritable getCurrentKey() throws IOException, InterruptedException {
++ return this.reader.getCurrentKey();
++ }
++
++ @Override
++ public final T getCurrentValue() throws IOException, InterruptedException {
++ return this.reader.getCurrentValue();
++ }
++
++ @Override
++ public final float getProgress() throws IOException, InterruptedException {
++ return this.reader.getProgress();
++ }
++
++ @Override
++ public final void close() throws IOException {
++ this.reader.close();
++ }
++
++}
http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractWholeFileNodeTupleReader.java
----------------------------------------------------------------------
diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractWholeFileNodeTupleReader.java
index dd738d6,dd738d6..b0327f6
--- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractWholeFileNodeTupleReader.java
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractWholeFileNodeTupleReader.java
@@@ -1,328 -1,328 +1,328 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements. See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership. The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License. You may obtain a copy of the License at
-- *
-- * http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
--package org.apache.jena.hadoop.rdf.io.input.readers;
--
--import java.io.IOException;
--import java.io.InputStream;
--
--import org.apache.hadoop.conf.Configuration;
--import org.apache.hadoop.fs.FSDataInputStream;
--import org.apache.hadoop.fs.FileSystem;
--import org.apache.hadoop.fs.Path;
--import org.apache.hadoop.io.LongWritable;
--import org.apache.hadoop.io.compress.CompressionCodec;
--import org.apache.hadoop.io.compress.CompressionCodecFactory;
--import org.apache.hadoop.mapreduce.InputSplit;
--import org.apache.hadoop.mapreduce.RecordReader;
--import org.apache.hadoop.mapreduce.TaskAttemptContext;
--import org.apache.hadoop.mapreduce.lib.input.FileSplit;
--import org.apache.jena.hadoop.rdf.io.RdfIOConstants;
--import org.apache.jena.hadoop.rdf.io.input.util.RdfIOUtils;
--import org.apache.jena.hadoop.rdf.io.input.util.TrackableInputStream;
--import org.apache.jena.hadoop.rdf.io.input.util.TrackedInputStream;
--import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedRDFStream;
--import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable;
--import org.apache.jena.riot.Lang;
--import org.apache.jena.riot.RDFDataMgr;
--import org.apache.jena.riot.ReaderRIOT;
--import org.apache.jena.riot.lang.PipedRDFIterator;
--import org.apache.jena.riot.lang.PipedRDFStream;
--import org.apache.jena.riot.system.ParserProfile;
--import org.slf4j.Logger;
--import org.slf4j.LoggerFactory;
--
--/**
-- * An abstract implementation for a record reader that reads records from whole
-- * files i.e. the whole file must be kept together to allow tuples to be
-- * successfully read. This only supports reading from file splits currently.
-- * <p>
-- * The keys produced are the approximate position in the file at which a tuple
-- * was found and the values will be node tuples. Positions are approximate
-- * because they are recorded after the point at which the most recent tuple was
-- * parsed from the input thus they reflect the approximate position in the
-- * stream immediately after which the triple was found.
-- * </p>
-- * <p>
-- * You should also be aware that with whole file formats syntax compressions in
-- * the format may mean that there are multiple triples produced with the same
-- * position and thus key.
-- * </p>
-- *
-- *
-- *
-- * @param <TValue>
-- * Value type
-- * @param <T>
-- * Tuple type
-- */
--public abstract class AbstractWholeFileNodeTupleReader<TValue, T extends AbstractNodeTupleWritable<TValue>> extends RecordReader<LongWritable, T> {
--
-- private static final Logger LOG = LoggerFactory.getLogger(AbstractLineBasedNodeTupleReader.class);
-- private CompressionCodec compressionCodecs;
-- private TrackedInputStream input;
-- private LongWritable key;
-- private long length;
-- private T tuple;
-- private TrackedPipedRDFStream<TValue> stream;
-- private PipedRDFIterator<TValue> iter;
-- private Thread parserThread;
-- private boolean finished = false;
-- private boolean ignoreBadTuples = true;
-- private boolean parserFinished = false;
-- private Throwable parserError = null;
--
-- @Override
-- public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
-- LOG.debug("initialize({}, {})", genericSplit, context);
--
-- // Assuming file split
-- if (!(genericSplit instanceof FileSplit))
-- throw new IOException("This record reader only supports FileSplit inputs");
-- FileSplit split = (FileSplit) genericSplit;
--
-- // Configuration
-- Configuration config = context.getConfiguration();
-- this.ignoreBadTuples = config.getBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, true);
-- if (this.ignoreBadTuples)
-- LOG.warn(
-- "Configured to ignore bad tuples, parsing errors will be logged and further parsing aborted but no user visible errors will be thrown. Consider setting {} to false to disable this behaviour",
-- RdfIOConstants.INPUT_IGNORE_BAD_TUPLES);
--
-- // Figure out what portion of the file to read
-- if (split.getStart() > 0)
-- throw new IOException("This record reader requires a file split which covers the entire file");
-- final Path file = split.getPath();
-- long totalLength = file.getFileSystem(context.getConfiguration()).getFileStatus(file).getLen();
-- CompressionCodecFactory factory = new CompressionCodecFactory(config);
-- this.compressionCodecs = factory.getCodec(file);
--
-- LOG.info(String.format("Got split with start %d and length %d for file with total length of %d", new Object[] { split.getStart(), split.getLength(),
-- totalLength }));
--
-- if (totalLength > split.getLength())
-- throw new IOException("This record reader requires a file split which covers the entire file");
--
-- // Open the file and prepare the input stream
-- FileSystem fs = file.getFileSystem(config);
-- FSDataInputStream fileIn = fs.open(file);
-- this.length = split.getLength();
-- if (this.compressionCodecs != null) {
-- // Compressed input
-- input = new TrackedInputStream(this.compressionCodecs.createInputStream(fileIn));
-- } else {
-- // Uncompressed input
-- input = new TrackedInputStream(fileIn);
-- }
--
-- // Set up background thread for parser
-- iter = this.getPipedIterator();
-- this.stream = this.getPipedStream(iter, this.input);
-- ParserProfile profile = RdfIOUtils.createParserProfile(context, file);
-- Runnable parserRunnable = this.createRunnable(this, this.input, stream, this.getRdfLanguage(), profile);
-- this.parserThread = new Thread(parserRunnable);
-- this.parserThread.setDaemon(true);
-- this.parserThread.start();
-- }
--
-- /**
-- * Gets the RDF iterator to use
-- *
-- * @return Iterator
-- */
-- protected abstract PipedRDFIterator<TValue> getPipedIterator();
--
-- /**
-- * Gets the RDF stream to parse to
-- *
-- * @param iterator
-- * Iterator
-- * @return RDF stream
-- */
-- protected abstract TrackedPipedRDFStream<TValue> getPipedStream(PipedRDFIterator<TValue> iterator, TrackableInputStream input);
--
-- /**
-- * Gets the RDF language to use for parsing
-- *
-- * @return
-- */
-- protected abstract Lang getRdfLanguage();
--
-- /**
-- * Creates the runnable upon which the parsing will run
-- *
-- * @param input
-- * Input
-- * @param stream
-- * Stream
-- * @param lang
-- * Language to use for parsing
-- * @return Parser runnable
-- */
-- private Runnable createRunnable(@SuppressWarnings("rawtypes") final AbstractWholeFileNodeTupleReader reader, final InputStream input,
-- final PipedRDFStream<TValue> stream, final Lang lang, final ParserProfile profile) {
-- return new Runnable() {
-- @Override
-- public void run() {
-- try {
-- ReaderRIOT riotReader = RDFDataMgr.createReader(lang);
-- riotReader.setParserProfile(profile);
-- riotReader.read(input, null, lang.getContentType(), stream, null);
-- reader.setParserFinished(null);
-- } catch (Throwable e) {
-- reader.setParserFinished(e);
-- }
-- }
-- };
-- }
--
-- /**
-- * Sets the parser thread finished state
-- *
-- * @param e
-- * Error (if any)
-- */
-- private void setParserFinished(Throwable e) {
-- synchronized (this.parserThread) {
-- this.parserError = e;
-- this.parserFinished = true;
-- }
-- }
--
-- /**
-- * Waits for the parser thread to have reported as finished
-- *
-- * @throws InterruptedException
-- */
-- private void waitForParserFinished() throws InterruptedException {
-- do {
-- synchronized (this.parserThread) {
-- if (this.parserFinished)
-- return;
-- }
-- Thread.sleep(50);
-- } while (true);
-- }
--
-- /**
-- * Creates an instance of a writable tuple from the given tuple value
-- *
-- * @param tuple
-- * Tuple value
-- * @return Writable tuple
-- */
-- protected abstract T createInstance(TValue tuple);
--
-- @Override
-- public boolean nextKeyValue() throws IOException {
-- // Reuse key for efficiency
-- if (key == null) {
-- key = new LongWritable();
-- }
--
-- if (this.finished)
-- return false;
--
-- try {
-- if (this.iter.hasNext()) {
-- Long l = this.stream.getPosition();
-- if (l != null) {
-- this.key.set(l);
-- // For compressed input the actual length from which we
-- // calculate progress is likely less than the actual
-- // uncompressed length so we may need to increment the
-- // length as we go along
-- // We always add 1 more than the current length because we
-- // don't want to report 100% progress until we really have
-- // finished
-- if (this.compressionCodecs != null && l > this.length)
-- this.length = l + 1;
-- }
-- this.tuple = this.createInstance(this.iter.next());
-- return true;
-- } else {
-- // Need to ensure that the parser thread has finished in order
-- // to determine whether we finished without error
-- this.waitForParserFinished();
-- if (this.parserError != null) {
-- LOG.error("Error parsing whole file, aborting further parsing", this.parserError);
-- if (!this.ignoreBadTuples)
-- throw new IOException("Error parsing whole file at position " + this.input.getBytesRead() + ", aborting further parsing",
-- this.parserError);
--
-- }
--
-- this.key = null;
-- this.tuple = null;
-- this.finished = true;
-- // This is necessary so that when compressed input is used we
-- // report 100% progress once we've reached the genuine end of
-- // the stream
-- if (this.compressionCodecs != null)
-- this.length--;
-- return false;
-- }
-- } catch (Throwable e) {
-- // Failed to read the tuple on this line
-- LOG.error("Error parsing whole file, aborting further parsing", e);
-- if (!this.ignoreBadTuples) {
-- this.iter.close();
-- throw new IOException("Error parsing whole file at position " + this.input.getBytesRead() + ", aborting further parsing", e);
-- }
-- this.key = null;
-- this.tuple = null;
-- this.finished = true;
-- return false;
-- }
-- }
--
-- @Override
-- public LongWritable getCurrentKey() {
-- return this.key;
-- }
--
-- @Override
-- public T getCurrentValue() {
-- return this.tuple;
-- }
--
-- @Override
-- public float getProgress() {
-- float progress = 0.0f;
-- if (this.key == null) {
-- // We've either not started or we've finished
-- progress = (this.finished ? 1.0f : 0.0f);
-- } else if (this.key.get() == Long.MIN_VALUE) {
-- // We don't have a position so we've either in-progress or finished
-- progress = (this.finished ? 1.0f : 0.5f);
-- } else {
-- // We're some way through the file
-- progress = this.key.get() / (float) this.length;
-- }
-- LOG.debug("getProgress() --> {}", progress);
-- return progress;
-- }
--
-- @Override
-- public void close() throws IOException {
-- this.iter.close();
-- this.input.close();
-- this.finished = true;
-- }
--
--}
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.jena.hadoop.rdf.io.input.readers;
++
++import java.io.IOException;
++import java.io.InputStream;
++
++import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.fs.FSDataInputStream;
++import org.apache.hadoop.fs.FileSystem;
++import org.apache.hadoop.fs.Path;
++import org.apache.hadoop.io.LongWritable;
++import org.apache.hadoop.io.compress.CompressionCodec;
++import org.apache.hadoop.io.compress.CompressionCodecFactory;
++import org.apache.hadoop.mapreduce.InputSplit;
++import org.apache.hadoop.mapreduce.RecordReader;
++import org.apache.hadoop.mapreduce.TaskAttemptContext;
++import org.apache.hadoop.mapreduce.lib.input.FileSplit;
++import org.apache.jena.hadoop.rdf.io.RdfIOConstants;
++import org.apache.jena.hadoop.rdf.io.input.util.RdfIOUtils;
++import org.apache.jena.hadoop.rdf.io.input.util.TrackableInputStream;
++import org.apache.jena.hadoop.rdf.io.input.util.TrackedInputStream;
++import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedRDFStream;
++import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable;
++import org.apache.jena.riot.Lang;
++import org.apache.jena.riot.RDFDataMgr;
++import org.apache.jena.riot.ReaderRIOT;
++import org.apache.jena.riot.lang.PipedRDFIterator;
++import org.apache.jena.riot.lang.PipedRDFStream;
++import org.apache.jena.riot.system.ParserProfile;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++/**
++ * An abstract implementation for a record reader that reads records from whole
++ * files i.e. the whole file must be kept together to allow tuples to be
++ * successfully read. This only supports reading from file splits currently.
++ * <p>
++ * The keys produced are the approximate position in the file at which a tuple
++ * was found and the values will be node tuples. Positions are approximate
++ * because they are recorded after the point at which the most recent tuple was
++ * parsed from the input thus they reflect the approximate position in the
++ * stream immediately after which the triple was found.
++ * </p>
++ * <p>
++ * You should also be aware that with whole file formats syntax compressions in
++ * the format may mean that there are multiple triples produced with the same
++ * position and thus key.
++ * </p>
++ *
++ *
++ *
++ * @param <TValue>
++ * Value type
++ * @param <T>
++ * Tuple type
++ */
++public abstract class AbstractWholeFileNodeTupleReader<TValue, T extends AbstractNodeTupleWritable<TValue>> extends RecordReader<LongWritable, T> {
++
++ private static final Logger LOG = LoggerFactory.getLogger(AbstractLineBasedNodeTupleReader.class);
++ private CompressionCodec compressionCodecs;
++ private TrackedInputStream input;
++ private LongWritable key;
++ private long length;
++ private T tuple;
++ private TrackedPipedRDFStream<TValue> stream;
++ private PipedRDFIterator<TValue> iter;
++ private Thread parserThread;
++ private boolean finished = false;
++ private boolean ignoreBadTuples = true;
++ private boolean parserFinished = false;
++ private Throwable parserError = null;
++
++ @Override
++ public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
++ LOG.debug("initialize({}, {})", genericSplit, context);
++
++ // Assuming file split
++ if (!(genericSplit instanceof FileSplit))
++ throw new IOException("This record reader only supports FileSplit inputs");
++ FileSplit split = (FileSplit) genericSplit;
++
++ // Configuration
++ Configuration config = context.getConfiguration();
++ this.ignoreBadTuples = config.getBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, true);
++ if (this.ignoreBadTuples)
++ LOG.warn(
++ "Configured to ignore bad tuples, parsing errors will be logged and further parsing aborted but no user visible errors will be thrown. Consider setting {} to false to disable this behaviour",
++ RdfIOConstants.INPUT_IGNORE_BAD_TUPLES);
++
++ // Figure out what portion of the file to read
++ if (split.getStart() > 0)
++ throw new IOException("This record reader requires a file split which covers the entire file");
++ final Path file = split.getPath();
++ long totalLength = file.getFileSystem(context.getConfiguration()).getFileStatus(file).getLen();
++ CompressionCodecFactory factory = new CompressionCodecFactory(config);
++ this.compressionCodecs = factory.getCodec(file);
++
++ LOG.info(String.format("Got split with start %d and length %d for file with total length of %d", new Object[] { split.getStart(), split.getLength(),
++ totalLength }));
++
++ if (totalLength > split.getLength())
++ throw new IOException("This record reader requires a file split which covers the entire file");
++
++ // Open the file and prepare the input stream
++ FileSystem fs = file.getFileSystem(config);
++ FSDataInputStream fileIn = fs.open(file);
++ this.length = split.getLength();
++ if (this.compressionCodecs != null) {
++ // Compressed input
++ input = new TrackedInputStream(this.compressionCodecs.createInputStream(fileIn));
++ } else {
++ // Uncompressed input
++ input = new TrackedInputStream(fileIn);
++ }
++
++ // Set up background thread for parser
++ iter = this.getPipedIterator();
++ this.stream = this.getPipedStream(iter, this.input);
++ ParserProfile profile = RdfIOUtils.createParserProfile(context, file);
++ Runnable parserRunnable = this.createRunnable(this, this.input, stream, this.getRdfLanguage(), profile);
++ this.parserThread = new Thread(parserRunnable);
++ this.parserThread.setDaemon(true);
++ this.parserThread.start();
++ }
++
++ /**
++ * Gets the RDF iterator to use
++ *
++ * @return Iterator
++ */
++ protected abstract PipedRDFIterator<TValue> getPipedIterator();
++
++ /**
++ * Gets the RDF stream to parse to
++ *
++ * @param iterator
++ * Iterator
++ * @return RDF stream
++ */
++ protected abstract TrackedPipedRDFStream<TValue> getPipedStream(PipedRDFIterator<TValue> iterator, TrackableInputStream input);
++
++ /**
++ * Gets the RDF language to use for parsing
++ *
++ * @return
++ */
++ protected abstract Lang getRdfLanguage();
++
++ /**
++ * Creates the runnable upon which the parsing will run
++ *
++ * @param input
++ * Input
++ * @param stream
++ * Stream
++ * @param lang
++ * Language to use for parsing
++ * @return Parser runnable
++ */
++ private Runnable createRunnable(@SuppressWarnings("rawtypes") final AbstractWholeFileNodeTupleReader reader, final InputStream input,
++ final PipedRDFStream<TValue> stream, final Lang lang, final ParserProfile profile) {
++ return new Runnable() {
++ @Override
++ public void run() {
++ try {
++ ReaderRIOT riotReader = RDFDataMgr.createReader(lang);
++ riotReader.setParserProfile(profile);
++ riotReader.read(input, null, lang.getContentType(), stream, null);
++ reader.setParserFinished(null);
++ } catch (Throwable e) {
++ reader.setParserFinished(e);
++ }
++ }
++ };
++ }
++
++ /**
++ * Sets the parser thread finished state
++ *
++ * @param e
++ * Error (if any)
++ */
++ private void setParserFinished(Throwable e) {
++ synchronized (this.parserThread) {
++ this.parserError = e;
++ this.parserFinished = true;
++ }
++ }
++
++ /**
++ * Waits for the parser thread to have reported as finished
++ *
++ * @throws InterruptedException
++ */
++ private void waitForParserFinished() throws InterruptedException {
++ do {
++ synchronized (this.parserThread) {
++ if (this.parserFinished)
++ return;
++ }
++ Thread.sleep(50);
++ } while (true);
++ }
++
++ /**
++ * Creates an instance of a writable tuple from the given tuple value
++ *
++ * @param tuple
++ * Tuple value
++ * @return Writable tuple
++ */
++ protected abstract T createInstance(TValue tuple);
++
++ @Override
++ public boolean nextKeyValue() throws IOException {
++ // Reuse key for efficiency
++ if (key == null) {
++ key = new LongWritable();
++ }
++
++ if (this.finished)
++ return false;
++
++ try {
++ if (this.iter.hasNext()) {
++ Long l = this.stream.getPosition();
++ if (l != null) {
++ this.key.set(l);
++ // For compressed input the actual length from which we
++ // calculate progress is likely less than the actual
++ // uncompressed length so we may need to increment the
++ // length as we go along
++ // We always add 1 more than the current length because we
++ // don't want to report 100% progress until we really have
++ // finished
++ if (this.compressionCodecs != null && l > this.length)
++ this.length = l + 1;
++ }
++ this.tuple = this.createInstance(this.iter.next());
++ return true;
++ } else {
++ // Need to ensure that the parser thread has finished in order
++ // to determine whether we finished without error
++ this.waitForParserFinished();
++ if (this.parserError != null) {
++ LOG.error("Error parsing whole file, aborting further parsing", this.parserError);
++ if (!this.ignoreBadTuples)
++ throw new IOException("Error parsing whole file at position " + this.input.getBytesRead() + ", aborting further parsing",
++ this.parserError);
++
++ }
++
++ this.key = null;
++ this.tuple = null;
++ this.finished = true;
++ // This is necessary so that when compressed input is used we
++ // report 100% progress once we've reached the genuine end of
++ // the stream
++ if (this.compressionCodecs != null)
++ this.length--;
++ return false;
++ }
++ } catch (Throwable e) {
++ // Failed to read the tuple on this line
++ LOG.error("Error parsing whole file, aborting further parsing", e);
++ if (!this.ignoreBadTuples) {
++ this.iter.close();
++ throw new IOException("Error parsing whole file at position " + this.input.getBytesRead() + ", aborting further parsing", e);
++ }
++ this.key = null;
++ this.tuple = null;
++ this.finished = true;
++ return false;
++ }
++ }
++
++ @Override
++ public LongWritable getCurrentKey() {
++ return this.key;
++ }
++
++ @Override
++ public T getCurrentValue() {
++ return this.tuple;
++ }
++
++ @Override
++ public float getProgress() {
++ float progress = 0.0f;
++ if (this.key == null) {
++ // We've either not started or we've finished
++ progress = (this.finished ? 1.0f : 0.0f);
++ } else if (this.key.get() == Long.MIN_VALUE) {
++ // We don't have a position so we've either in-progress or finished
++ progress = (this.finished ? 1.0f : 0.5f);
++ } else {
++ // We're some way through the file
++ progress = this.key.get() / (float) this.length;
++ }
++ LOG.debug("getProgress() --> {}", progress);
++ return progress;
++ }
++
++ @Override
++ public void close() throws IOException {
++ this.iter.close();
++ this.input.close();
++ this.finished = true;
++ }
++
++}
http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractWholeFileQuadReader.java
----------------------------------------------------------------------
diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractWholeFileQuadReader.java
index 9e62c1e,9e62c1e..8097d52
--- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractWholeFileQuadReader.java
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractWholeFileQuadReader.java
@@@ -1,29 -1,29 +1,29 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements. See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership. The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License. You may obtain a copy of the License at
-- *
-- * http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
package org.apache.jena.hadoop.rdf.io.input.readers;
--import org.apache.jena.hadoop.rdf.io.input.util.TrackableInputStream;
--import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedQuadsStream;
--import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedRDFStream;
--import org.apache.jena.hadoop.rdf.types.QuadWritable;
++import org.apache.jena.hadoop.rdf.io.input.util.TrackableInputStream;
++import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedQuadsStream;
++import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedRDFStream;
++import org.apache.jena.hadoop.rdf.types.QuadWritable;
import org.apache.jena.riot.lang.PipedRDFIterator;
--import org.apache.jena.sparql.core.Quad ;
++import org.apache.jena.sparql.core.Quad ;
/**
* An abstract record reader for whole file triple formats
http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractWholeFileTripleReader.java
----------------------------------------------------------------------
diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractWholeFileTripleReader.java
index 384f456,384f456..1f56b07
--- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractWholeFileTripleReader.java
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractWholeFileTripleReader.java
@@@ -1,28 -1,28 +1,28 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements. See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership. The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License. You may obtain a copy of the License at
-- *
-- * http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
package org.apache.jena.hadoop.rdf.io.input.readers;
--import org.apache.jena.graph.Triple ;
--import org.apache.jena.hadoop.rdf.io.input.util.TrackableInputStream;
--import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedRDFStream;
--import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedTriplesStream;
--import org.apache.jena.hadoop.rdf.types.TripleWritable;
++import org.apache.jena.graph.Triple ;
++import org.apache.jena.hadoop.rdf.io.input.util.TrackableInputStream;
++import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedRDFStream;
++import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedTriplesStream;
++import org.apache.jena.hadoop.rdf.types.TripleWritable;
import org.apache.jena.riot.lang.PipedRDFIterator;
/**
http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/QuadsReader.java
----------------------------------------------------------------------
diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/QuadsReader.java
index 61f2236,61f2236..ecd930a
--- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/QuadsReader.java
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/QuadsReader.java
@@@ -1,48 -1,48 +1,48 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements. See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership. The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License. You may obtain a copy of the License at
-- *
-- * http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
--package org.apache.jena.hadoop.rdf.io.input.readers;
--
--import java.io.IOException;
--
--import org.apache.hadoop.io.LongWritable;
--import org.apache.hadoop.mapreduce.RecordReader;
--import org.apache.jena.hadoop.rdf.io.registry.HadoopRdfIORegistry;
--import org.apache.jena.hadoop.rdf.types.QuadWritable;
--import org.apache.jena.riot.Lang;
--import org.apache.jena.riot.RDFLanguages;
--import org.apache.jena.sparql.core.Quad ;
--
--/**
-- * A record reader that reads triples from any RDF quads format
-- */
--public class QuadsReader extends AbstractRdfReader<Quad, QuadWritable> {
--
-- @Override
-- protected RecordReader<LongWritable, QuadWritable> selectRecordReader(Lang lang) throws IOException {
-- if (!RDFLanguages.isQuads(lang))
-- throw new IOException(
-- lang.getLabel()
-- + " is not a RDF quads format, perhaps you wanted TriplesInputFormat or TriplesOrQuadsInputFormat instead?");
--
-- // This will throw an appropriate error if the language does not support
-- // triples
-- return HadoopRdfIORegistry.createQuadReader(lang);
-- }
--
--}
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.jena.hadoop.rdf.io.input.readers;
++
++import java.io.IOException;
++
++import org.apache.hadoop.io.LongWritable;
++import org.apache.hadoop.mapreduce.RecordReader;
++import org.apache.jena.hadoop.rdf.io.registry.HadoopRdfIORegistry;
++import org.apache.jena.hadoop.rdf.types.QuadWritable;
++import org.apache.jena.riot.Lang;
++import org.apache.jena.riot.RDFLanguages;
++import org.apache.jena.sparql.core.Quad ;
++
++/**
++ * A record reader that reads triples from any RDF quads format
++ */
++public class QuadsReader extends AbstractRdfReader<Quad, QuadWritable> {
++
++ @Override
++ protected RecordReader<LongWritable, QuadWritable> selectRecordReader(Lang lang) throws IOException {
++ if (!RDFLanguages.isQuads(lang))
++ throw new IOException(
++ lang.getLabel()
++ + " is not a RDF quads format, perhaps you wanted TriplesInputFormat or TriplesOrQuadsInputFormat instead?");
++
++ // This will throw an appropriate error if the language does not support
++ // triples
++ return HadoopRdfIORegistry.createQuadReader(lang);
++ }
++
++}
http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/TriplesOrQuadsReader.java
----------------------------------------------------------------------
diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/TriplesOrQuadsReader.java
index 308d915,308d915..9d78266
--- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/TriplesOrQuadsReader.java
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/TriplesOrQuadsReader.java
@@@ -1,71 -1,71 +1,71 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements. See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership. The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License. You may obtain a copy of the License at
-- *
-- * http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
--package org.apache.jena.hadoop.rdf.io.input.readers;
--
--import java.io.IOException;
--
--import org.apache.hadoop.io.LongWritable;
--import org.apache.hadoop.mapreduce.RecordReader;
--import org.apache.jena.graph.Node ;
--import org.apache.jena.hadoop.rdf.io.registry.HadoopRdfIORegistry;
--import org.apache.jena.hadoop.rdf.types.QuadWritable;
--import org.apache.jena.riot.Lang;
--import org.apache.jena.riot.RDFLanguages;
--import org.apache.jena.sparql.core.Quad ;
--
--/**
-- * A record reader that reads RDF from any triples/quads format. Triples are
-- * converted into quads in the default graph. This behaviour can be changed by
-- * deriving from this class and overriding the {@link #getGraphNode()} method
-- *
-- *
-- *
-- */
--@SuppressWarnings("javadoc")
--public class TriplesOrQuadsReader extends AbstractRdfReader<Quad, QuadWritable> {
--
-- @Override
-- protected RecordReader<LongWritable, QuadWritable> selectRecordReader(Lang lang) throws IOException {
-- if (!RDFLanguages.isQuads(lang) && !RDFLanguages.isTriples(lang))
-- throw new IOException(lang.getLabel() + " is not a RDF triples/quads format");
--
-- if (HadoopRdfIORegistry.hasQuadReader(lang)) {
-- // Supports quads directly
-- return HadoopRdfIORegistry.createQuadReader(lang);
-- } else {
-- // Try to create a triples reader and wrap upwards into quads
-- // This will throw an error if a triple reader is not available
-- return new TriplesToQuadsReader(HadoopRdfIORegistry.createTripleReader(lang));
-- }
-- }
--
-- /**
-- * Gets the graph node which represents the graph into which triples will be
-- * indicated to belong to when they are converting into quads.
-- * <p>
-- * Defaults to {@link Quad#defaultGraphNodeGenerated} which represents the
-- * default graph
-- * </p>
-- *
-- * @return Graph node
-- */
-- protected Node getGraphNode() {
-- return Quad.defaultGraphNodeGenerated;
-- }
--}
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.jena.hadoop.rdf.io.input.readers;
++
++import java.io.IOException;
++
++import org.apache.hadoop.io.LongWritable;
++import org.apache.hadoop.mapreduce.RecordReader;
++import org.apache.jena.graph.Node ;
++import org.apache.jena.hadoop.rdf.io.registry.HadoopRdfIORegistry;
++import org.apache.jena.hadoop.rdf.types.QuadWritable;
++import org.apache.jena.riot.Lang;
++import org.apache.jena.riot.RDFLanguages;
++import org.apache.jena.sparql.core.Quad ;
++
++/**
++ * A record reader that reads RDF from any triples/quads format. Triples are
++ * converted into quads in the default graph. This behaviour can be changed by
++ * deriving from this class and overriding the {@link #getGraphNode()} method
++ *
++ *
++ *
++ */
++@SuppressWarnings("javadoc")
++public class TriplesOrQuadsReader extends AbstractRdfReader<Quad, QuadWritable> {
++
++ @Override
++ protected RecordReader<LongWritable, QuadWritable> selectRecordReader(Lang lang) throws IOException {
++ if (!RDFLanguages.isQuads(lang) && !RDFLanguages.isTriples(lang))
++ throw new IOException(lang.getLabel() + " is not a RDF triples/quads format");
++
++ if (HadoopRdfIORegistry.hasQuadReader(lang)) {
++ // Supports quads directly
++ return HadoopRdfIORegistry.createQuadReader(lang);
++ } else {
++ // Try to create a triples reader and wrap upwards into quads
++ // This will throw an error if a triple reader is not available
++ return new TriplesToQuadsReader(HadoopRdfIORegistry.createTripleReader(lang));
++ }
++ }
++
++ /**
++ * Gets the graph node which represents the graph into which triples will be
++ * indicated to belong to when they are converting into quads.
++ * <p>
++ * Defaults to {@link Quad#defaultGraphNodeGenerated} which represents the
++ * default graph
++ * </p>
++ *
++ * @return Graph node
++ */
++ protected Node getGraphNode() {
++ return Quad.defaultGraphNodeGenerated;
++ }
++}
http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/TriplesReader.java
----------------------------------------------------------------------
diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/TriplesReader.java
index 9fe930c,9fe930c..0467b5c
--- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/TriplesReader.java
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/TriplesReader.java
@@@ -1,48 -1,48 +1,48 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements. See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership. The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License. You may obtain a copy of the License at
-- *
-- * http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
--package org.apache.jena.hadoop.rdf.io.input.readers;
--
--import java.io.IOException;
--
--import org.apache.hadoop.io.LongWritable;
--import org.apache.hadoop.mapreduce.RecordReader;
--import org.apache.jena.graph.Triple ;
--import org.apache.jena.hadoop.rdf.io.registry.HadoopRdfIORegistry;
--import org.apache.jena.hadoop.rdf.types.TripleWritable;
--import org.apache.jena.riot.Lang;
--import org.apache.jena.riot.RDFLanguages;
--
--/**
-- * A record reader that reads triples from any RDF triples format
-- */
--public class TriplesReader extends AbstractRdfReader<Triple, TripleWritable> {
--
-- @Override
-- protected RecordReader<LongWritable, TripleWritable> selectRecordReader(Lang lang) throws IOException {
-- if (!RDFLanguages.isTriples(lang))
-- throw new IOException(
-- lang.getLabel()
-- + " is not a RDF triples format, perhaps you wanted QuadsInputFormat or TriplesOrQuadsInputFormat instead?");
--
-- // This will throw an appropriate error if the language does not support
-- // triples
-- return HadoopRdfIORegistry.createTripleReader(lang);
-- }
--
--}
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.jena.hadoop.rdf.io.input.readers;
++
++import java.io.IOException;
++
++import org.apache.hadoop.io.LongWritable;
++import org.apache.hadoop.mapreduce.RecordReader;
++import org.apache.jena.graph.Triple ;
++import org.apache.jena.hadoop.rdf.io.registry.HadoopRdfIORegistry;
++import org.apache.jena.hadoop.rdf.types.TripleWritable;
++import org.apache.jena.riot.Lang;
++import org.apache.jena.riot.RDFLanguages;
++
++/**
++ * A record reader that reads triples from any RDF triples format
++ */
++public class TriplesReader extends AbstractRdfReader<Triple, TripleWritable> {
++
++ @Override
++ protected RecordReader<LongWritable, TripleWritable> selectRecordReader(Lang lang) throws IOException {
++ if (!RDFLanguages.isTriples(lang))
++ throw new IOException(
++ lang.getLabel()
++ + " is not a RDF triples format, perhaps you wanted QuadsInputFormat or TriplesOrQuadsInputFormat instead?");
++
++ // This will throw an appropriate error if the language does not support
++ // triples
++ return HadoopRdfIORegistry.createTripleReader(lang);
++ }
++
++}
http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/TriplesToQuadsReader.java
----------------------------------------------------------------------
diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/TriplesToQuadsReader.java
index 14d6c6e,14d6c6e..e4de126
--- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/TriplesToQuadsReader.java
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/TriplesToQuadsReader.java
@@@ -1,33 -1,33 +1,33 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements. See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership. The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License. You may obtain a copy of the License at
-- *
-- * http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
package org.apache.jena.hadoop.rdf.io.input.readers;
import java.io.IOException;
--
++
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
--import org.apache.jena.graph.Node ;
--import org.apache.jena.hadoop.rdf.types.QuadWritable;
--import org.apache.jena.hadoop.rdf.types.TripleWritable;
--import org.apache.jena.sparql.core.Quad ;
++import org.apache.jena.graph.Node ;
++import org.apache.jena.hadoop.rdf.types.QuadWritable;
++import org.apache.jena.hadoop.rdf.types.TripleWritable;
++import org.apache.jena.sparql.core.Quad ;
/**
* A record reader that converts triples into quads by wrapping a
http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/nquads/BlockedNQuadsReader.java
----------------------------------------------------------------------
diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/nquads/BlockedNQuadsReader.java
index cef8ef1,cef8ef1..ad46cde
--- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/nquads/BlockedNQuadsReader.java
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/nquads/BlockedNQuadsReader.java
@@@ -1,24 -1,24 +1,24 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements. See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership. The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License. You may obtain a copy of the License at
-- *
-- * http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
package org.apache.jena.hadoop.rdf.io.input.readers.nquads;
--import org.apache.jena.hadoop.rdf.io.input.readers.AbstractBlockBasedQuadReader;
++import org.apache.jena.hadoop.rdf.io.input.readers.AbstractBlockBasedQuadReader;
import org.apache.jena.riot.Lang;
/**
http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/nquads/NQuadsReader.java
----------------------------------------------------------------------
diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/nquads/NQuadsReader.java
index 437b43f,437b43f..356cd94
--- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/nquads/NQuadsReader.java
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/nquads/NQuadsReader.java
@@@ -1,31 -1,31 +1,31 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements. See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership. The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License. You may obtain a copy of the License at
-- *
-- * http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
package org.apache.jena.hadoop.rdf.io.input.readers.nquads;
import java.util.Iterator;
--
--import org.apache.jena.hadoop.rdf.io.input.readers.AbstractLineBasedQuadReader;
++
++import org.apache.jena.hadoop.rdf.io.input.readers.AbstractLineBasedQuadReader;
import org.apache.jena.riot.lang.LangNQuads;
import org.apache.jena.riot.system.ParserProfile;
import org.apache.jena.riot.tokens.Tokenizer;
import org.apache.jena.riot.tokens.TokenizerFactory;
--import org.apache.jena.sparql.core.Quad ;
++import org.apache.jena.sparql.core.Quad ;
/**
* A record reader for NQuads
http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/nquads/WholeFileNQuadsReader.java
----------------------------------------------------------------------
diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/nquads/WholeFileNQuadsReader.java
index 96e6f80,96e6f80..b96d458
--- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/nquads/WholeFileNQuadsReader.java
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/nquads/WholeFileNQuadsReader.java
@@@ -1,24 -1,24 +1,24 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements. See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership. The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License. You may obtain a copy of the License at
-- *
-- * http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
package org.apache.jena.hadoop.rdf.io.input.readers.nquads;
--import org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileQuadReader;
++import org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileQuadReader;
import org.apache.jena.riot.Lang;
/**
http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/ntriples/BlockedNTriplesReader.java
----------------------------------------------------------------------
diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/ntriples/BlockedNTriplesReader.java
index 7268d5a,7268d5a..ee6ee22
--- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/ntriples/BlockedNTriplesReader.java
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/ntriples/BlockedNTriplesReader.java
@@@ -1,24 -1,24 +1,24 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements. See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership. The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License. You may obtain a copy of the License at
-- *
-- * http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
package org.apache.jena.hadoop.rdf.io.input.readers.ntriples;
--import org.apache.jena.hadoop.rdf.io.input.readers.AbstractBlockBasedTripleReader;
++import org.apache.jena.hadoop.rdf.io.input.readers.AbstractBlockBasedTripleReader;
import org.apache.jena.riot.Lang;
/**
http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/ntriples/NTriplesReader.java
----------------------------------------------------------------------
diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/ntriples/NTriplesReader.java
index 97a9bf4,97a9bf4..e38166f
--- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/ntriples/NTriplesReader.java
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/ntriples/NTriplesReader.java
@@@ -1,27 -1,27 +1,27 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements. See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership. The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License. You may obtain a copy of the License at
-- *
-- * http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
package org.apache.jena.hadoop.rdf.io.input.readers.ntriples;
import java.util.Iterator;
--
--import org.apache.jena.graph.Triple ;
--import org.apache.jena.hadoop.rdf.io.input.readers.AbstractLineBasedTripleReader;
++
++import org.apache.jena.graph.Triple ;
++import org.apache.jena.hadoop.rdf.io.input.readers.AbstractLineBasedTripleReader;
import org.apache.jena.riot.lang.LangNTriples;
import org.apache.jena.riot.system.ParserProfile;
import org.apache.jena.riot.tokens.Tokenizer;
http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/ntriples/WholeFileNTriplesReader.java
----------------------------------------------------------------------
diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/ntriples/WholeFileNTriplesReader.java
index c200d93,c200d93..b01c3df
--- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/ntriples/WholeFileNTriplesReader.java
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/ntriples/WholeFileNTriplesReader.java
@@@ -1,24 -1,24 +1,24 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements. See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership. The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License. You may obtain a copy of the License at
-- *
-- * http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
package org.apache.jena.hadoop.rdf.io.input.readers.ntriples;
--import org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileTripleReader;
++import org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileTripleReader;
import org.apache.jena.riot.Lang;
/**
http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/rdfjson/RdfJsonReader.java
----------------------------------------------------------------------
diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/rdfjson/RdfJsonReader.java
index 009024b,009024b..6cab094
--- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/rdfjson/RdfJsonReader.java
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/rdfjson/RdfJsonReader.java
@@@ -1,24 -1,24 +1,24 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements. See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership. The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License. You may obtain a copy of the License at
-- *
-- * http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
package org.apache.jena.hadoop.rdf.io.input.readers.rdfjson;
--import org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileTripleReader;
++import org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileTripleReader;
import org.apache.jena.riot.Lang;
/**
http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/rdfxml/RdfXmlReader.java
----------------------------------------------------------------------
diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/rdfxml/RdfXmlReader.java
index 9c374c6,9c374c6..b5e943f
--- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/rdfxml/RdfXmlReader.java
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/rdfxml/RdfXmlReader.java
@@@ -1,24 -1,24 +1,24 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements. See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership. The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License. You may obtain a copy of the License at
-- *
-- * http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
package org.apache.jena.hadoop.rdf.io.input.readers.rdfxml;
--import org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileTripleReader;
++import org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileTripleReader;
import org.apache.jena.riot.Lang;
/**
http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/trig/TriGReader.java
----------------------------------------------------------------------
diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/trig/TriGReader.java
index b1b0c3c,b1b0c3c..237c9c1
--- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/trig/TriGReader.java
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/trig/TriGReader.java
@@@ -1,24 -1,24 +1,24 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements. See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership. The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License. You may obtain a copy of the License at
-- *
-- * http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
package org.apache.jena.hadoop.rdf.io.input.readers.trig;
--import org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileQuadReader;
++import org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileQuadReader;
import org.apache.jena.riot.Lang;
/**
http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/trix/TriXReader.java
----------------------------------------------------------------------
diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/trix/TriXReader.java
index 6873c64,6873c64..5087370
--- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/trix/TriXReader.java
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/trix/TriXReader.java
@@@ -1,24 -1,24 +1,24 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements. See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership. The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License. You may obtain a copy of the License at
-- *
-- * http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
package org.apache.jena.hadoop.rdf.io.input.readers.trix;
--import org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileQuadReader;
++import org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileQuadReader;
import org.apache.jena.riot.Lang;
/**