You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by an...@apache.org on 2016/05/14 14:03:18 UTC
[19/42] jena git commit: Merge commit 'refs/pull/143/head' of
github.com:apache/jena
http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/nquads/NQuadsInputFormat.java
----------------------------------------------------------------------
diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/nquads/NQuadsInputFormat.java
index e1110b6,e1110b6..cda77e5
--- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/nquads/NQuadsInputFormat.java
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/nquads/NQuadsInputFormat.java
@@@ -1,31 -1,31 +1,31 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements. See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership. The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License. You may obtain a copy of the License at
-- *
-- * http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
package org.apache.jena.hadoop.rdf.io.input.nquads;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
--import org.apache.jena.hadoop.rdf.io.input.AbstractNLineFileInputFormat;
--import org.apache.jena.hadoop.rdf.io.input.readers.nquads.NQuadsReader;
--import org.apache.jena.hadoop.rdf.types.QuadWritable;
--
++import org.apache.jena.hadoop.rdf.io.input.AbstractNLineFileInputFormat;
++import org.apache.jena.hadoop.rdf.io.input.readers.nquads.NQuadsReader;
++import org.apache.jena.hadoop.rdf.types.QuadWritable;
++
/**
* NQuads input format
http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/nquads/WholeFileNQuadsInputFormat.java
----------------------------------------------------------------------
diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/nquads/WholeFileNQuadsInputFormat.java
index 9d5966f,9d5966f..086ddba
--- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/nquads/WholeFileNQuadsInputFormat.java
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/nquads/WholeFileNQuadsInputFormat.java
@@@ -1,30 -1,30 +1,30 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements. See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership. The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License. You may obtain a copy of the License at
-- *
-- * http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
package org.apache.jena.hadoop.rdf.io.input.nquads;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
--import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat;
--import org.apache.jena.hadoop.rdf.io.input.readers.nquads.WholeFileNQuadsReader;
--import org.apache.jena.hadoop.rdf.types.QuadWritable;
++import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat;
++import org.apache.jena.hadoop.rdf.io.input.readers.nquads.WholeFileNQuadsReader;
++import org.apache.jena.hadoop.rdf.types.QuadWritable;
/**
http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/BlockedNTriplesInputFormat.java
----------------------------------------------------------------------
diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/BlockedNTriplesInputFormat.java
index 3536f5d,3536f5d..81cc6a2
--- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/BlockedNTriplesInputFormat.java
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/BlockedNTriplesInputFormat.java
@@@ -1,30 -1,30 +1,30 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements. See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership. The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License. You may obtain a copy of the License at
-- *
-- * http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
package org.apache.jena.hadoop.rdf.io.input.ntriples;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
--import org.apache.jena.hadoop.rdf.io.input.AbstractNLineFileInputFormat;
--import org.apache.jena.hadoop.rdf.io.input.readers.ntriples.BlockedNTriplesReader;
--import org.apache.jena.hadoop.rdf.types.TripleWritable;
++import org.apache.jena.hadoop.rdf.io.input.AbstractNLineFileInputFormat;
++import org.apache.jena.hadoop.rdf.io.input.readers.ntriples.BlockedNTriplesReader;
++import org.apache.jena.hadoop.rdf.types.TripleWritable;
/**
http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/NTriplesInputFormat.java
----------------------------------------------------------------------
diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/NTriplesInputFormat.java
index bbe4c9a,bbe4c9a..5bfa04c
--- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/NTriplesInputFormat.java
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/NTriplesInputFormat.java
@@@ -1,31 -1,31 +1,31 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements. See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership. The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License. You may obtain a copy of the License at
-- *
-- * http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
package org.apache.jena.hadoop.rdf.io.input.ntriples;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
--import org.apache.jena.hadoop.rdf.io.input.AbstractNLineFileInputFormat;
--import org.apache.jena.hadoop.rdf.io.input.readers.ntriples.NTriplesReader;
--import org.apache.jena.hadoop.rdf.types.TripleWritable;
--
++import org.apache.jena.hadoop.rdf.io.input.AbstractNLineFileInputFormat;
++import org.apache.jena.hadoop.rdf.io.input.readers.ntriples.NTriplesReader;
++import org.apache.jena.hadoop.rdf.types.TripleWritable;
++
/**
* NTriples input format
http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/WholeFileNTriplesInputFormat.java
----------------------------------------------------------------------
diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/WholeFileNTriplesInputFormat.java
index 5a06789,5a06789..0b7db0a
--- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/WholeFileNTriplesInputFormat.java
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/WholeFileNTriplesInputFormat.java
@@@ -1,30 -1,30 +1,30 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements. See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership. The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License. You may obtain a copy of the License at
-- *
-- * http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
package org.apache.jena.hadoop.rdf.io.input.ntriples;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
--import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat;
--import org.apache.jena.hadoop.rdf.io.input.readers.ntriples.WholeFileNTriplesReader;
--import org.apache.jena.hadoop.rdf.types.TripleWritable;
++import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat;
++import org.apache.jena.hadoop.rdf.io.input.readers.ntriples.WholeFileNTriplesReader;
++import org.apache.jena.hadoop.rdf.types.TripleWritable;
/**
http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/rdfjson/RdfJsonInputFormat.java
----------------------------------------------------------------------
diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/rdfjson/RdfJsonInputFormat.java
index 29d0f6f,29d0f6f..66b1833
--- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/rdfjson/RdfJsonInputFormat.java
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/rdfjson/RdfJsonInputFormat.java
@@@ -1,30 -1,30 +1,30 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements. See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership. The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License. You may obtain a copy of the License at
-- *
-- * http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
package org.apache.jena.hadoop.rdf.io.input.rdfjson;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
--import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat;
--import org.apache.jena.hadoop.rdf.io.input.readers.rdfjson.RdfJsonReader;
--import org.apache.jena.hadoop.rdf.types.TripleWritable;
++import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat;
++import org.apache.jena.hadoop.rdf.io.input.readers.rdfjson.RdfJsonReader;
++import org.apache.jena.hadoop.rdf.types.TripleWritable;
/**
http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/rdfxml/RdfXmlInputFormat.java
----------------------------------------------------------------------
diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/rdfxml/RdfXmlInputFormat.java
index f231073,f231073..0a2b25b
--- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/rdfxml/RdfXmlInputFormat.java
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/rdfxml/RdfXmlInputFormat.java
@@@ -1,30 -1,30 +1,30 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements. See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership. The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License. You may obtain a copy of the License at
-- *
-- * http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
package org.apache.jena.hadoop.rdf.io.input.rdfxml;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
--import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat;
--import org.apache.jena.hadoop.rdf.io.input.readers.rdfxml.RdfXmlReader;
--import org.apache.jena.hadoop.rdf.types.TripleWritable;
++import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat;
++import org.apache.jena.hadoop.rdf.io.input.readers.rdfxml.RdfXmlReader;
++import org.apache.jena.hadoop.rdf.types.TripleWritable;
/**
http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedNodeTupleReader.java
----------------------------------------------------------------------
diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedNodeTupleReader.java
index cf5e621,cf5e621..33d2baf
--- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedNodeTupleReader.java
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedNodeTupleReader.java
@@@ -1,344 -1,344 +1,344 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements. See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership. The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License. You may obtain a copy of the License at
-- *
-- * http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
--package org.apache.jena.hadoop.rdf.io.input.readers;
--
--import java.io.IOException;
--import java.io.InputStream;
--
--import org.apache.hadoop.conf.Configuration;
--import org.apache.hadoop.fs.FSDataInputStream;
--import org.apache.hadoop.fs.FileSystem;
--import org.apache.hadoop.fs.Path;
--import org.apache.hadoop.io.LongWritable;
--import org.apache.hadoop.io.compress.CompressionCodec;
--import org.apache.hadoop.io.compress.CompressionCodecFactory;
--import org.apache.hadoop.mapreduce.InputSplit;
--import org.apache.hadoop.mapreduce.RecordReader;
--import org.apache.hadoop.mapreduce.TaskAttemptContext;
--import org.apache.hadoop.mapreduce.lib.input.FileSplit;
--import org.apache.jena.hadoop.rdf.io.RdfIOConstants;
--import org.apache.jena.hadoop.rdf.io.input.util.BlockInputStream;
--import org.apache.jena.hadoop.rdf.io.input.util.RdfIOUtils;
--import org.apache.jena.hadoop.rdf.io.input.util.TrackableInputStream;
--import org.apache.jena.hadoop.rdf.io.input.util.TrackedInputStream;
--import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedRDFStream;
--import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable;
--import org.apache.jena.riot.Lang;
--import org.apache.jena.riot.RDFDataMgr;
--import org.apache.jena.riot.ReaderRIOT;
--import org.apache.jena.riot.lang.PipedRDFIterator;
--import org.apache.jena.riot.lang.PipedRDFStream;
--import org.apache.jena.riot.system.ParserProfile;
--import org.slf4j.Logger;
--import org.slf4j.LoggerFactory;
--
--/**
-- * An abstract implementation for a record reader that reads records from blocks
-- * of files, this is a hybrid between {@link AbstractLineBasedNodeTupleReader}
-- * and {@link AbstractWholeFileNodeTupleReader} in that it can only be used by
-- * formats which can be split by lines but reduces the overhead by parsing the
-- * split as a whole rather than as individual lines.
-- * <p>
-- * The keys produced are the approximate position in the file at which a tuple
-- * was found and the values will be node tuples. Positions are approximate
-- * because they are recorded after the point at which the most recent tuple was
-- * parsed from the input thus they reflect the approximate position in the
-- * stream immediately after which the triple was found.
-- * </p>
-- *
-- *
-- *
-- * @param <TValue>
-- * Value type
-- * @param <T>
-- * Tuple type
-- */
--public abstract class AbstractBlockBasedNodeTupleReader<TValue, T extends AbstractNodeTupleWritable<TValue>> extends RecordReader<LongWritable, T> {
--
-- private static final Logger LOG = LoggerFactory.getLogger(AbstractBlockBasedNodeTupleReader.class);
-- private CompressionCodec compressionCodecs;
-- private TrackableInputStream input;
-- private LongWritable key;
-- private long start, length;
-- private T tuple;
-- private TrackedPipedRDFStream<TValue> stream;
-- private PipedRDFIterator<TValue> iter;
-- private Thread parserThread;
-- private boolean finished = false;
-- private boolean ignoreBadTuples = true;
-- private boolean parserFinished = false;
-- private Throwable parserError = null;
--
-- @Override
-- public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
-- LOG.debug("initialize({}, {})", genericSplit, context);
--
-- // Assuming file split
-- if (!(genericSplit instanceof FileSplit))
-- throw new IOException("This record reader only supports FileSplit inputs");
-- FileSplit split = (FileSplit) genericSplit;
--
-- // Configuration
-- Configuration config = context.getConfiguration();
-- this.ignoreBadTuples = config.getBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, true);
-- if (this.ignoreBadTuples)
-- LOG.warn(
-- "Configured to ignore bad tuples, parsing errors will be logged and further parsing aborted but no user visible errors will be thrown. Consider setting {} to false to disable this behaviour",
-- RdfIOConstants.INPUT_IGNORE_BAD_TUPLES);
--
-- // Figure out what portion of the file to read
-- start = split.getStart();
-- long end = start + split.getLength();
-- final Path file = split.getPath();
-- long totalLength = file.getFileSystem(context.getConfiguration()).getFileStatus(file).getLen();
-- boolean readToEnd = end == totalLength;
-- CompressionCodecFactory factory = new CompressionCodecFactory(config);
-- this.compressionCodecs = factory.getCodec(file);
--
-- LOG.info(String.format("Got split with start %d and length %d for file with total length of %d", new Object[] { start, split.getLength(), totalLength }));
--
-- // Open the file and prepare the input stream
-- FileSystem fs = file.getFileSystem(config);
-- FSDataInputStream fileIn = fs.open(file);
-- this.length = split.getLength();
-- if (start > 0)
-- fileIn.seek(start);
--
-- if (this.compressionCodecs != null) {
-- // Compressed input
-- // For compressed input NLineInputFormat will have failed to find
-- // any line breaks and will give us a split from 0 -> (length - 1)
-- // Add 1 and re-verify readToEnd so we can abort correctly if ever
-- // given a partial split of a compressed file
-- end++;
-- readToEnd = end == totalLength;
-- if (start > 0 || !readToEnd)
-- throw new IOException("This record reader can only be used with compressed input where the split is a whole file");
-- input = new TrackedInputStream(this.compressionCodecs.createInputStream(fileIn));
-- } else {
-- // Uncompressed input
--
-- if (readToEnd) {
-- input = new TrackedInputStream(fileIn);
-- } else {
-- // Need to limit the portion of the file we are reading
-- input = new BlockInputStream(fileIn, split.getLength());
-- }
-- }
--
-- // Set up background thread for parser
-- iter = this.getPipedIterator();
-- this.stream = this.getPipedStream(iter, this.input);
-- ParserProfile profile = RdfIOUtils.createParserProfile(context, file);
-- Runnable parserRunnable = this.createRunnable(this, this.input, stream, this.getRdfLanguage(), profile);
-- this.parserThread = new Thread(parserRunnable);
-- this.parserThread.setDaemon(true);
-- this.parserThread.start();
-- }
--
-- /**
-- * Gets the RDF iterator to use
-- *
-- * @return Iterator
-- */
-- protected abstract PipedRDFIterator<TValue> getPipedIterator();
--
-- /**
-- * Gets the RDF stream to parse to
-- *
-- * @param iterator
-- * Iterator
-- * @return RDF stream
-- */
-- protected abstract TrackedPipedRDFStream<TValue> getPipedStream(PipedRDFIterator<TValue> iterator, TrackableInputStream input);
--
-- /**
-- * Gets the RDF language to use for parsing
-- *
-- * @return
-- */
-- protected abstract Lang getRdfLanguage();
--
-- /**
-- * Creates the runnable upon which the parsing will run
-- *
-- * @param input
-- * Input
-- * @param stream
-- * Stream
-- * @param lang
-- * Language to use for parsing
-- * @return Parser runnable
-- */
-- private Runnable createRunnable(@SuppressWarnings("rawtypes") final AbstractBlockBasedNodeTupleReader reader, final InputStream input,
-- final PipedRDFStream<TValue> stream, final Lang lang, final ParserProfile profile) {
-- return new Runnable() {
-- @Override
-- public void run() {
-- try {
-- ReaderRIOT riotReader = RDFDataMgr.createReader(lang);
-- riotReader.setParserProfile(profile);
-- riotReader.read(input, null, lang.getContentType(), stream, null);
-- //RDFDataMgr.parse(stream, input, null, lang);
-- reader.setParserFinished(null);
-- } catch (Throwable e) {
-- reader.setParserFinished(e);
-- }
-- }
-- };
-- }
--
-- /**
-- * Sets the parser thread finished state
-- *
-- * @param e
-- * Error (if any)
-- */
-- private void setParserFinished(Throwable e) {
-- synchronized (this.parserThread) {
-- this.parserError = e;
-- this.parserFinished = true;
-- }
-- }
--
-- /**
-- * Waits for the parser thread to have reported as finished
-- *
-- * @throws InterruptedException
-- */
-- private void waitForParserFinished() throws InterruptedException {
-- do {
-- synchronized (this.parserThread) {
-- if (this.parserFinished)
-- return;
-- }
-- Thread.sleep(50);
-- } while (true);
-- }
--
-- /**
-- * Creates an instance of a writable tuple from the given tuple value
-- *
-- * @param tuple
-- * Tuple value
-- * @return Writable tuple
-- */
-- protected abstract T createInstance(TValue tuple);
--
-- @Override
-- public boolean nextKeyValue() throws IOException {
-- // Reuse key for efficiency
-- if (key == null) {
-- key = new LongWritable();
-- }
--
-- if (this.finished)
-- return false;
--
-- try {
-- if (this.iter.hasNext()) {
-- // Position will be relative to the start for the split we're
-- // processing
-- Long l = this.start + this.stream.getPosition();
-- if (l != null) {
-- this.key.set(l);
-- // For compressed input the actual length from which we
-- // calculate progress is likely less than the actual
-- // uncompressed length so we need to increment the
-- // length as we go along
-- // We always add 1 more than the current length because we
-- // don't want to report 100% progress until we really have
-- // finished
-- if (this.compressionCodecs != null && l > this.length)
-- this.length = l + 1;
-- }
-- this.tuple = this.createInstance(this.iter.next());
-- return true;
-- } else {
-- // Need to ensure that the parser thread has finished in order
-- // to determine whether we finished without error
-- this.waitForParserFinished();
-- if (this.parserError != null) {
-- LOG.error("Error parsing block, aborting further parsing", this.parserError);
-- if (!this.ignoreBadTuples)
-- throw new IOException("Error parsing block at position " + (this.start + this.input.getBytesRead()) + ", aborting further parsing",
-- this.parserError);
-- }
--
-- this.key = null;
-- this.tuple = null;
-- this.finished = true;
-- // This is necessary so that when compressed input is used we
-- // report 100% progress once we've reached the genuine end of
-- // the stream
-- if (this.compressionCodecs != null)
-- this.length--;
-- return false;
-- }
-- } catch (IOException e) {
-- throw e;
-- } catch (Throwable e) {
-- // Failed to read the tuple on this line
-- LOG.error("Error parsing block, aborting further parsing", e);
-- if (!this.ignoreBadTuples) {
-- this.iter.close();
-- throw new IOException("Error parsing block at position " + (this.start + this.input.getBytesRead()) + ", aborting further parsing", e);
-- }
-- this.key = null;
-- this.tuple = null;
-- this.finished = true;
-- return false;
-- }
-- }
--
-- @Override
-- public LongWritable getCurrentKey() {
-- return this.key;
-- }
--
-- @Override
-- public T getCurrentValue() {
-- return this.tuple;
-- }
--
-- @Override
-- public float getProgress() {
-- float progress = 0.0f;
-- if (this.key == null) {
-- // We've either not started or we've finished
-- progress = (this.finished ? 1.0f : 0.0f);
-- } else if (this.key.get() == Long.MIN_VALUE) {
-- // We don't have a position so we've either in-progress or finished
-- progress = (this.finished ? 1.0f : 0.5f);
-- } else {
-- // We're some way through the file
-- progress = (this.key.get() - this.start) / (float) this.length;
-- }
-- LOG.debug("getProgress() --> {}", progress);
-- return progress;
-- }
--
-- @Override
-- public void close() throws IOException {
-- this.iter.close();
-- this.input.close();
-- this.finished = true;
-- }
--
--}
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.jena.hadoop.rdf.io.input.readers;
++
++import java.io.IOException;
++import java.io.InputStream;
++
++import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.fs.FSDataInputStream;
++import org.apache.hadoop.fs.FileSystem;
++import org.apache.hadoop.fs.Path;
++import org.apache.hadoop.io.LongWritable;
++import org.apache.hadoop.io.compress.CompressionCodec;
++import org.apache.hadoop.io.compress.CompressionCodecFactory;
++import org.apache.hadoop.mapreduce.InputSplit;
++import org.apache.hadoop.mapreduce.RecordReader;
++import org.apache.hadoop.mapreduce.TaskAttemptContext;
++import org.apache.hadoop.mapreduce.lib.input.FileSplit;
++import org.apache.jena.hadoop.rdf.io.RdfIOConstants;
++import org.apache.jena.hadoop.rdf.io.input.util.BlockInputStream;
++import org.apache.jena.hadoop.rdf.io.input.util.RdfIOUtils;
++import org.apache.jena.hadoop.rdf.io.input.util.TrackableInputStream;
++import org.apache.jena.hadoop.rdf.io.input.util.TrackedInputStream;
++import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedRDFStream;
++import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable;
++import org.apache.jena.riot.Lang;
++import org.apache.jena.riot.RDFDataMgr;
++import org.apache.jena.riot.ReaderRIOT;
++import org.apache.jena.riot.lang.PipedRDFIterator;
++import org.apache.jena.riot.lang.PipedRDFStream;
++import org.apache.jena.riot.system.ParserProfile;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++/**
++ * An abstract implementation for a record reader that reads records from blocks
++ * of files, this is a hybrid between {@link AbstractLineBasedNodeTupleReader}
++ * and {@link AbstractWholeFileNodeTupleReader} in that it can only be used by
++ * formats which can be split by lines but reduces the overhead by parsing the
++ * split as a whole rather than as individual lines.
++ * <p>
++ * The keys produced are the approximate position in the file at which a tuple
++ * was found and the values will be node tuples. Positions are approximate
++ * because they are recorded after the point at which the most recent tuple was
++ * parsed from the input thus they reflect the approximate position in the
++ * stream immediately after which the triple was found.
++ * </p>
++ *
++ *
++ *
++ * @param <TValue>
++ * Value type
++ * @param <T>
++ * Tuple type
++ */
++public abstract class AbstractBlockBasedNodeTupleReader<TValue, T extends AbstractNodeTupleWritable<TValue>> extends RecordReader<LongWritable, T> {
++
++ private static final Logger LOG = LoggerFactory.getLogger(AbstractBlockBasedNodeTupleReader.class);
++ private CompressionCodec compressionCodecs;
++ private TrackableInputStream input;
++ private LongWritable key;
++ private long start, length;
++ private T tuple;
++ private TrackedPipedRDFStream<TValue> stream;
++ private PipedRDFIterator<TValue> iter;
++ private Thread parserThread;
++ private boolean finished = false;
++ private boolean ignoreBadTuples = true;
++ private boolean parserFinished = false;
++ private Throwable parserError = null;
++
++ @Override
++ public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
++ LOG.debug("initialize({}, {})", genericSplit, context);
++
++ // Assuming file split
++ if (!(genericSplit instanceof FileSplit))
++ throw new IOException("This record reader only supports FileSplit inputs");
++ FileSplit split = (FileSplit) genericSplit;
++
++ // Configuration
++ Configuration config = context.getConfiguration();
++ this.ignoreBadTuples = config.getBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, true);
++ if (this.ignoreBadTuples)
++ LOG.warn(
++ "Configured to ignore bad tuples, parsing errors will be logged and further parsing aborted but no user visible errors will be thrown. Consider setting {} to false to disable this behaviour",
++ RdfIOConstants.INPUT_IGNORE_BAD_TUPLES);
++
++ // Figure out what portion of the file to read
++ start = split.getStart();
++ long end = start + split.getLength();
++ final Path file = split.getPath();
++ long totalLength = file.getFileSystem(context.getConfiguration()).getFileStatus(file).getLen();
++ boolean readToEnd = end == totalLength;
++ CompressionCodecFactory factory = new CompressionCodecFactory(config);
++ this.compressionCodecs = factory.getCodec(file);
++
++ LOG.info(String.format("Got split with start %d and length %d for file with total length of %d", new Object[] { start, split.getLength(), totalLength }));
++
++ // Open the file and prepare the input stream
++ FileSystem fs = file.getFileSystem(config);
++ FSDataInputStream fileIn = fs.open(file);
++ this.length = split.getLength();
++ if (start > 0)
++ fileIn.seek(start);
++
++ if (this.compressionCodecs != null) {
++ // Compressed input
++ // For compressed input NLineInputFormat will have failed to find
++ // any line breaks and will give us a split from 0 -> (length - 1)
++ // Add 1 and re-verify readToEnd so we can abort correctly if ever
++ // given a partial split of a compressed file
++ end++;
++ readToEnd = end == totalLength;
++ if (start > 0 || !readToEnd)
++ throw new IOException("This record reader can only be used with compressed input where the split is a whole file");
++ input = new TrackedInputStream(this.compressionCodecs.createInputStream(fileIn));
++ } else {
++ // Uncompressed input
++
++ if (readToEnd) {
++ input = new TrackedInputStream(fileIn);
++ } else {
++ // Need to limit the portion of the file we are reading
++ input = new BlockInputStream(fileIn, split.getLength());
++ }
++ }
++
++ // Set up background thread for parser
++ iter = this.getPipedIterator();
++ this.stream = this.getPipedStream(iter, this.input);
++ ParserProfile profile = RdfIOUtils.createParserProfile(context, file);
++ Runnable parserRunnable = this.createRunnable(this, this.input, stream, this.getRdfLanguage(), profile);
++ this.parserThread = new Thread(parserRunnable);
++ this.parserThread.setDaemon(true);
++ this.parserThread.start();
++ }
++
++ /**
++ * Gets the RDF iterator to use
++ *
++ * @return Iterator
++ */
++ protected abstract PipedRDFIterator<TValue> getPipedIterator();
++
++ /**
++ * Gets the RDF stream to parse to
++ *
++ * @param iterator
++ * Iterator
++ * @return RDF stream
++ */
++ protected abstract TrackedPipedRDFStream<TValue> getPipedStream(PipedRDFIterator<TValue> iterator, TrackableInputStream input);
++
++ /**
++ * Gets the RDF language to use for parsing
++ *
++ * @return
++ */
++ protected abstract Lang getRdfLanguage();
++
++ /**
++ * Creates the runnable upon which the parsing will run
++ *
++ * @param input
++ * Input
++ * @param stream
++ * Stream
++ * @param lang
++ * Language to use for parsing
++ * @return Parser runnable
++ */
++ private Runnable createRunnable(@SuppressWarnings("rawtypes") final AbstractBlockBasedNodeTupleReader reader, final InputStream input,
++ final PipedRDFStream<TValue> stream, final Lang lang, final ParserProfile profile) {
++ return new Runnable() {
++ @Override
++ public void run() {
++ try {
++ ReaderRIOT riotReader = RDFDataMgr.createReader(lang);
++ riotReader.setParserProfile(profile);
++ riotReader.read(input, null, lang.getContentType(), stream, null);
++ //RDFDataMgr.parse(stream, input, null, lang);
++ reader.setParserFinished(null);
++ } catch (Throwable e) {
++ reader.setParserFinished(e);
++ }
++ }
++ };
++ }
++
++ /**
++ * Sets the parser thread finished state
++ *
++ * @param e
++ * Error (if any)
++ */
++ private void setParserFinished(Throwable e) {
++ synchronized (this.parserThread) {
++ this.parserError = e;
++ this.parserFinished = true;
++ }
++ }
++
++ /**
++ * Waits for the parser thread to have reported as finished
++ *
++ * @throws InterruptedException
++ */
++ private void waitForParserFinished() throws InterruptedException {
++ do {
++ synchronized (this.parserThread) {
++ if (this.parserFinished)
++ return;
++ }
++ Thread.sleep(50);
++ } while (true);
++ }
++
++ /**
++ * Creates an instance of a writable tuple from the given tuple value
++ *
++ * @param tuple
++ * Tuple value
++ * @return Writable tuple
++ */
++ protected abstract T createInstance(TValue tuple);
++
++ @Override
++ public boolean nextKeyValue() throws IOException {
++ // Reuse key for efficiency
++ if (key == null) {
++ key = new LongWritable();
++ }
++
++ if (this.finished)
++ return false;
++
++ try {
++ if (this.iter.hasNext()) {
++ // Position will be relative to the start for the split we're
++ // processing
++ Long l = this.start + this.stream.getPosition();
++ if (l != null) {
++ this.key.set(l);
++ // For compressed input the actual length from which we
++ // calculate progress is likely less than the actual
++ // uncompressed length so we need to increment the
++ // length as we go along
++ // We always add 1 more than the current length because we
++ // don't want to report 100% progress until we really have
++ // finished
++ if (this.compressionCodecs != null && l > this.length)
++ this.length = l + 1;
++ }
++ this.tuple = this.createInstance(this.iter.next());
++ return true;
++ } else {
++ // Need to ensure that the parser thread has finished in order
++ // to determine whether we finished without error
++ this.waitForParserFinished();
++ if (this.parserError != null) {
++ LOG.error("Error parsing block, aborting further parsing", this.parserError);
++ if (!this.ignoreBadTuples)
++ throw new IOException("Error parsing block at position " + (this.start + this.input.getBytesRead()) + ", aborting further parsing",
++ this.parserError);
++ }
++
++ this.key = null;
++ this.tuple = null;
++ this.finished = true;
++ // This is necessary so that when compressed input is used we
++ // report 100% progress once we've reached the genuine end of
++ // the stream
++ if (this.compressionCodecs != null)
++ this.length--;
++ return false;
++ }
++ } catch (IOException e) {
++ throw e;
++ } catch (Throwable e) {
++ // Failed to read the tuple on this line
++ LOG.error("Error parsing block, aborting further parsing", e);
++ if (!this.ignoreBadTuples) {
++ this.iter.close();
++ throw new IOException("Error parsing block at position " + (this.start + this.input.getBytesRead()) + ", aborting further parsing", e);
++ }
++ this.key = null;
++ this.tuple = null;
++ this.finished = true;
++ return false;
++ }
++ }
++
++ @Override
++ public LongWritable getCurrentKey() {
++ return this.key;
++ }
++
++ @Override
++ public T getCurrentValue() {
++ return this.tuple;
++ }
++
++ @Override
++ public float getProgress() {
++ float progress = 0.0f;
++ if (this.key == null) {
++ // We've either not started or we've finished
++ progress = (this.finished ? 1.0f : 0.0f);
++ } else if (this.key.get() == Long.MIN_VALUE) {
++ // We don't have a position so we've either in-progress or finished
++ progress = (this.finished ? 1.0f : 0.5f);
++ } else {
++ // We're some way through the file
++ progress = (this.key.get() - this.start) / (float) this.length;
++ }
++ LOG.debug("getProgress() --> {}", progress);
++ return progress;
++ }
++
++ @Override
++ public void close() throws IOException {
++ this.iter.close();
++ this.input.close();
++ this.finished = true;
++ }
++
++}
http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedQuadReader.java
----------------------------------------------------------------------
diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedQuadReader.java
index c6b2a6f,c6b2a6f..adc431f
--- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedQuadReader.java
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedQuadReader.java
@@@ -1,29 -1,29 +1,29 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements. See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership. The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License. You may obtain a copy of the License at
-- *
-- * http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
package org.apache.jena.hadoop.rdf.io.input.readers;
--import org.apache.jena.hadoop.rdf.io.input.util.TrackableInputStream;
--import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedQuadsStream;
--import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedRDFStream;
--import org.apache.jena.hadoop.rdf.types.QuadWritable;
++import org.apache.jena.hadoop.rdf.io.input.util.TrackableInputStream;
++import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedQuadsStream;
++import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedRDFStream;
++import org.apache.jena.hadoop.rdf.types.QuadWritable;
import org.apache.jena.riot.lang.PipedRDFIterator;
--import org.apache.jena.sparql.core.Quad ;
++import org.apache.jena.sparql.core.Quad ;
/**
* An abstract record reader for whole file triple formats
http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedTripleReader.java
----------------------------------------------------------------------
diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedTripleReader.java
index 384b223,384b223..43a171c
--- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedTripleReader.java
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractBlockBasedTripleReader.java
@@@ -1,28 -1,28 +1,28 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements. See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership. The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License. You may obtain a copy of the License at
-- *
-- * http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
package org.apache.jena.hadoop.rdf.io.input.readers;
--import org.apache.jena.graph.Triple ;
--import org.apache.jena.hadoop.rdf.io.input.util.TrackableInputStream;
--import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedRDFStream;
--import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedTriplesStream;
--import org.apache.jena.hadoop.rdf.types.TripleWritable;
++import org.apache.jena.graph.Triple ;
++import org.apache.jena.hadoop.rdf.io.input.util.TrackableInputStream;
++import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedRDFStream;
++import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedTriplesStream;
++import org.apache.jena.hadoop.rdf.types.TripleWritable;
import org.apache.jena.riot.lang.PipedRDFIterator;
/**
http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedNodeTupleReader.java
----------------------------------------------------------------------
diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedNodeTupleReader.java
index 04adb3b,04adb3b..87d2e06
--- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedNodeTupleReader.java
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedNodeTupleReader.java
@@@ -1,265 -1,265 +1,265 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements. See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership. The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License. You may obtain a copy of the License at
-- *
-- * http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
--package org.apache.jena.hadoop.rdf.io.input.readers;
--
--import java.io.IOException;
--import java.util.Iterator;
--import org.apache.hadoop.conf.Configuration;
--import org.apache.hadoop.fs.FSDataInputStream;
--import org.apache.hadoop.fs.FileSystem;
--import org.apache.hadoop.fs.Path;
--import org.apache.hadoop.io.LongWritable;
--import org.apache.hadoop.io.Text;
--import org.apache.hadoop.io.compress.CompressionCodec;
--import org.apache.hadoop.io.compress.CompressionCodecFactory;
--import org.apache.hadoop.mapreduce.InputSplit;
--import org.apache.hadoop.mapreduce.RecordReader;
--import org.apache.hadoop.mapreduce.TaskAttemptContext;
--import org.apache.hadoop.mapreduce.lib.input.FileSplit;
--import org.apache.hadoop.util.LineReader;
--import org.apache.jena.hadoop.rdf.io.HadoopIOConstants;
--import org.apache.jena.hadoop.rdf.io.RdfIOConstants;
--import org.apache.jena.hadoop.rdf.io.input.util.RdfIOUtils;
--import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable;
--import org.apache.jena.riot.system.ParserProfile;
--import org.slf4j.Logger;
--import org.slf4j.LoggerFactory;
--
--/**
-- * An abstract implementation of a record reader that reads records from line
-- * based tuple formats. This only supports reading from file splits currently.
-- * <p>
-- * The keys produced are the position of the line in the file and the values
-- * will be node tuples
-- * </p>
-- *
-- *
-- *
-- * @param <TValue>
-- * @param <T>
-- * Writable tuple type
-- */
--public abstract class AbstractLineBasedNodeTupleReader<TValue, T extends AbstractNodeTupleWritable<TValue>> extends RecordReader<LongWritable, T> {
--
-- private static final Logger LOG = LoggerFactory.getLogger(AbstractLineBasedNodeTupleReader.class);
-- private CompressionCodecFactory compressionCodecs = null;
-- private long start, pos, end, estLength;
-- private int maxLineLength;
-- private LineReader in;
-- private LongWritable key = null;
-- private Text value = null;
-- private T tuple = null;
-- private ParserProfile profile = null;
-- private boolean ignoreBadTuples = true;
--
-- @Override
-- public final void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
-- LOG.debug("initialize({}, {})", genericSplit, context);
--
-- // Assuming file split
-- if (!(genericSplit instanceof FileSplit))
-- throw new IOException("This record reader only supports FileSplit inputs");
-- FileSplit split = (FileSplit) genericSplit;
--
-- // Configuration
-- profile = RdfIOUtils.createParserProfile(context, split.getPath());
-- Configuration config = context.getConfiguration();
-- this.ignoreBadTuples = config.getBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, true);
-- if (this.ignoreBadTuples)
-- LOG.warn(
-- "Configured to ignore bad tuples, parsing errors will be logged and the bad line skipped but no errors will be thrownConsider setting {} to false to disable this behaviour",
-- RdfIOConstants.INPUT_IGNORE_BAD_TUPLES);
--
-- // Figure out what portion of the file to read
-- this.maxLineLength = config.getInt(HadoopIOConstants.MAX_LINE_LENGTH, Integer.MAX_VALUE);
-- start = split.getStart();
-- end = start + split.getLength();
-- final Path file = split.getPath();
-- long totalLength = file.getFileSystem(context.getConfiguration()).getFileStatus(file).getLen();
-- compressionCodecs = new CompressionCodecFactory(config);
-- final CompressionCodec codec = compressionCodecs.getCodec(file);
--
-- LOG.info(String.format("Got split with start %d and length %d for file with total length of %d", new Object[] { start, split.getLength(), totalLength }));
--
-- // Open the file and seek to the start of the split
-- FileSystem fs = file.getFileSystem(config);
-- FSDataInputStream fileIn = fs.open(file);
-- boolean skipFirstLine = false;
-- if (codec != null) {
-- // Compressed input
-- // For compressed input NLineInputFormat will have failed to find
-- // any line breaks and will give us a split from 0 -> (length - 1)
-- // Add 1 and verify we got complete split
-- if (totalLength > split.getLength() + 1)
-- throw new IOException("This record reader can only be used with compressed input where the split covers the whole file");
-- in = new LineReader(codec.createInputStream(fileIn), config);
-- estLength = end;
-- end = Long.MAX_VALUE;
-- } else {
-- // Uncompressed input
-- if (start != 0) {
-- skipFirstLine = true;
-- --start;
-- fileIn.seek(start);
-- }
-- in = new LineReader(fileIn, config);
-- }
-- // Skip first line and re-establish "start".
-- // This is to do with how line reader reads lines and how
-- // NLineInputFormat will provide the split information to use
-- if (skipFirstLine) {
-- start += in.readLine(new Text(), 0, (int) Math.min(Integer.MAX_VALUE, end - start));
-- }
-- this.pos = start;
-- }
--
-- /**
-- * Gets an iterator over the data on the current line
-- *
-- * @param line
-- * Line
-- * @param profile
-- * Parser profile
-- * @return Iterator
-- */
-- protected abstract Iterator<TValue> getIterator(String line, ParserProfile profile);
--
-- /**
-- * Creates an instance of a writable tuple from the given tuple value
-- *
-- * @param tuple
-- * Tuple value
-- * @return Writable tuple
-- */
-- protected abstract T createInstance(TValue tuple);
--
-- @Override
-- public final boolean nextKeyValue() throws IOException {
-- // Reuse key for efficiency
-- if (key == null) {
-- key = new LongWritable();
-- }
--
-- // Reset value which we use for reading lines
-- if (value == null) {
-- value = new Text();
-- }
-- tuple = null;
--
-- // Try to read the next valid line
-- int newSize = 0;
-- while (pos < end) {
-- // Read next line
-- newSize = in.readLine(value, maxLineLength, Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength));
--
-- // Once we get an empty line we've reached the end of our input
-- if (newSize == 0) {
-- break;
-- }
--
-- // Update position, remember that where inputs are compressed we may
-- // be at a larger position then we expected because the length of
-- // the split is likely less than the length of the data once
-- // decompressed
-- key.set(pos);
-- pos += newSize;
-- if (pos > estLength)
-- estLength = pos + 1;
--
-- // Skip lines that exceed the line length limit that has been set
-- if (newSize >= maxLineLength) {
-- LOG.warn("Skipped oversized line of size {} at position {}", newSize, (pos - newSize));
-- continue;
-- }
--
-- // Attempt to read the tuple from current line
-- try {
-- Iterator<TValue> iter = this.getIterator(value.toString(), profile);
-- if (iter.hasNext()) {
-- tuple = this.createInstance(iter.next());
--
-- // If we reach here we've found a valid tuple so we can
-- // break out of the loop
-- break;
-- } else {
-- // Empty line/Comment line
-- LOG.debug("Valid line with no triple at position {}", (pos - newSize));
-- continue;
-- }
-- } catch (Throwable e) {
-- // Failed to read the tuple on this line
-- LOG.error("Bad tuple at position " + (pos - newSize), e);
-- if (this.ignoreBadTuples)
-- continue;
-- throw new IOException(String.format("Bad tuple at position %d", (pos - newSize)), e);
-- }
-- }
-- boolean result = this.tuple != null;
--
-- // End of input
-- if (newSize == 0) {
-- key = null;
-- value = null;
-- tuple = null;
-- result = false;
-- estLength = pos;
-- }
-- LOG.debug("nextKeyValue() --> {}", result);
-- return result;
-- }
--
-- @Override
-- public LongWritable getCurrentKey() {
-- LOG.debug("getCurrentKey() --> {}", key);
-- return key;
-- }
--
-- @Override
-- public T getCurrentValue() {
-- LOG.debug("getCurrentValue() --> {}", tuple);
-- return tuple;
-- }
--
-- @Override
-- public float getProgress() {
-- float progress = 0.0f;
-- if (start != end) {
-- if (end == Long.MAX_VALUE) {
-- if (estLength == 0)
-- return 1.0f;
-- // Use estimated length
-- progress = Math.min(1.0f, (pos - start) / (float) (estLength - start));
-- } else {
-- // Use actual length
-- progress = Math.min(1.0f, (pos - start) / (float) (end - start));
-- }
-- }
-- LOG.debug("getProgress() --> {}", progress);
-- return progress;
-- }
--
-- @Override
-- public void close() throws IOException {
-- LOG.debug("close()");
-- if (in != null) {
-- in.close();
-- }
-- }
--
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.jena.hadoop.rdf.io.input.readers;
++
++import java.io.IOException;
++import java.util.Iterator;
++import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.fs.FSDataInputStream;
++import org.apache.hadoop.fs.FileSystem;
++import org.apache.hadoop.fs.Path;
++import org.apache.hadoop.io.LongWritable;
++import org.apache.hadoop.io.Text;
++import org.apache.hadoop.io.compress.CompressionCodec;
++import org.apache.hadoop.io.compress.CompressionCodecFactory;
++import org.apache.hadoop.mapreduce.InputSplit;
++import org.apache.hadoop.mapreduce.RecordReader;
++import org.apache.hadoop.mapreduce.TaskAttemptContext;
++import org.apache.hadoop.mapreduce.lib.input.FileSplit;
++import org.apache.hadoop.util.LineReader;
++import org.apache.jena.hadoop.rdf.io.HadoopIOConstants;
++import org.apache.jena.hadoop.rdf.io.RdfIOConstants;
++import org.apache.jena.hadoop.rdf.io.input.util.RdfIOUtils;
++import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable;
++import org.apache.jena.riot.system.ParserProfile;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++/**
++ * An abstract implementation of a record reader that reads records from line
++ * based tuple formats. This only supports reading from file splits currently.
++ * <p>
++ * The keys produced are the position of the line in the file and the values
++ * will be node tuples
++ * </p>
++ *
++ *
++ *
++ * @param <TValue>
++ * @param <T>
++ * Writable tuple type
++ */
++public abstract class AbstractLineBasedNodeTupleReader<TValue, T extends AbstractNodeTupleWritable<TValue>> extends RecordReader<LongWritable, T> {
++
++ private static final Logger LOG = LoggerFactory.getLogger(AbstractLineBasedNodeTupleReader.class);
++ private CompressionCodecFactory compressionCodecs = null;
++ private long start, pos, end, estLength;
++ private int maxLineLength;
++ private LineReader in;
++ private LongWritable key = null;
++ private Text value = null;
++ private T tuple = null;
++ private ParserProfile profile = null;
++ private boolean ignoreBadTuples = true;
++
++ @Override
++ public final void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
++ LOG.debug("initialize({}, {})", genericSplit, context);
++
++ // Assuming file split
++ if (!(genericSplit instanceof FileSplit))
++ throw new IOException("This record reader only supports FileSplit inputs");
++ FileSplit split = (FileSplit) genericSplit;
++
++ // Configuration
++ profile = RdfIOUtils.createParserProfile(context, split.getPath());
++ Configuration config = context.getConfiguration();
++ this.ignoreBadTuples = config.getBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, true);
++ if (this.ignoreBadTuples)
++ LOG.warn(
++ "Configured to ignore bad tuples, parsing errors will be logged and the bad line skipped but no errors will be thrownConsider setting {} to false to disable this behaviour",
++ RdfIOConstants.INPUT_IGNORE_BAD_TUPLES);
++
++ // Figure out what portion of the file to read
++ this.maxLineLength = config.getInt(HadoopIOConstants.MAX_LINE_LENGTH, Integer.MAX_VALUE);
++ start = split.getStart();
++ end = start + split.getLength();
++ final Path file = split.getPath();
++ long totalLength = file.getFileSystem(context.getConfiguration()).getFileStatus(file).getLen();
++ compressionCodecs = new CompressionCodecFactory(config);
++ final CompressionCodec codec = compressionCodecs.getCodec(file);
++
++ LOG.info(String.format("Got split with start %d and length %d for file with total length of %d", new Object[] { start, split.getLength(), totalLength }));
++
++ // Open the file and seek to the start of the split
++ FileSystem fs = file.getFileSystem(config);
++ FSDataInputStream fileIn = fs.open(file);
++ boolean skipFirstLine = false;
++ if (codec != null) {
++ // Compressed input
++ // For compressed input NLineInputFormat will have failed to find
++ // any line breaks and will give us a split from 0 -> (length - 1)
++ // Add 1 and verify we got complete split
++ if (totalLength > split.getLength() + 1)
++ throw new IOException("This record reader can only be used with compressed input where the split covers the whole file");
++ in = new LineReader(codec.createInputStream(fileIn), config);
++ estLength = end;
++ end = Long.MAX_VALUE;
++ } else {
++ // Uncompressed input
++ if (start != 0) {
++ skipFirstLine = true;
++ --start;
++ fileIn.seek(start);
++ }
++ in = new LineReader(fileIn, config);
++ }
++ // Skip first line and re-establish "start".
++ // This is to do with how line reader reads lines and how
++ // NLineInputFormat will provide the split information to use
++ if (skipFirstLine) {
++ start += in.readLine(new Text(), 0, (int) Math.min(Integer.MAX_VALUE, end - start));
++ }
++ this.pos = start;
++ }
++
++ /**
++ * Gets an iterator over the data on the current line
++ *
++ * @param line
++ * Line
++ * @param profile
++ * Parser profile
++ * @return Iterator
++ */
++ protected abstract Iterator<TValue> getIterator(String line, ParserProfile profile);
++
++ /**
++ * Creates an instance of a writable tuple from the given tuple value
++ *
++ * @param tuple
++ * Tuple value
++ * @return Writable tuple
++ */
++ protected abstract T createInstance(TValue tuple);
++
++ @Override
++ public final boolean nextKeyValue() throws IOException {
++ // Reuse key for efficiency
++ if (key == null) {
++ key = new LongWritable();
++ }
++
++ // Reset value which we use for reading lines
++ if (value == null) {
++ value = new Text();
++ }
++ tuple = null;
++
++ // Try to read the next valid line
++ int newSize = 0;
++ while (pos < end) {
++ // Read next line
++ newSize = in.readLine(value, maxLineLength, Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength));
++
++ // Once we get an empty line we've reached the end of our input
++ if (newSize == 0) {
++ break;
++ }
++
++ // Update position, remember that where inputs are compressed we may
++ // be at a larger position then we expected because the length of
++ // the split is likely less than the length of the data once
++ // decompressed
++ key.set(pos);
++ pos += newSize;
++ if (pos > estLength)
++ estLength = pos + 1;
++
++ // Skip lines that exceed the line length limit that has been set
++ if (newSize >= maxLineLength) {
++ LOG.warn("Skipped oversized line of size {} at position {}", newSize, (pos - newSize));
++ continue;
++ }
++
++ // Attempt to read the tuple from current line
++ try {
++ Iterator<TValue> iter = this.getIterator(value.toString(), profile);
++ if (iter.hasNext()) {
++ tuple = this.createInstance(iter.next());
++
++ // If we reach here we've found a valid tuple so we can
++ // break out of the loop
++ break;
++ } else {
++ // Empty line/Comment line
++ LOG.debug("Valid line with no triple at position {}", (pos - newSize));
++ continue;
++ }
++ } catch (Throwable e) {
++ // Failed to read the tuple on this line
++ LOG.error("Bad tuple at position " + (pos - newSize), e);
++ if (this.ignoreBadTuples)
++ continue;
++ throw new IOException(String.format("Bad tuple at position %d", (pos - newSize)), e);
++ }
++ }
++ boolean result = this.tuple != null;
++
++ // End of input
++ if (newSize == 0) {
++ key = null;
++ value = null;
++ tuple = null;
++ result = false;
++ estLength = pos;
++ }
++ LOG.debug("nextKeyValue() --> {}", result);
++ return result;
++ }
++
++ @Override
++ public LongWritable getCurrentKey() {
++ LOG.debug("getCurrentKey() --> {}", key);
++ return key;
++ }
++
++ @Override
++ public T getCurrentValue() {
++ LOG.debug("getCurrentValue() --> {}", tuple);
++ return tuple;
++ }
++
++ @Override
++ public float getProgress() {
++ float progress = 0.0f;
++ if (start != end) {
++ if (end == Long.MAX_VALUE) {
++ if (estLength == 0)
++ return 1.0f;
++ // Use estimated length
++ progress = Math.min(1.0f, (pos - start) / (float) (estLength - start));
++ } else {
++ // Use actual length
++ progress = Math.min(1.0f, (pos - start) / (float) (end - start));
++ }
++ }
++ LOG.debug("getProgress() --> {}", progress);
++ return progress;
++ }
++
++ @Override
++ public void close() throws IOException {
++ LOG.debug("close()");
++ if (in != null) {
++ in.close();
++ }
++ }
++
}
http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedQuadReader.java
----------------------------------------------------------------------
diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedQuadReader.java
index 84c168d,84c168d..8aec04c
--- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedQuadReader.java
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedQuadReader.java
@@@ -1,29 -1,29 +1,29 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements. See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership. The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License. You may obtain a copy of the License at
-- *
-- * http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
package org.apache.jena.hadoop.rdf.io.input.readers;
import java.util.Iterator;
--
--import org.apache.jena.hadoop.rdf.types.QuadWritable;
++
++import org.apache.jena.hadoop.rdf.types.QuadWritable;
import org.apache.jena.riot.system.ParserProfile;
import org.apache.jena.riot.tokens.Tokenizer;
--import org.apache.jena.sparql.core.Quad ;
++import org.apache.jena.sparql.core.Quad ;
/**
* An abstract reader for line based quad formats
http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedTripleReader.java
----------------------------------------------------------------------
diff --cc jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedTripleReader.java
index 780630d,780630d..a67e5f9
--- a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedTripleReader.java
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractLineBasedTripleReader.java
@@@ -1,27 -1,27 +1,27 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements. See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership. The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License. You may obtain a copy of the License at
-- *
-- * http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
package org.apache.jena.hadoop.rdf.io.input.readers;
import java.util.Iterator;
--
--import org.apache.jena.graph.Triple ;
--import org.apache.jena.hadoop.rdf.types.TripleWritable;
++
++import org.apache.jena.graph.Triple ;
++import org.apache.jena.hadoop.rdf.types.TripleWritable;
import org.apache.jena.riot.system.ParserProfile;
import org.apache.jena.riot.tokens.Tokenizer;