You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/09/21 01:00:28 UTC
svn commit: r1388258 [1/2] - in /giraph/trunk: ./
src/main/java/org/apache/giraph/examples/
src/main/java/org/apache/giraph/graph/ src/main/java/org/apache/giraph/io/
src/test/java/org/apache/giraph/io/
Author: aching
Date: Thu Sep 20 23:00:27 2012
New Revision: 1388258
URL: http://svn.apache.org/viewvc?rev=1388258&view=rev
Log:
GIRAPH-277: Text Vertex Input/Output Format base classes overhaul.
(nitayj via aching)
Added:
giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleTextVertexOutputFormat.java
giraph/trunk/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexInputFormat.java
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/checkstyle.xml
giraph/trunk/src/main/java/org/apache/giraph/examples/LongDoubleFloatDoubleTextInputFormat.java
giraph/trunk/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleFloatDoubleTextInputFormat.java
giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java
giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueFloatEdgeTextOutputFormat.java
giraph/trunk/src/main/java/org/apache/giraph/graph/VertexReader.java
giraph/trunk/src/main/java/org/apache/giraph/graph/VertexWriter.java
giraph/trunk/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexOutputFormat.java
giraph/trunk/src/main/java/org/apache/giraph/io/IdWithValueTextOutputFormat.java
giraph/trunk/src/main/java/org/apache/giraph/io/IntIntNullIntTextInputFormat.java
giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java
giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexOutputFormat.java
giraph/trunk/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexInputFormat.java
giraph/trunk/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexOutputFormat.java
giraph/trunk/src/main/java/org/apache/giraph/io/LongDoubleDoubleAdjacencyListVertexInputFormat.java
giraph/trunk/src/main/java/org/apache/giraph/io/TextDoubleDoubleAdjacencyListVertexInputFormat.java
giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java
giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexOutputFormat.java
giraph/trunk/src/test/java/org/apache/giraph/io/TestAdjacencyListTextVertexOutputFormat.java
giraph/trunk/src/test/java/org/apache/giraph/io/TestIdWithValueTextOutputFormat.java
giraph/trunk/src/test/java/org/apache/giraph/io/TestLongDoubleDoubleAdjacencyListVertexInputFormat.java
giraph/trunk/src/test/java/org/apache/giraph/io/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java
Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1388258&r1=1388257&r2=1388258&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Thu Sep 20 23:00:27 2012
@@ -2,6 +2,9 @@ Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-277: Text Vertex Input/Output Format base classes overhaul.
+ (nitayj via aching)
+
GIRAPH-331: ReviewBoard post-review config. (nitayj via aching)
GIRAPH-332: Duplicate unnecessary info in giraph-formats-contrib
Modified: giraph/trunk/checkstyle.xml
URL: http://svn.apache.org/viewvc/giraph/trunk/checkstyle.xml?rev=1388258&r1=1388257&r2=1388258&view=diff
==============================================================================
--- giraph/trunk/checkstyle.xml (original)
+++ giraph/trunk/checkstyle.xml Thu Sep 20 23:00:27 2012
@@ -260,39 +260,9 @@
<!-- Setup special comments to suppress specific checks from source files -->
<module name="SuppressionCommentFilter">
- <property name="offCommentFormat" value="CHECKSTYLE\: stop JavadocVariable"/>
- <property name="onCommentFormat" value="CHECKSTYLE\: resume JavadocVariable"/>
- <property name="checkFormat" value="JavadocVariable"/>
- </module>
- <module name="SuppressionCommentFilter">
- <property name="offCommentFormat" value="CHECKSTYLE\: stop JavadocMethodCheck"/>
- <property name="onCommentFormat" value="CHECKSTYLE\: resume JavadocMethodCheck"/>
- <property name="checkFormat" value="JavadocMethodCheck"/>
- </module>
- <module name="SuppressionCommentFilter">
- <property name="offCommentFormat" value="CHECKSTYLE\: stop ConstantName"/>
- <property name="onCommentFormat" value="CHECKSTYLE\: resume ConstantName"/>
- <property name="checkFormat" value="ConstantName"/>
- </module>
- <module name="SuppressionCommentFilter">
- <property name="offCommentFormat" value="CHECKSTYLE\: stop HideUtilityClassConstructor"/>
- <property name="onCommentFormat" value="CHECKSTYLE\: resume HideUtilityClassConstructor"/>
- <property name="checkFormat" value="HideUtilityClassConstructor"/>
- </module>
- <module name="SuppressionCommentFilter">
- <property name="offCommentFormat" value="CHECKSTYLE\: stop MultipleVariableDeclarations"/>
- <property name="onCommentFormat" value="CHECKSTYLE\: resume MultipleVariableDeclarations"/>
- <property name="checkFormat" value="MultipleVariableDeclarations"/>
- </module>
- <module name="SuppressionCommentFilter">
- <property name="offCommentFormat" value="CHECKSTYLE\: stop IllegalCatch"/>
- <property name="onCommentFormat" value="CHECKSTYLE\: resume IllegalCatch"/>
- <property name="checkFormat" value="IllegalCatch"/>
- </module>
- <module name="SuppressionCommentFilter">
- <property name="offCommentFormat" value="CHECKSTYLE\: stop DeclarationOrder"/>
- <property name="onCommentFormat" value="CHECKSTYLE\: resume DeclarationOrder"/>
- <property name="checkFormat" value="DeclarationOrder"/>
+ <property name="offCommentFormat" value="CHECKSTYLE\: stop ([\w\|]+)"/>
+ <property name="onCommentFormat" value="CHECKSTYLE\: resume ([\w\|]+)"/>
+ <property name="checkFormat" value="$1"/>
</module>
</module>
Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/LongDoubleFloatDoubleTextInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/LongDoubleFloatDoubleTextInputFormat.java?rev=1388258&r1=1388257&r2=1388258&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/LongDoubleFloatDoubleTextInputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/LongDoubleFloatDoubleTextInputFormat.java Thu Sep 20 23:00:27 2012
@@ -24,14 +24,11 @@ import java.util.regex.Pattern;
import org.apache.giraph.graph.BspUtils;
import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.VertexReader;
import org.apache.giraph.io.TextVertexInputFormat;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import com.google.common.collect.Lists;
@@ -48,31 +45,19 @@ public class LongDoubleFloatDoubleTextIn
FloatWritable, DoubleWritable> {
@Override
- public VertexReader<LongWritable,
- DoubleWritable, FloatWritable, DoubleWritable>
- createVertexReader(InputSplit split, TaskAttemptContext context)
+ public TextVertexReader createVertexReader(InputSplit split,
+ TaskAttemptContext context)
throws IOException {
- return new LongDoubleFloatDoubleVertexReader(
- textInputFormat.createRecordReader(split, context));
+ return new LongDoubleFloatDoubleVertexReader();
}
/**
* Vertex reader associated with {@link LongDoubleFloatDoubleTextInputFormat}.
*/
- public static class LongDoubleFloatDoubleVertexReader extends
- TextVertexInputFormat.TextVertexReader<LongWritable, DoubleWritable,
- FloatWritable, DoubleWritable> {
+ public class LongDoubleFloatDoubleVertexReader extends
+ TextVertexInputFormat.TextVertexReader {
/** Separator of the vertex and neighbors */
- private static final Pattern SEPARATOR = Pattern.compile("[\t ]");
-
- /**
- * Constructor with the line reader.
- * @param lineReader Internal line reader.
- */
- public LongDoubleFloatDoubleVertexReader(
- RecordReader<LongWritable, Text> lineReader) {
- super(lineReader);
- }
+ private final Pattern separator = Pattern.compile("[\t ]");
@Override
public Vertex<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
@@ -83,7 +68,7 @@ public class LongDoubleFloatDoubleTextIn
.getConfiguration());
String[] tokens =
- SEPARATOR.split(getRecordReader().getCurrentValue().toString());
+ separator.split(getRecordReader().getCurrentValue().toString());
Map<LongWritable, FloatWritable> edges =
Maps.newHashMapWithExpectedSize(tokens.length - 1);
float weight = 1.0f / (tokens.length - 1);
Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleFloatDoubleTextInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleFloatDoubleTextInputFormat.java?rev=1388258&r1=1388257&r2=1388258&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleFloatDoubleTextInputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleFloatDoubleTextInputFormat.java Thu Sep 20 23:00:27 2012
@@ -24,14 +24,11 @@ import java.util.regex.Pattern;
import org.apache.giraph.graph.BspUtils;
import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.VertexReader;
import org.apache.giraph.io.TextVertexInputFormat;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import com.google.common.collect.Lists;
@@ -48,34 +45,20 @@ public class NormalizingLongDoubleFloatD
FloatWritable, DoubleWritable> {
@Override
- public VertexReader<LongWritable, DoubleWritable,
- FloatWritable, DoubleWritable> createVertexReader(
+ public TextVertexReader createVertexReader(
InputSplit split, TaskAttemptContext context) throws IOException {
- return new NormalizingLongDoubleFloatDoubleVertexReader(
- textInputFormat.createRecordReader(split, context));
+ return new NormalizingLongDoubleFloatDoubleVertexReader();
}
/**
* Vertex reader associated with {@link LongDoubleFloatDoubleTextInputFormat}.
*/
- public static class NormalizingLongDoubleFloatDoubleVertexReader
- extends
- TextVertexInputFormat.TextVertexReader<LongWritable, DoubleWritable,
- FloatWritable, DoubleWritable> {
+ public class NormalizingLongDoubleFloatDoubleVertexReader
+ extends TextVertexInputFormat.TextVertexReader {
/** Separator of the vertex and neighbors */
- private static final Pattern EDGE_SEPARATOR = Pattern.compile("\\s+");
+ private final Pattern edgeSeparator = Pattern.compile("\\s+");
/** Separator of the edge id and edge weight */
- private static final Pattern WEIGHT_SEPARATOR = Pattern.compile(":");
-
- /**
- * Constructor with the line reader.
- * @param lineReader
- * Internal line reader.
- */
- public NormalizingLongDoubleFloatDoubleVertexReader(
- RecordReader<LongWritable, Text> lineReader) {
- super(lineReader);
- }
+ private final Pattern weightSeparator = Pattern.compile(":");
@Override
public Vertex<LongWritable, DoubleWritable,
@@ -87,7 +70,7 @@ public class NormalizingLongDoubleFloatD
FloatWritable, DoubleWritable>createVertex(getContext()
.getConfiguration());
- String[] tokens = EDGE_SEPARATOR.split(getRecordReader()
+ String[] tokens = edgeSeparator.split(getRecordReader()
.getCurrentValue().toString());
Map<LongWritable, FloatWritable> edges = Maps
.newHashMapWithExpectedSize(tokens.length - 1);
@@ -106,9 +89,9 @@ public class NormalizingLongDoubleFloatD
* @param tokens The tokens to be parsed.
* @param edges The map that will contain the result of the parsing.
*/
- static void parse(String[] tokens, Map<LongWritable, FloatWritable> edges) {
+ void parse(String[] tokens, Map<LongWritable, FloatWritable> edges) {
for (int n = 1; n < tokens.length; n++) {
- String[] parts = WEIGHT_SEPARATOR.split(tokens[n]);
+ String[] parts = weightSeparator.split(tokens[n]);
edges.put(new LongWritable(Long.parseLong(parts[0])),
new FloatWritable(Float.parseFloat(parts[1])));
}
@@ -118,7 +101,7 @@ public class NormalizingLongDoubleFloatD
* Normalize the edges with L1 normalization.
* @param edges The edges to be normalized.
*/
- static void normalize(Map<LongWritable, FloatWritable> edges) {
+ void normalize(Map<LongWritable, FloatWritable> edges) {
if (edges == null || edges.size() == 0) {
throw new IllegalArgumentException(
"Cannot normalize an empy set of edges");
Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java?rev=1388258&r1=1388257&r2=1388258&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java Thu Sep 20 23:00:27 2012
@@ -26,17 +26,14 @@ import org.apache.giraph.graph.DefaultMa
import org.apache.giraph.graph.LongDoubleFloatDoubleVertex;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexReader;
-import org.apache.giraph.graph.VertexWriter;
import org.apache.giraph.graph.WorkerContext;
import org.apache.giraph.io.GeneratedVertexInputFormat;
import org.apache.giraph.io.TextVertexOutputFormat;
-import org.apache.giraph.io.TextVertexOutputFormat.TextVertexWriter;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;
@@ -179,13 +176,6 @@ public class SimplePageRankVertex extend
private static final Logger LOG =
Logger.getLogger(SimplePageRankVertexReader.class);
- /**
- * Constructor.
- */
- public SimplePageRankVertexReader() {
- super();
- }
-
@Override
public boolean nextVertex() {
return totalRecords > recordsRead;
@@ -234,42 +224,28 @@ public class SimplePageRankVertex extend
}
/**
- * Simple VertexWriter that supports {@link SimplePageRankVertex}
- */
- public static class SimplePageRankVertexWriter extends
- TextVertexWriter<LongWritable, DoubleWritable, FloatWritable> {
- /**
- * Constructor with line writer.
- *
- * @param lineRecordWriter Line writer that will do the writing.
- */
- public SimplePageRankVertexWriter(
- RecordWriter<Text, Text> lineRecordWriter) {
- super(lineRecordWriter);
- }
-
- @Override
- public void writeVertex(
- Vertex<LongWritable, DoubleWritable, FloatWritable, ?> vertex)
- throws IOException, InterruptedException {
- getRecordWriter().write(
- new Text(vertex.getId().toString()),
- new Text(vertex.getValue().toString()));
- }
- }
-
- /**
* Simple VertexOutputFormat that supports {@link SimplePageRankVertex}
*/
public static class SimplePageRankVertexOutputFormat extends
TextVertexOutputFormat<LongWritable, DoubleWritable, FloatWritable> {
@Override
- public VertexWriter<LongWritable, DoubleWritable, FloatWritable>
- createVertexWriter(TaskAttemptContext context)
+ public TextVertexWriter createVertexWriter(TaskAttemptContext context)
throws IOException, InterruptedException {
- RecordWriter<Text, Text> recordWriter =
- textOutputFormat.getRecordWriter(context);
- return new SimplePageRankVertexWriter(recordWriter);
+ return new SimplePageRankVertexWriter();
+ }
+
+ /**
+ * Simple VertexWriter that supports {@link SimplePageRankVertex}
+ */
+ public class SimplePageRankVertexWriter extends TextVertexWriter {
+ @Override
+ public void writeVertex(
+ Vertex<LongWritable, DoubleWritable, FloatWritable, ?> vertex)
+ throws IOException, InterruptedException {
+ getRecordWriter().write(
+ new Text(vertex.getId().toString()),
+ new Text(vertex.getValue().toString()));
+ }
}
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java?rev=1388258&r1=1388257&r2=1388258&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java Thu Sep 20 23:00:27 2012
@@ -22,16 +22,13 @@ import org.apache.giraph.graph.BspUtils;
import org.apache.giraph.graph.EdgeListVertex;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexReader;
-import org.apache.giraph.graph.VertexWriter;
import org.apache.giraph.io.GeneratedVertexInputFormat;
import org.apache.giraph.io.TextVertexOutputFormat;
-import org.apache.giraph.io.TextVertexOutputFormat.TextVertexWriter;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;
@@ -62,12 +59,6 @@ public class SimpleSuperstepVertex exten
/** Class logger */
private static final Logger LOG =
Logger.getLogger(SimpleSuperstepVertexReader.class);
- /**
- * Constructor.
- */
- public SimpleSuperstepVertexReader() {
- super();
- }
@Override
public boolean nextVertex() throws IOException, InterruptedException {
@@ -121,29 +112,6 @@ public class SimpleSuperstepVertex exten
}
}
- /**
- * Simple VertexWriter that supports {@link SimpleSuperstepVertex}
- */
- public static class SimpleSuperstepVertexWriter extends
- TextVertexWriter<LongWritable, IntWritable, FloatWritable> {
- /**
- * Constructor with the line record writer.
- *
- * @param lineRecordWriter Writer to write to.
- */
- public SimpleSuperstepVertexWriter(
- RecordWriter<Text, Text> lineRecordWriter) {
- super(lineRecordWriter);
- }
-
- @Override
- public void writeVertex(Vertex<LongWritable, IntWritable,
- FloatWritable, ?> vertex) throws IOException, InterruptedException {
- getRecordWriter().write(
- new Text(vertex.getId().toString()),
- new Text(vertex.getValue().toString()));
- }
- }
/**
* Simple VertexOutputFormat that supports {@link SimpleSuperstepVertex}
@@ -151,12 +119,22 @@ public class SimpleSuperstepVertex exten
public static class SimpleSuperstepVertexOutputFormat extends
TextVertexOutputFormat<LongWritable, IntWritable, FloatWritable> {
@Override
- public VertexWriter<LongWritable, IntWritable, FloatWritable>
- createVertexWriter(TaskAttemptContext context)
+ public TextVertexWriter createVertexWriter(TaskAttemptContext context)
throws IOException, InterruptedException {
- RecordWriter<Text, Text> recordWriter =
- textOutputFormat.getRecordWriter(context);
- return new SimpleSuperstepVertexWriter(recordWriter);
+ return new SimpleSuperstepVertexWriter();
+ }
+
+ /**
+ * Simple VertexWriter that supports {@link SimpleSuperstepVertex}
+ */
+ public class SimpleSuperstepVertexWriter extends TextVertexWriter {
+ @Override
+ public void writeVertex(Vertex<LongWritable, IntWritable,
+ FloatWritable, ?> vertex) throws IOException, InterruptedException {
+ getRecordWriter().write(
+ new Text(vertex.getId().toString()),
+ new Text(vertex.getValue().toString()));
+ }
}
}
}
Added: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleTextVertexOutputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleTextVertexOutputFormat.java?rev=1388258&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleTextVertexOutputFormat.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleTextVertexOutputFormat.java Thu Sep 20 23:00:27 2012
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.TextVertexOutputFormat;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * Simple text based vertex output format example.
+ */
+public class SimpleTextVertexOutputFormat extends
+ TextVertexOutputFormat<LongWritable, IntWritable, FloatWritable> {
+ /**
+ * Simple text based vertex writer
+ */
+ private class SimpleTextVertexWriter extends TextVertexWriter {
+ @Override
+ public void writeVertex(
+ Vertex<LongWritable, IntWritable, FloatWritable, ?> vertex)
+ throws IOException, InterruptedException {
+ getRecordWriter().write(
+ new Text(vertex.getId().toString()),
+ new Text(vertex.getValue().toString()));
+ }
+ }
+
+ @Override
+ public TextVertexWriter createVertexWriter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new SimpleTextVertexWriter();
+ }
+}
Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueFloatEdgeTextOutputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueFloatEdgeTextOutputFormat.java?rev=1388258&r1=1388257&r2=1388258&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueFloatEdgeTextOutputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueFloatEdgeTextOutputFormat.java Thu Sep 20 23:00:27 2012
@@ -21,13 +21,11 @@ package org.apache.giraph.examples;
import java.io.IOException;
import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.VertexWriter;
import org.apache.giraph.io.TextVertexOutputFormat;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
/**
@@ -36,31 +34,18 @@ import org.apache.hadoop.mapreduce.TaskA
public class VertexWithDoubleValueFloatEdgeTextOutputFormat extends
TextVertexOutputFormat<LongWritable, DoubleWritable, FloatWritable> {
@Override
- public VertexWriter<LongWritable, DoubleWritable, FloatWritable>
- createVertexWriter(TaskAttemptContext context) throws IOException,
- InterruptedException {
- RecordWriter<Text, Text> recordWriter =
- textOutputFormat.getRecordWriter(context);
- return new VertexWithDoubleValueWriter(recordWriter);
+ public TextVertexWriter createVertexWriter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new VertexWithDoubleValueWriter();
}
/**
* Vertex writer used with {@link VertexWithComponentTextOutputFormat}.
*/
- public static class VertexWithDoubleValueWriter extends
- TextVertexOutputFormat.TextVertexWriter<LongWritable,
- DoubleWritable, FloatWritable> {
- /**
- * Constructor with record writer.
- * @param writer Where the vertices will finally be written.
- */
- public VertexWithDoubleValueWriter(RecordWriter<Text, Text> writer) {
- super(writer);
- }
-
+ public class VertexWithDoubleValueWriter extends TextVertexWriter {
@Override
public void writeVertex(
- Vertex<LongWritable, DoubleWritable, FloatWritable, ?> vertex)
+ Vertex<LongWritable, DoubleWritable, FloatWritable, ?> vertex)
throws IOException, InterruptedException {
StringBuilder output = new StringBuilder();
output.append(vertex.getId().get());
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/VertexReader.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexReader.java?rev=1388258&r1=1388257&r2=1388258&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/VertexReader.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/VertexReader.java Thu Sep 20 23:00:27 2012
@@ -38,7 +38,7 @@ import java.io.IOException;
public interface VertexReader<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable> {
/**
- * Use the input split and context to setup reading the vertices.
+ * Use the input split and context to setup reading the vertices.
* Guaranteed to be called prior to any other function.
*
* @param inputSplit Input split to be used for reading vertices.
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/VertexWriter.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexWriter.java?rev=1388258&r1=1388257&r2=1388258&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/VertexWriter.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/VertexWriter.java Thu Sep 20 23:00:27 2012
@@ -40,8 +40,10 @@ public interface VertexWriter<I extends
*
* @param context Context used to write the vertices.
* @throws IOException
+ * @throws InterruptedException
*/
- void initialize(TaskAttemptContext context) throws IOException;
+ void initialize(TaskAttemptContext context) throws IOException,
+ InterruptedException;
/**
* Writes the next vertex and associated data
Added: giraph/trunk/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexInputFormat.java?rev=1388258&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexInputFormat.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexInputFormat.java Thu Sep 20 23:00:27 2012
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io;
+
+import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.graph.Edge;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.google.common.collect.Maps;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * VertexReader that reads lines of text with vertices encoded as adjacency
+ * lists and converts each token to the correct type. For example, a graph
+ * with vertices as integers and values as doubles could be encoded as:
+ * 1 0.1 2 0.2 3 0.3
+ * to represent a vertex named 1, with 0.1 as its value and two edges, to
+ * vertices 2 and 3, with edge values of 0.2 and 0.3, respectively.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public abstract class AdjacencyListTextVertexInputFormat<I extends
+ WritableComparable, V extends Writable, E extends Writable, M extends
+ Writable> extends TextVertexInputFormat<I, V, E, M> {
+ /** Delimiter for split */
+ public static final String LINE_TOKENIZE_VALUE = "adj.list.input.delimiter";
+ /** Default delimiter for split */
+ public static final String LINE_TOKENIZE_VALUE_DEFAULT = "\t";
+
+ /**
+ * Utility for doing any cleaning of each line before it is tokenized.
+ */
+ public interface LineSanitizer {
+ /**
+ * Clean string s before attempting to tokenize it.
+ *
+ * @param s String to be cleaned.
+ * @return Sanitized string.
+ */
+ String sanitize(String s);
+ }
+
+ @Override
+ public abstract AdjacencyListTextVertexReader createVertexReader(
+ InputSplit split, TaskAttemptContext context);
+
+ /**
+ * Vertex reader associated with {@link AdjacencyListTextVertexInputFormat}.
+ */
+ protected abstract class AdjacencyListTextVertexReader extends
+ TextVertexReaderFromEachLineProcessed<String[]> {
+ /**
+ * Cached configuration.
+ */
+ private Configuration conf;
+
+ /** Cached delimiter used for split */
+ private String splitValue = null;
+
+ /**
+ * Sanitizer from constructor.
+ */
+ private final LineSanitizer sanitizer;
+
+
+ /**
+ * Constructor without line sanitizer.
+ *
+ */
+ public AdjacencyListTextVertexReader() {
+ this(null);
+ }
+
+ /**
+ * Constructor with line sanitizer.
+ *
+ * @param sanitizer Sanitizer to be used.
+ */
+ public AdjacencyListTextVertexReader(LineSanitizer sanitizer) {
+ this.sanitizer = sanitizer;
+ }
+
+ @Override
+ public void initialize(InputSplit inputSplit, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ super.initialize(inputSplit, context);
+ conf = context.getConfiguration();
+ splitValue = conf.get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT);
+ }
+
+ @Override
+ protected String[] preprocessLine(Text line) throws IOException {
+ String sanitizedLine;
+ if (sanitizer != null) {
+ sanitizedLine = sanitizer.sanitize(line.toString());
+ } else {
+ sanitizedLine = line.toString();
+ }
+ String [] values = sanitizedLine.split(splitValue);
+ if ((values.length < 2) || (values.length % 2 != 0)) {
+ throw new IllegalArgumentException(
+ "Line did not split correctly: " + line);
+ }
+ return values;
+ }
+
+ @Override
+ protected I getId(String[] values) throws IOException {
+ I vertexId = BspUtils.<I>createVertexId(conf);
+ decodeId(values[0], vertexId);
+ return vertexId;
+ }
+
+ /**
+ * Store the Id for this line in an instance of its correct type.
+ *
+ * @param s Id of vertex from line
+ * @param id Instance of Id's type, in which to store its value
+ */
+ public abstract void decodeId(String s, I id);
+
+ @Override
+ protected V getValue(String[] values) throws IOException {
+ V value = BspUtils.<V>createVertexValue(conf);
+ decodeValue(values[1], value);
+ return value;
+ }
+
+
+ /**
+ * Store the value for this line in an instance of its correct type.
+ * @param s Value from line
+ * @param value Instance of value's type, in which to store its value
+ */
+ public abstract void decodeValue(String s, V value);
+
+ @Override
+ protected Map<I, E> getEdges(String[] values) throws IOException {
+ int i = 2;
+ Map<I, E> edges = Maps.newHashMap();
+ Edge<I, E> edge = new Edge<I, E>();
+ while (i < values.length) {
+ decodeEdge(values[i], values[i + 1], edge);
+ edges.put(edge.getTargetVertexId(), edge.getValue());
+ i += 2;
+ }
+ return edges;
+ }
+
+ /**
+ * Store an edge from the line into an instance of a correctly typed Edge
+ * @param id The edge's id from the line
+ * @param value The edge's value from the line
+ * @param edge Instance of edge in which to store the id and value
+ */
+ public abstract void decodeEdge(String id, String value, Edge<I, E> edge);
+
+ }
+}
Modified: giraph/trunk/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexOutputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexOutputFormat.java?rev=1388258&r1=1388257&r2=1388258&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexOutputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexOutputFormat.java Thu Sep 20 23:00:27 2012
@@ -19,11 +19,9 @@ package org.apache.giraph.io;
import org.apache.giraph.graph.Edge;
import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.VertexWriter;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.IOException;
@@ -43,39 +41,37 @@ public class AdjacencyListTextVertexOutp
V extends Writable, E extends Writable>
extends TextVertexOutputFormat<I, V, E> {
+ /** Split delimiter */
+ public static final String LINE_TOKENIZE_VALUE = "output.delimiter";
+ /** Default split delimiter */
+ public static final String LINE_TOKENIZE_VALUE_DEFAULT =
+ AdjacencyListTextVertexInputFormat.LINE_TOKENIZE_VALUE_DEFAULT;
+
+ @Override
+ public AdjacencyListTextVertexWriter createVertexWriter(
+ TaskAttemptContext context) {
+ return new AdjacencyListTextVertexWriter();
+ }
+
/**
- * Vertex writer associated wtih {@link AdjacencyListTextVertexOutputFormat}.
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
+ * Vertex writer associated with {@link AdjacencyListTextVertexOutputFormat}.
*/
- static class AdjacencyListVertexWriter<I extends WritableComparable, V extends
- Writable, E extends Writable> extends TextVertexWriter<I, V, E> {
- /** Split delimiter */
- public static final String LINE_TOKENIZE_VALUE = "output.delimiter";
- /** Default split delimiter */
- public static final String LINE_TOKENIZE_VALUE_DEFAULT = "\t";
+ protected class AdjacencyListTextVertexWriter extends
+ TextVertexWriterToEachLine {
/** Cached split delimeter */
private String delimiter;
- /**
- * Constructor with writer.
- *
- * @param recordWriter Record writer used for writing.
- */
- public AdjacencyListVertexWriter(RecordWriter<Text, Text> recordWriter) {
- super(recordWriter);
+ @Override
+ public void initialize(TaskAttemptContext context) throws IOException,
+ InterruptedException {
+ super.initialize(context);
+ delimiter = context.getConfiguration()
+ .get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT);
}
@Override
- public void writeVertex(Vertex<I, V, E, ?> vertex) throws IOException,
- InterruptedException {
- if (delimiter == null) {
- delimiter = getContext().getConfiguration()
- .get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT);
- }
-
+ public Text convertVertexToLine(Vertex<I, V, E, ?> vertex)
+ throws IOException {
StringBuffer sb = new StringBuffer(vertex.getId().toString());
sb.append(delimiter);
sb.append(vertex.getValue());
@@ -85,14 +81,8 @@ public class AdjacencyListTextVertexOutp
sb.append(delimiter).append(edge.getValue());
}
- getRecordWriter().write(new Text(sb.toString()), null);
+ return new Text(sb.toString());
}
}
- @Override
- public VertexWriter<I, V, E> createVertexWriter(TaskAttemptContext context)
- throws IOException, InterruptedException {
- return new AdjacencyListVertexWriter<I, V, E>
- (textOutputFormat.getRecordWriter(context));
- }
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/io/IdWithValueTextOutputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/IdWithValueTextOutputFormat.java?rev=1388258&r1=1388257&r2=1388258&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/IdWithValueTextOutputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/IdWithValueTextOutputFormat.java Thu Sep 20 23:00:27 2012
@@ -20,11 +20,10 @@ package org.apache.giraph.io;
import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.VertexWriter;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.IOException;
@@ -44,48 +43,45 @@ public class IdWithValueTextOutputFormat
V extends Writable, E extends Writable>
extends TextVertexOutputFormat<I, V, E> {
+ /** Specify the output delimiter */
+ public static final String LINE_TOKENIZE_VALUE = "output.delimiter";
+ /** Default output delimiter */
+ public static final String LINE_TOKENIZE_VALUE_DEFAULT = "\t";
+ /** Reverse id and value order? */
+ public static final String REVERSE_ID_AND_VALUE = "reverse.id.and.value";
+ /** Default is to not reverse id and value order. */
+ public static final boolean REVERSE_ID_AND_VALUE_DEFAULT = false;
+
+ @Override
+ public TextVertexWriter createVertexWriter(TaskAttemptContext context) {
+ return new IdWithValueVertexWriter();
+ }
+
/**
* Vertex writer used with {@link IdWithValueTextOutputFormat}.
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
*/
- static class IdWithValueVertexWriter<I extends WritableComparable, V extends
- Writable, E extends Writable> extends TextVertexWriter<I, V, E> {
- /** Specify the output delimiter */
- public static final String LINE_TOKENIZE_VALUE = "output.delimiter";
- /** Default output delimiter */
- public static final String LINE_TOKENIZE_VALUE_DEFAULT = "\t";
- /** Reverse id and value order? */
- public static final String REVERSE_ID_AND_VALUE = "reverse.id.and.value";
- /** Default is to not reverse id and value order. */
- public static final boolean REVERSE_ID_AND_VALUE_DEFAULT = false;
+ protected class IdWithValueVertexWriter extends TextVertexWriterToEachLine {
/** Saved delimiter */
private String delimiter;
+ /** Cached reverse option */
+ private boolean reverseOutput;
- /**
- * Constructor with record writer.
- *
- * @param recordWriter Writer from LineRecordWriter.
- */
- public IdWithValueVertexWriter(RecordWriter<Text, Text> recordWriter) {
- super(recordWriter);
+ @Override
+ public void initialize(TaskAttemptContext context) throws IOException,
+ InterruptedException {
+ super.initialize(context);
+ Configuration conf = context.getConfiguration();
+ delimiter = conf
+ .get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT);
+ reverseOutput = conf
+ .getBoolean(REVERSE_ID_AND_VALUE, REVERSE_ID_AND_VALUE_DEFAULT);
}
@Override
- public void writeVertex(Vertex<I, V, E, ?> vertex) throws IOException,
- InterruptedException {
- if (delimiter == null) {
- delimiter = getContext().getConfiguration()
- .get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT);
- }
-
+ protected Text convertVertexToLine(Vertex<I, V, E, ?> vertex)
+ throws IOException {
String first;
String second;
- boolean reverseOutput = getContext().getConfiguration()
- .getBoolean(REVERSE_ID_AND_VALUE, REVERSE_ID_AND_VALUE_DEFAULT);
-
if (reverseOutput) {
first = vertex.getValue().toString();
second = vertex.getId().toString();
@@ -93,17 +89,10 @@ public class IdWithValueTextOutputFormat
first = vertex.getId().toString();
second = vertex.getValue().toString();
}
-
Text line = new Text(first + delimiter + second);
-
- getRecordWriter().write(line, null);
+ return line;
}
- }
- @Override
- public VertexWriter<I, V, E> createVertexWriter(TaskAttemptContext context)
- throws IOException, InterruptedException {
- return new IdWithValueVertexWriter<I, V, E>
- (textOutputFormat.getRecordWriter(context));
}
+
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/io/IntIntNullIntTextInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/IntIntNullIntTextInputFormat.java?rev=1388258&r1=1388257&r2=1388258&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/IntIntNullIntTextInputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/IntIntNullIntTextInputFormat.java Thu Sep 20 23:00:27 2012
@@ -18,18 +18,12 @@
package org.apache.giraph.io;
-import org.apache.giraph.graph.BspUtils;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.VertexReader;
import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException;
@@ -45,60 +39,53 @@ import java.util.regex.Pattern;
public class IntIntNullIntTextInputFormat extends
TextVertexInputFormat<IntWritable, IntWritable, NullWritable,
IntWritable> {
+ /** Separator of the vertex and neighbors */
+ private static final Pattern SEPARATOR = Pattern.compile("[\t ]");
@Override
- public VertexReader<IntWritable, IntWritable, NullWritable, IntWritable>
- createVertexReader(InputSplit split, TaskAttemptContext context)
+ public TextVertexReader createVertexReader(InputSplit split,
+ TaskAttemptContext context)
throws IOException {
- return new IntIntNullIntVertexReader(
- textInputFormat.createRecordReader(split, context));
+ return new IntIntNullIntVertexReader();
}
/**
* Vertex reader associated with {@link IntIntNullIntTextInputFormat}.
*/
- public static class IntIntNullIntVertexReader extends
- TextVertexInputFormat.TextVertexReader<IntWritable, IntWritable,
- NullWritable, IntWritable> {
- /** Separator of the vertex and neighbors */
- private static final Pattern SEPARATOR = Pattern.compile("[\t ]");
-
+ public class IntIntNullIntVertexReader extends
+ TextVertexReaderFromEachLineProcessed<String[]> {
/**
- * Constructor with the line reader.
- *
- * @param lineReader Internal line reader.
+ * Cached vertex id for the current line
*/
- public IntIntNullIntVertexReader(RecordReader<LongWritable, Text>
- lineReader) {
- super(lineReader);
+ private IntWritable id;
+
+ @Override
+ protected String[] preprocessLine(Text line) throws IOException {
+ String[] tokens = SEPARATOR.split(line.toString());
+ id = new IntWritable(Integer.parseInt(tokens[0]));
+ return tokens;
}
@Override
- public Vertex<IntWritable, IntWritable, NullWritable, IntWritable>
- getCurrentVertex() throws IOException, InterruptedException {
- Vertex<IntWritable, IntWritable, NullWritable, IntWritable>
- vertex = BspUtils.<IntWritable, IntWritable, NullWritable,
- IntWritable>createVertex(getContext().getConfiguration());
+ protected IntWritable getId(String[] tokens) throws IOException {
+ return id;
+ }
+
+ @Override
+ protected IntWritable getValue(String[] tokens) throws IOException {
+ return id;
+ }
- String[] tokens = SEPARATOR.split(getRecordReader()
- .getCurrentValue().toString());
+ @Override
+ protected Map<IntWritable, NullWritable> getEdges(String[] tokens)
+ throws IOException {
Map<IntWritable, NullWritable> edges =
Maps.newHashMapWithExpectedSize(tokens.length - 1);
for (int n = 1; n < tokens.length; n++) {
edges.put(new IntWritable(Integer.parseInt(tokens[n])),
NullWritable.get());
}
-
- IntWritable vertexId = new IntWritable(Integer.parseInt(tokens[0]));
- vertex.initialize(vertexId, vertexId, edges,
- Lists.<IntWritable>newArrayList());
-
- return vertex;
- }
-
- @Override
- public boolean nextVertex() throws IOException, InterruptedException {
- return getRecordReader().nextKeyValue();
+ return edges;
}
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java?rev=1388258&r1=1388257&r2=1388258&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java Thu Sep 20 23:00:27 2012
@@ -19,22 +19,17 @@
package org.apache.giraph.io;
import org.apache.giraph.graph.BspUtils;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.VertexReader;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import com.google.common.collect.Maps;
-
import net.iharder.Base64;
import java.io.ByteArrayInputStream;
@@ -58,72 +53,73 @@ import java.util.Map;
public class JsonBase64VertexInputFormat<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable>
extends TextVertexInputFormat<I, V, E, M> {
+
+ @Override
+ public TextVertexReader createVertexReader(InputSplit split,
+ TaskAttemptContext context) {
+ return new JsonBase64VertexReader();
+ }
+
/**
* Simple reader that supports {@link JsonBase64VertexInputFormat}
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message data
*/
- private static class JsonBase64VertexReader<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- extends TextVertexReader<I, V, E, M> {
- /**
- * Only constructor. Requires the LineRecordReader
- *
- * @param lineRecordReader Line record reader to read from
- */
- public JsonBase64VertexReader(
- RecordReader<LongWritable, Text> lineRecordReader) {
- super(lineRecordReader);
- }
+ protected class JsonBase64VertexReader extends
+ TextVertexReaderFromEachLineProcessed<JSONObject> {
- @Override
- public boolean nextVertex() throws IOException, InterruptedException {
- return getRecordReader().nextKeyValue();
- }
+ /** Cached configuration */
+ private Configuration conf;
@Override
- public Vertex<I, V, E, M> getCurrentVertex()
+ public void initialize(InputSplit inputSplit, TaskAttemptContext context)
throws IOException, InterruptedException {
- Configuration conf = getContext().getConfiguration();
- Vertex<I, V, E, M> vertex = BspUtils.createVertex(conf);
+ super.initialize(inputSplit, context);
+ conf = context.getConfiguration();
+ }
- Text line = getRecordReader().getCurrentValue();
- JSONObject vertexObject;
+ @Override
+ protected JSONObject preprocessLine(Text line) {
try {
- vertexObject = new JSONObject(line.toString());
+ return new JSONObject(line.toString());
} catch (JSONException e) {
throw new IllegalArgumentException(
"next: Failed to get the vertex", e);
}
- DataInput input = null;
- byte[] decodedWritable = null;
- I vertexId = null;
+ }
+
+ @Override
+ protected I getId(JSONObject vertexObject) throws IOException {
try {
- decodedWritable = Base64.decode(
- vertexObject.getString(JsonBase64VertexFormat.VERTEX_ID_KEY));
- input = new DataInputStream(
- new ByteArrayInputStream(decodedWritable));
- vertexId = BspUtils.<I>createVertexId(conf);
+ byte[] decodedWritable = Base64.decode(
+ vertexObject.getString(JsonBase64VertexFormat.VERTEX_ID_KEY));
+ DataInput input = new DataInputStream(
+ new ByteArrayInputStream(decodedWritable));
+ I vertexId = BspUtils.<I>createVertexId(conf);
vertexId.readFields(input);
+ return vertexId;
} catch (JSONException e) {
throw new IllegalArgumentException(
"next: Failed to get vertex id", e);
}
- V vertexValue = null;
+ }
+
+ @Override
+ protected V getValue(JSONObject vertexObject) throws IOException {
try {
- decodedWritable = Base64.decode(
- vertexObject.getString(JsonBase64VertexFormat.VERTEX_VALUE_KEY));
- input = new DataInputStream(
- new ByteArrayInputStream(decodedWritable));
- vertexValue = BspUtils.<V>createVertexValue(conf);
+ byte[] decodedWritable = Base64.decode(
+ vertexObject.getString(JsonBase64VertexFormat.VERTEX_VALUE_KEY));
+ DataInputStream input = new DataInputStream(
+ new ByteArrayInputStream(decodedWritable));
+ V vertexValue = BspUtils.<V>createVertexValue(conf);
vertexValue.readFields(input);
+ return vertexValue;
} catch (JSONException e) {
throw new IllegalArgumentException(
"next: Failed to get vertex value", e);
}
+ }
+
+ @Override
+ protected Map<I, E> getEdges(JSONObject vertexObject) throws IOException {
JSONArray edgeArray = null;
try {
edgeArray = vertexObject.getJSONArray(
@@ -132,6 +128,7 @@ public class JsonBase64VertexInputFormat
throw new IllegalArgumentException(
"next: Failed to get edge array", e);
}
+ byte[] decodedWritable;
Map<I, E> edgeMap = Maps.newHashMap();
for (int i = 0; i < edgeArray.length(); ++i) {
try {
@@ -140,7 +137,7 @@ public class JsonBase64VertexInputFormat
throw new IllegalArgumentException(
"next: Failed to get edge value", e);
}
- input = new DataInputStream(
+ DataInputStream input = new DataInputStream(
new ByteArrayInputStream(decodedWritable));
I targetVertexId =
BspUtils.<I>createVertexId(getContext().getConfiguration());
@@ -150,15 +147,9 @@ public class JsonBase64VertexInputFormat
edgeValue.readFields(input);
edgeMap.put(targetVertexId, edgeValue);
}
- vertex.initialize(vertexId, vertexValue, edgeMap, null);
- return vertex;
+ return edgeMap;
}
- }
- @Override
- public VertexReader<I, V, E, M> createVertexReader(
- InputSplit split, TaskAttemptContext context) throws IOException {
- return new JsonBase64VertexReader<I, V, E, M>(
- textInputFormat.createRecordReader(split, context));
}
+
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexOutputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexOutputFormat.java?rev=1388258&r1=1388257&r2=1388258&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexOutputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexOutputFormat.java Thu Sep 20 23:00:27 2012
@@ -20,11 +20,9 @@ package org.apache.giraph.io;
import org.apache.giraph.graph.Edge;
import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.VertexWriter;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.json.JSONArray;
import org.json.JSONException;
@@ -51,29 +49,20 @@ import java.io.IOException;
public class JsonBase64VertexOutputFormat<I extends WritableComparable,
V extends Writable, E extends Writable> extends
TextVertexOutputFormat<I, V, E> {
+
+ @Override
+ public TextVertexWriter createVertexWriter(TaskAttemptContext context) {
+ return new JsonBase64VertexWriter();
+ }
+
/**
* Simple writer that supports {@link JsonBase64VertexOutputFormat}
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
*/
- private static class JsonBase64VertexWriter<I extends WritableComparable,
- V extends Writable, E extends Writable> extends
- TextVertexWriter<I, V, E> {
- /**
- * Only constructor. Requires the LineRecordWriter
- *
- * @param lineRecordWriter Line record writer to write to
- */
- public JsonBase64VertexWriter(
- RecordWriter<Text, Text> lineRecordWriter) {
- super(lineRecordWriter);
- }
+ protected class JsonBase64VertexWriter extends TextVertexWriterToEachLine {
@Override
- public void writeVertex(Vertex<I, V, E, ?> vertex)
- throws IOException, InterruptedException {
+ protected Text convertVertexToLine(Vertex<I, V, E, ?> vertex)
+ throws IOException {
ByteArrayOutputStream outputStream =
new ByteArrayOutputStream();
DataOutput output = new DataOutputStream(outputStream);
@@ -112,14 +101,9 @@ public class JsonBase64VertexOutputForma
throw new IllegalStateException(
"writerVertex: Failed to insert edge array", e);
}
- getRecordWriter().write(new Text(vertexObject.toString()), null);
+ return new Text(vertexObject.toString());
}
- }
- @Override
- public VertexWriter<I, V, E> createVertexWriter(TaskAttemptContext context)
- throws IOException, InterruptedException {
- return new JsonBase64VertexWriter<I, V, E>(
- textOutputFormat.getRecordWriter(context));
}
+
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexInputFormat.java?rev=1388258&r1=1388257&r2=1388258&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexInputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexInputFormat.java Thu Sep 20 23:00:27 2012
@@ -17,15 +17,12 @@
*/
package org.apache.giraph.io;
-import org.apache.giraph.graph.BspUtils;
import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.VertexReader;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.json.JSONArray;
import org.json.JSONException;
@@ -46,11 +43,9 @@ public class JsonLongDoubleFloatDoubleVe
FloatWritable, DoubleWritable> {
@Override
- public VertexReader<LongWritable, DoubleWritable, FloatWritable,
- DoubleWritable> createVertexReader(InputSplit split,
- TaskAttemptContext context) throws IOException {
- return new JsonLongDoubleFloatDoubleVertexReader(
- textInputFormat.createRecordReader(split, context));
+ public TextVertexReader createVertexReader(InputSplit split,
+ TaskAttemptContext context) {
+ return new JsonLongDoubleFloatDoubleVertexReader();
}
/**
@@ -64,52 +59,47 @@ public class JsonLongDoubleFloatDoubleVe
* Second edge has a destination vertex 3, edge value 0.7.
* [1,4.3,[[2,2.1],[3,0.7]]]
*/
- static class JsonLongDoubleFloatDoubleVertexReader extends
- TextVertexReader<LongWritable, DoubleWritable,
- FloatWritable, DoubleWritable> {
-
- /**
- * Constructor with the line record reader.
- *
- * @param lineRecordReader Will read from this line.
- */
- public JsonLongDoubleFloatDoubleVertexReader(
- RecordReader<LongWritable, Text> lineRecordReader) {
- super(lineRecordReader);
+ class JsonLongDoubleFloatDoubleVertexReader extends
+ TextVertexReaderFromEachLineProcessedHandlingExceptions<JSONArray,
+ JSONException> {
+
+ @Override
+ protected JSONArray preprocessLine(Text line) throws JSONException {
+ return new JSONArray(line.toString());
+ }
+
+ @Override
+ protected LongWritable getId(JSONArray jsonVertex) throws JSONException,
+ IOException {
+ return new LongWritable(jsonVertex.getLong(0));
}
@Override
- public Vertex<LongWritable, DoubleWritable, FloatWritable,
- DoubleWritable> getCurrentVertex()
- throws IOException, InterruptedException {
- Vertex<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
- vertex = BspUtils.<LongWritable, DoubleWritable, FloatWritable,
- DoubleWritable>createVertex(getContext().getConfiguration());
-
- Text line = getRecordReader().getCurrentValue();
- try {
- JSONArray jsonVertex = new JSONArray(line.toString());
- LongWritable vertexId = new LongWritable(jsonVertex.getLong(0));
- DoubleWritable vertexValue =
- new DoubleWritable(jsonVertex.getDouble(1));
- Map<LongWritable, FloatWritable> edges = Maps.newHashMap();
- JSONArray jsonEdgeArray = jsonVertex.getJSONArray(2);
- for (int i = 0; i < jsonEdgeArray.length(); ++i) {
- JSONArray jsonEdge = jsonEdgeArray.getJSONArray(i);
- edges.put(new LongWritable(jsonEdge.getLong(0)),
+ protected DoubleWritable getValue(JSONArray jsonVertex) throws
+ JSONException, IOException {
+ return new DoubleWritable(jsonVertex.getDouble(1));
+ }
+
+ @Override
+ protected Map<LongWritable, FloatWritable> getEdges(JSONArray jsonVertex)
+ throws JSONException, IOException {
+ Map<LongWritable, FloatWritable> edges = Maps.newHashMap();
+ JSONArray jsonEdgeArray = jsonVertex.getJSONArray(2);
+ for (int i = 0; i < jsonEdgeArray.length(); ++i) {
+ JSONArray jsonEdge = jsonEdgeArray.getJSONArray(i);
+ edges.put(new LongWritable(jsonEdge.getLong(0)),
new FloatWritable((float) jsonEdge.getDouble(1)));
- }
- vertex.initialize(vertexId, vertexValue, edges, null);
- } catch (JSONException e) {
- throw new IllegalArgumentException(
- "next: Couldn't get vertex from line " + line, e);
}
- return vertex;
+ return edges;
}
@Override
- public boolean nextVertex() throws IOException, InterruptedException {
- return getRecordReader().nextKeyValue();
+ protected Vertex<LongWritable, DoubleWritable, FloatWritable,
+ DoubleWritable> handleException(Text line, JSONArray jsonVertex,
+ JSONException e) {
+ throw new IllegalArgumentException(
+ "Couldn't get vertex from line " + line, e);
}
+
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexOutputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexOutputFormat.java?rev=1388258&r1=1388257&r2=1388258&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexOutputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexOutputFormat.java Thu Sep 20 23:00:27 2012
@@ -20,12 +20,10 @@ package org.apache.giraph.io;
import org.apache.giraph.graph.Edge;
import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.VertexWriter;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.json.JSONArray;
import org.json.JSONException;
@@ -39,35 +37,24 @@ import java.io.IOException;
public class JsonLongDoubleFloatDoubleVertexOutputFormat extends
TextVertexOutputFormat<LongWritable, DoubleWritable,
FloatWritable> {
+
@Override
- public VertexWriter<LongWritable, DoubleWritable, FloatWritable>
- createVertexWriter(TaskAttemptContext context)
- throws IOException, InterruptedException {
- RecordWriter<Text, Text> recordWriter =
- textOutputFormat.getRecordWriter(context);
- return new JsonLongDoubleFloatDoubleVertexWriter(recordWriter);
+ public TextVertexWriter createVertexWriter(
+ TaskAttemptContext context) {
+ return new JsonLongDoubleFloatDoubleVertexWriter();
}
/**
* VertexWriter that supports vertices with <code>double</code>
* values and <code>float</code> out-edge weights.
*/
- static class JsonLongDoubleFloatDoubleVertexWriter extends
- TextVertexWriter<LongWritable, DoubleWritable, FloatWritable> {
-
- /**
- * Vertex writer with the internal line writer.
- *
- * @param lineRecordWriter Wil actually be written to.
- */
- public JsonLongDoubleFloatDoubleVertexWriter(
- RecordWriter<Text, Text> lineRecordWriter) {
- super(lineRecordWriter);
- }
-
+ private class JsonLongDoubleFloatDoubleVertexWriter extends
+ TextVertexWriterToEachLine {
@Override
- public void writeVertex(Vertex<LongWritable, DoubleWritable,
- FloatWritable, ?> vertex) throws IOException, InterruptedException {
+ public Text convertVertexToLine(
+ Vertex<LongWritable, DoubleWritable,
+ FloatWritable, ?> vertex
+ ) throws IOException {
JSONArray jsonVertex = new JSONArray();
try {
jsonVertex.put(vertex.getId().get());
@@ -84,7 +71,7 @@ public class JsonLongDoubleFloatDoubleVe
throw new IllegalArgumentException(
"writeVertex: Couldn't write vertex " + vertex);
}
- getRecordWriter().write(new Text(jsonVertex.toString()), null);
+ return new Text(jsonVertex.toString());
}
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/io/LongDoubleDoubleAdjacencyListVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/LongDoubleDoubleAdjacencyListVertexInputFormat.java?rev=1388258&r1=1388257&r2=1388258&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/LongDoubleDoubleAdjacencyListVertexInputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/LongDoubleDoubleAdjacencyListVertexInputFormat.java Thu Sep 20 23:00:27 2012
@@ -20,14 +20,10 @@ package org.apache.giraph.io;
import org.apache.giraph.graph.Edge;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import java.io.IOException;
-
/**
* InputFormat for reading graphs stored as (ordered) adjacency lists
* with the vertex ids longs and the vertex values and edges doubles.
@@ -39,37 +35,30 @@ import java.io.IOException;
* @param <M> Message data
*/
public class LongDoubleDoubleAdjacencyListVertexInputFormat<M extends Writable>
- extends TextVertexInputFormat<LongWritable, DoubleWritable,
+ extends AdjacencyListTextVertexInputFormat<LongWritable, DoubleWritable,
DoubleWritable, M> {
+ @Override
+ public AdjacencyListTextVertexReader createVertexReader(InputSplit split,
+ TaskAttemptContext context) {
+ return new LongDoubleDoubleAdjacencyListVertexReader(null);
+ }
+
/**
* VertexReader associated with
* {@link LongDoubleDoubleAdjacencyListVertexInputFormat}.
- *
- * @param <M> Message data.
*/
- static class VertexReader<M extends Writable> extends
- AdjacencyListVertexReader<LongWritable, DoubleWritable,
- DoubleWritable, M> {
+ protected class LongDoubleDoubleAdjacencyListVertexReader extends
+ AdjacencyListTextVertexReader {
/**
- * Constructor with Line record reader.
+ * Constructor with {@link LineSanitizer}.
*
- * @param lineRecordReader Reader to internally use.
+ * @param lineSanitizer the sanitizer to use for reading
*/
- VertexReader(RecordReader<LongWritable, Text> lineRecordReader) {
- super(lineRecordReader);
- }
-
- /**
- * Constructor with Line record reader and sanitizer.
- *
- * @param lineRecordReader Reader to internally use.
- * @param sanitizer Line sanitizer.
- */
- VertexReader(RecordReader<LongWritable, Text> lineRecordReader,
- LineSanitizer sanitizer) {
- super(lineRecordReader, sanitizer);
+ public LongDoubleDoubleAdjacencyListVertexReader(LineSanitizer
+ lineSanitizer) {
+ super(lineSanitizer);
}
@Override
@@ -92,12 +81,4 @@ public class LongDoubleDoubleAdjacencyLi
}
}
- @Override
- public org.apache.giraph.graph.VertexReader<LongWritable,
- DoubleWritable, DoubleWritable, M> createVertexReader(
- InputSplit split,
- TaskAttemptContext context) throws IOException {
- return new VertexReader<M>(textInputFormat.createRecordReader(
- split, context));
- }
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/io/TextDoubleDoubleAdjacencyListVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/TextDoubleDoubleAdjacencyListVertexInputFormat.java?rev=1388258&r1=1388257&r2=1388258&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/TextDoubleDoubleAdjacencyListVertexInputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/TextDoubleDoubleAdjacencyListVertexInputFormat.java Thu Sep 20 23:00:27 2012
@@ -19,15 +19,11 @@ package org.apache.giraph.io;
import org.apache.giraph.graph.Edge;
import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import java.io.IOException;
-
/**
* Class to read graphs stored as adjacency lists with ids represented by
* Strings and values as doubles. This is a good inputformat for reading
@@ -36,8 +32,14 @@ import java.io.IOException;
* @param <M> Message type.
*/
public class TextDoubleDoubleAdjacencyListVertexInputFormat<M extends Writable>
- extends TextVertexInputFormat<Text, DoubleWritable, DoubleWritable, M> {
+ extends AdjacencyListTextVertexInputFormat<Text, DoubleWritable,
+ DoubleWritable, M> {
+ @Override
+ public AdjacencyListTextVertexReader createVertexReader(InputSplit split,
+ TaskAttemptContext context) {
+ return new TextDoubleDoubleAdjacencyListVertexReader(null);
+ }
/**
* Vertex reader used with
@@ -45,26 +47,17 @@ public class TextDoubleDoubleAdjacencyLi
*
* @param <M> Message type.
*/
- static class VertexReader<M extends Writable> extends
- AdjacencyListVertexReader<Text, DoubleWritable, DoubleWritable, M> {
- /**
- * Constructor without sanitzer.
- *
- * @param lineRecordReader Internal reader.
- */
- VertexReader(RecordReader<LongWritable, Text> lineRecordReader) {
- super(lineRecordReader);
- }
+ protected class TextDoubleDoubleAdjacencyListVertexReader extends
+ AdjacencyListTextVertexReader {
/**
- * Constructor with {@link LineRecordReader}
+ * Constructor with {@link LineSanitizer}.
*
- * @param lineRecordReader Internal reader.
- * @param sanitizer Sanitizer of the lines.
+ * @param lineSanitizer the sanitizer to use for reading
*/
- VertexReader(RecordReader<LongWritable, Text> lineRecordReader,
- LineSanitizer sanitizer) {
- super(lineRecordReader, sanitizer);
+ public TextDoubleDoubleAdjacencyListVertexReader(LineSanitizer
+ lineSanitizer) {
+ super(lineSanitizer);
}
@Override
@@ -85,11 +78,4 @@ public class TextDoubleDoubleAdjacencyLi
}
}
- @Override
- public org.apache.giraph.graph.VertexReader<Text, DoubleWritable,
- DoubleWritable, M> createVertexReader(InputSplit split,
- TaskAttemptContext context) throws IOException {
- return new VertexReader<M>(textInputFormat.createRecordReader(
- split, context));
- }
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java?rev=1388258&r1=1388257&r2=1388258&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java Thu Sep 20 23:00:27 2012
@@ -18,8 +18,11 @@
package org.apache.giraph.io;
+import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexInputFormat;
import org.apache.giraph.graph.VertexReader;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
@@ -32,6 +35,7 @@ import org.apache.hadoop.mapreduce.lib.i
import java.io.IOException;
import java.util.List;
+import java.util.Map;
/**
* Abstract class that users should subclass to use their own text based
@@ -46,41 +50,78 @@ import java.util.List;
public abstract class TextVertexInputFormat<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable>
extends VertexInputFormat<I, V, E, M> {
+
/** Uses the TextInputFormat to do everything */
protected TextInputFormat textInputFormat = new TextInputFormat();
+ @Override
+ public List<InputSplit> getSplits(JobContext context, int numWorkers)
+ throws IOException, InterruptedException {
+ // Ignore the hint of numWorkers here since we are using TextInputFormat
+ // to do this for us
+ return textInputFormat.getSplits(context);
+ }
+
+ /**
+ * The factory method which produces the {@link TextVertexReader} used by this
+ * input format.
+ *
+ * @param split
+ * the split to be read
+ * @param context
+ * the information about the task
+ * @return
+ * the text vertex reader to be used
+ */
+ @Override
+ public abstract TextVertexReader createVertexReader(InputSplit split,
+ TaskAttemptContext context) throws IOException;
+
/**
* Abstract class to be implemented by the user based on their specific
- * vertex input. Easiest to ignore the key value separator and only use
+ * vertex input. Easiest to ignore the key value separator and only use
* key instead.
*
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
+ * When reading a vertex from each line, extend
+ * {@link TextVertexReaderFromEachLine}. If you need to preprocess each line
+ * first, then extend {@link TextVertexReaderFromEachLineProcessed}. If you
+ * need common exception handling while preprocessing, then extend
+ * {@link TextVertexReaderFromEachLineProcessedHandlingExceptions}.
*/
- public abstract static class TextVertexReader<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- implements VertexReader<I, V, E, M> {
+ protected abstract class TextVertexReader implements
+ VertexReader<I, V, E, M> {
/** Internal line record reader */
- private final RecordReader<LongWritable, Text> lineRecordReader;
+ private RecordReader<LongWritable, Text> lineRecordReader;
/** Context passed to initialize */
private TaskAttemptContext context;
- /**
- * Initialize with the LineRecordReader.
- *
- * @param lineRecordReader Line record reader from TextInputFormat
- */
- public TextVertexReader(
- RecordReader<LongWritable, Text> lineRecordReader) {
- this.lineRecordReader = lineRecordReader;
- }
-
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext context)
throws IOException, InterruptedException {
- lineRecordReader.initialize(inputSplit, context);
this.context = context;
+ lineRecordReader = createLineRecordReader(inputSplit, context);
+ lineRecordReader.initialize(inputSplit, context);
+ }
+
+ /**
+ * Create the line record reader. Override this to use a different
+ * underlying record reader (useful for testing).
+ *
+ * @param inputSplit
+ * the split to read
+ * @param context
+ * the context passed to initialize
+ * @return
+ * the record reader to be used
+ * @throws IOException
+ * exception that can be thrown during creation
+ * @throws InterruptedException
+ * exception that can be thrown during creation
+ */
+ protected RecordReader<LongWritable, Text>
+ createLineRecordReader(InputSplit inputSplit, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return textInputFormat.createRecordReader(inputSplit, context);
}
@Override
@@ -112,11 +153,266 @@ public abstract class TextVertexInputFor
}
}
- @Override
- public List<InputSplit> getSplits(JobContext context, int numWorkers)
- throws IOException, InterruptedException {
- // Ignore the hint of numWorkers here since we are using TextInputFormat
- // to do this for us
- return textInputFormat.getSplits(context);
+ /**
+ * Abstract class to be implemented by the user to read a vertex from each
+ * text line.
+ */
+ protected abstract class TextVertexReaderFromEachLine extends
+ TextVertexReader {
+
+ @Override
+ public final Vertex<I, V, E, M> getCurrentVertex() throws IOException,
+ InterruptedException {
+ Text line = getRecordReader().getCurrentValue();
+ Vertex<I, V, E, M> vertex = BspUtils
+ .<I, V, E, M>createVertex(getContext().getConfiguration());
+ vertex.initialize(getId(line), getValue(line), getEdges(line), null);
+ return vertex;
+ }
+
+ @Override
+ public final boolean nextVertex() throws IOException, InterruptedException {
+ return getRecordReader().nextKeyValue();
+ }
+
+ /**
+ * Reads vertex id from the current line.
+ *
+ * @param line
+ * the current line
+ * @return
+ * the vertex id corresponding to the line
+ * @throws IOException
+ * exception that can be thrown while reading
+ */
+ protected abstract I getId(Text line) throws IOException;
+
+ /**
+ * Reads vertex value from the current line.
+ *
+ * @param line
+ * the current line
+ * @return
+ * the vertex value corresponding to the line
+ * @throws IOException
+ * exception that can be thrown while reading
+ */
+ protected abstract V getValue(Text line) throws IOException;
+
+ /**
+ * Reads edges from the current line.
+ *
+ * @param line
+ * the current line
+ * @return
+ * the edges mapping target vertex id to its edge value
+ * @throws IOException
+ * exception that can be thrown while reading
+ */
+ protected abstract Map<I, E> getEdges(Text line) throws IOException;
+
+ }
+
+ /**
+ * Abstract class to be implemented by the user to read a vertex from each
+ * text line after preprocessing it.
+ *
+ * @param <T>
+ * The resulting type of preprocessing.
+ */
+ protected abstract class TextVertexReaderFromEachLineProcessed<T> extends
+ TextVertexReader {
+
+ @Override
+ public final boolean nextVertex() throws IOException, InterruptedException {
+ return getRecordReader().nextKeyValue();
+ }
+
+ @Override
+ public final Vertex<I, V, E, M> getCurrentVertex() throws IOException,
+ InterruptedException {
+ Text line = getRecordReader().getCurrentValue();
+ Vertex<I, V, E, M> vertex;
+ T processed = preprocessLine(line);
+ vertex = BspUtils
+ .<I, V, E, M>createVertex(getContext().getConfiguration());
+ vertex.initialize(getId(processed), getValue(processed),
+ getEdges(processed), null);
+ return vertex;
+ }
+
+ /**
+ * Preprocess the line so other methods can easily read necessary
+ * information for creating vertex.
+ *
+ * @param line
+ * the current line to be read
+ * @return
+ * the preprocessed object
+ * @throws IOException
+ * exception that can be thrown while reading
+ */
+ protected abstract T preprocessLine(Text line) throws IOException;
+
+ /**
+ * Reads vertex id from the preprocessed line.
+ *
+ * @param line
+ * the object obtained by preprocessing the line
+ * @return
+ * the vertex id
+ * @throws IOException
+ * exception that can be thrown while reading
+ */
+ protected abstract I getId(T line) throws IOException;
+
+ /**
+ * Reads vertex value from the preprocessed line.
+ *
+ * @param line
+ * the object obtained by preprocessing the line
+ * @return
+ * the vertex value
+ * @throws IOException
+ * exception that can be thrown while reading
+ */
+ protected abstract V getValue(T line) throws IOException;
+
+ /**
+ * Reads edges from the preprocessed line.
+ *
+ * @param line
+ * the object obtained by preprocessing the line
+ * @return
+ * the edges mapping target vertex id to its edge value
+ * @throws IOException
+ * exception that can be thrown while reading
+ */
+ protected abstract Map<I, E> getEdges(T line) throws IOException;
+
+ }
+
+ // CHECKSTYLE: stop RedundantThrows
+ /**
+ * Abstract class to be implemented by the user to read a vertex from each
+ * text line after preprocessing it with exception handling.
+ *
+ * @param <T>
+ * The resulting type of preprocessing.
+ * @param <X>
+ * The exception type that can be thrown due to preprocessing.
+ */
+ protected abstract class
+ TextVertexReaderFromEachLineProcessedHandlingExceptions<T, X extends
+ Throwable> extends TextVertexReader {
+
+ @Override
+ public final boolean nextVertex() throws IOException, InterruptedException {
+ return getRecordReader().nextKeyValue();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public final Vertex<I, V, E, M> getCurrentVertex() throws IOException,
+ InterruptedException {
+ // Note we are reading from value only since key is the byte offset
+ Text line = getRecordReader().getCurrentValue();
+ Vertex<I, V, E, M> vertex;
+ T processed = null;
+ try {
+ processed = preprocessLine(line);
+ Configuration conf = getContext().getConfiguration();
+ vertex = BspUtils
+ .<I, V, E, M>createVertex(conf);
+ vertex.initialize(getId(processed), getValue(processed),
+ getEdges(processed), null);
+ } catch (IOException e) {
+ throw e;
+ // CHECKSTYLE: stop IllegalCatch
+ } catch (Throwable t) {
+ return handleException(line, processed, (X) t);
+ // CHECKSTYLE: resume IllegalCatch
+ }
+ return vertex;
+ }
+
+ /**
+ * Preprocess the line so other methods can easily read necessary
+ * information for creating vertex.
+ *
+ * @param line
+ * the current line to be read
+ * @return
+ * the preprocessed object
+ * @throws X
+ * exception that can be thrown while preprocessing the line
+ * @throws IOException
+ * exception that can be thrown while reading
+ */
+ protected abstract T preprocessLine(Text line) throws X, IOException;
+
+ /**
+ * Reads vertex id from the preprocessed line.
+ *
+ * @param line
+ * the object obtained by preprocessing the line
+ * @return
+ * the vertex id
+ * @throws X
+ * exception that can be thrown while reading the preprocessed
+ * object
+ * @throws IOException
+ * exception that can be thrown while reading
+ */
+ protected abstract I getId(T line) throws X, IOException;
+
+ /**
+ * Reads vertex value from the preprocessed line.
+ *
+ * @param line
+ * the object obtained by preprocessing the line
+ * @return
+ * the vertex value
+ * @throws X
+ * exception that can be thrown while reading the preprocessed
+ * object
+ * @throws IOException
+ * exception that can be thrown while reading
+ */
+ protected abstract V getValue(T line) throws X, IOException;
+
+ /**
+ * Reads edges from the preprocessed line.
+ *
+ * @param line
+ * the object obtained by preprocessing the line
+ * @return
+ * the edges mapping target vertex id to its edge value
+ * @throws X
+ * exception that can be thrown while reading the preprocessed
+ * object
+ * @throws IOException
+ * exception that can be thrown while reading
+ */
+ protected abstract Map<I, E> getEdges(T line) throws X, IOException;
+
+ /**
+ * Handles exceptions while reading vertex from each line.
+ *
+ * @param line
+ * the line that was being read when the exception was thrown
+ * @param processed
+ * the object obtained by preprocessing the line. Can be null if
+ * exception was thrown during preprocessing.
+ * @param e
+ * the exception thrown while reading the line
+ * @return the recovered/alternative vertex to be used
+ */
+ protected Vertex<I, V, E, M> handleException(Text line, T processed, X e) {
+ throw new IllegalArgumentException(e);
+ }
+
}
+ // CHECKSTYLE: resume RedundantThrows
+
}