You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@jena.apache.org by rv...@apache.org on 2014/05/29 13:07:19 UTC

svn commit: r1598254 - in /jena/Experimental/hadoop-rdf/hadoop-rdf-io/src: main/java/org/apache/jena/hadoop/rdf/io/output/ main/java/org/apache/jena/hadoop/rdf/io/output/writers/ test/java/org/apache/jena/hadoop/rdf/io/output/

Author: rvesse
Date: Thu May 29 11:07:18 2014
New Revision: 1598254

URL: http://svn.apache.org/r1598254
Log:
Rework Turtle and TriG output to stream through StreamRDF (via WriterStreamRDFBlocks), fixing the invalid data produced when blank nodes spanned output batches (JENA-675)

Added:
    jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractStreamRdfNodeTupleOutputFormat.java
    jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/BatchedTriGOutputFormat.java
    jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/BatchedTurtleOutputFormat.java
    jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/AbstractStreamRdfNodeTupleWriter.java
    jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/BatchedTriGWriter.java
    jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/BatchedTurtleWriter.java
    jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/StreamRdfQuadWriter.java
    jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/StreamRdfTripleWriter.java
    jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/BatchedTriGOutputTest.java
    jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/BatchedTurtleOutputTest.java
    jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/StreamedTriGOutputTest.java
    jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/StreamedTurtleOutputTest.java
    jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/TriGBlankNodeOutputTests.java
    jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/TurtleBlankNodeOutputTests.java
Removed:
    jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/TriGWriter.java
    jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/TurtleWriter.java
    jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/TriGOutputTest.java
    jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/TurtleOutputTest.java
Modified:
    jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractBatchedNodeTupleOutputFormat.java
    jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/TriGOutputFormat.java
    jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/TurtleOutputFormat.java
    jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/AbstractLineBasedTripleWriter.java
    jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/AbstractNodeTupleOutputFormatTests.java

Modified: jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractBatchedNodeTupleOutputFormat.java
URL: http://svn.apache.org/viewvc/jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractBatchedNodeTupleOutputFormat.java?rev=1598254&r1=1598253&r2=1598254&view=diff
==============================================================================
--- jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractBatchedNodeTupleOutputFormat.java (original)
+++ jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractBatchedNodeTupleOutputFormat.java Thu May 29 11:07:18 2014
@@ -35,20 +35,20 @@ import org.apache.jena.hadoop.rdf.types.
  * 
  * @param <TKey>
  *            Key type
- * @param <TValue>
+ * @param <TTuple>
  *            Tuple type
- * @param <T>
- *            Writable tuple type
+ * @param <TValue>
+ *            Writable tuple type i.e. the value type
  */
