You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ap...@apache.org on 2012/11/02 22:37:31 UTC

svn commit: r1405175 [1/3] - in /giraph/trunk: ./ giraph/src/main/java/org/apache/giraph/ giraph/src/main/java/org/apache/giraph/bsp/ giraph/src/main/java/org/apache/giraph/comm/ giraph/src/main/java/org/apache/giraph/comm/netty/ giraph/src/main/java/o...

Author: apresta
Date: Fri Nov  2 21:37:30 2012
New Revision: 1405175

URL: http://svn.apache.org/viewvc?rev=1405175&view=rev
Log:
GIRAPH-155: Allow creation of graph by adding edges that span multiple workers (apresta)

Added:
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BasicVertexValueReader.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeInputFormat.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeInputSplitsCallable.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeInputSplitsCallableFactory.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeReader.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeWithSource.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GiraphInputFormat.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitEvents.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitPaths.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallableFactory.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexInputSplitsCallable.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexInputSplitsCallableFactory.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexValueInputFormat.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexValueReader.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/io/GiraphFileInputFormat.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/io/GiraphTextInputFormat.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/io/IntIntTextVertexValueInputFormat.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/io/IntNullTextEdgeInputFormat.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/io/TextEdgeInputFormat.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/io/TextVertexValueInputFormat.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/IntPair.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/TestEdgeInput.java
Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerServer.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspUtils.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GiraphJob.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterThread.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexInputFormat.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexReader.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexResolver.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/FileUtils.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/BspCase.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/TestJsonBase64Format.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/io/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1405175&r1=1405174&r2=1405175&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Fri Nov  2 21:37:30 2012
@@ -1,6 +1,9 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-155: Allow creation of graph by adding edges that span
+  multiple workers (apresta)
+
   GIRAPH-398: Missing a dependency (nitay via majakabiljo)
 
   GIRAPH-394: mapreduce.job.user.classpath.first hadoop option typo

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java?rev=1405175&r1=1405174&r2=1405175&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java Fri Nov  2 21:37:30 2012
@@ -19,6 +19,7 @@
 package org.apache.giraph;
 
 import org.apache.giraph.graph.AggregatorWriter;
+import org.apache.giraph.graph.EdgeInputFormat;
 import org.apache.giraph.graph.MasterCompute;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexCombiner;
@@ -38,16 +39,9 @@ import org.apache.hadoop.conf.Configurat
 public class GiraphConfiguration extends Configuration {
   /** Vertex class - required */
   public static final String VERTEX_CLASS = "giraph.vertexClass";
-  /** VertexInputFormat class - required */
-  public static final String VERTEX_INPUT_FORMAT_CLASS =
-      "giraph.vertexInputFormatClass";
 
   /** Class for Master - optional */
   public static final String MASTER_COMPUTE_CLASS = "giraph.masterComputeClass";
-
-  /** VertexOutputFormat class - optional */
-  public static final String VERTEX_OUTPUT_FORMAT_CLASS =
-      "giraph.vertexOutputFormatClass";
   /** Vertex combiner class - optional */
   public static final String VERTEX_COMBINER_CLASS =
       "giraph.combinerClass";
@@ -58,6 +52,18 @@ public class GiraphConfiguration extends
   public static final String GRAPH_PARTITIONER_FACTORY_CLASS =
       "giraph.graphPartitionerFactoryClass";
 
+  // At least one of the input format classes is required.
+  /** VertexInputFormat class */
+  public static final String VERTEX_INPUT_FORMAT_CLASS =
+      "giraph.vertexInputFormatClass";
+  /** EdgeInputFormat class */
+  public static final String EDGE_INPUT_FORMAT_CLASS =
+      "giraph.edgeInputFormatClass";
+
+  /** VertexOutputFormat class */
+  public static final String VERTEX_OUTPUT_FORMAT_CLASS =
+      "giraph.vertexOutputFormatClass";
+
   /** Vertex index class */
   public static final String VERTEX_ID_CLASS = "giraph.vertexIdClass";
   /** Vertex value class */
@@ -425,18 +431,29 @@ public class GiraphConfiguration extends
   public static final float INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT = 100f;
 
   /**
-   * To limit outlier input splits from producing too many vertices or to
-   * help with testing, the number of vertices loaded from an input split can
-   * be limited.  By default, everything is loaded.
+   * To limit outlier vertex input splits from producing too many vertices or
+   * to help with testing, the number of vertices loaded from an input split
+   * can be limited.  By default, everything is loaded.
    */
   public static final String INPUT_SPLIT_MAX_VERTICES =
       "giraph.InputSplitMaxVertices";
   /**
-   * Default is that all the vertices are to be loaded from the input
-   * split
+   * Default is that all the vertices are to be loaded from the input split
    */
   public static final long INPUT_SPLIT_MAX_VERTICES_DEFAULT = -1;
 
+  /**
+   * To limit outlier vertex input splits from producing too many vertices or
+   * to help with testing, the number of edges loaded from an input split
+   * can be limited.  By default, everything is loaded.
+   */
+  public static final String INPUT_SPLIT_MAX_EDGES =
+      "giraph.InputSplitMaxEdges";
+  /**
+   * Default is that all the edges are to be loaded from the input split
+   */
+  public static final long INPUT_SPLIT_MAX_EDGES_DEFAULT = -1;
+
   /** Java opts passed to ZooKeeper startup */
   public static final String ZOOKEEPER_JAVA_OPTS =
       "giraph.zkJavaOpts";
@@ -632,6 +649,18 @@ public class GiraphConfiguration extends
   }
 
   /**
+   * Set the edge input format class (required)
+   *
+   * @param edgeInputFormatClass Determines how graph is input
+   */
+  public final void setEdgeInputFormatClass(
+      Class<? extends EdgeInputFormat> edgeInputFormatClass) {
+    setClass(EDGE_INPUT_FORMAT_CLASS,
+        edgeInputFormatClass,
+        EdgeInputFormat.class);
+  }
+
+  /**
    * Set the master class (optional)
    *
    * @param masterComputeClass Runs master computation
@@ -937,4 +966,8 @@ public class GiraphConfiguration extends
   public long getInputSplitMaxVertices() {
     return getLong(INPUT_SPLIT_MAX_VERTICES, INPUT_SPLIT_MAX_VERTICES_DEFAULT);
   }
+
+  public long getInputSplitMaxEdges() {
+    return getLong(INPUT_SPLIT_MAX_EDGES, INPUT_SPLIT_MAX_EDGES_DEFAULT);
+  }
 }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java?rev=1405175&r1=1405174&r2=1405175&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java Fri Nov  2 21:37:30 2012
@@ -18,10 +18,10 @@
 
 package org.apache.giraph;
 
-import java.util.List;
 import org.apache.giraph.graph.AggregatorWriter;
 import org.apache.giraph.graph.DefaultMasterCompute;
 import org.apache.giraph.graph.DefaultWorkerContext;
+import org.apache.giraph.graph.EdgeInputFormat;
 import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.graph.MasterCompute;
 import org.apache.giraph.graph.TextAggregatorWriter;
@@ -41,6 +41,8 @@ import org.apache.hadoop.io.NullWritable
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
+import java.util.List;
+
 /**
  * The classes set here are immutable, the remaining configuration is mutable.
  * Classes are immutable and final to provide the best performance for
@@ -77,6 +79,10 @@ public class ImmutableClassesGiraphConfi
   /** Vertex output format class - cached for fast access */
   private final Class<? extends VertexOutputFormat<I, V, E>>
   vertexOutputFormatClass;
+  /** Edge input format class - cached for fast access */
+  private final Class<? extends EdgeInputFormat<I, E>>
+  edgeInputFormatClass;
+
 
   /** Aggregator writer class - cached for fast access */
   private final Class<? extends AggregatorWriter> aggregatorWriterClass;
@@ -124,6 +130,9 @@ public class ImmutableClassesGiraphConfi
     vertexOutputFormatClass = (Class<? extends VertexOutputFormat<I, V, E>>)
         conf.getClass(VERTEX_OUTPUT_FORMAT_CLASS,
         null, VertexOutputFormat.class);
+    edgeInputFormatClass = (Class<? extends EdgeInputFormat<I, E>>)
+        conf.getClass(EDGE_INPUT_FORMAT_CLASS,
+        null, EdgeInputFormat.class);
 
     aggregatorWriterClass = conf.getClass(AGGREGATOR_WRITER_CLASS,
         TextAggregatorWriter.class, AggregatorWriter.class);
