You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/02/16 23:12:36 UTC

svn commit: r1245205 [7/18] - in /incubator/giraph/trunk: ./ src/main/java/org/apache/giraph/ src/main/java/org/apache/giraph/benchmark/ src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/examples...

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java Thu Feb 16 22:12:31 2012
@@ -53,972 +53,1045 @@ import java.util.TreeMap;
 
 /**
  * Zookeeper-based implementation of {@link CentralizedService}.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
-public abstract class BspService <
-        I extends WritableComparable,
-        V extends Writable,
-        E extends Writable,
-        M extends Writable>
-        implements Watcher, CentralizedService<I, V, E, M> {
-    /** Private ZooKeeper instance that implements the service */
-    private final ZooKeeperExt zk;
-    /** Has the Connection occurred? */
-    private final BspEvent connectedEvent = new PredicateLock();
-    /** Has worker registration changed (either healthy or unhealthy) */
-    private final BspEvent workerHealthRegistrationChanged =
-        new PredicateLock();
-    /** InputSplits are ready for consumption by workers */
-    private final BspEvent inputSplitsAllReadyChanged =
-        new PredicateLock();
-    /** InputSplit reservation or finished notification and synchronization */
-    private final BspEvent inputSplitsStateChanged =
-        new PredicateLock();
-    /** InputSplits are done being processed by workers */
-    private final BspEvent inputSplitsAllDoneChanged =
-        new PredicateLock();
-    /** InputSplit done by a worker finished notification and synchronization */
-    private final BspEvent inputSplitsDoneStateChanged =
-        new PredicateLock();
-    /** Are the partition assignments to workers ready? */
-    private final BspEvent partitionAssignmentsReadyChanged =
-        new PredicateLock();
-
-    /** Application attempt changed */
-    private final BspEvent applicationAttemptChanged =
-        new PredicateLock();
-    /** Superstep finished synchronization */
-    private final BspEvent superstepFinished =
-        new PredicateLock();
-    /** Master election changed for any waited on attempt */
-    private final BspEvent masterElectionChildrenChanged =
-        new PredicateLock();
-    /** Cleaned up directory children changed*/
-    private final BspEvent cleanedUpChildrenChanged =
-        new PredicateLock();
-    /** Registered list of BspEvents */
-    private final List<BspEvent> registeredBspEvents =
-        new ArrayList<BspEvent>();
-    /** Configuration of the job*/
-    private final Configuration conf;
-    /** Job context (mainly for progress) */
-    private final Mapper<?, ?, ?, ?>.Context context;
-    /** Cached superstep (from ZooKeeper) */
-    private long cachedSuperstep = UNSET_SUPERSTEP;
-    /** Restarted from a checkpoint (manual or automatic) */
-    private long restartedSuperstep = UNSET_SUPERSTEP;
-    /** Cached application attempt (from ZooKeeper) */
-    private long cachedApplicationAttempt = UNSET_APPLICATION_ATTEMPT;
-    /** Job id, to ensure uniqueness */
-    private final String jobId;
-    /** Task partition, to ensure uniqueness */
-    private final int taskPartition;
-    /** My hostname */
-    private final String hostname;
-    /** Combination of hostname '_' partition (unique id) */
-    private final String hostnamePartitionId;
-    /** Graph partitioner */
-    private final GraphPartitionerFactory<I, V, E, M> graphPartitionerFactory;
-    /** Mapper that will do the graph computation */
-    private final GraphMapper<I, V, E, M> graphMapper;
-    /** Class logger */
-    private static final Logger LOG = Logger.getLogger(BspService.class);
-    /** File system */
-    private final FileSystem fs;
-    /** Checkpoint frequency */
-    private int checkpointFrequency = -1;
-    /** Map of aggregators */
-    private Map<String, Aggregator<Writable>> aggregatorMap =
-        new TreeMap<String, Aggregator<Writable>>();
-
-    /** Unset superstep */
-    public static final long UNSET_SUPERSTEP = Long.MIN_VALUE;
-    /** Input superstep (superstep when loading the vertices happens) */
-    public static final long INPUT_SUPERSTEP = -1;
-    /** Unset application attempt */
-    public static final long UNSET_APPLICATION_ATTEMPT = Long.MIN_VALUE;
-
-    private static final String BASE_DIR = "/_hadoopBsp";
-    public static final String MASTER_JOB_STATE_NODE = "/_masterJobState";
-    public static final String INPUT_SPLIT_DIR = "/_inputSplitDir";
-    public static final String INPUT_SPLIT_DONE_DIR = "/_inputSplitDoneDir";
-    public static final String INPUT_SPLIT_RESERVED_NODE =
-        "/_inputSplitReserved";
-    public static final String INPUT_SPLIT_FINISHED_NODE =
-        "/_inputSplitFinished";
-    public static final String INPUT_SPLITS_ALL_READY_NODE =
-        "/_inputSplitsAllReady";
-    public static final String INPUT_SPLITS_ALL_DONE_NODE =
-        "/_inputSplitsAllDone";
-    public static final String APPLICATION_ATTEMPTS_DIR =
-        "/_applicationAttemptsDir";
-    public static final String MASTER_ELECTION_DIR = "/_masterElectionDir";
-    public static final String SUPERSTEP_DIR = "/_superstepDir";
-    public static final String MERGED_AGGREGATOR_DIR =
-        "/_mergedAggregatorDir";
-    public static final String WORKER_HEALTHY_DIR = "/_workerHealthyDir";
-    public static final String WORKER_UNHEALTHY_DIR = "/_workerUnhealthyDir";
-    public static final String WORKER_FINISHED_DIR = "/_workerFinishedDir";
-    public static final String PARTITION_ASSIGNMENTS_DIR =
-        "/_partitionAssignments";
-    public static final String PARTITION_EXCHANGE_DIR =
-        "/_partitionExchangeDir";
-    public static final String SUPERSTEP_FINISHED_NODE = "/_superstepFinished";
-    public static final String CLEANED_UP_DIR = "/_cleanedUpDir";
-
-    public static final String JSONOBJ_AGGREGATOR_VALUE_ARRAY_KEY =
-        "_aggregatorValueArrayKey";
-    public static final String JSONOBJ_PARTITION_STATS_KEY =
-            "_partitionStatsKey";
-    public static final String JSONOBJ_FINISHED_VERTICES_KEY =
-        "_verticesFinishedKey";
-    public static final String JSONOBJ_NUM_VERTICES_KEY = "_numVerticesKey";
-    public static final String JSONOBJ_NUM_EDGES_KEY = "_numEdgesKey";
-    public static final String JSONOBJ_NUM_MESSAGES_KEY = "_numMsgsKey";
-    public static final String JSONOBJ_HOSTNAME_ID_KEY = "_hostnameIdKey";
-    public static final String JSONOBJ_MAX_VERTEX_INDEX_KEY =
-        "_maxVertexIndexKey";
-    public static final String JSONOBJ_HOSTNAME_KEY = "_hostnameKey";
-    public static final String JSONOBJ_PORT_KEY = "_portKey";
-    public static final String JSONOBJ_CHECKPOINT_FILE_PREFIX_KEY =
-        "_checkpointFilePrefixKey";
-    public static final String JSONOBJ_PREVIOUS_HOSTNAME_KEY =
-        "_previousHostnameKey";
-    public static final String JSONOBJ_PREVIOUS_PORT_KEY = "_previousPortKey";
-    public static final String JSONOBJ_STATE_KEY = "_stateKey";
-    public static final String JSONOBJ_APPLICATION_ATTEMPT_KEY =
-        "_applicationAttemptKey";
-    public static final String JSONOBJ_SUPERSTEP_KEY =
-        "_superstepKey";
-    public static final String AGGREGATOR_NAME_KEY = "_aggregatorNameKey";
-    public static final String AGGREGATOR_CLASS_NAME_KEY =
-        "_aggregatorClassNameKey";
-    public static final String AGGREGATOR_VALUE_KEY = "_aggregatorValueKey";
-
-    public static final String WORKER_SUFFIX = "_worker";
-    public static final String MASTER_SUFFIX = "_master";
-
-    /** Path to the job's root */
-    public final String BASE_PATH;
-    /** Path to the job state determined by the master (informative only) */
-    public final String MASTER_JOB_STATE_PATH;
-    /** Path to the input splits written by the master */
-    public final String INPUT_SPLIT_PATH;
-    /** Path to the input splits all ready to be processed by workers */
-    public final String INPUT_SPLITS_ALL_READY_PATH;
-    /** Path to the input splits done */
-    public final String INPUT_SPLIT_DONE_PATH;
-    /** Path to the input splits all done to notify the workers to proceed */
-    public final String INPUT_SPLITS_ALL_DONE_PATH;
-    /** Path to the application attempts) */
-    public final String APPLICATION_ATTEMPTS_PATH;
-    /** Path to the cleaned up notifications */
-    public final String CLEANED_UP_PATH;
-    /** Path to the checkpoint's root (including job id) */
-    public final String CHECKPOINT_BASE_PATH;
-    /** Path to the master election path */
-    public final String MASTER_ELECTION_PATH;
-
-    /**
-     * Get the superstep from a ZooKeeper path
-     *
-     * @param path Path to parse for the superstep
-     */
-    public static long getSuperstepFromPath(String path) {
-        int foundSuperstepStart = path.indexOf(SUPERSTEP_DIR);
-        if (foundSuperstepStart == -1) {
-            throw new IllegalArgumentException(
-                "getSuperstepFromPath: Cannot find " + SUPERSTEP_DIR +
-                "from " + path);
-        }
-        foundSuperstepStart += SUPERSTEP_DIR.length() + 1;
-        int endIndex = foundSuperstepStart +
-            path.substring(foundSuperstepStart).indexOf("/");
-        if (endIndex == -1) {
-            throw new IllegalArgumentException(
-                "getSuperstepFromPath: Cannot find end of superstep from " +
-                path);
-        }
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("getSuperstepFromPath: Got path=" + path +
-                      ", start=" + foundSuperstepStart + ", end=" + endIndex);
-        }
-        return Long.parseLong(path.substring(foundSuperstepStart, endIndex));
-    }
-
-    /**
-     * Get the hostname and id from a "healthy" worker path
-     */
-    public static String getHealthyHostnameIdFromPath(String path) {
-        int foundWorkerHealthyStart = path.indexOf(WORKER_HEALTHY_DIR);
-        if (foundWorkerHealthyStart == -1) {
-            throw new IllegalArgumentException(
-                "getHealthyHostnameidFromPath: Couldn't find " +
-                WORKER_HEALTHY_DIR + " from " + path);
-        }
-        foundWorkerHealthyStart += WORKER_HEALTHY_DIR.length();
-        return path.substring(foundWorkerHealthyStart);
-    }
-
-    /**
-     * Generate the base superstep directory path for a given application
-     * attempt
-     *
-     * @param attempt application attempt number
-     * @return directory path based on the an attempt
-     */
-    final public String getSuperstepPath(long attempt) {
-        return APPLICATION_ATTEMPTS_PATH + "/" + attempt + SUPERSTEP_DIR;
-    }
-
-    /**
-     * Generate the worker information "healthy" directory path for a
-     * superstep
-     *
-     * @param attempt application attempt number
-     * @param superstep superstep to use
-     * @return directory path based on the a superstep
-     */
-    final public String getWorkerInfoHealthyPath(long attempt,
-                                                 long superstep) {
-        return APPLICATION_ATTEMPTS_PATH + "/" + attempt +
-            SUPERSTEP_DIR + "/" + superstep + WORKER_HEALTHY_DIR;
-    }
-
-    /**
-     * Generate the worker information "unhealthy" directory path for a
-     * superstep
-     *
-     * @param attempt application attempt number
-     * @param superstep superstep to use
-     * @return directory path based on the a superstep
-     */
-    final public String getWorkerInfoUnhealthyPath(long attempt,
-                                                   long superstep) {
-        return APPLICATION_ATTEMPTS_PATH + "/" + attempt +
-            SUPERSTEP_DIR + "/" + superstep + WORKER_UNHEALTHY_DIR;
-    }
-
-    /**
-     * Generate the worker "finished" directory path for a
-     * superstep
-     *
-     * @param attempt application attempt number
-     * @param superstep superstep to use
-     * @return directory path based on the a superstep
-     */
-    final public String getWorkerFinishedPath(long attempt, long superstep) {
-        return APPLICATION_ATTEMPTS_PATH + "/" + attempt +
-            SUPERSTEP_DIR + "/" + superstep + WORKER_FINISHED_DIR;
-    }
-
-    /**
-     * Generate the "partiton assignments" directory path for a superstep
-     *
-     * @param attempt application attempt number
-     * @param superstep superstep to use
-     * @return directory path based on the a superstep
-     */
-    final public String getPartitionAssignmentsPath(long attempt,
-                                                    long superstep) {
-        return APPLICATION_ATTEMPTS_PATH + "/" + attempt +
-            SUPERSTEP_DIR + "/" + superstep + PARTITION_ASSIGNMENTS_DIR;
-    }
-
-    /**
-     * Generate the "partition exchange" directory path for a superstep
-     *
-     * @param attempt application attempt number
-     * @param superstep superstep to use
-     * @return directory path based on the a superstep
-     */
-    final public String getPartitionExchangePath(long attempt,
-                                                 long superstep) {
-        return APPLICATION_ATTEMPTS_PATH + "/" + attempt +
-            SUPERSTEP_DIR + "/" + superstep + PARTITION_EXCHANGE_DIR;
-    }
-
-    final public String getPartitionExchangeWorkerPath(long attempt,
-                                                       long superstep,
-                                                       WorkerInfo workerInfo) {
-        return getPartitionExchangePath(attempt, superstep) +
-            "/" + workerInfo.getHostnameId();
-    }
-
-    /**
-     * Generate the merged aggregator directory path for a superstep
-     *
-     * @param attempt application attempt number
-     * @param superstep superstep to use
-     * @return directory path based on the a superstep
-     */
-    final public String getMergedAggregatorPath(long attempt, long superstep) {
-        return APPLICATION_ATTEMPTS_PATH + "/" + attempt +
-            SUPERSTEP_DIR + "/" + superstep + MERGED_AGGREGATOR_DIR;
-    }
-
-    /**
-     * Generate the "superstep finished" directory path for a superstep
-     *
-     * @param attempt application attempt number
-     * @param superstep superstep to use
-     * @return directory path based on the a superstep
-     */
-    final public String getSuperstepFinishedPath(long attempt, long superstep) {
-        return APPLICATION_ATTEMPTS_PATH + "/" + attempt +
-            SUPERSTEP_DIR + "/" + superstep + SUPERSTEP_FINISHED_NODE;
-    }
-
-    /**
-     * Generate the base superstep directory path for a given application
-     * attempt
-     *
-     * @param superstep Superstep to use
-     * @return Directory path based on the a superstep
-     */
-    final public String getCheckpointBasePath(long superstep) {
-        return CHECKPOINT_BASE_PATH + "/" + superstep;
-    }
-
-    /** If at the end of a checkpoint file, indicates metadata */
-    public final String CHECKPOINT_METADATA_POSTFIX = ".metadata";
-
-    /**
-     * If at the end of a checkpoint file, indicates vertices, edges,
-     * messages, etc.
-     */
-    public final String CHECKPOINT_VERTICES_POSTFIX = ".vertices";
-
-    /**
-     * If at the end of a checkpoint file, indicates metadata and data is valid
-     * for the same filenames without .valid
-     */
-    public final String CHECKPOINT_VALID_POSTFIX = ".valid";
-
-    /**
-     * If at the end of a checkpoint file, indicates the stitched checkpoint
-     * file prefixes.  A checkpoint is not valid if this file does not exist.
-     */
-    public static final String CHECKPOINT_FINALIZED_POSTFIX = ".finalized";
-
-    /**
-     * Get the checkpoint from a finalized checkpoint path
-     *
-     * @param finalizedPath Path of the finalized checkpoint
-     * @return Superstep referring to a checkpoint of the finalized path
-     */
-    public static long getCheckpoint(Path finalizedPath) {
-        if (!finalizedPath.getName().endsWith(CHECKPOINT_FINALIZED_POSTFIX)) {
-            throw new InvalidParameterException(
-                "getCheckpoint: " + finalizedPath + "Doesn't end in " +
-                CHECKPOINT_FINALIZED_POSTFIX);
-        }
-        String checkpointString =
-            finalizedPath.getName().replace(CHECKPOINT_FINALIZED_POSTFIX, "");
-        return Long.parseLong(checkpointString);
-    }
-
-    /**
-     * Get the ZooKeeperExt instance.
-     *
-     * @return ZooKeeperExt instance.
-     */
-    final public ZooKeeperExt getZkExt() {
-        return zk;
-    }
-
-    @Override
-    final public long getRestartedSuperstep() {
-        return restartedSuperstep;
-    }
-
-    /**
-     * Set the restarted superstep
-     *
-     * @param superstep Set the manually restarted superstep
-     */
-    final public void setRestartedSuperstep(long superstep) {
-        if (superstep < INPUT_SUPERSTEP) {
-            throw new IllegalArgumentException(
-                "setRestartedSuperstep: Bad argument " + superstep);
-        }
-        restartedSuperstep = superstep;
-    }
-
-    /**
-     * Should checkpoint on this superstep?  If checkpointing, always
-     * checkpoint the first user superstep.  If restarting, the first
-     * checkpoint is after the frequency has been met.
-     *
-     * @param superstep Decide if checkpointing no this superstep
-     * @return True if this superstep should be checkpointed, false otherwise
-     */
-    final public boolean checkpointFrequencyMet(long superstep) {
-        if (checkpointFrequency == 0) {
-            return false;
-        }
-        long firstCheckpoint = INPUT_SUPERSTEP + 1;
-        if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
-            firstCheckpoint = getRestartedSuperstep() + checkpointFrequency;
-        }
-        if (superstep < firstCheckpoint) {
-            return false;
-        } else if (((superstep - firstCheckpoint) % checkpointFrequency) == 0) {
-            return true;
-        } else {
-            return false;
-        }
-    }
-
-    /**
-     * Get the file system
-     *
-     * @return file system
-     */
-    final public FileSystem getFs() {
-        return fs;
-    }
-
-    final public Configuration getConfiguration() {
-        return conf;
-    }
-
-    final public Mapper<?, ?, ?, ?>.Context getContext() {
-        return context;
-    }
-
-    final public String getHostname() {
-        return hostname;
-    }
-
-    final public String getHostnamePartitionId() {
-        return hostnamePartitionId;
-    }
-
-    final public int getTaskPartition() {
-        return taskPartition;
-    }
-
-    final public GraphMapper<I, V, E, M> getGraphMapper() {
-        return graphMapper;
-    }
-
-    final public BspEvent getWorkerHealthRegistrationChangedEvent() {
-        return workerHealthRegistrationChanged;
-    }
-
-    final public BspEvent getInputSplitsAllReadyEvent() {
-        return inputSplitsAllReadyChanged;
-    }
-
-    final public BspEvent getInputSplitsStateChangedEvent() {
-        return inputSplitsStateChanged;
-    }
-
-    final public BspEvent getInputSplitsAllDoneEvent() {
-        return inputSplitsAllDoneChanged;
-    }
-
-    final public BspEvent getInputSplitsDoneStateChangedEvent() {
-        return inputSplitsDoneStateChanged;
-    }
-
-    final public BspEvent getPartitionAssignmentsReadyChangedEvent() {
-        return partitionAssignmentsReadyChanged;
-    }
-
-
-    final public BspEvent getApplicationAttemptChangedEvent() {
-        return applicationAttemptChanged;
-    }
-
-    final public BspEvent getSuperstepFinishedEvent() {
-        return superstepFinished;
-    }
-
-
-    final public BspEvent getMasterElectionChildrenChangedEvent() {
-        return masterElectionChildrenChanged;
-    }
-
-    final public BspEvent getCleanedUpChildrenChangedEvent() {
-        return cleanedUpChildrenChanged;
-    }
-
-    /**
-     * Get the master commanded job state as a JSONObject.  Also sets the
-     * watches to see if the master commanded job state changes.
-     *
-     * @return Last job state or null if none
-     */
-    final public JSONObject getJobState() {
-        try {
-            getZkExt().createExt(MASTER_JOB_STATE_PATH,
-                                 null,
-                                 Ids.OPEN_ACL_UNSAFE,
-                                 CreateMode.PERSISTENT,
-                                 true);
-        } catch (KeeperException.NodeExistsException e) {
-            LOG.info("getJobState: Job state already exists (" +
-                     MASTER_JOB_STATE_PATH + ")");
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-        String jobState = null;
-        try {
-            List<String> childList =
-                getZkExt().getChildrenExt(
-                    MASTER_JOB_STATE_PATH, true, true, true);
-            if (childList.isEmpty()) {
-                return null;
-            }
-            jobState =
-                new String(getZkExt().getData(
-                    childList.get(childList.size() - 1), true, null));
-        } catch (KeeperException.NoNodeException e) {
-            LOG.info("getJobState: Job state path is empty! - " +
-                     MASTER_JOB_STATE_PATH);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-        try {
-            return new JSONObject(jobState);
-        } catch (JSONException e) {
-            throw new RuntimeException(
-                "getJobState: Failed to parse job state " + jobState);
-        }
-    }
-
-    public BspService(String serverPortList,
-                      int sessionMsecTimeout,
-                      Mapper<?, ?, ?, ?>.Context context,
-                      GraphMapper<I, V, E, M> graphMapper) {
-        registerBspEvent(connectedEvent);
-        registerBspEvent(workerHealthRegistrationChanged);
-        registerBspEvent(inputSplitsAllReadyChanged);
-        registerBspEvent(inputSplitsStateChanged);
-        registerBspEvent(partitionAssignmentsReadyChanged);
-        registerBspEvent(applicationAttemptChanged);
-        registerBspEvent(superstepFinished);
-        registerBspEvent(masterElectionChildrenChanged);
-        registerBspEvent(cleanedUpChildrenChanged);
-
-        this.context = context;
-        this.graphMapper = graphMapper;
-        this.conf = context.getConfiguration();
-        this.jobId = conf.get("mapred.job.id", "Unknown Job");
-        this.taskPartition = conf.getInt("mapred.task.partition", -1);
-        this.restartedSuperstep = conf.getLong(GiraphJob.RESTART_SUPERSTEP,
-                                               UNSET_SUPERSTEP);
-        this.cachedSuperstep = restartedSuperstep;
-        if ((restartedSuperstep != UNSET_SUPERSTEP) &&
-                (restartedSuperstep < 0)) {
-            throw new IllegalArgumentException(
-                "BspService: Invalid superstep to restart - " +
-                restartedSuperstep);
-        }
-        try {
-            this.hostname = InetAddress.getLocalHost().getHostName();
-        } catch (UnknownHostException e) {
-            throw new RuntimeException(e);
-        }
-        this.hostnamePartitionId = hostname + "_" + getTaskPartition();
-        this.graphPartitionerFactory =
-            BspUtils.<I, V, E, M>createGraphPartitioner(conf);
-
-        this.checkpointFrequency =
-            conf.getInt(GiraphJob.CHECKPOINT_FREQUENCY,
-                          GiraphJob.CHECKPOINT_FREQUENCY_DEFAULT);
-
-        BASE_PATH = ZooKeeperManager.getBasePath(conf) + BASE_DIR + "/" + jobId;
-        MASTER_JOB_STATE_PATH = BASE_PATH + MASTER_JOB_STATE_NODE;
-        INPUT_SPLIT_PATH = BASE_PATH + INPUT_SPLIT_DIR;
-        INPUT_SPLITS_ALL_READY_PATH = BASE_PATH + INPUT_SPLITS_ALL_READY_NODE;
-        INPUT_SPLIT_DONE_PATH = BASE_PATH + INPUT_SPLIT_DONE_DIR;
-        INPUT_SPLITS_ALL_DONE_PATH = BASE_PATH + INPUT_SPLITS_ALL_DONE_NODE;
-        APPLICATION_ATTEMPTS_PATH = BASE_PATH + APPLICATION_ATTEMPTS_DIR;
-        CLEANED_UP_PATH = BASE_PATH + CLEANED_UP_DIR;
-        CHECKPOINT_BASE_PATH =
-            getConfiguration().get(
-                GiraphJob.CHECKPOINT_DIRECTORY,
-                GiraphJob.CHECKPOINT_DIRECTORY_DEFAULT + "/" + getJobId());
-        MASTER_ELECTION_PATH = BASE_PATH + MASTER_ELECTION_DIR;
-        if (LOG.isInfoEnabled()) {
-            LOG.info("BspService: Connecting to ZooKeeper with job " + jobId +
-                     ", " + getTaskPartition() + " on " + serverPortList);
-        }
-        try {
-            this.zk = new ZooKeeperExt(serverPortList, sessionMsecTimeout, this);
-            connectedEvent.waitForever();
-            this.fs = FileSystem.get(getConfiguration());
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    /**
-     * Get the job id
-     *
-     * @return job id
-     */
-    final public String getJobId() {
-        return jobId;
-    }
-
-    /**
-     * Get the latest application attempt and cache it.
-     *
-     * @return the latest application attempt
-     */
-    final public long getApplicationAttempt() {
-        if (cachedApplicationAttempt != UNSET_APPLICATION_ATTEMPT) {
-            return cachedApplicationAttempt;
-        }
-        try {
-            getZkExt().createExt(APPLICATION_ATTEMPTS_PATH,
-                                 null,
-                                 Ids.OPEN_ACL_UNSAFE,
-                                 CreateMode.PERSISTENT,
-                                 true);
-        } catch (KeeperException.NodeExistsException e) {
-            LOG.info("getApplicationAttempt: Node " +
-                     APPLICATION_ATTEMPTS_PATH + " already exists!");
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-        try {
-            List<String> attemptList =
-                getZkExt().getChildrenExt(
-                    APPLICATION_ATTEMPTS_PATH, true, false, false);
-            if (attemptList.isEmpty()) {
-                cachedApplicationAttempt = 0;
-            }
-            else {
-                cachedApplicationAttempt =
-                    Long.parseLong(Collections.max(attemptList));
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-
-        return cachedApplicationAttempt;
-    }
-
-    /**
-     * Get the latest superstep and cache it.
-     *
-     * @return the latest superstep
-     * @throws InterruptedException
-     * @throws KeeperException
-     */
-    final public long getSuperstep() {
-        if (cachedSuperstep != UNSET_SUPERSTEP) {
-            return cachedSuperstep;
-        }
-        String superstepPath = getSuperstepPath(getApplicationAttempt());
-        try {
-            getZkExt().createExt(superstepPath,
-                                 null,
-                                 Ids.OPEN_ACL_UNSAFE,
-                                 CreateMode.PERSISTENT,
-                                 true);
-        } catch (KeeperException.NodeExistsException e) {
-            if (LOG.isInfoEnabled()) {
-                LOG.info("getApplicationAttempt: Node " +
-                         APPLICATION_ATTEMPTS_PATH + " already exists!");
-            }
-        } catch (KeeperException e) {
-            throw new IllegalStateException(
-                "getSuperstep: KeeperException", e);
-        } catch (InterruptedException e) {
-            throw new IllegalStateException(
-                "getSuperstep: InterruptedException", e);
-        }
-
-        List<String> superstepList;
-        try {
-            superstepList =
-                getZkExt().getChildrenExt(superstepPath, true, false, false);
-        } catch (KeeperException e) {
-            throw new IllegalStateException(
-                "getSuperstep: KeeperException", e);
-        } catch (InterruptedException e) {
-            throw new IllegalStateException(
-                "getSuperstep: InterruptedException", e);
-        }
-        if (superstepList.isEmpty()) {
-            cachedSuperstep = INPUT_SUPERSTEP;
-        }
-        else {
-            cachedSuperstep =
-                Long.parseLong(Collections.max(superstepList));
-        }
-
-        return cachedSuperstep;
-    }
-
-    /**
-     * Increment the cached superstep.  Shouldn't be the initial value anymore.
-     */
-    final public void incrCachedSuperstep() {
-        if (cachedSuperstep == UNSET_SUPERSTEP) {
-            throw new IllegalStateException(
-                "incrSuperstep: Invalid unset cached superstep " +
-                UNSET_SUPERSTEP);
-        }
-        ++cachedSuperstep;
-    }
-
-    /**
-     * Set the cached superstep (should only be used for loading checkpoints
-     * or recovering from failure).
-     *
-     * @param superstep will be used as the next superstep iteration
-     */
-    final public void setCachedSuperstep(long superstep) {
-        cachedSuperstep = superstep;
-    }
-
-    /**
-     * Set the cached application attempt (should only be used for restart from
-     * failure by the master)
-     *
-     * @param applicationAttempt Will denote the new application attempt
-     */
-    final public void setApplicationAttempt(long applicationAttempt) {
-        cachedApplicationAttempt = applicationAttempt;
-        String superstepPath = getSuperstepPath(cachedApplicationAttempt);
-        try {
-            getZkExt().createExt(superstepPath,
-                                 null,
-                                 Ids.OPEN_ACL_UNSAFE,
-                                 CreateMode.PERSISTENT,
-                                 true);
-        } catch (KeeperException.NodeExistsException e) {
-            throw new IllegalArgumentException(
-                "setApplicationAttempt: Attempt already exists! - " +
-                superstepPath, e);
-        } catch (KeeperException e) {
-            throw new RuntimeException(
-                "setApplicationAttempt: KeeperException - " +
-                superstepPath, e);
-        } catch (InterruptedException e) {
-            throw new RuntimeException(
-                "setApplicationAttempt: InterruptedException - " +
-                superstepPath, e);
-        }
-    }
-
-    /**
-     * Register an aggregator with name.
-     *
-     * @param name Name of the aggregator
-     * @param aggregatorClass Class of the aggregator
-     * @return Aggregator
-     * @throws IllegalAccessException
-     * @throws InstantiationException
-     */
-    public final <A extends Writable> Aggregator<A> registerAggregator(
-            String name,
-            Class<? extends Aggregator<A>> aggregatorClass)
-            throws InstantiationException, IllegalAccessException {
-        if (aggregatorMap.get(name) != null) {
-            return null;
-        }
-        Aggregator<A> aggregator =
-            (Aggregator<A>) aggregatorClass.newInstance();
-        @SuppressWarnings("unchecked")
-        Aggregator<Writable> writableAggregator =
-            (Aggregator<Writable>) aggregator;
-        aggregatorMap.put(name, writableAggregator);
+public abstract class BspService<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    implements Watcher, CentralizedService<I, V, E, M> {
+  /** Unset superstep */
+  public static final long UNSET_SUPERSTEP = Long.MIN_VALUE;
+  /** Input superstep (superstep when loading the vertices happens) */
+  public static final long INPUT_SUPERSTEP = -1;
+  /** Unset application attempt */
+  public static final long UNSET_APPLICATION_ATTEMPT = Long.MIN_VALUE;
+  /** Base ZooKeeper directory */
+  public static final String BASE_DIR = "/_hadoopBsp";
+  /** Master job state znode above base dir */
+  public static final String MASTER_JOB_STATE_NODE = "/_masterJobState";
+  /** Input split directory about base dir */
+  public static final String INPUT_SPLIT_DIR = "/_inputSplitDir";
+  /** Input split done directory about base dir */
+  public static final String INPUT_SPLIT_DONE_DIR = "/_inputSplitDoneDir";
+  /** Denotes a reserved input split */
+  public static final String INPUT_SPLIT_RESERVED_NODE =
+      "/_inputSplitReserved";
+  /** Denotes a finished input split */
+  public static final String INPUT_SPLIT_FINISHED_NODE =
+      "/_inputSplitFinished";
+  /** Denotes that all the input splits are are ready for consumption */
+  public static final String INPUT_SPLITS_ALL_READY_NODE =
+      "/_inputSplitsAllReady";
+  /** Denotes that all the input splits are done. */
+  public static final String INPUT_SPLITS_ALL_DONE_NODE =
+      "/_inputSplitsAllDone";
+  /** Directory of attempts of this application */
+  public static final String APPLICATION_ATTEMPTS_DIR =
+      "/_applicationAttemptsDir";
+  /** Where the master election happens */
+  public static final String MASTER_ELECTION_DIR = "/_masterElectionDir";
+  /** Superstep scope */
+  public static final String SUPERSTEP_DIR = "/_superstepDir";
+  /** Where the merged aggregators are located */
+  public static final String MERGED_AGGREGATOR_DIR =
+      "/_mergedAggregatorDir";
+  /** Healthy workers register here. */
+  public static final String WORKER_HEALTHY_DIR = "/_workerHealthyDir";
+  /** Unhealthy workers register here. */
+  public static final String WORKER_UNHEALTHY_DIR = "/_workerUnhealthyDir";
+  /** Finished workers notify here */
+  public static final String WORKER_FINISHED_DIR = "/_workerFinishedDir";
+  /** Where the partition assignments are set */
+  public static final String PARTITION_ASSIGNMENTS_DIR =
+      "/_partitionAssignments";
+  /** Helps coordinate the partition exchnages */
+  public static final String PARTITION_EXCHANGE_DIR =
+      "/_partitionExchangeDir";
+  /** Denotes that the superstep is done */
+  public static final String SUPERSTEP_FINISHED_NODE = "/_superstepFinished";
+  /** Denotes which workers have been cleaned up */
+  public static final String CLEANED_UP_DIR = "/_cleanedUpDir";
+  /** JSON aggregator value array key */
+  public static final String JSONOBJ_AGGREGATOR_VALUE_ARRAY_KEY =
+      "_aggregatorValueArrayKey";
+  /** JSON partition stats key */
+  public static final String JSONOBJ_PARTITION_STATS_KEY =
+      "_partitionStatsKey";
+  /** JSON finished vertices key */
+  public static final String JSONOBJ_FINISHED_VERTICES_KEY =
+      "_verticesFinishedKey";
+  /** JSON vertex count key */
+  public static final String JSONOBJ_NUM_VERTICES_KEY = "_numVerticesKey";
+  /** JSON edge count key */
+  public static final String JSONOBJ_NUM_EDGES_KEY = "_numEdgesKey";
+  /** JSON message count key */
+  public static final String JSONOBJ_NUM_MESSAGES_KEY = "_numMsgsKey";
+  /** JSON hostname id key */
+  public static final String JSONOBJ_HOSTNAME_ID_KEY = "_hostnameIdKey";
+  /** JSON max vertex index key */
+  public static final String JSONOBJ_MAX_VERTEX_INDEX_KEY =
+      "_maxVertexIndexKey";
+  /** JSON hostname key */
+  public static final String JSONOBJ_HOSTNAME_KEY = "_hostnameKey";
+  /** JSON port key */
+  public static final String JSONOBJ_PORT_KEY = "_portKey";
+  /** JSON checkpoint file prefix key */
+  public static final String JSONOBJ_CHECKPOINT_FILE_PREFIX_KEY =
+      "_checkpointFilePrefixKey";
+  /** JSON previous hostname key */
+  public static final String JSONOBJ_PREVIOUS_HOSTNAME_KEY =
+      "_previousHostnameKey";
+  /** JSON previous port key */
+  public static final String JSONOBJ_PREVIOUS_PORT_KEY = "_previousPortKey";
+  /** JSON state key */
+  public static final String JSONOBJ_STATE_KEY = "_stateKey";
+  /** JSON application attempt key */
+  public static final String JSONOBJ_APPLICATION_ATTEMPT_KEY =
+      "_applicationAttemptKey";
+  /** JSON superstep key */
+  public static final String JSONOBJ_SUPERSTEP_KEY =
+      "_superstepKey";
+  /** Aggregator name key */
+  public static final String AGGREGATOR_NAME_KEY = "_aggregatorNameKey";
+  /** Aggregator class name key */
+  public static final String AGGREGATOR_CLASS_NAME_KEY =
+      "_aggregatorClassNameKey";
+  /** Aggregator value key */
+  public static final String AGGREGATOR_VALUE_KEY = "_aggregatorValueKey";
+  /** Suffix denotes a worker */
+  public static final String WORKER_SUFFIX = "_worker";
+  /** Suffix denotes a master */
+  public static final String MASTER_SUFFIX = "_master";
+  /** If at the end of a checkpoint file, indicates metadata */
+  public static final String CHECKPOINT_METADATA_POSTFIX = ".metadata";
+  /**
+   * If at the end of a checkpoint file, indicates vertices, edges,
+   * messages, etc.
+   */
+  public static final String CHECKPOINT_VERTICES_POSTFIX = ".vertices";
+  /**
+   * If at the end of a checkpoint file, indicates metadata and data is valid
+   * for the same filenames without .valid
+   */
+  public static final String CHECKPOINT_VALID_POSTFIX = ".valid";
+  /**
+   * If at the end of a checkpoint file, indicates the stitched checkpoint
+   * file prefixes.  A checkpoint is not valid if this file does not exist.
+   */
+  public static final String CHECKPOINT_FINALIZED_POSTFIX = ".finalized";
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(BspService.class);
+  /** Path to the job's root */
+  protected final String basePath;
+  /** Path to the job state determined by the master (informative only) */
+  protected final String masterJobStatePath;
+  /** Path to the input splits written by the master */
+  protected final String inputSplitsPath;
+  /** Path to the input splits all ready to be processed by workers */
+  protected final String inputSplitsAllReadyPath;
+  /** Path to the input splits done */
+  protected final String inputSplitsDonePath;
+  /** Path to the input splits all done to notify the workers to proceed */
+  protected final String inputSplitsAllDonePath;
+  /** Path to the application attempts) */
+  protected final String applicationAttemptsPath;
+  /** Path to the cleaned up notifications */
+  protected final String cleanedUpPath;
+  /** Path to the checkpoint's root (including job id) */
+  protected final String checkpointBasePath;
+  /** Path to the master election path */
+  protected final String masterElectionPath;
+  /** Private ZooKeeper instance that implements the service */
+  private final ZooKeeperExt zk;
+  /** Has the Connection occurred? */
+  private final BspEvent connectedEvent = new PredicateLock();
+  /** Has worker registration changed (either healthy or unhealthy) */
+  private final BspEvent workerHealthRegistrationChanged =
+      new PredicateLock();
+  /** InputSplits are ready for consumption by workers */
+  private final BspEvent inputSplitsAllReadyChanged =
+      new PredicateLock();
+  /** InputSplit reservation or finished notification and synchronization */
+  private final BspEvent inputSplitsStateChanged =
+      new PredicateLock();
+  /** InputSplits are done being processed by workers */
+  private final BspEvent inputSplitsAllDoneChanged =
+      new PredicateLock();
+  /** InputSplit done by a worker finished notification and synchronization */
+  private final BspEvent inputSplitsDoneStateChanged =
+      new PredicateLock();
+  /** Are the partition assignments to workers ready? */
+  private final BspEvent partitionAssignmentsReadyChanged =
+      new PredicateLock();
+  /** Application attempt changed */
+  private final BspEvent applicationAttemptChanged =
+      new PredicateLock();
+  /** Superstep finished synchronization */
+  private final BspEvent superstepFinished =
+      new PredicateLock();
+  /** Master election changed for any waited on attempt */
+  private final BspEvent masterElectionChildrenChanged =
+      new PredicateLock();
+  /** Cleaned up directory children changed*/
+  private final BspEvent cleanedUpChildrenChanged =
+      new PredicateLock();
+  /** Registered list of BspEvents */
+  private final List<BspEvent> registeredBspEvents =
+      new ArrayList<BspEvent>();
+  /** Configuration of the job*/
+  private final Configuration conf;
+  /** Job context (mainly for progress) */
+  private final Mapper<?, ?, ?, ?>.Context context;
+  /** Cached superstep (from ZooKeeper) */
+  private long cachedSuperstep = UNSET_SUPERSTEP;
+  /** Restarted from a checkpoint (manual or automatic) */
+  private long restartedSuperstep = UNSET_SUPERSTEP;
+  /** Cached application attempt (from ZooKeeper) */
+  private long cachedApplicationAttempt = UNSET_APPLICATION_ATTEMPT;
+  /** Job id, to ensure uniqueness */
+  private final String jobId;
+  /** Task partition, to ensure uniqueness */
+  private final int taskPartition;
+  /** My hostname */
+  private final String hostname;
+  /** Combination of hostname '_' partition (unique id) */
+  private final String hostnamePartitionId;
+  /** Graph partitioner */
+  private final GraphPartitionerFactory<I, V, E, M> graphPartitionerFactory;
+  /** Mapper that will do the graph computation */
+  private final GraphMapper<I, V, E, M> graphMapper;
+  /** File system */
+  private final FileSystem fs;
+  /** Checkpoint frequency */
+  private int checkpointFrequency = -1;
+  /** Map of aggregators */
+  private Map<String, Aggregator<Writable>> aggregatorMap =
+      new TreeMap<String, Aggregator<Writable>>();
+
+  /**
+   * Constructor.
+   *
+   * @param serverPortList ZooKeeper server port list
+   * @param sessionMsecTimeout ZooKeeper session timeount in milliseconds
+   * @param context Mapper context
+   * @param graphMapper Graph mapper reference
+   */
+  public BspService(String serverPortList,
+      int sessionMsecTimeout,
+      Mapper<?, ?, ?, ?>.Context context,
+      GraphMapper<I, V, E, M> graphMapper) {
+    registerBspEvent(connectedEvent);
+    registerBspEvent(workerHealthRegistrationChanged);
+    registerBspEvent(inputSplitsAllReadyChanged);
+    registerBspEvent(inputSplitsStateChanged);
+    registerBspEvent(partitionAssignmentsReadyChanged);
+    registerBspEvent(applicationAttemptChanged);
+    registerBspEvent(superstepFinished);
+    registerBspEvent(masterElectionChildrenChanged);
+    registerBspEvent(cleanedUpChildrenChanged);
+
+    this.context = context;
+    this.graphMapper = graphMapper;
+    this.conf = context.getConfiguration();
+    this.jobId = conf.get("mapred.job.id", "Unknown Job");
+    this.taskPartition = conf.getInt("mapred.task.partition", -1);
+    this.restartedSuperstep = conf.getLong(GiraphJob.RESTART_SUPERSTEP,
+        UNSET_SUPERSTEP);
+    this.cachedSuperstep = restartedSuperstep;
+    if ((restartedSuperstep != UNSET_SUPERSTEP) &&
+        (restartedSuperstep < 0)) {
+      throw new IllegalArgumentException(
+          "BspService: Invalid superstep to restart - " +
+              restartedSuperstep);
+    }
+    try {
+      this.hostname = InetAddress.getLocalHost().getHostName();
+    } catch (UnknownHostException e) {
+      throw new RuntimeException(e);
+    }
+    this.hostnamePartitionId = hostname + "_" + getTaskPartition();
+    this.graphPartitionerFactory =
+        BspUtils.<I, V, E, M>createGraphPartitioner(conf);
+
+    this.checkpointFrequency =
+        conf.getInt(GiraphJob.CHECKPOINT_FREQUENCY,
+            GiraphJob.CHECKPOINT_FREQUENCY_DEFAULT);
+
+    basePath = ZooKeeperManager.getBasePath(conf) + BASE_DIR + "/" + jobId;
+    masterJobStatePath = basePath + MASTER_JOB_STATE_NODE;
+    inputSplitsPath = basePath + INPUT_SPLIT_DIR;
+    inputSplitsAllReadyPath = basePath + INPUT_SPLITS_ALL_READY_NODE;
+    inputSplitsDonePath = basePath + INPUT_SPLIT_DONE_DIR;
+    inputSplitsAllDonePath = basePath + INPUT_SPLITS_ALL_DONE_NODE;
+    applicationAttemptsPath = basePath + APPLICATION_ATTEMPTS_DIR;
+    cleanedUpPath = basePath + CLEANED_UP_DIR;
+    checkpointBasePath =
+        getConfiguration().get(
+            GiraphJob.CHECKPOINT_DIRECTORY,
+            GiraphJob.CHECKPOINT_DIRECTORY_DEFAULT + "/" + getJobId());
+    masterElectionPath = basePath + MASTER_ELECTION_DIR;
+    if (LOG.isInfoEnabled()) {
+      LOG.info("BspService: Connecting to ZooKeeper with job " + jobId +
+          ", " + getTaskPartition() + " on " + serverPortList);
+    }
+    try {
+      this.zk = new ZooKeeperExt(serverPortList, sessionMsecTimeout, this);
+      connectedEvent.waitForever();
+      this.fs = FileSystem.get(getConfiguration());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+
+  /**
+   * Get the superstep from a ZooKeeper path
+   *
+   * @param path Path to parse for the superstep
+   * @return Superstep from the path.
+   */
+  public static long getSuperstepFromPath(String path) {
+    int foundSuperstepStart = path.indexOf(SUPERSTEP_DIR);
+    if (foundSuperstepStart == -1) {
+      throw new IllegalArgumentException(
+          "getSuperstepFromPath: Cannot find " + SUPERSTEP_DIR +
+          "from " + path);
+    }
+    foundSuperstepStart += SUPERSTEP_DIR.length() + 1;
+    int endIndex = foundSuperstepStart +
+        path.substring(foundSuperstepStart).indexOf("/");
+    if (endIndex == -1) {
+      throw new IllegalArgumentException(
+          "getSuperstepFromPath: Cannot find end of superstep from " +
+              path);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("getSuperstepFromPath: Got path=" + path +
+          ", start=" + foundSuperstepStart + ", end=" + endIndex);
+    }
+    return Long.parseLong(path.substring(foundSuperstepStart, endIndex));
+  }
+
+  /**
+   * Get the hostname and id from a "healthy" worker path
+   *
+   * @param path Path to check
+   * @return Hostname and id from path
+   */
+  public static String getHealthyHostnameIdFromPath(String path) {
+    int foundWorkerHealthyStart = path.indexOf(WORKER_HEALTHY_DIR);
+    if (foundWorkerHealthyStart == -1) {
+      throw new IllegalArgumentException(
+          "getHealthyHostnameidFromPath: Couldn't find " +
+              WORKER_HEALTHY_DIR + " from " + path);
+    }
+    foundWorkerHealthyStart += WORKER_HEALTHY_DIR.length();
+    return path.substring(foundWorkerHealthyStart);
+  }
+
+  /**
+   * Generate the base superstep directory path for a given application
+   * attempt
+   *
+   * @param attempt application attempt number
+   * @return directory path based on the an attempt
+   */
+  public final String getSuperstepPath(long attempt) {
+    return applicationAttemptsPath + "/" + attempt + SUPERSTEP_DIR;
+  }
+
+  /**
+   * Generate the worker information "healthy" directory path for a
+   * superstep
+   *
+   * @param attempt application attempt number
+   * @param superstep superstep to use
+   * @return directory path based on the a superstep
+   */
+  public final String getWorkerInfoHealthyPath(long attempt,
+      long superstep) {
+    return applicationAttemptsPath + "/" + attempt +
+        SUPERSTEP_DIR + "/" + superstep + WORKER_HEALTHY_DIR;
+  }
+
+  /**
+   * Generate the worker information "unhealthy" directory path for a
+   * superstep
+   *
+   * @param attempt application attempt number
+   * @param superstep superstep to use
+   * @return directory path based on the a superstep
+   */
+  public final String getWorkerInfoUnhealthyPath(long attempt,
+      long superstep) {
+    return applicationAttemptsPath + "/" + attempt +
+        SUPERSTEP_DIR + "/" + superstep + WORKER_UNHEALTHY_DIR;
+  }
+
+  /**
+   * Generate the worker "finished" directory path for a
+   * superstep
+   *
+   * @param attempt application attempt number
+   * @param superstep superstep to use
+   * @return directory path based on the a superstep
+   */
+  public final String getWorkerFinishedPath(long attempt, long superstep) {
+    return applicationAttemptsPath + "/" + attempt +
+        SUPERSTEP_DIR + "/" + superstep + WORKER_FINISHED_DIR;
+  }
+
+  /**
+   * Generate the "partiton assignments" directory path for a superstep
+   *
+   * @param attempt application attempt number
+   * @param superstep superstep to use
+   * @return directory path based on the a superstep
+   */
+  public final String getPartitionAssignmentsPath(long attempt,
+      long superstep) {
+    return applicationAttemptsPath + "/" + attempt +
+        SUPERSTEP_DIR + "/" + superstep + PARTITION_ASSIGNMENTS_DIR;
+  }
+
+  /**
+   * Generate the "partition exchange" directory path for a superstep
+   *
+   * @param attempt application attempt number
+   * @param superstep superstep to use
+   * @return directory path based on the a superstep
+   */
+  public final String getPartitionExchangePath(long attempt,
+      long superstep) {
+    return applicationAttemptsPath + "/" + attempt +
+        SUPERSTEP_DIR + "/" + superstep + PARTITION_EXCHANGE_DIR;
+  }
+
+  /**
+   * Based on the superstep, worker info, and attempt, get the appropriate
+   * worker path for the exchange.
+   *
+   * @param attempt Application attempt
+   * @param superstep Superstep
+   * @param workerInfo Worker info of the exchange.
+   * @return Path of the desired worker
+   */
+  public final String getPartitionExchangeWorkerPath(long attempt,
+      long superstep,
+      WorkerInfo workerInfo) {
+    return getPartitionExchangePath(attempt, superstep) +
+        "/" + workerInfo.getHostnameId();
+  }
+
+  /**
+   * Generate the merged aggregator directory path for a superstep
+   *
+   * @param attempt application attempt number
+   * @param superstep superstep to use
+   * @return directory path based on the a superstep
+   */
+  public final String getMergedAggregatorPath(long attempt, long superstep) {
+    return applicationAttemptsPath + "/" + attempt +
+        SUPERSTEP_DIR + "/" + superstep + MERGED_AGGREGATOR_DIR;
+  }
+
+  /**
+   * Generate the "superstep finished" directory path for a superstep
+   *
+   * @param attempt application attempt number
+   * @param superstep superstep to use
+   * @return directory path based on the a superstep
+   */
+  public final String getSuperstepFinishedPath(long attempt, long superstep) {
+    return applicationAttemptsPath + "/" + attempt +
+        SUPERSTEP_DIR + "/" + superstep + SUPERSTEP_FINISHED_NODE;
+  }
+
+  /**
+   * Generate the base superstep directory path for a given application
+   * attempt
+   *
+   * @param superstep Superstep to use
+   * @return Directory path based on the a superstep
+   */
+  public final String getCheckpointBasePath(long superstep) {
+    return checkpointBasePath + "/" + superstep;
+  }
+
+  /**
+   * Get the checkpoint from a finalized checkpoint path
+   *
+   * @param finalizedPath Path of the finalized checkpoint
+   * @return Superstep referring to a checkpoint of the finalized path
+   */
+  public static long getCheckpoint(Path finalizedPath) {
+    if (!finalizedPath.getName().endsWith(CHECKPOINT_FINALIZED_POSTFIX)) {
+      throw new InvalidParameterException(
+          "getCheckpoint: " + finalizedPath + "Doesn't end in " +
+              CHECKPOINT_FINALIZED_POSTFIX);
+    }
+    String checkpointString =
+        finalizedPath.getName().replace(CHECKPOINT_FINALIZED_POSTFIX, "");
+    return Long.parseLong(checkpointString);
+  }
+
+  /**
+   * Get the ZooKeeperExt instance.
+   *
+   * @return ZooKeeperExt instance.
+   */
+  public final ZooKeeperExt getZkExt() {
+    return zk;
+  }
+
+  @Override
+  public final long getRestartedSuperstep() {
+    return restartedSuperstep;
+  }
+
+  /**
+   * Set the restarted superstep
+   *
+   * @param superstep Set the manually restarted superstep
+   */
+  public final void setRestartedSuperstep(long superstep) {
+    if (superstep < INPUT_SUPERSTEP) {
+      throw new IllegalArgumentException(
+          "setRestartedSuperstep: Bad argument " + superstep);
+    }
+    restartedSuperstep = superstep;
+  }
+
+  /**
+   * Should checkpoint on this superstep?  If checkpointing, always
+   * checkpoint the first user superstep.  If restarting, the first
+   * checkpoint is after the frequency has been met.
+   *
+   * @param superstep Decide if checkpointing no this superstep
+   * @return True if this superstep should be checkpointed, false otherwise
+   */
+  public final boolean checkpointFrequencyMet(long superstep) {
+    if (checkpointFrequency == 0) {
+      return false;
+    }
+    long firstCheckpoint = INPUT_SUPERSTEP + 1;
+    if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
+      firstCheckpoint = getRestartedSuperstep() + checkpointFrequency;
+    }
+    if (superstep < firstCheckpoint) {
+      return false;
+    } else if (((superstep - firstCheckpoint) % checkpointFrequency) == 0) {
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Get the file system
+   *
+   * @return file system
+   */
+  public final FileSystem getFs() {
+    return fs;
+  }
+
+  public final Configuration getConfiguration() {
+    return conf;
+  }
+
+  public final Mapper<?, ?, ?, ?>.Context getContext() {
+    return context;
+  }
+
+  public final String getHostname() {
+    return hostname;
+  }
+
+  public final String getHostnamePartitionId() {
+    return hostnamePartitionId;
+  }
+
+  public final int getTaskPartition() {
+    return taskPartition;
+  }
+
+  public final GraphMapper<I, V, E, M> getGraphMapper() {
+    return graphMapper;
+  }
+
+  public final BspEvent getWorkerHealthRegistrationChangedEvent() {
+    return workerHealthRegistrationChanged;
+  }
+
+  public final BspEvent getInputSplitsAllReadyEvent() {
+    return inputSplitsAllReadyChanged;
+  }
+
+  public final BspEvent getInputSplitsStateChangedEvent() {
+    return inputSplitsStateChanged;
+  }
+
+  public final BspEvent getInputSplitsAllDoneEvent() {
+    return inputSplitsAllDoneChanged;
+  }
+
+  public final BspEvent getInputSplitsDoneStateChangedEvent() {
+    return inputSplitsDoneStateChanged;
+  }
+
+  public final BspEvent getPartitionAssignmentsReadyChangedEvent() {
+    return partitionAssignmentsReadyChanged;
+  }
+
+
+  public final BspEvent getApplicationAttemptChangedEvent() {
+    return applicationAttemptChanged;
+  }
+
+  public final BspEvent getSuperstepFinishedEvent() {
+    return superstepFinished;
+  }
+
+
+  public final BspEvent getMasterElectionChildrenChangedEvent() {
+    return masterElectionChildrenChanged;
+  }
+
+  public final BspEvent getCleanedUpChildrenChangedEvent() {
+    return cleanedUpChildrenChanged;
+  }
+
+  /**
+   * Get the master commanded job state as a JSONObject.  Also sets the
+   * watches to see if the master commanded job state changes.
+   *
+   * @return Last job state or null if none
+   * @throws InterruptedException
+   * @throws KeeperException
+   */
+  public final JSONObject getJobState() {
+    try {
+      getZkExt().createExt(masterJobStatePath,
+          null,
+          Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT,
+          true);
+    } catch (KeeperException.NodeExistsException e) {
+      LOG.info("getJobState: Job state already exists (" +
+          masterJobStatePath + ")");
+    } catch (KeeperException e) {
+      throw new IllegalStateException("Failed to create job state path " +
+          "due to KeeperException", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException("Failed to create job state path " +
+          "due to InterruptedException", e);
+    }
+    String jobState = null;
+    try {
+      List<String> childList =
+          getZkExt().getChildrenExt(
+              masterJobStatePath, true, true, true);
+      if (childList.isEmpty()) {
+        return null;
+      }
+      jobState =
+          new String(getZkExt().getData(
+              childList.get(childList.size() - 1), true, null));
+    } catch (KeeperException.NoNodeException e) {
+      LOG.info("getJobState: Job state path is empty! - " +
+          masterJobStatePath);
+    } catch (KeeperException e) {
+      throw new IllegalStateException("Failed to get job state path " +
+          "children due to KeeperException", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException("Failed to get job state path " +
+          "children due to InterruptedException", e);
+    }
+    try {
+      return new JSONObject(jobState);
+    } catch (JSONException e) {
+      throw new RuntimeException(
+          "getJobState: Failed to parse job state " + jobState);
+    }
+  }
+
+  /**
+   * Get the job id
+   *
+   * @return job id
+   */
+  public final String getJobId() {
+    return jobId;
+  }
+
+  /**
+   * Get the latest application attempt and cache it.
+   *
+   * @return the latest application attempt
+   */
+  public final long getApplicationAttempt() {
+    if (cachedApplicationAttempt != UNSET_APPLICATION_ATTEMPT) {
+      return cachedApplicationAttempt;
+    }
+    try {
+      getZkExt().createExt(applicationAttemptsPath,
+          null,
+          Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT,
+          true);
+    } catch (KeeperException.NodeExistsException e) {
+      LOG.info("getApplicationAttempt: Node " +
+          applicationAttemptsPath + " already exists!");
+    } catch (KeeperException e) {
+      throw new IllegalStateException("Couldn't create application " +
+          "attempts path due to KeeperException", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException("Couldn't create application " +
+          "attempts path due to InterruptedException", e);
+    }
+    try {
+      List<String> attemptList =
+          getZkExt().getChildrenExt(
+              applicationAttemptsPath, true, false, false);
+      if (attemptList.isEmpty()) {
+        cachedApplicationAttempt = 0;
+      } else {
+        cachedApplicationAttempt =
+            Long.parseLong(Collections.max(attemptList));
+      }
+    } catch (KeeperException e) {
+      throw new IllegalStateException("Couldn't get application " +
+          "attempts to KeeperException", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException("Couldn't get application " +
+          "attempts to InterruptedException", e);
+    }
+
+    return cachedApplicationAttempt;
+  }
+
+  /**
+   * Get the latest superstep and cache it.
+   *
+   * @return the latest superstep
+   * @throws InterruptedException
+   * @throws KeeperException
+   */
+  public final long getSuperstep() {
+    if (cachedSuperstep != UNSET_SUPERSTEP) {
+      return cachedSuperstep;
+    }
+    String superstepPath = getSuperstepPath(getApplicationAttempt());
+    try {
+      getZkExt().createExt(superstepPath,
+          null,
+          Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT,
+          true);
+    } catch (KeeperException.NodeExistsException e) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("getApplicationAttempt: Node " +
+            applicationAttemptsPath + " already exists!");
+      }
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "getSuperstep: KeeperException", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "getSuperstep: InterruptedException", e);
+    }
+
+    List<String> superstepList;
+    try {
+      superstepList =
+          getZkExt().getChildrenExt(superstepPath, true, false, false);
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "getSuperstep: KeeperException", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "getSuperstep: InterruptedException", e);
+    }
+    if (superstepList.isEmpty()) {
+      cachedSuperstep = INPUT_SUPERSTEP;
+    } else {
+      cachedSuperstep =
+          Long.parseLong(Collections.max(superstepList));
+    }
+
+    return cachedSuperstep;
+  }
+
+  /**
+   * Increment the cached superstep.  Shouldn't be the initial value anymore.
+   */
+  public final void incrCachedSuperstep() {
+    if (cachedSuperstep == UNSET_SUPERSTEP) {
+      throw new IllegalStateException(
+          "incrSuperstep: Invalid unset cached superstep " +
+              UNSET_SUPERSTEP);
+    }
+    ++cachedSuperstep;
+  }
+
+  /**
+   * Set the cached superstep (should only be used for loading checkpoints
+   * or recovering from failure).
+   *
+   * @param superstep will be used as the next superstep iteration
+   */
+  public final void setCachedSuperstep(long superstep) {
+    cachedSuperstep = superstep;
+  }
+
+  /**
+   * Set the cached application attempt (should only be used for restart from
+   * failure by the master)
+   *
+   * @param applicationAttempt Will denote the new application attempt
+   */
+  public final void setApplicationAttempt(long applicationAttempt) {
+    cachedApplicationAttempt = applicationAttempt;
+    String superstepPath = getSuperstepPath(cachedApplicationAttempt);
+    try {
+      getZkExt().createExt(superstepPath,
+          null,
+          Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT,
+          true);
+    } catch (KeeperException.NodeExistsException e) {
+      throw new IllegalArgumentException(
+          "setApplicationAttempt: Attempt already exists! - " +
+              superstepPath, e);
+    } catch (KeeperException e) {
+      throw new RuntimeException(
+          "setApplicationAttempt: KeeperException - " +
+              superstepPath, e);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(
+          "setApplicationAttempt: InterruptedException - " +
+              superstepPath, e);
+    }
+  }
+
+  /**
+   * Register an aggregator with name.
+   *
+   * @param <A> Aggregator type
+   * @param name Name of the aggregator
+   * @param aggregatorClass Class of the aggregator
+   * @return Aggregator
+   * @throws IllegalAccessException
+   * @throws InstantiationException
+   */
+  public final <A extends Writable> Aggregator<A> registerAggregator(
+    String name,
+    Class<? extends Aggregator<A>> aggregatorClass)
+    throws InstantiationException, IllegalAccessException {
+    if (aggregatorMap.get(name) != null) {
+      return null;
+    }
+    Aggregator<A> aggregator =
+        (Aggregator<A>) aggregatorClass.newInstance();
+    @SuppressWarnings("unchecked")
+    Aggregator<Writable> writableAggregator =
+      (Aggregator<Writable>) aggregator;
+    aggregatorMap.put(name, writableAggregator);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("registerAggregator: registered " + name);
+    }
+    return aggregator;
+  }
+
+  /**
+   * Get aggregator by name.
+   *
+   * @param name Name of aggregator
+   * @return Aggregator or null when not registered
+   */
+  public final Aggregator<? extends Writable> getAggregator(String name) {
+    return aggregatorMap.get(name);
+  }
+
+  /**
+   * Get the aggregator map.
+   *
+   * @return Map of aggregator names to aggregator
+   */
+  public Map<String, Aggregator<Writable>> getAggregatorMap() {
+    return aggregatorMap;
+  }
+
+  /**
+   * Register a BspEvent.  Ensure that it will be signaled
+   * by catastrophic failure so that threads waiting on an event signal
+   * will be unblocked.
+   *
+   * @param event Event to be registered.
+   */
+  public void registerBspEvent(BspEvent event) {
+    registeredBspEvents.add(event);
+  }
+
+  /**
+   * Subclasses can use this to instantiate their respective partitioners
+   *
+   * @return Instantiated graph partitioner factory
+   */
+  protected GraphPartitionerFactory<I, V, E, M> getGraphPartitionerFactory() {
+    return graphPartitionerFactory;
+  }
+
+  /**
+   * Derived classes that want additional ZooKeeper events to take action
+   * should override this.
+   *
+   * @param event Event that occurred
+   * @return true if the event was processed here, false otherwise
+   */
+  protected boolean processEvent(WatchedEvent event) {
+    return false;
+  }
+
+  @Override
+  public final void process(WatchedEvent event) {
+    // 1. Process all shared events
+    // 2. Process specific derived class events
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("process: Got a new event, path = " + event.getPath() +
+          ", type = " + event.getType() + ", state = " +
+          event.getState());
+    }
+
+    if ((event.getPath() == null) && (event.getType() == EventType.None)) {
+      if (event.getState() == KeeperState.Disconnected) {
+        // No way to recover from a disconnect event, signal all BspEvents
+        for (BspEvent bspEvent : registeredBspEvents) {
+          bspEvent.signal();
+        }
+        throw new RuntimeException(
+            "process: Disconnected from ZooKeeper, cannot recover - " +
+                event);
+      } else if (event.getState() == KeeperState.SyncConnected) {
         if (LOG.isInfoEnabled()) {
-            LOG.info("registerAggregator: registered " + name);
-        }
-        return aggregator;
-    }
-
-    /**
-     * Get aggregator by name.
-     *
-     * @param name
-     * @return Aggregator<A> (null when not registered)
-     */
-    public final Aggregator<? extends Writable> getAggregator(String name) {
-        return aggregatorMap.get(name);
-    }
-
-    /**
-     * Get the aggregator map.
-     */
-    public Map<String, Aggregator<Writable>> getAggregatorMap() {
-        return aggregatorMap;
-    }
-
-    /**
-     * Register a BspEvent.  Ensure that it will be signaled
-     * by catastrophic failure so that threads waiting on an event signal
-     * will be unblocked.
-     */
-    public void registerBspEvent(BspEvent event) {
-        registeredBspEvents.add(event);
-    }
-
-    /**
-     * Subclasses can use this to instantiate their respective partitioners
-     *
-     * @return Instantiated graph partitioner factory
-     */
-    protected GraphPartitionerFactory<I, V, E, M> getGraphPartitionerFactory() {
-        return graphPartitionerFactory;
-    }
-
-    /**
-     * Derived classes that want additional ZooKeeper events to take action
-     * should override this.
-     *
-     * @param event Event that occurred
-     * @return true if the event was processed here, false otherwise
-     */
-    protected boolean processEvent(WatchedEvent event) {
-        return false;
-    }
-
-    @Override
-    final public void process(WatchedEvent event) {
-        // 1. Process all shared events
-        // 2. Process specific derived class events
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("process: Got a new event, path = " + event.getPath() +
-                      ", type = " + event.getType() + ", state = " +
-                      event.getState());
-        }
-
-        if ((event.getPath() == null) && (event.getType() == EventType.None)) {
-            if (event.getState() == KeeperState.Disconnected) {
-                // No way to recover from a disconnect event, signal all BspEvents
-                for (BspEvent bspEvent : registeredBspEvents) {
-                    bspEvent.signal();
-                }
-                throw new RuntimeException(
-                    "process: Disconnected from ZooKeeper, cannot recover - " +
-                    event);
-            } else if (event.getState() == KeeperState.SyncConnected) {
-                if (LOG.isInfoEnabled()) {
-                    LOG.info("process: Asynchronous connection complete.");
-                }
-                connectedEvent.signal();
-            } else {
-                LOG.warn("process: Got unknown null path event " + event);
-            }
-            return;
-        }
-
-        boolean eventProcessed = false;
-        if (event.getPath().startsWith(MASTER_JOB_STATE_PATH)) {
-            // This will cause all becomeMaster() MasterThreads to notice the
-            // change in job state and quit trying to become the master.
-            masterElectionChildrenChanged.signal();
-            eventProcessed = true;
-        } else if ((event.getPath().contains(WORKER_HEALTHY_DIR) ||
-                event.getPath().contains(WORKER_UNHEALTHY_DIR)) &&
-                (event.getType() == EventType.NodeChildrenChanged)) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("process: workerHealthRegistrationChanged " +
-                          "(worker health reported - healthy/unhealthy )");
-            }
-            workerHealthRegistrationChanged.signal();
-            eventProcessed = true;
-        } else if (event.getPath().equals(INPUT_SPLITS_ALL_READY_PATH) &&
-                (event.getType() == EventType.NodeCreated)) {
-            if (LOG.isInfoEnabled()) {
-                LOG.info("process: inputSplitsReadyChanged " +
-                         "(input splits ready)");
-            }
-            inputSplitsAllReadyChanged.signal();
-            eventProcessed = true;
-        } else if (event.getPath().endsWith(INPUT_SPLIT_RESERVED_NODE) &&
-                (event.getType() == EventType.NodeCreated)) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("process: inputSplitsStateChanged "+
-                          "(made a reservation)");
-            }
-            inputSplitsStateChanged.signal();
-            eventProcessed = true;
-        } else if (event.getPath().endsWith(INPUT_SPLIT_RESERVED_NODE) &&
-                (event.getType() == EventType.NodeDeleted)) {
-            if (LOG.isInfoEnabled()) {
-                LOG.info("process: inputSplitsStateChanged "+
-                         "(lost a reservation)");
-            }
-            inputSplitsStateChanged.signal();
-            eventProcessed = true;
-        } else if (event.getPath().endsWith(INPUT_SPLIT_FINISHED_NODE) &&
-                (event.getType() == EventType.NodeCreated)) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("process: inputSplitsStateChanged " +
-                          "(finished inputsplit)");
-            }
-            inputSplitsStateChanged.signal();
-            eventProcessed = true;
-        } else if (event.getPath().endsWith(INPUT_SPLIT_DONE_DIR) &&
-                (event.getType() == EventType.NodeChildrenChanged)) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("process: inputSplitsDoneStateChanged " +
-                          "(worker finished sending)");
-            }
-            inputSplitsDoneStateChanged.signal();
-            eventProcessed = true;
-        }  else if (event.getPath().equals(INPUT_SPLITS_ALL_DONE_PATH) &&
-                (event.getType() == EventType.NodeCreated)) {
-            if (LOG.isInfoEnabled()) {
-                LOG.info("process: inputSplitsAllDoneChanged " +
-                         "(all vertices sent from input splits)");
-            }
-            inputSplitsAllDoneChanged.signal();
-            eventProcessed = true;
-        } else if (event.getPath().contains(PARTITION_ASSIGNMENTS_DIR) &&
-                event.getType() == EventType.NodeCreated) {
-            if (LOG.isInfoEnabled()) {
-                LOG.info("process: partitionAssignmentsReadyChanged " +
-                         "(partitions are assigned)");
-            }
-            partitionAssignmentsReadyChanged.signal();
-            eventProcessed = true;
-        } else if (event.getPath().contains(SUPERSTEP_FINISHED_NODE) &&
-                event.getType() == EventType.NodeCreated) {
-            if (LOG.isInfoEnabled()) {
-                LOG.info("process: superstepFinished signaled");
-            }
-            superstepFinished.signal();
-            eventProcessed = true;
-        } else if (event.getPath().endsWith(APPLICATION_ATTEMPTS_PATH) &&
-                event.getType() == EventType.NodeChildrenChanged) {
-            if (LOG.isInfoEnabled()) {
-                LOG.info("process: applicationAttemptChanged signaled");
-            }
-            applicationAttemptChanged.signal();
-            eventProcessed = true;
-        } else if (event.getPath().contains(MASTER_ELECTION_DIR) &&
-                event.getType() == EventType.NodeChildrenChanged) {
-            if (LOG.isInfoEnabled()) {
-                LOG.info("process: masterElectionChildrenChanged signaled");
-            }
-            masterElectionChildrenChanged.signal();
-            eventProcessed = true;
-        } else if (event.getPath().equals(CLEANED_UP_PATH) &&
-                event.getType() == EventType.NodeChildrenChanged) {
-            if (LOG.isInfoEnabled()) {
-                LOG.info("process: cleanedUpChildrenChanged signaled");
-            }
-            cleanedUpChildrenChanged.signal();
-            eventProcessed = true;
-        }
-
-        if ((processEvent(event) == false) && (eventProcessed == false)) {
-            LOG.warn("process: Unknown and unprocessed event (path=" +
-                     event.getPath() + ", type=" + event.getType() +
-                     ", state=" + event.getState() + ")");
+          LOG.info("process: Asynchronous connection complete.");
         }
+        connectedEvent.signal();
+      } else {
+        LOG.warn("process: Got unknown null path event " + event);
+      }
+      return;
+    }
+
+    boolean eventProcessed = false;
+    if (event.getPath().startsWith(masterJobStatePath)) {
+      // This will cause all becomeMaster() MasterThreads to notice the
+      // change in job state and quit trying to become the master.
+      masterElectionChildrenChanged.signal();
+      eventProcessed = true;
+    } else if ((event.getPath().contains(WORKER_HEALTHY_DIR) ||
+        event.getPath().contains(WORKER_UNHEALTHY_DIR)) &&
+        (event.getType() == EventType.NodeChildrenChanged)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("process: workerHealthRegistrationChanged " +
+            "(worker health reported - healthy/unhealthy )");
+      }
+      workerHealthRegistrationChanged.signal();
+      eventProcessed = true;
+    } else if (event.getPath().equals(inputSplitsAllReadyPath) &&
+        (event.getType() == EventType.NodeCreated)) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("process: inputSplitsReadyChanged " +
+            "(input splits ready)");
+      }
+      inputSplitsAllReadyChanged.signal();
+      eventProcessed = true;
+    } else if (event.getPath().endsWith(INPUT_SPLIT_RESERVED_NODE) &&
+        (event.getType() == EventType.NodeCreated)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("process: inputSplitsStateChanged " +
+            "(made a reservation)");
+      }
+      inputSplitsStateChanged.signal();
+      eventProcessed = true;
+    } else if (event.getPath().endsWith(INPUT_SPLIT_RESERVED_NODE) &&
+        (event.getType() == EventType.NodeDeleted)) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("process: inputSplitsStateChanged " +
+            "(lost a reservation)");
+      }
+      inputSplitsStateChanged.signal();
+      eventProcessed = true;
+    } else if (event.getPath().endsWith(INPUT_SPLIT_FINISHED_NODE) &&
+        (event.getType() == EventType.NodeCreated)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("process: inputSplitsStateChanged " +
+            "(finished inputsplit)");
+      }
+      inputSplitsStateChanged.signal();
+      eventProcessed = true;
+    } else if (event.getPath().endsWith(INPUT_SPLIT_DONE_DIR) &&
+        (event.getType() == EventType.NodeChildrenChanged)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("process: inputSplitsDoneStateChanged " +
+            "(worker finished sending)");
+      }
+      inputSplitsDoneStateChanged.signal();
+      eventProcessed = true;
+    }  else if (event.getPath().equals(inputSplitsAllDonePath) &&
+        (event.getType() == EventType.NodeCreated)) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("process: inputSplitsAllDoneChanged " +
+            "(all vertices sent from input splits)");
+      }
+      inputSplitsAllDoneChanged.signal();
+      eventProcessed = true;
+    } else if (event.getPath().contains(PARTITION_ASSIGNMENTS_DIR) &&
+        event.getType() == EventType.NodeCreated) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("process: partitionAssignmentsReadyChanged " +
+            "(partitions are assigned)");
+      }
+      partitionAssignmentsReadyChanged.signal();
+      eventProcessed = true;
+    } else if (event.getPath().contains(SUPERSTEP_FINISHED_NODE) &&
+        event.getType() == EventType.NodeCreated) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("process: superstepFinished signaled");
+      }
+      superstepFinished.signal();
+      eventProcessed = true;
+    } else if (event.getPath().endsWith(applicationAttemptsPath) &&
+        event.getType() == EventType.NodeChildrenChanged) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("process: applicationAttemptChanged signaled");
+      }
+      applicationAttemptChanged.signal();
+      eventProcessed = true;
+    } else if (event.getPath().contains(MASTER_ELECTION_DIR) &&
+        event.getType() == EventType.NodeChildrenChanged) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("process: masterElectionChildrenChanged signaled");
+      }
+      masterElectionChildrenChanged.signal();
+      eventProcessed = true;
+    } else if (event.getPath().equals(cleanedUpPath) &&
+        event.getType() == EventType.NodeChildrenChanged) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("process: cleanedUpChildrenChanged signaled");
+      }
+      cleanedUpChildrenChanged.signal();
+      eventProcessed = true;
+    }
+
+    if (!(processEvent(event)) && (!eventProcessed)) {
+      LOG.warn("process: Unknown and unprocessed event (path=" +
+          event.getPath() + ", type=" + event.getType() +
+          ", state=" + event.getState() + ")");
     }
+  }
 }