You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2014/01/28 20:56:39 UTC

git commit: updated refs/heads/trunk to 7cc5457

Updated Branches:
  refs/heads/trunk d1a061e1a -> 7cc54575d


GIRAPH-792: Print job progress to command line (majakabiljo)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/7cc54575
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/7cc54575
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/7cc54575

Branch: refs/heads/trunk
Commit: 7cc54575d867e37a43020df309a78cd65c3fbdc0
Parents: d1a061e
Author: Maja Kabiljo <ma...@fb.com>
Authored: Tue Jan 28 11:49:58 2014 -0800
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Tue Jan 28 11:56:10 2014 -0800

----------------------------------------------------------------------
 CHANGELOG                                       |   2 +
 .../java/org/apache/giraph/bsp/BspService.java  |  13 +-
 .../apache/giraph/conf/GiraphConfiguration.java |   9 +
 .../org/apache/giraph/conf/GiraphConstants.java |  11 +
 .../apache/giraph/graph/ComputeCallable.java    |  12 +
 .../apache/giraph/graph/GraphTaskManager.java   |  23 +-
 .../giraph/job/CombinedWorkerProgress.java      | 118 ++++++
 .../java/org/apache/giraph/job/GiraphJob.java   |   5 +
 .../apache/giraph/job/HaltApplicationUtils.java |  56 +--
 .../apache/giraph/job/JobProgressTracker.java   | 149 ++++++++
 .../apache/giraph/master/BspServiceMaster.java  |   8 +-
 .../org/apache/giraph/utils/CounterUtils.java   |  57 +++
 .../apache/giraph/worker/BspServiceWorker.java  |  37 +-
 .../giraph/worker/EdgeInputSplitsCallable.java  |   5 +
 .../worker/VertexInputSplitsCallable.java       |   6 +
 .../apache/giraph/worker/WorkerProgress.java    | 369 +++++++++++++++++++
 .../giraph/worker/WorkerProgressWriter.java     |  74 ++++
 17 files changed, 889 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 7435a92..971ab46 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+  GIRAPH-792: Print job progress to command line (majakabiljo)
+
   GIRAPH-831: waitUntilAllTasksDone waits forever (without debug information) (aching)
 
   GIRAPH-830: directMemory used in netty message (pavanka via aching)

http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
index 86823ed..ec0ddbb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
@@ -137,6 +137,8 @@ public abstract class BspService<I extends WritableComparable,
       "/_partitionExchangeDir";
   /** Denotes that the superstep is done */
   public static final String SUPERSTEP_FINISHED_NODE = "/_superstepFinished";
+  /** Stores progress info for workers */
+  public static final String WORKER_PROGRESSES = "/_workerProgresses";
   /** Denotes that computation should be halted */
   public static final String HALT_COMPUTATION_NODE = "/_haltComputation";
   /** Denotes which workers have been cleaned up */
@@ -202,6 +204,8 @@ public abstract class BspService<I extends WritableComparable,
   protected final String checkpointBasePath;
   /** Path to the master election path */
   protected final String masterElectionPath;
+  /** Stores progress info of this worker */
+  protected final String myProgressPath;
   /** If this path exists computation will be halted */
   protected final String haltComputationPath;
   /** Private ZooKeeper instance that implements the service */
@@ -253,11 +257,10 @@ public abstract class BspService<I extends WritableComparable,
   /**
    * Constructor.
    *
-   * @param sessionMsecTimeout ZooKeeper session timeount in milliseconds
    * @param context Mapper context
    * @param graphTaskManager GraphTaskManager for this compute node
    */