@@ -169,6 +178,15 @@ public class ImmutableClassesGiraphConfi
   }
 
   /**
+   * Does the job have a {@link VertexInputFormat}?
+   *
+   * @return True iff a {@link VertexInputFormat} has been specified.
+   */
+  public boolean hasVertexInputFormat() {
+    return vertexInputFormatClass != null;
+  }
+
+  /**
    * Get the user's subclassed
    * {@link org.apache.giraph.graph.VertexInputFormat}.
    *
@@ -211,6 +229,34 @@ public class ImmutableClassesGiraphConfi
   }
 
   /**
+   * Does the job have an {@link EdgeInputFormat}?
+   *
+   * @return True iff an {@link EdgeInputFormat} has been specified.
+   */
+  public boolean hasEdgeInputFormat() {
+    return edgeInputFormatClass != null;
+  }
+
+  /**
+   * Get the user's subclassed
+   * {@link org.apache.giraph.graph.EdgeInputFormat}.
+   *
+   * @return User's edge input format class
+   */
+  public Class<? extends EdgeInputFormat<I, E>> getEdgeInputFormatClass() {
+    return edgeInputFormatClass;
+  }
+
+  /**
+   * Create a user edge input format class
+   *
+   * @return Instantiated user edge input format class
+   */
+  public EdgeInputFormat<I, E> createEdgeInputFormat() {
+    return ReflectionUtils.newInstance(edgeInputFormatClass, this);
+  }
+
+  /**
    * Get the user's subclassed {@link org.apache.giraph.graph.AggregatorWriter}.
    *
    * @return User's aggregator writer class

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java?rev=1405175&r1=1405174&r2=1405175&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java Fri Nov  2 21:37:30 2012
@@ -54,10 +54,20 @@ public interface CentralizedServiceMaste
    * user-defined VertexInputFormat.  The {@link InputSplit} objects will
    * processed by the workers later on during the INPUT_SUPERSTEP.
    *
-   * @return Number of partitions. Returns -1 on failure to create
+   * @return Number of splits. Returns -1 on failure to create
    *         valid input splits.
    */
-  int createInputSplits();
+  int createVertexInputSplits();
+
+  /**
+   * Create the {@link InputSplit} objects from the index range based on the
+   * user-defined EdgeInputFormat.  The {@link InputSplit} objects will
+   * processed by the workers later on during the INPUT_SUPERSTEP.
+   *
+   * @return Number of splits. Returns -1 on failure to create
+   *         valid input splits.
+   */
+  int createEdgeInputSplits();
 
   /**
    * Master coordinates the superstep

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerServer.java?rev=1405175&r1=1405174&r2=1405175&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerServer.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerServer.java Fri Nov  2 21:37:30 2012
@@ -44,14 +44,20 @@ public interface WorkerServer<I extends 
   int getPort();
 
   /**
-   * Move the in transition messages to the in messages for every vertex and
-   * add new connections to any newly appearing IPC proxies.
+   * Prepare incoming messages for computation, and resolve mutation requests.
    *
    * @param graphState Current graph state
    */
   void prepareSuperstep(GraphState<I, V, E, M> graphState);
 
   /**
+   * Only resolve mutations requests (used for edge-oriented input).
+   *
+   * @param graphState Current graph state
+   */
+  void resolveMutations(GraphState<I, V, E, M> graphState);
+
+  /**
    * Get server data
    *
    * @return Server data

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java?rev=1405175&r1=1405174&r2=1405175&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java Fri Nov  2 21:37:30 2012
@@ -17,8 +17,6 @@
  */
 package org.apache.giraph.comm.netty;
 
-import com.google.common.collect.Maps;
-import com.yammer.metrics.core.Histogram;
 import org.apache.giraph.GiraphConfiguration;
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
@@ -49,6 +47,9 @@ import org.apache.hadoop.io.WritableComp
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.Logger;
 
+import com.google.common.collect.Maps;
+import com.yammer.metrics.core.Histogram;
+
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Map;

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java?rev=1405175&r1=1405174&r2=1405175&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java Fri Nov  2 21:37:30 2012
@@ -118,7 +118,11 @@ public class NettyWorkerServer<I extends
   @Override
   public void prepareSuperstep(GraphState<I, V, E, M> graphState) {
     serverData.prepareSuperstep();
+    resolveMutations(graphState);
+  }
 
