You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by rv...@apache.org on 2015/01/27 18:28:39 UTC
[46/59] [abbrv] jena git commit: Further rebranding to Elephas
http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/registry/writers/TriGWriterFactory.java
----------------------------------------------------------------------
diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/registry/writers/TriGWriterFactory.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/registry/writers/TriGWriterFactory.java
new file mode 100644
index 0000000..6d8b08a
--- /dev/null
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/registry/writers/TriGWriterFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jena.hadoop.rdf.io.registry.writers;
+
+import java.io.IOException;
+import java.io.Writer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.jena.hadoop.rdf.io.output.writers.StreamRdfQuadWriter;
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+import org.apache.jena.riot.Lang;
+import org.apache.jena.riot.writer.WriterStreamRDFBlocks;
+
+/**
+ *
+ */
+public class TriGWriterFactory extends AbstractQuadsOnlyWriterFactory {
+
+ public TriGWriterFactory() {
+ super(Lang.TRIG);
+ }
+
+ @Override
+ public <TKey> RecordWriter<TKey, QuadWritable> createQuadWriter(Writer writer, Configuration config)
+ throws IOException {
+ return new StreamRdfQuadWriter<TKey>(new WriterStreamRDFBlocks(writer), writer);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/registry/writers/TriXWriterFactory.java
----------------------------------------------------------------------
diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/registry/writers/TriXWriterFactory.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/registry/writers/TriXWriterFactory.java
new file mode 100644
index 0000000..0e1b7b2
--- /dev/null
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/registry/writers/TriXWriterFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jena.hadoop.rdf.io.registry.writers;
+
import java.io.IOException;
import java.io.Writer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

import org.apache.commons.io.output.WriterOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.jena.hadoop.rdf.io.output.writers.StreamRdfQuadWriter;
import org.apache.jena.hadoop.rdf.types.QuadWritable;
import org.apache.jena.riot.Lang;
import org.apache.jena.riot.writer.StreamWriterTriX;
+
+/**
+ *
+ */
+public class TriXWriterFactory extends AbstractQuadsOnlyWriterFactory {
+
+ public TriXWriterFactory() {
+ super(Lang.TRIX);
+ }
+
+ @Override
+ public <TKey> RecordWriter<TKey, QuadWritable> createQuadWriter(Writer writer, Configuration config)
+ throws IOException {
+ return new StreamRdfQuadWriter<>(new StreamWriterTriX(new WriterOutputStream(writer, Charset.forName("utf-8"))), writer);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/registry/writers/TurtleWriterFactory.java
----------------------------------------------------------------------
diff --git a/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/registry/writers/TurtleWriterFactory.java b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/registry/writers/TurtleWriterFactory.java
new file mode 100644
index 0000000..c837f12
--- /dev/null
+++ b/jena-elephas/jena-elephas-io/src/main/java/org/apache/jena/hadoop/rdf/io/registry/writers/TurtleWriterFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jena.hadoop.rdf.io.registry.writers;
+
+import java.io.IOException;
+import java.io.Writer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.jena.hadoop.rdf.io.output.writers.StreamRdfTripleWriter;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+import org.apache.jena.riot.Lang;
+import org.apache.jena.riot.writer.WriterStreamRDFBlocks;
+
+/**
+ *
+ */
+public class TurtleWriterFactory extends AbstractTriplesOnlyWriterFactory {
+
+ public TurtleWriterFactory() {
+ super(Lang.TURTLE, Lang.TTL, Lang.N3);
+ }
+
+ @Override
+ public <TKey> RecordWriter<TKey, TripleWritable> createTripleWriter(Writer writer, Configuration config)
+ throws IOException {
+ return new StreamRdfTripleWriter<>(new WriterStreamRDFBlocks(writer), writer);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/resources/META-INF/services/org.apache.jena.hadoop.rdf.io.registry.ReaderFactory
----------------------------------------------------------------------
diff --git a/jena-elephas/jena-elephas-io/src/main/resources/META-INF/services/org.apache.jena.hadoop.rdf.io.registry.ReaderFactory b/jena-elephas/jena-elephas-io/src/main/resources/META-INF/services/org.apache.jena.hadoop.rdf.io.registry.ReaderFactory
new file mode 100644
index 0000000..ec0e48a
--- /dev/null
+++ b/jena-elephas/jena-elephas-io/src/main/resources/META-INF/services/org.apache.jena.hadoop.rdf.io.registry.ReaderFactory
@@ -0,0 +1,10 @@
+# Default Reader Factory implementations
+org.apache.jena.hadoop.rdf.io.registry.readers.JsonLDReaderFactory
+org.apache.jena.hadoop.rdf.io.registry.readers.NQuadsReaderFactory
+org.apache.jena.hadoop.rdf.io.registry.readers.NTriplesReaderFactory
+org.apache.jena.hadoop.rdf.io.registry.readers.RdfJsonReaderFactory
+org.apache.jena.hadoop.rdf.io.registry.readers.RdfXmlReaderFactory
+org.apache.jena.hadoop.rdf.io.registry.readers.ThriftReaderFactory
+org.apache.jena.hadoop.rdf.io.registry.readers.TriGReaderFactory
+org.apache.jena.hadoop.rdf.io.registry.readers.TriXReaderFactory
+org.apache.jena.hadoop.rdf.io.registry.readers.TurtleReaderFactory
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/main/resources/META-INF/services/org.apache.jena.hadoop.rdf.io.registry.WriterFactory
----------------------------------------------------------------------
diff --git a/jena-elephas/jena-elephas-io/src/main/resources/META-INF/services/org.apache.jena.hadoop.rdf.io.registry.WriterFactory b/jena-elephas/jena-elephas-io/src/main/resources/META-INF/services/org.apache.jena.hadoop.rdf.io.registry.WriterFactory
new file mode 100644
index 0000000..164880d
--- /dev/null
+++ b/jena-elephas/jena-elephas-io/src/main/resources/META-INF/services/org.apache.jena.hadoop.rdf.io.registry.WriterFactory
@@ -0,0 +1,10 @@
+# Default Writer Factory implementations
+org.apache.jena.hadoop.rdf.io.registry.writers.JsonLDWriterFactory
+org.apache.jena.hadoop.rdf.io.registry.writers.NQuadsWriterFactory
+org.apache.jena.hadoop.rdf.io.registry.writers.NTriplesWriterFactory
+org.apache.jena.hadoop.rdf.io.registry.writers.RdfJsonWriterFactory
+org.apache.jena.hadoop.rdf.io.registry.writers.RdfXmlWriterFactory
+org.apache.jena.hadoop.rdf.io.registry.writers.ThriftWriterFactory
+org.apache.jena.hadoop.rdf.io.registry.writers.TriGWriterFactory
+org.apache.jena.hadoop.rdf.io.registry.writers.TriXWriterFactory
+org.apache.jena.hadoop.rdf.io.registry.writers.TurtleWriterFactory
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/RdfTriplesInputTestMapper.java
----------------------------------------------------------------------
diff --git a/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/RdfTriplesInputTestMapper.java b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/RdfTriplesInputTestMapper.java
new file mode 100644
index 0000000..5762fb7
--- /dev/null
+++ b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/RdfTriplesInputTestMapper.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+import org.apache.log4j.Logger;
+
+
+/**
+ * A test mapper which takes in line based RDF triple input and just produces triples
+ *
+ *
+ */
+public class RdfTriplesInputTestMapper extends Mapper<LongWritable, TripleWritable, NullWritable, TripleWritable> {
+
+ private static final Logger LOG = Logger.getLogger(RdfTriplesInputTestMapper.class);
+
+ @Override
+ protected void map(LongWritable key, TripleWritable value, Context context)
+ throws IOException, InterruptedException {
+ LOG.info("Line " + key.toString() + " => " + value.toString());
+ context.write(NullWritable.get(), value);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractBlockedQuadInputFormatTests.java
----------------------------------------------------------------------
diff --git a/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractBlockedQuadInputFormatTests.java b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractBlockedQuadInputFormatTests.java
new file mode 100644
index 0000000..1cda0bd
--- /dev/null
+++ b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractBlockedQuadInputFormatTests.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input;
+
+/**
+ * Abstract tests for blocked triple input formats
+ *
+ *
+ *
+ */
+public abstract class AbstractBlockedQuadInputFormatTests extends AbstractWholeFileQuadInputFormatTests {
+
+ @Override
+ protected boolean canSplitInputs() {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractBlockedTripleInputFormatTests.java
----------------------------------------------------------------------
diff --git a/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractBlockedTripleInputFormatTests.java b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractBlockedTripleInputFormatTests.java
new file mode 100644
index 0000000..2e1e865
--- /dev/null
+++ b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractBlockedTripleInputFormatTests.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input;
+
+/**
+ * Abstract tests for blocked triple input formats
+ *
+ *
+ *
+ */
+public abstract class AbstractBlockedTripleInputFormatTests extends AbstractWholeFileTripleInputFormatTests {
+
+ @Override
+ protected boolean canSplitInputs() {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractNodeTupleInputFormatTests.java
----------------------------------------------------------------------
diff --git a/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractNodeTupleInputFormatTests.java b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractNodeTupleInputFormatTests.java
new file mode 100644
index 0000000..e22650f
--- /dev/null
+++ b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractNodeTupleInputFormatTests.java
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.jena.hadoop.rdf.io.HadoopIOConstants;
+import org.apache.jena.hadoop.rdf.io.RdfIOConstants;
+import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract node tuple input format tests
+ *
+ *
+ *
+ * @param <TValue>
+ * @param <T>
+ */
+public abstract class AbstractNodeTupleInputFormatTests<TValue, T extends AbstractNodeTupleWritable<TValue>> {
+
    private static final Logger LOG = LoggerFactory.getLogger(AbstractNodeTupleInputFormatTests.class);

    // Sizes (in tuples) of each generated test input
    protected static final int EMPTY_SIZE = 0, SMALL_SIZE = 100, LARGE_SIZE = 10000, BAD_SIZE = 100, MIXED_SIZE = 100;
    // Base filenames for the generated test inputs, the format specific
    // extension from getFileExtension() is appended to each
    protected static final String EMPTY = "empty";
    protected static final String SMALL = "small";
    protected static final String LARGE = "large";
    protected static final String BAD = "bad";
    protected static final String MIXED = "mixed";

    /**
     * Temporary folder for the tests
     */
    @Rule
    public TemporaryFolder folder = new TemporaryFolder();

    // Generated input files, created by prepareInputs() before each test and
    // deleted after it
    protected File empty, small, large, bad, mixed;
+
+ /**
+ * Prepares the inputs for the tests
+ *
+ * @throws IOException
+ */
+ @Before
+ public void beforeTest() throws IOException {
+ this.prepareInputs();
+ }
+
+ /**
+ * Cleans up the inputs after each test
+ */
+ @After
+ public void afterTest() {
+ // Should be unnecessary since JUnit will clean up the temporary folder
+ // anyway but best to do this regardless
+ if (empty != null)
+ empty.delete();
+ if (small != null)
+ small.delete();
+ if (large != null)
+ large.delete();
+ if (bad != null)
+ bad.delete();
+ if (mixed != null)
+ mixed.delete();
+ }
+
+ /**
+ * Prepares a fresh configuration
+ *
+ * @return Configuration
+ */
+ protected Configuration prepareConfiguration() {
+ Configuration config = new Configuration(true);
+ // Nothing else to do
+ return config;
+ }
+
+ /**
+ * Prepares the inputs
+ *
+ * @throws IOException
+ */
+ protected void prepareInputs() throws IOException {
+ String ext = this.getFileExtension();
+ empty = folder.newFile(EMPTY + ext);
+ this.generateTuples(empty, EMPTY_SIZE);
+ small = folder.newFile(SMALL + ext);
+ this.generateTuples(small, SMALL_SIZE);
+ large = folder.newFile(LARGE + ext);
+ this.generateTuples(large, LARGE_SIZE);
+ bad = folder.newFile(BAD + ext);
+ this.generateBadTuples(bad, BAD_SIZE);
+ mixed = folder.newFile(MIXED + ext);
+ this.generateMixedTuples(mixed, MIXED_SIZE);
+ }
+
+ /**
+ * Gets the extra file extension to add to the filenames
+ *
+ * @return File extension
+ */
+ protected abstract String getFileExtension();
+
+ /**
+ * Generates tuples used for tests
+ *
+ * @param f
+ * File
+ * @param num
+ * Number of tuples to generate
+ * @throws IOException
+ */
+ protected final void generateTuples(File f, int num) throws IOException {
+ this.generateTuples(this.getOutputStream(f), num);
+ }
+
+ /**
+ * Gets the output stream to use for generating tuples
+ *
+ * @param f
+ * File
+ * @return Output Stream
+ * @throws IOException
+ */
+ protected OutputStream getOutputStream(File f) throws IOException {
+ return new FileOutputStream(f, false);
+ }
+
+ /**
+ * Generates tuples used for tests
+ *
+ * @param output
+ * Output Stream to write to
+ * @param num
+ * Number of tuples to generate
+ * @throws IOException
+ */
+ protected abstract void generateTuples(OutputStream output, int num) throws IOException;
+
+ /**
+ * Generates bad tuples used for tests
+ *
+ * @param f
+ * File
+ * @param num
+ * Number of bad tuples to generate
+ * @throws IOException
+ */
+ protected final void generateBadTuples(File f, int num) throws IOException {
+ this.generateBadTuples(this.getOutputStream(f), num);
+ }
+
+ /**
+ * Generates bad tuples used for tests
+ *
+ * @param output
+ * Output Stream to write to
+ * @param num
+ * Number of bad tuples to generate
+ * @throws IOException
+ */
+ protected abstract void generateBadTuples(OutputStream output, int num) throws IOException;
+
+ /**
+ * Generates a mixture of good and bad tuples used for tests
+ *
+ * @param f
+ * File
+ * @param num
+ * Number of tuples to generate, they should be a 50/50 mix of
+ * good and bad tuples
+ * @throws IOException
+ */
+ protected final void generateMixedTuples(File f, int num) throws IOException {
+ this.generateMixedTuples(this.getOutputStream(f), num);
+ }
+
+ /**
+ * Generates a mixture of good and bad tuples used for tests
+ *
+ * @param output
+ * Output Stream to write to
+ * @param num
+ * Number of tuples to generate, they should be a 50/50 mix of
+ * good and bad tuples
+ * @throws IOException
+ */
+ protected abstract void generateMixedTuples(OutputStream output, int num) throws IOException;
+
+ /**
+ * Adds an input path to the job configuration
+ *
+ * @param f
+ * File
+ * @param config
+ * Configuration
+ * @param job
+ * Job
+ * @throws IOException
+ */
+ protected void addInputPath(File f, Configuration config, Job job) throws IOException {
+ FileSystem fs = FileSystem.getLocal(config);
+ Path inputPath = fs.makeQualified(new Path(f.getAbsolutePath()));
+ FileInputFormat.addInputPath(job, inputPath);
+ }
+
    /**
     * Reads the given record reader to exhaustion, validating the reported
     * progress before, during and after reading, and returns the number of
     * key-value pairs read.
     *
     * @param reader
     *            Record reader to consume
     * @return Number of tuples read
     * @throws IOException
     * @throws InterruptedException
     */
    protected final int countTuples(RecordReader<LongWritable, T> reader) throws IOException, InterruptedException {
        int count = 0;

        // Check initial progress: before the first nextKeyValue() a reader
        // must report either 0.0 (data pending) or 1.0 (nothing to read)
        LOG.info(String.format("Initial Reported Progress %f", reader.getProgress()));
        float progress = reader.getProgress();
        if (Float.compare(0.0f, progress) == 0) {
            Assert.assertEquals(0.0d, reader.getProgress(), 0.0d);
        } else if (Float.compare(1.0f, progress) == 0) {
            // If reader is reported 1.0 straight away then we expect there to
            // be no key values
            Assert.assertEquals(1.0d, reader.getProgress(), 0.0d);
            Assert.assertFalse(reader.nextKeyValue());
        } else {
            Assert.fail(String.format(
                    "Expected progress of 0.0 or 1.0 before reader has been accessed for first time but got %f",
                    progress));
        }

        // Count tuples, asserting after each read that progress stays in the
        // half-open range (0.0, 1.0]
        boolean debug = LOG.isDebugEnabled();
        while (reader.nextKeyValue()) {
            count++;
            progress = reader.getProgress();
            if (debug)
                LOG.debug(String.format("Current Reported Progress %f", progress));
            Assert.assertTrue(String.format("Progress should be in the range 0.0 < p <= 1.0 but got %f", progress),
                    progress > 0.0f && progress <= 1.0f);
        }
        reader.close();
        LOG.info(String.format("Got %d tuples from this record reader", count));

        // Check final progress
        // NOTE(review): getProgress() is called after close(); this relies on
        // readers still reporting 1.0 once closed — confirm against the
        // reader implementations under test
        LOG.info(String.format("Final Reported Progress %f", reader.getProgress()));
        Assert.assertEquals(1.0d, reader.getProgress(), 0.0d);

        return count;
    }

    /**
     * Asserts that the given reader yields exactly the expected number of
     * tuples
     *
     * @param reader
     *            Record reader to consume
     * @param expected
     *            Expected tuple count
     * @throws IOException
     * @throws InterruptedException
     */
    protected final void checkTuples(RecordReader<LongWritable, T> reader, int expected) throws IOException,
            InterruptedException {
        Assert.assertEquals(expected, this.countTuples(reader));
    }
+
    /**
     * Runs a test with a single input using a fresh default configuration
     *
     * @param input
     *            Input
     * @param expectedSplits
     *            Expected number of splits
     * @param expectedTuples
     *            Expected tuples
     * @throws IOException
     * @throws InterruptedException
     */
    protected final void testSingleInput(File input, int expectedSplits, int expectedTuples) throws IOException,
            InterruptedException {
        // Prepare configuration
        Configuration config = this.prepareConfiguration();
        this.testSingleInput(config, input, expectedSplits, expectedTuples);
    }

    /**
     * Runs a test with a single input using the supplied configuration,
     * checking both the number of splits produced and the number of tuples
     * read back from each split
     *
     * @param config
     *            Configuration
     * @param input
     *            Input
     * @param expectedSplits
     *            Expected number of splits
     * @param expectedTuples
     *            Expected tuples
     * @throws IOException
     * @throws InterruptedException
     */
    protected final void testSingleInput(Configuration config, File input, int expectedSplits, int expectedTuples)
            throws IOException, InterruptedException {
        // Set up fake job
        InputFormat<LongWritable, T> inputFormat = this.getInputFormat();
        Job job = Job.getInstance(config);
        job.setInputFormatClass(inputFormat.getClass());
        this.addInputPath(input, job.getConfiguration(), job);
        JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID());
        Assert.assertEquals(1, FileInputFormat.getInputPaths(context).length);
        // Large enough lines-per-split that line based formats produce a
        // single split for even the largest test input
        NLineInputFormat.setNumLinesPerSplit(job, LARGE_SIZE);

        // Check splits
        List<InputSplit> splits = inputFormat.getSplits(context);
        Assert.assertEquals(expectedSplits, splits.size());

        // Check tuples
        // NOTE(review): each split is individually expected to yield
        // expectedTuples; presumably callers only pass inputs producing at
        // most one split — confirm against the test cases
        for (InputSplit split : splits) {
            TaskAttemptContext taskContext = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
            RecordReader<LongWritable, T> reader = inputFormat.createRecordReader(split, taskContext);
            reader.initialize(split, taskContext);
            this.checkTuples(reader, expectedTuples);
        }
    }

    /**
     * Gets the input format instance under test
     *
     * @return Input format
     */
    protected abstract InputFormat<LongWritable, T> getInputFormat();
+
    /**
     * Tuples input test with an empty input, expects no splits if the format
     * can split inputs and a single (empty) split otherwise
     *
     * @throws IOException
     * @throws ClassNotFoundException
     * @throws InterruptedException
     */
    @Test
    public final void single_input_01() throws IOException, InterruptedException, ClassNotFoundException {
        testSingleInput(empty, this.canSplitInputs() ? 0 : 1, EMPTY_SIZE);
    }

    /**
     * Tuples input test with a small input file
     *
     * @throws IOException
     * @throws ClassNotFoundException
     * @throws InterruptedException
     */
    @Test
    public final void single_input_02() throws IOException, InterruptedException, ClassNotFoundException {
        testSingleInput(small, 1, SMALL_SIZE);
    }

    /**
     * Tuples input test with a large input file
     *
     * @throws IOException
     * @throws ClassNotFoundException
     * @throws InterruptedException
     */
    @Test
    public final void single_input_03() throws IOException, InterruptedException, ClassNotFoundException {
        testSingleInput(large, 1, LARGE_SIZE);
    }

    /**
     * Tuples input test with a wholly bad input, expects zero tuples since
     * bad tuples are ignored by default
     *
     * @throws IOException
     * @throws ClassNotFoundException
     * @throws InterruptedException
     */
    @Test
    public final void single_input_04() throws IOException, InterruptedException, ClassNotFoundException {
        testSingleInput(bad, 1, 0);
    }

    /**
     * Tuples input test with a 50/50 mix of good and bad tuples, expects only
     * the good half to be read
     *
     * @throws IOException
     * @throws ClassNotFoundException
     * @throws InterruptedException
     */
    @Test
    public final void single_input_05() throws IOException, InterruptedException, ClassNotFoundException {
        testSingleInput(mixed, 1, MIXED_SIZE / 2);
    }

    /**
     * Tests behaviour when ignoring bad tuples is disabled, reading a wholly
     * bad input should then raise an IOException
     *
     * @throws InterruptedException
     * @throws IOException
     */
    @Test(expected = IOException.class)
    public final void fail_on_bad_input_01() throws IOException, InterruptedException {
        Configuration config = this.prepareConfiguration();
        config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
        Assert.assertFalse(config.getBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, true));
        testSingleInput(config, bad, 1, 0);
    }

    /**
     * Tests behaviour when ignoring bad tuples is disabled, reading a mixed
     * input should then raise an IOException
     *
     * @throws InterruptedException
     * @throws IOException
     */
    @Test(expected = IOException.class)
    public final void fail_on_bad_input_02() throws IOException, InterruptedException {
        Configuration config = this.prepareConfiguration();
        config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
        Assert.assertFalse(config.getBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, true));
        testSingleInput(config, mixed, 1, MIXED_SIZE / 2);
    }
+
    /**
     * Runs a multiple input test, checking the total number of splits across
     * all inputs and the total number of tuples read back
     *
     * @param inputs
     *            Inputs
     * @param expectedSplits
     *            Number of splits expected
     * @param expectedTuples
     *            Number of tuples expected
     * @throws IOException
     * @throws InterruptedException
     */
    protected final void testMultipleInputs(File[] inputs, int expectedSplits, int expectedTuples) throws IOException,
            InterruptedException {
        // Prepare configuration and inputs
        Configuration config = this.prepareConfiguration();

        // Set up fake job
        InputFormat<LongWritable, T> inputFormat = this.getInputFormat();
        Job job = Job.getInstance(config);
        job.setInputFormatClass(inputFormat.getClass());
        for (File input : inputs) {
            this.addInputPath(input, job.getConfiguration(), job);
        }
        JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID());
        Assert.assertEquals(inputs.length, FileInputFormat.getInputPaths(context).length);
        NLineInputFormat.setNumLinesPerSplit(job, expectedTuples);

        // Check splits
        List<InputSplit> splits = inputFormat.getSplits(context);
        Assert.assertEquals(expectedSplits, splits.size());

        // Check tuples summed over all splits
        int count = 0;
        for (InputSplit split : splits) {
            TaskAttemptContext taskContext = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
            RecordReader<LongWritable, T> reader = inputFormat.createRecordReader(split, taskContext);
            reader.initialize(split, taskContext);
            count += this.countTuples(reader);
        }
        Assert.assertEquals(expectedTuples, count);
    }

    /**
     * Tuples test with multiple explicit input files (empty, small and large)
     *
     * @throws IOException
     * @throws ClassNotFoundException
     * @throws InterruptedException
     */
    @Test
    public final void multiple_inputs_01() throws IOException, InterruptedException, ClassNotFoundException {
        testMultipleInputs(new File[] { empty, small, large }, this.canSplitInputs() ? 2 : 3, EMPTY_SIZE + SMALL_SIZE
                + LARGE_SIZE);
    }

    /**
     * Tuples test using the whole temporary folder as input, which picks up
     * every generated file including the bad and mixed ones
     *
     * @throws IOException
     * @throws ClassNotFoundException
     * @throws InterruptedException
     */
    @Test
    public final void multiple_inputs_02() throws IOException, InterruptedException, ClassNotFoundException {
        testMultipleInputs(new File[] { folder.getRoot() }, this.canSplitInputs() ? 4 : 5, EMPTY_SIZE + SMALL_SIZE
                + LARGE_SIZE + (MIXED_SIZE / 2));
    }
+
+ /**
+ * Tests that the given inputs are split into the expected number of
+ * splits, that each split is valid (per {@code isValidSplit()}) and that
+ * the expected total number of tuples can be read back
+ *
+ * @param config
+ * Configuration used to create the fake job
+ * @param inputs
+ * Input files
+ * @param expectedSplits
+ * Expected number of splits
+ * @param expectedTuples
+ * Expected total number of tuples across all splits
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ protected final void testSplitInputs(Configuration config, File[] inputs, int expectedSplits, int expectedTuples)
+ throws IOException, InterruptedException {
+ // Set up fake job
+ InputFormat<LongWritable, T> inputFormat = this.getInputFormat();
+ Job job = Job.getInstance(config);
+ job.setInputFormatClass(inputFormat.getClass());
+ for (File input : inputs) {
+ this.addInputPath(input, job.getConfiguration(), job);
+ }
+ JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID());
+ // Sanity check that every input was registered with the job
+ Assert.assertEquals(inputs.length, FileInputFormat.getInputPaths(context).length);
+
+ // Check the expected number of splits was produced
+ List<InputSplit> splits = inputFormat.getSplits(context);
+ Assert.assertEquals(expectedSplits, splits.size());
+
+ // Check tuples
+ int count = 0;
+ for (InputSplit split : splits) {
+ // Validate split
+ Assert.assertTrue(this.isValidSplit(split, config));
+
+ // Read split and accumulate the tuple count
+ TaskAttemptContext taskContext = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
+ RecordReader<LongWritable, T> reader = inputFormat.createRecordReader(split, taskContext);
+ reader.initialize(split, taskContext);
+ count += this.countTuples(reader);
+ }
+ Assert.assertEquals(expectedTuples, count);
+ }
+
+ /**
+ * Determines whether an input split is valid
+ *
+ * @param split
+ * Input split
+ * @param config
+ * Configuration
+ * @return True if a valid split, false otherwise
+ * @throws IOException
+ */
+ protected boolean isValidSplit(InputSplit split, Configuration config) throws IOException {
+ // By default any file based split is considered valid
+ return split instanceof FileSplit;
+ }
+
+ /**
+ * Indicates whether inputs can be split, defaults to true
+ * <p>
+ * Whole-file format test classes override this to return false since
+ * their inputs must be consumed as a single unit
+ * </p>
+ *
+ * @return Whether inputs can be split
+ */
+ protected boolean canSplitInputs() {
+ return true;
+ }
+
+ /**
+ * Tests input splitting without explicitly configuring the number of
+ * lines per map
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ * @throws ClassNotFoundException
+ */
+ @Test
+ public final void split_input_01() throws IOException, InterruptedException, ClassNotFoundException {
+ Assume.assumeTrue(this.canSplitInputs());
+
+ Configuration config = this.prepareConfiguration();
+ config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
+ // Verify no maximum line length restriction is configured
+ Assert.assertEquals(Integer.MAX_VALUE, config.getInt(HadoopIOConstants.MAX_LINE_LENGTH, Integer.MAX_VALUE));
+ // 100 splits expected for the small input - presumably one per tuple
+ // given SMALL_SIZE; confirm against the size constants
+ this.testSplitInputs(config, new File[] { small }, 100, SMALL_SIZE);
+ }
+
+ /**
+ * Tests input splitting with 10 lines per map configured for the small
+ * input
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ * @throws ClassNotFoundException
+ */
+ @Test
+ public final void split_input_02() throws IOException, InterruptedException, ClassNotFoundException {
+ Assume.assumeTrue(this.canSplitInputs());
+
+ Configuration config = this.prepareConfiguration();
+ config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
+ config.setLong(NLineInputFormat.LINES_PER_MAP, 10);
+ // Verify no maximum line length restriction is configured
+ Assert.assertEquals(Integer.MAX_VALUE, config.getInt(HadoopIOConstants.MAX_LINE_LENGTH, Integer.MAX_VALUE));
+ this.testSplitInputs(config, new File[] { small }, 10, SMALL_SIZE);
+ }
+
+ /**
+ * Tests input splitting with 100 lines per map configured for the large
+ * input
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ * @throws ClassNotFoundException
+ */
+ @Test
+ public final void split_input_03() throws IOException, InterruptedException, ClassNotFoundException {
+ Assume.assumeTrue(this.canSplitInputs());
+
+ Configuration config = this.prepareConfiguration();
+ config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
+ config.setLong(NLineInputFormat.LINES_PER_MAP, 100);
+ // Verify no maximum line length restriction is configured
+ Assert.assertEquals(Integer.MAX_VALUE, config.getInt(HadoopIOConstants.MAX_LINE_LENGTH, Integer.MAX_VALUE));
+ this.testSplitInputs(config, new File[] { large }, 100, LARGE_SIZE);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractQuadsInputFormatTests.java
----------------------------------------------------------------------
diff --git a/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractQuadsInputFormatTests.java b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractQuadsInputFormatTests.java
new file mode 100644
index 0000000..78d7f33
--- /dev/null
+++ b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractQuadsInputFormatTests.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+
+import com.hp.hpl.jena.sparql.core.Quad;
+
+/**
+ * Abstract tests for Quad input formats whose test data is line-based
+ * quads (NQuads style: subject, predicate, object, graph, dot)
+ */
+public abstract class AbstractQuadsInputFormatTests extends AbstractNodeTupleInputFormatTests<Quad, QuadWritable> {
+
+ // Charset used to encode all generated test data
+ private static final Charset utf8 = Charset.forName("utf-8");
+
+ @Override
+ protected void generateTuples(OutputStream output, int num) throws IOException {
+ // One well-formed quad per line; the subject, object literal and
+ // graph are all derived from the loop index so every quad is distinct
+ for (int i = 0; i < num; i++) {
+ output.write(("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" <http://graphs/" + i + "> .\n").getBytes(utf8));
+ }
+ output.flush();
+ output.close();
+ }
+
+ @Override
+ protected void generateBadTuples(OutputStream output, int num) throws IOException {
+ // Each line is deliberately malformed (unterminated URI) so every
+ // tuple fails to parse
+ for (int i = 0; i < num; i++) {
+ output.write("<http://broken\n".getBytes(utf8));
+ }
+ output.flush();
+ output.close();
+ }
+
+ @Override
+ protected void generateMixedTuples(OutputStream output, int num) throws IOException {
+ // Alternate good and bad tuples so exactly half the lines are valid
+ boolean bad = false;
+ for (int i = 0; i < num; i++, bad = !bad) {
+ if (bad) {
+ output.write("<http://broken\n".getBytes(utf8));
+ } else {
+ output.write(("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" <http://graphs/" + i + "> .\n").getBytes(utf8));
+ }
+ }
+ output.flush();
+ output.close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractTriplesInputFormatTests.java
----------------------------------------------------------------------
diff --git a/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractTriplesInputFormatTests.java b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractTriplesInputFormatTests.java
new file mode 100644
index 0000000..65a9889
--- /dev/null
+++ b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractTriplesInputFormatTests.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+
+import com.hp.hpl.jena.graph.Triple;
+
+/**
+ * Abstract tests for Triple input formats whose test data is line-based
+ * triples (NTriples style: subject, predicate, object, dot)
+ */
+public abstract class AbstractTriplesInputFormatTests extends AbstractNodeTupleInputFormatTests<Triple, TripleWritable> {
+
+ // Charset used to encode all generated test data
+ private static final Charset utf8 = Charset.forName("utf-8");
+
+ @Override
+ protected void generateTuples(OutputStream output, int num) throws IOException {
+ // One well-formed triple per line; subject and object literal are
+ // derived from the loop index so every triple is distinct
+ for (int i = 0; i < num; i++) {
+ output.write(("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" .\n").getBytes(utf8));
+ }
+ output.flush();
+ output.close();
+ }
+
+ @Override
+ protected void generateBadTuples(OutputStream output, int num) throws IOException {
+ // Encode the malformed line (unterminated URI) once and repeat it
+ byte[] junk = "<http://broken\n".getBytes(utf8);
+ for (int i = 0; i < num; i++) {
+ output.write(junk);
+ }
+ output.flush();
+ output.close();
+ }
+
+ @Override
+ protected void generateMixedTuples(OutputStream output, int num) throws IOException {
+ // Alternate good and bad tuples so exactly half the lines are valid
+ boolean bad = false;
+ for (int i = 0; i < num; i++, bad = !bad) {
+ if (bad) {
+ output.write("<http://broken\n".getBytes(utf8));
+ } else {
+ output.write(("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" .\n").getBytes(utf8));
+ }
+ }
+ output.flush();
+ output.close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileQuadInputFormatTests.java
----------------------------------------------------------------------
diff --git a/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileQuadInputFormatTests.java b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileQuadInputFormatTests.java
new file mode 100644
index 0000000..0b6cfde
--- /dev/null
+++ b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileQuadInputFormatTests.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+import org.apache.jena.riot.Lang;
+import org.apache.jena.riot.RDFDataMgr;
+import org.apache.jena.riot.RDFWriterRegistry;
+
+import com.hp.hpl.jena.query.Dataset;
+import com.hp.hpl.jena.query.DatasetFactory;
+import com.hp.hpl.jena.rdf.model.Model;
+import com.hp.hpl.jena.rdf.model.ModelFactory;
+import com.hp.hpl.jena.rdf.model.Property;
+import com.hp.hpl.jena.rdf.model.Resource;
+import com.hp.hpl.jena.sparql.core.Quad;
+
+/**
+ * Abstract tests for Quad input formats where the input must be consumed
+ * as a whole file (inputs cannot be split)
+ */
+public abstract class AbstractWholeFileQuadInputFormatTests extends AbstractNodeTupleInputFormatTests<Quad, QuadWritable> {
+
+ // Charset used to encode the junk data
+ private static final Charset utf8 = Charset.forName("utf-8");
+
+ @Override
+ protected boolean canSplitInputs() {
+ // Whole-file formats must see the complete file to parse it
+ return false;
+ }
+
+ /**
+ * Serializes the dataset in the default serialization of the declared
+ * RDF language
+ */
+ private void writeTuples(Dataset ds, OutputStream output) {
+ RDFDataMgr.write(output, ds, RDFWriterRegistry.defaultSerialization(this.getRdfLanguage()));
+ }
+
+ /**
+ * Gets the RDF language to write out generated tuples in
+ *
+ * @return RDF language
+ */
+ protected abstract Lang getRdfLanguage();
+
+ /**
+ * Writes {@code num} well formed quads: a new named graph is started
+ * every 100 triples and a new subject every 10 triples
+ */
+ private void writeGoodTuples(OutputStream output, int num) throws IOException {
+ Dataset ds = DatasetFactory.createMem();
+ Model m = ModelFactory.createDefaultModel();
+ Resource currSubj = m.createResource("http://example.org/subjects/0");
+ Property predicate = m.createProperty("http://example.org/predicate");
+ for (int i = 0; i < num; i++) {
+ if (i % 100 == 0) {
+ ds.addNamedModel("http://example.org/graphs/" + (i / 100), m);
+ m = ModelFactory.createDefaultModel();
+ }
+ if (i % 10 == 0) {
+ currSubj = m.createResource("http://example.org/subjects/" + (i / 10));
+ }
+ m.add(currSubj, predicate, m.createTypedLiteral(i));
+ }
+ // Any remaining triples go into an extra graph
+ if (!m.isEmpty()) {
+ ds.addNamedModel("http://example.org/graphs/extra", m);
+ }
+ this.writeTuples(ds, output);
+ }
+
+ @Override
+ protected final void generateTuples(OutputStream output, int num) throws IOException {
+ this.writeGoodTuples(output, num);
+ output.close();
+ }
+
+ @Override
+ protected final void generateMixedTuples(OutputStream output, int num) throws IOException {
+ // Write good data - only the first half of the requested tuples
+ this.writeGoodTuples(output, num / 2);
+
+ // Write junk data for the second half
+ byte[] junk = "junk data\n".getBytes(utf8);
+ for (int i = 0; i < num / 2; i++) {
+ output.write(junk);
+ }
+
+ output.flush();
+ output.close();
+ }
+
+ @Override
+ protected final void generateBadTuples(OutputStream output, int num) throws IOException {
+ // Every line is junk that cannot parse in any RDF syntax
+ byte[] junk = "junk data\n".getBytes(utf8);
+ for (int i = 0; i < num; i++) {
+ output.write(junk);
+ }
+ output.flush();
+ output.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileTripleInputFormatTests.java
----------------------------------------------------------------------
diff --git a/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileTripleInputFormatTests.java b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileTripleInputFormatTests.java
new file mode 100644
index 0000000..b68d662
--- /dev/null
+++ b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileTripleInputFormatTests.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+import org.apache.jena.riot.Lang;
+import org.apache.jena.riot.RDFDataMgr;
+
+import com.hp.hpl.jena.graph.Triple;
+import com.hp.hpl.jena.rdf.model.Model;
+import com.hp.hpl.jena.rdf.model.ModelFactory;
+import com.hp.hpl.jena.rdf.model.Property;
+import com.hp.hpl.jena.rdf.model.Resource;
+
+/**
+ * Abstract tests for Triple input formats where the input must be consumed
+ * as a whole file (inputs cannot be split)
+ */
+public abstract class AbstractWholeFileTripleInputFormatTests extends AbstractNodeTupleInputFormatTests<Triple, TripleWritable> {
+
+ // Charset used to encode the junk data
+ private static final Charset utf8 = Charset.forName("utf-8");
+
+ @Override
+ protected boolean canSplitInputs() {
+ // Whole-file formats must see the complete file to parse it
+ return false;
+ }
+
+ /**
+ * Serializes the model in the declared RDF language
+ */
+ private void writeTuples(Model m, OutputStream output) {
+ RDFDataMgr.write(output, m, this.getRdfLanguage());
+ }
+
+ /**
+ * Gets the RDF language to write out generated tuples in
+ * @return RDF language
+ */
+ protected abstract Lang getRdfLanguage();
+
+ @Override
+ protected final void generateTuples(OutputStream output, int num) throws IOException {
+ // Build a model of num triples, starting a new subject every 10
+ Model m = ModelFactory.createDefaultModel();
+ Resource currSubj = m.createResource("http://example.org/subjects/0");
+ Property predicate = m.createProperty("http://example.org/predicate");
+ for (int i = 0; i < num; i++) {
+ if (i % 10 == 0) {
+ currSubj = m.createResource("http://example.org/subjects/" + (i / 10));
+ }
+ m.add(currSubj, predicate, m.createTypedLiteral(i));
+ }
+ this.writeTuples(m, output);
+ output.close();
+ }
+
+ @Override
+ protected final void generateMixedTuples(OutputStream output, int num) throws IOException {
+ // Write good data - only the first half of the requested tuples
+ Model m = ModelFactory.createDefaultModel();
+ Resource currSubj = m.createResource("http://example.org/subjects/0");
+ Property predicate = m.createProperty("http://example.org/predicate");
+ for (int i = 0; i < num / 2; i++) {
+ if (i % 10 == 0) {
+ currSubj = m.createResource("http://example.org/subjects/" + (i / 10));
+ }
+ m.add(currSubj, predicate, m.createTypedLiteral(i));
+ }
+ this.writeTuples(m, output);
+
+ // Write junk data for the second half
+ byte[] junk = "junk data\n".getBytes(utf8);
+ for (int i = 0; i < num / 2; i++) {
+ output.write(junk);
+ }
+
+ output.flush();
+ output.close();
+ }
+
+ @Override
+ protected final void generateBadTuples(OutputStream output, int num) throws IOException {
+ // Every line is junk that cannot parse in any RDF syntax
+ byte[] junk = "junk data\n".getBytes(utf8);
+ for (int i = 0; i < num; i++) {
+ output.write(junk);
+ }
+ output.flush();
+ output.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/bnodes/AbstractBlankNodeTests.java
----------------------------------------------------------------------
diff --git a/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/bnodes/AbstractBlankNodeTests.java b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/bnodes/AbstractBlankNodeTests.java
new file mode 100644
index 0000000..4bb0939
--- /dev/null
+++ b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/bnodes/AbstractBlankNodeTests.java
@@ -0,0 +1,636 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jena.hadoop.rdf.io.input.bnodes;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.attribute.FileAttribute;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.jena.hadoop.rdf.io.RdfIOConstants;
+import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable;
+import org.apache.jena.riot.system.ParserProfile;
+import org.apache.log4j.BasicConfigurator;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.hp.hpl.jena.graph.Node;
+import com.hp.hpl.jena.graph.NodeFactory;
+
+/**
+ * Test case that embodies the scenario described in JENA-820
+ */
+@SuppressWarnings("unused")
+public abstract class AbstractBlankNodeTests<T, TValue extends AbstractNodeTupleWritable<T>> {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(AbstractBlankNodeTests.class);
+
+ @BeforeClass
+ public static void setup() {
+ // Enable if you need to diagnose test failures
+ // Useful since it includes printing the file names of the temporary
+ // files being used
+ // Left commented out deliberately so normal test runs stay quiet
+ // BasicConfigurator.resetConfiguration();
+ // BasicConfigurator.configure();
+ }
+
+ /**
+ * Gets the extension for the initial input files
+ *
+ * @return Extension including the {@code .}
+ */
+ protected abstract String getInitialInputExtension();
+
+ /**
+ * Creates a tuple
+ *
+ * @param s
+ * Subject
+ * @param p
+ * Predicate
+ * @param o
+ * Object
+ * @return Tuple
+ */
+ protected abstract T createTuple(Node s, Node p, Node o);
+
+ /**
+ * Writes out the given tuples to the given file
+ *
+ * @param f
+ * File
+ * @param tuples
+ * Tuples
+ * @throws FileNotFoundException
+ * Thrown if the file cannot be opened for writing
+ */
+ protected abstract void writeTuples(File f, List<T> tuples) throws FileNotFoundException;
+
+ /**
+ * Creates the input format for reading the initial inputs
+ *
+ * @return Input format
+ */
+ protected abstract InputFormat<LongWritable, TValue> createInitialInputFormat();
+
+ /**
+ * Creates the output format for writing the intermediate output
+ *
+ * @return Output format
+ */
+ protected abstract OutputFormat<LongWritable, TValue> createIntermediateOutputFormat();
+
+ /**
+ * Creates the input format for reading the intermediate outputs back in
+ *
+ * @return Input format
+ */
+ protected abstract InputFormat<LongWritable, TValue> createIntermediateInputFormat();
+
+ /**
+ * Gets the subject of the tuple
+ *
+ * @param value
+ * Tuple
+ * @return Subject
+ */
+ protected abstract Node getSubject(T value);
+
+ /**
+ * Gets whether the format being tested respects the RIOT
+ * {@link ParserProfile}
+ *
+ * @return True if parser profile is respected, false otherwise
+ */
+ protected boolean respectsParserProfile() {
+ return true;
+ }
+
+ /**
+ * Gets whether the format being tested preserves blank node identity
+ *
+ * @return True if identity is preserved, false otherwise
+ */
+ protected boolean preservesBlankNodeIdentity() {
+ return false;
+ }
+
+ /**
+ * Test that starts with two blank nodes with the same identity in a single
+ * file, splits them over two files and checks that we can workaround
+ * JENA-820 successfully by setting the
+ * {@link RdfIOConstants#GLOBAL_BNODE_IDENTITY} flag for our subsequent job
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Test
+ public final void blank_node_divergence_01() throws IOException, InterruptedException {
+ Assume.assumeTrue("Requires ParserProfile be respected", this.respectsParserProfile());
+ Assume.assumeFalse("Requires that Blank Node identity not be preserved", this.preservesBlankNodeIdentity());
+
+ // Temporary files
+ File a = File.createTempFile("bnode_divergence", getInitialInputExtension());
+ File intermediateOutputDir = Files.createTempDirectory("bnode_divergence", new FileAttribute[0]).toFile();
+
+ try {
+ // Prepare the input data
+ // Two mentions of the same blank node in the same file
+ List<T> tuples = new ArrayList<>();
+ Node bnode = NodeFactory.createAnon();
+ Node pred = NodeFactory.createURI("http://example.org/predicate");
+ tuples.add(createTuple(bnode, pred, NodeFactory.createLiteral("first")));
+ tuples.add(createTuple(bnode, pred, NodeFactory.createLiteral("second")));
+ writeTuples(a, tuples);
+
+ // Set up fake job which will process the file as a single split
+ Configuration config = new Configuration(true);
+ InputFormat<LongWritable, TValue> inputFormat = createInitialInputFormat();
+ Job job = Job.getInstance(config);
+ job.setInputFormatClass(inputFormat.getClass());
+ NLineInputFormat.setNumLinesPerSplit(job, 100);
+ FileInputFormat.setInputPaths(job, new Path(a.getAbsolutePath()));
+ FileOutputFormat.setOutputPath(job, new Path(intermediateOutputDir.getAbsolutePath()));
+ JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID());
+
+ // Get the splits
+ List<InputSplit> splits = inputFormat.getSplits(context);
+ Assert.assertEquals(1, splits.size());
+
+ for (InputSplit split : splits) {
+ // Initialize the input reading
+ // NOTE(review): readers are never closed; acceptable for a test
+ // but a close() after use would be tidier
+ TaskAttemptContext inputTaskContext = new TaskAttemptContextImpl(job.getConfiguration(),
+ createAttemptID(1, 1, 1));
+ RecordReader<LongWritable, TValue> reader = inputFormat.createRecordReader(split, inputTaskContext);
+ reader.initialize(split, inputTaskContext);
+
+ // Copy the input to the output - each triple goes to a separate
+ // output file
+ // This is how we force multiple files to be produced
+ int taskID = 1;
+ while (reader.nextKeyValue()) {
+ // Prepare the output writing - a fresh task attempt per
+ // record so each record lands in its own output file
+ OutputFormat<LongWritable, TValue> outputFormat = createIntermediateOutputFormat();
+ TaskAttemptContext outputTaskContext = new TaskAttemptContextImpl(job.getConfiguration(),
+ createAttemptID(1, ++taskID, 1));
+ RecordWriter<LongWritable, TValue> writer = outputFormat.getRecordWriter(outputTaskContext);
+
+ writer.write(reader.getCurrentKey(), reader.getCurrentValue());
+ writer.close(outputTaskContext);
+ }
+ }
+
+ // Promote outputs from temporary status
+ promoteInputs(intermediateOutputDir);
+
+ // Now we need to create a subsequent job that reads the
+ // intermediate outputs
+ // As described in JENA-820 at this point the blank nodes are
+ // consistent, however when we read them from different files they
+ // by default get treated as different nodes and so the blank nodes
+ // diverge which is incorrect and undesirable behaviour in
+ // multi-stage pipelines
+ // NOTE(review): leftover debug output - consider removing or
+ // demoting to LOGGER.debug()
+ System.out.println(intermediateOutputDir.getAbsolutePath());
+ job = Job.getInstance(config);
+ inputFormat = createIntermediateInputFormat();
+ job.setInputFormatClass(inputFormat.getClass());
+ FileInputFormat.setInputPaths(job, new Path(intermediateOutputDir.getAbsolutePath()));
+
+ // Enabling this flag works around the JENA-820 issue
+ job.getConfiguration().setBoolean(RdfIOConstants.GLOBAL_BNODE_IDENTITY, true);
+ context = new JobContextImpl(job.getConfiguration(), job.getJobID());
+
+ // Get the splits - one per intermediate output file
+ splits = inputFormat.getSplits(context);
+ Assert.assertEquals(2, splits.size());
+
+ // Expect to end up with a single blank node
+ Set<Node> nodes = new HashSet<Node>();
+ for (InputSplit split : splits) {
+ TaskAttemptContext inputTaskContext = new TaskAttemptContextImpl(job.getConfiguration(),
+ new TaskAttemptID());
+ RecordReader<LongWritable, TValue> reader = inputFormat.createRecordReader(split, inputTaskContext);
+ reader.initialize(split, inputTaskContext);
+
+ while (reader.nextKeyValue()) {
+ nodes.add(getSubject(reader.getCurrentValue().get()));
+ }
+ }
+ // Nodes should not have diverged
+ Assert.assertEquals(1, nodes.size());
+
+ } finally {
+ // Clean up temporary files regardless of outcome
+ a.delete();
+ deleteDirectory(intermediateOutputDir);
+ }
+ }
+
+ /**
+ * Test that starts with two blank nodes with the same identity in a single
+ * file, splits them over two files and shows that they diverge in the
+ * subsequent job when the JENA-820 workaround is not enabled
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Test
+ public void blank_node_divergence_02() throws IOException, InterruptedException {
+ Assume.assumeTrue("Requires ParserProfile be respected", this.respectsParserProfile())
+ ;
+ Assume.assumeFalse("Requires that Blank Node identity not be preserved", this.preservesBlankNodeIdentity());
+
+ // Temporary files
+ File a = File.createTempFile("bnode_divergence", getInitialInputExtension());
+ File intermediateOutputDir = Files.createTempDirectory("bnode_divergence", new FileAttribute[0]).toFile();
+
+ try {
+ // Prepare the input data
+ // Two mentions of the same blank node in the same file
+ List<T> tuples = new ArrayList<>();
+ Node bnode = NodeFactory.createAnon();
+ Node pred = NodeFactory.createURI("http://example.org/predicate");
+ tuples.add(createTuple(bnode, pred, NodeFactory.createLiteral("first")));
+ tuples.add(createTuple(bnode, pred, NodeFactory.createLiteral("second")));
+ writeTuples(a, tuples);
+
+ // Set up fake job which will process the file as a single split
+ Configuration config = new Configuration(true);
+ InputFormat<LongWritable, TValue> inputFormat = createInitialInputFormat();
+ Job job = Job.getInstance(config);
+ job.setInputFormatClass(inputFormat.getClass());
+ NLineInputFormat.setNumLinesPerSplit(job, 100);
+ FileInputFormat.setInputPaths(job, new Path(a.getAbsolutePath()));
+ FileOutputFormat.setOutputPath(job, new Path(intermediateOutputDir.getAbsolutePath()));
+ JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID());
+
+ // Get the splits
+ List<InputSplit> splits = inputFormat.getSplits(context);
+ Assert.assertEquals(1, splits.size());
+
+ for (InputSplit split : splits) {
+ // Initialize the input reading
+ TaskAttemptContext inputTaskContext = new TaskAttemptContextImpl(job.getConfiguration(),
+ createAttemptID(1, 1, 1));
+ RecordReader<LongWritable, TValue> reader = inputFormat.createRecordReader(split, inputTaskContext);
+ reader.initialize(split, inputTaskContext);
+
+ // Copy the input to the output - each triple goes to a separate
+ // output file
+ // This is how we force multiple files to be produced
+ int taskID = 1;
+ while (reader.nextKeyValue()) {
+ // Prepare the output writing - a fresh task attempt per
+ // record so each record lands in its own output file
+ OutputFormat<LongWritable, TValue> outputFormat = createIntermediateOutputFormat();
+ TaskAttemptContext outputTaskContext = new TaskAttemptContextImpl(job.getConfiguration(),
+ createAttemptID(1, ++taskID, 1));
+ RecordWriter<LongWritable, TValue> writer = outputFormat.getRecordWriter(outputTaskContext);
+
+ writer.write(reader.getCurrentKey(), reader.getCurrentValue());
+ writer.close(outputTaskContext);
+ }
+ }
+
+ // Promote outputs from temporary status
+ promoteInputs(intermediateOutputDir);
+
+ // Now we need to create a subsequent job that reads the
+ // intermediate outputs
+ // As described in JENA-820 at this point the blank nodes are
+ // consistent, however when we read them from different files they
+ // by default get treated as different nodes and so the blank nodes
+ // diverge which is incorrect and undesirable behaviour in
+ // multi-stage pipelines. However it is the default behaviour
+ // because when we start from external inputs we want them to be
+ // file scoped.
+ // NOTE(review): leftover debug output - consider removing or
+ // demoting to LOGGER.debug()
+ System.out.println(intermediateOutputDir.getAbsolutePath());
+ job = Job.getInstance(config);
+ inputFormat = createIntermediateInputFormat();
+ job.setInputFormatClass(inputFormat.getClass());
+ FileInputFormat.setInputPaths(job, new Path(intermediateOutputDir.getAbsolutePath()));
+
+ // Make sure JENA-820 flag is disabled
+ job.getConfiguration().setBoolean(RdfIOConstants.GLOBAL_BNODE_IDENTITY, false);
+ context = new JobContextImpl(job.getConfiguration(), job.getJobID());
+
+ // Get the splits - one per intermediate output file
+ splits = inputFormat.getSplits(context);
+ Assert.assertEquals(2, splits.size());
+
+ // Expect to end up with two distinct blank nodes since without the
+ // workaround blank node identity is file scoped
+ Set<Node> nodes = new HashSet<Node>();
+ for (InputSplit split : splits) {
+ TaskAttemptContext inputTaskContext = new TaskAttemptContextImpl(job.getConfiguration(),
+ new TaskAttemptID());
+ RecordReader<LongWritable, TValue> reader = inputFormat.createRecordReader(split, inputTaskContext);
+ reader.initialize(split, inputTaskContext);
+
+ while (reader.nextKeyValue()) {
+ nodes.add(getSubject(reader.getCurrentValue().get()));
+ }
+ }
+ // Nodes should have diverged
+ Assert.assertEquals(2, nodes.size());
+
+ } finally {
+ // Clean up temporary files regardless of outcome
+ a.delete();
+ deleteDirectory(intermediateOutputDir);
+ }
+ }
+
+ /**
+ * Test that starts with two blank nodes in two different files and checks
+ * that writing them to a single file does not conflate them
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Test
+ public void blank_node_identity_01() throws IOException, InterruptedException {
+ Assume.assumeTrue("Requires ParserProfile be respected", this.respectsParserProfile());
+ Assume.assumeFalse("Requires that Blank Node identity not be preserved", this.preservesBlankNodeIdentity());
+
+ // Temporary files
+ File a = File.createTempFile("bnode_identity", getInitialInputExtension());
+ File b = File.createTempFile("bnode_identity", getInitialInputExtension());
+ File intermediateOutputDir = Files.createTempDirectory("bnode_identity", new FileAttribute[0]).toFile();
+
+ try {
+ // Prepare the input data
+ // Different blank nodes in different files
+ List<T> tuples = new ArrayList<>();
+ Node bnode1 = NodeFactory.createAnon();
+ Node bnode2 = NodeFactory.createAnon();
+ Node pred = NodeFactory.createURI("http://example.org/predicate");
+
+ tuples.add(createTuple(bnode1, pred, NodeFactory.createLiteral("first")));
+ writeTuples(a, tuples);
+
+ tuples.clear();
+ tuples.add(createTuple(bnode2, pred, NodeFactory.createLiteral("second")));
+ writeTuples(b, tuples);
+
+ // Set up fake job which will process the two files
+ Configuration config = new Configuration(true);
+ InputFormat<LongWritable, TValue> inputFormat = createInitialInputFormat();
+ Job job = Job.getInstance(config);
+ job.setInputFormatClass(inputFormat.getClass());
+ NLineInputFormat.setNumLinesPerSplit(job, 100);
+ FileInputFormat.setInputPaths(job, new Path(a.getAbsolutePath()), new Path(b.getAbsolutePath()));
+ FileOutputFormat.setOutputPath(job, new Path(intermediateOutputDir.getAbsolutePath()));
+ JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID());
+
+ // Get the splits
+ List<InputSplit> splits = inputFormat.getSplits(context);
+ Assert.assertEquals(2, splits.size());
+
+ // Prepare the output writing - putting all output to a single file
+ OutputFormat<LongWritable, TValue> outputFormat = createIntermediateOutputFormat();
+ TaskAttemptContext outputTaskContext = new TaskAttemptContextImpl(job.getConfiguration(), createAttemptID(
+ 1, 2, 1));
+ RecordWriter<LongWritable, TValue> writer = outputFormat.getRecordWriter(outputTaskContext);
+
+ for (InputSplit split : splits) {
+ // Initialize the input reading
+ TaskAttemptContext inputTaskContext = new TaskAttemptContextImpl(job.getConfiguration(),
+ createAttemptID(1, 1, 1));
+ RecordReader<LongWritable, TValue> reader = inputFormat.createRecordReader(split, inputTaskContext);
+ reader.initialize(split, inputTaskContext);
+
+ // Copy the input to the output - all triples go to a single
+ // output
+ while (reader.nextKeyValue()) {
+ writer.write(reader.getCurrentKey(), reader.getCurrentValue());
+ }
+ }
+ writer.close(outputTaskContext);
+
+ // Promote outputs from temporary status
+ promoteInputs(intermediateOutputDir);
+
+ // Now we need to create a subsequent job that reads the
+ // intermediate outputs
+ // The Blank nodes should have been given separate identities so we
+ // should not be conflating them, this is the opposite problem to
+ // that described in JENA-820
+ System.out.println(intermediateOutputDir.getAbsolutePath());
+ job = Job.getInstance(config);
+ inputFormat = createIntermediateInputFormat();
+ job.setInputFormatClass(inputFormat.getClass());
+ NLineInputFormat.setNumLinesPerSplit(job, 100);
+ FileInputFormat.setInputPaths(job, new Path(intermediateOutputDir.getAbsolutePath()));
+ context = new JobContextImpl(job.getConfiguration(), job.getJobID());
+
+ // Get the splits
+ splits = inputFormat.getSplits(context);
+ Assert.assertEquals(1, splits.size());
+
+ // Expect to end up with a single blank node
+ Set<Node> nodes = new HashSet<Node>();
+ for (InputSplit split : splits) {
+ TaskAttemptContext inputTaskContext = new TaskAttemptContextImpl(job.getConfiguration(),
+ new TaskAttemptID());
+ RecordReader<LongWritable, TValue> reader = inputFormat.createRecordReader(split, inputTaskContext);
+ reader.initialize(split, inputTaskContext);
+
+ while (reader.nextKeyValue()) {
+ nodes.add(getSubject(reader.getCurrentValue().get()));
+ }
+ }
+ // Nodes must not have converged
+ Assert.assertEquals(2, nodes.size());
+
+ } finally {
+ a.delete();
+ b.delete();
+ deleteDirectory(intermediateOutputDir);
+ }
+ }
+
    /**
     * Test that starts with the same blank node in two different files and
     * checks that writing them to a single file does not conflate them i.e.
     * blank node identity must be file scoped so the nodes must be treated as
     * distinct
     * 
     * @throws IOException
     * @throws InterruptedException
     */
    @Test
    public void blank_node_identity_02() throws IOException, InterruptedException {
        Assume.assumeTrue("Requires ParserProfile be respected", this.respectsParserProfile());
        Assume.assumeFalse("Requires that Blank Node identity not be preserved", this.preservesBlankNodeIdentity());

        // Temporary files
        File a = File.createTempFile("bnode_identity", getInitialInputExtension());
        File b = File.createTempFile("bnode_identity", getInitialInputExtension());
        File intermediateOutputDir = Files.createTempDirectory("bnode_identity", new FileAttribute[0]).toFile();

        try {
            // Prepare the input data
            // Same blank node but in different files so must be treated as
            // different blank nodes and not converge
            List<T> tuples = new ArrayList<>();
            Node bnode = NodeFactory.createAnon();
            Node pred = NodeFactory.createURI("http://example.org/predicate");

            tuples.add(createTuple(bnode, pred, NodeFactory.createLiteral("first")));
            writeTuples(a, tuples);

            tuples.clear();
            tuples.add(createTuple(bnode, pred, NodeFactory.createLiteral("second")));
            writeTuples(b, tuples);

            // Set up fake job which will process the two files
            Configuration config = new Configuration(true);
            InputFormat<LongWritable, TValue> inputFormat = createInitialInputFormat();
            Job job = Job.getInstance(config);
            job.setInputFormatClass(inputFormat.getClass());
            NLineInputFormat.setNumLinesPerSplit(job, 100);
            FileInputFormat.setInputPaths(job, new Path(a.getAbsolutePath()), new Path(b.getAbsolutePath()));
            FileOutputFormat.setOutputPath(job, new Path(intermediateOutputDir.getAbsolutePath()));
            JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID());

            // Get the splits - one per input file
            List<InputSplit> splits = inputFormat.getSplits(context);
            Assert.assertEquals(2, splits.size());

            // Prepare the output writing - putting all output to a single file
            OutputFormat<LongWritable, TValue> outputFormat = createIntermediateOutputFormat();
            TaskAttemptContext outputTaskContext = new TaskAttemptContextImpl(job.getConfiguration(), createAttemptID(
                    1, 2, 1));
            RecordWriter<LongWritable, TValue> writer = outputFormat.getRecordWriter(outputTaskContext);

            for (InputSplit split : splits) {
                // Initialize the input reading
                TaskAttemptContext inputTaskContext = new TaskAttemptContextImpl(job.getConfiguration(),
                        createAttemptID(1, 1, 1));
                RecordReader<LongWritable, TValue> reader = inputFormat.createRecordReader(split, inputTaskContext);
                reader.initialize(split, inputTaskContext);

                // Copy the input to the output - all triples go to a single
                // output
                while (reader.nextKeyValue()) {
                    writer.write(reader.getCurrentKey(), reader.getCurrentValue());
                }
            }
            writer.close(outputTaskContext);

            // Promote outputs from temporary status
            promoteInputs(intermediateOutputDir);

            // Now we need to create a subsequent job that reads the
            // intermediate outputs
            // The Blank nodes should have been given separate identities so we
            // should not be conflating them, this is the opposite problem to
            // that described in JENA-820
            System.out.println(intermediateOutputDir.getAbsolutePath());
            job = Job.getInstance(config);
            inputFormat = createIntermediateInputFormat();
            job.setInputFormatClass(inputFormat.getClass());
            NLineInputFormat.setNumLinesPerSplit(job, 100);
            FileInputFormat.setInputPaths(job, new Path(intermediateOutputDir.getAbsolutePath()));
            context = new JobContextImpl(job.getConfiguration(), job.getJobID());

            // Get the splits - single intermediate file so single split
            splits = inputFormat.getSplits(context);
            Assert.assertEquals(1, splits.size());

            // Expect to end up with two distinct blank nodes because blank
            // node identity is file scoped in the initial inputs
            Set<Node> nodes = new HashSet<Node>();
            for (InputSplit split : splits) {
                TaskAttemptContext inputTaskContext = new TaskAttemptContextImpl(job.getConfiguration(),
                        new TaskAttemptID());
                RecordReader<LongWritable, TValue> reader = inputFormat.createRecordReader(split, inputTaskContext);
                reader.initialize(split, inputTaskContext);

                while (reader.nextKeyValue()) {
                    nodes.add(getSubject(reader.getCurrentValue().get()));
                }
            }
            // Nodes must not have converged
            Assert.assertEquals(2, nodes.size());

        } finally {
            a.delete();
            b.delete();
            deleteDirectory(intermediateOutputDir);
        }
    }
+
+ private TaskAttemptID createAttemptID(int jobID, int taskID, int id) {
+ return new TaskAttemptID("outputTest", jobID, TaskType.MAP, taskID, 1);
+ }
+
+ private void promoteInputs(File baseDir) throws IOException {
+ for (File f : baseDir.listFiles()) {
+ if (f.isDirectory()) {
+ promoteInputs(baseDir, f);
+ }
+ }
+ }
+
+ private void promoteInputs(File targetDir, File dir) throws IOException {
+ java.nio.file.Path target = Paths.get(targetDir.toURI());
+ for (File f : dir.listFiles()) {
+ if (f.isDirectory()) {
+ promoteInputs(targetDir, f);
+ } else {
+ LOGGER.debug("Moving {} to {}", f.getAbsolutePath(), target.resolve(f.getName()));
+ Files.move(Paths.get(f.toURI()), target.resolve(f.getName()), StandardCopyOption.REPLACE_EXISTING);
+ }
+ }
+
+ // Remove defunct sub-directory
+ dir.delete();
+ }
+
+ private void deleteDirectory(File dir) throws IOException {
+ for (File f : dir.listFiles()) {
+ if (f.isFile())
+ f.delete();
+ if (f.isDirectory())
+ deleteDirectory(f);
+ }
+ dir.delete();
+ }
+}
http://git-wip-us.apache.org/repos/asf/jena/blob/49c4cffe/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/bnodes/AbstractTripleBlankNodeTests.java
----------------------------------------------------------------------
diff --git a/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/bnodes/AbstractTripleBlankNodeTests.java b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/bnodes/AbstractTripleBlankNodeTests.java
new file mode 100644
index 0000000..bbd6742
--- /dev/null
+++ b/jena-elephas/jena-elephas-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/bnodes/AbstractTripleBlankNodeTests.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jena.hadoop.rdf.io.input.bnodes;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.util.List;
+
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+import org.apache.jena.riot.Lang;
+import org.apache.jena.riot.RDFDataMgr;
+
+import com.hp.hpl.jena.graph.Graph;
+import com.hp.hpl.jena.graph.Node;
+import com.hp.hpl.jena.graph.Triple;
+import com.hp.hpl.jena.sparql.graph.GraphFactory;
+
+/**
+ *
+ */
+public abstract class AbstractTripleBlankNodeTests extends AbstractBlankNodeTests<Triple, TripleWritable> {
+
+ /**
+ * Gets the language to use
+ *
+ * @return Language
+ */
+ protected abstract Lang getLanguage();
+
+ @Override
+ protected Triple createTuple(Node s, Node p, Node o) {
+ return new Triple(s, p, o);
+ }
+
+ @Override
+ protected void writeTuples(File f, List<Triple> tuples) throws FileNotFoundException {
+ Graph g = GraphFactory.createGraphMem();
+ for (Triple t : tuples) {
+ g.add(t);
+ }
+ RDFDataMgr.write(new FileOutputStream(f), g, getLanguage());
+ }
+
+ @Override
+ protected Node getSubject(Triple value) {
+ return value.getSubject();
+ }
+
+}