You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/02/16 23:12:36 UTC

svn commit: r1245205 [17/18] - in /incubator/giraph/trunk: ./ src/main/java/org/apache/giraph/ src/main/java/org/apache/giraph/benchmark/ src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/example...

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java Thu Feb 16 22:12:31 2012
@@ -56,80 +56,127 @@ import static org.apache.giraph.graph.Gi
  * etc.
  */
 public class ZooKeeperManager {
-    /** Job context (mainly for progress) */
-    private Mapper<?, ?, ?, ?>.Context context;
-    /** Hadoop configuration */
-    private final Configuration conf;
-    /** Class logger */
-    private static final Logger LOG = Logger.getLogger(ZooKeeperManager.class);
-    /** Task partition, to ensure uniqueness */
-    private final int taskPartition;
-    /** HDFS base directory for all file-based coordination */
-    private final Path baseDirectory;
-    /**
-     * HDFS task ZooKeeper candidate/completed
-     * directory for all file-based coordination
-     */
-    private final Path taskDirectory;
-    /**
-     * HDFS ZooKeeper server ready/done directory
-     * for all file-based coordination
-     */
-    private final Path serverDirectory;
-    /** HDFS path to whether the task is done */
-    private final Path myClosedPath;
-    /** Polling msecs timeout */
-    private final int pollMsecs;
-    /** Server count */
-    private final int serverCount;
-    /** File system */
-    private final FileSystem fs;
-    /** ZooKeeper process */
-    private Process zkProcess = null;
-    /** Thread that gets the zkProcess output */
-    private StreamCollector zkProcessCollector = null;
-    /** ZooKeeper local file system directory */
-    private String zkDir = null;
-    /** ZooKeeper config file path */
-    private String configFilePath = null;
-    /** ZooKeeper server list */
-    private final Map<String, Integer> zkServerPortMap = Maps.newTreeMap();
-    /** ZooKeeper base port */
-    private int zkBasePort = -1;
-    /** Final ZooKeeper server port list (for clients) */
-    private String zkServerPortString;
-    /** My hostname */
-    private String myHostname = null;
-    /** Job id, to ensure uniqueness */
-    private final String jobId;
-    /**
-     * Default local ZooKeeper prefix directory to use (where ZooKeeper server
-     * files will go)
-     */
-    private final String zkDirDefault;
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(ZooKeeperManager.class);
+  /** Separates the hostname and task in the candidate stamp */
+  private static final String HOSTNAME_TASK_SEPARATOR = " ";
+  /** Filename prefix of the ZooKeeper server list file */
+  private static final String ZOOKEEPER_SERVER_LIST_FILE_PREFIX =
+      "zkServerList_";
+  /** Filename suffix of the stamp that marks a task partition as done */
+  private static final String COMPUTATION_DONE_SUFFIX = ".COMPUTATION_DONE";
+  /** Job context (mainly for progress) */
+  private Mapper<?, ?, ?, ?>.Context context;
+  /** Hadoop configuration */
+  private final Configuration conf;
+  /** Task partition, to ensure uniqueness */
+  private final int taskPartition;
+  /** HDFS base directory for all file-based coordination */
+  private final Path baseDirectory;
+  /**
+   * HDFS task ZooKeeper candidate/completed
+   * directory for all file-based coordination
+   */
+  private final Path taskDirectory;
+  /**
+   * HDFS ZooKeeper server ready/done directory
+   * for all file-based coordination
+   */
+  private final Path serverDirectory;
+  /** HDFS path of the stamp that tells whether this task is done */
+  private final Path myClosedPath;
+  /** Sleep time between polls of the file system (msecs) */
+  private final int pollMsecs;
+  /** Number of ZooKeeper servers required */
+  private final int serverCount;
+  /** File system */
+  private final FileSystem fs;
+  /** ZooKeeper process */
+  private Process zkProcess = null;
+  /** Thread that gets the zkProcess output */
+  private StreamCollector zkProcessCollector = null;
+  /** ZooKeeper local file system directory */
+  private String zkDir = null;
+  /** ZooKeeper config file path */
+  private String configFilePath = null;
+  /** ZooKeeper server hostname mapped to the task number in the list file */
+  private final Map<String, Integer> zkServerPortMap = Maps.newTreeMap();
+  /** ZooKeeper base port (the port clients connect to) */
+  private int zkBasePort = -1;
+  /** Final comma separated host:port list of the servers (for clients) */
+  private String zkServerPortString;
+  /** Canonical hostname of the local host */
+  private String myHostname = null;
+  /** Job id, to ensure uniqueness */
+  private final String jobId;
+  /**
+   * Default local ZooKeeper prefix directory to use (where ZooKeeper server
+   * files will go)
+   */
+  private final String zkDirDefault;
 
+  /** State of the application; only FAILED and FINISHED are defined */
+  public enum State {
+    /** The application failed */
+    FAILED,
+    /** The application finished */
+    FINISHED
+  }
 
-    /** Separates the hostname and task in the candidate stamp */
-    private static final String HOSTNAME_TASK_SEPARATOR = " ";
-    /** The ZooKeeperString filename prefix */
-    private static final String ZOOKEEPER_SERVER_LIST_FILE_PREFIX =
-        "zkServerList_";
-    /** Denotes that the computation is done for a partition */
-    private static final String COMPUTATION_DONE_SUFFIX = ".COMPUTATION_DONE";
-    /** State of the application */
-    public enum State {
-        FAILED,
-        FINISHED
-    }
+  /**
+   * Constructor with context; reads all settings from its configuration.
+   *
+   * @param context Context to be stored internally
+   * @throws IOException Thrown by the hostname or the file system lookup
+   */
+  public ZooKeeperManager(Mapper<?, ?, ?, ?>.Context context)
+    throws IOException {
+    this.context = context;
+    conf = context.getConfiguration();
+    taskPartition = conf.getInt("mapred.task.partition", -1);
+    jobId = conf.get("mapred.job.id", "Unknown Job");
+    baseDirectory =
+        new Path(conf.get(GiraphJob.ZOOKEEPER_MANAGER_DIRECTORY,
+            getFinalZooKeeperPath()));
+    taskDirectory = new Path(baseDirectory,
+        "_task");
+    serverDirectory = new Path(baseDirectory,
+        "_zkServer");
+    myClosedPath = new Path(taskDirectory,
+        Integer.toString(taskPartition) +
+        COMPUTATION_DONE_SUFFIX);
+    pollMsecs = conf.getInt(
+        GiraphJob.ZOOKEEPER_SERVERLIST_POLL_MSECS,
+        GiraphJob.ZOOKEEPER_SERVERLIST_POLL_MSECS_DEFAULT);
+    serverCount = conf.getInt(
+        GiraphJob.ZOOKEEPER_SERVER_COUNT,
+        GiraphJob.ZOOKEEPER_SERVER_COUNT_DEFAULT);
+    String jobLocalDir = conf.get("job.local.dir");
+    if (jobLocalDir != null) { // for non-local jobs
+      zkDirDefault = jobLocalDir +
+          "/_bspZooKeeper";
+    } else {
+      zkDirDefault = System.getProperty("user.dir") + "/_bspZooKeeper";
+    }
+    zkDir = conf.get(GiraphJob.ZOOKEEPER_DIR, zkDirDefault);
+    configFilePath = zkDir + "/zoo.cfg";
+    zkBasePort = conf.getInt(
+        GiraphJob.ZOOKEEPER_SERVER_PORT,
+        GiraphJob.ZOOKEEPER_SERVER_PORT_DEFAULT);
+
+
+    myHostname = InetAddress.getLocalHost().getCanonicalHostName();
+    fs = FileSystem.get(conf);
+  }
 
-    /**
-     * Generate the final ZooKeeper coordination directory on HDFS
-     *
-     * @return directory path with job id
-     */
-    final private String getFinalZooKeeperPath() {
-        return GiraphJob.ZOOKEEPER_MANAGER_DIR_DEFAULT + "/" + jobId;
-    }
+  /**
+   * Generate the default HDFS coordination directory for this job.  The
+   * constructor uses it when ZOOKEEPER_MANAGER_DIRECTORY is not configured.
+   * @return GiraphJob.ZOOKEEPER_MANAGER_DIR_DEFAULT + "/" + jobId
+   */
+  private String getFinalZooKeeperPath() {
+    return GiraphJob.ZOOKEEPER_MANAGER_DIR_DEFAULT + "/" + jobId;
+  }
 
   /**
    * Return the base ZooKeeper ZNode from which all other ZNodes Giraph creates
@@ -139,708 +186,668 @@ public class ZooKeeperManager {
    * @param conf  Necessary to access user-provided values
    * @return  String of path without trailing slash
    */
-    public static String getBasePath(Configuration conf) {
-        String result = conf.get(BASE_ZNODE_KEY, "");
-        if (!result.equals("") && !result.startsWith("/")) {
-            throw new IllegalArgumentException("Value for " +
-                BASE_ZNODE_KEY + " must start with /: " + result);
-        }
-
-        return result;
+  public static String getBasePath(Configuration conf) {
+    String result = conf.get(BASE_ZNODE_KEY, "");
+    if (!result.equals("") && !result.startsWith("/")) {
+      throw new IllegalArgumentException("Value for " +
+          BASE_ZNODE_KEY + " must start with /: " + result);
     }
 
-    /**
-     * Collects the output of a stream and dumps it to the log.
-     */
-    private static class StreamCollector extends Thread {
-        /** Input stream to dump */
-        private final InputStream is;
-        /** Class logger */
-        private static final Logger LOG =
-            Logger.getLogger(StreamCollector.class);
-
-        /**
-         * Constructor.
-         *
-         * @param is InputStream to dump to LOG.info
-         */
-        public StreamCollector(final InputStream is) {
-            super(StreamCollector.class.getName());
-            this.is = is;
-        }
-
-        @Override
-        public void run() {
-            InputStreamReader streamReader = new InputStreamReader(is);
-            BufferedReader bufferedReader = new BufferedReader(streamReader);
-            String line;
-            try {
-                while ((line = bufferedReader.readLine()) != null) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("run: " + line);
-                    }
-                }
-            } catch (IOException e) {
-                LOG.error("run: Ignoring IOException", e);
-            }
-        }
-    }
-
-    public ZooKeeperManager(Mapper<?, ?, ?, ?>.Context context)
-            throws IOException {
-        this.context = context;
-        conf = context.getConfiguration();
-        taskPartition = conf.getInt("mapred.task.partition", -1);
-        jobId = conf.get("mapred.job.id", "Unknown Job");
-        baseDirectory =
-            new Path(conf.get(GiraphJob.ZOOKEEPER_MANAGER_DIRECTORY,
-                              getFinalZooKeeperPath()));
-        taskDirectory = new Path(baseDirectory,
-                                   "_task");
-        serverDirectory = new Path(baseDirectory,
-                                    "_zkServer");
-        myClosedPath = new Path(taskDirectory,
-                                  Integer.toString(taskPartition) +
-                                  COMPUTATION_DONE_SUFFIX);
-        pollMsecs = conf.getInt(
-            GiraphJob.ZOOKEEPER_SERVERLIST_POLL_MSECS,
-            GiraphJob.ZOOKEEPER_SERVERLIST_POLL_MSECS_DEFAULT);
-        serverCount = conf.getInt(
-            GiraphJob.ZOOKEEPER_SERVER_COUNT,
-            GiraphJob.ZOOKEEPER_SERVER_COUNT_DEFAULT);
-        String jobLocalDir = conf.get("job.local.dir");
-        if (jobLocalDir != null) { // for non-local jobs
-            zkDirDefault = jobLocalDir +
-                "/_bspZooKeeper";
-        } else {
-            zkDirDefault = System.getProperty("user.dir") + "/_bspZooKeeper";
-        }
-        zkDir = conf.get(GiraphJob.ZOOKEEPER_DIR, zkDirDefault);
-        configFilePath = zkDir + "/zoo.cfg";
-        zkBasePort = conf.getInt(
-            GiraphJob.ZOOKEEPER_SERVER_PORT,
-            GiraphJob.ZOOKEEPER_SERVER_PORT_DEFAULT);
-
+    return result;
+  }
 
-        myHostname = InetAddress.getLocalHost().getCanonicalHostName();
-        fs = FileSystem.get(conf);
-    }
+  /**
+   * Thread that reads a stream line by line and logs each line at debug.
+   */
+  private static class StreamCollector extends Thread {
+    /** Class logger */
+    private static final Logger LOG = Logger.getLogger(StreamCollector.class);
+    /** Input stream to dump */
+    private final InputStream is;
 
     /**
-     * Create the candidate stamps and decide on the servers to start if
-     * you are partition 0.
+     * Constructor; the thread is named after this class.
      *
-     * @throws IOException
-     * @throws InterruptedException
+     * @param is InputStream to dump to LOG.debug
      */
-    public void setup() throws IOException, InterruptedException {
-        createCandidateStamp();
-        getZooKeeperServerList();
+    public StreamCollector(final InputStream is) {
+      super(StreamCollector.class.getName());
+      this.is = is;
+    }
+
+    @Override
+    public void run() {
+      InputStreamReader streamReader = new InputStreamReader(is);
+      BufferedReader bufferedReader = new BufferedReader(streamReader);
+      String line;
+      try {
+        while ((line = bufferedReader.readLine()) != null) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("run: " + line);
+          }
+        }
+      } catch (IOException e) {
+        LOG.error("run: Ignoring IOException", e);
+      }
     }