+  @Override
+  public void resolveMutations(GraphState<I, V, E, M> graphState) {
     Set<I> resolveVertexIndexSet = Sets.newHashSet();
     // Keep track of the vertices which are not here but have received messages
     for (Integer partitionId : service.getPartitionStore().getPartitionIds()) {
@@ -180,8 +184,8 @@ public class NettyWorkerServer<I extends
       if (partition == null) {
         throw new IllegalStateException(
             "prepareSuperstep: No partition for index " + vertexIndex +
-            " in " + service.getPartitionStore() + " should have been " +
-            service.getVertexPartitionOwner(vertexIndex));
+                " in " + service.getPartitionStore() + " should have been " +
+                service.getVertexPartitionOwner(vertexIndex));
       }
       if (vertex != null) {
         partition.putVertex(vertex);

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java?rev=1405175&r1=1405174&r2=1405175&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java Fri Nov  2 21:37:30 2012
@@ -18,13 +18,6 @@
 
 package org.apache.giraph.comm.requests;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-
 import org.apache.giraph.comm.ServerData;
 import org.apache.giraph.graph.VertexMutations;
 import org.apache.hadoop.io.Writable;
@@ -33,6 +26,13 @@ import org.apache.log4j.Logger;
 
 import com.google.common.collect.Maps;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
 /**
  * Send a collection of vertex mutations for a partition.
  *

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BasicVertexValueReader.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BasicVertexValueReader.java?rev=1405175&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BasicVertexValueReader.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BasicVertexValueReader.java Fri Nov  2 21:37:30 2012
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.IOException;
+
+/**
+ * Abstract base class for VertexValueReader.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+public abstract class BasicVertexValueReader<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    implements VertexReader<I, V, E, M> {
+  /**
+   * User-defined method to extract the vertex id.
+   *
+   * @return The vertex id
+   * @throws java.io.IOException
+   * @throws InterruptedException
+   */
+  public abstract I getCurrentVertexId() throws IOException,
+      InterruptedException;
+
+  /**
+   * User-defined method to extract the vertex value.
+   *
+   * @return The vertex value
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public abstract V getCurrentVertexValue() throws IOException,
+      InterruptedException;
+}

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java?rev=1405175&r1=1405174&r2=1405175&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java Fri Nov  2 21:37:30 2012
@@ -72,22 +72,40 @@ public abstract class BspService<I exten
   public static final String BASE_DIR = "/_hadoopBsp";
   /** Master job state znode above base dir */
   public static final String MASTER_JOB_STATE_NODE = "/_masterJobState";
-  /** Input split directory about base dir */
-  public static final String INPUT_SPLIT_DIR = "/_inputSplitDir";
-  /** Input split done directory about base dir */
-  public static final String INPUT_SPLIT_DONE_DIR = "/_inputSplitDoneDir";
-  /** Denotes a reserved input split */
-  public static final String INPUT_SPLIT_RESERVED_NODE =
-      "/_inputSplitReserved";
-  /** Denotes a finished input split */
-  public static final String INPUT_SPLIT_FINISHED_NODE =
-      "/_inputSplitFinished";
-  /** Denotes that all the input splits are are ready for consumption */
-  public static final String INPUT_SPLITS_ALL_READY_NODE =
-      "/_inputSplitsAllReady";
-  /** Denotes that all the input splits are done. */
-  public static final String INPUT_SPLITS_ALL_DONE_NODE =
-      "/_inputSplitsAllDone";
+  /** Vertex input split directory about base dir */
+  public static final String VERTEX_INPUT_SPLIT_DIR = "/_vertexInputSplitDir";
+  /** Vertex input split done directory about base dir */
+  public static final String VERTEX_INPUT_SPLIT_DONE_DIR =
+      "/_vertexInputSplitDoneDir";
+  /** Denotes a reserved vertex input split */
+  public static final String VERTEX_INPUT_SPLIT_RESERVED_NODE =
+      "/_vertexInputSplitReserved";
+  /** Denotes a finished vertex input split */
+  public static final String VERTEX_INPUT_SPLIT_FINISHED_NODE =
+      "/_vertexInputSplitFinished";
+  /** Denotes that all the vertex input splits are are ready for consumption */
+  public static final String VERTEX_INPUT_SPLITS_ALL_READY_NODE =
+      "/_vertexInputSplitsAllReady";
+  /** Denotes that all the vertex input splits are done. */
+  public static final String VERTEX_INPUT_SPLITS_ALL_DONE_NODE =
+      "/_vertexInputSplitsAllDone";
+  /** Edge input split directory about base dir */
+  public static final String EDGE_INPUT_SPLIT_DIR = "/_edgeInputSplitDir";
+  /** Edge input split done directory about base dir */
+  public static final String EDGE_INPUT_SPLIT_DONE_DIR =
+      "/_edgeInputSplitDoneDir";
+  /** Denotes a reserved edge input split */
+  public static final String EDGE_INPUT_SPLIT_RESERVED_NODE =
+      "/_edgeInputSplitReserved";
+  /** Denotes a finished edge input split */
+  public static final String EDGE_INPUT_SPLIT_FINISHED_NODE =
+      "/_edgeInputSplitFinished";
+  /** Denotes that all the edge input splits are are ready for consumption */
+  public static final String EDGE_INPUT_SPLITS_ALL_READY_NODE =
+      "/_edgeInputSplitsAllReady";
+  /** Denotes that all the edge input splits are done. */
+  public static final String EDGE_INPUT_SPLITS_ALL_DONE_NODE =
+      "/_edgeInputSplitsAllDone";
   /** Directory of attempts of this application */
   public static final String APPLICATION_ATTEMPTS_DIR =
       "/_applicationAttemptsDir";
@@ -120,32 +138,8 @@ public abstract class BspService<I exten
   /** JSON partition stats key */
   public static final String JSONOBJ_PARTITION_STATS_KEY =
       "_partitionStatsKey";
-  /** JSON finished vertices key */
-  public static final String JSONOBJ_FINISHED_VERTICES_KEY =
-      "_verticesFinishedKey";
-  /** JSON vertex count key */
-  public static final String JSONOBJ_NUM_VERTICES_KEY = "_numVerticesKey";
-  /** JSON edge count key */
-  public static final String JSONOBJ_NUM_EDGES_KEY = "_numEdgesKey";
   /** JSON message count key */
   public static final String JSONOBJ_NUM_MESSAGES_KEY = "_numMsgsKey";
-  /** JSON hostname id key */
-  public static final String JSONOBJ_HOSTNAME_ID_KEY = "_hostnameIdKey";
-  /** JSON max vertex index key */
-  public static final String JSONOBJ_MAX_VERTEX_INDEX_KEY =
-      "_maxVertexIndexKey";
-  /** JSON hostname key */
-  public static final String JSONOBJ_HOSTNAME_KEY = "_hostnameKey";
-  /** JSON port key */
-  public static final String JSONOBJ_PORT_KEY = "_portKey";
-  /** JSON checkpoint file prefix key */
-  public static final String JSONOBJ_CHECKPOINT_FILE_PREFIX_KEY =
-      "_checkpointFilePrefixKey";
-  /** JSON previous hostname key */
-  public static final String JSONOBJ_PREVIOUS_HOSTNAME_KEY =
-      "_previousHostnameKey";
-  /** JSON previous port key */
-  public static final String JSONOBJ_PREVIOUS_PORT_KEY = "_previousPortKey";
   /** JSON state key */
   public static final String JSONOBJ_STATE_KEY = "_stateKey";
   /** JSON application attempt key */
@@ -181,14 +175,14 @@ public abstract class BspService<I exten
   protected final String basePath;
   /** Path to the job state determined by the master (informative only) */
   protected final String masterJobStatePath;
-  /** Path to the input splits written by the master */
-  protected final String inputSplitsPath;
-  /** Path to the input splits all ready to be processed by workers */
-  protected final String inputSplitsAllReadyPath;
-  /** Path to the input splits done */
-  protected final String inputSplitsDonePath;
-  /** Path to the input splits all done to notify the workers to proceed */
-  protected final String inputSplitsAllDonePath;
+  /** ZooKeeper paths for vertex input splits. */
+  protected final InputSplitPaths vertexInputSplitsPaths;
+  /** ZooKeeper paths for edge input splits. */
+  protected final InputSplitPaths edgeInputSplitsPaths;
+  /** Vertex input split events. */
+  protected final InputSplitEvents vertexInputSplitsEvents;
+  /** Edge input split events. */
+  protected final InputSplitEvents edgeInputSplitsEvents;
   /** Path to the application attempts) */
   protected final String applicationAttemptsPath;
   /** Path to the cleaned up notifications */
@@ -203,14 +197,6 @@ public abstract class BspService<I exten
   private final BspEvent connectedEvent;
   /** Has worker registration changed (either healthy or unhealthy) */
   private final BspEvent workerHealthRegistrationChanged;
-  /** InputSplits are ready for consumption by workers */
-  private final BspEvent inputSplitsAllReadyChanged;
-  /** InputSplit reservation or finished notification and synchronization */
-  private final BspEvent inputSplitsStateChanged;
-  /** InputSplits are done being processed by workers */
-  private final BspEvent inputSplitsAllDoneChanged;
-  /** InputSplit done by a worker finished notification and synchronization */
-  private final BspEvent inputSplitsDoneStateChanged;
   /** Are the addresses and partition assignments to workers ready? */
   private final BspEvent addressesAndPartitionsReadyChanged;
   /** Application attempt changed */
@@ -263,12 +249,10 @@ public abstract class BspService<I exten
       int sessionMsecTimeout,
       Mapper<?, ?, ?, ?>.Context context,
       GraphMapper<I, V, E, M> graphMapper) {
+    this.vertexInputSplitsEvents = new InputSplitEvents(context);
+    this.edgeInputSplitsEvents = new InputSplitEvents(context);
     this.connectedEvent = new PredicateLock(context);
     this.workerHealthRegistrationChanged = new PredicateLock(context);
-    this.inputSplitsAllReadyChanged = new PredicateLock(context);
-    this.inputSplitsStateChanged = new PredicateLock(context);
-    this.inputSplitsAllDoneChanged = new PredicateLock(context);
-    this.inputSplitsDoneStateChanged = new PredicateLock(context);
     this.addressesAndPartitionsReadyChanged = new PredicateLock(context);
     this.applicationAttemptChanged = new PredicateLock(context);
     this.superstepFinished = new PredicateLock(context);
@@ -277,8 +261,10 @@ public abstract class BspService<I exten
 
     registerBspEvent(connectedEvent);
     registerBspEvent(workerHealthRegistrationChanged);
-    registerBspEvent(inputSplitsAllReadyChanged);
-    registerBspEvent(inputSplitsStateChanged);
+    registerBspEvent(vertexInputSplitsEvents.getAllReadyChanged());
+    registerBspEvent(vertexInputSplitsEvents.getStateChanged());
+    registerBspEvent(edgeInputSplitsEvents.getAllReadyChanged());
+    registerBspEvent(edgeInputSplitsEvents.getStateChanged());
     registerBspEvent(addressesAndPartitionsReadyChanged);
     registerBspEvent(applicationAttemptChanged);
     registerBspEvent(superstepFinished);
@@ -315,10 +301,12 @@ public abstract class BspService<I exten
 
     basePath = ZooKeeperManager.getBasePath(conf) + BASE_DIR + "/" + jobId;
     masterJobStatePath = basePath + MASTER_JOB_STATE_NODE;
-    inputSplitsPath = basePath + INPUT_SPLIT_DIR;
-    inputSplitsAllReadyPath = basePath + INPUT_SPLITS_ALL_READY_NODE;
-    inputSplitsDonePath = basePath + INPUT_SPLIT_DONE_DIR;
-    inputSplitsAllDonePath = basePath + INPUT_SPLITS_ALL_DONE_NODE;
+    vertexInputSplitsPaths = new InputSplitPaths(basePath,
+        VERTEX_INPUT_SPLIT_DIR, VERTEX_INPUT_SPLIT_DONE_DIR,
+        VERTEX_INPUT_SPLITS_ALL_READY_NODE, VERTEX_INPUT_SPLITS_ALL_DONE_NODE);
+    edgeInputSplitsPaths = new InputSplitPaths(basePath,
+        EDGE_INPUT_SPLIT_DIR, EDGE_INPUT_SPLIT_DONE_DIR,
+        EDGE_INPUT_SPLITS_ALL_READY_NODE, EDGE_INPUT_SPLITS_ALL_DONE_NODE);
     applicationAttemptsPath = basePath + APPLICATION_ATTEMPTS_DIR;
     cleanedUpPath = basePath + CLEANED_UP_DIR;
     checkpointBasePath = getConfiguration().get(
@@ -498,18 +486,6 @@ public abstract class BspService<I exten
   }
 
   /**
-   * Generate the merged aggregator directory path for a superstep
-   *
-   * @param attempt application attempt number
-   * @param superstep superstep to use
-   * @return directory path based on the a superstep
-   */
-  public final String getMergedAggregatorPath(long attempt, long superstep) {
-    return applicationAttemptsPath + "/" + attempt +
-        SUPERSTEP_DIR + "/" + superstep + MERGED_AGGREGATOR_DIR;
-  }
-
-  /**
    * Generate the "superstep finished" directory path for a superstep
    *
    * @param attempt application attempt number
@@ -636,22 +612,6 @@ public abstract class BspService<I exten
     return workerHealthRegistrationChanged;
   }
 
-  public final BspEvent getInputSplitsAllReadyEvent() {
-    return inputSplitsAllReadyChanged;
-  }
-
-  public final BspEvent getInputSplitsStateChangedEvent() {
-    return inputSplitsStateChanged;
-  }
-
-  public final BspEvent getInputSplitsAllDoneEvent() {
-    return inputSplitsAllDoneChanged;
-  }
-
-  public final BspEvent getInputSplitsDoneStateChangedEvent() {
-    return inputSplitsDoneStateChanged;
-  }
-
   public final BspEvent getAddressesAndPartitionsReadyChangedEvent() {
     return addressesAndPartitionsReadyChanged;
   }
@@ -962,53 +922,105 @@ public abstract class BspService<I exten
       }
       workerHealthRegistrationChanged.signal();
       eventProcessed = true;
-    } else if (event.getPath().equals(inputSplitsAllReadyPath) &&
+    } else if (event.getPath().equals(
+        vertexInputSplitsPaths.getAllReadyPath()) &&
         (event.getType() == EventType.NodeCreated)) {
       if (LOG.isInfoEnabled()) {
         LOG.info("process: inputSplitsReadyChanged " +
             "(input splits ready)");
       }
-      inputSplitsAllReadyChanged.signal();
+      vertexInputSplitsEvents.getAllReadyChanged().signal();
+      eventProcessed = true;
+    } else if (event.getPath().endsWith(VERTEX_INPUT_SPLIT_RESERVED_NODE) &&
+        (event.getType() == EventType.NodeCreated)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("process: vertexInputSplitsStateChanged " +
+            "(made a reservation)");
+      }
+      vertexInputSplitsEvents.getStateChanged().signal();
+      eventProcessed = true;
+    } else if (event.getPath().endsWith(VERTEX_INPUT_SPLIT_RESERVED_NODE) &&
+        (event.getType() == EventType.NodeDeleted)) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("process: vertexInputSplitsStateChanged " +
+            "(lost a reservation)");
+      }
+      vertexInputSplitsEvents.getStateChanged().signal();
+      eventProcessed = true;
+    } else if (event.getPath().endsWith(VERTEX_INPUT_SPLIT_FINISHED_NODE) &&
+        (event.getType() == EventType.NodeCreated)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("process: vertexInputSplitsStateChanged " +
+            "(finished inputsplit)");
+      }
+      vertexInputSplitsEvents.getStateChanged().signal();
+      eventProcessed = true;
+    } else if (event.getPath().endsWith(VERTEX_INPUT_SPLIT_DONE_DIR) &&
+        (event.getType() == EventType.NodeChildrenChanged)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("process: vertexInputSplitsDoneStateChanged " +
+            "(worker finished sending)");
+      }
+      vertexInputSplitsEvents.getDoneStateChanged().signal();
+      eventProcessed = true;
+    }  else if (event.getPath().equals(
+        vertexInputSplitsPaths.getAllDonePath()) &&
+        (event.getType() == EventType.NodeCreated)) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("process: vertexInputSplitsAllDoneChanged " +
+            "(all vertices sent from input splits)");
+      }
+      vertexInputSplitsEvents.getAllDoneChanged().signal();
+      eventProcessed = true;
+    } else if (event.getPath().equals(
+        edgeInputSplitsPaths.getAllReadyPath()) &&
+        (event.getType() == EventType.NodeCreated)) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("process: edgeInputSplitsReadyChanged " +
+            "(input splits ready)");
+      }
+      edgeInputSplitsEvents.getAllReadyChanged().signal();
       eventProcessed = true;
-    } else if (event.getPath().endsWith(INPUT_SPLIT_RESERVED_NODE) &&
+    } else if (event.getPath().endsWith(EDGE_INPUT_SPLIT_RESERVED_NODE) &&
         (event.getType() == EventType.NodeCreated)) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("process: inputSplitsStateChanged " +
+        LOG.debug("process: edgeInputSplitsStateChanged " +
             "(made a reservation)");
       }
-      inputSplitsStateChanged.signal();
+      edgeInputSplitsEvents.getStateChanged().signal();
       eventProcessed = true;
-    } else if (event.getPath().endsWith(INPUT_SPLIT_RESERVED_NODE) &&
+    } else if (event.getPath().endsWith(EDGE_INPUT_SPLIT_RESERVED_NODE) &&
         (event.getType() == EventType.NodeDeleted)) {
       if (LOG.isInfoEnabled()) {
-        LOG.info("process: inputSplitsStateChanged " +
+        LOG.info("process: edgeInputSplitsStateChanged " +
             "(lost a reservation)");
       }
-      inputSplitsStateChanged.signal();
+      edgeInputSplitsEvents.getStateChanged().signal();
       eventProcessed = true;
-    } else if (event.getPath().endsWith(INPUT_SPLIT_FINISHED_NODE) &&
+    } else if (event.getPath().endsWith(EDGE_INPUT_SPLIT_FINISHED_NODE) &&
         (event.getType() == EventType.NodeCreated)) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("process: inputSplitsStateChanged " +
+        LOG.debug("process: edgeInputSplitsStateChanged " +
             "(finished inputsplit)");
       }
-      inputSplitsStateChanged.signal();
+      edgeInputSplitsEvents.getStateChanged().signal();
       eventProcessed = true;
-    } else if (event.getPath().endsWith(INPUT_SPLIT_DONE_DIR) &&
+    } else if (event.getPath().endsWith(EDGE_INPUT_SPLIT_DONE_DIR) &&
         (event.getType() == EventType.NodeChildrenChanged)) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("process: inputSplitsDoneStateChanged " +
+        LOG.debug("process: edgeInputSplitsDoneStateChanged " +
             "(worker finished sending)");
       }
-      inputSplitsDoneStateChanged.signal();
+      edgeInputSplitsEvents.getDoneStateChanged().signal();
       eventProcessed = true;
-    }  else if (event.getPath().equals(inputSplitsAllDonePath) &&
+    } else if (event.getPath().equals(
+        edgeInputSplitsPaths.getAllDonePath()) &&
         (event.getType() == EventType.NodeCreated)) {
       if (LOG.isInfoEnabled()) {
-        LOG.info("process: inputSplitsAllDoneChanged " +
+        LOG.info("process: edgeInputSplitsAllDoneChanged " +
             "(all vertices sent from input splits)");
       }
-      inputSplitsAllDoneChanged.signal();
+      edgeInputSplitsEvents.getAllDoneChanged().signal();
       eventProcessed = true;
     } else if (event.getPath().contains(ADDRESSES_AND_PARTITIONS_DIR) &&
         event.getType() == EventType.NodeCreated) {

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1405175&r1=1405174&r2=1405175&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Fri Nov  2 21:37:30 2012
@@ -18,7 +18,6 @@
 
 package org.apache.giraph.graph;
 
-import com.google.common.collect.Sets;
 import org.apache.giraph.GiraphConfiguration;
 import org.apache.giraph.bsp.ApplicationState;
 import org.apache.giraph.bsp.BspInputFormat;
@@ -57,6 +56,7 @@ import org.json.JSONException;
 import org.json.JSONObject;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import net.iharder.Base64;
 
 import java.io.ByteArrayOutputStream;
@@ -237,47 +237,44 @@ public class BspServiceMaster<I extends 
   }
 
   /**
-   * Master uses this to calculate the {@link VertexInputFormat}
-   * input splits and write it to ZooKeeper.
+   * Common method for generating vertex/edge input splits.
    *
+   * @param inputFormat The vertex/edge input format
    * @param numWorkers Number of available workers
-   * @return List of input splits
-   * @throws InstantiationException
-   * @throws IllegalAccessException
-   * @throws IOException
-   * @throws InterruptedException
+   * @param inputSplitType Type of input splits (for logging purposes)
+   * @return List of input splits for the given format
    */
-  private List<InputSplit> generateInputSplits(int numWorkers) {
-    VertexInputFormat<I, V, E, M> vertexInputFormat =
-        getConfiguration().createVertexInputFormat();
+  private List<InputSplit> generateInputSplits(GiraphInputFormat inputFormat,
+                                               int numWorkers,
+                                               String inputSplitType) {
+    String logPrefix = "generate" + inputSplitType + "InputSplits";
     List<InputSplit> splits;
     try {
-      splits = vertexInputFormat.getSplits(getContext(), numWorkers);
-      float samplePercent =
-          getConfiguration().getFloat(
-              GiraphConfiguration.INPUT_SPLIT_SAMPLE_PERCENT,
-              GiraphConfiguration.INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT);
-      if (samplePercent !=
-          GiraphConfiguration.INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT) {
-        int lastIndex = (int) (samplePercent * splits.size() / 100f);
-        List<InputSplit> sampleSplits = splits.subList(0, lastIndex);
-        LOG.warn("generateInputSplits: Using sampling - Processing " +
-            "only " + sampleSplits.size() + " instead of " +
-            splits.size() + " expected splits.");
-        return sampleSplits;
-      } else {
-        if (LOG.isInfoEnabled()) {
-          LOG.info("generateInputSplits: Got " + splits.size() +
-              " input splits for " + numWorkers + " workers");
-        }
-        return splits;
-      }
+      splits = inputFormat.getSplits(getContext(), numWorkers);
     } catch (IOException e) {
-      throw new IllegalStateException(
-          "generateInputSplits: Got IOException", e);
+      throw new IllegalStateException(logPrefix + ": Got IOException", e);
     } catch (InterruptedException e) {
       throw new IllegalStateException(
-          "generateInputSplits: Got InterruptedException", e);
+          logPrefix + ": Got InterruptedException", e);
+    }
+    float samplePercent =
+        getConfiguration().getFloat(
+            GiraphConfiguration.INPUT_SPLIT_SAMPLE_PERCENT,
+            GiraphConfiguration.INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT);
+    if (samplePercent !=
+        GiraphConfiguration.INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT) {
+      int lastIndex = (int) (samplePercent * splits.size() / 100f);
+      List<InputSplit> sampleSplits = splits.subList(0, lastIndex);
+      LOG.warn(logPrefix + ": Using sampling - Processing only " +
+          sampleSplits.size() + " instead of " + splits.size() +
+          " expected splits.");
+      return sampleSplits;
+    } else {
+      if (LOG.isInfoEnabled()) {
+        LOG.info(logPrefix + ": Got " + splits.size() +
+            " input splits for " + numWorkers + " workers");
+      }
+      return splits;
     }
   }
 
@@ -508,31 +505,39 @@ public class BspServiceMaster<I extends 
     }
   }
 