-  public BspService(int sessionMsecTimeout,
+  public BspService(
       Mapper<?, ?, ?, ?>.Context context,
       GraphTaskManager<I, V, E> graphTaskManager) {
     this.vertexInputSplitsEvents = new InputSplitEvents(context);
@@ -307,6 +310,8 @@ public abstract class BspService<I extends WritableComparable,
     this.checkpointFrequency = conf.getCheckpointFrequency();
 
     basePath = ZooKeeperManager.getBasePath(conf) + BASE_DIR + "/" + jobId;
+    getContext().getCounter(GiraphConstants.ZOOKEEPER_BASE_PATH_COUNTER_GROUP,
+        basePath);
     masterJobStatePath = basePath + MASTER_JOB_STATE_NODE;
     vertexInputSplitsPaths = new InputSplitPaths(basePath,
         VERTEX_INPUT_SPLIT_DIR, VERTEX_INPUT_SPLIT_DONE_DIR,
@@ -320,6 +325,7 @@ public abstract class BspService<I extends WritableComparable,
         CHECKPOINT_DIRECTORY.getWithDefault(getConfiguration(),
             CHECKPOINT_DIRECTORY.getDefaultValue() + "/" + getJobId());
     masterElectionPath = basePath + MASTER_ELECTION_DIR;
+    myProgressPath = basePath + WORKER_PROGRESSES + "/" + taskPartition;
     String serverPortList = conf.getZookeeperList();
     haltComputationPath = basePath + HALT_COMPUTATION_NODE;
     getContext().getCounter(GiraphConstants.ZOOKEEPER_HALT_NODE_COUNTER_GROUP,
@@ -333,7 +339,7 @@ public abstract class BspService<I extends WritableComparable,
     }
     try {
       this.zk = new ZooKeeperExt(serverPortList,
-                                 sessionMsecTimeout,
+                                 conf.getZooKeeperSessionTimeout(),
                                  conf.getZookeeperOpsMaxAttempts(),
                                  conf.getZookeeperOpsRetryWaitMsecs(),
                                  this,
@@ -345,7 +351,6 @@ public abstract class BspService<I extends WritableComparable,
     }
   }
 
-
   /**
    * Get the superstep from a ZooKeeper path
    *

http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 8cf403a..abc81e8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -1197,4 +1197,13 @@ public class GiraphConfiguration extends Configuration
   public void setWaitTaskDoneTimeoutMs(int ms) {
     WAIT_TASK_DONE_TIMEOUT_MS.set(this, ms);
   }
+
+  /**
+   * Check whether to track job progress on client or not
+   *
+   * @return True if job progress should be tracked on client
+   */
+  public boolean trackJobProgressOnClient() {
+    return TRACK_JOB_PROGRESS_ON_CLIENT.get(this);
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 4e68308..9271152 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -996,6 +996,12 @@ public interface GiraphConstants {
   String ZOOKEEPER_HALT_NODE_COUNTER_GROUP = "Zookeeper halt node";
 
   /**
+   * This counter group will contain one counter whose name is the ZooKeeper
+   * node path which contains all data about this job
+   */
+  String ZOOKEEPER_BASE_PATH_COUNTER_GROUP = "Zookeeper base path";
+
+  /**
    * Which class to use to write instructions on how to halt the application
    */
   ClassConfOption<HaltApplicationUtils.HaltInstructionsWriter>
@@ -1013,5 +1019,10 @@ public interface GiraphConstants {
       new IntConfOption("giraph.waitTaskDoneTimeoutMs", MINUTES.toMillis(15),
           "Maximum timeout (in ms) for waiting for all all tasks to " +
               "complete");
+
+  /** Whether to track job progress on client or not */
+  BooleanConfOption TRACK_JOB_PROGRESS_ON_CLIENT =
+      new BooleanConfOption("giraph.trackJobProgressOnClient", true,
+          "Whether to track job progress on client or not");
 }
 // CHECKSTYLE: resume InterfaceIsTypeCheck

http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
index 1fe1d10..0303530 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
@@ -35,6 +35,7 @@ import org.apache.giraph.time.Times;
 import org.apache.giraph.utils.MemoryUtils;
 import org.apache.giraph.utils.TimedLogger;
 import org.apache.giraph.worker.WorkerContext;
+import org.apache.giraph.worker.WorkerProgress;
 import org.apache.giraph.worker.WorkerThreadAggregatorUsage;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -74,6 +75,8 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
   private static final Logger LOG  = Logger.getLogger(ComputeCallable.class);
   /** Class time object */
   private static final Time TIME = SystemTime.get();
+  /** Number of vertices computed between WorkerProgress updates */
+  private static final long VERTICES_TO_UPDATE_PROGRESS = 100000;
   /** Context */
   private final Mapper<?, ?, ?, ?>.Context context;
   /** Graph state */
@@ -229,6 +232,7 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
       Partition<I, V, E> partition) throws IOException, InterruptedException {
     PartitionStats partitionStats =
         new PartitionStats(partition.getId(), 0, 0, 0, 0, 0);
+    long verticesComputedProgress = 0;
     // Make sure this is thread-safe across runs
     synchronized (partition) {
       for (Vertex<I, V, E> vertex : partition) {
@@ -260,10 +264,18 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
         // Add statistics for this vertex
         partitionStats.incrVertexCount();
         partitionStats.addEdgeCount(vertex.getNumEdges());
+
+        verticesComputedProgress++;
+        if (verticesComputedProgress == VERTICES_TO_UPDATE_PROGRESS) {
+          WorkerProgress.get().addVerticesComputed(verticesComputedProgress);
+          verticesComputedProgress = 0;
+        }
       }
 
       messageStore.clearPartition(partition.getId());
     }
+    WorkerProgress.get().addVerticesComputed(verticesComputedProgress);
+    WorkerProgress.get().incrementPartitionsComputed();
     return partitionStats;
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 0617973..a84ac66 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -45,6 +45,7 @@ import org.apache.giraph.worker.BspServiceWorker;
 import org.apache.giraph.worker.InputSplitsCallable;
 import org.apache.giraph.worker.WorkerContext;
 import org.apache.giraph.worker.WorkerObserver;
+import org.apache.giraph.worker.WorkerProgress;
 import org.apache.giraph.zk.ZooKeeperManager;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -215,9 +216,8 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
       Thread.sleep(GiraphConstants.DEFAULT_ZOOKEEPER_INIT_LIMIT *
         GiraphConstants.DEFAULT_ZOOKEEPER_TICK_TIME);
     }
-    int sessionMsecTimeout = conf.getZooKeeperSessionTimeout();
     try {
-      instantiateBspService(sessionMsecTimeout);
+      instantiateBspService();
     } catch (IOException e) {
       LOG.error("setup: Caught exception just before end of setup", e);
       if (zkManager != null) {
@@ -537,17 +537,15 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
   /**
    * Instantiate the appropriate BspService object (Master or Worker)
    * for this compute node.
-   * @param sessionMsecTimeout configurable session timeout
    */
-  private void instantiateBspService(int sessionMsecTimeout)
+  private void instantiateBspService()
     throws IOException, InterruptedException {
     if (graphFunctions.isMaster()) {
       if (LOG.isInfoEnabled()) {
         LOG.info("setup: Starting up BspServiceMaster " +
           "(master thread)...");
       }
-      serviceMaster = new BspServiceMaster<I, V, E>(
-        sessionMsecTimeout, context, this);
+      serviceMaster = new BspServiceMaster<I, V, E>(context, this);
       masterThread = new MasterThread<I, V, E>(serviceMaster, context);
       masterThread.start();
     }
@@ -555,8 +553,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
       if (LOG.isInfoEnabled()) {
         LOG.info("setup: Starting up BspServiceWorker...");
       }
-      serviceWorker = new BspServiceWorker<I, V, E>(
-        sessionMsecTimeout, context, this);
+      serviceWorker = new BspServiceWorker<I, V, E>(context, this);
       if (LOG.isInfoEnabled()) {
         LOG.info("setup: Registering health of this worker...");
       }
@@ -711,10 +708,18 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
       int numThreads) {
     final BlockingQueue<Integer> computePartitionIdQueue =
       new ArrayBlockingQueue<Integer>(numPartitions);
+    long verticesToCompute = 0;
     for (Integer partitionId :
       serviceWorker.getPartitionStore().getPartitionIds()) {
       computePartitionIdQueue.add(partitionId);
-    }
+      verticesToCompute +=
+          serviceWorker.getPartitionStore().getOrCreatePartition(
+              partitionId).getVertexCount();
+    }
+    WorkerProgress.get().startSuperstep(
+        serviceWorker.getSuperstep(),
+        verticesToCompute,
+        serviceWorker.getPartitionStore().getNumPartitions());
 
     GiraphTimerContext computeAllTimerContext = computeAll.time();
     timeToFirstMessageTimerContext = timeToFirstMessage.time();

http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java b/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java
new file mode 100644
index 0000000..0810040
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.job;
+
+import org.apache.giraph.worker.WorkerProgress;
+
+/**
+ * Class which combines multiple workers' progresses to get overall
+ * application progress
+ */
+public class CombinedWorkerProgress extends WorkerProgress {
+  /**
+   * How many workers have reported that they are in the highest reported
+   * superstep
+   */
+  private int workersInSuperstep = 0;
+  /**
+   * How many workers reported that they finished the application
+   */
+  private int workersDone = 0;
+
+  /**
+   * Constructor
+   *
+   * @param workerProgresses Worker progresses to combine
+   */
+  public CombinedWorkerProgress(Iterable<WorkerProgress> workerProgresses) {
+    for (WorkerProgress workerProgress : workerProgresses) {
+      if (workerProgress.getCurrentSuperstep() > currentSuperstep) {
+        verticesToCompute = 0;
+        verticesComputed = 0;
+        partitionsToCompute = 0;
+        partitionsComputed = 0;
+        currentSuperstep = workerProgress.getCurrentSuperstep();
+        workersInSuperstep = 0;
+      }
+
+      if (workerProgress.getCurrentSuperstep() == currentSuperstep) {
+        workersInSuperstep++;
+        if (isInputSuperstep()) {
+          verticesLoaded += workerProgress.getVerticesLoaded();
+          vertexInputSplitsLoaded +=
+              workerProgress.getVertexInputSplitsLoaded();
+          edgesLoaded += workerProgress.getEdgesLoaded();
+          edgeInputSplitsLoaded += workerProgress.getEdgeInputSplitsLoaded();
+        } else if (isComputeSuperstep()) {
+          verticesToCompute += workerProgress.getVerticesToCompute();
+          verticesComputed += workerProgress.getVerticesComputed();
+          partitionsToCompute += workerProgress.getPartitionsToCompute();
+          partitionsComputed += workerProgress.getPartitionsComputed();
+        } else if (isOutputSuperstep()) {
+          verticesToStore += workerProgress.getVerticesToStore();
+          verticesStored += workerProgress.getVerticesStored();
+          partitionsToStore += workerProgress.getPartitionsToStore();
+          partitionsStored += workerProgress.getPartitionsStored();
+        }
+      }
+
+      if (workerProgress.isStoringDone()) {
+        workersDone++;
+      }
+    }
+  }
+
+  /**
+   * Is the application done
+   *
+   * @param expectedWorkersDone Number of workers which should be done in
+   *                            order for application to be done
+   * @return True if application is done
+   */
+  public boolean isDone(int expectedWorkersDone) {
+    return workersDone == expectedWorkersDone;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("Data from ").append(workersInSuperstep).append(" workers - ");
+    if (isInputSuperstep()) {
+      sb.append("Loading data: ");
+      sb.append(verticesLoaded).append(" vertices loaded, ");
+      sb.append(vertexInputSplitsLoaded).append(
+          " vertex input splits loaded; ");
+      sb.append(edgesLoaded).append(" edges loaded, ");
+      sb.append(edgeInputSplitsLoaded).append(" edge input splits loaded");
+    } else if (isComputeSuperstep()) {
+      sb.append("Compute superstep ").append(currentSuperstep).append(": ");
+      sb.append(verticesComputed).append(" out of ").append(
+          verticesToCompute).append(" vertices computed; ");
+      sb.append(partitionsComputed).append(" out of ").append(
+          partitionsToCompute).append(" partitions computed");
+    } else if (isOutputSuperstep()) {
+      sb.append("Storing data: ");
+      sb.append(verticesStored).append(" out of ").append(
+          verticesToStore).append(" vertices stored; ");
+      sb.append(partitionsStored).append(" out of ").append(
+          partitionsToStore).append(" partitions stored");
+    }
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
index 40670bb..4a1f02e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
@@ -252,9 +252,14 @@ public class GiraphJob {
         LOG.info("run: Tracking URL: " + submittedJob.getTrackingURL());
       }
       HaltApplicationUtils.printHaltInfo(submittedJob, conf);
+      JobProgressTracker jobProgressTracker = conf.trackJobProgressOnClient() ?
+          new JobProgressTracker(submittedJob, conf) : null;
       jobObserver.jobRunning(submittedJob);
 
       boolean passed = submittedJob.waitForCompletion(verbose);
+      if (jobProgressTracker != null) {
+        jobProgressTracker.stop();
+      }
       jobObserver.jobFinished(submittedJob, passed);
       if (passed || !retryChecker.shouldRetry(submittedJob, tryCount)) {
         return passed;

http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/giraph-core/src/main/java/org/apache/giraph/job/HaltApplicationUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/HaltApplicationUtils.java b/giraph-core/src/main/java/org/apache/giraph/job/HaltApplicationUtils.java
index 28b5781..8150de6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/HaltApplicationUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/HaltApplicationUtils.java
@@ -20,69 +20,33 @@ package org.apache.giraph.job;
 
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.utils.CounterUtils;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.log4j.Logger;
 
-import java.io.IOException;
-
 /**
  * Utility methods for halting application while running
  */
 public class HaltApplicationUtils {
-  /** Milliseconds to sleep for while waiting for halt info */
-  private static final int SLEEP_MSECS = 100;
-
   /** Do not instantiate */
   private HaltApplicationUtils() { }
 
   /**
-   * Wait for halt info (zk server and node) to become available
-   *
-   * @param submittedJob Submitted job
-   * @return True if halt info became available, false if job completed
-   * before it became available
-   */
-  private static boolean waitForHaltInfo(Job submittedJob) throws IOException {
-    try {
-      while (submittedJob.getCounters().getGroup(
-          GiraphConstants.ZOOKEEPER_SERVER_PORT_COUNTER_GROUP).size() == 0) {
-        if (submittedJob.isComplete()) {
-          return false;
-        }
-        Thread.sleep(SLEEP_MSECS);
-      }
-      while (submittedJob.getCounters().getGroup(
-          GiraphConstants.ZOOKEEPER_HALT_NODE_COUNTER_GROUP).size() == 0) {
-        if (submittedJob.isComplete()) {
-          return false;
-        }
-        Thread.sleep(SLEEP_MSECS);
-      }
-    } catch (InterruptedException e) {
-      throw new IllegalStateException(
-          "waitForHaltInfo: InterruptedException occurred", e);
-    }
-    return true;
-  }
-
-  /**
    * Wait for halt info to become available and print instructions on how to
    * halt
    *
    * @param submittedJob Submitted job
-   * @param conf Configuration
+   * @param conf         Configuration
    */
   public static void printHaltInfo(Job submittedJob,
-      GiraphConfiguration conf) throws IOException {
-    if (waitForHaltInfo(submittedJob)) {
-      String zkServer = submittedJob.getCounters().getGroup(
-          GiraphConstants.ZOOKEEPER_SERVER_PORT_COUNTER_GROUP).iterator()
-          .next().getName();
-      String haltNode =  submittedJob.getCounters().getGroup(
-          GiraphConstants.ZOOKEEPER_HALT_NODE_COUNTER_GROUP).iterator()
-          .next().getName();
-      GiraphConstants.HALT_INSTRUCTIONS_WRITER_CLASS.newInstance(conf)
-          .writeHaltInstructions(zkServer, haltNode);
+      GiraphConfiguration conf) {
+    String zkServer = CounterUtils.waitAndGetCounterNameFromGroup(
+        submittedJob, GiraphConstants.ZOOKEEPER_SERVER_PORT_COUNTER_GROUP);
+    String haltNode = CounterUtils.waitAndGetCounterNameFromGroup(
+        submittedJob, GiraphConstants.ZOOKEEPER_HALT_NODE_COUNTER_GROUP);
+    if (zkServer != null && haltNode != null) {
+      GiraphConstants.HALT_INSTRUCTIONS_WRITER_CLASS.newInstance(
+          conf).writeHaltInstructions(zkServer, haltNode);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java
new file mode 100644
index 0000000..f685344
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.job;
+
+import org.apache.giraph.bsp.BspService;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.utils.CounterUtils;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.giraph.worker.WorkerProgress;
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.Progressable;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Class which tracks job's progress on client
+ */
+public class JobProgressTracker implements Watcher {
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(JobProgressTracker.class);
+  /** How often to print job's progress */
+  private static final int UPDATE_MILLISECONDS = 5 * 1000;
+  /** Thread which periodically writes job's progress */
+  private Thread writerThread;
+  /** ZooKeeper client used to read workers' progress */
+  private ZooKeeperExt zk;
+  /** Whether application is finished */
+  private volatile boolean finished = false;
+
+  /**
+   * Constructor
+   *
+   * @param submittedJob Job to track
+   * @param conf Configuration
+   */
+  public JobProgressTracker(final Job submittedJob,
+      final GiraphConfiguration conf) throws IOException, InterruptedException {
+    String zkServer = CounterUtils.waitAndGetCounterNameFromGroup(
+        submittedJob, GiraphConstants.ZOOKEEPER_SERVER_PORT_COUNTER_GROUP);
+    final String basePath = CounterUtils.waitAndGetCounterNameFromGroup(
+        submittedJob, GiraphConstants.ZOOKEEPER_BASE_PATH_COUNTER_GROUP);
+    // Connect to ZooKeeper
+    zk = new ZooKeeperExt(
+        zkServer,
+        conf.getZooKeeperSessionTimeout(),
+        conf.getZookeeperOpsMaxAttempts(),
+        conf.getZookeeperOpsRetryWaitMsecs(),
+        this,
+        new Progressable() {
+          @Override
+          public void progress() {
+          }
+        });
+    writerThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        String workerProgressBasePath = basePath + BspService.WORKER_PROGRESSES;
+        try {
+          while (!finished) {
+            if (zk.exists(workerProgressBasePath, false) != null) {
+              // Get locations of all worker progresses
+              List<String> workerProgressPaths = zk.getChildrenExt(
+                  workerProgressBasePath, false, false, true);
+              List<WorkerProgress> workerProgresses =
+                  new ArrayList<WorkerProgress>(workerProgressPaths.size());
+              // Read all worker progresses
+              for (String workerProgressPath : workerProgressPaths) {
+                WorkerProgress workerProgress = new WorkerProgress();
+                byte[] zkData = zk.getData(workerProgressPath, false, null);
+                WritableUtils.readFieldsFromByteArray(zkData, workerProgress);
+                workerProgresses.add(workerProgress);
+              }
+              // Combine and log
+              CombinedWorkerProgress combinedWorkerProgress =
+                  new CombinedWorkerProgress(workerProgresses);
+              if (LOG.isInfoEnabled()) {
+                LOG.info(combinedWorkerProgress.toString());
+              }
+              // Check if application is done
+              if (combinedWorkerProgress.isDone(conf.getMaxWorkers())) {
+                break;
+              }
+            }
+            Thread.sleep(UPDATE_MILLISECONDS);
+          }
+        } catch (InterruptedException | KeeperException e) {
+          if (LOG.isInfoEnabled()) {
+            LOG.info("run: Exception occurred", e);
+          }
+        } finally {
+          try {
+            // Create a node so the master knows we stopped communicating
+            // with ZooKeeper and it's safe to clean up
+            zk.createExt(
+                basePath + BspService.CLEANED_UP_DIR + "/client",
+                null,
+                ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT,
+                true);
+            zk.close();
+          } catch (InterruptedException | KeeperException e) {
+            if (LOG.isInfoEnabled()) {
+              LOG.info("run: Exception occurred", e);
+            }
+          }
+        }
+      }
+    });
+    writerThread.start();
+  }
+
+  /**
+   * Stop the thread which logs application progress
+   */
+  public void stop() {
+    finished = true;
+  }
+
+  @Override
+  public void process(WatchedEvent event) {
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 78487ef..cfee4c5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -187,15 +187,13 @@ public class BspServiceMaster<I extends WritableComparable,
   /**
    * Constructor for setting up the master.
    *
-   * @param sessionMsecTimeout Msecs to timeout connecting to ZooKeeper
    * @param context Mapper context
    * @param graphTaskManager GraphTaskManager for this compute node
    */
   public BspServiceMaster(
-      int sessionMsecTimeout,
       Mapper<?, ?, ?, ?>.Context context,
       GraphTaskManager<I, V, E> graphTaskManager) {
-    super(sessionMsecTimeout, context, graphTaskManager);
+    super(context, graphTaskManager);
     workerWroteCheckpoint = new PredicateLock(context);
     registerBspEvent(workerWroteCheckpoint);
     superstepStateChanged = new PredicateLock(context);
@@ -1725,6 +1723,10 @@ public class BspServiceMaster<I extends WritableComparable,
         GraphFunctions.ALL_EXCEPT_ZOOKEEPER)) {
       maxTasks *= 2;
     }
+    if (getConfiguration().trackJobProgressOnClient()) {
+      // For job client
+      maxTasks++;
+    }
     List<String> cleanedUpChildrenList = null;
     while (true) {
       try {

http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/giraph-core/src/main/java/org/apache/giraph/utils/CounterUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/CounterUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/CounterUtils.java
new file mode 100644
index 0000000..afec660
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/CounterUtils.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.mapreduce.Job;
+
+import java.io.IOException;
+
+/** Utility methods for dealing with counters */
+public class CounterUtils {
+  /** Milliseconds to sleep for while waiting for counter to appear */
+  private static final int SLEEP_MSECS = 100;
+
+  /** Do not instantiate */
+  private CounterUtils() {
+  }
+
+  /**
+   * Wait for a counter to appear in a group and then return the name of that
+   * counter. If job finishes before counter appears, return null.
+   *
+   * Sleeps SLEEP_MSECS milliseconds between checks. If the calling thread is
+   * interrupted while waiting, its interrupt status is restored before the
+   * IllegalStateException is thrown.
+   *
+   * @param job   Job
+   * @param group Name of the counter group
+   * @return Name of the first counter found in the group, or null if job
+   *         finishes before counter appears
+   * @throws IllegalStateException if querying the job fails or the wait is
+   *         interrupted
+   */
+  public static String waitAndGetCounterNameFromGroup(Job job, String group) {
+    try {
+      while (job.getCounters().getGroup(group).size() == 0) {
+        if (job.isComplete()) {
+          return null;
+        }
+        Thread.sleep(SLEEP_MSECS);
+      }
+      return job.getCounters().getGroup(group).iterator().next().getName();
+    } catch (InterruptedException e) {
+      // Restore the interrupt status so callers can still observe it
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException(
+          "waitAndGetCounterNameFromGroup: Exception occurred", e);
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          "waitAndGetCounterNameFromGroup: Exception occurred", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index bc29b03..13de188 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -159,6 +159,8 @@ public class BspServiceWorker<I extends WritableComparable,
 
   /** array of observers to call back to */
   private final WorkerObserver[] observers;
+  /** Writer for worker progress */
+  private final WorkerProgressWriter workerProgressWriter;
 
   // Per-Superstep Metrics
   /** Timer for WorkerContext#postSuperstep */
@@ -169,18 +171,16 @@ public class BspServiceWorker<I extends WritableComparable,
   /**
    * Constructor for setting up the worker.
    *
-   * @param sessionMsecTimeout Msecs to timeout connecting to ZooKeeper
    * @param context Mapper context
    * @param graphTaskManager GraphTaskManager for this compute node
    * @throws IOException
    * @throws InterruptedException
    */
   public BspServiceWorker(
-    int sessionMsecTimeout,
     Mapper<?, ?, ?, ?>.Context context,
     GraphTaskManager<I, V, E> graphTaskManager)
     throws IOException, InterruptedException {
-    super(sessionMsecTimeout, context, graphTaskManager);
+    super(context, graphTaskManager);
     ImmutableClassesGiraphConfiguration<I, V, E> conf = getConfiguration();
     partitionExchangeChildrenChanged = new PredicateLock(context);
     registerBspEvent(partitionExchangeChildrenChanged);
@@ -207,6 +207,9 @@ public class BspServiceWorker<I extends WritableComparable,
     }
     observers = conf.createWorkerObservers();
 
+    workerProgressWriter = conf.trackJobProgressOnClient() ?
+        new WorkerProgressWriter(myProgressPath, getZkExt()) : null;
+
     GiraphMetrics.get().addSuperstepResetObserver(this);
   }
 
@@ -515,6 +518,7 @@ public class BspServiceWorker<I extends WritableComparable,
     } else {
       vertexEdgeCount = new VertexEdgeCount();
     }
+    WorkerProgress.get().finishLoadingVertices();
 
     if (getConfiguration().hasEdgeInputFormat()) {
       // Ensure the edge InputSplits are ready for processing
@@ -531,6 +535,7 @@ public class BspServiceWorker<I extends WritableComparable,
       }
       getContext().progress();
     }
+    WorkerProgress.get().finishLoadingEdges();
 
     if (LOG.isInfoEnabled()) {
       LOG.info("setup: Finally loaded a total of " + vertexEdgeCount);
@@ -951,10 +956,21 @@ public class BspServiceWorker<I extends WritableComparable,
             new ArrayBlockingQueue<Integer>(numPartitions);
     Iterables.addAll(partitionIdQueue, getPartitionStore().getPartitionIds());
 
+    long verticesToStore = 0;
+    for (int partitionId : getPartitionStore().getPartitionIds()) {
+      verticesToStore +=  getPartitionStore().getOrCreatePartition(
+          partitionId).getVertexCount();
+    }
+    WorkerProgress.get().startStoring(
+        verticesToStore, getPartitionStore().getNumPartitions());
+
     CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
       @Override
       public Callable<Void> newCallable(int callableId) {
         return new Callable<Void>() {
+          /** How often to update WorkerProgress */
+          private static final long VERTICES_TO_UPDATE_PROGRESS = 100000;
+
           @Override
           public Void call() throws Exception {
             VertexWriter<I, V, E> vertexWriter =
@@ -962,6 +978,7 @@ public class BspServiceWorker<I extends WritableComparable,
             vertexWriter.setConf(getConfiguration());
             vertexWriter.initialize(getContext());
             long nextPrintVertices = 0;
+            long nextUpdateProgressVertices = 0;
             long nextPrintMsecs = System.currentTimeMillis() + 15000;
             int partitionIndex = 0;
             int numPartitions = getPartitionStore().getNumPartitions();
@@ -989,9 +1006,18 @@ public class BspServiceWorker<I extends WritableComparable,
                   nextPrintMsecs = System.currentTimeMillis() + 15000;
                   nextPrintVertices = verticesWritten + 250000;
                 }
+
+                if (verticesWritten >= nextUpdateProgressVertices) {
+                  WorkerProgress.get().addVerticesStored(
+                      VERTICES_TO_UPDATE_PROGRESS);
+                  nextUpdateProgressVertices += VERTICES_TO_UPDATE_PROGRESS;
+                }
               }
               getPartitionStore().putPartition(partition);
               ++partitionIndex;
+              WorkerProgress.get().addVerticesStored(
+                  verticesWritten % VERTICES_TO_UPDATE_PROGRESS);
+              WorkerProgress.get().incrementPartitionsStored();
             }
             vertexWriter.close(getContext()); // the temp results are saved now
             return null;
@@ -1147,6 +1173,11 @@ public class BspServiceWorker<I extends WritableComparable,
     setCachedSuperstep(getSuperstep() - 1);
     saveVertices(finishedSuperstepStats.getLocalVertexCount());
     saveEdges();
+    WorkerProgress.get().finishStoring();
+    if (workerProgressWriter != null) {
+      WorkerProgress.writeToZnode(getZkExt(), myProgressPath);
+      workerProgressWriter.stop();
+    }
     getPartitionStore().shutdown();
     // All worker processes should denote they are done by adding special
     // znode.  Once the number of znodes equals the number of partitions

http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
index 8ec0453..828eac4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
@@ -174,6 +174,7 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
       // Update status every EDGES_UPDATE_PERIOD edges
       if (inputSplitEdgesLoaded % EDGES_UPDATE_PERIOD == 0) {
         totalEdgesMeter.mark(EDGES_UPDATE_PERIOD);
+        WorkerProgress.get().addEdgesLoaded(EDGES_UPDATE_PERIOD);
         LoggerUtils.setStatusAndLog(context, LOG, Level.INFO,
             "readEdgeInputSplit: Loaded " +
                 totalEdgesMeter.count() + " edges at " +
@@ -198,6 +199,10 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
     totalEdgesFiltered.inc(inputSplitEdgesFiltered);
     totalEdgesMeter.mark(inputSplitEdgesLoaded % EDGES_UPDATE_PERIOD);
 
+    WorkerProgress.get().addEdgesLoaded(
+        inputSplitEdgesLoaded % EDGES_UPDATE_PERIOD);
+    WorkerProgress.get().incrementEdgeInputSplitsLoaded();
+
     return new VertexEdgeCount(0, inputSplitEdgesLoaded);
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
index 01a6fc5..e3e04d6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
@@ -176,6 +176,7 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
       // Update status every VERTICES_UPDATE_PERIOD vertices
       if (inputSplitVerticesLoaded % VERTICES_UPDATE_PERIOD == 0) {
         totalVerticesMeter.mark(VERTICES_UPDATE_PERIOD);
+        WorkerProgress.get().addVerticesLoaded(VERTICES_UPDATE_PERIOD);
         totalEdgesMeter.mark(edgesSinceLastUpdate);
         inputSplitEdgesLoaded += edgesSinceLastUpdate;
         edgesSinceLastUpdate = 0;
@@ -208,6 +209,11 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
     totalVerticesFilteredCounter.inc(inputSplitVerticesFiltered);
 
     vertexReader.close();
+
+    WorkerProgress.get().addVerticesLoaded(
+        inputSplitVerticesLoaded % VERTICES_UPDATE_PERIOD);
+    WorkerProgress.get().incrementVertexInputSplitsLoaded();
+
     return new VertexEdgeCount(inputSplitVerticesLoaded,
         inputSplitEdgesLoaded + edgesSinceLastUpdate);
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java
new file mode 100644
index 0000000..f7de88b
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.hadoop.io.Writable;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+/**
+ * Stores information about a worker's progress that is periodically written to
+ * ZooKeeper with {@link WorkerProgressWriter}. All instance methods are
+ * synchronized, so the shared instance can be updated from several threads.
+ */
+@ThreadSafe
+public class WorkerProgress implements Writable {
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(WorkerProgress.class);
+  /** Singleton instance for everyone to use */
+  private static final WorkerProgress INSTANCE = new WorkerProgress();
+
+  /** Superstep which worker is executing, Long.MAX_VALUE if it's output */
+  protected long currentSuperstep = -1;
+
+  /** How many vertices were loaded until now */
+  protected long verticesLoaded = 0;
+  /** How many vertex input splits were loaded until now */
+  protected int vertexInputSplitsLoaded = 0;
+  /** Whether worker finished loading vertices */
+  protected boolean loadingVerticesDone = false;
+  /** How many edges were loaded until now */
+  protected long edgesLoaded = 0;
+  /** How many edge input splits were loaded until now */
+  protected int edgeInputSplitsLoaded = 0;
+  /** Whether worker finished loading edges */
+  protected boolean loadingEdgesDone = false;
+
+  /** How many vertices are there to compute in current superstep */
+  protected long verticesToCompute = 0;
+  /** How many vertices were computed in current superstep until now */
+  protected long verticesComputed = 0;
+  /** How many partitions are there to compute in current superstep */
+  protected int partitionsToCompute = 0;
+  /** How many partitions were computed in current superstep until now */
+  protected int partitionsComputed = 0;
+
+  /** Whether all compute supersteps are done */
+  protected boolean computationDone = false;
+
+  /** How many vertices are there to store */
+  protected long verticesToStore = 0;
+  /** How many vertices were stored until now */
+  protected long verticesStored = 0;
+  /** How many partitions are there to store */
+  protected int partitionsToStore = 0;
+  /** How many partitions were stored until now */
+  protected int partitionsStored = 0;
+  /** Whether worker finished storing data */
+  protected boolean storingDone = false;
+
+  /**
+   * Get singleton instance of WorkerProgress.
+   *
+   * @return WorkerProgress singleton instance
+   */
+  public static WorkerProgress get() {
+    return INSTANCE;
+  }
+
+  /**
+   * Write worker's progress to znode. This is best effort: ZooKeeper failures
+   * and interruptions are logged instead of being thrown, and after an
+   * interruption the calling thread's interrupt status is set again.
+   *
+   * @param zk ZooKeeperExt
+   * @param myProgressPath Path to write the progress to
+   */
+  public static void writeToZnode(ZooKeeperExt zk, String myProgressPath) {
+    byte[] byteArray = WritableUtils.writeToByteArray(get());
+    try {
+      zk.createOrSetExt(myProgressPath,
+          byteArray,
+          ZooDefs.Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT,
+          true,
+          -1);
+    } catch (KeeperException | InterruptedException e) {
+      if (e instanceof InterruptedException) {
+        // Do not swallow the interrupt: let the calling thread observe it
+        Thread.currentThread().interrupt();
+      }
+      if (LOG.isInfoEnabled()) {
+        LOG.info("writeToZnode: " + e.getClass().getName() +
+            " exception occurred", e);
+      }
+    }
+  }
+
+  /** @return Whether worker finished loading vertices */
+  public synchronized boolean isLoadingVerticesDone() {
+    return loadingVerticesDone;
+  }
+
+  /** @return Whether worker finished loading edges */
+  public synchronized boolean isLoadingEdgesDone() {
+    return loadingEdgesDone;
+  }
+
+  /** @return Whether all compute supersteps are done */
+  public synchronized boolean isComputationDone() {
+    return computationDone;
+  }
+
+  /** @return Whether worker finished storing data */
+  public synchronized boolean isStoringDone() {
+    return storingDone;
+  }
+
+  /**
+   * Add number of vertices loaded
+   *
+   * @param verticesLoaded How many vertices were loaded since the last
+   *                       time this function was called
+   */
+  public synchronized void addVerticesLoaded(long verticesLoaded) {
+    this.verticesLoaded += verticesLoaded;
+  }
+
+  /**
+   * Increment number of vertex input splits which were loaded
+   */
+  public synchronized void incrementVertexInputSplitsLoaded() {
+    vertexInputSplitsLoaded++;
+  }
+
+  /**
+   * Notify this class that worker finished loading vertices
+   */
+  public synchronized void finishLoadingVertices() {
+    loadingVerticesDone = true;
+  }
+
+  /**
+   * Add number of edges loaded
+   *
+   * @param edgesLoaded How many edges were loaded since the last
+   *                    time this function was called
+   */
+  public synchronized void addEdgesLoaded(long edgesLoaded) {
+    this.edgesLoaded += edgesLoaded;
+  }
+
+  /**
+   * Increment number of edge input splits which were loaded
+   */
+  public synchronized void incrementEdgeInputSplitsLoaded() {
+    edgeInputSplitsLoaded++;
+  }
+
+  /**
+   * Notify this class that worker finished loading edges
+   */
+  public synchronized void finishLoadingEdges() {
+    loadingEdgesDone = true;
+  }
+
+  /**
+   * Notify this class that next computation superstep is starting
+   *
+   * @param superstep Superstep which is starting
+   * @param verticesToCompute How many vertices are there to compute
+   * @param partitionsToCompute How many partitions are there to compute
+   */
+  public synchronized void startSuperstep(long superstep,
+      long verticesToCompute, int partitionsToCompute) {
+    this.currentSuperstep = superstep;
+    this.verticesToCompute = verticesToCompute;
+    this.partitionsToCompute = partitionsToCompute;
+    verticesComputed = 0;
+    partitionsComputed = 0;
+  }
+
+  /**
+   * Add number of vertices computed
+   *
+   * @param verticesComputed How many vertices were computed since the last
+   *                         time this function was called
+   */
+  public synchronized void addVerticesComputed(long verticesComputed) {
+    this.verticesComputed += verticesComputed;
+  }
+
+  /**
+   * Increment number of partitions which were computed
+   */
+  public synchronized void incrementPartitionsComputed() {
+    partitionsComputed++;
+  }
+
+  /**
+   * Notify this class that worker is starting to store data. Also marks
+   * computation as done and resets the per-superstep compute counters.
+   *
+   * @param verticesToStore How many vertices should be stored
+   * @param partitionsToStore How many partitions should be stored
+   */
+  public synchronized void startStoring(long verticesToStore,
+      int partitionsToStore) {
+    computationDone = true;
+    verticesToCompute = 0;
+    verticesComputed = 0;
+    partitionsToCompute = 0;
+    partitionsComputed = 0;
+    currentSuperstep = Long.MAX_VALUE;
+    this.verticesToStore = verticesToStore;
+    this.partitionsToStore = partitionsToStore;
+  }
+
+  /**
+   * Add number of vertices stored
+   *
+   * @param verticesStored How many vertices were stored since the last time
+   *                       this function was called
+   */
+  public synchronized void addVerticesStored(long verticesStored) {
+    this.verticesStored += verticesStored;
+  }
+
+  /**
+   * Increment number of partitions which were stored
+   */
+  public synchronized void incrementPartitionsStored() {
+    partitionsStored++;
+  }
+
+  /**
+   * Notify this class that storing data is done
+   */
+  public synchronized void finishStoring() {
+    storingDone = true;
+  }
+
+  /** @return Superstep which worker is executing */
+  public synchronized long getCurrentSuperstep() {
+    return currentSuperstep;
+  }
+
+  /** @return How many vertices were loaded until now */
+  public synchronized long getVerticesLoaded() {
+    return verticesLoaded;
+  }
+
+  /** @return How many vertex input splits were loaded until now */
+  public synchronized int getVertexInputSplitsLoaded() {
+    return vertexInputSplitsLoaded;
+  }
+
+  /** @return How many edges were loaded until now */
+  public synchronized long getEdgesLoaded() {
+    return edgesLoaded;
+  }
+
+  /** @return How many edge input splits were loaded until now */
+  public synchronized int getEdgeInputSplitsLoaded() {
+    return edgeInputSplitsLoaded;
+  }
+
+  /** @return How many vertices are there to compute in current superstep */
+  public synchronized long getVerticesToCompute() {
+    return verticesToCompute;
+  }
+
+  /** @return How many vertices were computed in current superstep */
+  public synchronized long getVerticesComputed() {
+    return verticesComputed;
+  }
+
+  /** @return How many partitions are there to compute in current superstep */
+  public synchronized int getPartitionsToCompute() {
+    return partitionsToCompute;
+  }
+
+  /** @return How many partitions were computed in current superstep */
+  public synchronized int getPartitionsComputed() {
+    return partitionsComputed;
+  }
+
+  /** @return How many vertices are there to store */
+  public synchronized long getVerticesToStore() {
+    return verticesToStore;
+  }
+
+  /** @return How many vertices were stored until now */
+  public synchronized long getVerticesStored() {
+    return verticesStored;
+  }
+
+  /** @return How many partitions are there to store */
+  public synchronized int getPartitionsToStore() {
+    return partitionsToStore;
+  }
+
+  /** @return How many partitions were stored until now */
+  public synchronized int getPartitionsStored() {
+    return partitionsStored;
+  }
+
+  /** @return Whether worker is still in input (currentSuperstep is -1) */
+  public synchronized boolean isInputSuperstep() {
+    return currentSuperstep == -1;
+  }
+
+  /** @return Whether worker is executing a compute superstep */
+  public synchronized boolean isComputeSuperstep() {
+    return currentSuperstep >= 0 && currentSuperstep < Long.MAX_VALUE;
+  }
+
+  /** @return Whether worker is in output (set by startStoring) */
+  public synchronized boolean isOutputSuperstep() {
+    return currentSuperstep == Long.MAX_VALUE;
+  }
+
+  @Override
+  public synchronized void write(DataOutput dataOutput) throws IOException {
+    // Field order must stay in sync with readFields()
+    dataOutput.writeLong(currentSuperstep);
+
+    dataOutput.writeLong(verticesLoaded);
+    dataOutput.writeInt(vertexInputSplitsLoaded);
+    dataOutput.writeBoolean(loadingVerticesDone);
+    dataOutput.writeLong(edgesLoaded);
+    dataOutput.writeInt(edgeInputSplitsLoaded);
+    dataOutput.writeBoolean(loadingEdgesDone);
+
+    dataOutput.writeLong(verticesToCompute);
+    dataOutput.writeLong(verticesComputed);
+    dataOutput.writeInt(partitionsToCompute);
+    dataOutput.writeInt(partitionsComputed);
+
+    dataOutput.writeBoolean(computationDone);
+
+    dataOutput.writeLong(verticesToStore);
+    dataOutput.writeLong(verticesStored);
+    dataOutput.writeInt(partitionsToStore);
+    dataOutput.writeInt(partitionsStored);
+    dataOutput.writeBoolean(storingDone);
+  }
+
+  @Override
+  public synchronized void readFields(DataInput dataInput) throws IOException {
+    // Field order must stay in sync with write()
+    currentSuperstep = dataInput.readLong();
+
+    verticesLoaded = dataInput.readLong();
+    vertexInputSplitsLoaded = dataInput.readInt();
+    loadingVerticesDone = dataInput.readBoolean();
+    edgesLoaded = dataInput.readLong();
+    edgeInputSplitsLoaded = dataInput.readInt();
+    loadingEdgesDone = dataInput.readBoolean();
+
+    verticesToCompute = dataInput.readLong();
+    verticesComputed = dataInput.readLong();
+    partitionsToCompute = dataInput.readInt();
+    partitionsComputed = dataInput.readInt();
+
+    computationDone = dataInput.readBoolean();
+
+    verticesToStore = dataInput.readLong();
+    verticesStored = dataInput.readLong();
+    partitionsToStore = dataInput.readInt();
+    partitionsStored = dataInput.readInt();
+    storingDone = dataInput.readBoolean();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java
new file mode 100644
index 0000000..f8c7571
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.log4j.Logger;
+
+/**
+ * Class which periodically writes worker's progress to zookeeper
+ */
+public class WorkerProgressWriter {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(WorkerProgressWriter.class);
+  /**
+   * Base period between two progress writes. The actual pause is this value
+   * multiplied by a random factor in [1, 2).
+   */
+  private static final int WRITE_UPDATE_PERIOD_MILLISECONDS = 10 * 1000;
+
+  /** Thread which writes worker's progress */
+  private final Thread writerThread;
+  /** Whether worker finished application */
+  private volatile boolean finished = false;
+
+  /**
+   * Constructor, starts separate daemon thread to periodically update
+   * worker's progress
+   *
+   * @param myProgressPath Path where this worker's progress should be stored
+   * @param zk ZooKeeperExt
+   */
+  public WorkerProgressWriter(final String myProgressPath,
+      final ZooKeeperExt zk) {
+    writerThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          while (!finished) {
+            WorkerProgress.writeToZnode(zk, myProgressPath);
+            // Random factor in [1, 2); presumably staggers writes of workers
+            double factor = 1 + Math.random();
+            Thread.sleep((long) (WRITE_UPDATE_PERIOD_MILLISECONDS * factor));
+          }
+        } catch (InterruptedException e) {
+          // Expected when stop() interrupts the sleep; the thread just ends
+          if (LOG.isInfoEnabled()) {
+            LOG.info("run: WorkerProgressWriter interrupted", e);
+          }
+        }
+      }
+    }, "workerProgressThread");
+    // Daemon, so this reporter cannot keep the JVM alive if stop() is never
+    // called, e.g. when the worker fails before it gets there
+    writerThread.setDaemon(true);
+    writerThread.start();
+  }
+
+  /**
+   * Stop the thread which writes worker's progress. Sets the finished flag
+   * and interrupts the thread; does not wait for it to exit.
+   */
+  public void stop() {
+    finished = true;
+    writerThread.interrupt();
+  }
+}