+  }
 
-    /**
-     * Create a HDFS stamp for this task.  If another task already
-     * created it, then this one will fail, which is fine.
-     */
-    public void createCandidateStamp() {
-        try {
-            fs.mkdirs(baseDirectory);
-            LOG.info("createCandidateStamp: Made the directory " +
-                      baseDirectory);
-        } catch (IOException e) {
-            LOG.error("createCandidateStamp: Failed to mkdirs " +
-                      baseDirectory);
-        }
-        // Check that the base directory exists and is a directory
-        try {
-            if (!fs.getFileStatus(baseDirectory).isDir()) {
-                throw new IllegalArgumentException(
-                    "createCandidateStamp: " + baseDirectory +
-                    " is not a directory, but should be.");
-            }
-        } catch (IOException e) {
-            throw new IllegalArgumentException(
-                "createCandidateStamp: Couldn't get file status " +
-                "for base directory " + baseDirectory + ".  If there is an " +
-                "issue with this directory, please set an accesible " +
-                "base directory with the Hadoop configuration option " +
-                GiraphJob.ZOOKEEPER_MANAGER_DIRECTORY);
-        }
+  /**
+   * Create this task's candidate stamp, then obtain the ZooKeeper server
+   * list (task partition 0 creates it if missing, every task polls for it).
+   *
+   * @throws IOException Propagated from getZooKeeperServerList()
+   * @throws InterruptedException Propagated from getZooKeeperServerList()
+   */
+  public void setup() throws IOException, InterruptedException {
+    createCandidateStamp();
+    getZooKeeperServerList();
+  }
 
-        Path myCandidacyPath = new Path(
-            taskDirectory, myHostname +
-            HOSTNAME_TASK_SEPARATOR + taskPartition);
-        try {
-            if (LOG.isInfoEnabled()) {
-                LOG.info("createCandidateStamp: Creating my filestamp " +
-                         myCandidacyPath);
-            }
-            fs.createNewFile(myCandidacyPath);
-        } catch (IOException e) {
-            LOG.error("createCandidateStamp: Failed (maybe previous task " +
-                      "failed) to create filestamp " + myCandidacyPath, e);
-        }
+  /**
+   * Create a HDFS stamp (hostname and task partition) for this task.  Errors
+   * creating the stamp or the base directory are logged, not rethrown.
+   */
+  public void createCandidateStamp() {
+    try {
+      fs.mkdirs(baseDirectory);
+      LOG.info("createCandidateStamp: Made the directory " +
+          baseDirectory);
+    } catch (IOException e) {
+      LOG.error("createCandidateStamp: Failed to mkdirs " +
+          baseDirectory);
+    }
+    // Base directory must exist as a directory (else IllegalArgumentException)
+    try {
+      if (!fs.getFileStatus(baseDirectory).isDir()) {
+        throw new IllegalArgumentException(
+            "createCandidateStamp: " + baseDirectory +
+            " is not a directory, but should be.");
+      }
+    } catch (IOException e) {
+      throw new IllegalArgumentException(
+          "createCandidateStamp: Couldn't get file status " +
+              "for base directory " + baseDirectory + ".  If there is an " +
+              "issue with this directory, please set an accesible " +
+              "base directory with the Hadoop configuration option " +
+              GiraphJob.ZOOKEEPER_MANAGER_DIRECTORY);
+    }
+
+    Path myCandidacyPath = new Path(
+        taskDirectory, myHostname +
+        HOSTNAME_TASK_SEPARATOR + taskPartition);
+    try {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("createCandidateStamp: Creating my filestamp " +
+            myCandidacyPath);
+      }
+      fs.createNewFile(myCandidacyPath);
+    } catch (IOException e) {
+      LOG.error("createCandidateStamp: Failed (maybe previous task " +
+          "failed) to create filestamp " + myCandidacyPath, e);
+    }
     }
