You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/09/21 01:00:28 UTC

svn commit: r1388258 [1/2] - in /giraph/trunk: ./ src/main/java/org/apache/giraph/examples/ src/main/java/org/apache/giraph/graph/ src/main/java/org/apache/giraph/io/ src/test/java/org/apache/giraph/io/

Author: aching
Date: Thu Sep 20 23:00:27 2012
New Revision: 1388258

URL: http://svn.apache.org/viewvc?rev=1388258&view=rev
Log:
GIRAPH-277: Text Vertex Input/Output Format base classes overhaul.
(nitayj via aching)


Added:
    giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleTextVertexOutputFormat.java
    giraph/trunk/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexInputFormat.java
Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/checkstyle.xml
    giraph/trunk/src/main/java/org/apache/giraph/examples/LongDoubleFloatDoubleTextInputFormat.java
    giraph/trunk/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleFloatDoubleTextInputFormat.java
    giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
    giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java
    giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueFloatEdgeTextOutputFormat.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/VertexReader.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/VertexWriter.java
    giraph/trunk/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexOutputFormat.java
    giraph/trunk/src/main/java/org/apache/giraph/io/IdWithValueTextOutputFormat.java
    giraph/trunk/src/main/java/org/apache/giraph/io/IntIntNullIntTextInputFormat.java
    giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java
    giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexOutputFormat.java
    giraph/trunk/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexInputFormat.java
    giraph/trunk/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexOutputFormat.java
    giraph/trunk/src/main/java/org/apache/giraph/io/LongDoubleDoubleAdjacencyListVertexInputFormat.java
    giraph/trunk/src/main/java/org/apache/giraph/io/TextDoubleDoubleAdjacencyListVertexInputFormat.java
    giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java
    giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexOutputFormat.java
    giraph/trunk/src/test/java/org/apache/giraph/io/TestAdjacencyListTextVertexOutputFormat.java
    giraph/trunk/src/test/java/org/apache/giraph/io/TestIdWithValueTextOutputFormat.java
    giraph/trunk/src/test/java/org/apache/giraph/io/TestLongDoubleDoubleAdjacencyListVertexInputFormat.java
    giraph/trunk/src/test/java/org/apache/giraph/io/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1388258&r1=1388257&r2=1388258&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Thu Sep 20 23:00:27 2012
@@ -2,6 +2,9 @@ Giraph Change Log
 
 Release 0.2.0 - unreleased
 
+  GIRAPH-277: Text Vertex Input/Output Format base classes overhaul.
+  (nitayj via aching)
+
   GIRAPH-331: ReviewBoard post-review config. (nitayj via aching)
 
   GIRAPH-332: Duplicate unnecessary info in giraph-formats-contrib

Modified: giraph/trunk/checkstyle.xml
URL: http://svn.apache.org/viewvc/giraph/trunk/checkstyle.xml?rev=1388258&r1=1388257&r2=1388258&view=diff
==============================================================================
--- giraph/trunk/checkstyle.xml (original)
+++ giraph/trunk/checkstyle.xml Thu Sep 20 23:00:27 2012
@@ -260,39 +260,9 @@
   
   <!-- Setup special comments to suppress specific checks from source files -->
   <module name="SuppressionCommentFilter">
-    <property name="offCommentFormat" value="CHECKSTYLE\: stop JavadocVariable"/>
-    <property name="onCommentFormat"  value="CHECKSTYLE\: resume JavadocVariable"/>
-    <property name="checkFormat"      value="JavadocVariable"/>
-  </module>
-  <module name="SuppressionCommentFilter">
-    <property name="offCommentFormat" value="CHECKSTYLE\: stop JavadocMethodCheck"/>
-    <property name="onCommentFormat"  value="CHECKSTYLE\: resume JavadocMethodCheck"/>
-    <property name="checkFormat"      value="JavadocMethodCheck"/>
-  </module>
-  <module name="SuppressionCommentFilter">
-    <property name="offCommentFormat" value="CHECKSTYLE\: stop ConstantName"/>
-    <property name="onCommentFormat"  value="CHECKSTYLE\: resume ConstantName"/>
-    <property name="checkFormat"      value="ConstantName"/>
-  </module>
-  <module name="SuppressionCommentFilter">
-    <property name="offCommentFormat" value="CHECKSTYLE\: stop HideUtilityClassConstructor"/>
-    <property name="onCommentFormat"  value="CHECKSTYLE\: resume HideUtilityClassConstructor"/>
-    <property name="checkFormat"      value="HideUtilityClassConstructor"/>
-  </module>
-  <module name="SuppressionCommentFilter">
-    <property name="offCommentFormat" value="CHECKSTYLE\: stop MultipleVariableDeclarations"/>
-    <property name="onCommentFormat"  value="CHECKSTYLE\: resume MultipleVariableDeclarations"/>
-    <property name="checkFormat"      value="MultipleVariableDeclarations"/>
-  </module>
-  <module name="SuppressionCommentFilter">
-    <property name="offCommentFormat" value="CHECKSTYLE\: stop IllegalCatch"/>
-    <property name="onCommentFormat"  value="CHECKSTYLE\: resume IllegalCatch"/>
-    <property name="checkFormat"      value="IllegalCatch"/>
-  </module>
-  <module name="SuppressionCommentFilter">
-    <property name="offCommentFormat" value="CHECKSTYLE\: stop DeclarationOrder"/>
-    <property name="onCommentFormat"  value="CHECKSTYLE\: resume DeclarationOrder"/>
-    <property name="checkFormat"      value="DeclarationOrder"/>
+    <property name="offCommentFormat" value="CHECKSTYLE\: stop ([\w\|]+)"/>
+    <property name="onCommentFormat"  value="CHECKSTYLE\: resume ([\w\|]+)"/>
+    <property name="checkFormat"      value="$1"/>
   </module>
 </module>
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/LongDoubleFloatDoubleTextInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/LongDoubleFloatDoubleTextInputFormat.java?rev=1388258&r1=1388257&r2=1388258&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/LongDoubleFloatDoubleTextInputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/LongDoubleFloatDoubleTextInputFormat.java Thu Sep 20 23:00:27 2012
@@ -24,14 +24,11 @@ import java.util.regex.Pattern;
 
 import org.apache.giraph.graph.BspUtils;
 import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.VertexReader;
 import org.apache.giraph.io.TextVertexInputFormat;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 import com.google.common.collect.Lists;