-public abstract class AbstractBatchedNodeTupleOutputFormat<TKey, TValue, T extends AbstractNodeTupleWritable<TValue>> extends
-        AbstractNodeTupleOutputFormat<TKey, TValue, T> {
+public abstract class AbstractBatchedNodeTupleOutputFormat<TKey, TTuple, TValue extends AbstractNodeTupleWritable<TTuple>> extends
+        AbstractNodeTupleOutputFormat<TKey, TTuple, TValue> {
 
     @Override
-    protected RecordWriter<TKey, T> getRecordWriter(Writer writer, Configuration config) {
+    protected RecordWriter<TKey, TValue> getRecordWriter(Writer writer, Configuration config) {
         long batchSize = config.getLong(RdfIOConstants.OUTPUT_BATCH_SIZE, RdfIOConstants.DEFAULT_OUTPUT_BATCH_SIZE);
         return this.getRecordWriter(writer, batchSize);
     }
     
-    protected abstract RecordWriter<TKey, T> getRecordWriter(Writer writer, long batchSize);
+    protected abstract RecordWriter<TKey, TValue> getRecordWriter(Writer writer, long batchSize);
 
 }

Added: jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractStreamRdfNodeTupleOutputFormat.java
URL: http://svn.apache.org/viewvc/jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractStreamRdfNodeTupleOutputFormat.java?rev=1598254&view=auto
==============================================================================
--- jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractStreamRdfNodeTupleOutputFormat.java (added)
+++ jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractStreamRdfNodeTupleOutputFormat.java Thu May 29 11:07:18 2014
@@ -0,0 +1,58 @@
+package org.apache.jena.hadoop.rdf.io.output;
+
+import java.io.Writer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable;
+import org.apache.jena.riot.system.StreamRDF;
+
+/**
+ * Abstract output format for formats that use the RIOT {@link StreamRDF} API to
+ * stream the writes
+ * 
+ * @author rvesse
+ * 
+ * @param <TKey>
+ *            Key type
+ * @param <TTuple>
+ *            Tuple type
+ * @param <TValue>
+ *            Writable tuple type i.e. the value type
+ */
+public abstract class AbstractStreamRdfNodeTupleOutputFormat<TKey, TTuple, TValue extends AbstractNodeTupleWritable<TTuple>>
+		extends AbstractNodeTupleOutputFormat<TKey, TTuple, TValue> {
+
+	@Override
+	protected RecordWriter<TKey, TValue> getRecordWriter(Writer writer,
+			Configuration config) {
+		return getRecordWriter(getStream(writer, config), writer, config);
+	}
+
+	/**
+	 * Gets a writer which provides a bridge between the {@link RecordWriter}
+	 * and {@link StreamRDF} APIs
+	 * 
+	 * @param stream
+	 *            RDF Stream
+	 * @param writer
+	 *            Writer
+	 * @param config
+	 *            Configuration
+	 * @return Record Writer
+	 */
+	protected abstract RecordWriter<TKey, TValue> getRecordWriter(
+			StreamRDF stream, Writer writer, Configuration config);
+
+	/**
+	 * Gets a {@link StreamRDF} to which the tuples to be output should be
+	 * passed
+	 * 
+	 * @param writer
+	 *            Writer
+	 * @param config
+	 *            Configuration
+	 * @return RDF Stream
+	 */
+	protected abstract StreamRDF getStream(Writer writer, Configuration config);
+}

Added: jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/BatchedTriGOutputFormat.java
URL: http://svn.apache.org/viewvc/jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/BatchedTriGOutputFormat.java?rev=1598254&view=auto
==============================================================================
--- jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/BatchedTriGOutputFormat.java (added)
+++ jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/BatchedTriGOutputFormat.java Thu May 29 11:07:18 2014
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.output;
+
+import java.io.Writer;
+
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.jena.hadoop.rdf.io.output.writers.BatchedTriGWriter;
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+
+import com.hp.hpl.jena.sparql.core.Quad;
+
+/**
+ * Output format for TriG that uses a batched approach, note that this will
+ * produce invalid data where blank nodes span batches so it is typically better
+ * to use the {@link TriGOutputFormat} instead
+ * 
+ * @author rvesse
+ * 
+ * @param <TKey>
+ *            Key type
+ */
+public class BatchedTriGOutputFormat<TKey> extends
+		AbstractBatchedNodeTupleOutputFormat<TKey, Quad, QuadWritable> {
+
+	@Override
+	protected RecordWriter<TKey, QuadWritable> getRecordWriter(Writer writer,
+			long batchSize) {
+		return new BatchedTriGWriter<TKey>(writer, batchSize);
+	}
+
+	@Override
+	protected String getFileExtension() {
+		return ".trig";
+	}
+
+}

Added: jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/BatchedTurtleOutputFormat.java
URL: http://svn.apache.org/viewvc/jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/BatchedTurtleOutputFormat.java?rev=1598254&view=auto
==============================================================================
--- jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/BatchedTurtleOutputFormat.java (added)
+++ jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/BatchedTurtleOutputFormat.java Thu May 29 11:07:18 2014
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.output;
+
+import java.io.Writer;
+
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.jena.hadoop.rdf.io.output.writers.BatchedTurtleWriter;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+
+import com.hp.hpl.jena.graph.Triple;
+
+/**
+ * Output format for Turtle that uses a batched approach, note that this will
+ * produce invalid data where blank nodes span batches so it is typically better
+ * to use the {@link TurtleOutputFormat} instead
+ * 
+ * @author rvesse
+ * 
+ * @param <TKey>
+ *            Key type
+ */
+public class BatchedTurtleOutputFormat<TKey> extends
+		AbstractBatchedNodeTupleOutputFormat<TKey, Triple, TripleWritable> {
+
+	@Override
+	protected RecordWriter<TKey, TripleWritable> getRecordWriter(Writer writer,
+			long batchSize) {
+		return new BatchedTurtleWriter<TKey>(writer, batchSize);
+	}
+
+	@Override
+	protected String getFileExtension() {
+		return ".ttl";
+	}
+
+}

Modified: jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/TriGOutputFormat.java
URL: http://svn.apache.org/viewvc/jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/TriGOutputFormat.java?rev=1598254&r1=1598253&r2=1598254&view=diff
==============================================================================
--- jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/TriGOutputFormat.java (original)
+++ jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/TriGOutputFormat.java Thu May 29 11:07:18 2014
@@ -1,28 +1,13 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- *     
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
 package org.apache.jena.hadoop.rdf.io.output;
 
 import java.io.Writer;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.jena.hadoop.rdf.io.output.writers.TriGWriter;
-import org.apache.jena.hadoop.rdf.types.QuadWritable;
+import org.apache.jena.hadoop.rdf.io.output.writers.StreamRdfQuadWriter;
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+import org.apache.jena.riot.system.StreamRDF;
+import org.apache.jena.riot.writer.WriterStreamRDFBlocks;
 
 import com.hp.hpl.jena.sparql.core.Quad;
 
@@ -34,16 +19,23 @@ import com.hp.hpl.jena.sparql.core.Quad;
  * @param <TKey>
  *            Key type
  */
-public class TriGOutputFormat<TKey> extends AbstractBatchedNodeTupleOutputFormat<TKey, Quad, QuadWritable> {
+public class TriGOutputFormat<TKey> extends
+		AbstractStreamRdfNodeTupleOutputFormat<TKey, Quad, QuadWritable> {
 
-    @Override
-    protected RecordWriter<TKey, QuadWritable> getRecordWriter(Writer writer, long batchSize) {
-        return new TriGWriter<TKey>(writer, batchSize);
-    }
-
-    @Override
-    protected String getFileExtension() {
-        return ".trig";
-    }
+	@Override
+	protected RecordWriter<TKey, QuadWritable> getRecordWriter(
+			StreamRDF stream, Writer writer, Configuration config) {
+		return new StreamRdfQuadWriter<TKey>(stream, writer);
+	}
+
+	@Override
+	protected StreamRDF getStream(Writer writer, Configuration config) {
+		return new WriterStreamRDFBlocks(writer);
+	}
+
+	@Override
+	protected String getFileExtension() {
+		return ".trig";
+	}
 
 }

Modified: jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/TurtleOutputFormat.java
URL: http://svn.apache.org/viewvc/jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/TurtleOutputFormat.java?rev=1598254&r1=1598253&r2=1598254&view=diff
==============================================================================
--- jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/TurtleOutputFormat.java (original)
+++ jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/TurtleOutputFormat.java Thu May 29 11:07:18 2014
@@ -16,34 +16,44 @@
  * limitations under the License.
  */
 
-package org.apache.jena.hadoop.rdf.io.output;
-
-import java.io.Writer;
-
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.jena.hadoop.rdf.io.output.writers.TurtleWriter;
+package org.apache.jena.hadoop.rdf.io.output;
+
+import java.io.Writer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.jena.hadoop.rdf.io.output.writers.StreamRdfTripleWriter;
 import org.apache.jena.hadoop.rdf.types.TripleWritable;
-
-import com.hp.hpl.jena.graph.Triple;
-
-/**
- * Turtle output format
- * 
- * @author rvesse
- * 
- * @param <TKey>
- *            Key type
- */
-public class TurtleOutputFormat<TKey> extends AbstractBatchedNodeTupleOutputFormat<TKey, Triple, TripleWritable> {
-
-    @Override
-    protected RecordWriter<TKey, TripleWritable> getRecordWriter(Writer writer, long batchSize) {
-        return new TurtleWriter<TKey>(writer, batchSize);
-    }
-
-    @Override
-    protected String getFileExtension() {
-        return ".ttl";
-    }
-
-}
+import org.apache.jena.riot.system.StreamRDF;
+import org.apache.jena.riot.writer.WriterStreamRDFBlocks;
+
+import com.hp.hpl.jena.graph.Triple;
+
+/**
+ * Turtle output format
+ * 
+ * @author rvesse
+ * 
+ * @param <TKey>
+ *            Key type
+ */
+public class TurtleOutputFormat<TKey> extends
+		AbstractStreamRdfNodeTupleOutputFormat<TKey, Triple, TripleWritable> {
+
+	@Override
+	protected String getFileExtension() {
+		return ".ttl";
+	}
+
+	@Override
+	protected RecordWriter<TKey, TripleWritable> getRecordWriter(
+			StreamRDF stream, Writer writer, Configuration config) {
+		return new StreamRdfTripleWriter<TKey>(stream, writer);
+	}
+
+	@Override
+	protected StreamRDF getStream(Writer writer, Configuration config) {
+		return new WriterStreamRDFBlocks(writer);
+	}
+
+}

Modified: jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/AbstractLineBasedTripleWriter.java
URL: http://svn.apache.org/viewvc/jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/AbstractLineBasedTripleWriter.java?rev=1598254&r1=1598253&r2=1598254&view=diff
==============================================================================
--- jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/AbstractLineBasedTripleWriter.java (original)
+++ jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/AbstractLineBasedTripleWriter.java Thu May 29 11:07:18 2014
@@ -16,52 +16,53 @@
  * limitations under the License.
  */
 
-package org.apache.jena.hadoop.rdf.io.output.writers;
-
-import java.io.Writer;
-
+package org.apache.jena.hadoop.rdf.io.output.writers;
+
+import java.io.Writer;
+
 import org.apache.jena.hadoop.rdf.types.TripleWritable;
-import org.apache.jena.riot.out.NodeFormatter;
-import org.apache.jena.riot.out.NodeFormatterNT;
-
-import com.hp.hpl.jena.graph.Node;
-import com.hp.hpl.jena.graph.Triple;
-
-/**
- * An abstract writer for line based triple formats
- * 
- * @author rvesse
- * @param <TKey> 
- * 
- */
-public abstract class AbstractLineBasedTripleWriter<TKey> extends AbstractLineBasedNodeTupleWriter<TKey, Triple, TripleWritable> {
-
-    /**
-     * Creates a new writer using the default NTriples node formatter
-     * 
-     * @param writer
-     *            Writer
-     */
-    public AbstractLineBasedTripleWriter(Writer writer) {
-        this(writer, new NodeFormatterNT());
-    }
-
-    /**
-     * Creates a new writer using the specified node formatter
-     * 
-     * @param writer
-     *            Writer
-     * @param formatter
-     *            Node formatter
-     */
-    public AbstractLineBasedTripleWriter(Writer writer, NodeFormatter formatter) {
-        super(writer, formatter);
-    }
-
-    @Override
-    protected Node[] getNodes(TripleWritable tuple) {
-        Triple t = tuple.get();
-        return new Node[] { t.getSubject(), t.getPredicate(), t.getObject() };
-    }
-
-}
+import org.apache.jena.riot.out.NodeFormatter;
+import org.apache.jena.riot.out.NodeFormatterNT;
+
+import com.hp.hpl.jena.graph.Node;
+import com.hp.hpl.jena.graph.Triple;
+
+/**
+ * An abstract writer for line based triple formats
+ * 
+ * @param <TKey>
+ *            Key type
+ * 
+ */
+public abstract class AbstractLineBasedTripleWriter<TKey> extends
+		AbstractLineBasedNodeTupleWriter<TKey, Triple, TripleWritable> {
+
+	/**
+	 * Creates a new writer using the default NTriples node formatter
+	 * 
+	 * @param writer
+	 *            Writer
+	 */
+	public AbstractLineBasedTripleWriter(Writer writer) {
+		this(writer, new NodeFormatterNT());
+	}
+
+	/**
+	 * Creates a new writer using the specified node formatter
+	 * 
+	 * @param writer
+	 *            Writer
+	 * @param formatter
+	 *            Node formatter
+	 */
+	public AbstractLineBasedTripleWriter(Writer writer, NodeFormatter formatter) {
+		super(writer, formatter);
+	}
+
+	@Override
+	protected Node[] getNodes(TripleWritable tuple) {
+		Triple t = tuple.get();
+		return new Node[] { t.getSubject(), t.getPredicate(), t.getObject() };
+	}
+
+}

Added: jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/AbstractStreamRdfNodeTupleWriter.java
URL: http://svn.apache.org/viewvc/jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/AbstractStreamRdfNodeTupleWriter.java?rev=1598254&view=auto
==============================================================================
--- jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/AbstractStreamRdfNodeTupleWriter.java (added)
+++ jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/AbstractStreamRdfNodeTupleWriter.java Thu May 29 11:07:18 2014
@@ -0,0 +1,53 @@
+package org.apache.jena.hadoop.rdf.io.output.writers;
+
+import java.io.IOException;
+import java.io.Writer;
+
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable;
+import org.apache.jena.riot.system.StreamRDF;
+
+public abstract class AbstractStreamRdfNodeTupleWriter<TKey, TTuple, TValue extends AbstractNodeTupleWritable<TTuple>>
+		extends RecordWriter<TKey, TValue> {
+
+	private StreamRDF stream;
+	private Writer writer;
+
+	public AbstractStreamRdfNodeTupleWriter(StreamRDF stream, Writer writer) {
+		if (stream == null)
+			throw new NullPointerException("stream cannot be null");
+		if (writer == null)
+			throw new NullPointerException("writer cannot be null");
+		this.stream = stream;
+		this.stream.start();
+		this.writer = writer;
+	}
+
+	@Override
+	public void close(TaskAttemptContext context) throws IOException,
+			InterruptedException {
+		this.stream.finish();
+		this.writer.close();
+	}
+
+	@Override
+	public void write(TKey key, TValue value) throws IOException,
+			InterruptedException {
+		this.sendOutput(key, value, this.stream);
+	}
+
+	/**
+	 * Method that handles an actual key value pair passing it to the
+	 * {@link StreamRDF} instance as appropriate
+	 * 
+	 * @param key
+	 *            Key
+	 * @param value
+	 *            Value
+	 * @param stream
+	 *            RDF Stream
+	 */
+	protected abstract void sendOutput(TKey key, TValue value, StreamRDF stream);
+
+}

Added: jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/BatchedTriGWriter.java
URL: http://svn.apache.org/viewvc/jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/BatchedTriGWriter.java?rev=1598254&view=auto
==============================================================================
--- jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/BatchedTriGWriter.java (added)
+++ jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/BatchedTriGWriter.java Thu May 29 11:07:18 2014
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.output.writers;
+
+import java.io.Writer;
+
+import org.apache.jena.riot.Lang;
+
+/**
+ * A record writer for TriG that uses the batched approach, note that this
+ * approach will produce invalid data when blank nodes span batches
+ *  
+ * @param <TKey>
+ *            Key type
+ */
+public class BatchedTriGWriter<TKey> extends AbstractBatchedQuadWriter<TKey> {
+
+	/**
+	 * Creates a new record writer
+	 * 
+	 * @param writer
+	 *            Writer
+	 * @param batchSize
+	 *            Batch size
+	 */
+	public BatchedTriGWriter(Writer writer, long batchSize) {
+		super(writer, batchSize);
+	}
+
+	@Override
+	protected Lang getRdfLanguage() {
+		return Lang.TRIG;
+	}
+
+}

Added: jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/BatchedTurtleWriter.java
URL: http://svn.apache.org/viewvc/jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/BatchedTurtleWriter.java?rev=1598254&view=auto
==============================================================================
--- jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/BatchedTurtleWriter.java (added)
+++ jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/BatchedTurtleWriter.java Thu May 29 11:07:18 2014
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.output.writers;
+
+import java.io.Writer;
+
+import org.apache.jena.riot.Lang;
+
+/**
+ * A record writer for Turtle that uses the batched approach, note that this
+ * approach will produce invalid data when blank nodes span batches
+ * 
+ * @author rvesse
+ * 
+ * @param <TKey>
+ */
+public class BatchedTurtleWriter<TKey> extends
+		AbstractBatchedTripleWriter<TKey> {
+
+	/**
+	 * Creates a new record writer
+	 * 
+	 * @param writer
+	 *            Writer
+	 * @param batchSize
+	 *            Batch size
+	 */
+	public BatchedTurtleWriter(Writer writer, long batchSize) {
+		super(writer, batchSize);
+	}
+
+	@Override
+	protected Lang getRdfLanguage() {
+		return Lang.TURTLE;
+	}
+
+}

Added: jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/StreamRdfQuadWriter.java
URL: http://svn.apache.org/viewvc/jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/StreamRdfQuadWriter.java?rev=1598254&view=auto
==============================================================================
--- jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/StreamRdfQuadWriter.java (added)
+++ jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/StreamRdfQuadWriter.java Thu May 29 11:07:18 2014
@@ -0,0 +1,27 @@
+package org.apache.jena.hadoop.rdf.io.output.writers;
+
+import java.io.Writer;
+
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+import org.apache.jena.riot.system.StreamRDF;
+
+import com.hp.hpl.jena.sparql.core.Quad;
+
+/**
+ * A writer for {@link StreamRDF} based quad writers
+ * 
+ * @param <TKey>
+ *            Key type
+ */
+public class StreamRdfQuadWriter<TKey> extends
+		AbstractStreamRdfNodeTupleWriter<TKey, Quad, QuadWritable> {
+
+	public StreamRdfQuadWriter(StreamRDF stream, Writer writer) {
+		super(stream, writer);
+	}
+
+	@Override
+	protected void sendOutput(TKey key, QuadWritable value, StreamRDF stream) {
+		stream.quad(value.get());
+	}
+}

Added: jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/StreamRdfTripleWriter.java
URL: http://svn.apache.org/viewvc/jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/StreamRdfTripleWriter.java?rev=1598254&view=auto
==============================================================================
--- jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/StreamRdfTripleWriter.java (added)
+++ jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/StreamRdfTripleWriter.java Thu May 29 11:07:18 2014
@@ -0,0 +1,26 @@
+package org.apache.jena.hadoop.rdf.io.output.writers;
+
+import java.io.Writer;
+
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+import org.apache.jena.riot.system.StreamRDF;
+
+import com.hp.hpl.jena.graph.Triple;
+
+/**
+ * A writer for {@link StreamRDF} based triple writers
+ * 
+ * @param <TKey>
+ *            Key type
+ */
+public class StreamRdfTripleWriter<TKey> extends AbstractStreamRdfNodeTupleWriter<TKey, Triple, TripleWritable> {
+
+	public StreamRdfTripleWriter(StreamRDF stream, Writer writer) {
+		super(stream, writer);
+	}
+
+	@Override
+	protected void sendOutput(TKey key, TripleWritable value, StreamRDF stream) {
+		stream.triple(value.get());
+	}
+}

Modified: jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/AbstractNodeTupleOutputFormatTests.java
URL: http://svn.apache.org/viewvc/jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/AbstractNodeTupleOutputFormatTests.java?rev=1598254&r1=1598253&r2=1598254&view=diff
==============================================================================
--- jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/AbstractNodeTupleOutputFormatTests.java (original)
+++ jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/AbstractNodeTupleOutputFormatTests.java Thu May 29 11:07:18 2014
@@ -114,8 +114,13 @@ public abstract class AbstractNodeTupleO
         RDFDataMgr.parse(counter, f.getAbsolutePath(), this.getRdfLanguage(), null);
         return counter.count();
     }
-
-    protected final void checkTuples(File f, long expected) {
+
+    /**
+     * Asserts the file parses to the expected tuple count; subclasses may add checks
+     * @param f File to parse
+     * @param expected Expected number of tuples
+     */
+    protected void checkTuples(File f, long expected) {
         Assert.assertEquals(expected, this.countTuples(f));
     }
 

Added: jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/BatchedTriGOutputTest.java
URL: http://svn.apache.org/viewvc/jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/BatchedTriGOutputTest.java?rev=1598254&view=auto
==============================================================================
--- jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/BatchedTriGOutputTest.java (added)
+++ jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/BatchedTriGOutputTest.java Thu May 29 11:07:18 2014
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.output;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.jena.hadoop.rdf.io.RdfIOConstants;
+import org.apache.jena.hadoop.rdf.io.output.BatchedTriGOutputFormat;
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+import org.apache.jena.riot.Lang;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+
+/**
+ * Tests for batched TriG output, run once for each of several output batch
+ * sizes
+ * 
+ * @author rvesse
+ */
+@RunWith(Parameterized.class)
+public class BatchedTriGOutputTest extends AbstractQuadOutputFormatTests {
+
+    static final long $bs1 = RdfIOConstants.DEFAULT_OUTPUT_BATCH_SIZE;
+    static final long $bs2 = 1000;
+    static final long $bs3 = 100;
+    static final long $bs4 = 1;
+
+    /**
+     * @return Test parameters, a single batch size per test run
+     */
+    @Parameters
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] { { $bs1 }, { $bs2 }, { $bs3 }, { $bs4 } });
+    }
+
+    private final long batchSize;
+
+    /**
+     * Creates new tests
+     * 
+     * @param batchSize
+     *            Output batch size to set in the job configuration
+     */
+    public BatchedTriGOutputTest(long batchSize) {
+        this.batchSize = batchSize;
+    }
+
+    @Override
+    protected String getFileExtension() {
+        return ".trig";
+    }
+
+    @Override
+    protected Lang getRdfLanguage() {
+        return Lang.TRIG;
+    }
+
+    @Override
+    protected Configuration prepareConfiguration() {
+        Configuration config = super.prepareConfiguration();
+        config.setLong(RdfIOConstants.OUTPUT_BATCH_SIZE, this.batchSize);
+        return config;
+    }
+
+    @Override
+    protected OutputFormat<NullWritable, QuadWritable> getOutputFormat() {
+        return new BatchedTriGOutputFormat<NullWritable>();
+    }
+
+}

