You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2015/10/19 19:14:32 UTC

[3/3] git commit: updated refs/heads/trunk to 5b0cd0e

GIRAPH-1033: Remove zookeeper from input splits handling

Summary: Currently we use ZooKeeper for handling input splits by having each worker check each split, and when a lot of splits are used this becomes very slow. We should have the master coordinate input split allocation instead, making the complexity proportional to #splits instead of #workers*#splits. The master holds all the splits, and workers send requests to it asking for splits when they need them.

Test Plan: Ran a job with 200 machines and 200k small splits - without this change the input superstep takes 30 minutes, and with it less than 2 minutes. Also verified correctness on a sample job. mvn clean verify passes.

Differential Revision: https://reviews.facebook.net/D48531


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/5b0cd0e0
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/5b0cd0e0
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/5b0cd0e0

Branch: refs/heads/trunk
Commit: 5b0cd0e0a2ddbf722b6140d28474295c8376e561
Parents: 47da751
Author: Maja Kabiljo <ma...@fb.com>
Authored: Mon Oct 12 10:56:39 2015 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Mon Oct 19 10:13:43 2015 -0700

----------------------------------------------------------------------
 .../java/org/apache/giraph/bsp/BspService.java  | 334 +++----------------
 .../giraph/bsp/CentralizedServiceMaster.java    |   4 +-
 .../giraph/bsp/CentralizedServiceWorker.java    |   8 +
 .../org/apache/giraph/comm/MasterClient.java    |   9 +
 .../giraph/comm/netty/NettyMasterClient.java    |   6 +
 .../handler/MasterRequestServerHandler.java     |  22 +-
 .../comm/requests/AskForInputSplitRequest.java  |  76 +++++
 .../giraph/comm/requests/MasterRequest.java     |   6 +-
 .../requests/ReplyWithInputSplitRequest.java    |  81 +++++
 .../giraph/comm/requests/RequestType.java       |   6 +-
 .../requests/SendReducedToMasterRequest.java    |   6 +-
 .../giraph/graph/FinishedSuperstepStats.java    |   2 +-
 .../apache/giraph/graph/InputSplitEvents.java   |  85 -----
 .../apache/giraph/graph/InputSplitPaths.java    |  88 -----
 .../apache/giraph/graph/VertexEdgeCount.java    |  20 +-
 .../java/org/apache/giraph/io/InputType.java    |  31 ++
 .../apache/giraph/master/BspServiceMaster.java  | 281 +++-------------
 .../giraph/master/MasterAggregatorHandler.java  |   2 +-
 .../giraph/master/MasterGlobalCommHandler.java  |  76 +++++
 .../giraph/master/MasterGlobalCommUsage.java    |  49 +--
 .../MasterGlobalCommUsageAggregators.java       |  69 ++++
 .../input/BasicInputSplitsMasterOrganizer.java  |  46 +++
 .../input/InputSplitsMasterOrganizer.java       |  32 ++
 ...LocalityAwareInputSplitsMasterOrganizer.java | 125 +++++++
 .../MappingInputSplitsMasterOrganizer.java      |  64 ++++
 .../master/input/MasterInputSplitsHandler.java  | 140 ++++++++
 .../giraph/master/input/package-info.java       |  21 ++
 .../apache/giraph/partition/PartitionUtils.java |   2 +-
 .../apache/giraph/worker/BspServiceWorker.java  | 187 ++---------
 .../giraph/worker/EdgeInputSplitsCallable.java  |  16 +-
 .../worker/EdgeInputSplitsCallableFactory.java  |  13 +-
 .../giraph/worker/FullInputSplitCallable.java   | 210 ------------
 .../giraph/worker/InputSplitPathOrganizer.java  | 142 --------
 .../giraph/worker/InputSplitsCallable.java      |  77 ++---
 .../giraph/worker/InputSplitsHandler.java       | 208 ------------
 .../worker/MappingInputSplitsCallable.java      |  28 +-
 .../MappingInputSplitsCallableFactory.java      |  34 +-
 .../worker/VertexInputSplitsCallable.java       |  16 +-
 .../VertexInputSplitsCallableFactory.java       |  13 +-
 .../giraph/worker/WorkerInputSplitsHandler.java | 108 ++++++
 .../java/org/apache/giraph/TestBspBasic.java    |  69 ++--
 41 files changed, 1164 insertions(+), 1648 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
index 0a5a7ba..15e4dbe 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
@@ -21,8 +21,6 @@ package org.apache.giraph.bsp;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.GraphTaskManager;
-import org.apache.giraph.graph.InputSplitEvents;
-import org.apache.giraph.graph.InputSplitPaths;
 import org.apache.giraph.job.JobProgressTracker;
 import org.apache.giraph.partition.GraphPartitionerFactory;
 import org.apache.giraph.utils.CheckpointingUtils;
@@ -77,59 +75,13 @@ public abstract class BspService<I extends WritableComparable,
   /** Master job state znode above base dir */
   public static final String MASTER_JOB_STATE_NODE = "/_masterJobState";
 
-  /** Mapping input split directory about base dir */
-  public static final String MAPPING_INPUT_SPLIT_DIR = "/_mappingInputSplitDir";
-  /** Mapping input split done directory about base dir */
-  public static final String MAPPING_INPUT_SPLIT_DONE_DIR =
-      "/_mappingInputSplitDoneDir";
-  /** Denotes a reserved mapping input split */
-  public static final String MAPPING_INPUT_SPLIT_RESERVED_NODE =
-      "/_mappingInputSplitReserved";
-  /** Denotes a finished mapping input split */
-  public static final String MAPPING_INPUT_SPLIT_FINISHED_NODE =
-      "/_mappingInputSplitFinished";
-  /** Denotes that all the mapping input splits are are ready for consumption */
-  public static final String MAPPING_INPUT_SPLITS_ALL_READY_NODE =
-      "/_mappingInputSplitsAllReady";
-  /** Denotes that all the mapping input splits are done. */
-  public static final String MAPPING_INPUT_SPLITS_ALL_DONE_NODE =
-      "/_mappingInputSplitsAllDone";
-
-  /** Vertex input split directory about base dir */
-  public static final String VERTEX_INPUT_SPLIT_DIR = "/_vertexInputSplitDir";
-  /** Vertex input split done directory about base dir */
-  public static final String VERTEX_INPUT_SPLIT_DONE_DIR =
-      "/_vertexInputSplitDoneDir";
-  /** Denotes a reserved vertex input split */
-  public static final String VERTEX_INPUT_SPLIT_RESERVED_NODE =
-      "/_vertexInputSplitReserved";
-  /** Denotes a finished vertex input split */
-  public static final String VERTEX_INPUT_SPLIT_FINISHED_NODE =
-      "/_vertexInputSplitFinished";
-  /** Denotes that all the vertex input splits are are ready for consumption */
-  public static final String VERTEX_INPUT_SPLITS_ALL_READY_NODE =
-      "/_vertexInputSplitsAllReady";
-  /** Denotes that all the vertex input splits are done. */
-  public static final String VERTEX_INPUT_SPLITS_ALL_DONE_NODE =
-      "/_vertexInputSplitsAllDone";
-
-  /** Edge input split directory about base dir */
-  public static final String EDGE_INPUT_SPLIT_DIR = "/_edgeInputSplitDir";
-  /** Edge input split done directory about base dir */
-  public static final String EDGE_INPUT_SPLIT_DONE_DIR =
-      "/_edgeInputSplitDoneDir";
-  /** Denotes a reserved edge input split */
-  public static final String EDGE_INPUT_SPLIT_RESERVED_NODE =
-      "/_edgeInputSplitReserved";
-  /** Denotes a finished edge input split */
-  public static final String EDGE_INPUT_SPLIT_FINISHED_NODE =
-      "/_edgeInputSplitFinished";
-  /** Denotes that all the edge input splits are are ready for consumption */
-  public static final String EDGE_INPUT_SPLITS_ALL_READY_NODE =
-      "/_edgeInputSplitsAllReady";
-  /** Denotes that all the edge input splits are done. */
-  public static final String EDGE_INPUT_SPLITS_ALL_DONE_NODE =
-      "/_edgeInputSplitsAllDone";
+  /** Input splits worker done directory */
+  public static final String INPUT_SPLITS_WORKER_DONE_DIR =
+      "/_inputSplitsWorkerDoneDir";
+  /** Input splits all done node*/
+  public static final String INPUT_SPLITS_ALL_DONE_NODE =
+      "/_inputSplitsAllDone";
+
   /** Directory of attempts of this application */
   public static final String APPLICATION_ATTEMPTS_DIR =
       "/_applicationAttemptsDir";
@@ -192,18 +144,10 @@ public abstract class BspService<I extends WritableComparable,
   protected final String basePath;
   /** Path to the job state determined by the master (informative only) */
   protected final String masterJobStatePath;
