Posted to commits@jena.apache.org by rv...@apache.org on 2015/01/05 16:07:19 UTC

[11/52] [abbrv] jena git commit: Rebrand to Jena Elephas per community vote

http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/CharacteristicSetWritable.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/CharacteristicSetWritable.java b/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/CharacteristicSetWritable.java
new file mode 100644
index 0000000..f29b156
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/CharacteristicSetWritable.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableUtils;
+
+import com.hp.hpl.jena.graph.Node;
+import com.hp.hpl.jena.graph.NodeFactory;
+
+/**
+ * Represents a characteristic set, which comprises a count of the nodes to
+ * which the set applies and a set of characteristics recording the number of
+ * usages of each predicate with those nodes
+ */
+public class CharacteristicSetWritable implements WritableComparable<CharacteristicSetWritable> {
+
+    private Map<NodeWritable, CharacteristicWritable> characteristics = new TreeMap<NodeWritable, CharacteristicWritable>();
+    private LongWritable count = new LongWritable();
+
+    /**
+     * Creates a new empty characteristic set with the default count of 1
+     */
+    public CharacteristicSetWritable() {
+        this(1);
+    }
+
+    /**
+     * Creates a new characteristic set with the default count of 1 and the
+     * given characteristics
+     * 
+     * @param characteristics
+     *            Characteristics
+     */
+    public CharacteristicSetWritable(CharacteristicWritable... characteristics) {
+        this(1, characteristics);
+    }
+
+    /**
+     * Creates an empty characteristic set with the given count
+     * 
+     * @param count
+     *            Count
+     */
+    public CharacteristicSetWritable(long count) {
+        this(count, new CharacteristicWritable[0]);
+    }
+
+    /**
+     * Creates a new characteristic set
+     * 
+     * @param count
+     *            Count
+     * @param characteristics
+     *            Characteristics
+     */
+    public CharacteristicSetWritable(long count, CharacteristicWritable... characteristics) {
+        this.count.set(count);
+        for (CharacteristicWritable characteristic : characteristics) {
+            this.characteristics.put(characteristic.getNode(), characteristic);
+        }
+    }
+
+    /**
+     * Creates a new instance and reads its data from the given input
+     * 
+     * @param input
+     *            Input
+     * @return New instance
+     * @throws IOException
+     */
+    public static CharacteristicSetWritable read(DataInput input) throws IOException {
+        CharacteristicSetWritable set = new CharacteristicSetWritable();
+        set.readFields(input);
+        return set;
+    }
+
+    /**
+     * Gets the count
+     * 
+     * @return Count
+     */
+    public LongWritable getCount() {
+        return this.count;
+    }
+
+    /**
+     * Gets the characteristics
+     * 
+     * @return Characteristics
+     */
+    public Iterator<CharacteristicWritable> getCharacteristics() {
+        return this.characteristics.values().iterator();
+    }
+
+    /**
+     * Gets the size of the characteristic set
+     * 
+     * @return Size
+     */
+    public int size() {
+        return this.characteristics.size();
+    }
+
+    /**
+     * Adds a characteristic to the set, merging it into the existing
+     * characteristic for the same predicate if one is present
+     * 
+     * @param characteristic
+     *            Characteristic
+     */
+    public void add(CharacteristicWritable characteristic) {
+        if (this.characteristics.containsKey(characteristic.getNode())) {
+            this.characteristics.get(characteristic.getNode()).increment(characteristic.getCount().get());
+        } else {
+            this.characteristics.put(characteristic.getNode(), characteristic);
+        }
+    }
+
+    /**
+     * Adds some characteristics to the set, merging them with any existing
+     * characteristics for the same predicates
+     * 
+     * @param characteristics
+     *            Characteristics
+     */
+    public void add(CharacteristicWritable... characteristics) {
+        for (CharacteristicWritable characteristic : characteristics) {
+            this.add(characteristic);
+        }
+    }
+
+    /**
+     * Adds the contents of the other characteristic set to this characteristic
+     * set
+     * 
+     * @param set
+     *            Characteristic set
+     */
+    public void add(CharacteristicSetWritable set) {
+        this.increment(set.getCount().get());
+        Iterator<CharacteristicWritable> iter = set.getCharacteristics();
+        while (iter.hasNext()) {
+            this.add(iter.next());
+        }
+    }
+
+    /**
+     * Gets whether the set contains a characteristic for the given predicate
+     * 
+     * @param uri
+     *            Predicate URI
+     * @return True if contained in the set, false otherwise
+     */
+    public boolean hasCharacteristic(String uri) {
+        return this.hasCharacteristic(NodeFactory.createURI(uri));
+    }
+
+    /**
+     * Gets whether the set contains a characteristic for the given predicate
+     * 
+     * @param n
+     *            Predicate
+     * @return True if contained in the set, false otherwise
+     */
+    public boolean hasCharacteristic(Node n) {
+        return this.hasCharacteristic(new NodeWritable(n));
+    }
+
+    /**
+     * Gets whether the set contains a characteristic for the given predicate
+     * 
+     * @param n
+     *            Predicate
+     * @return True if contained in the set, false otherwise
+     */
+    public boolean hasCharacteristic(NodeWritable n) {
+        return this.characteristics.containsKey(n);
+    }
+
+    /**
+     * Increments the count by the given increment
+     * 
+     * @param l
+     *            Increment
+     */
+    public void increment(long l) {
+        this.count.set(this.count.get() + l);
+    }
+
+    @Override
+    public void write(DataOutput output) throws IOException {
+        // Write size, then count, then characteristics
+        WritableUtils.writeVInt(output, this.characteristics.size());
+        this.count.write(output);
+        for (CharacteristicWritable characteristic : this.characteristics.values()) {
+            characteristic.write(output);
+        }
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        // Read size, then count, then characteristics
+        int size = WritableUtils.readVInt(input);
+        this.count.readFields(input);
+        this.characteristics.clear();
+        for (int i = 0; i < size; i++) {
+            CharacteristicWritable cw = CharacteristicWritable.read(input);
+            this.characteristics.put(cw.getNode(), cw);
+        }
+    }
+
+    @Override
+    public int compareTo(CharacteristicSetWritable cs) {
+        int size = this.characteristics.size();
+        int otherSize = cs.characteristics.size();
+        if (size < otherSize) {
+            return -1;
+        } else if (size > otherSize) {
+            return 1;
+        } else {
+            // Compare characteristics in turn
+            Iterator<CharacteristicWritable> iter = this.getCharacteristics();
+            Iterator<CharacteristicWritable> otherIter = cs.getCharacteristics();
+
+            int compare = 0;
+            while (iter.hasNext()) {
+                CharacteristicWritable c = iter.next();
+                CharacteristicWritable otherC = otherIter.next();
+                compare = c.compareTo(otherC);
+                if (compare != 0)
+                    return compare;
+            }
+            return compare;
+        }
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (!(other instanceof CharacteristicSetWritable))
+            return false;
+        return this.compareTo((CharacteristicSetWritable) other) == 0;
+    }
+
+    @Override
+    public int hashCode() {
+        // Build a hash code from characteristics
+        if (this.characteristics.size() == 0)
+            return 0;
+        Iterator<CharacteristicWritable> iter = this.getCharacteristics();
+        int hash = 17;
+        while (iter.hasNext()) {
+            hash = hash * 31 + iter.next().hashCode();
+        }
+        return hash;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("{ ");
+        builder.append(this.count.get());
+        Iterator<CharacteristicWritable> iter = this.getCharacteristics();
+        while (iter.hasNext()) {
+            builder.append(" , ");
+            builder.append(iter.next().toString());
+        }
+        builder.append(" }");
+        return builder.toString();
+    }
+
+}
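
A minimal usage sketch of the CharacteristicSetWritable API added above (illustrative only, not part of this commit): adding two characteristics for the same predicate merges their counts rather than storing duplicate entries, matching the behaviour exercised by characteristic_set_writable_02 in the tests later in this patch.

    // assumes the imports from the diff above (NodeFactory plus the writable types)
    Node p = NodeFactory.createURI("http://example.org/p");
    CharacteristicSetWritable set = new CharacteristicSetWritable();
    set.add(new CharacteristicWritable(p));    // count 1
    set.add(new CharacteristicWritable(p, 2)); // same predicate, so counts merge
    set.size();                                       // 1 - a single merged entry
    set.hasCharacteristic("http://example.org/p");    // true
    set.getCharacteristics().next().getCount().get(); // 3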

http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/CharacteristicWritable.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/CharacteristicWritable.java b/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/CharacteristicWritable.java
new file mode 100644
index 0000000..90fc7db
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/CharacteristicWritable.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.hp.hpl.jena.graph.Node;
+
+/**
+ * Represents a characteristic for a single node, i.e. the node together with
+ * a count associated with that node
+ * <p>
+ * Note that characteristics are compared based only upon the nodes and not
+ * their counts
+ * </p>
+ */
+public class CharacteristicWritable implements WritableComparable<CharacteristicWritable> {
+
+    private NodeWritable node = new NodeWritable();
+    private LongWritable count = new LongWritable();
+
+    /**
+     * Creates an empty characteristic writable
+     */
+    public CharacteristicWritable() {
+        this(null);
+    }
+
+    /**
+     * Creates a characteristic writable with the given node and the default
+     * count of 1
+     * 
+     * @param n
+     *            Node
+     */
+    public CharacteristicWritable(Node n) {
+        this(n, 1);
+    }
+
+    /**
+     * Creates a characteristic writable with the given node and count
+     * 
+     * @param n
+     *            Node
+     * @param count
+     *            Count
+     */
+    public CharacteristicWritable(Node n, long count) {
+        this.node.set(n);
+        this.count.set(count);
+    }
+
+    /**
+     * Creates a new instance and reads in its data from the given input
+     * 
+     * @param input
+     *            Input
+     * @return New instance
+     * @throws IOException
+     */
+    public static CharacteristicWritable read(DataInput input) throws IOException {
+        CharacteristicWritable cw = new CharacteristicWritable();
+        cw.readFields(input);
+        return cw;
+    }
+
+    /**
+     * Gets the node
+     * 
+     * @return Node
+     */
+    public NodeWritable getNode() {
+        return this.node;
+    }
+
+    /**
+     * Gets the count
+     * 
+     * @return Count
+     */
+    public LongWritable getCount() {
+        return this.count;
+    }
+
+    /**
+     * Increments the count by 1
+     */
+    public void increment() {
+        this.increment(1);
+    }
+
+    /**
+     * Increments the count by the given value
+     * 
+     * @param l
+     *            Value to increment by
+     */
+    public void increment(long l) {
+        this.count.set(this.count.get() + l);
+    }
+
+    @Override
+    public void write(DataOutput output) throws IOException {
+        this.node.write(output);
+        this.count.write(output);
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        this.node.readFields(input);
+        this.count.readFields(input);
+    }
+
+    @Override
+    public int compareTo(CharacteristicWritable o) {
+        return this.node.compareTo(o.node);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (!(other instanceof CharacteristicWritable))
+            return false;
+        return this.compareTo((CharacteristicWritable) other) == 0;
+    }
+
+    @Override
+    public int hashCode() {
+        return this.node.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return "(" + this.node.toString() + ", " + this.count.toString() + ")";
+    }
+
+}
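
As the javadoc above notes, CharacteristicWritable equality and ordering consider only the node, never the count - a sketch (illustrative only, not part of this commit):

    Node p = NodeFactory.createURI("http://example.org/p");
    CharacteristicWritable a = new CharacteristicWritable(p);      // count 1
    CharacteristicWritable b = new CharacteristicWritable(p, 100); // count 100
    a.equals(b);    // true - only the nodes are compared
    a.compareTo(b); // 0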

http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/NodeTupleWritable.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/NodeTupleWritable.java b/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/NodeTupleWritable.java
new file mode 100644
index 0000000..e06aac4
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/NodeTupleWritable.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.types;
+
+import java.io.DataInput;
+import java.io.IOException;
+
+import org.apache.jena.atlas.lib.Tuple;
+import com.hp.hpl.jena.graph.Node;
+
+/**
+ * A writable RDF tuple
+ * <p>
+ * Unlike the more specific {@link TripleWritable} and {@link QuadWritable}
+ * this class allows for arbitrary length tuples and does not restrict tuples
+ * to being of uniform size.
+ * </p>
+ */
+public class NodeTupleWritable extends AbstractNodeTupleWritable<Tuple<Node>> {
+
+    /**
+     * Creates a new empty instance
+     */
+    public NodeTupleWritable() {
+        this(null);
+    }
+
+    /**
+     * Creates a new instance with the given value
+     * 
+     * @param tuple
+     *            Tuple
+     */
+    public NodeTupleWritable(Tuple<Node> tuple) {
+        super(tuple);
+    }
+
+    /**
+     * Creates a new instance from the given input
+     * 
+     * @param input
+     *            Input
+     * @return New instance
+     * @throws IOException
+     */
+    public static NodeTupleWritable read(DataInput input) throws IOException {
+        NodeTupleWritable t = new NodeTupleWritable();
+        t.readFields(input);
+        return t;
+    }
+
+    @Override
+    protected Tuple<Node> createTuple(Node[] ns) {
+        return Tuple.create(ns);
+    }
+
+    @Override
+    protected Node[] createNodes(Tuple<Node> tuple) {
+        return tuple.tuple();
+    }
+}
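
Since NodeTupleWritable imposes no fixed arity it can carry tuples of any length - a sketch (illustrative only, not part of this commit; Tuple.create is the factory on org.apache.jena.atlas.lib.Tuple that the class itself uses above):

    Tuple<Node> pair = Tuple.create(
            NodeFactory.createURI("http://example.org/s"),
            NodeFactory.createURI("http://example.org/p"));
    NodeTupleWritable tw = new NodeTupleWritable(pair); // a 2-tuple, unlike the fixed arity of TripleWritable/QuadWritable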

http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/NodeWritable.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/NodeWritable.java b/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/NodeWritable.java
new file mode 100644
index 0000000..cf00f8d
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/NodeWritable.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.jena.hadoop.rdf.types.comparators.SimpleBinaryComparator;
+import org.apache.jena.hadoop.rdf.types.converters.ThriftConverter;
+import org.apache.jena.riot.thrift.TRDF;
+import org.apache.jena.riot.thrift.ThriftConvert;
+import org.apache.jena.riot.thrift.wire.RDF_Term;
+import org.apache.thrift.TException;
+
+import com.hp.hpl.jena.graph.Node;
+import com.hp.hpl.jena.sparql.util.NodeUtils;
+
+/**
+ * A writable for {@link Node} instances
+ * <p>
+ * This uses <a
+ * href="http://afs.github.io/rdf-thrift/rdf-binary-thrift.html">RDF Thrift</a>
+ * for the binary encoding of terms. The in-memory storage for this type is both
+ * a {@link Node} and an {@link RDF_Term}, with lazy conversion between the two
+ * forms as necessary.
+ * </p>
+ */
+public class NodeWritable implements WritableComparable<NodeWritable> {
+
+    static {
+        WritableComparator.define(NodeWritable.class, new SimpleBinaryComparator());
+    }
+
+    private Node node;
+    private RDF_Term term = new RDF_Term();
+
+    /**
+     * Creates an empty writable
+     */
+    public NodeWritable() {
+        this(null);
+    }
+
+    /**
+     * Creates a new instance from the given input
+     * 
+     * @param input
+     *            Input
+     * @return New instance
+     * @throws IOException
+     */
+    public static NodeWritable read(DataInput input) throws IOException {
+        NodeWritable nw = new NodeWritable();
+        nw.readFields(input);
+        return nw;
+    }
+
+    /**
+     * Creates a new writable with the given value
+     * 
+     * @param n
+     *            Node
+     */
+    public NodeWritable(Node n) {
+        this.set(n);
+    }
+
+    /**
+     * Gets the node
+     * 
+     * @return Node
+     */
+    public Node get() {
+        // We may not have yet loaded the node
+        if (this.node == null) {
+            // If term is set to undefined then node is supposed to be null
+            if (this.term.isSet() && !this.term.isSetUndefined()) {
+                this.node = ThriftConvert.convert(this.term);
+            }
+        }
+        return this.node;
+    }
+
+    /**
+     * Sets the node
+     * 
+     * @param n
+     *            Node
+     */
+    public void set(Node n) {
+        this.node = n;
+        // Clear the term for now
+        // We only convert the Node to a term as and when we want to write it
+        // out in order to not waste effort if the value is never written out
+        this.term.clear();
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        // Clear previous value
+        this.node = null;
+        this.term.clear();
+
+        // Read in the new value
+        int termLength = input.readInt();
+        byte[] buffer = new byte[termLength];
+        input.readFully(buffer);
+        try {
+            ThriftConverter.fromBytes(buffer, this.term);
+        } catch (TException e) {
+            throw new IOException(e);
+        }
+
+        // Note that we don't convert it back into a Node at this time
+    }
+
+    @Override
+    public void write(DataOutput output) throws IOException {
+        // May not yet have prepared the Thrift term
+        if (!this.term.isSet()) {
+            if (this.node == null) {
+                this.term.setUndefined(TRDF.UNDEF);
+            } else {
+                ThriftConvert.toThrift(this.node, null, this.term, false);
+            }
+        }
+
+        // Write out the Thrift term
+        byte[] buffer;
+        try {
+            buffer = ThriftConverter.toBytes(this.term);
+        } catch (TException e) {
+            throw new IOException(e);
+        }
+        output.writeInt(buffer.length);
+        output.write(buffer);
+    }
+
+    @Override
+    public int compareTo(NodeWritable other) {
+        // Use get() rather than accessing the field directly because the node
+        // field is lazily instantiated from the Thrift term
+        return NodeUtils.compareRDFTerms(this.get(), other.get());
+    }
+
+    @Override
+    public String toString() {
+        // Use get() rather than accessing the field directly because the node
+        // field is lazily instantiated from the Thrift term
+        Node n = this.get();
+        if (n == null)
+            return "";
+        return n.toString();
+    }
+
+    @Override
+    public int hashCode() {
+        // Use get() rather than accessing the field directly because the node
+        // field is lazily instantiated from the Thrift term
+        Node n = this.get();
+        return n != null ? n.hashCode() : 0;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (!(other instanceof NodeWritable))
+            return false;
+        return this.compareTo((NodeWritable) other) == 0;
+    }
+}
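
The lazy conversion described in the javadoc means the Thrift encoding cost is only paid when a value is actually written, and decoding back to a Node is deferred until first access - a round-trip sketch (illustrative only, not part of this commit):

    NodeWritable nw = new NodeWritable(NodeFactory.createURI("http://example.org"));

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    nw.write(new DataOutputStream(bytes)); // Node -> RDF_Term conversion happens here

    NodeWritable copy = NodeWritable.read(
            new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    copy.get(); // RDF_Term -> Node conversion deferred until this first access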

http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/QuadWritable.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/QuadWritable.java b/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/QuadWritable.java
new file mode 100644
index 0000000..3d9dd00
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/QuadWritable.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.jena.hadoop.rdf.types.comparators.SimpleBinaryComparator;
+import org.apache.jena.hadoop.rdf.types.converters.ThriftConverter;
+import org.apache.jena.riot.thrift.ThriftConvert;
+import org.apache.jena.riot.thrift.wire.RDF_Quad;
+import org.apache.thrift.TException;
+
+import com.hp.hpl.jena.graph.Node;
+import com.hp.hpl.jena.sparql.core.Quad;
+
+/**
+ * A writable quad
+ */
+public class QuadWritable extends AbstractNodeTupleWritable<Quad> {
+
+    static {
+        WritableComparator.define(QuadWritable.class, new SimpleBinaryComparator());
+    }
+
+    private RDF_Quad quad = new RDF_Quad();
+
+    /**
+     * Creates a new empty instance
+     */
+    public QuadWritable() {
+        this(null);
+    }
+
+    /**
+     * Creates a new instance with the given value
+     * 
+     * @param q
+     *            Quad
+     */
+    public QuadWritable(Quad q) {
+        super(q);
+    }
+
+    /**
+     * Creates a new instance from the given input
+     * 
+     * @param input
+     *            Input
+     * @return New instance
+     * @throws IOException
+     */
+    public static QuadWritable read(DataInput input) throws IOException {
+        QuadWritable q = new QuadWritable();
+        q.readFields(input);
+        return q;
+    }
+
+    @Override
+    public void set(Quad tuple) {
+        super.set(tuple);
+        this.quad.clear();
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        this.quad.clear();
+        int quadLength = input.readInt();
+        byte[] buffer = new byte[quadLength];
+        input.readFully(buffer);
+        try {
+            ThriftConverter.fromBytes(buffer, this.quad);
+        } catch (TException e) {
+            throw new IOException(e);
+        }
+        this.setInternal(new Quad(ThriftConvert.convert(this.quad.getG()), ThriftConvert.convert(this.quad.getS()),
+                ThriftConvert.convert(this.quad.getP()), ThriftConvert.convert(this.quad.getO())));
+    }
+
+    @Override
+    public void write(DataOutput output) throws IOException {
+        if (this.get() == null)
+            throw new IOException(
+                    "Null quads cannot be written using this class, consider using NodeTupleWritable instead");
+
+        // May not have yet prepared the Thrift quad
+        if (!this.quad.isSetS()) {
+            Quad tuple = this.get();
+            this.quad.setG(ThriftConvert.convert(tuple.getGraph(), false));
+            this.quad.setS(ThriftConvert.convert(tuple.getSubject(), false));
+            this.quad.setP(ThriftConvert.convert(tuple.getPredicate(), false));
+            this.quad.setO(ThriftConvert.convert(tuple.getObject(), false));
+        }
+
+        byte[] buffer;
+        try {
+            buffer = ThriftConverter.toBytes(this.quad);
+        } catch (TException e) {
+            throw new IOException(e);
+        }
+        output.writeInt(buffer.length);
+        output.write(buffer);
+    }
+
+    @Override
+    protected Quad createTuple(Node[] ns) {
+        if (ns.length != 4)
+            throw new IllegalArgumentException(String.format(
+                    "Incorrect number of nodes to form a quad - got %d but expected 4", ns.length));
+        return new Quad(ns[0], ns[1], ns[2], ns[3]);
+    }
+
+    @Override
+    protected Node[] createNodes(Quad tuple) {
+        return new Node[] { tuple.getGraph(), tuple.getSubject(), tuple.getPredicate(), tuple.getObject() };
+    }
+
+}
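
A round-trip sketch for QuadWritable (illustrative only, not part of this commit; Quad is the standard Jena class imported above):

    Quad q = new Quad(
            NodeFactory.createURI("http://example.org/g"),
            NodeFactory.createURI("http://example.org/s"),
            NodeFactory.createURI("http://example.org/p"),
            NodeFactory.createURI("http://example.org/o"));
    QuadWritable qw = new QuadWritable(q);

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    qw.write(new DataOutputStream(bytes)); // encoded as an RDF Thrift RDF_Quad

    QuadWritable copy = QuadWritable.read(
            new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));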

http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/TripleWritable.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/TripleWritable.java b/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/TripleWritable.java
new file mode 100644
index 0000000..a17052b
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/TripleWritable.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.jena.hadoop.rdf.types.comparators.SimpleBinaryComparator;
+import org.apache.jena.hadoop.rdf.types.converters.ThriftConverter;
+import org.apache.jena.riot.thrift.ThriftConvert;
+import org.apache.jena.riot.thrift.wire.RDF_Triple;
+import org.apache.thrift.TException;
+
+import com.hp.hpl.jena.graph.Node;
+import com.hp.hpl.jena.graph.Triple;
+
+/**
+ * A writable triple
+ */
+public class TripleWritable extends AbstractNodeTupleWritable<Triple> {
+    
+    static {
+        WritableComparator.define(TripleWritable.class, new SimpleBinaryComparator());
+    }
+
+    private RDF_Triple triple = new RDF_Triple();
+
+    /**
+     * Creates a new empty instance
+     */
+    public TripleWritable() {
+        this(null);
+    }
+
+    /**
+     * Creates a new instance with the given value
+     * 
+     * @param t
+     *            Triple
+     */
+    public TripleWritable(Triple t) {
+        super(t);
+    }
+
+    /**
+     * Creates a new instance from the given input
+     * 
+     * @param input
+     *            Input
+     * @return New instance
+     * @throws IOException
+     */
+    public static TripleWritable read(DataInput input) throws IOException {
+        TripleWritable t = new TripleWritable();
+        t.readFields(input);
+        return t;
+    }
+
+    @Override
+    public void set(Triple tuple) {
+        super.set(tuple);
+        this.triple.clear();
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        this.triple.clear();
+        int tripleLength = input.readInt();
+        byte[] buffer = new byte[tripleLength];
+        input.readFully(buffer);
+        try {
+            ThriftConverter.fromBytes(buffer, this.triple);
+        } catch (TException e) {
+            throw new IOException(e);
+        }
+        this.setInternal(new Triple(ThriftConvert.convert(this.triple.getS()),
+                ThriftConvert.convert(this.triple.getP()), ThriftConvert.convert(this.triple.getO())));
+    }
+
+    @Override
+    public void write(DataOutput output) throws IOException {
+        if (this.get() == null)
+            throw new IOException(
+                    "Null triples cannot be written using this class, consider using NodeTupleWritable instead");
+        
+        // May not have yet prepared the Thrift triple
+        if (!this.triple.isSetS()) {
+            Triple tuple = this.get();
+            this.triple.setS(ThriftConvert.convert(tuple.getSubject(), false));
+            this.triple.setP(ThriftConvert.convert(tuple.getPredicate(), false));
+            this.triple.setO(ThriftConvert.convert(tuple.getObject(), false));
+        }
+
+        byte[] buffer;
+        try {
+            buffer = ThriftConverter.toBytes(this.triple);
+        } catch (TException e) {
+            throw new IOException(e);
+        }
+        output.writeInt(buffer.length);
+        output.write(buffer);
+    }
+
+    @Override
+    protected Triple createTuple(Node[] ns) {
+        if (ns.length != 3)
+            throw new IllegalArgumentException(String.format(
+                    "Incorrect number of nodes to form a triple - got %d but expected 3", ns.length));
+        return new Triple(ns[0], ns[1], ns[2]);
+    }
+
+    @Override
+    protected Node[] createNodes(Triple tuple) {
+        return new Node[] { tuple.getSubject(), tuple.getPredicate(), tuple.getObject() };
+    }
+}
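
TripleWritable round trips the same way (illustrative sketch, not part of this commit):

    Triple t = new Triple(
            NodeFactory.createURI("http://example.org/s"),
            NodeFactory.createURI("http://example.org/p"),
            NodeFactory.createURI("http://example.org/o"));
    TripleWritable tw = new TripleWritable(t);

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    tw.write(new DataOutputStream(bytes)); // encoded as an RDF Thrift RDF_Triple
    TripleWritable copy = TripleWritable.read(
            new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));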

http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/comparators/SimpleBinaryComparator.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/comparators/SimpleBinaryComparator.java b/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/comparators/SimpleBinaryComparator.java
new file mode 100644
index 0000000..6c46714
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/comparators/SimpleBinaryComparator.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.types.comparators;
+
+import org.apache.hadoop.io.WritableComparator;
+
+/**
+ * A general purpose comparator that may be used with any types which can be
+ * compared directly on their binary encodings
+ */
+public class SimpleBinaryComparator extends WritableComparator {
+
+    @Override
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+        return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/converters/ThriftConverter.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/converters/ThriftConverter.java b/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/converters/ThriftConverter.java
new file mode 100644
index 0000000..0675afc
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/converters/ThriftConverter.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.types.converters;
+
+import java.io.ByteArrayOutputStream;
+
+import org.apache.jena.riot.thrift.wire.RDF_Quad;
+import org.apache.jena.riot.thrift.wire.RDF_Term;
+import org.apache.jena.riot.thrift.wire.RDF_Triple;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TIOStreamTransport;
+import org.apache.thrift.transport.TMemoryInputTransport;
+import org.apache.thrift.transport.TTransport;
+
+/**
+ * Helper for converting between the binary representation of Nodes, Triples
+ * and Quads and their Jena API equivalents
+ */
+public class ThriftConverter {
+
+    private static ThreadLocal<TMemoryInputTransport> inputTransports = new ThreadLocal<>();
+    private static ThreadLocal<TProtocol> inputProtocols = new ThreadLocal<>();
+
+    private static ThreadLocal<ByteArrayOutputStream> outputStreams = new ThreadLocal<>();
+    private static ThreadLocal<TTransport> outputTransports = new ThreadLocal<>();
+    private static ThreadLocal<TProtocol> outputProtocols = new ThreadLocal<>();
+
+    private static TMemoryInputTransport getInputTransport() {
+        TMemoryInputTransport transport = inputTransports.get();
+        if (transport != null)
+            return transport;
+
+        transport = new TMemoryInputTransport();
+        inputTransports.set(transport);
+        return transport;
+    }
+
+    private static TProtocol getInputProtocol() {
+        TProtocol protocol = inputProtocols.get();
+        if (protocol != null)
+            return protocol;
+
+        protocol = new TCompactProtocol(getInputTransport());
+        inputProtocols.set(protocol);
+        return protocol;
+    }
+
+    private static ByteArrayOutputStream getOutputStream() {
+        ByteArrayOutputStream output = outputStreams.get();
+        if (output != null)
+            return output;
+
+        output = new ByteArrayOutputStream();
+        outputStreams.set(output);
+        return output;
+    }
+
+    private static TTransport getOutputTransport() {
+        TTransport transport = outputTransports.get();
+        if (transport != null)
+            return transport;
+
+        transport = new TIOStreamTransport(getOutputStream());
+        outputTransports.set(transport);
+        return transport;
+    }
+
+    private static TProtocol getOutputProtocol() {
+        TProtocol protocol = outputProtocols.get();
+        if (protocol != null)
+            return protocol;
+
+        protocol = new TCompactProtocol(getOutputTransport());
+        outputProtocols.set(protocol);
+        return protocol;
+    }
+
+    public static byte[] toBytes(RDF_Term term) throws TException {
+        ByteArrayOutputStream output = getOutputStream();
+        output.reset();
+
+        TProtocol protocol = getOutputProtocol();
+        term.write(protocol);
+
+        return output.toByteArray();
+    }
+
+    public static void fromBytes(byte[] bs, RDF_Term term) throws TException {
+        TMemoryInputTransport transport = getInputTransport();
+        transport.reset(bs);
+        TProtocol protocol = getInputProtocol();
+        term.read(protocol);
+    }
+
+    public static void fromBytes(byte[] buffer, RDF_Triple triple) throws TException {
+        TMemoryInputTransport transport = getInputTransport();
+        transport.reset(buffer);
+        TProtocol protocol = getInputProtocol();
+        triple.read(protocol);
+    }
+
+    public static byte[] toBytes(RDF_Triple triple) throws TException {
+        ByteArrayOutputStream output = getOutputStream();
+        output.reset();
+
+        TProtocol protocol = getOutputProtocol();
+        triple.write(protocol);
+
+        return output.toByteArray();
+    }
+
+    public static void fromBytes(byte[] buffer, RDF_Quad quad) throws TException {
+        TMemoryInputTransport transport = getInputTransport();
+        transport.reset(buffer);
+        TProtocol protocol = getInputProtocol();
+        quad.read(protocol);
+    }
+
+    public static byte[] toBytes(RDF_Quad quad) throws TException {
+        ByteArrayOutputStream output = getOutputStream();
+        output.reset();
+
+        TProtocol protocol = getOutputProtocol();
+        quad.write(protocol);
+
+        return output.toByteArray();
+    }
+}
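
A sketch of using the ThriftConverter helpers directly (illustrative only, not part of this commit; ThriftConvert.toThrift is the Jena RIOT conversion also used by NodeWritable above):

    RDF_Term term = new RDF_Term();
    ThriftConvert.toThrift(NodeFactory.createURI("http://example.org"), null, term, false);

    byte[] encoded = ThriftConverter.toBytes(term); // reuses the thread-local stream/protocol

    RDF_Term decoded = new RDF_Term();
    ThriftConverter.fromBytes(encoded, decoded);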

http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-common/src/test/java/org/apache/jena/hadoop/rdf/io/types/CharacteristicTests.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-common/src/test/java/org/apache/jena/hadoop/rdf/io/types/CharacteristicTests.java b/jena-hadoop-rdf/jena-elephas-common/src/test/java/org/apache/jena/hadoop/rdf/io/types/CharacteristicTests.java
new file mode 100644
index 0000000..7214b14
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-common/src/test/java/org/apache/jena/hadoop/rdf/io/types/CharacteristicTests.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.types;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.jena.hadoop.rdf.types.CharacteristicSetWritable;
+import org.apache.jena.hadoop.rdf.types.CharacteristicWritable;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.hp.hpl.jena.graph.Node;
+import com.hp.hpl.jena.graph.NodeFactory;
+
+/**
+ * Tests for {@link CharacteristicWritable} and
+ * {@link CharacteristicSetWritable}
+ */
+public class CharacteristicTests {
+
+    /**
+     * Checks whether a writable round trips successfully
+     * 
+     * @param cw
+     *            Characteristic writable
+     * @throws IOException
+     */
+    private void checkRoundTrip(CharacteristicWritable cw) throws IOException {
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        DataOutputStream output = new DataOutputStream(outputStream);
+        cw.write(output);
+
+        ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
+        DataInputStream input = new DataInputStream(inputStream);
+        CharacteristicWritable actual = CharacteristicWritable.read(input);
+        Assert.assertEquals(cw, actual);
+    }
+
+    /**
+     * Tests characteristic round tripping
+     * 
+     * @throws IOException
+     */
+    @Test
+    public void characteristic_writable_01() throws IOException {
+        Node n = NodeFactory.createURI("http://example.org");
+        CharacteristicWritable expected = new CharacteristicWritable(n);
+        Assert.assertEquals(1, expected.getCount().get());
+
+        this.checkRoundTrip(expected);
+    }
+
+    /**
+     * Tests characteristic properties
+     * 
+     * @throws IOException
+     */
+    @Test
+    public void characteristic_writable_02() throws IOException {
+        Node n = NodeFactory.createURI("http://example.org");
+        CharacteristicWritable cw1 = new CharacteristicWritable(n);
+        CharacteristicWritable cw2 = new CharacteristicWritable(n, 100);
+        this.checkRoundTrip(cw1);
+        this.checkRoundTrip(cw2);
+
+        // Should still be equal since equality is only on the node not the
+        // count
+        Assert.assertEquals(cw1, cw2);
+    }
+
+    /**
+     * Tests characteristic properties
+     * 
+     * @throws IOException
+     */
+    @Test
+    public void characteristic_writable_03() throws IOException {
+        CharacteristicWritable cw1 = new CharacteristicWritable(NodeFactory.createURI("http://example.org"));
+        CharacteristicWritable cw2 = new CharacteristicWritable(NodeFactory.createURI("http://example.org/other"));
+        this.checkRoundTrip(cw1);
+        this.checkRoundTrip(cw2);
+
+        // Should not be equal as different nodes
+        Assert.assertNotEquals(cw1, cw2);
+    }
+
+    /**
+     * Checks that a writable round trips
+     * 
+     * @param set
+     *            Characteristic set
+     * @throws IOException
+     */
+    private void checkRoundTrip(CharacteristicSetWritable set) throws IOException {
+        // Test round trip
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        DataOutputStream output = new DataOutputStream(outputStream);
+        set.write(output);
+
+        ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
+        DataInputStream input = new DataInputStream(inputStream);
+        CharacteristicSetWritable actual = CharacteristicSetWritable.read(input);
+        Assert.assertEquals(set, actual);
+    }
+
+    /**
+     * Checks a characteristic set
+     * 
+     * @param set
+     *            Set
+     * @param expectedItems
+     *            Expected number of characteristics
+     * @param expectedCounts
+     *            Expected counts for characteristics
+     */
+    protected final void checkCharacteristicSet(CharacteristicSetWritable set, int expectedItems, long[] expectedCounts) {
+        Assert.assertEquals(expectedItems, set.size());
+        Assert.assertEquals(expectedItems, expectedCounts.length);
+        Iterator<CharacteristicWritable> iter = set.getCharacteristics();
+        int i = 0;
+        while (iter.hasNext()) {
+            CharacteristicWritable cw = iter.next();
+            Assert.assertEquals(expectedCounts[i], cw.getCount().get());
+            i++;
+        }
+    }
+
+    /**
+     * Tests characteristic sets
+     * 
+     * @throws IOException
+     */
+    @Test
+    public void characteristic_set_writable_01() throws IOException {
+        CharacteristicSetWritable set = new CharacteristicSetWritable();
+
+        // Add some characteristics
+        CharacteristicWritable cw1 = new CharacteristicWritable(NodeFactory.createURI("http://example.org"));
+        CharacteristicWritable cw2 = new CharacteristicWritable(NodeFactory.createURI("http://example.org/other"));
+        set.add(cw1);
+        set.add(cw2);
+        this.checkCharacteristicSet(set, 2, new long[] { 1, 1 });
+        this.checkRoundTrip(set);
+    }
+
+    /**
+     * Tests characteristic sets
+     * 
+     * @throws IOException
+     */
+    @Test
+    public void characteristic_set_writable_02() throws IOException {
+        CharacteristicSetWritable set = new CharacteristicSetWritable();
+
+        // Add some characteristics
+        CharacteristicWritable cw1 = new CharacteristicWritable(NodeFactory.createURI("http://example.org"));
+        CharacteristicWritable cw2 = new CharacteristicWritable(NodeFactory.createURI("http://example.org"), 2);
+        set.add(cw1);
+        set.add(cw2);
+        this.checkCharacteristicSet(set, 1, new long[] { 3 });
+        this.checkRoundTrip(set);
+    }
+    
+    /**
+     * Tests characteristic sets
+     * 
+     * @throws IOException
+     */
+    @Test
+    public void characteristic_set_writable_03() throws IOException {
+        CharacteristicSetWritable set1 = new CharacteristicSetWritable();
+        CharacteristicSetWritable set2 = new CharacteristicSetWritable();
+
+        // Add some characteristics
+        CharacteristicWritable cw1 = new CharacteristicWritable(NodeFactory.createURI("http://example.org"));
+        CharacteristicWritable cw2 = new CharacteristicWritable(NodeFactory.createURI("http://example.org/other"));
+        set1.add(cw1);
+        set2.add(cw2);
+        this.checkCharacteristicSet(set1, 1, new long[] { 1 });
+        this.checkCharacteristicSet(set2, 1, new long[] { 1 });
+        this.checkRoundTrip(set1);
+        this.checkRoundTrip(set2);
+        
+        Assert.assertNotEquals(set1, set2);
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-common/src/test/java/org/apache/jena/hadoop/rdf/io/types/RdfTypesTest.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-common/src/test/java/org/apache/jena/hadoop/rdf/io/types/RdfTypesTest.java b/jena-hadoop-rdf/jena-elephas-common/src/test/java/org/apache/jena/hadoop/rdf/io/types/RdfTypesTest.java
new file mode 100644
index 0000000..a70dfb0
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-common/src/test/java/org/apache/jena/hadoop/rdf/io/types/RdfTypesTest.java
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.types;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.jena.atlas.lib.Tuple;
+import org.apache.jena.hadoop.rdf.types.NodeTupleWritable;
+import org.apache.jena.hadoop.rdf.types.NodeWritable;
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.hp.hpl.jena.datatypes.xsd.XSDDatatype;
+import com.hp.hpl.jena.graph.Node;
+import com.hp.hpl.jena.graph.NodeFactory;
+import com.hp.hpl.jena.graph.Triple;
+import com.hp.hpl.jena.sparql.core.Quad;
+
+/**
+ * Tests for the various RDF types defined by the
+ * {@link org.apache.jena.hadoop.rdf.types} package
+ */
+public class RdfTypesTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RdfTypesTest.class);
+
+    private ByteArrayOutputStream outputStream;
+    private ByteArrayInputStream inputStream;
+
+    /**
+     * Prepare for output
+     * 
+     * @return Data output
+     */
+    private DataOutput prepareOutput() {
+        this.outputStream = new ByteArrayOutputStream();
+        return new DataOutputStream(this.outputStream);
+    }
+
+    /**
+     * Prepare for input from the previously written output
+     * 
+     * @return Data Input
+     */
+    private DataInput prepareInput() {
+        this.inputStream = new ByteArrayInputStream(this.outputStream.toByteArray());
+        return new DataInputStream(this.inputStream);
+    }
+
+    /**
+     * Prepare for input from the given data
+     * 
+     * @param data
+     *            Data
+     * @return Data Input
+     */
+    @SuppressWarnings("unused")
+    private DataInput prepareInput(byte[] data) {
+        this.inputStream = new ByteArrayInputStream(data);
+        return new DataInputStream(this.inputStream);
+    }
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    private <T extends WritableComparable> void testWriteRead(T writable, T expected) throws IOException, InstantiationException, IllegalAccessException,
+            ClassNotFoundException {
+        // Write out data
+        DataOutput output = this.prepareOutput();
+        writable.write(output);
+
+        // Read back in data
+        DataInput input = this.prepareInput();
+        T actual = (T) Class.forName(writable.getClass().getName()).newInstance();
+        actual.readFields(input);
+
+        LOG.info("Original = " + writable.toString());
+        LOG.info("Round Tripped = " + actual.toString());
+
+        // Check equivalent
+        Assert.assertEquals(0, expected.compareTo(actual));
+    }
+    
+    /**
+     * Basic node writable round tripping test
+     * 
+     * @throws IOException
+     * @throws InstantiationException
+     * @throws IllegalAccessException
+     * @throws ClassNotFoundException
+     */
+    @Test
+    public void node_writable_null() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+        Node n = null;
+        NodeWritable nw = new NodeWritable(n);
+        testWriteRead(nw, nw);
+    }
+    
+    /**
+     * Basic node writable round tripping test
+     * 
+     * @throws IOException
+     * @throws InstantiationException
+     * @throws IllegalAccessException
+     * @throws ClassNotFoundException
+     */
+    @Test
+    @Ignore
+    public void node_writable_variable_01() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+        Node n = NodeFactory.createVariable("x");
+        NodeWritable nw = new NodeWritable(n);
+        testWriteRead(nw, nw);
+    }
+    
+    /**
+     * Basic node writable round tripping test
+     * 
+     * @throws IOException
+     * @throws InstantiationException
+     * @throws IllegalAccessException
+     * @throws ClassNotFoundException
+     */
+    @Test
+    @Ignore
+    public void node_writable_variable_02() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+        Node n = NodeFactory.createVariable("really-log-variable-name-asddsfr4545egfdgdfgfdgdtgvdg-dfgfdgdfgdfgdfg4-dfvdfgdfgdfgfdgfdgdfgdfgfdg");
+        NodeWritable nw = new NodeWritable(n);
+        testWriteRead(nw, nw);
+    }
+
+    /**
+     * Basic node writable round tripping test
+     * 
+     * @throws IOException
+     * @throws InstantiationException
+     * @throws IllegalAccessException
+     * @throws ClassNotFoundException
+     */
+    @Test
+    public void node_writable_uri_01() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+        Node n = NodeFactory.createURI("http://example.org");
+        NodeWritable nw = new NodeWritable(n);
+        testWriteRead(nw, nw);
+    }
+
+    /**
+     * Basic node writable round tripping test
+     * 
+     * @throws IOException
+     * @throws InstantiationException
+     * @throws IllegalAccessException
+     * @throws ClassNotFoundException
+     */
+    @Test
+    public void node_writable_uri_02() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+        Node n = NodeFactory.createURI("http://user:password@example.org/some/path?key=value#id");
+        NodeWritable nw = new NodeWritable(n);
+        testWriteRead(nw, nw);
+    }
+
+    /**
+     * Basic node writable round tripping test
+     * 
+     * @throws IOException
+     * @throws InstantiationException
+     * @throws IllegalAccessException
+     * @throws ClassNotFoundException
+     */
+    @Test
+    public void node_writable_literal_01() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+        Node n = NodeFactory.createLiteral("simple");
+        NodeWritable nw = new NodeWritable(n);
+        testWriteRead(nw, nw);
+    }
+
+    /**
+     * Basic node writable round tripping test
+     * 
+     * @throws IOException
+     * @throws InstantiationException
+     * @throws IllegalAccessException
+     * @throws ClassNotFoundException
+     */
+    @Test
+    public void node_writable_literal_02() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+        Node n = NodeFactory.createLiteral("language", "en", null);
+        NodeWritable nw = new NodeWritable(n);
+        testWriteRead(nw, nw);
+    }
+
+    /**
+     * Basic node writable round tripping test
+     * 
+     * @throws IOException
+     * @throws InstantiationException
+     * @throws IllegalAccessException
+     * @throws ClassNotFoundException
+     */
+    @Test
+    public void node_writable_literal_03() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+        Node n = NodeFactory.createLiteral("string", XSDDatatype.XSDstring);
+        NodeWritable nw = new NodeWritable(n);
+        testWriteRead(nw, nw);
+    }
+
+    /**
+     * Basic node writable round tripping test
+     * 
+     * @throws IOException
+     * @throws InstantiationException
+     * @throws IllegalAccessException
+     * @throws ClassNotFoundException
+     */
+    @Test
+    public void node_writable_literal_04() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+        Node n = NodeFactory.createLiteral("1234", XSDDatatype.XSDinteger);
+        NodeWritable nw = new NodeWritable(n);
+        testWriteRead(nw, nw);
+    }
+
+    /**
+     * Basic node writable round tripping test
+     * 
+     * @throws IOException
+     * @throws InstantiationException
+     * @throws IllegalAccessException
+     * @throws ClassNotFoundException
+     */
+    @Test
+    public void node_writable_literal_05() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+        Node n = NodeFactory.createLiteral("123.4", XSDDatatype.XSDdecimal);
+        NodeWritable nw = new NodeWritable(n);
+        testWriteRead(nw, nw);
+    }
+
+    /**
+     * Basic node writable round tripping test
+     * 
+     * @throws IOException
+     * @throws InstantiationException
+     * @throws IllegalAccessException
+     * @throws ClassNotFoundException
+     */
+    @Test
+    public void node_writable_literal_06() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+        Node n = NodeFactory.createLiteral("12.3e4", XSDDatatype.XSDdouble);
+        NodeWritable nw = new NodeWritable(n);
+        testWriteRead(nw, nw);
+    }
+
+    /**
+     * Basic node writable round tripping test
+     * 
+     * @throws IOException
+     * @throws InstantiationException
+     * @throws IllegalAccessException
+     * @throws ClassNotFoundException
+     */
+    @Test
+    public void node_writable_literal_07() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+        Node n = NodeFactory.createLiteral("true", XSDDatatype.XSDboolean);
+        NodeWritable nw = new NodeWritable(n);
+        testWriteRead(nw, nw);
+    }
+
+    /**
+     * Basic node writable round tripping test
+     * 
+     * @throws IOException
+     * @throws InstantiationException
+     * @throws IllegalAccessException
+     * @throws ClassNotFoundException
+     */
+    @Test
+    public void node_writable_bnode_01() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+        Node n = NodeFactory.createAnon();
+        NodeWritable nw = new NodeWritable(n);
+        testWriteRead(nw, nw);
+    }
+
+    /**
+     * Basic node writable round tripping test
+     * 
+     * @throws IOException
+     * @throws InstantiationException
+     * @throws IllegalAccessException
+     * @throws ClassNotFoundException
+     */
+    @Test
+    public void node_writable_bnode_02() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+        Node n = NodeFactory.createAnon();
+        NodeWritable nw = new NodeWritable(n);
+        testWriteRead(nw, nw);
+        NodeWritable nw2 = new NodeWritable(n);
+        testWriteRead(nw2, nw2);
+
+        Assert.assertEquals(0, nw.compareTo(nw2));
+    }
+
+    /**
+     * Basic triple writable round tripping test
+     * 
+     * @throws IOException
+     * @throws InstantiationException
+     * @throws IllegalAccessException
+     * @throws ClassNotFoundException
+     */
+    @Test
+    public void triple_writable_01() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+        Triple t = new Triple(NodeFactory.createURI("http://example"), NodeFactory.createURI("http://predicate"), NodeFactory.createLiteral("value"));
+        TripleWritable tw = new TripleWritable(t);
+        testWriteRead(tw, tw);
+    }
+    
+    /**
+     * Basic triple writable round tripping test
+     * 
+     * @throws IOException
+     * @throws InstantiationException
+     * @throws IllegalAccessException
+     * @throws ClassNotFoundException
+     */
+    @Test
+    public void triple_writable_02() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+        Triple t = new Triple(NodeFactory.createAnon(), NodeFactory.createURI("http://predicate"), NodeFactory.createLiteral("value"));
+        TripleWritable tw = new TripleWritable(t);
+        testWriteRead(tw, tw);
+    }
+
+    /**
+     * Basic quad writable round tripping test
+     * 
+     * @throws IOException
+     * @throws InstantiationException
+     * @throws IllegalAccessException
+     * @throws ClassNotFoundException
+     */
+    @Test
+    public void quad_writable_01() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+        Quad q = new Quad(Quad.defaultGraphNodeGenerated, NodeFactory.createURI("http://example"), NodeFactory.createURI("http://predicate"),
+                NodeFactory.createLiteral("value"));
+        QuadWritable qw = new QuadWritable(q);
+        testWriteRead(qw, qw);
+    }
+    
+    /**
+     * Basic quad writable round tripping test
+     * 
+     * @throws IOException
+     * @throws InstantiationException
+     * @throws IllegalAccessException
+     * @throws ClassNotFoundException
+     */
+    @Test
+    public void quad_writable_02() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+        Quad q = new Quad(Quad.defaultGraphNodeGenerated, NodeFactory.createAnon(), NodeFactory.createURI("http://predicate"),
+                NodeFactory.createLiteral("value"));
+        QuadWritable qw = new QuadWritable(q);
+        testWriteRead(qw, qw);
+    }
+
+    /**
+     * Basic tuple writable round tripping test
+     * 
+     * @throws IOException
+     * @throws InstantiationException
+     * @throws IllegalAccessException
+     * @throws ClassNotFoundException
+     */
+    @Test
+    public void tuple_writable_01() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+        Tuple<Node> t = Tuple.createTuple(NodeFactory.createURI("http://one"), NodeFactory.createURI("http://two"), NodeFactory.createLiteral("value"),
+                NodeFactory.createLiteral("foo"), NodeFactory.createURI("http://three"));
+        NodeTupleWritable tw = new NodeTupleWritable(t);
+        testWriteRead(tw, tw);
+    }
+}
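
For illustration, a minimal sketch of the same round trip outside of JUnit,
relying only on the write/readFields contract exercised above (the class name
NodeRoundTripSketch and the printed messages are purely illustrative):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.jena.hadoop.rdf.types.NodeWritable;

    import com.hp.hpl.jena.graph.NodeFactory;

    public class NodeRoundTripSketch {
        public static void main(String[] args) throws IOException {
            NodeWritable original = new NodeWritable(NodeFactory.createURI("http://example.org"));

            // Serialize to an in-memory buffer, as the test harness does
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));

            // Deserialize into a fresh instance and check equivalence
            NodeWritable copy = new NodeWritable();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            System.out.println(original.compareTo(copy) == 0 ? "round trip OK" : "mismatch");
        }
    }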

http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/pom.xml
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/pom.xml b/jena-hadoop-rdf/jena-elephas-io/pom.xml
new file mode 100644
index 0000000..2be37f9
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/pom.xml
@@ -0,0 +1,67 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.jena</groupId>
+    <artifactId>jena-elephas</artifactId>
+    <version>0.9.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>jena-elephas-io</artifactId>
+  <name>Apache Jena - Elephas - I/O</name>
+  <description>RDF Input/Output formats library for Hadoop</description>
+
+  <!-- Note that versions are managed by parent POMs -->
+  <dependencies>
+    <!-- Internal Project Dependencies -->
+    <dependency>
+      <groupId>org.apache.jena</groupId>
+      <artifactId>jena-hadoop-rdf-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <!-- Hadoop Dependencies -->
+    <!-- Note these will be provided on the Hadoop cluster hence the
+         provided scope -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <!-- Jena dependencies -->
+    <dependency>
+      <groupId>org.apache.jena</groupId>
+      <artifactId>jena-arq</artifactId>
+    </dependency>
+
+    <!-- Test Dependencies -->
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/HadoopIOConstants.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/HadoopIOConstants.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/HadoopIOConstants.java
new file mode 100644
index 0000000..5c1b41c
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/HadoopIOConstants.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io;
+
+/**
+ * Hadoop IO related constants
+ */
+public class HadoopIOConstants {
+
+    /**
+     * Private constructor prevents instantiation
+     */
+    private HadoopIOConstants() {
+    }
+
+    /**
+     * Map Reduce configuration setting for max line length
+     */
+    public static final String MAX_LINE_LENGTH = "mapreduce.input.linerecordreader.line.maxlength";
+
+    /**
+     * Run ID
+     */
+    public static final String RUN_ID = "runId";
+    
+    /**
+     * Compression codecs to use
+     */
+    public static final String IO_COMPRESSION_CODECS = "io.compression.codecs";
+}
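
As a hedged illustration of how these keys might be applied to a job
configuration (the values below are arbitrary examples, not recommendations):

    import org.apache.hadoop.conf.Configuration;

    import org.apache.jena.hadoop.rdf.io.HadoopIOConstants;

    public class HadoopIOConfigSketch {
        public static Configuration configure() {
            Configuration conf = new Configuration();
            // Cap the line length that line based record readers will accept
            // (128 KB here is purely illustrative)
            conf.setInt(HadoopIOConstants.MAX_LINE_LENGTH, 128 * 1024);
            // Tag outputs of this run with an identifier (hypothetical value)
            conf.set(HadoopIOConstants.RUN_ID, "example-run-1");
            return conf;
        }
    }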

http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/RdfIOConstants.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/RdfIOConstants.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/RdfIOConstants.java
new file mode 100644
index 0000000..27c2bb2
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/RdfIOConstants.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io;
+
+import java.io.IOException;
+
+/**
+ * RDF IO related constants
+ */
+public class RdfIOConstants {
+
+    /**
+     * Private constructor prevents instantiation
+     */
+    private RdfIOConstants() {
+    }
+
+    /**
+     * Configuration key used to set whether bad tuples are ignored; ignoring
+     * them is the default behaviour. When this is explicitly set to
+     * {@code false} bad tuples will result in an {@link IOException} being
+     * thrown by the relevant record readers.
+     */
+    public static final String INPUT_IGNORE_BAD_TUPLES = "rdf.io.input.ignore-bad-tuples";
+
+    /**
+     * Configuration key used to set the batch size used for RDF output formats
+     * that take a batched writing approach. Default value is given by the
+     * constant {@link #DEFAULT_OUTPUT_BATCH_SIZE}.
+     */
+    public static final String OUTPUT_BATCH_SIZE = "rdf.io.output.batch-size";
+
+    /**
+     * Default batch size for batched output formats
+     */
+    public static final long DEFAULT_OUTPUT_BATCH_SIZE = 10000;
+
+    /**
+     * Configuration key used to control behaviour with regards to how blank
+     * nodes are handled.
+     * <p>
+     * The default behaviour is that blank nodes are file scoped, which is what
+     * the RDF specifications require.
+     * </p>
+     * <p>
+     * However, in the case of a multi-stage pipeline this behaviour can cause
+     * blank nodes to diverge over several jobs and introduce spurious blank
+     * nodes over time. This is described in <a
+     * href="https://issues.apache.org/jira/browse/JENA-820">JENA-820</a> and
+     * enabling this flag for jobs in your pipeline allows you to work around
+     * this problem.
+     * </p>
+     * <h3>Warning</h3>
+     * <p>
+     * You should only enable this flag for jobs that take in RDF output
+     * originating from previous jobs since our normal blank node allocation
+     * policy ensures that blank nodes will be file scoped and unique over all
+     * files (barring unfortunate hashing collisions). If you enable this for
+     * jobs that take in RDF originating from other sources you may incorrectly
+     * conflate blank nodes that are supposed to be distinct and separate
+     * nodes.
+     * </p>
+     */
+    public static final String GLOBAL_BNODE_IDENTITY = "rdf.io.input.bnodes.global-identity";
+}
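
To make the intended usage concrete, a minimal sketch applying all three keys
(the values are illustrative, and enabling global blank node identity is only
safe under the conditions described in the warning above):

    import org.apache.hadoop.conf.Configuration;

    import org.apache.jena.hadoop.rdf.io.RdfIOConstants;

    public class RdfIOConfigSketch {
        public static Configuration configure() {
            Configuration conf = new Configuration();
            // Fail fast on malformed tuples rather than silently skipping them
            conf.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
            // Use a larger batch for batched output formats (illustrative value)
            conf.setLong(RdfIOConstants.OUTPUT_BATCH_SIZE, 25000);
            // Only for jobs consuming RDF produced by earlier jobs, per JENA-820
            conf.setBoolean(RdfIOConstants.GLOBAL_BNODE_IDENTITY, true);
            return conf;
        }
    }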

http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/AbstractNLineFileInputFormat.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/AbstractNLineFileInputFormat.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/AbstractNLineFileInputFormat.java
new file mode 100644
index 0000000..1fcb030
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/AbstractNLineFileInputFormat.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract line-based input format that reuses the machinery from
+ * {@link NLineInputFormat} to calculate the splits
+ *
+ * @param <TKey>
+ *            Key type
+ * @param <TValue>
+ *            Value type
+ */
+public abstract class AbstractNLineFileInputFormat<TKey, TValue> extends FileInputFormat<TKey, TValue> {
+    
+    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNLineFileInputFormat.class);
+
+    /**
+     * Logically splits the set of input files for the job, treating every N
+     * lines of the input as a separate split.
+     * 
+     * @see FileInputFormat#getSplits(JobContext)
+     */
+    public final List<InputSplit> getSplits(JobContext job) throws IOException {
+        boolean debug = LOGGER.isDebugEnabled();
+        if (debug && FileInputFormat.getInputDirRecursive(job)) {
+            LOGGER.debug("Recursive searching for input data is enabled");
+        }
+        
+        List<InputSplit> splits = new ArrayList<InputSplit>();
+        int numLinesPerSplit = NLineInputFormat.getNumLinesPerSplit(job);
+        for (FileStatus status : listStatus(job)) {
+            if (debug) {
+                LOGGER.debug("Determining how to split input file/directory {}", status.getPath());
+            }
+            splits.addAll(NLineInputFormat.getSplitsForFile(status, job.getConfiguration(), numLinesPerSplit));
+        }
+        return splits;
+    }
+}
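
Since the split calculation delegates to NLineInputFormat, the lines-per-split
figure can be tuned through the standard Hadoop helper. A minimal sketch
(50,000 is an arbitrary example; the Hadoop default is 1 line per split):

    import java.io.IOException;

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;

    public class SplitSizeSketch {
        public static Job newJob() throws IOException {
            Job job = Job.getInstance();
            // Each generated split will cover 50,000 lines of input
            NLineInputFormat.setNumLinesPerSplit(job, 50000);
            return job;
        }
    }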

http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileInputFormat.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileInputFormat.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileInputFormat.java
new file mode 100644
index 0000000..e561cdb
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileInputFormat.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+/**
+ * Abstract implementation of a whole file input format where each file is a
+ * single split
+ *
+ * @param <TKey>
+ *            Key type
+ * @param <TValue>
+ *            Value type
+ */
+public abstract class AbstractWholeFileInputFormat<TKey, TValue> extends FileInputFormat<TKey, TValue> {
+
+    @Override
+    protected final boolean isSplitable(JobContext context, Path filename) {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/QuadsInputFormat.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/QuadsInputFormat.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/QuadsInputFormat.java
new file mode 100644
index 0000000..b8fdbd5
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/QuadsInputFormat.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.jena.hadoop.rdf.io.input.readers.QuadsReader;
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+
+
+/**
+ * RDF input format that can handle any RDF quads format that ARQ supports,
+ * selecting the format to use for each file based upon the file extension
+ */
+public class QuadsInputFormat extends AbstractWholeFileInputFormat<LongWritable, QuadWritable> {
+
+    @Override
+    public RecordReader<LongWritable, QuadWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+            throws IOException, InterruptedException {
+        return new QuadsReader();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/TriplesInputFormat.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/TriplesInputFormat.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/TriplesInputFormat.java
new file mode 100644
index 0000000..03f394a
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/TriplesInputFormat.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.jena.hadoop.rdf.io.input.readers.TriplesReader;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+
+/**
+ * RDF input format that can handle any RDF triples format that ARQ supports,
+ * selecting the format to use for each file based upon the file extension
+ */
+public class TriplesInputFormat extends AbstractWholeFileInputFormat<LongWritable, TripleWritable> {
+
+    @Override
+    public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+            throws IOException, InterruptedException {
+        return new TriplesReader();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/TriplesOrQuadsInputFormat.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/TriplesOrQuadsInputFormat.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/TriplesOrQuadsInputFormat.java
new file mode 100644
index 0000000..bfd643e
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/TriplesOrQuadsInputFormat.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.jena.hadoop.rdf.io.input.readers.TriplesOrQuadsReader;
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+
+
+/**
+ * RDF input format that can handle any RDF triples or quads format that ARQ
+ * supports, selecting the format to use for each file based upon the file
+ * extension. Triples are converted into quads in the default graph.
+ */
+public class TriplesOrQuadsInputFormat extends AbstractWholeFileInputFormat<LongWritable, QuadWritable> {
+
+    @Override
+    public RecordReader<LongWritable, QuadWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+            throws IOException, InterruptedException {
+        return new TriplesOrQuadsReader();
+    }
+
+}
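
Putting the input formats together, a hedged sketch of wiring one into a job;
the input path is hypothetical and mapper/reducer setup is omitted:

    import java.io.IOException;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    import org.apache.jena.hadoop.rdf.io.input.TriplesOrQuadsInputFormat;

    public class InputWiringSketch {
        public static Job newJob() throws IOException {
            Job job = Job.getInstance();
            job.setJobName("rdf-input-sketch");
            // Each file's syntax is selected by extension; triples are lifted
            // into the default graph as quads
            job.setInputFormatClass(TriplesOrQuadsInputFormat.class);
            // Hypothetical input directory
            FileInputFormat.addInputPath(job, new Path("/data/rdf"));
            return job;
        }
    }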