Added: jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/BatchedTurtleOutputTest.java
URL: http://svn.apache.org/viewvc/jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/BatchedTurtleOutputTest.java?rev=1598254&view=auto
==============================================================================
--- jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/BatchedTurtleOutputTest.java (added)
+++ jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/BatchedTurtleOutputTest.java Thu May 29 11:07:18 2014
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.output;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.jena.hadoop.rdf.io.RdfIOConstants;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+import org.apache.jena.riot.Lang;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+
+/**
+ * Tests for batched Turtle output, run once for each of several output batch
+ * sizes
+ * 
+ * @author rvesse
+ */
+@RunWith(Parameterized.class)
+public class BatchedTurtleOutputTest extends AbstractTripleOutputFormatTests {
+
+    static final long $bs1 = RdfIOConstants.DEFAULT_OUTPUT_BATCH_SIZE;
+    static final long $bs2 = 1000;
+    static final long $bs3 = 100;
+    static final long $bs4 = 1;
+
+    /**
+     * @return Test parameters, a single batch size per test run
+     */
+    @Parameters
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] { { $bs1 }, { $bs2 }, { $bs3 }, { $bs4 } });
+    }
+
+    private final long batchSize;
+
+    /**
+     * Creates new tests
+     * 
+     * @param batchSize
+     *            Output batch size to set in the job configuration
+     */
+    public BatchedTurtleOutputTest(long batchSize) {
+        this.batchSize = batchSize;
+    }
+
+    @Override
+    protected String getFileExtension() {
+        return ".ttl";
+    }
+
+    @Override
+    protected Lang getRdfLanguage() {
+        return Lang.TURTLE;
+    }
+
+    @Override
+    protected Configuration prepareConfiguration() {
+        Configuration config = super.prepareConfiguration();
+        config.setLong(RdfIOConstants.OUTPUT_BATCH_SIZE, this.batchSize);
+        return config;
+    }
+
+    @Override
+    protected OutputFormat<NullWritable, TripleWritable> getOutputFormat() {
+        return new BatchedTurtleOutputFormat<NullWritable>();
+    }
+
+}

