Posted to commits@jena.apache.org by rv...@apache.org on 2014/11/12 15:35:32 UTC

[5/6] jena git commit: Add support for dynamically selected triples/quads output

Add support for dynamically selected triples/quads output

This commit adds QuadsOutputFormat, TriplesOutputFormat and
TriplesOrQuadsOutputFormat, which are counterparts to the equivalent
existing input formats.  These output formats dynamically select the
actual record writer to use based upon the target output filename.  This
is useful for jobs where the output filename may be user supplied, so the
desired output format is not known in advance.
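
As a rough illustration of how the dynamic selection works (this sketch is
not part of the commit), the new formats hand the output filename to RIOT's
filename-to-language mapping and then ask the writer registry for a matching
record writer.  The mapping itself is plain Jena API:

    import org.apache.jena.riot.Lang;
    import org.apache.jena.riot.RDFLanguages;

    public class OutputLangSelectionSketch {
        public static void main(String[] args) {
            // A ".nq" output name maps to N-Quads, a quads format, so
            // QuadsOutputFormat would accept it and pick the N-Quads writer
            Lang nq = RDFLanguages.filenameToLang("results.nq");
            System.out.println(nq.getName() + " quads? " + RDFLanguages.isQuads(nq));

            // A ".ttl" output name maps to Turtle, a triples-only format, which
            // is why TriplesOrQuadsOutputFormat must downgrade quads to triples
            Lang ttl = RDFLanguages.filenameToLang("results.ttl");
            System.out.println(ttl.getName() + " quads? " + RDFLanguages.isQuads(ttl));
        }
    }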


Project: http://git-wip-us.apache.org/repos/asf/jena/repo
Commit: http://git-wip-us.apache.org/repos/asf/jena/commit/394638c5
Tree: http://git-wip-us.apache.org/repos/asf/jena/tree/394638c5
Diff: http://git-wip-us.apache.org/repos/asf/jena/diff/394638c5

Branch: refs/heads/hadoop-rdf
Commit: 394638c506eac5f80466336c9f04ac4154c16fa6
Parents: e57cd52
Author: Rob Vesse <rv...@apache.org>
Authored: Wed Nov 12 11:48:17 2014 +0000
Committer: Rob Vesse <rv...@apache.org>
Committed: Wed Nov 12 11:48:17 2014 +0000

----------------------------------------------------------------------
 .../rdf/io/input/readers/AbstractRdfReader.java | 180 +++++++++----------
 .../AbstractBatchedNodeTupleOutputFormat.java   |   3 +-
 .../output/AbstractNodeTupleOutputFormat.java   | 167 ++++++++---------
 .../AbstractStreamRdfNodeTupleOutputFormat.java |   3 +-
 .../hadoop/rdf/io/output/QuadsOutputFormat.java |  64 +++++++
 .../io/output/TriplesOrQuadsOutputFormat.java   |  74 ++++++++
 .../rdf/io/output/TriplesOutputFormat.java      |  61 +++++++
 .../output/jsonld/JsonLDQuadOutputFormat.java   |   3 +-
 .../output/jsonld/JsonLDTripleOutputFormat.java |   3 +-
 .../io/output/nquads/NQuadsOutputFormat.java    |   3 +-
 .../output/ntriples/NTriplesOutputFormat.java   |   3 +-
 .../io/output/rdfjson/RdfJsonOutputFormat.java  |   3 +-
 .../io/output/rdfxml/RdfXmlOutputFormat.java    |   3 +-
 .../io/output/writers/QuadsToTriplesWriter.java |  59 ++++++
 14 files changed, 450 insertions(+), 179 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/jena/blob/394638c5/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractRdfReader.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractRdfReader.java b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractRdfReader.java
index a7f5e8e..d0ffed8 100644
--- a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractRdfReader.java
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/AbstractRdfReader.java
@@ -16,95 +16,93 @@
  * limitations under the License.
  */
 
-package org.apache.jena.hadoop.rdf.io.input.readers;
-
-import java.io.IOException;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+package org.apache.jena.hadoop.rdf.io.input.readers;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable;
-import org.apache.jena.riot.Lang;
-import org.apache.jena.riot.RDFLanguages;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * An abstract record reader for arbitrary RDF which provides support for
- * selecting the actual record reader to use based on detecting the RDF language
- * from the file name
- * 
- * 
- * 
- * @param <TValue>
- *            Tuple type
- * @param <T>
- *            Writable tuple type
- */
-public abstract class AbstractRdfReader<TValue, T extends AbstractNodeTupleWritable<TValue>> extends
-        RecordReader<LongWritable, T> {
-    private static final Logger LOG = LoggerFactory.getLogger(AbstractRdfReader.class);
-
-    private RecordReader<LongWritable, T> reader;
-
-    @Override
-    public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException, InterruptedException {
-        LOG.debug("initialize({}, {})", genericSplit, context);
-
-        // Assuming file split
-        if (!(genericSplit instanceof FileSplit))
-            throw new IOException("This record reader only supports FileSplit inputs");
-
-        // Find RDF language
-        FileSplit split = (FileSplit) genericSplit;
-        Path path = split.getPath();
-        Lang lang = RDFLanguages.filenameToLang(path.getName());
-        if (lang == null)
-            throw new IOException("There is no registered RDF language for the input file " + path.toString());
-
-        // Select the record reader and initialize
-        this.reader = this.selectRecordReader(lang);
-        this.reader.initialize(split, context);
-    }
-
-    /**
-     * Selects the appropriate record reader to use for the given RDF language
-     * 
-     * @param lang
-     *            RDF language
-     * @return Record reader
-     * @throws IOException
-     *             Should be thrown if no record reader can be selected
-     */
-    protected abstract RecordReader<LongWritable, T> selectRecordReader(Lang lang) throws IOException;
-
-    @Override
-    public final boolean nextKeyValue() throws IOException, InterruptedException {
-        return this.reader.nextKeyValue();
-    }
-
-    @Override
-    public final LongWritable getCurrentKey() throws IOException, InterruptedException {
-        return this.reader.getCurrentKey();
-    }
-
-    @Override
-    public final T getCurrentValue() throws IOException, InterruptedException {
-        return this.reader.getCurrentValue();
-    }
-
-    @Override
-    public final float getProgress() throws IOException, InterruptedException {
-        return this.reader.getProgress();
-    }
-
-    @Override
-    public final void close() throws IOException {
-        this.reader.close();
-    }
-
-}
+import org.apache.jena.riot.Lang;
+import org.apache.jena.riot.RDFLanguages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An abstract record reader for arbitrary RDF which provides support for
+ * selecting the actual record reader to use based on detecting the RDF language
+ * from the file name
+ * 
+ * @param <TValue>
+ *            Tuple type
+ * @param <T>
+ *            Writable tuple type
+ */
+public abstract class AbstractRdfReader<TValue, T extends AbstractNodeTupleWritable<TValue>> extends
+        RecordReader<LongWritable, T> {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractRdfReader.class);
+
+    private RecordReader<LongWritable, T> reader;
+
+    @Override
+    public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException,
+            InterruptedException {
+        LOG.debug("initialize({}, {})", genericSplit, context);
+
+        // Assuming file split
+        if (!(genericSplit instanceof FileSplit))
+            throw new IOException("This record reader only supports FileSplit inputs");
+
+        // Find RDF language
+        FileSplit split = (FileSplit) genericSplit;
+        Path path = split.getPath();
+        Lang lang = RDFLanguages.filenameToLang(path.getName());
+        if (lang == null)
+            throw new IOException("There is no registered RDF language for the input file " + path.toString());
+
+        // Select the record reader and initialize
+        this.reader = this.selectRecordReader(lang);
+        this.reader.initialize(split, context);
+    }
+
+    /**
+     * Selects the appropriate record reader to use for the given RDF language
+     * 
+     * @param lang
+     *            RDF language
+     * @return Record reader
+     * @throws IOException
+     *             Should be thrown if no record reader can be selected
+     */
+    protected abstract RecordReader<LongWritable, T> selectRecordReader(Lang lang) throws IOException;
+
+    @Override
+    public final boolean nextKeyValue() throws IOException, InterruptedException {
+        return this.reader.nextKeyValue();
+    }
+
+    @Override
+    public final LongWritable getCurrentKey() throws IOException, InterruptedException {
+        return this.reader.getCurrentKey();
+    }
+
+    @Override
+    public final T getCurrentValue() throws IOException, InterruptedException {
+        return this.reader.getCurrentValue();
+    }
+
+    @Override
+    public final float getProgress() throws IOException, InterruptedException {
+        return this.reader.getProgress();
+    }
+
+    @Override
+    public final void close() throws IOException {
+        this.reader.close();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/394638c5/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractBatchedNodeTupleOutputFormat.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractBatchedNodeTupleOutputFormat.java b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractBatchedNodeTupleOutputFormat.java
index ac4ea2b..02fbf9c 100644
--- a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractBatchedNodeTupleOutputFormat.java
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractBatchedNodeTupleOutputFormat.java
@@ -21,6 +21,7 @@ package org.apache.jena.hadoop.rdf.io.output;
 import java.io.Writer;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.jena.hadoop.rdf.io.RdfIOConstants;
 import org.apache.jena.hadoop.rdf.io.output.writers.AbstractBatchedNodeTupleWriter;
@@ -44,7 +45,7 @@ public abstract class AbstractBatchedNodeTupleOutputFormat<TKey, TTuple, TValue
         AbstractNodeTupleOutputFormat<TKey, TTuple, TValue> {
 
     @Override
-    protected RecordWriter<TKey, TValue> getRecordWriter(Writer writer, Configuration config) {
+    protected RecordWriter<TKey, TValue> getRecordWriter(Writer writer, Configuration config, Path outputPath) {
         long batchSize = config.getLong(RdfIOConstants.OUTPUT_BATCH_SIZE, RdfIOConstants.DEFAULT_OUTPUT_BATCH_SIZE);
         return this.getRecordWriter(writer, batchSize);
     }

http://git-wip-us.apache.org/repos/asf/jena/blob/394638c5/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractNodeTupleOutputFormat.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractNodeTupleOutputFormat.java b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractNodeTupleOutputFormat.java
index 1944bda..3a5438e 100644
--- a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractNodeTupleOutputFormat.java
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractNodeTupleOutputFormat.java
@@ -16,84 +16,91 @@
  * limitations under the License.
  */
 
-package org.apache.jena.hadoop.rdf.io.output;
-
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.util.ReflectionUtils;
+package org.apache.jena.hadoop.rdf.io.output;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * An abstract implementation of an output format for line based tuple formats
- * where the key is ignored and only the tuple values will be output
- * 
- * 
- * @param <TKey>
- *            Key type
- * @param <TValue>
- *            Tuple value type
- * @param <T>
- *            Writable node tuple type
- * 
- */
-public abstract class AbstractNodeTupleOutputFormat<TKey, TValue, T extends AbstractNodeTupleWritable<TValue>> extends
-        FileOutputFormat<TKey, T> {
-
-    private static final Logger LOG = LoggerFactory.getLogger(AbstractNodeTupleOutputFormat.class);
-
-    @Override
-    public RecordWriter<TKey, T> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
-        Configuration config = context.getConfiguration();
-        boolean isCompressed = getCompressOutput(context);
-        CompressionCodec codec = null;
-        String extension = this.getFileExtension();
-        if (isCompressed) {
-            Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(context, GzipCodec.class);
-            codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, config);
-            extension += codec.getDefaultExtension();
-        }
-        Path file = getDefaultWorkFile(context, extension);
-        LOG.info("Writing output to file " + file);
-        FileSystem fs = file.getFileSystem(config);
-        if (!isCompressed) {
-            FSDataOutputStream fileOut = fs.create(file, false);
-            return this.getRecordWriter(new OutputStreamWriter(fileOut), config);
-        } else {
-            FSDataOutputStream fileOut = fs.create(file, false);
-            return this.getRecordWriter(new OutputStreamWriter(codec.createOutputStream(fileOut)), config);
-        }
-    }
-
-    /**
-     * Gets the file extension to use for output
-     * 
-     * @return File extension including the '.'
-     */
-    protected abstract String getFileExtension();
-
-    /**
-     * Gets the record writer to use
-     * 
-     * @param writer
-     *            Writer to write output to
-     * @param config
-     *            Configuration
-     * @return Record writer
-     */
-    protected abstract RecordWriter<TKey, T> getRecordWriter(Writer writer, Configuration config);
-
-}
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An abstract implementation of an output format for line based tuple formats
+ * where the key is ignored and only the tuple values will be output
+ * 
+ * 
+ * @param <TKey>
+ *            Key type
+ * @param <TValue>
+ *            Tuple value type
+ * @param <T>
+ *            Writable node tuple type
+ * 
+ */
+public abstract class AbstractNodeTupleOutputFormat<TKey, TValue, T extends AbstractNodeTupleWritable<TValue>> extends
+        FileOutputFormat<TKey, T> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractNodeTupleOutputFormat.class);
+
+    @Override
+    public RecordWriter<TKey, T> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
+        Configuration config = context.getConfiguration();
+        boolean isCompressed = getCompressOutput(context);
+        CompressionCodec codec = null;
+        String extension = this.getFileExtension();
+        if (isCompressed) {
+            Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(context, GzipCodec.class);
+            codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, config);
+            extension += codec.getDefaultExtension();
+        }
+        Path file = getDefaultWorkFile(context, extension);
+        LOG.info("Writing output to file " + file);
+        FileSystem fs = file.getFileSystem(config);
+        if (!isCompressed) {
+            FSDataOutputStream fileOut = fs.create(file, false);
+            return this.getRecordWriter(new OutputStreamWriter(fileOut), config, file);
+        } else {
+            // TODO Do we need to append the relevant extension to the path
+            // here?
+            FSDataOutputStream fileOut = fs.create(file, false);
+            return this.getRecordWriter(new OutputStreamWriter(codec.createOutputStream(fileOut)), config, file);
+        }
+    }
+
+    /**
+     * Gets the file extension to use for output
+     * 
+     * @return File extension including the '.'
+     */
+    protected abstract String getFileExtension();
+
+    /**
+     * Gets the record writer to use
+     * 
+     * @param writer
+     *            Writer to write output to
+     * @param config
+     *            Configuration
+     * @param outputPath
+     *            Output path being written to
+     * @return Record writer
+     * @throws IOException
+     *             May be thrown if a record writer cannot be obtained for any
+     *             reason
+     */
+    protected abstract RecordWriter<TKey, T> getRecordWriter(Writer writer, Configuration config, Path outputPath)
+            throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/394638c5/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractStreamRdfNodeTupleOutputFormat.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractStreamRdfNodeTupleOutputFormat.java b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractStreamRdfNodeTupleOutputFormat.java
index 97027f5..30999ae 100644
--- a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractStreamRdfNodeTupleOutputFormat.java
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/AbstractStreamRdfNodeTupleOutputFormat.java
@@ -21,6 +21,7 @@ package org.apache.jena.hadoop.rdf.io.output;
 import java.io.Writer;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable;
 import org.apache.jena.riot.system.StreamRDF;
@@ -40,7 +41,7 @@ public abstract class AbstractStreamRdfNodeTupleOutputFormat<TKey, TTuple, TValu
         extends AbstractNodeTupleOutputFormat<TKey, TTuple, TValue> {
 
     @Override
-    protected RecordWriter<TKey, TValue> getRecordWriter(Writer writer, Configuration config) {
+    protected RecordWriter<TKey, TValue> getRecordWriter(Writer writer, Configuration config, Path outputPath) {
         return getRecordWriter(getStream(writer, config), writer, config);
     }
 

http://git-wip-us.apache.org/repos/asf/jena/blob/394638c5/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/QuadsOutputFormat.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/QuadsOutputFormat.java b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/QuadsOutputFormat.java
new file mode 100644
index 0000000..cc9fe2f
--- /dev/null
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/QuadsOutputFormat.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jena.hadoop.rdf.io.output;
+
+import java.io.IOException;
+import java.io.Writer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.jena.hadoop.rdf.io.registry.HadoopRdfIORegistry;
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+import org.apache.jena.riot.Lang;
+import org.apache.jena.riot.RDFLanguages;
+
+import com.hp.hpl.jena.sparql.core.Quad;
+
+/**
+ * An output format for RDF quads that dynamically selects the appropriate quad
+ * writer to use based on the file extension of the output file.
+ * <p>
+ * For example, this is useful when the output format may be controlled by a user
+ * supplied filename, i.e. the desired RDF output format is not precisely known
+ * in advance.
+ * </p>
+ * 
+ * @param <TKey>
+ *            Key type
+ */
+public abstract class QuadsOutputFormat<TKey> extends AbstractNodeTupleOutputFormat<TKey, Quad, QuadWritable> {
+
+    @Override
+    protected RecordWriter<TKey, QuadWritable> getRecordWriter(Writer writer, Configuration config, Path outputPath)
+            throws IOException {
+        Lang lang = RDFLanguages.filenameToLang(outputPath.getName());
+        if (lang == null)
+            throw new IOException("There is no registered RDF language for the output file " + outputPath.toString());
+
+        if (!RDFLanguages.isQuads(lang))
+            throw new IOException(
+                    lang.getName()
+                            + " is not an RDF quads format, perhaps you wanted TriplesOutputFormat or TriplesOrQuadsOutputFormat instead?");
+
+        // This will throw an appropriate error if the language does not support
+        // writing quads
+        return HadoopRdfIORegistry.<TKey> createQuadWriter(lang, writer, config);
+    }
+
+}
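
Since QuadsOutputFormat is abstract (the key type and getFileExtension() are
left open), a job supplies a concrete subclass.  The class below is a
hypothetical sketch for illustration only, not part of this commit; the same
pattern applies to TriplesOutputFormat and TriplesOrQuadsOutputFormat.
Whether the extension is hard coded as here or derived from user input is up
to the subclass.

    import org.apache.hadoop.io.NullWritable;
    import org.apache.jena.hadoop.rdf.io.output.QuadsOutputFormat;

    /**
     * Hypothetical concrete format: fixes the key type and supplies the ".nq"
     * extension, so the output filename ends in .nq and getRecordWriter()
     * selects the N-Quads writer via the registry.
     */
    public class NQuadsByExtensionOutputFormat extends QuadsOutputFormat<NullWritable> {

        @Override
        protected String getFileExtension() {
            return ".nq"; // the extension drives RDFLanguages.filenameToLang()
        }
    }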

http://git-wip-us.apache.org/repos/asf/jena/blob/394638c5/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/TriplesOrQuadsOutputFormat.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/TriplesOrQuadsOutputFormat.java b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/TriplesOrQuadsOutputFormat.java
new file mode 100644
index 0000000..3eaf0d7
--- /dev/null
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/TriplesOrQuadsOutputFormat.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jena.hadoop.rdf.io.output;
+
+import java.io.IOException;
+import java.io.Writer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.jena.hadoop.rdf.io.output.writers.QuadsToTriplesWriter;
+import org.apache.jena.hadoop.rdf.io.registry.HadoopRdfIORegistry;
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+import org.apache.jena.riot.Lang;
+import org.apache.jena.riot.RDFLanguages;
+
+import com.hp.hpl.jena.sparql.core.Quad;
+
+/**
+ * An output format for RDF triples/quads that dynamically selects the
+ * appropriate triple/quad writer to use based on the file extension of the
+ * output file.
+ * <p>
+ * For example, this is useful when the output format may be controlled by a user
+ * supplied filename, i.e. the desired RDF output format is not precisely known
+ * in advance.
+ * </p>
+ * <h3>Warning</h3>
+ * <p>
+ * Where the format is determined to be triples, the quads are converted into
+ * triples and thus will lose any graph information that might be carried.
+ * </p>
+ * 
+ * @param <TKey>
+ *            Key type
+ */
+public abstract class TriplesOrQuadsOutputFormat<TKey> extends AbstractNodeTupleOutputFormat<TKey, Quad, QuadWritable> {
+
+    @Override
+    protected RecordWriter<TKey, QuadWritable> getRecordWriter(Writer writer, Configuration config, Path outputPath)
+            throws IOException {
+        Lang lang = RDFLanguages.filenameToLang(outputPath.getName());
+        if (lang == null)
+            throw new IOException("There is no registered RDF language for the output file " + outputPath.toString());
+
+        if (!RDFLanguages.isQuads(lang) && !RDFLanguages.isTriples(lang))
+            throw new IOException(lang.getName() + " is not an RDF triples/quads format");
+
+        if (HadoopRdfIORegistry.hasQuadWriter(lang)) {
+            // Supports quads directly
+            return HadoopRdfIORegistry.<TKey> createQuadWriter(lang, writer, config);
+        } else {
+            // Try to create a triples writer and wrap downwards from quads
+            // This will throw an error if a triple writer is not available
+            return new QuadsToTriplesWriter<TKey>(HadoopRdfIORegistry.<TKey> createTripleWriter(lang, writer, config));
+        }
+    }
+
+}
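
To make the warning in the class Javadoc concrete (plain Jena API, not
specific to this commit): downgrading a quad to a triple keeps only the
subject, predicate and object, so the graph name is silently discarded.

    import com.hp.hpl.jena.graph.NodeFactory;
    import com.hp.hpl.jena.graph.Triple;
    import com.hp.hpl.jena.sparql.core.Quad;

    public class GraphLossSketch {
        public static void main(String[] args) {
            Quad q = new Quad(NodeFactory.createURI("http://example.org/graph"),
                              NodeFactory.createURI("http://example.org/s"),
                              NodeFactory.createURI("http://example.org/p"),
                              NodeFactory.createURI("http://example.org/o"));
            // Only s/p/o survive the conversion; the graph name is gone
            Triple t = q.asTriple();
            System.out.println(q);
            System.out.println(t);
        }
    }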

http://git-wip-us.apache.org/repos/asf/jena/blob/394638c5/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/TriplesOutputFormat.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/TriplesOutputFormat.java b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/TriplesOutputFormat.java
new file mode 100644
index 0000000..d9d4189
--- /dev/null
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/TriplesOutputFormat.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jena.hadoop.rdf.io.output;
+
+import java.io.IOException;
+import java.io.Writer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.jena.hadoop.rdf.io.registry.HadoopRdfIORegistry;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+import org.apache.jena.riot.Lang;
+import org.apache.jena.riot.RDFLanguages;
+
+import com.hp.hpl.jena.graph.Triple;
+
+/**
+ * An output format for RDF triples that dynamically selects the appropriate triple
+ * writer to use based on the file extension of the output file.
+ * <p>
+ * For example, this is useful when the output format may be controlled by a user
+ * supplied filename, i.e. the desired RDF output format is not precisely known
+ * in advance.
+ * </p>
+ * 
+ * @param <TKey>
+ *            Key type
+ */
+public abstract class TriplesOutputFormat<TKey> extends AbstractNodeTupleOutputFormat<TKey, Triple, TripleWritable> {
+
+    @Override
+    protected RecordWriter<TKey, TripleWritable> getRecordWriter(Writer writer, Configuration config, Path outputPath) throws IOException {
+        Lang lang = RDFLanguages.filenameToLang(outputPath.getName());
+        if (lang == null)
+            throw new IOException("There is no registered RDF language for the output file " + outputPath.toString());
+        
+        if (!RDFLanguages.isTriples(lang)) throw new IOException(
+                lang.getName()
+                + " is not an RDF triples format, perhaps you wanted QuadsOutputFormat or TriplesOrQuadsOutputFormat instead?");
+        
+        // This will throw an appropriate error if the language does not support writing triples
+        return HadoopRdfIORegistry.<TKey>createTripleWriter(lang, writer, config);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/394638c5/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/jsonld/JsonLDQuadOutputFormat.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/jsonld/JsonLDQuadOutputFormat.java b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/jsonld/JsonLDQuadOutputFormat.java
index d6a87b4..8f4797a 100644
--- a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/jsonld/JsonLDQuadOutputFormat.java
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/jsonld/JsonLDQuadOutputFormat.java
@@ -21,6 +21,7 @@ package org.apache.jena.hadoop.rdf.io.output.jsonld;
 import java.io.Writer;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.jena.hadoop.rdf.io.output.AbstractNodeTupleOutputFormat;
 import org.apache.jena.hadoop.rdf.io.output.writers.jsonld.JsonLDQuadWriter;
@@ -36,7 +37,7 @@ public class JsonLDQuadOutputFormat<TKey> extends AbstractNodeTupleOutputFormat<
     }
 
     @Override
-    protected RecordWriter<TKey, QuadWritable> getRecordWriter(Writer writer, Configuration config) {
+    protected RecordWriter<TKey, QuadWritable> getRecordWriter(Writer writer, Configuration config, Path outputPath) {
         return new JsonLDQuadWriter<TKey>(writer);
     }
 

http://git-wip-us.apache.org/repos/asf/jena/blob/394638c5/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/jsonld/JsonLDTripleOutputFormat.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/jsonld/JsonLDTripleOutputFormat.java b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/jsonld/JsonLDTripleOutputFormat.java
index a96d7bf..a8cbeac 100644
--- a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/jsonld/JsonLDTripleOutputFormat.java
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/jsonld/JsonLDTripleOutputFormat.java
@@ -21,6 +21,7 @@ package org.apache.jena.hadoop.rdf.io.output.jsonld;
 import java.io.Writer;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.jena.hadoop.rdf.io.output.AbstractNodeTupleOutputFormat;
 import org.apache.jena.hadoop.rdf.io.output.writers.jsonld.JsonLDTripleWriter;
@@ -36,7 +37,7 @@ public class JsonLDTripleOutputFormat<TKey> extends AbstractNodeTupleOutputForma
     }
 
     @Override
-    protected RecordWriter<TKey, TripleWritable> getRecordWriter(Writer writer, Configuration config) {
+    protected RecordWriter<TKey, TripleWritable> getRecordWriter(Writer writer, Configuration config, Path outputPath) {
         return new JsonLDTripleWriter<TKey>(writer);
     }
 

http://git-wip-us.apache.org/repos/asf/jena/blob/394638c5/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/nquads/NQuadsOutputFormat.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/nquads/NQuadsOutputFormat.java b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/nquads/NQuadsOutputFormat.java
index 0f9e3cf..a8ab017 100644
--- a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/nquads/NQuadsOutputFormat.java
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/nquads/NQuadsOutputFormat.java
@@ -21,6 +21,7 @@ package org.apache.jena.hadoop.rdf.io.output.nquads;
 import java.io.Writer;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.jena.hadoop.rdf.io.output.AbstractNodeTupleOutputFormat;
 import org.apache.jena.hadoop.rdf.io.output.writers.nquads.NQuadsWriter;
@@ -39,7 +40,7 @@ import com.hp.hpl.jena.sparql.core.Quad;
 public class NQuadsOutputFormat<TKey> extends AbstractNodeTupleOutputFormat<TKey, Quad, QuadWritable> {
 
     @Override
-    protected RecordWriter<TKey, QuadWritable> getRecordWriter(Writer writer, Configuration config) {
+    protected RecordWriter<TKey, QuadWritable> getRecordWriter(Writer writer, Configuration config, Path outputPath) {
         return new NQuadsWriter<TKey>(writer);
     }
 

http://git-wip-us.apache.org/repos/asf/jena/blob/394638c5/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/ntriples/NTriplesOutputFormat.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/ntriples/NTriplesOutputFormat.java b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/ntriples/NTriplesOutputFormat.java
index 1fbf256..51b9b75 100644
--- a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/ntriples/NTriplesOutputFormat.java
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/ntriples/NTriplesOutputFormat.java
@@ -21,6 +21,7 @@ package org.apache.jena.hadoop.rdf.io.output.ntriples;
 import java.io.Writer;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.jena.hadoop.rdf.io.output.AbstractNodeTupleOutputFormat;
 import org.apache.jena.hadoop.rdf.io.output.writers.ntriples.NTriplesWriter;
@@ -38,7 +39,7 @@ import com.hp.hpl.jena.graph.Triple;
 public class NTriplesOutputFormat<TKey> extends AbstractNodeTupleOutputFormat<TKey, Triple, TripleWritable> {
 
     @Override
-    protected RecordWriter<TKey, TripleWritable> getRecordWriter(Writer writer, Configuration config) {
+    protected RecordWriter<TKey, TripleWritable> getRecordWriter(Writer writer, Configuration config, Path outputPath) {
         return new NTriplesWriter<TKey>(writer);
     }
 

http://git-wip-us.apache.org/repos/asf/jena/blob/394638c5/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/rdfjson/RdfJsonOutputFormat.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/rdfjson/RdfJsonOutputFormat.java b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/rdfjson/RdfJsonOutputFormat.java
index 49d8e54..e5fa114 100644
--- a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/rdfjson/RdfJsonOutputFormat.java
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/rdfjson/RdfJsonOutputFormat.java
@@ -21,6 +21,7 @@ package org.apache.jena.hadoop.rdf.io.output.rdfjson;
 import java.io.Writer;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.jena.hadoop.rdf.io.output.AbstractNodeTupleOutputFormat;
 import org.apache.jena.hadoop.rdf.io.output.writers.rdfjson.RdfJsonWriter;
@@ -44,7 +45,7 @@ public class RdfJsonOutputFormat<TKey> extends AbstractNodeTupleOutputFormat<TKe
     }
 
     @Override
-    protected RecordWriter<TKey, TripleWritable> getRecordWriter(Writer writer, Configuration config) {
+    protected RecordWriter<TKey, TripleWritable> getRecordWriter(Writer writer, Configuration config, Path outputPath) {
         return new RdfJsonWriter<TKey>(writer);
     }
 

http://git-wip-us.apache.org/repos/asf/jena/blob/394638c5/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/rdfxml/RdfXmlOutputFormat.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/rdfxml/RdfXmlOutputFormat.java b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/rdfxml/RdfXmlOutputFormat.java
index 7ef8c30..6c9a9ea 100644
--- a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/rdfxml/RdfXmlOutputFormat.java
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/rdfxml/RdfXmlOutputFormat.java
@@ -21,6 +21,7 @@ package org.apache.jena.hadoop.rdf.io.output.rdfxml;
 import java.io.Writer;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.jena.hadoop.rdf.io.output.AbstractNodeTupleOutputFormat;
 import org.apache.jena.hadoop.rdf.io.output.writers.rdfxml.RdfXmlWriter;
@@ -44,7 +45,7 @@ public class RdfXmlOutputFormat<TKey> extends AbstractNodeTupleOutputFormat<TKey
     }
 
     @Override
-    protected RecordWriter<TKey, TripleWritable> getRecordWriter(Writer writer, Configuration config) {
+    protected RecordWriter<TKey, TripleWritable> getRecordWriter(Writer writer, Configuration config, Path outputPath) {
         return new RdfXmlWriter<TKey>(writer);
     }
 

http://git-wip-us.apache.org/repos/asf/jena/blob/394638c5/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/QuadsToTriplesWriter.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/QuadsToTriplesWriter.java b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/QuadsToTriplesWriter.java
new file mode 100644
index 0000000..e932e1f
--- /dev/null
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/output/writers/QuadsToTriplesWriter.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jena.hadoop.rdf.io.output.writers;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+
+/**
+ * A record writer that converts quads into triples by stripping off the graph
+ * field
+ * 
+ * @param <TKey>
+ *            Key type
+ */
+public class QuadsToTriplesWriter<TKey> extends RecordWriter<TKey, QuadWritable> {
+
+    private RecordWriter<TKey, TripleWritable> writer;
+
+    /**
+     * Creates a new writer
+     * 
+     * @param tripleWriter
+     *            Triple writer to use
+     */
+    public QuadsToTriplesWriter(RecordWriter<TKey, TripleWritable> tripleWriter) {
+        if (tripleWriter == null)
+            throw new NullPointerException("tripleWriter cannot be null");
+        this.writer = tripleWriter;
+    }
+
+    @Override
+    public void write(TKey key, QuadWritable value) throws IOException, InterruptedException {
+        this.writer.write(key, new TripleWritable(value.get().asTriple()));
+    }
+
+    @Override
+    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+        this.writer.close(context);
+    }
+}
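
For completeness, a minimal sketch (not part of the commit) of the wrapping
that TriplesOrQuadsOutputFormat performs when a triples-only syntax is
selected: an existing triple writer, here the module's N-Triples writer, is
wrapped so that incoming quads are written as triples without their graph
names.

    import java.io.StringWriter;

    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.jena.hadoop.rdf.io.output.writers.QuadsToTriplesWriter;
    import org.apache.jena.hadoop.rdf.io.output.writers.ntriples.NTriplesWriter;
    import org.apache.jena.hadoop.rdf.types.QuadWritable;

    public class QuadsToTriplesSketch {
        public static void main(String[] args) {
            StringWriter output = new StringWriter();
            // Wrap the triple writer so quads can be sent to a triples-only format
            RecordWriter<NullWritable, QuadWritable> writer =
                    new QuadsToTriplesWriter<NullWritable>(
                            new NTriplesWriter<NullWritable>(output));
            // writer.write(key, quad) would now emit an N-Triples line with the
            // quad's graph name stripped off
        }
    }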