@@ -48,31 +45,19 @@ public class LongDoubleFloatDoubleTextIn
     FloatWritable, DoubleWritable> {
 
   @Override
-  public VertexReader<LongWritable,
-    DoubleWritable, FloatWritable, DoubleWritable>
-  createVertexReader(InputSplit split, TaskAttemptContext context)
+  public TextVertexReader createVertexReader(InputSplit split,
+      TaskAttemptContext context)
     throws IOException {
-    return new LongDoubleFloatDoubleVertexReader(
-        textInputFormat.createRecordReader(split, context));
+    return new LongDoubleFloatDoubleVertexReader();
   }
 
   /**
    * Vertex reader associated with {@link LongDoubleFloatDoubleTextInputFormat}.
    */
-  public static class LongDoubleFloatDoubleVertexReader extends
-    TextVertexInputFormat.TextVertexReader<LongWritable, DoubleWritable,
-    FloatWritable, DoubleWritable> {
+  public class LongDoubleFloatDoubleVertexReader extends
+    TextVertexInputFormat.TextVertexReader {
     /** Separator of the vertex and neighbors */
-    private static final Pattern SEPARATOR = Pattern.compile("[\t ]");
-
-    /**
-     * Constructor with the line reader.
-     * @param lineReader Internal line reader.
-     */
-    public LongDoubleFloatDoubleVertexReader(
-        RecordReader<LongWritable, Text> lineReader) {
-      super(lineReader);
-    }
+    private final Pattern separator = Pattern.compile("[\t ]");
 
     @Override
     public Vertex<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
@@ -83,7 +68,7 @@ public class LongDoubleFloatDoubleTextIn
             .getConfiguration());
 
       String[] tokens =
-          SEPARATOR.split(getRecordReader().getCurrentValue().toString());
+          separator.split(getRecordReader().getCurrentValue().toString());
       Map<LongWritable, FloatWritable> edges =
           Maps.newHashMapWithExpectedSize(tokens.length - 1);
       float weight = 1.0f / (tokens.length - 1);

Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleFloatDoubleTextInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleFloatDoubleTextInputFormat.java?rev=1388258&r1=1388257&r2=1388258&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleFloatDoubleTextInputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleFloatDoubleTextInputFormat.java Thu Sep 20 23:00:27 2012
@@ -24,14 +24,11 @@ import java.util.regex.Pattern;
 
 import org.apache.giraph.graph.BspUtils;
 import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.VertexReader;
 import org.apache.giraph.io.TextVertexInputFormat;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 import com.google.common.collect.Lists;
@@ -48,34 +45,20 @@ public class NormalizingLongDoubleFloatD
       FloatWritable, DoubleWritable> {
 
   @Override
-  public VertexReader<LongWritable, DoubleWritable,
-  FloatWritable, DoubleWritable> createVertexReader(
+  public TextVertexReader createVertexReader(
       InputSplit split, TaskAttemptContext context) throws IOException {
-    return new NormalizingLongDoubleFloatDoubleVertexReader(
-        textInputFormat.createRecordReader(split, context));
+    return new NormalizingLongDoubleFloatDoubleVertexReader();
   }
 
   /**
    * Vertex reader associated with {@link LongDoubleFloatDoubleTextInputFormat}.
    */
-  public static class NormalizingLongDoubleFloatDoubleVertexReader
-      extends
-      TextVertexInputFormat.TextVertexReader<LongWritable, DoubleWritable,
-      FloatWritable, DoubleWritable> {
+  public class NormalizingLongDoubleFloatDoubleVertexReader
+      extends TextVertexInputFormat.TextVertexReader {
     /** Separator of the vertex and neighbors */
-    private static final Pattern EDGE_SEPARATOR = Pattern.compile("\\s+");
+    private final Pattern edgeSeparator = Pattern.compile("\\s+");
     /** Separator of the edge id and edge weight */
-    private static final Pattern WEIGHT_SEPARATOR = Pattern.compile(":");
-
-    /**
-     * Constructor with the line reader.
-     * @param lineReader
-     *          Internal line reader.
-     */
-    public NormalizingLongDoubleFloatDoubleVertexReader(
-        RecordReader<LongWritable, Text> lineReader) {
-      super(lineReader);
-    }
+    private final Pattern weightSeparator = Pattern.compile(":");
 
     @Override
     public Vertex<LongWritable, DoubleWritable,
@@ -87,7 +70,7 @@ public class NormalizingLongDoubleFloatD
           FloatWritable, DoubleWritable>createVertex(getContext()
               .getConfiguration());
 
-      String[] tokens = EDGE_SEPARATOR.split(getRecordReader()
+      String[] tokens = edgeSeparator.split(getRecordReader()
           .getCurrentValue().toString());
       Map<LongWritable, FloatWritable> edges = Maps
           .newHashMapWithExpectedSize(tokens.length - 1);
@@ -106,9 +89,9 @@ public class NormalizingLongDoubleFloatD
      * @param tokens The tokens to be parsed.
      * @param edges The map that will contain the result of the parsing.
      */
-    static void parse(String[] tokens, Map<LongWritable, FloatWritable> edges) {
+    void parse(String[] tokens, Map<LongWritable, FloatWritable> edges) {
       for (int n = 1; n < tokens.length; n++) {
-        String[] parts = WEIGHT_SEPARATOR.split(tokens[n]);
+        String[] parts = weightSeparator.split(tokens[n]);
         edges.put(new LongWritable(Long.parseLong(parts[0])),
             new FloatWritable(Float.parseFloat(parts[1])));
       }
@@ -118,7 +101,7 @@ public class NormalizingLongDoubleFloatD
      * Normalize the edges with L1 normalization.
      * @param edges The edges to be normalized.
      */
-    static void normalize(Map<LongWritable, FloatWritable> edges) {
+    void normalize(Map<LongWritable, FloatWritable> edges) {
       if (edges == null || edges.size() == 0) {
         throw new IllegalArgumentException(
             "Cannot normalize an empy set of edges");

Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java?rev=1388258&r1=1388257&r2=1388258&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java Thu Sep 20 23:00:27 2012
@@ -26,17 +26,14 @@ import org.apache.giraph.graph.DefaultMa
 import org.apache.giraph.graph.LongDoubleFloatDoubleVertex;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexReader;
-import org.apache.giraph.graph.VertexWriter;
 import org.apache.giraph.graph.WorkerContext;
 import org.apache.giraph.io.GeneratedVertexInputFormat;
 import org.apache.giraph.io.TextVertexOutputFormat;
-import org.apache.giraph.io.TextVertexOutputFormat.TextVertexWriter;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.log4j.Logger;
 
@@ -179,13 +176,6 @@ public class SimplePageRankVertex extend
     private static final Logger LOG =
         Logger.getLogger(SimplePageRankVertexReader.class);
 
-    /**
-     * Constructor.
-     */
-    public SimplePageRankVertexReader() {
-      super();
-    }
-
     @Override
     public boolean nextVertex() {
       return totalRecords > recordsRead;
@@ -234,42 +224,28 @@ public class SimplePageRankVertex extend
   }
 
   /**
-   * Simple VertexWriter that supports {@link SimplePageRankVertex}
-   */
-  public static class SimplePageRankVertexWriter extends
-      TextVertexWriter<LongWritable, DoubleWritable, FloatWritable> {
-    /**
-     * Constructor with line writer.
-     *
-     * @param lineRecordWriter Line writer that will do the writing.
-     */
-    public SimplePageRankVertexWriter(
-        RecordWriter<Text, Text> lineRecordWriter) {
-      super(lineRecordWriter);
-    }
-
-    @Override
-    public void writeVertex(
-      Vertex<LongWritable, DoubleWritable, FloatWritable, ?> vertex)
-      throws IOException, InterruptedException {
-      getRecordWriter().write(
-          new Text(vertex.getId().toString()),
-          new Text(vertex.getValue().toString()));
-    }
-  }
-
-  /**
    * Simple VertexOutputFormat that supports {@link SimplePageRankVertex}
    */
   public static class SimplePageRankVertexOutputFormat extends
       TextVertexOutputFormat<LongWritable, DoubleWritable, FloatWritable> {
     @Override
-    public VertexWriter<LongWritable, DoubleWritable, FloatWritable>
-    createVertexWriter(TaskAttemptContext context)
+    public TextVertexWriter createVertexWriter(TaskAttemptContext context)
       throws IOException, InterruptedException {
-      RecordWriter<Text, Text> recordWriter =
-          textOutputFormat.getRecordWriter(context);
-      return new SimplePageRankVertexWriter(recordWriter);
+      return new SimplePageRankVertexWriter();
+    }
+
+    /**
+     * Writes each {@link SimplePageRankVertex} as an (id, value) Text pair.
+     */
+    public class SimplePageRankVertexWriter extends TextVertexWriter {
+      @Override
+      public void writeVertex(
+          Vertex<LongWritable, DoubleWritable, FloatWritable, ?> vertex)
+        throws IOException, InterruptedException {
+        getRecordWriter().write(
+            new Text(vertex.getId().toString()),
+            new Text(vertex.getValue().toString()));
+      }
     }
   }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java?rev=1388258&r1=1388257&r2=1388258&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java Thu Sep 20 23:00:27 2012
@@ -22,16 +22,13 @@ import org.apache.giraph.graph.BspUtils;
 import org.apache.giraph.graph.EdgeListVertex;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexReader;
-import org.apache.giraph.graph.VertexWriter;
 import org.apache.giraph.io.GeneratedVertexInputFormat;
 import org.apache.giraph.io.TextVertexOutputFormat;
-import org.apache.giraph.io.TextVertexOutputFormat.TextVertexWriter;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.log4j.Logger;
 
@@ -62,12 +59,6 @@ public class SimpleSuperstepVertex exten
     /** Class logger */
     private static final Logger LOG =
         Logger.getLogger(SimpleSuperstepVertexReader.class);
-    /**
-     * Constructor.
-     */
-    public SimpleSuperstepVertexReader() {
-      super();
-    }
 
     @Override
     public boolean nextVertex() throws IOException, InterruptedException {
@@ -121,29 +112,6 @@ public class SimpleSuperstepVertex exten
     }
   }
 
-  /**
-   * Simple VertexWriter that supports {@link SimpleSuperstepVertex}
-   */
-  public static class SimpleSuperstepVertexWriter extends
-      TextVertexWriter<LongWritable, IntWritable, FloatWritable> {
-    /**
-     * Constructor with the line record writer.
-     *
-     * @param lineRecordWriter Writer to write to.
-     */
-    public SimpleSuperstepVertexWriter(
-        RecordWriter<Text, Text> lineRecordWriter) {
-      super(lineRecordWriter);
-    }
-
-    @Override
-    public void writeVertex(Vertex<LongWritable, IntWritable,
-            FloatWritable, ?> vertex) throws IOException, InterruptedException {
-      getRecordWriter().write(
-          new Text(vertex.getId().toString()),
-          new Text(vertex.getValue().toString()));
-    }
-  }
 
   /**
    * Simple VertexOutputFormat that supports {@link SimpleSuperstepVertex}
@@ -151,12 +119,22 @@ public class SimpleSuperstepVertex exten
   public static class SimpleSuperstepVertexOutputFormat extends
       TextVertexOutputFormat<LongWritable, IntWritable, FloatWritable> {
     @Override
-    public VertexWriter<LongWritable, IntWritable, FloatWritable>
-    createVertexWriter(TaskAttemptContext context)
+    public TextVertexWriter createVertexWriter(TaskAttemptContext context)
       throws IOException, InterruptedException {
-      RecordWriter<Text, Text> recordWriter =
-          textOutputFormat.getRecordWriter(context);
-      return new SimpleSuperstepVertexWriter(recordWriter);
+      return new SimpleSuperstepVertexWriter();
+    }
+
+    /**
+     * Writes each {@link SimpleSuperstepVertex} as an (id, value) Text pair.
+     */
+    public class SimpleSuperstepVertexWriter extends TextVertexWriter {
+      @Override
+      public void writeVertex(Vertex<LongWritable, IntWritable,
+          FloatWritable, ?> vertex) throws IOException, InterruptedException {
+        getRecordWriter().write(
+            new Text(vertex.getId().toString()),
+            new Text(vertex.getValue().toString()));
+      }
     }
   }
 }

Added: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleTextVertexOutputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleTextVertexOutputFormat.java?rev=1388258&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleTextVertexOutputFormat.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleTextVertexOutputFormat.java Thu Sep 20 23:00:27 2012
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.TextVertexOutputFormat;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * Example text output format that writes each vertex's id and value.
+ */
+public class SimpleTextVertexOutputFormat extends
+    TextVertexOutputFormat<LongWritable, IntWritable, FloatWritable> {
+  /**
+   * Writes a vertex as a Text pair of its id and value (via toString()).
+   */
+  private class SimpleTextVertexWriter extends TextVertexWriter {
+    @Override
+    public void writeVertex(
+      Vertex<LongWritable, IntWritable, FloatWritable, ?> vertex)
+      throws IOException, InterruptedException {
+      getRecordWriter().write(
+          new Text(vertex.getId().toString()),
+          new Text(vertex.getValue().toString()));
+    }
+  }
+
+  @Override
+  public TextVertexWriter createVertexWriter(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    return new SimpleTextVertexWriter();
+  }
+}

Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueFloatEdgeTextOutputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueFloatEdgeTextOutputFormat.java?rev=1388258&r1=1388257&r2=1388258&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueFloatEdgeTextOutputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueFloatEdgeTextOutputFormat.java Thu Sep 20 23:00:27 2012
@@ -21,13 +21,11 @@ package org.apache.giraph.examples;
 import java.io.IOException;
 
 import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.VertexWriter;
 import org.apache.giraph.io.TextVertexOutputFormat;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 /**
@@ -36,31 +34,18 @@ import org.apache.hadoop.mapreduce.TaskA
 public class VertexWithDoubleValueFloatEdgeTextOutputFormat extends
     TextVertexOutputFormat<LongWritable, DoubleWritable, FloatWritable> {
   @Override
-  public VertexWriter<LongWritable, DoubleWritable, FloatWritable>
-  createVertexWriter(TaskAttemptContext context) throws IOException,
-          InterruptedException {
-    RecordWriter<Text, Text> recordWriter =
-        textOutputFormat.getRecordWriter(context);
-    return new VertexWithDoubleValueWriter(recordWriter);
+  public TextVertexWriter createVertexWriter(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    return new VertexWithDoubleValueWriter();
   }
 
   /**
    * Vertex writer used with {@link VertexWithComponentTextOutputFormat}.
    */
-  public static class VertexWithDoubleValueWriter extends
-    TextVertexOutputFormat.TextVertexWriter<LongWritable,
-    DoubleWritable, FloatWritable> {
-    /**
-     * Constructor with record writer.
-     * @param writer Where the vertices will finally be written.
-     */
-    public VertexWithDoubleValueWriter(RecordWriter<Text, Text> writer) {
-      super(writer);
-    }
-
+  public class VertexWithDoubleValueWriter extends TextVertexWriter {
     @Override
     public void writeVertex(
-        Vertex<LongWritable, DoubleWritable, FloatWritable, ?> vertex)
+      Vertex<LongWritable, DoubleWritable, FloatWritable, ?> vertex)
       throws IOException, InterruptedException {
       StringBuilder output = new StringBuilder();
       output.append(vertex.getId().get());

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/VertexReader.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexReader.java?rev=1388258&r1=1388257&r2=1388258&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/VertexReader.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/VertexReader.java Thu Sep 20 23:00:27 2012
@@ -38,7 +38,7 @@ import java.io.IOException;
 public interface VertexReader<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable> {
   /**
-   * Use the input split and context to setup reading the vertices.
+   * Use the input split and context to set up reading the vertices.
    * Guaranteed to be called prior to any other function.
    *
    * @param inputSplit Input split to be used for reading vertices.

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/VertexWriter.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexWriter.java?rev=1388258&r1=1388257&r2=1388258&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/VertexWriter.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/VertexWriter.java Thu Sep 20 23:00:27 2012
@@ -40,8 +40,10 @@ public interface VertexWriter<I extends 
    *
    * @param context Context used to write the vertices.
    * @throws IOException
+   * @throws InterruptedException if interrupted during initialization
    */
-  void initialize(TaskAttemptContext context) throws IOException;
+  void initialize(TaskAttemptContext context) throws IOException,
+    InterruptedException;
 
   /**
    * Writes the next vertex and associated data

Added: giraph/trunk/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexInputFormat.java?rev=1388258&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexInputFormat.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexInputFormat.java Thu Sep 20 23:00:27 2012
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io;
+
+import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.graph.Edge;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.google.common.collect.Maps;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * VertexReader that reads lines of text with vertices encoded as adjacency
+ * lists and converts each token to the correct type.  For example, a graph
+ * with vertices as integers and values as doubles could be encoded as:
+ *   1 0.1 2 0.2 3 0.3
+ * to represent a vertex named 1, with 0.1 as its value and two edges, to
+ * vertices 2 and 3, with edge values of 0.2 and 0.3, respectively.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public abstract class AdjacencyListTextVertexInputFormat<I extends
+    WritableComparable, V extends Writable, E extends Writable, M extends
+    Writable> extends TextVertexInputFormat<I, V, E, M> {
+  /** Delimiter for split */
+  public static final String LINE_TOKENIZE_VALUE = "adj.list.input.delimiter";
+  /** Default delimiter for split */
+  public static final String LINE_TOKENIZE_VALUE_DEFAULT = "\t";
+
+  /**
+   * Utility for doing any cleaning of each line before it is tokenized.
+   */
+  public interface LineSanitizer {
+    /**
+     * Clean string s before attempting to tokenize it.
+     *
+     * @param s String to be cleaned.
+     * @return Sanitized string.
+     */
+    String sanitize(String s);
+  }
+
+  @Override
+  public abstract AdjacencyListTextVertexReader createVertexReader(
+      InputSplit split, TaskAttemptContext context);
+
+  /**
+   * Vertex reader associated with {@link AdjacencyListTextVertexInputFormat}.
+   */
+  protected abstract class AdjacencyListTextVertexReader extends
+    TextVertexReaderFromEachLineProcessed<String[]> {
+    /**
+     * Cached configuration.
+     */
+    private Configuration conf;
+
+    /** Cached delimiter used for split */
+    private String splitValue = null;
+
+    /**
+     * Sanitizer from constructor.
+     */
+    private final LineSanitizer sanitizer;
+
+
+    /**
+     * Constructor without line sanitizer.
+     *
+     */
+    public AdjacencyListTextVertexReader() {
+      this(null);
+    }
+
+    /**
+     * Constructor with line sanitizer.
+     *
+     * @param sanitizer Sanitizer to be used.
+     */
+    public AdjacencyListTextVertexReader(LineSanitizer sanitizer) {
+      this.sanitizer = sanitizer;
+    }
+
+    @Override
+    public void initialize(InputSplit inputSplit, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+      super.initialize(inputSplit, context);
+      conf = context.getConfiguration();
+      splitValue = conf.get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT);
+    }
+
+    @Override
+    protected String[] preprocessLine(Text line) throws IOException {
+      String sanitizedLine;
+      if (sanitizer != null) {
+        sanitizedLine = sanitizer.sanitize(line.toString());
+      } else {
+        sanitizedLine = line.toString();
+      }
+      String [] values = sanitizedLine.split(splitValue);
+      if ((values.length < 2) || (values.length % 2 != 0)) {
+        throw new IllegalArgumentException(
+          "Line did not split correctly: " + line);
+      }
+      return values;
+    }
+
+    @Override
+    protected I getId(String[] values) throws IOException {
+      I vertexId = BspUtils.<I>createVertexId(conf);
+      decodeId(values[0], vertexId);
+      return vertexId;
+    }
+
+    /**
+     * Store the Id for this line in an instance of its correct type.
+     *
+     * @param s Id of vertex from line
+     * @param id Instance of Id's type, in which to store its value
+     */
+    public abstract void decodeId(String s, I id);
+
+    @Override
+    protected V getValue(String[] values) throws IOException {
+      V value = BspUtils.<V>createVertexValue(conf);
+      decodeValue(values[1], value);
+      return value;
+    }
+
+
+    /**
+     * Store the value for this line in an instance of its correct type.
+     * @param s Value from line
+     * @param value Instance of value's type, in which to store its value
+     */
+    public abstract void decodeValue(String s, V value);
+
+    @Override
+    protected Map<I, E> getEdges(String[] values) throws IOException {
+      Map<I, E> edges = Maps.newHashMap();
+      // values[0] is the id and values[1] the vertex value; the remaining
+      // tokens are (target id, edge value) pairs, validated in preprocessLine.
+      for (int i = 2; i < values.length; i += 2) {
+        // Fresh Edge per pair so a decodeEdge() that reuses the objects an
+        // edge already holds cannot make map entries alias one another.
+        Edge<I, E> edge = new Edge<I, E>();
+        decodeEdge(values[i], values[i + 1], edge);
+        edges.put(edge.getTargetVertexId(), edge.getValue());
+      }
+      return edges;
+    }
+
+    /**
+     * Store an edge from the line into an instance of a correctly typed Edge
+     * @param id The edge's id from the line
+     * @param value The edge's value from the line
+     * @param edge Instance of edge in which to store the id and value
+     */
+    public abstract void decodeEdge(String id, String value, Edge<I, E> edge);
+
+  }
+}

Modified: giraph/trunk/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexOutputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexOutputFormat.java?rev=1388258&r1=1388257&r2=1388258&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexOutputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexOutputFormat.java Thu Sep 20 23:00:27 2012
@@ -19,11 +19,9 @@ package org.apache.giraph.io;
 
 import org.apache.giraph.graph.Edge;
 import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.VertexWriter;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 import java.io.IOException;
@@ -43,39 +41,37 @@ public class AdjacencyListTextVertexOutp
     V extends Writable, E extends Writable>
     extends TextVertexOutputFormat<I, V, E> {
 
+  /** Split delimiter */
+  public static final String LINE_TOKENIZE_VALUE = "output.delimiter";
+  /** Default split delimiter */
+  public static final String LINE_TOKENIZE_VALUE_DEFAULT =
+    AdjacencyListTextVertexInputFormat.LINE_TOKENIZE_VALUE_DEFAULT;
+
+  @Override
+  public AdjacencyListTextVertexWriter createVertexWriter(
+      TaskAttemptContext context) {
+    return new AdjacencyListTextVertexWriter();
+  }
+
   /**
-   * Vertex writer associated wtih {@link AdjacencyListTextVertexOutputFormat}.
-   *
-   * @param <I> Vertex id
-   * @param <V> Vertex data
-   * @param <E> Edge data
+   * Vertex writer associated with {@link AdjacencyListTextVertexOutputFormat}.
    */
-  static class AdjacencyListVertexWriter<I extends WritableComparable, V extends
-      Writable, E extends Writable> extends TextVertexWriter<I, V, E> {
-    /** Split delimiter */
-    public static final String LINE_TOKENIZE_VALUE = "output.delimiter";
-    /** Default split delimiter */
-    public static final String LINE_TOKENIZE_VALUE_DEFAULT = "\t";
-    /** Cached split delimeter */
+  protected class AdjacencyListTextVertexWriter extends
+    TextVertexWriterToEachLine {
+    /** Cached split delimiter */
     private String delimiter;
 
-    /**
-     * Constructor with writer.
-     *
-     * @param recordWriter Record writer used for writing.
-     */
-    public AdjacencyListVertexWriter(RecordWriter<Text, Text> recordWriter) {
-      super(recordWriter);
+    @Override
+    public void initialize(TaskAttemptContext context) throws IOException,
+        InterruptedException {
+      super.initialize(context);
+      delimiter = context.getConfiguration()
+          .get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT);
     }
 
     @Override
-    public void writeVertex(Vertex<I, V, E, ?> vertex) throws IOException,
-    InterruptedException {
-      if (delimiter == null) {
-        delimiter = getContext().getConfiguration()
-            .get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT);
-      }
-
-      StringBuffer sb = new StringBuffer(vertex.getId().toString());
+    public Text convertVertexToLine(Vertex<I, V, E, ?> vertex)
+      throws IOException {
+      StringBuilder sb = new StringBuilder(vertex.getId().toString());
       sb.append(delimiter);
       sb.append(vertex.getValue());
@@ -85,14 +81,8 @@ public class AdjacencyListTextVertexOutp
         sb.append(delimiter).append(edge.getValue());
       }
 
-      getRecordWriter().write(new Text(sb.toString()), null);
+      return new Text(sb.toString());
     }
   }
 
-  @Override
-  public VertexWriter<I, V, E> createVertexWriter(TaskAttemptContext context)
-    throws IOException, InterruptedException {
-    return new AdjacencyListVertexWriter<I, V, E>
-    (textOutputFormat.getRecordWriter(context));
-  }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/io/IdWithValueTextOutputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/IdWithValueTextOutputFormat.java?rev=1388258&r1=1388257&r2=1388258&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/IdWithValueTextOutputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/IdWithValueTextOutputFormat.java Thu Sep 20 23:00:27 2012
@@ -20,11 +20,10 @@ package org.apache.giraph.io;
 
 
 import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.VertexWriter;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 import java.io.IOException;
@@ -44,48 +43,45 @@ public class IdWithValueTextOutputFormat
     V extends Writable, E extends Writable>
     extends TextVertexOutputFormat<I, V, E> {
 
+  /** Specify the output delimiter */
+  public static final String LINE_TOKENIZE_VALUE = "output.delimiter";
+  /** Default output delimiter */
+  public static final String LINE_TOKENIZE_VALUE_DEFAULT = "\t";
+  /** Reverse id and value order? */
+  public static final String REVERSE_ID_AND_VALUE = "reverse.id.and.value";
+  /** Default is to not reverse id and value order. */
+  public static final boolean REVERSE_ID_AND_VALUE_DEFAULT = false;
+
+  @Override
+  public TextVertexWriter createVertexWriter(TaskAttemptContext context) {
+    return new IdWithValueVertexWriter();
+  }
+
   /**
    * Vertex writer used with {@link IdWithValueTextOutputFormat}.
-   *
-   * @param <I> Vertex id
-   * @param <V> Vertex data
-   * @param <E> Edge data
    */
-  static class IdWithValueVertexWriter<I extends WritableComparable, V extends
-      Writable, E extends Writable> extends TextVertexWriter<I, V, E> {
-    /** Specify the output delimiter */
-    public static final String LINE_TOKENIZE_VALUE = "output.delimiter";
-    /** Default output delimiter */
-    public static final String LINE_TOKENIZE_VALUE_DEFAULT = "\t";
-    /** Reverse id and value order? */
-    public static final String REVERSE_ID_AND_VALUE = "reverse.id.and.value";
-    /** Default is to not reverse id and value order. */
-    public static final boolean REVERSE_ID_AND_VALUE_DEFAULT = false;
+  protected class IdWithValueVertexWriter extends TextVertexWriterToEachLine {
     /** Saved delimiter */
     private String delimiter;
+    /** Cached reverse option */
+    private boolean reverseOutput;
 
-    /**
-     * Constructor with record writer.
-     *
-     * @param recordWriter Writer from LineRecordWriter.
-     */
-    public IdWithValueVertexWriter(RecordWriter<Text, Text> recordWriter) {
-      super(recordWriter);
+    @Override
+    public void initialize(TaskAttemptContext context) throws IOException,
+        InterruptedException {
+      super.initialize(context);
+      Configuration conf = context.getConfiguration();
+      delimiter = conf
+          .get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT);
+      reverseOutput = conf
+          .getBoolean(REVERSE_ID_AND_VALUE, REVERSE_ID_AND_VALUE_DEFAULT);
     }
 
     @Override
-    public void writeVertex(Vertex<I, V, E, ?> vertex) throws IOException,
-    InterruptedException {
-      if (delimiter == null) {
-        delimiter = getContext().getConfiguration()
-            .get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT);
-      }
-
+    protected Text convertVertexToLine(Vertex<I, V, E, ?> vertex)
+      throws IOException {
       String first;
       String second;
-      boolean reverseOutput = getContext().getConfiguration()
-          .getBoolean(REVERSE_ID_AND_VALUE, REVERSE_ID_AND_VALUE_DEFAULT);
-
       if (reverseOutput) {
         first = vertex.getValue().toString();
         second = vertex.getId().toString();
@@ -93,17 +89,10 @@ public class IdWithValueTextOutputFormat
         first = vertex.getId().toString();
         second = vertex.getValue().toString();
       }
-
       Text line = new Text(first + delimiter + second);
-
-      getRecordWriter().write(line, null);
+      return line;
     }
-  }
 
-  @Override
-  public VertexWriter<I, V, E> createVertexWriter(TaskAttemptContext context)
-    throws IOException, InterruptedException {
-    return new IdWithValueVertexWriter<I, V, E>
-    (textOutputFormat.getRecordWriter(context));
   }
+
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/io/IntIntNullIntTextInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/IntIntNullIntTextInputFormat.java?rev=1388258&r1=1388257&r2=1388258&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/IntIntNullIntTextInputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/IntIntNullIntTextInputFormat.java Thu Sep 20 23:00:27 2012
@@ -18,18 +18,12 @@
 
 package org.apache.giraph.io;
 
-import org.apache.giraph.graph.BspUtils;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.VertexReader;
 import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 import java.io.IOException;
@@ -45,60 +39,54 @@ import java.util.regex.Pattern;
 public class IntIntNullIntTextInputFormat extends
     TextVertexInputFormat<IntWritable, IntWritable, NullWritable,
     IntWritable> {
+  /** Separator of the vertex and neighbors */
+  private static final Pattern SEPARATOR = Pattern.compile("[\t ]");
 
   @Override
-  public VertexReader<IntWritable, IntWritable, NullWritable, IntWritable>
-  createVertexReader(InputSplit split, TaskAttemptContext context)
+  public TextVertexReader createVertexReader(InputSplit split,
+      TaskAttemptContext context)
     throws IOException {
-    return new IntIntNullIntVertexReader(
-        textInputFormat.createRecordReader(split, context));
+    return new IntIntNullIntVertexReader();
   }
 
   /**
    * Vertex reader associated with {@link IntIntNullIntTextInputFormat}.
    */
-  public static class IntIntNullIntVertexReader extends
-      TextVertexInputFormat.TextVertexReader<IntWritable, IntWritable,
-      NullWritable, IntWritable> {
-    /** Separator of the vertex and neighbors */
-    private static final Pattern SEPARATOR = Pattern.compile("[\t ]");
-
+  public class IntIntNullIntVertexReader extends
+    TextVertexReaderFromEachLineProcessed<String[]> {
     /**
-     * Constructor with the line reader.
-     *
-     * @param lineReader Internal line reader.
+     * Cached vertex id for the current line
      */
-    public IntIntNullIntVertexReader(RecordReader<LongWritable, Text>
-    lineReader) {
-      super(lineReader);
+    private IntWritable id;
+
+    @Override
+    protected String[] preprocessLine(Text line) throws IOException {
+      String[] tokens = SEPARATOR.split(line.toString());
+      id = new IntWritable(Integer.parseInt(tokens[0]));
+      return tokens;
     }
 
     @Override
-    public Vertex<IntWritable, IntWritable, NullWritable, IntWritable>
-    getCurrentVertex() throws IOException, InterruptedException {
-      Vertex<IntWritable, IntWritable, NullWritable, IntWritable>
-      vertex = BspUtils.<IntWritable, IntWritable, NullWritable,
-      IntWritable>createVertex(getContext().getConfiguration());
+    protected IntWritable getId(String[] tokens) throws IOException {
+      return id;
+    }
+
+    @Override
+    protected IntWritable getValue(String[] tokens) throws IOException {
+      // Copy so that in-place updates to the value cannot corrupt the id.
+      return new IntWritable(id.get());
+    }
 
-      String[] tokens = SEPARATOR.split(getRecordReader()
-          .getCurrentValue().toString());
+    @Override
+    protected Map<IntWritable, NullWritable> getEdges(String[] tokens)
+      throws IOException {
       Map<IntWritable, NullWritable> edges =
           Maps.newHashMapWithExpectedSize(tokens.length - 1);
       for (int n = 1; n < tokens.length; n++) {
         edges.put(new IntWritable(Integer.parseInt(tokens[n])),
             NullWritable.get());
       }
-
-      IntWritable vertexId = new IntWritable(Integer.parseInt(tokens[0]));
-      vertex.initialize(vertexId, vertexId, edges,
-          Lists.<IntWritable>newArrayList());
-
-      return vertex;
-    }
-
-    @Override
-    public boolean nextVertex() throws IOException, InterruptedException {
-      return getRecordReader().nextKeyValue();
+      return edges;
     }
   }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java?rev=1388258&r1=1388257&r2=1388258&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java Thu Sep 20 23:00:27 2012
@@ -19,22 +19,17 @@
 package org.apache.giraph.io;
 
 import org.apache.giraph.graph.BspUtils;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.VertexReader;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.json.JSONArray;
 import org.json.JSONException;
 import org.json.JSONObject;
 
 import com.google.common.collect.Maps;
-
 import net.iharder.Base64;
 
 import java.io.ByteArrayInputStream;
@@ -58,72 +53,73 @@ import java.util.Map;
 public class JsonBase64VertexInputFormat<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable>
     extends TextVertexInputFormat<I, V, E, M> {
+
+  @Override
+  public TextVertexReader createVertexReader(InputSplit split,
+      TaskAttemptContext context) {
+    return new JsonBase64VertexReader();
+  }
+
   /**
    * Simple reader that supports {@link JsonBase64VertexInputFormat}
-   *
-   * @param <I> Vertex index value
-   * @param <V> Vertex value
-   * @param <E> Edge value
-   * @param <M> Message data
    */
-  private static class JsonBase64VertexReader<I extends WritableComparable,
-      V extends Writable, E extends Writable, M extends Writable>
-      extends TextVertexReader<I, V, E, M> {
-    /**
-     * Only constructor.  Requires the LineRecordReader
-     *
-     * @param lineRecordReader Line record reader to read from
-     */
-    public JsonBase64VertexReader(
-        RecordReader<LongWritable, Text> lineRecordReader) {
-      super(lineRecordReader);
-    }
+  protected class JsonBase64VertexReader extends
+    TextVertexReaderFromEachLineProcessed<JSONObject> {
 
-    @Override
-    public boolean nextVertex() throws IOException, InterruptedException {
-      return getRecordReader().nextKeyValue();
-    }
+    /** Configuration cached in initialize(); used to create ids/values. */
+    private Configuration conf;
 
     @Override
-    public Vertex<I, V, E, M> getCurrentVertex()
+    public void initialize(InputSplit inputSplit, TaskAttemptContext context)
       throws IOException, InterruptedException {
-      Configuration conf = getContext().getConfiguration();
-      Vertex<I, V, E, M> vertex = BspUtils.createVertex(conf);
+      super.initialize(inputSplit, context);
+      conf = context.getConfiguration();
+    }
 
-      Text line = getRecordReader().getCurrentValue();
-      JSONObject vertexObject;
+    @Override
+    protected JSONObject preprocessLine(Text line) {
       try {
-        vertexObject = new JSONObject(line.toString());
+        return new JSONObject(line.toString());
       } catch (JSONException e) {
         throw new IllegalArgumentException(
           "next: Failed to get the vertex", e);
       }
-      DataInput input = null;
-      byte[] decodedWritable = null;
-      I vertexId = null;
+    }
+
+    @Override
+    protected I getId(JSONObject vertexObject) throws IOException {
       try {
-        decodedWritable = Base64.decode(
-          vertexObject.getString(JsonBase64VertexFormat.VERTEX_ID_KEY));
-        input = new DataInputStream(
-          new ByteArrayInputStream(decodedWritable));
-        vertexId = BspUtils.<I>createVertexId(conf);
+        byte[] decodedWritable = Base64.decode(
+            vertexObject.getString(JsonBase64VertexFormat.VERTEX_ID_KEY));
+        DataInput input = new DataInputStream(
+            new ByteArrayInputStream(decodedWritable));
+        I vertexId = BspUtils.<I>createVertexId(conf);
         vertexId.readFields(input);
+        return vertexId;
       } catch (JSONException e) {
         throw new IllegalArgumentException(
           "next: Failed to get vertex id", e);
       }
-      V vertexValue = null;
+    }
+
+    @Override
+    protected V getValue(JSONObject vertexObject) throws IOException {
       try {
-        decodedWritable = Base64.decode(
-          vertexObject.getString(JsonBase64VertexFormat.VERTEX_VALUE_KEY));
-        input = new DataInputStream(
-          new ByteArrayInputStream(decodedWritable));
-        vertexValue = BspUtils.<V>createVertexValue(conf);
+        byte[] decodedWritable = Base64.decode(
+            vertexObject.getString(JsonBase64VertexFormat.VERTEX_VALUE_KEY));
+        DataInputStream input = new DataInputStream(
+            new ByteArrayInputStream(decodedWritable));
+        V vertexValue = BspUtils.<V>createVertexValue(conf);
         vertexValue.readFields(input);
+        return vertexValue;
       } catch (JSONException e) {
         throw new IllegalArgumentException(
           "next: Failed to get vertex value", e);
       }
+    }
+
+    @Override
+    protected Map<I, E> getEdges(JSONObject vertexObject) throws IOException {
       JSONArray edgeArray = null;
       try {
         edgeArray = vertexObject.getJSONArray(
@@ -132,6 +128,7 @@ public class JsonBase64VertexInputFormat
         throw new IllegalArgumentException(
           "next: Failed to get edge array", e);
       }
+      byte[] decodedWritable;
       Map<I, E> edgeMap = Maps.newHashMap();
       for (int i = 0; i < edgeArray.length(); ++i) {
         try {
@@ -140,7 +137,7 @@ public class JsonBase64VertexInputFormat
           throw new IllegalArgumentException(
             "next: Failed to get edge value", e);
         }
-        input = new DataInputStream(
+        DataInputStream input = new DataInputStream(
             new ByteArrayInputStream(decodedWritable));
         I targetVertexId =
             BspUtils.<I>createVertexId(getContext().getConfiguration());
@@ -150,15 +147,9 @@ public class JsonBase64VertexInputFormat
         edgeValue.readFields(input);
         edgeMap.put(targetVertexId, edgeValue);
       }
-      vertex.initialize(vertexId, vertexValue, edgeMap, null);
-      return vertex;
+      return edgeMap;
     }
-  }
 
-  @Override
-  public VertexReader<I, V, E, M> createVertexReader(
-      InputSplit split, TaskAttemptContext context) throws IOException {
-    return new JsonBase64VertexReader<I, V, E, M>(
-      textInputFormat.createRecordReader(split, context));
   }
+
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexOutputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexOutputFormat.java?rev=1388258&r1=1388257&r2=1388258&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexOutputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexOutputFormat.java Thu Sep 20 23:00:27 2012
@@ -20,11 +20,9 @@ package org.apache.giraph.io;
 
 import org.apache.giraph.graph.Edge;
 import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.VertexWriter;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.json.JSONArray;
 import org.json.JSONException;
@@ -51,29 +49,23 @@ import java.io.IOException;
 public class JsonBase64VertexOutputFormat<I extends WritableComparable,
     V extends Writable, E extends Writable> extends
     TextVertexOutputFormat<I, V, E> {
+
+  /**
+   * Creates a {@link JsonBase64VertexWriter}, which emits one text line per
+   * vertex containing that vertex's JSON object.
+   */
+  @Override
+  public TextVertexWriter createVertexWriter(TaskAttemptContext context) {
+    return new JsonBase64VertexWriter();
+  }
+
   /**
    * Simple writer that supports {@link JsonBase64VertexOutputFormat}
-   *
-   * @param <I> Vertex index value
-   * @param <V> Vertex value
-   * @param <E> Edge value
    */
-  private static class JsonBase64VertexWriter<I extends WritableComparable,
-      V extends Writable, E extends Writable> extends
-      TextVertexWriter<I, V, E> {
-    /**
-     * Only constructor.  Requires the LineRecordWriter
-     *
-     * @param lineRecordWriter Line record writer to write to
-     */
-    public JsonBase64VertexWriter(
-        RecordWriter<Text, Text> lineRecordWriter) {
-      super(lineRecordWriter);
-    }
+  protected class JsonBase64VertexWriter extends TextVertexWriterToEachLine {
 
     @Override
-    public void writeVertex(Vertex<I, V, E, ?> vertex)
-      throws IOException, InterruptedException {
+    protected Text convertVertexToLine(Vertex<I, V, E, ?> vertex)
+      throws IOException {
       ByteArrayOutputStream outputStream =
           new ByteArrayOutputStream();
       DataOutput output = new DataOutputStream(outputStream);
@@ -112,14 +104,9 @@ public class JsonBase64VertexOutputForma
         throw new IllegalStateException(
             "writerVertex: Failed to insert edge array", e);
       }
-      getRecordWriter().write(new Text(vertexObject.toString()), null);
+      return new Text(vertexObject.toString());
     }
-  }
 
-  @Override
-  public VertexWriter<I, V, E> createVertexWriter(TaskAttemptContext context)
-    throws IOException, InterruptedException {
-    return new JsonBase64VertexWriter<I, V, E>(
-        textOutputFormat.getRecordWriter(context));
   }
+
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexInputFormat.java?rev=1388258&r1=1388257&r2=1388258&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexInputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexInputFormat.java Thu Sep 20 23:00:27 2012
@@ -17,15 +17,12 @@
  */
 package org.apache.giraph.io;
 
-import org.apache.giraph.graph.BspUtils;
 import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.VertexReader;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.json.JSONArray;
 import org.json.JSONException;
@@ -46,11 +43,9 @@ public class JsonLongDoubleFloatDoubleVe
   FloatWritable, DoubleWritable> {
 
   @Override
-  public VertexReader<LongWritable, DoubleWritable, FloatWritable,
-    DoubleWritable> createVertexReader(InputSplit split,
-    TaskAttemptContext context) throws IOException {
-    return new JsonLongDoubleFloatDoubleVertexReader(
-      textInputFormat.createRecordReader(split, context));
+  public TextVertexReader createVertexReader(InputSplit split,
+      TaskAttemptContext context) {
+    return new JsonLongDoubleFloatDoubleVertexReader();
   }
 
  /**
@@ -64,52 +59,47 @@ public class JsonLongDoubleFloatDoubleVe
   * Second edge has a destination vertex 3, edge value 0.7.
   * [1,4.3,[[2,2.1],[3,0.7]]]
   */
-  static class JsonLongDoubleFloatDoubleVertexReader extends
-    TextVertexReader<LongWritable, DoubleWritable,
-    FloatWritable, DoubleWritable> {
-
-  /**
-    * Constructor with the line record reader.
-    *
-    * @param lineRecordReader Will read from this line.
-    */
-    public JsonLongDoubleFloatDoubleVertexReader(
-      RecordReader<LongWritable, Text> lineRecordReader) {
-      super(lineRecordReader);
+  class JsonLongDoubleFloatDoubleVertexReader
+      extends TextVertexReaderFromEachLineProcessedHandlingExceptions<
+      JSONArray, JSONException> {
+
+    @Override
+    protected JSONArray preprocessLine(Text text) throws JSONException {
+      return new JSONArray(text.toString());
+    }
+
+    @Override
+    protected LongWritable getId(JSONArray vertexArray)
+        throws JSONException, IOException {
+      return new LongWritable(vertexArray.getLong(0));
     }
 
     @Override
-    public Vertex<LongWritable, DoubleWritable, FloatWritable,
-          DoubleWritable> getCurrentVertex()
-      throws IOException, InterruptedException {
-      Vertex<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
-          vertex = BspUtils.<LongWritable, DoubleWritable, FloatWritable,
-          DoubleWritable>createVertex(getContext().getConfiguration());
-
-      Text line = getRecordReader().getCurrentValue();
-      try {
-        JSONArray jsonVertex = new JSONArray(line.toString());
-        LongWritable vertexId = new LongWritable(jsonVertex.getLong(0));
-        DoubleWritable vertexValue =
-          new DoubleWritable(jsonVertex.getDouble(1));
-        Map<LongWritable, FloatWritable> edges = Maps.newHashMap();
-        JSONArray jsonEdgeArray = jsonVertex.getJSONArray(2);
-        for (int i = 0; i < jsonEdgeArray.length(); ++i) {
-          JSONArray jsonEdge = jsonEdgeArray.getJSONArray(i);
-          edges.put(new LongWritable(jsonEdge.getLong(0)),
+    protected DoubleWritable getValue(JSONArray vertexArray)
+        throws JSONException, IOException {
+      return new DoubleWritable(vertexArray.getDouble(1));
+    }
+
+    @Override
+    protected Map<LongWritable, FloatWritable> getEdges(JSONArray vertexArray)
+        throws JSONException, IOException {
+      Map<LongWritable, FloatWritable> edgeMap = Maps.newHashMap();
+      JSONArray edgeList = vertexArray.getJSONArray(2);
+      for (int index = 0; index < edgeList.length(); ++index) {
+        JSONArray jsonEdge = edgeList.getJSONArray(index);
+        edgeMap.put(new LongWritable(jsonEdge.getLong(0)),
             new FloatWritable((float) jsonEdge.getDouble(1)));
-        }
-        vertex.initialize(vertexId, vertexValue, edges, null);
-      } catch (JSONException e) {
-        throw new IllegalArgumentException(
-          "next: Couldn't get vertex from line " + line, e);
       }
-      return vertex;
+      return edgeMap;
     }
 
     @Override
-    public boolean nextVertex() throws IOException, InterruptedException {
-      return getRecordReader().nextKeyValue();
+    protected Vertex<LongWritable, DoubleWritable, FloatWritable,
+        DoubleWritable> handleException(Text line, JSONArray vertexArray,
+        JSONException e) {
+      throw new IllegalArgumentException(
+          "Couldn't get vertex from line " + line, e);
     }
+
   }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexOutputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexOutputFormat.java?rev=1388258&r1=1388257&r2=1388258&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexOutputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/JsonLongDoubleFloatDoubleVertexOutputFormat.java Thu Sep 20 23:00:27 2012
@@ -20,12 +20,10 @@ package org.apache.giraph.io;
 
 import org.apache.giraph.graph.Edge;
 import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.VertexWriter;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.json.JSONArray;
 import org.json.JSONException;
@@ -39,35 +37,24 @@ import java.io.IOException;
 public class JsonLongDoubleFloatDoubleVertexOutputFormat extends
   TextVertexOutputFormat<LongWritable, DoubleWritable,
   FloatWritable> {
+
   @Override
-  public VertexWriter<LongWritable, DoubleWritable, FloatWritable>
-  createVertexWriter(TaskAttemptContext context)
-    throws IOException, InterruptedException {
-    RecordWriter<Text, Text> recordWriter =
-      textOutputFormat.getRecordWriter(context);
-    return new JsonLongDoubleFloatDoubleVertexWriter(recordWriter);
+  public TextVertexWriter createVertexWriter(TaskAttemptContext context) {
+    // The context is unused here; the writer is built without arguments.
+    return new JsonLongDoubleFloatDoubleVertexWriter();
   }
 
  /**
   * VertexWriter that supports vertices with <code>double</code>
   * values and <code>float</code> out-edge weights.
   */
-  static class JsonLongDoubleFloatDoubleVertexWriter extends
-    TextVertexWriter<LongWritable, DoubleWritable, FloatWritable> {
-
-   /**
-    * Vertex writer with the internal line writer.
-    *
-    * @param lineRecordWriter Wil actually be written to.
-    */
-    public JsonLongDoubleFloatDoubleVertexWriter(
-      RecordWriter<Text, Text> lineRecordWriter) {
-      super(lineRecordWriter);
-    }
-
+  private class JsonLongDoubleFloatDoubleVertexWriter extends
+    TextVertexWriterToEachLine {
     @Override
-    public void writeVertex(Vertex<LongWritable, DoubleWritable,
-          FloatWritable, ?> vertex) throws IOException, InterruptedException {
+    public Text convertVertexToLine(
+      Vertex<LongWritable, DoubleWritable,
+        FloatWritable, ?> vertex
+    ) throws IOException {
       JSONArray jsonVertex = new JSONArray();
       try {
         jsonVertex.put(vertex.getId().get());
@@ -84,7 +71,7 @@ public class JsonLongDoubleFloatDoubleVe
         throw new IllegalArgumentException(
           "writeVertex: Couldn't write vertex " + vertex);
       }
-      getRecordWriter().write(new Text(jsonVertex.toString()), null);
+      return new Text(jsonVertex.toString());
     }
   }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/io/LongDoubleDoubleAdjacencyListVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/LongDoubleDoubleAdjacencyListVertexInputFormat.java?rev=1388258&r1=1388257&r2=1388258&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/LongDoubleDoubleAdjacencyListVertexInputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/LongDoubleDoubleAdjacencyListVertexInputFormat.java Thu Sep 20 23:00:27 2012
@@ -20,14 +20,10 @@ package org.apache.giraph.io;
 import org.apache.giraph.graph.Edge;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
-import java.io.IOException;
-
 /**
  * InputFormat for reading graphs stored as (ordered) adjacency lists
  * with the vertex ids longs and the vertex values and edges doubles.
@@ -39,37 +35,30 @@ import java.io.IOException;
  * @param <M> Message data
  */
 public class LongDoubleDoubleAdjacencyListVertexInputFormat<M extends Writable>
-    extends TextVertexInputFormat<LongWritable, DoubleWritable,
+    extends AdjacencyListTextVertexInputFormat<LongWritable, DoubleWritable,
     DoubleWritable, M> {
 
+  @Override
+  public AdjacencyListTextVertexReader createVertexReader(
+      InputSplit split, TaskAttemptContext context) {
+    return new LongDoubleDoubleAdjacencyListVertexReader((LineSanitizer) null);
+  }
+
   /**
    * VertexReader associated with
    * {@link LongDoubleDoubleAdjacencyListVertexInputFormat}.
-   *
-   * @param <M> Message data.
    */
-  static class VertexReader<M extends Writable> extends
-      AdjacencyListVertexReader<LongWritable, DoubleWritable,
-      DoubleWritable, M> {
+  protected class LongDoubleDoubleAdjacencyListVertexReader extends
+      AdjacencyListTextVertexReader {
 
     /**
-     * Constructor with Line record reader.
+     * Creates the reader, forwarding a {@link LineSanitizer} to its base.
      *
-     * @param lineRecordReader Reader to internally use.
+     * @param lineSanitizer sanitizer passed through to the superclass
      */
-    VertexReader(RecordReader<LongWritable, Text> lineRecordReader) {
-      super(lineRecordReader);
-    }
-
-    /**
-     * Constructor with Line record reader and sanitizer.
-     *
-     * @param lineRecordReader Reader to internally use.
-     * @param sanitizer Line sanitizer.
-     */
-    VertexReader(RecordReader<LongWritable, Text> lineRecordReader,
-        LineSanitizer sanitizer) {
-      super(lineRecordReader, sanitizer);
+    public LongDoubleDoubleAdjacencyListVertexReader(
+        LineSanitizer lineSanitizer) {
+      super(lineSanitizer);
     }
 
     @Override
@@ -92,12 +81,4 @@ public class LongDoubleDoubleAdjacencyLi
     }
   }
 
-  @Override
-  public org.apache.giraph.graph.VertexReader<LongWritable,
-  DoubleWritable, DoubleWritable, M> createVertexReader(
-      InputSplit split,
-      TaskAttemptContext context) throws IOException {
-    return new VertexReader<M>(textInputFormat.createRecordReader(
-        split, context));
-  }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/io/TextDoubleDoubleAdjacencyListVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/TextDoubleDoubleAdjacencyListVertexInputFormat.java?rev=1388258&r1=1388257&r2=1388258&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/TextDoubleDoubleAdjacencyListVertexInputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/TextDoubleDoubleAdjacencyListVertexInputFormat.java Thu Sep 20 23:00:27 2012
@@ -19,15 +19,11 @@ package org.apache.giraph.io;
 
 import org.apache.giraph.graph.Edge;
 import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
-import java.io.IOException;
-
 /**
  * Class to read graphs stored as adjacency lists with ids represented by
  * Strings and values as doubles.  This is a good inputformat for reading
@@ -36,8 +32,14 @@ import java.io.IOException;
  * @param <M> Message type.
  */
 public class TextDoubleDoubleAdjacencyListVertexInputFormat<M extends Writable>
-    extends TextVertexInputFormat<Text, DoubleWritable, DoubleWritable, M>  {
+    extends AdjacencyListTextVertexInputFormat<Text, DoubleWritable,
+            DoubleWritable, M>  {
 
+  @Override
+  public AdjacencyListTextVertexReader createVertexReader(
+      InputSplit split, TaskAttemptContext context) {
+    return new TextDoubleDoubleAdjacencyListVertexReader((LineSanitizer) null);
+  }
 
   /**
    * Vertex reader used with
@@ -45,26 +47,17 @@ public class TextDoubleDoubleAdjacencyLi
    *
    * @param <M> Message type.
    */
-  static class VertexReader<M extends Writable> extends
-      AdjacencyListVertexReader<Text, DoubleWritable, DoubleWritable, M> {
-    /**
-     * Constructor without sanitzer.
-     *
-     * @param lineRecordReader Internal reader.
-     */
-    VertexReader(RecordReader<LongWritable, Text> lineRecordReader) {
-      super(lineRecordReader);
-    }
+  protected class TextDoubleDoubleAdjacencyListVertexReader extends
+      AdjacencyListTextVertexReader {
 
     /**
-     * Constructor with {@link LineRecordReader}
+     * Creates the reader, forwarding a {@link LineSanitizer} to its base.
      *
-     * @param lineRecordReader Internal reader.
-     * @param sanitizer Sanitizer of the lines.
+     * @param lineSanitizer sanitizer passed through to the superclass
      */
-    VertexReader(RecordReader<LongWritable, Text> lineRecordReader,
-        LineSanitizer sanitizer) {
-      super(lineRecordReader, sanitizer);
+    public TextDoubleDoubleAdjacencyListVertexReader(
+        LineSanitizer lineSanitizer) {
+      super(lineSanitizer);
     }
 
     @Override
@@ -85,11 +78,4 @@ public class TextDoubleDoubleAdjacencyLi
     }
   }
 
-  @Override
-  public org.apache.giraph.graph.VertexReader<Text, DoubleWritable,
-      DoubleWritable, M> createVertexReader(InputSplit split,
-      TaskAttemptContext context) throws IOException {
-    return new VertexReader<M>(textInputFormat.createRecordReader(
-        split, context));
-  }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java?rev=1388258&r1=1388257&r2=1388258&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java Thu Sep 20 23:00:27 2012
@@ -18,8 +18,11 @@
 
 package org.apache.giraph.io;
 
+import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexInputFormat;
 import org.apache.giraph.graph.VertexReader;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -32,6 +35,7 @@ import org.apache.hadoop.mapreduce.lib.i
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Abstract class that users should subclass to use their own text based
@@ -46,41 +50,78 @@ import java.util.List;
 public abstract class TextVertexInputFormat<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable>
     extends VertexInputFormat<I, V, E, M> {
+
   /** Uses the TextInputFormat to do everything */
   protected TextInputFormat textInputFormat = new TextInputFormat();
 
+  @Override
+  public List<InputSplit> getSplits(JobContext context, int numWorkers)
+      throws IOException, InterruptedException {
+    // numWorkers is only a hint and is ignored: TextInputFormat decides splits
+    List<InputSplit> splits = textInputFormat.getSplits(context);
+    return splits;
+  }
+
+  /**
+   * Factory method producing the {@link TextVertexReader} used by this format.
+   *
+   * @param split
+   *          the split to be read
+   * @param context
+   *          the information about the task
+   * @return the text vertex reader to be used
+   * @throws IOException
+   *           exception that can be thrown during creation
+   */
+  @Override
+  public abstract TextVertexReader createVertexReader(InputSplit split,
+      TaskAttemptContext context) throws IOException;
+
   /**
    * Abstract class to be implemented by the user based on their specific
-   * vertex input.  Easiest to ignore the key value separator and only use
+   * vertex input. Easiest to ignore the key value separator and only use
    * key instead.
    *
-   * @param <I> Vertex index value
-   * @param <V> Vertex value
-   * @param <E> Edge value
+   * When reading a vertex from each line, extend
+   * {@link TextVertexReaderFromEachLine}. If you need to preprocess each line
+   * first, then extend {@link TextVertexReaderFromEachLineProcessed}. If you
+   * need common exception handling while preprocessing, then extend
+   * {@link TextVertexReaderFromEachLineProcessedHandlingExceptions}.
    */
-  public abstract static class TextVertexReader<I extends WritableComparable,
-      V extends Writable, E extends Writable, M extends Writable>
-      implements VertexReader<I, V, E, M> {
+  protected abstract class TextVertexReader implements
+    VertexReader<I, V, E, M> {
     /** Internal line record reader */
-    private final RecordReader<LongWritable, Text> lineRecordReader;
+    private RecordReader<LongWritable, Text> lineRecordReader;
     /** Context passed to initialize */
     private TaskAttemptContext context;
 
-    /**
-     * Initialize with the LineRecordReader.
-     *
-     * @param lineRecordReader Line record reader from TextInputFormat
-     */
-    public TextVertexReader(
-        RecordReader<LongWritable, Text> lineRecordReader) {
-      this.lineRecordReader = lineRecordReader;
-    }
-
     @Override
     public void initialize(InputSplit inputSplit, TaskAttemptContext context)
       throws IOException, InterruptedException {
-      lineRecordReader.initialize(inputSplit, context);
       this.context = context;
+      lineRecordReader = createLineRecordReader(inputSplit, context);
+      lineRecordReader.initialize(inputSplit, context);
+    }
+
+    /**
+     * Builds the line reader used by {@link #initialize}. Override to plug
+     * in a different underlying reader (handy for testing).
+     *
+     * @param inputSplit
+     *          the split to read
+     * @param context
+     *          the context handed to initialize
+     * @return
+     *         the reader that lines will be pulled from
+     * @throws IOException
+     *           may be thrown while creating the reader
+     * @throws InterruptedException
+     *           may be thrown while creating the reader
+     */
+    protected RecordReader<LongWritable, Text> createLineRecordReader(
+        InputSplit inputSplit, TaskAttemptContext context)
+        throws IOException, InterruptedException {
+      return textInputFormat.createRecordReader(inputSplit, context);
     }
 
     @Override
@@ -112,11 +153,266 @@ public abstract class TextVertexInputFor
     }
   }
 
-  @Override
-  public List<InputSplit> getSplits(JobContext context, int numWorkers)
-    throws IOException, InterruptedException {
-    // Ignore the hint of numWorkers here since we are using TextInputFormat
-    // to do this for us
-    return textInputFormat.getSplits(context);
+  /**
+   * Base for readers that build exactly one vertex out of every text line,
+   * by asking subclasses for the id, value and edges of that line.
+   */
+  protected abstract class TextVertexReaderFromEachLine
+      extends TextVertexReader {
+
+    @Override
+    public final boolean nextVertex() throws IOException, InterruptedException {
+      return getRecordReader().nextKeyValue();
+    }
+
+    @Override
+    public final Vertex<I, V, E, M> getCurrentVertex()
+        throws IOException, InterruptedException {
+      Text text = getRecordReader().getCurrentValue();
+      Configuration conf = getContext().getConfiguration();
+      Vertex<I, V, E, M> vertex = BspUtils.<I, V, E, M>createVertex(conf);
+      vertex.initialize(getId(text), getValue(text), getEdges(text), null);
+      return vertex;
+    }
+
+    /**
+     * Extracts the id of the vertex described by a line.
+     *
+     * @param line
+     *          the line currently being read
+     * @return
+     *         the id of the vertex on this line
+     * @throws IOException
+     *           may be thrown while reading
+     */
+    protected abstract I getId(Text line) throws IOException;
+
+    /**
+     * Extracts the value of the vertex described by a line.
+     *
+     * @param line
+     *          the line currently being read
+     * @return
+     *         the value of the vertex on this line
+     * @throws IOException
+     *           may be thrown while reading
+     */
+    protected abstract V getValue(Text line) throws IOException;
+
+    /**
+     * Extracts the edges of the vertex described by a line.
+     *
+     * @param line
+     *          the line currently being read
+     * @return
+     *         map from each target vertex id to the value of its edge
+     * @throws IOException
+     *           may be thrown while reading
+     */
+    protected abstract Map<I, E> getEdges(Text line) throws IOException;
+
+  }
+
+  /**
+   * Base for readers that turn every text line into a vertex in two steps:
+   * the line is first converted once into an intermediate object, from which
+   * the id, value and edges are then read.
+   *
+   * @param <T> type of the intermediate object built from each line
+   */
+  protected abstract class TextVertexReaderFromEachLineProcessed<T>
+      extends TextVertexReader {
+
+    @Override
+    public final Vertex<I, V, E, M> getCurrentVertex()
+        throws IOException, InterruptedException {
+      // Preprocess the raw line once; the getters below all read the result.
+      T parsed = preprocessLine(getRecordReader().getCurrentValue());
+      Configuration conf = getContext().getConfiguration();
+      Vertex<I, V, E, M> vertex = BspUtils.<I, V, E, M>createVertex(conf);
+      I id = getId(parsed);
+      V value = getValue(parsed);
+      vertex.initialize(id, value, getEdges(parsed), null);
+      return vertex;
+    }
+
+    @Override
+    public final boolean nextVertex() throws IOException, InterruptedException {
+      return getRecordReader().nextKeyValue();
+    }
+
+    /**
+     * Converts the raw line into the intermediate object consumed by
+     * {@link #getId}, {@link #getValue} and {@link #getEdges}.
+     *
+     * @param line
+     *          the line currently being read
+     * @return
+     *         the intermediate object for this line
+     * @throws IOException
+     *           may be thrown while reading
+     */
+    protected abstract T preprocessLine(Text line) throws IOException;
+
+    /**
+     * Extracts the vertex id from the preprocessed line.
+     *
+     * @param line
+     *          the result of {@link #preprocessLine} for the current line
+     * @return
+     *         the vertex id
+     * @throws IOException
+     *           may be thrown while reading
+     */
+    protected abstract I getId(T line) throws IOException;
+
+    /**
+     * Extracts the vertex value from the preprocessed line.
+     *
+     * @param line
+     *          the result of {@link #preprocessLine} for the current line
+     * @return
+     *         the vertex value
+     * @throws IOException
+     *           may be thrown while reading
+     */
+    protected abstract V getValue(T line) throws IOException;
+
+    /**
+     * Extracts the edges from the preprocessed line.
+     *
+     * @param line
+     *          the result of {@link #preprocessLine} for the current line
+     * @return
+     *         map from each target vertex id to the value of its edge
+     * @throws IOException
+     *           may be thrown while reading
+     */
+    protected abstract Map<I, E> getEdges(T line) throws IOException;
+
+  }
+
+  // CHECKSTYLE: stop RedundantThrows
+  /**
+   * Abstract class to be implemented by the user to read a vertex from each
+   * text line after preprocessing it with exception handling.
+   *
+   * @param <T>
+   *          The resulting type of preprocessing.
+   * @param <X>
+   *          The exception type preprocessing and the getters may throw.
+   */
+  protected abstract class
+  TextVertexReaderFromEachLineProcessedHandlingExceptions<T, X extends
+    Throwable> extends TextVertexReader {
+
+    @Override
+    public final boolean nextVertex() throws IOException, InterruptedException {
+      return getRecordReader().nextKeyValue();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public final Vertex<I, V, E, M> getCurrentVertex() throws IOException,
+        InterruptedException {
+      // Only the value is used; the key is the line's offset within the file
+      Text line = getRecordReader().getCurrentValue();
+      T processed = null;
+      try {
+        processed = preprocessLine(line);
+        Configuration conf = getContext().getConfiguration();
+        Vertex<I, V, E, M> vertex = BspUtils.<I, V, E, M>createVertex(conf);
+        vertex.initialize(getId(processed), getValue(processed),
+            getEdges(processed), null);
+        return vertex;
+      } catch (IOException e) {
+        throw e;
+      // CHECKSTYLE: stop IllegalCatch
+      } catch (Error e) {
+        throw e; // never route JVM errors (e.g. OOM) to handleException
+      } catch (Throwable t) {
+        return handleException(line, processed, (X) t);
+      // CHECKSTYLE: resume IllegalCatch
+      }
+    }
+
+    /**
+     * Preprocess the line so other methods can easily read necessary
+     * information for creating vertex.
+     *
+     * @param line
+     *          the current line to be read
+     * @return
+     *         the preprocessed object
+     * @throws X
+     *           exception that can be thrown while preprocessing the line
+     * @throws IOException
+     *           exception that can be thrown while reading
+     */
+    protected abstract T preprocessLine(Text line) throws X, IOException;
+
+    /**
+     * Reads vertex id from the preprocessed line.
+     *
+     * @param line
+     *          the object obtained by preprocessing the line
+     * @return
+     *         the vertex id
+     * @throws X
+     *           exception that can be thrown while reading the preprocessed
+     *           object
+     * @throws IOException
+     *           exception that can be thrown while reading
+     */
+    protected abstract I getId(T line) throws X, IOException;
+
+    /**
+     * Reads vertex value from the preprocessed line.
+     *
+     * @param line
+     *          the object obtained by preprocessing the line
+     * @return
+     *         the vertex value
+     * @throws X
+     *           exception that can be thrown while reading the preprocessed
+     *           object
+     * @throws IOException
+     *           exception that can be thrown while reading
+     */
+    protected abstract V getValue(T line) throws X, IOException;
+
+    /**
+     * Reads edges from the preprocessed line.
+     *
+     * @param line
+     *          the object obtained by preprocessing the line
+     * @return
+     *         the edges mapping target vertex id to its edge value
+     * @throws X
+     *           exception that can be thrown while reading the preprocessed
+     *           object
+     * @throws IOException
+     *           exception that can be thrown while reading
+     */
+    protected abstract Map<I, E> getEdges(T line) throws X, IOException;
+
+    /**
+     * Handles exceptions while reading vertex from each line. It receives any
+     * exception other than IOException or Error; since X is erased, e is not
+     * guaranteed to be an instance of X at runtime.
+     *
+     * @param line the line that was being read when the exception was thrown
+     * @param processed
+     *          the object obtained by preprocessing the line. Can be null if
+     *          exception was thrown during preprocessing.
+     * @param e the exception thrown while reading the line
+     * @return the recovered/alternative vertex to be used
+     */
+    protected Vertex<I, V, E, M> handleException(Text line, T processed, X e) {
+      throw new IllegalArgumentException(e);
+    }
+
   }
+  // CHECKSTYLE: resume RedundantThrows
+
 }