Added: jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/StreamedTriGOutputTest.java
URL: http://svn.apache.org/viewvc/jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/StreamedTriGOutputTest.java?rev=1598254&view=auto
==============================================================================
--- jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/StreamedTriGOutputTest.java (added)
+++ jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/StreamedTriGOutputTest.java Thu May 29 11:07:18 2014
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.output;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.jena.hadoop.rdf.io.RdfIOConstants;
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+import org.apache.jena.riot.Lang;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+
+/**
+ * Tests for streamed TriG output, run once for each of several output batch
+ * sizes
+ * 
+ * @author rvesse
+ */
+@RunWith(Parameterized.class)
+public class StreamedTriGOutputTest extends AbstractQuadOutputFormatTests {
+
+    static final long $bs1 = RdfIOConstants.DEFAULT_OUTPUT_BATCH_SIZE;
+    static final long $bs2 = 1000;
+    static final long $bs3 = 100;
+    static final long $bs4 = 1;
+
+    /**
+     * @return Test parameters, a single batch size per test run
+     */
+    @Parameters
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] { { $bs1 }, { $bs2 }, { $bs3 }, { $bs4 } });
+    }
+
+    private final long batchSize;
+
+    /**
+     * Creates new tests
+     * 
+     * @param batchSize
+     *            Output batch size to set in the job configuration
+     */
+    public StreamedTriGOutputTest(long batchSize) {
+        this.batchSize = batchSize;
+    }
+
+    @Override
+    protected String getFileExtension() {
+        return ".trig";
+    }
+
+    @Override
+    protected Lang getRdfLanguage() {
+        return Lang.TRIG;
+    }
+
+    @Override
+    protected Configuration prepareConfiguration() {
+        Configuration config = super.prepareConfiguration();
+        config.setLong(RdfIOConstants.OUTPUT_BATCH_SIZE, this.batchSize);
+        return config;
+    }
+
+    @Override
+    protected OutputFormat<NullWritable, QuadWritable> getOutputFormat() {
+        return new TriGOutputFormat<NullWritable>();
+    }
+
+}

