You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/04/28 19:04:24 UTC
incubator-tinkerpop git commit: generalized the testing of file-based
Hadoop-Gremlin Input/OutputFormats. Validation of both read and write splits
at various split intervals.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/master c8dcead9e -> 311db29fd
generalized the testing of file-based Hadoop-Gremlin Input/OutputFormats. Validation of both read and write splits at various split intervals.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/311db29f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/311db29f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/311db29f
Branch: refs/heads/master
Commit: 311db29fdc9bc195304b5af5ce87745d75050bed
Parents: c8dcead
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Apr 28 11:04:09 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Apr 28 11:04:21 2015 -0600
----------------------------------------------------------------------
.../structure/io/gryo/GryoOutputFormat.java | 4 -
.../io/TestFileReaderWriterHelper.java | 123 +++++++++++++++++++
.../GraphSONRecordReaderWriterTest.java | 87 ++-----------
.../io/gryo/GryoRecordReaderWriterTest.java | 96 ++-------------
4 files changed, 142 insertions(+), 168 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/311db29f/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoOutputFormat.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoOutputFormat.java
index 3b8a26f..77389e4 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoOutputFormat.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoOutputFormat.java
@@ -36,8 +36,4 @@ public class GryoOutputFormat extends CommonFileOutputFormat {
public RecordWriter<NullWritable, VertexWritable> getRecordWriter(final TaskAttemptContext job) throws IOException, InterruptedException {
return new GryoRecordWriter(getDataOuputStream(job), job.getConfiguration());
}
-
- public RecordWriter<NullWritable, VertexWritable> getRecordWriter(final TaskAttemptContext job, final DataOutputStream outputStream) throws IOException, InterruptedException {
- return new GryoRecordWriter(outputStream, job.getConfiguration());
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/311db29f/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/TestFileReaderWriterHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/TestFileReaderWriterHelper.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/TestFileReaderWriterHelper.java
new file mode 100644
index 0000000..e68355a
--- /dev/null
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/TestFileReaderWriterHelper.java
@@ -0,0 +1,123 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+
+package org.apache.tinkerpop.gremlin.hadoop.structure.io;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tinkerpop.gremlin.TestHelper;
+import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.structure.Property;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class TestFileReaderWriterHelper {
+
+ public static List<FileSplit> generateFileSplits(final File file, final int numberOfSplits) {
+ final long fileSize = file.length();
+ final long splitLength = (long) ((double) fileSize / (double) numberOfSplits);
+ final List<FileSplit> splits = new ArrayList<>();
+ for (int i = 0; i < fileSize; i = i + (int) splitLength + 1) {
+ splits.add(new FileSplit(new Path(file.getAbsoluteFile().toURI().toString()), i, splitLength, null));
+ }
+ return splits;
+ }
+
+ public static void validateFileSplits(final List<FileSplit> fileSplits, final Class<? extends InputFormat> inputFormatClass, final Optional<Class<? extends OutputFormat>> outFormatClass) throws Exception {
+ File outputDirectory = TestHelper.makeTestDataPath(inputFormatClass, "hadoop-record-reader-writer-test");
+ final Configuration configuration = new Configuration(false);
+ configuration.set("fs.file.impl", LocalFileSystem.class.getName());
+ configuration.set("fs.default.name", "file:///");
+ configuration.set("mapred.output.dir", "file:///" + outputDirectory.getAbsolutePath());
+ final InputFormat inputFormat = ReflectionUtils.newInstance(inputFormatClass, configuration);
+ final TaskAttemptContext job = new TaskAttemptContext(configuration, new TaskAttemptID(UUID.randomUUID().toString(), 0, true, 0, 0));
+
+ int vertexCount = 0;
+ int outEdgeCount = 0;
+ int inEdgeCount = 0;
+
+ final OutputFormat outputFormat = outFormatClass.isPresent() ? ReflectionUtils.newInstance(outFormatClass.get(), configuration) : null;
+ final RecordWriter writer = null == outputFormat ? null : outputFormat.getRecordWriter(job);
+
+ boolean foundKeyValue = false;
+ for (final FileSplit split : fileSplits) {
+ System.out.println("\treading file split " + split.getPath().getName() + " (" + split.getStart() + "..." + (split.getStart() + split.getLength()) + " bytes)");
+ final RecordReader reader = inputFormat.createRecordReader(split, job);
+
+ float lastProgress = -1f;
+ while (reader.nextKeyValue()) {
+ //System.out.println("" + reader.getProgress() + "> " + reader.getCurrentKey() + ": " + reader.getCurrentValue());
+ final float progress = reader.getProgress();
+ assertTrue(progress >= lastProgress);
+ assertEquals(NullWritable.class, reader.getCurrentKey().getClass());
+ final VertexWritable v = (VertexWritable) reader.getCurrentValue();
+ if (null != writer) writer.write(NullWritable.get(), v);
+ vertexCount++;
+ outEdgeCount = outEdgeCount + (int) IteratorUtils.count(v.get().edges(Direction.OUT));
+ inEdgeCount = inEdgeCount + (int) IteratorUtils.count(v.get().edges(Direction.IN));
+
+ final Vertex vertex = v.get();
+ assertEquals(Integer.class, vertex.id().getClass());
+ final Object value = vertex.property("name");
+ if (((Property) value).value().equals("SUGAR MAGNOLIA")) {
+ foundKeyValue = true;
+ assertEquals(92, IteratorUtils.count(vertex.edges(Direction.OUT)));
+ assertEquals(77, IteratorUtils.count(vertex.edges(Direction.IN)));
+ }
+ lastProgress = progress;
+ }
+ }
+ assertEquals(8049, outEdgeCount);
+ assertEquals(8049, inEdgeCount);
+ assertEquals(outEdgeCount, inEdgeCount);
+ assertEquals(808, vertexCount);
+ assertTrue(foundKeyValue);
+
+ if (null != writer) {
+ writer.close(new TaskAttemptContext(configuration, job.getTaskAttemptID()));
+ for (int i = 1; i < 10; i++) {
+ validateFileSplits(generateFileSplits(new File(outputDirectory.getAbsoluteFile() + "/_temporary/" + job.getTaskAttemptID().getTaskID().toString().replace("task", "_attempt") + "_0" + "/part-m-00000"), i), inputFormatClass, Optional.empty());
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/311db29f/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONRecordReaderWriterTest.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONRecordReaderWriterTest.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONRecordReaderWriterTest.java
index ab20cc2..b86e8ef 100644
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONRecordReaderWriterTest.java
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONRecordReaderWriterTest.java
@@ -18,94 +18,27 @@
*/
package org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.HadoopGraphProvider;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
-import org.apache.tinkerpop.gremlin.structure.Direction;
-import org.apache.tinkerpop.gremlin.structure.Property;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.TestFileReaderWriterHelper;
import org.junit.Test;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
import java.io.File;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import java.util.List;
+import java.util.Optional;
/**
- * @author Joshua Shinavier (http://fortytwo.net)
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
*/
public class GraphSONRecordReaderWriterTest {
-
@Test
- public void shouldSplitFile() throws Exception {
- final Configuration configuration = new Configuration(false);
- configuration.set("fs.file.impl", LocalFileSystem.class.getName());
- configuration.set("fs.default.name", "file:///");
-
- final File testFile = new File(HadoopGraphProvider.PATHS.get("grateful-dead.json"));
- final FileSplit split = new FileSplit(
- new Path(testFile.getAbsoluteFile().toURI().toString()), 0,
- testFile.length(), null);
- System.out.println("reading GraphSON adjacency file " + testFile.getAbsolutePath() + " (" + testFile.length() + " bytes)");
-
- final GraphSONInputFormat inputFormat = ReflectionUtils.newInstance(GraphSONInputFormat.class, configuration);
- final TaskAttemptContext job = new TaskAttemptContext(configuration, new TaskAttemptID());
- final RecordReader reader = inputFormat.createRecordReader(split, job);
-
- final ByteArrayOutputStream bos = new ByteArrayOutputStream();
- try (final DataOutputStream dos = new DataOutputStream(bos)) {
- final GraphSONOutputFormat outputFormat = new GraphSONOutputFormat();
- final RecordWriter writer = outputFormat.getRecordWriter(job, dos);
-
- float lastProgress = -1f;
- int count = 0;
- boolean foundKeyValue = false;
- while (reader.nextKeyValue()) {
- //System.out.println("" + reader.getProgress() + "> " + reader.getCurrentKey() + ": " + reader.getCurrentValue());
- count++;
- final float progress = reader.getProgress();
- assertTrue(progress >= lastProgress);
- assertEquals(NullWritable.class, reader.getCurrentKey().getClass());
- final VertexWritable v = (VertexWritable) reader.getCurrentValue();
- writer.write(NullWritable.get(), v);
-
- final Vertex vertex = v.get();
- assertEquals(Integer.class, vertex.id().getClass());
-
- final Object value = vertex.property("name");
- if (null != value && ((Property) value).value().equals("SUGAR MAGNOLIA")) {
- foundKeyValue = true;
- assertEquals(92, IteratorUtils.count(vertex.edges(Direction.OUT)));
- assertEquals(77, IteratorUtils.count(vertex.edges(Direction.IN)));
- }
-
- lastProgress = progress;
- }
- assertEquals(808, count);
- assertTrue(foundKeyValue);
+ public void shouldSplitFileAndWriteProperSplits() throws Exception {
+ for (int numberOfSplits = 1; numberOfSplits < 10; numberOfSplits++) {
+ final File testFile = new File(HadoopGraphProvider.PATHS.get("grateful-dead.json"));
+ System.out.println("Testing: " + testFile + " (splits " + numberOfSplits + ")");
+ final List<FileSplit> splits = TestFileReaderWriterHelper.generateFileSplits(testFile, numberOfSplits);
+ TestFileReaderWriterHelper.validateFileSplits(splits, GraphSONInputFormat.class, Optional.of(GraphSONOutputFormat.class));
}
-
- //System.out.println("bos: " + new String(bos.toByteArray()));
- final String[] lines = new String(bos.toByteArray()).split("\n");
- assertEquals(808, lines.length);
- final String line42 = lines[41];
- assertTrue(line42.contains("outV"));
- assertTrue(line42.contains("ITS ALL OVER NOW"));
-
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/311db29f/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReaderWriterTest.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReaderWriterTest.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReaderWriterTest.java
index 7e9df46..7b936f3 100644
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReaderWriterTest.java
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReaderWriterTest.java
@@ -18,104 +18,26 @@
*/
package org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tinkerpop.gremlin.hadoop.HadoopGraphProvider;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.structure.Direction;
-import org.apache.tinkerpop.gremlin.structure.Property;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.TestFileReaderWriterHelper;
import org.junit.Test;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
import java.io.File;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import java.util.Optional;
/**
- * @author Joshua Shinavier (http://fortytwo.net)
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
*/
public class GryoRecordReaderWriterTest {
@Test
- public void shouldSplitFile() throws Exception {
- final Configuration configuration = new Configuration(false);
- configuration.set("fs.file.impl", LocalFileSystem.class.getName());
- configuration.set("fs.default.name", "file:///");
-
- final File testFile = new File(HadoopGraphProvider.PATHS.get("grateful-dead.kryo"));
- final int numberOfSplits = 4;
- final long testFileSize = testFile.length();
- final long splitLength = (long) ((double) testFileSize / (double) numberOfSplits);
- //System.out.println("Test file size: " + testFileSize);
- //System.out.println("Test file split length: " + splitLength);
- final List<FileSplit> splits = new ArrayList<>();
- for (int i = 0; i < testFileSize; i = i + (int) splitLength + 1) {
- splits.add(new FileSplit(new Path(testFile.getAbsoluteFile().toURI().toString()), i, splitLength, null));
- }
-
-
- final List<String> writeLines = new ArrayList<>();
- final GryoInputFormat inputFormat = ReflectionUtils.newInstance(GryoInputFormat.class, configuration);
- final TaskAttemptContext job = new TaskAttemptContext(configuration, new TaskAttemptID());
- int vertexCount = 0;
- int outEdgeCount = 0;
- int inEdgeCount = 0;
- boolean foundKeyValue = false;
- for (final FileSplit split : splits) {
- System.out.println("reading Gryo file split " + testFile.getAbsolutePath() + " (" + split.getStart() + "--to-->" + (split.getStart() + split.getLength()) + " bytes)");
- final RecordReader reader = inputFormat.createRecordReader(split, job);
- final ByteArrayOutputStream bos = new ByteArrayOutputStream();
- try (final DataOutputStream dos = new DataOutputStream(bos)) {
- final GryoOutputFormat outputFormat = new GryoOutputFormat();
- final RecordWriter writer = outputFormat.getRecordWriter(job, dos);
- float lastProgress = -1f;
- while (reader.nextKeyValue()) {
- //System.out.println("" + reader.getProgress() + "> " + reader.getCurrentKey() + ": " + reader.getCurrentValue());
- final float progress = reader.getProgress();
- assertTrue(progress >= lastProgress);
- assertEquals(NullWritable.class, reader.getCurrentKey().getClass());
- final VertexWritable v = (VertexWritable) reader.getCurrentValue();
- writer.write(NullWritable.get(), v);
- vertexCount++;
- outEdgeCount = outEdgeCount + (int) IteratorUtils.count(v.get().edges(Direction.OUT));
- inEdgeCount = inEdgeCount + (int) IteratorUtils.count(v.get().edges(Direction.IN));
-
- final Vertex vertex = v.get();
- assertEquals(Integer.class, vertex.id().getClass());
-
- final Object value = vertex.property("name");
- if (((Property) value).value().equals("SUGAR MAGNOLIA")) {
- foundKeyValue = true;
- assertEquals(92, IteratorUtils.count(vertex.edges(Direction.OUT)));
- assertEquals(77, IteratorUtils.count(vertex.edges(Direction.IN)));
- }
-
- lastProgress = progress;
- }
- writeLines.addAll(Arrays.asList(new String(bos.toByteArray()).split("\\x3a\\x15.\\x11\\x70...")));
- }
+ public void shouldSplitFileAndWriteProperSplits() throws Exception {
+ for (int numberOfSplits = 1; numberOfSplits < 10; numberOfSplits++) {
+ final File testFile = new File(HadoopGraphProvider.PATHS.get("grateful-dead.kryo"));
+ System.out.println("Testing: " + testFile + " (splits " + numberOfSplits + ")");
+ final List<FileSplit> splits = TestFileReaderWriterHelper.generateFileSplits(testFile, numberOfSplits);
+ TestFileReaderWriterHelper.validateFileSplits(splits, GryoInputFormat.class, Optional.of(GryoOutputFormat.class));
}
- assertEquals(8049,outEdgeCount);
- assertEquals(8049,inEdgeCount);
- assertEquals(outEdgeCount,inEdgeCount);
- assertEquals(808, vertexCount);
- assertTrue(foundKeyValue);
- assertEquals(808, writeLines.size());
- final String line42 = writeLines.get(41);
- assertTrue(line42.contains("ITS ALL OVER NO"));
}
}