You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ap...@apache.org on 2012/11/02 22:37:31 UTC

svn commit: r1405175 [2/3] - in /giraph/trunk: ./ giraph/src/main/java/org/apache/giraph/ giraph/src/main/java/org/apache/giraph/bsp/ giraph/src/main/java/org/apache/giraph/comm/ giraph/src/main/java/org/apache/giraph/comm/netty/ giraph/src/main/java/o...

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeReader.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeReader.java?rev=1405175&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeReader.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeReader.java Fri Nov  2 21:37:30 2012
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * Analogous to {@link org.apache.hadoop.mapreduce.RecordReader} for edges.
+ * Reads the edges from an input split.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge data
+ */
+@SuppressWarnings("rawtypes")
+public interface EdgeReader<I extends WritableComparable, E extends Writable> {
+  /**
+   * Use the input split and context to setup reading the edges.
+   * Guaranteed to be called prior to any other function.
+   *
+   * @param inputSplit Input split to be used for reading edges.
+   * @param context Context from the task.
+   * @throws java.io.IOException
+   * @throws InterruptedException
+   */
+  void initialize(InputSplit inputSplit, TaskAttemptContext context) throws
+      IOException, InterruptedException;
+
+  /**
+   * Move on to the next edge, if there is one.
+   * @return false iff there are no more edges
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  boolean nextEdge() throws IOException, InterruptedException;
+
+  /**
+   * Get the current edge.
+   *
+   * @return the current edge which has been read.
+   *         nextEdge() should be called first.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  EdgeWithSource<I, E> getCurrentEdge() throws IOException,
+      InterruptedException;
+
+  /**
+   * Close this {@link EdgeReader} to future operations.
+   *
+   * @throws IOException
+   */
+  void close() throws IOException;
+
+  /**
+   * How much of the input has the {@link EdgeReader} consumed, i.e. how
+   * much of it has been processed so far?
+   *
+   * @return Progress from <code>0.0</code> to <code>1.0</code>.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  float getProgress() throws IOException, InterruptedException;
+}

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeWithSource.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeWithSource.java?rev=1405175&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeWithSource.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeWithSource.java Fri Nov  2 21:37:30 2012
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * A pair of source vertex id and Edge object (that is,
+ * all the information about an edge).
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge data
+ */
+public class EdgeWithSource<I extends WritableComparable, E extends Writable> {
+  /** Source vertex id. */
+  private final I sourceVertexId;
+  /** Edge. */
+  private final Edge<I, E> edge;
+
+  /**
+   * Constructor.
+   *
+   * @param sourceVertexId Source vertex id
+   * @param edge Edge
+   */
+  public EdgeWithSource(I sourceVertexId, Edge<I, E> edge) {
+    this.sourceVertexId = sourceVertexId;
+    this.edge = edge;
+  }
+
+  /**
+   * Get the source vertex id.
+   *
+   * @return Source vertex id.
+   */
+  public I getSourceVertexId() {
+    return sourceVertexId;
+  }
+
+  /**
+   * Get the edge object.
+   *
+   * @return The edge.
+   */
+  public Edge<I, E> getEdge() {
+    return edge;
+  }
+}

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GiraphInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GiraphInputFormat.java?rev=1405175&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GiraphInputFormat.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GiraphInputFormat.java Fri Nov  2 21:37:30 2012
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Common interface for {@link VertexInputFormat} and {@link EdgeInputFormat}.
+ */
+public interface GiraphInputFormat {
+  /**
+   * Get the list of input splits for the format.
+   *
+   * @param context The job context
+   * @param numWorkers Number of workers
+   * @return The list of input splits
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  List<InputSplit> getSplits(JobContext context, int numWorkers)
+    throws IOException, InterruptedException;
+}

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GiraphJob.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1405175&r1=1405174&r2=1405175&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GiraphJob.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GiraphJob.java Fri Nov  2 21:37:30 2012
@@ -153,9 +153,16 @@ public class GiraphJob {
       throw new IllegalArgumentException("checkConfiguration: Null" +
           GiraphConfiguration.VERTEX_CLASS);
     }
-    if (conf.getVertexInputFormatClass() == null) {
-      throw new IllegalArgumentException("checkConfiguration: Null " +
-          GiraphConfiguration.VERTEX_INPUT_FORMAT_CLASS);
+    if (conf.getVertexInputFormatClass() == null &&
+        conf.getEdgeInputFormatClass() == null) {
+      throw new IllegalArgumentException("checkConfiguration: One of " +
+          GiraphConfiguration.VERTEX_INPUT_FORMAT_CLASS + " and " +
+          GiraphConfiguration.EDGE_INPUT_FORMAT_CLASS + " must be non-null");
+    }
+    if (conf.getEdgeInputFormatClass() != null &&
+        !(MutableVertex.class.isAssignableFrom(conf.getVertexClass()))) {
+      throw new IllegalArgumentException("checkConfiguration: EdgeInputFormat" +
+          " only works with mutable vertices");
     }
     if (conf.getVertexResolverClass() == null) {
       if (LOG.isInfoEnabled()) {

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1405175&r1=1405174&r2=1405175&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java Fri Nov  2 21:37:30 2012
@@ -18,10 +18,6 @@
 
 package org.apache.giraph.graph;
 
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.yammer.metrics.core.Timer;
-import com.yammer.metrics.core.TimerContext;
 import org.apache.giraph.GiraphConfiguration;
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.bsp.CentralizedServiceMaster;
@@ -46,6 +42,11 @@ import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.log4j.PatternLayout;
 
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.yammer.metrics.core.Timer;
+import com.yammer.metrics.core.TimerContext;
+
 import java.io.IOException;
 import java.lang.reflect.Type;
 import java.net.URL;

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitEvents.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitEvents.java?rev=1405175&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitEvents.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitEvents.java Fri Nov  2 21:37:30 2012
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.giraph.zk.BspEvent;
+import org.apache.giraph.zk.PredicateLock;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * Simple container of input split events.
+ */
+public class InputSplitEvents {
+  /** Input splits are ready for consumption by workers */
+  private final BspEvent allReadyChanged;
+  /** Input split reservation or finished notification and synchronization */
+  private final BspEvent stateChanged;
+  /** Input splits are done being processed by workers */
+  private final BspEvent allDoneChanged;
+  /** Input split done by a worker finished notification and synchronization */
+  private final BspEvent doneStateChanged;
+
+  /**
+   * Constructor.
+   *
+   * @param progressable {@link Progressable} to report progress
+   */
+  public InputSplitEvents(Progressable progressable) {
+    allReadyChanged = new PredicateLock(progressable);
+    stateChanged = new PredicateLock(progressable);
+    allDoneChanged = new PredicateLock(progressable);
+    doneStateChanged = new PredicateLock(progressable);
+  }
+
+  /**
+   * Get event for input splits all ready
+   *
+   * @return {@link BspEvent} for input splits all ready
+   */
+  public BspEvent getAllReadyChanged() {
+    return allReadyChanged;
+  }
+
+  /**
+   * Get event for input splits state
+   *
+   * @return {@link BspEvent} for input splits state
+   */
+  public BspEvent getStateChanged() {
+    return stateChanged;
+  }
+
+  /**
+   * Get event for input splits all done
+   *
+   * @return {@link BspEvent} for input splits all done
+   */
+  public BspEvent getAllDoneChanged() {
+    return allDoneChanged;
+  }
+
+  /**
+   * Get event for input split done
+   *
+   * @return {@link BspEvent} for input split done
+   */
+  public BspEvent getDoneStateChanged() {
+    return doneStateChanged;
+  }
+}

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitPaths.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitPaths.java?rev=1405175&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitPaths.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitPaths.java Fri Nov  2 21:37:30 2012
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+/**
+ * Simple container of input split paths for coordination via ZooKeeper.
+ */
+public class InputSplitPaths {
+  /** Path to the input splits written by the master */
+  private final String path;
+  /** Path to the input splits all ready to be processed by workers */
+  private final String allReadyPath;
+  /** Path to the input splits done */
+  private final String donePath;
+  /** Path to the input splits all done to notify the workers to proceed */
+  private final String allDonePath;
+
+  /**
+   * Constructor.
+   *
+   * @param basePath Base path
+   * @param dir Input splits path
+   * @param doneDir Input split done path
+   * @param allReadyNode Input splits all ready path
+   * @param allDoneNode Input splits all done path
+   */
+  public InputSplitPaths(String basePath,
+                         String dir,
+                         String doneDir,
+                         String allReadyNode,
+                         String allDoneNode) {
+    path = basePath + dir;
+    allReadyPath = basePath + allReadyNode;
+    donePath = basePath + doneDir;
+    allDonePath = basePath + allDoneNode;
+  }
+
+  /**
+   * Get path to the input splits.
+   *
+   * @return Path to input splits
+   */
+  public String getPath() {
+    return path;
+  }
+
+  /**
+   * Get path to the input splits all ready.
+   *
+   * @return Path to input splits all ready
+   */
+  public String getAllReadyPath() {
+    return allReadyPath;
+  }
+
+  /**
+   * Get path to the input splits done.
+   * @return Path to input splits done
+   */
+  public String getDonePath() {
+    return donePath;
+  }
+
+  /**
+   * Get path to the input splits all done.
+   *
+   * @return Path to input splits all done
+   */
+  public String getAllDonePath() {
+    return allDonePath;
+  }
+}

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java?rev=1405175&r1=1405174&r2=1405175&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java Fri Nov  2 21:37:30 2012
@@ -15,23 +15,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.giraph.graph;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.Callable;
+package org.apache.giraph.graph;
 
-import com.yammer.metrics.core.Counter;
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.comm.WorkerClientRequestProcessor;
 import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
-import org.apache.giraph.graph.partition.PartitionOwner;
-import org.apache.giraph.metrics.GiraphMetrics;
-import org.apache.giraph.metrics.MetricGroup;
-import org.apache.giraph.utils.LoggerUtils;
-import org.apache.giraph.utils.MemoryUtils;
 import org.apache.giraph.utils.SystemTime;
 import org.apache.giraph.utils.Time;
 import org.apache.giraph.zk.ZooKeeperExt;
@@ -41,15 +30,20 @@ import org.apache.hadoop.io.WritableComp
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.Stat;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Callable;
+
 /**
- * Load as many input splits as possible.
+ * Abstract base class for loading vertex/edge input splits.
  * Every thread will has its own instance of WorkerClientRequestProcessor
  * to send requests.
  *
@@ -58,7 +52,7 @@ import org.apache.zookeeper.data.Stat;
  * @param <E> Edge value
  * @param <M> Message data
  */
-public class InputSplitsCallable<I extends WritableComparable,
+public abstract class InputSplitsCallable<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable>
     implements Callable<VertexEdgeCount> {
   /** Class logger */
@@ -66,8 +60,11 @@ public class InputSplitsCallable<I exten
       Logger.getLogger(InputSplitsCallable.class);
   /** Class time object */
   private static final Time TIME = SystemTime.getInstance();
+  /** Configuration */
+  protected final ImmutableClassesGiraphConfiguration<I, V, E, M>
+  configuration;
   /** Context */
-  private final Mapper<?, ?, ?, ?>.Context context;
+  protected final Mapper<?, ?, ?, ?>.Context context;
   /** Graph state */
   private final GraphState<I, V, E, M> graphState;
   /** Handles IPC communication */
@@ -80,26 +77,16 @@ public class InputSplitsCallable<I exten
   private final InputSplitPathOrganizer splitOrganizer;
   /** ZooKeeperExt handle */
   private final ZooKeeperExt zooKeeperExt;
-  /** Configuration */
-  private final ImmutableClassesGiraphConfiguration<I, V, E, M>
-  configuration;
-  /** Total vertices loaded */
-  private long totalVerticesLoaded = 0;
-  /** Total edges loaded */
-  private long totalEdgesLoaded = 0;
-  /** Input split max vertices (-1 denotes all) */
-  private final long inputSplitMaxVertices;
-  /** Bsp service worker (only use thread-safe methods) */
-  private final BspServiceWorker<I, V, E, M> bspServiceWorker;
   /** Get the start time in nanos */
   private final long startNanos = TIME.getNanoseconds();
+  /** ZooKeeper input split reserved node. */
+  private final String inputSplitReservedNode;
+  /** ZooKeeper input split finished node. */
+  private final String inputSplitFinishedNode;
+  /** Input split events. */
+  private final InputSplitEvents inputSplitEvents;
 
-    // Metrics
-  /** number of vertices loaded counter */
-  private final Counter verticesLoadedCounter;
-  /** number of edges loaded counter */
-  private final Counter edgesLoadedCounter;
-
+  // CHECKSTYLE: stop ParameterNumberCheck
   /**
    * Constructor.
    *
@@ -110,14 +97,21 @@ public class InputSplitsCallable<I exten
    * @param inputSplitPathList List of the paths of the input splits
    * @param workerInfo This worker's info
    * @param zooKeeperExt Handle to ZooKeeperExt
+   * @param inputSplitReservedNode Path to input split reserved
+   * @param inputSplitFinishedNode Path to input split finished
+   * @param inputSplitEvents Input split events
    */
   public InputSplitsCallable(
-      Mapper<?, ?, ?, ?>.Context context, GraphState<I, V, E, M> graphState,
+      Mapper<?, ?, ?, ?>.Context context,
+      GraphState<I, V, E, M> graphState,
       ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
       BspServiceWorker<I, V, E, M> bspServiceWorker,
       List<String> inputSplitPathList,
       WorkerInfo workerInfo,
-      ZooKeeperExt zooKeeperExt)  {
+      ZooKeeperExt zooKeeperExt,
+      String inputSplitReservedNode,
+      String inputSplitFinishedNode,
+      InputSplitEvents inputSplitEvents) {
     this.zooKeeperExt = zooKeeperExt;
     this.context = context;
     this.workerClientRequestProcessor =
@@ -137,25 +131,35 @@ public class InputSplitsCallable<I exten
           "InputSplitsCallable: InterruptedException", e);
     }
     this.configuration = configuration;
-    inputSplitMaxVertices = configuration.getInputSplitMaxVertices();
-    this.bspServiceWorker = bspServiceWorker;
-
-        // Initialize Metrics
-    verticesLoadedCounter = GiraphMetrics.getCounter(MetricGroup.IO,
-                                                     "vertices-loaded");
-    edgesLoadedCounter = GiraphMetrics.getCounter(MetricGroup.IO,
-                                                  "edges-loaded");
+    this.inputSplitReservedNode = inputSplitReservedNode;
+    this.inputSplitFinishedNode = inputSplitFinishedNode;
+    this.inputSplitEvents = inputSplitEvents;
   }
+  // CHECKSTYLE: resume ParameterNumberCheck
+
+  /**
+   * Load vertices/edges from the given input split.
+   *
+   * @param inputSplit Input split to load
+   * @param graphState Graph state
+   * @return Count of vertices and edges loaded
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  protected abstract VertexEdgeCount readInputSplit(
+      InputSplit inputSplit,
+      GraphState<I, V, E, M> graphState)
+    throws IOException, InterruptedException;
 
   @Override
   public VertexEdgeCount call() {
     VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
-    String inputSplitPath = null;
+    String inputSplitPath;
     int inputSplitsProcessed = 0;
     try {
       while ((inputSplitPath = reserveInputSplit()) != null) {
         vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(
-            loadVerticesFromInputSplit(inputSplitPath,
+            loadInputSplit(inputSplitPath,
                 graphState));
         context.progress();
         ++inputSplitsProcessed;
@@ -212,13 +216,13 @@ public class InputSplitsCallable<I exten
   private String reserveInputSplit()
     throws KeeperException, InterruptedException {
     String reservedInputSplitPath = null;
-    Stat reservedStat = null;
+    Stat reservedStat;
     while (true) {
       int reservedInputSplits = 0;
       for (String nextSplitToClaim : splitOrganizer) {
         context.progress();
-        String tmpInputSplitReservedPath =
-            nextSplitToClaim + BspServiceWorker.INPUT_SPLIT_RESERVED_NODE;
+        String tmpInputSplitReservedPath = nextSplitToClaim +
+            inputSplitReservedNode;
         reservedStat =
             zooKeeperExt.exists(tmpInputSplitReservedPath, true);
         if (reservedStat == null) {
@@ -270,9 +274,9 @@ public class InputSplitsCallable<I exten
       // Wait for either a reservation to go away or a notification that
       // an InputSplit has finished.
       context.progress();
-      bspServiceWorker.getInputSplitsStateChangedEvent().waitMsecs(
+      inputSplitEvents.getStateChanged().waitMsecs(
           60 * 1000);
-      bspServiceWorker.getInputSplitsStateChangedEvent().reset();
+      inputSplitEvents.getStateChanged().reset();
     }
   }
 
@@ -285,7 +289,7 @@ public class InputSplitsCallable<I exten
    */
   private void markInputSplitPathFinished(String inputSplitPath) {
     String inputSplitFinishedPath =
-        inputSplitPath + BspServiceWorker.INPUT_SPLIT_FINISHED_NODE;
+        inputSplitPath + inputSplitFinishedNode;
     try {
       zooKeeperExt.createExt(inputSplitFinishedPath,
           null,
@@ -293,15 +297,15 @@ public class InputSplitsCallable<I exten
           CreateMode.PERSISTENT,
           true);
     } catch (KeeperException.NodeExistsException e) {
-      LOG.warn("loadVertices: " + inputSplitFinishedPath +
+      LOG.warn("markInputSplitPathFinished: " + inputSplitFinishedPath +
           " already exists!");
     } catch (KeeperException e) {
       throw new IllegalStateException(
-          "loadVertices: KeeperException on " +
+          "markInputSplitPathFinished: KeeperException on " +
               inputSplitFinishedPath, e);
     } catch (InterruptedException e) {
       throw new IllegalStateException(
-          "loadVertices: InterruptedException on " +
+          "markInputSplitPathFinished: InterruptedException on " +
               inputSplitFinishedPath, e);
     }
   }
@@ -321,16 +325,16 @@ public class InputSplitsCallable<I exten
    * @throws InstantiationException
    * @throws IllegalAccessException
    */
-  private VertexEdgeCount loadVerticesFromInputSplit(
+  private VertexEdgeCount loadInputSplit(
       String inputSplitPath,
       GraphState<I, V, E, M> graphState)
     throws IOException, ClassNotFoundException, InterruptedException,
       InstantiationException, IllegalAccessException {
-    InputSplit inputSplit = getInputSplitForVertices(inputSplitPath);
+    InputSplit inputSplit = getInputSplit(inputSplitPath);
     VertexEdgeCount vertexEdgeCount =
-        readVerticesFromInputSplit(inputSplit, graphState);
+        readInputSplit(inputSplit, graphState);
     if (LOG.isInfoEnabled()) {
-      LOG.info("loadVerticesFromInputSplit: Finished loading " +
+      LOG.info("loadFromInputSplit: Finished loading " +
           inputSplitPath + " " + vertexEdgeCount);
     }
     markInputSplitPathFinished(inputSplitPath);
@@ -339,24 +343,24 @@ public class InputSplitsCallable<I exten
 
   /**
    * Talk to ZooKeeper to convert the input split path to the actual
-   * InputSplit containing the vertices to read.
+   * InputSplit.
    *
    * @param inputSplitPath Location in ZK of input split
-   * @return instance of InputSplit containing vertices to read
+   * @return instance of InputSplit
    * @throws IOException
    * @throws ClassNotFoundException
    */
-  private InputSplit getInputSplitForVertices(String inputSplitPath)
+  protected InputSplit getInputSplit(String inputSplitPath)
     throws IOException, ClassNotFoundException {
     byte[] splitList;
     try {
       splitList = zooKeeperExt.getData(inputSplitPath, false, null);
     } catch (KeeperException e) {
       throw new IllegalStateException(
-          "loadVertices: KeeperException on " + inputSplitPath, e);
+          "getInputSplit: KeeperException on " + inputSplitPath, e);
     } catch (InterruptedException e) {
       throw new IllegalStateException(
-          "loadVertices: IllegalStateException on " + inputSplitPath, e);
+          "getInputSplit: IllegalStateException on " + inputSplitPath, e);
     }
     context.progress();
 
@@ -371,85 +375,10 @@ public class InputSplitsCallable<I exten
     ((Writable) inputSplit).readFields(inputStream);
 
     if (LOG.isInfoEnabled()) {
-      LOG.info("getInputSplitForVertices: Reserved " + inputSplitPath +
+      LOG.info("getInputSplit: Reserved " + inputSplitPath +
           " from ZooKeeper and got input split '" +
           inputSplit.toString() + "'");
     }
     return inputSplit;
   }
-
-  /**
-   * Read vertices from input split.  If testing, the user may request a
-   * maximum number of vertices to be read from an input split.
-   *
-   * @param inputSplit Input split to process with vertex reader
-   * @param graphState Current graph state
-   * @return Vertices and edges loaded from this input split
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  private VertexEdgeCount readVerticesFromInputSplit(
-      InputSplit inputSplit,
-      GraphState<I, V, E, M> graphState)
-    throws IOException, InterruptedException {
-    VertexInputFormat<I, V, E, M> vertexInputFormat =
-        configuration.createVertexInputFormat();
-    VertexReader<I, V, E, M> vertexReader =
-        vertexInputFormat.createVertexReader(inputSplit, context);
-    vertexReader.initialize(inputSplit, context);
-    long inputSplitVerticesLoaded = 0;
-    long inputSplitEdgesLoaded = 0;
-    while (vertexReader.nextVertex()) {
-      Vertex<I, V, E, M> readerVertex =
-          vertexReader.getCurrentVertex();
-      if (readerVertex.getId() == null) {
-        throw new IllegalArgumentException(
-            "readVerticesFromInputSplit: Vertex reader returned a vertex " +
-                "without an id!  - " + readerVertex);
-      }
-      if (readerVertex.getValue() == null) {
-        readerVertex.setValue(configuration.createVertexValue());
-      }
-      readerVertex.setConf(configuration);
-      readerVertex.setGraphState(graphState);
-
-      PartitionOwner partitionOwner =
-          bspServiceWorker.getVertexPartitionOwner(readerVertex.getId());
-      graphState.getWorkerClientRequestProcessor().sendVertexRequest(
-          partitionOwner, readerVertex);
-      context.progress(); // do this before potential data transfer
-      ++inputSplitVerticesLoaded;
-      inputSplitEdgesLoaded += readerVertex.getNumEdges();
-
-      // Update status every 250k vertices
-      if (((inputSplitVerticesLoaded + totalVerticesLoaded) % 250000) == 0) {
-        LoggerUtils.setStatusAndLog(context, LOG, Level.INFO,
-            "readVerticesFromInputSplit: Loaded " +
-                (inputSplitVerticesLoaded + totalVerticesLoaded) +
-                " vertices " +
-                (inputSplitEdgesLoaded + totalEdgesLoaded) + " edges " +
-                MemoryUtils.getRuntimeMemoryStats());
-      }
-
-      // For sampling, or to limit outlier input splits, the number of
-      // records per input split can be limited
-      if (inputSplitMaxVertices > 0 &&
-          inputSplitVerticesLoaded >= inputSplitMaxVertices) {
-        if (LOG.isInfoEnabled()) {
-          LOG.info("readVerticesFromInputSplit: Leaving the input " +
-              "split early, reached maximum vertices " +
-              inputSplitVerticesLoaded);
-        }
-        break;
-      }
-    }
-    vertexReader.close();
-    totalVerticesLoaded += inputSplitVerticesLoaded;
-    verticesLoadedCounter.inc(inputSplitVerticesLoaded);
-    totalEdgesLoaded += inputSplitEdgesLoaded;
-    edgesLoadedCounter.inc(inputSplitEdgesLoaded);
-    return new VertexEdgeCount(
-        inputSplitVerticesLoaded, inputSplitEdgesLoaded);
-  }
 }
-

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallableFactory.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallableFactory.java?rev=1405175&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallableFactory.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallableFactory.java Fri Nov  2 21:37:30 2012
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Factory class for creating {@link InputSplitsCallable}s.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public interface InputSplitsCallableFactory<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable> {
+  /**
+   * Return a newly-created {@link InputSplitsCallable}.
+   *
+   * @return A new {@link InputSplitsCallable}
+   */
+  InputSplitsCallable<I, V, E, M> newCallable();
+}

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterThread.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterThread.java?rev=1405175&r1=1405174&r2=1405175&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterThread.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterThread.java Fri Nov  2 21:37:30 2012
@@ -18,11 +18,6 @@
 
 package org.apache.giraph.graph;
 
-import java.io.IOException;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.TreeMap;
-
 import org.apache.giraph.GiraphConfiguration;
 import org.apache.giraph.bsp.ApplicationState;
 import org.apache.giraph.bsp.CentralizedServiceMaster;
@@ -33,6 +28,11 @@ import org.apache.hadoop.mapreduce.Mappe
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.KeeperException;
 
+import java.io.IOException;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
 /**
  * Master thread that will coordinate the activities of the tasks.  It runs
  * on all task processes, however, will only execute its algorithm if it knows
@@ -98,7 +98,8 @@ public class MasterThread<I extends Writ
         // Attempt to create InputSplits if necessary. Bail out if that fails.
         if (bspServiceMaster.getRestartedSuperstep() !=
             BspService.UNSET_SUPERSTEP ||
-            bspServiceMaster.createInputSplits() != -1) {
+            (bspServiceMaster.createVertexInputSplits() != -1 &&
+                bspServiceMaster.createEdgeInputSplits() != -1)) {
           long setupMillis = System.currentTimeMillis() - startMillis;
           context.getCounter(GIRAPH_TIMERS_COUNTER_GROUP_NAME,
               "Setup (milliseconds)").
@@ -125,13 +126,13 @@ public class MasterThread<I extends Writ
             if (superstepCounterOn) {
               String counterPrefix;
               if (cachedSuperstep == -1) {
-                counterPrefix = "Vertex input superstep";
+                counterPrefix = "Input superstep";
               } else {
                 counterPrefix = "Superstep " + cachedSuperstep;
               }
               context.getCounter(GIRAPH_TIMERS_COUNTER_GROUP_NAME,
                   counterPrefix +
-                  " (milliseconds)").
+                      " (milliseconds)").
                   increment(superstepMillis);
             }
 
@@ -158,7 +159,7 @@ public class MasterThread<I extends Writ
           if (LOG.isInfoEnabled()) {
             if (entry.getKey().longValue() ==
                 BspService.INPUT_SUPERSTEP) {
-              LOG.info("vertex input superstep: Took " +
+              LOG.info("input superstep: Took " +
                   entry.getValue() + " seconds.");
             } else {
               LOG.info("superstep " + entry.getKey() + ": Took " +

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexInputFormat.java?rev=1405175&r1=1405174&r2=1405175&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexInputFormat.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexInputFormat.java Fri Nov  2 21:37:30 2012
@@ -40,7 +40,8 @@ import java.util.List;
  */
 @SuppressWarnings("rawtypes")
 public abstract class VertexInputFormat<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable> {
+    V extends Writable, E extends Writable, M extends Writable>
+    implements GiraphInputFormat {
   /**
    * Logically split the vertices for a graph processing application.
    *
@@ -59,6 +60,7 @@ public abstract class VertexInputFormat<
    * @param numWorkers Number of workers used for this job
    * @return an array of {@link InputSplit}s for the job.
    */
+  @Override
   public abstract List<InputSplit> getSplits(
     JobContext context, int numWorkers)
     throws IOException, InterruptedException;

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexInputSplitsCallable.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexInputSplitsCallable.java?rev=1405175&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexInputSplitsCallable.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexInputSplitsCallable.java Fri Nov  2 21:37:30 2012
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.partition.PartitionOwner;
+import org.apache.giraph.metrics.GiraphMetrics;
+import org.apache.giraph.metrics.MetricGroup;
+import org.apache.giraph.utils.LoggerUtils;
+import org.apache.giraph.utils.MemoryUtils;
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+import com.yammer.metrics.core.Counter;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Load as many vertex input splits as possible.
+ * Every thread will have its own instance of WorkerClientRequestProcessor
+ * to send requests.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public class VertexInputSplitsCallable<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends InputSplitsCallable<I, V, E, M> {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(VertexInputSplitsCallable.class);
+  /** Total vertices loaded */
+  private long totalVerticesLoaded = 0;
+  /** Total edges loaded */
+  private long totalEdgesLoaded = 0;
+  /** Input split max vertices (-1 denotes all) */
+  private final long inputSplitMaxVertices;
+  /** Bsp service worker (only use thread-safe methods) */
+  private final BspServiceWorker<I, V, E, M> bspServiceWorker;
+
+  // Metrics
+  /** number of vertices loaded counter */
+  private final Counter verticesLoadedCounter;
+  /** number of edges loaded counter */
+  private final Counter edgesLoadedCounter;
+
+  /**
+   * Constructor.
+   *
+   * @param context Context
+   * @param graphState Graph state
+   * @param configuration Configuration
+   * @param bspServiceWorker service worker
+   * @param inputSplitPathList List of the paths of the input splits
+   * @param workerInfo This worker's info
+   * @param zooKeeperExt Handle to ZooKeeperExt
+   */
+  public VertexInputSplitsCallable(
+      Mapper<?, ?, ?, ?>.Context context,
+      GraphState<I, V, E, M> graphState,
+      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
+      BspServiceWorker<I, V, E, M> bspServiceWorker,
+      List<String> inputSplitPathList,
+      WorkerInfo workerInfo,
+      ZooKeeperExt zooKeeperExt)  {
+    super(context, graphState, configuration, bspServiceWorker,
+        inputSplitPathList, workerInfo, zooKeeperExt,
+        BspServiceWorker.VERTEX_INPUT_SPLIT_RESERVED_NODE,
+        BspServiceWorker.VERTEX_INPUT_SPLIT_FINISHED_NODE,
+        bspServiceWorker.vertexInputSplitsEvents);
+
+    inputSplitMaxVertices = configuration.getInputSplitMaxVertices();
+    this.bspServiceWorker = bspServiceWorker;
+
+    // Initialize Metrics
+    verticesLoadedCounter = GiraphMetrics.getCounter(MetricGroup.IO,
+        "vertices-loaded");
+    edgesLoadedCounter = GiraphMetrics.getCounter(MetricGroup.IO,
+        "edges-loaded");
+  }
+
+  /**
+   * Read vertices from input split.  If testing, the user may request a
+   * maximum number of vertices to be read from an input split.
+   *
+   * @param inputSplit Input split to process with vertex reader
+   * @param graphState Current graph state
+   * @return Vertices and edges loaded from this input split
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Override
+  protected VertexEdgeCount readInputSplit(
+      InputSplit inputSplit,
+      GraphState<I, V, E, M> graphState)
+    throws IOException, InterruptedException {
+    VertexInputFormat<I, V, E, M> vertexInputFormat =
+        configuration.createVertexInputFormat();
+    VertexReader<I, V, E, M> vertexReader =
+        vertexInputFormat.createVertexReader(inputSplit, context);
+    vertexReader.initialize(inputSplit, context);
+    long inputSplitVerticesLoaded = 0;
+    long inputSplitEdgesLoaded = 0;
+    while (vertexReader.nextVertex()) {
+      Vertex<I, V, E, M> readerVertex =
+          vertexReader.getCurrentVertex();
+      if (readerVertex.getId() == null) {
+        throw new IllegalArgumentException(
+            "readInputSplit: Vertex reader returned a vertex " +
+                "without an id!  - " + readerVertex);
+      }
+      if (readerVertex.getValue() == null) {
+        readerVertex.setValue(configuration.createVertexValue());
+      }
+      readerVertex.setConf(configuration);
+      readerVertex.setGraphState(graphState);
+
+      PartitionOwner partitionOwner =
+          bspServiceWorker.getVertexPartitionOwner(readerVertex.getId());
+      graphState.getWorkerClientRequestProcessor().sendVertexRequest(
+          partitionOwner, readerVertex);
+      context.progress(); // do this before potential data transfer
+      ++inputSplitVerticesLoaded;
+      inputSplitEdgesLoaded += readerVertex.getNumEdges();
+
+      // Update status every 250k vertices
+      if (((inputSplitVerticesLoaded + totalVerticesLoaded) % 250000) == 0) {
+        LoggerUtils.setStatusAndLog(context, LOG, Level.INFO,
+            "readInputSplit: Loaded " +
+                (inputSplitVerticesLoaded + totalVerticesLoaded) +
+                " vertices " +
+                (inputSplitEdgesLoaded + totalEdgesLoaded) + " edges " +
+                MemoryUtils.getRuntimeMemoryStats());
+      }
+
+      // For sampling, or to limit outlier input splits, the number of
+      // records per input split can be limited
+      if (inputSplitMaxVertices > 0 &&
+          inputSplitVerticesLoaded >= inputSplitMaxVertices) {
+        if (LOG.isInfoEnabled()) {
+          LOG.info("readInputSplit: Leaving the input " +
+              "split early, reached maximum vertices " +
+              inputSplitVerticesLoaded);
+        }
+        break;
+      }
+    }
+    vertexReader.close();
+    totalVerticesLoaded += inputSplitVerticesLoaded;
+    verticesLoadedCounter.inc(inputSplitVerticesLoaded);
+    totalEdgesLoaded += inputSplitEdgesLoaded;
+    edgesLoadedCounter.inc(inputSplitEdgesLoaded);
+    return new VertexEdgeCount(
+        inputSplitVerticesLoaded, inputSplitEdgesLoaded);
+  }
+}
+

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexInputSplitsCallableFactory.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexInputSplitsCallableFactory.java?rev=1405175&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexInputSplitsCallableFactory.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexInputSplitsCallableFactory.java Fri Nov  2 21:37:30 2012
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import java.util.List;
+
+/**
+ * Factory for {@link VertexInputSplitsCallable}s.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public class VertexInputSplitsCallableFactory<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    implements InputSplitsCallableFactory<I, V, E, M> {
+  /** Mapper context. */
+  private final Mapper<?, ?, ?, ?>.Context context;
+  /** Graph state. */
+  private final GraphState<I, V, E, M> graphState;
+  /** Configuration. */
+  private final ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
+  /** {@link BspServiceWorker} we're running on. */
+  private final BspServiceWorker<I, V, E, M> bspServiceWorker;
+  /** List of input split paths. */
+  private final List<String> inputSplitPathList;
+  /** Worker info. */
+  private final WorkerInfo workerInfo;
+  /** {@link ZooKeeperExt} for this worker. */
+  private final ZooKeeperExt zooKeeperExt;
+
+  /**
+   * Constructor.
+   *
+   * @param context Mapper context
+   * @param graphState Graph state
+   * @param configuration Configuration
+   * @param bspServiceWorker Calling {@link BspServiceWorker}
+   * @param inputSplitPathList List of input split paths
+   * @param workerInfo Worker info
+   * @param zooKeeperExt {@link ZooKeeperExt} for this worker
+   */
+  public VertexInputSplitsCallableFactory(
+      Mapper<?, ?, ?, ?>.Context context,
+      GraphState<I, V, E, M> graphState,
+      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
+      BspServiceWorker<I, V, E, M> bspServiceWorker,
+      List<String> inputSplitPathList,
+      WorkerInfo workerInfo,
+      ZooKeeperExt zooKeeperExt) {
+    this.context = context;
+    this.graphState = graphState;
+    this.configuration = configuration;
+    this.bspServiceWorker = bspServiceWorker;
+    this.inputSplitPathList = inputSplitPathList;
+    this.workerInfo = workerInfo;
+    this.zooKeeperExt = zooKeeperExt;
+  }
+
+  @Override
+  public InputSplitsCallable<I, V, E, M> newCallable() {
+    return new VertexInputSplitsCallable<I, V, E, M>(
+        context,
+        graphState,
+        configuration,
+        bspServiceWorker,
+        inputSplitPathList,
+        workerInfo,
+        zooKeeperExt);
+  }
+}

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexReader.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexReader.java?rev=1405175&r1=1405174&r2=1405175&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexReader.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexReader.java Fri Nov  2 21:37:30 2012
@@ -38,7 +38,7 @@ import java.io.IOException;
 public interface VertexReader<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable> {
   /**
-   * Use the input split and context t o setup reading the vertices.
+   * Use the input split and context to setup reading the vertices.
    * Guaranteed to be called prior to any other function.
    *
    * @param inputSplit Input split to be used for reading vertices.

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexResolver.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexResolver.java?rev=1405175&r1=1405174&r2=1405175&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexResolver.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexResolver.java Fri Nov  2 21:37:30 2012
@@ -57,7 +57,7 @@ public class VertexResolver<I extends Wr
     // 1. If the vertex exists, first prune the edges
     // 2. If vertex removal desired, remove the vertex.
     // 3. If creation of vertex desired, pick first vertex
-    // 4. If vertex doesn't exist, but got messages, create
+    // 4. If vertex doesn't exist, but got messages or added edges, create
     // 5. If edge addition, add the edges
     if (vertex != null) {
       if (vertexChanges != null) {
@@ -84,7 +84,8 @@ public class VertexResolver<I extends Wr
           vertex = vertexChanges.getAddedVertexList().get(0);
         }
       }
-      if (vertex == null && hasMessages) {
+      if (vertex == null &&
+          (hasMessages || !vertexChanges.getAddedEdgeList().isEmpty())) {
         vertex = instantiateVertex();
         vertex.initialize(vertexId,
             getConf().createVertexValue(),

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexValueInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexValueInputFormat.java?rev=1405175&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexValueInputFormat.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexValueInputFormat.java Fri Nov  2 21:37:30 2012
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * Vertex input format that only allows setting vertex id and value. It can
+ * be used in conjunction with {@link EdgeInputFormat}.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+public abstract class VertexValueInputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends VertexInputFormat<I, V, E, M> {
+  /**
+   * Create a {@link VertexValueReader} for a given split. The framework will
+   * call {@link VertexValueReader#initialize(InputSplit,
+   * TaskAttemptContext)} before the split is used.
+   *
+   * @param split The split to be read
+   * @param context The information about the task
+   * @return A new vertex value reader
+   * @throws IOException
+   */
+  public abstract VertexValueReader<I, V, E, M> createVertexValueReader(
+      InputSplit split, TaskAttemptContext context) throws IOException;
+
+  @Override
+  public final VertexReader<I, V, E, M> createVertexReader(
+      InputSplit split, TaskAttemptContext context) throws IOException {
+    return createVertexValueReader(split, context);
+  }
+}

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexValueReader.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexValueReader.java?rev=1405175&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexValueReader.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexValueReader.java Fri Nov  2 21:37:30 2012
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * Vertex reader for {@link VertexValueInputFormat}.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+public abstract class VertexValueReader<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends BasicVertexValueReader<I, V, E, M> {
+  /** Configuration. */
+  private ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
+
+  @Override
+  public void initialize(InputSplit inputSplit, TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    configuration = new ImmutableClassesGiraphConfiguration<I, V, E, M>(
+        context.getConfiguration());
+  }
+
+  @Override
+  public final Vertex<I, V, E, M> getCurrentVertex() throws IOException,
+      InterruptedException {
+    Vertex<I, V, E, M> vertex = getConf().createVertex();
+    vertex.initialize(getCurrentVertexId(), getCurrentVertexValue());
+    return vertex;
+  }
+
+  public ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() {
+    return configuration;
+  }
+}

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/io/GiraphFileInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/io/GiraphFileInputFormat.java?rev=1405175&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/io/GiraphFileInputFormat.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/io/GiraphFileInputFormat.java Fri Nov  2 21:37:30 2012
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.InvalidInputException;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/*if[HADOOP_NON_SECURE]
+else[HADOOP_NON_SECURE]
+import org.apache.hadoop.mapreduce.security.TokenCache;
+end[HADOOP_NON_SECURE]*/
+
+/**
+ * Provides functionality similar to {@link FileInputFormat},
+ * but allows for different data sources (vertex and edge data).
+ *
+ * @param <K> Key
+ * @param <V> Value
+ */
+public abstract class GiraphFileInputFormat<K, V>
+    extends FileInputFormat<K, V> {
+  /** Vertex input file paths. */
+  public static final String VERTEX_INPUT_DIR = "giraph.vertex.input.dir";
+  /** Edge input file paths. */
+  public static final String EDGE_INPUT_DIR = "giraph.edge.input.dir";
+  /** Number of vertex input files. */
+  public static final String NUM_VERTEX_INPUT_FILES =
+      "giraph.input.vertex.num.files";
+  /** Number of edge input files. */
+  public static final String NUM_EDGE_INPUT_FILES =
+      "giraph.input.edge.num.files";
+
+  /** Split slop. */
+  private static final double SPLIT_SLOP = 1.1; // 10% slop
+
+  /** Filter for hidden files. */
+  private static final PathFilter HIDDEN_FILE_FILTER = new PathFilter() {
+    public boolean accept(Path p) {
+      String name = p.getName();
+      return !name.startsWith("_") && !name.startsWith(".");
+    }
+  };
+
+  /** Class logger. */
+  private static final Logger LOG =
+      Logger.getLogger(GiraphFileInputFormat.class);
+
+  /**
+   * Add a {@link org.apache.hadoop.fs.Path} to the list of vertex inputs.
+   *
+   * @param job The {@link org.apache.hadoop.mapreduce.Job} to modify
+   * @param path {@link org.apache.hadoop.fs.Path} to be added to the list of
+   *                                              vertex inputs
+   */
+  public static void addVertexInputPath(Job job, Path path) throws IOException {
+    Configuration conf = job.getConfiguration();
+    path = path.getFileSystem(conf).makeQualified(path);
+    String dirStr = StringUtils.escapeString(path.toString());
+    String dirs = conf.get(VERTEX_INPUT_DIR);
+    conf.set(VERTEX_INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);
+  }
+
+  /**
+   * Add a {@link org.apache.hadoop.fs.Path} to the list of edge inputs.
+   *
+   * @param job The {@link org.apache.hadoop.mapreduce.Job} to modify
+   * @param path {@link org.apache.hadoop.fs.Path} to be added to the list of
+   *                                              edge inputs
+   */
+  public static void addEdgeInputPath(Job job, Path path) throws IOException {
+    Configuration conf = job.getConfiguration();
+    path = path.getFileSystem(conf).makeQualified(path);
+    String dirStr = StringUtils.escapeString(path.toString());
+    String dirs = conf.get(EDGE_INPUT_DIR);
+    conf.set(EDGE_INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);
+  }
+
+  /**
+   * Get the list of vertex input {@link Path}s.
+   *
+   * @param context The job
+   * @return The list of input {@link Path}s
+   */
+  public static Path[] getVertexInputPaths(JobContext context) {
+    String dirs = context.getConfiguration().get(VERTEX_INPUT_DIR, "");
+    String [] list = StringUtils.split(dirs);
+    Path[] result = new Path[list.length];
+    for (int i = 0; i < list.length; i++) {
+      result[i] = new Path(StringUtils.unEscapeString(list[i]));
+    }
+    return result;
+  }
+
+  /**
+   * Get the list of edge input {@link Path}s.
+   *
+   * @param context The job
+   * @return The list of input {@link Path}s
+   */
+  public static Path[] getEdgeInputPaths(JobContext context) {
+    String dirs = context.getConfiguration().get(EDGE_INPUT_DIR, "");
+    String [] list = StringUtils.split(dirs);
+    Path[] result = new Path[list.length];
+    for (int i = 0; i < list.length; i++) {
+      result[i] = new Path(StringUtils.unEscapeString(list[i]));
+    }
+    return result;
+  }
+
+  /**
+   * Proxy PathFilter that accepts a path only if all filters given in the
+   * constructor do. Used by listStatus() to apply the built-in
+   * HIDDEN_FILE_FILTER together with a user-provided one (if any).
+   */
+  private static class MultiPathFilter implements PathFilter {
+    /** List of filters. */
+    private List<PathFilter> filters;
+
+    /**
+     * Constructor.
+     *
+     * @param filters The list of filters
+     */
+    public MultiPathFilter(List<PathFilter> filters) {
+      this.filters = filters;
+    }
+
+    /**
+     * True iff all filters accept the given path.
+     *
+     * @param path The path to check
+     * @return Whether the path is accepted
+     */
+    public boolean accept(Path path) {
+      for (PathFilter filter : filters) {
+        if (!filter.accept(path)) {
+          return false;
+        }
+      }
+      return true;
+    }
+  }
+
+  /**
+   * Common method for listing vertex/edge input directories.
+   *
+   * Every input path is expanded as a glob. Matches that are directories
+   * are listed one level deep, other matches are used as they are. Both the
+   * glob expansion and the directory listing are restricted by the same
+   * filter (HIDDEN_FILE_FILTER combined with the user provided one, if any),
+   * so files rejected by the filter are never returned just because they
+   * live inside an input directory.
+   *
+   * @param job The job
+   * @param dirs list of vertex/edge input paths
+   * @return List of FileStatus objects
+   * @throws IOException if no input paths are specified; an
+   *         InvalidInputException collecting the per-path errors is thrown
+   *         if a path does not exist or a pattern matches no files
+   */
+  private List<FileStatus> listStatus(JobContext job, Path[] dirs)
+    throws IOException {
+    List<FileStatus> result = new ArrayList<FileStatus>();
+    if (dirs.length == 0) {
+      throw new IOException("No input paths specified in job");
+    }
+
+/*if[HADOOP_NON_SECURE]
+else[HADOOP_NON_SECURE]
+    // get tokens for all the required FileSystems..
+    TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
+        job.getConfiguration());
+end[HADOOP_NON_SECURE]*/
+
+    // Problems are collected per path and reported together at the end
+    List<IOException> errors = new ArrayList<IOException>();
+
+    // creates a MultiPathFilter with the HIDDEN_FILE_FILTER and the
+    // user provided one (if any).
+    List<PathFilter> filters = new ArrayList<PathFilter>();
+    filters.add(HIDDEN_FILE_FILTER);
+    PathFilter jobFilter = getInputPathFilter(job);
+    if (jobFilter != null) {
+      filters.add(jobFilter);
+    }
+    PathFilter inputFilter = new MultiPathFilter(filters);
+
+    for (Path p : dirs) {
+      FileSystem fs = p.getFileSystem(job.getConfiguration());
+      FileStatus[] matches = fs.globStatus(p, inputFilter);
+      if (matches == null) {
+        errors.add(new IOException("Input path does not exist: " + p));
+      } else if (matches.length == 0) {
+        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
+      } else {
+        for (FileStatus globStat: matches) {
+          if (globStat.isDir()) {
+            // Apply the filter to the directory contents as well, otherwise
+            // hidden/excluded files in the directory would become inputs
+            Collections.addAll(result,
+                fs.listStatus(globStat.getPath(), inputFilter));
+          } else {
+            result.add(globStat);
+          }
+        }
+      }
+    }
+
+    if (!errors.isEmpty()) {
+      throw new InvalidInputException(errors);
+    }
+    LOG.info("Total input paths to process : " + result.size());
+    return result;
+  }
+
+  /**
+   * List vertex input directories.
+   *
+   * Resolves the configured vertex input paths and hands them to the common
+   * listing method.
+   *
+   * @param job the job to list vertex input paths for
+   * @return list of FileStatus objects
+   * @throws IOException if zero items.
+   */
+  protected List<FileStatus> listVertexStatus(JobContext job)
+    throws IOException {
+    Path[] vertexInputPaths = getVertexInputPaths(job);
+    return listStatus(job, vertexInputPaths);
+  }
+
+  /**
+   * List edge input directories.
+   *
+   * Resolves the configured edge input paths and hands them to the common
+   * listing method.
+   *
+   * @param job the job to list edge input paths for
+   * @return list of FileStatus objects
+   * @throws IOException if zero items.
+   */
+  protected List<FileStatus> listEdgeStatus(JobContext job)
+    throws IOException {
+    Path[] edgeInputPaths = getEdgeInputPaths(job);
+    return listStatus(job, edgeInputPaths);
+  }
+
+  /**
+   * Common method for generating the list of vertex/edge input splits.
+   *
+   * Zero-length files yield a single empty split without hosts,
+   * non-splitable files yield one split covering the whole file, and
+   * splitable files are cut into pieces of the computed split size, with
+   * the remainder (if any) going into a final, smaller split.
+   *
+   * @param job The job
+   * @param files List of FileStatus objects for vertex/edge input files
+   * @return The list of vertex/edge input splits
+   * @throws IOException
+   */
+  private List<InputSplit> getSplits(JobContext job, List<FileStatus> files)
+    throws IOException {
+    long minSplitSize =
+        Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
+    long maxSplitSize = getMaxSplitSize(job);
+    List<InputSplit> inputSplits = new ArrayList<InputSplit>();
+
+    for (FileStatus status : files) {
+      Path filePath = status.getPath();
+      FileSystem fileSystem = filePath.getFileSystem(job.getConfiguration());
+      long fileLength = status.getLen();
+      BlockLocation[] locations =
+          fileSystem.getFileBlockLocations(status, 0, fileLength);
+
+      if (fileLength == 0) {
+        //Create empty hosts array for zero length files
+        inputSplits.add(new FileSplit(filePath, 0, fileLength, new String[0]));
+        continue;
+      }
+      if (!isSplitable(job, filePath)) {
+        // Whole file in one split, placed on the hosts of the first block
+        inputSplits.add(
+            new FileSplit(filePath, 0, fileLength, locations[0].getHosts()));
+        continue;
+      }
+
+      long splitSize =
+          computeSplitSize(status.getBlockSize(), minSplitSize, maxSplitSize);
+      long offset = 0;
+      // Cut full-size splits while the rest of the file is still larger
+      // than SPLIT_SLOP times the split size
+      while (((double) (fileLength - offset)) / splitSize > SPLIT_SLOP) {
+        int blockIndex = getBlockIndex(locations, offset);
+        inputSplits.add(new FileSplit(filePath, offset, splitSize,
+            locations[blockIndex].getHosts()));
+        offset += splitSize;
+      }
+
+      long remaining = fileLength - offset;
+      if (remaining != 0) {
+        // The tail of the file is placed on the hosts of the last block
+        inputSplits.add(new FileSplit(filePath, offset, remaining,
+            locations[locations.length - 1].getHosts()));
+      }
+    }
+    return inputSplits;
+  }
+
+  /**
+   * Generate the list of vertex input splits.
+   *
+   * @param job The job
+   * @return The list of vertex input splits
+   * @throws IOException
+   */
+  public List<InputSplit> getVertexSplits(JobContext job) throws IOException {
+    List<FileStatus> files = listVertexStatus(job);
+    List<InputSplit> splits = getSplits(job, files);
+    // Save the number of input files in the job-conf
+    job.getConfiguration().setLong(NUM_VERTEX_INPUT_FILES, files.size());
+    LOG.debug("Total # of vertex splits: " + splits.size());
+    return splits;
+  }
+
+  /**
+   * Generate the list of edge input splits.
+   *
+   * @param job The job
+   * @return The list of edge input splits
+   * @throws IOException
+   */
+  public List<InputSplit> getEdgeSplits(JobContext job) throws IOException {
+    List<FileStatus> files = listEdgeStatus(job);
+    List<InputSplit> splits = getSplits(job, files);
+    // Save the number of input files in the job-conf
+    job.getConfiguration().setLong(NUM_EDGE_INPUT_FILES, files.size());
+    LOG.debug("Total # of edge splits: " + splits.size());
+    return splits;
+  }
+}

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/io/GiraphTextInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/io/GiraphTextInputFormat.java?rev=1405175&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/io/GiraphTextInputFormat.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/io/GiraphTextInputFormat.java Fri Nov  2 21:37:30 2012
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
+
+/**
+ * Text flavour of {@link GiraphFileInputFormat}. Provides functionality
+ * similar to {@link org.apache.hadoop.mapreduce.lib.input.TextInputFormat},
+ * but allows for different data sources (vertex and edge data).
+ */
+public class GiraphTextInputFormat
+    extends GiraphFileInputFormat<LongWritable, Text> {
+  /**
+   * Records are read with a fresh {@link LineRecordReader}.
+   *
+   * @param split Input split (not used here)
+   * @param context Task attempt context (not used here)
+   * @return A new line record reader
+   */
+  @Override
+  public RecordReader<LongWritable, Text>
+  createRecordReader(InputSplit split, TaskAttemptContext context) {
+    return new LineRecordReader();
+  }
+
+  /**
+   * A file is splitable only when no compression codec is registered for
+   * it in the job configuration.
+   *
+   * @param context The job
+   * @param file File to check
+   * @return True iff no codec was found for the file
+   */
+  @Override
+  protected boolean isSplitable(JobContext context, Path file) {
+    CompressionCodecFactory codecFactory =
+        new CompressionCodecFactory(context.getConfiguration());
+    CompressionCodec codec = codecFactory.getCodec(file);
+    boolean compressed = codec != null;
+    return !compressed;
+  }
+}

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/io/IntIntTextVertexValueInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/io/IntIntTextVertexValueInputFormat.java?rev=1405175&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/io/IntIntTextVertexValueInputFormat.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/io/IntIntTextVertexValueInputFormat.java Fri Nov  2 21:37:30 2012
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import org.apache.giraph.utils.IntPair;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.regex.Pattern;
+
+/**
+ * Simple text-based {@link org.apache.giraph.graph.VertexValueInputFormat}
+ * for integer ids and values.
+ *
+ * Each line consists of: id, value (separated by a single tab or space)
+ *
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public class IntIntTextVertexValueInputFormat<E extends Writable,
+    M extends Writable> extends
+    TextVertexValueInputFormat<IntWritable, IntWritable, E, M> {
+  /** Separator for id and value */
+  private static final Pattern SEPARATOR = Pattern.compile("[\t ]");
+
+  @Override
+  public TextVertexValueReader createVertexValueReader(
+      InputSplit split, TaskAttemptContext context) throws IOException {
+    return new IntIntTextVertexValueReader();
+  }
+
+  /**
+   * {@link org.apache.giraph.graph.VertexValueReader} associated with
+   * {@link IntIntTextVertexValueInputFormat}.
+   */
+  public class IntIntTextVertexValueReader extends
+      TextVertexValueReaderFromEachLineProcessed<IntPair> {
+
+    /**
+     * Parse a line into an (id, value) pair. Tokens after the second one
+     * are ignored.
+     *
+     * @param line Line to parse
+     * @return Pair holding the id first and the value second
+     * @throws IOException if the line has fewer than two tokens
+     */
+    @Override
+    protected IntPair preprocessLine(Text line) throws IOException {
+      String[] tokens = SEPARATOR.split(line.toString());
+      if (tokens.length < 2) {
+        // Fail with the offending line instead of an index error
+        throw new IOException(
+            "preprocessLine: Expected 'id value' but got '" + line + "'");
+      }
+      // parseInt avoids the needless boxing done by Integer.valueOf
+      return new IntPair(Integer.parseInt(tokens[0]),
+          Integer.parseInt(tokens[1]));
+    }
+
+    @Override
+    protected IntWritable getId(IntPair data) throws IOException {
+      return new IntWritable(data.getFirst());
+    }
+
+    @Override
+    protected IntWritable getValue(IntPair data) throws IOException {
+      return new IntWritable(data.getSecond());
+    }
+  }
+}

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/io/IntNullTextEdgeInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/io/IntNullTextEdgeInputFormat.java?rev=1405175&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/io/IntNullTextEdgeInputFormat.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/io/IntNullTextEdgeInputFormat.java Fri Nov  2 21:37:30 2012
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import org.apache.giraph.utils.IntPair;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.regex.Pattern;
+
+/**
+ * Simple text-based {@link org.apache.giraph.graph.EdgeInputFormat} for
+ * unweighted graphs with int ids.
+ *
+ * Each line consists of: source_vertex, target_vertex (separated by a
+ * single tab or space)
+ */
+public class IntNullTextEdgeInputFormat extends
+    TextEdgeInputFormat<IntWritable, NullWritable> {
+  /** Splitter for endpoints */
+  private static final Pattern SEPARATOR = Pattern.compile("[\t ]");
+
+  @Override
+  public TextEdgeReader createEdgeReader(
+      InputSplit split, TaskAttemptContext context) throws IOException {
+    return new IntNullTextEdgeReader();
+  }
+
+  /**
+   * {@link org.apache.giraph.graph.EdgeReader} associated with
+   * {@link IntNullTextEdgeInputFormat}.
+   */
+  public class IntNullTextEdgeReader extends
+      TextEdgeReaderFromEachLineProcessed<IntPair> {
+    /**
+     * Parse a line into a (source, target) pair. Tokens after the second
+     * one are ignored.
+     *
+     * @param line Line to parse
+     * @return Pair holding the source id first and the target id second
+     * @throws IOException if the line has fewer than two tokens
+     */
+    @Override
+    protected IntPair preprocessLine(Text line) throws IOException {
+      String[] tokens = SEPARATOR.split(line.toString());
+      if (tokens.length < 2) {
+        // Fail with the offending line instead of an index error
+        throw new IOException(
+            "preprocessLine: Expected 'source target' but got '" + line + "'");
+      }
+      // parseInt avoids the needless boxing done by Integer.valueOf
+      return new IntPair(Integer.parseInt(tokens[0]),
+          Integer.parseInt(tokens[1]));
+    }
+
+    @Override
+    protected IntWritable getSourceVertexId(IntPair endpoints)
+      throws IOException {
+      return new IntWritable(endpoints.getFirst());
+    }
+
+    @Override
+    protected IntWritable getTargetVertexId(IntPair endpoints)
+      throws IOException {
+      return new IntWritable(endpoints.getSecond());
+    }
+
+    @Override
+    protected NullWritable getValue(IntPair endpoints) throws IOException {
+      return NullWritable.get();
+    }
+  }
+}