Added: jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/StreamedTurtleOutputTest.java
URL: http://svn.apache.org/viewvc/jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/StreamedTurtleOutputTest.java?rev=1598254&view=auto
==============================================================================
--- jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/StreamedTurtleOutputTest.java (added)
+++ jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/StreamedTurtleOutputTest.java Thu May 29 11:07:18 2014
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.output;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.jena.hadoop.rdf.io.RdfIOConstants;
+import org.apache.jena.hadoop.rdf.io.output.TurtleOutputFormat;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+import org.apache.jena.riot.Lang;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+
+/**
+ * Tests for streamed Turtle output, run once for each of several output batch
+ * sizes
+ * 
+ * @author rvesse
+ */
+@RunWith(Parameterized.class)
+public class StreamedTurtleOutputTest extends AbstractTripleOutputFormatTests {
+
+    static final long $bs1 = RdfIOConstants.DEFAULT_OUTPUT_BATCH_SIZE;
+    static final long $bs2 = 1000;
+    static final long $bs3 = 100;
+    static final long $bs4 = 1;
+
+    /**
+     * @return Test parameters, a single batch size per test run
+     */
+    @Parameters
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] { { $bs1 }, { $bs2 }, { $bs3 }, { $bs4 } });
+    }
+
+    private final long batchSize;
+
+    /**
+     * Creates new tests
+     * 
+     * @param batchSize
+     *            Output batch size to set in the job configuration
+     */
+    public StreamedTurtleOutputTest(long batchSize) {
+        this.batchSize = batchSize;
+    }
+
+    @Override
+    protected String getFileExtension() {
+        return ".ttl";
+    }
+
+    @Override
+    protected Lang getRdfLanguage() {
+        return Lang.TURTLE;
+    }
+
+    @Override
+    protected Configuration prepareConfiguration() {
+        Configuration config = super.prepareConfiguration();
+        config.setLong(RdfIOConstants.OUTPUT_BATCH_SIZE, this.batchSize);
+        return config;
+    }
+
+    @Override
+    protected OutputFormat<NullWritable, TripleWritable> getOutputFormat() {
+        return new TurtleOutputFormat<NullWritable>();
+    }
+
+}