-  /** ZooKeeper paths for mapping input splits. */
-  protected final InputSplitPaths mappingInputSplitsPaths;
-  /** ZooKeeper paths for vertex input splits. */
-  protected final InputSplitPaths vertexInputSplitsPaths;
-  /** ZooKeeper paths for edge input splits. */
-  protected final InputSplitPaths edgeInputSplitsPaths;
-  /** Mapping input splits events */
-  protected final InputSplitEvents mappingInputSplitsEvents;
-  /** Vertex input split events. */
-  protected final InputSplitEvents vertexInputSplitsEvents;
-  /** Edge input split events. */
-  protected final InputSplitEvents edgeInputSplitsEvents;
+  /** Input splits worker done directory */
+  protected final String inputSplitsWorkerDonePath;
+  /** Input splits all done node */
+  protected final String inputSplitsAllDonePath;
   /** Path to the application attempts) */
   protected final String applicationAttemptsPath;
   /** Path to the cleaned up notifications */
@@ -226,6 +170,10 @@ public abstract class BspService<I extends WritableComparable,
   private final BspEvent addressesAndPartitionsReadyChanged;
   /** Application attempt changed */
   private final BspEvent applicationAttemptChanged;
+  /** Input splits worker done */
+  private final BspEvent inputSplitsWorkerDoneEvent;
+  /** Input splits all done */
+  private final BspEvent inputSplitsAllDoneEvent;
   /** Superstep finished synchronization */
   private final BspEvent superstepFinished;
   /** Master election changed for any waited on attempt */
@@ -269,23 +217,20 @@ public abstract class BspService<I extends WritableComparable,
   public BspService(
       Mapper<?, ?, ?, ?>.Context context,
       GraphTaskManager<I, V, E> graphTaskManager) {
-    this.mappingInputSplitsEvents = new InputSplitEvents(context);
-    this.vertexInputSplitsEvents = new InputSplitEvents(context);
-    this.edgeInputSplitsEvents = new InputSplitEvents(context);
     this.connectedEvent = new PredicateLock(context);
     this.workerHealthRegistrationChanged = new PredicateLock(context);
     this.addressesAndPartitionsReadyChanged = new PredicateLock(context);
     this.applicationAttemptChanged = new PredicateLock(context);
+    this.inputSplitsWorkerDoneEvent = new PredicateLock(context);
+    this.inputSplitsAllDoneEvent = new PredicateLock(context);
     this.superstepFinished = new PredicateLock(context);
     this.masterElectionChildrenChanged = new PredicateLock(context);
     this.cleanedUpChildrenChanged = new PredicateLock(context);
 
     registerBspEvent(connectedEvent);
     registerBspEvent(workerHealthRegistrationChanged);
-    registerBspEvent(vertexInputSplitsEvents.getAllReadyChanged());
-    registerBspEvent(vertexInputSplitsEvents.getStateChanged());
-    registerBspEvent(edgeInputSplitsEvents.getAllReadyChanged());
-    registerBspEvent(edgeInputSplitsEvents.getStateChanged());
+    registerBspEvent(inputSplitsWorkerDoneEvent);
+    registerBspEvent(inputSplitsAllDoneEvent);
     registerBspEvent(addressesAndPartitionsReadyChanged);
     registerBspEvent(applicationAttemptChanged);
     registerBspEvent(superstepFinished);
@@ -311,16 +256,8 @@ public abstract class BspService<I extends WritableComparable,
     getContext().getCounter(GiraphConstants.ZOOKEEPER_BASE_PATH_COUNTER_GROUP,
         basePath);
     masterJobStatePath = basePath + MASTER_JOB_STATE_NODE;
-    mappingInputSplitsPaths = new InputSplitPaths(basePath,
-        MAPPING_INPUT_SPLIT_DIR, MAPPING_INPUT_SPLIT_DONE_DIR,
-        MAPPING_INPUT_SPLITS_ALL_READY_NODE,
-        MAPPING_INPUT_SPLITS_ALL_DONE_NODE);
-    vertexInputSplitsPaths = new InputSplitPaths(basePath,
-        VERTEX_INPUT_SPLIT_DIR, VERTEX_INPUT_SPLIT_DONE_DIR,
-        VERTEX_INPUT_SPLITS_ALL_READY_NODE, VERTEX_INPUT_SPLITS_ALL_DONE_NODE);
-    edgeInputSplitsPaths = new InputSplitPaths(basePath,
-        EDGE_INPUT_SPLIT_DIR, EDGE_INPUT_SPLIT_DONE_DIR,
-        EDGE_INPUT_SPLITS_ALL_READY_NODE, EDGE_INPUT_SPLITS_ALL_DONE_NODE);
+    inputSplitsWorkerDonePath = basePath + INPUT_SPLITS_WORKER_DONE_DIR;
+    inputSplitsAllDonePath = basePath + INPUT_SPLITS_ALL_DONE_NODE;
     applicationAttemptsPath = basePath + APPLICATION_ATTEMPTS_DIR;
     cleanedUpPath = basePath + CLEANED_UP_DIR;
 
@@ -433,24 +370,6 @@ public abstract class BspService<I extends WritableComparable,
   }
 
   /**
-   * Get the input split events for edge input.
-   *
-   * @return InputSplitEvents for edge input.
-   */
-  public InputSplitEvents getEdgeInputSplitsEvents() {
-    return edgeInputSplitsEvents;
-  }
-
-  /**
-   * Get the input split events for vertex input.
-   *
-   * @return InputSplitEvents for vertex input.
-   */
-  public InputSplitEvents getVertexInputSplitsEvents() {
-    return vertexInputSplitsEvents;
-  }
-
-  /**
    * Generate the worker information "healthy" directory path for a
    * superstep
    *
@@ -655,6 +574,14 @@ public abstract class BspService<I extends WritableComparable,
     return applicationAttemptChanged;
   }
 
+  public final BspEvent getInputSplitsWorkerDoneEvent() {
+    return inputSplitsWorkerDoneEvent;
+  }
+
+  public final BspEvent getInputSplitsAllDoneEvent() {
+    return inputSplitsAllDoneEvent;
+  }
+
   public final BspEvent getSuperstepFinishedEvent() {
     return superstepFinished;
   }
@@ -952,9 +879,20 @@ public abstract class BspService<I extends WritableComparable,
       }
       workerHealthRegistrationChanged.signal();
       eventProcessed = true;
-    } else if (processMappingEvent(event) || processVertexEvent(event) ||
-        processEdgeEvent(event)) {
-      return;
+    } else if (event.getPath().contains(INPUT_SPLITS_ALL_DONE_NODE) &&
+        event.getType() == EventType.NodeCreated) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("process: all input splits done");
+      }
+      inputSplitsAllDoneEvent.signal();
+      eventProcessed = true;
+    } else if (event.getPath().contains(INPUT_SPLITS_WORKER_DONE_DIR) &&
+        event.getType() == EventType.NodeChildrenChanged) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("process: worker done reading input splits");
+      }
+      inputSplitsWorkerDoneEvent.signal();
+      eventProcessed = true;
     } else if (event.getPath().contains(ADDRESSES_AND_PARTITIONS_DIR) &&
         event.getType() == EventType.NodeCreated) {
       if (LOG.isInfoEnabled()) {
@@ -1001,192 +939,6 @@ public abstract class BspService<I extends WritableComparable,
   }
 
   /**
-   * Process WatchedEvent for Mapping Inputsplits
-   *
-   * @param event watched event
-   * @return true if event processed
-   */
-  public final boolean processMappingEvent(WatchedEvent event) {
-    boolean eventProcessed = false;
-    if (event.getPath().equals(
-        mappingInputSplitsPaths.getAllReadyPath()) &&
-        (event.getType() == EventType.NodeCreated)) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("process: inputSplitsReadyChanged " +
-            "(input splits ready)");
-      }
-      mappingInputSplitsEvents.getAllReadyChanged().signal();
-      eventProcessed = true;
-    } else if (event.getPath().endsWith(MAPPING_INPUT_SPLIT_RESERVED_NODE) &&
-        (event.getType() == EventType.NodeCreated)) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("process: mappingInputSplitsStateChanged " +
-            "(made a reservation)");
-      }
-      mappingInputSplitsEvents.getStateChanged().signal();
-      eventProcessed = true;
-    } else if (event.getPath().endsWith(MAPPING_INPUT_SPLIT_RESERVED_NODE) &&
-        (event.getType() == EventType.NodeDeleted)) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("process: mappingInputSplitsStateChanged " +
-            "(lost a reservation)");
-      }
-      mappingInputSplitsEvents.getStateChanged().signal();
-      eventProcessed = true;
-    } else if (event.getPath().endsWith(MAPPING_INPUT_SPLIT_FINISHED_NODE) &&
-        (event.getType() == EventType.NodeCreated)) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("process: mappingInputSplitsStateChanged " +
-            "(finished inputsplit)");
-      }
-      mappingInputSplitsEvents.getStateChanged().signal();
-      eventProcessed = true;
-    } else if (event.getPath().endsWith(MAPPING_INPUT_SPLIT_DONE_DIR) &&
-        (event.getType() == EventType.NodeChildrenChanged)) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("process: mappingInputSplitsDoneStateChanged " +
-            "(worker finished sending)");
-      }
-      mappingInputSplitsEvents.getDoneStateChanged().signal();
-      eventProcessed = true;
-    } else if (event.getPath().equals(
-        mappingInputSplitsPaths.getAllDonePath()) &&
-        (event.getType() == EventType.NodeCreated)) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("process: mappingInputSplitsAllDoneChanged " +
-            "(all entries sent from input splits)");
-      }
-      mappingInputSplitsEvents.getAllDoneChanged().signal();
-      eventProcessed = true;
-    }
-    return eventProcessed;
-  }
-
-  /**
-   * Process WatchedEvent for Vertex Inputsplits
-   *
-   * @param event watched event
-   * @return true if event processed
-   */
-  public final boolean processVertexEvent(WatchedEvent event) {
-    boolean eventProcessed = false;
-    if (event.getPath().equals(
-        vertexInputSplitsPaths.getAllReadyPath()) &&
-        (event.getType() == EventType.NodeCreated)) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("process: inputSplitsReadyChanged " +
-            "(input splits ready)");
-      }
-      vertexInputSplitsEvents.getAllReadyChanged().signal();
-      eventProcessed = true;
-    } else if (event.getPath().endsWith(VERTEX_INPUT_SPLIT_RESERVED_NODE) &&
-        (event.getType() == EventType.NodeCreated)) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("process: vertexInputSplitsStateChanged " +
-            "(made a reservation)");
-      }
-      vertexInputSplitsEvents.getStateChanged().signal();
-      eventProcessed = true;
-    } else if (event.getPath().endsWith(VERTEX_INPUT_SPLIT_RESERVED_NODE) &&
-        (event.getType() == EventType.NodeDeleted)) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("process: vertexInputSplitsStateChanged " +
-            "(lost a reservation)");
-      }
-      vertexInputSplitsEvents.getStateChanged().signal();
-      eventProcessed = true;
-    } else if (event.getPath().endsWith(VERTEX_INPUT_SPLIT_FINISHED_NODE) &&
-        (event.getType() == EventType.NodeCreated)) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("process: vertexInputSplitsStateChanged " +
-            "(finished inputsplit)");
-      }
-      vertexInputSplitsEvents.getStateChanged().signal();
-      eventProcessed = true;
-    } else if (event.getPath().endsWith(VERTEX_INPUT_SPLIT_DONE_DIR) &&
-        (event.getType() == EventType.NodeChildrenChanged)) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("process: vertexInputSplitsDoneStateChanged " +
-            "(worker finished sending)");
-      }
-      vertexInputSplitsEvents.getDoneStateChanged().signal();
-      eventProcessed = true;
-    }  else if (event.getPath().equals(
-        vertexInputSplitsPaths.getAllDonePath()) &&
-        (event.getType() == EventType.NodeCreated)) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("process: vertexInputSplitsAllDoneChanged " +
-            "(all vertices sent from input splits)");
-      }
-      vertexInputSplitsEvents.getAllDoneChanged().signal();
-      eventProcessed = true;
-    }
-    return eventProcessed;
-  }
-
-  /**
-   * Process WatchedEvent for Edge Inputsplits
-   *
-   * @param event watched event
-   * @return true if event processed
-   */
-  public final boolean processEdgeEvent(WatchedEvent event) {
-    boolean eventProcessed = false;
-    if (event.getPath().equals(
-        edgeInputSplitsPaths.getAllReadyPath()) &&
-        (event.getType() == EventType.NodeCreated)) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("process: edgeInputSplitsReadyChanged " +
-            "(input splits ready)");
-      }
-      edgeInputSplitsEvents.getAllReadyChanged().signal();
-      eventProcessed = true;
-    } else if (event.getPath().endsWith(EDGE_INPUT_SPLIT_RESERVED_NODE) &&
-        (event.getType() == EventType.NodeCreated)) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("process: edgeInputSplitsStateChanged " +
-            "(made a reservation)");
-      }
-      edgeInputSplitsEvents.getStateChanged().signal();
-      eventProcessed = true;
-    } else if (event.getPath().endsWith(EDGE_INPUT_SPLIT_RESERVED_NODE) &&
-        (event.getType() == EventType.NodeDeleted)) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("process: edgeInputSplitsStateChanged " +
-            "(lost a reservation)");
-      }
-      edgeInputSplitsEvents.getStateChanged().signal();
-      eventProcessed = true;
-    } else if (event.getPath().endsWith(EDGE_INPUT_SPLIT_FINISHED_NODE) &&
-        (event.getType() == EventType.NodeCreated)) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("process: edgeInputSplitsStateChanged " +
-            "(finished inputsplit)");
-      }
-      edgeInputSplitsEvents.getStateChanged().signal();
-      eventProcessed = true;
-    } else if (event.getPath().endsWith(EDGE_INPUT_SPLIT_DONE_DIR) &&
-        (event.getType() == EventType.NodeChildrenChanged)) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("process: edgeInputSplitsDoneStateChanged " +
-            "(worker finished sending)");
-      }
-      edgeInputSplitsEvents.getDoneStateChanged().signal();
-      eventProcessed = true;
-    } else if (event.getPath().equals(
-        edgeInputSplitsPaths.getAllDonePath()) &&
-        (event.getType() == EventType.NodeCreated)) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("process: edgeInputSplitsAllDoneChanged " +
-            "(all edges sent from input splits)");
-      }
-      edgeInputSplitsEvents.getAllDoneChanged().signal();
-      eventProcessed = true;
-    }
-    return eventProcessed;
-  }
-
-  /**
    * Get the last saved superstep.
    *
    * @return Last good superstep number

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
index 1e8d519..f05a79d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
@@ -22,8 +22,8 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.giraph.master.AggregatorToGlobalCommTranslation;
-import org.apache.giraph.master.MasterAggregatorHandler;
 import org.apache.giraph.master.MasterCompute;
+import org.apache.giraph.master.MasterGlobalCommHandler;
 import org.apache.giraph.master.MasterInfo;
 import org.apache.giraph.worker.WorkerInfo;
 import org.apache.hadoop.io.Writable;
@@ -144,7 +144,7 @@ public interface CentralizedServiceMaster<I extends WritableComparable,
    *
    * @return Global communication handler
    */
