You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/02/16 23:12:36 UTC

svn commit: r1245205 [15/18] - in /incubator/giraph/trunk: ./ src/main/java/org/apache/giraph/ src/main/java/org/apache/giraph/benchmark/ src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/example...

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionUtils.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionUtils.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionUtils.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionUtils.java Thu Feb 16 22:12:31 2012
@@ -37,102 +37,115 @@ import com.google.common.collect.Maps;
  * Helper class for {@link Partition} related operations.
  */
 public class PartitionUtils {
-    /** Class logger */
-    private static Logger LOG = Logger.getLogger(PartitionUtils.class);
+  /** Class logger */
+  private static Logger LOG = Logger.getLogger(PartitionUtils.class);
 
-    private static class EdgeCountComparator implements
-            Comparator<Entry<WorkerInfo, VertexEdgeCount>> {
+  /**
+   * Do not construct this object.
+   */
+  private PartitionUtils() { }
+
+  /**
+   * Order worker entries by ascending edge count without int overflow.
+   */
+  private static class EdgeCountComparator implements
+      Comparator<Entry<WorkerInfo, VertexEdgeCount>> {
+    @Override
+    public int compare(Entry<WorkerInfo, VertexEdgeCount> worker1,
+        Entry<WorkerInfo, VertexEdgeCount> worker2) {
+      return Long.valueOf(worker1.getValue().getEdgeCount()).compareTo(
+        worker2.getValue().getEdgeCount());
+    }
+  }
 
-        @Override
-        public int compare(Entry<WorkerInfo, VertexEdgeCount> worker1,
-                           Entry<WorkerInfo, VertexEdgeCount> worker2) {
-            return (int) (worker1.getValue().getEdgeCount() -
-                          worker2.getValue().getEdgeCount());
-        }
+  /**
+   * Order worker entries by ascending vertex count, comparing the counts
+   * directly so that a large difference cannot overflow an int.
+   */
+  private static class VertexCountComparator implements
+      Comparator<Entry<WorkerInfo, VertexEdgeCount>> {
+    @Override
+    public int compare(Entry<WorkerInfo, VertexEdgeCount> worker1,
+        Entry<WorkerInfo, VertexEdgeCount> worker2) {
+      return Long.valueOf(worker1.getValue().getVertexCount()).compareTo(
+        worker2.getValue().getVertexCount());
     }
+  }
 
-    private static class VertexCountComparator implements
-            Comparator<Entry<WorkerInfo, VertexEdgeCount>> {
+  /**
+   * Check for imbalances on a per worker basis, by calculating the
+   * mean, high and low workers by edges and vertices.
+   *
+   * @param partitionOwnerList List of partition owners.
+   * @param allPartitionStats All the partition stats.
+   */
+  public static void analyzePartitionStats(
+      Collection<PartitionOwner> partitionOwnerList,
+      List<PartitionStats> allPartitionStats) {
+    Map<Integer, PartitionOwner> idOwnerMap =
+        new HashMap<Integer, PartitionOwner>();
+    for (PartitionOwner partitionOwner : partitionOwnerList) {
+      if (idOwnerMap.put(partitionOwner.getPartitionId(),
+          partitionOwner) != null) {
+        throw new IllegalStateException(
+            "analyzePartitionStats: Duplicate partition " +
+                partitionOwner);
+      }
+    }
 
-        @Override
-        public int compare(Entry<WorkerInfo, VertexEdgeCount> worker1,
-                           Entry<WorkerInfo, VertexEdgeCount> worker2) {
-            return (int) (worker1.getValue().getEdgeCount() -
-                          worker2.getValue().getEdgeCount());
-        }
+    Map<WorkerInfo, VertexEdgeCount> workerStatsMap = Maps.newHashMap();
+    VertexEdgeCount totalVertexEdgeCount = new VertexEdgeCount();
+    for (PartitionStats partitionStats : allPartitionStats) {
+      WorkerInfo workerInfo =
+          idOwnerMap.get(partitionStats.getPartitionId()).getWorkerInfo();
+      VertexEdgeCount vertexEdgeCount =
+          workerStatsMap.get(workerInfo);
+      if (vertexEdgeCount == null) {
+        workerStatsMap.put(
+            workerInfo,
+            new VertexEdgeCount(partitionStats.getVertexCount(),
+                partitionStats.getEdgeCount()));
+      } else {
+        workerStatsMap.put(
+            workerInfo,
+            vertexEdgeCount.incrVertexEdgeCount(
+                partitionStats.getVertexCount(),
+                partitionStats.getEdgeCount()));
+      }
+      totalVertexEdgeCount =
+          totalVertexEdgeCount.incrVertexEdgeCount(
+              partitionStats.getVertexCount(),
+              partitionStats.getEdgeCount());
     }
 
-    /**
-     * Check for imbalances on a per worker basis, by calculating the
-     * mean, high and low workers by edges and vertices.
-     */
-    public static void analyzePartitionStats(
-            Collection<PartitionOwner> partitionOwnerList,
-            List<PartitionStats> allPartitionStats) {
-        Map<Integer, PartitionOwner> idOwnerMap =
-            new HashMap<Integer, PartitionOwner>();
-        for (PartitionOwner partitionOwner : partitionOwnerList) {
-            if (idOwnerMap.put(partitionOwner.getPartitionId(),
-                               partitionOwner) != null) {
-                throw new IllegalStateException(
-                    "analyzePartitionStats: Duplicate partition " +
-                    partitionOwner);
-            }
-        }
-
-        Map<WorkerInfo, VertexEdgeCount> workerStatsMap = Maps.newHashMap();
-        VertexEdgeCount totalVertexEdgeCount = new VertexEdgeCount();
-        for (PartitionStats partitionStats : allPartitionStats) {
-            WorkerInfo workerInfo =
-                idOwnerMap.get(partitionStats.getPartitionId()).getWorkerInfo();
-            VertexEdgeCount vertexEdgeCount =
-                workerStatsMap.get(workerInfo);
-            if (vertexEdgeCount == null) {
-                workerStatsMap.put(
-                    workerInfo,
-                    new VertexEdgeCount(partitionStats.getVertexCount(),
-                                        partitionStats.getEdgeCount()));
-            } else {
-                workerStatsMap.put(
-                    workerInfo,
-                    vertexEdgeCount.incrVertexEdgeCount(
-                        partitionStats.getVertexCount(),
-                        partitionStats.getEdgeCount()));
-            }
-            totalVertexEdgeCount =
-                totalVertexEdgeCount.incrVertexEdgeCount(
-                    partitionStats.getVertexCount(),
-                    partitionStats.getEdgeCount());
-        }
-
-        List<Entry<WorkerInfo, VertexEdgeCount>> workerEntryList =
-            Lists.newArrayList(workerStatsMap.entrySet());
-
-        if (LOG.isInfoEnabled()) {
-            Collections.sort(workerEntryList, new VertexCountComparator());
-            LOG.info("analyzePartitionStats: Vertices - Mean: " +
-                    (totalVertexEdgeCount.getVertexCount() /
-                        workerStatsMap.size()) +
-                    ", Min: " +
-                    workerEntryList.get(0).getKey() + " - " +
-                    workerEntryList.get(0).getValue().getVertexCount() +
-                    ", Max: "+
-                    workerEntryList.get(workerEntryList.size() - 1).getKey() +
-                    " - " +
-                    workerEntryList.get(workerEntryList.size() - 1).
-                    getValue().getVertexCount());
-            Collections.sort(workerEntryList, new EdgeCountComparator());
-            LOG.info("analyzePartitionStats: Edges - Mean: " +
-                     (totalVertexEdgeCount.getEdgeCount() /
-                         workerStatsMap.size()) +
-                     ", Min: " +
-                     workerEntryList.get(0).getKey() + " - " +
-                     workerEntryList.get(0).getValue().getEdgeCount() +
-                     ", Max: "+
-                     workerEntryList.get(workerEntryList.size() - 1).getKey() +
-                     " - " +
-                     workerEntryList.get(workerEntryList.size() - 1).
-                     getValue().getEdgeCount());
-        }
+    List<Entry<WorkerInfo, VertexEdgeCount>> workerEntryList =
+        Lists.newArrayList(workerStatsMap.entrySet());
+
+    if (LOG.isInfoEnabled()) {
+      Collections.sort(workerEntryList, new VertexCountComparator());
+      LOG.info("analyzePartitionStats: Vertices - Mean: " +
+          (totalVertexEdgeCount.getVertexCount() /
+              workerStatsMap.size()) +
+              ", Min: " +
+              workerEntryList.get(0).getKey() + " - " +
+              workerEntryList.get(0).getValue().getVertexCount() +
+              ", Max: " +
+              workerEntryList.get(workerEntryList.size() - 1).getKey() +
+              " - " +
+              workerEntryList.get(workerEntryList.size() - 1).
+              getValue().getVertexCount());
+      Collections.sort(workerEntryList, new EdgeCountComparator());
+      LOG.info("analyzePartitionStats: Edges - Mean: " +
+          (totalVertexEdgeCount.getEdgeCount() /
+              workerStatsMap.size()) +
+              ", Min: " +
+              workerEntryList.get(0).getKey() + " - " +
+              workerEntryList.get(0).getValue().getEdgeCount() +
+              ", Max: " +
+              workerEntryList.get(workerEntryList.size() - 1).getKey() +
+              " - " +
+              workerEntryList.get(workerEntryList.size() - 1).
+              getValue().getEdgeCount());
     }
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeMasterPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeMasterPartitioner.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeMasterPartitioner.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeMasterPartitioner.java Thu Feb 16 22:12:31 2012
@@ -34,11 +34,10 @@ import org.apache.hadoop.io.WritableComp
  */
 @SuppressWarnings("rawtypes")
 public abstract class RangeMasterPartitioner<I extends WritableComparable,
-        V extends Writable, E extends Writable, M extends Writable> implements
-        MasterGraphPartitioner<I, V, E, M> {
-
-    @Override
-    public PartitionStats createPartitionStats() {
-        return new RangePartitionStats<I>();
-    }
+    V extends Writable, E extends Writable, M extends Writable> implements
+    MasterGraphPartitioner<I, V, E, M> {
+  @Override
+  public PartitionStats createPartitionStats() {
+    return new RangePartitionStats<I>();
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionOwner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionOwner.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionOwner.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionOwner.java Thu Feb 16 22:12:31 2012
@@ -33,31 +33,43 @@ import org.apache.hadoop.io.WritableComp
  */
 @SuppressWarnings("rawtypes")
 public class RangePartitionOwner<I extends WritableComparable>
-        extends BasicPartitionOwner {
-    /** Max index for this partition */
-    private I maxIndex;
+    extends BasicPartitionOwner {
+  /** Max index for this partition */
+  private I maxIndex;
 
-    public RangePartitionOwner() {
-    }
+  /**
+   * Default constructor.
+   */
+  public RangePartitionOwner() { }
 
-    public RangePartitionOwner(I maxIndex) {
-        this.maxIndex = maxIndex;
-    }
+  /**
+   * Constructor with the max index.
+   *
+   * @param maxIndex Max index of this partition.
+   */
+  public RangePartitionOwner(I maxIndex) {
+    this.maxIndex = maxIndex;
+  }
 
-    public I getMaxIndex() {
-        return maxIndex;
-    }
+  /**
+   * Get the maximum index of this partition owner.
+   *
+   * @return Maximum index.
+   */
+  public I getMaxIndex() {
+    return maxIndex;
+  }
 
-    @Override
-    public void readFields(DataInput input) throws IOException {
-        super.readFields(input);
-        maxIndex = BspUtils.<I>createVertexIndex(getConf());
-        maxIndex.readFields(input);
-    }
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    super.readFields(input);
+    maxIndex = BspUtils.<I>createVertexIndex(getConf());
+    maxIndex.readFields(input);
+  }
 
-    @Override
-    public void write(DataOutput output) throws IOException {
-        super.write(output);
-        maxIndex.write(output);
-    }
+  @Override
+  public void write(DataOutput output) throws IOException {
+    super.write(output);
+    maxIndex.write(output);
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionStats.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionStats.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionStats.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionStats.java Thu Feb 16 22:12:31 2012
@@ -32,37 +32,37 @@ import org.apache.hadoop.io.WritableComp
  */
 @SuppressWarnings("rawtypes")
 public class RangePartitionStats<I extends WritableComparable>
-        extends PartitionStats {
-    /** Can be null if no hint, otherwise a splitting hint */
-    private RangeSplitHint<I> hint;
+    extends PartitionStats {
+  /** Can be null if no hint, otherwise a splitting hint */
+  private RangeSplitHint<I> hint;
 
-    /**
-     * Get the range split hint (if any)
-     *
-     * @return Hint of how to split the range if desired, null otherwise
-     */
-    public RangeSplitHint<I> getRangeSplitHint() {
-        return hint;
-    }
+  /**
+   * Get the range split hint (if any)
+   *
+   * @return Hint of how to split the range if desired, null otherwise
+   */
+  public RangeSplitHint<I> getRangeSplitHint() {
+    return hint;
+  }
 
-    @Override
-    public void readFields(DataInput input) throws IOException {
-        super.readFields(input);
-        boolean hintExists = input.readBoolean();
-        if (hintExists) {
-            hint = new RangeSplitHint<I>();
-            hint.readFields(input);
-        } else {
-            hint = null;
-        }
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    super.readFields(input);
+    boolean hintExists = input.readBoolean();
+    if (hintExists) {
+      hint = new RangeSplitHint<I>();
+      hint.readFields(input);
+    } else {
+      hint = null;
     }
+  }
 
-    @Override
-    public void write(DataOutput output) throws IOException {
-        super.write(output);
-        output.writeBoolean(hint != null);
-        if (hint != null) {
-            hint.write(output);
-        }
+  @Override
+  public void write(DataOutput output) throws IOException {
+    super.write(output);
+    output.writeBoolean(hint != null);
+    if (hint != null) {
+      hint.write(output);
     }
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeSplitHint.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeSplitHint.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeSplitHint.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeSplitHint.java Thu Feb 16 22:12:31 2012
@@ -36,38 +36,38 @@ import org.apache.hadoop.io.WritableComp
  */
 @SuppressWarnings("rawtypes")
 public class RangeSplitHint<I extends WritableComparable>
-        implements Writable, Configurable {
-    /** Hinted split index */
-    private I splitIndex;
-    /** Number of vertices in this range before the split */
-    private long preSplitVertexCount;
-    /** Number of vertices in this range after the split */
-    private long postSplitVertexCount;
-    /** Configuration */
-    private Configuration conf;
+    implements Writable, Configurable {
+  /** Hinted split index */
+  private I splitIndex;
+  /** Number of vertices in this range before the split */
+  private long preSplitVertexCount;
+  /** Number of vertices in this range after the split */
+  private long postSplitVertexCount;
+  /** Configuration */
+  private Configuration conf;
 
-    @Override
-    public void readFields(DataInput input) throws IOException {
-        splitIndex = BspUtils.<I>createVertexIndex(conf);
-        splitIndex.readFields(input);
-        preSplitVertexCount = input.readLong();
-        postSplitVertexCount = input.readLong();
-    }
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    splitIndex = BspUtils.<I>createVertexIndex(conf);
+    splitIndex.readFields(input);
+    preSplitVertexCount = input.readLong();
+    postSplitVertexCount = input.readLong();
+  }
 
-    @Override
-    public void write(DataOutput output) throws IOException {
-        splitIndex.write(output);
-        output.writeLong(preSplitVertexCount);
-        output.writeLong(postSplitVertexCount);
-    }
+  @Override
+  public void write(DataOutput output) throws IOException {
+    splitIndex.write(output);
+    output.writeLong(preSplitVertexCount);
+    output.writeLong(postSplitVertexCount);
+  }
 
-    @Override
-    public Configuration getConf() {
-        return conf;
-    }
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
 
-    @Override
-    public void setConf(Configuration conf) {
-        this.conf = conf;
-    }
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeWorkerPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeWorkerPartitioner.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeWorkerPartitioner.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeWorkerPartitioner.java Thu Feb 16 22:12:31 2012
@@ -43,36 +43,36 @@ import org.apache.hadoop.io.WritableComp
  */
 @SuppressWarnings("rawtypes")
 public abstract class RangeWorkerPartitioner<I extends WritableComparable,
-        V extends Writable, E extends Writable, M extends Writable> implements
-        WorkerGraphPartitioner<I, V, E, M> {
-    /** Mapping of the vertex ids to the {@link PartitionOwner} */
-    protected NavigableMap<I, RangePartitionOwner<I>> vertexRangeMap =
-        new TreeMap<I, RangePartitionOwner<I>>();
+    V extends Writable, E extends Writable, M extends Writable> implements
+    WorkerGraphPartitioner<I, V, E, M> {
+  /** Mapping of the vertex ids to the {@link PartitionOwner} */
+  protected NavigableMap<I, RangePartitionOwner<I>> vertexRangeMap =
+      new TreeMap<I, RangePartitionOwner<I>>();
 
-    @Override
-    public PartitionOwner createPartitionOwner() {
-        return new RangePartitionOwner<I>();
-    }
+  @Override
+  public PartitionOwner createPartitionOwner() {
+    return new RangePartitionOwner<I>();
+  }
 
-    @Override
-    public PartitionOwner getPartitionOwner(I vertexId) {
-        // Find the partition owner based on the maximum partition id.
-        // If the vertex id exceeds any of the maximum partition ids, give
-        // it to the last one
-        if (vertexId == null) {
-            throw new IllegalArgumentException(
-                "getPartitionOwner: Illegal null vertex id");
-        }
-        I maxVertexIndex = vertexRangeMap.ceilingKey(vertexId);
-        if (maxVertexIndex == null) {
-            return vertexRangeMap.lastEntry().getValue();
-        } else {
-            return vertexRangeMap.get(vertexId);
-        }
+  @Override
+  public PartitionOwner getPartitionOwner(I vertexId) {
+    // Find the partition owner based on the maximum partition id, i.e. the
+    // smallest key >= vertexId.  If the vertex id exceeds any of the
+    // maximum partition ids, give it to the last one
+    if (vertexId == null) {
+      throw new IllegalArgumentException(
+          "getPartitionOwner: Illegal null vertex id");
     }
-
-    @Override
-    public Collection<? extends PartitionOwner> getPartitionOwners() {
-        return vertexRangeMap.values();
+    I maxVertexIndex = vertexRangeMap.ceilingKey(vertexId);
+    if (maxVertexIndex == null) {
+      return vertexRangeMap.lastEntry().getValue();
+    } else {
+      return vertexRangeMap.get(maxVertexIndex);
     }
+  }
+
+  @Override
+  public Collection<? extends PartitionOwner> getPartitionOwners() {
+    return vertexRangeMap.values();
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/WorkerGraphPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/WorkerGraphPartitioner.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/WorkerGraphPartitioner.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/WorkerGraphPartitioner.java Thu Feb 16 22:12:31 2012
@@ -29,62 +29,67 @@ import org.apache.hadoop.io.WritableComp
  * Stores the {@link PartitionOwner} objects from the master and provides the
  * mapping of vertex to {@link PartitionOwner}. Also generates the partition
  * owner implementation.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
 public interface WorkerGraphPartitioner<I extends WritableComparable,
-        V extends Writable, E extends Writable, M extends Writable> {
-    /**
-     * Instantiate the {@link PartitionOwner} implementation used to read the
-     * master assignments.
-     *
-     * @return Instantiated {@link PartitionOwner} object
-     */
-    PartitionOwner createPartitionOwner();
+    V extends Writable, E extends Writable, M extends Writable> {
+  /**
+   * Instantiate the {@link PartitionOwner} implementation used to read the
+   * master assignments.
+   *
+   * @return Instantiated {@link PartitionOwner} object
+   */
+  PartitionOwner createPartitionOwner();
 
-    /**
-     * Figure out the owner of a vertex
-     *
-     * @param vertexId Vertex id to get the partition for
-     * @return Correct partition owner
-     */
-    PartitionOwner getPartitionOwner(I vertexId);
+  /**
+   * Figure out the owner of a vertex
+   *
+   * @param vertexId Vertex id to get the partition for
+   * @return Correct partition owner
+   */
+  PartitionOwner getPartitionOwner(I vertexId);
 
-    /**
-     * At the end of a superstep, workers have {@link PartitionStats} generated
-     * for each of their partitions.  This method will allow the user to
-     * modify or create their own {@link PartitionStats} interfaces to send to
-     * the master.
-     *
-     * @param workerPartitionStats Stats generated by the infrastructure during
-     *        the superstep
-     * @param partitionMap Map of all the partitions owned by this worker
-     *        (could be used to provide more useful stat information)
-     * @return Final partition stats
-     */
-    Collection<PartitionStats> finalizePartitionStats(
-            Collection<PartitionStats> workerPartitionStats,
-            Map<Integer, Partition<I, V, E, M>> partitionMap);
+  /**
+   * At the end of a superstep, workers have {@link PartitionStats} generated
+   * for each of their partitions.  This method will allow the user to
+   * modify or create their own {@link PartitionStats} interfaces to send to
+   * the master.
+   *
+   * @param workerPartitionStats Stats generated by the infrastructure during
+   *        the superstep
+   * @param partitionMap Map of all the partitions owned by this worker
+   *        (could be used to provide more useful stat information)
+   * @return Final partition stats
+   */
+  Collection<PartitionStats> finalizePartitionStats(
+      Collection<PartitionStats> workerPartitionStats,
+      Map<Integer, Partition<I, V, E, M>> partitionMap);
 
-    /**
-     * Get the partitions owners and update locally.  Returns the partitions
-     * to send to other workers and other dependencies.
-     *
-     * @param myWorkerInfo Worker info.
-     * @param masterSetPartitionOwners Master set partition owners, received
-     *        prior to beginning the superstep
-     * @param partitionMap Map of all the partitions owned by this worker
-     *        (can be used to fill the return map of partitions to send)
-     * @return Information for the partition exchange.
-     */
-    PartitionExchange updatePartitionOwners(
-            WorkerInfo myWorkerInfo,
-            Collection<? extends PartitionOwner> masterSetPartitionOwners,
-            Map<Integer, Partition<I, V, E, M>> partitionMap);
+  /**
+   * Get the partitions owners and update locally.  Returns the partitions
+   * to send to other workers and other dependencies.
+   *
+   * @param myWorkerInfo Worker info.
+   * @param masterSetPartitionOwners Master set partition owners, received
+   *        prior to beginning the superstep
+   * @param partitionMap Map of all the partitions owned by this worker
+   *        (can be used to fill the return map of partitions to send)
+   * @return Information for the partition exchange.
+   */
+  PartitionExchange updatePartitionOwners(
+      WorkerInfo myWorkerInfo,
+      Collection<? extends PartitionOwner> masterSetPartitionOwners,
+      Map<Integer, Partition<I, V, E, M>> partitionMap);
 
-    /**
-     * Get a collection of the {@link PartitionOwner} objects.
-     *
-     * @return Collection of owners for every partition.
-     */
-    Collection<? extends PartitionOwner> getPartitionOwners();
+  /**
+   * Get a collection of the {@link PartitionOwner} objects.
+   *
+   * @return Collection of owners for every partition.
+   */
+  Collection<? extends PartitionOwner> getPartitionOwners();
 }

Copied: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/package-info.java (from r1243701, incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java)
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/package-info.java?p2=incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/package-info.java&p1=incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java&r1=1243701&r2=1245205&rev=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/package-info.java Thu Feb 16 22:12:31 2012
@@ -15,15 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package org.apache.giraph.bsp;
-
 /**
- *  State of the BSP application
+ * Package of partitioning related objects.
  */
-public enum ApplicationState {
-    UNKNOWN, ///< Shouldn't be seen, just an initial state
-    START_SUPERSTEP, ///< Start from a desired superstep
-    FAILED, ///< Unrecoverable
-    FINISHED ///< Successful completion
-}
+package org.apache.giraph.graph.partition;

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/hadoop/BspPolicyProvider.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/hadoop/BspPolicyProvider.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/hadoop/BspPolicyProvider.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/hadoop/BspPolicyProvider.java Thu Feb 16 22:12:31 2012
@@ -23,18 +23,21 @@ import org.apache.hadoop.security.author
 import org.apache.hadoop.security.authorize.Service;
 
 /**
-  * {@link PolicyProvider} for Map-Reduce protocols.
-  */
+ * {@link PolicyProvider} for Map-Reduce protocols.
+ */
 public class BspPolicyProvider extends PolicyProvider {
-    private static final Service[] bspCommunicationsServices =
-        new Service[] {
-            new Service("security.bsp.communications.protocol.acl",
-                        CommunicationsInterface.class),
+  /**
+   * Communication services array.
+   */
+  private static final Service[] BSP_COMMUNICATION_SERVICES =
+    new Service[] {
+      new Service("security.bsp.communications.protocol.acl",
+                  CommunicationsInterface.class),
     };
 
-    @Override
-    public Service[] getServices() {
-        return bspCommunicationsServices;
-    }
+  @Override
+  public Service[] getServices() {
+    return BSP_COMMUNICATION_SERVICES;
+  }
 }
 

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/hadoop/BspTokenSelector.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/hadoop/BspTokenSelector.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/hadoop/BspTokenSelector.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/hadoop/BspTokenSelector.java Thu Feb 16 22:12:31 2012
@@ -27,25 +27,24 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.security.token.TokenSelector;
 
 /**
-  * Look through tokens to find the first job token that matches the service
-  * and return it.
-  */
+ * Look through tokens to find the first job token that matches the service
+ * and return it.
+ */
 public class BspTokenSelector implements TokenSelector<JobTokenIdentifier> {
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public Token<JobTokenIdentifier> selectToken(Text service,
-            Collection<Token<? extends TokenIdentifier>> tokens) {
-        if (service == null) {
-            return null;
-        }
-        Text KIND_NAME = new Text("mapreduce.job");
-        for (Token<? extends TokenIdentifier> token : tokens) {
-            if (KIND_NAME.equals(token.getKind())) {
-                return (Token<JobTokenIdentifier>) token;
-            }
-        }
-        return null;
+  @SuppressWarnings("unchecked")
+  @Override
+  public Token<JobTokenIdentifier> selectToken(Text service,
+      Collection<Token<? extends TokenIdentifier>> tokens) {
+    if (service == null) {
+      return null;
+    }
+    Text kindName = new Text("mapreduce.job");
+    for (Token<? extends TokenIdentifier> token : tokens) {
+      if (kindName.equals(token.getKind())) {
+        return (Token<JobTokenIdentifier>) token;
+      }
     }
+    return null;
+  }
 }
 

Copied: incubator/giraph/trunk/src/main/java/org/apache/giraph/hadoop/package-info.java (from r1243701, incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java)
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/hadoop/package-info.java?p2=incubator/giraph/trunk/src/main/java/org/apache/giraph/hadoop/package-info.java&p1=incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java&r1=1243701&r2=1245205&rev=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/hadoop/package-info.java Thu Feb 16 22:12:31 2012
@@ -15,15 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package org.apache.giraph.bsp;
-
 /**
- *  State of the BSP application
+ * Package of policy and token information for Hadoop.
  */
-public enum ApplicationState {
-    UNKNOWN, ///< Shouldn't be seen, just an initial state
-    START_SUPERSTEP, ///< Start from a desired superstep
-    FAILED, ///< Unrecoverable
-    FINISHED ///< Successful completion
-}
+package org.apache.giraph.hadoop;

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java Thu Feb 16 22:12:31 2012
@@ -38,84 +38,90 @@ import org.apache.log4j.Logger;
  * Example graph partitioner that builds on {@link HashMasterPartitioner} to
  * send the partitions to the worker that matches the superstep.  It is for
  * testing only and should never be used in practice.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
-public class SuperstepHashPartitionerFactory<
-        I extends WritableComparable,
-        V extends Writable, E extends Writable, M extends Writable>
-        extends HashPartitionerFactory<I, V, E, M> {
+public class SuperstepHashPartitionerFactory<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends HashPartitionerFactory<I, V, E, M> {
+  /**
+   * Changes the {@link HashMasterPartitioner} to make ownership of the
+   * partitions based on a superstep.  For testing only as it is totally
+   * unbalanced.
+   *
+   * @param <I> vertex id
+   * @param <V> vertex data
+   * @param <E> edge data
+   * @param <M> message data
+   */
+  private static class SuperstepMasterPartition<I extends WritableComparable,
+      V extends Writable, E extends Writable, M extends Writable>
+      extends HashMasterPartitioner<I, V, E, M> {
+    /** Class logger */
+    private static Logger LOG =
+        Logger.getLogger(SuperstepMasterPartition.class);
 
     /**
-     * Changes the {@link HashMasterPartitioner} to make ownership of the
-     * partitions based on a superstep.  For testing only as it is totally
-     * unbalanced.
+     * Construction with configuration.
      *
-     * @param <I> vertex id
-     * @param <V> vertex data
-     * @param <E> edge data
-     * @param <M> message data
+     * @param conf Configuration to be stored.
      */
-    private static class SuperstepMasterPartition<
-            I extends WritableComparable,
-            V extends Writable, E extends Writable, M extends Writable>
-            extends HashMasterPartitioner<I, V, E, M> {
-        /** Class logger */
-        private static Logger LOG =
-            Logger.getLogger(SuperstepMasterPartition.class);
+    public SuperstepMasterPartition(Configuration conf) {
+      super(conf);
+    }
 
-        public SuperstepMasterPartition(Configuration conf) {
-            super(conf);
+    @Override
+    public Collection<PartitionOwner> generateChangedPartitionOwners(
+        Collection<PartitionStats> allPartitionStatsList,
+        Collection<WorkerInfo> availableWorkerInfos,
+        int maxWorkers,
+        long superstep) {
+      // Assign all the partitions to
+      // superstep mod availableWorkerInfos
+      // Guaranteed to be different if the workers (and their order)
+      // do not change
+      long workerIndex = superstep % availableWorkerInfos.size();
+      int i = 0;
+      WorkerInfo chosenWorkerInfo = null;
+      for (WorkerInfo workerInfo : availableWorkerInfos) {
+        if (workerIndex == i) {
+          chosenWorkerInfo = workerInfo;
         }
+        ++i;
+      }
+      if (LOG.isInfoEnabled()) {
+        LOG.info("generateChangedPartitionOwners: Chosen worker " +
+                 "for superstep " + superstep + " is " +
+                 chosenWorkerInfo);
+      }
 
-        @Override
-        public Collection<PartitionOwner> generateChangedPartitionOwners(
-                Collection<PartitionStats> allPartitionStatsList,
-                Collection<WorkerInfo> availableWorkerInfos,
-                int maxWorkers,
-                long superstep) {
-            // Assign all the partitions to
-            // superstep mod availableWorkerInfos
-            // Guaranteed to be different if the workers (and their order)
-            // do not change
-            long workerIndex = superstep % availableWorkerInfos.size();
-            int i = 0;
-            WorkerInfo chosenWorkerInfo = null;
-            for (WorkerInfo workerInfo : availableWorkerInfos) {
-                if (workerIndex == i) {
-                    chosenWorkerInfo = workerInfo;
-                }
-                ++i;
-            }
-            if (LOG.isInfoEnabled()) {
-                LOG.info("generateChangedPartitionOwners: Chosen worker " +
-                         "for superstep " + superstep + " is " +
-                         chosenWorkerInfo);
-            }
-
-            List<PartitionOwner> partitionOwnerList =
-                new ArrayList<PartitionOwner>();
-            for (PartitionOwner partitionOwner :
-                    getCurrentPartitionOwners()) {
-                WorkerInfo prevWorkerinfo =
-                    partitionOwner.getWorkerInfo().equals(chosenWorkerInfo) ?
-                        null : partitionOwner.getWorkerInfo();
-                PartitionOwner tmpPartitionOwner =
-                    new BasicPartitionOwner(partitionOwner.getPartitionId(),
-                                            chosenWorkerInfo,
-                                            prevWorkerinfo,
-                                            null);
-                partitionOwnerList.add(tmpPartitionOwner);
-                LOG.info("partition owner was " + partitionOwner +
-                         ", new " + tmpPartitionOwner);
-            }
-            setPartitionOwnerList(partitionOwnerList);
-            return partitionOwnerList;
-        }
+      List<PartitionOwner> partitionOwnerList = new ArrayList<PartitionOwner>();
+      for (PartitionOwner partitionOwner :
+        getCurrentPartitionOwners()) {
+        WorkerInfo prevWorkerinfo =
+          partitionOwner.getWorkerInfo().equals(chosenWorkerInfo) ?
+            null : partitionOwner.getWorkerInfo();
+        PartitionOwner tmpPartitionOwner =
+          new BasicPartitionOwner(partitionOwner.getPartitionId(),
+                                  chosenWorkerInfo,
+                                  prevWorkerinfo,
+                                  null);
+        partitionOwnerList.add(tmpPartitionOwner);
+        LOG.info("partition owner was " + partitionOwner +
+            ", new " + tmpPartitionOwner);
+      }
+      setPartitionOwnerList(partitionOwnerList);
+      return partitionOwnerList;
     }
+  }
 
-    @Override
-    public MasterGraphPartitioner<I, V, E, M>
-            createMasterGraphPartitioner() {
-        return new SuperstepMasterPartition<I, V, E, M>(getConf());
-    }
+  @Override
+  public MasterGraphPartitioner<I, V, E, M>
+  createMasterGraphPartitioner() {
+    return new SuperstepMasterPartition<I, V, E, M>(getConf());
+  }
 }

Copied: incubator/giraph/trunk/src/main/java/org/apache/giraph/integration/package-info.java (from r1243701, incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java)
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/integration/package-info.java?p2=incubator/giraph/trunk/src/main/java/org/apache/giraph/integration/package-info.java&p1=incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java&r1=1243701&r2=1245205&rev=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/integration/package-info.java Thu Feb 16 22:12:31 2012
@@ -15,15 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package org.apache.giraph.bsp;
-
 /**
- *  State of the BSP application
+ * Package of all helper integration test objects.
  */
-public enum ApplicationState {
-    UNKNOWN, ///< Shouldn't be seen, just an initial state
-    START_SUPERSTEP, ///< Start from a desired superstep
-    FAILED, ///< Unrecoverable
-    FINISHED ///< Successful completion
-}
+package org.apache.giraph.integration;

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListTextVertexOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListTextVertexOutputFormat.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListTextVertexOutputFormat.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListTextVertexOutputFormat.java Thu Feb 16 22:12:31 2012
@@ -38,26 +38,41 @@ import java.io.IOException;
  * @param <E> Edge value
  */
 @SuppressWarnings("rawtypes")
-public class AdjacencyListTextVertexOutputFormat <I extends WritableComparable,
-    V extends Writable, E extends Writable> extends TextVertexOutputFormat<I, V, E>{
-
+public class AdjacencyListTextVertexOutputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable>
+    extends TextVertexOutputFormat<I, V, E> {
+
+  /**
+   * Vertex writer associated with {@link AdjacencyListTextVertexOutputFormat}.
+   *
+   * @param <I> Vertex id
+   * @param <V> Vertex data
+   * @param <E> Edge data
+   */
   static class AdjacencyListVertexWriter<I extends WritableComparable, V extends
       Writable, E extends Writable> extends TextVertexWriter<I, V, E> {
+    /** Split delimiter */
     public static final String LINE_TOKENIZE_VALUE = "output.delimiter";
+    /** Default split delimiter */
     public static final String LINE_TOKENIZE_VALUE_DEFAULT = "\t";
-
+    /** Cached split delimiter */
     private String delimiter;
 
-    public AdjacencyListVertexWriter(RecordWriter<Text,Text> recordWriter) {
+    /**
+     * Constructor with writer.
+     *
+     * @param recordWriter Record writer used for writing.
+     */
+    public AdjacencyListVertexWriter(RecordWriter<Text, Text> recordWriter) {
       super(recordWriter);
     }
 
     @Override
     public void writeVertex(BasicVertex<I, V, E, ?> vertex) throws IOException,
-        InterruptedException {
+    InterruptedException {
       if (delimiter == null) {
         delimiter = getContext().getConfiguration()
-           .get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT);
+            .get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT);
       }
 
       StringBuffer sb = new StringBuffer(vertex.getVertexId().toString());
@@ -75,9 +90,8 @@ public class AdjacencyListTextVertexOutp
 
   @Override
   public VertexWriter<I, V, E> createVertexWriter(TaskAttemptContext context)
-      throws IOException, InterruptedException {
+    throws IOException, InterruptedException {
     return new AdjacencyListVertexWriter<I, V, E>
-        (textOutputFormat.getRecordWriter(context));
+    (textOutputFormat.getRecordWriter(context));
   }
-
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListVertexReader.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListVertexReader.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListVertexReader.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListVertexReader.java Thu Feb 16 22:12:31 2012
@@ -21,6 +21,7 @@ import com.google.common.collect.Maps;
 import org.apache.giraph.graph.BasicVertex;
 import org.apache.giraph.graph.BspUtils;
 import org.apache.giraph.graph.Edge;
+import org.apache.giraph.lib.TextVertexInputFormat.TextVertexReader;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -42,15 +43,17 @@ import java.util.Map;
  * @param <I> Vertex index value
  * @param <V> Vertex value
  * @param <E> Edge value
+ * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
 public abstract class AdjacencyListVertexReader<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable> extends
     TextVertexInputFormat.TextVertexReader<I, V, E, M> {
-
+  /** Delimiter for split */
   public static final String LINE_TOKENIZE_VALUE = "adj.list.input.delimiter";
+  /** Default delimiter for split */
   public static final String LINE_TOKENIZE_VALUE_DEFAULT = "\t";
-
+  /** Cached delimiter used for split */
   private String splitValue = null;
 
   /**
@@ -59,17 +62,37 @@ public abstract class AdjacencyListVerte
   public interface LineSanitizer {
     /**
      * Clean string s before attempting to tokenize it.
+     *
+     * @param s String to be cleaned.
+     * @return Sanitized string.
      */
-    public String sanitize(String s);
+    String sanitize(String s);
   }
 
-  private LineSanitizer sanitizer = null;
+  /**
+   * Sanitizer from constructor.
+   */
+  private final LineSanitizer sanitizer;
 
-  public AdjacencyListVertexReader(RecordReader<LongWritable, Text> lineRecordReader) {
+  /**
+   * Constructor with line record reader.
+   *
+   * @param lineRecordReader Reader from {@link TextVertexReader}.
+   */
+  public AdjacencyListVertexReader(
+      RecordReader<LongWritable, Text> lineRecordReader) {
     super(lineRecordReader);
+    sanitizer = null;
   }
 
-  public AdjacencyListVertexReader(RecordReader<LongWritable, Text> lineRecordReader,
+  /**
+   * Constructor with line record reader.
+   *
+   * @param lineRecordReader Reader from {@link TextVertexReader}.
+   * @param sanitizer Sanitizer to be used.
+   */
+  public AdjacencyListVertexReader(
+      RecordReader<LongWritable, Text> lineRecordReader,
       LineSanitizer sanitizer) {
     super(lineRecordReader);
     this.sanitizer = sanitizer;
@@ -77,17 +100,18 @@ public abstract class AdjacencyListVerte
 
   /**
    * Store the Id for this line in an instance of its correct type.
+   *
    * @param s Id of vertex from line
    * @param id Instance of Id's type, in which to store its value
    */
-  abstract public void decodeId(String s, I id);
+  public abstract void decodeId(String s, I id);
 
   /**
    * Store the value for this line in an instance of its correct type.
    * @param s Value from line
    * @param value Instance of value's type, in which to store its value
    */
-  abstract public void decodeValue(String s, V value);
+  public abstract void decodeValue(String s, V value);
 
   /**
    * Store an edge from the line into an instance of a correctly typed Edge
@@ -95,7 +119,7 @@ public abstract class AdjacencyListVerte
    * @param value The edge's value from the line
    * @param edge Instance of edge in which to store the id and value
    */
-  abstract public void decodeEdge(String id, String value, Edge<I, E> edge);
+  public abstract void decodeEdge(String id, String value, Edge<I, E> edge);
 
 
   @Override
@@ -104,7 +128,8 @@ public abstract class AdjacencyListVerte
   }
 
   @Override
-  public BasicVertex<I, V, E, M> getCurrentVertex() throws IOException, InterruptedException {
+  public BasicVertex<I, V, E, M> getCurrentVertex()
+    throws IOException, InterruptedException {
     Configuration conf = getContext().getConfiguration();
     String line = getRecordReader().getCurrentValue().toString();
     BasicVertex<I, V, E, M> vertex = BspUtils.createVertex(conf);
@@ -120,7 +145,8 @@ public abstract class AdjacencyListVerte
     String [] values = line.split(splitValue);
 
     if ((values.length < 2) || (values.length % 2 != 0)) {
-      throw new IllegalArgumentException("Line did not split correctly: " + line);
+      throw new IllegalArgumentException(
+        "Line did not split correctly: " + line);
     }
 
     I vertexId = BspUtils.<I>createVertexIndex(conf);
@@ -132,7 +158,7 @@ public abstract class AdjacencyListVerte
     int i = 2;
     Map<I, E> edges = Maps.newHashMap();
     Edge<I, E> edge = new Edge<I, E>();
-    while(i < values.length) {
+    while (i < values.length) {
       decodeEdge(values[i], values[i + 1], edge);
       edges.put(edge.getDestVertexId(), edge.getEdgeValue());
       i += 2;

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/IdWithValueTextOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/IdWithValueTextOutputFormat.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/IdWithValueTextOutputFormat.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/IdWithValueTextOutputFormat.java Thu Feb 16 22:12:31 2012
@@ -40,30 +40,45 @@ import java.io.IOException;
  * @param <E> Edge value
  */
 @SuppressWarnings("rawtypes")
-public class IdWithValueTextOutputFormat <I extends WritableComparable,
-    V extends Writable, E extends Writable> extends TextVertexOutputFormat<I, V, E>{
-
+public class IdWithValueTextOutputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable>
+    extends TextVertexOutputFormat<I, V, E> {
+
+  /**
+   * Vertex writer used with {@link IdWithValueTextOutputFormat}.
+   *
+   * @param <I> Vertex id
+   * @param <V> Vertex data
+   * @param <E> Edge data
+   */
   static class IdWithValueVertexWriter<I extends WritableComparable, V extends
       Writable, E extends Writable> extends TextVertexWriter<I, V, E> {
-
+    /** Specify the output delimiter */
     public static final String LINE_TOKENIZE_VALUE = "output.delimiter";
+    /** Default output delimiter */
     public static final String LINE_TOKENIZE_VALUE_DEFAULT = "\t";
-
+    /** Reverse id and value order? */
     public static final String REVERSE_ID_AND_VALUE = "reverse.id.and.value";
+    /** Default is to not reverse id and value order. */
     public static final boolean REVERSE_ID_AND_VALUE_DEFAULT = false;
-
+    /** Saved delimiter */
     private String delimiter;
 
+    /**
+     * Constructor with record writer.
+     *
+     * @param recordWriter Writer from LineRecordWriter.
+     */
     public IdWithValueVertexWriter(RecordWriter<Text, Text> recordWriter) {
       super(recordWriter);
     }
 
     @Override
     public void writeVertex(BasicVertex<I, V, E, ?> vertex) throws IOException,
-        InterruptedException {
+    InterruptedException {
       if (delimiter == null) {
         delimiter = getContext().getConfiguration()
-           .get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT);
+            .get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT);
       }
 
       String first;
@@ -87,9 +102,8 @@ public class IdWithValueTextOutputFormat
 
   @Override
   public VertexWriter<I, V, E> createVertexWriter(TaskAttemptContext context)
-      throws IOException, InterruptedException {
+    throws IOException, InterruptedException {
     return new IdWithValueVertexWriter<I, V, E>
-        (textOutputFormat.getRecordWriter(context));
+    (textOutputFormat.getRecordWriter(context));
   }
-
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/JsonBase64VertexFormat.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/JsonBase64VertexFormat.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/JsonBase64VertexFormat.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/JsonBase64VertexFormat.java Thu Feb 16 22:12:31 2012
@@ -21,11 +21,16 @@ package org.apache.giraph.lib;
 /**
  * Keeps the vertex keys for the input/output vertex format
  */
-public interface JsonBase64VertexFormat {
-    /** Vertex id key */
-    public static final String VERTEX_ID_KEY = "vertexId";
-    /** Vertex value key*/
-    public static final String VERTEX_VALUE_KEY = "vertexValue";
-    /** Edge value array key (all the edges are stored here) */
-    public static final String EDGE_ARRAY_KEY = "edgeArray";
+public class JsonBase64VertexFormat {
+  /** Vertex id key */
+  public static final String VERTEX_ID_KEY = "vertexId";
+  /** Vertex value key*/
+  public static final String VERTEX_VALUE_KEY = "vertexValue";
+  /** Edge value array key (all the edges are stored here) */
+  public static final String EDGE_ARRAY_KEY = "edgeArray";
+
+  /**
+   * Don't construct.
+   */
+  private JsonBase64VertexFormat() { }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/JsonBase64VertexInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/JsonBase64VertexInputFormat.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/JsonBase64VertexInputFormat.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/JsonBase64VertexInputFormat.java Thu Feb 16 22:12:31 2012
@@ -51,110 +51,110 @@ import java.util.Map;
  * @param <I> Vertex index value
  * @param <V> Vertex value
  * @param <E> Edge value
+ * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
-public class JsonBase64VertexInputFormat<
-        I extends WritableComparable, V extends Writable, E extends Writable,
-        M extends Writable>
-        extends TextVertexInputFormat<I, V, E, M> implements
-        JsonBase64VertexFormat {
+public class JsonBase64VertexInputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends TextVertexInputFormat<I, V, E, M> {
+  /**
+   * Simple reader that supports {@link JsonBase64VertexInputFormat}
+   *
+   * @param <I> Vertex index value
+   * @param <V> Vertex value
+   * @param <E> Edge value
+   * @param <M> Message data
+   */
+  private static class JsonBase64VertexReader<I extends WritableComparable,
+      V extends Writable, E extends Writable, M extends Writable>
+      extends TextVertexReader<I, V, E, M> {
     /**
-     * Simple reader that supports {@link JsonBase64VertexInputFormat}
+     * Only constructor.  Requires the LineRecordReader
      *
-     * @param <I> Vertex index value
-     * @param <V> Vertex value
-     * @param <E> Edge value
+     * @param lineRecordReader Line record reader to read from
      */
-    private static class JsonBase64VertexReader<
-            I extends WritableComparable, V extends Writable,
-            E extends Writable, M extends Writable> extends TextVertexReader<I, V, E, M> {
-        /**
-         * Only constructor.  Requires the LineRecordReader
-         *
-         * @param lineRecordReader Line record reader to read from
-         */
-        public JsonBase64VertexReader(RecordReader<LongWritable, Text> lineRecordReader) {
-            super(lineRecordReader);
-        }
+    public JsonBase64VertexReader(
+        RecordReader<LongWritable, Text> lineRecordReader) {
+      super(lineRecordReader);
+    }
 
-        @Override
-        public boolean nextVertex() throws IOException, InterruptedException {
-            return getRecordReader().nextKeyValue();
-        }
+    @Override
+    public boolean nextVertex() throws IOException, InterruptedException {
+      return getRecordReader().nextKeyValue();
+    }
 
-        @Override
-        public BasicVertex<I, V, E, M> getCurrentVertex()
-                throws IOException, InterruptedException {
-            Configuration conf = getContext().getConfiguration();
-            BasicVertex<I, V, E, M> vertex = BspUtils.createVertex(conf);
+    @Override
+    public BasicVertex<I, V, E, M> getCurrentVertex()
+      throws IOException, InterruptedException {
+      Configuration conf = getContext().getConfiguration();
+      BasicVertex<I, V, E, M> vertex = BspUtils.createVertex(conf);
 
-            Text line = getRecordReader().getCurrentValue();
-            JSONObject vertexObject;
-            try {
-                vertexObject = new JSONObject(line.toString());
-            } catch (JSONException e) {
-                throw new IllegalArgumentException(
-                    "next: Failed to get the vertex", e);
-            }
-            DataInput input = null;
-            byte[] decodedWritable = null;
-            I vertexId = null;
-            try {
-                decodedWritable = Base64.decode(
-                    vertexObject.getString(VERTEX_ID_KEY));
-                input = new DataInputStream(
-                    new ByteArrayInputStream(decodedWritable));
-                vertexId = BspUtils.<I>createVertexIndex(conf);
-                vertexId.readFields(input);
-            } catch (JSONException e) {
-                throw new IllegalArgumentException(
-                    "next: Failed to get vertex id", e);
-            }
-            V vertexValue = null;
-            try {
-                decodedWritable = Base64.decode(
-                    vertexObject.getString(VERTEX_VALUE_KEY));
-                input = new DataInputStream(
-                    new ByteArrayInputStream(decodedWritable));
-                vertexValue = BspUtils.<V>createVertexValue(conf);
-                vertexValue.readFields(input);
-            } catch (JSONException e) {
-                throw new IllegalArgumentException(
-                    "next: Failed to get vertex value", e);
-            }
-            JSONArray edgeArray = null;
-            try {
-                edgeArray = vertexObject.getJSONArray(EDGE_ARRAY_KEY);
-            } catch (JSONException e) {
-                throw new IllegalArgumentException(
-                    "next: Failed to get edge array", e);
-            }
-            Map<I, E> edgeMap = Maps.newHashMap();
-            for (int i = 0; i < edgeArray.length(); ++i) {
-                try {
-                    decodedWritable =
-                        Base64.decode(edgeArray.getString(i));
-                } catch (JSONException e) {
-                    throw new IllegalArgumentException(
-                        "next: Failed to get edge value", e);
-                }
-                input = new DataInputStream(
-                    new ByteArrayInputStream(decodedWritable));
-                Edge<I, E> edge = new Edge<I, E>();
-                edge.setConf(getContext().getConfiguration());
-                edge.readFields(input);
-                edgeMap.put(edge.getDestVertexId(), edge.getEdgeValue());
-            }
-            vertex.initialize(vertexId, vertexValue, edgeMap, null);
-            return vertex;
+      Text line = getRecordReader().getCurrentValue();
+      JSONObject vertexObject;
+      try {
+        vertexObject = new JSONObject(line.toString());
+      } catch (JSONException e) {
+        throw new IllegalArgumentException(
+          "next: Failed to get the vertex", e);
+      }
+      DataInput input = null;
+      byte[] decodedWritable = null;
+      I vertexId = null;
+      try {
+        decodedWritable = Base64.decode(
+          vertexObject.getString(JsonBase64VertexFormat.VERTEX_ID_KEY));
+        input = new DataInputStream(
+          new ByteArrayInputStream(decodedWritable));
+        vertexId = BspUtils.<I>createVertexIndex(conf);
+        vertexId.readFields(input);
+      } catch (JSONException e) {
+        throw new IllegalArgumentException(
+          "next: Failed to get vertex id", e);
+      }
+      V vertexValue = null;
+      try {
+        decodedWritable = Base64.decode(
+          vertexObject.getString(JsonBase64VertexFormat.VERTEX_VALUE_KEY));
+        input = new DataInputStream(
+          new ByteArrayInputStream(decodedWritable));
+        vertexValue = BspUtils.<V>createVertexValue(conf);
+        vertexValue.readFields(input);
+      } catch (JSONException e) {
+        throw new IllegalArgumentException(
+          "next: Failed to get vertex value", e);
+      }
+      JSONArray edgeArray = null;
+      try {
+        edgeArray = vertexObject.getJSONArray(
+          JsonBase64VertexFormat.EDGE_ARRAY_KEY);
+      } catch (JSONException e) {
+        throw new IllegalArgumentException(
+          "next: Failed to get edge array", e);
+      }
+      Map<I, E> edgeMap = Maps.newHashMap();
+      for (int i = 0; i < edgeArray.length(); ++i) {
+        try {
+          decodedWritable = Base64.decode(edgeArray.getString(i));
+        } catch (JSONException e) {
+          throw new IllegalArgumentException(
+            "next: Failed to get edge value", e);
         }
+        input = new DataInputStream(
+            new ByteArrayInputStream(decodedWritable));
+        Edge<I, E> edge = new Edge<I, E>();
+        edge.setConf(getContext().getConfiguration());
+        edge.readFields(input);
+        edgeMap.put(edge.getDestVertexId(), edge.getEdgeValue());
+      }
+      vertex.initialize(vertexId, vertexValue, edgeMap, null);
+      return vertex;
     }
+  }
 
-    @Override
-    public VertexReader<I, V, E, M> createVertexReader(
-            InputSplit split,
-            TaskAttemptContext context) throws IOException {
-        return new JsonBase64VertexReader<I, V, E, M>(textInputFormat.createRecordReader(split,
-            context));
-    }
+  @Override
+  public VertexReader<I, V, E, M> createVertexReader(
+      InputSplit split, TaskAttemptContext context) throws IOException {
+    return new JsonBase64VertexReader<I, V, E, M>(
+      textInputFormat.createRecordReader(split, context));
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/JsonBase64VertexOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/JsonBase64VertexOutputFormat.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/JsonBase64VertexOutputFormat.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/JsonBase64VertexOutputFormat.java Thu Feb 16 22:12:31 2012
@@ -47,80 +47,80 @@ import java.io.IOException;
  * @param <E> Edge value
  */
 @SuppressWarnings("rawtypes")
-public class JsonBase64VertexOutputFormat<
-        I extends WritableComparable, V extends Writable, E extends Writable>
-        extends TextVertexOutputFormat<I, V, E>
-        implements JsonBase64VertexFormat {
+public class JsonBase64VertexOutputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable> extends
+    TextVertexOutputFormat<I, V, E> {
+  /**
+   * Simple writer that supports {@link JsonBase64VertexOutputFormat}
+   *
+   * @param <I> Vertex index value
+   * @param <V> Vertex value
+   * @param <E> Edge value
+   */
+  private static class JsonBase64VertexWriter<I extends WritableComparable,
+      V extends Writable, E extends Writable> extends
+      TextVertexWriter<I, V, E> {
     /**
-     * Simple writer that supports {@link JsonBase64VertexOutputFormat}
+     * Only constructor.  Requires the LineRecordWriter
      *
-     * @param <I> Vertex index value
-     * @param <V> Vertex value
-     * @param <E> Edge value
+     * @param lineRecordWriter Line record writer to write to
      */
-    private static class JsonBase64VertexWriter<
-            I extends WritableComparable, V extends Writable,
-            E extends Writable> extends TextVertexWriter<I, V, E> {
-        /**
-         * Only constructor.  Requires the LineRecordWriter
-         *
-         * @param lineRecordWriter Line record writer to write to
-         */
-        public JsonBase64VertexWriter(
-                RecordWriter<Text, Text> lineRecordWriter) {
-            super(lineRecordWriter);
-        }
-
-        @Override
-        public void writeVertex(BasicVertex<I, V, E, ?> vertex)
-                throws IOException, InterruptedException {
-            ByteArrayOutputStream outputStream =
-                new ByteArrayOutputStream();
-            DataOutput output = new DataOutputStream(outputStream);
-            JSONObject vertexObject = new JSONObject();
-            vertex.getVertexId().write(output);
-            try {
-                vertexObject.put(
-                    VERTEX_ID_KEY,
-                    Base64.encodeBytes(outputStream.toByteArray()));
-            } catch (JSONException e) {
-                throw new IllegalStateException(
-                    "writerVertex: Failed to insert vertex id", e);
-            }
-            outputStream.reset();
-            vertex.getVertexValue().write(output);
-            try {
-                vertexObject.put(
-                    VERTEX_VALUE_KEY,
-                    Base64.encodeBytes(outputStream.toByteArray()));
-            } catch (JSONException e) {
-                throw new IllegalStateException(
-                    "writerVertex: Failed to insert vertex value", e);
-            }
-            JSONArray edgeArray = new JSONArray();
-            for (I targetVertexId : vertex) {
-                Edge<I, E> edge = new Edge<I, E>(
-                    targetVertexId, vertex.getEdgeValue(targetVertexId));
-                edge.setConf(getContext().getConfiguration());
-                outputStream.reset();
-                edge.write(output);
-                edgeArray.put(Base64.encodeBytes(outputStream.toByteArray()));
-            }
-            try {
-                vertexObject.put(EDGE_ARRAY_KEY, edgeArray);
-            } catch (JSONException e) {
-                throw new IllegalStateException(
-                    "writerVertex: Failed to insert edge array", e);
-            }
-            getRecordWriter().write(new Text(vertexObject.toString()), null);
-        }
+    public JsonBase64VertexWriter(
+        RecordWriter<Text, Text> lineRecordWriter) {
+      super(lineRecordWriter);
     }
 
     @Override
-    public VertexWriter<I, V, E> createVertexWriter(TaskAttemptContext context)
-            throws IOException, InterruptedException {
-        return new JsonBase64VertexWriter<I, V, E>(
-            textOutputFormat.getRecordWriter(context));
+    public void writeVertex(BasicVertex<I, V, E, ?> vertex)
+      throws IOException, InterruptedException {
+      ByteArrayOutputStream outputStream =
+          new ByteArrayOutputStream();
+      DataOutput output = new DataOutputStream(outputStream);
+      JSONObject vertexObject = new JSONObject();
+      vertex.getVertexId().write(output);
+      try {
+        vertexObject.put(
+          JsonBase64VertexFormat.VERTEX_ID_KEY,
+          Base64.encodeBytes(outputStream.toByteArray()));
+      } catch (JSONException e) {
+        throw new IllegalStateException(
+            "writerVertex: Failed to insert vertex id", e);
+      }
+      outputStream.reset();
+      vertex.getVertexValue().write(output);
+      try {
+        vertexObject.put(
+          JsonBase64VertexFormat.VERTEX_VALUE_KEY,
+          Base64.encodeBytes(outputStream.toByteArray()));
+      } catch (JSONException e) {
+        throw new IllegalStateException(
+            "writerVertex: Failed to insert vertex value", e);
+      }
+      JSONArray edgeArray = new JSONArray();
+      for (I targetVertexId : vertex) {
+        Edge<I, E> edge = new Edge<I, E>(
+            targetVertexId, vertex.getEdgeValue(targetVertexId));
+        edge.setConf(getContext().getConfiguration());
+        outputStream.reset();
+        edge.write(output);
+        edgeArray.put(Base64.encodeBytes(outputStream.toByteArray()));
+      }
+      try {
+        vertexObject.put(
+          JsonBase64VertexFormat.EDGE_ARRAY_KEY,
+          edgeArray);
+      } catch (JSONException e) {
+        throw new IllegalStateException(
+            "writerVertex: Failed to insert edge array", e);
+      }
+      getRecordWriter().write(new Text(vertexObject.toString()), null);
     }
+  }
 
+  @Override
+  public VertexWriter<I, V, E> createVertexWriter(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    return new JsonBase64VertexWriter<I, V, E>(
+        textOutputFormat.getRecordWriter(context));
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/LongDoubleDoubleAdjacencyListVertexInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/LongDoubleDoubleAdjacencyListVertexInputFormat.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/LongDoubleDoubleAdjacencyListVertexInputFormat.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/LongDoubleDoubleAdjacencyListVertexInputFormat.java Thu Feb 16 22:12:31 2012
@@ -35,19 +35,40 @@ import java.io.IOException;
  * 22 0.1 45 0.3 99 0.44
  * to repesent a vertex with id 22, value of 0.1 and edges to nodes 45 and 99,
  * with values of 0.3 and 0.44, respectively.
+ *
+ * @param <M> Message data
  */
-public class LongDoubleDoubleAdjacencyListVertexInputFormat<M extends Writable> extends
-    TextVertexInputFormat<LongWritable, DoubleWritable, DoubleWritable, M>  {
-
+public class LongDoubleDoubleAdjacencyListVertexInputFormat<M extends Writable>
+    extends TextVertexInputFormat<LongWritable, DoubleWritable,
+    DoubleWritable, M> {
+
+  /**
+   * VertexReader associated with
+   * {@link LongDoubleDoubleAdjacencyListVertexInputFormat}.
+   *
+   * @param <M> Message data.
+   */
   static class VertexReader<M extends Writable> extends
-      AdjacencyListVertexReader<LongWritable, DoubleWritable, DoubleWritable, M> {
+      AdjacencyListVertexReader<LongWritable, DoubleWritable,
+      DoubleWritable, M> {
 
+    /**
+     * Constructor with Line record reader.
+     *
+     * @param lineRecordReader Reader to internally use.
+     */
     VertexReader(RecordReader<LongWritable, Text> lineRecordReader) {
       super(lineRecordReader);
     }
 
+    /**
+     * Constructor with Line record reader and sanitizer.
+     *
+     * @param lineRecordReader Reader to internally use.
+     * @param sanitizer Line sanitizer.
+     */
     VertexReader(RecordReader<LongWritable, Text> lineRecordReader,
-                 LineSanitizer sanitizer) {
+        LineSanitizer sanitizer) {
       super(lineRecordReader, sanitizer);
     }
 
@@ -62,8 +83,10 @@ public class LongDoubleDoubleAdjacencyLi
     }
 
     @Override
-    public void decodeEdge(String s1, String s2, Edge<LongWritable, DoubleWritable>
-        textIntWritableEdge) {
+    public void decodeEdge(
+        String s1,
+        String s2,
+        Edge<LongWritable, DoubleWritable> textIntWritableEdge) {
       textIntWritableEdge.setDestVertexId(new LongWritable(Long.valueOf(s1)));
       textIntWritableEdge.setEdgeValue(new DoubleWritable(Double.valueOf(s2)));
     }
@@ -71,10 +94,10 @@ public class LongDoubleDoubleAdjacencyLi
 
   @Override
   public org.apache.giraph.graph.VertexReader<LongWritable,
-    DoubleWritable, DoubleWritable, M> createVertexReader(
+  DoubleWritable, DoubleWritable, M> createVertexReader(
       InputSplit split,
       TaskAttemptContext context) throws IOException {
     return new VertexReader<M>(textInputFormat.createRecordReader(
-      split, context));
+        split, context));
   }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/SequenceFileVertexInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/SequenceFileVertexInputFormat.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/SequenceFileVertexInputFormat.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/SequenceFileVertexInputFormat.java Thu Feb 16 22:12:31 2012
@@ -31,49 +31,73 @@ import org.apache.hadoop.mapreduce.lib.i
 import java.io.IOException;
 import java.util.List;
 
+/**
+ * Sequence file vertex input format based on {@link SequenceFileInputFormat}.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ * @param <X> Value type
+ */
 public class SequenceFileVertexInputFormat<I extends WritableComparable<I>,
-                                           V extends Writable,
-                                           E extends Writable,
-                                           M extends Writable,
-                                           X extends BasicVertex<I, V, E, M>>
+    V extends Writable, E extends Writable, M extends Writable,
+    X extends BasicVertex<I, V, E, M>>
     extends VertexInputFormat<I, V, E, M> {
-  protected SequenceFileInputFormat<I, X> sequenceFileInputFormat
-      = new SequenceFileInputFormat<I, X>();
+  /** Internal input format */
+  protected SequenceFileInputFormat<I, X> sequenceFileInputFormat =
+    new SequenceFileInputFormat<I, X>();
 
-  @Override public List<InputSplit> getSplits(JobContext context, int numWorkers)
-      throws IOException, InterruptedException {
+  @Override
+  public List<InputSplit> getSplits(JobContext context, int numWorkers)
+    throws IOException, InterruptedException {
     return sequenceFileInputFormat.getSplits(context);
   }
 
   @Override
   public VertexReader<I, V, E, M> createVertexReader(InputSplit split,
-      TaskAttemptContext context)
-      throws IOException {
+      TaskAttemptContext context) throws IOException {
     return new SequenceFileVertexReader<I, V, E, M, X>(
         sequenceFileInputFormat.createRecordReader(split, context));
   }
 
+  /**
+   * Vertex reader used with {@link SequenceFileVertexInputFormat}.
+   *
+   * @param <I> Vertex id
+   * @param <V> Vertex data
+   * @param <E> Edge data
+   * @param <M> Message data
+   * @param <X> Value type
+   */
   public static class SequenceFileVertexReader<I extends WritableComparable<I>,
       V extends Writable, E extends Writable, M extends Writable,
       X extends BasicVertex<I, V, E, M>>
       implements VertexReader<I, V, E, M> {
+    /** Internal record reader from {@link SequenceFileInputFormat} */
     private final RecordReader<I, X> recordReader;
 
+    /**
+     * Constructor with record reader.
+     *
+     * @param recordReader Reader from {@link SequenceFileInputFormat}.
+     */
     public SequenceFileVertexReader(RecordReader<I, X> recordReader) {
       this.recordReader = recordReader;
     }
 
-    @Override public void initialize(InputSplit inputSplit, TaskAttemptContext context)
-        throws IOException, InterruptedException {
+    @Override public void initialize(InputSplit inputSplit,
+        TaskAttemptContext context) throws IOException, InterruptedException {
       recordReader.initialize(inputSplit, context);
     }
 
-    @Override public boolean nextVertex() throws IOException, InterruptedException {
+    @Override public boolean nextVertex() throws IOException,
+        InterruptedException {
       return recordReader.nextKeyValue();
     }
 
     @Override public BasicVertex<I, V, E, M> getCurrentVertex()
-        throws IOException, InterruptedException {
+      throws IOException, InterruptedException {
       return recordReader.getCurrentValue();
     }
 
@@ -82,7 +106,8 @@ public class SequenceFileVertexInputForm
       recordReader.close();
     }
 
-    @Override public float getProgress() throws IOException, InterruptedException {
+    @Override public float getProgress() throws IOException,
+        InterruptedException {
       return recordReader.getProgress();
     }
   }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextDoubleDoubleAdjacencyListVertexInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextDoubleDoubleAdjacencyListVertexInputFormat.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextDoubleDoubleAdjacencyListVertexInputFormat.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextDoubleDoubleAdjacencyListVertexInputFormat.java Thu Feb 16 22:12:31 2012
@@ -32,19 +32,38 @@ import java.io.IOException;
  * Class to read graphs stored as adjacency lists with ids represented by
  * Strings and values as doubles.  This is a good inputformat for reading
  * graphs where the id types do not matter and can be stashed in a String.
+ *
+ * @param <M> Message type.
  */
 public class TextDoubleDoubleAdjacencyListVertexInputFormat<M extends Writable>
     extends TextVertexInputFormat<Text, DoubleWritable, DoubleWritable, M>  {
 
-  static class VertexReader<M extends Writable> extends AdjacencyListVertexReader<Text,
-      DoubleWritable, DoubleWritable, M> {
 
+  /**
+   * Vertex reader used with
+   * {@link TextDoubleDoubleAdjacencyListVertexInputFormat}
+   *
+   * @param <M> Message type.
+   */
+  static class VertexReader<M extends Writable> extends
+      AdjacencyListVertexReader<Text, DoubleWritable, DoubleWritable, M> {
+    /**
+     * Constructor without sanitizer.
+     *
+     * @param lineRecordReader Internal reader.
+     */
     VertexReader(RecordReader<LongWritable, Text> lineRecordReader) {
       super(lineRecordReader);
     }
 
+    /**
+     * Constructor with line record reader and sanitizer.
+     *
+     * @param lineRecordReader Internal reader.
+     * @param sanitizer Sanitizer of the lines.
+     */
     VertexReader(RecordReader<LongWritable, Text> lineRecordReader,
-                 LineSanitizer sanitizer) {
+        LineSanitizer sanitizer) {
       super(lineRecordReader, sanitizer);
     }
 
@@ -59,8 +78,8 @@ public class TextDoubleDoubleAdjacencyLi
     }
 
     @Override
-    public void decodeEdge(String s1, String s2, Edge<Text, DoubleWritable>
-            textIntWritableEdge) {
+    public void decodeEdge(String s1, String s2,
+                           Edge<Text, DoubleWritable> textIntWritableEdge) {
       textIntWritableEdge.setDestVertexId(new Text(s1));
       textIntWritableEdge.setEdgeValue(new DoubleWritable(Double.valueOf(s2)));
     }
@@ -68,11 +87,9 @@ public class TextDoubleDoubleAdjacencyLi
 
   @Override
   public org.apache.giraph.graph.VertexReader<Text, DoubleWritable,
-    DoubleWritable, M> createVertexReader(
-      InputSplit split,
+      DoubleWritable, M> createVertexReader(InputSplit split,
       TaskAttemptContext context) throws IOException {
     return new VertexReader<M>(textInputFormat.createRecordReader(
-      split, context));
+        split, context));
   }
-
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextVertexInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextVertexInputFormat.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextVertexInputFormat.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextVertexInputFormat.java Thu Feb 16 22:12:31 2012
@@ -43,85 +43,80 @@ import java.util.List;
  * @param <M> Message value
  */
 @SuppressWarnings("rawtypes")
-public abstract class TextVertexInputFormat<
-        I extends WritableComparable,
-        V extends Writable,
-        E extends Writable,
-        M extends Writable>
-        extends VertexInputFormat<I, V, E, M> {
-    /** Uses the TextInputFormat to do everything */
-    protected TextInputFormat textInputFormat = new TextInputFormat();
+public abstract class TextVertexInputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends VertexInputFormat<I, V, E, M> {
+  /** Uses the TextInputFormat to do everything */
+  protected TextInputFormat textInputFormat = new TextInputFormat();
+
+  /**
+   * Abstract class to be implemented by the user based on their specific
+   * vertex input.  Easiest to ignore the key value separator and only use
+   * key instead.
+   *
+   * @param <I> Vertex index value
+   * @param <V> Vertex value
+   * @param <E> Edge value
+   */
+  public abstract static class TextVertexReader<I extends WritableComparable,
+      V extends Writable, E extends Writable, M extends Writable>
+      implements VertexReader<I, V, E, M> {
+    /** Internal line record reader */
+    private final RecordReader<LongWritable, Text> lineRecordReader;
+    /** Context passed to initialize */
+    private TaskAttemptContext context;
 
     /**
-     * Abstract class to be implemented by the user based on their specific
-     * vertex input.  Easiest to ignore the key value separator and only use
-     * key instead.
+     * Initialize with the LineRecordReader.
      *
-     * @param <I> Vertex index value
-     * @param <V> Vertex value
-     * @param <E> Edge value
+     * @param lineRecordReader Line record reader from TextInputFormat
      */
-    public static abstract class TextVertexReader<I extends WritableComparable,
-            V extends Writable, E extends Writable, M extends Writable>
-            implements VertexReader<I, V, E, M> {
-        /** Internal line record reader */
-        private final RecordReader<LongWritable, Text> lineRecordReader;
-        /** Context passed to initialize */
-        private TaskAttemptContext context;
-
-        /**
-         * Initialize with the LineRecordReader.
-         *
-         * @param lineRecordReader Line record reader from TextInputFormat
-         */
-        public TextVertexReader(
-                RecordReader<LongWritable, Text> lineRecordReader) {
-            this.lineRecordReader = lineRecordReader;
-        }
-
-        @Override
-        public void initialize(InputSplit inputSplit,
-                               TaskAttemptContext context)
-                               throws IOException, InterruptedException {
-            lineRecordReader.initialize(inputSplit, context);
-            this.context = context;
-        }
-
-        @Override
-        public void close() throws IOException {
-            lineRecordReader.close();
-        }
-
-        @Override
-        public float getProgress() throws IOException, InterruptedException {
-            return lineRecordReader.getProgress();
-        }
-
-        /**
-         * Get the line record reader.
-         *
-         * @return Record reader to be used for reading.
-         */
-        protected RecordReader<LongWritable, Text> getRecordReader() {
-            return lineRecordReader;
-        }
-
-        /**
-         * Get the context.
-         *
-         * @return Context passed to initialize.
-         */
-        protected TaskAttemptContext getContext() {
-            return context;
-        }
+    public TextVertexReader(
+        RecordReader<LongWritable, Text> lineRecordReader) {
+      this.lineRecordReader = lineRecordReader;
     }
 
     @Override
-    public List<InputSplit> getSplits(
-            JobContext context, int numWorkers)
-            throws IOException, InterruptedException {
-        // Ignore the hint of numWorkers here since we are using TextInputFormat
-        // to do this for us
-        return textInputFormat.getSplits(context);
+    public void initialize(InputSplit inputSplit, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+      lineRecordReader.initialize(inputSplit, context);
+      this.context = context;
     }
+
+    @Override
+    public void close() throws IOException {
+      lineRecordReader.close();
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      return lineRecordReader.getProgress();
+    }
+
+    /**
+     * Get the line record reader.
+     *
+     * @return Record reader to be used for reading.
+     */
+    protected RecordReader<LongWritable, Text> getRecordReader() {
+      return lineRecordReader;
+    }
+
+    /**
+     * Get the context.
+     *
+     * @return Context passed to initialize.
+     */
+    protected TaskAttemptContext getContext() {
+      return context;
+    }
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context, int numWorkers)
+    throws IOException, InterruptedException {
+    // Ignore the hint of numWorkers here since we are using TextInputFormat
+    // to do this for us
+    return textInputFormat.getSplits(context);
+  }
 }