Added: jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/TriGBlankNodeOutputTests.java
URL: http://svn.apache.org/viewvc/jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/TriGBlankNodeOutputTests.java?rev=1598254&view=auto
==============================================================================
--- jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/TriGBlankNodeOutputTests.java (added)
+++ jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/TriGBlankNodeOutputTests.java Thu May 29 11:07:18 2014
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.output;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.jena.hadoop.rdf.io.RdfIOConstants;
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+import org.apache.jena.riot.RDFDataMgr;
+import org.junit.Assert;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.hp.hpl.jena.datatypes.xsd.XSDDatatype;
+import com.hp.hpl.jena.graph.Node;
+import com.hp.hpl.jena.graph.NodeFactory;
+import com.hp.hpl.jena.rdf.model.Model;
+import com.hp.hpl.jena.rdf.model.ResIterator;
+import com.hp.hpl.jena.rdf.model.Resource;
+import com.hp.hpl.jena.sparql.core.Quad;
+
+/**
+ * Tests for TriG output with blank nodes, checking that a blank node shared
+ * by every quad is read back as a single blank node subject
+ * 
+ * @author rvesse
+ */
+@RunWith(Parameterized.class)
+public class TriGBlankNodeOutputTests extends StreamedTriGOutputTest {
+
+	static final long $bs1 = RdfIOConstants.DEFAULT_OUTPUT_BATCH_SIZE;
+	static final long $bs2 = 1000;
+	static final long $bs3 = 100;
+	static final long $bs4 = 1;
+
+	/**
+	 * @return Test parameters
+	 */
+	@Parameters
+	public static Collection<Object[]> data() {
+		return Arrays.asList(new Object[][] { { $bs1 }, { $bs2 }, { $bs3 },
+				{ $bs4 } });
+	}
+
+	/**
+	 * Creates new tests
+	 * 
+	 * @param batchSize
+	 *            Batch size
+	 */
+	public TriGBlankNodeOutputTests(long batchSize) {
+		super(batchSize);
+	}
+
+	@Override
+	protected Iterator<QuadWritable> generateTuples(int num) {
+		List<QuadWritable> qs = new ArrayList<QuadWritable>();
+		Node subject = NodeFactory.createAnon();
+		for (int i = 0; i < num; i++) {
+			Quad t = new Quad(
+					NodeFactory.createURI("http://example.org/graphs/" + i),
+					subject,
+					NodeFactory.createURI("http://example.org/predicate"),
+					NodeFactory.createLiteral(Integer.toString(i),
+							XSDDatatype.XSDinteger));
+			qs.add(new QuadWritable(t));
+		}
+		return qs.iterator();
+	}
+
+	@Override
+	protected void checkTuples(File f, long expected) {
+		super.checkTuples(f, expected);
+
+		// Every quad is in a named graph, so read the whole dataset: loading
+		// into a Model would only see the (empty) default graph
+		Iterator<Quad> iter = RDFDataMgr.loadDatasetGraph(f.getAbsolutePath(),
+				this.getRdfLanguage()).find(Node.ANY, Node.ANY, Node.ANY, Node.ANY);
+		Set<Node> subjects = new HashSet<Node>();
+		while (iter.hasNext()) {
+			Node subject = iter.next().getSubject();
+			Assert.assertTrue(subject.isBlank());
+			subjects.add(subject);
+		}
+		// One shared subject, or none at all if no tuples were written
+		Assert.assertEquals(expected == 0 ? 0 : 1, subjects.size());
+	}
+
+	@Override
+	protected OutputFormat<NullWritable, QuadWritable> getOutputFormat() {
+		return new TriGOutputFormat<NullWritable>();
+	}
+
+}

