You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by rv...@apache.org on 2015/01/27 18:28:44 UTC
[51/59] [abbrv] jena git commit: Further rebranding to Elephas
http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/QuadWritable.java
----------------------------------------------------------------------
diff --git a/jena-elephas/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/QuadWritable.java b/jena-elephas/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/QuadWritable.java
new file mode 100644
index 0000000..3d9dd00
--- /dev/null
+++ b/jena-elephas/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/QuadWritable.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.jena.hadoop.rdf.types.comparators.SimpleBinaryComparator;
+import org.apache.jena.hadoop.rdf.types.converters.ThriftConverter;
+import org.apache.jena.riot.thrift.ThriftConvert;
+import org.apache.jena.riot.thrift.wire.RDF_Quad;
+import org.apache.thrift.TException;
+
+import com.hp.hpl.jena.graph.Node;
+import com.hp.hpl.jena.sparql.core.Quad;
+
+/**
+ * A writable quad, serialised as a length-prefixed Thrift encoded RDF_Quad
+ */
+public class QuadWritable extends AbstractNodeTupleWritable<Quad> {
+
+ static { // Runs once, when this class is initialised
+ WritableComparator.define(QuadWritable.class, new SimpleBinaryComparator()); // register the byte-level comparator for this type
+ }
+
+ private RDF_Quad quad = new RDF_Quad(); // Thrift form of the value: cleared by set()/readFields(), filled by readFields()/write()
+
+ /**
+ * Creates a new empty instance (the wrapped quad is null)
+ */
+ public QuadWritable() {
+ this(null);
+ }
+
+ /**
+ * Creates a new instance with the given value
+ *
+ * @param q
+ * Quad
+ */
+ public QuadWritable(Quad q) {
+ super(q);
+ }
+
+ /**
+ * Creates a new instance and populates it via readFields(DataInput)
+ *
+ * @param input
+ * Input
+ * @return New instance
+ * @throws IOException if readFields(DataInput) fails
+ */
+ public static QuadWritable read(DataInput input) throws IOException {
+ QuadWritable q = new QuadWritable();
+ q.readFields(input);
+ return q;
+ }
+
+ @Override
+ public void set(Quad tuple) {
+ super.set(tuple);
+ this.quad.clear(); // reset the Thrift form; write() re-converts when the subject is not set
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ this.quad.clear();
+ int tripleLength = input.readInt(); // length prefix as written by write(); despite the name this is the encoded quad's byte count
+ byte[] buffer = new byte[tripleLength];
+ input.readFully(buffer);
+ try {
+ ThriftConverter.fromBytes(buffer, this.quad);
+ } catch (TException e) {
+ throw new IOException(e); // surface Thrift decoding failures as IOException, keeping the cause
+ }
+ this.setInternal(new Quad(ThriftConvert.convert(this.quad.getG()), ThriftConvert.convert(this.quad.getS()),
+ ThriftConvert.convert(this.quad.getP()), ThriftConvert.convert(this.quad.getO())));
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ if (this.get() == null)
+ throw new IOException(
+ "Null quads cannot be written using this class, consider using NodeTupleWritable instead");
+
+ // The Thrift quad may not have been prepared yet; only the subject field is tested as the marker
+ if (!this.quad.isSetS()) {
+ Quad tuple = this.get();
+ this.quad.setG(ThriftConvert.convert(tuple.getGraph(), false));
+ this.quad.setS(ThriftConvert.convert(tuple.getSubject(), false));
+ this.quad.setP(ThriftConvert.convert(tuple.getPredicate(), false));
+ this.quad.setO(ThriftConvert.convert(tuple.getObject(), false));
+ }
+
+ byte[] buffer;
+ try {
+ buffer = ThriftConverter.toBytes(this.quad);
+ } catch (TException e) {
+ throw new IOException(e); // surface Thrift encoding failures as IOException, keeping the cause
+ }
+ output.writeInt(buffer.length); // length prefix consumed by readFields()
+ output.write(buffer);
+ }
+
+ @Override
+ protected Quad createTuple(Node[] ns) {
+ if (ns.length != 4)
+ throw new IllegalArgumentException(String.format(
+ "Incorrect number of nodes to form a quad - got %d but expected 4", ns.length));
+ return new Quad(ns[0], ns[1], ns[2], ns[3]); // same node order that createNodes() produces
+ }
+
+ @Override
+ protected Node[] createNodes(Quad tuple) {
+ return new Node[] { tuple.getGraph(), tuple.getSubject(), tuple.getPredicate(), tuple.getObject() }; // graph, subject, predicate, object
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/TripleWritable.java
----------------------------------------------------------------------
diff --git a/jena-elephas/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/TripleWritable.java b/jena-elephas/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/TripleWritable.java
new file mode 100644
index 0000000..a17052b
--- /dev/null
+++ b/jena-elephas/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/TripleWritable.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.jena.hadoop.rdf.types.comparators.SimpleBinaryComparator;
+import org.apache.jena.hadoop.rdf.types.converters.ThriftConverter;
+import org.apache.jena.riot.thrift.ThriftConvert;
+import org.apache.jena.riot.thrift.wire.RDF_Triple;
+import org.apache.thrift.TException;
+
+import com.hp.hpl.jena.graph.Node;
+import com.hp.hpl.jena.graph.Triple;
+
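+/**
+ * A writable triple
+ *
+ * Values are serialised as a length-prefixed Thrift encoded RDF_Triple; null
+ * triples cannot be written
+ */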
+public class TripleWritable extends AbstractNodeTupleWritable<Triple> {
+
+ static { // Runs once, when this class is initialised
+ WritableComparator.define(TripleWritable.class, new SimpleBinaryComparator()); // register the byte-level comparator for this type
+ }
+
+ private RDF_Triple triple = new RDF_Triple(); // Thrift form of the value: cleared by set()/readFields(), filled by readFields()/write()
+
+ /**
+ * Creates a new empty instance (the wrapped triple is null)
+ */
+ public TripleWritable() {
+ this(null);
+ }
+
+ /**
+ * Creates a new instance with the given value (no node formatter is
+ * involved, the triple is simply passed to the superclass constructor)
+ *
+ * @param t
+ * Triple
+ */
+ public TripleWritable(Triple t) {
+ super(t);
+ }
+
+ /**
+ * Creates a new instance and populates it via readFields(DataInput)
+ *
+ * @param input
+ * Input
+ * @return New instance
+ * @throws IOException if readFields(DataInput) fails
+ */
+ public static TripleWritable read(DataInput input) throws IOException {
+ TripleWritable t = new TripleWritable();
+ t.readFields(input);
+ return t;
+ }
+
+ @Override
+ public void set(Triple tuple) {
+ super.set(tuple);
+ this.triple.clear(); // reset the Thrift form; write() re-converts when the subject is not set
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ this.triple.clear();
+ int tripleLength = input.readInt(); // length prefix as written by write(): byte count of the encoded triple
+ byte[] buffer = new byte[tripleLength];
+ input.readFully(buffer);
+ try {
+ ThriftConverter.fromBytes(buffer, this.triple);
+ } catch (TException e) {
+ throw new IOException(e); // surface Thrift decoding failures as IOException, keeping the cause
+ }
+ this.setInternal(new Triple(ThriftConvert.convert(this.triple.getS()),
+ ThriftConvert.convert(this.triple.getP()), ThriftConvert.convert(this.triple.getO())));
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ if (this.get() == null)
+ throw new IOException(
+ "Null triples cannot be written using this class, consider using NodeTupleWritable instead");
+
+ // The Thrift triple may not have been prepared yet; only the subject field is tested as the marker
+ if (!this.triple.isSetS()) {
+ Triple tuple = this.get();
+ this.triple.setS(ThriftConvert.convert(tuple.getSubject(), false));
+ this.triple.setP(ThriftConvert.convert(tuple.getPredicate(), false));
+ this.triple.setO(ThriftConvert.convert(tuple.getObject(), false));
+ }
+
+ byte[] buffer;
+ try {
+ buffer = ThriftConverter.toBytes(this.triple);
+ } catch (TException e) {
+ throw new IOException(e); // surface Thrift encoding failures as IOException, keeping the cause
+ }
+ output.writeInt(buffer.length); // length prefix consumed by readFields()
+ output.write(buffer);
+ }
+
+ @Override
+ protected Triple createTuple(Node[] ns) {
+ if (ns.length != 3)
+ throw new IllegalArgumentException(String.format(
+ "Incorrect number of nodes to form a triple - got %d but expected 3", ns.length));
+ return new Triple(ns[0], ns[1], ns[2]); // same node order that createNodes() produces
+ }
+
+ @Override
+ protected Node[] createNodes(Triple tuple) {
+ return new Node[] { tuple.getSubject(), tuple.getPredicate(), tuple.getObject() }; // subject, predicate, object
+ }
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/comparators/SimpleBinaryComparator.java
----------------------------------------------------------------------
diff --git a/jena-elephas/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/comparators/SimpleBinaryComparator.java b/jena-elephas/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/comparators/SimpleBinaryComparator.java
new file mode 100644
index 0000000..6c46714
--- /dev/null
+++ b/jena-elephas/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/comparators/SimpleBinaryComparator.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.types.comparators;
+
+import org.apache.hadoop.io.WritableComparator;
+
+/**
+ * A general purpose comparator that may be used with any types which can be
+ * compared directly on their binary encodings
+ */
+public class SimpleBinaryComparator extends WritableComparator {
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { // (buffer, start offset, length) for each of the two serialised values
+ return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2); // delegate entirely to Hadoop's raw byte comparison; nothing is deserialised
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/converters/ThriftConverter.java
----------------------------------------------------------------------
diff --git a/jena-elephas/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/converters/ThriftConverter.java b/jena-elephas/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/converters/ThriftConverter.java
new file mode 100644
index 0000000..0675afc
--- /dev/null
+++ b/jena-elephas/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/converters/ThriftConverter.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.types.converters;
+
+import java.io.ByteArrayOutputStream;
+
+import org.apache.jena.riot.thrift.wire.RDF_Quad;
+import org.apache.jena.riot.thrift.wire.RDF_Term;
+import org.apache.jena.riot.thrift.wire.RDF_Triple;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TIOStreamTransport;
+import org.apache.thrift.transport.TMemoryInputTransport;
+import org.apache.thrift.transport.TTransport;
+
+/**
+ * Helper for converting between the binary (Thrift compact protocol) form of
+ * terms, triples and quads and their RIOT Thrift wire objects, the transports
+ * and protocols used are cached per thread
+ */
+public class ThriftConverter {
+
+ private static ThreadLocal<TMemoryInputTransport> inputTransports = new ThreadLocal<>(); // per-thread input transport, created lazily by getInputTransport()
+ private static ThreadLocal<TProtocol> inputProtocols = new ThreadLocal<>(); // per-thread protocol reading from that thread's input transport
+
+ private static ThreadLocal<ByteArrayOutputStream> outputStreams = new ThreadLocal<>(); // per-thread buffer that receives serialised bytes
+ private static ThreadLocal<TTransport> outputTransports = new ThreadLocal<>(); // per-thread transport wrapping that thread's output stream
+ private static ThreadLocal<TProtocol> outputProtocols = new ThreadLocal<>(); // per-thread protocol writing to that thread's output transport
+
+ private static TMemoryInputTransport getInputTransport() { // Returns the calling thread's input transport, creating it on first use
+ TMemoryInputTransport transport = inputTransports.get();
+ if (transport != null)
+ return transport;
+
+ transport = new TMemoryInputTransport();
+ inputTransports.set(transport);
+ return transport;
+ }
+
+ private static TProtocol getInputProtocol() { // Returns the calling thread's input protocol, creating it on first use
+ TProtocol protocol = inputProtocols.get();
+ if (protocol != null)
+ return protocol;
+
+ protocol = new TCompactProtocol(getInputTransport()); // bound to this thread's input transport
+ inputProtocols.set(protocol);
+ return protocol;
+ }
+
+ private static ByteArrayOutputStream getOutputStream() { // Returns the calling thread's output buffer, creating it on first use
+ ByteArrayOutputStream output = outputStreams.get();
+ if (output != null)
+ return output;
+
+ output = new ByteArrayOutputStream();
+ outputStreams.set(output);
+ return output;
+ }
+
+ private static TTransport getOutputTransport() { // Returns the calling thread's output transport, creating it on first use
+ TTransport transport = outputTransports.get();
+ if (transport != null)
+ return transport;
+
+ transport = new TIOStreamTransport(getOutputStream()); // wraps this thread's output buffer
+ outputTransports.set(transport);
+ return transport;
+ }
+
+ private static TProtocol getOutputProtocol() { // Returns the calling thread's output protocol, creating it on first use
+ TProtocol protocol = outputProtocols.get();
+ if (protocol != null)
+ return protocol;
+
+ protocol = new TCompactProtocol(getOutputTransport()); // bound to this thread's output transport
+ outputProtocols.set(protocol);
+ return protocol;
+ }
+
+ public static byte[] toBytes(RDF_Term term) throws TException { // Serialises the term with the compact protocol and returns the bytes
+ ByteArrayOutputStream output = getOutputStream();
+ output.reset(); // discard whatever a previous call on this thread left in the buffer
+
+ TProtocol protocol = getOutputProtocol(); // writes via this thread's transport into the same buffer as `output`
+ term.write(protocol);
+
+ return output.toByteArray();
+ }
+
+ public static void fromBytes(byte[] bs, RDF_Term term) throws TException { // Populates the given term from its serialised bytes
+ TMemoryInputTransport transport = getInputTransport();
+ transport.reset(bs); // point this thread's transport at the new buffer before reading
+ TProtocol protocol = getInputProtocol();
+ term.read(protocol);
+ }
+
+ public static void fromBytes(byte[] buffer, RDF_Triple triple) throws TException { // Populates the given triple from its serialised bytes
+ TMemoryInputTransport transport = getInputTransport();
+ transport.reset(buffer); // point this thread's transport at the new buffer before reading
+ TProtocol protocol = getInputProtocol();
+ triple.read(protocol);
+ }
+
+ public static byte[] toBytes(RDF_Triple triple) throws TException { // Serialises the triple with the compact protocol and returns the bytes
+ ByteArrayOutputStream output = getOutputStream();
+ output.reset(); // discard whatever a previous call on this thread left in the buffer
+
+ TProtocol protocol = getOutputProtocol(); // writes via this thread's transport into the same buffer as `output`
+ triple.write(protocol);
+
+ return output.toByteArray();
+ }
+
+ public static void fromBytes(byte[] buffer, RDF_Quad quad) throws TException { // Populates the given quad from its serialised bytes
+ TMemoryInputTransport transport = getInputTransport();
+ transport.reset(buffer); // point this thread's transport at the new buffer before reading
+ TProtocol protocol = getInputProtocol();
+ quad.read(protocol);
+ }
+
+ public static byte[] toBytes(RDF_Quad quad) throws TException { // Serialises the quad with the compact protocol and returns the bytes
+ ByteArrayOutputStream output = getOutputStream();
+ output.reset(); // discard whatever a previous call on this thread left in the buffer
+
+ TProtocol protocol = getOutputProtocol(); // writes via this thread's transport into the same buffer as `output`
+ quad.write(protocol);
+
+ return output.toByteArray();
+ }
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-common/src/test/java/org/apache/jena/hadoop/rdf/io/types/CharacteristicTests.java
----------------------------------------------------------------------
diff --git a/jena-elephas/jena-elephas-common/src/test/java/org/apache/jena/hadoop/rdf/io/types/CharacteristicTests.java b/jena-elephas/jena-elephas-common/src/test/java/org/apache/jena/hadoop/rdf/io/types/CharacteristicTests.java
new file mode 100644
index 0000000..7214b14
--- /dev/null
+++ b/jena-elephas/jena-elephas-common/src/test/java/org/apache/jena/hadoop/rdf/io/types/CharacteristicTests.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.types;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.jena.hadoop.rdf.types.CharacteristicSetWritable;
+import org.apache.jena.hadoop.rdf.types.CharacteristicWritable;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.hp.hpl.jena.graph.Node;
+import com.hp.hpl.jena.graph.NodeFactory;
+
+/**
+ * Tests for {@link CharacteristicWritable} and
+ * {@link CharacteristicSetWritable}
+ *
+ *
+ *
+ */
+public class CharacteristicTests {
+
+ /**
+ * Checks whether a writable round trips successfully
+ *
+ * @param cw
+ * Characteristic writable
+ * @throws IOException if writing or reading the writable fails
+ */
+ private void checkRoundTrip(CharacteristicWritable cw) throws IOException {
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ DataOutputStream output = new DataOutputStream(outputStream);
+ cw.write(output);
+
+ ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray()); // read back exactly the bytes just written
+ DataInputStream input = new DataInputStream(inputStream);
+ CharacteristicWritable actual = CharacteristicWritable.read(input);
+ Assert.assertEquals(cw, actual);
+ }
+
+ /**
+ * Tests that a characteristic built from a node alone has count 1 and round trips
+ *
+ * @throws IOException if the round trip fails
+ */
+ @Test
+ public void characteristic_writable_01() throws IOException {
+ Node n = NodeFactory.createURI("http://example.org");
+ CharacteristicWritable expected = new CharacteristicWritable(n);
+ Assert.assertEquals(1, expected.getCount().get());
+
+ this.checkRoundTrip(expected);
+ }
+
+ /**
+ * Tests that characteristics on the same node are equal regardless of count
+ *
+ * @throws IOException if the round trip fails
+ */
+ @Test
+ public void characteristic_writable_02() throws IOException {
+ Node n = NodeFactory.createURI("http://example.org");
+ CharacteristicWritable cw1 = new CharacteristicWritable(n);
+ CharacteristicWritable cw2 = new CharacteristicWritable(n, 100);
+ this.checkRoundTrip(cw1);
+ this.checkRoundTrip(cw2);
+
+ // Should still be equal since equality is only on the node not the
+ // count
+ Assert.assertEquals(cw1, cw2);
+ }
+
+ /**
+ * Tests that characteristics on different nodes are not equal
+ *
+ * @throws IOException if the round trip fails
+ */
+ @Test
+ public void characteristic_writable_03() throws IOException {
+ CharacteristicWritable cw1 = new CharacteristicWritable(NodeFactory.createURI("http://example.org"));
+ CharacteristicWritable cw2 = new CharacteristicWritable(NodeFactory.createURI("http://example.org/other"));
+ this.checkRoundTrip(cw1);
+ this.checkRoundTrip(cw2);
+
+ // Should not be equal as different nodes
+ Assert.assertNotEquals(cw1, cw2);
+ }
+
+ /**
+ * Checks that a writable round trips
+ *
+ * @param set
+ * Characteristic set
+ * @throws IOException if writing or reading the set fails
+ */
+ private void checkRoundTrip(CharacteristicSetWritable set) throws IOException {
+ // Write to an in-memory buffer, read it back, and compare with equals()
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ DataOutputStream output = new DataOutputStream(outputStream);
+ set.write(output);
+
+ ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
+ DataInputStream input = new DataInputStream(inputStream);
+ CharacteristicSetWritable actual = CharacteristicSetWritable.read(input);
+ Assert.assertEquals(set, actual);
+ }
+
+ /**
+ * Checks a characteristic set
+ *
+ * @param set
+ * Set
+ * @param expectedItems
+ * Expected number of characteristics
+ * @param expectedCounts
+ * Expected counts for characteristics
+ */
+ protected final void checkCharacteristicSet(CharacteristicSetWritable set, int expectedItems, long[] expectedCounts) {
+ Assert.assertEquals(expectedItems, set.size());
+ Assert.assertEquals(expectedItems, expectedCounts.length); // guards the caller's own arguments: one expected count per item
+ Iterator<CharacteristicWritable> iter = set.getCharacteristics();
+ int i = 0;
+ while (iter.hasNext()) { // expectedCounts is matched positionally against the iterator's order
+ CharacteristicWritable cw = iter.next();
+ Assert.assertEquals(expectedCounts[i], cw.getCount().get());
+ i++;
+ }
+ }
+
+ /**
+ * Tests that two characteristics on different nodes are kept as two items
+ *
+ * @throws IOException if the round trip fails
+ */
+ @Test
+ public void characteristic_set_writable_01() throws IOException {
+ CharacteristicSetWritable set = new CharacteristicSetWritable();
+
+ // Add some characteristics
+ CharacteristicWritable cw1 = new CharacteristicWritable(NodeFactory.createURI("http://example.org"));
+ CharacteristicWritable cw2 = new CharacteristicWritable(NodeFactory.createURI("http://example.org/other"));
+ set.add(cw1);
+ set.add(cw2);
+ this.checkCharacteristicSet(set, 2, new long[] { 1, 1 });
+ this.checkRoundTrip(set);
+ }
+
+ /**
+ * Tests that adding the same node twice yields one item with the summed count
+ *
+ * @throws IOException if the round trip fails
+ */
+ @Test
+ public void characteristic_set_writable_02() throws IOException {
+ CharacteristicSetWritable set = new CharacteristicSetWritable();
+
+ // Add the same node twice, with counts 1 and 2
+ CharacteristicWritable cw1 = new CharacteristicWritable(NodeFactory.createURI("http://example.org"));
+ CharacteristicWritable cw2 = new CharacteristicWritable(NodeFactory.createURI("http://example.org"), 2);
+ set.add(cw1);
+ set.add(cw2);
+ this.checkCharacteristicSet(set, 1, new long[] { 3 });
+ this.checkRoundTrip(set);
+ }
+
+ /**
+ * Tests that sets holding different characteristics are not equal
+ *
+ * @throws IOException if the round trip fails
+ */
+ @Test
+ public void characteristic_set_writable_03() throws IOException {
+ CharacteristicSetWritable set1 = new CharacteristicSetWritable();
+ CharacteristicSetWritable set2 = new CharacteristicSetWritable();
+
+ // Add one distinct characteristic to each set
+ CharacteristicWritable cw1 = new CharacteristicWritable(NodeFactory.createURI("http://example.org"));
+ CharacteristicWritable cw2 = new CharacteristicWritable(NodeFactory.createURI("http://example.org/other"));
+ set1.add(cw1);
+ set2.add(cw2);
+ this.checkCharacteristicSet(set1, 1, new long[] { 1 });
+ this.checkCharacteristicSet(set2, 1, new long[] { 1 });
+ this.checkRoundTrip(set1);
+ this.checkRoundTrip(set2);
+
+ Assert.assertNotEquals(set1, set2);
+ }
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-common/src/test/java/org/apache/jena/hadoop/rdf/io/types/RdfTypesTest.java
----------------------------------------------------------------------
diff --git a/jena-elephas/jena-elephas-common/src/test/java/org/apache/jena/hadoop/rdf/io/types/RdfTypesTest.java b/jena-elephas/jena-elephas-common/src/test/java/org/apache/jena/hadoop/rdf/io/types/RdfTypesTest.java
new file mode 100644
index 0000000..a70dfb0
--- /dev/null
+++ b/jena-elephas/jena-elephas-common/src/test/java/org/apache/jena/hadoop/rdf/io/types/RdfTypesTest.java
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.types;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.jena.atlas.lib.Tuple;
+import org.apache.jena.hadoop.rdf.types.NodeTupleWritable;
+import org.apache.jena.hadoop.rdf.types.NodeWritable;
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.hp.hpl.jena.datatypes.xsd.XSDDatatype;
+import com.hp.hpl.jena.graph.Node;
+import com.hp.hpl.jena.graph.NodeFactory;
+import com.hp.hpl.jena.graph.Triple;
+import com.hp.hpl.jena.sparql.core.Quad;
+
+/**
+ * Tests for the various RDF types defined by the
+ * {@link org.apache.jena.hadoop.rdf.types} package
+ *
+ *
+ *
+ */
+public class RdfTypesTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RdfTypesTest.class);
+
+ private ByteArrayOutputStream outputStream;
+ private ByteArrayInputStream inputStream;
+
+ /**
+ * Prepare for output
+ *
+ * @return Data output
+ */
+ private DataOutput prepareOutput() {
+ this.outputStream = new ByteArrayOutputStream();
+ return new DataOutputStream(this.outputStream);
+ }
+
+ /**
+ * Prepare for input from the previously written output
+ *
+ * @return Data Input
+ */
+ private DataInput prepareInput() {
+ this.inputStream = new ByteArrayInputStream(this.outputStream.toByteArray());
+ return new DataInputStream(this.inputStream);
+ }
+
+ /**
+ * Prepare for input from the given data
+ *
+ * @param data
+ * Data
+ * @return Data Input
+ */
+ @SuppressWarnings("unused")
+ private DataInput prepareInput(byte[] data) {
+ this.inputStream = new ByteArrayInputStream(data);
+ return new DataInputStream(this.inputStream);
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ private <T extends WritableComparable> void testWriteRead(T writable, T expected) throws IOException, InstantiationException, IllegalAccessException,
+ ClassNotFoundException {
+ // Serialise the writable into a fresh in-memory buffer
+ DataOutput output = this.prepareOutput();
+ writable.write(output);
+
+ // Read the bytes back into a new instance created reflectively via the no-arg constructor
+ DataInput input = this.prepareInput();
+ T actual = (T) Class.forName(writable.getClass().getName()).newInstance();
+ actual.readFields(input);
+
+ LOG.info("Original = " + writable.toString());
+ LOG.info("Round Tripped = " + actual.toString());
+
+ // Equivalence is judged by compareTo() returning 0, not by equals()
+ Assert.assertEquals(0, expected.compareTo(actual));
+ }
+
+ /**
+ * Basic node writable round tripping test
+ *
+ * @throws IOException
+ * @throws InstantiationException
+ * @throws IllegalAccessException
+ * @throws ClassNotFoundException
+ */
+ @Test
+ public void node_writable_null() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+ Node n = null;
+ NodeWritable nw = new NodeWritable(n);
+ testWriteRead(nw, nw);
+ }
+
+ /**
+ * Basic node writable round tripping test
+ *
+ * @throws IOException
+ * @throws InstantiationException
+ * @throws IllegalAccessException
+ * @throws ClassNotFoundException
+ */
+ @Test
+ @Ignore
+ public void node_writable_variable_01() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+ Node n = NodeFactory.createVariable("x");
+ NodeWritable nw = new NodeWritable(n);
+ testWriteRead(nw, nw);
+ }
+
+ /**
+ * Basic node writable round tripping test
+ *
+ * @throws IOException
+ * @throws InstantiationException
+ * @throws IllegalAccessException
+ * @throws ClassNotFoundException
+ */
+ @Test
+ @Ignore
+ public void node_writable_variable_02() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+ Node n = NodeFactory.createVariable("really-log-variable-name-asddsfr4545egfdgdfgfdgdtgvdg-dfgfdgdfgdfgdfg4-dfvdfgdfgdfgfdgfdgdfgdfgfdg");
+ NodeWritable nw = new NodeWritable(n);
+ testWriteRead(nw, nw);
+ }
+
+ /**
+ * Basic node writable round tripping test
+ *
+ * @throws IOException
+ * @throws InstantiationException
+ * @throws IllegalAccessException
+ * @throws ClassNotFoundException
+ */
+ @Test
+ public void node_writable_uri_01() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+ Node n = NodeFactory.createURI("http://example.org");
+ NodeWritable nw = new NodeWritable(n);
+ testWriteRead(nw, nw);
+ }
+
+ /**
+ * Basic node writable round tripping test
+ *
+ * @throws IOException
+ * @throws InstantiationException
+ * @throws IllegalAccessException
+ * @throws ClassNotFoundException
+ */
+ @Test
+ public void node_writable_uri_02() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+ Node n = NodeFactory.createURI("http://user:password@example.org/some/path?key=value#id");
+ NodeWritable nw = new NodeWritable(n);
+ testWriteRead(nw, nw);
+ }
+
+ /**
+ * Basic node writable round tripping test
+ *
+ * @throws IOException
+ * @throws InstantiationException
+ * @throws IllegalAccessException
+ * @throws ClassNotFoundException
+ */
+ @Test
+ public void node_writable_literal_01() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+ Node n = NodeFactory.createLiteral("simple");
+ NodeWritable nw = new NodeWritable(n);
+ testWriteRead(nw, nw);
+ }
+
+ /**
+ * Basic node writable round tripping test
+ *
+ * @throws IOException
+ * @throws InstantiationException
+ * @throws IllegalAccessException
+ * @throws ClassNotFoundException
+ */
+ @Test
+ public void node_writable_literal_02() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+ Node n = NodeFactory.createLiteral("language", "en", null);
+ NodeWritable nw = new NodeWritable(n);
+ testWriteRead(nw, nw);
+ }
+
+ /**
+ * Basic node writable round tripping test
+ *
+ * @throws IOException
+ * @throws InstantiationException
+ * @throws IllegalAccessException
+ * @throws ClassNotFoundException
+ */
+ @Test
+ public void node_writable_literal_03() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+ Node n = NodeFactory.createLiteral("string", XSDDatatype.XSDstring);
+ NodeWritable nw = new NodeWritable(n);
+ testWriteRead(nw, nw);
+ }
+
+ /**
+ * Basic node writable round tripping test
+ *
+ * @throws IOException
+ * @throws InstantiationException
+ * @throws IllegalAccessException
+ * @throws ClassNotFoundException
+ */
+ @Test
+ public void node_writable_literal_04() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+ Node n = NodeFactory.createLiteral("1234", XSDDatatype.XSDinteger);
+ NodeWritable nw = new NodeWritable(n);
+ testWriteRead(nw, nw);
+ }
+
+ /**
+ * Basic node writable round tripping test
+ *
+ * @throws IOException
+ * @throws InstantiationException
+ * @throws IllegalAccessException
+ * @throws ClassNotFoundException
+ */
+ @Test
+ public void node_writable_literal_05() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+ Node n = NodeFactory.createLiteral("123.4", XSDDatatype.XSDdecimal);
+ NodeWritable nw = new NodeWritable(n);
+ testWriteRead(nw, nw);
+ }
+
+ /**
+ * Basic node writable round tripping test
+ *
+ * @throws IOException
+ * @throws InstantiationException
+ * @throws IllegalAccessException
+ * @throws ClassNotFoundException
+ */
+ @Test
+ public void node_writable_literal_06() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+ Node n = NodeFactory.createLiteral("12.3e4", XSDDatatype.XSDdouble);
+ NodeWritable nw = new NodeWritable(n);
+ testWriteRead(nw, nw);
+ }
+
+ /**
+ * Basic node writable round tripping test
+ *
+ * @throws IOException
+ * @throws InstantiationException
+ * @throws IllegalAccessException
+ * @throws ClassNotFoundException
+ */
+ @Test
+ public void node_writable_literal_07() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+ Node n = NodeFactory.createLiteral("true", XSDDatatype.XSDboolean);
+ NodeWritable nw = new NodeWritable(n);
+ testWriteRead(nw, nw);
+ }
+
+ /**
+ * Basic node writable round tripping test
+ *
+ * @throws IOException
+ * @throws InstantiationException
+ * @throws IllegalAccessException
+ * @throws ClassNotFoundException
+ */
+ @Test
+ public void node_writable_bnode_01() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+ Node n = NodeFactory.createAnon();
+ NodeWritable nw = new NodeWritable(n);
+ testWriteRead(nw, nw);
+ }
+
+ /**
+ * Basic node writable round tripping test
+ *
+ * @throws IOException
+ * @throws InstantiationException
+ * @throws IllegalAccessException
+ * @throws ClassNotFoundException
+ */
+ @Test
+ public void node_writable_bnode_02() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+ Node n = NodeFactory.createAnon();
+ NodeWritable nw = new NodeWritable(n);
+ testWriteRead(nw, nw);
+ NodeWritable nw2 = new NodeWritable(n);
+ testWriteRead(nw2, nw2);
+
+ Assert.assertEquals(0, nw.compareTo(nw2));
+ }
+
+ /**
+ * Basic triple writable round tripping test
+ *
+ * @throws IOException
+ * @throws InstantiationException
+ * @throws IllegalAccessException
+ * @throws ClassNotFoundException
+ */
+ @Test
+ public void triple_writable_01() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+ Triple t = new Triple(NodeFactory.createURI("http://example"), NodeFactory.createURI("http://predicate"), NodeFactory.createLiteral("value"));
+ TripleWritable tw = new TripleWritable(t);
+ testWriteRead(tw, tw);
+ }
+
+ /**
+ * Basic triple writable round tripping test
+ *
+ * @throws IOException
+ * @throws InstantiationException
+ * @throws IllegalAccessException
+ * @throws ClassNotFoundException
+ */
+ @Test
+ public void triple_writable_02() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+ Triple t = new Triple(NodeFactory.createAnon(), NodeFactory.createURI("http://predicate"), NodeFactory.createLiteral("value"));
+ TripleWritable tw = new TripleWritable(t);
+ testWriteRead(tw, tw);
+ }
+
+ /**
+ * Basic quad writable round tripping test
+ *
+ * @throws IOException
+ * @throws InstantiationException
+ * @throws IllegalAccessException
+ * @throws ClassNotFoundException
+ */
+ @Test
+ public void quad_writable_01() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+ Quad q = new Quad(Quad.defaultGraphNodeGenerated, NodeFactory.createURI("http://example"), NodeFactory.createURI("http://predicate"),
+ NodeFactory.createLiteral("value"));
+ QuadWritable qw = new QuadWritable(q);
+ testWriteRead(qw, qw);
+ }
+
+ /**
+ * Basic quad writable round tripping test
+ *
+ * @throws IOException
+ * @throws InstantiationException
+ * @throws IllegalAccessException
+ * @throws ClassNotFoundException
+ */
+ @Test
+ public void quad_writable_02() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+ Quad q = new Quad(Quad.defaultGraphNodeGenerated, NodeFactory.createAnon(), NodeFactory.createURI("http://predicate"),
+ NodeFactory.createLiteral("value"));
+ QuadWritable qw = new QuadWritable(q);
+ testWriteRead(qw, qw);
+ }
+
+ /**
+ * Basic tuple writable round tripping test
+ *
+ * @throws IOException
+ * @throws InstantiationException
+ * @throws IllegalAccessException
+ * @throws ClassNotFoundException
+ */
+ @Test
+ public void tuple_writable_01() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+ Tuple<Node> t = Tuple.createTuple(NodeFactory.createURI("http://one"), NodeFactory.createURI("http://two"), NodeFactory.createLiteral("value"),
+ NodeFactory.createLiteral("foo"), NodeFactory.createURI("http://three"));
+ NodeTupleWritable tw = new NodeTupleWritable(t);
+ testWriteRead(tw, tw);
+ }
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/pom.xml
----------------------------------------------------------------------
diff --git a/jena-elephas/jena-elephas-io/pom.xml b/jena-elephas/jena-elephas-io/pom.xml
new file mode 100644
index 0000000..e5134d0
--- /dev/null
+++ b/jena-elephas/jena-elephas-io/pom.xml
@@ -0,0 +1,67 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.jena</groupId>
+ <artifactId>jena-elephas</artifactId>
+ <version>0.9.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>jena-elephas-io</artifactId>
+ <name>Apache Jena - Elephas - I/O</name>
+ <description>RDF Input/Output formats library for Hadoop</description>
+
+ <!-- Note that versions are managed by parent POMs -->
+ <dependencies>
+ <!-- Internal Project Dependencies -->
+ <dependency>
+ <groupId>org.apache.jena</groupId>
+ <artifactId>jena-elephas-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <!-- Hadoop Dependencies -->
+ <!-- Note these will be provided on the Hadoop cluster hence the provided
+ scope -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- Jena dependencies -->
+ <dependency>
+ <groupId>org.apache.jena</groupId>
+ <artifactId>jena-arq</artifactId>
+ </dependency>
+
+ <!-- Test Dependencies -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/HadoopIOConstants.java
----------------------------------------------------------------------
diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/HadoopIOConstants.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/HadoopIOConstants.java
new file mode 100644
index 0000000..5c1b41c
--- /dev/null
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/HadoopIOConstants.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io;
+
+/**
+ * Hadoop IO related constants
+ *
+ *
+ *
+ */
+public class HadoopIOConstants {
+
+ /**
+ * Private constructor prevents instantiation
+ */
+ private HadoopIOConstants() {
+ }
+
+ /**
+ * Map Reduce configuration setting for max line length
+ */
+ public static final String MAX_LINE_LENGTH = "mapreduce.input.linerecordreader.line.maxlength";
+
+ /**
+ * Run ID
+ */
+ public static final String RUN_ID = "runId";
+
+ /**
+ * Compression codecs to use
+ */
+ public static final String IO_COMPRESSION_CODECS = "io.compression.codecs";
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/RdfIOConstants.java
----------------------------------------------------------------------
diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/RdfIOConstants.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/RdfIOConstants.java
new file mode 100644
index 0000000..27c2bb2
--- /dev/null
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/RdfIOConstants.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io;
+
+import java.io.IOException;
+
+/**
+ * RDF IO related constants
+ *
+ *
+ *
+ */
+public class RdfIOConstants {
+
+ /**
+ * Private constructor prevents instantiation
+ */
+ private RdfIOConstants() {
+ }
+
+ /**
+ * Configuration key used to set whether bad tuples are ignored. This is the
+ * default behaviour; when explicitly set to {@code false}, bad tuples will
+ * result in {@link IOException} being thrown by the relevant record
+ * readers.
+ */
+ public static final String INPUT_IGNORE_BAD_TUPLES = "rdf.io.input.ignore-bad-tuples";
+
+ /**
+ * Configuration key used to set the batch size used for RDF output formats
+ * that take a batched writing approach. Default value is given by the
+ * constant {@link #DEFAULT_OUTPUT_BATCH_SIZE}.
+ */
+ public static final String OUTPUT_BATCH_SIZE = "rdf.io.output.batch-size";
+
+ /**
+ * Default batch size for batched output formats
+ */
+ public static final long DEFAULT_OUTPUT_BATCH_SIZE = 10000;
+
+ /**
+ * Configuration key used to control behaviour with regards to how blank
+ * nodes are handled.
+ * <p>
+ * The default behaviour is that blank nodes are file scoped which is what
+ * the RDF specifications require.
+ * </p>
+ * <p>
+ * However in the case of a multi-stage pipeline this behaviour can cause
+ * blank nodes to diverge over several jobs and introduce spurious blank
+ * nodes over time. This is described in <a
+ * href="https://issues.apache.org/jira/browse/JENA-820">JENA-820</a> and
+ * enabling this flag for jobs in your pipeline allows you to work around
+ * this problem.
+ * </p>
+ * <h3>Warning</h3> You should only enable this flag for jobs that take in
+ * RDF output originating from previous jobs since our normal blank node
+ * allocation policy ensures that blank nodes will be file scoped and unique
+ * over all files (barring unfortunate hashing collisions). If you enable
+ * this for jobs that take in RDF originating from other sources you may
+ * incorrectly conflate blank nodes that are supposed to be distinct and
+ * separate nodes.
+ */
+ public static final String GLOBAL_BNODE_IDENTITY = "rdf.io.input.bnodes.global-identity";
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/AbstractNLineFileInputFormat.java
----------------------------------------------------------------------
diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/AbstractNLineFileInputFormat.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/AbstractNLineFileInputFormat.java
new file mode 100644
index 0000000..1fcb030
--- /dev/null
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/AbstractNLineFileInputFormat.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract line based input format that reuses the machinery from
+ * {@link NLineInputFormat} to calculate the splits
+ *
+ *
+ *
+ * @param <TKey>
+ * Key type
+ * @param <TValue>
+ * Value type
+ */
+public abstract class AbstractNLineFileInputFormat<TKey, TValue> extends FileInputFormat<TKey, TValue> {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNLineFileInputFormat.class);
+
+ /**
+ * Logically splits the set of input files for the job, splits N lines of
+ * the input as one split.
+ *
+ * @see FileInputFormat#getSplits(JobContext)
+ */
+ public final List<InputSplit> getSplits(JobContext job) throws IOException {
+ boolean debug = LOGGER.isDebugEnabled();
+ if (debug && FileInputFormat.getInputDirRecursive(job)) {
+ LOGGER.debug("Recursive searching for input data is enabled");
+ }
+
+ List<InputSplit> splits = new ArrayList<InputSplit>();
+ int numLinesPerSplit = NLineInputFormat.getNumLinesPerSplit(job);
+ for (FileStatus status : listStatus(job)) {
+ if (debug) {
+ LOGGER.debug("Determining how to split input file/directory {}", status.getPath());
+ }
+ splits.addAll(NLineInputFormat.getSplitsForFile(status, job.getConfiguration(), numLinesPerSplit));
+ }
+ return splits;
+ }
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileInputFormat.java
----------------------------------------------------------------------
diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileInputFormat.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileInputFormat.java
new file mode 100644
index 0000000..e561cdb
--- /dev/null
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileInputFormat.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+/**
+ * Abstract implementation of a whole file input format where each file is a
+ * single split
+ *
+ *
+ *
+ * @param <TKey>
+ * Key type
+ * @param <TValue>
+ * Value type
+ */
+public abstract class AbstractWholeFileInputFormat<TKey, TValue> extends FileInputFormat<TKey, TValue> {
+
+ @Override
+ protected final boolean isSplitable(JobContext context, Path filename) {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/QuadsInputFormat.java
----------------------------------------------------------------------
diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/QuadsInputFormat.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/QuadsInputFormat.java
new file mode 100644
index 0000000..b8fdbd5
--- /dev/null
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/QuadsInputFormat.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.jena.hadoop.rdf.io.input.readers.QuadsReader;
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+
+
+/**
+ * RDF input format that can handle any RDF quads format that ARQ supports
+ * selecting the format to use for each file based upon the file extension
+ *
+ *
+ *
+ */
+public class QuadsInputFormat extends AbstractWholeFileInputFormat<LongWritable, QuadWritable> {
+
+ @Override
+ public RecordReader<LongWritable, QuadWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new QuadsReader();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/TriplesInputFormat.java
----------------------------------------------------------------------
diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/TriplesInputFormat.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/TriplesInputFormat.java
new file mode 100644
index 0000000..03f394a
--- /dev/null
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/TriplesInputFormat.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.jena.hadoop.rdf.io.input.readers.TriplesReader;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+
+/**
+ * RDF input format that can handle any RDF triples format that ARQ supports
+ * selecting the format to use for each file based upon the file extension
+ */
+public class TriplesInputFormat extends AbstractWholeFileInputFormat<LongWritable, TripleWritable> {
+
+ @Override
+ public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new TriplesReader();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/TriplesOrQuadsInputFormat.java
----------------------------------------------------------------------
diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/TriplesOrQuadsInputFormat.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/TriplesOrQuadsInputFormat.java
new file mode 100644
index 0000000..bfd643e
--- /dev/null
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/TriplesOrQuadsInputFormat.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.jena.hadoop.rdf.io.input.readers.TriplesOrQuadsReader;
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+
+
+/**
+ * RDF input format that can handle any RDF triple/quads format that ARQ
+ * supports selecting the format to use for each file based upon the file
+ * extension. Triples are converted into quads in the default graph.
+ *
+ *
+ *
+ */
+public class TriplesOrQuadsInputFormat extends AbstractWholeFileInputFormat<LongWritable, QuadWritable> {
+
+ @Override
+ public RecordReader<LongWritable, QuadWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new TriplesOrQuadsReader();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/jsonld/JsonLDQuadInputFormat.java
----------------------------------------------------------------------
diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/jsonld/JsonLDQuadInputFormat.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/jsonld/JsonLDQuadInputFormat.java
new file mode 100644
index 0000000..2464946
--- /dev/null
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/jsonld/JsonLDQuadInputFormat.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.jsonld;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat;
+import org.apache.jena.hadoop.rdf.io.input.readers.jsonld.JsonLDQuadReader;
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+
+public class JsonLDQuadInputFormat extends AbstractWholeFileInputFormat<LongWritable, QuadWritable> {
+
+ @Override
+ public RecordReader<LongWritable, QuadWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new JsonLDQuadReader();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/jsonld/JsonLDTripleInputFormat.java
----------------------------------------------------------------------
diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/jsonld/JsonLDTripleInputFormat.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/jsonld/JsonLDTripleInputFormat.java
new file mode 100644
index 0000000..0e08a4b
--- /dev/null
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/jsonld/JsonLDTripleInputFormat.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.jsonld;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat;
+import org.apache.jena.hadoop.rdf.io.input.readers.jsonld.JsonLDTripleReader;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+
+public class JsonLDTripleInputFormat extends AbstractWholeFileInputFormat<LongWritable, TripleWritable> {
+
+ @Override
+ public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new JsonLDTripleReader();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/nquads/BlockedNQuadsInputFormat.java
----------------------------------------------------------------------
diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/nquads/BlockedNQuadsInputFormat.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/nquads/BlockedNQuadsInputFormat.java
new file mode 100644
index 0000000..6829c4d
--- /dev/null
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/nquads/BlockedNQuadsInputFormat.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.nquads;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.jena.hadoop.rdf.io.input.AbstractNLineFileInputFormat;
+import org.apache.jena.hadoop.rdf.io.input.readers.nquads.BlockedNQuadsReader;
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+
+
+/**
+ * NQuads input format where files are processed as blocks of lines rather
+ * than in a line based manner as with the {@link NQuadsInputFormat} or as
+ * whole files with the {@link WholeFileNQuadsInputFormat}
+ * <p>
+ * This provides a compromise between the higher parser setup overhead of
+ * creating more parsers and the benefit of being able to split input files
+ * over multiple mappers.
+ * </p>
+ * <p>
+ * Records are keyed by {@link LongWritable} with {@link QuadWritable} values.
+ * </p>
+ */
+public class BlockedNQuadsInputFormat extends AbstractNLineFileInputFormat<LongWritable, QuadWritable> {
+ /** Returns a new {@link BlockedNQuadsReader} on every call; {@code split} and {@code context} are not used here. */
+ @Override
+ public RecordReader<LongWritable, QuadWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new BlockedNQuadsReader();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/nquads/NQuadsInputFormat.java
----------------------------------------------------------------------
diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/nquads/NQuadsInputFormat.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/nquads/NQuadsInputFormat.java
new file mode 100644
index 0000000..802fbea
--- /dev/null
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/nquads/NQuadsInputFormat.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.nquads;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.jena.hadoop.rdf.io.input.AbstractNLineFileInputFormat;
+import org.apache.jena.hadoop.rdf.io.input.readers.nquads.NQuadsReader;
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+
+
+/**
+ * NQuads input format
+ * <p>
+ * Records are keyed by {@link LongWritable} with {@link QuadWritable} values.
+ * </p>
+ */
+public class NQuadsInputFormat extends AbstractNLineFileInputFormat<LongWritable, QuadWritable> {
+ /** Returns a new {@link NQuadsReader} on every call; {@code arg0} and {@code arg1} are not used here. */
+ @Override
+ public RecordReader<LongWritable, QuadWritable> createRecordReader(InputSplit arg0, TaskAttemptContext arg1)
+ throws IOException, InterruptedException {
+ return new NQuadsReader();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/nquads/WholeFileNQuadsInputFormat.java
----------------------------------------------------------------------
diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/nquads/WholeFileNQuadsInputFormat.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/nquads/WholeFileNQuadsInputFormat.java
new file mode 100644
index 0000000..128d079
--- /dev/null
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/nquads/WholeFileNQuadsInputFormat.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.nquads;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat;
+import org.apache.jena.hadoop.rdf.io.input.readers.nquads.WholeFileNQuadsReader;
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+
+
+/**
+ * NQuads input format where files are processed as complete files rather than
+ * in a line based manner as with the {@link NQuadsInputFormat}
+ * <p>
+ * This has the advantage of less parser setup overhead but the disadvantage
+ * that the input cannot be split over multiple mappers.
+ * </p>
+ * <p>
+ * Records are keyed by {@link LongWritable} with {@link QuadWritable} values.
+ * </p>
+ */
+public class WholeFileNQuadsInputFormat extends AbstractWholeFileInputFormat<LongWritable, QuadWritable> {
+ /** Returns a new {@link WholeFileNQuadsReader} on every call; {@code split} and {@code context} are not used here. */
+ @Override
+ public RecordReader<LongWritable, QuadWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new WholeFileNQuadsReader();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/BlockedNTriplesInputFormat.java
----------------------------------------------------------------------
diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/BlockedNTriplesInputFormat.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/BlockedNTriplesInputFormat.java
new file mode 100644
index 0000000..292167b
--- /dev/null
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/BlockedNTriplesInputFormat.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.ntriples;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.jena.hadoop.rdf.io.input.AbstractNLineFileInputFormat;
+import org.apache.jena.hadoop.rdf.io.input.readers.ntriples.BlockedNTriplesReader;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+
+
+/**
+ * NTriples input format where files are processed as blocks of lines rather
+ * than in a line based manner as with the {@link NTriplesInputFormat} or as
+ * whole files with the {@link WholeFileNTriplesInputFormat}
+ * <p>
+ * This provides a compromise between the higher parser setup overhead of
+ * creating more parsers and the benefit of being able to split input files
+ * over multiple mappers.
+ * </p>
+ * <p>
+ * Records are keyed by {@link LongWritable} with {@link TripleWritable} values.
+ * </p>
+ */
+public class BlockedNTriplesInputFormat extends AbstractNLineFileInputFormat<LongWritable, TripleWritable> {
+ /** Returns a new {@link BlockedNTriplesReader} on every call; {@code split} and {@code context} are not used here. */
+ @Override
+ public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new BlockedNTriplesReader();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/NTriplesInputFormat.java
----------------------------------------------------------------------
diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/NTriplesInputFormat.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/NTriplesInputFormat.java
new file mode 100644
index 0000000..1694c87
--- /dev/null
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/NTriplesInputFormat.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.ntriples;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.jena.hadoop.rdf.io.input.AbstractNLineFileInputFormat;
+import org.apache.jena.hadoop.rdf.io.input.readers.ntriples.NTriplesReader;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+
+
+/**
+ * NTriples input format
+ * <p>
+ * Records are keyed by {@link LongWritable} with {@link TripleWritable} values.
+ * </p>
+ */
+public class NTriplesInputFormat extends AbstractNLineFileInputFormat<LongWritable, TripleWritable> {
+ /** Returns a new {@link NTriplesReader} on every call; {@code inputSplit} and {@code context} are not used here. */
+ @Override
+ public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new NTriplesReader();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/WholeFileNTriplesInputFormat.java
----------------------------------------------------------------------
diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/WholeFileNTriplesInputFormat.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/WholeFileNTriplesInputFormat.java
new file mode 100644
index 0000000..31c1252
--- /dev/null
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/ntriples/WholeFileNTriplesInputFormat.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.ntriples;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat;
+import org.apache.jena.hadoop.rdf.io.input.readers.ntriples.WholeFileNTriplesReader;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+
+
+/**
+ * NTriples input format where files are processed as complete files rather than
+ * in a line based manner as with the {@link NTriplesInputFormat}
+ * <p>
+ * This has the advantage of less parser setup overhead but the disadvantage
+ * that the input cannot be split over multiple mappers.
+ * </p>
+ * <p>
+ * Records are keyed by {@link LongWritable} with {@link TripleWritable} values.
+ * </p>
+ */
+public class WholeFileNTriplesInputFormat extends AbstractWholeFileInputFormat<LongWritable, TripleWritable> {
+ /** Returns a new {@link WholeFileNTriplesReader} on every call; {@code split} and {@code context} are not used here. */
+ @Override
+ public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new WholeFileNTriplesReader();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/rdfjson/RdfJsonInputFormat.java
----------------------------------------------------------------------
diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/rdfjson/RdfJsonInputFormat.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/rdfjson/RdfJsonInputFormat.java
new file mode 100644
index 0000000..e5a7940
--- /dev/null
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/rdfjson/RdfJsonInputFormat.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.rdfjson;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat;
+import org.apache.jena.hadoop.rdf.io.input.readers.rdfjson.RdfJsonReader;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+
+
+/**
+ * RDF/JSON input format
+ * <p>
+ * Records are keyed by {@link LongWritable} with {@link TripleWritable} values.
+ * </p>
+ */
+public class RdfJsonInputFormat extends AbstractWholeFileInputFormat<LongWritable, TripleWritable> {
+ /** Returns a new {@link RdfJsonReader} on every call; {@code split} and {@code context} are not used here. */
+ @Override
+ public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new RdfJsonReader();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/rdfxml/RdfXmlInputFormat.java
----------------------------------------------------------------------
diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/rdfxml/RdfXmlInputFormat.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/rdfxml/RdfXmlInputFormat.java
new file mode 100644
index 0000000..4deb925
--- /dev/null
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/rdfxml/RdfXmlInputFormat.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.rdfxml;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat;
+import org.apache.jena.hadoop.rdf.io.input.readers.rdfxml.RdfXmlReader;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+
+
+/**
+ * RDF/XML input format
+ * <p>
+ * Records are keyed by {@link LongWritable} with {@link TripleWritable} values.
+ * </p>
+ */
+public class RdfXmlInputFormat extends AbstractWholeFileInputFormat<LongWritable, TripleWritable> {
+ /** Returns a new {@link RdfXmlReader} on every call; {@code split} and {@code context} are not used here. */
+ @Override
+ public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new RdfXmlReader();
+ }
+
+}