You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by rv...@apache.org on 2014/11/11 13:59:49 UTC
[1/2] jena git commit: Support RDF Thrift as an input format
Repository: jena
Updated Branches:
refs/heads/hadoop-rdf 3ccab77c7 -> f08fff2a9
http://git-wip-us.apache.org/repos/asf/jena/blob/f08fff2a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedWholeFileTripleInputFormatTests.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedWholeFileTripleInputFormatTests.java b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedWholeFileTripleInputFormatTests.java
index ee9a92c..56dd8ca 100644
--- a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedWholeFileTripleInputFormatTests.java
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedWholeFileTripleInputFormatTests.java
@@ -16,128 +16,129 @@
* limitations under the License.
*/
-package org.apache.jena.hadoop.rdf.io.input.compressed;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.compress.CompressionCodec;
+package org.apache.jena.hadoop.rdf.io.input.compressed;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.jena.hadoop.rdf.io.HadoopIOConstants;
import org.apache.jena.hadoop.rdf.io.input.AbstractNodeTupleInputFormatTests;
import org.apache.jena.hadoop.rdf.types.TripleWritable;
-import org.apache.jena.riot.Lang;
-import org.apache.jena.riot.RDFDataMgr;
-
-import com.hp.hpl.jena.graph.Triple;
-import com.hp.hpl.jena.rdf.model.Model;
-import com.hp.hpl.jena.rdf.model.ModelFactory;
-import com.hp.hpl.jena.rdf.model.Property;
-import com.hp.hpl.jena.rdf.model.Resource;
-
-/**
- * Abstract tests for compressed whole file triple formats
- *
- *
- */
-public abstract class AbstractCompressedWholeFileTripleInputFormatTests extends
- AbstractNodeTupleInputFormatTests<Triple, TripleWritable> {
-
- @Override
- protected Configuration prepareConfiguration() {
- Configuration config = super.prepareConfiguration();
- config.set(HadoopIOConstants.IO_COMPRESSION_CODECS, this.getCompressionCodec().getClass().getCanonicalName());
- return config;
- }
-
- @Override
- protected Writer getWriter(File f) throws IOException {
- CompressionCodec codec = this.getCompressionCodec();
- if (codec instanceof Configurable) {
- ((Configurable) codec).setConf(this.prepareConfiguration());
- }
- FileOutputStream fileOutput = new FileOutputStream(f, false);
- OutputStream output = codec.createOutputStream(fileOutput);
- return new OutputStreamWriter(output);
- }
-
- /**
- * Gets the compression codec to use
- *
- * @return Compression codec
- */
- protected abstract CompressionCodec getCompressionCodec();
-
- /**
- * Indicates whether inputs can be split, defaults to false for compressed
- * input tests
- */
- @Override
- protected boolean canSplitInputs() {
- return false;
- }
-
- @SuppressWarnings("deprecation")
- private void writeTuples(Model m, Writer writer) {
- RDFDataMgr.write(writer, m, this.getRdfLanguage());
- }
-
- /**
- * Gets the RDF language to write out generated tuples in
- *
- * @return RDF language
- */
- protected abstract Lang getRdfLanguage();
-
- @Override
- protected final void generateTuples(Writer writer, int num) throws IOException {
- Model m = ModelFactory.createDefaultModel();
- Resource currSubj = m.createResource("http://example.org/subjects/0");
- Property predicate = m.createProperty("http://example.org/predicate");
- for (int i = 0; i < num; i++) {
- if (i % 10 == 0) {
- currSubj = m.createResource("http://example.org/subjects/" + (i / 10));
- }
- m.add(currSubj, predicate, m.createTypedLiteral(i));
- }
- this.writeTuples(m, writer);
- writer.close();
- }
-
- @Override
- protected final void generateMixedTuples(Writer writer, int num) throws IOException {
- // Write good data
- Model m = ModelFactory.createDefaultModel();
- Resource currSubj = m.createResource("http://example.org/subjects/0");
- Property predicate = m.createProperty("http://example.org/predicate");
- for (int i = 0; i < num / 2; i++) {
- if (i % 10 == 0) {
- currSubj = m.createResource("http://example.org/subjects/" + (i / 10));
- }
- m.add(currSubj, predicate, m.createTypedLiteral(i));
- }
- this.writeTuples(m, writer);
-
- // Write junk data
- for (int i = 0; i < num / 2; i++) {
- writer.write("junk data\n");
- }
-
- writer.flush();
- writer.close();
- }
-
- @Override
- protected final void generateBadTuples(Writer writer, int num) throws IOException {
- for (int i = 0; i < num; i++) {
- writer.write("junk data\n");
- }
- writer.flush();
- writer.close();
- }
-}
+import org.apache.jena.riot.Lang;
+import org.apache.jena.riot.RDFDataMgr;
+
+import com.hp.hpl.jena.graph.Triple;
+import com.hp.hpl.jena.rdf.model.Model;
+import com.hp.hpl.jena.rdf.model.ModelFactory;
+import com.hp.hpl.jena.rdf.model.Property;
+import com.hp.hpl.jena.rdf.model.Resource;
+
+/**
+ * Abstract tests for compressed whole file triple formats
+ *
+ *
+ */
+public abstract class AbstractCompressedWholeFileTripleInputFormatTests extends
+ AbstractNodeTupleInputFormatTests<Triple, TripleWritable> {
+
+ private static final Charset utf8 = Charset.forName("utf-8");
+
+ @Override
+ protected Configuration prepareConfiguration() {
+ Configuration config = super.prepareConfiguration();
+ config.set(HadoopIOConstants.IO_COMPRESSION_CODECS, this.getCompressionCodec().getClass().getCanonicalName());
+ return config;
+ }
+
+ @Override
+ protected OutputStream getOutputStream(File f) throws IOException {
+ CompressionCodec codec = this.getCompressionCodec();
+ if (codec instanceof Configurable) {
+ ((Configurable) codec).setConf(this.prepareConfiguration());
+ }
+ FileOutputStream fileOutput = new FileOutputStream(f, false);
+ return codec.createOutputStream(fileOutput);
+ }
+
+ /**
+ * Gets the compression codec to use
+ *
+ * @return Compression codec
+ */
+ protected abstract CompressionCodec getCompressionCodec();
+
+ /**
+ * Indicates whether inputs can be split, defaults to false for compressed
+ * input tests
+ */
+ @Override
+ protected boolean canSplitInputs() {
+ return false;
+ }
+
+ private void writeTuples(Model m, OutputStream output) {
+ RDFDataMgr.write(output, m, this.getRdfLanguage());
+ }
+
+ /**
+ * Gets the RDF language to write out generated tuples in
+ *
+ * @return RDF language
+ */
+ protected abstract Lang getRdfLanguage();
+
+ @Override
+ protected final void generateTuples(OutputStream output, int num) throws IOException {
+ Model m = ModelFactory.createDefaultModel();
+ Resource currSubj = m.createResource("http://example.org/subjects/0");
+ Property predicate = m.createProperty("http://example.org/predicate");
+ for (int i = 0; i < num; i++) {
+ if (i % 10 == 0) {
+ currSubj = m.createResource("http://example.org/subjects/" + (i / 10));
+ }
+ m.add(currSubj, predicate, m.createTypedLiteral(i));
+ }
+ this.writeTuples(m, output);
+ output.close();
+ }
+
+ @Override
+ protected final void generateMixedTuples(OutputStream output, int num) throws IOException {
+ // Write good data
+ Model m = ModelFactory.createDefaultModel();
+ Resource currSubj = m.createResource("http://example.org/subjects/0");
+ Property predicate = m.createProperty("http://example.org/predicate");
+ for (int i = 0; i < num / 2; i++) {
+ if (i % 10 == 0) {
+ currSubj = m.createResource("http://example.org/subjects/" + (i / 10));
+ }
+ m.add(currSubj, predicate, m.createTypedLiteral(i));
+ }
+ this.writeTuples(m, output);
+
+ // Write junk data
+ byte[] junk = "junk data\n".getBytes(utf8);
+ for (int i = 0; i < num / 2; i++) {
+ output.write(junk);
+ }
+
+ output.flush();
+ output.close();
+ }
+
+ @Override
+ protected final void generateBadTuples(OutputStream output, int num) throws IOException {
+ byte[] junk = "junk data\n".getBytes(utf8);
+ for (int i = 0; i < num; i++) {
+ output.write(junk);
+ }
+ output.flush();
+ output.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/f08fff2a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/thrift/ThriftQuadInputTest.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/thrift/ThriftQuadInputTest.java b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/thrift/ThriftQuadInputTest.java
new file mode 100644
index 0000000..8d79295
--- /dev/null
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/thrift/ThriftQuadInputTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.thrift;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileQuadInputFormatTests;
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+import org.apache.jena.riot.Lang;
+import org.apache.jena.riot.RDFLanguages;
+
+
+/**
+ * Tests for RDF Thrift quad input
+ *
+ *
+ */
+public class ThriftQuadInputTest extends AbstractWholeFileQuadInputFormatTests {
+
+ @Override
+ protected Lang getRdfLanguage() {
+ return RDFLanguages.THRIFT;
+ }
+
+ @Override
+ protected String getFileExtension() {
+ return ".trdf";
+ }
+
+ @Override
+ protected InputFormat<LongWritable, QuadWritable> getInputFormat() {
+ return new ThriftQuadInputFormat();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/f08fff2a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/thrift/ThriftTripleInputTest.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/thrift/ThriftTripleInputTest.java b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/thrift/ThriftTripleInputTest.java
new file mode 100644
index 0000000..6b5e0b7
--- /dev/null
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/thrift/ThriftTripleInputTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.thrift;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileTripleInputFormatTests;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+import org.apache.jena.riot.Lang;
+import org.apache.jena.riot.RDFLanguages;
+
+
+/**
+ * Tests for RDF Thrift triple input
+ *
+ *
+ */
+public class ThriftTripleInputTest extends AbstractWholeFileTripleInputFormatTests {
+
+ @Override
+ protected Lang getRdfLanguage() {
+ return RDFLanguages.THRIFT;
+ }
+
+ @Override
+ protected String getFileExtension() {
+ return ".trdf";
+ }
+
+ @Override
+ protected InputFormat<LongWritable, TripleWritable> getInputFormat() {
+ return new ThriftTripleInputFormat();
+ }
+
+}
[2/2] jena git commit: Support RDF Thrift as an input format
Posted by rv...@apache.org.
Support RDF Thrift as an input format
Supported for both triples and quads.
Refactors input format tests to prepare test data using OutputStream
instead of Writer.
Project: http://git-wip-us.apache.org/repos/asf/jena/repo
Commit: http://git-wip-us.apache.org/repos/asf/jena/commit/f08fff2a
Tree: http://git-wip-us.apache.org/repos/asf/jena/tree/f08fff2a
Diff: http://git-wip-us.apache.org/repos/asf/jena/diff/f08fff2a
Branch: refs/heads/hadoop-rdf
Commit: f08fff2a98db7f97d05b0ad1181afe1851848dac
Parents: 3ccab77
Author: Rob Vesse <rv...@apache.org>
Authored: Tue Nov 11 12:59:00 2014 +0000
Committer: Rob Vesse <rv...@apache.org>
Committed: Tue Nov 11 12:59:00 2014 +0000
----------------------------------------------------------------------
.../input/readers/thrift/ThriftQuadReader.java | 32 +
.../readers/thrift/ThriftTripleReader.java | 30 +
.../io/input/thrift/ThriftQuadInputFormat.java | 39 +
.../input/thrift/ThriftTripleInputFormat.java | 39 +
.../AbstractNodeTupleInputFormatTests.java | 1174 +++++++++---------
.../io/input/AbstractQuadsInputFormatTests.java | 33 +-
.../input/AbstractTriplesInputFormatTests.java | 34 +-
.../AbstractWholeFileQuadInputFormatTests.java | 44 +-
...AbstractWholeFileTripleInputFormatTests.java | 46 +-
...ractCompressedNodeTupleInputFormatTests.java | 12 +-
...AbstractCompressedQuadsInputFormatTests.java | 33 +-
...stractCompressedTriplesInputFormatTests.java | 33 +-
...CompressedWholeFileQuadInputFormatTests.java | 53 +-
...mpressedWholeFileTripleInputFormatTests.java | 245 ++--
.../io/input/thrift/ThriftQuadInputTest.java | 51 +
.../io/input/thrift/ThriftTripleInputTest.java | 51 +
16 files changed, 1108 insertions(+), 841 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/jena/blob/f08fff2a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/thrift/ThriftQuadReader.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/thrift/ThriftQuadReader.java b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/thrift/ThriftQuadReader.java
new file mode 100644
index 0000000..084b1ec
--- /dev/null
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/thrift/ThriftQuadReader.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.readers.thrift;
+
+import org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileQuadReader;
+import org.apache.jena.riot.Lang;
+import org.apache.jena.riot.RDFLanguages;
+
+public class ThriftQuadReader extends AbstractWholeFileQuadReader {
+
+ @Override
+ protected Lang getRdfLanguage() {
+ return RDFLanguages.THRIFT;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/f08fff2a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/thrift/ThriftTripleReader.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/thrift/ThriftTripleReader.java b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/thrift/ThriftTripleReader.java
new file mode 100644
index 0000000..713bfa7
--- /dev/null
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/thrift/ThriftTripleReader.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.readers.thrift;
+
+import org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileTripleReader;
+import org.apache.jena.riot.Lang;
+import org.apache.jena.riot.RDFLanguages;
+
+public class ThriftTripleReader extends AbstractWholeFileTripleReader {
+ @Override
+ protected Lang getRdfLanguage() {
+ return RDFLanguages.THRIFT;
+ }
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/f08fff2a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/thrift/ThriftQuadInputFormat.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/thrift/ThriftQuadInputFormat.java b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/thrift/ThriftQuadInputFormat.java
new file mode 100644
index 0000000..f75542a
--- /dev/null
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/thrift/ThriftQuadInputFormat.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.thrift;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat;
+import org.apache.jena.hadoop.rdf.io.input.readers.thrift.ThriftQuadReader;
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+
+public class ThriftQuadInputFormat extends AbstractWholeFileInputFormat<LongWritable, QuadWritable> {
+
+ @Override
+ public RecordReader<LongWritable, QuadWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new ThriftQuadReader();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/f08fff2a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/thrift/ThriftTripleInputFormat.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/thrift/ThriftTripleInputFormat.java b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/thrift/ThriftTripleInputFormat.java
new file mode 100644
index 0000000..b60380d
--- /dev/null
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/thrift/ThriftTripleInputFormat.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.thrift;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat;
+import org.apache.jena.hadoop.rdf.io.input.readers.thrift.ThriftTripleReader;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+
+public class ThriftTripleInputFormat extends AbstractWholeFileInputFormat<LongWritable, TripleWritable> {
+
+ @Override
+ public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new ThriftTripleReader();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/f08fff2a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractNodeTupleInputFormatTests.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractNodeTupleInputFormatTests.java b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractNodeTupleInputFormatTests.java
index ef1b8d3..e22650f 100644
--- a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractNodeTupleInputFormatTests.java
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractNodeTupleInputFormatTests.java
@@ -16,591 +16,597 @@
* limitations under the License.
*/
-package org.apache.jena.hadoop.rdf.io.input;
-
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.Writer;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
-import org.apache.hadoop.mapreduce.task.JobContextImpl;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+package org.apache.jena.hadoop.rdf.io.input;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.jena.hadoop.rdf.io.HadoopIOConstants;
import org.apache.jena.hadoop.rdf.io.RdfIOConstants;
import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Assume;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Abstract node tuple input format tests
- *
- *
- *
- * @param <TValue>
- * @param <T>
- */
-public abstract class AbstractNodeTupleInputFormatTests<TValue, T extends AbstractNodeTupleWritable<TValue>> {
-
- private static final Logger LOG = LoggerFactory.getLogger(AbstractNodeTupleInputFormatTests.class);
-
- protected static final int EMPTY_SIZE = 0, SMALL_SIZE = 100, LARGE_SIZE = 10000, BAD_SIZE = 100, MIXED_SIZE = 100;
- protected static final String EMPTY = "empty";
- protected static final String SMALL = "small";
- protected static final String LARGE = "large";
- protected static final String BAD = "bad";
- protected static final String MIXED = "mixed";
-
- /**
- * Temporary folder for the tests
- */
- @Rule
- public TemporaryFolder folder = new TemporaryFolder();
-
- protected File empty, small, large, bad, mixed;
-
- /**
- * Prepares the inputs for the tests
- *
- * @throws IOException
- */
- @Before
- public void beforeTest() throws IOException {
- this.prepareInputs();
- }
-
- /**
- * Cleans up the inputs after each test
- */
- @After
- public void afterTest() {
- // Should be unnecessary since JUnit will clean up the temporary folder
- // anyway but best to do this regardless
- empty.delete();
- small.delete();
- large.delete();
- bad.delete();
- mixed.delete();
- }
-
- /**
- * Prepares a fresh configuration
- *
- * @return Configuration
- */
- protected Configuration prepareConfiguration() {
- Configuration config = new Configuration(true);
- // Nothing else to do
- return config;
- }
-
- /**
- * Prepares the inputs
- *
- * @throws IOException
- */
- protected void prepareInputs() throws IOException {
- String ext = this.getFileExtension();
- empty = folder.newFile(EMPTY + ext);
- this.generateTuples(empty, EMPTY_SIZE);
- small = folder.newFile(SMALL + ext);
- this.generateTuples(small, SMALL_SIZE);
- large = folder.newFile(LARGE + ext);
- this.generateTuples(large, LARGE_SIZE);
- bad = folder.newFile(BAD + ext);
- this.generateBadTuples(bad, BAD_SIZE);
- mixed = folder.newFile(MIXED + ext);
- this.generateMixedTuples(mixed, MIXED_SIZE);
- }
-
- /**
- * Gets the extra file extension to add to the filenames
- *
- * @return File extension
- */
- protected abstract String getFileExtension();
-
- /**
- * Generates tuples used for tests
- *
- * @param f
- * File
- * @param num
- * Number of tuples to generate
- * @throws IOException
- */
- protected final void generateTuples(File f, int num) throws IOException {
- this.generateTuples(this.getWriter(f), num);
- }
-
- /**
- * Gets the writer to use for generating tuples
- *
- * @param f
- * File
- * @return Writer
- * @throws IOException
- */
- protected Writer getWriter(File f) throws IOException {
- return new FileWriter(f, false);
- }
-
- /**
- * Generates tuples used for tests
- *
- * @param writer
- * Writer to write to
- * @param num
- * Number of tuples to generate
- * @throws IOException
- */
- protected abstract void generateTuples(Writer writer, int num) throws IOException;
-
- /**
- * Generates bad tuples used for tests
- *
- * @param f
- * File
- * @param num
- * Number of bad tuples to generate
- * @throws IOException
- */
- protected final void generateBadTuples(File f, int num) throws IOException {
- this.generateBadTuples(this.getWriter(f), num);
- }
-
- /**
- * Generates bad tuples used for tests
- *
- * @param writer
- * Writer to write to
- * @param num
- * Number of bad tuples to generate
- * @throws IOException
- */
- protected abstract void generateBadTuples(Writer writer, int num) throws IOException;
-
- /**
- * Generates a mixture of good and bad tuples used for tests
- *
- * @param f
- * File
- * @param num
- * Number of tuples to generate, they should be a 50/50 mix of
- * good and bad tuples
- * @throws IOException
- */
- protected final void generateMixedTuples(File f, int num) throws IOException {
- this.generateMixedTuples(this.getWriter(f), num);
- }
-
- /**
- * Generates a mixture of good and bad tuples used for tests
- *
- * @param writer
- * Writer to write to
- * @param num
- * Number of tuples to generate, they should be a 50/50 mix of
- * good and bad tuples
- * @throws IOException
- */
- protected abstract void generateMixedTuples(Writer write, int num) throws IOException;
-
- /**
- * Adds an input path to the job configuration
- *
- * @param f
- * File
- * @param config
- * Configuration
- * @param job
- * Job
- * @throws IOException
- */
- protected void addInputPath(File f, Configuration config, Job job) throws IOException {
- FileSystem fs = FileSystem.getLocal(config);
- Path inputPath = fs.makeQualified(new Path(f.getAbsolutePath()));
- FileInputFormat.addInputPath(job, inputPath);
- }
-
- protected final int countTuples(RecordReader<LongWritable, T> reader) throws IOException, InterruptedException {
- int count = 0;
-
- // Check initial progress
- LOG.info(String.format("Initial Reported Progress %f", reader.getProgress()));
- float progress = reader.getProgress();
- if (Float.compare(0.0f, progress) == 0) {
- Assert.assertEquals(0.0d, reader.getProgress(), 0.0d);
- } else if (Float.compare(1.0f, progress) == 0) {
- // If reader is reported 1.0 straight away then we expect there to
- // be no key values
- Assert.assertEquals(1.0d, reader.getProgress(), 0.0d);
- Assert.assertFalse(reader.nextKeyValue());
- } else {
- Assert.fail(String.format(
- "Expected progress of 0.0 or 1.0 before reader has been accessed for first time but got %f", progress));
- }
-
- // Count tuples
- boolean debug = LOG.isDebugEnabled();
- while (reader.nextKeyValue()) {
- count++;
- progress = reader.getProgress();
- if (debug)
- LOG.debug(String.format("Current Reported Progress %f", progress));
- Assert.assertTrue(String.format("Progress should be in the range 0.0 < p <= 1.0 but got %f", progress),
- progress > 0.0f && progress <= 1.0f);
- }
- reader.close();
- LOG.info(String.format("Got %d tuples from this record reader", count));
-
- // Check final progress
- LOG.info(String.format("Final Reported Progress %f", reader.getProgress()));
- Assert.assertEquals(1.0d, reader.getProgress(), 0.0d);
-
- return count;
- }
-
- protected final void checkTuples(RecordReader<LongWritable, T> reader, int expected) throws IOException, InterruptedException {
- Assert.assertEquals(expected, this.countTuples(reader));
- }
-
- /**
- * Runs a test with a single input
- *
- * @param input
- * Input
- * @param expectedTuples
- * Expected tuples
- * @throws IOException
- * @throws InterruptedException
- */
- protected final void testSingleInput(File input, int expectedSplits, int expectedTuples) throws IOException,
- InterruptedException {
- // Prepare configuration
- Configuration config = this.prepareConfiguration();
- this.testSingleInput(config, input, expectedSplits, expectedTuples);
- }
-
- /**
- * Runs a test with a single input
- *
- * @param config
- * Configuration
- * @param input
- * Input
- * @param expectedTuples
- * Expected tuples
- * @throws IOException
- * @throws InterruptedException
- */
- protected final void testSingleInput(Configuration config, File input, int expectedSplits, int expectedTuples)
- throws IOException, InterruptedException {
- // Set up fake job
- InputFormat<LongWritable, T> inputFormat = this.getInputFormat();
- Job job = Job.getInstance(config);
- job.setInputFormatClass(inputFormat.getClass());
- this.addInputPath(input, job.getConfiguration(), job);
- JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID());
- Assert.assertEquals(1, FileInputFormat.getInputPaths(context).length);
- NLineInputFormat.setNumLinesPerSplit(job, LARGE_SIZE);
-
- // Check splits
- List<InputSplit> splits = inputFormat.getSplits(context);
- Assert.assertEquals(expectedSplits, splits.size());
-
- // Check tuples
- for (InputSplit split : splits) {
- TaskAttemptContext taskContext = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
- RecordReader<LongWritable, T> reader = inputFormat.createRecordReader(split, taskContext);
- reader.initialize(split, taskContext);
- this.checkTuples(reader, expectedTuples);
- }
- }
-
- protected abstract InputFormat<LongWritable, T> getInputFormat();
-
- /**
- * Basic tuples input test
- *
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws InterruptedException
- */
- @Test
- public final void single_input_01() throws IOException, InterruptedException, ClassNotFoundException {
- testSingleInput(empty, this.canSplitInputs() ? 0 : 1, EMPTY_SIZE);
- }
-
- /**
- * Basic tuples input test
- *
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws InterruptedException
- */
- @Test
- public final void single_input_02() throws IOException, InterruptedException, ClassNotFoundException {
- testSingleInput(small, 1, SMALL_SIZE);
- }
-
- /**
- * Basic tuples input test
- *
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws InterruptedException
- */
- @Test
- public final void single_input_03() throws IOException, InterruptedException, ClassNotFoundException {
- testSingleInput(large, 1, LARGE_SIZE);
- }
-
- /**
- * Basic tuples input test
- *
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws InterruptedException
- */
- @Test
- public final void single_input_04() throws IOException, InterruptedException, ClassNotFoundException {
- testSingleInput(bad, 1, 0);
- }
-
- /**
- * Basic tuples input test
- *
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws InterruptedException
- */
- @Test
- public final void single_input_05() throws IOException, InterruptedException, ClassNotFoundException {
- testSingleInput(mixed, 1, MIXED_SIZE / 2);
- }
-
- /**
- * Tests behaviour when ignoring bad tuples is disabled
- *
- * @throws InterruptedException
- * @throws IOException
- */
- @Test(expected = IOException.class)
- public final void fail_on_bad_input_01() throws IOException, InterruptedException {
- Configuration config = this.prepareConfiguration();
- config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
- Assert.assertFalse(config.getBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, true));
- testSingleInput(config, bad, 1, 0);
- }
-
- /**
- * Tests behaviour when ignoring bad tuples is disabled
- *
- * @throws InterruptedException
- * @throws IOException
- */
- @Test(expected = IOException.class)
- public final void fail_on_bad_input_02() throws IOException, InterruptedException {
- Configuration config = this.prepareConfiguration();
- config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
- Assert.assertFalse(config.getBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, true));
- testSingleInput(config, mixed, 1, MIXED_SIZE / 2);
- }
-
- /**
- * Runs a multiple input test
- *
- * @param inputs
- * Inputs
- * @param expectedSplits
- * Number of splits expected
- * @param expectedTuples
- * Number of tuples expected
- * @throws IOException
- * @throws InterruptedException
- */
- protected final void testMultipleInputs(File[] inputs, int expectedSplits, int expectedTuples) throws IOException,
- InterruptedException {
- // Prepare configuration and inputs
- Configuration config = this.prepareConfiguration();
-
- // Set up fake job
- InputFormat<LongWritable, T> inputFormat = this.getInputFormat();
- Job job = Job.getInstance(config);
- job.setInputFormatClass(inputFormat.getClass());
- for (File input : inputs) {
- this.addInputPath(input, job.getConfiguration(), job);
- }
- JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID());
- Assert.assertEquals(inputs.length, FileInputFormat.getInputPaths(context).length);
- NLineInputFormat.setNumLinesPerSplit(job, expectedTuples);
-
- // Check splits
- List<InputSplit> splits = inputFormat.getSplits(context);
- Assert.assertEquals(expectedSplits, splits.size());
-
- // Check tuples
- int count = 0;
- for (InputSplit split : splits) {
- TaskAttemptContext taskContext = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
- RecordReader<LongWritable, T> reader = inputFormat.createRecordReader(split, taskContext);
- reader.initialize(split, taskContext);
- count += this.countTuples(reader);
- }
- Assert.assertEquals(expectedTuples, count);
- }
-
- /**
- * tuples test with multiple inputs
- *
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws InterruptedException
- */
- @Test
- public final void multiple_inputs_01() throws IOException, InterruptedException, ClassNotFoundException {
- testMultipleInputs(new File[] { empty, small, large }, this.canSplitInputs() ? 2 : 3, EMPTY_SIZE + SMALL_SIZE
- + LARGE_SIZE);
- }
-
- /**
- * tuples test with multiple inputs
- *
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws InterruptedException
- */
- @Test
- public final void multiple_inputs_02() throws IOException, InterruptedException, ClassNotFoundException {
- testMultipleInputs(new File[] { folder.getRoot() }, this.canSplitInputs() ? 4 : 5, EMPTY_SIZE + SMALL_SIZE + LARGE_SIZE
- + (MIXED_SIZE / 2));
- }
-
- protected final void testSplitInputs(Configuration config, File[] inputs, int expectedSplits, int expectedTuples)
- throws IOException, InterruptedException {
- // Set up fake job
- InputFormat<LongWritable, T> inputFormat = this.getInputFormat();
- Job job = Job.getInstance(config);
- job.setInputFormatClass(inputFormat.getClass());
- for (File input : inputs) {
- this.addInputPath(input, job.getConfiguration(), job);
- }
- JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID());
- Assert.assertEquals(inputs.length, FileInputFormat.getInputPaths(context).length);
-
- // Check splits
- List<InputSplit> splits = inputFormat.getSplits(context);
- Assert.assertEquals(expectedSplits, splits.size());
-
- // Check tuples
- int count = 0;
- for (InputSplit split : splits) {
- // Validate split
- Assert.assertTrue(this.isValidSplit(split, config));
-
- // Read split
- TaskAttemptContext taskContext = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
- RecordReader<LongWritable, T> reader = inputFormat.createRecordReader(split, taskContext);
- reader.initialize(split, taskContext);
- count += this.countTuples(reader);
- }
- Assert.assertEquals(expectedTuples, count);
- }
-
- /**
- * Determines whether an input split is valid
- *
- * @param split
- * Input split
- * @return True if a valid split, false otherwise
- * @throws IOException
- */
- protected boolean isValidSplit(InputSplit split, Configuration config) throws IOException {
- return split instanceof FileSplit;
- }
-
- /**
- * Indicates whether inputs can be split, defaults to true
- *
- * @return Whether inputs can be split
- */
- protected boolean canSplitInputs() {
- return true;
- }
-
- /**
- * Tests for input splitting
- *
- * @throws IOException
- * @throws InterruptedException
- * @throws ClassNotFoundException
- */
- @Test
- public final void split_input_01() throws IOException, InterruptedException, ClassNotFoundException {
- Assume.assumeTrue(this.canSplitInputs());
-
- Configuration config = this.prepareConfiguration();
- config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
- Assert.assertEquals(Integer.MAX_VALUE, config.getInt(HadoopIOConstants.MAX_LINE_LENGTH, Integer.MAX_VALUE));
- this.testSplitInputs(config, new File[] { small }, 100, SMALL_SIZE);
- }
-
- /**
- * Tests for input splitting
- *
- * @throws IOException
- * @throws InterruptedException
- * @throws ClassNotFoundException
- */
- @Test
- public final void split_input_02() throws IOException, InterruptedException, ClassNotFoundException {
- Assume.assumeTrue(this.canSplitInputs());
-
- Configuration config = this.prepareConfiguration();
- config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
- config.setLong(NLineInputFormat.LINES_PER_MAP, 10);
- Assert.assertEquals(Integer.MAX_VALUE, config.getInt(HadoopIOConstants.MAX_LINE_LENGTH, Integer.MAX_VALUE));
- this.testSplitInputs(config, new File[] { small }, 10, SMALL_SIZE);
- }
-
- /**
- * Tests for input splitting
- *
- * @throws IOException
- * @throws InterruptedException
- * @throws ClassNotFoundException
- */
- @Test
- public final void split_input_03() throws IOException, InterruptedException, ClassNotFoundException {
- Assume.assumeTrue(this.canSplitInputs());
-
- Configuration config = this.prepareConfiguration();
- config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
- config.setLong(NLineInputFormat.LINES_PER_MAP, 100);
- Assert.assertEquals(Integer.MAX_VALUE, config.getInt(HadoopIOConstants.MAX_LINE_LENGTH, Integer.MAX_VALUE));
- this.testSplitInputs(config, new File[] { large }, 100, LARGE_SIZE);
- }
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract tests for input formats that produce node tuples
+ *
+ * @param <TValue>
+ * Tuple type
+ * @param <T>
+ * Writable tuple type
+ */
+public abstract class AbstractNodeTupleInputFormatTests<TValue, T extends AbstractNodeTupleWritable<TValue>> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractNodeTupleInputFormatTests.class);
+
+ protected static final int EMPTY_SIZE = 0, SMALL_SIZE = 100, LARGE_SIZE = 10000, BAD_SIZE = 100, MIXED_SIZE = 100;
+ protected static final String EMPTY = "empty";
+ protected static final String SMALL = "small";
+ protected static final String LARGE = "large";
+ protected static final String BAD = "bad";
+ protected static final String MIXED = "mixed";
+
+ /**
+ * Temporary folder for the tests
+ */
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder();
+
+ protected File empty, small, large, bad, mixed;
+
+ /**
+ * Prepares the inputs for the tests
+ *
+ * @throws IOException
+ */
+ @Before
+ public void beforeTest() throws IOException {
+ this.prepareInputs();
+ }
+
+ /**
+ * Cleans up the inputs after each test
+ */
+ @After
+ public void afterTest() {
+ // Should be unnecessary since JUnit will clean up the temporary folder
+ // anyway but best to do this regardless
+ if (empty != null)
+ empty.delete();
+ if (small != null)
+ small.delete();
+ if (large != null)
+ large.delete();
+ if (bad != null)
+ bad.delete();
+ if (mixed != null)
+ mixed.delete();
+ }
+
+ /**
+ * Prepares a fresh configuration
+ *
+ * @return Configuration
+ */
+ protected Configuration prepareConfiguration() {
+ Configuration config = new Configuration(true);
+ // Nothing else to do
+ return config;
+ }
+
+ /**
+ * Prepares the inputs
+ *
+ * @throws IOException
+ */
+ protected void prepareInputs() throws IOException {
+ String ext = this.getFileExtension();
+ empty = folder.newFile(EMPTY + ext);
+ this.generateTuples(empty, EMPTY_SIZE);
+ small = folder.newFile(SMALL + ext);
+ this.generateTuples(small, SMALL_SIZE);
+ large = folder.newFile(LARGE + ext);
+ this.generateTuples(large, LARGE_SIZE);
+ bad = folder.newFile(BAD + ext);
+ this.generateBadTuples(bad, BAD_SIZE);
+ mixed = folder.newFile(MIXED + ext);
+ this.generateMixedTuples(mixed, MIXED_SIZE);
+ }
+
+ /**
+ * Gets the extra file extension to add to the filenames
+ *
+ * @return File extension
+ */
+ protected abstract String getFileExtension();
+
+ /**
+ * Generates tuples used for tests
+ *
+ * @param f
+ * File
+ * @param num
+ * Number of tuples to generate
+ * @throws IOException
+ */
+ protected final void generateTuples(File f, int num) throws IOException {
+ this.generateTuples(this.getOutputStream(f), num);
+ }
+
+ /**
+ * Gets the output stream to use for generating tuples
+ *
+ * @param f
+ * File
+ * @return Output Stream
+ * @throws IOException
+ */
+ protected OutputStream getOutputStream(File f) throws IOException {
+ return new FileOutputStream(f, false);
+ }
+
+ /**
+ * Generates tuples used for tests
+ *
+ * @param output
+ * Output Stream to write to
+ * @param num
+ * Number of tuples to generate
+ * @throws IOException
+ */
+ protected abstract void generateTuples(OutputStream output, int num) throws IOException;
+
+ /**
+ * Generates bad tuples used for tests
+ *
+ * @param f
+ * File
+ * @param num
+ * Number of bad tuples to generate
+ * @throws IOException
+ */
+ protected final void generateBadTuples(File f, int num) throws IOException {
+ this.generateBadTuples(this.getOutputStream(f), num);
+ }
+
+ /**
+ * Generates bad tuples used for tests
+ *
+ * @param output
+ * Output Stream to write to
+ * @param num
+ * Number of bad tuples to generate
+ * @throws IOException
+ */
+ protected abstract void generateBadTuples(OutputStream output, int num) throws IOException;
+
+ /**
+ * Generates a mixture of good and bad tuples used for tests
+ *
+ * @param f
+ * File
+ * @param num
+ * Number of tuples to generate, they should be a 50/50 mix of
+ * good and bad tuples
+ * @throws IOException
+ */
+ protected final void generateMixedTuples(File f, int num) throws IOException {
+ this.generateMixedTuples(this.getOutputStream(f), num);
+ }
+
+ /**
+ * Generates a mixture of good and bad tuples used for tests
+ *
+ * @param output
+ * Output Stream to write to
+ * @param num
+ * Number of tuples to generate, they should be a 50/50 mix of
+ * good and bad tuples
+ * @throws IOException
+ */
+ protected abstract void generateMixedTuples(OutputStream output, int num) throws IOException;
+
+ /**
+ * Adds an input path to the job configuration
+ *
+ * @param f
+ * File
+ * @param config
+ * Configuration
+ * @param job
+ * Job
+ * @throws IOException
+ */
+ protected void addInputPath(File f, Configuration config, Job job) throws IOException {
+ FileSystem fs = FileSystem.getLocal(config);
+ Path inputPath = fs.makeQualified(new Path(f.getAbsolutePath()));
+ FileInputFormat.addInputPath(job, inputPath);
+ }
+
+ /**
+ * Counts the tuples produced by a record reader, checking the progress
+ * the reader reports as it goes
+ * <p>
+ * Before the first read the reported progress must be exactly 0.0 or 1.0
+ * (in the latter case the reader must yield no key values). After each key
+ * value the progress must be greater than 0.0 and at most 1.0. The reader
+ * is closed by this method and must report a progress of exactly 1.0 once
+ * closed.
+ *
+ * @param reader
+ * Record reader to consume, expected to be already initialized
+ * @return Number of tuples read
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ protected final int countTuples(RecordReader<LongWritable, T> reader) throws IOException, InterruptedException {
+ int count = 0;
+
+ // Check initial progress
+ LOG.info(String.format("Initial Reported Progress %f", reader.getProgress()));
+ float progress = reader.getProgress();
+ if (Float.compare(0.0f, progress) == 0) {
+ Assert.assertEquals(0.0d, reader.getProgress(), 0.0d);
+ } else if (Float.compare(1.0f, progress) == 0) {
+ // If reader is reported 1.0 straight away then we expect there to
+ // be no key values
+ Assert.assertEquals(1.0d, reader.getProgress(), 0.0d);
+ Assert.assertFalse(reader.nextKeyValue());
+ } else {
+ Assert.fail(String.format(
+ "Expected progress of 0.0 or 1.0 before reader has been accessed for first time but got %f",
+ progress));
+ }
+
+ // Count tuples
+ boolean debug = LOG.isDebugEnabled();
+ while (reader.nextKeyValue()) {
+ count++;
+ progress = reader.getProgress();
+ if (debug)
+ LOG.debug(String.format("Current Reported Progress %f", progress));
+ Assert.assertTrue(String.format("Progress should be in the range 0.0 < p <= 1.0 but got %f", progress),
+ progress > 0.0f && progress <= 1.0f);
+ }
+ // Closed before the final check so that progress is verified on a
+ // closed reader
+ reader.close();
+ LOG.info(String.format("Got %d tuples from this record reader", count));
+
+ // Check final progress
+ LOG.info(String.format("Final Reported Progress %f", reader.getProgress()));
+ Assert.assertEquals(1.0d, reader.getProgress(), 0.0d);
+
+ return count;
+ }
+
+ /**
+ * Consumes a record reader via {@link #countTuples(RecordReader)} and
+ * asserts that it produced the expected number of tuples
+ *
+ * @param reader
+ * Record reader to consume
+ * @param expected
+ * Expected number of tuples
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ protected final void checkTuples(RecordReader<LongWritable, T> reader, int expected) throws IOException,
+ InterruptedException {
+ Assert.assertEquals(expected, this.countTuples(reader));
+ }
+
+ /**
+ * Runs a test with a single input using a freshly prepared configuration
+ *
+ * @param input
+ * Input
+ * @param expectedSplits
+ * Number of splits expected
+ * @param expectedTuples
+ * Expected tuples
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ protected final void testSingleInput(File input, int expectedSplits, int expectedTuples) throws IOException,
+ InterruptedException {
+ // Prepare configuration
+ Configuration config = this.prepareConfiguration();
+ this.testSingleInput(config, input, expectedSplits, expectedTuples);
+ }
+
+ /**
+ * Runs a test with a single input
+ *
+ * @param config
+ * Configuration
+ * @param input
+ * Input
+ * @param expectedSplits
+ * Number of splits expected
+ * @param expectedTuples
+ * Expected tuples, asserted separately for every split
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ protected final void testSingleInput(Configuration config, File input, int expectedSplits, int expectedTuples)
+ throws IOException, InterruptedException {
+ // Set up fake job
+ InputFormat<LongWritable, T> inputFormat = this.getInputFormat();
+ Job job = Job.getInstance(config);
+ job.setInputFormatClass(inputFormat.getClass());
+ this.addInputPath(input, job.getConfiguration(), job);
+ JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID());
+ Assert.assertEquals(1, FileInputFormat.getInputPaths(context).length);
+ NLineInputFormat.setNumLinesPerSplit(job, LARGE_SIZE);
+
+ // Check splits
+ List<InputSplit> splits = inputFormat.getSplits(context);
+ Assert.assertEquals(expectedSplits, splits.size());
+
+ // Check tuples
+ for (InputSplit split : splits) {
+ TaskAttemptContext taskContext = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
+ RecordReader<LongWritable, T> reader = inputFormat.createRecordReader(split, taskContext);
+ reader.initialize(split, taskContext);
+ this.checkTuples(reader, expectedTuples);
+ }
+ }
+
+ /**
+ * Gets the input format under test, the test helpers use it to compute
+ * splits and to create record readers
+ *
+ * @return Input format
+ */
+ protected abstract InputFormat<LongWritable, T> getInputFormat();
+
+ /**
+ * Basic tuples input test
+ *
+ * @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ @Test
+ public final void single_input_01() throws IOException, InterruptedException, ClassNotFoundException {
+ testSingleInput(empty, this.canSplitInputs() ? 0 : 1, EMPTY_SIZE);
+ }
+
+ /**
+ * Basic tuples input test
+ *
+ * @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ @Test
+ public final void single_input_02() throws IOException, InterruptedException, ClassNotFoundException {
+ testSingleInput(small, 1, SMALL_SIZE);
+ }
+
+ /**
+ * Basic tuples input test
+ *
+ * @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ @Test
+ public final void single_input_03() throws IOException, InterruptedException, ClassNotFoundException {
+ testSingleInput(large, 1, LARGE_SIZE);
+ }
+
+ /**
+ * Basic tuples input test
+ *
+ * @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ @Test
+ public final void single_input_04() throws IOException, InterruptedException, ClassNotFoundException {
+ testSingleInput(bad, 1, 0);
+ }
+
+ /**
+ * Basic tuples input test
+ *
+ * @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ @Test
+ public final void single_input_05() throws IOException, InterruptedException, ClassNotFoundException {
+ testSingleInput(mixed, 1, MIXED_SIZE / 2);
+ }
+
+ /**
+ * Tests behaviour when ignoring bad tuples is disabled
+ *
+ * @throws InterruptedException
+ * @throws IOException
+ */
+ @Test(expected = IOException.class)
+ public final void fail_on_bad_input_01() throws IOException, InterruptedException {
+ Configuration config = this.prepareConfiguration();
+ config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
+ Assert.assertFalse(config.getBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, true));
+ testSingleInput(config, bad, 1, 0);
+ }
+
+ /**
+ * Tests behaviour when ignoring bad tuples is disabled
+ *
+ * @throws InterruptedException
+ * @throws IOException
+ */
+ @Test(expected = IOException.class)
+ public final void fail_on_bad_input_02() throws IOException, InterruptedException {
+ Configuration config = this.prepareConfiguration();
+ config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
+ Assert.assertFalse(config.getBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, true));
+ testSingleInput(config, mixed, 1, MIXED_SIZE / 2);
+ }
+
+ /**
+ * Runs a multiple input test
+ *
+ * @param inputs
+ * Inputs
+ * @param expectedSplits
+ * Number of splits expected
+ * @param expectedTuples
+ * Number of tuples expected
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ protected final void testMultipleInputs(File[] inputs, int expectedSplits, int expectedTuples) throws IOException,
+ InterruptedException {
+ // Prepare configuration and inputs
+ Configuration config = this.prepareConfiguration();
+
+ // Set up fake job
+ InputFormat<LongWritable, T> inputFormat = this.getInputFormat();
+ Job job = Job.getInstance(config);
+ job.setInputFormatClass(inputFormat.getClass());
+ for (File input : inputs) {
+ this.addInputPath(input, job.getConfiguration(), job);
+ }
+ JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID());
+ Assert.assertEquals(inputs.length, FileInputFormat.getInputPaths(context).length);
+ NLineInputFormat.setNumLinesPerSplit(job, expectedTuples);
+
+ // Check splits
+ List<InputSplit> splits = inputFormat.getSplits(context);
+ Assert.assertEquals(expectedSplits, splits.size());
+
+ // Check tuples
+ int count = 0;
+ for (InputSplit split : splits) {
+ TaskAttemptContext taskContext = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
+ RecordReader<LongWritable, T> reader = inputFormat.createRecordReader(split, taskContext);
+ reader.initialize(split, taskContext);
+ count += this.countTuples(reader);
+ }
+ Assert.assertEquals(expectedTuples, count);
+ }
+
+ /**
+ * tuples test with multiple inputs
+ *
+ * @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ @Test
+ public final void multiple_inputs_01() throws IOException, InterruptedException, ClassNotFoundException {
+ testMultipleInputs(new File[] { empty, small, large }, this.canSplitInputs() ? 2 : 3, EMPTY_SIZE + SMALL_SIZE
+ + LARGE_SIZE);
+ }
+
+ /**
+ * tuples test with multiple inputs
+ *
+ * @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ @Test
+ public final void multiple_inputs_02() throws IOException, InterruptedException, ClassNotFoundException {
+ testMultipleInputs(new File[] { folder.getRoot() }, this.canSplitInputs() ? 4 : 5, EMPTY_SIZE + SMALL_SIZE
+ + LARGE_SIZE + (MIXED_SIZE / 2));
+ }
+
+ /**
+ * Runs a split input test
+ * <p>
+ * The supplied configuration is used as-is for the fake job. Every split
+ * is checked with {@link #isValidSplit(InputSplit, Configuration)} before
+ * it is read, and the expected tuple count is asserted against the total
+ * across all splits.
+ *
+ * @param config
+ * Configuration
+ * @param inputs
+ * Inputs
+ * @param expectedSplits
+ * Number of splits expected
+ * @param expectedTuples
+ * Number of tuples expected in total
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ protected final void testSplitInputs(Configuration config, File[] inputs, int expectedSplits, int expectedTuples)
+ throws IOException, InterruptedException {
+ // Set up fake job
+ InputFormat<LongWritable, T> inputFormat = this.getInputFormat();
+ Job job = Job.getInstance(config);
+ job.setInputFormatClass(inputFormat.getClass());
+ for (File input : inputs) {
+ this.addInputPath(input, job.getConfiguration(), job);
+ }
+ JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID());
+ Assert.assertEquals(inputs.length, FileInputFormat.getInputPaths(context).length);
+
+ // Check splits
+ List<InputSplit> splits = inputFormat.getSplits(context);
+ Assert.assertEquals(expectedSplits, splits.size());
+
+ // Check tuples
+ int count = 0;
+ for (InputSplit split : splits) {
+ // Validate split
+ Assert.assertTrue(this.isValidSplit(split, config));
+
+ // Read split
+ TaskAttemptContext taskContext = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
+ RecordReader<LongWritable, T> reader = inputFormat.createRecordReader(split, taskContext);
+ reader.initialize(split, taskContext);
+ count += this.countTuples(reader);
+ }
+ Assert.assertEquals(expectedTuples, count);
+ }
+
+ /**
+ * Determines whether an input split is valid, this default implementation
+ * accepts any {@code FileSplit}
+ *
+ * @param split
+ * Input split
+ * @param config
+ * Configuration, not used by this default implementation
+ * @return True if a valid split, false otherwise
+ * @throws IOException
+ */
+ protected boolean isValidSplit(InputSplit split, Configuration config) throws IOException {
+ return split instanceof FileSplit;
+ }
+
+ /**
+ * Indicates whether inputs can be split, defaults to true
+ *
+ * @return Whether inputs can be split
+ */
+ protected boolean canSplitInputs() {
+ return true;
+ }
+
+ /**
+ * Tests for input splitting
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ * @throws ClassNotFoundException
+ */
+ @Test
+ public final void split_input_01() throws IOException, InterruptedException, ClassNotFoundException {
+ Assume.assumeTrue(this.canSplitInputs());
+
+ Configuration config = this.prepareConfiguration();
+ config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
+ Assert.assertEquals(Integer.MAX_VALUE, config.getInt(HadoopIOConstants.MAX_LINE_LENGTH, Integer.MAX_VALUE));
+ this.testSplitInputs(config, new File[] { small }, 100, SMALL_SIZE);
+ }
+
+ /**
+ * Tests for input splitting
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ * @throws ClassNotFoundException
+ */
+ @Test
+ public final void split_input_02() throws IOException, InterruptedException, ClassNotFoundException {
+ Assume.assumeTrue(this.canSplitInputs());
+
+ Configuration config = this.prepareConfiguration();
+ config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
+ config.setLong(NLineInputFormat.LINES_PER_MAP, 10);
+ Assert.assertEquals(Integer.MAX_VALUE, config.getInt(HadoopIOConstants.MAX_LINE_LENGTH, Integer.MAX_VALUE));
+ this.testSplitInputs(config, new File[] { small }, 10, SMALL_SIZE);
+ }
+
+ /**
+ * Tests for input splitting
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ * @throws ClassNotFoundException
+ */
+ @Test
+ public final void split_input_03() throws IOException, InterruptedException, ClassNotFoundException {
+ Assume.assumeTrue(this.canSplitInputs());
+
+ Configuration config = this.prepareConfiguration();
+ config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
+ config.setLong(NLineInputFormat.LINES_PER_MAP, 100);
+ Assert.assertEquals(Integer.MAX_VALUE, config.getInt(HadoopIOConstants.MAX_LINE_LENGTH, Integer.MAX_VALUE));
+ this.testSplitInputs(config, new File[] { large }, 100, LARGE_SIZE);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/jena/blob/f08fff2a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractQuadsInputFormatTests.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractQuadsInputFormatTests.java b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractQuadsInputFormatTests.java
index 4dd396b..78d7f33 100644
--- a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractQuadsInputFormatTests.java
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractQuadsInputFormatTests.java
@@ -19,7 +19,8 @@
package org.apache.jena.hadoop.rdf.io.input;
import java.io.IOException;
-import java.io.Writer;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
import org.apache.jena.hadoop.rdf.types.QuadWritable;
@@ -30,38 +31,42 @@ import com.hp.hpl.jena.sparql.core.Quad;
*
*
*/
-public abstract class AbstractQuadsInputFormatTests extends AbstractNodeTupleInputFormatTests<Quad, QuadWritable> {
+public abstract class AbstractQuadsInputFormatTests extends AbstractNodeTupleInputFormatTests<Quad, QuadWritable> {
+
+ private static final Charset utf8 = Charset.forName("utf-8");
@Override
- protected void generateTuples(Writer writer, int num) throws IOException {
+ protected void generateTuples(OutputStream output, int num) throws IOException {
for (int i = 0; i < num; i++) {
- writer.write("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" <http://graphs/" + i + "> .\n");
+ output.write(("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" <http://graphs/" + i + "> .\n").getBytes(utf8));
}
- writer.flush();
- writer.close();
+ output.flush();
+ output.close();
}
@Override
- protected void generateBadTuples(Writer writer, int num) throws IOException {
+ protected void generateBadTuples(OutputStream output, int num) throws IOException {
+ // The malformed line never changes so encode it once, not per iteration
+ byte[] junk = "<http://broken\n".getBytes(utf8);
for (int i = 0; i < num; i++) {
- writer.write("<http://broken\n");
+ output.write(junk);
}
- writer.flush();
- writer.close();
+ output.flush();
+ output.close();
}
@Override
- protected void generateMixedTuples(Writer writer, int num) throws IOException {
+ protected void generateMixedTuples(OutputStream output, int num) throws IOException {
boolean bad = false;
for (int i = 0; i < num; i++, bad = !bad) {
if (bad) {
- writer.write("<http://broken\n");
+ output.write("<http://broken\n".getBytes(utf8));
} else {
- writer.write("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" <http://graphs/" + i + "> .\n");
+ output.write(("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" <http://graphs/" + i + "> .\n").getBytes(utf8));
}
}
- writer.flush();
- writer.close();
+ output.flush();
+ output.close();
}
}
http://git-wip-us.apache.org/repos/asf/jena/blob/f08fff2a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractTriplesInputFormatTests.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractTriplesInputFormatTests.java b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractTriplesInputFormatTests.java
index 04572d3..65a9889 100644
--- a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractTriplesInputFormatTests.java
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractTriplesInputFormatTests.java
@@ -19,7 +19,8 @@
package org.apache.jena.hadoop.rdf.io.input;
import java.io.IOException;
-import java.io.Writer;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
import org.apache.jena.hadoop.rdf.types.TripleWritable;
@@ -31,38 +32,41 @@ import com.hp.hpl.jena.graph.Triple;
*
*
*/
-public abstract class AbstractTriplesInputFormatTests extends AbstractNodeTupleInputFormatTests<Triple, TripleWritable> {
+public abstract class AbstractTriplesInputFormatTests extends AbstractNodeTupleInputFormatTests<Triple, TripleWritable> {
+
+ private static final Charset utf8 = Charset.forName("utf-8");
@Override
- protected void generateTuples(Writer writer, int num) throws IOException {
+ protected void generateTuples(OutputStream output, int num) throws IOException {
for (int i = 0; i < num; i++) {
- writer.write("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" .\n");
+ output.write(("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" .\n").getBytes(utf8));
}
- writer.flush();
- writer.close();
+ output.flush();
+ output.close();
}
@Override
- protected void generateBadTuples(Writer writer, int num) throws IOException {
+ protected void generateBadTuples(OutputStream output, int num) throws IOException {
+ byte[] junk = "<http://broken\n".getBytes(utf8);
for (int i = 0; i < num; i++) {
- writer.write("<http://broken\n");
+ output.write(junk);
}
- writer.flush();
- writer.close();
+ output.flush();
+ output.close();
}
@Override
- protected void generateMixedTuples(Writer writer, int num) throws IOException {
+ protected void generateMixedTuples(OutputStream output, int num) throws IOException {
boolean bad = false;
for (int i = 0; i < num; i++, bad = !bad) {
if (bad) {
- writer.write("<http://broken\n");
+ output.write("<http://broken\n".getBytes(utf8));
} else {
- writer.write("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" .\n");
+ output.write(("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" .\n").getBytes(utf8));
}
}
- writer.flush();
- writer.close();
+ output.flush();
+ output.close();
}
}
http://git-wip-us.apache.org/repos/asf/jena/blob/f08fff2a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileQuadInputFormatTests.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileQuadInputFormatTests.java b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileQuadInputFormatTests.java
index 3f2c8d2..0b6cfde 100644
--- a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileQuadInputFormatTests.java
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileQuadInputFormatTests.java
@@ -19,7 +19,8 @@
package org.apache.jena.hadoop.rdf.io.input;
import java.io.IOException;
-import java.io.Writer;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
import org.apache.jena.hadoop.rdf.types.QuadWritable;
import org.apache.jena.riot.Lang;
@@ -40,16 +41,17 @@ import com.hp.hpl.jena.sparql.core.Quad;
*
*
*/
-public abstract class AbstractWholeFileQuadInputFormatTests extends AbstractNodeTupleInputFormatTests<Quad, QuadWritable> {
+public abstract class AbstractWholeFileQuadInputFormatTests extends AbstractNodeTupleInputFormatTests<Quad, QuadWritable> {
+
+ private static final Charset utf8 = Charset.forName("utf-8");
@Override
protected boolean canSplitInputs() {
return false;
}
- @SuppressWarnings("deprecation")
- private void writeTuples(Dataset ds, Writer writer) {
- RDFDataMgr.write(writer, ds, RDFWriterRegistry.defaultSerialization(this.getRdfLanguage()));
+ private void writeTuples(Dataset ds, OutputStream output) {
+ RDFDataMgr.write(output, ds, RDFWriterRegistry.defaultSerialization(this.getRdfLanguage()));
}
/**
@@ -59,7 +61,7 @@ public abstract class AbstractWholeFileQuadInputFormatTests extends AbstractNode
*/
protected abstract Lang getRdfLanguage();
- private void writeGoodTuples(Writer writer, int num) throws IOException {
+ private void writeGoodTuples(OutputStream output, int num) throws IOException {
Dataset ds = DatasetFactory.createMem();
Model m = ModelFactory.createDefaultModel();
Resource currSubj = m.createResource("http://example.org/subjects/0");
@@ -77,35 +79,37 @@ public abstract class AbstractWholeFileQuadInputFormatTests extends AbstractNode
if (!m.isEmpty()) {
ds.addNamedModel("http://example.org/graphs/extra", m);
}
- this.writeTuples(ds, writer);
+ this.writeTuples(ds, output);
}
@Override
- protected final void generateTuples(Writer writer, int num) throws IOException {
- this.writeGoodTuples(writer, num);
- writer.close();
+ protected final void generateTuples(OutputStream output, int num) throws IOException {
+ this.writeGoodTuples(output, num);
+ output.close();
}
@Override
- protected final void generateMixedTuples(Writer writer, int num) throws IOException {
+ protected final void generateMixedTuples(OutputStream output, int num) throws IOException {
// Write good data
- this.writeGoodTuples(writer, num / 2);
+ this.writeGoodTuples(output, num / 2);
- // Write junk data
+ // Write junk data
+ byte[] junk = "junk data\n".getBytes(utf8);
for (int i = 0; i < num / 2; i++) {
- writer.write("junk data\n");
+ output.write(junk);
}
- writer.flush();
- writer.close();
+ output.flush();
+ output.close();
}
@Override
- protected final void generateBadTuples(Writer writer, int num) throws IOException {
+ protected final void generateBadTuples(OutputStream output, int num) throws IOException {
+ byte[] junk = "junk data\n".getBytes(utf8);
for (int i = 0; i < num; i++) {
- writer.write("junk data\n");
+ output.write(junk);
}
- writer.flush();
- writer.close();
+ output.flush();
+ output.close();
}
}
http://git-wip-us.apache.org/repos/asf/jena/blob/f08fff2a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileTripleInputFormatTests.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileTripleInputFormatTests.java b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileTripleInputFormatTests.java
index bacd7ba..b68d662 100644
--- a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileTripleInputFormatTests.java
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileTripleInputFormatTests.java
@@ -19,7 +19,8 @@
package org.apache.jena.hadoop.rdf.io.input;
import java.io.IOException;
-import java.io.Writer;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
import org.apache.jena.hadoop.rdf.types.TripleWritable;
import org.apache.jena.riot.Lang;
@@ -37,18 +38,19 @@ import com.hp.hpl.jena.rdf.model.Resource;
*
*
*/
-public abstract class AbstractWholeFileTripleInputFormatTests extends AbstractNodeTupleInputFormatTests<Triple, TripleWritable> {
+public abstract class AbstractWholeFileTripleInputFormatTests extends AbstractNodeTupleInputFormatTests<Triple, TripleWritable> {
+
+ private static final Charset utf8 = Charset.forName("utf-8");
@Override
protected boolean canSplitInputs() {
return false;
}
- @SuppressWarnings("deprecation")
- private void writeTuples(Model m, Writer writer) {
- RDFDataMgr.write(writer, m, this.getRdfLanguage());
- }
-
+ private void writeTuples(Model m, OutputStream output) {
+ RDFDataMgr.write(output, m, this.getRdfLanguage());
+ }
+
/**
* Gets the RDF language to write out generate tuples in
* @return RDF language
@@ -56,7 +58,7 @@ public abstract class AbstractWholeFileTripleInputFormatTests extends AbstractNo
protected abstract Lang getRdfLanguage();
@Override
- protected final void generateTuples(Writer writer, int num) throws IOException {
+ protected final void generateTuples(OutputStream output, int num) throws IOException {
Model m = ModelFactory.createDefaultModel();
Resource currSubj = m.createResource("http://example.org/subjects/0");
Property predicate = m.createProperty("http://example.org/predicate");
@@ -66,12 +68,12 @@ public abstract class AbstractWholeFileTripleInputFormatTests extends AbstractNo
}
m.add(currSubj, predicate, m.createTypedLiteral(i));
}
- this.writeTuples(m, writer);
- writer.close();
- }
+ this.writeTuples(m, output);
+ output.close();
+ }
@Override
- protected final void generateMixedTuples(Writer writer, int num) throws IOException {
+ protected final void generateMixedTuples(OutputStream output, int num) throws IOException {
// Write good data
Model m = ModelFactory.createDefaultModel();
Resource currSubj = m.createResource("http://example.org/subjects/0");
@@ -82,23 +84,25 @@ public abstract class AbstractWholeFileTripleInputFormatTests extends AbstractNo
}
m.add(currSubj, predicate, m.createTypedLiteral(i));
}
- this.writeTuples(m, writer);
+ this.writeTuples(m, output);
- // Write junk data
+ // Write junk data
+ byte[] junk = "junk data\n".getBytes(utf8);
for (int i = 0; i < num / 2; i++) {
- writer.write("junk data\n");
+ output.write(junk);
}
- writer.flush();
- writer.close();
+ output.flush();
+ output.close();
}
@Override
- protected final void generateBadTuples(Writer writer, int num) throws IOException {
+ protected final void generateBadTuples(OutputStream output, int num) throws IOException {
+ byte[] junk = "junk data\n".getBytes(utf8);
for (int i = 0; i < num; i++) {
- writer.write("junk data\n");
+ output.write(junk);
}
- writer.flush();
- writer.close();
+ output.flush();
+ output.close();
}
}
http://git-wip-us.apache.org/repos/asf/jena/blob/f08fff2a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedNodeTupleInputFormatTests.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedNodeTupleInputFormatTests.java b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedNodeTupleInputFormatTests.java
index 5725cf7..1f18a95 100644
--- a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedNodeTupleInputFormatTests.java
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedNodeTupleInputFormatTests.java
@@ -22,9 +22,6 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -40,8 +37,8 @@ import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable;
* @param <T>
*/
public abstract class AbstractCompressedNodeTupleInputFormatTests<TValue, T extends AbstractNodeTupleWritable<TValue>> extends
- AbstractNodeTupleInputFormatTests<TValue, T> {
-
+ AbstractNodeTupleInputFormatTests<TValue, T> {
+
@Override
protected Configuration prepareConfiguration() {
Configuration config = super.prepareConfiguration();
@@ -50,14 +47,13 @@ public abstract class AbstractCompressedNodeTupleInputFormatTests<TValue, T exte
}
@Override
- protected Writer getWriter(File f) throws IOException {
+ protected OutputStream getOutputStream(File f) throws IOException {
CompressionCodec codec = this.getCompressionCodec();
if (codec instanceof Configurable) {
((Configurable) codec).setConf(this.prepareConfiguration());
}
FileOutputStream fileOutput = new FileOutputStream(f, false);
- OutputStream output = codec.createOutputStream(fileOutput);
- return new OutputStreamWriter(output);
+ return codec.createOutputStream(fileOutput);
}
/**
http://git-wip-us.apache.org/repos/asf/jena/blob/f08fff2a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedQuadsInputFormatTests.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedQuadsInputFormatTests.java b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedQuadsInputFormatTests.java
index 386e772..312aae7 100644
--- a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedQuadsInputFormatTests.java
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedQuadsInputFormatTests.java
@@ -19,7 +19,8 @@
package org.apache.jena.hadoop.rdf.io.input.compressed;
import java.io.IOException;
-import java.io.Writer;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
import org.apache.jena.hadoop.rdf.types.QuadWritable;
@@ -32,37 +33,39 @@ import com.hp.hpl.jena.sparql.core.Quad;
*
*/
public abstract class AbstractCompressedQuadsInputFormatTests extends
- AbstractCompressedNodeTupleInputFormatTests<Quad, QuadWritable> {
+ AbstractCompressedNodeTupleInputFormatTests<Quad, QuadWritable> {
+
+ private static final Charset utf8 = Charset.forName("utf-8");
@Override
- protected void generateTuples(Writer writer, int num) throws IOException {
+ protected void generateTuples(OutputStream output, int num) throws IOException {
for (int i = 0; i < num; i++) {
- writer.write("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" <http://graphs/" + i + "> .\n");
+ output.write(("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" <http://graphs/" + i + "> .\n").getBytes(utf8));
}
- writer.flush();
- writer.close();
+ output.flush();
+ output.close();
}
@Override
- protected void generateBadTuples(Writer writer, int num) throws IOException {
+ protected void generateBadTuples(OutputStream output, int num) throws IOException {
for (int i = 0; i < num; i++) {
- writer.write("<http://broken\n");
+ output.write("<http://broken\n".getBytes(utf8));
}
- writer.flush();
- writer.close();
+ output.flush();
+ output.close();
}
@Override
- protected void generateMixedTuples(Writer writer, int num) throws IOException {
+ protected void generateMixedTuples(OutputStream output, int num) throws IOException {
boolean bad = false;
for (int i = 0; i < num; i++, bad = !bad) {
if (bad) {
- writer.write("<http://broken\n");
+ output.write("<http://broken\n".getBytes(utf8));
} else {
- writer.write("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" <http://graphs/" + i + "> .\n");
+ output.write(("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" <http://graphs/" + i + "> .\n").getBytes(utf8));
}
}
- writer.flush();
- writer.close();
+ output.flush();
+ output.close();
}
}
http://git-wip-us.apache.org/repos/asf/jena/blob/f08fff2a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedTriplesInputFormatTests.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedTriplesInputFormatTests.java b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedTriplesInputFormatTests.java
index 5312b9e..f0f0caf 100644
--- a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedTriplesInputFormatTests.java
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedTriplesInputFormatTests.java
@@ -19,7 +19,8 @@
package org.apache.jena.hadoop.rdf.io.input.compressed;
import java.io.IOException;
-import java.io.Writer;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
import org.apache.jena.hadoop.rdf.types.TripleWritable;
@@ -32,37 +33,39 @@ import com.hp.hpl.jena.graph.Triple;
*
*/
public abstract class AbstractCompressedTriplesInputFormatTests extends
- AbstractCompressedNodeTupleInputFormatTests<Triple, TripleWritable> {
+ AbstractCompressedNodeTupleInputFormatTests<Triple, TripleWritable> {
+
+ private static final Charset utf8 = Charset.forName("utf-8");
@Override
- protected void generateTuples(Writer writer, int num) throws IOException {
+ protected void generateTuples(OutputStream output, int num) throws IOException {
for (int i = 0; i < num; i++) {
- writer.write("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" .\n");
+ output.write(("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" .\n").getBytes(utf8));
}
- writer.flush();
- writer.close();
+ output.flush();
+ output.close();
}
@Override
- protected void generateBadTuples(Writer writer, int num) throws IOException {
+ protected void generateBadTuples(OutputStream output, int num) throws IOException {
for (int i = 0; i < num; i++) {
- writer.write("<http://broken\n");
+ output.write("<http://broken\n".getBytes(utf8));
}
- writer.flush();
- writer.close();
+ output.flush();
+ output.close();
}
@Override
- protected void generateMixedTuples(Writer writer, int num) throws IOException {
+ protected void generateMixedTuples(OutputStream output, int num) throws IOException {
boolean bad = false;
for (int i = 0; i < num; i++, bad = !bad) {
if (bad) {
- writer.write("<http://broken\n");
+ output.write("<http://broken\n".getBytes(utf8));
} else {
- writer.write("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" .\n");
+ output.write(("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" .\n").getBytes(utf8));
}
}
- writer.flush();
- writer.close();
+ output.flush();
+ output.close();
}
}
http://git-wip-us.apache.org/repos/asf/jena/blob/f08fff2a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedWholeFileQuadInputFormatTests.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedWholeFileQuadInputFormatTests.java b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedWholeFileQuadInputFormatTests.java
index 3dd0bd0..be2b1d7 100644
--- a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedWholeFileQuadInputFormatTests.java
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedWholeFileQuadInputFormatTests.java
@@ -22,8 +22,7 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
+import java.nio.charset.Charset;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
@@ -33,8 +32,8 @@ import org.apache.jena.hadoop.rdf.io.input.AbstractNodeTupleInputFormatTests;
import org.apache.jena.hadoop.rdf.types.QuadWritable;
import org.apache.jena.riot.Lang;
import org.apache.jena.riot.RDFDataMgr;
-import org.apache.jena.riot.RDFWriterRegistry;
-
+import org.apache.jena.riot.RDFWriterRegistry;
+
import com.hp.hpl.jena.query.Dataset;
import com.hp.hpl.jena.query.DatasetFactory;
import com.hp.hpl.jena.rdf.model.Model;
@@ -49,7 +48,9 @@ import com.hp.hpl.jena.sparql.core.Quad;
*
*/
public abstract class AbstractCompressedWholeFileQuadInputFormatTests extends
- AbstractNodeTupleInputFormatTests<Quad, QuadWritable> {
+ AbstractNodeTupleInputFormatTests<Quad, QuadWritable> {
+
+ private static final Charset utf8 = Charset.forName("utf-8");
@Override
protected Configuration prepareConfiguration() {
@@ -59,14 +60,13 @@ public abstract class AbstractCompressedWholeFileQuadInputFormatTests extends
}
@Override
- protected Writer getWriter(File f) throws IOException {
+ protected OutputStream getOutputStream(File f) throws IOException {
CompressionCodec codec = this.getCompressionCodec();
if (codec instanceof Configurable) {
((Configurable) codec).setConf(this.prepareConfiguration());
}
FileOutputStream fileOutput = new FileOutputStream(f, false);
- OutputStream output = codec.createOutputStream(fileOutput);
- return new OutputStreamWriter(output);
+ return codec.createOutputStream(fileOutput);
}
/**
@@ -85,9 +85,8 @@ public abstract class AbstractCompressedWholeFileQuadInputFormatTests extends
return false;
}
- @SuppressWarnings("deprecation")
- private void writeTuples(Dataset ds, Writer writer) {
- RDFDataMgr.write(writer, ds, RDFWriterRegistry.defaultSerialization(this.getRdfLanguage()));
+ private void writeTuples(Dataset ds, OutputStream output) {
+ RDFDataMgr.write(output, ds, RDFWriterRegistry.defaultSerialization(this.getRdfLanguage()));
}
/**
@@ -97,7 +96,7 @@ public abstract class AbstractCompressedWholeFileQuadInputFormatTests extends
*/
protected abstract Lang getRdfLanguage();
- private void writeGoodTuples(Writer writer, int num) throws IOException {
+ private void writeGoodTuples(OutputStream output, int num) throws IOException {
Dataset ds = DatasetFactory.createMem();
Model m = ModelFactory.createDefaultModel();
Resource currSubj = m.createResource("http://example.org/subjects/0");
@@ -115,35 +114,37 @@ public abstract class AbstractCompressedWholeFileQuadInputFormatTests extends
if (!m.isEmpty()) {
ds.addNamedModel("http://example.org/graphs/extra", m);
}
- this.writeTuples(ds, writer);
+ this.writeTuples(ds, output);
}
@Override
- protected final void generateTuples(Writer writer, int num) throws IOException {
- this.writeGoodTuples(writer, num);
- writer.close();
+ protected final void generateTuples(OutputStream output, int num) throws IOException {
+ this.writeGoodTuples(output, num);
+ output.close();
}
@Override
- protected final void generateMixedTuples(Writer writer, int num) throws IOException {
+ protected final void generateMixedTuples(OutputStream output, int num) throws IOException {
// Write good data
- this.writeGoodTuples(writer, num / 2);
+ this.writeGoodTuples(output, num / 2);
- // Write junk data
+ // Write junk data
+ byte[] junk = "junk data\n".getBytes(utf8);
for (int i = 0; i < num / 2; i++) {
- writer.write("junk data\n");
+ output.write(junk);
}
- writer.flush();
- writer.close();
+ output.flush();
+ output.close();
}
@Override
- protected final void generateBadTuples(Writer writer, int num) throws IOException {
+ protected final void generateBadTuples(OutputStream output, int num) throws IOException {
+ byte[] junk = "junk data\n".getBytes(utf8);
for (int i = 0; i < num; i++) {
- writer.write("junk data\n");
+ output.write(junk);
}
- writer.flush();
- writer.close();
+ output.flush();
+ output.close();
}
}