You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2015/10/19 19:14:30 UTC

[1/3] git commit: updated refs/heads/trunk to 5b0cd0e

Repository: giraph
Updated Branches:
  refs/heads/trunk 47da75182 -> 5b0cd0e0a


http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java b/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
index b9fc508..8d8e86d 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
@@ -32,7 +32,6 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -57,9 +56,9 @@ import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexOutputFormat;
 import org.apache.giraph.job.GiraphJob;
 import org.apache.giraph.job.HadoopUtils;
+import org.apache.giraph.master.input.LocalityAwareInputSplitsMasterOrganizer;
 import org.apache.giraph.utils.NoOpComputation;
-import org.apache.giraph.worker.InputSplitPathOrganizer;
-import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.giraph.worker.WorkerInfo;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -69,7 +68,6 @@ import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.zookeeper.KeeperException;
@@ -316,38 +314,39 @@ public class
    * @throws InterruptedException
    */
   @Test
-  public void testInputSplitPathOrganizer()
+  public void testInputSplitLocality()
     throws IOException, KeeperException, InterruptedException {
-    final List<String> testList = new ArrayList<String>();
-    Collections.addAll(testList, "remote2", "local", "remote1");
-    final String localHost = "node.LOCAL.com";
-    final String testListName = "test_list_parent_znode";
-    // build output just as we do to store hostlists in ZNODES
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    DataOutputStream dos = new DataOutputStream(baos);
-    String last = "node.test4.com\tnode.test5.com\tnode.test6.com";
-    Text.writeString(dos, last);
-    byte[] remote1 = baos.toByteArray();
-    baos = new ByteArrayOutputStream();
-    dos = new DataOutputStream(baos);
-    String middle = "node.test1.com\tnode.test2.com\tnode.test3.com";
-    Text.writeString(dos, middle);
-    byte[] remote2 = baos.toByteArray();
-    baos = new ByteArrayOutputStream();
-    dos = new DataOutputStream(baos);
-    String first = "node.testx.com\tnode.LOCAL.com\tnode.testy.com";
-    Text.writeString(dos, first);
-    byte[] local = baos.toByteArray();
-    ZooKeeperExt zk = mock(ZooKeeperExt.class);
-    when(zk.getChildrenExt(testListName, false, false, true)).
-        thenReturn(testList);
-    when(zk.getData("remote1", false, null)).thenReturn(remote1);
-    when(zk.getData("remote2", false, null)).thenReturn(remote2);
-    when(zk.getData("local", false, null)).thenReturn(local);
-    InputSplitPathOrganizer lis =
-      new InputSplitPathOrganizer(zk, testListName, localHost, true);
-    final List<String> resultList = Lists.newArrayList(lis.getPathList());
-    assertEquals("local", resultList.get(0));
+    List<byte[]> serializedSplits = new ArrayList<>();
+    serializedSplits.add(new byte[]{1});
+    serializedSplits.add(new byte[]{2});
+    serializedSplits.add(new byte[]{3});
+
+    WorkerInfo workerInfo = mock(WorkerInfo.class);
+    when(workerInfo.getTaskId()).thenReturn(5);
+    when(workerInfo.getHostname()).thenReturn("node.LOCAL.com");
+
+    List<InputSplit> splits = new ArrayList<>();
+    InputSplit split1 = mock(InputSplit.class);
+    when(split1.getLocations()).thenReturn(new String[]{
+        "node.test1.com", "node.test2.com", "node.test3.com"});
+    splits.add(split1);
+    InputSplit split2 = mock(InputSplit.class);
+    when(split2.getLocations()).thenReturn(new String[]{
+        "node.testx.com", "node.LOCAL.com", "node.testy.com"});
+    splits.add(split2);
+    InputSplit split3 = mock(InputSplit.class);
+    when(split3.getLocations()).thenReturn(new String[]{
+        "node.test4.com", "node.test5.com", "node.test6.com"});
+    splits.add(split3);
+
+    LocalityAwareInputSplitsMasterOrganizer inputSplitOrganizer =
+        new LocalityAwareInputSplitsMasterOrganizer(
+            serializedSplits,
+            splits,
+            Lists.newArrayList(workerInfo));
+
+    assertEquals(2,
+        inputSplitOrganizer.getSerializedSplitFor(workerInfo.getTaskId())[0]);
   }
 
   /**


[2/3] git commit: updated refs/heads/trunk to 5b0cd0e

Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/input/LocalityAwareInputSplitsMasterOrganizer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/input/LocalityAwareInputSplitsMasterOrganizer.java b/giraph-core/src/main/java/org/apache/giraph/master/input/LocalityAwareInputSplitsMasterOrganizer.java
new file mode 100644
index 0000000..d3eb5da
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/input/LocalityAwareInputSplitsMasterOrganizer.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.master.input;
+
+import org.apache.giraph.worker.WorkerInfo;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Input splits organizer for vertex and edge input splits on master, which
+ * uses locality information
+ */
+public class LocalityAwareInputSplitsMasterOrganizer
+    implements InputSplitsMasterOrganizer {
+  /** All splits before this pointer were taken */
+  private final AtomicInteger listPointer = new AtomicInteger();
+  /** List of serialized splits */
+  private final List<byte[]> serializedSplits;
+  /** Array containing information about whether a split was taken or not */
+  private final AtomicBoolean[] splitsTaken;
+
+  /** Map with preferred splits for each worker */
+  private final Map<Integer, ConcurrentLinkedQueue<Integer>>
+      workerToPreferredSplitsMap;
+
+
+  /**
+   * Constructor
+   *
+   * @param serializedSplits Serialized splits
+   * @param splits           Splits
+   * @param workers          List of workers
+   */
+  public LocalityAwareInputSplitsMasterOrganizer(List<byte[]> serializedSplits,
+      List<InputSplit> splits, List<WorkerInfo> workers) {
+    this.serializedSplits = serializedSplits;
+    splitsTaken = new AtomicBoolean[serializedSplits.size()];
+    // Mark all splits as not taken initially
+    for (int i = 0; i < serializedSplits.size(); i++) {
+      splitsTaken[i] = new AtomicBoolean(false);
+    }
+
+    workerToPreferredSplitsMap = new HashMap<>();
+    for (WorkerInfo worker : workers) {
+      workerToPreferredSplitsMap.put(worker.getTaskId(),
+          new ConcurrentLinkedQueue<Integer>());
+    }
+    // Go through all splits
+    for (int i = 0; i < splits.size(); i++) {
+      try {
+        String[] locations = splits.get(i).getLocations();
+        // For every worker
+        for (WorkerInfo worker : workers) {
+          // Check splits locations
+          for (String location : locations) {
+            // If split is local for the worker, add it to preferred list
+            if (location.contains(worker.getHostname())) {
+              workerToPreferredSplitsMap.get(worker.getTaskId()).add(i);
+              break;
+            }
+          }
+        }
+      } catch (IOException | InterruptedException e) {
+        throw new IllegalStateException(
+            "Exception occurred while getting splits locations", e);
+      }
+    }
+  }
+
+  @Override
+  public byte[] getSerializedSplitFor(int workerTaskId) {
+    ConcurrentLinkedQueue<Integer> preferredSplits =
+        workerToPreferredSplitsMap.get(workerTaskId);
+    // Try to find a local split
+    while (true) {
+      // Get position to check
+      Integer splitIndex = preferredSplits.poll();
+      // Check if all local splits were already processed for this worker
+      if (splitIndex == null) {
+        break;
+      }
+      // Try to reserve the split
+      if (splitsTaken[splitIndex].compareAndSet(false, true)) {
+        return serializedSplits.get(splitIndex);
+      }
+    }
+
+    // No more local splits available, proceed linearly from splits list
+    while (true) {
+      // Get position to check
+      int splitIndex = listPointer.getAndIncrement();
+      // Check if all splits were already taken
+      if (splitIndex >= serializedSplits.size()) {
+        return null;
+      }
+      // Try to reserve the split
+      if (splitsTaken[splitIndex].compareAndSet(false, true)) {
+        return serializedSplits.get(splitIndex);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/input/MappingInputSplitsMasterOrganizer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/input/MappingInputSplitsMasterOrganizer.java b/giraph-core/src/main/java/org/apache/giraph/master/input/MappingInputSplitsMasterOrganizer.java
new file mode 100644
index 0000000..8399c8a
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/input/MappingInputSplitsMasterOrganizer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.master.input;
+
+import org.apache.giraph.worker.WorkerInfo;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Organizer for mapping splits on master. Mapping splits need all to be
+ * given to all workers, unlike vertex and edge splits which are read by
+ * exactly one worker each
+ */
+public class MappingInputSplitsMasterOrganizer
+    implements InputSplitsMasterOrganizer {
+  /** List of splits */
+  private final List<byte[]> splits;
+  /** Map from worker task id to atomic pointer in splits list */
+  private final Map<Integer, AtomicInteger>
+      workerTaskIdToNextSplitIndexMap;
+
+  /**
+   * Constructor
+   *
+   * @param serializedSplits Splits
+   * @param workers List of workers
+   */
+  public MappingInputSplitsMasterOrganizer(List<byte[]> serializedSplits,
+      List<WorkerInfo> workers) {
+    this.splits = serializedSplits;
+    workerTaskIdToNextSplitIndexMap = new HashMap<>();
+    for (WorkerInfo worker : workers) {
+      workerTaskIdToNextSplitIndexMap.put(
+          worker.getTaskId(), new AtomicInteger(0));
+    }
+  }
+
+  @Override
+  public byte[] getSerializedSplitFor(int workerTaskId) {
+    AtomicInteger nextSplitIndex =
+        workerTaskIdToNextSplitIndexMap.get(workerTaskId);
+    int splitIndex = nextSplitIndex.getAndIncrement();
+    return splitIndex < splits.size() ? splits.get(splitIndex) : null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/input/MasterInputSplitsHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/input/MasterInputSplitsHandler.java b/giraph-core/src/main/java/org/apache/giraph/master/input/MasterInputSplitsHandler.java
new file mode 100644
index 0000000..327e59d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/input/MasterInputSplitsHandler.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.master.input;
+
+import org.apache.giraph.comm.MasterClient;
+import org.apache.giraph.comm.requests.ReplyWithInputSplitRequest;
+import org.apache.giraph.io.GiraphInputFormat;
+import org.apache.giraph.io.InputType;
+import org.apache.giraph.worker.WorkerInfo;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Handler for input splits on master
+ *
+ * Since currently Giraph fails if worker fails while reading input, we
+ * didn't complicate this part with retries yet, later it could be added by
+ * keeping track of which worker got which split and then if worker dies put
+ * these splits back to queues.
+ */
+public class MasterInputSplitsHandler {
+  /** Whether to use locality information */
+  private final boolean useLocality;
+  /** Master client */
+  private MasterClient masterClient;
+  /** Master client */
+  private List<WorkerInfo> workers;
+  /** Map of splits organizers for each split type */
+  private Map<InputType, InputSplitsMasterOrganizer> splitsMap =
+      new EnumMap<>(InputType.class);
+  /** Latches to say when one input splits type is ready to be accessed */
+  private Map<InputType, CountDownLatch> latchesMap =
+      new EnumMap<>(InputType.class);
+
+  /**
+   * Constructor
+   *
+   * @param useLocality Whether to use locality information or not
+   */
+  public MasterInputSplitsHandler(boolean useLocality) {
+    this.useLocality = useLocality;
+    for (InputType inputType : InputType.values()) {
+      latchesMap.put(inputType, new CountDownLatch(1));
+    }
+  }
+
+  /**
+   * Initialize
+   *
+   * @param masterClient Master client
+   * @param workers List of workers
+   */
+  public void initialize(MasterClient masterClient, List<WorkerInfo> workers) {
+    this.masterClient = masterClient;
+    this.workers = workers;
+  }
+
+  /**
+   * Add splits
+   *
+   * @param splitsType Type of splits
+   * @param inputSplits Splits
+   * @param inputFormat Format
+   */
+  public void addSplits(InputType splitsType, List<InputSplit> inputSplits,
+      GiraphInputFormat inputFormat) {
+    List<byte[]> serializedSplits = new ArrayList<>();
+    for (InputSplit inputSplit : inputSplits) {
+      try {
+        ByteArrayOutputStream byteArrayOutputStream =
+            new ByteArrayOutputStream();
+        DataOutput outputStream =
+            new DataOutputStream(byteArrayOutputStream);
+        inputFormat.writeInputSplit(inputSplit, outputStream);
+        serializedSplits.add(byteArrayOutputStream.toByteArray());
+      } catch (IOException e) {
+        throw new IllegalStateException("IOException occurred", e);
+      }
+    }
+    InputSplitsMasterOrganizer inputSplitsOrganizer;
+    if (splitsType == InputType.MAPPING) {
+      inputSplitsOrganizer = new MappingInputSplitsMasterOrganizer(
+          serializedSplits, workers);
+    } else {
+      inputSplitsOrganizer = useLocality ?
+          new LocalityAwareInputSplitsMasterOrganizer(serializedSplits,
+              inputSplits, workers) :
+          new BasicInputSplitsMasterOrganizer(serializedSplits);
+    }
+    splitsMap.put(splitsType, inputSplitsOrganizer);
+    latchesMap.get(splitsType).countDown();
+  }
+
+  /**
+   * Called after we receive a split request from some worker, should send
+   * split back to it if available, or send it information that there is no
+   * more available
+   *
+   * @param splitType Type of split requested
+   * @param workerTaskId Id of worker who requested split
+   */
+  public void sendSplitTo(InputType splitType, int workerTaskId) {
+    try {
+      // Make sure we don't try to retrieve splits before they were added
+      latchesMap.get(splitType).await();
+    } catch (InterruptedException e) {
+      throw new IllegalStateException("Interrupted", e);
+    }
+    byte[] serializedInputSplit =
+        splitsMap.get(splitType).getSerializedSplitFor(workerTaskId);
+    masterClient.sendWritableRequest(workerTaskId,
+        new ReplyWithInputSplitRequest(splitType,
+            serializedInputSplit == null ? new byte[0] : serializedInputSplit));
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/input/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/input/package-info.java b/giraph-core/src/main/java/org/apache/giraph/master/input/package-info.java
new file mode 100644
index 0000000..992b6fe
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/input/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Input related master classes
+ */
+package org.apache.giraph.master.input;

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
index 4600745..6914c3b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
@@ -115,7 +115,7 @@ public class PartitionUtils {
         workerStatsMap.put(
             workerInfo,
             new VertexEdgeCount(partitionStats.getVertexCount(),
-                partitionStats.getEdgeCount()));
+                partitionStats.getEdgeCount(), 0));
       } else {
         workerStatsMap.put(
             workerInfo,

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index 5b754d6..1031bb3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -59,8 +59,6 @@ import org.apache.giraph.graph.AddressesAndPartitionsWritable;
 import org.apache.giraph.graph.FinishedSuperstepStats;
 import org.apache.giraph.graph.GlobalStats;
 import org.apache.giraph.graph.GraphTaskManager;
-import org.apache.giraph.graph.InputSplitEvents;
-import org.apache.giraph.graph.InputSplitPaths;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexEdgeCount;
 import org.apache.giraph.io.EdgeOutputFormat;
@@ -177,10 +175,8 @@ public class BspServiceWorker<I extends WritableComparable,
   /** Time spent waiting on requests to finish */
   private GiraphTimer waitRequestsTimer;
 
-  /** InputSplit handlers used in INPUT_SUPERSTEP for vertex splits */
-  private InputSplitsHandler vertexSplitsHandler;
-  /** InputSplit handlers used in INPUT_SUPERSTEP for edge splits */
-  private InputSplitsHandler edgeSplitsHandler;
+  /** InputSplit handlers used in INPUT_SUPERSTEP */
+  private WorkerInputSplitsHandler inputSplitsHandler;
 
   /**
    * Constructor for setting up the worker.
@@ -237,8 +233,9 @@ public class BspServiceWorker<I extends WritableComparable,
         null;
 
     GiraphMetrics.get().addSuperstepResetObserver(this);
-    vertexSplitsHandler = null;
-    edgeSplitsHandler = null;
+
+    inputSplitsHandler = new WorkerInputSplitsHandler(
+        workerInfo, masterInfo.getTaskId(), workerClient);
   }
 
   @Override
@@ -295,26 +292,20 @@ public class BspServiceWorker<I extends WritableComparable,
    *
    * Use one or more threads to do the loading.
    *
-   * @param inputSplitPathList List of input split paths
    * @param inputSplitsCallableFactory Factory for {@link InputSplitsCallable}s
    * @return Statistics of the vertices and edges loaded
    * @throws InterruptedException
    * @throws KeeperException
    */
   private VertexEdgeCount loadInputSplits(
-      List<String> inputSplitPathList,
       CallableFactory<VertexEdgeCount> inputSplitsCallableFactory)
     throws KeeperException, InterruptedException {
     VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
-    // Determine how many threads to use based on the number of input splits
-    int maxInputSplitThreads = (inputSplitPathList.size() - 1) /
-        getConfiguration().getMaxWorkers() + 1;
-    int numThreads = Math.min(getConfiguration().getNumInputSplitsThreads(),
-        maxInputSplitThreads);
+    int numThreads = getConfiguration().getNumInputSplitsThreads();
     if (LOG.isInfoEnabled()) {
       LOG.info("loadInputSplits: Using " + numThreads + " thread(s), " +
           "originally " + getConfiguration().getNumInputSplitsThreads() +
-          " threads(s) for " + inputSplitPathList.size() + " total splits.");
+          " threads(s)");
     }
 
     List<VertexEdgeCount> results =
@@ -336,46 +327,21 @@ public class BspServiceWorker<I extends WritableComparable,
    */
   private long loadMapping() throws KeeperException,
     InterruptedException {
-    List<String> inputSplitPathList =
-        getZkExt().getChildrenExt(mappingInputSplitsPaths.getPath(),
-        false, false, true);
-
-    InputSplitPathOrganizer splitOrganizer =
-        new InputSplitPathOrganizer(getZkExt(),
-            inputSplitPathList, getWorkerInfo().getHostname(),
-            getConfiguration().useInputSplitLocality());
-
     MappingInputSplitsCallableFactory<I, V, E, ? extends Writable>
-        mappingInputSplitsCallableFactory =
+        inputSplitsCallableFactory =
         new MappingInputSplitsCallableFactory<>(
             getConfiguration().createWrappedMappingInputFormat(),
-            splitOrganizer,
             getContext(),
             getConfiguration(),
             this,
-            getZkExt());
+            inputSplitsHandler);
 
-    long entriesLoaded = 0;
-    // Determine how many threads to use based on the number of input splits
-    int maxInputSplitThreads = inputSplitPathList.size();
-    int numThreads = Math.min(getConfiguration().getNumInputSplitsThreads(),
-        maxInputSplitThreads);
-    if (LOG.isInfoEnabled()) {
-      LOG.info("loadInputSplits: Using " + numThreads + " thread(s), " +
-          "originally " + getConfiguration().getNumInputSplitsThreads() +
-          " threads(s) for " + inputSplitPathList.size() + " total splits.");
-    }
+    long mappingsLoaded =
+        loadInputSplits(inputSplitsCallableFactory).getMappingCount();
 
-    List<Integer> results =
-        ProgressableUtils.getResultsWithNCallables(
-            mappingInputSplitsCallableFactory,
-            numThreads, "load-mapping-%d", getContext());
-    for (Integer result : results) {
-      entriesLoaded += result;
-    }
     // after all threads finish loading - call postFilling
     localData.getMappingStore().postFilling();
-    return entriesLoaded;
+    return mappingsLoaded;
   }
 
   /**
@@ -386,31 +352,15 @@ public class BspServiceWorker<I extends WritableComparable,
    */
   private VertexEdgeCount loadVertices() throws KeeperException,
       InterruptedException {
-    List<String> inputSplitPathList =
-        getZkExt().getChildrenExt(vertexInputSplitsPaths.getPath(),
-            false, false, true);
-
-    InputSplitPathOrganizer splitOrganizer =
-        new InputSplitPathOrganizer(getZkExt(),
-            inputSplitPathList, getWorkerInfo().getHostname(),
-            getConfiguration().useInputSplitLocality());
-    vertexSplitsHandler = new InputSplitsHandler(
-        splitOrganizer,
-        getZkExt(),
-        getContext(),
-        BspService.VERTEX_INPUT_SPLIT_RESERVED_NODE,
-        BspService.VERTEX_INPUT_SPLIT_FINISHED_NODE);
-
     VertexInputSplitsCallableFactory<I, V, E> inputSplitsCallableFactory =
         new VertexInputSplitsCallableFactory<I, V, E>(
             getConfiguration().createWrappedVertexInputFormat(),
             getContext(),
             getConfiguration(),
             this,
-            vertexSplitsHandler,
-            getZkExt());
+            inputSplitsHandler);
 
-    return loadInputSplits(inputSplitPathList, inputSplitsCallableFactory);
+    return loadInputSplits(inputSplitsCallableFactory);
   }
 
   /**
@@ -420,32 +370,15 @@ public class BspServiceWorker<I extends WritableComparable,
    * @return Number of edges loaded
    */
   private long loadEdges() throws KeeperException, InterruptedException {
-    List<String> inputSplitPathList =
-        getZkExt().getChildrenExt(edgeInputSplitsPaths.getPath(),
-            false, false, true);
-
-    InputSplitPathOrganizer splitOrganizer =
-        new InputSplitPathOrganizer(getZkExt(),
-            inputSplitPathList, getWorkerInfo().getHostname(),
-            getConfiguration().useInputSplitLocality());
-    edgeSplitsHandler = new InputSplitsHandler(
-        splitOrganizer,
-        getZkExt(),
-        getContext(),
-        BspService.EDGE_INPUT_SPLIT_RESERVED_NODE,
-        BspService.EDGE_INPUT_SPLIT_FINISHED_NODE);
-
     EdgeInputSplitsCallableFactory<I, V, E> inputSplitsCallableFactory =
         new EdgeInputSplitsCallableFactory<I, V, E>(
             getConfiguration().createWrappedEdgeInputFormat(),
             getContext(),
             getConfiguration(),
             this,
-            edgeSplitsHandler,
-            getZkExt());
+            inputSplitsHandler);
 
-    return loadInputSplits(inputSplitPathList, inputSplitsCallableFactory).
-        getEdgeCount();
+    return loadInputSplits(inputSplitsCallableFactory).getEdgeCount();
   }
 
   @Override
@@ -459,46 +392,12 @@ public class BspServiceWorker<I extends WritableComparable,
   }
 
   /**
-   * Ensure the input splits are ready for processing
-   *
-   * @param inputSplitPaths Input split paths
-   * @param inputSplitEvents Input split events
-   */
-  private void ensureInputSplitsReady(InputSplitPaths inputSplitPaths,
-                                      InputSplitEvents inputSplitEvents) {
-    while (true) {
-      Stat inputSplitsReadyStat;
-      try {
-        inputSplitsReadyStat = getZkExt().exists(
-            inputSplitPaths.getAllReadyPath(), true);
-      } catch (KeeperException e) {
-        throw new IllegalStateException("ensureInputSplitsReady: " +
-            "KeeperException waiting on input splits", e);
-      } catch (InterruptedException e) {
-        throw new IllegalStateException("ensureInputSplitsReady: " +
-            "InterruptedException waiting on input splits", e);
-      }
-      if (inputSplitsReadyStat != null) {
-        break;
-      }
-      inputSplitEvents.getAllReadyChanged().waitForever();
-      inputSplitEvents.getAllReadyChanged().reset();
-    }
-  }
-
-  /**
    * Mark current worker as done and then wait for all workers
    * to finish processing input splits.
-   *
-   * @param inputSplitPaths Input split paths
-   * @param inputSplitEvents Input split events
    */
-  private void markCurrentWorkerDoneThenWaitForOthers(
-    InputSplitPaths inputSplitPaths,
-    InputSplitEvents inputSplitEvents) {
+  private void markCurrentWorkerDoneReadingThenWaitForOthers() {
     String workerInputSplitsDonePath =
-        inputSplitPaths.getDonePath() + "/" +
-            getWorkerInfo().getHostnameId();
+        inputSplitsWorkerDonePath + "/" + getWorkerInfo().getHostnameId();
     try {
       getZkExt().createExt(workerInputSplitsDonePath,
           null,
@@ -508,32 +407,31 @@ public class BspServiceWorker<I extends WritableComparable,
     } catch (KeeperException e) {
       throw new IllegalStateException(
           "markCurrentWorkerDoneThenWaitForOthers: " +
-          "KeeperException creating worker done splits", e);
+              "KeeperException creating worker done splits", e);
     } catch (InterruptedException e) {
       throw new IllegalStateException(
           "markCurrentWorkerDoneThenWaitForOthers: " +
-          "InterruptedException creating worker done splits", e);
+              "InterruptedException creating worker done splits", e);
     }
     while (true) {
       Stat inputSplitsDoneStat;
       try {
         inputSplitsDoneStat =
-            getZkExt().exists(inputSplitPaths.getAllDonePath(),
-                true);
+            getZkExt().exists(inputSplitsAllDonePath, true);
       } catch (KeeperException e) {
         throw new IllegalStateException(
             "markCurrentWorkerDoneThenWaitForOthers: " +
-            "KeeperException waiting on worker done splits", e);
+                "KeeperException waiting on worker done splits", e);
       } catch (InterruptedException e) {
         throw new IllegalStateException(
             "markCurrentWorkerDoneThenWaitForOthers: " +
-            "InterruptedException waiting on worker done splits", e);
+                "InterruptedException waiting on worker done splits", e);
       }
       if (inputSplitsDoneStat != null) {
         break;
       }
-      inputSplitEvents.getAllDoneChanged().waitForever();
-      inputSplitEvents.getAllDoneChanged().reset();
+      getInputSplitsAllDoneEvent().waitForever();
+      getInputSplitsAllDoneEvent().reset();
     }
   }
 
@@ -597,8 +495,6 @@ else[HADOOP_NON_SECURE]*/
     long entriesLoaded;
 
     if (getConfiguration().hasMappingInputFormat()) {
-      // Ensure the mapping InputSplits are ready for processing
-      ensureInputSplitsReady(mappingInputSplitsPaths, mappingInputSplitsEvents);
       getContext().progress();
       try {
         entriesLoaded = loadMapping();
@@ -618,17 +514,12 @@ else[HADOOP_NON_SECURE]*/
             entriesLoaded + " entries from inputSplits");
       }
 
-      // Workers wait for each other to finish, coordinated by master
-      markCurrentWorkerDoneThenWaitForOthers(mappingInputSplitsPaths,
-          mappingInputSplitsEvents);
       // Print stats for data stored in localData once mapping is fully
       // loaded on all the workers
       localData.printStats();
     }
 
     if (getConfiguration().hasVertexInputFormat()) {
-      // Ensure the vertex InputSplits are ready for processing
-      ensureInputSplitsReady(vertexInputSplitsPaths, vertexInputSplitsEvents);
       getContext().progress();
       try {
         vertexEdgeCount = loadVertices();
@@ -646,8 +537,6 @@ else[HADOOP_NON_SECURE]*/
     WorkerProgress.get().finishLoadingVertices();
 
     if (getConfiguration().hasEdgeInputFormat()) {
-      // Ensure the edge InputSplits are ready for processing
-      ensureInputSplitsReady(edgeInputSplitsPaths, edgeInputSplitsEvents);
       getContext().progress();
       try {
         vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(0, loadEdges());
@@ -666,17 +555,7 @@ else[HADOOP_NON_SECURE]*/
       LOG.info("setup: Finally loaded a total of " + vertexEdgeCount);
     }
 
-    if (getConfiguration().hasVertexInputFormat()) {
-      // Workers wait for each other to finish, coordinated by master
-      markCurrentWorkerDoneThenWaitForOthers(vertexInputSplitsPaths,
-          vertexInputSplitsEvents);
-    }
-
-    if (getConfiguration().hasEdgeInputFormat()) {
-      // Workers wait for each other to finish, coordinated by master
-      markCurrentWorkerDoneThenWaitForOthers(edgeInputSplitsPaths,
-          edgeInputSplitsEvents);
-    }
+    markCurrentWorkerDoneReadingThenWaitForOthers();
 
     // Create remaining partitions owned by this worker.
     for (PartitionOwner partitionOwner : masterSetPartitionOwners) {
@@ -898,13 +777,6 @@ else[HADOOP_NON_SECURE]*/
 
     if (getSuperstep() != INPUT_SUPERSTEP) {
       postSuperstepCallbacks();
-    } else {
-      if (getConfiguration().hasVertexInputFormat()) {
-        vertexSplitsHandler.setDoneReadingGraph(true);
-      }
-      if (getConfiguration().hasEdgeInputFormat()) {
-        edgeSplitsHandler.setDoneReadingGraph(true);
-      }
     }
 
     globalCommHandler.finishSuperstep(workerAggregatorRequestProcessor);
@@ -1692,7 +1564,7 @@ else[HADOOP_NON_SECURE]*/
       workerClient.setup(getConfiguration().authenticate());
 /*end[HADOOP_NON_SECURE]*/
       return new VertexEdgeCount(globalStats.getVertexCount(),
-          globalStats.getEdgeCount());
+          globalStats.getEdgeCount(), 0);
 
     } catch (IOException e) {
       throw new RuntimeException(
@@ -1963,4 +1835,9 @@ else[HADOOP_NON_SECURE]*/
     }
     return count;
   }
+
+  @Override
+  public WorkerInputSplitsHandler getInputSplitsHandler() {
+    return inputSplitsHandler;
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
index 89f74b3..b7f1eb6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
@@ -26,9 +26,9 @@ import org.apache.giraph.graph.VertexEdgeCount;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.EdgeReader;
 import org.apache.giraph.io.filters.EdgeInputFilter;
+import org.apache.giraph.io.InputType;
 import org.apache.giraph.utils.LoggerUtils;
 import org.apache.giraph.utils.MemoryUtils;
-import org.apache.giraph.zk.ZooKeeperExt;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -89,17 +89,14 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
    * @param configuration Configuration
    * @param bspServiceWorker service worker
    * @param splitsHandler Handler for input splits
-   * @param zooKeeperExt Handle to ZooKeeperExt
    */
   public EdgeInputSplitsCallable(
       EdgeInputFormat<I, E> edgeInputFormat,
       Mapper<?, ?, ?, ?>.Context context,
       ImmutableClassesGiraphConfiguration<I, V, E> configuration,
       BspServiceWorker<I, V, E> bspServiceWorker,
-      InputSplitsHandler splitsHandler,
-      ZooKeeperExt zooKeeperExt)  {
-    super(context, configuration, bspServiceWorker, splitsHandler,
-        zooKeeperExt);
+      WorkerInputSplitsHandler splitsHandler)  {
+    super(context, configuration, bspServiceWorker, splitsHandler);
     this.edgeInputFormat = edgeInputFormat;
 
     this.bspServiceWorker = bspServiceWorker;
@@ -126,6 +123,11 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
     return edgeInputFormat;
   }
 
+  @Override
+  public InputType getInputType() {
+    return InputType.EDGE;
+  }
+
   /**
    * Read edges from input split.  If testing, the user may request a
    * maximum number of edges to be read from an input split.
@@ -226,6 +228,6 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
         inputSplitEdgesLoaded % EDGES_UPDATE_PERIOD);
     WorkerProgress.get().incrementEdgeInputSplitsLoaded();
 
-    return new VertexEdgeCount(0, inputSplitEdgesLoaded);
+    return new VertexEdgeCount(0, inputSplitEdgesLoaded, 0);
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
index f68ac93..d4bc1fc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
@@ -22,7 +22,6 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.VertexEdgeCount;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.utils.CallableFactory;
-import org.apache.giraph.zk.ZooKeeperExt;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -46,9 +45,7 @@ public class EdgeInputSplitsCallableFactory<I extends WritableComparable,
   /** {@link BspServiceWorker} we're running on. */
   private final BspServiceWorker<I, V, E> bspServiceWorker;
   /** Handler for input splits */
-  private final InputSplitsHandler splitsHandler;
-  /** {@link ZooKeeperExt} for this worker. */
-  private final ZooKeeperExt zooKeeperExt;
+  private final WorkerInputSplitsHandler splitsHandler;
 
   /**
    * Constructor.
@@ -58,20 +55,17 @@ public class EdgeInputSplitsCallableFactory<I extends WritableComparable,
    * @param configuration Configuration
    * @param bspServiceWorker Calling {@link BspServiceWorker}
    * @param splitsHandler Handler for input splits
-   * @param zooKeeperExt {@link ZooKeeperExt} for this worker
    */
   public EdgeInputSplitsCallableFactory(
       EdgeInputFormat<I, E> edgeInputFormat,
       Mapper<?, ?, ?, ?>.Context context,
       ImmutableClassesGiraphConfiguration<I, V, E> configuration,
       BspServiceWorker<I, V, E> bspServiceWorker,
-      InputSplitsHandler splitsHandler,
-      ZooKeeperExt zooKeeperExt) {
+      WorkerInputSplitsHandler splitsHandler) {
     this.edgeInputFormat = edgeInputFormat;
     this.context = context;
     this.configuration = configuration;
     this.bspServiceWorker = bspServiceWorker;
-    this.zooKeeperExt = zooKeeperExt;
     this.splitsHandler = splitsHandler;
   }
 
@@ -82,7 +76,6 @@ public class EdgeInputSplitsCallableFactory<I extends WritableComparable,
         context,
         configuration,
         bspServiceWorker,
-        splitsHandler,
-        zooKeeperExt);
+        splitsHandler);
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/FullInputSplitCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/FullInputSplitCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/FullInputSplitCallable.java
deleted file mode 100644
index 4e93ce0..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/worker/FullInputSplitCallable.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.worker;
-
-import com.google.common.collect.Lists;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.io.GiraphInputFormat;
-import org.apache.giraph.time.SystemTime;
-import org.apache.giraph.time.Time;
-import org.apache.giraph.time.Times;
-import org.apache.giraph.zk.ZooKeeperExt;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.KeeperException;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * InputSplitCallable to read all the splits
- *
- * @param <I> vertexId type
- * @param <V> vertexValue type
- * @param <E> edgeValue type
- */
-public abstract class FullInputSplitCallable<I extends WritableComparable,
-  V extends Writable, E extends Writable>
-  implements Callable<Integer> {
-  /** Class logger */
-  private static final Logger LOG = Logger.getLogger(
-    FullInputSplitCallable.class);
-  /** Class time object */
-  private static final Time TIME = SystemTime.get();
-  /** Configuration */
-  protected final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
-  /** Context */
-  protected final Mapper<?, ?, ?, ?>.Context context;
-
-  /** The List of InputSplit znode paths */
-  private final List<String> pathList;
-  /** Current position in the path list */
-  private final AtomicInteger currentIndex;
-  /** ZooKeeperExt handle */
-  private final ZooKeeperExt zooKeeperExt;
-  /** Get the start time in nanos */
-  private final long startNanos = TIME.getNanoseconds();
-
-  // CHECKSTYLE: stop ParameterNumberCheck
-  /**
-   * Constructor.
-
-   * @param splitOrganizer Input splits organizer
-   * @param context Context
-   * @param configuration Configuration
-   * @param zooKeeperExt Handle to ZooKeeperExt
-   * @param currentIndex Atomic Integer to get splitPath from list
-   */
-  public FullInputSplitCallable(InputSplitPathOrganizer splitOrganizer,
-      Mapper<?, ?, ?, ?>.Context context,
-      ImmutableClassesGiraphConfiguration<I, V, E> configuration,
-      ZooKeeperExt zooKeeperExt,
-      AtomicInteger currentIndex) {
-    this.pathList = Lists.newArrayList(splitOrganizer.getPathList());
-    this.currentIndex = currentIndex;
-    this.zooKeeperExt = zooKeeperExt;
-    this.context = context;
-    this.configuration = configuration;
-  }
-  // CHECKSTYLE: resume ParameterNumberCheck
-
-  /**
-   * Get input format
-   *
-   * @return Input format
-   */
-  public abstract GiraphInputFormat getInputFormat();
-
-  /**
-   * Load mapping entries from all the given input splits
-   *
-   * @param inputSplit Input split to load
-   * @return Count of vertices and edges loaded
-   * @throws java.io.IOException
-   * @throws InterruptedException
-   */
-  protected abstract Integer readInputSplit(InputSplit inputSplit)
-    throws IOException, InterruptedException;
-
-  @Override
-  public Integer call() {
-    int entries = 0;
-    String inputSplitPath;
-    int inputSplitsProcessed = 0;
-    try {
-      while (true) {
-        int pos = currentIndex.getAndIncrement();
-        if (pos >= pathList.size()) {
-          break;
-        }
-        inputSplitPath = pathList.get(pos);
-        entries += loadInputSplit(inputSplitPath);
-        context.progress();
-        ++inputSplitsProcessed;
-      }
-    } catch (InterruptedException e) {
-      throw new IllegalStateException("call: InterruptedException", e);
-    } catch (IOException e) {
-      throw new IllegalStateException("call: IOException", e);
-    } catch (ClassNotFoundException e) {
-      throw new IllegalStateException("call: ClassNotFoundException", e);
-    } catch (InstantiationException e) {
-      throw new IllegalStateException("call: InstantiationException", e);
-    } catch (IllegalAccessException e) {
-      throw new IllegalStateException("call: IllegalAccessException", e);
-    }
-
-    if (LOG.isInfoEnabled()) {
-      float seconds = Times.getNanosSince(TIME, startNanos) /
-          Time.NS_PER_SECOND_AS_FLOAT;
-      float entriesPerSecond = entries / seconds;
-      LOG.info("call: Loaded " + inputSplitsProcessed + " " +
-          "input splits in " + seconds + " secs, " + entries +
-          " " + entriesPerSecond + " entries/sec");
-    }
-    return entries;
-  }
-
-  /**
-   * Extract entries from input split, saving them into mapping store.
-   * Mark the input split finished when done.
-   *
-   * @param inputSplitPath ZK location of input split
-   * @return Number of entries read in this input split
-   * @throws IOException
-   * @throws ClassNotFoundException
-   * @throws InterruptedException
-   * @throws InstantiationException
-   * @throws IllegalAccessException
-   */
-  private Integer loadInputSplit(
-    String inputSplitPath)
-    throws IOException, ClassNotFoundException, InterruptedException,
-    InstantiationException, IllegalAccessException {
-    InputSplit inputSplit = getInputSplit(inputSplitPath);
-    Integer entriesRead = readInputSplit(inputSplit);
-    if (LOG.isInfoEnabled()) {
-      LOG.info("loadFromInputSplit: Finished loading " +
-          inputSplitPath + " " + entriesRead);
-    }
-    return entriesRead;
-  }
-
-  /**
-   * Talk to ZooKeeper to convert the input split path to the actual
-   * InputSplit.
-   *
-   * @param inputSplitPath Location in ZK of input split
-   * @return instance of InputSplit
-   * @throws IOException
-   * @throws ClassNotFoundException
-   */
-  protected InputSplit getInputSplit(String inputSplitPath)
-    throws IOException, ClassNotFoundException {
-    byte[] splitList;
-    try {
-      splitList = zooKeeperExt.getData(inputSplitPath, false, null);
-    } catch (KeeperException e) {
-      throw new IllegalStateException(
-          "getInputSplit: KeeperException on " + inputSplitPath, e);
-    } catch (InterruptedException e) {
-      throw new IllegalStateException(
-          "getInputSplit: IllegalStateException on " + inputSplitPath, e);
-    }
-    context.progress();
-
-    DataInputStream inputStream =
-        new DataInputStream(new ByteArrayInputStream(splitList));
-    InputSplit inputSplit = getInputFormat().readInputSplit(inputStream);
-
-    if (LOG.isInfoEnabled()) {
-      LOG.info("getInputSplit: Processing " + inputSplitPath +
-          " from ZooKeeper and got input split '" +
-          inputSplit.toString() + "'");
-    }
-    return inputSplit;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java
deleted file mode 100644
index 463601c..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.worker;
-
-import com.google.common.collect.Lists;
-
-import org.apache.giraph.zk.ZooKeeperExt;
-import org.apache.hadoop.io.Text;
-import org.apache.zookeeper.KeeperException;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * Utility class to extract the list of InputSplits from the
- * ZooKeeper tree of "claimable splits" the master created,
- * and to sort the list to favor local data blocks.
- *
- * This class provides an Iterator for the list the worker will
- * claim splits from, making all sorting and data-code locality
- * processing done here invisible to callers. The aim is to cut
- * down on the number of ZK reads workers perform before locating
- * an unclaimed InputSplit.
- */
-public class InputSplitPathOrganizer {
-  /** The worker's local ZooKeeperExt ref */
-  private final ZooKeeperExt zooKeeper;
-  /** The List of InputSplit znode paths */
-  private final List<String> pathList;
-  /** The worker's hostname */
-  private final String hostName;
-
-  /**
-   * Constructor
-   *
-   * @param zooKeeper the worker's ZkExt
-   * @param zkPathList the path to read from
-   * @param hostName the worker's host name (for matching)
-   * @param useLocality whether to prioritize local input splits
-   */
-  public InputSplitPathOrganizer(final ZooKeeperExt zooKeeper,
-    final String zkPathList, final String hostName,
-    final boolean useLocality) throws KeeperException, InterruptedException {
-    this(zooKeeper, zooKeeper.getChildrenExt(zkPathList, false, false, true),
-        hostName, useLocality);
-  }
-
-  /**
-   * Constructor
-   *
-   * @param zooKeeper the worker's ZkExt
-   * @param inputSplitPathList path of input splits to read from
-   * @param hostName the worker's host name (for matching)
-   * @param useLocality whether to prioritize local input splits
-   */
-  public InputSplitPathOrganizer(
-      final ZooKeeperExt zooKeeper, final List<String> inputSplitPathList,
-      final String hostName, final boolean useLocality) {
-    this.zooKeeper = zooKeeper;
-    this.pathList = Lists.newArrayList(inputSplitPathList);
-    this.hostName = hostName;
-    // Shuffle input splits in case several workers exist on this host
-    Collections.shuffle(pathList);
-    if (useLocality) {
-      prioritizeLocalInputSplits();
-    }
-  }
-
-  /**
-  * Re-order list of InputSplits so files local to this worker node's
-  * disk are the first it will iterate over when attempting to claim
-  * a split to read. This will increase locality of data reads with greater
-  * probability as the % of total nodes in the cluster hosting data and workers
-  * BOTH increase towards 100%. Replication increases our chances of a "hit."
-  */
-  private void prioritizeLocalInputSplits() {
-    List<String> sortedList = new ArrayList<String>();
-    String hosts;
-    for (Iterator<String> iterator = pathList.iterator(); iterator.hasNext();) {
-      final String path = iterator.next();
-      try {
-        hosts = getLocationsFromZkInputSplitData(path);
-      } catch (IOException ioe) {
-        hosts = null; // no problem, just don't sort this entry
-      } catch (KeeperException ke) {
-        hosts = null;
-      } catch (InterruptedException ie) {
-        hosts = null;
-      }
-      if (hosts != null && hosts.contains(hostName)) {
-        sortedList.add(path); // collect the local block
-        iterator.remove(); // remove local block from list
-      }
-    }
-    pathList.addAll(0, sortedList);
-  }
-
-  /**
-   * Utility for extracting locality data from an InputSplit ZNode.
-   *
-   * @param zkSplitPath the input split path to attempt to read
-   * ZNode locality data from for this InputSplit.
-   * @return a String of hostnames from ZNode data, or throws
-   */
-  private String getLocationsFromZkInputSplitData(String zkSplitPath)
-    throws IOException, KeeperException, InterruptedException {
-    byte[] locationData = zooKeeper.getData(zkSplitPath, false, null);
-    DataInputStream inputStream =
-      new DataInputStream(new ByteArrayInputStream(locationData));
-    // only read the "first" entry in the znode data, the locations
-    return Text.readString(inputStream);
-  }
-
-  /**
-   * Get the ordered input splits paths.
-   *
-   * @return Ordered input splits paths
-   */
-  public Iterable<String> getPathList() {
-    return pathList;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
index 7b2fc0f..92b23bd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
@@ -28,6 +28,7 @@ import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.VertexEdgeCount;
 import org.apache.giraph.io.GiraphInputFormat;
+import org.apache.giraph.io.InputType;
 import org.apache.giraph.metrics.GiraphMetrics;
 import org.apache.giraph.metrics.GiraphMetricsRegistry;
 import org.apache.giraph.metrics.MeterDesc;
@@ -35,14 +36,11 @@ import org.apache.giraph.metrics.MetricNames;
 import org.apache.giraph.time.SystemTime;
 import org.apache.giraph.time.Time;
 import org.apache.giraph.time.Times;
-import org.apache.giraph.zk.ZooKeeperExt;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.Logger;
-import org.apache.zookeeper.KeeperException;
 
 import com.yammer.metrics.core.Counter;
 import com.yammer.metrics.core.Meter;
@@ -75,9 +73,7 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
    * Stores and processes the list of InputSplits advertised
    * in a tree of child znodes by the master.
    */
-  private final InputSplitsHandler splitsHandler;
-  /** ZooKeeperExt handle */
-  private final ZooKeeperExt zooKeeperExt;
+  private final WorkerInputSplitsHandler splitsHandler;
   /** Get the start time in nanos */
   private final long startNanos = TIME.getNanoseconds();
   /** Whether to prioritize local input splits. */
@@ -91,15 +87,12 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
    * @param configuration Configuration
    * @param bspServiceWorker service worker
    * @param splitsHandler Handler for input splits
-   * @param zooKeeperExt Handle to ZooKeeperExt
    */
   public InputSplitsCallable(
       Mapper<?, ?, ?, ?>.Context context,
       ImmutableClassesGiraphConfiguration<I, V, E> configuration,
       BspServiceWorker<I, V, E> bspServiceWorker,
-      InputSplitsHandler splitsHandler,
-      ZooKeeperExt zooKeeperExt) {
-    this.zooKeeperExt = zooKeeperExt;
+      WorkerInputSplitsHandler splitsHandler) {
     this.context = context;
     this.workerClientRequestProcessor =
         new NettyWorkerClientRequestProcessor<I, V, E>(
@@ -119,6 +112,13 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
   public abstract GiraphInputFormat getInputFormat();
 
   /**
+   * Get input type
+   *
+   * @return Input type
+   */
+  public abstract InputType getInputType();
+
+  /**
    * Get Meter tracking edges loaded
    *
    * @return Meter tracking edges loaded
@@ -205,27 +205,22 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
   @Override
   public VertexEdgeCount call() {
     VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
-    String inputSplitPath;
+    byte[] serializedInputSplit;
     int inputSplitsProcessed = 0;
     try {
-      while ((inputSplitPath = splitsHandler.reserveInputSplit()) != null) {
-        vertexEdgeCount =
-            vertexEdgeCount.incrVertexEdgeCount(loadInputSplit(inputSplitPath));
+      while ((serializedInputSplit =
+          splitsHandler.reserveInputSplit(getInputType())) != null) {
+        vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(
+            loadInputSplit(serializedInputSplit));
         context.progress();
         ++inputSplitsProcessed;
       }
-    } catch (KeeperException e) {
-      throw new IllegalStateException("call: KeeperException", e);
     } catch (InterruptedException e) {
       throw new IllegalStateException("call: InterruptedException", e);
     } catch (IOException e) {
       throw new IllegalStateException("call: IOException", e);
     } catch (ClassNotFoundException e) {
       throw new IllegalStateException("call: ClassNotFoundException", e);
-    } catch (InstantiationException e) {
-      throw new IllegalStateException("call: InstantiationException", e);
-    } catch (IllegalAccessException e) {
-      throw new IllegalStateException("call: IllegalAccessException", e);
     }
 
     if (LOG.isInfoEnabled()) {
@@ -252,25 +247,19 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
    * reached in readVerticeFromInputSplit.
    * Mark the input split finished when done.
    *
-   * @param inputSplitPath ZK location of input split
+   * @param serializedInputSplit Serialized input split
    * @return Mapping of vertex indices and statistics, or null if no data read
    * @throws IOException
    * @throws ClassNotFoundException
    * @throws InterruptedException
-   * @throws InstantiationException
-   * @throws IllegalAccessException
    */
-  private VertexEdgeCount loadInputSplit(
-      String inputSplitPath)
-    throws IOException, ClassNotFoundException, InterruptedException,
-      InstantiationException, IllegalAccessException {
-    InputSplit inputSplit = getInputSplit(inputSplitPath);
+  private VertexEdgeCount loadInputSplit(byte[] serializedInputSplit)
+      throws IOException, ClassNotFoundException, InterruptedException {
+    InputSplit inputSplit = getInputSplit(serializedInputSplit);
     VertexEdgeCount vertexEdgeCount = readInputSplit(inputSplit);
     if (LOG.isInfoEnabled()) {
-      LOG.info("loadFromInputSplit: Finished loading " +
-          inputSplitPath + " " + vertexEdgeCount);
+      LOG.info("loadFromInputSplit: Finished loading " + vertexEdgeCount);
     }
-    splitsHandler.markInputSplitPathFinished(inputSplitPath);
     return vertexEdgeCount;
   }
 
@@ -278,35 +267,19 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
    * Talk to ZooKeeper to convert the input split path to the actual
    * InputSplit.
    *
-   * @param inputSplitPath Location in ZK of input split
+   * @param serializedInputSplit Serialized input split
    * @return instance of InputSplit
    * @throws IOException
    * @throws ClassNotFoundException
    */
-  protected InputSplit getInputSplit(String inputSplitPath)
-    throws IOException, ClassNotFoundException {
-    byte[] splitList;
-    try {
-      splitList = zooKeeperExt.getData(inputSplitPath, false, null);
-    } catch (KeeperException e) {
-      throw new IllegalStateException(
-          "getInputSplit: KeeperException on " + inputSplitPath, e);
-    } catch (InterruptedException e) {
-      throw new IllegalStateException(
-          "getInputSplit: IllegalStateException on " + inputSplitPath, e);
-    }
-    context.progress();
-
+  protected InputSplit getInputSplit(byte[] serializedInputSplit)
+      throws IOException, ClassNotFoundException {
     DataInputStream inputStream =
-        new DataInputStream(new ByteArrayInputStream(splitList));
-    if (useLocality) {
-      Text.readString(inputStream); // location data unused here, skip
-    }
+        new DataInputStream(new ByteArrayInputStream(serializedInputSplit));
     InputSplit inputSplit = getInputFormat().readInputSplit(inputStream);
 
     if (LOG.isInfoEnabled()) {
-      LOG.info("getInputSplit: Reserved " + inputSplitPath +
-          " from ZooKeeper and got input split '" +
+      LOG.info("getInputSplit: Reserved input split '" +
           inputSplit.toString() + "'");
     }
     return inputSplit;

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsHandler.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsHandler.java
deleted file mode 100644
index e2099eb..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsHandler.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.worker;
-
-import org.apache.giraph.zk.ZooKeeperExt;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.data.Stat;
-
-import com.google.common.collect.Lists;
-
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Stores the list of input split paths, and provides thread-safe way for
- * reserving input splits.
- */
-public class InputSplitsHandler implements Watcher  {
-  /** Class logger */
-  private static final Logger LOG =
-      Logger.getLogger(InputSplitsHandler.class);
-
-  /** The List of InputSplit znode paths */
-  private final List<String> pathList;
-  /** Current position in the path list */
-  private final AtomicInteger currentIndex;
-  /** The worker's local ZooKeeperExt ref */
-  private final ZooKeeperExt zooKeeper;
-  /** Context for reporting progress */
-  private final Mapper<?, ?, ?, ?>.Context context;
-  /** ZooKeeper input split reserved node. */
-  private final String inputSplitReservedNode;
-  /** ZooKeeper input split finished node. */
-  private final String inputSplitFinishedNode;
-  /** Specifies if we finished execution of INPUT_SUPERSTEP. The variable may
-   * be accessed via different threads. */
-  private volatile boolean doneReadingGraph;
-
-  /**
-   * Constructor
-   *
-   * @param splitOrganizer Input splits organizer
-   * @param zooKeeper The worker's local ZooKeeperExt ref
-   * @param context Context for reporting progress
-   * @param inputSplitReservedNode ZooKeeper input split reserved node
-   * @param inputSplitFinishedNode ZooKeeper input split finished node
-   */
-  public InputSplitsHandler(InputSplitPathOrganizer splitOrganizer,
-      ZooKeeperExt zooKeeper, Mapper<?, ?, ?, ?>.Context context,
-      String inputSplitReservedNode, String inputSplitFinishedNode) {
-    this.pathList = Lists.newArrayList(splitOrganizer.getPathList());
-    this.currentIndex = new AtomicInteger(0);
-    this.zooKeeper = zooKeeper;
-    this.context = context;
-    this.inputSplitReservedNode = inputSplitReservedNode;
-    this.inputSplitFinishedNode = inputSplitFinishedNode;
-    this.doneReadingGraph = false;
-  }
-
-  public void setDoneReadingGraph(boolean doneReadingGraph) {
-    this.doneReadingGraph = doneReadingGraph;
-  }
-
-   /**
-   * Try to reserve an InputSplit for loading.  While InputSplits exists that
-   * are not finished, wait until they are.
-   *
-   * NOTE: iterations on the InputSplit list only halt for each worker when it
-   * has scanned the entire list once and found every split marked RESERVED.
-   * When a worker fails, its Ephemeral RESERVED znodes will disappear,
-   * allowing other iterating workers to claim it's previously read splits.
-   * Only when the last worker left iterating on the list fails can a danger
-   * of data loss occur. Since worker failure in INPUT_SUPERSTEP currently
-   * causes job failure, this is OK. As the failure model evolves, this
-   * behavior might need to change. We could add watches on
-   * inputSplitFinishedNodes and stop iterating only when all these nodes
-   * have been created.
-   *
-   * @return reserved InputSplit or null if no unfinished InputSplits exist
-   * @throws KeeperException
-   * @throws InterruptedException
-   */
-  public String reserveInputSplit() throws KeeperException,
-      InterruptedException {
-    String reservedInputSplitPath;
-    Stat reservedStat;
-    while (true) {
-      int splitToTry = currentIndex.getAndIncrement();
-      if (splitToTry >= pathList.size()) {
-        return null;
-      }
-      String nextSplitToClaim = pathList.get(splitToTry);
-      context.progress();
-      String tmpInputSplitReservedPath =
-          nextSplitToClaim + inputSplitReservedNode;
-      reservedStat =
-          zooKeeper.exists(tmpInputSplitReservedPath, this);
-      if (reservedStat == null) {
-        try {
-          // Attempt to reserve this InputSplit
-          zooKeeper.createExt(tmpInputSplitReservedPath,
-              null,
-              ZooDefs.Ids.OPEN_ACL_UNSAFE,
-              CreateMode.EPHEMERAL,
-              false);
-          reservedInputSplitPath = nextSplitToClaim;
-          if (LOG.isInfoEnabled()) {
-            float percentFinished =
-                splitToTry * 100.0f / pathList.size();
-            LOG.info("reserveInputSplit: Reserved input " +
-                "split path " + reservedInputSplitPath +
-                ", overall roughly " +
-                +percentFinished +
-                "% input splits reserved");
-          }
-          return reservedInputSplitPath;
-        } catch (KeeperException.NodeExistsException e) {
-          LOG.info("reserveInputSplit: Couldn't reserve " +
-              "(already reserved) inputSplit" +
-              " at " + tmpInputSplitReservedPath);
-        } catch (KeeperException e) {
-          throw new IllegalStateException(
-              "reserveInputSplit: KeeperException on reserve", e);
-        } catch (InterruptedException e) {
-          throw new IllegalStateException(
-              "reserveInputSplit: InterruptedException " +
-                  "on reserve", e);
-        }
-      }
-    }
-  }
-
-  /**
-   * Mark an input split path as completed by this worker.  This notifies
-   * the master and the other workers that this input split has not only
-   * been reserved, but also marked processed.
-   *
-   * @param inputSplitPath Path to the input split.
-   */
-  public void markInputSplitPathFinished(String inputSplitPath) {
-    String inputSplitFinishedPath =
-        inputSplitPath + inputSplitFinishedNode;
-    try {
-      zooKeeper.createExt(inputSplitFinishedPath,
-          null,
-          ZooDefs.Ids.OPEN_ACL_UNSAFE,
-          CreateMode.PERSISTENT,
-          true);
-    } catch (KeeperException.NodeExistsException e) {
-      LOG.warn("markInputSplitPathFinished: " + inputSplitFinishedPath +
-          " already exists!");
-    } catch (KeeperException e) {
-      throw new IllegalStateException(
-          "markInputSplitPathFinished: KeeperException on " +
-              inputSplitFinishedPath, e);
-    } catch (InterruptedException e) {
-      throw new IllegalStateException(
-          "markInputSplitPathFinished: InterruptedException on " +
-              inputSplitFinishedPath, e);
-    }
-  }
-
-  @Override
-  public void process(WatchedEvent event) {
-    if (event.getPath() == null) {
-      LOG.warn("process: Problem with zookeeper, got event with path null, " +
-          "state " + event.getState() + ", event type " + event.getType());
-      return;
-    }
-    // Check if the reservation for the input split was lost in INPUT_SUPERSTEP
-    // (some worker died). If INPUT_SUPERSTEP has already completed, we ignore
-    // this event.
-    if (event.getPath().endsWith(inputSplitReservedNode) &&
-        event.getType() == Watcher.Event.EventType.NodeDeleted &&
-        !doneReadingGraph) {
-      synchronized (pathList) {
-        String split = event.getPath();
-        split = split.substring(0, split.indexOf(inputSplitReservedNode));
-        pathList.add(split);
-        if (LOG.isInfoEnabled()) {
-          LOG.info("process: Input split " + split + " lost reservation");
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java
index f6dca25..5ab3ba9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java
@@ -19,15 +19,15 @@
 package org.apache.giraph.worker;
 
 import java.io.IOException;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.VertexEdgeCount;
 import org.apache.giraph.io.GiraphInputFormat;
 import org.apache.giraph.io.MappingInputFormat;
 import org.apache.giraph.io.MappingReader;
 import org.apache.giraph.mapping.MappingEntry;
 import org.apache.giraph.mapping.MappingStore;
-import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.giraph.io.InputType;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -46,7 +46,7 @@ import org.apache.hadoop.mapreduce.Mapper;
 @SuppressWarnings("unchecked")
 public class MappingInputSplitsCallable<I extends WritableComparable,
   V extends Writable, E extends Writable, B extends Writable>
-  extends FullInputSplitCallable<I, V, E> {
+  extends InputSplitsCallable<I, V, E> {
   /** User supplied mappingInputFormat */
   private final MappingInputFormat<I, V, E, B> mappingInputFormat;
   /** Link to bspServiceWorker */
@@ -56,23 +56,18 @@ public class MappingInputSplitsCallable<I extends WritableComparable,
    * Constructor
    *
    * @param mappingInputFormat mappingInputFormat
-   * @param splitOrganizer Input splits organizer
    * @param context Context
    * @param configuration Configuration
-   * @param zooKeeperExt Handle to ZooKeeperExt
-   * @param currentIndex Atomic Integer to get splitPath from list
    * @param bspServiceWorker bsp service worker
+   * @param splitsHandler Splits handler
    */
   public MappingInputSplitsCallable(
       MappingInputFormat<I, V, E, B> mappingInputFormat,
-      InputSplitPathOrganizer splitOrganizer,
       Mapper<?, ?, ?, ?>.Context context,
       ImmutableClassesGiraphConfiguration<I, V, E> configuration,
-      ZooKeeperExt zooKeeperExt,
-      AtomicInteger currentIndex,
-      BspServiceWorker<I, V, E> bspServiceWorker) {
-    super(splitOrganizer, context,
-      configuration, zooKeeperExt, currentIndex);
+      BspServiceWorker<I, V, E> bspServiceWorker,
+      WorkerInputSplitsHandler splitsHandler) {
+    super(context, configuration, bspServiceWorker, splitsHandler);
     this.mappingInputFormat = mappingInputFormat;
     this.bspServiceWorker = bspServiceWorker;
   }
@@ -83,7 +78,12 @@ public class MappingInputSplitsCallable<I extends WritableComparable,
   }
 
   @Override
-  protected Integer readInputSplit(InputSplit inputSplit)
+  public InputType getInputType() {
+    return InputType.MAPPING;
+  }
+
+  @Override
+  protected VertexEdgeCount readInputSplit(InputSplit inputSplit)
     throws IOException, InterruptedException {
     MappingReader<I, V, E, B> mappingReader =
         mappingInputFormat.createMappingReader(inputSplit, context);
@@ -104,6 +104,6 @@ public class MappingInputSplitsCallable<I extends WritableComparable,
       entriesLoaded += 1;
       mappingStore.addEntry(entry.getVertexId(), entry.getMappingTarget());
     }
-    return entriesLoaded;
+    return new VertexEdgeCount(0, 0, entriesLoaded);
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallableFactory.java
index 21a981e..6cf702a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallableFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallableFactory.java
@@ -19,15 +19,13 @@
 package org.apache.giraph.worker;
 
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.VertexEdgeCount;
 import org.apache.giraph.io.MappingInputFormat;
 import org.apache.giraph.utils.CallableFactory;
-import org.apache.giraph.zk.ZooKeeperExt;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
 
-import java.util.concurrent.atomic.AtomicInteger;
-
 /**
  * Factory for {@link org.apache.giraph.worker.MappingInputSplitsCallable}s.
  *
@@ -38,59 +36,47 @@ import java.util.concurrent.atomic.AtomicInteger;
  */
 public class MappingInputSplitsCallableFactory<I extends WritableComparable,
   V extends Writable, E extends Writable, B extends Writable>
-  implements CallableFactory<Integer> {
+  implements CallableFactory<VertexEdgeCount> {
   /** Mapping input format */
   private final MappingInputFormat<I, V, E, B> mappingInputFormat;
-  /** Input split organizer */
-  private final InputSplitPathOrganizer splitOrganizer;
   /** Mapper context. */
   private final Mapper<?, ?, ?, ?>.Context context;
   /** Configuration. */
   private final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
   /** {@link BspServiceWorker} we're running on. */
   private final BspServiceWorker<I, V, E> bspServiceWorker;
-  /** {@link ZooKeeperExt} for this worker. */
-  private final ZooKeeperExt zooKeeperExt;
-  /** Current position in the path list */
-  private final AtomicInteger currentIndex;
-
+  /** Handler for input splits */
+  private final WorkerInputSplitsHandler splitsHandler;
 
   /**
    * Constructor.
    *
    * @param mappingInputFormat Mapping input format
-   * @param splitOrganizer Input split organizer
    * @param context Mapper context
    * @param configuration Configuration
    * @param bspServiceWorker Calling {@link BspServiceWorker}
-   * @param zooKeeperExt {@link org.apache.giraph.zk.ZooKeeperExt}
-   *                     for this worker
+   * @param splitsHandler Splits handler
    */
   public MappingInputSplitsCallableFactory(
       MappingInputFormat<I, V, E, B> mappingInputFormat,
-      InputSplitPathOrganizer splitOrganizer,
       Mapper<?, ?, ?, ?>.Context context,
       ImmutableClassesGiraphConfiguration<I, V, E> configuration,
       BspServiceWorker<I, V, E> bspServiceWorker,
-      ZooKeeperExt zooKeeperExt) {
+      WorkerInputSplitsHandler splitsHandler) {
     this.mappingInputFormat = mappingInputFormat;
-    this.splitOrganizer = splitOrganizer;
     this.context = context;
     this.configuration = configuration;
     this.bspServiceWorker = bspServiceWorker;
-    this.zooKeeperExt = zooKeeperExt;
-    this.currentIndex = new AtomicInteger(0);
+    this.splitsHandler = splitsHandler;
   }
 
   @Override
-  public FullInputSplitCallable<I, V, E> newCallable(int threadId) {
+  public InputSplitsCallable<I, V, E> newCallable(int threadId) {
     return new MappingInputSplitsCallable<>(
         mappingInputFormat,
-        splitOrganizer,
         context,
         configuration,
-        zooKeeperExt,
-        currentIndex,
-        bspServiceWorker);
+        bspServiceWorker,
+        splitsHandler);
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
index 00a2781..540a6b4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
@@ -30,10 +30,10 @@ import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexReader;
 import org.apache.giraph.io.filters.VertexInputFilter;
 import org.apache.giraph.mapping.translate.TranslateEdge;
+import org.apache.giraph.io.InputType;
 import org.apache.giraph.partition.PartitionOwner;
 import org.apache.giraph.utils.LoggerUtils;
 import org.apache.giraph.utils.MemoryUtils;
-import org.apache.giraph.zk.ZooKeeperExt;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -99,17 +99,14 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
    * @param configuration Configuration
    * @param bspServiceWorker service worker
    * @param splitsHandler Handler for input splits
-   * @param zooKeeperExt Handle to ZooKeeperExt
    */
   public VertexInputSplitsCallable(
       VertexInputFormat<I, V, E> vertexInputFormat,
       Mapper<?, ?, ?, ?>.Context context,
       ImmutableClassesGiraphConfiguration<I, V, E> configuration,
       BspServiceWorker<I, V, E> bspServiceWorker,
-      InputSplitsHandler splitsHandler,
-      ZooKeeperExt zooKeeperExt)  {
-    super(context, configuration, bspServiceWorker, splitsHandler,
-        zooKeeperExt);
+      WorkerInputSplitsHandler splitsHandler)  {
+    super(context, configuration, bspServiceWorker, splitsHandler);
     this.vertexInputFormat = vertexInputFormat;
 
     inputSplitMaxVertices = configuration.getInputSplitMaxVertices();
@@ -136,6 +133,11 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
     return vertexInputFormat;
   }
 
+  @Override
+  public InputType getInputType() {
+    return InputType.VERTEX;
+  }
+
   /**
    * Read vertices from input split.  If testing, the user may request a
    * maximum number of vertices to be read from an input split.
@@ -274,7 +276,7 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
     WorkerProgress.get().incrementVertexInputSplitsLoaded();
 
     return new VertexEdgeCount(inputSplitVerticesLoaded,
-        inputSplitEdgesLoaded + edgesSinceLastUpdate);
+        inputSplitEdgesLoaded + edgesSinceLastUpdate, 0);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
index c9893d2..7aef3a7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
@@ -22,7 +22,6 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.VertexEdgeCount;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.utils.CallableFactory;
-import org.apache.giraph.zk.ZooKeeperExt;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -46,9 +45,7 @@ public class VertexInputSplitsCallableFactory<I extends WritableComparable,
   /** {@link BspServiceWorker} we're running on. */
   private final BspServiceWorker<I, V, E> bspServiceWorker;
   /** Handler for input splits */
-  private final InputSplitsHandler splitsHandler;
-  /** {@link ZooKeeperExt} for this worker. */
-  private final ZooKeeperExt zooKeeperExt;
+  private final WorkerInputSplitsHandler splitsHandler;
 
   /**
    * Constructor.
@@ -58,20 +55,17 @@ public class VertexInputSplitsCallableFactory<I extends WritableComparable,
    * @param configuration Configuration
    * @param bspServiceWorker Calling {@link BspServiceWorker}
    * @param splitsHandler Handler for input splits
-   * @param zooKeeperExt {@link ZooKeeperExt} for this worker
    */
   public VertexInputSplitsCallableFactory(
       VertexInputFormat<I, V, E> vertexInputFormat,
       Mapper<?, ?, ?, ?>.Context context,
       ImmutableClassesGiraphConfiguration<I, V, E> configuration,
       BspServiceWorker<I, V, E> bspServiceWorker,
-      InputSplitsHandler splitsHandler,
-      ZooKeeperExt zooKeeperExt) {
+      WorkerInputSplitsHandler splitsHandler) {
     this.vertexInputFormat = vertexInputFormat;
     this.context = context;
     this.configuration = configuration;
     this.bspServiceWorker = bspServiceWorker;
-    this.zooKeeperExt = zooKeeperExt;
     this.splitsHandler = splitsHandler;
   }
 
@@ -82,7 +76,6 @@ public class VertexInputSplitsCallableFactory<I extends WritableComparable,
         context,
         configuration,
         bspServiceWorker,
-        splitsHandler,
-        zooKeeperExt);
+        splitsHandler);
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/WorkerInputSplitsHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerInputSplitsHandler.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerInputSplitsHandler.java
new file mode 100644
index 0000000..0dc42b3
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerInputSplitsHandler.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+import org.apache.giraph.comm.WorkerClient;
+import org.apache.giraph.comm.requests.AskForInputSplitRequest;
+import org.apache.giraph.io.InputType;
+
+import java.util.EnumMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Requests splits from master and keeps track of them
+ */
+public class WorkerInputSplitsHandler {
+  /** Worker info of this worker */
+  private final WorkerInfo workerInfo;
+  /** Task id of master */
+  private final int masterTaskId;
+  /** Worker client, used for communication */
+  private final WorkerClient workerClient;
+  /** Map with currently available splits received from master */
+  private final Map<InputType, BlockingQueue<byte[]>> availableInputSplits;
+
+  /**
+   * Constructor
+   *
+   * @param workerInfo   Worker info of this worker
+   * @param masterTaskId Task id of master
+   * @param workerClient Worker client, used for communication
+   */
+  public WorkerInputSplitsHandler(WorkerInfo workerInfo, int masterTaskId,
+      WorkerClient workerClient) {
+    this.workerInfo = workerInfo;
+    this.masterTaskId = masterTaskId;
+    this.workerClient = workerClient;
+    availableInputSplits = new EnumMap<>(InputType.class);
+    for (InputType inputType : InputType.values()) {
+      availableInputSplits.put(
+          inputType, new LinkedBlockingQueue<byte[]>());
+    }
+  }
+
+  /**
+   * Called when an input split has been received from master, adding it to
+   * the map
+   *
+   * @param splitType            Type of split
+   * @param serializedInputSplit Split
+   */
+  public void receivedInputSplit(InputType splitType,
+      byte[] serializedInputSplit) {
+    try {
+      availableInputSplits.get(splitType).put(serializedInputSplit);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException("Interrupted", e);
+    }
+  }
+
+  /**
+   * Try to reserve an InputSplit for loading.  While InputSplits exists that
+   * are not finished, wait until they are.
+   *
+   * NOTE: iterations on the InputSplit list only halt for each worker when it
+   * has scanned the entire list once and found every split marked RESERVED.
+   * When a worker fails, its Ephemeral RESERVED znodes will disappear,
+   * allowing other iterating workers to claim it's previously read splits.
+   * Only when the last worker left iterating on the list fails can a danger
+   * of data loss occur. Since worker failure in INPUT_SUPERSTEP currently
+   * causes job failure, this is OK. As the failure model evolves, this
+   * behavior might need to change. We could add watches on
+   * inputSplitFinishedNodes and stop iterating only when all these nodes
+   * have been created.
+   *
+   * @param splitType Type of split
+   * @return reserved InputSplit or null if no unfinished InputSplits exist
+   */
+  public byte[] reserveInputSplit(InputType splitType) {
+    // Send request
+    workerClient.sendWritableRequest(masterTaskId,
+        new AskForInputSplitRequest(splitType, workerInfo.getTaskId()));
+    try {
+      // Wait for some split to become available
+      byte[] serializedInputSplit = availableInputSplits.get(splitType).take();
+      return serializedInputSplit.length == 0 ? null : serializedInputSplit;
+    } catch (InterruptedException e) {
+      throw new IllegalStateException("Interrupted", e);
+    }
+  }
+}


[3/3] git commit: updated refs/heads/trunk to 5b0cd0e

Posted by ma...@apache.org.
GIRAPH-1033: Remove zookeeper from input splits handling

Summary: Currently we use zookeeper for handling input splits, by having each worker checking each split, and when a lot of splits are used this becomes very slow. We should have master coordinate input splits allocation instead, making the complexity proportional to #splits instead of #workers*#splits. Master holds all the splits and worker send requests to him asking for splits when they need them.

Test Plan: Run a job with 200 machines and 200k small splits - without this change input superstep takes 30 minutes, and with it less than 2 minutes. Also verified correctness on sample job. mvn clean verify passes.

Differential Revision: https://reviews.facebook.net/D48531


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/5b0cd0e0
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/5b0cd0e0
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/5b0cd0e0

Branch: refs/heads/trunk
Commit: 5b0cd0e0a2ddbf722b6140d28474295c8376e561
Parents: 47da751
Author: Maja Kabiljo <ma...@fb.com>
Authored: Mon Oct 12 10:56:39 2015 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Mon Oct 19 10:13:43 2015 -0700

----------------------------------------------------------------------
 .../java/org/apache/giraph/bsp/BspService.java  | 334 +++----------------
 .../giraph/bsp/CentralizedServiceMaster.java    |   4 +-
 .../giraph/bsp/CentralizedServiceWorker.java    |   8 +
 .../org/apache/giraph/comm/MasterClient.java    |   9 +
 .../giraph/comm/netty/NettyMasterClient.java    |   6 +
 .../handler/MasterRequestServerHandler.java     |  22 +-
 .../comm/requests/AskForInputSplitRequest.java  |  76 +++++
 .../giraph/comm/requests/MasterRequest.java     |   6 +-
 .../requests/ReplyWithInputSplitRequest.java    |  81 +++++
 .../giraph/comm/requests/RequestType.java       |   6 +-
 .../requests/SendReducedToMasterRequest.java    |   6 +-
 .../giraph/graph/FinishedSuperstepStats.java    |   2 +-
 .../apache/giraph/graph/InputSplitEvents.java   |  85 -----
 .../apache/giraph/graph/InputSplitPaths.java    |  88 -----
 .../apache/giraph/graph/VertexEdgeCount.java    |  20 +-
 .../java/org/apache/giraph/io/InputType.java    |  31 ++
 .../apache/giraph/master/BspServiceMaster.java  | 281 +++-------------
 .../giraph/master/MasterAggregatorHandler.java  |   2 +-
 .../giraph/master/MasterGlobalCommHandler.java  |  76 +++++
 .../giraph/master/MasterGlobalCommUsage.java    |  49 +--
 .../MasterGlobalCommUsageAggregators.java       |  69 ++++
 .../input/BasicInputSplitsMasterOrganizer.java  |  46 +++
 .../input/InputSplitsMasterOrganizer.java       |  32 ++
 ...LocalityAwareInputSplitsMasterOrganizer.java | 125 +++++++
 .../MappingInputSplitsMasterOrganizer.java      |  64 ++++
 .../master/input/MasterInputSplitsHandler.java  | 140 ++++++++
 .../giraph/master/input/package-info.java       |  21 ++
 .../apache/giraph/partition/PartitionUtils.java |   2 +-
 .../apache/giraph/worker/BspServiceWorker.java  | 187 ++---------
 .../giraph/worker/EdgeInputSplitsCallable.java  |  16 +-
 .../worker/EdgeInputSplitsCallableFactory.java  |  13 +-
 .../giraph/worker/FullInputSplitCallable.java   | 210 ------------
 .../giraph/worker/InputSplitPathOrganizer.java  | 142 --------
 .../giraph/worker/InputSplitsCallable.java      |  77 ++---
 .../giraph/worker/InputSplitsHandler.java       | 208 ------------
 .../worker/MappingInputSplitsCallable.java      |  28 +-
 .../MappingInputSplitsCallableFactory.java      |  34 +-
 .../worker/VertexInputSplitsCallable.java       |  16 +-
 .../VertexInputSplitsCallableFactory.java       |  13 +-
 .../giraph/worker/WorkerInputSplitsHandler.java | 108 ++++++
 .../java/org/apache/giraph/TestBspBasic.java    |  69 ++--
 41 files changed, 1164 insertions(+), 1648 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
index 0a5a7ba..15e4dbe 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
@@ -21,8 +21,6 @@ package org.apache.giraph.bsp;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.GraphTaskManager;
-import org.apache.giraph.graph.InputSplitEvents;
-import org.apache.giraph.graph.InputSplitPaths;
 import org.apache.giraph.job.JobProgressTracker;
 import org.apache.giraph.partition.GraphPartitionerFactory;
 import org.apache.giraph.utils.CheckpointingUtils;
@@ -77,59 +75,13 @@ public abstract class BspService<I extends WritableComparable,
   /** Master job state znode above base dir */
   public static final String MASTER_JOB_STATE_NODE = "/_masterJobState";
 
-  /** Mapping input split directory about base dir */
-  public static final String MAPPING_INPUT_SPLIT_DIR = "/_mappingInputSplitDir";
-  /** Mapping input split done directory about base dir */
-  public static final String MAPPING_INPUT_SPLIT_DONE_DIR =
-      "/_mappingInputSplitDoneDir";
-  /** Denotes a reserved mapping input split */
-  public static final String MAPPING_INPUT_SPLIT_RESERVED_NODE =
-      "/_mappingInputSplitReserved";
-  /** Denotes a finished mapping input split */
-  public static final String MAPPING_INPUT_SPLIT_FINISHED_NODE =
-      "/_mappingInputSplitFinished";
-  /** Denotes that all the mapping input splits are are ready for consumption */
-  public static final String MAPPING_INPUT_SPLITS_ALL_READY_NODE =
-      "/_mappingInputSplitsAllReady";
-  /** Denotes that all the mapping input splits are done. */
-  public static final String MAPPING_INPUT_SPLITS_ALL_DONE_NODE =
-      "/_mappingInputSplitsAllDone";
-
-  /** Vertex input split directory about base dir */
-  public static final String VERTEX_INPUT_SPLIT_DIR = "/_vertexInputSplitDir";
-  /** Vertex input split done directory about base dir */
-  public static final String VERTEX_INPUT_SPLIT_DONE_DIR =
-      "/_vertexInputSplitDoneDir";
-  /** Denotes a reserved vertex input split */
-  public static final String VERTEX_INPUT_SPLIT_RESERVED_NODE =
-      "/_vertexInputSplitReserved";
-  /** Denotes a finished vertex input split */
-  public static final String VERTEX_INPUT_SPLIT_FINISHED_NODE =
-      "/_vertexInputSplitFinished";
-  /** Denotes that all the vertex input splits are are ready for consumption */
-  public static final String VERTEX_INPUT_SPLITS_ALL_READY_NODE =
-      "/_vertexInputSplitsAllReady";
-  /** Denotes that all the vertex input splits are done. */
-  public static final String VERTEX_INPUT_SPLITS_ALL_DONE_NODE =
-      "/_vertexInputSplitsAllDone";
-
-  /** Edge input split directory about base dir */
-  public static final String EDGE_INPUT_SPLIT_DIR = "/_edgeInputSplitDir";
-  /** Edge input split done directory about base dir */
-  public static final String EDGE_INPUT_SPLIT_DONE_DIR =
-      "/_edgeInputSplitDoneDir";
-  /** Denotes a reserved edge input split */
-  public static final String EDGE_INPUT_SPLIT_RESERVED_NODE =
-      "/_edgeInputSplitReserved";
-  /** Denotes a finished edge input split */
-  public static final String EDGE_INPUT_SPLIT_FINISHED_NODE =
-      "/_edgeInputSplitFinished";
-  /** Denotes that all the edge input splits are are ready for consumption */
-  public static final String EDGE_INPUT_SPLITS_ALL_READY_NODE =
-      "/_edgeInputSplitsAllReady";
-  /** Denotes that all the edge input splits are done. */
-  public static final String EDGE_INPUT_SPLITS_ALL_DONE_NODE =
-      "/_edgeInputSplitsAllDone";
+  /** Input splits worker done directory */
+  public static final String INPUT_SPLITS_WORKER_DONE_DIR =
+      "/_inputSplitsWorkerDoneDir";
+  /** Input splits all done node*/
+  public static final String INPUT_SPLITS_ALL_DONE_NODE =
+      "/_inputSplitsAllDone";
+
   /** Directory of attempts of this application */
   public static final String APPLICATION_ATTEMPTS_DIR =
       "/_applicationAttemptsDir";
@@ -192,18 +144,10 @@ public abstract class BspService<I extends WritableComparable,
   protected final String basePath;
   /** Path to the job state determined by the master (informative only) */
   protected final String masterJobStatePath;
-  /** ZooKeeper paths for mapping input splits. */
-  protected final InputSplitPaths mappingInputSplitsPaths;
-  /** ZooKeeper paths for vertex input splits. */
-  protected final InputSplitPaths vertexInputSplitsPaths;
-  /** ZooKeeper paths for edge input splits. */
-  protected final InputSplitPaths edgeInputSplitsPaths;
-  /** Mapping input splits events */
-  protected final InputSplitEvents mappingInputSplitsEvents;
-  /** Vertex input split events. */
-  protected final InputSplitEvents vertexInputSplitsEvents;
-  /** Edge input split events. */
-  protected final InputSplitEvents edgeInputSplitsEvents;
+  /** Input splits worker done directory */
+  protected final String inputSplitsWorkerDonePath;
+  /** Input splits all done node */
+  protected final String inputSplitsAllDonePath;
   /** Path to the application attempts) */
   protected final String applicationAttemptsPath;
   /** Path to the cleaned up notifications */
@@ -226,6 +170,10 @@ public abstract class BspService<I extends WritableComparable,
   private final BspEvent addressesAndPartitionsReadyChanged;
   /** Application attempt changed */
   private final BspEvent applicationAttemptChanged;
+  /** Input splits worker done */
+  private final BspEvent inputSplitsWorkerDoneEvent;
+  /** Input splits all done */
+  private final BspEvent inputSplitsAllDoneEvent;
   /** Superstep finished synchronization */
   private final BspEvent superstepFinished;
   /** Master election changed for any waited on attempt */
@@ -269,23 +217,20 @@ public abstract class BspService<I extends WritableComparable,
   public BspService(
       Mapper<?, ?, ?, ?>.Context context,
       GraphTaskManager<I, V, E> graphTaskManager) {
-    this.mappingInputSplitsEvents = new InputSplitEvents(context);
-    this.vertexInputSplitsEvents = new InputSplitEvents(context);
-    this.edgeInputSplitsEvents = new InputSplitEvents(context);
     this.connectedEvent = new PredicateLock(context);
     this.workerHealthRegistrationChanged = new PredicateLock(context);
     this.addressesAndPartitionsReadyChanged = new PredicateLock(context);
     this.applicationAttemptChanged = new PredicateLock(context);
+    this.inputSplitsWorkerDoneEvent = new PredicateLock(context);
+    this.inputSplitsAllDoneEvent = new PredicateLock(context);
     this.superstepFinished = new PredicateLock(context);
     this.masterElectionChildrenChanged = new PredicateLock(context);
     this.cleanedUpChildrenChanged = new PredicateLock(context);
 
     registerBspEvent(connectedEvent);
     registerBspEvent(workerHealthRegistrationChanged);
-    registerBspEvent(vertexInputSplitsEvents.getAllReadyChanged());
-    registerBspEvent(vertexInputSplitsEvents.getStateChanged());
-    registerBspEvent(edgeInputSplitsEvents.getAllReadyChanged());
-    registerBspEvent(edgeInputSplitsEvents.getStateChanged());
+    registerBspEvent(inputSplitsWorkerDoneEvent);
+    registerBspEvent(inputSplitsAllDoneEvent);
     registerBspEvent(addressesAndPartitionsReadyChanged);
     registerBspEvent(applicationAttemptChanged);
     registerBspEvent(superstepFinished);
@@ -311,16 +256,8 @@ public abstract class BspService<I extends WritableComparable,
     getContext().getCounter(GiraphConstants.ZOOKEEPER_BASE_PATH_COUNTER_GROUP,
         basePath);
     masterJobStatePath = basePath + MASTER_JOB_STATE_NODE;
-    mappingInputSplitsPaths = new InputSplitPaths(basePath,
-        MAPPING_INPUT_SPLIT_DIR, MAPPING_INPUT_SPLIT_DONE_DIR,
-        MAPPING_INPUT_SPLITS_ALL_READY_NODE,
-        MAPPING_INPUT_SPLITS_ALL_DONE_NODE);
-    vertexInputSplitsPaths = new InputSplitPaths(basePath,
-        VERTEX_INPUT_SPLIT_DIR, VERTEX_INPUT_SPLIT_DONE_DIR,
-        VERTEX_INPUT_SPLITS_ALL_READY_NODE, VERTEX_INPUT_SPLITS_ALL_DONE_NODE);
-    edgeInputSplitsPaths = new InputSplitPaths(basePath,
-        EDGE_INPUT_SPLIT_DIR, EDGE_INPUT_SPLIT_DONE_DIR,
-        EDGE_INPUT_SPLITS_ALL_READY_NODE, EDGE_INPUT_SPLITS_ALL_DONE_NODE);
+    inputSplitsWorkerDonePath = basePath + INPUT_SPLITS_WORKER_DONE_DIR;
+    inputSplitsAllDonePath = basePath + INPUT_SPLITS_ALL_DONE_NODE;
     applicationAttemptsPath = basePath + APPLICATION_ATTEMPTS_DIR;
     cleanedUpPath = basePath + CLEANED_UP_DIR;
 
@@ -433,24 +370,6 @@ public abstract class BspService<I extends WritableComparable,
   }
 
   /**
-   * Get the input split events for edge input.
-   *
-   * @return InputSplitEvents for edge input.
-   */
-  public InputSplitEvents getEdgeInputSplitsEvents() {
-    return edgeInputSplitsEvents;
-  }
-
-  /**
-   * Get the input split events for vertex input.
-   *
-   * @return InputSplitEvents for vertex input.
-   */
-  public InputSplitEvents getVertexInputSplitsEvents() {
-    return vertexInputSplitsEvents;
-  }
-
-  /**
    * Generate the worker information "healthy" directory path for a
    * superstep
    *
@@ -655,6 +574,14 @@ public abstract class BspService<I extends WritableComparable,
     return applicationAttemptChanged;
   }
 
+  public final BspEvent getInputSplitsWorkerDoneEvent() {
+    return inputSplitsWorkerDoneEvent;
+  }
+
+  public final BspEvent getInputSplitsAllDoneEvent() {
+    return inputSplitsAllDoneEvent;
+  }
+
   public final BspEvent getSuperstepFinishedEvent() {
     return superstepFinished;
   }
@@ -952,9 +879,20 @@ public abstract class BspService<I extends WritableComparable,
       }
       workerHealthRegistrationChanged.signal();
       eventProcessed = true;
-    } else if (processMappingEvent(event) || processVertexEvent(event) ||
-        processEdgeEvent(event)) {
-      return;
+    } else if (event.getPath().contains(INPUT_SPLITS_ALL_DONE_NODE) &&
+        event.getType() == EventType.NodeCreated) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("process: all input splits done");
+      }
+      inputSplitsAllDoneEvent.signal();
+      eventProcessed = true;
+    } else if (event.getPath().contains(INPUT_SPLITS_WORKER_DONE_DIR) &&
+        event.getType() == EventType.NodeChildrenChanged) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("process: worker done reading input splits");
+      }
+      inputSplitsWorkerDoneEvent.signal();
+      eventProcessed = true;
     } else if (event.getPath().contains(ADDRESSES_AND_PARTITIONS_DIR) &&
         event.getType() == EventType.NodeCreated) {
       if (LOG.isInfoEnabled()) {
@@ -1001,192 +939,6 @@ public abstract class BspService<I extends WritableComparable,
   }
 
   /**
-   * Process WatchedEvent for Mapping Inputsplits
-   *
-   * @param event watched event
-   * @return true if event processed
-   */
-  public final boolean processMappingEvent(WatchedEvent event) {
-    boolean eventProcessed = false;
-    if (event.getPath().equals(
-        mappingInputSplitsPaths.getAllReadyPath()) &&
-        (event.getType() == EventType.NodeCreated)) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("process: inputSplitsReadyChanged " +
-            "(input splits ready)");
-      }
-      mappingInputSplitsEvents.getAllReadyChanged().signal();
-      eventProcessed = true;
-    } else if (event.getPath().endsWith(MAPPING_INPUT_SPLIT_RESERVED_NODE) &&
-        (event.getType() == EventType.NodeCreated)) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("process: mappingInputSplitsStateChanged " +
-            "(made a reservation)");
-      }
-      mappingInputSplitsEvents.getStateChanged().signal();
-      eventProcessed = true;
-    } else if (event.getPath().endsWith(MAPPING_INPUT_SPLIT_RESERVED_NODE) &&
-        (event.getType() == EventType.NodeDeleted)) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("process: mappingInputSplitsStateChanged " +
-            "(lost a reservation)");
-      }
-      mappingInputSplitsEvents.getStateChanged().signal();
-      eventProcessed = true;
-    } else if (event.getPath().endsWith(MAPPING_INPUT_SPLIT_FINISHED_NODE) &&
-        (event.getType() == EventType.NodeCreated)) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("process: mappingInputSplitsStateChanged " +
-            "(finished inputsplit)");
-      }
-      mappingInputSplitsEvents.getStateChanged().signal();
-      eventProcessed = true;
-    } else if (event.getPath().endsWith(MAPPING_INPUT_SPLIT_DONE_DIR) &&
-        (event.getType() == EventType.NodeChildrenChanged)) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("process: mappingInputSplitsDoneStateChanged " +
-            "(worker finished sending)");
-      }
-      mappingInputSplitsEvents.getDoneStateChanged().signal();
-      eventProcessed = true;
-    } else if (event.getPath().equals(
-        mappingInputSplitsPaths.getAllDonePath()) &&
-        (event.getType() == EventType.NodeCreated)) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("process: mappingInputSplitsAllDoneChanged " +
-            "(all entries sent from input splits)");
-      }
-      mappingInputSplitsEvents.getAllDoneChanged().signal();
-      eventProcessed = true;
-    }
-    return eventProcessed;
-  }
-
-  /**
-   * Process WatchedEvent for Vertex Inputsplits
-   *
-   * @param event watched event
-   * @return true if event processed
-   */
-  public final boolean processVertexEvent(WatchedEvent event) {
-    boolean eventProcessed = false;
-    if (event.getPath().equals(
-        vertexInputSplitsPaths.getAllReadyPath()) &&
-        (event.getType() == EventType.NodeCreated)) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("process: inputSplitsReadyChanged " +
-            "(input splits ready)");
-      }
-      vertexInputSplitsEvents.getAllReadyChanged().signal();
-      eventProcessed = true;
-    } else if (event.getPath().endsWith(VERTEX_INPUT_SPLIT_RESERVED_NODE) &&
-        (event.getType() == EventType.NodeCreated)) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("process: vertexInputSplitsStateChanged " +
-            "(made a reservation)");
-      }
-      vertexInputSplitsEvents.getStateChanged().signal();
-      eventProcessed = true;
-    } else if (event.getPath().endsWith(VERTEX_INPUT_SPLIT_RESERVED_NODE) &&
-        (event.getType() == EventType.NodeDeleted)) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("process: vertexInputSplitsStateChanged " +
-            "(lost a reservation)");
-      }
-      vertexInputSplitsEvents.getStateChanged().signal();
-      eventProcessed = true;
-    } else if (event.getPath().endsWith(VERTEX_INPUT_SPLIT_FINISHED_NODE) &&
-        (event.getType() == EventType.NodeCreated)) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("process: vertexInputSplitsStateChanged " +
-            "(finished inputsplit)");
-      }
-      vertexInputSplitsEvents.getStateChanged().signal();
-      eventProcessed = true;
-    } else if (event.getPath().endsWith(VERTEX_INPUT_SPLIT_DONE_DIR) &&
-        (event.getType() == EventType.NodeChildrenChanged)) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("process: vertexInputSplitsDoneStateChanged " +
-            "(worker finished sending)");
-      }
-      vertexInputSplitsEvents.getDoneStateChanged().signal();
-      eventProcessed = true;
-    }  else if (event.getPath().equals(
-        vertexInputSplitsPaths.getAllDonePath()) &&
-        (event.getType() == EventType.NodeCreated)) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("process: vertexInputSplitsAllDoneChanged " +
-            "(all vertices sent from input splits)");
-      }
-      vertexInputSplitsEvents.getAllDoneChanged().signal();
-      eventProcessed = true;
-    }
-    return eventProcessed;
-  }
-
-  /**
-   * Process WatchedEvent for Edge Inputsplits
-   *
-   * @param event watched event
-   * @return true if event processed
-   */
-  public final boolean processEdgeEvent(WatchedEvent event) {
-    boolean eventProcessed = false;
-    if (event.getPath().equals(
-        edgeInputSplitsPaths.getAllReadyPath()) &&
-        (event.getType() == EventType.NodeCreated)) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("process: edgeInputSplitsReadyChanged " +
-            "(input splits ready)");
-      }
-      edgeInputSplitsEvents.getAllReadyChanged().signal();
-      eventProcessed = true;
-    } else if (event.getPath().endsWith(EDGE_INPUT_SPLIT_RESERVED_NODE) &&
-        (event.getType() == EventType.NodeCreated)) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("process: edgeInputSplitsStateChanged " +
-            "(made a reservation)");
-      }
-      edgeInputSplitsEvents.getStateChanged().signal();
-      eventProcessed = true;
-    } else if (event.getPath().endsWith(EDGE_INPUT_SPLIT_RESERVED_NODE) &&
-        (event.getType() == EventType.NodeDeleted)) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("process: edgeInputSplitsStateChanged " +
-            "(lost a reservation)");
-      }
-      edgeInputSplitsEvents.getStateChanged().signal();
-      eventProcessed = true;
-    } else if (event.getPath().endsWith(EDGE_INPUT_SPLIT_FINISHED_NODE) &&
-        (event.getType() == EventType.NodeCreated)) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("process: edgeInputSplitsStateChanged " +
-            "(finished inputsplit)");
-      }
-      edgeInputSplitsEvents.getStateChanged().signal();
-      eventProcessed = true;
-    } else if (event.getPath().endsWith(EDGE_INPUT_SPLIT_DONE_DIR) &&
-        (event.getType() == EventType.NodeChildrenChanged)) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("process: edgeInputSplitsDoneStateChanged " +
-            "(worker finished sending)");
-      }
-      edgeInputSplitsEvents.getDoneStateChanged().signal();
-      eventProcessed = true;
-    } else if (event.getPath().equals(
-        edgeInputSplitsPaths.getAllDonePath()) &&
-        (event.getType() == EventType.NodeCreated)) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("process: edgeInputSplitsAllDoneChanged " +
-            "(all edges sent from input splits)");
-      }
-      edgeInputSplitsEvents.getAllDoneChanged().signal();
-      eventProcessed = true;
-    }
-    return eventProcessed;
-  }
-
-  /**
    * Get the last saved superstep.
    *
    * @return Last good superstep number

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
index 1e8d519..f05a79d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
@@ -22,8 +22,8 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.giraph.master.AggregatorToGlobalCommTranslation;
-import org.apache.giraph.master.MasterAggregatorHandler;
 import org.apache.giraph.master.MasterCompute;
+import org.apache.giraph.master.MasterGlobalCommHandler;
 import org.apache.giraph.master.MasterInfo;
 import org.apache.giraph.worker.WorkerInfo;
 import org.apache.hadoop.io.Writable;
@@ -144,7 +144,7 @@ public interface CentralizedServiceMaster<I extends WritableComparable,
    *
    * @return Global communication handler
    */