-  MasterAggregatorHandler getGlobalCommHandler();
+  MasterGlobalCommHandler getGlobalCommHandler();
 
   /**
    * Handler for aggregators to reduce/broadcast translation

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
index f6d77d0..94cd265 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
@@ -30,6 +30,7 @@ import org.apache.giraph.metrics.GiraphTimerContext;
 import org.apache.giraph.partition.PartitionOwner;
 import org.apache.giraph.partition.PartitionStats;
 import org.apache.giraph.partition.PartitionStore;
+import org.apache.giraph.worker.WorkerInputSplitsHandler;
 import org.apache.giraph.worker.WorkerAggregatorHandler;
 import org.apache.giraph.worker.WorkerContext;
 import org.apache.giraph.worker.WorkerInfo;
@@ -252,4 +253,11 @@ public interface CentralizedServiceWorker<I extends WritableComparable,
    * @return number of partitions owned
    */
   int getNumPartitionsOwned();
+
+  /**
+   * Get input splits handler used during input
+   *
+   * @return Input splits handler
+   */
+  WorkerInputSplitsHandler getInputSplitsHandler();
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java
index aea93fd..244dd74 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java
@@ -20,6 +20,7 @@ package org.apache.giraph.comm;
 
 import java.io.IOException;
 
+import org.apache.giraph.comm.requests.WritableRequest;
 import org.apache.hadoop.io.Writable;
 
 /**
@@ -54,6 +55,14 @@ public interface MasterClient {
   void flush();
 
   /**
+   * Send a request to a remote server (should be already connected)
+   *
+   * @param destTaskId Destination worker id
+   * @param request Request to send
+   */
+  void sendWritableRequest(int destTaskId, WritableRequest request);
+
+  /**
    * Closes all connections.
    */
   void closeConnections();

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
index e110782..9b348e8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
@@ -26,6 +26,7 @@ import org.apache.giraph.comm.MasterClient;
 import org.apache.giraph.comm.aggregators.AggregatorUtils;
 import org.apache.giraph.comm.aggregators.SendGlobalCommCache;
 import org.apache.giraph.comm.requests.SendAggregatorsToOwnerRequest;
+import org.apache.giraph.comm.requests.WritableRequest;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.worker.WorkerInfo;
 import org.apache.hadoop.io.Writable;
@@ -117,6 +118,11 @@ public class NettyMasterClient implements MasterClient {
   }
 
   @Override