-  @Override
-  public int createInputSplits() {
+  /**
+   * Common method for creating vertex/edge input splits.
+   *
+   * @param inputFormat The vertex/edge input format
+   * @param inputSplitPaths ZooKeeper input split paths
+   * @param inputSplitType Type of input split (for logging purposes)
+   * @return Number of splits. Returns -1 on failure to create
+   *         valid input splits.
+   */
+  private int createInputSplits(GiraphInputFormat inputFormat,
+                                InputSplitPaths inputSplitPaths,
+                                String inputSplitType) {
+    String logPrefix = "create" + inputSplitType + "InputSplits";
     // Only the 'master' should be doing this.  Wait until the number of
     // processes that have reported health exceeds the minimum percentage.
     // If the minimum percentage is not met, fail the job.  Otherwise
     // generate the input splits
+    String inputSplitsPath = inputSplitPaths.getPath();
     try {
       if (getZkExt().exists(inputSplitsPath, false) != null) {
-        LOG.info(inputSplitsPath +
-            " already exists, no need to create");
+        LOG.info(inputSplitsPath + " already exists, no need to create");
         return Integer.parseInt(
-            new String(
-                getZkExt().getData(inputSplitsPath, false, null)));
+            new String(getZkExt().getData(inputSplitsPath, false, null)));
       }
     } catch (KeeperException.NoNodeException e) {
       if (LOG.isInfoEnabled()) {
-        LOG.info("createInputSplits: Need to create the " +
-            "input splits at " + inputSplitsPath);
+        LOG.info(logPrefix + ": Need to create the input splits at " +
+            inputSplitsPath);
       }
     } catch (KeeperException e) {
-      throw new IllegalStateException(
-          "createInputSplits: KeeperException", e);
+      throw new IllegalStateException(logPrefix + ": KeeperException", e);
     } catch (InterruptedException e) {
-      throw new IllegalStateException(
-          "createInputSplits: InterrtupedException", e);
+      throw new IllegalStateException(logPrefix + ": InterrtupedException", e);
     }
 
     // When creating znodes, in case the master has already run, resume
@@ -545,19 +550,18 @@ public class BspServiceMaster<I extends 
 
     // Note that the input splits may only be a sample if
     // INPUT_SPLIT_SAMPLE_PERCENT is set to something other than 100
-    List<InputSplit> splitList =
-        generateInputSplits(healthyWorkerInfoList.size());
+    List<InputSplit> splitList = generateInputSplits(inputFormat,
+        healthyWorkerInfoList.size(), inputSplitType);
+
     if (splitList.isEmpty()) {
-      LOG.fatal("createInputSplits: Failing job due to 0 input splits, " +
-          "check input of " +
-          getConfiguration().getVertexInputFormatClass().getName() + "!");
+      LOG.fatal(logPrefix + ": Failing job due to 0 input splits, " +
+          "check input of " + inputFormat.getClass().getName() + "!");
       getContext().setStatus("Failing job due to 0 input splits, " +
-          "check input of " +
-          getConfiguration().getVertexInputFormatClass().getName() + "!");
+          "check input of " + inputFormat.getClass().getName() + "!");
       failJob();
     }
     if (healthyWorkerInfoList.size() > splitList.size()) {
-      LOG.warn("createInputSplits: Number of inputSplits=" +
+      LOG.warn(logPrefix + ": Number of inputSplits=" +
           splitList.size() + " < " +
           healthyWorkerInfoList.size() +
           "=number of healthy processes, " +
@@ -569,43 +573,65 @@ public class BspServiceMaster<I extends 
         INPUT_SPLIT_THREAD_COUNT,
         DEFAULT_INPUT_SPLIT_THREAD_COUNT);
     if (LOG.isInfoEnabled()) {
-      LOG.info("createInputSplits: Starting to write input split data to " +
-          "zookeeper with " + inputSplitThreadCount + " threads");
+      LOG.info(logPrefix + ": Starting to write input split data " +
+          "to zookeeper with " + inputSplitThreadCount + " threads");
     }
     ExecutorService taskExecutor =
         Executors.newFixedThreadPool(inputSplitThreadCount);
     for (int i = 0; i < splitList.size(); ++i) {
       InputSplit inputSplit = splitList.get(i);
-      taskExecutor.submit(new WriteInputSplit(inputSplit, i));
+      taskExecutor.submit(new WriteInputSplit(inputSplit, inputSplitsPath, i));
     }
     taskExecutor.shutdown();
     ProgressableUtils.awaitExecutorTermination(taskExecutor, getContext());
     if (LOG.isInfoEnabled()) {
-      LOG.info("createInputSplits: Done writing input split data to zookeeper");
+      LOG.info(logPrefix + ": Done writing input split data to zookeeper");
     }
 
     // Let workers know they can start trying to load the input splits
     try {
-      getZkExt().createExt(inputSplitsAllReadyPath,
-                           null,
-                           Ids.OPEN_ACL_UNSAFE,
-                           CreateMode.PERSISTENT,
-                           false);
+      getZkExt().createExt(inputSplitPaths.getAllReadyPath(),
+          null,
+          Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT,
+          false);
     } catch (KeeperException.NodeExistsException e) {
-      LOG.info("createInputSplits: Node " +
-          inputSplitsAllReadyPath + " already exists.");
+      LOG.info(logPrefix + ": Node " +
+          inputSplitPaths.getAllReadyPath() + " already exists.");
     } catch (KeeperException e) {
-      throw new IllegalStateException(
-          "createInputSplits: KeeperException", e);
+      throw new IllegalStateException(logPrefix + ": KeeperException", e);
     } catch (InterruptedException e) {
-      throw new IllegalStateException(
-          "createInputSplits: IllegalStateException", e);
+      throw new IllegalStateException(logPrefix + ": IllegalStateException", e);
     }
 
     return splitList.size();
   }
 
   @Override
+  public int createVertexInputSplits() {
+    // Short-circuit if there is no vertex input format
+    if (!getConfiguration().hasVertexInputFormat()) {
+      return 0;
+    }
+    VertexInputFormat<I, V, E, M> vertexInputFormat =
+        getConfiguration().createVertexInputFormat();
+    return createInputSplits(vertexInputFormat, vertexInputSplitsPaths,
+        "Vertex");
+  }
+
+  @Override
+  public int createEdgeInputSplits() {
+    // Short-circuit if there is no edge input format
+    if (!getConfiguration().hasEdgeInputFormat()) {
+      return 0;
+    }
+    EdgeInputFormat<I, E> edgeInputFormat =
+        getConfiguration().createEdgeInputFormat();
+    return createInputSplits(edgeInputFormat, edgeInputSplitsPaths,
+        "Edge");
+  }
+
+  @Override
   public List<WorkerInfo> getWorkerInfoList() {
     return chosenWorkerInfoList;
   }
@@ -1041,7 +1067,10 @@ public class BspServiceMaster<I extends 
     // 2. Increase the application attempt and set to the correct checkpoint
     // 3. Send command to all workers to restart their tasks
     try {
-      getZkExt().deleteExt(inputSplitsPath, -1, true);
+      getZkExt().deleteExt(vertexInputSplitsPaths.getPath(), -1,
+          true);
+      getZkExt().deleteExt(edgeInputSplitsPaths.getPath(), -1,
+          true);
     } catch (InterruptedException e) {
       throw new RuntimeException(
           "restartFromCheckpoint: InterruptedException", e);
@@ -1250,6 +1279,41 @@ public class BspServiceMaster<I extends 
     }
   }
 
