You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ni...@apache.org on 2013/01/04 21:52:39 UTC

[4/23] GIRAPH-409: Refactor / cleanups (nitay)

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
new file mode 100644
index 0000000..ec4780e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.comm.WorkerClientRequestProcessor;
+import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
+import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.graph.InputSplitEvents;
+import org.apache.giraph.graph.VertexEdgeCount;
+import org.apache.giraph.time.SystemTime;
+import org.apache.giraph.time.Time;
+import org.apache.giraph.time.Times;
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.Stat;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+/**
+ * Abstract base class for loading vertex/edge input splits.
+ * Every thread will has its own instance of WorkerClientRequestProcessor
+ * to send requests.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public abstract class InputSplitsCallable<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    implements Callable<VertexEdgeCount> {
+  /** Name of counter for vertices loaded */
+  public static final String COUNTER_VERTICES_LOADED = "vertices-loaded";
+  /** Name of counter for edges loaded */
+  public static final String COUNTER_EDGES_LOADED = "edges-loaded";
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(InputSplitsCallable.class);
+  /** Class time object */
+  private static final Time TIME = SystemTime.get();
+  /** Configuration */
+  protected final ImmutableClassesGiraphConfiguration<I, V, E, M>
+  configuration;
+  /** Context */
+  protected final Mapper<?, ?, ?, ?>.Context context;
+  /** Graph state */
+  private final GraphState<I, V, E, M> graphState;
+  /** Handles IPC communication */
+  private final WorkerClientRequestProcessor<I, V, E, M>
+  workerClientRequestProcessor;
+  /**
+   * Stores and processes the list of InputSplits advertised
+   * in a tree of child znodes by the master.
+   */
+  private final InputSplitPathOrganizer splitOrganizer;
+  /** ZooKeeperExt handle */
+  private final ZooKeeperExt zooKeeperExt;
+  /** Get the start time in nanos */
+  private final long startNanos = TIME.getNanoseconds();
+  /** ZooKeeper input split reserved node. */
+  private final String inputSplitReservedNode;
+  /** ZooKeeper input split finished node. */
+  private final String inputSplitFinishedNode;
+  /** Input split events. */
+  private final InputSplitEvents inputSplitEvents;
+
+  // CHECKSTYLE: stop ParameterNumberCheck
+  /**
+   * Constructor.
+   *
+   * @param context Context
+   * @param graphState Graph state
+   * @param configuration Configuration
+   * @param bspServiceWorker service worker
+   * @param inputSplitPathList List of the paths of the input splits
+   * @param workerInfo This worker's info
+   * @param zooKeeperExt Handle to ZooKeeperExt
+   * @param inputSplitReservedNode Path to input split reserved
+   * @param inputSplitFinishedNode Path to input split finsished
+   * @param inputSplitEvents Input split events
+   */
+  public InputSplitsCallable(
+      Mapper<?, ?, ?, ?>.Context context,
+      GraphState<I, V, E, M> graphState,
+      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
+      BspServiceWorker<I, V, E, M> bspServiceWorker,
+      List<String> inputSplitPathList,
+      WorkerInfo workerInfo,
+      ZooKeeperExt zooKeeperExt,
+      String inputSplitReservedNode,
+      String inputSplitFinishedNode,
+      InputSplitEvents inputSplitEvents) {
+    this.zooKeeperExt = zooKeeperExt;
+    this.context = context;
+    this.workerClientRequestProcessor =
+        new NettyWorkerClientRequestProcessor<I, V, E, M>(
+            context, configuration, bspServiceWorker);
+    this.graphState = new GraphState<I, V, E, M>(graphState.getSuperstep(),
+        graphState.getTotalNumVertices(), graphState.getTotalNumEdges(),
+        context, graphState.getGraphMapper(), workerClientRequestProcessor,
+        null);
+    try {
+      splitOrganizer = new InputSplitPathOrganizer(zooKeeperExt,
+          inputSplitPathList, workerInfo.getHostname(), workerInfo.getPort());
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "InputSplitsCallable: KeeperException", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "InputSplitsCallable: InterruptedException", e);
+    }
+    this.configuration = configuration;
+    this.inputSplitReservedNode = inputSplitReservedNode;
+    this.inputSplitFinishedNode = inputSplitFinishedNode;
+    this.inputSplitEvents = inputSplitEvents;
+  }
+  // CHECKSTYLE: resume ParameterNumberCheck
+
+  /**
+   * Load vertices/edges from the given input split.
+   *
+   * @param inputSplit Input split to load
+   * @param graphState Graph state
+   * @return Count of vertices and edges loaded
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  protected abstract VertexEdgeCount readInputSplit(
+      InputSplit inputSplit,
+      GraphState<I, V, E, M> graphState)
+    throws IOException, InterruptedException;
+
+  @Override
+  public VertexEdgeCount call() {
+    VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
+    String inputSplitPath;
+    int inputSplitsProcessed = 0;
+    try {
+      while ((inputSplitPath = reserveInputSplit()) != null) {
+        vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(
+            loadInputSplit(inputSplitPath,
+                graphState));
+        context.progress();
+        ++inputSplitsProcessed;
+      }
+    } catch (KeeperException e) {
+      throw new IllegalStateException("call: KeeperException", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException("call: InterruptedException", e);
+    } catch (IOException e) {
+      throw new IllegalStateException("call: IOException", e);
+    } catch (ClassNotFoundException e) {
+      throw new IllegalStateException("call: ClassNotFoundException", e);
+    } catch (InstantiationException e) {
+      throw new IllegalStateException("call: InstantiationException", e);
+    } catch (IllegalAccessException e) {
+      throw new IllegalStateException("call: IllegalAccessException", e);
+    }
+
+    if (LOG.isInfoEnabled()) {
+      float seconds = Times.getNanosSince(TIME, startNanos) /
+          Time.NS_PER_SECOND_AS_FLOAT;
+      float verticesPerSecond = vertexEdgeCount.getVertexCount() / seconds;
+      float edgesPerSecond = vertexEdgeCount.getEdgeCount() / seconds;
+      LOG.info("call: Loaded " + inputSplitsProcessed + " " +
+          "input splits in " + seconds + " secs, " + vertexEdgeCount +
+          " " + verticesPerSecond + " vertices/sec, " +
+          edgesPerSecond + " edges/sec");
+    }
+    try {
+      workerClientRequestProcessor.flush();
+    } catch (IOException e) {
+      throw new IllegalStateException("call: Flushing failed.", e);
+    }
+    return vertexEdgeCount;
+  }
+
+  /**
+   * Try to reserve an InputSplit for loading.  While InputSplits exists that
+   * are not finished, wait until they are.
+   *
+   * NOTE: iterations on the InputSplit list only halt for each worker when it
+   * has scanned the entire list once and found every split marked RESERVED.
+   * When a worker fails, its Ephemeral RESERVED znodes will disappear,
+   * allowing other iterating workers to claim it's previously read splits.
+   * Only when the last worker left iterating on the list fails can a danger
+   * of data loss occur. Since worker failure in INPUT_SUPERSTEP currently
+   * causes job failure, this is OK. As the failure model evolves, this
+   * behavior might need to change.
+   *
+   * @return reserved InputSplit or null if no unfinished InputSplits exist
+   * @throws org.apache.zookeeper.KeeperException
+   * @throws InterruptedException
+   */
+  private String reserveInputSplit()
+    throws KeeperException, InterruptedException {
+    String reservedInputSplitPath = null;
+    Stat reservedStat;
+    while (true) {
+      int reservedInputSplits = 0;
+      for (String nextSplitToClaim : splitOrganizer) {
+        context.progress();
+        String tmpInputSplitReservedPath = nextSplitToClaim +
+            inputSplitReservedNode;
+        reservedStat =
+            zooKeeperExt.exists(tmpInputSplitReservedPath, true);
+        if (reservedStat == null) {
+          try {
+            // Attempt to reserve this InputSplit
+            zooKeeperExt.createExt(tmpInputSplitReservedPath,
+                null,
+                ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                CreateMode.EPHEMERAL,
+                false);
+            reservedInputSplitPath = nextSplitToClaim;
+            if (LOG.isInfoEnabled()) {
+              float percentFinished =
+                  reservedInputSplits * 100.0f /
+                      splitOrganizer.getPathListSize();
+              LOG.info("reserveInputSplit: Reserved input " +
+                  "split path " + reservedInputSplitPath +
+                  ", overall roughly " +
+                  + percentFinished +
+                  "% input splits reserved");
+            }
+            return reservedInputSplitPath;
+          } catch (KeeperException.NodeExistsException e) {
+            LOG.info("reserveInputSplit: Couldn't reserve " +
+                "(already reserved) inputSplit" +
+                " at " + tmpInputSplitReservedPath);
+          } catch (KeeperException e) {
+            throw new IllegalStateException(
+                "reserveInputSplit: KeeperException on reserve", e);
+          } catch (InterruptedException e) {
+            throw new IllegalStateException(
+                "reserveInputSplit: InterruptedException " +
+                    "on reserve", e);
+          }
+        } else {
+          ++reservedInputSplits;
+        }
+      }
+      if (LOG.isInfoEnabled()) {
+        LOG.info("reserveInputSplit: reservedPath = " +
+            reservedInputSplitPath + ", " + reservedInputSplits +
+            " of " + splitOrganizer.getPathListSize() +
+            " InputSplits are finished.");
+      }
+      if (reservedInputSplits == splitOrganizer.getPathListSize()) {
+        return null;
+      }
+      context.progress();
+      // Wait for either a reservation to go away or a notification that
+      // an InputSplit has finished.
+      context.progress();
+      inputSplitEvents.getStateChanged().waitMsecs(
+          60 * 1000);
+      inputSplitEvents.getStateChanged().reset();
+    }
+  }
+
+  /**
+   * Mark an input split path as completed by this worker.  This notifies
+   * the master and the other workers that this input split has not only
+   * been reserved, but also marked processed.
+   *
+   * @param inputSplitPath Path to the input split.
+   */
+  private void markInputSplitPathFinished(String inputSplitPath) {
+    String inputSplitFinishedPath =
+        inputSplitPath + inputSplitFinishedNode;
+    try {
+      zooKeeperExt.createExt(inputSplitFinishedPath,
+          null,
+          ZooDefs.Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT,
+          true);
+    } catch (KeeperException.NodeExistsException e) {
+      LOG.warn("markInputSplitPathFinished: " + inputSplitFinishedPath +
+          " already exists!");
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "markInputSplitPathFinished: KeeperException on " +
+              inputSplitFinishedPath, e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "markInputSplitPathFinished: InterruptedException on " +
+              inputSplitFinishedPath, e);
+    }
+  }
+
+  /**
+   * Extract vertices from input split, saving them into a mini cache of
+   * partitions.  Periodically flush the cache of vertices when a limit is
+   * reached in readVerticeFromInputSplit.
+   * Mark the input split finished when done.
+   *
+   * @param inputSplitPath ZK location of input split
+   * @param graphState Current graph state
+   * @return Mapping of vertex indices and statistics, or null if no data read
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   * @throws InstantiationException
+   * @throws IllegalAccessException
+   */
+  private VertexEdgeCount loadInputSplit(
+      String inputSplitPath,
+      GraphState<I, V, E, M> graphState)
+    throws IOException, ClassNotFoundException, InterruptedException,
+      InstantiationException, IllegalAccessException {
+    InputSplit inputSplit = getInputSplit(inputSplitPath);
+    VertexEdgeCount vertexEdgeCount =
+        readInputSplit(inputSplit, graphState);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("loadFromInputSplit: Finished loading " +
+          inputSplitPath + " " + vertexEdgeCount);
+    }
+    markInputSplitPathFinished(inputSplitPath);
+    return vertexEdgeCount;
+  }
+
+  /**
+   * Talk to ZooKeeper to convert the input split path to the actual
+   * InputSplit.
+   *
+   * @param inputSplitPath Location in ZK of input split
+   * @return instance of InputSplit
+   * @throws IOException
+   * @throws ClassNotFoundException
+   */
+  protected InputSplit getInputSplit(String inputSplitPath)
+    throws IOException, ClassNotFoundException {
+    byte[] splitList;
+    try {
+      splitList = zooKeeperExt.getData(inputSplitPath, false, null);
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "getInputSplit: KeeperException on " + inputSplitPath, e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "getInputSplit: IllegalStateException on " + inputSplitPath, e);
+    }
+    context.progress();
+
+    DataInputStream inputStream =
+        new DataInputStream(new ByteArrayInputStream(splitList));
+    Text.readString(inputStream); // location data unused here, skip
+    String inputSplitClass = Text.readString(inputStream);
+    InputSplit inputSplit = (InputSplit)
+        ReflectionUtils.newInstance(
+            configuration.getClassByName(inputSplitClass),
+            configuration);
+    ((Writable) inputSplit).readFields(inputStream);
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("getInputSplit: Reserved " + inputSplitPath +
+          " from ZooKeeper and got input split '" +
+          inputSplit.toString() + "'");
+    }
+    return inputSplit;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallableFactory.java
new file mode 100644
index 0000000..9e8bc32
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallableFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Factory class for creating {@link InputSplitsCallable}s.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public interface InputSplitsCallableFactory<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable> {
+  /**
+   * Return a newly-created {@link InputSplitsCallable}.
+   *
+   * @return A new {@link InputSplitsCallable}
+   */
+  InputSplitsCallable<I, V, E, M> newCallable();
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
new file mode 100644
index 0000000..83c8b41
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.graph.VertexEdgeCount;
+import org.apache.giraph.io.VertexInputFormat;
+import org.apache.giraph.io.VertexReader;
+import org.apache.giraph.partition.PartitionOwner;
+import org.apache.giraph.metrics.GiraphMetrics;
+import org.apache.giraph.metrics.GiraphMetricsRegistry;
+import org.apache.giraph.utils.LoggerUtils;
+import org.apache.giraph.utils.MemoryUtils;
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+import com.yammer.metrics.core.Counter;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Load as many vertex input splits as possible.
+ * Every thread will has its own instance of WorkerClientRequestProcessor
+ * to send requests.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public class VertexInputSplitsCallable<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends InputSplitsCallable<I, V, E, M> {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(VertexInputSplitsCallable.class);
+  /** Total vertices loaded */
+  private long totalVerticesLoaded = 0;
+  /** Total edges loaded */
+  private long totalEdgesLoaded = 0;
+  /** Input split max vertices (-1 denotes all) */
+  private final long inputSplitMaxVertices;
+  /** Bsp service worker (only use thread-safe methods) */
+  private final BspServiceWorker<I, V, E, M> bspServiceWorker;
+
+  // Metrics
+  /** number of vertices loaded counter */
+  private final Counter verticesLoadedCounter;
+  /** number of edges loaded counter */
+  private final Counter edgesLoadedCounter;
+
+  /**
+   * Constructor.
+   *
+   * @param context Context
+   * @param graphState Graph state
+   * @param configuration Configuration
+   * @param bspServiceWorker service worker
+   * @param inputSplitPathList List of the paths of the input splits
+   * @param workerInfo This worker's info
+   * @param zooKeeperExt Handle to ZooKeeperExt
+   */
+  public VertexInputSplitsCallable(
+      Mapper<?, ?, ?, ?>.Context context,
+      GraphState<I, V, E, M> graphState,
+      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
+      BspServiceWorker<I, V, E, M> bspServiceWorker,
+      List<String> inputSplitPathList,
+      WorkerInfo workerInfo,
+      ZooKeeperExt zooKeeperExt)  {
+    super(context, graphState, configuration, bspServiceWorker,
+        inputSplitPathList, workerInfo, zooKeeperExt,
+        BspServiceWorker.VERTEX_INPUT_SPLIT_RESERVED_NODE,
+        BspServiceWorker.VERTEX_INPUT_SPLIT_FINISHED_NODE,
+        bspServiceWorker.getVertexInputSplitsEvents());
+
+    inputSplitMaxVertices = configuration.getInputSplitMaxVertices();
+    this.bspServiceWorker = bspServiceWorker;
+
+    // Initialize Metrics
+    GiraphMetricsRegistry jobMetrics = GiraphMetrics.get().perJob();
+    verticesLoadedCounter = jobMetrics.getCounter(COUNTER_VERTICES_LOADED);
+    edgesLoadedCounter = jobMetrics.getCounter(COUNTER_EDGES_LOADED);
+  }
+
+  /**
+   * Read vertices from input split.  If testing, the user may request a
+   * maximum number of vertices to be read from an input split.
+   *
+   * @param inputSplit Input split to process with vertex reader
+   * @param graphState Current graph state
+   * @return Vertices and edges loaded from this input split
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Override
+  protected VertexEdgeCount readInputSplit(
+      InputSplit inputSplit,
+      GraphState<I, V, E, M> graphState)
+    throws IOException, InterruptedException {
+    VertexInputFormat<I, V, E, M> vertexInputFormat =
+        configuration.createVertexInputFormat();
+    VertexReader<I, V, E, M> vertexReader =
+        vertexInputFormat.createVertexReader(inputSplit, context);
+    vertexReader.initialize(inputSplit, context);
+    long inputSplitVerticesLoaded = 0;
+    long inputSplitEdgesLoaded = 0;
+    while (vertexReader.nextVertex()) {
+      Vertex<I, V, E, M> readerVertex =
+          vertexReader.getCurrentVertex();
+      if (readerVertex.getId() == null) {
+        throw new IllegalArgumentException(
+            "readInputSplit: Vertex reader returned a vertex " +
+                "without an id!  - " + readerVertex);
+      }
+      if (readerVertex.getValue() == null) {
+        readerVertex.setValue(configuration.createVertexValue());
+      }
+      readerVertex.setConf(configuration);
+      readerVertex.setGraphState(graphState);
+
+      PartitionOwner partitionOwner =
+          bspServiceWorker.getVertexPartitionOwner(readerVertex.getId());
+      graphState.getWorkerClientRequestProcessor().sendVertexRequest(
+          partitionOwner, readerVertex);
+      context.progress(); // do this before potential data transfer
+      ++inputSplitVerticesLoaded;
+      inputSplitEdgesLoaded += readerVertex.getNumEdges();
+
+      // Update status every 250k vertices
+      if (((inputSplitVerticesLoaded + totalVerticesLoaded) % 250000) == 0) {
+        LoggerUtils.setStatusAndLog(context, LOG, Level.INFO,
+            "readInputSplit: Loaded " +
+                (inputSplitVerticesLoaded + totalVerticesLoaded) +
+                " vertices " +
+                (inputSplitEdgesLoaded + totalEdgesLoaded) + " edges " +
+                MemoryUtils.getRuntimeMemoryStats());
+      }
+
+      // For sampling, or to limit outlier input splits, the number of
+      // records per input split can be limited
+      if (inputSplitMaxVertices > 0 &&
+          inputSplitVerticesLoaded >= inputSplitMaxVertices) {
+        if (LOG.isInfoEnabled()) {
+          LOG.info("readInputSplit: Leaving the input " +
+              "split early, reached maximum vertices " +
+              inputSplitVerticesLoaded);
+        }
+        break;
+      }
+    }
+    vertexReader.close();
+    totalVerticesLoaded += inputSplitVerticesLoaded;
+    verticesLoadedCounter.inc(inputSplitVerticesLoaded);
+    totalEdgesLoaded += inputSplitEdgesLoaded;
+    edgesLoadedCounter.inc(inputSplitEdgesLoaded);
+    return new VertexEdgeCount(
+        inputSplitVerticesLoaded, inputSplitEdgesLoaded);
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
new file mode 100644
index 0000000..4bec931
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import java.util.List;
+
+/**
+ * Factory for {@link VertexInputSplitsCallable}s.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public class VertexInputSplitsCallableFactory<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    implements InputSplitsCallableFactory<I, V, E, M> {
+  /** Mapper context. */
+  private final Mapper<?, ?, ?, ?>.Context context;
+  /** Graph state. */
+  private final GraphState<I, V, E, M> graphState;
+  /** Configuration. */
+  private final ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
+  /** {@link BspServiceWorker} we're running on. */
+  private final BspServiceWorker<I, V, E, M> bspServiceWorker;
+  /** List of input split paths. */
+  private final List<String> inputSplitPathList;
+  /** Worker info. */
+  private final WorkerInfo workerInfo;
+  /** {@link ZooKeeperExt} for this worker. */
+  private final ZooKeeperExt zooKeeperExt;
+
+  /**
+   * Constructor.
+   *
+   * @param context Mapper context
+   * @param graphState Graph state
+   * @param configuration Configuration
+   * @param bspServiceWorker Calling {@link BspServiceWorker}
+   * @param inputSplitPathList List of input split paths
+   * @param workerInfo Worker info
+   * @param zooKeeperExt {@link ZooKeeperExt} for this worker
+   */
+  public VertexInputSplitsCallableFactory(
+      Mapper<?, ?, ?, ?>.Context context,
+      GraphState<I, V, E, M> graphState,
+      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
+      BspServiceWorker<I, V, E, M> bspServiceWorker,
+      List<String> inputSplitPathList,
+      WorkerInfo workerInfo,
+      ZooKeeperExt zooKeeperExt) {
+    this.context = context;
+    this.graphState = graphState;
+    this.configuration = configuration;
+    this.bspServiceWorker = bspServiceWorker;
+    this.inputSplitPathList = inputSplitPathList;
+    this.workerInfo = workerInfo;
+    this.zooKeeperExt = zooKeeperExt;
+  }
+
+  @Override
+  public InputSplitsCallable<I, V, E, M> newCallable() {
+    return new VertexInputSplitsCallable<I, V, E, M>(
+        context,
+        graphState,
+        configuration,
+        bspServiceWorker,
+        inputSplitPathList,
+        workerInfo,
+        zooKeeperExt);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
new file mode 100644
index 0000000..cafd17b
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.aggregators.WorkerAggregatorRequestProcessor;
+import org.apache.giraph.comm.aggregators.AggregatedValueOutputStream;
+import org.apache.giraph.comm.aggregators.AggregatorUtils;
+import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
+import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData;
+import org.apache.giraph.aggregators.Aggregator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.Progressable;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Maps;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Handler for aggregators on worker. Provides the aggregated values and
+ * performs aggregations from user vertex code (thread-safe). Also has
+ * methods for all superstep coordination related to aggregators.
+ *
+ * At the beginning of any superstep any worker calls prepareSuperstep(),
+ * which blocks until the final aggregates from the previous superstep have
+ * been delivered to the worker.
+ * Next, during the superstep worker can call aggregate() and
+ * getAggregatedValue() (both methods are thread safe) the former
+ * computes partial aggregates for this superstep from the worker,
+ * the latter returns (read-only) final aggregates from the previous superstep.
+ * Finally, at the end of the superstep, the worker calls finishSuperstep(),
+ * which propagates non-owned partial aggregates to the owner workers,
+ * and sends the final aggregate from the owner worker to the master.
+ */
+public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(WorkerAggregatorHandler.class);
+  /** Map of values from previous superstep */
+  private Map<String, Writable> previousAggregatedValueMap =
+      Maps.newHashMap();
+  /** Map of aggregators for current superstep */
+  private Map<String, Aggregator<Writable>> currentAggregatorMap =
+      Maps.newHashMap();
+  /** Service worker */
+  private final CentralizedServiceWorker<?, ?, ?, ?> serviceWorker;
+  /** Progressable for reporting progress */
+  private final Progressable progressable;
+  /** How big a single aggregator request can be */
+  private final int maxBytesPerAggregatorRequest;
+  /** Giraph configuration */
+  private final ImmutableClassesGiraphConfiguration conf;
+
+  /**
+   * Constructor
+   *
+   * @param serviceWorker Service worker
+   * @param conf          Giraph configuration
+   * @param progressable  Progressable for reporting progress
+   */
+  public WorkerAggregatorHandler(
+      CentralizedServiceWorker<?, ?, ?, ?> serviceWorker,
+      ImmutableClassesGiraphConfiguration conf,
+      Progressable progressable) {
+    this.serviceWorker = serviceWorker;
+    this.progressable = progressable;
+    this.conf = conf;
+    maxBytesPerAggregatorRequest = conf.getInt(
+        AggregatorUtils.MAX_BYTES_PER_AGGREGATOR_REQUEST,
+        AggregatorUtils.MAX_BYTES_PER_AGGREGATOR_REQUEST_DEFAULT);
+  }
+
+  @Override
+  public <A extends Writable> void aggregate(String name, A value) {
+    Aggregator<Writable> aggregator = currentAggregatorMap.get(name);
+    if (aggregator != null) {
+      synchronized (aggregator) {
+        aggregator.aggregate(value);
+      }
+    } else {
+      throw new IllegalStateException("aggregate: Tried to aggregate value " +
+          "to unregistered aggregator " + name);
+    }
+  }
+
+  @Override
+  public <A extends Writable> A getAggregatedValue(String name) {
+    return (A) previousAggregatedValueMap.get(name);
+  }
+
+  /**
+   * Prepare aggregators for current superstep
+   *
+   * @param requestProcessor Request processor for aggregators
+   */
+  public void prepareSuperstep(
+      WorkerAggregatorRequestProcessor requestProcessor) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("prepareSuperstep: Start preparing aggregators");
+    }
+    AllAggregatorServerData allAggregatorData =
+        serviceWorker.getServerData().getAllAggregatorData();
+    // Wait for my aggregators
+    Iterable<byte[]> dataToDistribute =
+        allAggregatorData.getDataFromMasterWhenReady();
+    try {
+      // Distribute my aggregators
+      requestProcessor.distributeAggregators(dataToDistribute);
+    } catch (IOException e) {
+      throw new IllegalStateException("prepareSuperstep: " +
+          "IOException occurred while trying to distribute aggregators", e);
+    }
+    // Wait for all other aggregators and store them
+    allAggregatorData.fillNextSuperstepMapsWhenReady(
+        serviceWorker.getWorkerInfoList().size(), previousAggregatedValueMap,
+        currentAggregatorMap);
+    allAggregatorData.reset();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("prepareSuperstep: Aggregators prepared");
+    }
+  }
+
+  /**
+   * Send aggregators to their owners and in the end to the master
+   *
+   * @param requestProcessor Request processor for aggregators
+   */
+  public void finishSuperstep(
+      WorkerAggregatorRequestProcessor requestProcessor) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("finishSuperstep: Start finishing aggregators");
+    }
+    OwnerAggregatorServerData ownerAggregatorData =
+        serviceWorker.getServerData().getOwnerAggregatorData();
+    // First send partial aggregated values to their owners and determine
+    // which aggregators belong to this worker
+    for (Map.Entry<String, Aggregator<Writable>> entry :
+        currentAggregatorMap.entrySet()) {
+      try {
+        boolean sent = requestProcessor.sendAggregatedValue(entry.getKey(),
+            entry.getValue().getAggregatedValue());
+        if (!sent) {
+          // If it's my aggregator, add it directly
+          ownerAggregatorData.aggregate(entry.getKey(),
+              entry.getValue().getAggregatedValue());
+        }
+      } catch (IOException e) {
+        throw new IllegalStateException("finishSuperstep: " +
+            "IOException occurred while sending aggregator " +
+            entry.getKey() + " to its owner", e);
+      }
+      progressable.progress();
+    }
+    try {
+      // Flush
+      requestProcessor.flush();
+    } catch (IOException e) {
+      throw new IllegalStateException("finishSuperstep: " +
+          "IOException occurred while sending aggregators to owners", e);
+    }
+
+    // Wait to receive partial aggregated values from all other workers
+    Iterable<Map.Entry<String, Writable>> myAggregators =
+        ownerAggregatorData.getMyAggregatorValuesWhenReady(
+            serviceWorker.getWorkerInfoList().size());
+
+    // Send final aggregated values to master
+    AggregatedValueOutputStream aggregatorOutput =
+        new AggregatedValueOutputStream();
+    for (Map.Entry<String, Writable> entry : myAggregators) {
+      try {
+        int currentSize = aggregatorOutput.addAggregator(entry.getKey(),
+            entry.getValue());
+        if (currentSize > maxBytesPerAggregatorRequest) {
+          requestProcessor.sendAggregatedValuesToMaster(
+              aggregatorOutput.flush());
+        }
+        progressable.progress();
+      } catch (IOException e) {
+        throw new IllegalStateException("finishSuperstep: " +
+            "IOException occurred while writing aggregator " +
+            entry.getKey(), e);
+      }
+    }
+    try {
+      requestProcessor.sendAggregatedValuesToMaster(aggregatorOutput.flush());
+    } catch (IOException e) {
+      throw new IllegalStateException("finishSuperstep: " +
+          "IOException occured while sending aggregators to master", e);
+    }
+    // Wait for master to receive aggregated values before proceeding
+    serviceWorker.getWorkerClient().waitAllRequests();
+
+    ownerAggregatorData.reset();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("finishSuperstep: Aggregators finished");
+    }
+  }
+
+  /**
+   * Create new aggregator usage which will be used by one of the compute
+   * threads.
+   *
+   * @return New aggregator usage
+   */
+  public WorkerThreadAggregatorUsage newThreadAggregatorUsage() {
+    if (AggregatorUtils.useThreadLocalAggregators(conf)) {
+      return new ThreadLocalWorkerAggregatorUsage();
+    } else {
+      return this;
+    }
+  }
+
+  @Override
+  public void finishThreadComputation() {
+    // If we don't use thread-local aggregators, all the aggregated values
+    // are already in this object
+  }
+
+  /**
+   * Not thread-safe implementation of {@link WorkerThreadAggregatorUsage}.
+   * We can use one instance of this object per thread to prevent
+   * synchronizing on each aggregate() call. In the end of superstep,
+   * values from each of these will be aggregated back to {@link
+   * WorkerAggregatorHandler}
+   */
+  public class ThreadLocalWorkerAggregatorUsage
+      implements WorkerThreadAggregatorUsage {
+    /** Thread-local aggregator map */
+    private final Map<String, Aggregator<Writable>> threadAggregatorMap;
+
+    /**
+     * Constructor
+     *
+     * Creates new instances of all aggregators from
+     * {@link WorkerAggregatorHandler}
+     */
+    public ThreadLocalWorkerAggregatorUsage() {
+      threadAggregatorMap = Maps.newHashMapWithExpectedSize(
+          WorkerAggregatorHandler.this.currentAggregatorMap.size());
+      for (Map.Entry<String, Aggregator<Writable>> entry :
+          WorkerAggregatorHandler.this.currentAggregatorMap.entrySet()) {
+        threadAggregatorMap.put(entry.getKey(),
+            AggregatorUtils.newAggregatorInstance(
+                (Class<Aggregator<Writable>>) entry.getValue().getClass()));
+      }
+    }
+
+    @Override
+    public <A extends Writable> void aggregate(String name, A value) {
+      Aggregator<Writable> aggregator = threadAggregatorMap.get(name);
+      if (aggregator != null) {
+        aggregator.aggregate(value);
+      } else {
+        throw new IllegalStateException("aggregate: " +
+            "Tried to aggregate value to unregistered aggregator " + name);
+      }
+    }
+
+    @Override
+    public <A extends Writable> A getAggregatedValue(String name) {
+      return WorkerAggregatorHandler.this.<A>getAggregatedValue(name);
+    }
+
+    @Override
+    public void finishThreadComputation() {
+      // Aggregate the values this thread's vertices provided back to
+      // WorkerAggregatorHandler
+      for (Map.Entry<String, Aggregator<Writable>> entry :
+          threadAggregatorMap.entrySet()) {
+        WorkerAggregatorHandler.this.aggregate(entry.getKey(),
+            entry.getValue().getAggregatedValue());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorUsage.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorUsage.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorUsage.java
new file mode 100644
index 0000000..75cc6dc
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorUsage.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Vertex classes can access and change aggregators through this interface
+ */
+public interface WorkerAggregatorUsage {
+  /**
+   * Add a new value
+   *
+   * @param name Name of aggregator
+   * @param value Value to add
+   * @param <A> Aggregated value
+   */
+  <A extends Writable> void aggregate(String name, A value);
+
+  /**
+   * Get value of an aggregator.
+   *
+   * @param name Name of aggregator
+   * @param <A> Aggregated value
+   * @return Value of the aggregator
+   */
+  <A extends Writable> A getAggregatedValue(String name);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
new file mode 100644
index 0000000..d3ffaea
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+import org.apache.giraph.graph.GraphState;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Mapper;
+
+/**
+ * WorkerContext allows for the execution of user code
+ * on a per-worker basis. There's one WorkerContext per worker.
+ */
+@SuppressWarnings("rawtypes")
+public abstract class WorkerContext implements WorkerAggregatorUsage {
+  /** Global graph state */
+  private GraphState graphState;
+
+  /**
+   * Set the graph state.
+   *
+   *  @param graphState Used to set the graph state.
+   */
+  public void setGraphState(GraphState graphState) {
+    this.graphState = graphState;
+  }
+
+  /**
+   * Initialize the WorkerContext.
+   * This method is executed once on each Worker before the first
+   * superstep starts.
+   *
+   * @throws IllegalAccessException Thrown for getting the class
+   * @throws InstantiationException Expected instantiation in this method.
+   */
+  public abstract void preApplication() throws InstantiationException,
+    IllegalAccessException;
+
+  /**
+   * Finalize the WorkerContext.
+   * This method is executed once on each Worker after the last
+   * superstep ends.
+   */
+  public abstract void postApplication();
+
+  /**
+   * Execute user code.
+   * This method is executed once on each Worker before each
+   * superstep starts.
+   */
+  public abstract void preSuperstep();
+
+  /**
+   * Execute user code.
+   * This method is executed once on each Worker after each
+   * superstep ends.
+   */
+  public abstract void postSuperstep();
+
+  /**
+   * Retrieves the current superstep.
+   *
+   * @return Current superstep
+   */
+  public long getSuperstep() {
+    return graphState.getSuperstep();
+  }
+
+  /**
+   * Get the total (all workers) number of vertices that
+   * existed in the previous superstep.
+   *
+   * @return Total number of vertices (-1 if first superstep)
+   */
+  public long getTotalNumVertices() {
+    return graphState.getTotalNumVertices();
+  }
+
+  /**
+   * Get the total (all workers) number of edges that
+   * existed in the previous superstep.
+   *
+   * @return Total number of edges (-1 if first superstep)
+   */
+  public long getTotalNumEdges() {
+    return graphState.getTotalNumEdges();
+  }
+
+  /**
+   * Get the mapper context
+   *
+   * @return Mapper context
+   */
+  public Mapper.Context getContext() {
+    return graphState.getContext();
+  }
+
+  @Override
+  public <A extends Writable> void aggregate(String name, A value) {
+    graphState.getWorkerAggregatorUsage().aggregate(name, value);
+  }
+
+  @Override
+  public <A extends Writable> A getAggregatedValue(String name) {
+    return graphState.getWorkerAggregatorUsage().<A>getAggregatedValue(name);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/worker/WorkerInfo.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerInfo.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerInfo.java
new file mode 100644
index 0000000..265d676
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerInfo.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+import org.apache.giraph.graph.TaskInfo;
+
+/**
+ * Information about a worker that is sent to the master and other workers.
+ */
+public class WorkerInfo extends TaskInfo {
+  /**
+   * Constructor for reflection
+   */
+  public WorkerInfo() {
+  }
+
+  @Override
+  public String toString() {
+    return "Worker(" + super.toString() + ")";
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/worker/WorkerThreadAggregatorUsage.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerThreadAggregatorUsage.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerThreadAggregatorUsage.java
new file mode 100644
index 0000000..194127e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerThreadAggregatorUsage.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+/**
+ * {@link WorkerAggregatorUsage} which can be used in each of the
+ * computation threads.
+ */
+public interface WorkerThreadAggregatorUsage extends WorkerAggregatorUsage {
+  /**
+   * Call this after thread's computation is finished,
+   * i.e. when all vertices have provided their values to aggregators
+   */
+  void finishThreadComputation();
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/worker/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/package-info.java b/giraph-core/src/main/java/org/apache/giraph/worker/package-info.java
new file mode 100644
index 0000000..9c37caa
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of all the worker related things
+ */
+package org.apache.giraph.worker;

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/zk/PredicateLock.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/zk/PredicateLock.java b/giraph-core/src/main/java/org/apache/giraph/zk/PredicateLock.java
index eb61ea4..4937f82 100644
--- a/giraph-core/src/main/java/org/apache/giraph/zk/PredicateLock.java
+++ b/giraph-core/src/main/java/org/apache/giraph/zk/PredicateLock.java
@@ -23,8 +23,8 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-import org.apache.giraph.utils.SystemTime;
-import org.apache.giraph.utils.Time;
+import org.apache.giraph.time.SystemTime;
+import org.apache.giraph.time.Time;
 import org.apache.hadoop.util.Progressable;
 import org.apache.log4j.Logger;
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/test/java/org/apache/giraph/BspCase.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/BspCase.java b/giraph-core/src/test/java/org/apache/giraph/BspCase.java
index e0c3af4..f43efe9 100644
--- a/giraph-core/src/test/java/org/apache/giraph/BspCase.java
+++ b/giraph-core/src/test/java/org/apache/giraph/BspCase.java
@@ -234,7 +234,7 @@ public class BspCase implements Watcher {
    *
    *  @return whether we use a real hadoop instance or not
    */
-  boolean runningInDistributedMode() {
+  public boolean runningInDistributedMode() {
     return jobTracker != null;
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java b/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java
index 55d45d3..91d536e 100644
--- a/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java
+++ b/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java
@@ -30,17 +30,17 @@ import org.apache.giraph.examples.SimpleMsgVertex;
 import org.apache.giraph.examples.SimplePageRankVertex;
 import org.apache.giraph.examples.SimplePageRankVertex.SimplePageRankVertexInputFormat;
 import org.apache.giraph.examples.SimpleShortestPathsVertex;
-import org.apache.giraph.examples.SimpleSumCombiner;
+import org.apache.giraph.combiner.SimpleSumCombiner;
 import org.apache.giraph.examples.SimpleSuperstepVertex;
 import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
 import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat;
-import org.apache.giraph.graph.EdgeListVertex;
+import org.apache.giraph.vertex.EdgeListVertex;
 import org.apache.giraph.graph.GiraphJob;
-import org.apache.giraph.graph.InputSplitPathOrganizer;
-import org.apache.giraph.graph.TextAggregatorWriter;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.VertexInputFormat;
-import org.apache.giraph.io.JsonLongDoubleFloatDoubleVertexOutputFormat;
+import org.apache.giraph.worker.InputSplitPathOrganizer;
+import org.apache.giraph.aggregators.TextAggregatorWriter;
+import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.io.VertexInputFormat;
+import org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexOutputFormat;
 import org.apache.giraph.zk.ZooKeeperExt;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/test/java/org/apache/giraph/TestEdgeInput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/TestEdgeInput.java b/giraph-core/src/test/java/org/apache/giraph/TestEdgeInput.java
deleted file mode 100644
index 797a410..0000000
--- a/giraph-core/src/test/java/org/apache/giraph/TestEdgeInput.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph;
-
-import org.apache.giraph.conf.GiraphClasses;
-import org.apache.giraph.graph.EdgeListVertex;
-import org.apache.giraph.io.IdWithValueTextOutputFormat;
-import org.apache.giraph.io.IntIntTextVertexValueInputFormat;
-import org.apache.giraph.io.IntNullTextEdgeInputFormat;
-import org.apache.giraph.utils.InternalVertexRunner;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.junit.Test;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-
-import java.io.IOException;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * A test case to ensure that loading a graph from a list of edges works as
- * expected.
- */
-public class TestEdgeInput extends BspCase {
-  public TestEdgeInput() {
-    super(TestEdgeInput.class.getName());
-  }
-
-  // It should be able to build a graph starting from the edges only.
-  // Vertices should be implicitly created with default values.
-  @Test
-  public void testEdgesOnly() throws Exception {
-    String[] edges = new String[] {
-        "1 2",
-        "2 3",
-        "2 4",
-        "4 1"
-    };
-
-    GiraphClasses classes = new GiraphClasses();
-    classes.setVertexClass(TestVertexWithNumEdges.class);
-    classes.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
-    classes.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
-    Map<String, String> params = ImmutableMap.of();
-    Iterable<String> results = InternalVertexRunner.run(classes, params,
-        null, edges);
-
-    Map<Integer, Integer> values = parseResults(results);
-
-    // Check that all vertices with outgoing edges have been created
-    assertEquals(3, values.size());
-    // Check the number of edges for each vertex
-    assertEquals(1, (int) values.get(1));
-    assertEquals(2, (int) values.get(2));
-    assertEquals(1, (int) values.get(4));
-  }
-
-  // It should be able to build a graph by specifying vertex data and edges
-  // as separate input formats.
-  @Test
-  public void testMixedFormat() throws Exception {
-    String[] vertices = new String[] {
-        "1 75",
-        "2 34",
-        "3 13",
-        "4 32"
-    };
-    String[] edges = new String[] {
-        "1 2",
-        "2 3",
-        "2 4",
-        "4 1",
-        "5 3"
-    };
-
-    GiraphClasses classes = new GiraphClasses();
-    classes.setVertexClass(TestVertexDoNothing.class);
-    classes.setVertexInputFormatClass(IntIntTextVertexValueInputFormat.class);
-    classes.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
-    classes.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
-    Map<String, String> emptyParams = ImmutableMap.of();
-
-    // Run a job with a vertex that does nothing
-    Iterable<String> results = InternalVertexRunner.run(classes, emptyParams,
-        vertices, edges);
-
-    Map<Integer, Integer> values = parseResults(results);
-
-    // Check that all vertices with either initial values or outgoing edges
-    // have been created
-    assertEquals(5, values.size());
-    // Check that the vertices have been created with correct values
-    assertEquals(75, (int) values.get(1));
-    assertEquals(34, (int) values.get(2));
-    assertEquals(13, (int) values.get(3));
-    assertEquals(32, (int) values.get(4));
-    // A vertex with edges but no initial value should have the default value
-    assertEquals(0, (int) values.get(5));
-
-    classes = new GiraphClasses();
-    classes.setVertexClass(TestVertexWithNumEdges.class);
-    classes.setVertexInputFormatClass(IntIntTextVertexValueInputFormat.class);
-    classes.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
-    classes.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
-
-    // Run a job with a vertex that counts outgoing edges
-    results = InternalVertexRunner.run(classes, emptyParams, vertices, edges);
-
-    values = parseResults(results);
-
-    // Check the number of edges for each vertex
-    assertEquals(1, (int) values.get(1));
-    assertEquals(2, (int) values.get(2));
-    assertEquals(0, (int) values.get(3));
-    assertEquals(1, (int) values.get(4));
-    assertEquals(1, (int) values.get(5));
-  }
-
-  public static class TestVertexWithNumEdges extends EdgeListVertex<IntWritable,
-      IntWritable, NullWritable, NullWritable> {
-    @Override
-    public void compute(Iterable<NullWritable> messages) throws IOException {
-      setValue(new IntWritable(getNumEdges()));
-      voteToHalt();
-    }
-  }
-
-  public static class TestVertexDoNothing extends EdgeListVertex<IntWritable,
-      IntWritable, NullWritable, NullWritable> {
-    @Override
-    public void compute(Iterable<NullWritable> messages) throws IOException {
-      voteToHalt();
-    }
-  }
-
-  private static Map<Integer, Integer> parseResults(Iterable<String> results) {
-    Map<Integer, Integer> values = Maps.newHashMap();
-    for (String line : results) {
-      String[] tokens = line.split("\\s+");
-      int id = Integer.valueOf(tokens[0]);
-      int value = Integer.valueOf(tokens[1]);
-      values.put(id, value);
-    }
-    return values;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/test/java/org/apache/giraph/TestGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/TestGiraphConfiguration.java b/giraph-core/src/test/java/org/apache/giraph/TestGiraphConfiguration.java
deleted file mode 100644
index 0f8578a..0000000
--- a/giraph-core/src/test/java/org/apache/giraph/TestGiraphConfiguration.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph;
-
-import org.apache.giraph.conf.GiraphConfiguration;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-public class TestGiraphConfiguration {
-  public interface If { }
-  public class A implements If { }
-  public class B implements If { }
-  public class C implements If { }
-
-  @Test
-  public void testSetClasses() {
-    GiraphConfiguration conf = new GiraphConfiguration();
-    conf.setClasses("foo", If.class, A.class, B.class);
-    Class<?>[] klasses = conf.getClasses("foo");
-    assertEquals(2, klasses.length);
-    assertEquals(A.class, klasses[0]);
-    assertEquals(B.class, klasses[1]);
-
-    try {
-      conf.setClasses("foo", A.class, B.class);
-      fail();
-    } catch (RuntimeException e) {
-      assertEquals(2, conf.getClasses("foo").length);
-    }
-
-    Class<? extends If>[] klasses2 = conf.getClassesOfType("foo", If.class);
-    assertEquals(2, klasses2.length);
-    assertEquals(A.class, klasses2[0]);
-    assertEquals(B.class, klasses2[1]);
-  }
-
-  @Test
-  public void testAddToClasses() {
-    GiraphConfiguration conf = new GiraphConfiguration();
-
-    conf.setClasses("foo", If.class, A.class, B.class);
-    conf.addToClasses("foo", C.class, If.class);
-    Class<?>[] klasses = conf.getClasses("foo");
-    assertEquals(3, klasses.length);
-    assertEquals(A.class, klasses[0]);
-    assertEquals(B.class, klasses[1]);
-    assertEquals(C.class, klasses[2]);
-
-    conf.addToClasses("bar", B.class, If.class);
-    klasses = conf.getClasses("bar");
-    assertEquals(1, klasses.length);
-    assertEquals(B.class, klasses[0]);
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/test/java/org/apache/giraph/TestGraphPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/TestGraphPartitioner.java b/giraph-core/src/test/java/org/apache/giraph/TestGraphPartitioner.java
index 6fda813..ff71b86 100644
--- a/giraph-core/src/test/java/org/apache/giraph/TestGraphPartitioner.java
+++ b/giraph-core/src/test/java/org/apache/giraph/TestGraphPartitioner.java
@@ -24,8 +24,8 @@ import org.apache.giraph.examples.SimpleCheckpointVertex;
 import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
 import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat;
 import org.apache.giraph.graph.GiraphJob;
-import org.apache.giraph.graph.partition.HashRangePartitionerFactory;
-import org.apache.giraph.graph.partition.PartitionBalancer;
+import org.apache.giraph.partition.HashRangePartitionerFactory;
+import org.apache.giraph.partition.PartitionBalancer;
 import org.apache.giraph.integration.SuperstepHashPartitionerFactory;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/test/java/org/apache/giraph/TestJsonBase64Format.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/TestJsonBase64Format.java b/giraph-core/src/test/java/org/apache/giraph/TestJsonBase64Format.java
deleted file mode 100644
index 1e2b0e0..0000000
--- a/giraph-core/src/test/java/org/apache/giraph/TestJsonBase64Format.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph;
-import org.apache.giraph.benchmark.EdgeListVertexPageRankBenchmark;
-import org.apache.giraph.benchmark.PageRankComputation;
-import org.apache.giraph.conf.GiraphClasses;
-import org.apache.giraph.graph.GiraphJob;
-import org.apache.giraph.io.GiraphFileInputFormat;
-import org.apache.giraph.io.JsonBase64VertexInputFormat;
-import org.apache.giraph.io.JsonBase64VertexOutputFormat;
-import org.apache.giraph.io.PseudoRandomVertexInputFormat;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.junit.Test;
-
-import java.io.IOException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Test out the JsonBase64 format.
- */
-public class TestJsonBase64Format extends BspCase {
-  /**
-   * Constructor.
-   */
-  public TestJsonBase64Format() {
-    super(TestJsonBase64Format.class.getName());
-  }
-
-  /**
-   * Start a job and finish after i supersteps, then begin a new job and
-   * continue on more j supersteps.  Check the results against a single job
-   * with i + j supersteps.
-   *
-   * @throws IOException
-   * @throws ClassNotFoundException
-   * @throws InterruptedException
-   */
-  @Test
-  public void testContinue()
-      throws IOException, InterruptedException, ClassNotFoundException {
-
-    Path outputPath = getTempPath(getCallingMethodName());
-    GiraphClasses classes = new GiraphClasses();
-    classes.setVertexClass(EdgeListVertexPageRankBenchmark.class);
-    classes.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
-    classes.setVertexOutputFormatClass(JsonBase64VertexOutputFormat.class);
-    GiraphJob job = prepareJob(getCallingMethodName(), classes, outputPath);
-    job.getConfiguration().setLong(
-        PseudoRandomVertexInputFormat.AGGREGATE_VERTICES, 101);
-    job.getConfiguration().setLong(
-        PseudoRandomVertexInputFormat.EDGES_PER_VERTEX, 2);
-    job.getConfiguration().setInt(PageRankComputation.SUPERSTEP_COUNT, 2);
-
-    assertTrue(job.run(true));
-
-    Path outputPath2 = getTempPath(getCallingMethodName() + "2");
-    classes = new GiraphClasses();
-    classes.setVertexClass(EdgeListVertexPageRankBenchmark.class);
-    classes.setVertexInputFormatClass(JsonBase64VertexInputFormat.class);
-    classes.setVertexOutputFormatClass(JsonBase64VertexOutputFormat.class);
-    job = prepareJob(getCallingMethodName(), classes, outputPath2);
-    job.getConfiguration().setInt(PageRankComputation.SUPERSTEP_COUNT, 3);
-    GiraphFileInputFormat.addVertexInputPath(job.getInternalJob(), outputPath);
-    assertTrue(job.run(true));
-
-    Path outputPath3 = getTempPath(getCallingMethodName() + "3");
-    classes = new GiraphClasses();
-    classes.setVertexClass(EdgeListVertexPageRankBenchmark.class);
-    classes.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
-    classes.setVertexOutputFormatClass(JsonBase64VertexOutputFormat.class);
-    job = prepareJob(getCallingMethodName(), classes, outputPath3);
-    job.getConfiguration().setLong(
-        PseudoRandomVertexInputFormat.AGGREGATE_VERTICES, 101);
-    job.getConfiguration().setLong(
-        PseudoRandomVertexInputFormat.EDGES_PER_VERTEX, 2);
-    job.getConfiguration().setInt(PageRankComputation.SUPERSTEP_COUNT, 5);
-    assertTrue(job.run(true));
-
-    Configuration conf = job.getConfiguration();
-
-    assertEquals(101, getNumResults(conf, outputPath));
-    assertEquals(101, getNumResults(conf, outputPath2));
-    assertEquals(101, getNumResults(conf, outputPath3));
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/test/java/org/apache/giraph/TestMasterObserver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/TestMasterObserver.java b/giraph-core/src/test/java/org/apache/giraph/TestMasterObserver.java
deleted file mode 100644
index 8609bc3..0000000
--- a/giraph-core/src/test/java/org/apache/giraph/TestMasterObserver.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph;
-
-import org.apache.giraph.conf.GiraphClasses;
-import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.graph.IntNullNullNullVertex;
-import org.apache.giraph.io.IntNullNullNullTextInputFormat;
-import org.apache.giraph.master.DefaultMasterObserver;
-import org.apache.giraph.utils.InternalVertexRunner;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.util.StringUtils;
-import org.junit.Test;
-
-import com.google.common.collect.Maps;
-
-import java.io.IOException;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-
-public class TestMasterObserver {
-  public static class NoOpVertex extends IntNullNullNullVertex {
-    private int count = 0;
-
-    @Override
-    public void compute(Iterable<NullWritable> messages) throws IOException {
-      if (count == 2) {
-        voteToHalt();
-      }
-      ++count;
-    }
-  }
-
-  public static class Obs extends DefaultMasterObserver {
-    public static int preApp = 0;
-    public static int preSuperstep = 0;
-    public static int postSuperstep = 0;
-    public static int postApp = 0;
-
-    @Override
-    public void preApplication() {
-      ++preApp;
-    }
-
-    @Override
-    public void postApplication() {
-      ++postApp;
-    }
-
-    @Override
-    public void preSuperstep() {
-      ++preSuperstep;
-    }
-
-    @Override
-    public void postSuperstep() {
-      ++postSuperstep;
-    }
-  }
-
-  @Test
-  public void testGetsCalled() throws Exception {
-    assertEquals(0, Obs.postApp);
-
-    String[] graph = new String[] { "1", "2", "3" };
-
-    Map<String, String> params = Maps.newHashMap();
-    String klasses[] = new String[] {
-        Obs.class.getName(),
-        Obs.class.getName()
-    };
-    params.put(GiraphConstants.MASTER_OBSERVER_CLASSES,
-        StringUtils.arrayToString(klasses));
-
-    GiraphClasses classes = new GiraphClasses();
-    classes.setVertexClass(NoOpVertex.class);
-    classes.setVertexInputFormatClass(IntNullNullNullTextInputFormat.class);
-    InternalVertexRunner.run(classes, params, graph);
-
-    assertEquals(2, Obs.preApp);
-    // 3 supersteps + 1 input superstep * 2 observers = 8 callbacks
-    assertEquals(8, Obs.preSuperstep);
-    assertEquals(8, Obs.postSuperstep);
-    assertEquals(2, Obs.postApp);
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/test/java/org/apache/giraph/TestPageRank.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/TestPageRank.java b/giraph-core/src/test/java/org/apache/giraph/TestPageRank.java
deleted file mode 100644
index e71e11f..0000000
--- a/giraph-core/src/test/java/org/apache/giraph/TestPageRank.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph;
-
-import org.apache.giraph.conf.GiraphClasses;
-import org.apache.giraph.conf.GiraphConfiguration;
-import org.apache.giraph.examples.SimplePageRankVertex;
-import org.apache.giraph.graph.GiraphJob;
-import org.apache.giraph.graph.partition.HashMasterPartitioner;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.junit.Test;
-
-import java.io.IOException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Test page rank (with and without multithreading)
- */
-public class TestPageRank extends BspCase {
-
-  /**
-   * Constructor
-   */
-  public TestPageRank() {
-    super(TestPageRank.class.getName());
-  }
-
-  @Test
-  public void testBspPageRankSingleCompute()
-      throws ClassNotFoundException, IOException, InterruptedException {
-    testPageRank(1);
-  }
-
-
-  @Test
-  public void testPageRankTenThreadsCompute()
-      throws ClassNotFoundException, IOException, InterruptedException {
-    testPageRank(10);
-  }
-
-  /**
-   * Generic page rank test
-   *
-   * @param numComputeThreads Number of compute threads to use
-   * @throws java.io.IOException
-   * @throws ClassNotFoundException
-   * @throws InterruptedException
-   */
-  private void testPageRank(int numComputeThreads)
-      throws IOException, InterruptedException, ClassNotFoundException {
-    GiraphClasses<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
-        classes = new GiraphClasses();
-    classes.setVertexClass(SimplePageRankVertex.class);
-    classes.setVertexInputFormatClass(
-        SimplePageRankVertex.SimplePageRankVertexInputFormat.class);
-    classes.setWorkerContextClass(
-        SimplePageRankVertex.SimplePageRankVertexWorkerContext.class);
-    classes.setMasterComputeClass(
-        SimplePageRankVertex.SimplePageRankVertexMasterCompute.class);
-    GiraphJob job = prepareJob(getCallingMethodName(), classes);
-    GiraphConfiguration conf = job.getConfiguration();
-    conf.setNumComputeThreads(numComputeThreads);
-    // Set enough partitions to generate randomness on the compute side
-    if (numComputeThreads != 1) {
-      conf.setInt(HashMasterPartitioner.USER_PARTITION_COUNT,
-          numComputeThreads * 5);
-    }
-    assertTrue(job.run(true));
-    if (!runningInDistributedMode()) {
-      double maxPageRank =
-          SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalMax();
-      double minPageRank =
-          SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalMin();
-      long numVertices =
-          SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalSum();
-      System.out.println(getCallingMethodName() + ": maxPageRank=" +
-          maxPageRank + " minPageRank=" +
-          minPageRank + " numVertices=" + numVertices + ", " +
-          " numComputeThreads=" + numComputeThreads);
-      assertEquals(34.03, maxPageRank, 0.001);
-      assertEquals(0.03, minPageRank, 0.00001);
-      assertEquals(5l, numVertices);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/test/java/org/apache/giraph/TestPredicateLock.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/TestPredicateLock.java b/giraph-core/src/test/java/org/apache/giraph/TestPredicateLock.java
deleted file mode 100644
index c6043bc..0000000
--- a/giraph-core/src/test/java/org/apache/giraph/TestPredicateLock.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.giraph.utils.Time;
-import org.apache.giraph.zk.BspEvent;
-import org.apache.giraph.zk.PredicateLock;
-import org.apache.hadoop.util.Progressable;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Ensure that PredicateLock objects work correctly.
- */
-public class TestPredicateLock {
-  /** How many times was progress called? */
-  private AtomicInteger progressCalled = new AtomicInteger(0);
-
-  private static class SignalThread extends Thread {
-    private final BspEvent event;
-    public SignalThread(BspEvent event) {
-      this.event = event;
-    }
-    public void run() {
-      try {
-        Thread.sleep(500);
-      } catch (InterruptedException e) {
-      }
-      event.signal();
-    }
-  }
-
-  private Progressable stubContext;
-
-  private Progressable getStubProgressable() {
-    if (stubContext == null)
-      stubContext = new Progressable() {
-        @Override
-        public void progress() {
-          progressCalled.incrementAndGet();
-        }
-      };
-    return stubContext;
-  }
-
-  @Before
-  public void setUp() {
-    progressCalled.set(0);
-  }
-
-  /**
-   * SMake sure the the event is not signaled.
-   */
-  @Test
-  public void testWaitMsecsNoEvent() {
-    Time mockTime = mock(Time.class);
-    when(mockTime.getMilliseconds()).
-        thenReturn(0L).thenReturn(2L);
-    BspEvent event = new PredicateLock(getStubProgressable(), 1, mockTime);
-    boolean gotPredicate = event.waitMsecs(1);
-    assertFalse(gotPredicate);
-    assertEquals(0, progressCalled.get());
-    when(mockTime.getMilliseconds()).
-        thenReturn(0L).thenReturn(0L).thenReturn(2L);
-    gotPredicate = event.waitMsecs(1);
-    assertFalse(gotPredicate);
-    assertEquals(1, progressCalled.get());
-  }
-
-  /**
-   * Single threaded case where the event is signaled.
-   */
-  @Test
-  public void testEvent() {
-    Time mockTime = mock(Time.class);
-    when(mockTime.getMilliseconds()).
-        thenReturn(0L).thenReturn(2L);
-    BspEvent event = new PredicateLock(getStubProgressable(), 1, mockTime);
-    event.signal();
-    boolean gotPredicate = event.waitMsecs(2);
-    assertTrue(gotPredicate);
-    event.reset();
-    when(mockTime.getMilliseconds()).
-        thenReturn(0L).thenReturn(2L);
-    gotPredicate = event.waitMsecs(0);
-    assertFalse(gotPredicate);
-  }
-
-  /**
-   * Thread signaled test for {@link PredicateLock#waitForever()}
-   */
-  @Test
-  public void testWaitForever() {
-    BspEvent event = new PredicateLock(getStubProgressable());
-    Thread signalThread = new SignalThread(event);
-    signalThread.start();
-    event.waitForever();
-    try {
-      signalThread.join();
-    } catch (InterruptedException e) {
-    }
-    assertTrue(event.waitMsecs(0));
-  }
-
-  /**
-   * Thread signaled test to make sure the the event is signaled correctly
-   *
-   * @throws InterruptedException
-   */
-  @Test
-  public void testWaitMsecs() {
-    BspEvent event = new PredicateLock(getStubProgressable());
-    Thread signalThread = new SignalThread(event);
-    signalThread.start();
-    boolean gotPredicate = event.waitMsecs(2000);
-    assertTrue(gotPredicate);
-    try {
-      signalThread.join();
-    } catch (InterruptedException e) {
-    }
-    gotPredicate = event.waitMsecs(0);
-    assertTrue(gotPredicate);
-  }
-}