Posted to commits@jena.apache.org by rv...@apache.org on 2014/04/02 11:20:58 UTC

svn commit: r1583942 [2/9] - in /jena/Experimental/hadoop-rdf: ./ hadoop-rdf-common/ hadoop-rdf-common/src/ hadoop-rdf-common/src/main/ hadoop-rdf-common/src/main/java/ hadoop-rdf-common/src/main/java/com/ hadoop-rdf-common/src/main/java/com/yarcdata/ ...

Added: jena/Experimental/hadoop-rdf/hadoop-rdf-common/src/test/java/com/yarcdata/urika/hadoop/rdf/io/types/CharacteristicTests.java
URL: http://svn.apache.org/viewvc/jena/Experimental/hadoop-rdf/hadoop-rdf-common/src/test/java/com/yarcdata/urika/hadoop/rdf/io/types/CharacteristicTests.java?rev=1583942&view=auto
==============================================================================
--- jena/Experimental/hadoop-rdf/hadoop-rdf-common/src/test/java/com/yarcdata/urika/hadoop/rdf/io/types/CharacteristicTests.java (added)
+++ jena/Experimental/hadoop-rdf/hadoop-rdf-common/src/test/java/com/yarcdata/urika/hadoop/rdf/io/types/CharacteristicTests.java Wed Apr  2 09:20:51 2014
@@ -0,0 +1,196 @@
+/*
+ * Copyright 2013 YarcData LLC All Rights Reserved.
+ */
+
+package com.yarcdata.urika.hadoop.rdf.io.types;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.hp.hpl.jena.graph.Node;
+import com.hp.hpl.jena.graph.NodeFactory;
+import com.yarcdata.urika.hadoop.rdf.types.CharacteristicSetWritable;
+import com.yarcdata.urika.hadoop.rdf.types.CharacteristicWritable;
+
+/**
+ * Tests for {@link CharacteristicWritable} and
+ * {@link CharacteristicSetWritable}
+ * 
+ * @author rvesse
+ * 
+ */
+public class CharacteristicTests {
+
+    /**
+     * Checks whether a writable round trips successfully
+     * 
+     * @param cw
+     *            Characteristic writable
+     * @throws IOException
+     */
+    private void checkRoundTrip(CharacteristicWritable cw) throws IOException {
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        DataOutputStream output = new DataOutputStream(outputStream);
+        cw.write(output);
+
+        ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
+        DataInputStream input = new DataInputStream(inputStream);
+        CharacteristicWritable actual = CharacteristicWritable.read(input);
+        Assert.assertEquals(cw, actual);
+    }
+
+    /**
+     * Tests characteristic round tripping
+     * 
+     * @throws IOException
+     */
+    @Test
+    public void characteristic_writable_01() throws IOException {
+        Node n = NodeFactory.createURI("http://example.org");
+        CharacteristicWritable expected = new CharacteristicWritable(n);
+        Assert.assertEquals(1, expected.getCount().get());
+
+        this.checkRoundTrip(expected);
+    }
+
+    /**
+     * Tests characteristic properties
+     * 
+     * @throws IOException
+     */
+    @Test
+    public void characteristic_writable_02() throws IOException {
+        Node n = NodeFactory.createURI("http://example.org");
+        CharacteristicWritable cw1 = new CharacteristicWritable(n);
+        CharacteristicWritable cw2 = new CharacteristicWritable(n, 100);
+        this.checkRoundTrip(cw1);
+        this.checkRoundTrip(cw2);
+
+        // Should still be equal since equality is based only on the node,
+        // not the count
+        Assert.assertEquals(cw1, cw2);
+    }
+
+    /**
+     * Tests characteristic properties
+     * 
+     * @throws IOException
+     */
+    @Test
+    public void characteristic_writable_03() throws IOException {
+        CharacteristicWritable cw1 = new CharacteristicWritable(NodeFactory.createURI("http://example.org"));
+        CharacteristicWritable cw2 = new CharacteristicWritable(NodeFactory.createURI("http://example.org/other"));
+        this.checkRoundTrip(cw1);
+        this.checkRoundTrip(cw2);
+
+        // Should not be equal since the nodes differ
+        Assert.assertNotEquals(cw1, cw2);
+    }
+
+    /**
+     * Checks that a writable round trips
+     * 
+     * @param set
+     *            Characteristic set
+     * @throws IOException
+     */
+    private void checkRoundTrip(CharacteristicSetWritable set) throws IOException {
+        // Test round trip
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        DataOutputStream output = new DataOutputStream(outputStream);
+        set.write(output);
+
+        ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
+        DataInputStream input = new DataInputStream(inputStream);
+        CharacteristicSetWritable actual = CharacteristicSetWritable.read(input);
+        Assert.assertEquals(set, actual);
+    }
+
+    /**
+     * Checks a characteristic set
+     * 
+     * @param set
+     *            Set
+     * @param expectedItems
+     *            Expected number of characteristics
+     * @param expectedCounts
+     *            Expected counts for characteristics
+     */
+    protected final void checkCharacteristicSet(CharacteristicSetWritable set, int expectedItems, long[] expectedCounts) {
+        Assert.assertEquals(expectedItems, set.size());
+        Assert.assertEquals(expectedItems, expectedCounts.length);
+        Iterator<CharacteristicWritable> iter = set.getCharacteristics();
+        int i = 0;
+        while (iter.hasNext()) {
+            CharacteristicWritable cw = iter.next();
+            Assert.assertEquals(expectedCounts[i], cw.getCount().get());
+            i++;
+        }
+    }
+
+    /**
+     * Tests characteristic sets
+     * 
+     * @throws IOException
+     */
+    @Test
+    public void characteristic_set_writable_01() throws IOException {
+        CharacteristicSetWritable set = new CharacteristicSetWritable();
+
+        // Add some characteristics
+        CharacteristicWritable cw1 = new CharacteristicWritable(NodeFactory.createURI("http://example.org"));
+        CharacteristicWritable cw2 = new CharacteristicWritable(NodeFactory.createURI("http://example.org/other"));
+        set.add(cw1);
+        set.add(cw2);
+        this.checkCharacteristicSet(set, 2, new long[] { 1, 1 });
+        this.checkRoundTrip(set);
+    }
+
+    /**
+     * Tests characteristic sets
+     * 
+     * @throws IOException
+     */
+    @Test
+    public void characteristic_set_writable_02() throws IOException {
+        CharacteristicSetWritable set = new CharacteristicSetWritable();
+
+        // Add some characteristics
+        CharacteristicWritable cw1 = new CharacteristicWritable(NodeFactory.createURI("http://example.org"));
+        CharacteristicWritable cw2 = new CharacteristicWritable(NodeFactory.createURI("http://example.org"), 2);
+        set.add(cw1);
+        set.add(cw2);
+        this.checkCharacteristicSet(set, 1, new long[] { 3 });
+        this.checkRoundTrip(set);
+    }
+    
+    /**
+     * Tests characteristic sets
+     * 
+     * @throws IOException
+     */
+    @Test
+    public void characteristic_set_writable_03() throws IOException {
+        CharacteristicSetWritable set1 = new CharacteristicSetWritable();
+        CharacteristicSetWritable set2 = new CharacteristicSetWritable();
+
+        // Add some characteristics
+        CharacteristicWritable cw1 = new CharacteristicWritable(NodeFactory.createURI("http://example.org"));
+        CharacteristicWritable cw2 = new CharacteristicWritable(NodeFactory.createURI("http://example.org/other"));
+        set1.add(cw1);
+        set2.add(cw2);
+        this.checkCharacteristicSet(set1, 1, new long[] { 1 });
+        this.checkCharacteristicSet(set2, 1, new long[] { 1 });
+        this.checkRoundTrip(set1);
+        this.checkRoundTrip(set2);
+        
+        Assert.assertNotEquals(set1, set2);
+    }
+}
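
The checkRoundTrip helpers above follow the standard Hadoop Writable
serialisation pattern: write the instance to a DataOutput, then rebuild it
from the resulting bytes. A minimal generic sketch of that pattern, assuming
only the org.apache.hadoop.io.Writable contract (the helper class below is
illustrative and not part of this commit):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public final class WritableRoundTrip {

    private WritableRoundTrip() {
    }

    /**
     * Writes a writable out to a byte buffer and then populates a blank
     * instance from those bytes via readFields()
     */
    public static <T extends Writable> T roundTrip(T original, T blank) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));
        blank.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        return blank;
    }
}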

Added: jena/Experimental/hadoop-rdf/hadoop-rdf-common/src/test/java/com/yarcdata/urika/hadoop/rdf/io/types/RdfTypesTest.java
URL: http://svn.apache.org/viewvc/jena/Experimental/hadoop-rdf/hadoop-rdf-common/src/test/java/com/yarcdata/urika/hadoop/rdf/io/types/RdfTypesTest.java?rev=1583942&view=auto
==============================================================================
--- jena/Experimental/hadoop-rdf/hadoop-rdf-common/src/test/java/com/yarcdata/urika/hadoop/rdf/io/types/RdfTypesTest.java (added)
+++ jena/Experimental/hadoop-rdf/hadoop-rdf-common/src/test/java/com/yarcdata/urika/hadoop/rdf/io/types/RdfTypesTest.java Wed Apr  2 09:20:51 2014
@@ -0,0 +1,350 @@
+/*
+ * Copyright 2013 YarcData LLC All Rights Reserved.
+ */
+
+package com.yarcdata.urika.hadoop.rdf.io.types;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.jena.atlas.lib.Tuple;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.hp.hpl.jena.datatypes.xsd.XSDDatatype;
+import com.hp.hpl.jena.graph.Node;
+import com.hp.hpl.jena.graph.NodeFactory;
+import com.hp.hpl.jena.graph.Triple;
+import com.hp.hpl.jena.sparql.core.Quad;
+import com.yarcdata.urika.hadoop.rdf.types.NodeTupleWritable;
+import com.yarcdata.urika.hadoop.rdf.types.NodeWritable;
+import com.yarcdata.urika.hadoop.rdf.types.QuadWritable;
+import com.yarcdata.urika.hadoop.rdf.types.TripleWritable;
+
+/**
+ * Tests for the various RDF types defined by the
+ * {@link com.yarcdata.urika.hadoop.rdf.types} package
+ * 
+ * @author rvesse
+ * 
+ */
+public class RdfTypesTest {
+    
+    private static final Logger LOG = LoggerFactory.getLogger(RdfTypesTest.class);
+
+    private ByteArrayOutputStream outputStream;
+    private ByteArrayInputStream inputStream;
+
+    /**
+     * Prepare for output
+     * 
+     * @return Data output
+     */
+    private DataOutput prepareOutput() {
+        this.outputStream = new ByteArrayOutputStream();
+        return new DataOutputStream(this.outputStream);
+    }
+
+    /**
+     * Prepare for input from the previously written output
+     * 
+     * @return Data Input
+     */
+    private DataInput prepareInput() {
+        this.inputStream = new ByteArrayInputStream(this.outputStream.toByteArray());
+        return new DataInputStream(this.inputStream);
+    }
+
+    /**
+     * Prepare for input from the given data
+     * 
+     * @param data
+     *            Data
+     * @return Data Input
+     */
+    @SuppressWarnings("unused")
+    private DataInput prepareInput(byte[] data) {
+        this.inputStream = new ByteArrayInputStream(data);
+        return new DataInputStream(this.inputStream);
+    }
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    private <T extends WritableComparable> void testWriteRead(T writable, T expected) throws IOException, InstantiationException,
+            IllegalAccessException, ClassNotFoundException {
+        // Write out data
+        DataOutput output = this.prepareOutput();
+        writable.write(output);
+
+        // Read back in data
+        DataInput input = this.prepareInput();
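+        // Instantiate a blank copy via the no-arg constructor, mirroring how
+        // Hadoop itself creates Writable instances during deserialization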
+        T actual = (T) Class.forName(writable.getClass().getName()).newInstance();
+        actual.readFields(input);
+        
+        LOG.info("Original = " + writable.toString());
+        LOG.info("Round Tripped = " + actual.toString());
+
+        // Check equivalent
+        Assert.assertEquals(0, expected.compareTo(actual));
+    }
+
+    /**
+     * Basic node writable round tripping test
+     * 
+     * @throws IOException
+     * @throws InstantiationException
+     * @throws IllegalAccessException
+     * @throws ClassNotFoundException
+     */
+    @Test
+    public void node_writable_uri_01() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+        Node n = NodeFactory.createURI("http://example.org");
+        NodeWritable nw = new NodeWritable(n);
+        testWriteRead(nw, nw);
+    }
+
+    /**
+     * Basic node writable round tripping test
+     * 
+     * @throws IOException
+     * @throws InstantiationException
+     * @throws IllegalAccessException
+     * @throws ClassNotFoundException
+     */
+    @Test
+    public void node_writable_uri_02() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+        Node n = NodeFactory.createURI("http://user:password@example.org/some/path?key=value#id");
+        NodeWritable nw = new NodeWritable(n);
+        testWriteRead(nw, nw);
+    }
+    
+    /**
+     * Basic node writable round tripping test
+     * 
+     * @throws IOException
+     * @throws InstantiationException
+     * @throws IllegalAccessException
+     * @throws ClassNotFoundException
+     */
+    @Test
+    public void node_writable_literal_01() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+        Node n = NodeFactory.createLiteral("simple");
+        NodeWritable nw = new NodeWritable(n);
+        testWriteRead(nw, nw);
+    }
+    
+    /**
+     * Basic node writable round tripping test
+     * 
+     * @throws IOException
+     * @throws InstantiationException
+     * @throws IllegalAccessException
+     * @throws ClassNotFoundException
+     */
+    @Test
+    public void node_writable_literal_02() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+        Node n = NodeFactory.createLiteral("language", "en", null);
+        NodeWritable nw = new NodeWritable(n);
+        testWriteRead(nw, nw);
+    }
+    
+    /**
+     * Basic node writable round tripping test
+     * 
+     * @throws IOException
+     * @throws InstantiationException
+     * @throws IllegalAccessException
+     * @throws ClassNotFoundException
+     */
+    @Test
+    public void node_writable_literal_03() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+        Node n = NodeFactory.createLiteral("string", XSDDatatype.XSDstring);
+        NodeWritable nw = new NodeWritable(n);
+        testWriteRead(nw, nw);
+    }
+    
+    /**
+     * Basic node writable round tripping test
+     * 
+     * @throws IOException
+     * @throws InstantiationException
+     * @throws IllegalAccessException
+     * @throws ClassNotFoundException
+     */
+    @Test
+    public void node_writable_literal_04() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+        Node n = NodeFactory.createLiteral("1234", XSDDatatype.XSDinteger);
+        NodeWritable nw = new NodeWritable(n);
+        testWriteRead(nw, nw);
+    }
+    
+    
+    /**
+     * Basic node writable round tripping test
+     * 
+     * @throws IOException
+     * @throws InstantiationException
+     * @throws IllegalAccessException
+     * @throws ClassNotFoundException
+     */
+    @Test
+    public void node_writable_literal_05() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+        Node n = NodeFactory.createLiteral("123.4", XSDDatatype.XSDdecimal);
+        NodeWritable nw = new NodeWritable(n);
+        testWriteRead(nw, nw);
+    }
+    
+    
+    /**
+     * Basic node writable round tripping test
+     * 
+     * @throws IOException
+     * @throws InstantiationException
+     * @throws IllegalAccessException
+     * @throws ClassNotFoundException
+     */
+    @Test
+    public void node_writable_literal_06() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+        Node n = NodeFactory.createLiteral("12.3e4", XSDDatatype.XSDdouble);
+        NodeWritable nw = new NodeWritable(n);
+        testWriteRead(nw, nw);
+    }
+    
+    
+    /**
+     * Basic node writable round tripping test
+     * 
+     * @throws IOException
+     * @throws InstantiationException
+     * @throws IllegalAccessException
+     * @throws ClassNotFoundException
+     */
+    @Test
+    public void node_writable_literal_07() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+        Node n = NodeFactory.createLiteral("true", XSDDatatype.XSDboolean);
+        NodeWritable nw = new NodeWritable(n);
+        testWriteRead(nw, nw);
+    }
+    
+    
+    /**
+     * Basic node writable round tripping test
+     * 
+     * @throws IOException
+     * @throws InstantiationException
+     * @throws IllegalAccessException
+     * @throws ClassNotFoundException
+     */
+    @Test
+    public void node_writable_bnode_01() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+        Node n = NodeFactory.createAnon();
+        NodeWritable nw = new NodeWritable(n);
+        testWriteRead(nw, nw);
+    }
+    
+    /**
+     * Basic node writable round tripping test
+     * 
+     * @throws IOException
+     * @throws InstantiationException
+     * @throws IllegalAccessException
+     * @throws ClassNotFoundException
+     */
+    @Test
+    public void node_writable_bnode_02() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+        Node n = NodeFactory.createAnon();
+        NodeWritable nw = new NodeWritable(n);
+        testWriteRead(nw, nw);
+        NodeWritable nw2 = new NodeWritable(n);
+        testWriteRead(nw2, nw2);
+        
+        Assert.assertEquals(0, nw.compareTo(nw2));
+    }
+    
+    /**
+     * Basic triple writable round tripping test
+     * 
+     * @throws IOException
+     * @throws InstantiationException
+     * @throws IllegalAccessException
+     * @throws ClassNotFoundException
+     */
+    @Test
+    public void triple_writable_01() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+        Triple t = new Triple(NodeFactory.createURI("http://example"), NodeFactory.createURI("http://predicate"),
+                NodeFactory.createLiteral("value"));
+        TripleWritable tw = new TripleWritable(t);
+        testWriteRead(tw, tw);
+    }
+    
+    /**
+     * Basic triple writable round tripping test
+     * 
+     * @throws IOException
+     * @throws InstantiationException
+     * @throws IllegalAccessException
+     * @throws ClassNotFoundException
+     */
+    @Test
+    public void triple_writable_02() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+        Triple t = new Triple(NodeFactory.createAnon(), NodeFactory.createURI("http://predicate"),
+                NodeFactory.createLiteral("value"));
+        TripleWritable tw = new TripleWritable(t);
+        testWriteRead(tw, tw);
+    }
+    
+    /**
+     * Basic quad writable round tripping test
+     * 
+     * @throws IOException
+     * @throws InstantiationException
+     * @throws IllegalAccessException
+     * @throws ClassNotFoundException
+     */
+    @Test
+    public void quad_writable_01() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+        Quad q = new Quad(Quad.defaultGraphNodeGenerated, NodeFactory.createURI("http://example"), NodeFactory.createURI("http://predicate"),
+                NodeFactory.createLiteral("value"));
+        QuadWritable qw = new QuadWritable(q);
+        testWriteRead(qw, qw);
+    }
+    
+    /**
+     * Basic quad writable round tripping test
+     * 
+     * @throws IOException
+     * @throws InstantiationException
+     * @throws IllegalAccessException
+     * @throws ClassNotFoundException
+     */
+    @Test
+    public void quad_writable_02() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+        Quad q = new Quad(Quad.defaultGraphNodeGenerated, NodeFactory.createAnon(), NodeFactory.createURI("http://predicate"),
+                NodeFactory.createLiteral("value"));
+        QuadWritable qw = new QuadWritable(q);
+        testWriteRead(qw, qw);
+    }
+    
+    /**
+     * Basic tuple writable round tripping test
+     * 
+     * @throws IOException
+     * @throws InstantiationException
+     * @throws IllegalAccessException
+     * @throws ClassNotFoundException
+     */
+    @Test
+    public void tuple_writable_01() throws IOException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+        Tuple<Node> t = Tuple.createTuple(NodeFactory.createURI("http://one"), NodeFactory.createURI("http://two"),
+                NodeFactory.createLiteral("value"), NodeFactory.createLiteral("foo"), NodeFactory.createURI("http://three"));
+        NodeTupleWritable tw = new NodeTupleWritable(t);
+        testWriteRead(tw, tw);
+    }
+}
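
Because these types implement WritableComparable they can act as MapReduce
keys as well as values. A hypothetical mapper sketching that usage (the
mapper is not part of this commit, and it assumes TripleWritable exposes its
wrapped Triple via a get() accessor, which is not shown in this section):

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;

import com.yarcdata.urika.hadoop.rdf.types.NodeWritable;
import com.yarcdata.urika.hadoop.rdf.types.TripleWritable;

/**
 * Hypothetical mapper that groups triples by their subject node
 */
public class SubjectKeyMapper extends Mapper<LongWritable, TripleWritable, NodeWritable, TripleWritable> {

    @Override
    protected void map(LongWritable key, TripleWritable value, Context context) throws IOException,
            InterruptedException {
        // The compareTo() semantics exercised by the tests above are what
        // make NodeWritable usable as a shuffle key
        context.write(new NodeWritable(value.get().getSubject()), value);
    }
}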

Propchange: jena/Experimental/hadoop-rdf/hadoop-rdf-io/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Wed Apr  2 09:20:51 2014
@@ -0,0 +1,5 @@
+.classpath
+.project
+.settings
+target
+test-output

Added: jena/Experimental/hadoop-rdf/hadoop-rdf-io/NOTICE
URL: http://svn.apache.org/viewvc/jena/Experimental/hadoop-rdf/hadoop-rdf-io/NOTICE?rev=1583942&view=auto
==============================================================================
--- jena/Experimental/hadoop-rdf/hadoop-rdf-io/NOTICE (added)
+++ jena/Experimental/hadoop-rdf/hadoop-rdf-io/NOTICE Wed Apr  2 09:20:51 2014
@@ -0,0 +1,22 @@
+Apache Hadoop
+Copyright 2007-2013 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+-----------------------------------------------------------------------------------------------------------
+
+Apache Jena
+Copyright 2011-2013 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+This product includes software developed by
+PluggedIn Software under a BSD license.
+
+Portions of this software were originally based on the following:
+  - Copyright 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009 Hewlett-Packard Development Company, LP
+  - Copyright 2010, 2011 Epimorphics Ltd.
+  - Copyright 2010, 2011 Talis Systems Ltd.
+These have been licensed to the Apache Software Foundation under a software grant.
\ No newline at end of file

Added: jena/Experimental/hadoop-rdf/hadoop-rdf-io/pom.xml
URL: http://svn.apache.org/viewvc/jena/Experimental/hadoop-rdf/hadoop-rdf-io/pom.xml?rev=1583942&view=auto
==============================================================================
--- jena/Experimental/hadoop-rdf/hadoop-rdf-io/pom.xml (added)
+++ jena/Experimental/hadoop-rdf/hadoop-rdf-io/pom.xml Wed Apr  2 09:20:51 2014
@@ -0,0 +1,48 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>com.yarcdata.urika</groupId>
+		<artifactId>hadoop-rdf</artifactId>
+		<version>0.0.1-SNAPSHOT</version>
+	</parent>
+	<artifactId>hadoop-rdf-io</artifactId>
+	<name>Hadoop RDF I/O</name>
+	<description>Input/Output formats library for Hadoop</description>
+
+	<!-- Note that versions are managed by parent POMs -->
+	<dependencies>
+		<!-- Internal Project Dependencies -->
+		<dependency>
+			<groupId>com.yarcdata.urika</groupId>
+			<artifactId>hadoop-rdf-common</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+	
+		<!-- Hadoop Dependencies -->
+		<!-- Note these will be provided on the Hadoop cluster hence the provided scope -->
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-common</artifactId>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-mapreduce-client-common</artifactId>
+			<scope>provided</scope>
+		</dependency>
+
+		<!-- Jena dependencies -->
+		<dependency>
+			<groupId>org.apache.jena</groupId>
+			<artifactId>jena-arq</artifactId>
+		</dependency>
+
+		<!-- Test Dependencies -->
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<scope>test</scope>
+		</dependency>
+	</dependencies>
+</project>
\ No newline at end of file

Added: jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/HadoopIOConstants.java
URL: http://svn.apache.org/viewvc/jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/HadoopIOConstants.java?rev=1583942&view=auto
==============================================================================
--- jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/HadoopIOConstants.java (added)
+++ jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/HadoopIOConstants.java Wed Apr  2 09:20:51 2014
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2013 YarcData LLC All Rights Reserved.
+ */
+
+package com.yarcdata.urika.hadoop.rdf.io;
+
+/**
+ * Hadoop IO-related constants
+ * 
+ * @author rvesse
+ * 
+ */
+public class HadoopIOConstants {
+
+    /**
+     * Private constructor prevents instantiation
+     */
+    private HadoopIOConstants() {
+    }
+
+    /**
+     * MapReduce configuration setting for the maximum line length
+     */
+    public static final String MAX_LINE_LENGTH = "mapreduce.input.linerecordreader.line.maxlength";
+
+    /**
+     * Run ID
+     */
+    public static final String RUN_ID = "runId";
+    
+    /**
+     * Compression codecs to use
+     */
+    public static final String IO_COMPRESSION_CODECS = "io.compression.codecs";
+}
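
For illustration, these keys might be applied to a job configuration as
follows (the snippet is not part of this commit and the values are
arbitrary):

import org.apache.hadoop.conf.Configuration;

import com.yarcdata.urika.hadoop.rdf.io.HadoopIOConstants;

public class HadoopIOConfigExample {

    public static void main(String[] args) {
        Configuration config = new Configuration();
        // Cap the line length accepted by line-based record readers at 10MB
        config.setInt(HadoopIOConstants.MAX_LINE_LENGTH, 10 * 1024 * 1024);
        // Tag the run with an identifier
        config.set(HadoopIOConstants.RUN_ID, "example-run-001");
    }
}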

Added: jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/RdfIOConstants.java
URL: http://svn.apache.org/viewvc/jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/RdfIOConstants.java?rev=1583942&view=auto
==============================================================================
--- jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/RdfIOConstants.java (added)
+++ jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/RdfIOConstants.java Wed Apr  2 09:20:51 2014
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2013 YarcData LLC All Rights Reserved.
+ */
+
+package com.yarcdata.urika.hadoop.rdf.io;
+
+import java.io.IOException;
+
+/**
+ * RDF IO-related constants
+ * 
+ * @author rvesse
+ * 
+ */
+public class RdfIOConstants {
+
+    /**
+     * Private constructor prevents instantiation
+     */
+    private RdfIOConstants() {
+    }
+
+    /**
+     * Configuration key used to set whether bad tuples are ignored. Ignoring
+     * them is the default behaviour; when this key is explicitly set to
+     * {@code false}, bad tuples will result in an {@link IOException} being
+     * thrown by the relevant record readers.
+     */
+    public static final String INPUT_IGNORE_BAD_TUPLES = "rdf.io.input.ignore-bad-tuples";
+
+    /**
+     * Configuration key used to set the batch size used for RDF output formats
+     * that take a batched writing approach. Default value is given by the
+     * constant {@link #DEFAULT_OUTPUT_BATCH_SIZE}.
+     */
+    public static final String OUTPUT_BATCH_SIZE = "rdf.io.output.batch-size";
+
+    /**
+     * Default batch size for batched output formats
+     */
+    public static final long DEFAULT_OUTPUT_BATCH_SIZE = 10000;
+}
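
A sketch of how a job might override these defaults (illustrative values,
not part of this commit):

import org.apache.hadoop.conf.Configuration;

import com.yarcdata.urika.hadoop.rdf.io.RdfIOConstants;

public class RdfIOConfigExample {

    public static void main(String[] args) {
        Configuration config = new Configuration();
        // Fail fast on malformed tuples rather than silently skipping them
        config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
        // Halve the default batch size for batched output formats
        config.setLong(RdfIOConstants.OUTPUT_BATCH_SIZE, RdfIOConstants.DEFAULT_OUTPUT_BATCH_SIZE / 2);
    }
}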

Added: jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/AbstractNLineFileInputFormat.java
URL: http://svn.apache.org/viewvc/jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/AbstractNLineFileInputFormat.java?rev=1583942&view=auto
==============================================================================
--- jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/AbstractNLineFileInputFormat.java (added)
+++ jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/AbstractNLineFileInputFormat.java Wed Apr  2 09:20:51 2014
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2013 YarcData LLC All Rights Reserved.
+ */
+
+package com.yarcdata.urika.hadoop.rdf.io.input;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
+
+/**
+ * Abstract line-based input format that reuses the machinery from
+ * {@link NLineInputFormat} to calculate the splits
+ * 
+ * @author rvesse
+ * 
+ * @param <TKey>
+ *            Key type
+ * @param <TValue>
+ *            Value type
+ */
+public abstract class AbstractNLineFileInputFormat<TKey, TValue> extends FileInputFormat<TKey, TValue> {
+
+    /**
+     * Logically splits the set of input files for the job, treating each
+     * group of N lines from the input as a single split.
+     * 
+     * @see FileInputFormat#getSplits(JobContext)
+     */
+    @Override
+    public final List<InputSplit> getSplits(JobContext job) throws IOException {
+        List<InputSplit> splits = new ArrayList<InputSplit>();
+        int numLinesPerSplit = NLineInputFormat.getNumLinesPerSplit(job);
+        for (FileStatus status : listStatus(job)) {
+            splits.addAll(NLineInputFormat.getSplitsForFile(status, job.getConfiguration(), numLinesPerSplit));
+        }
+        return splits;
+    }
+}
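
Because split calculation is delegated to NLineInputFormat, the standard
lines-per-split setting applies to any subclass; a minimal sketch
(illustrative, not part of this commit):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;

public class SplitSizingExample {

    public static void main(String[] args) throws IOException {
        Job job = Job.getInstance(new Configuration(), "Split sizing example");
        // Applies to any AbstractNLineFileInputFormat subclass since the
        // splits are calculated by NLineInputFormat's machinery
        NLineInputFormat.setNumLinesPerSplit(job, 50000);
    }
}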

Added: jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/AbstractWholeFileInputFormat.java
URL: http://svn.apache.org/viewvc/jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/AbstractWholeFileInputFormat.java?rev=1583942&view=auto
==============================================================================
--- jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/AbstractWholeFileInputFormat.java (added)
+++ jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/AbstractWholeFileInputFormat.java Wed Apr  2 09:20:51 2014
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2013 YarcData LLC All Rights Reserved.
+ */
+
+package com.yarcdata.urika.hadoop.rdf.io.input;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+/**
+ * Abstract implementation of a whole file input format where each file is a
+ * single split
+ * 
+ * @author rvesse
+ * 
+ * @param <TKey>
+ *            Key type
+ * @param <TValue>
+ *            Value type
+ */
+public abstract class AbstractWholeFileInputFormat<TKey, TValue> extends FileInputFormat<TKey, TValue> {
+
+    @Override
+    protected final boolean isSplitable(JobContext context, Path filename) {
+        return false;
+    }
+}

Added: jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/BlockedNQuadsInputFormat.java
URL: http://svn.apache.org/viewvc/jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/BlockedNQuadsInputFormat.java?rev=1583942&view=auto
==============================================================================
--- jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/BlockedNQuadsInputFormat.java (added)
+++ jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/BlockedNQuadsInputFormat.java Wed Apr  2 09:20:51 2014
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2013 YarcData LLC All Rights Reserved.
+ */
+
+package com.yarcdata.urika.hadoop.rdf.io.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.yarcdata.urika.hadoop.rdf.io.input.readers.BlockedNQuadsReader;
+import com.yarcdata.urika.hadoop.rdf.types.QuadWritable;
+
+/**
+ * NQuads input format where files are processed as blocks of lines rather
+ * than in a line-based manner as with the {@link NQuadsInputFormat} or as
+ * whole files with the {@link WholeFileNQuadsInputFormat}
+ * <p>
+ * This provides a compromise between the higher setup overhead of creating
+ * more parsers and the benefit of being able to split input files over
+ * multiple mappers.
+ * </p>
+ * 
+ * @author rvesse
+ * 
+ */
+public class BlockedNQuadsInputFormat extends AbstractNLineFileInputFormat<LongWritable, QuadWritable> {
+
+    @Override
+    public RecordReader<LongWritable, QuadWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+            throws IOException, InterruptedException {
+        return new BlockedNQuadsReader();
+    }
+
+}
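
A sketch of wiring this format into a job, where the lines-per-split setting
controls the block size and hence the trade-off described above (the path
and values are illustrative, not part of this commit):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;

import com.yarcdata.urika.hadoop.rdf.io.input.BlockedNQuadsInputFormat;

public class BlockedNQuadsJobExample {

    public static void main(String[] args) throws IOException {
        Job job = Job.getInstance(new Configuration(), "Blocked NQuads example");
        job.setInputFormatClass(BlockedNQuadsInputFormat.class);
        // Larger blocks mean fewer parser instances, smaller blocks mean
        // more mappers
        NLineInputFormat.setNumLinesPerSplit(job, 100000);
        FileInputFormat.addInputPath(job, new Path("/data/example.nq"));
    }
}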

Added: jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/BlockedNTriplesInputFormat.java
URL: http://svn.apache.org/viewvc/jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/BlockedNTriplesInputFormat.java?rev=1583942&view=auto
==============================================================================
--- jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/BlockedNTriplesInputFormat.java (added)
+++ jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/BlockedNTriplesInputFormat.java Wed Apr  2 09:20:51 2014
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2013 YarcData LLC All Rights Reserved.
+ */
+
+package com.yarcdata.urika.hadoop.rdf.io.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.yarcdata.urika.hadoop.rdf.io.input.readers.BlockedNTriplesReader;
+import com.yarcdata.urika.hadoop.rdf.types.TripleWritable;
+
+/**
+ * NTriples input format where files are processed as blocks of lines rather
+ * than in a line-based manner as with the {@link NTriplesInputFormat} or as
+ * whole files with the {@link WholeFileNTriplesInputFormat}
+ * <p>
+ * This provides a compromise between the higher setup overhead of creating
+ * more parsers and the benefit of being able to split input files over
+ * multiple mappers.
+ * </p>
+ * 
+ * @author rvesse
+ * 
+ */
+public class BlockedNTriplesInputFormat extends AbstractNLineFileInputFormat<LongWritable, TripleWritable> {
+
+    @Override
+    public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+            throws IOException, InterruptedException {
+        return new BlockedNTriplesReader();
+    }
+
+}

Added: jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/NQuadsInputFormat.java
URL: http://svn.apache.org/viewvc/jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/NQuadsInputFormat.java?rev=1583942&view=auto
==============================================================================
--- jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/NQuadsInputFormat.java (added)
+++ jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/NQuadsInputFormat.java Wed Apr  2 09:20:51 2014
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2013 YarcData LLC All Rights Reserved.
+ */
+
+package com.yarcdata.urika.hadoop.rdf.io.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import com.yarcdata.urika.hadoop.rdf.io.input.readers.NQuadsReader;
+import com.yarcdata.urika.hadoop.rdf.types.QuadWritable;
+
+/**
+ * NQuads input format
+ * 
+ * @author rvesse
+ * 
+ */
+public class NQuadsInputFormat extends AbstractNLineFileInputFormat<LongWritable, QuadWritable> {
+
+    @Override
+    public RecordReader<LongWritable, QuadWritable> createRecordReader(InputSplit arg0, TaskAttemptContext arg1)
+            throws IOException, InterruptedException {
+        return new NQuadsReader();
+    }
+
+}

Added: jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/NTriplesInputFormat.java
URL: http://svn.apache.org/viewvc/jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/NTriplesInputFormat.java?rev=1583942&view=auto
==============================================================================
--- jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/NTriplesInputFormat.java (added)
+++ jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/NTriplesInputFormat.java Wed Apr  2 09:20:51 2014
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2013 YarcData LLC All Rights Reserved.
+ */
+
+package com.yarcdata.urika.hadoop.rdf.io.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import com.yarcdata.urika.hadoop.rdf.io.input.readers.NTriplesReader;
+import com.yarcdata.urika.hadoop.rdf.types.TripleWritable;
+
+/**
+ * NTriples input format
+ * 
+ * @author rvesse
+ * 
+ */
+public class NTriplesInputFormat extends AbstractNLineFileInputFormat<LongWritable, TripleWritable> {
+
+    @Override
+    public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext context)
+            throws IOException, InterruptedException {
+        return new NTriplesReader();
+    }
+
+}

Added: jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/QuadsInputFormat.java
URL: http://svn.apache.org/viewvc/jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/QuadsInputFormat.java?rev=1583942&view=auto
==============================================================================
--- jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/QuadsInputFormat.java (added)
+++ jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/QuadsInputFormat.java Wed Apr  2 09:20:51 2014
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2013 YarcData LLC All Rights Reserved.
+ */
+
+package com.yarcdata.urika.hadoop.rdf.io.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.yarcdata.urika.hadoop.rdf.io.input.readers.QuadsReader;
+import com.yarcdata.urika.hadoop.rdf.types.QuadWritable;
+
+/**
+ * RDF input format that can handle any RDF quads format that ARQ supports,
+ * selecting the format to use for each file based upon the file extension
+ * 
+ * @author rvesse
+ * 
+ */
+public class QuadsInputFormat extends AbstractWholeFileInputFormat<LongWritable, QuadWritable> {
+
+    @Override
+    public RecordReader<LongWritable, QuadWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+            throws IOException, InterruptedException {
+        return new QuadsReader();
+    }
+
+}
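
The per-file format selection presumably rests on ARQ's language registry; a
small sketch of the underlying lookup (illustrative, not part of this
commit):

import org.apache.jena.riot.Lang;
import org.apache.jena.riot.RDFLanguages;

public class ExtensionDetectionExample {

    public static void main(String[] args) {
        // ARQ resolves an RDF language from a file name's extension; this is
        // the registry that extension-driven format selection relies upon
        Lang lang = RDFLanguages.filenameToLang("data.trig");
        System.out.println(lang);
    }
}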

Added: jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/RdfJsonInputFormat.java
URL: http://svn.apache.org/viewvc/jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/RdfJsonInputFormat.java?rev=1583942&view=auto
==============================================================================
--- jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/RdfJsonInputFormat.java (added)
+++ jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/RdfJsonInputFormat.java Wed Apr  2 09:20:51 2014
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2013 YarcData LLC All Rights Reserved.
+ */
+
+package com.yarcdata.urika.hadoop.rdf.io.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.yarcdata.urika.hadoop.rdf.io.input.readers.RdfJsonReader;
+import com.yarcdata.urika.hadoop.rdf.types.TripleWritable;
+
+/**
+ * RDF/JSON input format
+ * 
+ * @author rvesse
+ * 
+ */
+public class RdfJsonInputFormat extends AbstractWholeFileInputFormat<LongWritable, TripleWritable> {
+
+    @Override
+    public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+            throws IOException, InterruptedException {
+        return new RdfJsonReader();
+    }
+
+}

Added: jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/RdfXmlInputFormat.java
URL: http://svn.apache.org/viewvc/jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/RdfXmlInputFormat.java?rev=1583942&view=auto
==============================================================================
--- jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/RdfXmlInputFormat.java (added)
+++ jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/RdfXmlInputFormat.java Wed Apr  2 09:20:51 2014
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2013 YarcData LLC All Rights Reserved.
+ */
+
+package com.yarcdata.urika.hadoop.rdf.io.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.yarcdata.urika.hadoop.rdf.io.input.readers.RdfXmlReader;
+import com.yarcdata.urika.hadoop.rdf.types.TripleWritable;
+
+/**
+ * RDF/XML input format
+ * 
+ * @author rvesse
+ * 
+ */
+public class RdfXmlInputFormat extends AbstractWholeFileInputFormat<LongWritable, TripleWritable> {
+
+    @Override
+    public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+            throws IOException, InterruptedException {
+        return new RdfXmlReader();
+    }
+
+}

Added: jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/TriGInputFormat.java
URL: http://svn.apache.org/viewvc/jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/TriGInputFormat.java?rev=1583942&view=auto
==============================================================================
--- jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/TriGInputFormat.java (added)
+++ jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/TriGInputFormat.java Wed Apr  2 09:20:51 2014
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2013 YarcData LLC All Rights Reserved.
+ */
+
+package com.yarcdata.urika.hadoop.rdf.io.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.yarcdata.urika.hadoop.rdf.io.input.readers.TriGReader;
+import com.yarcdata.urika.hadoop.rdf.types.QuadWritable;
+
+/**
+ * Input format for TriG
+ * 
+ * @author rvesse
+ * 
+ */
+public class TriGInputFormat extends AbstractWholeFileInputFormat<LongWritable, QuadWritable> {
+
+    @Override
+    public RecordReader<LongWritable, QuadWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+            throws IOException, InterruptedException {
+        return new TriGReader();
+    }
+
+}

Added: jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/TriplesInputFormat.java
URL: http://svn.apache.org/viewvc/jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/TriplesInputFormat.java?rev=1583942&view=auto
==============================================================================
--- jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/TriplesInputFormat.java (added)
+++ jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/TriplesInputFormat.java Wed Apr  2 09:20:51 2014
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2013 YarcData LLC All Rights Reserved.
+ */
+
+package com.yarcdata.urika.hadoop.rdf.io.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.yarcdata.urika.hadoop.rdf.io.input.readers.TriplesReader;
+import com.yarcdata.urika.hadoop.rdf.types.TripleWritable;
+
+/**
+ * RDF input format that can handle any RDF triples format that ARQ supports,
+ * selecting the format to use for each file based upon the file extension
+ * 
+ * @author rvesse
+ * 
+ */
+public class TriplesInputFormat extends AbstractWholeFileInputFormat<LongWritable, TripleWritable> {
+
+    @Override
+    public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+            throws IOException, InterruptedException {
+        return new TriplesReader();
+    }
+
+}

Added: jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/TriplesOrQuadsInputFormat.java
URL: http://svn.apache.org/viewvc/jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/TriplesOrQuadsInputFormat.java?rev=1583942&view=auto
==============================================================================
--- jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/TriplesOrQuadsInputFormat.java (added)
+++ jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/TriplesOrQuadsInputFormat.java Wed Apr  2 09:20:51 2014
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2013 YarcData LLC All Rights Reserved.
+ */
+
+package com.yarcdata.urika.hadoop.rdf.io.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.yarcdata.urika.hadoop.rdf.io.input.readers.TriplesOrQuadsReader;
+import com.yarcdata.urika.hadoop.rdf.types.QuadWritable;
+
+/**
+ * RDF input format that can handle any RDF triples/quads format that ARQ
+ * supports, selecting the format to use for each file based upon the file
+ * extension. Triples are converted into quads in the default graph.
+ * 
+ * @author rvesse
+ * 
+ */
+public class TriplesOrQuadsInputFormat extends AbstractWholeFileInputFormat<LongWritable, QuadWritable> {
+
+    @Override
+    public RecordReader<LongWritable, QuadWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+            throws IOException, InterruptedException {
+        return new TriplesOrQuadsReader();
+    }
+
+}

Added: jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/TurtleInputFormat.java
URL: http://svn.apache.org/viewvc/jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/TurtleInputFormat.java?rev=1583942&view=auto
==============================================================================
--- jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/TurtleInputFormat.java (added)
+++ jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/TurtleInputFormat.java Wed Apr  2 09:20:51 2014
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2013 YarcData LLC All Rights Reserved.
+ */
+
+package com.yarcdata.urika.hadoop.rdf.io.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.yarcdata.urika.hadoop.rdf.io.input.readers.TurtleReader;
+import com.yarcdata.urika.hadoop.rdf.types.TripleWritable;
+
+/**
+ * Turtle input format
+ * 
+ * @author rvesse
+ * 
+ */
+public class TurtleInputFormat extends AbstractWholeFileInputFormat<LongWritable, TripleWritable> {
+
+    @Override
+    public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+            throws IOException, InterruptedException {
+        return new TurtleReader();
+    }
+
+}

Added: jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/WholeFileNQuadsInputFormat.java
URL: http://svn.apache.org/viewvc/jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/WholeFileNQuadsInputFormat.java?rev=1583942&view=auto
==============================================================================
--- jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/WholeFileNQuadsInputFormat.java (added)
+++ jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/WholeFileNQuadsInputFormat.java Wed Apr  2 09:20:51 2014
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2013 YarcData LLC All Rights Reserved.
+ */
+
+package com.yarcdata.urika.hadoop.rdf.io.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.yarcdata.urika.hadoop.rdf.io.input.readers.WholeFileNQuadsReader;
+import com.yarcdata.urika.hadoop.rdf.types.QuadWritable;
+
+/**
+ * NQuads input format where files are processed as complete files rather than
+ * in a line-based manner as with the {@link NQuadsInputFormat}
+ * <p>
+ * This has the advantage of lower parser setup overhead but the disadvantage
+ * that the input cannot be split over multiple mappers.
+ * </p>
+ * 
+ * @author rvesse
+ * 
+ */
+public class WholeFileNQuadsInputFormat extends AbstractWholeFileInputFormat<LongWritable, QuadWritable> {
+
+    @Override
+    public RecordReader<LongWritable, QuadWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+            throws IOException, InterruptedException {
+        return new WholeFileNQuadsReader();
+    }
+
+}

Added: jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/WholeFileNTriplesInputFormat.java
URL: http://svn.apache.org/viewvc/jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/WholeFileNTriplesInputFormat.java?rev=1583942&view=auto
==============================================================================
--- jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/WholeFileNTriplesInputFormat.java (added)
+++ jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/WholeFileNTriplesInputFormat.java Wed Apr  2 09:20:51 2014
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2013 YarcData LLC All Rights Reserved.
+ */
+
+package com.yarcdata.urika.hadoop.rdf.io.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.yarcdata.urika.hadoop.rdf.io.input.readers.WholeFileNTriplesReader;
+import com.yarcdata.urika.hadoop.rdf.types.TripleWritable;
+
+/**
+ * NTriples input format where files are processed as complete files rather than
+ * in a line-based manner as with the {@link NTriplesInputFormat}
+ * <p>
+ * This has the advantage of lower parser setup overhead but the disadvantage
+ * that the input cannot be split over multiple mappers.
+ * </p>
+ * 
+ * @author rvesse
+ * 
+ */
+public class WholeFileNTriplesInputFormat extends AbstractWholeFileInputFormat<LongWritable, TripleWritable> {
+
+    @Override
+    public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+            throws IOException, InterruptedException {
+        return new WholeFileNTriplesReader();
+    }
+
+}

Added: jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/readers/AbstractBlockBasedNodeTupleReader.java
URL: http://svn.apache.org/viewvc/jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/readers/AbstractBlockBasedNodeTupleReader.java?rev=1583942&view=auto
==============================================================================
--- jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/readers/AbstractBlockBasedNodeTupleReader.java (added)
+++ jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/readers/AbstractBlockBasedNodeTupleReader.java Wed Apr  2 09:20:51 2014
@@ -0,0 +1,321 @@
+/*
+ * Copyright 2013 YarcData LLC All Rights Reserved.
+ */
+
+package com.yarcdata.urika.hadoop.rdf.io.input.readers;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.jena.riot.Lang;
+import org.apache.jena.riot.RDFDataMgr;
+import org.apache.jena.riot.lang.PipedRDFIterator;
+import org.apache.jena.riot.lang.PipedRDFStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.yarcdata.urika.hadoop.rdf.io.RdfIOConstants;
+import com.yarcdata.urika.hadoop.rdf.io.input.util.BlockInputStream;
+import com.yarcdata.urika.hadoop.rdf.io.input.util.TrackableInputStream;
+import com.yarcdata.urika.hadoop.rdf.io.input.util.TrackedInputStream;
+import com.yarcdata.urika.hadoop.rdf.io.input.util.TrackedPipedRDFStream;
+import com.yarcdata.urika.hadoop.rdf.types.AbstractNodeTupleWritable;
+
+/**
+ * An abstract implementation of a record reader that reads records from blocks
+ * of files. This is a hybrid between {@link AbstractLineBasedNodeTupleReader}
+ * and {@link AbstractWholeFileNodeTupleReader}: it can only be used with
+ * formats that can be split by lines, but it reduces overhead by parsing each
+ * split as a whole rather than as individual lines.
+ * <p>
+ * The keys produced are the approximate positions in the file at which tuples
+ * were found and the values are node tuples. Positions are approximate because
+ * they are recorded after the point at which the most recent tuple was parsed
+ * from the input, so they reflect the position in the stream immediately after
+ * the tuple was found.
+ * </p>
+ * 
+ * @author rvesse
+ * 
+ * @param <TValue>
+ *            Value type
+ * @param <T>
+ *            Tuple type
+ */
+public abstract class AbstractBlockBasedNodeTupleReader<TValue, T extends AbstractNodeTupleWritable<TValue>> extends
+        RecordReader<LongWritable, T> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractBlockBasedNodeTupleReader.class);
+    private CompressionCodec compressionCodecs;
+    private TrackableInputStream input;
+    private LongWritable key;
+    private long start, length;
+    private T tuple;
+    private TrackedPipedRDFStream<TValue> stream;
+    private PipedRDFIterator<TValue> iter;
+    private Thread parserThread;
+    private boolean finished = false;
+    private boolean ignoreBadTuples = true;
+    private boolean parserFinished = false;
+    private Throwable parserError = null;
+
+    @Override
+    public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException, InterruptedException {
+        LOG.debug("initialize({}, {})", genericSplit, context);
+
+        // Assuming file split
+        if (!(genericSplit instanceof FileSplit))
+            throw new IOException("This record reader only supports FileSplit inputs");
+        FileSplit split = (FileSplit) genericSplit;
+
+        // Configuration
+        Configuration config = context.getConfiguration();
+        this.ignoreBadTuples = config.getBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, true);
+
+        // Figure out what portion of the file to read
+        start = split.getStart();
+        long end = start + split.getLength();
+        final Path file = split.getPath();
+        long totalLength = file.getFileSystem(context.getConfiguration()).getFileStatus(file).getLen();
+        boolean readToEnd = end == totalLength;
+        CompressionCodecFactory factory = new CompressionCodecFactory(config);
+        this.compressionCodecs = factory.getCodec(file);
+
+        LOG.info("Got split with start {} and length {} for file with total length of {}", new Object[] { start,
+                split.getLength(), totalLength });
+
+        // Open the file and prepare the input stream
+        FileSystem fs = file.getFileSystem(config);
+        FSDataInputStream fileIn = fs.open(file);
+        this.length = split.getLength();
+        if (start > 0)
+            fileIn.seek(start);
+
+        if (this.compressionCodecs != null) {
+            // Compressed input
+            // For compressed input NLineInputFormat will have failed to find
+            // any line breaks and will give us a split from 0 -> (length - 1)
+            // Add 1 and re-verify readToEnd so we can abort correctly if ever
+            // given a partial split of a compressed file
+            end++;
+            readToEnd = end == totalLength;
+            if (start > 0 || !readToEnd)
+                throw new IOException("This record reader can only be used with compressed input where the split is a whole file");
+            input = new TrackedInputStream(this.compressionCodecs.createInputStream(fileIn));
+        } else {
+            // Uncompressed input
+
+            if (readToEnd) {
+                input = new TrackedInputStream(fileIn);
+            } else {
+                // Need to limit the portion of the file we are reading
+                input = new BlockInputStream(fileIn, split.getLength());
+            }
+        }
+
+        // Set up background thread for parser
+        iter = this.getPipedIterator();
+        this.stream = this.getPipedStream(iter, this.input);
+        Runnable parserRunnable = this.createRunnable(this, this.input, stream, this.getRdfLanguage());
+        this.parserThread = new Thread(parserRunnable);
+        this.parserThread.setDaemon(true);
+        this.parserThread.start();
+    }
+
+    /**
+     * Gets the RDF iterator to use
+     * 
+     * @return Iterator
+     */
+    protected abstract PipedRDFIterator<TValue> getPipedIterator();
+
+    /**
+     * Gets the RDF stream to parse to
+     * 
+     * @param iterator
+     *            Iterator
+     * @return RDF stream
+     */
+    protected abstract TrackedPipedRDFStream<TValue> getPipedStream(PipedRDFIterator<TValue> iterator, TrackableInputStream input);
+
+    /**
+     * Gets the RDF language to use for parsing
+     * 
+     * @return RDF language
+     */
+    protected abstract Lang getRdfLanguage();
+
+    /**
+     * Creates the runnable upon which the parsing will run
+     * 
+     * @param reader
+     *            Reader to notify when parsing completes
+     * @param input
+     *            Input
+     * @param stream
+     *            Stream
+     * @param lang
+     *            Language to use for parsing
+     * @return Parser runnable
+     */
+    private Runnable createRunnable(@SuppressWarnings("rawtypes") final AbstractBlockBasedNodeTupleReader reader,
+            final InputStream input, final PipedRDFStream<TValue> stream, final Lang lang) {
+        return new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    RDFDataMgr.parse(stream, input, null, lang);
+                    reader.setParserFinished(null);
+                } catch (Throwable e) {
+                    reader.setParserFinished(e);
+                }
+            }
+        };
+    }
+
+    /**
+     * Sets the parser thread finished state
+     * 
+     * @param e
+     *            Error (if any)
+     */
+    private void setParserFinished(Throwable e) {
+        synchronized (this.parserThread) {
+            this.parserError = e;
+            this.parserFinished = true;
+        }
+    }
+
+    /**
+     * Waits for the parser thread to have reported as finished
+     * 
+     * @throws InterruptedException
+     */
+    private void waitForParserFinished() throws InterruptedException {
+        do {
+            synchronized (this.parserThread) {
+                if (this.parserFinished)
+                    return;
+            }
+            Thread.sleep(50);
+        } while (true);
+    }
+
+    /**
+     * Creates an instance of a writable tuple from the given tuple value
+     * 
+     * @param tuple
+     *            Tuple value
+     * @return Writable tuple
+     */
+    protected abstract T createInstance(TValue tuple);
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+        // Reuse key for efficiency
+        if (key == null) {
+            key = new LongWritable();
+        }
+
+        if (this.finished)
+            return false;
+
+        try {
+            if (this.iter.hasNext()) {
+                // Position will be relative to the start of the split we're
+                // processing; check for null since the stream may not be able
+                // to report a position yet
+                Long position = this.stream.getPosition();
+                if (position != null) {
+                    long l = this.start + position;
+                    this.key.set(l);
+                    // For compressed input the actual length from which we
+                    // calculate progress is likely less than the actual
+                    // uncompressed length so we need to increment the
+                    // length as we go along
+                    // We always add 1 more than the current length because we
+                    // don't want to report 100% progress until we really have
+                    // finished
+                    if (this.compressionCodecs != null && l > this.length)
+                        this.length = l + 1;
+                }
+                this.tuple = this.createInstance(this.iter.next());
+                return true;
+            } else {
+                // Need to ensure that the parser thread has finished in order
+                // to determine whether we finished without error
+                this.waitForParserFinished();
+                if (this.parserError != null) {
+                    LOG.error("Error parsing block, aborting further parsing", this.parserError);
+                    if (!this.ignoreBadTuples)
+                        throw new IOException("Error parsing block at position " + (this.start + this.input.getBytesRead())
+                                + ", aborting further parsing", this.parserError);
+                }
+
+                this.key = null;
+                this.tuple = null;
+                this.finished = true;
+                // This is necessary so that when compressed input is used we
+                // report 100% progress once we've reached the genuine end of
+                // the stream
+                if (this.compressionCodecs != null)
+                    this.length--;
+                return false;
+            }
+        } catch (IOException e) {
+            throw e;
+        } catch (Throwable e) {
+            // Failed to read the tuple on this line
+            LOG.error("Error parsing block, aborting further parsing", e);
+            if (!this.ignoreBadTuples)
+                throw new IOException("Error parsing block at position " + (this.start + this.input.getBytesRead())
+                        + ", aborting further parsing", e);
+            this.key = null;
+            this.tuple = null;
+            this.finished = true;
+            return false;
+        }
+    }
+
+    @Override
+    public LongWritable getCurrentKey() throws IOException, InterruptedException {
+        return this.key;
+    }
+
+    @Override
+    public T getCurrentValue() throws IOException, InterruptedException {
+        return this.tuple;
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+        float progress = 0.0f;
+        if (this.key == null) {
+            // We've either not started or we've finished
+            progress = (this.finished ? 1.0f : 0.0f);
+        } else if (this.key.get() == Long.MIN_VALUE) {
+            // We don't have a position so we're either in progress or finished
+            progress = (this.finished ? 1.0f : 0.5f);
+        } else {
+            // We're some way through the file
+            progress = (this.key.get() - this.start) / (float) this.length;
+        }
+        LOG.debug("getProgress() --> {}", progress);
+        return progress;
+    }
+
+    @Override
+    public void close() throws IOException {
+        this.iter.close();
+        this.input.close();
+        this.finished = true;
+    }
+
+}
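
The heart of the reader above is the hand-off between the background parser
thread and the consuming iterator. For reference, the same piped-parsing
pattern looks roughly like this in isolation (a sketch, not part of the
commit):

    import java.io.InputStream;

    import org.apache.jena.riot.Lang;
    import org.apache.jena.riot.RDFDataMgr;
    import org.apache.jena.riot.lang.PipedRDFIterator;
    import org.apache.jena.riot.lang.PipedTriplesStream;

    import com.hp.hpl.jena.graph.Triple;

    public class PipedParsingExample {
        public static void parse(final InputStream input) {
            final PipedRDFIterator<Triple> iter = new PipedRDFIterator<Triple>();
            final PipedTriplesStream stream = new PipedTriplesStream(iter);

            // The parser runs on a daemon thread, as in initialize() above
            Thread parser = new Thread(new Runnable() {
                @Override
                public void run() {
                    RDFDataMgr.parse(stream, input, null, Lang.NTRIPLES);
                }
            });
            parser.setDaemon(true);
            parser.start();

            // The consumer pulls tuples, as nextKeyValue() does above
            while (iter.hasNext()) {
                System.out.println(iter.next());
            }
            iter.close();
        }
    }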

Added: jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/readers/AbstractBlockBasedQuadReader.java
URL: http://svn.apache.org/viewvc/jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/readers/AbstractBlockBasedQuadReader.java?rev=1583942&view=auto
==============================================================================
--- jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/readers/AbstractBlockBasedQuadReader.java (added)
+++ jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/readers/AbstractBlockBasedQuadReader.java Wed Apr  2 09:20:51 2014
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2013 YarcData LLC All Rights Reserved.
+ */
+
+package com.yarcdata.urika.hadoop.rdf.io.input.readers;
+
+import org.apache.jena.riot.lang.PipedRDFIterator;
+
+import com.hp.hpl.jena.sparql.core.Quad;
+import com.yarcdata.urika.hadoop.rdf.io.input.util.TrackableInputStream;
+import com.yarcdata.urika.hadoop.rdf.io.input.util.TrackedPipedQuadsStream;
+import com.yarcdata.urika.hadoop.rdf.io.input.util.TrackedPipedRDFStream;
+import com.yarcdata.urika.hadoop.rdf.types.QuadWritable;
+
+/**
+ * An abstract record reader for block-based quad formats
+ * 
+ * @author rvesse
+ * 
+ */
+public abstract class AbstractBlockBasedQuadReader extends AbstractBlockBasedNodeTupleReader<Quad, QuadWritable> {
+
+    @Override
+    protected PipedRDFIterator<Quad> getPipedIterator() {
+        return new PipedRDFIterator<Quad>();
+    }
+
+    @Override
+    protected TrackedPipedRDFStream<Quad> getPipedStream(PipedRDFIterator<Quad> iterator, TrackableInputStream input) {
+        return new TrackedPipedQuadsStream(iterator, input);
+    }
+
+    @Override
+    protected QuadWritable createInstance(Quad tuple) {
+        return new QuadWritable(tuple);
+    }
+}

Added: jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/readers/AbstractBlockBasedTripleReader.java
URL: http://svn.apache.org/viewvc/jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/readers/AbstractBlockBasedTripleReader.java?rev=1583942&view=auto
==============================================================================
--- jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/readers/AbstractBlockBasedTripleReader.java (added)
+++ jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/readers/AbstractBlockBasedTripleReader.java Wed Apr  2 09:20:51 2014
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2013 YarcData LLC All Rights Reserved.
+ */
+
+package com.yarcdata.urika.hadoop.rdf.io.input.readers;
+
+import org.apache.jena.riot.lang.PipedRDFIterator;
+
+import com.hp.hpl.jena.graph.Triple;
+import com.yarcdata.urika.hadoop.rdf.io.input.util.TrackableInputStream;
+import com.yarcdata.urika.hadoop.rdf.io.input.util.TrackedPipedRDFStream;
+import com.yarcdata.urika.hadoop.rdf.io.input.util.TrackedPipedTriplesStream;
+import com.yarcdata.urika.hadoop.rdf.types.TripleWritable;
+
+/**
+ * An abstract record reader for block-based triple formats
+ * 
+ * @author rvesse
+ * 
+ */
+public abstract class AbstractBlockBasedTripleReader extends AbstractBlockBasedNodeTupleReader<Triple, TripleWritable> {
+
+    @Override
+    protected PipedRDFIterator<Triple> getPipedIterator() {
+        return new PipedRDFIterator<Triple>();
+    }
+
+    @Override
+    protected TrackedPipedRDFStream<Triple> getPipedStream(PipedRDFIterator<Triple> iterator, TrackableInputStream input) {
+        return new TrackedPipedTriplesStream(iterator, input);
+    }
+
+    @Override
+    protected TripleWritable createInstance(Triple tuple) {
+        return new TripleWritable(tuple);
+    }
+}
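
With the tuple-type plumbing supplied by these intermediate classes, a
concrete block-based reader only has to name the language it parses. A
sketch (the class name is illustrative; N-Triples is chosen because
block-based readers require a line-splittable format):

    import org.apache.jena.riot.Lang;

    public class ExampleBlockedNTriplesReader extends AbstractBlockBasedTripleReader {

        // Supplies the language used by the parser thread in the base class
        @Override
        protected Lang getRdfLanguage() {
            return Lang.NTRIPLES;
        }
    }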

Added: jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/readers/AbstractLineBasedNodeTupleReader.java
URL: http://svn.apache.org/viewvc/jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/readers/AbstractLineBasedNodeTupleReader.java?rev=1583942&view=auto
==============================================================================
--- jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/readers/AbstractLineBasedNodeTupleReader.java (added)
+++ jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/readers/AbstractLineBasedNodeTupleReader.java Wed Apr  2 09:20:51 2014
@@ -0,0 +1,253 @@
+/*
+ * Copyright 2013 YarcData LLC All Rights Reserved.
+ */
+
+package com.yarcdata.urika.hadoop.rdf.io.input.readers;
+
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.util.LineReader;
+import org.apache.jena.riot.system.ParserProfile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.yarcdata.urika.hadoop.rdf.io.HadoopIOConstants;
+import com.yarcdata.urika.hadoop.rdf.io.RdfIOConstants;
+import com.yarcdata.urika.hadoop.rdf.io.input.util.RdfIOUtils;
+import com.yarcdata.urika.hadoop.rdf.types.AbstractNodeTupleWritable;
+
+/**
+ * An abstract implementation of a record reader that reads records from
+ * line-based tuple formats. Currently this only supports reading from file
+ * splits.
+ * <p>
+ * The keys produced are the position of the line in the file and the values
+ * are node tuples
+ * </p>
+ * 
+ * @author rvesse
+ * 
+ * @param <TValue>
+ *            Tuple type
+ * @param <T>
+ *            Writable tuple type
+ */
+public abstract class AbstractLineBasedNodeTupleReader<TValue, T extends AbstractNodeTupleWritable<TValue>> extends
+        RecordReader<LongWritable, T> {
+    
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractLineBasedNodeTupleReader.class);
+    private CompressionCodecFactory compressionCodecs = null;
+    private long start, pos, end, estLength;
+    private int maxLineLength;
+    private LineReader in;
+    private LongWritable key = null;
+    private Text value = null;
+    private T tuple = null;
+    private ParserProfile profile = null;
+    private boolean ignoreBadTuples = true;
+
+    @Override
+    public final void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException, InterruptedException {
+        LOG.debug("initialize({}, {})", genericSplit, context);
+
+        // Assuming file split
+        if (!(genericSplit instanceof FileSplit))
+            throw new IOException("This record reader only supports FileSplit inputs");
+        FileSplit split = (FileSplit) genericSplit;
+
+        // Configuration
+        profile = RdfIOUtils.createParserProfile(context, split.getPath());
+        Configuration config = context.getConfiguration();
+        this.ignoreBadTuples = config.getBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, true);
+
+        // Figure out what portion of the file to read
+        this.maxLineLength = config.getInt(HadoopIOConstants.MAX_LINE_LENGTH, Integer.MAX_VALUE);
+        start = split.getStart();
+        end = start + split.getLength();
+        final Path file = split.getPath();
+        long totalLength = file.getFileSystem(context.getConfiguration()).getFileStatus(file).getLen();
+        compressionCodecs = new CompressionCodecFactory(config);
+        final CompressionCodec codec = compressionCodecs.getCodec(file);
+        
+        LOG.info("Got split with start {} and length {} for file with total length of {}", new Object[] { start,
+                split.getLength(), totalLength });
+
+        // Open the file and seek to the start of the split
+        FileSystem fs = file.getFileSystem(config);
+        FSDataInputStream fileIn = fs.open(file);
+        boolean skipFirstLine = false;
+        if (codec != null) {
+            // Compressed input
+            // For compressed input NLineInputFormat will have failed to find
+            // any line breaks and will give us a split from 0 -> (length - 1)
+            // Add 1 and verify we got complete split
+            if (totalLength > split.getLength() + 1)
+                throw new IOException(
+                        "This record reader can only be used with compressed input where the split covers the whole file");
+            in = new LineReader(codec.createInputStream(fileIn), config);
+            estLength = end;
+            end = Long.MAX_VALUE;
+        } else {
+            // Uncompressed input
+            if (start != 0) {
+                skipFirstLine = true;
+                --start;
+                fileIn.seek(start);
+            }
+            in = new LineReader(fileIn, config);
+        }
+        // Skip the first line and re-establish "start".
+        // This accounts for how the line reader reads lines and how
+        // NLineInputFormat provides the split information
+        if (skipFirstLine) {
+            start += in.readLine(new Text(), 0, (int) Math.min((long) Integer.MAX_VALUE, end - start));
+        }
+        this.pos = start;
+    }
+
+    /**
+     * Gets an iterator over the data on the current line
+     * 
+     * @param line
+     *            Line
+     * @param profile
+     *            Parser profile
+     * @return Iterator
+     */
+    protected abstract Iterator<TValue> getIterator(String line, ParserProfile profile);
+
+    /**
+     * Creates an instance of a writable tuple from the given tuple value
+     * 
+     * @param tuple
+     *            Tuple value
+     * @return Writable tuple
+     */
+    protected abstract T createInstance(TValue tuple);
+
+    @Override
+    public final boolean nextKeyValue() throws IOException, InterruptedException {
+        // Reuse key for efficiency
+        if (key == null) {
+            key = new LongWritable();
+        }
+
+        // Reset value which we use for reading lines
+        if (value == null) {
+            value = new Text();
+        }
+        tuple = null;
+
+        // Try to read the next valid line
+        int newSize = 0;
+        while (pos < end) {
+            // Read next line
+            newSize = in.readLine(value, maxLineLength, Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength));
+
+            // Once we get an empty line we've reached the end of our input
+            if (newSize == 0) {
+                break;
+            }
+
+            // Update position, remembering that where inputs are compressed
+            // we may be at a larger position than we expected because the
+            // length of the split is likely less than the length of the data
+            // once decompressed
+            key.set(pos);
+            pos += newSize;
+            if (pos > estLength)
+                estLength = pos + 1;
+
+            // Skip lines that exceed the line length limit that has been set
+            if (newSize >= maxLineLength) {
+                LOG.warn("Skipped oversized line of size " + newSize + " at position " + (pos - newSize));
+                continue;
+            }
+
+            // Attempt to read the tuple from current line
+            try {
+                Iterator<TValue> iter = this.getIterator(value.toString(), profile);
+                if (iter.hasNext()) {
+                    tuple = this.createInstance(iter.next());
+
+                    // If we reach here we've found a valid tuple so we can
+                    // break out of the loop
+                    break;
+                } else {
+                    // Empty line/Comment line
+                    LOG.debug("Valid line with no triple at position " + (pos - newSize));
+                    continue;
+                }
+            } catch (Throwable e) {
+                // Failed to read the tuple on this line
+                LOG.error("Bad tuple at position " + (pos - newSize), e);
+                if (this.ignoreBadTuples)
+                    continue;
+                throw new IOException("Bad tuple at position " + (pos - newSize), e);
+            }
+        }
+        boolean result = this.tuple != null;
+
+        // End of input
+        if (newSize == 0) {
+            key = null;
+            value = null;
+            tuple = null;
+            result = false;
+            estLength = pos;
+        }
+        LOG.debug("nextKeyValue() --> {}", result);
+        return result;
+    }
+
+    @Override
+    public LongWritable getCurrentKey() throws IOException, InterruptedException {
+        LOG.debug("getCurrentKey() --> {}", key);
+        return key;
+    }
+
+    @Override
+    public T getCurrentValue() throws IOException, InterruptedException {
+        LOG.debug("getCurrentValue() --> {}", tuple);
+        return tuple;
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+        float progress = 0.0f;
+        if (start != end) {
+            if (end == Long.MAX_VALUE) {
+                if (estLength == 0)
+                    return 1.0f;
+                // Use estimated length
+                progress = Math.min(1.0f, (pos - start) / (float) (estLength - start));
+            } else {
+                // Use actual length
+                progress = Math.min(1.0f, (pos - start) / (float) (end - start));
+            }
+        }
+        LOG.debug("getProgress() --> {}", progress);
+        return progress;
+    }
+
+    @Override
+    public void close() throws IOException {
+        LOG.debug("close()");
+        if (in != null) {
+            in.close();
+        }
+    }
+
+}
\ No newline at end of file
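
The MapReduce framework normally drives the reader's lifecycle, but the same
contract can be exercised by hand, e.g. in a unit test. A sketch, assuming a
suitable FileSplit and TaskAttemptContext have already been constructed
(their construction is Hadoop-version specific and omitted here):

    RecordReader<LongWritable, TripleWritable> reader = new WholeFileNTriplesReader();
    reader.initialize(split, context);
    while (reader.nextKeyValue()) {
        LongWritable key = reader.getCurrentKey();       // position in the file
        TripleWritable value = reader.getCurrentValue(); // parsed tuple
        System.out.println(key.get() + " -> " + value);
    }
    reader.close();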

Added: jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/readers/AbstractLineBasedQuadReader.java
URL: http://svn.apache.org/viewvc/jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/readers/AbstractLineBasedQuadReader.java?rev=1583942&view=auto
==============================================================================
--- jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/readers/AbstractLineBasedQuadReader.java (added)
+++ jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/readers/AbstractLineBasedQuadReader.java Wed Apr  2 09:20:51 2014
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2013 YarcData LLC All Rights Reserved.
+ */
+
+package com.yarcdata.urika.hadoop.rdf.io.input.readers;
+
+import java.util.Iterator;
+
+import org.apache.jena.riot.system.ParserProfile;
+import org.apache.jena.riot.tokens.Tokenizer;
+import com.hp.hpl.jena.sparql.core.Quad;
+import com.yarcdata.urika.hadoop.rdf.types.QuadWritable;
+
+/**
+ * An abstract record reader for line-based quad formats
+ * 
+ * @author rvesse
+ * 
+ */
+public abstract class AbstractLineBasedQuadReader extends AbstractLineBasedNodeTupleReader<Quad, QuadWritable> {
+
+    @Override
+    protected Iterator<Quad> getIterator(String line, ParserProfile profile) {
+        Tokenizer tokenizer = getTokenizer(line);
+        return getQuadsIterator(tokenizer, profile);
+    }
+
+    @Override
+    protected QuadWritable createInstance(Quad q) {
+        return new QuadWritable(q);
+    }
+
+    protected abstract Tokenizer getTokenizer(String line);
+
+    protected abstract Iterator<Quad> getQuadsIterator(Tokenizer tokenizer, ParserProfile profile);
+}
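
A concrete line-based quad reader then just supplies a tokenizer and a RIOT
iterator over the tokenized line. A sketch for N-Quads using RIOT's
LangNQuads, which implements Iterator&lt;Quad&gt; (the class name here is
illustrative, not necessarily the one this commit adds):

    import java.util.Iterator;

    import org.apache.jena.riot.lang.LangNQuads;
    import org.apache.jena.riot.system.ParserProfile;
    import org.apache.jena.riot.tokens.Tokenizer;
    import org.apache.jena.riot.tokens.TokenizerFactory;

    import com.hp.hpl.jena.sparql.core.Quad;

    public class ExampleNQuadsLineReader extends AbstractLineBasedQuadReader {

        // Tokenizes the single line handed over by the base class
        @Override
        protected Tokenizer getTokenizer(String line) {
            return TokenizerFactory.makeTokenizerString(line);
        }

        // LangNQuads iterates the quads (at most one per line for N-Quads)
        @Override
        protected Iterator<Quad> getQuadsIterator(Tokenizer tokenizer, ParserProfile profile) {
            return new LangNQuads(tokenizer, profile, null);
        }
    }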

Added: jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/readers/AbstractLineBasedTripleReader.java
URL: http://svn.apache.org/viewvc/jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/readers/AbstractLineBasedTripleReader.java?rev=1583942&view=auto
==============================================================================
--- jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/readers/AbstractLineBasedTripleReader.java (added)
+++ jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/readers/AbstractLineBasedTripleReader.java Wed Apr  2 09:20:51 2014
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2013 YarcData LLC All Rights Reserved.
+ */
+
+package com.yarcdata.urika.hadoop.rdf.io.input.readers;
+
+import java.util.Iterator;
+
+import org.apache.jena.riot.system.ParserProfile;
+import org.apache.jena.riot.tokens.Tokenizer;
+import com.hp.hpl.jena.graph.Triple;
+import com.yarcdata.urika.hadoop.rdf.types.TripleWritable;
+
+/**
+ * An abstract record reader for line-based triple formats
+ * 
+ * @author rvesse
+ * 
+ */
+public abstract class AbstractLineBasedTripleReader extends AbstractLineBasedNodeTupleReader<Triple, TripleWritable> {
+
+    @Override
+    protected Iterator<Triple> getIterator(String line, ParserProfile profile) {
+        Tokenizer tokenizer = getTokenizer(line);
+        return getTriplesIterator(tokenizer, profile);
+    }
+
+    @Override
+    protected TripleWritable createInstance(Triple t) {
+        return new TripleWritable(t);
+    }
+    
+    protected abstract Tokenizer getTokenizer(String line);
+
+    protected abstract Iterator<Triple> getTriplesIterator(Tokenizer tokenizer, ParserProfile profile);
+
+}

Added: jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/readers/AbstractRdfReader.java
URL: http://svn.apache.org/viewvc/jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/readers/AbstractRdfReader.java?rev=1583942&view=auto
==============================================================================
--- jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/readers/AbstractRdfReader.java (added)
+++ jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/com/yarcdata/urika/hadoop/rdf/io/input/readers/AbstractRdfReader.java Wed Apr  2 09:20:51 2014
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2013 YarcData LLC All Rights Reserved.
+ */
+
+package com.yarcdata.urika.hadoop.rdf.io.input.readers;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.jena.riot.Lang;
+import org.apache.jena.riot.RDFLanguages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.yarcdata.urika.hadoop.rdf.types.AbstractNodeTupleWritable;
+
+/**
+ * An abstract record reader for arbitrary RDF which selects the actual record
+ * reader to use based on the RDF language detected from the file name
+ * 
+ * @author rvesse
+ * 
+ * @param <TValue>
+ *            Tuple type
+ * @param <T>
+ *            Writable tuple type
+ */
+public abstract class AbstractRdfReader<TValue, T extends AbstractNodeTupleWritable<TValue>> extends
+        RecordReader<LongWritable, T> {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractRdfReader.class);
+
+    private RecordReader<LongWritable, T> reader;
+
+    @Override
+    public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException, InterruptedException {
+        LOG.debug("initialize({}, {})", genericSplit, context);
+
+        // Assuming file split
+        if (!(genericSplit instanceof FileSplit))
+            throw new IOException("This record reader only supports FileSplit inputs");
+
+        // Find RDF language
+        FileSplit split = (FileSplit) genericSplit;
+        Path path = split.getPath();
+        Lang lang = RDFLanguages.filenameToLang(path.getName());
+        if (lang == null)
+            throw new IOException("There is no registered RDF language for the input file " + path.toString());
+
+        // Select the record reader and initialize
+        this.reader = this.selectRecordReader(lang);
+        this.reader.initialize(split, context);
+    }
+
+    /**
+     * Selects the appropriate record reader to use for the given RDF language
+     * 
+     * @param lang
+     *            RDF language
+     * @return Record reader
+     * @throws IOException
+     *             Should be thrown if no record reader can be selected
+     */
+    protected abstract RecordReader<LongWritable, T> selectRecordReader(Lang lang) throws IOException;
+
+    @Override
+    public final boolean nextKeyValue() throws IOException, InterruptedException {
+        return this.reader.nextKeyValue();
+    }
+
+    @Override
+    public final LongWritable getCurrentKey() throws IOException, InterruptedException {
+        return this.reader.getCurrentKey();
+    }
+
+    @Override
+    public final T getCurrentValue() throws IOException, InterruptedException {
+        return this.reader.getCurrentValue();
+    }
+
+    @Override
+    public final float getProgress() throws IOException, InterruptedException {
+        return this.reader.getProgress();
+    }
+
+    @Override
+    public final void close() throws IOException {
+        this.reader.close();
+    }
+
+}
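
A concrete subclass of this reader only has to implement
selectRecordReader(). A sketch for a triples-oriented reader that dispatches
solely to the WholeFileNTriplesReader seen earlier (a real implementation
would cover every language it supports):

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.jena.riot.Lang;
    import org.apache.jena.riot.RDFLanguages;

    import com.hp.hpl.jena.graph.Triple;
    import com.yarcdata.urika.hadoop.rdf.types.TripleWritable;

    public class ExampleTriplesRdfReader extends AbstractRdfReader<Triple, TripleWritable> {

        @Override
        protected RecordReader<LongWritable, TripleWritable> selectRecordReader(Lang lang) throws IOException {
            // Dispatch on the language detected from the file name
            if (RDFLanguages.sameLang(Lang.NTRIPLES, lang))
                return new WholeFileNTriplesReader();
            throw new IOException(lang + " is not a supported RDF language for this reader");
        }
    }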