+  /**
+   * Coordinate the exchange of vertex/edge input splits among workers.
+   *
+   * @param inputSplitPaths Input split paths
+   * @param inputSplitEvents Input split events
+   * @param inputSplitsType Type of input splits (for logging purposes)
+   */
+  private void coordinateInputSplits(InputSplitPaths inputSplitPaths,
+                                     InputSplitEvents inputSplitEvents,
+                                     String inputSplitsType) {
+    // Coordinate the workers finishing sending their vertices/edges to the
+    // correct workers and signal when everything is done.
+    String logPrefix = "coordinate" + inputSplitsType + "InputSplits";
+    if (!barrierOnWorkerList(inputSplitPaths.getDonePath(),
+        chosenWorkerInfoList,
+        inputSplitEvents.getDoneStateChanged())) {
+      throw new IllegalStateException(logPrefix + ": Worker failed during " +
+          "input split (currently not supported)");
+    }
+    try {
+      getZkExt().createExt(inputSplitPaths.getAllDonePath(),
+          null,
+          Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT,
+          false);
+    } catch (KeeperException.NodeExistsException e) {
+      LOG.info("coordinateInputSplits: Node " +
+          inputSplitPaths.getAllDonePath() + " already exists.");
+    } catch (KeeperException e) {
+      throw new IllegalStateException(logPrefix + ": KeeperException", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(logPrefix + ": IllegalStateException", e);
+    }
+  }
+
   @Override
   public SuperstepState coordinateSuperstep() throws
   KeeperException, InterruptedException {
@@ -1316,30 +1380,13 @@ public class BspServiceMaster<I extends 
     }
 
     if (getSuperstep() == INPUT_SUPERSTEP) {
-      // Coordinate the workers finishing sending their vertices to the
-      // correct workers and signal when everything is done.
-      if (!barrierOnWorkerList(inputSplitsDonePath,
-          chosenWorkerInfoList,
-          getInputSplitsDoneStateChangedEvent())) {
-        throw new IllegalStateException(
-            "coordinateSuperstep: Worker failed during input split " +
-            "(currently not supported)");
-      }
-      try {
-        getZkExt().createExt(inputSplitsAllDonePath,
-                             null,
-                             Ids.OPEN_ACL_UNSAFE,
-                             CreateMode.PERSISTENT,
-                             false);
-      } catch (KeeperException.NodeExistsException e) {
-        LOG.info("coordinateInputSplits: Node " +
-            inputSplitsAllDonePath + " already exists.");
-      } catch (KeeperException e) {
-        throw new IllegalStateException(
-            "coordinateInputSplits: KeeperException", e);
-      } catch (InterruptedException e) {
-        throw new IllegalStateException(
-            "coordinateInputSplits: IllegalStateException", e);
+      if (getConfiguration().hasVertexInputFormat()) {
+        coordinateInputSplits(vertexInputSplitsPaths, vertexInputSplitsEvents,
+            "Vertex");
+      }
+      if (getConfiguration().hasEdgeInputFormat()) {
+        coordinateInputSplits(edgeInputSplitsPaths, edgeInputSplitsEvents,
+            "Edge");
       }
     }
 
@@ -1680,6 +1727,8 @@ public class BspServiceMaster<I extends 
   private class WriteInputSplit implements Callable<Void> {
     /** Input split which we are going to write */
     private final InputSplit inputSplit;
+    /** Input splits path */
+    private final String inputSplitsPath;
     /** Index of the input split */
     private final int index;
 
@@ -1687,10 +1736,14 @@ public class BspServiceMaster<I extends 
      * Constructor
      *
      * @param inputSplit Input split which we are going to write
+     * @param inputSplitsPath Input splits path
      * @param index Index of the input split
      */
-    public WriteInputSplit(InputSplit inputSplit, int index) {
+    public WriteInputSplit(InputSplit inputSplit,
+                           String inputSplitsPath,
+                           int index) {
       this.inputSplit = inputSplit;
+      this.inputSplitsPath = inputSplitsPath;
       this.index = index;
     }
 

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1405175&r1=1405174&r2=1405175&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Fri Nov  2 21:37:30 2012
@@ -18,12 +18,7 @@
 
 package org.apache.giraph.graph;
 
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.yammer.metrics.core.Gauge;
-import com.yammer.metrics.core.Timer;
-import com.yammer.metrics.core.TimerContext;
-import net.iharder.Base64;
+
 import org.apache.giraph.GiraphConfiguration;
 import org.apache.giraph.bsp.ApplicationState;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
@@ -67,6 +62,13 @@ import org.json.JSONArray;
 import org.json.JSONException;
 import org.json.JSONObject;
 
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Timer;
+import com.yammer.metrics.core.TimerContext;
+import net.iharder.Base64;
+
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutput;
@@ -217,8 +219,8 @@ public class BspServiceWorker<I extends 
   }
 
   /**
-   * Load the vertices from the user-defined VertexReader into our partitions
-   * of vertex ranges.  Do this until all the InputSplits have been processed.
+   * Load the vertices/edges from input slits. Do this until all the
+   * InputSplits have been processed.
    * All workers will try to do as many InputSplits as they can.  The master
    * will monitor progress and stop this once all the InputSplits have been
    * loaded and check-pointed.  Keep track of the last input split path to
@@ -227,47 +229,36 @@ public class BspServiceWorker<I extends 
    *
    * Use one or more threads to do the loading.
    *
-   * @return Statistics of the vertices loaded
+   * @param inputSplitPathList List of input split paths
+   * @param inputSplitsCallableFactory Factory for {@link InputSplitsCallable}s
+   * @return Statistics of the vertices and edges loaded
    * @throws InterruptedException
    * @throws KeeperException
    */
-  private VertexEdgeCount loadVertices()
-    throws InterruptedException, KeeperException {
+  private VertexEdgeCount loadInputSplits(
+      List<String> inputSplitPathList,
+      InputSplitsCallableFactory<I, V, E, M> inputSplitsCallableFactory)
+    throws KeeperException, InterruptedException {
     VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
-
-    // Get the number of splits first to determine how many threads to use
-    List<String> inputSplitPathList =
-        getZkExt().getChildrenExt(inputSplitsPath, false, false, true);
-
-    GraphState<I, V, E, M> graphState = new GraphState<I, V, E, M>(
-        INPUT_SUPERSTEP, 0, 0, getContext(), getGraphMapper(),
-        null);
+    // Determine how many threads to use based on the number of input splits
     int maxInputSplitThreads =
         Math.max(
             inputSplitPathList.size() / getConfiguration().getMaxWorkers(), 1);
-    int numThreads =
-        Math.min(getConfiguration().getNumInputSplitsThreads(),
-            maxInputSplitThreads);
+    int numThreads = Math.min(getConfiguration().getNumInputSplitsThreads(),
+        maxInputSplitThreads);
     ExecutorService inputSplitsExecutor =
         Executors.newFixedThreadPool(numThreads,
             new ThreadFactoryBuilder().setNameFormat("load-%d").build());
     List<Future<VertexEdgeCount>> threadsFutures =
         Lists.newArrayListWithCapacity(numThreads);
     if (LOG.isInfoEnabled()) {
-      LOG.info("loadVertices: Using " + numThreads + " thread(s), " +
+      LOG.info("loadInputSplits: Using " + numThreads + " thread(s), " +
           "originally " + getConfiguration().getNumInputSplitsThreads() +
           " threads(s) for " + inputSplitPathList.size() + " total splits.");
     }
     for (int i = 0; i < numThreads; ++i) {
       Callable<VertexEdgeCount> inputSplitsCallable =
-          new InputSplitsCallable<I, V, E, M>(
-              getContext(),
-              graphState,
-              getConfiguration(),
-              this,
-              inputSplitPathList,
-              getWorkerInfo(),
-              getZkExt());
+          inputSplitsCallableFactory.newCallable();
       threadsFutures.add(inputSplitsExecutor.submit(inputSplitsCallable));
     }
 
@@ -284,6 +275,63 @@ public class BspServiceWorker<I extends 
     return vertexEdgeCount;
   }
 
+
+  /**
+   * Load the vertices from the user-defined {@link VertexReader}
+   *
+   * @return Count of vertices and edges loaded
+   */
+  private VertexEdgeCount loadVertices() throws KeeperException,
+      InterruptedException {
+    List<String> inputSplitPathList =
+        getZkExt().getChildrenExt(vertexInputSplitsPaths.getPath(),
+            false, false, true);
+
+    GraphState<I, V, E, M> graphState = new GraphState<I, V, E, M>(
+        INPUT_SUPERSTEP, 0, 0, getContext(), getGraphMapper(),
+        null);
+
+    VertexInputSplitsCallableFactory<I, V, E, M> inputSplitsCallableFactory =
+        new VertexInputSplitsCallableFactory<I, V, E, M>(
+            getContext(),
+            graphState,
+            getConfiguration(),
+            this,
+            inputSplitPathList,
+            getWorkerInfo(),
+            getZkExt());
+
+    return loadInputSplits(inputSplitPathList, inputSplitsCallableFactory);
+  }
+
+  /**
+   * Load the edges from the user-defined {@link EdgeReader}.
+   *
+   * @return Number of edges loaded
+   */
+  private long loadEdges() throws KeeperException, InterruptedException {
+    List<String> inputSplitPathList =
+        getZkExt().getChildrenExt(edgeInputSplitsPaths.getPath(),
+            false, false, true);
+
+    GraphState<I, V, E, M> graphState = new GraphState<I, V, E, M>(
+        INPUT_SUPERSTEP, 0, 0, getContext(), getGraphMapper(),
+        null);
+
+    EdgeInputSplitsCallableFactory<I, V, E, M> inputSplitsCallableFactory =
+        new EdgeInputSplitsCallableFactory<I, V, E, M>(
+            getContext(),
+            graphState,
+            getConfiguration(),
+            this,
+            inputSplitPathList,
+            getWorkerInfo(),
+            getZkExt());
+
+    return loadInputSplits(inputSplitPathList, inputSplitsCallableFactory).
+        getEdgeCount();
+  }
+
   @Override
   public WorkerInfo getMasterInfo() {
     return masterInfo;
@@ -294,6 +342,80 @@ public class BspServiceWorker<I extends 
     return workerInfoList;
   }
 
+  /**
+   * Ensure the input splits are ready for processing
+   *
+   * @param inputSplitPaths Input split paths
+   * @param inputSplitEvents Input split events
+   */
+  private void ensureInputSplitsReady(InputSplitPaths inputSplitPaths,
+                                      InputSplitEvents inputSplitEvents) {
+    while (true) {
+      Stat inputSplitsReadyStat;
+      try {
+        inputSplitsReadyStat = getZkExt().exists(
+            inputSplitPaths.getAllReadyPath(), true);
+      } catch (KeeperException e) {
+        throw new IllegalStateException(
+            "setup: KeeperException waiting on input splits", e);
+      } catch (InterruptedException e) {
+        throw new IllegalStateException(
+            "setup: InterruptedException waiting on input splits", e);
+      }
+      if (inputSplitsReadyStat != null) {
+        break;
+      }
+      inputSplitEvents.getAllReadyChanged().waitForever();
+      inputSplitEvents.getAllReadyChanged().reset();
+    }
+  }
+
+  /**
+   * Wait for all workers to finish processing input splits.
+   *
+   * @param inputSplitPaths Input split paths
+   * @param inputSplitEvents Input split events
+   */
+  private void waitForOtherWorkers(InputSplitPaths inputSplitPaths,
+                                   InputSplitEvents inputSplitEvents) {
+    String workerInputSplitsDonePath =
+        inputSplitPaths.getDonePath() + "/" +
+            getWorkerInfo().getHostnameId();
+    try {
+      getZkExt().createExt(workerInputSplitsDonePath,
+          null,
+          Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT,
+          true);
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "setup: KeeperException creating worker done splits", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "setup: InterruptedException creating worker done splits",
+          e);
+    }
+    while (true) {
+      Stat inputSplitsDoneStat;
+      try {
+        inputSplitsDoneStat =
+            getZkExt().exists(inputSplitPaths.getAllDonePath(),
+                true);
+      } catch (KeeperException e) {
+        throw new IllegalStateException(
+            "setup: KeeperException waiting on worker done splits", e);
+      } catch (InterruptedException e) {
+        throw new IllegalStateException(
+            "setup: InterruptedException waiting on worker done splits", e);
+      }
+      if (inputSplitsDoneStat != null) {
+        break;
+      }
+      inputSplitEvents.getAllDoneChanged().waitForever();
+      inputSplitEvents.getAllDoneChanged().reset();
+    }
+  }
+
   @Override
   public FinishedSuperstepStats setup() {
     // Unless doing a restart, prepare for computation:
@@ -301,7 +423,8 @@ public class BspServiceWorker<I extends 
     // 2. Wait until the INPUT_SPLIT_ALL_READY_PATH node has been created
     // 3. Process input splits until there are no more.
     // 4. Wait until the INPUT_SPLIT_ALL_DONE_PATH node has been created
-    // 5. Wait for superstep INPUT_SUPERSTEP to complete.
+    // 5. Process any mutations deriving from add edge requests
+    // 6. Wait for superstep INPUT_SUPERSTEP to complete.
     if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
       setCachedSuperstep(getRestartedSuperstep());
       return new FinishedSuperstepStats(false, -1, -1);
@@ -330,7 +453,7 @@ public class BspServiceWorker<I extends 
       }
     }
 
-    // Add the partitions for that this worker owns
+    // Add the partitions that this worker owns
     GraphState<I, V, E, M> graphState =
         new GraphState<I, V, E, M>(INPUT_SUPERSTEP, 0, 0,
             getContext(), getGraphMapper(), null);
@@ -345,79 +468,54 @@ else[HADOOP_NON_SECURE]*/
     workerClient.setup(getConfiguration().authenticate());
 /*end[HADOOP_NON_SECURE]*/
 
-    // Ensure the InputSplits are ready for processing before processing
-    while (true) {
-      Stat inputSplitsReadyStat;
+    VertexEdgeCount vertexEdgeCount;
+
+    if (getConfiguration().hasVertexInputFormat()) {
+      // Ensure the vertex InputSplits are ready for processing
+      ensureInputSplitsReady(vertexInputSplitsPaths, vertexInputSplitsEvents);
+      getContext().progress();
       try {
-        inputSplitsReadyStat =
-            getZkExt().exists(inputSplitsAllReadyPath, true);
+        vertexEdgeCount = loadVertices();
+      } catch (InterruptedException e) {
+        throw new IllegalStateException(
+            "setup: loadVertices failed with InterruptedException", e);
       } catch (KeeperException e) {
         throw new IllegalStateException(
-            "setup: KeeperException waiting on input splits", e);
+            "setup: loadVertices failed with KeeperException", e);
+      }
+      getContext().progress();
+    } else {
+      vertexEdgeCount = new VertexEdgeCount();
+    }
+
+    if (getConfiguration().hasEdgeInputFormat()) {
+      // Ensure the edge InputSplits are ready for processing
+      ensureInputSplitsReady(edgeInputSplitsPaths, edgeInputSplitsEvents);
+      getContext().progress();
+      try {
+        vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(0, loadEdges());
       } catch (InterruptedException e) {
         throw new IllegalStateException(
-            "setup: InterruptedException waiting on input splits", e);
-      }
-      if (inputSplitsReadyStat != null) {
-        break;
+            "setup: loadEdges failed with InterruptedException", e);
+      } catch (KeeperException e) {
+        throw new IllegalStateException(
+            "setup: loadEdges failed with KeeperException", e);
       }
-      getInputSplitsAllReadyEvent().waitForever();
-      getInputSplitsAllReadyEvent().reset();
+      getContext().progress();
     }
 
-    getContext().progress();
-
-    // Load all the vertices and edges and get some stats about what was loaded
-    VertexEdgeCount vertexEdgeCount = null;
-    try {
-      vertexEdgeCount = loadVertices();
-    } catch (InterruptedException e) {
-      throw new IllegalStateException(
-          "setup: loadVertices failed with InterruptedException", e);
-    } catch (KeeperException e) {
-      throw new IllegalStateException(
-          "setup: loadVertices failed with KeeperException", e);
-    }
     if (LOG.isInfoEnabled()) {
-      LOG.info("setup: Finally loaded a total of " +
-          vertexEdgeCount);
+      LOG.info("setup: Finally loaded a total of " + vertexEdgeCount);
     }
-    getContext().progress();
 
-    // Workers wait for each other to finish, coordinated by master
-    String workerDonePath =
-        inputSplitsDonePath + "/" + getWorkerInfo().getHostnameId();
-    try {
-      getZkExt().createExt(workerDonePath,
-          null,
-          Ids.OPEN_ACL_UNSAFE,
-          CreateMode.PERSISTENT,
-          true);
-    } catch (KeeperException e) {
-      throw new IllegalStateException(
-          "setup: KeeperException creating worker done splits", e);
-    } catch (InterruptedException e) {
-      throw new IllegalStateException(
-          "setup: InterruptedException creating worker done splits", e);
+    if (getConfiguration().hasVertexInputFormat()) {
+      // Workers wait for each other to finish, coordinated by master
+      waitForOtherWorkers(vertexInputSplitsPaths, vertexInputSplitsEvents);
     }
-    while (true) {
-      Stat inputSplitsDoneStat;
-      try {
-        inputSplitsDoneStat =
-            getZkExt().exists(inputSplitsAllDonePath, true);
-      } catch (KeeperException e) {
-        throw new IllegalStateException(
-            "setup: KeeperException waiting on worker done splits", e);
-      } catch (InterruptedException e) {
-        throw new IllegalStateException(
-            "setup: InterruptedException waiting on worker " +
-                "done splits", e);
-      }
-      if (inputSplitsDoneStat != null) {
-        break;
-      }
-      getInputSplitsAllDoneEvent().waitForever();
-      getInputSplitsAllDoneEvent().reset();
+
+    if (getConfiguration().hasEdgeInputFormat()) {
+      // Workers wait for each other to finish, coordinated by master
+      waitForOtherWorkers(edgeInputSplitsPaths, edgeInputSplitsEvents);
     }
 
     // Create remaining partitions owned by this worker.
@@ -432,6 +530,13 @@ else[HADOOP_NON_SECURE]*/
       }
     }
 
+    if (getConfiguration().hasEdgeInputFormat()) {
+      // Create vertices from added edges via vertex resolver.
+      // Doing this at the beginning of superstep 0 is not enough,
+      // because we want the vertex/edge stats to be accurate.
+      workerServer.resolveMutations(graphState);
+    }
+
     // Generate the partition stats for the input superstep and process
     // if necessary
     List<PartitionStats> partitionStatsList =

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspUtils.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspUtils.java?rev=1405175&r1=1405174&r2=1405175&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspUtils.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspUtils.java Fri Nov  2 21:37:30 2012
@@ -185,6 +185,42 @@ public class BspUtils {
   }
 
   /**
+   * Get the user's subclassed {@link EdgeInputFormat}.
+   *
+   * @param <I> Vertex id
+   * @param <E> Edge data
+   * @param conf Configuration to check
+   * @return User's edge input format class
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public static <I extends WritableComparable, E extends Writable>
+  Class<? extends EdgeInputFormat<I, E>>
+  getEdgeInputFormatClass(Configuration conf) {
+    return (Class<? extends EdgeInputFormat<I, E>>)
+        conf.getClass(GiraphConfiguration.EDGE_INPUT_FORMAT_CLASS,
+            null,
+            EdgeInputFormat.class);
+  }
+
+  /**
+   * Create a user edge input format class
+   *
+   * @param <I> Vertex id
+   * @param <E> Edge data
+   * @param conf Configuration to check
+   * @return Instantiated user edge input format class
+   */
+  @SuppressWarnings("rawtypes")
+  public static <I extends WritableComparable, E extends Writable>
+  EdgeInputFormat<I, E> createEdgeInputFormat(Configuration conf) {
+    Class<? extends EdgeInputFormat<I, E>> edgeInputFormatClass =
+        getEdgeInputFormatClass(conf);
+    EdgeInputFormat<I, E> inputFormat =
+        ReflectionUtils.newInstance(edgeInputFormatClass, conf);
+    return inputFormat;
+  }
+
+  /**
    * Get the user's subclassed {@link AggregatorWriter}.
    *
    * @param conf Configuration to check

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeInputFormat.java?rev=1405175&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeInputFormat.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeInputFormat.java Fri Nov  2 21:37:30 2012
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Input format for reading single edges.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge data
+ */
+public abstract class EdgeInputFormat<I extends WritableComparable,
+    E extends Writable> implements GiraphInputFormat {
+  /**
+   * Logically split the vertices for a graph processing application.
+   *
+   * Each {@link InputSplit} is then assigned to a worker for processing.
+   *
+   * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
+   * input files are not physically split into chunks. For e.g. a split could
+   * be <i>&lt;input-file-path, start, offset&gt;</i> tuple. The InputFormat
+   * also creates the {@link VertexReader} to read the {@link InputSplit}.
+   *
+   * Also, the number of workers is a hint given to the developer to try to
+   * intelligently determine how many splits to create (if this is
+   * adjustable) at runtime.
+   *
+   * @param context Context of the job
+   * @param numWorkers Number of workers used for this job
+   * @return an array of {@link InputSplit}s for the job.
+   */
+  @Override
+  public abstract List<InputSplit> getSplits(
+      JobContext context, int numWorkers) throws IOException,
+      InterruptedException;
+
+  /**
+   * Create an edge reader for a given split. The framework will call
+   * {@link EdgeReader#initialize(InputSplit, TaskAttemptContext)} before
+   * the split is used.
+   *
+   * @param split the split to be read
+   * @param context the information about the task
+   * @return a new record reader
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public abstract EdgeReader<I, E> createEdgeReader(
+      InputSplit split,
+      TaskAttemptContext context) throws IOException;
+}

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeInputSplitsCallable.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeInputSplitsCallable.java?rev=1405175&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeInputSplitsCallable.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeInputSplitsCallable.java Fri Nov  2 21:37:30 2012
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.metrics.GiraphMetrics;
+import org.apache.giraph.metrics.MetricGroup;
+import org.apache.giraph.utils.LoggerUtils;
+import org.apache.giraph.utils.MemoryUtils;
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+import com.yammer.metrics.core.Counter;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Load as many edge input splits as possible.
+ * Every thread will has its own instance of WorkerClientRequestProcessor
+ * to send requests.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public class EdgeInputSplitsCallable<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends InputSplitsCallable<I, V, E, M> {
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(
+      EdgeInputSplitsCallable.class);
+  /** Total edges loaded */
+  private long totalEdgesLoaded = 0;
+  /** Input split max edges (-1 denotes all) */
+  private final long inputSplitMaxEdges;
+
+  // Metrics
+  /** number of edges loaded counter */
+  private final Counter edgesLoadedCounter;
+
+  /**
+   * Constructor.
+   *
+   * @param context Context
+   * @param graphState Graph state
+   * @param configuration Configuration
+   * @param bspServiceWorker service worker
+   * @param inputSplitPathList List of the paths of the input splits
+   * @param workerInfo This worker's info
+   * @param zooKeeperExt Handle to ZooKeeperExt
+   */
+  public EdgeInputSplitsCallable(
+      Mapper<?, ?, ?, ?>.Context context,
+      GraphState<I, V, E, M> graphState,
+      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
+      BspServiceWorker<I, V, E, M> bspServiceWorker,
+      List<String> inputSplitPathList,
+      WorkerInfo workerInfo,
+      ZooKeeperExt zooKeeperExt)  {
+    super(context, graphState, configuration, bspServiceWorker,
+        inputSplitPathList, workerInfo, zooKeeperExt,
+        BspServiceWorker.EDGE_INPUT_SPLIT_RESERVED_NODE,
+        BspServiceWorker.EDGE_INPUT_SPLIT_FINISHED_NODE,
+        bspServiceWorker.edgeInputSplitsEvents);
+
+    inputSplitMaxEdges = configuration.getInputSplitMaxEdges();
+
+    // Initialize Metrics
+    edgesLoadedCounter = GiraphMetrics.getCounter(MetricGroup.IO,
+        "edges-loaded");
+  }
+
+  /**
+   * Read edges from input split.  If testing, the user may request a
+   * maximum number of edges to be read from an input split.
+   *
+   * @param inputSplit Input split to process with edge reader
+   * @param graphState Current graph state
+   * @return Edges loaded from this input split
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Override
+  protected VertexEdgeCount readInputSplit(
+      InputSplit inputSplit,
+      GraphState<I, V, E, M> graphState) throws IOException,
+      InterruptedException {
+    EdgeInputFormat<I, E> edgeInputFormat =
+        configuration.createEdgeInputFormat();
+    EdgeReader<I, E> edgeReader =
+        edgeInputFormat.createEdgeReader(inputSplit, context);
+    edgeReader.initialize(inputSplit, context);
+    long inputSplitEdgesLoaded = 0;
+    while (edgeReader.nextEdge()) {
+      EdgeWithSource<I, E> readerEdge = edgeReader.getCurrentEdge();
+      if (readerEdge.getSourceVertexId() == null) {
+        throw new IllegalArgumentException(
+            "readInputSplit: Edge reader returned an edge " +
+                "without a source vertex id!  - " + readerEdge);
+      }
+      if (readerEdge.getEdge().getTargetVertexId() == null) {
+        throw new IllegalArgumentException(
+            "readInputSplit: Edge reader returned an edge " +
+                "without a target vertex id!  - " + readerEdge);
+      }
+      if (readerEdge.getEdge().getValue() == null) {
+        throw new IllegalArgumentException(
+            "readInputSplit: Edge reader returned an edge " +
+                "without a value!  - " + readerEdge);
+      }
+
+      graphState.getWorkerClientRequestProcessor().addEdgeRequest(
+          readerEdge.getSourceVertexId(), readerEdge.getEdge());
+      context.progress(); // do this before potential data transfer
+      ++inputSplitEdgesLoaded;
+
+      // Update status every 1M edges
+      if (((inputSplitEdgesLoaded + totalEdgesLoaded) % 1000000) == 0) {
+        LoggerUtils.setStatusAndLog(context, LOG, Level.INFO,
+            "readInputSplit: Loaded " +
+                (inputSplitEdgesLoaded + totalEdgesLoaded) + " edges " +
+                MemoryUtils.getRuntimeMemoryStats());
+      }
+
+      // For sampling, or to limit outlier input splits, the number of
+      // records per input split can be limited
+      if (inputSplitMaxEdges > 0 &&
+          inputSplitEdgesLoaded >= inputSplitMaxEdges) {
+        if (LOG.isInfoEnabled()) {
+          LOG.info("readInputSplit: Leaving the input " +
+              "split early, reached maximum edges " +
+              inputSplitEdgesLoaded);
+        }
+        break;
+      }
+    }
+    edgeReader.close();
+    totalEdgesLoaded += inputSplitEdgesLoaded;
+    edgesLoadedCounter.inc(inputSplitEdgesLoaded);
+    return new VertexEdgeCount(0, inputSplitEdgesLoaded);
+  }
+}

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeInputSplitsCallableFactory.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeInputSplitsCallableFactory.java?rev=1405175&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeInputSplitsCallableFactory.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeInputSplitsCallableFactory.java Fri Nov  2 21:37:30 2012
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import java.util.List;
+
+/**
+ * Factory for {@link EdgeInputSplitsCallable}s.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public class EdgeInputSplitsCallableFactory<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    implements InputSplitsCallableFactory<I, V, E, M> {
+  /** Mapper context. */
+  private final Mapper<?, ?, ?, ?>.Context context;
+  /** Graph state. */
+  private final GraphState<I, V, E, M> graphState;
+  /** Configuration. */
+  private final ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
+  /** {@link BspServiceWorker} we're running on. */
+  private final BspServiceWorker<I, V, E, M> bspServiceWorker;
+  /** List of input split paths. */
+  private final List<String> inputSplitPathList;
+  /** Worker info. */
+  private final WorkerInfo workerInfo;
+  /** {@link ZooKeeperExt} for this worker. */
+  private final ZooKeeperExt zooKeeperExt;
+
+  /**
+   * Constructor.
+   *
+   * @param context Mapper context
+   * @param graphState Graph state
+   * @param configuration Configuration
+   * @param bspServiceWorker Calling {@link BspServiceWorker}
+   * @param inputSplitPathList List of input split paths
+   * @param workerInfo Worker info
+   * @param zooKeeperExt {@link ZooKeeperExt} for this worker
+   */
+  public EdgeInputSplitsCallableFactory(
+      Mapper<?, ?, ?, ?>.Context context,
+      GraphState<I, V, E, M> graphState,
+      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
+      BspServiceWorker<I, V, E, M> bspServiceWorker,
+      List<String> inputSplitPathList,
+      WorkerInfo workerInfo,
+      ZooKeeperExt zooKeeperExt) {
+    this.context = context;
+    this.graphState = graphState;
+    this.configuration = configuration;
+    this.bspServiceWorker = bspServiceWorker;
+    this.inputSplitPathList = inputSplitPathList;
+    this.workerInfo = workerInfo;
+    this.zooKeeperExt = zooKeeperExt;
+  }
+
+  @Override
+  public InputSplitsCallable<I, V, E, M> newCallable() {
+    return new EdgeInputSplitsCallable<I, V, E, M>(
+        context,
+        graphState,
+        configuration,
+        bspServiceWorker,
+        inputSplitPathList,
+        workerInfo,
+        zooKeeperExt);
+  }
+}