You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by rv...@apache.org on 2015/01/05 16:07:17 UTC
[09/52] [abbrv] jena git commit: Rebrand to Jena Elephas per
community vote
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/TriplesOrQuadsReader.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/TriplesOrQuadsReader.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/TriplesOrQuadsReader.java
new file mode 100644
index 0000000..b071f67
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/TriplesOrQuadsReader.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.readers;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.jena.hadoop.rdf.io.registry.HadoopRdfIORegistry;
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+import org.apache.jena.riot.Lang;
+import org.apache.jena.riot.RDFLanguages;
+
+import com.hp.hpl.jena.graph.Node;
+import com.hp.hpl.jena.sparql.core.Quad;
+
+/**
+ * A record reader that reads RDF from any triples/quads format. Triples are
+ * converted into quads in the default graph. This behaviour can be changed by
+ * deriving from this class and overriding the {@link #getGraphNode()} method
+ *
+ *
+ *
+ */
+@SuppressWarnings("javadoc")
+public class TriplesOrQuadsReader extends AbstractRdfReader<Quad, QuadWritable> {
+
+ @Override
+ protected RecordReader<LongWritable, QuadWritable> selectRecordReader(Lang lang) throws IOException {
+ if (!RDFLanguages.isQuads(lang) && !RDFLanguages.isTriples(lang))
+ throw new IOException(lang.getLabel() + " is not a RDF triples/quads format");
+
+ if (HadoopRdfIORegistry.hasQuadReader(lang)) {
+ // Supports quads directly
+ return HadoopRdfIORegistry.createQuadReader(lang);
+ } else {
+ // Try to create a triples reader and wrap upwards into quads
+ // This will throw an error if a triple reader is not available
+ return new TriplesToQuadsReader(HadoopRdfIORegistry.createTripleReader(lang));
+ }
+ }
+
+ /**
+ * Gets the graph node which represents the graph into which triples will be
+ * indicated to belong to when they are converting into quads.
+ * <p>
+ * Defaults to {@link Quad#defaultGraphNodeGenerated} which represents the
+ * default graph
+ * </p>
+ *
+ * @return Graph node
+ */
+ protected Node getGraphNode() {
+ return Quad.defaultGraphNodeGenerated;
+ }
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/TriplesReader.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/TriplesReader.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/TriplesReader.java
new file mode 100644
index 0000000..96c4a65
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/TriplesReader.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.readers;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.jena.hadoop.rdf.io.registry.HadoopRdfIORegistry;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+import org.apache.jena.riot.Lang;
+import org.apache.jena.riot.RDFLanguages;
+
+import com.hp.hpl.jena.graph.Triple;
+
+/**
+ * A record reader that reads triples from any RDF triples format
+ */
+public class TriplesReader extends AbstractRdfReader<Triple, TripleWritable> {
+
+ @Override
+ protected RecordReader<LongWritable, TripleWritable> selectRecordReader(Lang lang) throws IOException {
+ if (!RDFLanguages.isTriples(lang))
+ throw new IOException(
+ lang.getLabel()
+ + " is not a RDF triples format, perhaps you wanted QuadsInputFormat or TriplesOrQuadsInputFormat instead?");
+
+ // This will throw an appropriate error if the language does not support
+ // triples
+ return HadoopRdfIORegistry.createTripleReader(lang);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/TriplesToQuadsReader.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/TriplesToQuadsReader.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/TriplesToQuadsReader.java
new file mode 100644
index 0000000..a388f0e
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/TriplesToQuadsReader.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.readers;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+
+import com.hp.hpl.jena.graph.Node;
+import com.hp.hpl.jena.sparql.core.Quad;
+
+/**
+ * A record reader that converts triples into quads by wrapping a
+ * {@code RecordReader<LongWritable, TripleWritable>} implementation
+ *
+ *
+ *
+ */
+public class TriplesToQuadsReader extends RecordReader<LongWritable, QuadWritable> {
+
+ private final RecordReader<LongWritable, TripleWritable> reader;
+ private Node graph;
+
+ /**
+ * Creates a new reader
+ *
+ * @param reader
+ * Triple reader
+ */
+ public TriplesToQuadsReader(RecordReader<LongWritable, TripleWritable> reader) {
+ this(reader, Quad.defaultGraphNodeGenerated);
+ }
+
+ /**
+ * Creates a new reader
+ *
+ * @param reader
+ * Triple reader
+ * @param graphNode
+ * Graph node
+ */
+ public TriplesToQuadsReader(RecordReader<LongWritable, TripleWritable> reader, Node graphNode) {
+ if (reader == null)
+ throw new NullPointerException("reader cannot be null");
+ if (graphNode == null)
+ throw new NullPointerException("Graph node cannot be null");
+ this.reader = reader;
+ this.graph = graphNode;
+ }
+
+ @Override
+ public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+ this.reader.initialize(split, context);
+ }
+
+ @Override
+ public final boolean nextKeyValue() throws IOException, InterruptedException {
+ return this.reader.nextKeyValue();
+ }
+
+ @Override
+ public final LongWritable getCurrentKey() throws IOException, InterruptedException {
+ return this.reader.getCurrentKey();
+ }
+
+ @Override
+ public final QuadWritable getCurrentValue() throws IOException, InterruptedException {
+ TripleWritable t = this.reader.getCurrentValue();
+ return new QuadWritable(new Quad(this.graph, t.get()));
+ }
+
+ @Override
+ public final float getProgress() throws IOException, InterruptedException {
+ return this.reader.getProgress();
+ }
+
+ @Override
+ public final void close() throws IOException {
+ this.reader.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/jsonld/JsonLDQuadReader.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/jsonld/JsonLDQuadReader.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/jsonld/JsonLDQuadReader.java
new file mode 100644
index 0000000..1b3f467
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/jsonld/JsonLDQuadReader.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.readers.jsonld;
+
+import org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileQuadReader;
+import org.apache.jena.riot.Lang;
+import org.apache.jena.riot.RDFLanguages;
+
+public class JsonLDQuadReader extends AbstractWholeFileQuadReader {
+
+ @Override
+ protected Lang getRdfLanguage() {
+ return RDFLanguages.JSONLD;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/jsonld/JsonLDTripleReader.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/jsonld/JsonLDTripleReader.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/jsonld/JsonLDTripleReader.java
new file mode 100644
index 0000000..7cdea9e
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/jsonld/JsonLDTripleReader.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.readers.jsonld;
+
+import org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileTripleReader;
+import org.apache.jena.riot.Lang;
+import org.apache.jena.riot.RDFLanguages;
+
+public class JsonLDTripleReader extends AbstractWholeFileTripleReader {
+ @Override
+ protected Lang getRdfLanguage() {
+ return RDFLanguages.JSONLD;
+ }
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/nquads/BlockedNQuadsReader.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/nquads/BlockedNQuadsReader.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/nquads/BlockedNQuadsReader.java
new file mode 100644
index 0000000..cef8ef1
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/nquads/BlockedNQuadsReader.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.readers.nquads;
+
+import org.apache.jena.hadoop.rdf.io.input.readers.AbstractBlockBasedQuadReader;
+import org.apache.jena.riot.Lang;
+
+/**
+ * A record record for NQuads
+ * <p>
+ * This is a hybrid of the {@link NQuadsReader} and the
+ * {@link WholeFileNQuadsReader} in that it does not process individual lines
+ * rather it processes the inputs in blocks of lines parsing the whole block
+ * rather than individual lines. This provides a compromise between the higher
+ * parser setup of creating more parsers and the benefit of being able to split
+ * input files over multiple mappers.
+ * </p>
+ *
+ *
+ *
+ */
+public class BlockedNQuadsReader extends AbstractBlockBasedQuadReader {
+
+ @Override
+ protected Lang getRdfLanguage() {
+ return Lang.NQUADS;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/nquads/NQuadsReader.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/nquads/NQuadsReader.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/nquads/NQuadsReader.java
new file mode 100644
index 0000000..e00e318
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/nquads/NQuadsReader.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.readers.nquads;
+
+import java.util.Iterator;
+
+import org.apache.jena.hadoop.rdf.io.input.readers.AbstractLineBasedQuadReader;
+import org.apache.jena.riot.lang.LangNQuads;
+import org.apache.jena.riot.system.ParserProfile;
+import org.apache.jena.riot.tokens.Tokenizer;
+import org.apache.jena.riot.tokens.TokenizerFactory;
+
+import com.hp.hpl.jena.sparql.core.Quad;
+
+/**
+ * A record reader for NQuads
+ *
+ *
+ *
+ */
+public class NQuadsReader extends AbstractLineBasedQuadReader {
+
+ @Override
+ protected Tokenizer getTokenizer(String line) {
+ return TokenizerFactory.makeTokenizerString(line);
+ }
+
+ @Override
+ protected Iterator<Quad> getQuadsIterator(Tokenizer tokenizer, ParserProfile profile) {
+ return new LangNQuads(tokenizer, profile, null);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/nquads/WholeFileNQuadsReader.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/nquads/WholeFileNQuadsReader.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/nquads/WholeFileNQuadsReader.java
new file mode 100644
index 0000000..96e6f80
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/nquads/WholeFileNQuadsReader.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.readers.nquads;
+
+import org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileQuadReader;
+import org.apache.jena.riot.Lang;
+
+/**
+ * A record record for NQuads
+ * <p>
+ * Unlike the {@link NQuadsReader} this processes files as a whole rather than
+ * individual lines. This has the advantage of less parser setup overhead but
+ * the disadvantage that the input cannot be split between multiple mappers.
+ * </p>
+ *
+ *
+ *
+ */
+public class WholeFileNQuadsReader extends AbstractWholeFileQuadReader {
+
+ @Override
+ protected Lang getRdfLanguage() {
+ return Lang.NQUADS;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/ntriples/BlockedNTriplesReader.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/ntriples/BlockedNTriplesReader.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/ntriples/BlockedNTriplesReader.java
new file mode 100644
index 0000000..7268d5a
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/ntriples/BlockedNTriplesReader.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.readers.ntriples;
+
+import org.apache.jena.hadoop.rdf.io.input.readers.AbstractBlockBasedTripleReader;
+import org.apache.jena.riot.Lang;
+
+/**
+ * A record record for NTriples
+ * <p>
+ * This is a hybrid of the {@link NTriplesReader} and the
+ * {@link WholeFileNTriplesReader} in that it does not process individual lines
+ * rather it processes the inputs in blocks of lines parsing the whole block
+ * rather than individual lines. This provides a compromise between the higher
+ * parser setup of creating more parsers and the benefit of being able to split
+ * input files over multiple mappers.
+ * </p>
+ *
+ *
+ *
+ */
+public class BlockedNTriplesReader extends AbstractBlockBasedTripleReader {
+
+ @Override
+ protected Lang getRdfLanguage() {
+ return Lang.NTRIPLES;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/ntriples/NTriplesReader.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/ntriples/NTriplesReader.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/ntriples/NTriplesReader.java
new file mode 100644
index 0000000..bfc8503
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/ntriples/NTriplesReader.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.readers.ntriples;
+
+import java.util.Iterator;
+
+import org.apache.jena.hadoop.rdf.io.input.readers.AbstractLineBasedTripleReader;
+import org.apache.jena.riot.lang.LangNTriples;
+import org.apache.jena.riot.system.ParserProfile;
+import org.apache.jena.riot.tokens.Tokenizer;
+import org.apache.jena.riot.tokens.TokenizerFactory;
+
+import com.hp.hpl.jena.graph.Triple;
+
+/**
+ * A record reader for NTriples
+ *
+ *
+ *
+ */
+public class NTriplesReader extends AbstractLineBasedTripleReader {
+
+ @Override
+ protected Iterator<Triple> getTriplesIterator(Tokenizer tokenizer, ParserProfile profile) {
+ return new LangNTriples(tokenizer, profile, null);
+ }
+
+ @Override
+ protected Tokenizer getTokenizer(String line) {
+ return TokenizerFactory.makeTokenizerString(line);
+ }
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/ntriples/WholeFileNTriplesReader.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/ntriples/WholeFileNTriplesReader.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/ntriples/WholeFileNTriplesReader.java
new file mode 100644
index 0000000..c200d93
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/ntriples/WholeFileNTriplesReader.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.readers.ntriples;
+
+import org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileTripleReader;
+import org.apache.jena.riot.Lang;
+
+/**
+ * A record record for NTriples
+ * <p>
+ * Unlike the {@link NTriplesReader} this processes files as a whole rather than
+ * individual lines. This has the advantage of less parser setup overhead but
+ * the disadvantage that the input cannot be split between multiple mappers.
+ * </p>
+ *
+ *
+ *
+ */
+public class WholeFileNTriplesReader extends AbstractWholeFileTripleReader {
+
+ @Override
+ protected Lang getRdfLanguage() {
+ return Lang.NTRIPLES;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/rdfjson/RdfJsonReader.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/rdfjson/RdfJsonReader.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/rdfjson/RdfJsonReader.java
new file mode 100644
index 0000000..009024b
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/rdfjson/RdfJsonReader.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.readers.rdfjson;
+
+import org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileTripleReader;
+import org.apache.jena.riot.Lang;
+
+/**
+ * A record reader for RDF/JSON files
+ *
+ *
+ *
+ */
+public class RdfJsonReader extends AbstractWholeFileTripleReader {
+
+ @Override
+ protected Lang getRdfLanguage() {
+ return Lang.RDFJSON;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/rdfxml/RdfXmlReader.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/rdfxml/RdfXmlReader.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/rdfxml/RdfXmlReader.java
new file mode 100644
index 0000000..9c374c6
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/rdfxml/RdfXmlReader.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.readers.rdfxml;
+
+import org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileTripleReader;
+import org.apache.jena.riot.Lang;
+
+/**
+ * A record reader for RDF/XML files
+ *
+ *
+ *
+ */
+public class RdfXmlReader extends AbstractWholeFileTripleReader {
+
+ @Override
+ protected Lang getRdfLanguage() {
+ return Lang.RDFXML;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/thrift/ThriftQuadReader.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/thrift/ThriftQuadReader.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/thrift/ThriftQuadReader.java
new file mode 100644
index 0000000..084b1ec
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/thrift/ThriftQuadReader.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.readers.thrift;
+
+import org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileQuadReader;
+import org.apache.jena.riot.Lang;
+import org.apache.jena.riot.RDFLanguages;
+
+public class ThriftQuadReader extends AbstractWholeFileQuadReader {
+
+ @Override
+ protected Lang getRdfLanguage() {
+ return RDFLanguages.THRIFT;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/thrift/ThriftTripleReader.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/thrift/ThriftTripleReader.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/thrift/ThriftTripleReader.java
new file mode 100644
index 0000000..713bfa7
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/thrift/ThriftTripleReader.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.readers.thrift;
+
+import org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileTripleReader;
+import org.apache.jena.riot.Lang;
+import org.apache.jena.riot.RDFLanguages;
+
+public class ThriftTripleReader extends AbstractWholeFileTripleReader {
+ @Override
+ protected Lang getRdfLanguage() {
+ return RDFLanguages.THRIFT;
+ }
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/trig/TriGReader.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/trig/TriGReader.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/trig/TriGReader.java
new file mode 100644
index 0000000..b1b0c3c
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/trig/TriGReader.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.readers.trig;
+
+import org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileQuadReader;
+import org.apache.jena.riot.Lang;
+
+/**
+ * A record reader for TriG files
+ *
+ *
+ *
+ */
+public class TriGReader extends AbstractWholeFileQuadReader {
+
+ @Override
+ protected Lang getRdfLanguage() {
+ return Lang.TRIG;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/trix/TriXReader.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/trix/TriXReader.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/trix/TriXReader.java
new file mode 100644
index 0000000..6873c64
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/trix/TriXReader.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.readers.trix;
+
+import org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileQuadReader;
+import org.apache.jena.riot.Lang;
+
+/**
+ * A record reader for TriX files
+ *
+ *
+ *
+ */
+public class TriXReader extends AbstractWholeFileQuadReader {
+
+ @Override
+ protected Lang getRdfLanguage() {
+ return Lang.TRIX;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/turtle/TurtleReader.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/turtle/TurtleReader.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/turtle/TurtleReader.java
new file mode 100644
index 0000000..b3fb377
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/turtle/TurtleReader.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.readers.turtle;
+
+import org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileTripleReader;
+import org.apache.jena.riot.Lang;
+
+/**
+ * A record reader for Turtle files
+ *
+ *
+ *
+ */
+public class TurtleReader extends AbstractWholeFileTripleReader {
+
+ @Override
+ protected Lang getRdfLanguage() {
+ return Lang.TURTLE;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/thrift/ThriftQuadInputFormat.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/thrift/ThriftQuadInputFormat.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/thrift/ThriftQuadInputFormat.java
new file mode 100644
index 0000000..f75542a
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/thrift/ThriftQuadInputFormat.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.thrift;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat;
+import org.apache.jena.hadoop.rdf.io.input.readers.thrift.ThriftQuadReader;
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+
+public class ThriftQuadInputFormat extends AbstractWholeFileInputFormat<LongWritable, QuadWritable> {
+
+ @Override
+ public RecordReader<LongWritable, QuadWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new ThriftQuadReader();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/thrift/ThriftTripleInputFormat.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/thrift/ThriftTripleInputFormat.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/thrift/ThriftTripleInputFormat.java
new file mode 100644
index 0000000..b60380d
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/thrift/ThriftTripleInputFormat.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.thrift;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat;
+import org.apache.jena.hadoop.rdf.io.input.readers.thrift.ThriftTripleReader;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+
+public class ThriftTripleInputFormat extends AbstractWholeFileInputFormat<LongWritable, TripleWritable> {
+
+ @Override
+ public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new ThriftTripleReader();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/trig/TriGInputFormat.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/trig/TriGInputFormat.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/trig/TriGInputFormat.java
new file mode 100644
index 0000000..0b36e93
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/trig/TriGInputFormat.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.trig;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat;
+import org.apache.jena.hadoop.rdf.io.input.readers.trig.TriGReader;
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+
+
+/**
+ * Input format for TriG
+ *
+ *
+ *
+ */
+public class TriGInputFormat extends AbstractWholeFileInputFormat<LongWritable, QuadWritable> {
+
+ @Override
+ public RecordReader<LongWritable, QuadWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new TriGReader();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/trix/TriXInputFormat.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/trix/TriXInputFormat.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/trix/TriXInputFormat.java
new file mode 100644
index 0000000..723c5c3
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/trix/TriXInputFormat.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.trix;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat;
+import org.apache.jena.hadoop.rdf.io.input.readers.trix.TriXReader;
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+
+/**
+ * Input format for TriX
+ */
+public class TriXInputFormat extends AbstractWholeFileInputFormat<LongWritable, QuadWritable> {
+
+ @Override
+ public RecordReader<LongWritable, QuadWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new TriXReader();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/turtle/TurtleInputFormat.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/turtle/TurtleInputFormat.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/turtle/TurtleInputFormat.java
new file mode 100644
index 0000000..c7771b6
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/turtle/TurtleInputFormat.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.turtle;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat;
+import org.apache.jena.hadoop.rdf.io.input.readers.turtle.TurtleReader;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+
+
+/**
+ * Turtle input format
+ *
+ *
+ *
+ */
+public class TurtleInputFormat extends AbstractWholeFileInputFormat<LongWritable, TripleWritable> {
+
+ @Override
+ public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new TurtleReader();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/BlockInputStream.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/BlockInputStream.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/BlockInputStream.java
new file mode 100644
index 0000000..a9e692e
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/BlockInputStream.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * A block input stream which can is a wrapper around another input stream which
+ * restricts reading to a specific number of bytes and can report the number of
+ * bytes read
+ * <p>
+ * The class assumes that the underlying input stream has already been seeked to
+ * the appropriate start point
+ * </p>
+ *
+ *
+ *
+ */
+public final class BlockInputStream extends TrackedInputStream {
+
+ private long limit = Long.MAX_VALUE;
+
+ /**
+ * Creates a new tracked input stream
+ *
+ * @param input
+ * Input stream to track
+ * @param limit
+ * Maximum number of bytes to read from the stream
+ */
+ public BlockInputStream(InputStream input, long limit) {
+ super(input);
+ if (limit < 0)
+ throw new IllegalArgumentException("limit must be >= 0");
+ this.limit = limit;
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (this.bytesRead >= this.limit) {
+ return -1;
+ }
+ return super.read();
+ }
+
+ @Override
+ public int available() throws IOException {
+ if (this.bytesRead >= this.limit) {
+ return 0;
+ }
+ return super.available();
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ if (len == 0) {
+ return 0;
+ } else if (this.bytesRead >= this.limit) {
+ return -1;
+ } else if (len > this.limit - this.bytesRead) {
+ len = (int) (this.limit - this.bytesRead);
+ }
+ return super.read(b, off, len);
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ if (n == 0) {
+ return 0;
+ } else if (this.bytesRead >= this.limit) {
+ return -1;
+ } else if (n > this.limit - this.bytesRead) {
+ n = this.limit - this.bytesRead;
+ }
+ return super.skip(n);
+ }
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/RdfIOUtils.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/RdfIOUtils.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/RdfIOUtils.java
new file mode 100644
index 0000000..372b22c
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/RdfIOUtils.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.util;
+
+import java.util.UUID;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.jena.hadoop.rdf.io.RdfIOConstants;
+import org.apache.jena.riot.lang.LabelToNode;
+import org.apache.jena.riot.system.ErrorHandlerFactory;
+import org.apache.jena.riot.system.IRIResolver;
+import org.apache.jena.riot.system.ParserProfile;
+import org.apache.jena.riot.system.ParserProfileBase;
+import org.apache.jena.riot.system.Prologue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * RDF IO utility functions
+ *
+ *
+ *
+ */
+public class RdfIOUtils {
+ private static final Logger LOGGER = LoggerFactory.getLogger(RdfIOUtils.class);
+
+ /**
+ * Private constructor prevents instantiation
+ */
+ private RdfIOUtils() {
+ }
+
+ /**
+ * Creates a parser profile for the given job context
+ *
+ * @param context
+ * Context
+ * @param path
+ * File path
+ * @return Parser profile
+ */
+ public static ParserProfile createParserProfile(JobContext context, Path path) {
+ Prologue prologue = new Prologue(null, IRIResolver.createNoResolve());
+ UUID seed = RdfIOUtils.getSeed(context, path);
+ LabelToNode labelMapping = LabelToNode.createScopeByDocumentHash(seed);
+ return new ParserProfileBase(prologue, ErrorHandlerFactory.errorHandlerStd, labelMapping);
+ }
+
+ /**
+ * Selects a seed for use in generating blank node identifiers
+ *
+ * @param context
+ * Job Context
+ * @param path
+ * File path
+ * @return Seed
+ */
+ public static UUID getSeed(JobContext context, Path path) {
+ // This is to ensure that blank node allocation policy is constant when
+ // subsequent MapReduce jobs need that
+ String jobId = context.getJobID().toString();
+ if (jobId == null) {
+ jobId = String.valueOf(System.currentTimeMillis());
+ LOGGER.warn(
+ "Job ID was not set, using current milliseconds of {}. Sequence of MapReduce jobs must carefully handle blank nodes.",
+ jobId);
+ }
+
+ if (!context.getConfiguration().getBoolean(RdfIOConstants.GLOBAL_BNODE_IDENTITY, false)) {
+ // Using normal file scoped blank node allocation
+ LOGGER.debug("Generating Blank Node Seed from Job Details (ID={}, Input Path={})", jobId, path);
+
+ // Form a reproducible seed for the run
+ return new UUID(jobId.hashCode(), path.hashCode());
+ } else {
+ // Using globally scoped blank node allocation
+ LOGGER.warn(
+ "Using globally scoped blank node allocation policy from Job Details (ID={}) - this is unsafe if your RDF inputs did not originate from a previous job",
+ jobId);
+
+ return new UUID(jobId.hashCode(), 0);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/TrackableInputStream.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/TrackableInputStream.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/TrackableInputStream.java
new file mode 100644
index 0000000..92e2df5
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/TrackableInputStream.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.util;
+
+import java.io.InputStream;
+
+/**
+ * An input stream that tracks the number of bytes read
+ *
+ *
+ *
+ */
+public abstract class TrackableInputStream extends InputStream {
+
+ /**
+ * Gets the number of bytes read
+ *
+ * @return Number of bytes read
+ */
+ public abstract long getBytesRead();
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/TrackedInputStream.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/TrackedInputStream.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/TrackedInputStream.java
new file mode 100644
index 0000000..e51a866
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/TrackedInputStream.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * A tracked input stream which can is a wrapper around another input stream and
+ * can report the number of bytes read
+ *
+ *
+ *
+ */
+public class TrackedInputStream extends TrackableInputStream {
+
+ protected InputStream input;
+ protected long bytesRead = 0, lastMark;
+
+ /**
+ * Creates a new tracked input stream
+ *
+ * @param input
+ * Input stream to track
+ */
+ public TrackedInputStream(InputStream input) {
+ if (input == null)
+ throw new NullPointerException("Input cannot be null");
+ this.input = input;
+ }
+
+ @Override
+ public int read() throws IOException {
+ int read = this.input.read();
+ if (read >= 0)
+ this.bytesRead++;
+ return read;
+ }
+
+ @Override
+ public long getBytesRead() {
+ return this.bytesRead;
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.input.close();
+ }
+
+ @Override
+ public int available() throws IOException {
+ return this.input.available();
+ }
+
+ @Override
+ public synchronized void mark(int readlimit) {
+ this.input.mark(readlimit);
+ this.lastMark = this.bytesRead;
+ }
+
+ @Override
+ public boolean markSupported() {
+ return this.input.markSupported();
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ if (len == 0) return 0;
+ int read = this.input.read(b, off, len);
+ if (read > 0)
+ this.bytesRead += read;
+ return read;
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException {
+ return this.read(b, 0, b.length);
+ }
+
+ @Override
+ public synchronized void reset() throws IOException {
+ this.input.reset();
+ this.bytesRead = this.lastMark;
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ if (n == 0)
+ return 0;
+ long skipped = 0;
+ byte[] buffer = new byte[16];
+ int readSize = Math.min(buffer.length, n > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) n);
+ int read;
+ do {
+ if (n - skipped > readSize) {
+ read = this.input.read(buffer, 0, readSize);
+ } else {
+ read = this.input.read(buffer, 0, (int) (n - skipped));
+ }
+ if (read > 0) {
+ this.bytesRead += read;
+ skipped += read;
+ }
+ } while (skipped < n && read >= 0);
+
+ return skipped;
+ }
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/TrackedPipedQuadsStream.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/TrackedPipedQuadsStream.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/TrackedPipedQuadsStream.java
new file mode 100644
index 0000000..845c709
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/TrackedPipedQuadsStream.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.util;
+
+import org.apache.jena.riot.lang.PipedRDFIterator;
+
+import com.hp.hpl.jena.graph.Triple;
+import com.hp.hpl.jena.sparql.core.Quad;
+
+/**
+ * A tracked piped quads stream
+ *
+ *
+ *
+ */
+public class TrackedPipedQuadsStream extends TrackedPipedRDFStream<Quad> {
+
+ /**
+ * Creates a new stream
+ *
+ * @param sink
+ * Sink
+ * @param input
+ * Input stream
+ */
+ public TrackedPipedQuadsStream(PipedRDFIterator<Quad> sink, TrackableInputStream input) {
+ super(sink, input);
+ }
+
+ @Override
+ public void triple(Triple triple) {
+ // Triples are discarded
+ }
+
+ @Override
+ public void quad(Quad quad) {
+ this.receive(quad);
+ }
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/TrackedPipedRDFStream.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/TrackedPipedRDFStream.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/TrackedPipedRDFStream.java
new file mode 100644
index 0000000..6e910be
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/TrackedPipedRDFStream.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.util;
+
+import java.util.LinkedList;
+import java.util.Queue;
+
+import org.apache.jena.riot.lang.PipedRDFIterator;
+import org.apache.jena.riot.lang.PipedRDFStream;
+
+/**
+ * A tracked piped RDF stream
+ *
+ *
+ *
+ * @param <T>
+ * Type corresponding to a supported RDF primitive
+ */
+public abstract class TrackedPipedRDFStream<T> extends PipedRDFStream<T> {
+
+ private TrackableInputStream input;
+ private Queue<Long> positions = new LinkedList<Long>();
+
+ protected TrackedPipedRDFStream(PipedRDFIterator<T> sink, TrackableInputStream input) {
+ super(sink);
+ this.input = input;
+ }
+
+ @Override
+ protected void receive(T t) {
+ // Track positions the input stream is at as we receive inputs
+ synchronized (this.positions) {
+ this.positions.add(this.input.getBytesRead());
+ }
+ super.receive(t);
+ }
+
+ /**
+ * Gets the next position
+ *
+ * @return Position
+ */
+ public Long getPosition() {
+ synchronized (this.positions) {
+ return this.positions.poll();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/TrackedPipedTriplesStream.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/TrackedPipedTriplesStream.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/TrackedPipedTriplesStream.java
new file mode 100644
index 0000000..2040c4f
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/util/TrackedPipedTriplesStream.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.util;
+
+import org.apache.jena.riot.lang.PipedRDFIterator;
+
+import com.hp.hpl.jena.graph.Triple;
+import com.hp.hpl.jena.sparql.core.Quad;
+
+/**
+ * A tracked piped triples stream
+ *
+ *
+ *
+ */
+public class TrackedPipedTriplesStream extends TrackedPipedRDFStream<Triple> {
+
+ /**
+ * Creates a tracked triples stream
+ *
+ * @param sink
+ * Sink
+ * @param input
+ * Input stream
+ */
+ public TrackedPipedTriplesStream(PipedRDFIterator<Triple> sink, TrackableInputStream input) {
+ super(sink, input);
+ }
+
+ @Override
+ public void triple(Triple triple) {
+ receive(triple);
+ }
+
+ @Override
+ public void quad(Quad quad) {
+ // Quads are discarded
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractBatchedNodeTupleOutputFormat.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractBatchedNodeTupleOutputFormat.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractBatchedNodeTupleOutputFormat.java
new file mode 100644
index 0000000..02fbf9c
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractBatchedNodeTupleOutputFormat.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.output;
+
+import java.io.Writer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.jena.hadoop.rdf.io.RdfIOConstants;
+import org.apache.jena.hadoop.rdf.io.output.writers.AbstractBatchedNodeTupleWriter;
+import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable;
+
+
+/**
+ * Abstract output format for formats that use a
+ * {@link AbstractBatchedNodeTupleWriter} as their writer
+ *
+ *
+ *
+ * @param <TKey>
+ * Key type
+ * @param <TTuple>
+ * Tuple type
+ * @param <TValue>
+ * Writable tuple type i.e. the value type
+ */
+public abstract class AbstractBatchedNodeTupleOutputFormat<TKey, TTuple, TValue extends AbstractNodeTupleWritable<TTuple>> extends
+ AbstractNodeTupleOutputFormat<TKey, TTuple, TValue> {
+
+ @Override
+ protected RecordWriter<TKey, TValue> getRecordWriter(Writer writer, Configuration config, Path outputPath) {
+ long batchSize = config.getLong(RdfIOConstants.OUTPUT_BATCH_SIZE, RdfIOConstants.DEFAULT_OUTPUT_BATCH_SIZE);
+ return this.getRecordWriter(writer, batchSize);
+ }
+
+ protected abstract RecordWriter<TKey, TValue> getRecordWriter(Writer writer, long batchSize);
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractNodeOutputFormat.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractNodeOutputFormat.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractNodeOutputFormat.java
new file mode 100644
index 0000000..cfc98bd
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractNodeOutputFormat.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.output;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.jena.hadoop.rdf.types.NodeWritable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Abstract output format which takes pairs with Node keys and arbitrary values
+ * and writes them as a simple line based text file
+ *
+ *
+ *
+ * @param <TValue> Value type
+ */
+public abstract class AbstractNodeOutputFormat<TValue> extends FileOutputFormat<NodeWritable, TValue> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractNodeOutputFormat.class);
+
+ @Override
+ public RecordWriter<NodeWritable, TValue> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
+ Configuration config = context.getConfiguration();
+ boolean isCompressed = getCompressOutput(context);
+ CompressionCodec codec = null;
+ String extension = this.getFileExtension();
+ if (isCompressed) {
+ Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(context, GzipCodec.class);
+ codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, config);
+ extension += codec.getDefaultExtension();
+ }
+ Path file = getDefaultWorkFile(context, extension);
+ LOG.info("Writing output to file " + file);
+ FileSystem fs = file.getFileSystem(config);
+ if (!isCompressed) {
+ FSDataOutputStream fileOut = fs.create(file, false);
+ return this.getRecordWriter(new OutputStreamWriter(fileOut), config);
+ } else {
+ FSDataOutputStream fileOut = fs.create(file, false);
+ return this.getRecordWriter(new OutputStreamWriter(codec.createOutputStream(fileOut)), config);
+ }
+ }
+
+ /**
+ * Gets the file extension to use for output
+ *
+ * @return File extension including the '.'
+ */
+ protected String getFileExtension() {
+ return ".nodes";
+ }
+
+ /**
+ * Gets the record writer to use
+ *
+ * @param writer
+ * Writer to write output to
+ * @param config
+ * Configuration
+ * @return Record writer
+ */
+ protected abstract RecordWriter<NodeWritable, TValue> getRecordWriter(Writer writer, Configuration config);
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractNodeTupleOutputFormat.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractNodeTupleOutputFormat.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractNodeTupleOutputFormat.java
new file mode 100644
index 0000000..c4a34f5
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractNodeTupleOutputFormat.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.output;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An abstract implementation of an output format for line based tuple formats
+ * where the key is ignored and only the tuple values will be output
+ *
+ *
+ * @param <TKey>
+ * Key type
+ * @param <TValue>
+ * Tuple value type
+ * @param <T>
+ * Writable node tuple type
+ *
+ */
+public abstract class AbstractNodeTupleOutputFormat<TKey, TValue, T extends AbstractNodeTupleWritable<TValue>> extends
+ FileOutputFormat<TKey, T> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractNodeTupleOutputFormat.class);
+
+ @Override
+ public RecordWriter<TKey, T> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
+ Configuration config = context.getConfiguration();
+ boolean isCompressed = getCompressOutput(context);
+ CompressionCodec codec = null;
+
+ // Build the output file path
+ String extension = this.getFileExtension();
+ if (isCompressed) {
+ // Add compression extension if applicable
+ Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(context, GzipCodec.class);
+ codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, config);
+ extension += codec.getDefaultExtension();
+ }
+ Path file = getDefaultWorkFile(context, extension);
+ LOG.info("Writing output to file " + file);
+
+ // Open the file appropriately and create a record writer for it
+ FileSystem fs = file.getFileSystem(config);
+ if (!isCompressed) {
+ FSDataOutputStream fileOut = fs.create(file, false);
+ return this.getRecordWriter(new OutputStreamWriter(fileOut), config, file);
+ } else {
+ FSDataOutputStream fileOut = fs.create(file, false);
+ return this.getRecordWriter(new OutputStreamWriter(codec.createOutputStream(fileOut)), config, file);
+ }
+ }
+
+ /**
+ * Gets the file extension to use for output
+ *
+ * @return File extension including the '.'
+ */
+ protected abstract String getFileExtension();
+
+ /**
+ * Gets the record writer to use
+ *
+ * @param writer
+ * Writer to write output to
+ * @param config
+ * Configuration
+ * @param outputPath
+ * Output path being written to
+ * @return Record writer
+ * @throws IOException
+ * May be thrown if a record writer cannot be obtained for any
+ * reason
+ */
+ protected abstract RecordWriter<TKey, T> getRecordWriter(Writer writer, Configuration config, Path outputPath)
+ throws IOException;
+
+}