+  }
 
-    /**
-     * Every task must create a stamp to let the ZooKeeper servers know that
-     * they can shutdown.  This also lets the task know that it was already
-     * completed.
-     */
-    private void createZooKeeperClosedStamp() {
-        try {
-            LOG.info("createZooKeeperClosedStamp: Creating my filestamp " +
-                     myClosedPath);
-            fs.createNewFile(myClosedPath);
-        } catch (IOException e) {
-            LOG.error("createZooKeeperClosedStamp: Failed (maybe previous task " +
-                      "failed) to create filestamp " + myClosedPath);
-        }
+  /**
+   * Every task must create a stamp to let the ZooKeeper servers know that
+   * they can shutdown.  This also lets the task know that it was already
+   * completed.  A failure to create the stamp is only logged, not rethrown.
+   */
+  private void createZooKeeperClosedStamp() {
+    try {
+      LOG.info("createZooKeeperClosedStamp: Creating my filestamp " +
+          myClosedPath);
+      fs.createNewFile(myClosedPath);
+    } catch (IOException e) {
+      LOG.error("createZooKeeperClosedStamp: Failed (maybe previous task " +
+          "failed) to create filestamp " + myClosedPath);
     }
+  }
 
-    /**
-     * Check if all the computation is done.
-     * @return true if all computation is done.
-     */
-    public boolean computationDone() {
-        try {
-            return fs.exists(myClosedPath);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
+  /**
+   * Check if this task's done stamp (myClosedPath) exists on the file system.
+   * @return true if the stamp exists (an IOException is rethrown unchecked)
+   */
+  public boolean computationDone() {
+    try {
+      return fs.exists(myClosedPath);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
     }
+  }
 
-    /**
-     * Task 0 will call this to create the ZooKeeper server list.  The result is
-     * a file that describes the ZooKeeper servers through the filename.
-     *
-     * @throws IOException
-     * @throws InterruptedException
-     */
-    private void createZooKeeperServerList()
-            throws IOException, InterruptedException {
-        int candidateRetrievalAttempt = 0;
-        Map<String, Integer> hostnameTaskMap = Maps.newTreeMap();
-        while (true) {
-            FileStatus [] fileStatusArray = fs.listStatus(taskDirectory);
-            hostnameTaskMap.clear();
-            if (fileStatusArray.length > 0) {
-                for (FileStatus fileStatus : fileStatusArray) {
-                    String[] hostnameTaskArray =
-                        fileStatus.getPath().getName().split(
-                            HOSTNAME_TASK_SEPARATOR);
-                    if (hostnameTaskArray.length != 2) {
-                        throw new RuntimeException(
-                            "getZooKeeperServerList: Task 0 failed " +
-                            "to parse " +
-                            fileStatus.getPath().getName());
-                    }
-                    if (!hostnameTaskMap.containsKey(hostnameTaskArray[0])) {
-                        hostnameTaskMap.put(hostnameTaskArray[0],
-                                            new Integer(hostnameTaskArray[1]));
-                    }
-                }
-                if (LOG.isInfoEnabled()) {
-                    LOG.info("getZooKeeperServerList: Got " +
-                             hostnameTaskMap.keySet() + " " +
-                             hostnameTaskMap.size() + " hosts from " +
-                             fileStatusArray.length + " candidates when " +
-                             serverCount + " required (polling period is " +
-                             pollMsecs + ") on attempt " +
-                             candidateRetrievalAttempt);
-                }
-
-                if (hostnameTaskMap.size() >= serverCount) {
-                    break;
-                }
-                ++candidateRetrievalAttempt;
-                Thread.sleep(pollMsecs);
-            }
-        }
-        StringBuffer serverListFile =
-            new StringBuffer(ZOOKEEPER_SERVER_LIST_FILE_PREFIX);
-        int numServers = 0;
-        for (Map.Entry<String, Integer> hostnameTask :
-                hostnameTaskMap.entrySet()) {
-            serverListFile.append(hostnameTask.getKey() +
-            HOSTNAME_TASK_SEPARATOR + hostnameTask.getValue() +
-            HOSTNAME_TASK_SEPARATOR);
-            if (++numServers == serverCount) {
-                break;
-            }
+  /**
+   * Task 0 will call this to create the ZooKeeper server list.  The result is
+   * a file that describes the ZooKeeper servers through the filename.
+   * NOTE(review): an empty taskDirectory is re-polled without sleeping.
+   * @throws IOException If listing candidates or creating the file fails
+   * @throws InterruptedException If interrupted while sleeping between polls
+   */
+  private void createZooKeeperServerList() throws IOException,
+      InterruptedException {
+    int candidateRetrievalAttempt = 0;
+    Map<String, Integer> hostnameTaskMap = Maps.newTreeMap();
+    while (true) {
+      FileStatus [] fileStatusArray = fs.listStatus(taskDirectory);
+      hostnameTaskMap.clear();
+      if (fileStatusArray.length > 0) {
+        for (FileStatus fileStatus : fileStatusArray) {
+          String[] hostnameTaskArray =
+              fileStatus.getPath().getName().split(
+                  HOSTNAME_TASK_SEPARATOR);
+          if (hostnameTaskArray.length != 2) {
+            throw new RuntimeException(
+                "getZooKeeperServerList: Task 0 failed " +
+                    "to parse " +
+                    fileStatus.getPath().getName());
+          }
+          if (!hostnameTaskMap.containsKey(hostnameTaskArray[0])) {
+            hostnameTaskMap.put(hostnameTaskArray[0],
+                new Integer(hostnameTaskArray[1]));
+          }
         }
-        Path serverListPath =
-            new Path(baseDirectory, serverListFile.toString());
         if (LOG.isInfoEnabled()) {
-            LOG.info("createZooKeeperServerList: Creating the final " +
-                     "ZooKeeper file '" + serverListPath + "'");
-        }
-        fs.createNewFile(serverListPath);
+          LOG.info("getZooKeeperServerList: Got " +
+              hostnameTaskMap.keySet() + " " +
+              hostnameTaskMap.size() + " hosts from " +
+              fileStatusArray.length + " candidates when " +
+              serverCount + " required (polling period is " +
+              pollMsecs + ") on attempt " +
+              candidateRetrievalAttempt);
+        }
+
+        if (hostnameTaskMap.size() >= serverCount) {
+          break;
+        }
+        ++candidateRetrievalAttempt;
+        Thread.sleep(pollMsecs);
+      }
+    }
+    StringBuffer serverListFile =
+        new StringBuffer(ZOOKEEPER_SERVER_LIST_FILE_PREFIX);
+    int numServers = 0;
+    for (Map.Entry<String, Integer> hostnameTask :
+      hostnameTaskMap.entrySet()) {
+      serverListFile.append(hostnameTask.getKey() +
+          HOSTNAME_TASK_SEPARATOR + hostnameTask.getValue() +
+          HOSTNAME_TASK_SEPARATOR);
+      if (++numServers == serverCount) {
+        break;
+      }
+    }
+    Path serverListPath =
+        new Path(baseDirectory, serverListFile.toString());
+    if (LOG.isInfoEnabled()) {
+      LOG.info("createZooKeeperServerList: Creating the final " +
+          "ZooKeeper file '" + serverListPath + "'");
     }
+    fs.createNewFile(serverListPath);
+  }
 
-    /**
-     * Make an attempt to get the server list file by looking for a file in
-     * the appropriate directory with the prefix
-     * ZOOKEEPER_SERVER_LIST_FILE_PREFIX.
-     * @return null if not found or the filename if found
-     * @throws IOException
-     */
-    private String getServerListFile() throws IOException {
-        String serverListFile = null;
-        FileStatus [] fileStatusArray = fs.listStatus(baseDirectory);
-        for (FileStatus fileStatus : fileStatusArray) {
-            if (fileStatus.getPath().getName().startsWith(
-                    ZOOKEEPER_SERVER_LIST_FILE_PREFIX)) {
-                serverListFile = fileStatus.getPath().getName();
-                break;
-            }
-        }
-        return serverListFile;
+  /**
+   * Make a single attempt to find the server list file: the first entry of
+   * baseDirectory whose name starts with
+   * ZOOKEEPER_SERVER_LIST_FILE_PREFIX.
+   * @return the file name (not the full path) if found, otherwise null
+   * @throws IOException If listing baseDirectory fails
+   */
+  private String getServerListFile() throws IOException {
+    String serverListFile = null;
+    FileStatus [] fileStatusArray = fs.listStatus(baseDirectory);
+    for (FileStatus fileStatus : fileStatusArray) {
+      if (fileStatus.getPath().getName().startsWith(
+          ZOOKEEPER_SERVER_LIST_FILE_PREFIX)) {
+        serverListFile = fileStatus.getPath().getName();
+        break;
+      }
     }
+    return serverListFile;
+  }
 
-    /**
-     * Task 0 is the designated master and will generate the server list
-     * (unless it has already done so).  Other
-     * tasks will consume the file after it is created (just the filename).
-     * @throws IOException
-     * @throws InterruptedException
-     */
-    private void getZooKeeperServerList()
-            throws IOException, InterruptedException {
-        String serverListFile;
-
-        if (taskPartition == 0) {
-            serverListFile = getServerListFile();
-            if (serverListFile == null) {
-                createZooKeeperServerList();
-            }
-        }
-
-        while (true) {
-            serverListFile = getServerListFile();
-            if (LOG.isInfoEnabled()) {
-                LOG.info("getZooKeeperServerList: For task " + taskPartition +
-                         ", got file '" + serverListFile +
-                         "' (polling period is " +
-                         pollMsecs + ")");
-            }
-            if (serverListFile != null) {
-                break;
-            }
-            try {
-                Thread.sleep(pollMsecs);
-            } catch (InterruptedException e) {
-                LOG.warn("getZooKeeperServerList: Strange interrupted " +
-                         "exception " + e.getMessage());
-            }
+  /**
+   * Task 0 is the designated master and will generate the server list
+   * (unless it has already done so).  All tasks then poll for the file and
+   * fill zkServerPortMap and zkServerPortString from its filename.
+   * @throws IOException If listing or creating files on the file system fails
+   * @throws InterruptedException Only from createZooKeeperServerList() (task 0)
+   */
+  private void getZooKeeperServerList() throws IOException,
+      InterruptedException {
+    String serverListFile;
+
+    if (taskPartition == 0) {
+      serverListFile = getServerListFile();
+      if (serverListFile == null) {
+        createZooKeeperServerList();
+      }
+    }
+
+    while (true) {
+      serverListFile = getServerListFile();
+      if (LOG.isInfoEnabled()) {
+        LOG.info("getZooKeeperServerList: For task " + taskPartition +
+            ", got file '" + serverListFile +
+            "' (polling period is " +
+            pollMsecs + ")");
+      }
+      if (serverListFile != null) {
+        break;
+      }
+      try {
+        Thread.sleep(pollMsecs);
+      } catch (InterruptedException e) {
+        LOG.warn("getZooKeeperServerList: Strange interrupted " +
+            "exception " + e.getMessage());
+      }
 
-        }
+    }
 
-        List<String> serverHostList = Arrays.asList(serverListFile.substring(
-            ZOOKEEPER_SERVER_LIST_FILE_PREFIX.length()).split(
-                HOSTNAME_TASK_SEPARATOR));
-        if (LOG.isInfoEnabled()) {
-            LOG.info("getZooKeeperServerList: Found " + serverHostList + " " +
-                     serverHostList.size() +
-                     " hosts in filename '" + serverListFile + "'");
-        }
-        if (serverHostList.size() != serverCount * 2) {
-            throw new IllegalStateException(
-                "getZooKeeperServerList: Impossible " +
-                " that " + serverHostList.size() +
-                " != 2 * " +
-                serverCount + " asked for.");
-        }
-
-        for (int i = 0; i < serverHostList.size(); i += 2) {
-            zkServerPortMap.put(serverHostList.get(i),
-                                  Integer.parseInt(serverHostList.get(i+1)));
-        }
-        zkServerPortString = "";
-        for (String server : zkServerPortMap.keySet()) {
-            if (zkServerPortString.length() > 0) {
-                zkServerPortString += ",";
-            }
-            zkServerPortString += server + ":" + zkBasePort;
-        }
+    List<String> serverHostList = Arrays.asList(serverListFile.substring(
+        ZOOKEEPER_SERVER_LIST_FILE_PREFIX.length()).split(
+            HOSTNAME_TASK_SEPARATOR));
+    if (LOG.isInfoEnabled()) {
+      LOG.info("getZooKeeperServerList: Found " + serverHostList + " " +
+          serverHostList.size() +
+          " hosts in filename '" + serverListFile + "'");
+    }
+    if (serverHostList.size() != serverCount * 2) {
+      throw new IllegalStateException(
+          "getZooKeeperServerList: Impossible " +
+              " that " + serverHostList.size() +
+              " != 2 * " +
+              serverCount + " asked for.");
+    }
+
+    for (int i = 0; i < serverHostList.size(); i += 2) {
+      zkServerPortMap.put(serverHostList.get(i),
+        Integer.parseInt(serverHostList.get(i + 1)));
+    }
+    zkServerPortString = "";
+    for (String server : zkServerPortMap.keySet()) {
+      if (zkServerPortString.length() > 0) {
+        zkServerPortString += ",";
+      }
+      zkServerPortString += server + ":" + zkBasePort;
     }
+  }
 
-    /**
-     * Users can get the server port string to connect to ZooKeeper
-     * @return server port string - comma separated
-     */
-    public String getZooKeeperServerPortString() {
-        return zkServerPortString;
+  /**
+   * Users can get the server port string to connect to ZooKeeper
+   * @return comma separated host:port entries, one per ZooKeeper server
+   */
+  public String getZooKeeperServerPortString() {
+    return zkServerPortString;
+  }
+
+  /**
+   * Whoever is elected to be a ZooKeeper server must generate a config file
+   * locally.
+   *
+   * @param serverList List of ZooKeeper servers.
+   */
+  private void generateZooKeeperConfigFile(List<String> serverList) {
+    if (LOG.isInfoEnabled()) {
+      LOG.info("generateZooKeeperConfigFile: Creating file " +
+          configFilePath + " in " + zkDir + " with base port " +
+          zkBasePort);
+    }
+    try {
+      File zkDirFile = new File(this.zkDir);
+      boolean mkDirRet = zkDirFile.mkdirs();
+      if (LOG.isInfoEnabled()) {
+        LOG.info("generateZooKeeperConfigFile: Make directory of " +
+            zkDirFile.getName() + " = " + mkDirRet);
+      }
+      File configFile = new File(configFilePath);
+      boolean deletedRet = configFile.delete();
+      if (LOG.isInfoEnabled()) {
+        LOG.info("generateZooKeeperConfigFile: Delete of " +
+            configFile.getName() + " = " + deletedRet);
+      }
+      if (!configFile.createNewFile()) {
+        throw new IllegalStateException(
+            "generateZooKeeperConfigFile: Failed to " +
+                "create config file " + configFile.getName());
+      }
+      // Make writable by everybody
+      if (!configFile.setWritable(true, false)) {
+        throw new IllegalStateException(
+            "generateZooKeeperConfigFile: Failed to make writable " +
+                configFile.getName());
+      }
+
+      Writer writer = null;
+      try {
+        writer = new FileWriter(configFilePath);
+        writer.write("tickTime=" +
+            GiraphJob.DEFAULT_ZOOKEEPER_TICK_TIME + "\n");
+        writer.write("dataDir=" + this.zkDir + "\n");
+        writer.write("clientPort=" + zkBasePort + "\n");
+        writer.write("maxClientCnxns=" +
+            GiraphJob.DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS +
+            "\n");
+        writer.write("minSessionTimeout=" +
+            GiraphJob.DEFAULT_ZOOKEEPER_MIN_SESSION_TIMEOUT +
+            "\n");
+        writer.write("maxSessionTimeout=" +
+            GiraphJob.DEFAULT_ZOOKEEPER_MAX_SESSION_TIMEOUT +
+            "\n");
+        writer.write("initLimit=" +
+            GiraphJob.DEFAULT_ZOOKEEPER_INIT_LIMIT + "\n");
+        writer.write("syncLimit=" +
+            GiraphJob.DEFAULT_ZOOKEEPER_SYNC_LIMIT + "\n");
+        writer.write("snapCount=" +
+            GiraphJob.DEFAULT_ZOOKEEPER_SNAP_COUNT + "\n");
+        if (serverList.size() != 1) {
+          writer.write("electionAlg=0\n");
+          for (int i = 0; i < serverList.size(); ++i) {
+            writer.write("server." + i + "=" + serverList.get(i) +
+                ":" + (zkBasePort + 1) +
+                ":" + (zkBasePort + 2) + "\n");
+            if (myHostname.equals(serverList.get(i))) {
+              Writer myidWriter = null;
+              try {
+                myidWriter = new FileWriter(zkDir + "/myid");
+                myidWriter.write(i + "\n");
+              } finally {
+                Closeables.closeQuietly(myidWriter);
+              }
+            }
+          }
+        }
+      } finally {
+        Closeables.closeQuietly(writer);
+      }
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          "generateZooKeeperConfigFile: Failed to write file", e);
     }
+  }
 
-    /**
-     * Whoever is elected to be a ZooKeeper server must generate a config file
-     * locally.
-     */
-    private void generateZooKeeperConfigFile(List<String> serverList) {
+  /**
+   * If this task has been selected, online a ZooKeeper server.  Otherwise,
+   * wait until this task knows that the ZooKeeper servers have been onlined.
+   */
+  public void onlineZooKeeperServers() {
+    Integer taskId = zkServerPortMap.get(myHostname);
+    if ((taskId != null) && (taskId.intValue() == taskPartition)) {
+      File zkDirFile = new File(this.zkDir);
+      try {
         if (LOG.isInfoEnabled()) {
-            LOG.info("generateZooKeeperConfigFile: Creating file " +
-                     configFilePath + " in " + zkDir + " with base port " +
-                     zkBasePort);
+          LOG.info("onlineZooKeeperServers: Trying to delete old " +
+              "directory " + this.zkDir);
         }
-        try {
-            File zkDirFile = new File(this.zkDir);
-            boolean mkDirRet = zkDirFile.mkdirs();
-            if (LOG.isInfoEnabled()) {
-                LOG.info("generateZooKeeperConfigFile: Make directory of " +
-                         zkDirFile.getName() + " = " + mkDirRet);
-            }
-            File configFile = new File(configFilePath);
-            boolean deletedRet = configFile.delete();
-            if (LOG.isInfoEnabled()) {
-                LOG.info("generateZooKeeperConfigFile: Delete of " +
-                         configFile.getName() + " = " + deletedRet);
-            }
-            if (!configFile.createNewFile()) {
-                throw new IllegalStateException(
-                    "generateZooKeeperConfigFile: Failed to " +
-                    "create config file " + configFile.getName());
-            }
-            // Make writable by everybody
-            if (!configFile.setWritable(true, false)) {
-                throw new IllegalStateException(
-                    "generateZooKeeperConfigFile: Failed to make writable " +
-                    configFile.getName());
-            }
-            
-            Writer writer = null;
-            try {
-                writer = new FileWriter(configFilePath);
-                writer.write("tickTime=" +
-                             GiraphJob.DEFAULT_ZOOKEEPER_TICK_TIME + "\n");
-                writer.write("dataDir=" + this.zkDir + "\n");
-                writer.write("clientPort=" + zkBasePort + "\n");
-                writer.write("maxClientCnxns=" +
-                        GiraphJob.DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS +
-                        "\n");
-                writer.write("minSessionTimeout=" +
-                        GiraphJob.DEFAULT_ZOOKEEPER_MIN_SESSION_TIMEOUT +
-                        "\n");
-                writer.write("maxSessionTimeout=" +
-                        GiraphJob.DEFAULT_ZOOKEEPER_MAX_SESSION_TIMEOUT +
-                        "\n");
-                writer.write("initLimit=" +
-                        GiraphJob.DEFAULT_ZOOKEEPER_INIT_LIMIT + "\n");
-                writer.write("syncLimit=" +
-                        GiraphJob.DEFAULT_ZOOKEEPER_SYNC_LIMIT + "\n");
-                writer.write("snapCount=" +
-                        GiraphJob.DEFAULT_ZOOKEEPER_SNAP_COUNT + "\n");
-                if (serverList.size() != 1) {
-                    writer.write("electionAlg=0\n");
-                    for (int i = 0; i < serverList.size(); ++i) {
-                        writer.write("server." + i + "=" + serverList.get(i) +
-                                     ":" + (zkBasePort + 1) +
-                                     ":" + (zkBasePort + 2) + "\n");
-                        if (myHostname.equals(serverList.get(i))) {
-                            Writer myidWriter = null;
-                            try {
-                                myidWriter = new FileWriter(zkDir + "/myid");
-                                myidWriter.write(i + "\n");
-                            } finally {
-                                Closeables.closeQuietly(myidWriter);
-                            }
-                        }
-                    }
-                }
-            } finally {
-                Closeables.closeQuietly(writer);
+        FileUtils.deleteDirectory(zkDirFile);
+      } catch (IOException e) {
+        LOG.warn("onlineZooKeeperServers: Failed to delete " +
+            "directory " + this.zkDir, e);
+      }
+      generateZooKeeperConfigFile(
+          new ArrayList<String>(zkServerPortMap.keySet()));
+      ProcessBuilder processBuilder = new ProcessBuilder();
+      List<String> commandList = Lists.newArrayList();
+      String javaHome = System.getProperty("java.home");
+      if (javaHome == null) {
+        throw new IllegalArgumentException(
+            "onlineZooKeeperServers: java.home is not set!");
+      }
+      commandList.add(javaHome + "/bin/java");
+      String zkJavaOptsString =
+          conf.get(GiraphJob.ZOOKEEPER_JAVA_OPTS,
+              GiraphJob.ZOOKEEPER_JAVA_OPTS_DEFAULT);
+      String[] zkJavaOptsArray = zkJavaOptsString.split(" ");
+      if (zkJavaOptsArray != null) {
+        for (String javaOpt : zkJavaOptsArray) {
+          commandList.add(javaOpt);
+        }
+      }
+      commandList.add("-cp");
+      Path fullJarPath = new Path(conf.get(GiraphJob.ZOOKEEPER_JAR));
+      commandList.add(fullJarPath.toString());
+      commandList.add(QuorumPeerMain.class.getName());
+      commandList.add(configFilePath);
+      processBuilder.command(commandList);
+      File execDirectory = new File(zkDir);
+      processBuilder.directory(execDirectory);
+      processBuilder.redirectErrorStream(true);
+      if (LOG.isInfoEnabled()) {
+        LOG.info("onlineZooKeeperServers: Attempting to " +
+            "start ZooKeeper server with command " + commandList +
+            " in directory " + execDirectory.toString());
+      }
+      try {
+        synchronized (this) {
+          zkProcess = processBuilder.start();
+          zkProcessCollector =
+              new StreamCollector(zkProcess.getInputStream());
+          zkProcessCollector.start();
+        }
+        Runnable runnable = new Runnable() {
+          public void run() {
+            synchronized (this) {
+              if (zkProcess != null) {
+                LOG.warn("onlineZooKeeperServers: " +
+                         "Forced a shutdown hook kill of the " +
+                         "ZooKeeper process.");
+                zkProcess.destroy();
+              }
             }
+          }
+        };
+        Runtime.getRuntime().addShutdownHook(new Thread(runnable));
+      } catch (IOException e) {
+        LOG.error("onlineZooKeeperServers: Failed to start " +
+            "ZooKeeper process", e);
+        throw new RuntimeException(e);
+      }
+
+      // Once the server is up and running, notify that this server is up
+      // and running by dropping a ready stamp.
+      int connectAttempts = 0;
+      final int maxConnectAttempts = 10;
+      while (connectAttempts < maxConnectAttempts) {
+        try {
+          if (LOG.isInfoEnabled()) {
+            LOG.info("onlineZooKeeperServers: Connect attempt " +
+                connectAttempts + " of " +
+                maxConnectAttempts +
+                " max trying to connect to " +
+                myHostname + ":" + zkBasePort +
+                " with poll msecs = " + pollMsecs);
+          }
+          InetSocketAddress zkServerAddress =
+              new InetSocketAddress(myHostname, zkBasePort);
+          Socket testServerSock = new Socket();
+          testServerSock.connect(zkServerAddress, 5000);
+          if (LOG.isInfoEnabled()) {
+            LOG.info("onlineZooKeeperServers: Connected to " +
+                zkServerAddress + "!");
+          }
+          break;
+        } catch (SocketTimeoutException e) {
+          LOG.warn("onlineZooKeeperServers: Got " +
+              "SocketTimeoutException", e);
+        } catch (ConnectException e) {
+          LOG.warn("onlineZooKeeperServers: Got " +
+              "ConnectException", e);
         } catch (IOException e) {
-            throw new IllegalStateException(
-                "generateZooKeeperConfigFile: Failed to write file", e);
+          LOG.warn("onlineZooKeeperServers: Got " +
+              "IOException", e);
         }
-    }
 
-    /**
-     * If this task has been selected, online a ZooKeeper server.  Otherwise,
-     * wait until this task knows that the ZooKeeper servers have been onlined.
-     */
-    public void onlineZooKeeperServers() {
-        Integer taskId = zkServerPortMap.get(myHostname);
-        if ((taskId != null) && (taskId.intValue() == taskPartition)) {
-            File zkDirFile = new File(this.zkDir);
-            try {
-                if (LOG.isInfoEnabled()) {
-                    LOG.info("onlineZooKeeperServers: Trying to delete old " +
-                             "directory " + this.zkDir);
-                }
-                FileUtils.deleteDirectory(zkDirFile);
-            } catch (IOException e) {
-                LOG.warn("onlineZooKeeperServers: Failed to delete " +
-                         "directory " + this.zkDir, e);
-            }
-            generateZooKeeperConfigFile(
-                new ArrayList<String>(zkServerPortMap.keySet()));
-            ProcessBuilder processBuilder = new ProcessBuilder();
-            List<String> commandList = Lists.newArrayList();
-            String javaHome = System.getProperty("java.home");
-            if (javaHome == null) {
-                throw new IllegalArgumentException(
-                    "onlineZooKeeperServers: java.home is not set!");
-            }
-            commandList.add(javaHome + "/bin/java");
-            String zkJavaOptsString =
-                conf.get(GiraphJob.ZOOKEEPER_JAVA_OPTS,
-                         GiraphJob.ZOOKEEPER_JAVA_OPTS_DEFAULT);
-            String[] zkJavaOptsArray = zkJavaOptsString.split(" ");
-            if (zkJavaOptsArray != null) {
-                for (String javaOpt : zkJavaOptsArray) {
-                    commandList.add(javaOpt);
-                }
+        ++connectAttempts;
+        try {
+          Thread.sleep(pollMsecs);
+        } catch (InterruptedException e) {
+          LOG.warn("onlineZooKeeperServers: Sleep of " + pollMsecs +
+              " interrupted - " + e.getMessage());
+        }
+      }
+      if (connectAttempts == maxConnectAttempts) {
+        throw new IllegalStateException(
+            "onlineZooKeeperServers: Failed to connect in " +
+                connectAttempts + " tries!");
+      }
+      Path myReadyPath = new Path(
+          serverDirectory, myHostname +
+          HOSTNAME_TASK_SEPARATOR + taskPartition);
+      try {
+        if (LOG.isInfoEnabled()) {
+          LOG.info("onlineZooKeeperServers: Creating my filestamp " +
+              myReadyPath);
+        }
+        fs.createNewFile(myReadyPath);
+      } catch (IOException e) {
+        LOG.error("onlineZooKeeperServers: Failed (maybe previous " +
+            "task failed) to create filestamp " + myReadyPath, e);
+      }
+    } else {
+      List<String> foundList = new ArrayList<String>();
+      int readyRetrievalAttempt = 0;
+      while (true) {
+        try {
+          FileStatus [] fileStatusArray =
+              fs.listStatus(serverDirectory);
+          foundList.clear();
+          if ((fileStatusArray != null) &&
+              (fileStatusArray.length > 0)) {
+            for (int i = 0; i < fileStatusArray.length; ++i) {
+              String[] hostnameTaskArray =
+                  fileStatusArray[i].getPath().getName().split(
+                      HOSTNAME_TASK_SEPARATOR);
+              if (hostnameTaskArray.length != 2) {
+                throw new RuntimeException(
+                    "getZooKeeperServerList: Task 0 failed " +
+                        "to parse " +
+                        fileStatusArray[i].getPath().getName());
+              }
+              foundList.add(hostnameTaskArray[0]);
             }
-            commandList.add("-cp");
-            Path fullJarPath = new Path(conf.get(GiraphJob.ZOOKEEPER_JAR));
-            commandList.add(fullJarPath.toString());
-            commandList.add(QuorumPeerMain.class.getName());
-            commandList.add(configFilePath);
-            processBuilder.command(commandList);
-            File execDirectory = new File(zkDir);
-            processBuilder.directory(execDirectory);
-            processBuilder.redirectErrorStream(true);
             if (LOG.isInfoEnabled()) {
-                LOG.info("onlineZooKeeperServers: Attempting to " +
-                         "start ZooKeeper server with command " + commandList +
-                         " in directory " + execDirectory.toString());
-            }
-            try {
-                synchronized (this) {
-                    zkProcess = processBuilder.start();
-                    zkProcessCollector =
-                        new StreamCollector(zkProcess.getInputStream());
-                    zkProcessCollector.start();
-                }
-                Runnable runnable = new Runnable() {
-                    public void run() {
-                        synchronized (this) {
-                            if (zkProcess != null) {
-                                LOG.warn("onlineZooKeeperServers: "+
-                                    "Forced a shutdown hook kill of the " +
-                                    "ZooKeeper process.");
-                                zkProcess.destroy();
-                            }
-                        }
-                    }
-                };
-                Runtime.getRuntime().addShutdownHook(new Thread(runnable));
-            } catch (IOException e) {
-                LOG.error("onlineZooKeeperServers: Failed to start " +
-                          "ZooKeeper process", e);
-                throw new RuntimeException(e);
-            }
-
-            // Once the server is up and running, notify that this server is up
-            // and running by dropping a ready stamp.
-            int connectAttempts = 0;
-            final int maxConnectAttempts = 10;
-            while (connectAttempts < maxConnectAttempts) {
-                try {
-                    if (LOG.isInfoEnabled()) {
-                        LOG.info("onlineZooKeeperServers: Connect attempt " +
-                                 connectAttempts + " of " +
-                                 maxConnectAttempts +
-                                 " max trying to connect to " +
-                                 myHostname + ":" + zkBasePort +
-                                 " with poll msecs = " + pollMsecs);
-                    }
-                    InetSocketAddress zkServerAddress =
-                        new InetSocketAddress(myHostname, zkBasePort);
-                    Socket testServerSock = new Socket();
-                    testServerSock.connect(zkServerAddress, 5000);
-                    if (LOG.isInfoEnabled()) {
-                        LOG.info("onlineZooKeeperServers: Connected to " +
-                                 zkServerAddress + "!");
-                    }
-                    break;
-                } catch (SocketTimeoutException e) {
-                    LOG.warn("onlineZooKeeperServers: Got " +
-                             "SocketTimeoutException", e);
-                } catch (ConnectException e) {
-                    LOG.warn("onlineZooKeeperServers: Got " +
-                             "ConnectException", e);
-                } catch (IOException e) {
-                    LOG.warn("onlineZooKeeperServers: Got " +
-                             "IOException", e);
-                }
-
-                ++connectAttempts;
-                try {
-                    Thread.sleep(pollMsecs);
-                } catch (InterruptedException e) {
-                    LOG.warn("onlineZooKeeperServers: Sleep of " + pollMsecs +
-                             " interrupted - " + e.getMessage());
-                }
-            }
-            if (connectAttempts == maxConnectAttempts) {
-                throw new IllegalStateException(
-                    "onlineZooKeeperServers: Failed to connect in " +
-                    connectAttempts + " tries!");
+              LOG.info("onlineZooKeeperServers: Got " +
+                  foundList + " " +
+                  foundList.size() + " hosts from " +
+                  fileStatusArray.length +
+                  " ready servers when " +
+                  serverCount +
+                  " required (polling period is " +
+                  pollMsecs + ") on attempt " +
+                  readyRetrievalAttempt);
             }
-            Path myReadyPath = new Path(
-                    serverDirectory, myHostname +
-                    HOSTNAME_TASK_SEPARATOR + taskPartition);
-            try {
-                if (LOG.isInfoEnabled()) {
-                    LOG.info("onlineZooKeeperServers: Creating my filestamp " +
-                             myReadyPath);
-                }
-                fs.createNewFile(myReadyPath);
-            } catch (IOException e) {
-                LOG.error("onlineZooKeeperServers: Failed (maybe previous " +
-                          "task failed) to create filestamp " + myReadyPath, e);
-            }
-        }
-        else {
-            List<String> foundList = new ArrayList<String>();
-            int readyRetrievalAttempt = 0;
-            while (true) {
-                try {
-                    FileStatus [] fileStatusArray =
-                        fs.listStatus(serverDirectory);
-                    foundList.clear();
-                    if ((fileStatusArray != null) &&
-                        (fileStatusArray.length > 0)) {
-                        for (int i = 0; i < fileStatusArray.length; ++i) {
-                            String[] hostnameTaskArray =
-                                fileStatusArray[i].getPath().getName().split(
-                                    HOSTNAME_TASK_SEPARATOR);
-                            if (hostnameTaskArray.length != 2) {
-                                throw new RuntimeException(
-                                    "getZooKeeperServerList: Task 0 failed " +
-                                    "to parse " +
-                                    fileStatusArray[i].getPath().getName());
-                            }
-                            foundList.add(hostnameTaskArray[0]);
-                        }
-                        if (LOG.isInfoEnabled()) {
-                            LOG.info("onlineZooKeeperServers: Got " +
-                                     foundList + " " +
-                                     foundList.size() + " hosts from " +
-                                     fileStatusArray.length +
-                                     " ready servers when " +
-                                     serverCount +
-                                     " required (polling period is " +
-                                     pollMsecs + ") on attempt " +
-                                     readyRetrievalAttempt);
-                        }
-                        if (foundList.containsAll(zkServerPortMap.keySet())) {
-                            break;
-                        }
-                    } else {
-                        if (LOG.isInfoEnabled()) {
-                            LOG.info("onlineZooKeeperSErvers: Empty " +
-                                     "directory " + serverDirectory +
-                                     ", waiting " + pollMsecs + " msecs.");
-                        }
-                    }
-                    Thread.sleep(pollMsecs);
-                    ++readyRetrievalAttempt;
-                } catch (IOException e) {
-                    throw new RuntimeException(e);
-                } catch (InterruptedException e) {
-                    LOG.warn("onlineZooKeeperServers: Strange interrupt from " +
-                             e.getMessage(), e);
-                }
+            if (foundList.containsAll(zkServerPortMap.keySet())) {
+              break;
             }
+          } else {
+            if (LOG.isInfoEnabled()) {
+              LOG.info("onlineZooKeeperSErvers: Empty " +
+                  "directory " + serverDirectory +
+                  ", waiting " + pollMsecs + " msecs.");
+            }
+          }
+          Thread.sleep(pollMsecs);
+          ++readyRetrievalAttempt;
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        } catch (InterruptedException e) {
+          LOG.warn("onlineZooKeeperServers: Strange interrupt from " +
+              e.getMessage(), e);
         }
+      }
     }
+  }
 
-    /**
-     * Wait for all map tasks to signal completion.
-     *
-     * @param totalMapTasks Number of map tasks to wait for
-     */
-    private void waitUntilAllTasksDone(int totalMapTasks) {
-        int attempt = 0;
-        while (true) {
-            try {
-                FileStatus [] fileStatusArray =
-                    fs.listStatus(taskDirectory);
-                int totalDone = 0;
-                if (fileStatusArray.length > 0) {
-                    for (int i = 0; i < fileStatusArray.length; ++i) {
-                        if (fileStatusArray[i].getPath().getName().endsWith(
-                            COMPUTATION_DONE_SUFFIX)) {
-                            ++totalDone;
-                        }
-                    }
-                }
-                if (LOG.isInfoEnabled()) {
-                    LOG.info("waitUntilAllTasksDone: Got " + totalDone +
-                             " and " + totalMapTasks +
-                             " desired (polling period is " +
-                             pollMsecs + ") on attempt " +
-                             attempt);
-                }
-                if (totalDone >= totalMapTasks) {
-                    break;
-                }
-                ++attempt;
-                Thread.sleep(pollMsecs);
-               context.progress();
-            } catch (IOException e) {
-                LOG.warn("waitUntilAllTasksDone: Got IOException.", e);
-            } catch (InterruptedException e) {
-                LOG.warn("waitUntilAllTasksDone: Got InterruptedException", e);
+  /**
+   * Wait for all map tasks to signal completion.
+   *
+   * @param totalMapTasks Number of map tasks to wait for
+   */
+  private void waitUntilAllTasksDone(int totalMapTasks) {
+    int attempt = 0;
+    while (true) {
+      try {
+        FileStatus [] fileStatusArray =
+            fs.listStatus(taskDirectory);
+        int totalDone = 0;
+        if (fileStatusArray.length > 0) {
+          for (int i = 0; i < fileStatusArray.length; ++i) {
+            if (fileStatusArray[i].getPath().getName().endsWith(
+                COMPUTATION_DONE_SUFFIX)) {
+              ++totalDone;
             }
+          }
         }
+        if (LOG.isInfoEnabled()) {
+          LOG.info("waitUntilAllTasksDone: Got " + totalDone +
+              " and " + totalMapTasks +
+              " desired (polling period is " +
+              pollMsecs + ") on attempt " +
+              attempt);
+        }
+        if (totalDone >= totalMapTasks) {
+          break;
+        }
+        ++attempt;
+        Thread.sleep(pollMsecs);
+        context.progress();
+      } catch (IOException e) {
+        LOG.warn("waitUntilAllTasksDone: Got IOException.", e);
+      } catch (InterruptedException e) {
+        LOG.warn("waitUntilAllTasksDone: Got InterruptedException", e);
+      }
     }
+  }
 
-    /**
-     * Notify the ZooKeeper servers that this partition is done with all
-     * ZooKeeper communication.  If this task is running a ZooKeeper server,
-     * kill it when all partitions are done and wait for
-     * completion.  Clean up the ZooKeeper local directory as well.
-     *
-     * @param state State of the application
-     */
-    public void offlineZooKeeperServers(State state) {
-        if (state == State.FINISHED) {
-            createZooKeeperClosedStamp();
+  /**
+   * Notify the ZooKeeper servers that this partition is done with all
+   * ZooKeeper communication.  If this task is running a ZooKeeper server,
+   * kill it when all partitions are done and wait for
+   * completion.  Clean up the ZooKeeper local directory as well.
+   *
+   * @param state State of the application
+   */
+  public void offlineZooKeeperServers(State state) {
+    if (state == State.FINISHED) {
+      createZooKeeperClosedStamp();
+    }
+    synchronized (this) {
+      if (zkProcess != null) {
+        int totalMapTasks = conf.getInt("mapred.map.tasks", -1);
+        waitUntilAllTasksDone(totalMapTasks);
+        zkProcess.destroy();
+        int exitValue = -1;
+        File zkDirFile;
+        try {
+          zkProcessCollector.join();
+          exitValue = zkProcess.waitFor();
+          zkDirFile = new File(zkDir);
+          FileUtils.deleteDirectory(zkDirFile);
+        } catch (InterruptedException e) {
+          LOG.warn("offlineZooKeeperServers: " +
+              "InterruptedException, but continuing ",
+              e);
+        } catch (IOException e) {
+          LOG.warn("offlineZooKeeperSevers: " +
+              "IOException, but continuing",
+              e);
         }
-        synchronized (this) {
-            if (zkProcess != null) {
-                int totalMapTasks = conf.getInt("mapred.map.tasks", -1);
-                waitUntilAllTasksDone(totalMapTasks);
-                zkProcess.destroy();
-                int exitValue = -1;
-                File zkDirFile;
-                try {
-                    zkProcessCollector.join();
-                    exitValue = zkProcess.waitFor();
-                    zkDirFile = new File(zkDir);
-                    FileUtils.deleteDirectory(zkDirFile);
-                } catch (InterruptedException e) {
-                    LOG.warn("offlineZooKeeperServers: " +
-                             "InterruptedException, but continuing ",
-                             e);
-                } catch (IOException e) {
-                    LOG.warn("offlineZooKeeperSevers: " +
-                             "IOException, but continuing",
-                             e);
-                }
-                if (LOG.isInfoEnabled()) {
-                    LOG.info("offlineZooKeeperServers: waitFor returned " +
-                             exitValue + " and deleted directory " + zkDir);
-                }
-                zkProcess = null;
-            }
+        if (LOG.isInfoEnabled()) {
+          LOG.info("offlineZooKeeperServers: waitFor returned " +
+              exitValue + " and deleted directory " + zkDir);
         }
+        zkProcess = null;
+      }
     }
+  }
 
-    /**
-     *  Is this task running a ZooKeeper server?  Only could be true if called
-     *  after onlineZooKeeperServers().
-     *
-     *  @return true if running a ZooKeeper server, false otherwise
-     */
-    public boolean runsZooKeeper() {
-        synchronized (this) {
-            return zkProcess != null;
-        }
+  /**
+   *  Is this task running a ZooKeeper server?  Only could be true if called
+   *  after onlineZooKeeperServers().
+   *
+   *  @return true if running a ZooKeeper server, false otherwise
+   */
+  public boolean runsZooKeeper() {
+    synchronized (this) {
+      return zkProcess != null;
     }
+  }
 }

Copied: incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/package-info.java (from r1243701, incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java)
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/package-info.java?p2=incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/package-info.java&p1=incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java&r1=1243701&r2=1245205&rev=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/package-info.java Thu Feb 16 22:12:31 2012
@@ -15,15 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package org.apache.giraph.bsp;
-
 /**
- *  State of the BSP application
+ * Package of {@link org.apache.zookeeper.ZooKeeper} related objects.
  */
-public enum ApplicationState {
-    UNKNOWN, ///< Shouldn't be seen, just an initial state
-    START_SUPERSTEP, ///< Start from a desired superstep
-    FAILED, ///< Unrecoverable
-    FINISHED ///< Successful completion
-}
+package org.apache.giraph.zk;