+  public void sendWritableRequest(int destTaskId, WritableRequest request) {
+    nettyClient.sendWritableRequest(destTaskId, request);
+  }
+
+  @Override
   public void closeConnections() {
     nettyClient.stop();
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
index 02c72f7..9aa88ae 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
@@ -21,13 +21,13 @@ package org.apache.giraph.comm.netty.handler;
 import org.apache.giraph.comm.requests.MasterRequest;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.TaskInfo;
-import org.apache.giraph.master.MasterAggregatorHandler;
+import org.apache.giraph.master.MasterGlobalCommHandler;
 
 /** Handler for requests on master */
 public class MasterRequestServerHandler extends
     RequestServerHandler<MasterRequest> {
   /** Aggregator handler */
-  private final MasterAggregatorHandler aggregatorHandler;
+  private final MasterGlobalCommHandler commHandler;
 
   /**
    * Constructor
@@ -35,22 +35,22 @@ public class MasterRequestServerHandler extends
    * @param workerRequestReservedMap Worker request reservation map
    * @param conf                     Configuration
    * @param myTaskInfo               Current task info
-   * @param aggregatorHandler        Master aggregator handler
+   * @param commHandler              Master communication handler
    * @param exceptionHandler         Handles uncaught exceptions
    */
   public MasterRequestServerHandler(
       WorkerRequestReservedMap workerRequestReservedMap,
       ImmutableClassesGiraphConfiguration conf,
       TaskInfo myTaskInfo,
-      MasterAggregatorHandler aggregatorHandler,
+      MasterGlobalCommHandler commHandler,
       Thread.UncaughtExceptionHandler exceptionHandler) {
     super(workerRequestReservedMap, conf, myTaskInfo, exceptionHandler);
-    this.aggregatorHandler = aggregatorHandler;
+    this.commHandler = commHandler;
   }
 
   @Override
   public void processRequest(MasterRequest request) {
-    request.doRequest(aggregatorHandler);
+    request.doRequest(commHandler);
   }
 
   /**
@@ -58,15 +58,15 @@ public class MasterRequestServerHandler extends
    */
   public static class Factory implements RequestServerHandler.Factory {
     /** Master aggregator handler */
-    private final MasterAggregatorHandler aggregatorHandler;
+    private final MasterGlobalCommHandler commHandler;
 
     /**
      * Constructor
      *
-     * @param aggregatorHandler Master aggregator handler
+     * @param commHandler Master global communication handler
      */
-    public Factory(MasterAggregatorHandler aggregatorHandler) {
-      this.aggregatorHandler = aggregatorHandler;
+    public Factory(MasterGlobalCommHandler commHandler) {
+      this.commHandler = commHandler;
     }
 
     @Override
@@ -76,7 +76,7 @@ public class MasterRequestServerHandler extends
         TaskInfo myTaskInfo,
         Thread.UncaughtExceptionHandler exceptionHandler) {
       return new MasterRequestServerHandler(workerRequestReservedMap, conf,
-          myTaskInfo, aggregatorHandler, exceptionHandler);
+          myTaskInfo, commHandler, exceptionHandler);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/comm/requests/AskForInputSplitRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/AskForInputSplitRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/AskForInputSplitRequest.java
new file mode 100644
index 0000000..5d9e4e6
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/AskForInputSplitRequest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.requests;
+
+import org.apache.giraph.master.MasterGlobalCommHandler;
+import org.apache.giraph.io.InputType;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Request a worker sends to the master asking for a split of a given type
+ */
+public class AskForInputSplitRequest extends WritableRequest
+    implements MasterRequest {
+  /** Type of input (vertex, edge or mapping) a split is requested for */
+  private InputType splitType;
+  /** Task id of the requesting worker, passed on to sendSplitTo() */
+  private int workerTaskId;
+
+  /**
+   * Constructor
+   *
+   * @param splitType Type of split we are requesting
+   * @param workerTaskId Task id of worker which requested the split
+   */
+  public AskForInputSplitRequest(InputType splitType, int workerTaskId) {
+    this.splitType = splitType;
+    this.workerTaskId = workerTaskId;
+  }
+
+  /**
+   * Constructor used for reflection only; readFieldsRequest sets the fields
+   */
+  public AskForInputSplitRequest() {
+  }
+
+  @Override
+  public void doRequest(MasterGlobalCommHandler commHandler) {
+    commHandler.getInputSplitsHandler().sendSplitTo(splitType, workerTaskId);
+  }
+
+  @Override
+  void readFieldsRequest(DataInput in) throws IOException {
+    splitType = InputType.values()[in.readInt()]; // sent as ordinal
+    workerTaskId = in.readInt();
+  }
+
+  @Override
+  void writeRequest(DataOutput out) throws IOException {
+    out.writeInt(splitType.ordinal()); // read back via values()[ordinal]
+    out.writeInt(workerTaskId);
+  }
+
+  @Override
+  public RequestType getType() {
+    return RequestType.ASK_FOR_INPUT_SPLIT_REQUEST;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/comm/requests/MasterRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/MasterRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/MasterRequest.java
index 7fedcc5..43632b0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/MasterRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/MasterRequest.java
@@ -18,7 +18,7 @@
 
 package org.apache.giraph.comm.requests;
 
-import org.apache.giraph.master.MasterAggregatorHandler;
+import org.apache.giraph.master.MasterGlobalCommHandler;
 
 /**
  * Interface for requests sent to master to extend
@@ -27,7 +27,7 @@ public interface MasterRequest {
   /**
    * Execute the request
    *
-   * @param aggregatorHandler Master aggregator handler
+   * @param commHandler Master global communication handler
    */
-  void doRequest(MasterAggregatorHandler aggregatorHandler);
+  void doRequest(MasterGlobalCommHandler commHandler);
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/comm/requests/ReplyWithInputSplitRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/ReplyWithInputSplitRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/ReplyWithInputSplitRequest.java
new file mode 100644
index 0000000..6b50562
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/ReplyWithInputSplitRequest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.requests;
+
+import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.io.InputType;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Request the master sends to a worker carrying a serialized input split
+ */
+public class ReplyWithInputSplitRequest extends WritableRequest
+    implements WorkerRequest {
+  /** Type of input (vertex, edge or mapping) the split belongs to */
+  private InputType splitType;
+  /** Serialized input split, written length-prefixed on the wire */
+  private byte[] serializedInputSplit;
+
+  /**
+   * Constructor
+   *
+   * @param splitType Type of input split
+   * @param serializedInputSplit Serialized input split (stored, not copied)
+   */
+  public ReplyWithInputSplitRequest(InputType splitType,
+      byte[] serializedInputSplit) {
+    this.splitType = splitType;
+    this.serializedInputSplit = serializedInputSplit;
+  }
+
+  /**
+   * Constructor used for reflection only; readFieldsRequest sets the fields
+   */
+  public ReplyWithInputSplitRequest() {
+  }
+
+  @Override
+  void readFieldsRequest(DataInput in) throws IOException {
+    splitType = InputType.values()[in.readInt()]; // sent as ordinal
+    int size = in.readInt(); // length prefix written by writeRequest
+    serializedInputSplit = new byte[size];
+    in.readFully(serializedInputSplit);
+  }
+
+  @Override
+  void writeRequest(DataOutput out) throws IOException {
+    out.writeInt(splitType.ordinal()); // read back via values()[ordinal]
+    out.writeInt(serializedInputSplit.length);
+    out.write(serializedInputSplit);
+  }
+
+  @Override
+  public void doRequest(ServerData serverData) {
+    serverData.getServiceWorker().getInputSplitsHandler().receivedInputSplit(
+        splitType, serializedInputSplit);
+  }
+
+  @Override
+  public RequestType getType() {
+    return RequestType.REPLY_WITH_INPUT_SPLIT_REQUEST;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
index 343a2de..bebac28 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
@@ -60,7 +60,11 @@ else[HADOOP_NON_SECURE]*/
   /** Send aggregators from worker owner to other workers */
   SEND_AGGREGATORS_TO_WORKER_REQUEST(SendAggregatorsToWorkerRequest.class),
   /** Send message from worker to worker */
-  SEND_WORKER_TO_WORKER_MESSAGE_REQUEST(SendWorkerToWorkerMessageRequest.class);
+  SEND_WORKER_TO_WORKER_MESSAGE_REQUEST(SendWorkerToWorkerMessageRequest.class),
+  /** Send request for input split from worker to master */
+  ASK_FOR_INPUT_SPLIT_REQUEST(AskForInputSplitRequest.class),
+  /** Send request with granted input split from master to workers */
+  REPLY_WITH_INPUT_SPLIT_REQUEST(ReplyWithInputSplitRequest.class);
 
   /** Class of request which this type corresponds to */
   private final Class<? extends WritableRequest> requestClass;

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendReducedToMasterRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendReducedToMasterRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendReducedToMasterRequest.java
index 7171f04..3a1bd64 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendReducedToMasterRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendReducedToMasterRequest.java
@@ -20,7 +20,7 @@ package org.apache.giraph.comm.requests;
 
 import java.io.IOException;
 
-import org.apache.giraph.master.MasterAggregatorHandler;
+import org.apache.giraph.master.MasterGlobalCommHandler;
 
 /**
  * Request to send final aggregated values from worker which owns
@@ -45,9 +45,9 @@ public class SendReducedToMasterRequest extends ByteArrayRequest
   }
 
   @Override
-  public void doRequest(MasterAggregatorHandler aggregatorHandler) {
+  public void doRequest(MasterGlobalCommHandler commHandler) {
     try {
-      aggregatorHandler.acceptReducedValues(getDataInput());
+      commHandler.getAggregatorHandler().acceptReducedValues(getDataInput());
     } catch (IOException e) {
       throw new IllegalStateException("doRequest: " +
           "IOException occurred while processing request", e);

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java b/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java
index cfb9799..c53b34f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java
@@ -51,7 +51,7 @@ public class FinishedSuperstepStats extends VertexEdgeCount {
                                 long numEdges,
                                 boolean mustLoadCheckpoint,
                                 CheckpointStatus checkpointStatus) {
-    super(numVertices, numEdges);
+    super(numVertices, numEdges, 0);
     this.localVertexCount = numLocalVertices;
     this.allVerticesHalted = allVerticesHalted;
     this.mustLoadCheckpoint = mustLoadCheckpoint;

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitEvents.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitEvents.java b/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitEvents.java
deleted file mode 100644
index 23be1c4..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitEvents.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.giraph.zk.BspEvent;
-import org.apache.giraph.zk.PredicateLock;
-import org.apache.hadoop.util.Progressable;
-
-/**
- * Simple container of input split events.
- */
-public class InputSplitEvents {
-  /** Input splits are ready for consumption by workers */
-  private final BspEvent allReadyChanged;
-  /** Input split reservation or finished notification and synchronization */
-  private final BspEvent stateChanged;
-  /** Input splits are done being processed by workers */
-  private final BspEvent allDoneChanged;
-  /** Input split done by a worker finished notification and synchronization */
-  private final BspEvent doneStateChanged;
-
-  /**
-   * Constructor.
-   *
-   * @param progressable {@link Progressable} to report progress
-   */
-  public InputSplitEvents(Progressable progressable) {
-    allReadyChanged = new PredicateLock(progressable);
-    stateChanged = new PredicateLock(progressable);
-    allDoneChanged = new PredicateLock(progressable);
-    doneStateChanged = new PredicateLock(progressable);
-  }
-
-  /**
-   * Get event for input splits all ready
-   *
-   * @return {@link BspEvent} for input splits all ready
-   */
-  public BspEvent getAllReadyChanged() {
-    return allReadyChanged;
-  }
-
-  /**
-   * Get event for input splits state
-   *
-   * @return {@link BspEvent} for input splits state
-   */
-  public BspEvent getStateChanged() {
-    return stateChanged;
-  }
-
-  /**
-   * Get event for input splits all done
-   *
-   * @return {@link BspEvent} for input splits all done
-   */
-  public BspEvent getAllDoneChanged() {
-    return allDoneChanged;
-  }
-
-  /**
-   * Get event for input split done
-   *
-   * @return {@link BspEvent} for input split done
-   */
-  public BspEvent getDoneStateChanged() {
-    return doneStateChanged;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitPaths.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitPaths.java b/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitPaths.java
deleted file mode 100644
index 4cf005e..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitPaths.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-/**
- * Simple container of input split paths for coordination via ZooKeeper.
- */
-public class InputSplitPaths {
-  /** Path to the input splits written by the master */
-  private final String path;
-  /** Path to the input splits all ready to be processed by workers */
-  private final String allReadyPath;
-  /** Path to the input splits done */
-  private final String donePath;
-  /** Path to the input splits all done to notify the workers to proceed */
-  private final String allDonePath;
-
-  /**
-   * Constructor.
-   *
-   * @param basePath Base path
-   * @param dir Input splits path
-   * @param doneDir Input split done path
-   * @param allReadyNode Input splits all ready path
-   * @param allDoneNode Input splits all done path
-   */
-  public InputSplitPaths(String basePath,
-                         String dir,
-                         String doneDir,
-                         String allReadyNode,
-                         String allDoneNode) {
-    path = basePath + dir;
-    allReadyPath = basePath + allReadyNode;
-    donePath = basePath + doneDir;
-    allDonePath = basePath + allDoneNode;
-  }
-
-  /**
-   * Get path to the input splits.
-   *
-   * @return Path to input splits
-   */
-  public String getPath() {
-    return path;
-  }
-
-  /**
-   * Get path to the input splits all ready.
-   *
-   * @return Path to input splits all ready
-   */
-  public String getAllReadyPath() {
-    return allReadyPath;
-  }
-
-  /** Get path to the input splits done.
-   *
-   * @return Path to input splits done
-   */
-  public String getDonePath() {
-    return donePath;
-  }
-
-  /**
-   * Get path to the input splits all done.
-   *
-   * @return Path to input splits all done
-   */
-  public String getAllDonePath() {
-    return allDonePath;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/graph/VertexEdgeCount.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexEdgeCount.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexEdgeCount.java
index c2d13cc..1c871f0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexEdgeCount.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/VertexEdgeCount.java
@@ -26,6 +26,8 @@ public class VertexEdgeCount {
   private final long vertexCount;
   /** Immutable edges */
   private final long edgeCount;
+  /** Immutable mappings */
+  private final long mappingCount;
 
   /**
    * Default constructor.
@@ -33,6 +35,7 @@ public class VertexEdgeCount {
   public VertexEdgeCount() {
     vertexCount = 0;
     edgeCount = 0;
+    mappingCount = 0;
   }
 
   /**
@@ -40,10 +43,12 @@ public class VertexEdgeCount {
    *
    * @param vertexCount Final number of vertices.
    * @param edgeCount Final number of edges.
+   * @param mappingCount Final number of mappings.
    */
-  public VertexEdgeCount(long vertexCount, long edgeCount) {
+  public VertexEdgeCount(long vertexCount, long edgeCount, long mappingCount) {
     this.vertexCount = vertexCount;
     this.edgeCount = edgeCount;
+    this.mappingCount = mappingCount;
   }
 
   public long getVertexCount() {
@@ -54,6 +59,10 @@ public class VertexEdgeCount {
     return edgeCount;
   }
 
+  public long getMappingCount() {
+    return mappingCount;
+  }
+
   /**
    * Increment the both the vertex edge count with a {@link VertexEdgeCount}.
    *
@@ -64,7 +73,8 @@ public class VertexEdgeCount {
       VertexEdgeCount vertexEdgeCount) {
     return new VertexEdgeCount(
         vertexCount + vertexEdgeCount.getVertexCount(),
-        edgeCount + vertexEdgeCount.getEdgeCount());
+        edgeCount + vertexEdgeCount.getEdgeCount(),
+        mappingCount + vertexEdgeCount.getMappingCount());
   }
 
   /**
@@ -78,11 +88,13 @@ public class VertexEdgeCount {
       long vertexCount, long edgeCount) {
     return new VertexEdgeCount(
         this.vertexCount + vertexCount,
-        this.edgeCount + edgeCount);
+        this.edgeCount + edgeCount,
+        this.mappingCount); // no mapping delta here; don't double field
   }
 
   @Override
   public String toString() {
-    return "(v=" + getVertexCount() + ", e=" + getEdgeCount() + ")";
+    return "(v=" + getVertexCount() + ", e=" + getEdgeCount() +
+        (mappingCount > 0 ? ", m=" + mappingCount : "") + ")";
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/io/InputType.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/InputType.java b/giraph-core/src/main/java/org/apache/giraph/io/InputType.java
new file mode 100644
index 0000000..26ee966
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/InputType.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+/**
+ * Type of input. Split requests serialize ordinal(), so order matters
+ */
+public enum InputType {
+  /** Vertex input */
+  VERTEX,
+  /** Edge input */
+  EDGE,
+  /** Mapping input */
+  MAPPING
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 0b56a4f..0e7bb9d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -21,12 +21,8 @@ package org.apache.giraph.master;
 import static org.apache.giraph.conf.GiraphConstants.INPUT_SPLIT_SAMPLE_PERCENT;
 import static org.apache.giraph.conf.GiraphConstants.KEEP_ZOOKEEPER_DATA;
 import static org.apache.giraph.conf.GiraphConstants.PARTITION_LONG_TAIL_MIN_PRINT;
-import static org.apache.giraph.conf.GiraphConstants.USE_INPUT_SPLIT_LOCALITY;
 
-import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.nio.charset.Charset;
@@ -38,9 +34,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import net.iharder.Base64;
@@ -66,12 +59,12 @@ import org.apache.giraph.graph.GlobalStats;
 import org.apache.giraph.graph.GraphFunctions;
 import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.graph.GraphTaskManager;
-import org.apache.giraph.graph.InputSplitEvents;
-import org.apache.giraph.graph.InputSplitPaths;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.GiraphInputFormat;
 import org.apache.giraph.io.MappingInputFormat;
 import org.apache.giraph.io.VertexInputFormat;
+import org.apache.giraph.io.InputType;
+import org.apache.giraph.master.input.MasterInputSplitsHandler;
 import org.apache.giraph.metrics.AggregatedMetrics;
 import org.apache.giraph.metrics.GiraphMetrics;
 import org.apache.giraph.metrics.GiraphTimer;
@@ -88,8 +81,6 @@ import org.apache.giraph.time.SystemTime;
 import org.apache.giraph.time.Time;
 import org.apache.giraph.utils.CheckpointingUtils;
 import org.apache.giraph.utils.JMapHistoDumper;
-import org.apache.giraph.utils.LogStacktraceCallable;
-import org.apache.giraph.utils.ProgressableUtils;
 import org.apache.giraph.utils.ReactiveJMapHistoDumper;
 import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.giraph.utils.WritableUtils;
@@ -99,7 +90,6 @@ import org.apache.giraph.zk.PredicateLock;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobID;
@@ -170,7 +160,7 @@ public class BspServiceMaster<I extends WritableComparable,
   private final List<PartitionStats> allPartitionStatsList =
       new ArrayList<PartitionStats>();
   /** Handler for global communication */
-  private MasterAggregatorHandler globalCommHandler;
+  private MasterGlobalCommHandler globalCommHandler;
   /** Handler for aggregators to reduce/broadcast translation */
   private AggregatorToGlobalCommTranslation aggregatorTranslation;
   /** Master class */
@@ -331,7 +321,7 @@ public class BspServiceMaster<I extends WritableComparable,
    */
   private List<InputSplit> generateInputSplits(GiraphInputFormat inputFormat,
                                                int minSplitCountHint,
-                                               String inputSplitType) {
+                                               InputType inputSplitType) {
     String logPrefix = "generate" + inputSplitType + "InputSplits";
     List<InputSplit> splits;
     try {
@@ -604,46 +594,25 @@ public class BspServiceMaster<I extends WritableComparable,
    * Common method for creating vertex/edge input splits.
    *
    * @param inputFormat The vertex/edge input format
-   * @param inputSplitPaths ZooKeeper input split paths
    * @param inputSplitType Type of input split (for logging purposes)
    * @return Number of splits. Returns -1 on failure to create
    *         valid input splits.
    */
   private int createInputSplits(GiraphInputFormat inputFormat,
-                                InputSplitPaths inputSplitPaths,
-                                String inputSplitType) {
+                                InputType inputSplitType) {
     ImmutableClassesGiraphConfiguration conf = getConfiguration();
     String logPrefix = "create" + inputSplitType + "InputSplits";
     // Only the 'master' should be doing this.  Wait until the number of
     // processes that have reported health exceeds the minimum percentage.
     // If the minimum percentage is not met, fail the job.  Otherwise
     // generate the input splits
-    String inputSplitsPath = inputSplitPaths.getPath();
-    try {
-      if (getZkExt().exists(inputSplitsPath, false) != null) {
-        LOG.info(inputSplitsPath + " already exists, no need to create");
-        return Integer.parseInt(
-            new String(getZkExt().getData(inputSplitsPath, false, null),
-                Charset.defaultCharset()));
-      }
-    } catch (KeeperException.NoNodeException e) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info(logPrefix + ": Need to create the input splits at " +
-            inputSplitsPath);
-      }
-    } catch (KeeperException e) {
-      throw new IllegalStateException(logPrefix + ": KeeperException", e);
-    } catch (InterruptedException e) {
-      throw new IllegalStateException(logPrefix + ": InterruptedException", e);
-    }
-
-    // When creating znodes, in case the master has already run, resume
-    // where it left off.
     List<WorkerInfo> healthyWorkerInfoList = checkWorkers();
     if (healthyWorkerInfoList == null) {
       setJobStateFailed("Not enough healthy workers to create input splits");
       return -1;
     }
+    globalCommHandler.getInputSplitsHandler().initialize(masterClient,
+        healthyWorkerInfoList);
 
     // Create at least as many splits as the total number of input threads.
     int minSplitCountHint = healthyWorkerInfoList.size() *
@@ -671,54 +640,8 @@ public class BspServiceMaster<I extends WritableComparable,
           "some threads will be not used");
     }
 
-    // Write input splits to zookeeper in parallel
-    int inputSplitThreadCount = conf.getInt(NUM_MASTER_ZK_INPUT_SPLIT_THREADS,
-        DEFAULT_INPUT_SPLIT_THREAD_COUNT);
-    if (LOG.isInfoEnabled()) {
-      LOG.info(logPrefix + ": Starting to write input split data " +
-          "to zookeeper with " + inputSplitThreadCount + " threads");
-    }
-    try {
-      getZkExt().createExt(inputSplitsPath, null,
-          Ids.OPEN_ACL_UNSAFE,
-          CreateMode.PERSISTENT,
-          false);
-    } catch (KeeperException e) {
-      LOG.info(logPrefix + ": Node " +
-          inputSplitsPath + " keeper exception " + e);
-    } catch (InterruptedException e) {
-      throw new IllegalStateException(logPrefix + ' ' + e.getMessage(), e);
-    }
-    ExecutorService taskExecutor =
-        Executors.newFixedThreadPool(inputSplitThreadCount);
-    boolean writeLocations = USE_INPUT_SPLIT_LOCALITY.get(conf);
-    for (int i = 0; i < splitList.size(); ++i) {
-      InputSplit inputSplit = splitList.get(i);
-      taskExecutor.submit(new LogStacktraceCallable<Void>(
-          new WriteInputSplit(inputFormat, inputSplit, inputSplitsPath, i,
-              writeLocations)));
-    }
-    taskExecutor.shutdown();
-    ProgressableUtils.awaitExecutorTermination(taskExecutor, getContext());
-    if (LOG.isInfoEnabled()) {
-      LOG.info(logPrefix + ": Done writing input split data to zookeeper");
-    }
-
-    // Let workers know they can start trying to load the input splits
-    try {
-      getZkExt().createExt(inputSplitPaths.getAllReadyPath(),
-          null,
-          Ids.OPEN_ACL_UNSAFE,
-          CreateMode.PERSISTENT,
-          false);
-    } catch (KeeperException.NodeExistsException e) {
-      LOG.info(logPrefix + ": Node " +
-          inputSplitPaths.getAllReadyPath() + " already exists.");
-    } catch (KeeperException e) {
-      throw new IllegalStateException(logPrefix + ": KeeperException", e);
-    } catch (InterruptedException e) {
-      throw new IllegalStateException(logPrefix + ": IllegalStateException", e);
-    }
+    globalCommHandler.getInputSplitsHandler().addSplits(inputSplitType,
+        splitList, inputFormat);
 
     return splitList.size();
   }
@@ -730,8 +653,7 @@ public class BspServiceMaster<I extends WritableComparable,
     }
     MappingInputFormat<I, V, E, ? extends Writable> mappingInputFormat =
       getConfiguration().createWrappedMappingInputFormat();
-    return createInputSplits(mappingInputFormat, mappingInputSplitsPaths,
-      "Mapping");
+    return createInputSplits(mappingInputFormat, InputType.MAPPING);
   }
 
   @Override
@@ -742,8 +664,7 @@ public class BspServiceMaster<I extends WritableComparable,
     }
     VertexInputFormat<I, V, E> vertexInputFormat =
         getConfiguration().createWrappedVertexInputFormat();
-    return createInputSplits(vertexInputFormat, vertexInputSplitsPaths,
-        "Vertex");
+    return createInputSplits(vertexInputFormat, InputType.VERTEX);
   }
 
   @Override
@@ -754,8 +675,7 @@ public class BspServiceMaster<I extends WritableComparable,
     }
     EdgeInputFormat<I, E> edgeInputFormat =
         getConfiguration().createWrappedEdgeInputFormat();
-    return createInputSplits(edgeInputFormat, edgeInputSplitsPaths,
-        "Edge");
+    return createInputSplits(edgeInputFormat, InputType.EDGE);
   }
 
   @Override
@@ -764,7 +684,7 @@ public class BspServiceMaster<I extends WritableComparable,
   }
 
   @Override
-  public MasterAggregatorHandler getGlobalCommHandler() {
+  public MasterGlobalCommHandler getGlobalCommHandler() {
     return globalCommHandler;
   }
 
@@ -838,7 +758,7 @@ public class BspServiceMaster<I extends WritableComparable,
     });
 
 
-    globalCommHandler.readFields(finalizedStream);
+    globalCommHandler.getAggregatorHandler().readFields(finalizedStream);
     aggregatorTranslation.readFields(finalizedStream);
     masterCompute.readFields(finalizedStream);
     finalizedStream.close();
@@ -911,12 +831,15 @@ public class BspServiceMaster<I extends WritableComparable,
         if (masterChildArr.get(0).equals(myBid)) {
           GiraphStats.getInstance().getCurrentMasterTaskPartition().
               setValue(getTaskPartition());
-          globalCommHandler = new MasterAggregatorHandler(
-              getConfiguration(), getContext());
+
+          globalCommHandler = new MasterGlobalCommHandler(
+              new MasterAggregatorHandler(getConfiguration(), getContext()),
+              new MasterInputSplitsHandler(
+                  getConfiguration().useInputSplitLocality()));
           aggregatorTranslation = new AggregatorToGlobalCommTranslation(
               getConfiguration(), globalCommHandler);
 
-          globalCommHandler.initialize(this);
+          globalCommHandler.getAggregatorHandler().initialize(this);
           masterCompute = getConfiguration().createMasterCompute();
           masterCompute.setMasterService(this);
 
@@ -1128,7 +1051,7 @@ public class BspServiceMaster<I extends WritableComparable,
     for (WorkerInfo chosenWorkerInfo : chosenWorkerInfoList) {
       finalizedOutputStream.writeInt(getWorkerId(chosenWorkerInfo));
     }
-    globalCommHandler.write(finalizedOutputStream);
+    globalCommHandler.getAggregatorHandler().write(finalizedOutputStream);
     aggregatorTranslation.write(finalizedOutputStream);
     masterCompute.write(finalizedOutputStream);
     finalizedOutputStream.close();
@@ -1265,12 +1188,8 @@ public class BspServiceMaster<I extends WritableComparable,
   @Override
   public void restartFromCheckpoint(long checkpoint) {
     // Process:
-    // 1. Remove all old input split data
-    // 2. Increase the application attempt and set to the correct checkpoint
-    // 3. Send command to all workers to restart their tasks
-    zkDeleteNode(vertexInputSplitsPaths.getPath());
-    zkDeleteNode(edgeInputSplitsPaths.getPath());
-
+    // 1. Increase the application attempt and set to the correct checkpoint
+    // 2. Send command to all workers to restart their tasks
     setApplicationAttempt(getApplicationAttempt() + 1);
     setCachedSuperstep(checkpoint);
     setRestartedSuperstep(checkpoint);
@@ -1493,37 +1412,32 @@ public class BspServiceMaster<I extends WritableComparable,
 
   /**
    * Coordinate the exchange of vertex/edge input splits among workers.
-   *
-   * @param inputSplitPaths Input split paths
-   * @param inputSplitEvents Input split events
-   * @param inputSplitsType Type of input splits (for logging purposes)
    */
-  private void coordinateInputSplits(InputSplitPaths inputSplitPaths,
-                                     InputSplitEvents inputSplitEvents,
-                                     String inputSplitsType) {
+  private void coordinateInputSplits() {
     // Coordinate the workers finishing sending their vertices/edges to the
     // correct workers and signal when everything is done.
-    String logPrefix = "coordinate" + inputSplitsType + "InputSplits";
-    if (!barrierOnWorkerList(inputSplitPaths.getDonePath(),
+    if (!barrierOnWorkerList(inputSplitsWorkerDonePath,
         chosenWorkerInfoList,
-        inputSplitEvents.getDoneStateChanged(),
+        getInputSplitsWorkerDoneEvent(),
         false)) {
-      throw new IllegalStateException(logPrefix + ": Worker failed during " +
-          "input split (currently not supported)");
+      throw new IllegalStateException("coordinateInputSplits: Worker failed " +
+          "during input split (currently not supported)");
     }
     try {
-      getZkExt().createExt(inputSplitPaths.getAllDonePath(),
+      getZkExt().createExt(inputSplitsAllDonePath,
           null,
           Ids.OPEN_ACL_UNSAFE,
           CreateMode.PERSISTENT,
           false);
     } catch (KeeperException.NodeExistsException e) {
       LOG.info("coordinateInputSplits: Node " +
-          inputSplitPaths.getAllDonePath() + " already exists.");
+          inputSplitsAllDonePath + " already exists.");
     } catch (KeeperException e) {
-      throw new IllegalStateException(logPrefix + ": KeeperException", e);
+      throw new IllegalStateException(
+          "coordinateInputSplits: KeeperException", e);
     } catch (InterruptedException e) {
-      throw new IllegalStateException(logPrefix + ": IllegalStateException", e);
+      throw new IllegalStateException(
+          "coordinateInputSplits: IllegalStateException", e);
     }
   }
 
@@ -1543,7 +1457,7 @@ public class BspServiceMaster<I extends WritableComparable,
    */
   private void initializeAggregatorInputSuperstep()
     throws InterruptedException {
-    globalCommHandler.prepareSuperstep();
+    globalCommHandler.getAggregatorHandler().prepareSuperstep();
 
     prepareMasterCompute(getSuperstep());
     try {
@@ -1559,9 +1473,9 @@ public class BspServiceMaster<I extends WritableComparable,
         "initializeAggregatorInputSuperstep: Failed in access", e);
     }
     aggregatorTranslation.postMasterCompute();
-    globalCommHandler.finishSuperstep();
+    globalCommHandler.getAggregatorHandler().finishSuperstep();
 
-    globalCommHandler.sendDataToOwners(masterClient);
+    globalCommHandler.getAggregatorHandler().sendDataToOwners(masterClient);
   }
 
   /**
@@ -1627,7 +1541,7 @@ public class BspServiceMaster<I extends WritableComparable,
     // We need to finalize aggregators from previous superstep
     if (getSuperstep() >= 0) {
       aggregatorTranslation.postMasterCompute();
-      globalCommHandler.finishSuperstep();
+      globalCommHandler.getAggregatorHandler().finishSuperstep();
     }
 
     masterClient.openConnections();
@@ -1663,25 +1577,13 @@ public class BspServiceMaster<I extends WritableComparable,
 
     // We need to send aggregators to worker owners after new worker assignments
     if (getSuperstep() >= 0) {
-      globalCommHandler.sendDataToOwners(masterClient);
+      globalCommHandler.getAggregatorHandler().sendDataToOwners(masterClient);
     }
 
     if (getSuperstep() == INPUT_SUPERSTEP) {
       // Initialize aggregators before coordinating
       initializeAggregatorInputSuperstep();
-      if (getConfiguration().hasMappingInputFormat()) {
-        coordinateInputSplits(mappingInputSplitsPaths, mappingInputSplitsEvents,
-            "Mapping");
-      }
-      // vertex loading and edge loading
-      if (getConfiguration().hasVertexInputFormat()) {
-        coordinateInputSplits(vertexInputSplitsPaths, vertexInputSplitsEvents,
-            "Vertex");
-      }
-      if (getConfiguration().hasEdgeInputFormat()) {
-        coordinateInputSplits(edgeInputSplitsPaths, edgeInputSplitsEvents,
-            "Edge");
-      }
+      coordinateInputSplits();
     }
 
     String finishedWorkerPath =
@@ -1695,7 +1597,7 @@ public class BspServiceMaster<I extends WritableComparable,
 
     // Collect aggregator values, then run the master.compute() and
     // finally save the aggregator values
-    globalCommHandler.prepareSuperstep();
+    globalCommHandler.getAggregatorHandler().prepareSuperstep();
     aggregatorTranslation.prepareSuperstep();
 
     SuperstepClasses superstepClasses =
@@ -1761,7 +1663,8 @@ public class BspServiceMaster<I extends WritableComparable,
     } else {
       superstepState = SuperstepState.THIS_SUPERSTEP_DONE;
     }
-    globalCommHandler.writeAggregators(getSuperstep(), superstepState);
+    globalCommHandler.getAggregatorHandler().writeAggregators(
+        getSuperstep(), superstepState);
 
     return superstepState;
   }
@@ -2009,7 +1912,7 @@ public class BspServiceMaster<I extends WritableComparable,
         failJob(new Exception("Checkpoint and halt requested. " +
             "Killing this job."));
       }
-      globalCommHandler.close();
+      globalCommHandler.getAggregatorHandler().close();
       masterClient.closeConnections();
       masterServer.close();
     }
@@ -2122,100 +2025,4 @@ public class BspServiceMaster<I extends WritableComparable,
     gs.getAggregateSentMessageBytes()
       .increment(globalStats.getMessageBytesCount());
   }
-
-  /**
-   * Task that writes a given input split to zookeeper.
-   * Upon failure call() throws an exception.
-   */
-  private class WriteInputSplit implements Callable<Void> {
-    /** Input format */
-    private final GiraphInputFormat inputFormat;
-    /** Input split which we are going to write */
-    private final InputSplit inputSplit;
-    /** Input splits path */
-    private final String inputSplitsPath;
-    /** Index of the input split */
-    private final int index;
-    /** Whether to write locality information */
-    private final boolean writeLocations;
-
-    /**
-     * Constructor
-     *
-     * @param inputFormat Input format
-     * @param inputSplit Input split which we are going to write
-     * @param inputSplitsPath Input splits path
-     * @param index Index of the input split
-     * @param writeLocations whether to write the input split's locations (to
-     *                       be used by workers for prioritizing local splits
-     *                       when reading)
-     */
-    public WriteInputSplit(GiraphInputFormat inputFormat,
-                           InputSplit inputSplit,
-                           String inputSplitsPath,
-                           int index,
-                           boolean writeLocations) {
-      this.inputFormat = inputFormat;
-      this.inputSplit = inputSplit;
-      this.inputSplitsPath = inputSplitsPath;
-      this.index = index;
-      this.writeLocations = writeLocations;
-    }
-
-    @Override
-    public Void call() {
-      String inputSplitPath = null;
-      try {
-        ByteArrayOutputStream byteArrayOutputStream =
-            new ByteArrayOutputStream();
-        DataOutput outputStream =
-            new DataOutputStream(byteArrayOutputStream);
-
-        if (writeLocations) {
-          String[] splitLocations = inputSplit.getLocations();
-          StringBuilder locations = null;
-          if (splitLocations != null) {
-            int splitListLength =
-                Math.min(splitLocations.length, localityLimit);
-            locations = new StringBuilder();
-            for (String location : splitLocations) {
-              locations.append(location)
-                  .append(--splitListLength > 0 ? "\t" : "");
-            }
-          }
-          Text.writeString(outputStream,
-              locations == null ? "" : locations.toString());
-        }
-
-        inputFormat.writeInputSplit(inputSplit, outputStream);
-        inputSplitPath = inputSplitsPath + "/" + index;
-        getZkExt().createExt(inputSplitPath,
-            byteArrayOutputStream.toByteArray(),
-            Ids.OPEN_ACL_UNSAFE,
-            CreateMode.PERSISTENT,
-            true);
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("call: Created input split " +
-              "with index " + index + " serialized as " +
-              byteArrayOutputStream.toString(Charset.defaultCharset().name()));
-        }
-      } catch (KeeperException.NodeExistsException e) {
-        if (LOG.isInfoEnabled()) {
-          LOG.info("call: Node " +
-              inputSplitPath + " already exists.");
-        }
-      } catch (KeeperException e) {
-        throw new IllegalStateException(
-            "call: KeeperException", e);
-      } catch (InterruptedException e) {
-        throw new IllegalStateException(
-            "call: IllegalStateException", e);
-      } catch (IOException e) {
-        throw new IllegalStateException(
-            "call: IOException", e);
-      }
-      return null;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
index 5558cee..8ca3d3a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
@@ -42,7 +42,7 @@ import com.google.common.collect.Maps;
 
 /** Handler for reduce/broadcast on the master */
 public class MasterAggregatorHandler
-    implements MasterGlobalCommUsage, Writable {
+    implements MasterGlobalCommUsageAggregators, Writable {
   /** Class logger */
   private static final Logger LOG =
       Logger.getLogger(MasterAggregatorHandler.class);

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommHandler.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommHandler.java
new file mode 100644
index 0000000..717a24d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommHandler.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.master;
+
+import org.apache.giraph.master.input.MasterInputSplitsHandler;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Handler for all master communications
+ */
+public class MasterGlobalCommHandler implements MasterGlobalCommUsage {
+  /** Aggregator handler */
+  private final MasterAggregatorHandler aggregatorHandler;
+  /** Input splits handler */
+  private final MasterInputSplitsHandler inputSplitsHandler;
+
+  /**
+   * Constructor
+   *
+   * @param aggregatorHandler Aggregator handler
+   * @param inputSplitsHandler Input splits handler
+   */
+  public MasterGlobalCommHandler(
+      MasterAggregatorHandler aggregatorHandler,
+      MasterInputSplitsHandler inputSplitsHandler) {
+    this.aggregatorHandler = aggregatorHandler;
+    this.inputSplitsHandler = inputSplitsHandler;
+  }
+
+  public MasterAggregatorHandler getAggregatorHandler() {
+    return aggregatorHandler;
+  }
+
+  public MasterInputSplitsHandler getInputSplitsHandler() {
+    return inputSplitsHandler;
+  }
+
+  @Override
+  public <S, R extends Writable> void registerReducer(String name,
+      ReduceOperation<S, R> reduceOp) {
+    aggregatorHandler.registerReducer(name, reduceOp);
+  }
+
+  @Override
+  public <S, R extends Writable> void registerReducer(String name,
+      ReduceOperation<S, R> reduceOp, R globalInitialValue) {
+    aggregatorHandler.registerReducer(name, reduceOp, globalInitialValue);
+  }
+
+  @Override
+  public <R extends Writable> R getReduced(String name) {
+    return aggregatorHandler.getReduced(name);
+  }
+
+  @Override
+  public void broadcast(String name, Writable value) {
+    aggregatorHandler.broadcast(name, value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java
index 7ee9048..60b1809 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java
@@ -17,52 +17,9 @@
  */
 package org.apache.giraph.master;
 
-import org.apache.giraph.reducers.ReduceOperation;
-import org.apache.hadoop.io.Writable;
-
 /**
- * Master compute can access reduce and broadcast methods
- * through this interface, from masterCompute method.
+ * All global master communication
  */
-public interface MasterGlobalCommUsage {
-  /**
-   * Register reducer to be reduced in the next worker computation,
-   * using given name and operations.
-   * @param name Name of the reducer
-   * @param reduceOp Reduce operations
-   * @param <S> Single value type
-   * @param <R> Reduced value type
-   */
-  <S, R extends Writable> void registerReducer(
-      String name, ReduceOperation<S, R> reduceOp);
-
-  /**
-   * Register reducer to be reduced in the next worker computation, using
-   * given name and operations, starting globally from globalInitialValue.
-   * (globalInitialValue is reduced only once, each worker will still start
-   * from neutral initial value)
-   *
-   * @param name Name of the reducer
-   * @param reduceOp Reduce operations
-   * @param globalInitialValue Global initial value
-   * @param <S> Single value type
-   * @param <R> Reduced value type
-   */
-  <S, R extends Writable> void registerReducer(
-      String name, ReduceOperation<S, R> reduceOp, R globalInitialValue);
-
-  /**
-   * Get reduced value from previous worker computation.
-   * @param name Name of the reducer
-   * @return Reduced value
-   * @param <R> Reduced value type
-   */
-  <R extends Writable> R getReduced(String name);
-
-  /**
-   * Broadcast given value to all workers for next computation.
-   * @param name Name of the broadcast object
-   * @param value Value
-   */
-  void broadcast(String name, Writable value);
+public interface MasterGlobalCommUsage
+    extends MasterGlobalCommUsageAggregators {
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsageAggregators.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsageAggregators.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsageAggregators.java
new file mode 100644
index 0000000..62c1f3f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsageAggregators.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.master;
+
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Master compute can access reduce and broadcast methods
+ * through this interface, from masterCompute method.
+ */
+public interface MasterGlobalCommUsageAggregators {
+  /**
+   * Register reducer to be reduced in the next worker computation,
+   * using given name and operations.
+   * @param name Name of the reducer
+   * @param reduceOp Reduce operations
+   * @param <S> Single value type
+   * @param <R> Reduced value type
+   */
+  <S, R extends Writable> void registerReducer(
+      String name, ReduceOperation<S, R> reduceOp);
+
+  /**
+   * Register reducer to be reduced in the next worker computation, using
+   * given name and operations, starting globally from globalInitialValue.
+   * (globalInitialValue is reduced only once, each worker will still start
+   * from neutral initial value)
+   *
+   * @param name Name of the reducer
+   * @param reduceOp Reduce operations
+   * @param globalInitialValue Global initial value
+   * @param <S> Single value type
+   * @param <R> Reduced value type
+   */
+  <S, R extends Writable> void registerReducer(
+      String name, ReduceOperation<S, R> reduceOp, R globalInitialValue);
+
+  /**
+   * Get reduced value from previous worker computation.
+   * @param name Name of the reducer
+   * @return Reduced value
+   * @param <R> Reduced value type
+   */
+  <R extends Writable> R getReduced(String name);
+
+  /**
+   * Broadcast given value to all workers for next computation.
+   * @param name Name of the broadcast object
+   * @param value Value
+   */
+  void broadcast(String name, Writable value);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/input/BasicInputSplitsMasterOrganizer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/input/BasicInputSplitsMasterOrganizer.java b/giraph-core/src/main/java/org/apache/giraph/master/input/BasicInputSplitsMasterOrganizer.java
new file mode 100644
index 0000000..5168e32
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/input/BasicInputSplitsMasterOrganizer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.master.input;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Input splits organizer for vertex and edge input splits on master, which
+ * doesn't use locality information
+ */
+public class BasicInputSplitsMasterOrganizer
+    implements InputSplitsMasterOrganizer {
+  /** Available splits queue */
+  private final ConcurrentLinkedQueue<byte[]> splits;
+
+  /**
+   * Constructor
+   *
+   * @param serializedSplits Splits
+   */
+  public BasicInputSplitsMasterOrganizer(List<byte[]> serializedSplits) {
+    splits = new ConcurrentLinkedQueue<>(serializedSplits);
+  }
+
+  @Override
+  public byte[] getSerializedSplitFor(int workerTaskId) {
+    return splits.poll();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/input/InputSplitsMasterOrganizer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/input/InputSplitsMasterOrganizer.java b/giraph-core/src/main/java/org/apache/giraph/master/input/InputSplitsMasterOrganizer.java
new file mode 100644
index 0000000..d5a0131
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/input/InputSplitsMasterOrganizer.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.master.input;
+
+/**
+ * Interface for different input split organizers on master
+ */
+public interface InputSplitsMasterOrganizer {
+  /**
+   * @param workerTaskId Id of worker requesting split
+   *
+   * @return Next split for the worker, or null if all splits were already
+   * taken
+   */
+  byte[] getSerializedSplitFor(int workerTaskId);
+}