Added: jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/TurtleBlankNodeOutputTests.java
URL: http://svn.apache.org/viewvc/jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/TurtleBlankNodeOutputTests.java?rev=1598254&view=auto
==============================================================================
--- jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/TurtleBlankNodeOutputTests.java (added)
+++ jena/Experimental/hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/output/TurtleBlankNodeOutputTests.java Thu May 29 11:07:18 2014
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.output;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.jena.hadoop.rdf.io.RdfIOConstants;
+import org.apache.jena.hadoop.rdf.io.output.TurtleOutputFormat;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+import org.apache.jena.riot.RDFDataMgr;
+import org.junit.Assert;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.hp.hpl.jena.datatypes.xsd.XSDDatatype;
+import com.hp.hpl.jena.graph.Node;
+import com.hp.hpl.jena.graph.NodeFactory;
+import com.hp.hpl.jena.graph.Triple;
+import com.hp.hpl.jena.rdf.model.Model;
+import com.hp.hpl.jena.rdf.model.ResIterator;
+import com.hp.hpl.jena.rdf.model.Resource;
+
+/**
+ * Tests for Turtle output with blank nodes, checking that a blank node shared
+ * by every triple is read back as a single blank node subject
+ * 
+ * @author rvesse
+ */
+@RunWith(Parameterized.class)
+public class TurtleBlankNodeOutputTests extends StreamedTurtleOutputTest {
+
+	static final long $bs1 = RdfIOConstants.DEFAULT_OUTPUT_BATCH_SIZE;
+	static final long $bs2 = 1000;
+	static final long $bs3 = 100;
+	static final long $bs4 = 1;
+
+	/**
+	 * @return Test parameters
+	 */
+	@Parameters
+	public static Collection<Object[]> data() {
+		return Arrays.asList(new Object[][] { { $bs1 }, { $bs2 }, { $bs3 },
+				{ $bs4 } });
+	}
+
+	/**
+	 * Creates new tests
+	 * 
+	 * @param batchSize
+	 *            Batch size
+	 */
+	public TurtleBlankNodeOutputTests(long batchSize) {
+		super(batchSize);
+	}
+
+	@Override
+	protected Iterator<TripleWritable> generateTuples(int num) {
+		List<TripleWritable> ts = new ArrayList<TripleWritable>();
+		Node subject = NodeFactory.createAnon();
+		for (int i = 0; i < num; i++) {
+			Triple t = new Triple(subject,
+					NodeFactory.createURI("http://example.org/predicate"),
+					NodeFactory.createLiteral(Integer.toString(i),
+							XSDDatatype.XSDinteger));
+			ts.add(new TripleWritable(t));
+		}
+		return ts.iterator();
+	}
+
+	@Override
+	protected void checkTuples(File f, long expected) {
+		super.checkTuples(f, expected);
+
+		// Plain path: a hand built "file://" + path IRI is malformed on Windows
+		Model m = RDFDataMgr.loadModel(f.getAbsolutePath(), this.getRdfLanguage());
+		ResIterator iter = m.listSubjects();
+		Set<Node> subjects = new HashSet<Node>();
+		while (iter.hasNext()) {
+			Resource res = iter.next();
+			Assert.assertTrue(res.isAnon());
+			subjects.add(res.asNode());
+		}
+		// Should only be one subject unless the data was empty in which case
+		// there will be zero subjects
+		Assert.assertEquals(expected == 0 ? 0 : 1, subjects.size());
+	}
+
+	@Override
+	protected OutputFormat<NullWritable, TripleWritable> getOutputFormat() {
+		return new TurtleOutputFormat<NullWritable>();
+	}
+
+}