-  MasterAggregatorHandler getGlobalCommHandler();
+  MasterGlobalCommHandler getGlobalCommHandler();
 
   /**
    * Handler for aggregators to reduce/broadcast translation

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
index f6d77d0..94cd265 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
@@ -30,6 +30,7 @@ import org.apache.giraph.metrics.GiraphTimerContext;
 import org.apache.giraph.partition.PartitionOwner;
 import org.apache.giraph.partition.PartitionStats;
 import org.apache.giraph.partition.PartitionStore;
+import org.apache.giraph.worker.WorkerInputSplitsHandler;
 import org.apache.giraph.worker.WorkerAggregatorHandler;
 import org.apache.giraph.worker.WorkerContext;
 import org.apache.giraph.worker.WorkerInfo;
@@ -252,4 +253,11 @@ public interface CentralizedServiceWorker<I extends WritableComparable,
    * @return number of partitions owned
    */
   int getNumPartitionsOwned();
+
+  /**
+   * Get input splits handler used during input
+   *
+   * @return Input splits handler
+   */
+  WorkerInputSplitsHandler getInputSplitsHandler();
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java
index aea93fd..244dd74 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java
@@ -20,6 +20,7 @@ package org.apache.giraph.comm;
 
 import java.io.IOException;
 
+import org.apache.giraph.comm.requests.WritableRequest;
 import org.apache.hadoop.io.Writable;
 
 /**
@@ -54,6 +55,14 @@ public interface MasterClient {
   void flush();
 
   /**
+   * Send a request to a remote server (should be already connected)
+   *
+   * @param destTaskId Destination worker id
+   * @param request Request to send
+   */
+  void sendWritableRequest(int destTaskId, WritableRequest request);
+
+  /**
    * Closes all connections.
    */
   void closeConnections();

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
index e110782..9b348e8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
@@ -26,6 +26,7 @@ import org.apache.giraph.comm.MasterClient;
 import org.apache.giraph.comm.aggregators.AggregatorUtils;
 import org.apache.giraph.comm.aggregators.SendGlobalCommCache;
 import org.apache.giraph.comm.requests.SendAggregatorsToOwnerRequest;
