You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2015/10/19 19:14:31 UTC

[2/3] git commit: updated refs/heads/trunk to 5b0cd0e

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/input/LocalityAwareInputSplitsMasterOrganizer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/input/LocalityAwareInputSplitsMasterOrganizer.java b/giraph-core/src/main/java/org/apache/giraph/master/input/LocalityAwareInputSplitsMasterOrganizer.java
new file mode 100644
index 0000000..d3eb5da
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/input/LocalityAwareInputSplitsMasterOrganizer.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.master.input;
+
+import org.apache.giraph.worker.WorkerInfo;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Input splits organizer for vertex and edge input splits on master, which
+ * uses locality information
+ */
+public class LocalityAwareInputSplitsMasterOrganizer
+    implements InputSplitsMasterOrganizer {
+  /** All splits before this pointer were taken */
+  private final AtomicInteger listPointer = new AtomicInteger();
+  /** List of serialized splits */
+  private final List<byte[]> serializedSplits;
+  /** Array containing information about whether a split was taken or not */
+  private final AtomicBoolean[] splitsTaken;
+
+  /** Map with preferred splits for each worker */
+  private final Map<Integer, ConcurrentLinkedQueue<Integer>>
+      workerToPreferredSplitsMap;
+
+
+  /**
+   * Constructor
+   *
+   * @param serializedSplits Serialized splits
+   * @param splits           Splits
+   * @param workers          List of workers
+   */
+  public LocalityAwareInputSplitsMasterOrganizer(List<byte[]> serializedSplits,
+      List<InputSplit> splits, List<WorkerInfo> workers) {
+    this.serializedSplits = serializedSplits;
+    splitsTaken = new AtomicBoolean[serializedSplits.size()];
+    // Mark all splits as not taken initially
+    for (int i = 0; i < serializedSplits.size(); i++) {
+      splitsTaken[i] = new AtomicBoolean(false);
+    }
+
+    workerToPreferredSplitsMap = new HashMap<>();
+    for (WorkerInfo worker : workers) {
+      workerToPreferredSplitsMap.put(worker.getTaskId(),
+          new ConcurrentLinkedQueue<Integer>());
+    }
+    // Go through all splits
+    for (int i = 0; i < splits.size(); i++) {
+      try {
+        String[] locations = splits.get(i).getLocations();
+        // For every worker
+        for (WorkerInfo worker : workers) {
+          // Check splits locations
+          for (String location : locations) {
+            // If split is local for the worker, add it to preferred list
+            if (location.contains(worker.getHostname())) {
+              workerToPreferredSplitsMap.get(worker.getTaskId()).add(i);
+              break;
+            }
+          }
+        }
+      } catch (IOException | InterruptedException e) {
+        throw new IllegalStateException(
+            "Exception occurred while getting splits locations", e);
+      }
+    }
+  }
+
+  @Override
+  public byte[] getSerializedSplitFor(int workerTaskId) {
+    ConcurrentLinkedQueue<Integer> preferredSplits =
+        workerToPreferredSplitsMap.get(workerTaskId);
+    // Try to find a local split
+    while (true) {
+      // Get position to check
+      Integer splitIndex = preferredSplits.poll();
+      // Check if all local splits were already processed for this worker
+      if (splitIndex == null) {
+        break;
+      }
+      // Try to reserve the split
+      if (splitsTaken[splitIndex].compareAndSet(false, true)) {
+        return serializedSplits.get(splitIndex);
+      }
+    }
+
+    // No more local splits available, proceed linearly from splits list
+    while (true) {
+      // Get position to check
+      int splitIndex = listPointer.getAndIncrement();
+      // Check if all splits were already taken
+      if (splitIndex >= serializedSplits.size()) {
+        return null;
+      }
+      // Try to reserve the split
+      if (splitsTaken[splitIndex].compareAndSet(false, true)) {
+        return serializedSplits.get(splitIndex);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/input/MappingInputSplitsMasterOrganizer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/input/MappingInputSplitsMasterOrganizer.java b/giraph-core/src/main/java/org/apache/giraph/master/input/MappingInputSplitsMasterOrganizer.java
new file mode 100644
index 0000000..8399c8a
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/input/MappingInputSplitsMasterOrganizer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.master.input;
+
+import org.apache.giraph.worker.WorkerInfo;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Organizer for mapping splits on master. Mapping splits need all to be
+ * given to all workers, unlike vertex and edge splits which are read by
+ * exactly one worker each
+ */
+public class MappingInputSplitsMasterOrganizer
+    implements InputSplitsMasterOrganizer {
+  /** List of splits */
+  private final List<byte[]> splits;
+  /** Map from worker task id to atomic pointer in splits list */
+  private final Map<Integer, AtomicInteger>
+      workerTaskIdToNextSplitIndexMap;
+
+  /**
+   * Constructor
+   *
+   * @param serializedSplits Splits
+   * @param workers List of workers
+   */
+  public MappingInputSplitsMasterOrganizer(List<byte[]> serializedSplits,
+      List<WorkerInfo> workers) {
+    this.splits = serializedSplits;
+    workerTaskIdToNextSplitIndexMap = new HashMap<>();
+    for (WorkerInfo worker : workers) {
+      workerTaskIdToNextSplitIndexMap.put(
+          worker.getTaskId(), new AtomicInteger(0));
+    }
+  }
+
+  @Override
+  public byte[] getSerializedSplitFor(int workerTaskId) {
+    AtomicInteger nextSplitIndex =
+        workerTaskIdToNextSplitIndexMap.get(workerTaskId);
+    int splitIndex = nextSplitIndex.getAndIncrement();
+    return splitIndex < splits.size() ? splits.get(splitIndex) : null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/input/MasterInputSplitsHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/input/MasterInputSplitsHandler.java b/giraph-core/src/main/java/org/apache/giraph/master/input/MasterInputSplitsHandler.java
new file mode 100644
index 0000000..327e59d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/input/MasterInputSplitsHandler.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.master.input;
+
+import org.apache.giraph.comm.MasterClient;
+import org.apache.giraph.comm.requests.ReplyWithInputSplitRequest;
+import org.apache.giraph.io.GiraphInputFormat;
+import org.apache.giraph.io.InputType;
+import org.apache.giraph.worker.WorkerInfo;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Handler for input splits on master
+ *
+ * Since currently Giraph fails if worker fails while reading input, we
+ * didn't complicate this part with retries yet, later it could be added by
+ * keeping track of which worker got which split and then if worker dies put
+ * these splits back to queues.
+ */
+public class MasterInputSplitsHandler {
+  /** Whether to use locality information */
+  private final boolean useLocality;
+  /** Master client */
+  private MasterClient masterClient;
+  /** Master client */
+  private List<WorkerInfo> workers;
+  /** Map of splits organizers for each split type */
+  private Map<InputType, InputSplitsMasterOrganizer> splitsMap =
+      new EnumMap<>(InputType.class);
+  /** Latches to say when one input splits type is ready to be accessed */
+  private Map<InputType, CountDownLatch> latchesMap =
+      new EnumMap<>(InputType.class);
+
+  /**
+   * Constructor
+   *
+   * @param useLocality Whether to use locality information or not
+   */
+  public MasterInputSplitsHandler(boolean useLocality) {
+    this.useLocality = useLocality;
+    for (InputType inputType : InputType.values()) {
+      latchesMap.put(inputType, new CountDownLatch(1));
+    }
+  }
+
+  /**
+   * Initialize
+   *
+   * @param masterClient Master client
+   * @param workers List of workers
+   */
+  public void initialize(MasterClient masterClient, List<WorkerInfo> workers) {
+    this.masterClient = masterClient;
+    this.workers = workers;
+  }
+
+  /**
+   * Add splits
+   *
+   * @param splitsType Type of splits
+   * @param inputSplits Splits
+   * @param inputFormat Format
+   */
+  public void addSplits(InputType splitsType, List<InputSplit> inputSplits,
+      GiraphInputFormat inputFormat) {
+    List<byte[]> serializedSplits = new ArrayList<>();
+    for (InputSplit inputSplit : inputSplits) {
+      try {
+        ByteArrayOutputStream byteArrayOutputStream =
+            new ByteArrayOutputStream();
+        DataOutput outputStream =
+            new DataOutputStream(byteArrayOutputStream);
+        inputFormat.writeInputSplit(inputSplit, outputStream);
+        serializedSplits.add(byteArrayOutputStream.toByteArray());
+      } catch (IOException e) {
+        throw new IllegalStateException("IOException occurred", e);
+      }
+    }
+    InputSplitsMasterOrganizer inputSplitsOrganizer;
+    if (splitsType == InputType.MAPPING) {
+      inputSplitsOrganizer = new MappingInputSplitsMasterOrganizer(
+          serializedSplits, workers);
+    } else {
+      inputSplitsOrganizer = useLocality ?
+          new LocalityAwareInputSplitsMasterOrganizer(serializedSplits,
+              inputSplits, workers) :
+          new BasicInputSplitsMasterOrganizer(serializedSplits);
+    }
+    splitsMap.put(splitsType, inputSplitsOrganizer);
+    latchesMap.get(splitsType).countDown();
+  }
+
+  /**
+   * Called after we receive a split request from some worker, should send
+   * split back to it if available, or send it information that there is no
+   * more available
+   *
+   * @param splitType Type of split requested
+   * @param workerTaskId Id of worker who requested split
+   */
+  public void sendSplitTo(InputType splitType, int workerTaskId) {
+    try {
+      // Make sure we don't try to retrieve splits before they were added
+      latchesMap.get(splitType).await();
+    } catch (InterruptedException e) {
+      throw new IllegalStateException("Interrupted", e);
+    }
+    byte[] serializedInputSplit =
+        splitsMap.get(splitType).getSerializedSplitFor(workerTaskId);
+    masterClient.sendWritableRequest(workerTaskId,
+        new ReplyWithInputSplitRequest(splitType,
+            serializedInputSplit == null ? new byte[0] : serializedInputSplit));
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/master/input/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/input/package-info.java b/giraph-core/src/main/java/org/apache/giraph/master/input/package-info.java
new file mode 100644
index 0000000..992b6fe
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/input/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Input related master classes
+ */
+package org.apache.giraph.master.input;

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
index 4600745..6914c3b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
@@ -115,7 +115,7 @@ public class PartitionUtils {
         workerStatsMap.put(
             workerInfo,
             new VertexEdgeCount(partitionStats.getVertexCount(),
-                partitionStats.getEdgeCount()));
+                partitionStats.getEdgeCount(), 0));
       } else {
         workerStatsMap.put(
             workerInfo,

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index 5b754d6..1031bb3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -59,8 +59,6 @@ import org.apache.giraph.graph.AddressesAndPartitionsWritable;
 import org.apache.giraph.graph.FinishedSuperstepStats;
 import org.apache.giraph.graph.GlobalStats;
 import org.apache.giraph.graph.GraphTaskManager;
-import org.apache.giraph.graph.InputSplitEvents;
-import org.apache.giraph.graph.InputSplitPaths;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexEdgeCount;
 import org.apache.giraph.io.EdgeOutputFormat;
@@ -177,10 +175,8 @@ public class BspServiceWorker<I extends WritableComparable,
   /** Time spent waiting on requests to finish */
   private GiraphTimer waitRequestsTimer;
 
-  /** InputSplit handlers used in INPUT_SUPERSTEP for vertex splits */
-  private InputSplitsHandler vertexSplitsHandler;
-  /** InputSplit handlers used in INPUT_SUPERSTEP for edge splits */
-  private InputSplitsHandler edgeSplitsHandler;
+  /** InputSplit handlers used in INPUT_SUPERSTEP */
+  private WorkerInputSplitsHandler inputSplitsHandler;
 
   /**
    * Constructor for setting up the worker.
@@ -237,8 +233,9 @@ public class BspServiceWorker<I extends WritableComparable,
         null;
 
     GiraphMetrics.get().addSuperstepResetObserver(this);
-    vertexSplitsHandler = null;
-    edgeSplitsHandler = null;
+
+    inputSplitsHandler = new WorkerInputSplitsHandler(
+        workerInfo, masterInfo.getTaskId(), workerClient);
   }
 
   @Override
@@ -295,26 +292,20 @@ public class BspServiceWorker<I extends WritableComparable,
    *
    * Use one or more threads to do the loading.
    *
-   * @param inputSplitPathList List of input split paths
    * @param inputSplitsCallableFactory Factory for {@link InputSplitsCallable}s
    * @return Statistics of the vertices and edges loaded
    * @throws InterruptedException
    * @throws KeeperException
    */
   private VertexEdgeCount loadInputSplits(
-      List<String> inputSplitPathList,
       CallableFactory<VertexEdgeCount> inputSplitsCallableFactory)
     throws KeeperException, InterruptedException {
     VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
-    // Determine how many threads to use based on the number of input splits
-    int maxInputSplitThreads = (inputSplitPathList.size() - 1) /
-        getConfiguration().getMaxWorkers() + 1;
-    int numThreads = Math.min(getConfiguration().getNumInputSplitsThreads(),
-        maxInputSplitThreads);
+    int numThreads = getConfiguration().getNumInputSplitsThreads();
     if (LOG.isInfoEnabled()) {
       LOG.info("loadInputSplits: Using " + numThreads + " thread(s), " +
           "originally " + getConfiguration().getNumInputSplitsThreads() +
-          " threads(s) for " + inputSplitPathList.size() + " total splits.");
+          " threads(s)");
     }
 
     List<VertexEdgeCount> results =
@@ -336,46 +327,21 @@ public class BspServiceWorker<I extends WritableComparable,
    */
   private long loadMapping() throws KeeperException,
     InterruptedException {
-    List<String> inputSplitPathList =
-        getZkExt().getChildrenExt(mappingInputSplitsPaths.getPath(),
-        false, false, true);
-
-    InputSplitPathOrganizer splitOrganizer =
-        new InputSplitPathOrganizer(getZkExt(),
-            inputSplitPathList, getWorkerInfo().getHostname(),
-            getConfiguration().useInputSplitLocality());
-
     MappingInputSplitsCallableFactory<I, V, E, ? extends Writable>
-        mappingInputSplitsCallableFactory =
+        inputSplitsCallableFactory =
         new MappingInputSplitsCallableFactory<>(
             getConfiguration().createWrappedMappingInputFormat(),
-            splitOrganizer,
             getContext(),
             getConfiguration(),
             this,
-            getZkExt());
+            inputSplitsHandler);
 
-    long entriesLoaded = 0;
-    // Determine how many threads to use based on the number of input splits
-    int maxInputSplitThreads = inputSplitPathList.size();
-    int numThreads = Math.min(getConfiguration().getNumInputSplitsThreads(),
-        maxInputSplitThreads);
-    if (LOG.isInfoEnabled()) {
-      LOG.info("loadInputSplits: Using " + numThreads + " thread(s), " +
-          "originally " + getConfiguration().getNumInputSplitsThreads() +
-          " threads(s) for " + inputSplitPathList.size() + " total splits.");
-    }
+    long mappingsLoaded =
+        loadInputSplits(inputSplitsCallableFactory).getMappingCount();
 
-    List<Integer> results =
-        ProgressableUtils.getResultsWithNCallables(
-            mappingInputSplitsCallableFactory,
-            numThreads, "load-mapping-%d", getContext());
-    for (Integer result : results) {
-      entriesLoaded += result;
-    }
     // after all threads finish loading - call postFilling
     localData.getMappingStore().postFilling();
-    return entriesLoaded;
+    return mappingsLoaded;
   }
 
   /**
@@ -386,31 +352,15 @@ public class BspServiceWorker<I extends WritableComparable,
    */
   private VertexEdgeCount loadVertices() throws KeeperException,
       InterruptedException {
-    List<String> inputSplitPathList =
-        getZkExt().getChildrenExt(vertexInputSplitsPaths.getPath(),
-            false, false, true);
-
-    InputSplitPathOrganizer splitOrganizer =
-        new InputSplitPathOrganizer(getZkExt(),
-            inputSplitPathList, getWorkerInfo().getHostname(),
-            getConfiguration().useInputSplitLocality());
-    vertexSplitsHandler = new InputSplitsHandler(
-        splitOrganizer,
-        getZkExt(),
-        getContext(),
-        BspService.VERTEX_INPUT_SPLIT_RESERVED_NODE,
-        BspService.VERTEX_INPUT_SPLIT_FINISHED_NODE);
-
     VertexInputSplitsCallableFactory<I, V, E> inputSplitsCallableFactory =
         new VertexInputSplitsCallableFactory<I, V, E>(
             getConfiguration().createWrappedVertexInputFormat(),
             getContext(),
             getConfiguration(),
             this,
-            vertexSplitsHandler,
-            getZkExt());
+            inputSplitsHandler);
 
-    return loadInputSplits(inputSplitPathList, inputSplitsCallableFactory);
+    return loadInputSplits(inputSplitsCallableFactory);
   }
 
   /**
@@ -420,32 +370,15 @@ public class BspServiceWorker<I extends WritableComparable,
    * @return Number of edges loaded
    */
   private long loadEdges() throws KeeperException, InterruptedException {
-    List<String> inputSplitPathList =
-        getZkExt().getChildrenExt(edgeInputSplitsPaths.getPath(),
-            false, false, true);
-
-    InputSplitPathOrganizer splitOrganizer =
-        new InputSplitPathOrganizer(getZkExt(),
-            inputSplitPathList, getWorkerInfo().getHostname(),
-            getConfiguration().useInputSplitLocality());
-    edgeSplitsHandler = new InputSplitsHandler(
-        splitOrganizer,
-        getZkExt(),
-        getContext(),
-        BspService.EDGE_INPUT_SPLIT_RESERVED_NODE,
-        BspService.EDGE_INPUT_SPLIT_FINISHED_NODE);
-
     EdgeInputSplitsCallableFactory<I, V, E> inputSplitsCallableFactory =
         new EdgeInputSplitsCallableFactory<I, V, E>(
             getConfiguration().createWrappedEdgeInputFormat(),
             getContext(),
             getConfiguration(),
             this,
-            edgeSplitsHandler,
-            getZkExt());
+            inputSplitsHandler);
 
-    return loadInputSplits(inputSplitPathList, inputSplitsCallableFactory).
-        getEdgeCount();
+    return loadInputSplits(inputSplitsCallableFactory).getEdgeCount();
   }
 
   @Override
@@ -459,46 +392,12 @@ public class BspServiceWorker<I extends WritableComparable,
   }
 
   /**
-   * Ensure the input splits are ready for processing
-   *
-   * @param inputSplitPaths Input split paths
-   * @param inputSplitEvents Input split events
-   */
-  private void ensureInputSplitsReady(InputSplitPaths inputSplitPaths,
-                                      InputSplitEvents inputSplitEvents) {
-    while (true) {
-      Stat inputSplitsReadyStat;
-      try {
-        inputSplitsReadyStat = getZkExt().exists(
-            inputSplitPaths.getAllReadyPath(), true);
-      } catch (KeeperException e) {
-        throw new IllegalStateException("ensureInputSplitsReady: " +
-            "KeeperException waiting on input splits", e);
-      } catch (InterruptedException e) {
-        throw new IllegalStateException("ensureInputSplitsReady: " +
-            "InterruptedException waiting on input splits", e);
-      }
-      if (inputSplitsReadyStat != null) {
-        break;
-      }
-      inputSplitEvents.getAllReadyChanged().waitForever();
-      inputSplitEvents.getAllReadyChanged().reset();
-    }
-  }
-
-  /**
    * Mark current worker as done and then wait for all workers
    * to finish processing input splits.
-   *
-   * @param inputSplitPaths Input split paths
-   * @param inputSplitEvents Input split events
    */
-  private void markCurrentWorkerDoneThenWaitForOthers(
-    InputSplitPaths inputSplitPaths,
-    InputSplitEvents inputSplitEvents) {
+  private void markCurrentWorkerDoneReadingThenWaitForOthers() {
     String workerInputSplitsDonePath =
-        inputSplitPaths.getDonePath() + "/" +
-            getWorkerInfo().getHostnameId();
+        inputSplitsWorkerDonePath + "/" + getWorkerInfo().getHostnameId();
     try {
       getZkExt().createExt(workerInputSplitsDonePath,
           null,
@@ -508,32 +407,31 @@ public class BspServiceWorker<I extends WritableComparable,
     } catch (KeeperException e) {
       throw new IllegalStateException(
           "markCurrentWorkerDoneThenWaitForOthers: " +
-          "KeeperException creating worker done splits", e);
+              "KeeperException creating worker done splits", e);
     } catch (InterruptedException e) {
       throw new IllegalStateException(
           "markCurrentWorkerDoneThenWaitForOthers: " +
-          "InterruptedException creating worker done splits", e);
+              "InterruptedException creating worker done splits", e);
     }
     while (true) {
       Stat inputSplitsDoneStat;
       try {
         inputSplitsDoneStat =
-            getZkExt().exists(inputSplitPaths.getAllDonePath(),
-                true);
+            getZkExt().exists(inputSplitsAllDonePath, true);
       } catch (KeeperException e) {
         throw new IllegalStateException(
             "markCurrentWorkerDoneThenWaitForOthers: " +
-            "KeeperException waiting on worker done splits", e);
+                "KeeperException waiting on worker done splits", e);
       } catch (InterruptedException e) {
         throw new IllegalStateException(
             "markCurrentWorkerDoneThenWaitForOthers: " +
-            "InterruptedException waiting on worker done splits", e);
+                "InterruptedException waiting on worker done splits", e);
       }
       if (inputSplitsDoneStat != null) {
         break;
       }
-      inputSplitEvents.getAllDoneChanged().waitForever();
-      inputSplitEvents.getAllDoneChanged().reset();
+      getInputSplitsAllDoneEvent().waitForever();
+      getInputSplitsAllDoneEvent().reset();
     }
   }
 
@@ -597,8 +495,6 @@ else[HADOOP_NON_SECURE]*/
     long entriesLoaded;
 
     if (getConfiguration().hasMappingInputFormat()) {
-      // Ensure the mapping InputSplits are ready for processing
-      ensureInputSplitsReady(mappingInputSplitsPaths, mappingInputSplitsEvents);
       getContext().progress();
       try {
         entriesLoaded = loadMapping();
@@ -618,17 +514,12 @@ else[HADOOP_NON_SECURE]*/
             entriesLoaded + " entries from inputSplits");
       }
 
-      // Workers wait for each other to finish, coordinated by master
-      markCurrentWorkerDoneThenWaitForOthers(mappingInputSplitsPaths,
-          mappingInputSplitsEvents);
       // Print stats for data stored in localData once mapping is fully
       // loaded on all the workers
       localData.printStats();
     }
 
     if (getConfiguration().hasVertexInputFormat()) {
-      // Ensure the vertex InputSplits are ready for processing
-      ensureInputSplitsReady(vertexInputSplitsPaths, vertexInputSplitsEvents);
       getContext().progress();
       try {
         vertexEdgeCount = loadVertices();
@@ -646,8 +537,6 @@ else[HADOOP_NON_SECURE]*/
     WorkerProgress.get().finishLoadingVertices();
 
     if (getConfiguration().hasEdgeInputFormat()) {
-      // Ensure the edge InputSplits are ready for processing
-      ensureInputSplitsReady(edgeInputSplitsPaths, edgeInputSplitsEvents);
       getContext().progress();
       try {
         vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(0, loadEdges());
@@ -666,17 +555,7 @@ else[HADOOP_NON_SECURE]*/
       LOG.info("setup: Finally loaded a total of " + vertexEdgeCount);
     }
 
-    if (getConfiguration().hasVertexInputFormat()) {
-      // Workers wait for each other to finish, coordinated by master
-      markCurrentWorkerDoneThenWaitForOthers(vertexInputSplitsPaths,
-          vertexInputSplitsEvents);
-    }
-
-    if (getConfiguration().hasEdgeInputFormat()) {
-      // Workers wait for each other to finish, coordinated by master
-      markCurrentWorkerDoneThenWaitForOthers(edgeInputSplitsPaths,
-          edgeInputSplitsEvents);
-    }
+    markCurrentWorkerDoneReadingThenWaitForOthers();
 
     // Create remaining partitions owned by this worker.
     for (PartitionOwner partitionOwner : masterSetPartitionOwners) {
@@ -898,13 +777,6 @@ else[HADOOP_NON_SECURE]*/
 
     if (getSuperstep() != INPUT_SUPERSTEP) {
       postSuperstepCallbacks();
-    } else {
-      if (getConfiguration().hasVertexInputFormat()) {
-        vertexSplitsHandler.setDoneReadingGraph(true);
-      }
-      if (getConfiguration().hasEdgeInputFormat()) {
-        edgeSplitsHandler.setDoneReadingGraph(true);
-      }
     }
 
     globalCommHandler.finishSuperstep(workerAggregatorRequestProcessor);
@@ -1692,7 +1564,7 @@ else[HADOOP_NON_SECURE]*/
       workerClient.setup(getConfiguration().authenticate());
 /*end[HADOOP_NON_SECURE]*/
       return new VertexEdgeCount(globalStats.getVertexCount(),
-          globalStats.getEdgeCount());
+          globalStats.getEdgeCount(), 0);
 
     } catch (IOException e) {
       throw new RuntimeException(
@@ -1963,4 +1835,9 @@ else[HADOOP_NON_SECURE]*/
     }
     return count;
   }
+
+  @Override
+  public WorkerInputSplitsHandler getInputSplitsHandler() {
+    return inputSplitsHandler;
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
index 89f74b3..b7f1eb6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
@@ -26,9 +26,9 @@ import org.apache.giraph.graph.VertexEdgeCount;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.EdgeReader;
 import org.apache.giraph.io.filters.EdgeInputFilter;
+import org.apache.giraph.io.InputType;
 import org.apache.giraph.utils.LoggerUtils;
 import org.apache.giraph.utils.MemoryUtils;
-import org.apache.giraph.zk.ZooKeeperExt;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -89,17 +89,14 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
    * @param configuration Configuration
    * @param bspServiceWorker service worker
    * @param splitsHandler Handler for input splits
-   * @param zooKeeperExt Handle to ZooKeeperExt
    */
   public EdgeInputSplitsCallable(
       EdgeInputFormat<I, E> edgeInputFormat,
       Mapper<?, ?, ?, ?>.Context context,
       ImmutableClassesGiraphConfiguration<I, V, E> configuration,
       BspServiceWorker<I, V, E> bspServiceWorker,
-      InputSplitsHandler splitsHandler,
-      ZooKeeperExt zooKeeperExt)  {
-    super(context, configuration, bspServiceWorker, splitsHandler,
-        zooKeeperExt);
+      WorkerInputSplitsHandler splitsHandler)  {
+    super(context, configuration, bspServiceWorker, splitsHandler);
     this.edgeInputFormat = edgeInputFormat;
 
     this.bspServiceWorker = bspServiceWorker;
@@ -126,6 +123,11 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
     return edgeInputFormat;
   }
 
+  @Override
+  public InputType getInputType() {
+    return InputType.EDGE;
+  }
+
   /**
    * Read edges from input split.  If testing, the user may request a
    * maximum number of edges to be read from an input split.
@@ -226,6 +228,6 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
         inputSplitEdgesLoaded % EDGES_UPDATE_PERIOD);
     WorkerProgress.get().incrementEdgeInputSplitsLoaded();
 
-    return new VertexEdgeCount(0, inputSplitEdgesLoaded);
+    return new VertexEdgeCount(0, inputSplitEdgesLoaded, 0);
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
index f68ac93..d4bc1fc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java
@@ -22,7 +22,6 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.VertexEdgeCount;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.utils.CallableFactory;
-import org.apache.giraph.zk.ZooKeeperExt;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -46,9 +45,7 @@ public class EdgeInputSplitsCallableFactory<I extends WritableComparable,
   /** {@link BspServiceWorker} we're running on. */
   private final BspServiceWorker<I, V, E> bspServiceWorker;
   /** Handler for input splits */
-  private final InputSplitsHandler splitsHandler;
-  /** {@link ZooKeeperExt} for this worker. */
-  private final ZooKeeperExt zooKeeperExt;
+  private final WorkerInputSplitsHandler splitsHandler;
 
   /**
    * Constructor.
@@ -58,20 +55,17 @@ public class EdgeInputSplitsCallableFactory<I extends WritableComparable,
    * @param configuration Configuration
    * @param bspServiceWorker Calling {@link BspServiceWorker}
    * @param splitsHandler Handler for input splits
-   * @param zooKeeperExt {@link ZooKeeperExt} for this worker
    */
   public EdgeInputSplitsCallableFactory(
       EdgeInputFormat<I, E> edgeInputFormat,
       Mapper<?, ?, ?, ?>.Context context,
       ImmutableClassesGiraphConfiguration<I, V, E> configuration,
       BspServiceWorker<I, V, E> bspServiceWorker,
-      InputSplitsHandler splitsHandler,
-      ZooKeeperExt zooKeeperExt) {
+      WorkerInputSplitsHandler splitsHandler) {
     this.edgeInputFormat = edgeInputFormat;
     this.context = context;
     this.configuration = configuration;
     this.bspServiceWorker = bspServiceWorker;
-    this.zooKeeperExt = zooKeeperExt;
     this.splitsHandler = splitsHandler;
   }
 
@@ -82,7 +76,6 @@ public class EdgeInputSplitsCallableFactory<I extends WritableComparable,
         context,
         configuration,
         bspServiceWorker,
-        splitsHandler,
-        zooKeeperExt);
+        splitsHandler);
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/FullInputSplitCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/FullInputSplitCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/FullInputSplitCallable.java
deleted file mode 100644
index 4e93ce0..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/worker/FullInputSplitCallable.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.worker;
-
-import com.google.common.collect.Lists;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.io.GiraphInputFormat;
-import org.apache.giraph.time.SystemTime;
-import org.apache.giraph.time.Time;
-import org.apache.giraph.time.Times;
-import org.apache.giraph.zk.ZooKeeperExt;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.KeeperException;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * InputSplitCallable to read all the splits
- *
- * @param <I> vertexId type
- * @param <V> vertexValue type
- * @param <E> edgeValue type
- */
-public abstract class FullInputSplitCallable<I extends WritableComparable,
-  V extends Writable, E extends Writable>
-  implements Callable<Integer> {
-  /** Class logger */
-  private static final Logger LOG = Logger.getLogger(
-    FullInputSplitCallable.class);
-  /** Class time object */
-  private static final Time TIME = SystemTime.get();
-  /** Configuration */
-  protected final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
-  /** Context */
-  protected final Mapper<?, ?, ?, ?>.Context context;
-
-  /** The List of InputSplit znode paths */
-  private final List<String> pathList;
-  /** Current position in the path list */
-  private final AtomicInteger currentIndex;
-  /** ZooKeeperExt handle */
-  private final ZooKeeperExt zooKeeperExt;
-  /** Get the start time in nanos */
-  private final long startNanos = TIME.getNanoseconds();
-
-  // CHECKSTYLE: stop ParameterNumberCheck
-  /**
-   * Constructor.
-
-   * @param splitOrganizer Input splits organizer
-   * @param context Context
-   * @param configuration Configuration
-   * @param zooKeeperExt Handle to ZooKeeperExt
-   * @param currentIndex Atomic Integer to get splitPath from list
-   */
-  public FullInputSplitCallable(InputSplitPathOrganizer splitOrganizer,
-      Mapper<?, ?, ?, ?>.Context context,
-      ImmutableClassesGiraphConfiguration<I, V, E> configuration,
-      ZooKeeperExt zooKeeperExt,
-      AtomicInteger currentIndex) {
-    this.pathList = Lists.newArrayList(splitOrganizer.getPathList());
-    this.currentIndex = currentIndex;
-    this.zooKeeperExt = zooKeeperExt;
-    this.context = context;
-    this.configuration = configuration;
-  }
-  // CHECKSTYLE: resume ParameterNumberCheck
-
-  /**
-   * Get input format
-   *
-   * @return Input format
-   */
-  public abstract GiraphInputFormat getInputFormat();
-
-  /**
-   * Load mapping entries from all the given input splits
-   *
-   * @param inputSplit Input split to load
-   * @return Count of vertices and edges loaded
-   * @throws java.io.IOException
-   * @throws InterruptedException
-   */
-  protected abstract Integer readInputSplit(InputSplit inputSplit)
-    throws IOException, InterruptedException;
-
-  @Override
-  public Integer call() {
-    int entries = 0;
-    String inputSplitPath;
-    int inputSplitsProcessed = 0;
-    try {
-      while (true) {
-        int pos = currentIndex.getAndIncrement();
-        if (pos >= pathList.size()) {
-          break;
-        }
-        inputSplitPath = pathList.get(pos);
-        entries += loadInputSplit(inputSplitPath);
-        context.progress();
-        ++inputSplitsProcessed;
-      }
-    } catch (InterruptedException e) {
-      throw new IllegalStateException("call: InterruptedException", e);
-    } catch (IOException e) {
-      throw new IllegalStateException("call: IOException", e);
-    } catch (ClassNotFoundException e) {
-      throw new IllegalStateException("call: ClassNotFoundException", e);
-    } catch (InstantiationException e) {
-      throw new IllegalStateException("call: InstantiationException", e);
-    } catch (IllegalAccessException e) {
-      throw new IllegalStateException("call: IllegalAccessException", e);
-    }
-
-    if (LOG.isInfoEnabled()) {
-      float seconds = Times.getNanosSince(TIME, startNanos) /
-          Time.NS_PER_SECOND_AS_FLOAT;
-      float entriesPerSecond = entries / seconds;
-      LOG.info("call: Loaded " + inputSplitsProcessed + " " +
-          "input splits in " + seconds + " secs, " + entries +
-          " " + entriesPerSecond + " entries/sec");
-    }
-    return entries;
-  }
-
-  /**
-   * Extract entries from input split, saving them into mapping store.
-   * Mark the input split finished when done.
-   *
-   * @param inputSplitPath ZK location of input split
-   * @return Number of entries read in this input split
-   * @throws IOException
-   * @throws ClassNotFoundException
-   * @throws InterruptedException
-   * @throws InstantiationException
-   * @throws IllegalAccessException
-   */
-  private Integer loadInputSplit(
-    String inputSplitPath)
-    throws IOException, ClassNotFoundException, InterruptedException,
-    InstantiationException, IllegalAccessException {
-    InputSplit inputSplit = getInputSplit(inputSplitPath);
-    Integer entriesRead = readInputSplit(inputSplit);
-    if (LOG.isInfoEnabled()) {
-      LOG.info("loadFromInputSplit: Finished loading " +
-          inputSplitPath + " " + entriesRead);
-    }
-    return entriesRead;
-  }
-
-  /**
-   * Talk to ZooKeeper to convert the input split path to the actual
-   * InputSplit.
-   *
-   * @param inputSplitPath Location in ZK of input split
-   * @return instance of InputSplit
-   * @throws IOException
-   * @throws ClassNotFoundException
-   */
-  protected InputSplit getInputSplit(String inputSplitPath)
-    throws IOException, ClassNotFoundException {
-    byte[] splitList;
-    try {
-      splitList = zooKeeperExt.getData(inputSplitPath, false, null);
-    } catch (KeeperException e) {
-      throw new IllegalStateException(
-          "getInputSplit: KeeperException on " + inputSplitPath, e);
-    } catch (InterruptedException e) {
-      throw new IllegalStateException(
-          "getInputSplit: IllegalStateException on " + inputSplitPath, e);
-    }
-    context.progress();
-
-    DataInputStream inputStream =
-        new DataInputStream(new ByteArrayInputStream(splitList));
-    InputSplit inputSplit = getInputFormat().readInputSplit(inputStream);
-
-    if (LOG.isInfoEnabled()) {
-      LOG.info("getInputSplit: Processing " + inputSplitPath +
-          " from ZooKeeper and got input split '" +
-          inputSplit.toString() + "'");
-    }
-    return inputSplit;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java
deleted file mode 100644
index 463601c..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.worker;
-
-import com.google.common.collect.Lists;
-
-import org.apache.giraph.zk.ZooKeeperExt;
-import org.apache.hadoop.io.Text;
-import org.apache.zookeeper.KeeperException;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * Utility class to extract the list of InputSplits from the
- * ZooKeeper tree of "claimable splits" the master created,
- * and to sort the list to favor local data blocks.
- *
- * This class provides an Iterator for the list the worker will
- * claim splits from, making all sorting and data-code locality
- * processing done here invisible to callers. The aim is to cut
- * down on the number of ZK reads workers perform before locating
- * an unclaimed InputSplit.
- */
-public class InputSplitPathOrganizer {
-  /** The worker's local ZooKeeperExt ref */
-  private final ZooKeeperExt zooKeeper;
-  /** The List of InputSplit znode paths */
-  private final List<String> pathList;
-  /** The worker's hostname */
-  private final String hostName;
-
-  /**
-   * Constructor
-   *
-   * @param zooKeeper the worker's ZkExt
-   * @param zkPathList the path to read from
-   * @param hostName the worker's host name (for matching)
-   * @param useLocality whether to prioritize local input splits
-   */
-  public InputSplitPathOrganizer(final ZooKeeperExt zooKeeper,
-    final String zkPathList, final String hostName,
-    final boolean useLocality) throws KeeperException, InterruptedException {
-    this(zooKeeper, zooKeeper.getChildrenExt(zkPathList, false, false, true),
-        hostName, useLocality);
-  }
-
-  /**
-   * Constructor
-   *
-   * @param zooKeeper the worker's ZkExt
-   * @param inputSplitPathList path of input splits to read from
-   * @param hostName the worker's host name (for matching)
-   * @param useLocality whether to prioritize local input splits
-   */
-  public InputSplitPathOrganizer(
-      final ZooKeeperExt zooKeeper, final List<String> inputSplitPathList,
-      final String hostName, final boolean useLocality) {
-    this.zooKeeper = zooKeeper;
-    this.pathList = Lists.newArrayList(inputSplitPathList);
-    this.hostName = hostName;
-    // Shuffle input splits in case several workers exist on this host
-    Collections.shuffle(pathList);
-    if (useLocality) {
-      prioritizeLocalInputSplits();
-    }
-  }
-
-  /**
-  * Re-order list of InputSplits so files local to this worker node's
-  * disk are the first it will iterate over when attempting to claim
-  * a split to read. This will increase locality of data reads with greater
-  * probability as the % of total nodes in the cluster hosting data and workers
-  * BOTH increase towards 100%. Replication increases our chances of a "hit."
-  */
-  private void prioritizeLocalInputSplits() {
-    List<String> sortedList = new ArrayList<String>();
-    String hosts;
-    for (Iterator<String> iterator = pathList.iterator(); iterator.hasNext();) {
-      final String path = iterator.next();
-      try {
-        hosts = getLocationsFromZkInputSplitData(path);
-      } catch (IOException ioe) {
-        hosts = null; // no problem, just don't sort this entry
-      } catch (KeeperException ke) {
-        hosts = null;
-      } catch (InterruptedException ie) {
-        hosts = null;
-      }
-      if (hosts != null && hosts.contains(hostName)) {
-        sortedList.add(path); // collect the local block
-        iterator.remove(); // remove local block from list
-      }
-    }
-    pathList.addAll(0, sortedList);
-  }
-
-  /**
-   * Utility for extracting locality data from an InputSplit ZNode.
-   *
-   * @param zkSplitPath the input split path to attempt to read
-   * ZNode locality data from for this InputSplit.
-   * @return a String of hostnames from ZNode data, or throws
-   */
-  private String getLocationsFromZkInputSplitData(String zkSplitPath)
-    throws IOException, KeeperException, InterruptedException {
-    byte[] locationData = zooKeeper.getData(zkSplitPath, false, null);
-    DataInputStream inputStream =
-      new DataInputStream(new ByteArrayInputStream(locationData));
-    // only read the "first" entry in the znode data, the locations
-    return Text.readString(inputStream);
-  }
-
-  /**
-   * Get the ordered input splits paths.
-   *
-   * @return Ordered input splits paths
-   */
-  public Iterable<String> getPathList() {
-    return pathList;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
index 7b2fc0f..92b23bd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
@@ -28,6 +28,7 @@ import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.VertexEdgeCount;
 import org.apache.giraph.io.GiraphInputFormat;
+import org.apache.giraph.io.InputType;
 import org.apache.giraph.metrics.GiraphMetrics;
 import org.apache.giraph.metrics.GiraphMetricsRegistry;
 import org.apache.giraph.metrics.MeterDesc;
@@ -35,14 +36,11 @@ import org.apache.giraph.metrics.MetricNames;
 import org.apache.giraph.time.SystemTime;
 import org.apache.giraph.time.Time;
 import org.apache.giraph.time.Times;
-import org.apache.giraph.zk.ZooKeeperExt;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.Logger;
-import org.apache.zookeeper.KeeperException;
 
 import com.yammer.metrics.core.Counter;
 import com.yammer.metrics.core.Meter;
@@ -75,9 +73,7 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
    * Stores and processes the list of InputSplits advertised
    * in a tree of child znodes by the master.
    */
-  private final InputSplitsHandler splitsHandler;
-  /** ZooKeeperExt handle */
-  private final ZooKeeperExt zooKeeperExt;
+  private final WorkerInputSplitsHandler splitsHandler;
   /** Get the start time in nanos */
   private final long startNanos = TIME.getNanoseconds();
   /** Whether to prioritize local input splits. */
@@ -91,15 +87,12 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
    * @param configuration Configuration
    * @param bspServiceWorker service worker
    * @param splitsHandler Handler for input splits
-   * @param zooKeeperExt Handle to ZooKeeperExt
    */
   public InputSplitsCallable(
       Mapper<?, ?, ?, ?>.Context context,
       ImmutableClassesGiraphConfiguration<I, V, E> configuration,
       BspServiceWorker<I, V, E> bspServiceWorker,
-      InputSplitsHandler splitsHandler,
-      ZooKeeperExt zooKeeperExt) {
-    this.zooKeeperExt = zooKeeperExt;
+      WorkerInputSplitsHandler splitsHandler) {
     this.context = context;
     this.workerClientRequestProcessor =
         new NettyWorkerClientRequestProcessor<I, V, E>(
@@ -119,6 +112,13 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
   public abstract GiraphInputFormat getInputFormat();
 
   /**
+   * Get input type
+   *
+   * @return Input type
+   */
+  public abstract InputType getInputType();
+
+  /**
    * Get Meter tracking edges loaded
    *
    * @return Meter tracking edges loaded
@@ -205,27 +205,22 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
   @Override
   public VertexEdgeCount call() {
     VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
-    String inputSplitPath;
+    byte[] serializedInputSplit;
     int inputSplitsProcessed = 0;
     try {
-      while ((inputSplitPath = splitsHandler.reserveInputSplit()) != null) {
-        vertexEdgeCount =
-            vertexEdgeCount.incrVertexEdgeCount(loadInputSplit(inputSplitPath));
+      while ((serializedInputSplit =
+          splitsHandler.reserveInputSplit(getInputType())) != null) {
+        vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(
+            loadInputSplit(serializedInputSplit));
         context.progress();
         ++inputSplitsProcessed;
       }
-    } catch (KeeperException e) {
-      throw new IllegalStateException("call: KeeperException", e);
     } catch (InterruptedException e) {
       throw new IllegalStateException("call: InterruptedException", e);
     } catch (IOException e) {
       throw new IllegalStateException("call: IOException", e);
     } catch (ClassNotFoundException e) {
       throw new IllegalStateException("call: ClassNotFoundException", e);
-    } catch (InstantiationException e) {
-      throw new IllegalStateException("call: InstantiationException", e);
-    } catch (IllegalAccessException e) {
-      throw new IllegalStateException("call: IllegalAccessException", e);
     }
 
     if (LOG.isInfoEnabled()) {
@@ -252,25 +247,19 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
    * reached in readVerticeFromInputSplit.
    * Mark the input split finished when done.
    *
-   * @param inputSplitPath ZK location of input split
+   * @param serializedInputSplit Serialized input split
    * @return Mapping of vertex indices and statistics, or null if no data read
    * @throws IOException
    * @throws ClassNotFoundException
    * @throws InterruptedException
-   * @throws InstantiationException
-   * @throws IllegalAccessException
    */
-  private VertexEdgeCount loadInputSplit(
-      String inputSplitPath)
-    throws IOException, ClassNotFoundException, InterruptedException,
-      InstantiationException, IllegalAccessException {
-    InputSplit inputSplit = getInputSplit(inputSplitPath);
+  private VertexEdgeCount loadInputSplit(byte[] serializedInputSplit)
+      throws IOException, ClassNotFoundException, InterruptedException {
+    InputSplit inputSplit = getInputSplit(serializedInputSplit);
     VertexEdgeCount vertexEdgeCount = readInputSplit(inputSplit);
     if (LOG.isInfoEnabled()) {
-      LOG.info("loadFromInputSplit: Finished loading " +
-          inputSplitPath + " " + vertexEdgeCount);
+      LOG.info("loadFromInputSplit: Finished loading " + vertexEdgeCount);
     }
-    splitsHandler.markInputSplitPathFinished(inputSplitPath);
     return vertexEdgeCount;
   }
 
@@ -278,35 +267,19 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
    * Talk to ZooKeeper to convert the input split path to the actual
    * InputSplit.
    *
-   * @param inputSplitPath Location in ZK of input split
+   * @param serializedInputSplit Serialized input split
    * @return instance of InputSplit
    * @throws IOException
    * @throws ClassNotFoundException
    */
-  protected InputSplit getInputSplit(String inputSplitPath)
-    throws IOException, ClassNotFoundException {
-    byte[] splitList;
-    try {
-      splitList = zooKeeperExt.getData(inputSplitPath, false, null);
-    } catch (KeeperException e) {
-      throw new IllegalStateException(
-          "getInputSplit: KeeperException on " + inputSplitPath, e);
-    } catch (InterruptedException e) {
-      throw new IllegalStateException(
-          "getInputSplit: IllegalStateException on " + inputSplitPath, e);
-    }
-    context.progress();
-
+  protected InputSplit getInputSplit(byte[] serializedInputSplit)
+      throws IOException, ClassNotFoundException {
     DataInputStream inputStream =
-        new DataInputStream(new ByteArrayInputStream(splitList));
-    if (useLocality) {
-      Text.readString(inputStream); // location data unused here, skip
-    }
+        new DataInputStream(new ByteArrayInputStream(serializedInputSplit));
     InputSplit inputSplit = getInputFormat().readInputSplit(inputStream);
 
     if (LOG.isInfoEnabled()) {
-      LOG.info("getInputSplit: Reserved " + inputSplitPath +
-          " from ZooKeeper and got input split '" +
+      LOG.info("getInputSplit: Reserved input split '" +
           inputSplit.toString() + "'");
     }
     return inputSplit;

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsHandler.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsHandler.java
deleted file mode 100644
index e2099eb..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsHandler.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.worker;
-
-import org.apache.giraph.zk.ZooKeeperExt;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.data.Stat;
-
-import com.google.common.collect.Lists;
-
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Stores the list of input split paths, and provides thread-safe way for
- * reserving input splits.
- */
-public class InputSplitsHandler implements Watcher  {
-  /** Class logger */
-  private static final Logger LOG =
-      Logger.getLogger(InputSplitsHandler.class);
-
-  /** The List of InputSplit znode paths */
-  private final List<String> pathList;
-  /** Current position in the path list */
-  private final AtomicInteger currentIndex;
-  /** The worker's local ZooKeeperExt ref */
-  private final ZooKeeperExt zooKeeper;
-  /** Context for reporting progress */
-  private final Mapper<?, ?, ?, ?>.Context context;
-  /** ZooKeeper input split reserved node. */
-  private final String inputSplitReservedNode;
-  /** ZooKeeper input split finished node. */
-  private final String inputSplitFinishedNode;
-  /** Specifies if we finished execution of INPUT_SUPERSTEP. The variable may
-   * be accessed via different threads. */
-  private volatile boolean doneReadingGraph;
-
-  /**
-   * Constructor
-   *
-   * @param splitOrganizer Input splits organizer
-   * @param zooKeeper The worker's local ZooKeeperExt ref
-   * @param context Context for reporting progress
-   * @param inputSplitReservedNode ZooKeeper input split reserved node
-   * @param inputSplitFinishedNode ZooKeeper input split finished node
-   */
-  public InputSplitsHandler(InputSplitPathOrganizer splitOrganizer,
-      ZooKeeperExt zooKeeper, Mapper<?, ?, ?, ?>.Context context,
-      String inputSplitReservedNode, String inputSplitFinishedNode) {
-    this.pathList = Lists.newArrayList(splitOrganizer.getPathList());
-    this.currentIndex = new AtomicInteger(0);
-    this.zooKeeper = zooKeeper;
-    this.context = context;
-    this.inputSplitReservedNode = inputSplitReservedNode;
-    this.inputSplitFinishedNode = inputSplitFinishedNode;
-    this.doneReadingGraph = false;
-  }
-
-  public void setDoneReadingGraph(boolean doneReadingGraph) {
-    this.doneReadingGraph = doneReadingGraph;
-  }
-
-   /**
-   * Try to reserve an InputSplit for loading.  While InputSplits exists that
-   * are not finished, wait until they are.
-   *
-   * NOTE: iterations on the InputSplit list only halt for each worker when it
-   * has scanned the entire list once and found every split marked RESERVED.
-   * When a worker fails, its Ephemeral RESERVED znodes will disappear,
-   * allowing other iterating workers to claim it's previously read splits.
-   * Only when the last worker left iterating on the list fails can a danger
-   * of data loss occur. Since worker failure in INPUT_SUPERSTEP currently
-   * causes job failure, this is OK. As the failure model evolves, this
-   * behavior might need to change. We could add watches on
-   * inputSplitFinishedNodes and stop iterating only when all these nodes
-   * have been created.
-   *
-   * @return reserved InputSplit or null if no unfinished InputSplits exist
-   * @throws KeeperException
-   * @throws InterruptedException
-   */
-  public String reserveInputSplit() throws KeeperException,
-      InterruptedException {
-    String reservedInputSplitPath;
-    Stat reservedStat;
-    while (true) {
-      int splitToTry = currentIndex.getAndIncrement();
-      if (splitToTry >= pathList.size()) {
-        return null;
-      }
-      String nextSplitToClaim = pathList.get(splitToTry);
-      context.progress();
-      String tmpInputSplitReservedPath =
-          nextSplitToClaim + inputSplitReservedNode;
-      reservedStat =
-          zooKeeper.exists(tmpInputSplitReservedPath, this);
-      if (reservedStat == null) {
-        try {
-          // Attempt to reserve this InputSplit
-          zooKeeper.createExt(tmpInputSplitReservedPath,
-              null,
-              ZooDefs.Ids.OPEN_ACL_UNSAFE,
-              CreateMode.EPHEMERAL,
-              false);
-          reservedInputSplitPath = nextSplitToClaim;
-          if (LOG.isInfoEnabled()) {
-            float percentFinished =
-                splitToTry * 100.0f / pathList.size();
-            LOG.info("reserveInputSplit: Reserved input " +
-                "split path " + reservedInputSplitPath +
-                ", overall roughly " +
-                +percentFinished +
-                "% input splits reserved");
-          }
-          return reservedInputSplitPath;
-        } catch (KeeperException.NodeExistsException e) {
-          LOG.info("reserveInputSplit: Couldn't reserve " +
-              "(already reserved) inputSplit" +
-              " at " + tmpInputSplitReservedPath);
-        } catch (KeeperException e) {
-          throw new IllegalStateException(
-              "reserveInputSplit: KeeperException on reserve", e);
-        } catch (InterruptedException e) {
-          throw new IllegalStateException(
-              "reserveInputSplit: InterruptedException " +
-                  "on reserve", e);
-        }
-      }
-    }
-  }
-
-  /**
-   * Mark an input split path as completed by this worker.  This notifies
-   * the master and the other workers that this input split has not only
-   * been reserved, but also marked processed.
-   *
-   * @param inputSplitPath Path to the input split.
-   */
-  public void markInputSplitPathFinished(String inputSplitPath) {
-    String inputSplitFinishedPath =
-        inputSplitPath + inputSplitFinishedNode;
-    try {
-      zooKeeper.createExt(inputSplitFinishedPath,
-          null,
-          ZooDefs.Ids.OPEN_ACL_UNSAFE,
-          CreateMode.PERSISTENT,
-          true);
-    } catch (KeeperException.NodeExistsException e) {
-      LOG.warn("markInputSplitPathFinished: " + inputSplitFinishedPath +
-          " already exists!");
-    } catch (KeeperException e) {
-      throw new IllegalStateException(
-          "markInputSplitPathFinished: KeeperException on " +
-              inputSplitFinishedPath, e);
-    } catch (InterruptedException e) {
-      throw new IllegalStateException(
-          "markInputSplitPathFinished: InterruptedException on " +
-              inputSplitFinishedPath, e);
-    }
-  }
-
-  @Override
-  public void process(WatchedEvent event) {
-    if (event.getPath() == null) {
-      LOG.warn("process: Problem with zookeeper, got event with path null, " +
-          "state " + event.getState() + ", event type " + event.getType());
-      return;
-    }
-    // Check if the reservation for the input split was lost in INPUT_SUPERSTEP
-    // (some worker died). If INPUT_SUPERSTEP has already completed, we ignore
-    // this event.
-    if (event.getPath().endsWith(inputSplitReservedNode) &&
-        event.getType() == Watcher.Event.EventType.NodeDeleted &&
-        !doneReadingGraph) {
-      synchronized (pathList) {
-        String split = event.getPath();
-        split = split.substring(0, split.indexOf(inputSplitReservedNode));
-        pathList.add(split);
-        if (LOG.isInfoEnabled()) {
-          LOG.info("process: Input split " + split + " lost reservation");
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java
index f6dca25..5ab3ba9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java
@@ -19,15 +19,15 @@
 package org.apache.giraph.worker;
 
 import java.io.IOException;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.VertexEdgeCount;
 import org.apache.giraph.io.GiraphInputFormat;
 import org.apache.giraph.io.MappingInputFormat;
 import org.apache.giraph.io.MappingReader;
 import org.apache.giraph.mapping.MappingEntry;
 import org.apache.giraph.mapping.MappingStore;
-import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.giraph.io.InputType;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -46,7 +46,7 @@ import org.apache.hadoop.mapreduce.Mapper;
 @SuppressWarnings("unchecked")
 public class MappingInputSplitsCallable<I extends WritableComparable,
   V extends Writable, E extends Writable, B extends Writable>
-  extends FullInputSplitCallable<I, V, E> {
+  extends InputSplitsCallable<I, V, E> {
   /** User supplied mappingInputFormat */
   private final MappingInputFormat<I, V, E, B> mappingInputFormat;
   /** Link to bspServiceWorker */
@@ -56,23 +56,18 @@ public class MappingInputSplitsCallable<I extends WritableComparable,
    * Constructor
    *
    * @param mappingInputFormat mappingInputFormat
-   * @param splitOrganizer Input splits organizer
    * @param context Context
    * @param configuration Configuration
-   * @param zooKeeperExt Handle to ZooKeeperExt
-   * @param currentIndex Atomic Integer to get splitPath from list
    * @param bspServiceWorker bsp service worker
+   * @param splitsHandler Splits handler
    */
   public MappingInputSplitsCallable(
       MappingInputFormat<I, V, E, B> mappingInputFormat,
-      InputSplitPathOrganizer splitOrganizer,
       Mapper<?, ?, ?, ?>.Context context,
       ImmutableClassesGiraphConfiguration<I, V, E> configuration,
-      ZooKeeperExt zooKeeperExt,
-      AtomicInteger currentIndex,
-      BspServiceWorker<I, V, E> bspServiceWorker) {
-    super(splitOrganizer, context,
-      configuration, zooKeeperExt, currentIndex);
+      BspServiceWorker<I, V, E> bspServiceWorker,
+      WorkerInputSplitsHandler splitsHandler) {
+    super(context, configuration, bspServiceWorker, splitsHandler);
     this.mappingInputFormat = mappingInputFormat;
     this.bspServiceWorker = bspServiceWorker;
   }
@@ -83,7 +78,12 @@ public class MappingInputSplitsCallable<I extends WritableComparable,
   }
 
   @Override
-  protected Integer readInputSplit(InputSplit inputSplit)
+  public InputType getInputType() {
+    return InputType.MAPPING;
+  }
+
+  @Override
+  protected VertexEdgeCount readInputSplit(InputSplit inputSplit)
     throws IOException, InterruptedException {
     MappingReader<I, V, E, B> mappingReader =
         mappingInputFormat.createMappingReader(inputSplit, context);
@@ -104,6 +104,6 @@ public class MappingInputSplitsCallable<I extends WritableComparable,
       entriesLoaded += 1;
       mappingStore.addEntry(entry.getVertexId(), entry.getMappingTarget());
     }
-    return entriesLoaded;
+    return new VertexEdgeCount(0, 0, entriesLoaded);
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallableFactory.java
index 21a981e..6cf702a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallableFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallableFactory.java
@@ -19,15 +19,13 @@
 package org.apache.giraph.worker;
 
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.VertexEdgeCount;
 import org.apache.giraph.io.MappingInputFormat;
 import org.apache.giraph.utils.CallableFactory;
-import org.apache.giraph.zk.ZooKeeperExt;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
 
-import java.util.concurrent.atomic.AtomicInteger;
-
 /**
  * Factory for {@link org.apache.giraph.worker.MappingInputSplitsCallable}s.
  *
@@ -38,59 +36,47 @@ import java.util.concurrent.atomic.AtomicInteger;
  */
 public class MappingInputSplitsCallableFactory<I extends WritableComparable,
   V extends Writable, E extends Writable, B extends Writable>
-  implements CallableFactory<Integer> {
+  implements CallableFactory<VertexEdgeCount> {
   /** Mapping input format */
   private final MappingInputFormat<I, V, E, B> mappingInputFormat;
-  /** Input split organizer */
-  private final InputSplitPathOrganizer splitOrganizer;
   /** Mapper context. */
   private final Mapper<?, ?, ?, ?>.Context context;
   /** Configuration. */
   private final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
   /** {@link BspServiceWorker} we're running on. */
   private final BspServiceWorker<I, V, E> bspServiceWorker;
-  /** {@link ZooKeeperExt} for this worker. */
-  private final ZooKeeperExt zooKeeperExt;
-  /** Current position in the path list */
-  private final AtomicInteger currentIndex;
-
+  /** Handler for input splits */
+  private final WorkerInputSplitsHandler splitsHandler;
 
   /**
    * Constructor.
    *
    * @param mappingInputFormat Mapping input format
-   * @param splitOrganizer Input split organizer
    * @param context Mapper context
    * @param configuration Configuration
    * @param bspServiceWorker Calling {@link BspServiceWorker}
-   * @param zooKeeperExt {@link org.apache.giraph.zk.ZooKeeperExt}
-   *                     for this worker
+   * @param splitsHandler Splits handler
    */
   public MappingInputSplitsCallableFactory(
       MappingInputFormat<I, V, E, B> mappingInputFormat,
-      InputSplitPathOrganizer splitOrganizer,
       Mapper<?, ?, ?, ?>.Context context,
       ImmutableClassesGiraphConfiguration<I, V, E> configuration,
       BspServiceWorker<I, V, E> bspServiceWorker,
-      ZooKeeperExt zooKeeperExt) {
+      WorkerInputSplitsHandler splitsHandler) {
     this.mappingInputFormat = mappingInputFormat;
-    this.splitOrganizer = splitOrganizer;
     this.context = context;
     this.configuration = configuration;
     this.bspServiceWorker = bspServiceWorker;
-    this.zooKeeperExt = zooKeeperExt;
-    this.currentIndex = new AtomicInteger(0);
+    this.splitsHandler = splitsHandler;
   }
 
   @Override
-  public FullInputSplitCallable<I, V, E> newCallable(int threadId) {
+  public InputSplitsCallable<I, V, E> newCallable(int threadId) {
     return new MappingInputSplitsCallable<>(
         mappingInputFormat,
-        splitOrganizer,
         context,
         configuration,
-        zooKeeperExt,
-        currentIndex,
-        bspServiceWorker);
+        bspServiceWorker,
+        splitsHandler);
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
index 00a2781..540a6b4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
@@ -30,10 +30,10 @@ import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexReader;
 import org.apache.giraph.io.filters.VertexInputFilter;
 import org.apache.giraph.mapping.translate.TranslateEdge;
+import org.apache.giraph.io.InputType;
 import org.apache.giraph.partition.PartitionOwner;
 import org.apache.giraph.utils.LoggerUtils;
 import org.apache.giraph.utils.MemoryUtils;
-import org.apache.giraph.zk.ZooKeeperExt;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -99,17 +99,14 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
    * @param configuration Configuration
    * @param bspServiceWorker service worker
    * @param splitsHandler Handler for input splits
-   * @param zooKeeperExt Handle to ZooKeeperExt
    */
   public VertexInputSplitsCallable(
       VertexInputFormat<I, V, E> vertexInputFormat,
       Mapper<?, ?, ?, ?>.Context context,
       ImmutableClassesGiraphConfiguration<I, V, E> configuration,
       BspServiceWorker<I, V, E> bspServiceWorker,
-      InputSplitsHandler splitsHandler,
-      ZooKeeperExt zooKeeperExt)  {
-    super(context, configuration, bspServiceWorker, splitsHandler,
-        zooKeeperExt);
+      WorkerInputSplitsHandler splitsHandler)  {
+    super(context, configuration, bspServiceWorker, splitsHandler);
     this.vertexInputFormat = vertexInputFormat;
 
     inputSplitMaxVertices = configuration.getInputSplitMaxVertices();
@@ -136,6 +133,11 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
     return vertexInputFormat;
   }
 
+  @Override
+  public InputType getInputType() {
+    return InputType.VERTEX;
+  }
+
   /**
    * Read vertices from input split.  If testing, the user may request a
    * maximum number of vertices to be read from an input split.
@@ -274,7 +276,7 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
     WorkerProgress.get().incrementVertexInputSplitsLoaded();
 
     return new VertexEdgeCount(inputSplitVerticesLoaded,
-        inputSplitEdgesLoaded + edgesSinceLastUpdate);
+        inputSplitEdgesLoaded + edgesSinceLastUpdate, 0);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
index c9893d2..7aef3a7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
@@ -22,7 +22,6 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.VertexEdgeCount;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.utils.CallableFactory;
-import org.apache.giraph.zk.ZooKeeperExt;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -46,9 +45,7 @@ public class VertexInputSplitsCallableFactory<I extends WritableComparable,
   /** {@link BspServiceWorker} we're running on. */
   private final BspServiceWorker<I, V, E> bspServiceWorker;
   /** Handler for input splits */
-  private final InputSplitsHandler splitsHandler;
-  /** {@link ZooKeeperExt} for this worker. */
-  private final ZooKeeperExt zooKeeperExt;
+  private final WorkerInputSplitsHandler splitsHandler;
 
   /**
    * Constructor.
@@ -58,20 +55,17 @@ public class VertexInputSplitsCallableFactory<I extends WritableComparable,
    * @param configuration Configuration
    * @param bspServiceWorker Calling {@link BspServiceWorker}
    * @param splitsHandler Handler for input splits
-   * @param zooKeeperExt {@link ZooKeeperExt} for this worker
    */
   public VertexInputSplitsCallableFactory(
       VertexInputFormat<I, V, E> vertexInputFormat,
       Mapper<?, ?, ?, ?>.Context context,
       ImmutableClassesGiraphConfiguration<I, V, E> configuration,
       BspServiceWorker<I, V, E> bspServiceWorker,
-      InputSplitsHandler splitsHandler,
-      ZooKeeperExt zooKeeperExt) {
+      WorkerInputSplitsHandler splitsHandler) {
     this.vertexInputFormat = vertexInputFormat;
     this.context = context;
     this.configuration = configuration;
     this.bspServiceWorker = bspServiceWorker;
-    this.zooKeeperExt = zooKeeperExt;
     this.splitsHandler = splitsHandler;
   }
 
@@ -82,7 +76,6 @@ public class VertexInputSplitsCallableFactory<I extends WritableComparable,
         context,
         configuration,
         bspServiceWorker,
-        splitsHandler,
-        zooKeeperExt);
+        splitsHandler);
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/5b0cd0e0/giraph-core/src/main/java/org/apache/giraph/worker/WorkerInputSplitsHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerInputSplitsHandler.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerInputSplitsHandler.java
new file mode 100644
index 0000000..0dc42b3
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerInputSplitsHandler.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+import org.apache.giraph.comm.WorkerClient;
+import org.apache.giraph.comm.requests.AskForInputSplitRequest;
+import org.apache.giraph.io.InputType;
+
+import java.util.EnumMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Requests splits from master and keeps track of them
+ */
+public class WorkerInputSplitsHandler {
+  /** Worker info of this worker */
+  private final WorkerInfo workerInfo;
+  /** Task id of master */
+  private final int masterTaskId;
+  /** Worker client, used for communication */
+  private final WorkerClient workerClient;
+  /** Map with currently available splits received from master */
+  private final Map<InputType, BlockingQueue<byte[]>> availableInputSplits;
+
+  /**
+   * Constructor
+   *
+   * @param workerInfo   Worker info of this worker
+   * @param masterTaskId Task id of master
+   * @param workerClient Worker client, used for communication
+   */
+  public WorkerInputSplitsHandler(WorkerInfo workerInfo, int masterTaskId,
+      WorkerClient workerClient) {
+    this.workerInfo = workerInfo;
+    this.masterTaskId = masterTaskId;
+    this.workerClient = workerClient;
+    availableInputSplits = new EnumMap<>(InputType.class);
+    for (InputType inputType : InputType.values()) {
+      availableInputSplits.put(
+          inputType, new LinkedBlockingQueue<byte[]>());
+    }
+  }
+
+  /**
+   * Called when an input split has been received from master, adding it to
+   * the map
+   *
+   * @param splitType            Type of split
+   * @param serializedInputSplit Split
+   */
+  public void receivedInputSplit(InputType splitType,
+      byte[] serializedInputSplit) {
+    try {
+      availableInputSplits.get(splitType).put(serializedInputSplit);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException("Interrupted", e);
+    }
+  }
+
+  /**
+   * Try to reserve an InputSplit for loading.  While InputSplits exists that
+   * are not finished, wait until they are.
+   *
+   * NOTE: iterations on the InputSplit list only halt for each worker when it
+   * has scanned the entire list once and found every split marked RESERVED.
+   * When a worker fails, its Ephemeral RESERVED znodes will disappear,
+   * allowing other iterating workers to claim it's previously read splits.
+   * Only when the last worker left iterating on the list fails can a danger
+   * of data loss occur. Since worker failure in INPUT_SUPERSTEP currently
+   * causes job failure, this is OK. As the failure model evolves, this
+   * behavior might need to change. We could add watches on
+   * inputSplitFinishedNodes and stop iterating only when all these nodes
+   * have been created.
+   *
+   * @param splitType Type of split
+   * @return reserved InputSplit or null if no unfinished InputSplits exist
+   */
+  public byte[] reserveInputSplit(InputType splitType) {
+    // Send request
+    workerClient.sendWritableRequest(masterTaskId,
+        new AskForInputSplitRequest(splitType, workerInfo.getTaskId()));
+    try {
+      // Wait for some split to become available
+      byte[] serializedInputSplit = availableInputSplits.get(splitType).take();
+      return serializedInputSplit.length == 0 ? null : serializedInputSplit;
+    } catch (InterruptedException e) {
+      throw new IllegalStateException("Interrupted", e);
+    }
+  }
+}