Posted to commits@jena.apache.org by rv...@apache.org on 2015/01/05 16:07:19 UTC
[11/52] [abbrv] jena git commit: Rebrand to Jena Elephas per community vote
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/CharacteristicSetWritable.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/CharacteristicSetWritable.java b/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/CharacteristicSetWritable.java
new file mode 100644
index 0000000..f29b156
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/CharacteristicSetWritable.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableUtils;
+
+import com.hp.hpl.jena.graph.Node;
+import com.hp.hpl.jena.graph.NodeFactory;
+
+/**
+ * Represents a characteristic set, which comprises a count of the nodes to
+ * which the set applies and a set of characteristics recording the number of
+ * usages of each predicate with those nodes
+ *
+ *
+ *
+ */
+public class CharacteristicSetWritable implements WritableComparable<CharacteristicSetWritable> {
+
+ private Map<NodeWritable, CharacteristicWritable> characteristics = new TreeMap<NodeWritable, CharacteristicWritable>();
+ private LongWritable count = new LongWritable();
+
+ /**
+ * Creates a new empty characteristic set with the default count of 1
+ */
+ public CharacteristicSetWritable() {
+ this(1);
+ }
+
+ /**
+ * Creates a new characteristic set with the default count of 1 and the
+ * given characteristics
+ *
+ * @param characteristics
+ * Characteristics
+ */
+ public CharacteristicSetWritable(CharacteristicWritable... characteristics) {
+ this(1, characteristics);
+ }
+
+ /**
+ * Creates an empty characteristic set with the given count
+ *
+ * @param count
+ * Count
+ */
+ public CharacteristicSetWritable(long count) {
+ this(count, new CharacteristicWritable[0]);
+ }
+
+ /**
+ * Creates a new characteristic set
+ *
+ * @param count
+ * Count
+ * @param characteristics
+ * Characteristics
+ */
+ public CharacteristicSetWritable(long count, CharacteristicWritable... characteristics) {
+ this.count.set(count);
+ for (CharacteristicWritable characteristic : characteristics) {
+ this.characteristics.put(characteristic.getNode(), characteristic);
+ }
+ }
+
+ /**
+ * Creates a new instance and reads its data from the given input
+ *
+ * @param input
+ * Input
+ * @return New instance
+ * @throws IOException
+ */
+ public static CharacteristicSetWritable read(DataInput input) throws IOException {
+ CharacteristicSetWritable set = new CharacteristicSetWritable();
+ set.readFields(input);
+ return set;
+ }
+
+ /**
+ * Gets the count
+ *
+ * @return Count
+ */
+ public LongWritable getCount() {
+ return this.count;
+ }
+
+ /**
+ * Gets the characteristics
+ *
+ * @return Characteristics
+ */
+ public Iterator<CharacteristicWritable> getCharacteristics() {
+ return this.characteristics.values().iterator();
+ }
+
+ /**
+ * Gets the size of the characteristic set
+ *
+ * @return Size
+ */
+ public int size() {
+ return this.characteristics.size();
+ }
+
+ /**
+ * Adds a characteristic to the set merging it into the appropriate existing
+ * characteristic if applicable
+ *
+ * @param characteristic
+     *            Characteristic
+ */
+ public void add(CharacteristicWritable characteristic) {
+ if (this.characteristics.containsKey(characteristic.getNode())) {
+ this.characteristics.get(characteristic.getNode()).increment(characteristic.getCount().get());
+ } else {
+ this.characteristics.put(characteristic.getNode(), characteristic);
+ }
+ }
+
+ /**
+ * Adds some characteristics to the set merging them with the appropriate
+ * existing characteristics if applicable
+ *
+     * @param characteristics
+     *            Characteristics
+     */
+ public void add(CharacteristicWritable... characteristics) {
+ for (CharacteristicWritable characteristic : characteristics) {
+ this.add(characteristic);
+ }
+ }
+
+ /**
+ * Adds the contents of the other characteristic set to this characteristic
+ * set
+ *
+ * @param set
+ * Characteristic set
+ */
+ public void add(CharacteristicSetWritable set) {
+ this.increment(set.getCount().get());
+ Iterator<CharacteristicWritable> iter = set.getCharacteristics();
+ while (iter.hasNext()) {
+ this.add(iter.next());
+ }
+ }
+
+ /**
+ * Gets whether the set contains a characteristic for the given predicate
+ *
+ * @param uri
+ * Predicate URI
+ * @return True if contained in the set, false otherwise
+ */
+ public boolean hasCharacteristic(String uri) {
+ return this.hasCharacteristic(NodeFactory.createURI(uri));
+ }
+
+ /**
+ * Gets whether the set contains a characteristic for the given predicate
+ *
+ * @param n
+ * Predicate
+ * @return True if contained in the set, false otherwise
+ */
+ public boolean hasCharacteristic(Node n) {
+ return this.hasCharacteristic(new NodeWritable(n));
+ }
+
+ /**
+ * Gets whether the set contains a characteristic for the given predicate
+ *
+ * @param n
+ * Predicate
+ * @return True if contained in the set, false otherwise
+ */
+ public boolean hasCharacteristic(NodeWritable n) {
+ return this.characteristics.containsKey(n);
+ }
+
+ /**
+ * Increments the count by the given increment
+ *
+ * @param l
+ * Increment
+ */
+ public void increment(long l) {
+ this.count.set(this.count.get() + l);
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ // Write size, then count, then characteristics
+ WritableUtils.writeVInt(output, this.characteristics.size());
+ this.count.write(output);
+ for (CharacteristicWritable characteristic : this.characteristics.values()) {
+ characteristic.write(output);
+ }
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ // Read size, then count, then characteristics
+ int size = WritableUtils.readVInt(input);
+ this.count.readFields(input);
+ this.characteristics.clear();
+ for (int i = 0; i < size; i++) {
+ CharacteristicWritable cw = CharacteristicWritable.read(input);
+ this.characteristics.put(cw.getNode(), cw);
+ }
+ }
+
+ @Override
+ public int compareTo(CharacteristicSetWritable cs) {
+ int size = this.characteristics.size();
+ int otherSize = cs.characteristics.size();
+ if (size < otherSize) {
+ return -1;
+ } else if (size > otherSize) {
+ return 1;
+ } else {
+ // Compare characteristics in turn
+ Iterator<CharacteristicWritable> iter = this.getCharacteristics();
+ Iterator<CharacteristicWritable> otherIter = cs.getCharacteristics();
+
+ int compare = 0;
+ while (iter.hasNext()) {
+ CharacteristicWritable c = iter.next();
+ CharacteristicWritable otherC = otherIter.next();
+ compare = c.compareTo(otherC);
+ if (compare != 0)
+ return compare;
+ }
+ return compare;
+ }
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (!(other instanceof CharacteristicSetWritable))
+ return false;
+ return this.compareTo((CharacteristicSetWritable) other) == 0;
+ }
+
+ @Override
+ public int hashCode() {
+ // Build a hash code from characteristics
+ if (this.characteristics.size() == 0)
+ return 0;
+ Iterator<CharacteristicWritable> iter = this.getCharacteristics();
+ int hash = 17;
+ while (iter.hasNext()) {
+ hash = hash * 31 + iter.next().hashCode();
+ }
+ return hash;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("{ ");
+ builder.append(this.count.get());
+ Iterator<CharacteristicWritable> iter = this.getCharacteristics();
+ while (iter.hasNext()) {
+ builder.append(" , ");
+ builder.append(iter.next().toString());
+ }
+ builder.append(" }");
+ return builder.toString();
+ }
+
+}
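A minimal usage sketch for the class above (relying only on the constructors and add() method shown in this diff, plus Jena's NodeFactory); it illustrates how characteristics for the same predicate node are merged by summing their counts:

    import org.apache.jena.hadoop.rdf.types.CharacteristicSetWritable;
    import org.apache.jena.hadoop.rdf.types.CharacteristicWritable;

    import com.hp.hpl.jena.graph.Node;
    import com.hp.hpl.jena.graph.NodeFactory;

    public class CharacteristicSetExample {
        public static void main(String[] args) {
            Node predicate = NodeFactory.createURI("http://example.org/p");

            CharacteristicSetWritable set = new CharacteristicSetWritable();
            // Two characteristics for the same predicate are merged into one
            // entry, summing their counts rather than adding a second entry
            set.add(new CharacteristicWritable(predicate));     // count 1
            set.add(new CharacteristicWritable(predicate, 2));  // count 2

            System.out.println(set.size());                                        // 1
            System.out.println(set.getCharacteristics().next().getCount().get()); // 3
        }
    }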
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/CharacteristicWritable.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/CharacteristicWritable.java b/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/CharacteristicWritable.java
new file mode 100644
index 0000000..90fc7db
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/CharacteristicWritable.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.hp.hpl.jena.graph.Node;
+
+/**
+ * Represents a characteristic for a single node and contains the node and a
+ * count associated with that node
+ * <p>
+ * Note that characteristics are compared based upon only the nodes and not
+ * their counts
+ * </p>
+ *
+ *
+ *
+ */
+public class CharacteristicWritable implements WritableComparable<CharacteristicWritable> {
+
+ private NodeWritable node = new NodeWritable();
+ private LongWritable count = new LongWritable();
+
+ /**
+ * Creates an empty characteristic writable
+ */
+ public CharacteristicWritable() {
+ this(null);
+ }
+
+ /**
+ * Creates a characteristic writable with the given node and the default
+ * count of 1
+ *
+ * @param n
+ * Node
+ */
+ public CharacteristicWritable(Node n) {
+ this(n, 1);
+ }
+
+ /**
+ * Creates a characteristic writable with the given node and count
+ *
+ * @param n
+ * Node
+ * @param count
+ * Count
+ */
+ public CharacteristicWritable(Node n, long count) {
+ this.node.set(n);
+ this.count.set(count);
+ }
+
+ /**
+ * Creates a new instance and reads in its data from the given input
+ *
+ * @param input
+ * Input
+ * @return New instance
+ * @throws IOException
+ */
+ public static CharacteristicWritable read(DataInput input) throws IOException {
+ CharacteristicWritable cw = new CharacteristicWritable();
+ cw.readFields(input);
+ return cw;
+ }
+
+ /**
+ * Gets the node
+ *
+ * @return Node
+ */
+ public NodeWritable getNode() {
+ return this.node;
+ }
+
+ /**
+ * Gets the count
+ *
+ * @return Count
+ */
+ public LongWritable getCount() {
+ return this.count;
+ }
+
+ /**
+ * Increments the count by 1
+ */
+ public void increment() {
+ this.increment(1);
+ }
+
+ /**
+ * Increments the count by the given value
+ *
+ * @param l
+ * Value to increment by
+ */
+ public void increment(long l) {
+ this.count.set(this.count.get() + l);
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ this.node.write(output);
+ this.count.write(output);
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ this.node.readFields(input);
+ this.count.readFields(input);
+ }
+
+ @Override
+ public int compareTo(CharacteristicWritable o) {
+ return this.node.compareTo(o.node);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (!(other instanceof CharacteristicWritable))
+ return false;
+ return this.compareTo((CharacteristicWritable) other) == 0;
+ }
+
+ @Override
+ public int hashCode() {
+ return this.node.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return "(" + this.node.toString() + ", " + this.count.toString() + ")";
+ }
+
+}
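As the class javadoc notes, comparison ignores the count; a small sketch using only the API above makes that concrete:

    import org.apache.jena.hadoop.rdf.types.CharacteristicWritable;

    import com.hp.hpl.jena.graph.Node;
    import com.hp.hpl.jena.graph.NodeFactory;

    public class CharacteristicEqualityExample {
        public static void main(String[] args) {
            Node n = NodeFactory.createURI("http://example.org");

            CharacteristicWritable a = new CharacteristicWritable(n);       // count 1
            CharacteristicWritable b = new CharacteristicWritable(n, 100);  // count 100

            // Comparison and equality consider only the node, never the count
            System.out.println(a.equals(b));     // true
            System.out.println(a.compareTo(b));  // 0
        }
    }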
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/NodeTupleWritable.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/NodeTupleWritable.java b/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/NodeTupleWritable.java
new file mode 100644
index 0000000..e06aac4
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/NodeTupleWritable.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.types;
+
+import java.io.DataInput;
+import java.io.IOException;
+
+import org.apache.jena.atlas.lib.Tuple;
+import com.hp.hpl.jena.graph.Node;
+
+/**
+ * A writable RDF tuple
+ * <p>
+ * Unlike the more specific {@link TripleWritable} and {@link QuadWritable} this
+ * class allows for arbitrary length tuples and does not restrict tuples to
+ * being of uniform size.
+ * </p>
+ *
+ *
+ *
+ */
+public class NodeTupleWritable extends AbstractNodeTupleWritable<Tuple<Node>> {
+
+ /**
+ * Creates a new empty instance
+ */
+ public NodeTupleWritable() {
+ this(null);
+ }
+
+ /**
+ * Creates a new instance with the given value
+ *
+ * @param tuple
+ * Tuple
+ */
+ public NodeTupleWritable(Tuple<Node> tuple) {
+ super(tuple);
+ }
+
+ /**
+ * Creates a new instance from the given input
+ *
+ * @param input
+ * Input
+ * @return New instance
+ * @throws IOException
+ */
+ public static NodeTupleWritable read(DataInput input) throws IOException {
+ NodeTupleWritable t = new NodeTupleWritable();
+ t.readFields(input);
+ return t;
+ }
+
+ @Override
+ protected Tuple<Node> createTuple(Node[] ns) {
+ return Tuple.create(ns);
+ }
+
+ @Override
+ protected Node[] createNodes(Tuple<Node> tuple) {
+ return tuple.tuple();
+ }
+}
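A brief sketch of round-tripping an arbitrary-length tuple; it assumes the AbstractNodeTupleWritable base class (added elsewhere in this commit) supplies the actual serialisation, as exercised by the tests later in this diff:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.jena.atlas.lib.Tuple;
    import org.apache.jena.hadoop.rdf.types.NodeTupleWritable;

    import com.hp.hpl.jena.graph.Node;
    import com.hp.hpl.jena.graph.NodeFactory;

    public class NodeTupleExample {
        public static void main(String[] args) throws IOException {
            // Tuples are not limited to 3 (triple) or 4 (quad) nodes
            Tuple<Node> tuple = Tuple.createTuple(
                    NodeFactory.createURI("http://one"),
                    NodeFactory.createURI("http://two"),
                    NodeFactory.createLiteral("value"));

            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            new NodeTupleWritable(tuple).write(new DataOutputStream(bytes));

            NodeTupleWritable roundTripped = NodeTupleWritable.read(
                    new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            System.out.println(roundTripped.get()); // the same three nodes back
        }
    }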
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/NodeWritable.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/NodeWritable.java b/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/NodeWritable.java
new file mode 100644
index 0000000..cf00f8d
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/NodeWritable.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.jena.hadoop.rdf.types.comparators.SimpleBinaryComparator;
+import org.apache.jena.hadoop.rdf.types.converters.ThriftConverter;
+import org.apache.jena.riot.thrift.TRDF;
+import org.apache.jena.riot.thrift.ThriftConvert;
+import org.apache.jena.riot.thrift.wire.RDF_Term;
+import org.apache.thrift.TException;
+
+import com.hp.hpl.jena.graph.Node;
+import com.hp.hpl.jena.sparql.util.NodeUtils;
+
+/**
+ * A writable for {@link Node} instances
+ * <p>
+ * This uses <a
+ * href="http://afs.github.io/rdf-thrift/rdf-binary-thrift.html">RDF Thrift</a>
+ * for the binary encoding of terms. The in-memory storage for this type is both
+ * a {@link Node} and a {@link RDF_Term} with lazy conversion between the two
+ * forms as necessary.
+ * </p>
+ */
+public class NodeWritable implements WritableComparable<NodeWritable> {
+
+ static {
+ WritableComparator.define(NodeWritable.class, new SimpleBinaryComparator());
+ }
+
+ private Node node;
+ private RDF_Term term = new RDF_Term();
+
+ /**
+ * Creates an empty writable
+ */
+ public NodeWritable() {
+ this(null);
+ }
+
+ /**
+ * Creates a new instance from the given input
+ *
+ * @param input
+ * Input
+ * @return New instance
+ * @throws IOException
+ */
+ public static NodeWritable read(DataInput input) throws IOException {
+ NodeWritable nw = new NodeWritable();
+ nw.readFields(input);
+ return nw;
+ }
+
+ /**
+ * Creates a new writable with the given value
+ *
+ * @param n
+ * Node
+ */
+ public NodeWritable(Node n) {
+ this.set(n);
+ }
+
+ /**
+ * Gets the node
+ *
+ * @return Node
+ */
+ public Node get() {
+ // We may not have yet loaded the node
+ if (this.node == null) {
+ // If term is set to undefined then node is supposed to be null
+ if (this.term.isSet() && !this.term.isSetUndefined()) {
+ this.node = ThriftConvert.convert(this.term);
+ }
+ }
+ return this.node;
+ }
+
+ /**
+ * Sets the node
+ *
+ * @param n
+ * Node
+ */
+ public void set(Node n) {
+ this.node = n;
+ // Clear the term for now
+ // We only convert the Node to a term as and when we want to write it
+ // out in order to not waste effort if the value is never written out
+ this.term.clear();
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ // Clear previous value
+ this.node = null;
+ this.term.clear();
+
+ // Read in the new value
+ int termLength = input.readInt();
+ byte[] buffer = new byte[termLength];
+ input.readFully(buffer);
+ try {
+ ThriftConverter.fromBytes(buffer, this.term);
+ } catch (TException e) {
+ throw new IOException(e);
+ }
+
+ // Note that we don't convert it back into a Node at this time
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ // May not yet have prepared the Thrift term
+ if (!this.term.isSet()) {
+ if (this.node == null) {
+ this.term.setUndefined(TRDF.UNDEF);
+ } else {
+ ThriftConvert.toThrift(this.node, null, this.term, false);
+ }
+ }
+
+ // Write out the Thrift term
+ byte[] buffer;
+ try {
+ buffer = ThriftConverter.toBytes(this.term);
+ } catch (TException e) {
+ throw new IOException(e);
+ }
+ output.writeInt(buffer.length);
+ output.write(buffer);
+ }
+
+ @Override
+ public int compareTo(NodeWritable other) {
+ // Use get() rather than accessing the field directly because the node
+ // field is lazily instantiated from the Thrift term
+ return NodeUtils.compareRDFTerms(this.get(), other.get());
+ }
+
+ @Override
+ public String toString() {
+ // Use get() rather than accessing the field directly because the node
+ // field is lazily instantiated from the Thrift term
+ Node n = this.get();
+ if (n == null)
+ return "";
+ return n.toString();
+ }
+
+ @Override
+ public int hashCode() {
+ // Use get() rather than accessing the field directly because the node
+ // field is lazily instantiated from the Thrift term
+ Node n = this.get();
+ return n != null ? this.get().hashCode() : 0;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (!(other instanceof NodeWritable))
+ return false;
+ return this.compareTo((NodeWritable) other) == 0;
+ }
+}
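A sketch of the round trip from the caller's point of view, using only the API above; the Node is converted to its RDF Thrift form on write() and rebuilt lazily on get() after reading:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.jena.hadoop.rdf.types.NodeWritable;

    import com.hp.hpl.jena.datatypes.xsd.XSDDatatype;
    import com.hp.hpl.jena.graph.Node;
    import com.hp.hpl.jena.graph.NodeFactory;

    public class NodeWritableExample {
        public static void main(String[] args) throws IOException {
            Node literal = NodeFactory.createLiteral("1234", XSDDatatype.XSDinteger);

            // The Node is only converted to its RDF Thrift form when written out
            NodeWritable original = new NodeWritable(literal);
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));

            // Reading back populates the Thrift term; the Node is rebuilt lazily on get()
            NodeWritable copy = NodeWritable.read(
                    new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            System.out.println(copy.get());                // 1234 typed literal
            System.out.println(original.compareTo(copy));  // 0
        }
    }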
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/QuadWritable.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/QuadWritable.java b/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/QuadWritable.java
new file mode 100644
index 0000000..3d9dd00
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/QuadWritable.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.jena.hadoop.rdf.types.comparators.SimpleBinaryComparator;
+import org.apache.jena.hadoop.rdf.types.converters.ThriftConverter;
+import org.apache.jena.riot.thrift.ThriftConvert;
+import org.apache.jena.riot.thrift.wire.RDF_Quad;
+import org.apache.thrift.TException;
+
+import com.hp.hpl.jena.graph.Node;
+import com.hp.hpl.jena.sparql.core.Quad;
+
+/**
+ * A writable quad
+ */
+public class QuadWritable extends AbstractNodeTupleWritable<Quad> {
+
+ static {
+ WritableComparator.define(QuadWritable.class, new SimpleBinaryComparator());
+ }
+
+ private RDF_Quad quad = new RDF_Quad();
+
+ /**
+ * Creates a new empty instance
+ */
+ public QuadWritable() {
+ this(null);
+ }
+
+ /**
+ * Creates a new instance with the given value
+ *
+ * @param q
+ * Quad
+ */
+ public QuadWritable(Quad q) {
+ super(q);
+ }
+
+ /**
+ * Creates a new instance from the given input
+ *
+ * @param input
+ * Input
+ * @return New instance
+ * @throws IOException
+ */
+ public static QuadWritable read(DataInput input) throws IOException {
+ QuadWritable q = new QuadWritable();
+ q.readFields(input);
+ return q;
+ }
+
+ @Override
+ public void set(Quad tuple) {
+ super.set(tuple);
+ this.quad.clear();
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ this.quad.clear();
+ int tripleLength = input.readInt();
+ byte[] buffer = new byte[tripleLength];
+ input.readFully(buffer);
+ try {
+ ThriftConverter.fromBytes(buffer, this.quad);
+ } catch (TException e) {
+ throw new IOException(e);
+ }
+ this.setInternal(new Quad(ThriftConvert.convert(this.quad.getG()), ThriftConvert.convert(this.quad.getS()),
+ ThriftConvert.convert(this.quad.getP()), ThriftConvert.convert(this.quad.getO())));
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ if (this.get() == null)
+ throw new IOException(
+ "Null quads cannot be written using this class, consider using NodeTupleWritable instead");
+
+        // May not have yet prepared the Thrift quad
+ if (!this.quad.isSetS()) {
+ Quad tuple = this.get();
+ this.quad.setG(ThriftConvert.convert(tuple.getGraph(), false));
+ this.quad.setS(ThriftConvert.convert(tuple.getSubject(), false));
+ this.quad.setP(ThriftConvert.convert(tuple.getPredicate(), false));
+ this.quad.setO(ThriftConvert.convert(tuple.getObject(), false));
+ }
+
+ byte[] buffer;
+ try {
+ buffer = ThriftConverter.toBytes(this.quad);
+ } catch (TException e) {
+ throw new IOException(e);
+ }
+ output.writeInt(buffer.length);
+ output.write(buffer);
+ }
+
+ @Override
+ protected Quad createTuple(Node[] ns) {
+ if (ns.length != 4)
+ throw new IllegalArgumentException(String.format(
+ "Incorrect number of nodes to form a quad - got %d but expected 4", ns.length));
+ return new Quad(ns[0], ns[1], ns[2], ns[3]);
+ }
+
+ @Override
+ protected Node[] createNodes(Quad tuple) {
+ return new Node[] { tuple.getGraph(), tuple.getSubject(), tuple.getPredicate(), tuple.getObject() };
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/TripleWritable.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/TripleWritable.java b/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/TripleWritable.java
new file mode 100644
index 0000000..a17052b
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/TripleWritable.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.types;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.jena.hadoop.rdf.types.comparators.SimpleBinaryComparator;
+import org.apache.jena.hadoop.rdf.types.converters.ThriftConverter;
+import org.apache.jena.riot.thrift.ThriftConvert;
+import org.apache.jena.riot.thrift.wire.RDF_Triple;
+import org.apache.thrift.TException;
+
+import com.hp.hpl.jena.graph.Node;
+import com.hp.hpl.jena.graph.Triple;
+
+/**
+ * A writable triple
+ *
+ *
+ *
+ */
+public class TripleWritable extends AbstractNodeTupleWritable<Triple> {
+
+ static {
+ WritableComparator.define(TripleWritable.class, new SimpleBinaryComparator());
+ }
+
+ private RDF_Triple triple = new RDF_Triple();
+
+ /**
+     * Creates a new empty instance
+ */
+ public TripleWritable() {
+ this(null);
+ }
+
+ /**
+     * Creates a new instance with the given value
+ *
+ * @param t
+ * Triple
+ */
+ public TripleWritable(Triple t) {
+ super(t);
+ }
+
+ /**
+ * Creates a new instance from the given input
+ *
+ * @param input
+ * Input
+ * @return New instance
+ * @throws IOException
+ */
+ public static TripleWritable read(DataInput input) throws IOException {
+ TripleWritable t = new TripleWritable();
+ t.readFields(input);
+ return t;
+ }
+
+ @Override
+ public void set(Triple tuple) {
+ super.set(tuple);
+ this.triple.clear();
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ this.triple.clear();
+ int tripleLength = input.readInt();
+ byte[] buffer = new byte[tripleLength];
+ input.readFully(buffer);
+ try {
+ ThriftConverter.fromBytes(buffer, this.triple);
+ } catch (TException e) {
+ throw new IOException(e);
+ }
+ this.setInternal(new Triple(ThriftConvert.convert(this.triple.getS()),
+ ThriftConvert.convert(this.triple.getP()), ThriftConvert.convert(this.triple.getO())));
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ if (this.get() == null)
+ throw new IOException(
+ "Null triples cannot be written using this class, consider using NodeTupleWritable instead");
+
+ // May not have yet prepared the Thrift triple
+ if (!this.triple.isSetS()) {
+ Triple tuple = this.get();
+ this.triple.setS(ThriftConvert.convert(tuple.getSubject(), false));
+ this.triple.setP(ThriftConvert.convert(tuple.getPredicate(), false));
+ this.triple.setO(ThriftConvert.convert(tuple.getObject(), false));
+ }
+
+ byte[] buffer;
+ try {
+ buffer = ThriftConverter.toBytes(this.triple);
+ } catch (TException e) {
+ throw new IOException(e);
+ }
+ output.writeInt(buffer.length);
+ output.write(buffer);
+ }
+
+ @Override
+ protected Triple createTuple(Node[] ns) {
+ if (ns.length != 3)
+ throw new IllegalArgumentException(String.format(
+ "Incorrect number of nodes to form a triple - got %d but expected 3", ns.length));
+ return new Triple(ns[0], ns[1], ns[2]);
+ }
+
+ @Override
+ protected Node[] createNodes(Triple tuple) {
+ return new Node[] { tuple.getSubject(), tuple.getPredicate(), tuple.getObject() };
+ }
+}
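A sketch of round-tripping a triple and of the null guard in write(); per the exception message above, NodeTupleWritable is the type to use when values may be null:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.jena.hadoop.rdf.types.TripleWritable;

    import com.hp.hpl.jena.graph.NodeFactory;
    import com.hp.hpl.jena.graph.Triple;

    public class TripleWritableExample {
        public static void main(String[] args) throws IOException {
            Triple t = new Triple(
                    NodeFactory.createURI("http://example"),
                    NodeFactory.createURI("http://predicate"),
                    NodeFactory.createLiteral("value"));

            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            new TripleWritable(t).write(new DataOutputStream(bytes));

            TripleWritable copy = TripleWritable.read(
                    new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            System.out.println(copy.get().equals(t)); // true

            try {
                new TripleWritable().write(new DataOutputStream(new ByteArrayOutputStream()));
            } catch (IOException e) {
                // Null triples are rejected; use NodeTupleWritable for partial tuples
                System.out.println(e.getMessage());
            }
        }
    }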
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/comparators/SimpleBinaryComparator.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/comparators/SimpleBinaryComparator.java b/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/comparators/SimpleBinaryComparator.java
new file mode 100644
index 0000000..6c46714
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/comparators/SimpleBinaryComparator.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.types.comparators;
+
+import org.apache.hadoop.io.WritableComparator;
+
+/**
+ * A general purpose comparator that may be used with any type that can be
+ * compared directly on its binary encoding
+ */
+public class SimpleBinaryComparator extends WritableComparator {
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+ }
+
+}
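A sketch of how this comparator is used once a writable type has registered it via WritableComparator.define() (as NodeWritable does in its static initialiser); keys can then be ordered on their raw serialised bytes without deserialisation:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.io.WritableComparator;
    import org.apache.jena.hadoop.rdf.types.NodeWritable;

    import com.hp.hpl.jena.graph.NodeFactory;

    public class BinaryComparatorExample {

        private static byte[] serialize(NodeWritable nw) throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            nw.write(new DataOutputStream(bytes));
            return bytes.toByteArray();
        }

        public static void main(String[] args) throws IOException {
            // Constructing NodeWritable runs its static initialiser, which registers
            // a SimpleBinaryComparator for the type via WritableComparator.define()
            byte[] a = serialize(new NodeWritable(NodeFactory.createURI("http://example.org")));
            byte[] b = serialize(new NodeWritable(NodeFactory.createURI("http://example.org")));

            WritableComparator comparator = WritableComparator.get(NodeWritable.class);

            // Identical terms serialise to identical bytes, so the raw comparison is 0
            System.out.println(comparator.compare(a, 0, a.length, b, 0, b.length));
        }
    }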
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/converters/ThriftConverter.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/converters/ThriftConverter.java b/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/converters/ThriftConverter.java
new file mode 100644
index 0000000..0675afc
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-common/src/main/java/org/apache/jena/hadoop/rdf/types/converters/ThriftConverter.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.types.converters;
+
+import java.io.ByteArrayOutputStream;
+
+import org.apache.jena.riot.thrift.wire.RDF_Quad;
+import org.apache.jena.riot.thrift.wire.RDF_Term;
+import org.apache.jena.riot.thrift.wire.RDF_Triple;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TIOStreamTransport;
+import org.apache.thrift.transport.TMemoryInputTransport;
+import org.apache.thrift.transport.TTransport;
+
+/**
+ * Helper for converting between the binary representation of Nodes, Triples and
+ * Quads and their Jena API equivalents
+ *
+ */
+public class ThriftConverter {
+
+ private static ThreadLocal<TMemoryInputTransport> inputTransports = new ThreadLocal<>();
+ private static ThreadLocal<TProtocol> inputProtocols = new ThreadLocal<>();
+
+ private static ThreadLocal<ByteArrayOutputStream> outputStreams = new ThreadLocal<>();
+ private static ThreadLocal<TTransport> outputTransports = new ThreadLocal<>();
+ private static ThreadLocal<TProtocol> outputProtocols = new ThreadLocal<>();
+
+ private static TMemoryInputTransport getInputTransport() {
+ TMemoryInputTransport transport = inputTransports.get();
+ if (transport != null)
+ return transport;
+
+ transport = new TMemoryInputTransport();
+ inputTransports.set(transport);
+ return transport;
+ }
+
+ private static TProtocol getInputProtocol() {
+ TProtocol protocol = inputProtocols.get();
+ if (protocol != null)
+ return protocol;
+
+ protocol = new TCompactProtocol(getInputTransport());
+ inputProtocols.set(protocol);
+ return protocol;
+ }
+
+ private static ByteArrayOutputStream getOutputStream() {
+ ByteArrayOutputStream output = outputStreams.get();
+ if (output != null)
+ return output;
+
+ output = new ByteArrayOutputStream();
+ outputStreams.set(output);
+ return output;
+ }
+
+ private static TTransport getOutputTransport() {
+ TTransport transport = outputTransports.get();
+ if (transport != null)
+ return transport;
+
+ transport = new TIOStreamTransport(getOutputStream());
+ outputTransports.set(transport);
+ return transport;
+ }
+
+ private static TProtocol getOutputProtocol() {
+ TProtocol protocol = outputProtocols.get();
+ if (protocol != null)
+ return protocol;
+
+ protocol = new TCompactProtocol(getOutputTransport());
+ outputProtocols.set(protocol);
+ return protocol;
+ }
+
+ public static byte[] toBytes(RDF_Term term) throws TException {
+ ByteArrayOutputStream output = getOutputStream();
+ output.reset();
+
+ TProtocol protocol = getOutputProtocol();
+ term.write(protocol);
+
+ return output.toByteArray();
+ }
+
+ public static void fromBytes(byte[] bs, RDF_Term term) throws TException {
+ TMemoryInputTransport transport = getInputTransport();
+ transport.reset(bs);
+ TProtocol protocol = getInputProtocol();
+ term.read(protocol);
+ }
+
+ public static void fromBytes(byte[] buffer, RDF_Triple triple) throws TException {
+ TMemoryInputTransport transport = getInputTransport();
+ transport.reset(buffer);
+ TProtocol protocol = getInputProtocol();
+ triple.read(protocol);
+ }
+
+ public static byte[] toBytes(RDF_Triple triple) throws TException {
+ ByteArrayOutputStream output = getOutputStream();
+ output.reset();
+
+ TProtocol protocol = getOutputProtocol();
+ triple.write(protocol);
+
+ return output.toByteArray();
+ }
+
+ public static void fromBytes(byte[] buffer, RDF_Quad quad) throws TException {
+ TMemoryInputTransport transport = getInputTransport();
+ transport.reset(buffer);
+ TProtocol protocol = getInputProtocol();
+ quad.read(protocol);
+ }
+
+ public static byte[] toBytes(RDF_Quad quad) throws TException {
+ ByteArrayOutputStream output = getOutputStream();
+ output.reset();
+
+ TProtocol protocol = getOutputProtocol();
+ quad.write(protocol);
+
+ return output.toByteArray();
+ }
+}
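A sketch of the byte-level round trip these helpers provide, paired with ThriftConvert (from Jena's RIOT Thrift support, as used by NodeWritable earlier in this commit) to move between Node and RDF_Term:

    import org.apache.jena.hadoop.rdf.types.converters.ThriftConverter;
    import org.apache.jena.riot.thrift.ThriftConvert;
    import org.apache.jena.riot.thrift.wire.RDF_Term;
    import org.apache.thrift.TException;

    import com.hp.hpl.jena.graph.Node;
    import com.hp.hpl.jena.graph.NodeFactory;

    public class ThriftConverterExample {
        public static void main(String[] args) throws TException {
            Node original = NodeFactory.createURI("http://example.org/subject");

            // Node -> RDF_Term -> bytes (thread-local transports are reused internally)
            RDF_Term term = new RDF_Term();
            ThriftConvert.toThrift(original, null, term, false);
            byte[] encoded = ThriftConverter.toBytes(term);

            // bytes -> RDF_Term -> Node
            RDF_Term decoded = new RDF_Term();
            ThriftConverter.fromBytes(encoded, decoded);
            Node roundTripped = ThriftConvert.convert(decoded);

            System.out.println(original.equals(roundTripped)); // true
        }
    }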
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-common/src/test/java/org/apache/jena/hadoop/rdf/io/types/CharacteristicTests.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-common/src/test/java/org/apache/jena/hadoop/rdf/io/types/CharacteristicTests.java b/jena-hadoop-rdf/jena-elephas-common/src/test/java/org/apache/jena/hadoop/rdf/io/types/CharacteristicTests.java
new file mode 100644
index 0000000..7214b14
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-common/src/test/java/org/apache/jena/hadoop/rdf/io/types/CharacteristicTests.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.types;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.jena.hadoop.rdf.types.CharacteristicSetWritable;
+import org.apache.jena.hadoop.rdf.types.CharacteristicWritable;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.hp.hpl.jena.graph.Node;
+import com.hp.hpl.jena.graph.NodeFactory;
+
+/**
+ * Tests for {@link CharacteristicWritable} and
+ * {@link CharacteristicSetWritable}
+ *
+ *
+ *
+ */
+public class CharacteristicTests {
+
+ /**
+ * Checks whether a writable round trips successfully
+ *
+ * @param cw
+ * Characteristic writable
+ * @throws IOException
+ */
+ private void checkRoundTrip(CharacteristicWritable cw) throws IOException {
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ DataOutputStream output = new DataOutputStream(outputStream);
+ cw.write(output);
+
+ ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
+ DataInputStream input = new DataInputStream(inputStream);
+ CharacteristicWritable actual = CharacteristicWritable.read(input);
+ Assert.assertEquals(cw, actual);
+ }
+
+ /**
+ * Tests characteristic round tripping
+ *
+ * @throws IOException
+ */
+ @Test
+ public void characteristic_writable_01() throws IOException {
+ Node n = NodeFactory.createURI("http://example.org");
+ CharacteristicWritable expected = new CharacteristicWritable(n);
+ Assert.assertEquals(1, expected.getCount().get());
+
+ this.checkRoundTrip(expected);
+ }
+
+ /**
+ * Tests characteristic properties
+ *
+ * @throws IOException
+ */
+ @Test
+ public void characteristic_writable_02() throws IOException {
+ Node n = NodeFactory.createURI("http://example.org");
+ CharacteristicWritable cw1 = new CharacteristicWritable(n);
+ CharacteristicWritable cw2 = new CharacteristicWritable(n, 100);
+ this.checkRoundTrip(cw1);
+ this.checkRoundTrip(cw2);
+
+ // Should still be equal since equality is only on the node not the
+ // count
+ Assert.assertEquals(cw1, cw2);
+ }
+
+ /**
+ * Tests characteristic properties
+ *
+ * @throws IOException
+ */
+ @Test
+ public void characteristic_writable_03() throws IOException {
+ CharacteristicWritable cw1 = new CharacteristicWritable(NodeFactory.createURI("http://example.org"));
+ CharacteristicWritable cw2 = new CharacteristicWritable(NodeFactory.createURI("http://example.org/other"));
+ this.checkRoundTrip(cw1);
+ this.checkRoundTrip(cw2);
+
+ // Should not be equal as different nodes
+ Assert.assertNotEquals(cw1, cw2);
+ }
+
+ /**
+ * Checks that a writable round trips
+ *
+ * @param set
+ * Characteristic set
+ * @throws IOException
+ */
+ private void checkRoundTrip(CharacteristicSetWritable set) throws IOException {
+ // Test round trip
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ DataOutputStream output = new DataOutputStream(outputStream);
+ set.write(output);
+
+ ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
+ DataInputStream input = new DataInputStream(inputStream);
+ CharacteristicSetWritable actual = CharacteristicSetWritable.read(input);
+ Assert.assertEquals(set, actual);
+ }
+
+ /**
+ * Checks a characteristic set
+ *
+ * @param set
+ * Set
+ * @param expectedItems
+ * Expected number of characteristics
+ * @param expectedCounts
+ * Expected counts for characteristics
+ */
+ protected final void checkCharacteristicSet(CharacteristicSetWritable set, int expectedItems, long[] expectedCounts) {
+ Assert.assertEquals(expectedItems, set.size());
+ Assert.assertEquals(expectedItems, expectedCounts.length);
+ Iterator<CharacteristicWritable> iter = set.getCharacteristics();
+ int i = 0;
+ while (iter.hasNext()) {
+ CharacteristicWritable cw = iter.next();
+ Assert.assertEquals(expectedCounts[i], cw.getCount().get());
+ i++;
+ }
+ }
+
+ /**
+ * Tests characteristic sets
+ *
+ * @throws IOException
+ */
+ @Test
+ public void characteristic_set_writable_01() throws IOException {
+ CharacteristicSetWritable set = new CharacteristicSetWritable();
+
+ // Add some characteristics
+ CharacteristicWritable cw1 = new CharacteristicWritable(NodeFactory.createURI("http://example.org"));
+ CharacteristicWritable cw2 = new CharacteristicWritable(NodeFactory.createURI("http://example.org/other"));
+ set.add(cw1);
+ set.add(cw2);
+ this.checkCharacteristicSet(set, 2, new long[] { 1, 1 });
+ this.checkRoundTrip(set);
+ }
+
+ /**
+ * Tests characteristic sets
+ *
+ * @throws IOException
+ */
+ @Test
+ public void characteristic_set_writable_02() throws IOException {
+ CharacteristicSetWritable set = new CharacteristicSetWritable();
+
+ // Add some characteristics
+ CharacteristicWritable cw1 = new CharacteristicWritable(NodeFactory.createURI("http://example.org"));
+ CharacteristicWritable cw2 = new CharacteristicWritable(NodeFactory.createURI("http://example.org"), 2);
+ set.add(cw1);
+ set.add(cw2);
+ this.checkCharacteristicSet(set, 1, new long[] { 3 });
+ this.checkRoundTrip(set);
+ }
+
+ /**
+ * Tests characteristic sets
+ *
+ * @throws IOException
+ */
+ @Test
+ public void characteristic_set_writable_03() throws IOException {
+ CharacteristicSetWritable set1 = new CharacteristicSetWritable();
+ CharacteristicSetWritable set2 = new CharacteristicSetWritable();
+
+ // Add some characteristics
+ CharacteristicWritable cw1 = new CharacteristicWritable(NodeFactory.createURI("http://example.org"));
+ CharacteristicWritable cw2 = new CharacteristicWritable(NodeFactory.createURI("http://example.org/other"));
+ set1.add(cw1);
+ set2.add(cw2);
+ this.checkCharacteristicSet(set1, 1, new long[] { 1 });
+ this.checkCharacteristicSet(set2, 1, new long[] { 1 });
+ this.checkRoundTrip(set1);
+ this.checkRoundTrip(set2);
+
+ Assert.assertNotEquals(set1, set2);
+ }
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-common/src/test/java/org/apache/jena/hadoop/rdf/io/types/RdfTypesTest.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-common/src/test/java/org/apache/jena/hadoop/rdf/io/types/RdfTypesTest.java b/jena-hadoop-rdf/jena-elephas-common/src/test/java/org/apache/jena/hadoop/rdf/io/types/RdfTypesTest.java
new file mode 100644
index 0000000..a70dfb0
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-common/src/test/java/org/apache/jena/hadoop/rdf/io/types/RdfTypesTest.java
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.types;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.jena.atlas.lib.Tuple;
+import org.apache.jena.hadoop.rdf.types.NodeTupleWritable;
+import org.apache.jena.hadoop.rdf.types.NodeWritable;
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.hp.hpl.jena.datatypes.xsd.XSDDatatype;
+import com.hp.hpl.jena.graph.Node;
+import com.hp.hpl.jena.graph.NodeFactory;
+import com.hp.hpl.jena.graph.Triple;
+import com.hp.hpl.jena.sparql.core.Quad;
+
+/**
+ * Tests for the various RDF types defined by the
+ * {@link org.apache.jena.hadoop.rdf.types} package
+ *
+ *
+ *
+ */
+public class RdfTypesTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RdfTypesTest.class);
+
+ private ByteArrayOutputStream outputStream;
+ private ByteArrayInputStream inputStream;
+
+ /**
+ * Prepare for output
+ *
+ * @return Data output
+ */
+ private DataOutput prepareOutput() {
+ this.outputStream = new ByteArrayOutputStream();
+ return new DataOutputStream(this.outputStream);
+ }
+
+ /**
+ * Prepare for input from the previously written output
+ *
+ * @return Data Input
+ */
+ private DataInput prepareInput() {
+ this.inputStream = new ByteArrayInputStream(this.outputStream.toByteArray());
+ return new DataInputStream(this.inputStream);
+ }
+
+ /**
+ * Prepare for input from the given data
+ *
+ * @param data
+ * Data
+ * @return Data Input
+ */
+ @SuppressWarnings("unused")
+ private DataInput prepareInput(byte[] data) {
+ this.inputStream = new ByteArrayInputStream(data);
+ return new DataInputStream(this.inputStream);
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ private <T extends WritableComparable> void testWriteRead(T writable, T expected) throws IOException, InstantiationException, IllegalAccessException,
+ ClassNotFoundException {
+ // Write out data
+ DataOutput output = this.prepareOutput();
+ writable.write(output);
+
+ // Read back in data
+ DataInput input = this.prepareInput();
+ T actual = (T) Class.forName(writable.getClass().getName()).newInstance();
+ actual.readFields(input);
+
+ LOG.info("Original = " + writable.toString());
+ LOG.info("Round Tripped = " + actual.toString());
+
+ // Check equivalent
+ Assert.assertEquals(0, expected.compareTo(actual));
+ }
+
+ /**
+ * Basic node writable round tripping test
+ *
+ * @throws IOException
+ * @throws InstantiationException
+ * @throws IllegalAccessException
+ * @throws ClassNotFoundException
+ */
+ @Test
+ public void node_writable_null() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+ Node n = null;
+ NodeWritable nw = new NodeWritable(n);
+ testWriteRead(nw, nw);
+ }
+
+ /**
+ * Basic node writable round tripping test
+ *
+ * @throws IOException
+ * @throws InstantiationException
+ * @throws IllegalAccessException
+ * @throws ClassNotFoundException
+ */
+ @Test
+ @Ignore
+ public void node_writable_variable_01() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+ Node n = NodeFactory.createVariable("x");
+ NodeWritable nw = new NodeWritable(n);
+ testWriteRead(nw, nw);
+ }
+
+ /**
+ * Basic node writable round tripping test
+ *
+ * @throws IOException
+ * @throws InstantiationException
+ * @throws IllegalAccessException
+ * @throws ClassNotFoundException
+ */
+ @Test
+ @Ignore
+ public void node_writable_variable_02() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+        Node n = NodeFactory.createVariable("really-long-variable-name-asddsfr4545egfdgdfgfdgdtgvdg-dfgfdgdfgdfgdfg4-dfvdfgdfgdfgfdgfdgdfgdfgfdg");
+ NodeWritable nw = new NodeWritable(n);
+ testWriteRead(nw, nw);
+ }
+
+ /**
+ * Basic node writable round tripping test
+ *
+ * @throws IOException
+ * @throws InstantiationException
+ * @throws IllegalAccessException
+ * @throws ClassNotFoundException
+ */
+ @Test
+ public void node_writable_uri_01() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+ Node n = NodeFactory.createURI("http://example.org");
+ NodeWritable nw = new NodeWritable(n);
+ testWriteRead(nw, nw);
+ }
+
+ /**
+ * Basic node writable round tripping test
+ *
+ * @throws IOException
+ * @throws InstantiationException
+ * @throws IllegalAccessException
+ * @throws ClassNotFoundException
+ */
+ @Test
+ public void node_writable_uri_02() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+ Node n = NodeFactory.createURI("http://user:password@example.org/some/path?key=value#id");
+ NodeWritable nw = new NodeWritable(n);
+ testWriteRead(nw, nw);
+ }
+
+ /**
+ * Basic node writable round tripping test
+ *
+ * @throws IOException
+ * @throws InstantiationException
+ * @throws IllegalAccessException
+ * @throws ClassNotFoundException
+ */
+ @Test
+ public void node_writable_literal_01() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+ Node n = NodeFactory.createLiteral("simple");
+ NodeWritable nw = new NodeWritable(n);
+ testWriteRead(nw, nw);
+ }
+
+ /**
+ * Basic node writable round tripping test
+ *
+ * @throws IOException
+ * @throws InstantiationException
+ * @throws IllegalAccessException
+ * @throws ClassNotFoundException
+ */
+ @Test
+ public void node_writable_literal_02() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+ Node n = NodeFactory.createLiteral("language", "en", null);
+ NodeWritable nw = new NodeWritable(n);
+ testWriteRead(nw, nw);
+ }
+
+ /**
+ * Basic node writable round tripping test
+ *
+ * @throws IOException
+ * @throws InstantiationException
+ * @throws IllegalAccessException
+ * @throws ClassNotFoundException
+ */
+ @Test
+ public void node_writable_literal_03() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+ Node n = NodeFactory.createLiteral("string", XSDDatatype.XSDstring);
+ NodeWritable nw = new NodeWritable(n);
+ testWriteRead(nw, nw);
+ }
+
+ /**
+ * Basic node writable round tripping test
+ *
+ * @throws IOException
+ * @throws InstantiationException
+ * @throws IllegalAccessException
+ * @throws ClassNotFoundException
+ */
+ @Test
+ public void node_writable_literal_04() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+ Node n = NodeFactory.createLiteral("1234", XSDDatatype.XSDinteger);
+ NodeWritable nw = new NodeWritable(n);
+ testWriteRead(nw, nw);
+ }
+
+ /**
+ * Basic node writable round tripping test
+ *
+ * @throws IOException
+ * @throws InstantiationException
+ * @throws IllegalAccessException
+ * @throws ClassNotFoundException
+ */
+ @Test
+ public void node_writable_literal_05() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+ Node n = NodeFactory.createLiteral("123.4", XSDDatatype.XSDdecimal);
+ NodeWritable nw = new NodeWritable(n);
+ testWriteRead(nw, nw);
+ }
+
+ /**
+ * Basic node writable round tripping test
+ *
+ * @throws IOException
+ * @throws InstantiationException
+ * @throws IllegalAccessException
+ * @throws ClassNotFoundException
+ */
+ @Test
+ public void node_writable_literal_06() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+ Node n = NodeFactory.createLiteral("12.3e4", XSDDatatype.XSDdouble);
+ NodeWritable nw = new NodeWritable(n);
+ testWriteRead(nw, nw);
+ }
+
+ /**
+ * Basic node writable round tripping test
+ *
+ * @throws IOException
+ * @throws InstantiationException
+ * @throws IllegalAccessException
+ * @throws ClassNotFoundException
+ */
+ @Test
+ public void node_writable_literal_07() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+ Node n = NodeFactory.createLiteral("true", XSDDatatype.XSDboolean);
+ NodeWritable nw = new NodeWritable(n);
+ testWriteRead(nw, nw);
+ }
+
+ /**
+ * Basic node writable round tripping test
+ *
+ * @throws IOException
+ * @throws InstantiationException
+ * @throws IllegalAccessException
+ * @throws ClassNotFoundException
+ */
+ @Test
+ public void node_writable_bnode_01() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+ Node n = NodeFactory.createAnon();
+ NodeWritable nw = new NodeWritable(n);
+ testWriteRead(nw, nw);
+ }
+
+ /**
+ * Basic node writable round tripping test
+ *
+ * @throws IOException
+ * @throws InstantiationException
+ * @throws IllegalAccessException
+ * @throws ClassNotFoundException
+ */
+ @Test
+ public void node_writable_bnode_02() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+ Node n = NodeFactory.createAnon();
+ NodeWritable nw = new NodeWritable(n);
+ testWriteRead(nw, nw);
+ NodeWritable nw2 = new NodeWritable(n);
+ testWriteRead(nw2, nw2);
+
+ Assert.assertEquals(0, nw.compareTo(nw2));
+ }
+
+ /**
+ * Basic triple writable round tripping test
+ *
+ * @throws IOException
+ * @throws InstantiationException
+ * @throws IllegalAccessException
+ * @throws ClassNotFoundException
+ */
+ @Test
+ public void triple_writable_01() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+ Triple t = new Triple(NodeFactory.createURI("http://example"), NodeFactory.createURI("http://predicate"), NodeFactory.createLiteral("value"));
+ TripleWritable tw = new TripleWritable(t);
+ testWriteRead(tw, tw);
+ }
+
+ /**
+ * Basic triple writable round tripping test
+ *
+ * @throws IOException
+ * @throws InstantiationException
+ * @throws IllegalAccessException
+ * @throws ClassNotFoundException
+ */
+ @Test
+ public void triple_writable_02() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+ Triple t = new Triple(NodeFactory.createAnon(), NodeFactory.createURI("http://predicate"), NodeFactory.createLiteral("value"));
+ TripleWritable tw = new TripleWritable(t);
+ testWriteRead(tw, tw);
+ }
+
+ /**
+ * Basic quad writable round tripping test
+ *
+ * @throws IOException
+ * @throws InstantiationException
+ * @throws IllegalAccessException
+ * @throws ClassNotFoundException
+ */
+ @Test
+ public void quad_writable_01() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+ Quad q = new Quad(Quad.defaultGraphNodeGenerated, NodeFactory.createURI("http://example"), NodeFactory.createURI("http://predicate"),
+ NodeFactory.createLiteral("value"));
+ QuadWritable qw = new QuadWritable(q);
+ testWriteRead(qw, qw);
+ }
+
+ /**
+ * Basic quad writable round tripping test
+ *
+ * @throws IOException
+ * @throws InstantiationException
+ * @throws IllegalAccessException
+ * @throws ClassNotFoundException
+ */
+ @Test
+ public void quad_writable_02() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+ Quad q = new Quad(Quad.defaultGraphNodeGenerated, NodeFactory.createAnon(), NodeFactory.createURI("http://predicate"),
+ NodeFactory.createLiteral("value"));
+ QuadWritable qw = new QuadWritable(q);
+ testWriteRead(qw, qw);
+ }
+
+ /**
+ * Basic tuple writable round tripping test
+ *
+ * @throws IOException
+ * @throws InstantiationException
+ * @throws IllegalAccessException
+ * @throws ClassNotFoundException
+ */
+ @Test
+ public void tuple_writable_01() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+ Tuple<Node> t = Tuple.createTuple(NodeFactory.createURI("http://one"), NodeFactory.createURI("http://two"), NodeFactory.createLiteral("value"),
+ NodeFactory.createLiteral("foo"), NodeFactory.createURI("http://three"));
+ NodeTupleWritable tw = new NodeTupleWritable(t);
+ testWriteRead(tw, tw);
+ }
+}
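
Editor's note: the tests above delegate to a testWriteRead helper defined elsewhere in this test hierarchy, so the actual round-trip mechanics are not visible in this hunk. Purely as an illustration (not the project's harness), a manual round trip through Hadoop's Writable contract could look like the sketch below; it assumes NodeWritable exposes the usual no-argument Writable constructor, which the reflective instantiation implied by the throws clauses suggests.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.jena.hadoop.rdf.types.NodeWritable;

    import com.hp.hpl.jena.graph.NodeFactory;

    public class NodeWritableRoundTripSketch {
        public static void main(String[] args) throws IOException {
            // Serialise the writable into an in-memory buffer
            NodeWritable original = new NodeWritable(NodeFactory.createURI("http://example"));
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (DataOutputStream output = new DataOutputStream(buffer)) {
                original.write(output);
            }

            // Deserialise into a fresh instance (assumes a no-arg constructor) and compare
            NodeWritable copy = new NodeWritable();
            try (DataInputStream input = new DataInputStream(
                    new ByteArrayInputStream(buffer.toByteArray()))) {
                copy.readFields(input);
            }
            System.out.println("Round trip equal? " + (original.compareTo(copy) == 0));
        }
    }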
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/pom.xml
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/pom.xml b/jena-hadoop-rdf/jena-elephas-io/pom.xml
new file mode 100644
index 0000000..2be37f9
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/pom.xml
@@ -0,0 +1,67 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.jena</groupId>
+ <artifactId>jena-elephas</artifactId>
+ <version>0.9.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>jena-elephas-io</artifactId>
+ <name>Apache Jena - Elephas - I/O</name>
+ <description>RDF Input/Output formats library for Hadoop</description>
+
+ <!-- Note that versions are managed by parent POMs -->
+ <dependencies>
+ <!-- Internal Project Dependencies -->
+ <dependency>
+ <groupId>org.apache.jena</groupId>
+ <artifactId>jena-hadoop-rdf-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <!-- Hadoop Dependencies -->
+ <!-- Note these will be provided on the Hadoop cluster hence the provided
+ scope -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- Jena dependencies -->
+ <dependency>
+ <groupId>org.apache.jena</groupId>
+ <artifactId>jena-arq</artifactId>
+ </dependency>
+
+ <!-- Test Dependencies -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/HadoopIOConstants.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/HadoopIOConstants.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/HadoopIOConstants.java
new file mode 100644
index 0000000..5c1b41c
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/HadoopIOConstants.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io;
+
+/**
+ * Hadoop IO related constants
+ *
+ *
+ *
+ */
+public class HadoopIOConstants {
+
+ /**
+ * Private constructor prevents instantiation
+ */
+ private HadoopIOConstants() {
+ }
+
+ /**
+ * MapReduce configuration setting for the maximum line length
+ */
+ public static final String MAX_LINE_LENGTH = "mapreduce.input.linerecordreader.line.maxlength";
+
+ /**
+ * Run ID
+ */
+ public static final String RUN_ID = "runId";
+
+ /**
+ * Compression codecs to use
+ */
+ public static final String IO_COMPRESSION_CODECS = "io.compression.codecs";
+}
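
Editor's note: these constants are ordinary Hadoop configuration properties. A minimal sketch of a driver setting them follows; the values chosen (64 KB line cap, gzip codec) are illustrative only.

    import org.apache.hadoop.conf.Configuration;

    import org.apache.jena.hadoop.rdf.io.HadoopIOConstants;

    public class HadoopIOConfigSketch {
        public static void main(String[] args) {
            Configuration config = new Configuration();
            // Cap individual lines at 64 KB so malformed input cannot exhaust memory
            config.setInt(HadoopIOConstants.MAX_LINE_LENGTH, 64 * 1024);
            // Register the gzip codec for compressed input files
            config.set(HadoopIOConstants.IO_COMPRESSION_CODECS,
                    "org.apache.hadoop.io.compress.GzipCodec");
            System.out.println(config.get(HadoopIOConstants.MAX_LINE_LENGTH));
        }
    }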
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/RdfIOConstants.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/RdfIOConstants.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/RdfIOConstants.java
new file mode 100644
index 0000000..27c2bb2
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/RdfIOConstants.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io;
+
+import java.io.IOException;
+
+/**
+ * RDF IO related constants
+ *
+ *
+ *
+ */
+public class RdfIOConstants {
+
+ /**
+ * Private constructor prevents instantiation
+ */
+ private RdfIOConstants() {
+ }
+
+ /**
+ * Configuration key used to set whether bad tuples are ignored; ignoring
+ * them is the default behaviour. When this key is explicitly set to
+ * {@code false} bad tuples will result in an {@link IOException} being
+ * thrown by the relevant record readers.
+ */
+ public static final String INPUT_IGNORE_BAD_TUPLES = "rdf.io.input.ignore-bad-tuples";
+
+ /**
+ * Configuration key used to set the batch size used for RDF output formats
+ * that take a batched writing approach. Default value is given by the
+ * constant {@link #DEFAULT_OUTPUT_BATCH_SIZE}.
+ */
+ public static final String OUTPUT_BATCH_SIZE = "rdf.io.output.batch-size";
+
+ /**
+ * Default batch size for batched output formats
+ */
+ public static final long DEFAULT_OUTPUT_BATCH_SIZE = 10000;
+
+ /**
+ * Configuration key used to control behaviour with regards to how blank
+ * nodes are handled.
+ * <p>
+ * The default behaviour is that blank nodes are file scoped, which is what
+ * the RDF specifications require.
+ * </p>
+ * <p>
+ * However, in the case of a multi-stage pipeline this behaviour can cause
+ * blank nodes to diverge over several jobs and introduce spurious blank
+ * nodes over time. This is described in <a
+ * href="https://issues.apache.org/jira/browse/JENA-820">JENA-820</a> and
+ * enabling this flag for jobs in your pipeline allows you to work around
+ * this problem.
+ * </p>
+ * <h3>Warning</h3> You should only enable this flag for jobs that take in
+ * RDF output originating from previous jobs since our normal blank node
+ * allocation policy ensures that blank nodes will be file scoped and unique
+ * over all files (barring unfortunate hashing collisions). If you enable
+ * this for jobs that take in RDF originating from other sources you may
+ * incorrectly conflate blank nodes that are supposed to be distinct and
+ * separate nodes.
+ */
+ public static final String GLOBAL_BNODE_IDENTITY = "rdf.io.input.bnodes.global-identity";
+}
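
Editor's note: a hedged sketch of how a job driver might use these keys; the specific values are illustrative, not recommendations.

    import org.apache.hadoop.conf.Configuration;

    import org.apache.jena.hadoop.rdf.io.RdfIOConstants;

    public class RdfIOConfigSketch {
        public static void main(String[] args) {
            Configuration config = new Configuration();
            // Fail fast on malformed input instead of silently skipping bad tuples
            config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
            // Batched output formats will flush every 25000 tuples
            config.setLong(RdfIOConstants.OUTPUT_BATCH_SIZE, 25000);
            // Only for pipelines whose input is RDF produced by earlier Elephas jobs
            config.setBoolean(RdfIOConstants.GLOBAL_BNODE_IDENTITY, true);
        }
    }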
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/AbstractNLineFileInputFormat.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/AbstractNLineFileInputFormat.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/AbstractNLineFileInputFormat.java
new file mode 100644
index 0000000..1fcb030
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/AbstractNLineFileInputFormat.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract line-based input format that reuses the machinery from
+ * {@link NLineInputFormat} to calculate the splits
+ *
+ *
+ *
+ * @param <TKey>
+ * Key type
+ * @param <TValue>
+ * Value type
+ */
+public abstract class AbstractNLineFileInputFormat<TKey, TValue> extends FileInputFormat<TKey, TValue> {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNLineFileInputFormat.class);
+
+ /**
+ * Logically splits the set of input files for the job, treating every N
+ * lines of the input as one split.
+ *
+ * @see FileInputFormat#getSplits(JobContext)
+ */
+ public final List<InputSplit> getSplits(JobContext job) throws IOException {
+ boolean debug = LOGGER.isDebugEnabled();
+ if (debug && FileInputFormat.getInputDirRecursive(job)) {
+ LOGGER.debug("Recursive searching for input data is enabled");
+ }
+
+ List<InputSplit> splits = new ArrayList<InputSplit>();
+ int numLinesPerSplit = NLineInputFormat.getNumLinesPerSplit(job);
+ for (FileStatus status : listStatus(job)) {
+ if (debug) {
+ LOGGER.debug("Determining how to split input file/directory {}", status.getPath());
+ }
+ splits.addAll(NLineInputFormat.getSplitsForFile(status, job.getConfiguration(), numLinesPerSplit));
+ }
+ return splits;
+ }
+}
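
Editor's note: because split calculation delegates to NLineInputFormat, the number of lines per split is controlled through that class's standard setting. A minimal driver sketch follows; the 50000-line figure and job name are just examples.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;

    public class SplitSizeSketch {
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance(new Configuration(), "line-based-rdf");
            // Each input split will cover up to 50000 lines of a file
            NLineInputFormat.setNumLinesPerSplit(job, 50000);
        }
    }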
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileInputFormat.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileInputFormat.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileInputFormat.java
new file mode 100644
index 0000000..e561cdb
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileInputFormat.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+/**
+ * Abstract implementation of a whole file input format where each file is a
+ * single split
+ *
+ *
+ *
+ * @param <TKey>
+ * Key type
+ * @param <TValue>
+ * Value type
+ */
+public abstract class AbstractWholeFileInputFormat<TKey, TValue> extends FileInputFormat<TKey, TValue> {
+
+ @Override
+ protected final boolean isSplitable(JobContext context, Path filename) {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/QuadsInputFormat.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/QuadsInputFormat.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/QuadsInputFormat.java
new file mode 100644
index 0000000..b8fdbd5
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/QuadsInputFormat.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.jena.hadoop.rdf.io.input.readers.QuadsReader;
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+
+
+/**
+ * RDF input format that can handle any RDF quads format that ARQ supports,
+ * selecting the format to use for each file based upon the file extension
+ *
+ *
+ *
+ */
+public class QuadsInputFormat extends AbstractWholeFileInputFormat<LongWritable, QuadWritable> {
+
+ @Override
+ public RecordReader<LongWritable, QuadWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new QuadsReader();
+ }
+
+}
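
Editor's note: as an illustration of wiring this input format into a job, a minimal driver sketch follows; the job name, input path and the absence of mapper/output setup are hypothetical simplifications.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    import org.apache.jena.hadoop.rdf.io.input.QuadsInputFormat;
    import org.apache.jena.hadoop.rdf.types.QuadWritable;

    public class QuadsJobSketch {
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance(new Configuration(), "read-quads");
            job.setJarByClass(QuadsJobSketch.class);
            // Keys are positions, values are the parsed quads
            job.setInputFormatClass(QuadsInputFormat.class);
            job.setMapOutputKeyClass(LongWritable.class);
            job.setMapOutputValueClass(QuadWritable.class);
            // Hypothetical HDFS directory containing e.g. .nq or .trig files
            FileInputFormat.addInputPath(job, new Path("/data/quads"));
        }
    }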
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/TriplesInputFormat.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/TriplesInputFormat.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/TriplesInputFormat.java
new file mode 100644
index 0000000..03f394a
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/TriplesInputFormat.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.jena.hadoop.rdf.io.input.readers.TriplesReader;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+
+/**
+ * RDF input format that can handle any RDF triples format that ARQ supports,
+ * selecting the format to use for each file based upon the file extension
+ */
+public class TriplesInputFormat extends AbstractWholeFileInputFormat<LongWritable, TripleWritable> {
+
+ @Override
+ public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new TriplesReader();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/TriplesOrQuadsInputFormat.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/TriplesOrQuadsInputFormat.java b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/TriplesOrQuadsInputFormat.java
new file mode 100644
index 0000000..bfd643e
--- /dev/null
+++ b/jena-hadoop-rdf/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/TriplesOrQuadsInputFormat.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.jena.hadoop.rdf.io.input.readers.TriplesOrQuadsReader;
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+
+
+/**
+ * RDF input format that can handle any RDF triples or quads format that ARQ
+ * supports, selecting the format to use for each file based upon the file
+ * extension. Triples are converted into quads in the default graph.
+ *
+ *
+ *
+ */
+public class TriplesOrQuadsInputFormat extends AbstractWholeFileInputFormat<LongWritable, QuadWritable> {
+
+ @Override
+ public RecordReader<LongWritable, QuadWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new TriplesOrQuadsReader();
+ }
+
+}