+import org.apache.giraph.comm.requests.WritableRequest;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.worker.WorkerInfo;
 import org.apache.hadoop.io.Writable;
@@ -117,6 +118,11 @@ public class NettyMasterClient implements MasterClient {
   }
 
   @Override
+  public void sendWritableRequest(int destTaskId, WritableRequest request) {
+    nettyClient.sendWritableRequest(destTaskId, request);
+  }
+
+  @Override
   public void closeConnections() {
     nettyClient.stop();
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
index 02c72f7..9aa88ae 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
@@ -21,13 +21,13 @@ package org.apache.giraph.comm.netty.handler;
 import org.apache.giraph.comm.requests.MasterRequest;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.TaskInfo;
-import org.apache.giraph.master.MasterAggregatorHandler;
+import org.apache.giraph.master.MasterGlobalCommHandler;
 
 /** Handler for requests on master */
 public class MasterRequestServerHandler extends
     RequestServerHandler<MasterRequest> {
   /** Aggregator handler */
-  private final MasterAggregatorHandler aggregatorHandler;
+  private final MasterGlobalCommHandler commHandler;
 
   /**
    * Constructor
@@ -35,22 +35,22 @@ public class MasterRequestServerHandler extends
    * @param workerRequestReservedMap Worker request reservation map
    * @param conf                     Configuration
    * @param myTaskInfo               Current task info
-   * @param aggregatorHandler        Master aggregator handler
+   * @param commHandler              Master communication handler
    * @param exceptionHandler         Handles uncaught exceptions
    */
   public MasterRequestServerHandler(
       WorkerRequestReservedMap workerRequestReservedMap,
       ImmutableClassesGiraphConfiguration conf,
       TaskInfo myTaskInfo,
-      MasterAggregatorHandler aggregatorHandler,
+      MasterGlobalCommHandler commHandler,
       Thread.UncaughtExceptionHandler exceptionHandler) {
     super(workerRequestReservedMap, conf, myTaskInfo, exceptionHandler);
-    this.aggregatorHandler = aggregatorHandler;
+    this.commHandler = commHandler;
   }
 
   @Override
   public void processRequest(MasterRequest request) {
-    request.doRequest(aggregatorHandler);
+    request.doRequest(commHandler);
   }
 
   /**
@@ -58,15 +58,15 @@ public class MasterRequestServerHandler extends
    */
   public static class Factory implements RequestServerHandler.Factory {
     /** Master aggregator handler */
-    private final MasterAggregatorHandler aggregatorHandler;
+    private final MasterGlobalCommHandler commHandler;
 
     /**
      * Constructor
      *
-     * @param aggregatorHandler Master aggregator handler
+     * @param commHandler Master global communication handler
      */
-    public Factory(MasterAggregatorHandler aggregatorHandler) {
-      this.aggregatorHandler = aggregatorHandler;
+    public Factory(MasterGlobalCommHandler commHandler) {
+      this.commHandler = commHandler;
     }
 
     @Override
@@ -76,7 +76,7 @@ public class MasterRequestServerHandler extends
         TaskInfo myTaskInfo,
         Thread.UncaughtExceptionHandler exceptionHandler) {
       return new MasterRequestServerHandler(workerRequestReservedMap, conf,
-          myTaskInfo, aggregatorHandler, exceptionHandler);
+          myTaskInfo, commHandler, exceptionHandler);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/comm/requests/AskForInputSplitRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/AskForInputSplitRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/AskForInputSplitRequest.java
new file mode 100644
index 0000000..5d9e4e6
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/AskForInputSplitRequest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.requests;
+
+import org.apache.giraph.master.MasterGlobalCommHandler;
+import org.apache.giraph.io.InputType;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * A request which workers will send to master to ask it to give them splits
+ */
+public class AskForInputSplitRequest extends WritableRequest
+    implements MasterRequest {
+  /** Type of split we are requesting */
+  private InputType splitType;
+  /** Task id of worker which requested the split */
+  private int workerTaskId;
+
+  /**
+   * Constructor
+   *
+   * @param splitType Type of split we are requesting
+   * @param workerTaskId Task id of worker which requested the split
+   */
+  public AskForInputSplitRequest(InputType splitType, int workerTaskId) {
+    this.splitType = splitType;
+    this.workerTaskId = workerTaskId;
+  }
+
+  /**
+   * Constructor used for reflection only
+   */
+  public AskForInputSplitRequest() {
+  }
+
+  @Override
+  public void doRequest(MasterGlobalCommHandler commHandler) {
+    commHandler.getInputSplitsHandler().sendSplitTo(splitType, workerTaskId);
+  }
+
+  @Override
+  void readFieldsRequest(DataInput in) throws IOException {
+    splitType = InputType.values()[in.readInt()];
+    workerTaskId = in.readInt();
+  }
+
+  @Override
+  void writeRequest(DataOutput out) throws IOException {
+    out.writeInt(splitType.ordinal());
+    out.writeInt(workerTaskId);
+  }
+
+  @Override
+  public RequestType getType() {
+    return RequestType.ASK_FOR_INPUT_SPLIT_REQUEST;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/comm/requests/MasterRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/MasterRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/MasterRequest.java
index 7fedcc5..43632b0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/MasterRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/MasterRequest.java
@@ -18,7 +18,7 @@
 
 package org.apache.giraph.comm.requests;
 
-import org.apache.giraph.master.MasterAggregatorHandler;
+import org.apache.giraph.master.MasterGlobalCommHandler;
 
 /**
  * Interface for requests sent to master to extend
@@ -27,7 +27,7 @@ public interface MasterRequest {
   /**
    * Execute the request
    *
-   * @param aggregatorHandler Master aggregator handler
+   * @param commHandler Master communication handler
    */
-  void doRequest(MasterAggregatorHandler aggregatorHandler);
+  void doRequest(MasterGlobalCommHandler commHandler);
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/comm/requests/ReplyWithInputSplitRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/ReplyWithInputSplitRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/ReplyWithInputSplitRequest.java
new file mode 100644
index 0000000..6b50562
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/ReplyWithInputSplitRequest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.requests;
+
+import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.io.InputType;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * A request which master will send to workers to give them splits
+ */
+public class ReplyWithInputSplitRequest extends WritableRequest
+    implements WorkerRequest {
+  /** Type of input split */
+  private InputType splitType;
+  /** Serialized input split */
+  private byte[] serializedInputSplit;
+
+  /**
+   * Constructor
+   *
+   * @param splitType Type of input split
+   * @param serializedInputSplit Serialized input split
+   */
+  public ReplyWithInputSplitRequest(InputType splitType,
+      byte[] serializedInputSplit) {
+    this.splitType = splitType;
+    this.serializedInputSplit = serializedInputSplit;
+  }
+
+  /**
+   * Constructor used for reflection only
+   */
+  public ReplyWithInputSplitRequest() {
+  }
+
+  @Override
+  void readFieldsRequest(DataInput in) throws IOException {
+    splitType = InputType.values()[in.readInt()];
+    int size = in.readInt();
+    serializedInputSplit = new byte[size];
+    in.readFully(serializedInputSplit);
+  }
+
+  @Override
+  void writeRequest(DataOutput out) throws IOException {
+    out.writeInt(splitType.ordinal());
+    out.writeInt(serializedInputSplit.length);
+    out.write(serializedInputSplit);
+  }
+
+  @Override
+  public void doRequest(ServerData serverData) {
+    serverData.getServiceWorker().getInputSplitsHandler().receivedInputSplit(
+        splitType, serializedInputSplit);
+  }
+
+  @Override
+  public RequestType getType() {
+    return RequestType.REPLY_WITH_INPUT_SPLIT_REQUEST;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
index 343a2de..bebac28 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
@@ -60,7 +60,11 @@ else[HADOOP_NON_SECURE]*/
   /** Send aggregators from worker owner to other workers */
   SEND_AGGREGATORS_TO_WORKER_REQUEST(SendAggregatorsToWorkerRequest.class),
   /** Send message from worker to worker */
-  SEND_WORKER_TO_WORKER_MESSAGE_REQUEST(SendWorkerToWorkerMessageRequest.class);
+  SEND_WORKER_TO_WORKER_MESSAGE_REQUEST(SendWorkerToWorkerMessageRequest.class),
+  /** Send request for input split from worker to master */
+  ASK_FOR_INPUT_SPLIT_REQUEST(AskForInputSplitRequest.class),
+  /** Send request with granted input split from master to workers */
+  REPLY_WITH_INPUT_SPLIT_REQUEST(ReplyWithInputSplitRequest.class);
 
   /** Class of request which this type corresponds to */
   private final Class<? extends WritableRequest> requestClass;

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendReducedToMasterRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendReducedToMasterRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendReducedToMasterRequest.java
index 7171f04..3a1bd64 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendReducedToMasterRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendReducedToMasterRequest.java
@@ -20,7 +20,7 @@ package org.apache.giraph.comm.requests;
 
 import java.io.IOException;
 
-import org.apache.giraph.master.MasterAggregatorHandler;
+import org.apache.giraph.master.MasterGlobalCommHandler;
 
 /**
  * Request to send final aggregated values from worker which owns
@@ -45,9 +45,9 @@ public class SendReducedToMasterRequest extends ByteArrayRequest
   }
 
   @Override
-  public void doRequest(MasterAggregatorHandler aggregatorHandler) {
+  public void doRequest(MasterGlobalCommHandler commHandler) {
     try {
-      aggregatorHandler.acceptReducedValues(getDataInput());
+      commHandler.getAggregatorHandler().acceptReducedValues(getDataInput());
     } catch (IOException e) {
       throw new IllegalStateException("doRequest: " +
           "IOException occurred while processing request", e);

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java b/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java
index cfb9799..c53b34f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java
@@ -51,7 +51,7 @@ public class FinishedSuperstepStats extends VertexEdgeCount {
                                 long numEdges,
                                 boolean mustLoadCheckpoint,
                                 CheckpointStatus checkpointStatus) {
-    super(numVertices, numEdges);
+    super(numVertices, numEdges, 0);
     this.localVertexCount = numLocalVertices;
     this.allVerticesHalted = allVerticesHalted;
     this.mustLoadCheckpoint = mustLoadCheckpoint;

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitEvents.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitEvents.java b/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitEvents.java
deleted file mode 100644
index 23be1c4..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitEvents.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.giraph.zk.BspEvent;
-import org.apache.giraph.zk.PredicateLock;
-import org.apache.hadoop.util.Progressable;
-
-/**
- * Simple container of input split events.
- */
-public class InputSplitEvents {
-  /** Input splits are ready for consumption by workers */
-  private final BspEvent allReadyChanged;
-  /** Input split reservation or finished notification and synchronization */
-  private final BspEvent stateChanged;
-  /** Input splits are done being processed by workers */
-  private final BspEvent allDoneChanged;
-  /** Input split done by a worker finished notification and synchronization */
-  private final BspEvent doneStateChanged;
-
-  /**
-   * Constructor.
-   *
-   * @param progressable {@link Progressable} to report progress
-   */
-  public InputSplitEvents(Progressable progressable) {
-    allReadyChanged = new PredicateLock(progressable);
-    stateChanged = new PredicateLock(progressable);
-    allDoneChanged = new PredicateLock(progressable);
-    doneStateChanged = new PredicateLock(progressable);
-  }
-
-  /**
-   * Get event for input splits all ready
-   *
-   * @return {@link BspEvent} for input splits all ready
-   */
-  public BspEvent getAllReadyChanged() {
-    return allReadyChanged;
-  }
-
-  /**
-   * Get event for input splits state
-   *
-   * @return {@link BspEvent} for input splits state
-   */
-  public BspEvent getStateChanged() {
-    return stateChanged;
-  }
-
-  /**
-   * Get event for input splits all done
-   *
-   * @return {@link BspEvent} for input splits all done
-   */
-  public BspEvent getAllDoneChanged() {
-    return allDoneChanged;
-  }
-
-  /**
-   * Get event for input split done
-   *
-   * @return {@link BspEvent} for input split done
-   */
-  public BspEvent getDoneStateChanged() {
-    return doneStateChanged;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitPaths.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitPaths.java b/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitPaths.java
deleted file mode 100644
index 4cf005e..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitPaths.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-/**
- * Simple container of input split paths for coordination via ZooKeeper.
- */
-public class InputSplitPaths {
-  /** Path to the input splits written by the master */
-  private final String path;
-  /** Path to the input splits all ready to be processed by workers */
-  private final String allReadyPath;
-  /** Path to the input splits done */
-  private final String donePath;
-  /** Path to the input splits all done to notify the workers to proceed */
-  private final String allDonePath;
-
-  /**
-   * Constructor.
-   *
-   * @param basePath Base path
-   * @param dir Input splits path
-   * @param doneDir Input split done path
-   * @param allReadyNode Input splits all ready path
-   * @param allDoneNode Input splits all done path
-   */
-  public InputSplitPaths(String basePath,
-                         String dir,
-                         String doneDir,
-                         String allReadyNode,
-                         String allDoneNode) {
-    path = basePath + dir;
-    allReadyPath = basePath + allReadyNode;
-    donePath = basePath + doneDir;
-    allDonePath = basePath + allDoneNode;
-  }
-
-  /**
-   * Get path to the input splits.
-   *
-   * @return Path to input splits
-   */
-  public String getPath() {
-    return path;
-  }
-
-  /**
-   * Get path to the input splits all ready.
-   *
-   * @return Path to input splits all ready
-   */
-  public String getAllReadyPath() {
-    return allReadyPath;
-  }
-
-  /** Get path to the input splits done.
-   *
-   * @return Path to input splits done
-   */
-  public String getDonePath() {
-    return donePath;
-  }
-
-  /**
-   * Get path to the input splits all done.
-   *
-   * @return Path to input splits all done
-   */
-  public String getAllDonePath() {
-    return allDonePath;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/graph/VertexEdgeCount.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexEdgeCount.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexEdgeCount.java
index c2d13cc..1c871f0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexEdgeCount.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/VertexEdgeCount.java
@@ -26,6 +26,8 @@ public class VertexEdgeCount {
   private final long vertexCount;
   /** Immutable edges */
   private final long edgeCount;
+  /** Immutable mappings */
+  private final long mappingCount;
 
   /**
    * Default constructor.
@@ -33,6 +35,7 @@ public class VertexEdgeCount {
   public VertexEdgeCount() {
     vertexCount = 0;
     edgeCount = 0;
+    mappingCount = 0;
   }
 
   /**
@@ -40,10 +43,12 @@ public class VertexEdgeCount {
    *
    * @param vertexCount Final number of vertices.
    * @param edgeCount Final number of edges.
+   * @param mappingCount Final number of mappings.
    */
-  public VertexEdgeCount(long vertexCount, long edgeCount) {
+  public VertexEdgeCount(long vertexCount, long edgeCount, long mappingCount) {
     this.vertexCount = vertexCount;
     this.edgeCount = edgeCount;
+    this.mappingCount = mappingCount;
   }
 
   public long getVertexCount() {
@@ -54,6 +59,10 @@ public class VertexEdgeCount {
     return edgeCount;
   }
 
+  public long getMappingCount() {
+    return mappingCount;
+  }
+
   /**
    * Increment the both the vertex edge count with a {@link VertexEdgeCount}.
    *
@@ -64,7 +73,8 @@ public class VertexEdgeCount {
       VertexEdgeCount vertexEdgeCount) {
     return new VertexEdgeCount(
         vertexCount + vertexEdgeCount.getVertexCount(),
-        edgeCount + vertexEdgeCount.getEdgeCount());
+        edgeCount + vertexEdgeCount.getEdgeCount(),
+        mappingCount + vertexEdgeCount.getMappingCount());
   }
 
   /**
@@ -78,11 +88,13 @@ public class VertexEdgeCount {
       long vertexCount, long edgeCount) {
     return new VertexEdgeCount(
         this.vertexCount + vertexCount,
-        this.edgeCount + edgeCount);
+        this.edgeCount + edgeCount,
+        this.mappingCount + mappingCount);
   }
 
   @Override
   public String toString() {
-    return "(v=" + getVertexCount() + ", e=" + getEdgeCount() + ")";
+    return "(v=" + getVertexCount() + ", e=" + getEdgeCount() +
+        (mappingCount > 0 ? ", m=" + mappingCount : "") + ")";
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/io/InputType.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/InputType.java b/giraph-core/src/main/java/org/apache/giraph/io/InputType.java
new file mode 100644
index 0000000..26ee966
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/InputType.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+/**
+ * Type of input
+ */
+public enum InputType {
+  /** Vertex input */
+  VERTEX,
+  /** Edge input */
+  EDGE,
+  /** Mapping input */
+  MAPPING
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 0b56a4f..0e7bb9d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -21,12 +21,8 @@ package org.apache.giraph.master;
 import static org.apache.giraph.conf.GiraphConstants.INPUT_SPLIT_SAMPLE_PERCENT;
 import static org.apache.giraph.conf.GiraphConstants.KEEP_ZOOKEEPER_DATA;
 import static org.apache.giraph.conf.GiraphConstants.PARTITION_LONG_TAIL_MIN_PRINT;
-import static org.apache.giraph.conf.GiraphConstants.USE_INPUT_SPLIT_LOCALITY;
 
-import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.nio.charset.Charset;
@@ -38,9 +34,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import net.iharder.Base64;
@@ -66,12 +59,12 @@ import org.apache.giraph.graph.GlobalStats;
 import org.apache.giraph.graph.GraphFunctions;
 import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.graph.GraphTaskManager;
-import org.apache.giraph.graph.InputSplitEvents;
-import org.apache.giraph.graph.InputSplitPaths;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.GiraphInputFormat;
 import org.apache.giraph.io.MappingInputFormat;
 import org.apache.giraph.io.VertexInputFormat;
+import org.apache.giraph.io.InputType;
+import org.apache.giraph.master.input.MasterInputSplitsHandler;
 import org.apache.giraph.metrics.AggregatedMetrics;
 import org.apache.giraph.metrics.GiraphMetrics;
 import org.apache.giraph.metrics.GiraphTimer;
@@ -88,8 +81,6 @@ import org.apache.giraph.time.SystemTime;
 import org.apache.giraph.time.Time;
 import org.apache.giraph.utils.CheckpointingUtils;
 import org.apache.giraph.utils.JMapHistoDumper;
-import org.apache.giraph.utils.LogStacktraceCallable;
-import org.apache.giraph.utils.ProgressableUtils;
 import org.apache.giraph.utils.ReactiveJMapHistoDumper;
 import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.giraph.utils.WritableUtils;
@@ -99,7 +90,6 @@ import org.apache.giraph.zk.PredicateLock;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobID;
@@ -170,7 +160,7 @@ public class BspServiceMaster<I extends WritableComparable,
   private final List<PartitionStats> allPartitionStatsList =
       new ArrayList<PartitionStats>();
   /** Handler for global communication */
-  private MasterAggregatorHandler globalCommHandler;
+  private MasterGlobalCommHandler globalCommHandler;
   /** Handler for aggregators to reduce/broadcast translation */
   private AggregatorToGlobalCommTranslation aggregatorTranslation;
   /** Master class */
@@ -331,7 +321,7 @@ public class BspServiceMaster<I extends WritableComparable,
    */
   private List<InputSplit> generateInputSplits(GiraphInputFormat inputFormat,
                                                int minSplitCountHint,
-                                               String inputSplitType) {
+                                               InputType inputSplitType) {
     String logPrefix = "generate" + inputSplitType + "InputSplits";
     List<InputSplit> splits;
     try {
@@ -604,46 +594,25 @@ public class BspServiceMaster<I extends WritableComparable,
    * Common method for creating vertex/edge input splits.
    *
    * @param inputFormat The vertex/edge input format
-   * @param inputSplitPaths ZooKeeper input split paths
    * @param inputSplitType Type of input split (for logging purposes)
    * @return Number of splits. Returns -1 on failure to create
    *         valid input splits.
    */
   private int createInputSplits(GiraphInputFormat inputFormat,
-                                InputSplitPaths inputSplitPaths,
-                                String inputSplitType) {
+                                InputType inputSplitType) {
     ImmutableClassesGiraphConfiguration conf = getConfiguration();
     String logPrefix = "create" + inputSplitType + "InputSplits";
     // Only the 'master' should be doing this.  Wait until the number of
     // processes that have reported health exceeds the minimum percentage.
     // If the minimum percentage is not met, fail the job.  Otherwise
     // generate the input splits
-    String inputSplitsPath = inputSplitPaths.getPath();
-    try {
-      if (getZkExt().exists(inputSplitsPath, false) != null) {
-        LOG.info(inputSplitsPath + " already exists, no need to create");
-        return Integer.parseInt(
-            new String(getZkExt().getData(inputSplitsPath, false, null),
-                Charset.defaultCharset()));
-      }
-    } catch (KeeperException.NoNodeException e) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info(logPrefix + ": Need to create the input splits at " +
-            inputSplitsPath);
-      }
-    } catch (KeeperException e) {
-      throw new IllegalStateException(logPrefix + ": KeeperException", e);
-    } catch (InterruptedException e) {
-      throw new IllegalStateException(logPrefix + ": InterruptedException", e);
-    }
-
-    // When creating znodes, in case the master has already run, resume
-    // where it left off.
     List<WorkerInfo> healthyWorkerInfoList = checkWorkers();
     if (healthyWorkerInfoList == null) {
       setJobStateFailed("Not enough healthy workers to create input splits");
       return -1;
     }
+    globalCommHandler.getInputSplitsHandler().initialize(masterClient,
+        healthyWorkerInfoList);
 
     // Create at least as many splits as the total number of input threads.
     int minSplitCountHint = healthyWorkerInfoList.size() *
@@ -671,54 +640,8 @@ public class BspServiceMaster<I extends WritableComparable,
           "some threads will be not used");
     }
 
-    // Write input splits to zookeeper in parallel
-    int inputSplitThreadCount = conf.getInt(NUM_MASTER_ZK_INPUT_SPLIT_THREADS,
-        DEFAULT_INPUT_SPLIT_THREAD_COUNT);
-    if (LOG.isInfoEnabled()) {
-      LOG.info(logPrefix + ": Starting to write input split data " +
-          "to zookeeper with " + inputSplitThreadCount + " threads");
-    }
-    try {
-      getZkExt().createExt(inputSplitsPath, null,
-          Ids.OPEN_ACL_UNSAFE,
-          CreateMode.PERSISTENT,
-          false);
-    } catch (KeeperException e) {
-      LOG.info(logPrefix + ": Node " +
-          inputSplitsPath + " keeper exception " + e);
-    } catch (InterruptedException e) {
-      throw new IllegalStateException(logPrefix + ' ' + e.getMessage(), e);
-    }
-    ExecutorService taskExecutor =
-        Executors.newFixedThreadPool(inputSplitThreadCount);
-    boolean writeLocations = USE_INPUT_SPLIT_LOCALITY.get(conf);
-    for (int i = 0; i < splitList.size(); ++i) {
-      InputSplit inputSplit = splitList.get(i);
-      taskExecutor.submit(new LogStacktraceCallable<Void>(
-          new WriteInputSplit(inputFormat, inputSplit, inputSplitsPath, i,
-              writeLocations)));
-    }
-    taskExecutor.shutdown();
-    ProgressableUtils.awaitExecutorTermination(taskExecutor, getContext());
-    if (LOG.isInfoEnabled()) {
-      LOG.info(logPrefix + ": Done writing input split data to zookeeper");
-    }
-
-    // Let workers know they can start trying to load the input splits
-    try {
-      getZkExt().createExt(inputSplitPaths.getAllReadyPath(),
-          null,
-          Ids.OPEN_ACL_UNSAFE,
-          CreateMode.PERSISTENT,
-          false);
-    } catch (KeeperException.NodeExistsException e) {
-      LOG.info(logPrefix + ": Node " +
-          inputSplitPaths.getAllReadyPath() + " already exists.");
-    } catch (KeeperException e) {
-      throw new IllegalStateException(logPrefix + ": KeeperException", e);
-    } catch (InterruptedException e) {
-      throw new IllegalStateException(logPrefix + ": IllegalStateException", e);
-    }
+    globalCommHandler.getInputSplitsHandler().addSplits(inputSplitType,
+        splitList, inputFormat);
 
     return splitList.size();
   }
@@ -730,8 +653,7 @@ public class BspServiceMaster<I extends WritableComparable,
     }
     MappingInputFormat<I, V, E, ? extends Writable> mappingInputFormat =
       getConfiguration().createWrappedMappingInputFormat();
-    return createInputSplits(mappingInputFormat, mappingInputSplitsPaths,
-      "Mapping");
+    return createInputSplits(mappingInputFormat, InputType.MAPPING);
   }
 
   @Override
@@ -742,8 +664,7 @@ public class BspServiceMaster<I extends WritableComparable,
     }
     VertexInputFormat<I, V, E> vertexInputFormat =
         getConfiguration().createWrappedVertexInputFormat();
-    return createInputSplits(vertexInputFormat, vertexInputSplitsPaths,
-        "Vertex");
+    return createInputSplits(vertexInputFormat, InputType.VERTEX);
   }
 
   @Override
@@ -754,8 +675,7 @@ public class BspServiceMaster<I extends WritableComparable,
     }
     EdgeInputFormat<I, E> edgeInputFormat =
         getConfiguration().createWrappedEdgeInputFormat();
-    return createInputSplits(edgeInputFormat, edgeInputSplitsPaths,
-        "Edge");
+    return createInputSplits(edgeInputFormat, InputType.EDGE);
   }
 
   @Override
@@ -764,7 +684,7 @@ public class BspServiceMaster<I extends WritableComparable,
   }
 
   @Override
-  public MasterAggregatorHandler getGlobalCommHandler() {
+  public MasterGlobalCommHandler getGlobalCommHandler() {
     return globalCommHandler;
   }
 
@@ -838,7 +758,7 @@ public class BspServiceMaster<I extends WritableComparable,
     });
 
 
-    globalCommHandler.readFields(finalizedStream);
+    globalCommHandler.getAggregatorHandler().readFields(finalizedStream);
     aggregatorTranslation.readFields(finalizedStream);
     masterCompute.readFields(finalizedStream);
     finalizedStream.close();
@@ -911,12 +831,15 @@ public class BspServiceMaster<I extends WritableComparable,
         if (masterChildArr.get(0).equals(myBid)) {
           GiraphStats.getInstance().getCurrentMasterTaskPartition().
               setValue(getTaskPartition());
-          globalCommHandler = new MasterAggregatorHandler(
-              getConfiguration(), getContext());
+
+          globalCommHandler = new MasterGlobalCommHandler(
+              new MasterAggregatorHandler(getConfiguration(), getContext()),
+              new MasterInputSplitsHandler(
+                  getConfiguration().useInputSplitLocality()));
           aggregatorTranslation = new AggregatorToGlobalCommTranslation(
               getConfiguration(), globalCommHandler);
 
-          globalCommHandler.initialize(this);
+          globalCommHandler.getAggregatorHandler().initialize(this);
           masterCompute = getConfiguration().createMasterCompute();
           masterCompute.setMasterService(this);
 
@@ -1128,7 +1051,7 @@ public class BspServiceMaster<I extends WritableComparable,
     for (WorkerInfo chosenWorkerInfo : chosenWorkerInfoList) {
       finalizedOutputStream.writeInt(getWorkerId(chosenWorkerInfo));
     }
-    globalCommHandler.write(finalizedOutputStream);
+    globalCommHandler.getAggregatorHandler().write(finalizedOutputStream);
     aggregatorTranslation.write(finalizedOutputStream);
     masterCompute.write(finalizedOutputStream);
     finalizedOutputStream.close();
@@ -1265,12 +1188,8 @@ public class BspServiceMaster<I extends WritableComparable,
   @Override
   public void restartFromCheckpoint(long checkpoint) {
     // Process:
-    // 1. Remove all old input split data
-    // 2. Increase the application attempt and set to the correct checkpoint
-    // 3. Send command to all workers to restart their tasks
-    zkDeleteNode(vertexInputSplitsPaths.getPath());
-    zkDeleteNode(edgeInputSplitsPaths.getPath());
-
+    // 1. Increase the application attempt and set to the correct checkpoint
+    // 2. Send command to all workers to restart their tasks
     setApplicationAttempt(getApplicationAttempt() + 1);
     setCachedSuperstep(checkpoint);
     setRestartedSuperstep(checkpoint);
@@ -1493,37 +1412,32 @@ public class BspServiceMaster<I extends WritableComparable,
 
   /**
    * Coordinate the exchange of vertex/edge input splits among workers.
-   *
-   * @param inputSplitPaths Input split paths
-   * @param inputSplitEvents Input split events
-   * @param inputSplitsType Type of input splits (for logging purposes)
    */
-  private void coordinateInputSplits(InputSplitPaths inputSplitPaths,
-                                     InputSplitEvents inputSplitEvents,
-                                     String inputSplitsType) {
+  private void coordinateInputSplits() {
     // Coordinate the workers finishing sending their vertices/edges to the
     // correct workers and signal when everything is done.
-    String logPrefix = "coordinate" + inputSplitsType + "InputSplits";
-    if (!barrierOnWorkerList(inputSplitPaths.getDonePath(),
+    if (!barrierOnWorkerList(inputSplitsWorkerDonePath,
         chosenWorkerInfoList,
-        inputSplitEvents.getDoneStateChanged(),
+        getInputSplitsWorkerDoneEvent(),
         false)) {
-      throw new IllegalStateException(logPrefix + ": Worker failed during " +
-          "input split (currently not supported)");
+      throw new IllegalStateException("coordinateInputSplits: Worker failed " +
+          "during input split (currently not supported)");
     }
     try {
-      getZkExt().createExt(inputSplitPaths.getAllDonePath(),
+      getZkExt().createExt(inputSplitsAllDonePath,
           null,
           Ids.OPEN_ACL_UNSAFE,
           CreateMode.PERSISTENT,
           false);
     } catch (KeeperException.NodeExistsException e) {
       LOG.info("coordinateInputSplits: Node " +
-          inputSplitPaths.getAllDonePath() + " already exists.");
+          inputSplitsAllDonePath + " already exists.");
     } catch (KeeperException e) {
-      throw new IllegalStateException(logPrefix + ": KeeperException", e);
+      throw new IllegalStateException(
+          "coordinateInputSplits: KeeperException", e);
     } catch (InterruptedException e) {
-      throw new IllegalStateException(logPrefix + ": IllegalStateException", e);
+      throw new IllegalStateException(
+          "coordinateInputSplits: IllegalStateException", e);
     }
   }
 
@@ -1543,7 +1457,7 @@ public class BspServiceMaster<I extends WritableComparable,
    */
   private void initializeAggregatorInputSuperstep()
     throws InterruptedException {
-    globalCommHandler.prepareSuperstep();
+    globalCommHandler.getAggregatorHandler().prepareSuperstep();
 
     prepareMasterCompute(getSuperstep());
     try {
@@ -1559,9 +1473,9 @@ public class BspServiceMaster<I extends WritableComparable,
         "initializeAggregatorInputSuperstep: Failed in access", e);
     }
     aggregatorTranslation.postMasterCompute();
-    globalCommHandler.finishSuperstep();
+    globalCommHandler.getAggregatorHandler().finishSuperstep();
 
-    globalCommHandler.sendDataToOwners(masterClient);
+    globalCommHandler.getAggregatorHandler().sendDataToOwners(masterClient);
   }
 
   /**
@@ -1627,7 +1541,7 @@ public class BspServiceMaster<I extends WritableComparable,
     // We need to finalize aggregators from previous superstep
     if (getSuperstep() >= 0) {
       aggregatorTranslation.postMasterCompute();
-      globalCommHandler.finishSuperstep();
+      globalCommHandler.getAggregatorHandler().finishSuperstep();
     }
 
     masterClient.openConnections();
@@ -1663,25 +1577,13 @@ public class BspServiceMaster<I extends WritableComparable,
 
     // We need to send aggregators to worker owners after new worker assignments
     if (getSuperstep() >= 0) {
-      globalCommHandler.sendDataToOwners(masterClient);
+      globalCommHandler.getAggregatorHandler().sendDataToOwners(masterClient);
     }
 
     if (getSuperstep() == INPUT_SUPERSTEP) {
       // Initialize aggregators before coordinating
       initializeAggregatorInputSuperstep();
-      if (getConfiguration().hasMappingInputFormat()) {
-        coordinateInputSplits(mappingInputSplitsPaths, mappingInputSplitsEvents,
-            "Mapping");
-      }
-      // vertex loading and edge loading
-      if (getConfiguration().hasVertexInputFormat()) {
-        coordinateInputSplits(vertexInputSplitsPaths, vertexInputSplitsEvents,
-            "Vertex");
-      }
-      if (getConfiguration().hasEdgeInputFormat()) {
-        coordinateInputSplits(edgeInputSplitsPaths, edgeInputSplitsEvents,
-            "Edge");
-      }
+      coordinateInputSplits();
     }
 
     String finishedWorkerPath =
@@ -1695,7 +1597,7 @@ public class BspServiceMaster<I extends WritableComparable,
 
     // Collect aggregator values, then run the master.compute() and
     // finally save the aggregator values
-    globalCommHandler.prepareSuperstep();
+    globalCommHandler.getAggregatorHandler().prepareSuperstep();
     aggregatorTranslation.prepareSuperstep();
 
     SuperstepClasses superstepClasses =
@@ -1761,7 +1663,8 @@ public class BspServiceMaster<I extends WritableComparable,
     } else {
       superstepState = SuperstepState.THIS_SUPERSTEP_DONE;
     }
-    globalCommHandler.writeAggregators(getSuperstep(), superstepState);
+    globalCommHandler.getAggregatorHandler().writeAggregators(
+        getSuperstep(), superstepState);
 
     return superstepState;
   }
@@ -2009,7 +1912,7 @@ public class BspServiceMaster<I extends WritableComparable,
         failJob(new Exception("Checkpoint and halt requested. " +
             "Killing this job."));
       }
-      globalCommHandler.close();
+      globalCommHandler.getAggregatorHandler().close();
       masterClient.closeConnections();
       masterServer.close();
     }
@@ -2122,100 +2025,4 @@ public class BspServiceMaster<I extends WritableComparable,
     gs.getAggregateSentMessageBytes()
       .increment(globalStats.getMessageBytesCount());
   }
-
-  /**
-   * Task that writes a given input split to zookeeper.
-   * Upon failure call() throws an exception.
-   */
-  private class WriteInputSplit implements Callable<Void> {
-    /** Input format */
-    private final GiraphInputFormat inputFormat;
-    /** Input split which we are going to write */
-    private final InputSplit inputSplit;
-    /** Input splits path */
-    private final String inputSplitsPath;
-    /** Index of the input split */
-    private final int index;
-    /** Whether to write locality information */
-    private final boolean writeLocations;
-
-    /**
-     * Constructor
-     *
-     * @param inputFormat Input format
-     * @param inputSplit Input split which we are going to write
-     * @param inputSplitsPath Input splits path
-     * @param index Index of the input split
-     * @param writeLocations whether to write the input split's locations (to
-     *                       be used by workers for prioritizing local splits
-     *                       when reading)
-     */
-    public WriteInputSplit(GiraphInputFormat inputFormat,
-                           InputSplit inputSplit,
-                           String inputSplitsPath,
-                           int index,
-                           boolean writeLocations) {
-      this.inputFormat = inputFormat;
-      this.inputSplit = inputSplit;
-      this.inputSplitsPath = inputSplitsPath;
-      this.index = index;
-      this.writeLocations = writeLocations;
-    }
-
-    @Override
-    public Void call() {
-      String inputSplitPath = null;
-      try {
-        ByteArrayOutputStream byteArrayOutputStream =
-            new ByteArrayOutputStream();
-        DataOutput outputStream =
-            new DataOutputStream(byteArrayOutputStream);
-
-        if (writeLocations) {
-          String[] splitLocations = inputSplit.getLocations();
-          StringBuilder locations = null;
-          if (splitLocations != null) {
-            int splitListLength =
-                Math.min(splitLocations.length, localityLimit);
-            locations = new StringBuilder();
-            for (String location : splitLocations) {
-              locations.append(location)
-                  .append(--splitListLength > 0 ? "\t" : "");
-            }
-          }
-          Text.writeString(outputStream,
-              locations == null ? "" : locations.toString());
-        }
-
-        inputFormat.writeInputSplit(inputSplit, outputStream);
-        inputSplitPath = inputSplitsPath + "/" + index;
-        getZkExt().createExt(inputSplitPath,
-            byteArrayOutputStream.toByteArray(),
-            Ids.OPEN_ACL_UNSAFE,
-            CreateMode.PERSISTENT,
-            true);
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("call: Created input split " +
-              "with index " + index + " serialized as " +
-              byteArrayOutputStream.toString(Charset.defaultCharset().name()));
-        }
-      } catch (KeeperException.NodeExistsException e) {
-        if (LOG.isInfoEnabled()) {
-          LOG.info("call: Node " +
-              inputSplitPath + " already exists.");
-        }
-      } catch (KeeperException e) {
-        throw new IllegalStateException(
-            "call: KeeperException", e);
-      } catch (InterruptedException e) {
-        throw new IllegalStateException(
-            "call: IllegalStateException", e);
-      } catch (IOException e) {
-        throw new IllegalStateException(
-            "call: IOException", e);
-      }
-      return null;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
index 5558cee..8ca3d3a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
@@ -42,7 +42,7 @@ import com.google.common.collect.Maps;
 
 /** Handler for reduce/broadcast on the master */
 public class MasterAggregatorHandler
-    implements MasterGlobalCommUsage, Writable {
+    implements MasterGlobalCommUsageAggregators, Writable {
   /** Class logger */
   private static final Logger LOG =
       Logger.getLogger(MasterAggregatorHandler.class);

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommHandler.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommHandler.java
new file mode 100644
index 0000000..717a24d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommHandler.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.master;
+
+import org.apache.giraph.master.input.MasterInputSplitsHandler;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Handler for all master communications
+ */
+public class MasterGlobalCommHandler implements MasterGlobalCommUsage {
+  /** Aggregator handler */
+  private final MasterAggregatorHandler aggregatorHandler;
+  /** Input splits handler*/
+  private final MasterInputSplitsHandler inputSplitsHandler;
+
+  /**
+   * Constructor
+   *
+   * @param aggregatorHandler Aggregator handler
+   * @param inputSplitsHandler Input splits handler
+   */
+  public MasterGlobalCommHandler(
+      MasterAggregatorHandler aggregatorHandler,
+      MasterInputSplitsHandler inputSplitsHandler) {
+    this.aggregatorHandler = aggregatorHandler;
+    this.inputSplitsHandler = inputSplitsHandler;
+  }
+
+  public MasterAggregatorHandler getAggregatorHandler() {
+    return aggregatorHandler;
+  }
+
+  public MasterInputSplitsHandler getInputSplitsHandler() {
+    return inputSplitsHandler;
+  }
+
+  @Override
+  public <S, R extends Writable> void registerReducer(String name,
+      ReduceOperation<S, R> reduceOp) {
+    aggregatorHandler.registerReducer(name, reduceOp);
+  }
+
+  @Override
+  public <S, R extends Writable> void registerReducer(String name,
+      ReduceOperation<S, R> reduceOp, R globalInitialValue) {
+    aggregatorHandler.registerReducer(name, reduceOp, globalInitialValue);
+  }
+
+  @Override
+  public <R extends Writable> R getReduced(String name) {
+    return aggregatorHandler.getReduced(name);
+  }
+
+  @Override
+  public void broadcast(String name, Writable value) {
+    aggregatorHandler.broadcast(name, value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java
index 7ee9048..60b1809 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java
@@ -17,52 +17,9 @@
  */
 package org.apache.giraph.master;
 
-import org.apache.giraph.reducers.ReduceOperation;
-import org.apache.hadoop.io.Writable;
-
 /**
- * Master compute can access reduce and broadcast methods
- * through this interface, from masterCompute method.
+ * All global master communication
  */
-public interface MasterGlobalCommUsage {
-  /**
-   * Register reducer to be reduced in the next worker computation,
-   * using given name and operations.
-   * @param name Name of the reducer
-   * @param reduceOp Reduce operations
-   * @param <S> Single value type
-   * @param <R> Reduced value type
-   */
-  <S, R extends Writable> void registerReducer(
-      String name, ReduceOperation<S, R> reduceOp);
-
-  /**
-   * Register reducer to be reduced in the next worker computation, using
-   * given name and operations, starting globally from globalInitialValue.
-   * (globalInitialValue is reduced only once, each worker will still start
-   * from neutral initial value)
-   *
-   * @param name Name of the reducer
-   * @param reduceOp Reduce operations
-   * @param globalInitialValue Global initial value
-   * @param <S> Single value type
-   * @param <R> Reduced value type
-   */
-  <S, R extends Writable> void registerReducer(
-      String name, ReduceOperation<S, R> reduceOp, R globalInitialValue);
-
-  /**
-   * Get reduced value from previous worker computation.
-   * @param name Name of the reducer
-   * @return Reduced value
-   * @param <R> Reduced value type
-   */
-  <R extends Writable> R getReduced(String name);
-
-  /**
-   * Broadcast given value to all workers for next computation.
-   * @param name Name of the broadcast object
-   * @param value Value
-   */
-  void broadcast(String name, Writable value);
+public interface MasterGlobalCommUsage
+    extends MasterGlobalCommUsageAggregators {
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsageAggregators.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsageAggregators.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsageAggregators.java
new file mode 100644
index 0000000..62c1f3f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsageAggregators.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.master;
+
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Master compute can access reduce and broadcast methods
+ * through this interface, from masterCompute method.
+ */
+public interface MasterGlobalCommUsageAggregators {
+  /**
+   * Register reducer to be reduced in the next worker computation,
+   * using given name and operations.
+   * @param name Name of the reducer
+   * @param reduceOp Reduce operations
+   * @param <S> Single value type
+   * @param <R> Reduced value type
+   */
+  <S, R extends Writable> void registerReducer(
+      String name, ReduceOperation<S, R> reduceOp);
+
+  /**
+   * Register reducer to be reduced in the next worker computation, using
+   * given name and operations, starting globally from globalInitialValue.
+   * (globalInitialValue is reduced only once, each worker will still start
+   * from neutral initial value)
+   *
+   * @param name Name of the reducer
+   * @param reduceOp Reduce operations
+   * @param globalInitialValue Global initial value
+   * @param <S> Single value type
+   * @param <R> Reduced value type
+   */
+  <S, R extends Writable> void registerReducer(
+      String name, ReduceOperation<S, R> reduceOp, R globalInitialValue);
+
+  /**
+   * Get reduced value from previous worker computation.
+   * @param name Name of the reducer
+   * @return Reduced value
+   * @param <R> Reduced value type
+   */
+  <R extends Writable> R getReduced(String name);
+
+  /**
+   * Broadcast given value to all workers for next computation.
+   * @param name Name of the broadcast object
+   * @param value Value
+   */
+  void broadcast(String name, Writable value);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/input/BasicInputSplitsMasterOrganizer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/input/BasicInputSplitsMasterOrganizer.java b/giraph-core/src/main/java/org/apache/giraph/master/input/BasicInputSplitsMasterOrganizer.java
new file mode 100644
index 0000000..5168e32
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/input/BasicInputSplitsMasterOrganizer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.master.input;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Input splits organizer for vertex and edge input splits on master, which
+ * doesn't use locality information
+ */
+public class BasicInputSplitsMasterOrganizer
+    implements InputSplitsMasterOrganizer {
+  /** Available splits queue */
+  private final ConcurrentLinkedQueue<byte[]> splits;
+
+  /**
+   * Constructor
+   *
+   * @param serializedSplits Splits
+   */
+  public BasicInputSplitsMasterOrganizer(List<byte[]> serializedSplits) {
+    splits = new ConcurrentLinkedQueue<>(serializedSplits);
+  }
+
+  @Override
+  public byte[] getSerializedSplitFor(int workerTaskId) {
+    return splits.poll();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/input/InputSplitsMasterOrganizer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/input/InputSplitsMasterOrganizer.java b/giraph-core/src/main/java/org/apache/giraph/master/input/InputSplitsMasterOrganizer.java
new file mode 100644
index 0000000..d5a0131
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/input/InputSplitsMasterOrganizer.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.master.input;
+
+/**
+ * Interface for different input split organizers on master
+ */
+public interface InputSplitsMasterOrganizer {
+  /**
+   * @param workerTaskId Id of worker requesting split
+   *
+   * @return Get next split for the worker, or null if all splits were taken
+   * already
+   */
+  byte[] getSerializedSplitFor(int workerTaskId);
+}