You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/10/05 23:37:17 UTC

svn commit: r1394826 - in /giraph/trunk: ./ src/main/java/org/apache/giraph/ src/main/java/org/apache/giraph/graph/ src/main/java/org/apache/giraph/zk/

Author: aching
Date: Fri Oct  5 21:37:16 2012
New Revision: 1394826

URL: http://svn.apache.org/viewvc?rev=1394826&view=rev
Log:
GIRAPH-356: Improve ZooKeeper issues. (aching)

Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/pom.xml
    giraph/trunk/src/main/java/org/apache/giraph/GiraphConfiguration.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
    giraph/trunk/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1394826&r1=1394825&r2=1394826&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Fri Oct  5 21:37:16 2012
@@ -1,6 +1,9 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+
+  GIRAPH-356: Improve ZooKeeper issues. (aching)
+
   GIRAPH-342: Recursive ZooKeeper calls should call progress, dynamic 
   ZooKeeper can skip delete (aching via majakabiljo)
 

Modified: giraph/trunk/pom.xml
URL: http://svn.apache.org/viewvc/giraph/trunk/pom.xml?rev=1394826&r1=1394825&r2=1394826&view=diff
==============================================================================
--- giraph/trunk/pom.xml (original)
+++ giraph/trunk/pom.xml Fri Oct  5 21:37:16 2012
@@ -954,5 +954,5 @@ under the License.
       <artifactId>netty</artifactId>
       <version>3.5.3.Final</version>
     </dependency>
- </dependencies>
+  </dependencies>
 </project>

Modified: giraph/trunk/src/main/java/org/apache/giraph/GiraphConfiguration.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/GiraphConfiguration.java?rev=1394826&r1=1394825&r2=1394826&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/GiraphConfiguration.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/GiraphConfiguration.java Fri Oct  5 21:37:16 2012
@@ -466,6 +466,12 @@ public class GiraphConfiguration extends
   public static final String ZOOKEEPER_MANAGER_DIR_DEFAULT =
       "_bsp/_defaultZkManagerDir";
 
+  /** Number of ZooKeeper client connection attempts before giving up. */
+  public static final String ZOOKEEPER_CONNECTION_ATTEMPTS =
+      "giraph.zkConnectionAttempts";
+  /** Default of 10 ZooKeeper client connection attempts before giving up. */
+  public static final int ZOOKEEPER_CONNECTION_ATTEMPTS_DEFAULT = 10;
+
   /** This directory has/stores the available checkpoint files in HDFS. */
   public static final String CHECKPOINT_DIRECTORY =
       "giraph.checkpointDirectory";
@@ -539,11 +545,22 @@ public class GiraphConfiguration extends
   public static final int DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS = 10000;
   /** ZooKeeper minimum session timeout */
   public static final String ZOOKEEPER_MIN_SESSION_TIMEOUT =
-      "giraph.zookeeperMinSessionTimeout";
-  /** Default ZooKeeper minimum session timeout of 5 minutes (in msecs). */
-  public static final int DEFAULT_ZOOKEEPER_MIN_SESSION_TIMEOUT = 300 * 1000;
-  /** Default ZooKeeper maximum session timeout of 10 minutes (in msecs). */
-  public static final int DEFAULT_ZOOKEEPER_MAX_SESSION_TIMEOUT = 600 * 1000;
+      "giraph.zKMinSessionTimeout";
+  /** Default ZooKeeper minimum session timeout of 10 minutes (in msecs). */
+  public static final int DEFAULT_ZOOKEEPER_MIN_SESSION_TIMEOUT = 600 * 1000;
+  /** ZooKeeper maximum session timeout */
+  public static final String ZOOKEEPER_MAX_SESSION_TIMEOUT =
+      "giraph.zkMaxSessionTimeout";
+  /** Default ZooKeeper maximum session timeout of 15 minutes (in msecs). */
+  public static final int DEFAULT_ZOOKEEPER_MAX_SESSION_TIMEOUT = 900 * 1000;
+  /** ZooKeeper force sync */
+  public static final String ZOOKEEPER_FORCE_SYNC = "giraph.zKForceSync";
+  /** Default ZooKeeper force sync is off (for performance) */
+  public static final String DEFAULT_ZOOKEEPER_FORCE_SYNC = "no";
+  /** ZooKeeper skip ACLs */
+  public static final String ZOOKEEPER_SKIP_ACL = "giraph.ZkSkipAcl";
+  /** Default is to skip ZooKeeper ACL checks (for performance) */
+  public static final String DEFAULT_ZOOKEEPER_SKIP_ACL = "yes";
 
   /**
    * Constructor that creates the configuration
@@ -787,4 +804,41 @@ public class GiraphConfiguration extends
       return getNettyServerThreads();
     }
   }
+
+  public int getZookeeperConnectionAttempts() {
+    return getInt(ZOOKEEPER_CONNECTION_ATTEMPTS,
+                  ZOOKEEPER_CONNECTION_ATTEMPTS_DEFAULT);
+  }
+
+  public int getZooKeeperMinSessionTimeout() {
+    return getInt(ZOOKEEPER_MIN_SESSION_TIMEOUT,
+        DEFAULT_ZOOKEEPER_MIN_SESSION_TIMEOUT);
+  }
+
+  public int getZooKeeperMaxSessionTimeout() {
+    return getInt(ZOOKEEPER_MAX_SESSION_TIMEOUT,
+        DEFAULT_ZOOKEEPER_MAX_SESSION_TIMEOUT);
+  }
+
+  public String getZooKeeperForceSync() {
+    return get(ZOOKEEPER_FORCE_SYNC, DEFAULT_ZOOKEEPER_FORCE_SYNC);
+  }
+
+  public String getZooKeeperSkipAcl() {
+    return get(ZOOKEEPER_SKIP_ACL, DEFAULT_ZOOKEEPER_SKIP_ACL);
+  }
+
+  /**
+   * Get the number of map tasks in this job
+   *
+   * @return Number of map tasks in this job
+   */
+  public int getMapTasks() {
+    int mapTasks = getInt("mapred.map.tasks", -1);
+    if (mapTasks == -1) {
+      throw new IllegalStateException("getMapTasks: Failed to get the map " +
+          "tasks!");
+    }
+    return mapTasks;
+  }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java?rev=1394826&r1=1394825&r2=1394826&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java Fri Oct  5 21:37:16 2012
@@ -934,13 +934,12 @@ public abstract class BspService<I exten
 
     if ((event.getPath() == null) && (event.getType() == EventType.None)) {
       if (event.getState() == KeeperState.Disconnected) {
-        // No way to recover from a disconnect event, signal all BspEvents
+        // Watches may not be triggered for some time, so signal all BspEvents
         for (BspEvent bspEvent : registeredBspEvents) {
           bspEvent.signal();
         }
-        throw new RuntimeException(
-            "process: Disconnected from ZooKeeper, cannot recover - " +
-                event);
+        LOG.warn("process: Disconnected from ZooKeeper (will automatically " +
+            "try to recover) " + event);
       } else if (event.getState() == KeeperState.SyncConnected) {
         if (LOG.isInfoEnabled()) {
           LOG.info("process: Asynchronous connection complete.");

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1394826&r1=1394825&r2=1394826&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Fri Oct  5 21:37:16 2012
@@ -443,23 +443,8 @@ public class BspServiceMaster<I extends 
         // Find the missing workers if there are only a few
         if ((maxWorkers - totalResponses) <=
             partitionLongTailMinPrint) {
-          Set<Integer> partitionSet = new TreeSet<Integer>();
-          for (WorkerInfo workerInfo : healthyWorkerInfoList) {
-            partitionSet.add(workerInfo.getTaskId());
-          }
-          for (WorkerInfo workerInfo : unhealthyWorkerInfoList) {
-            partitionSet.add(workerInfo.getTaskId());
-          }
-          for (int i = 1; i <= maxWorkers; ++i) {
-            if (partitionSet.contains(Integer.valueOf(i))) {
-              continue;
-            } else if (i == getTaskPartition()) {
-              continue;
-            } else {
-              LOG.info("checkWorkers: No response from " +
-                  "partition " + i + " (could be master)");
-            }
-          }
+          logMissingWorkersOnSuperstep(healthyWorkerInfoList,
+              unhealthyWorkerInfoList);
         }
       }
       ++pollAttempt;
@@ -477,6 +462,8 @@ public class BspServiceMaster<I extends 
     if (healthyWorkerInfoList.size() < minWorkers) {
       LOG.error("checkWorkers: Only " + healthyWorkerInfoList.size() +
           " available when " + minWorkers + " are required.");
+      logMissingWorkersOnSuperstep(healthyWorkerInfoList,
+          unhealthyWorkerInfoList);
       return null;
     }
 
@@ -488,6 +475,36 @@ public class BspServiceMaster<I extends 
     return healthyWorkerInfoList;
   }
 
+  /**
+   * Log, at info level, the partitions that have not responded on the superstep
+   *
+   * @param healthyWorkerInfoList Healthy worker list
+   * @param unhealthyWorkerInfoList Unhealthy worker list
+   */
+  private void logMissingWorkersOnSuperstep(
+      List<WorkerInfo> healthyWorkerInfoList,
+      List<WorkerInfo> unhealthyWorkerInfoList) {
+    if (LOG.isInfoEnabled()) {
+      Set<Integer> partitionSet = new TreeSet<Integer>();
+      for (WorkerInfo workerInfo : healthyWorkerInfoList) {
+        partitionSet.add(workerInfo.getTaskId());
+      }
+      for (WorkerInfo workerInfo : unhealthyWorkerInfoList) {
+        partitionSet.add(workerInfo.getTaskId());
+      }
+      for (int i = 1; i <= maxWorkers; ++i) {
+        if (partitionSet.contains(Integer.valueOf(i))) {
+          continue;
+        } else if (i == getTaskPartition()) {
+          continue;
+        } else {
+          LOG.info("logMissingWorkersOnSuperstep: No response from " +
+              "partition " + i + " (could be master)");
+        }
+      }
+    }
+  }
+
   @Override
   public int createInputSplits() {
     // Only the 'master' should be doing this.  Wait until the number of
@@ -1048,22 +1065,29 @@ public class BspServiceMaster<I extends 
     // Find the last good checkpoint if none have been written to the
     // knowledge of this master
     if (lastCheckpointedSuperstep == -1) {
-      FileStatus[] fileStatusArray =
-          getFs().listStatus(new Path(checkpointBasePath),
-              new FinalizedCheckpointPathFilter());
-      if (fileStatusArray == null) {
-        return -1;
-      }
-      Arrays.sort(fileStatusArray);
-      lastCheckpointedSuperstep = getCheckpoint(
-          fileStatusArray[fileStatusArray.length - 1].getPath());
-      if (LOG.isInfoEnabled()) {
-        LOG.info("getLastGoodCheckpoint: Found last good checkpoint " +
-            lastCheckpointedSuperstep + " from " +
-            fileStatusArray[fileStatusArray.length - 1].
-            getPath().toString());
+      try {
+        FileStatus[] fileStatusArray =
+            getFs().listStatus(new Path(checkpointBasePath),
+                new FinalizedCheckpointPathFilter());
+        if (fileStatusArray == null) {
+          return -1;
+        }
+        Arrays.sort(fileStatusArray);
+        lastCheckpointedSuperstep = getCheckpoint(
+            fileStatusArray[fileStatusArray.length - 1].getPath());
+        if (LOG.isInfoEnabled()) {
+          LOG.info("getLastGoodCheckpoint: Found last good checkpoint " +
+              lastCheckpointedSuperstep + " from " +
+              fileStatusArray[fileStatusArray.length - 1].
+                  getPath().toString());
+        }
+      } catch (IOException e) {
+        LOG.fatal("getLastGoodCheckpoint: No last good checkpoints can be " +
+            "found, killing the job.", e);
+        failJob();
       }
     }
+
     return lastCheckpointedSuperstep;
   }
 
@@ -1458,13 +1482,14 @@ public class BspServiceMaster<I extends 
     // and the master can do any final cleanup if the ZooKeeper service was
     // provided (not dynamically started) and we don't want to keep the data
     try {
-      if (getConfiguration().get(GiraphConfiguration.ZOOKEEPER_LIST) != null &&
+      if (getConfiguration().getZookeeperList() != null &&
           !getConfiguration().getBoolean(
               GiraphConfiguration.KEEP_ZOOKEEPER_DATA,
               GiraphConfiguration.KEEP_ZOOKEEPER_DATA_DEFAULT)) {
         if (LOG.isInfoEnabled()) {
           LOG.info("cleanupZooKeeper: Removing the following path " +
-              "and all children - " + basePath);
+              "and all children - " + basePath + " from ZooKeeper list " +
+              getConfiguration().getZookeeperList());
         }
         getZkExt().deleteExt(basePath, -1, true);
       }

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1394826&r1=1394825&r2=1394826&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java Fri Oct  5 21:37:16 2012
@@ -314,12 +314,11 @@ public class GraphMapper<I extends Writa
         LOG.info("setup: classpath @ " + zkClasspath + " for job " +
             context.getJobName());
       }
-      context.getConfiguration().set(
-          GiraphConfiguration.ZOOKEEPER_JAR, zkClasspath);
+      conf.setZooKeeperJar(zkClasspath);
     }
     String serverPortList = conf.getZookeeperList();
     if (serverPortList == null) {
-      zkManager = new ZooKeeperManager(context);
+      zkManager = new ZooKeeperManager(context, conf);
       context.setStatus("setup: Setting up Zookeeper manager.");
       zkManager.setup();
       if (zkManager.computationDone()) {
@@ -578,7 +577,18 @@ public class GraphMapper<I extends Writa
             context);
       }
       cleanup(context);
-    } catch (IOException e) {
+      // Checkstyle exception due to needing to dump ZooKeeper failure
+      // on exception
+      // CHECKSTYLE: stop IllegalCatch
+    } catch (RuntimeException e) {
+      // CHECKSTYLE: resume IllegalCatch
+      if (mapFunctions == MapFunctions.UNKNOWN ||
+          mapFunctions == MapFunctions.MASTER_ZOOKEEPER_ONLY) {
+        // ZooKeeper may have had an issue
+        if (zkManager != null) {
+          zkManager.logZooKeeperOutput(Level.WARN);
+        }
+      }
       if (mapFunctions == MapFunctions.WORKER_ONLY) {
         serviceWorker.failureCleanup();
       }

Modified: giraph/trunk/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java?rev=1394826&r1=1394825&r2=1394826&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java Fri Oct  5 21:37:16 2012
@@ -21,13 +21,16 @@ package org.apache.giraph.zk;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.io.Closeables;
+import java.util.LinkedList;
 import org.apache.commons.io.FileUtils;
 import org.apache.giraph.GiraphConfiguration;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.server.quorum.QuorumPeerMain;
 
@@ -68,7 +71,7 @@ public class ZooKeeperManager {
   /** Job context (mainly for progress) */
   private Mapper<?, ?, ?, ?>.Context context;
   /** Hadoop configuration */
-  private final Configuration conf;
+  private final ImmutableClassesGiraphConfiguration conf;
   /** Task partition, to ensure uniqueness */
   private final int taskPartition;
   /** HDFS base directory for all file-based coordination */
@@ -96,13 +99,13 @@ public class ZooKeeperManager {
   /** Thread that gets the zkProcess output */
   private StreamCollector zkProcessCollector = null;
   /** ZooKeeper local file system directory */
-  private String zkDir = null;
+  private final String zkDir;
   /** ZooKeeper config file path */
-  private String configFilePath = null;
+  private final String configFilePath;
   /** ZooKeeper server list */
   private final Map<String, Integer> zkServerPortMap = Maps.newTreeMap();
   /** ZooKeeper base port */
-  private int zkBasePort = -1;
+  private final int zkBasePort;
   /** Final ZooKeeper server port list (for clients) */
   private String zkServerPortString;
   /** My hostname */
@@ -126,13 +129,15 @@ public class ZooKeeperManager {
   /**
    * Constructor with context.
    *
-   * @param context Context to be stord internally
+   * @param context Context to be stored internally
+   * @param configuration Configuration
    * @throws IOException
    */
-  public ZooKeeperManager(Mapper<?, ?, ?, ?>.Context context)
+  public ZooKeeperManager(Mapper<?, ?, ?, ?>.Context context,
+                          ImmutableClassesGiraphConfiguration configuration)
     throws IOException {
     this.context = context;
-    conf = context.getConfiguration();
+    this.conf = configuration;
     taskPartition = conf.getInt("mapred.task.partition", -1);
     jobId = conf.get("mapred.job.id", "Unknown Job");
     baseDirectory =
@@ -201,11 +206,14 @@ public class ZooKeeperManager {
    * Collects the output of a stream and dumps it to the log.
    */
   private static class StreamCollector extends Thread {
+    /** Number of last lines to keep */
+    private static final int LAST_LINES_COUNT = 100;
     /** Class logger */
     private static final Logger LOG = Logger.getLogger(StreamCollector.class);
-    /** Input stream to dump */
-    private final InputStream is;
-
+    /** Buffered reader of input stream */
+    private final BufferedReader bufferedReader;
+    /** Last lines (help to debug failures) */
+    private final LinkedList<String> lastLines = Lists.newLinkedList();
     /**
      * Constructor.
      *
@@ -213,22 +221,49 @@ public class ZooKeeperManager {
      */
     public StreamCollector(final InputStream is) {
       super(StreamCollector.class.getName());
-      this.is = is;
+      setDaemon(true);
+      InputStreamReader streamReader = new InputStreamReader(is);
+      bufferedReader = new BufferedReader(streamReader);
     }
 
     @Override
     public void run() {
-      InputStreamReader streamReader = new InputStreamReader(is);
-      BufferedReader bufferedReader = new BufferedReader(streamReader);
+      readLines();
+    }
+
+    /**
+     * Read all the lines from the bufferedReader.
+     */
+    private synchronized void readLines() {
       String line;
       try {
         while ((line = bufferedReader.readLine()) != null) {
+          if (lastLines.size() > LAST_LINES_COUNT) {
+            lastLines.removeFirst();
+          }
+          lastLines.add(line);
+
           if (LOG.isDebugEnabled()) {
-            LOG.debug("run: " + line);
+            LOG.debug("readLines: " + line);
           }
         }
       } catch (IOException e) {
-        LOG.error("run: Ignoring IOException", e);
+        LOG.error("readLines: Ignoring IOException", e);
+      }
+    }
+
+    /**
+     * Dump the last n lines of the collector.  Likely used in
+     * the case of failure.
+     *
+     * @param level Log level to dump with
+     */
+    public synchronized void dumpLastLines(Level level) {
+      // Get any remaining lines
+      readLines();
+      // Dump the lines to the log at the requested level
+      for (String line : lastLines) {
+        LOG.log(level, line);
       }
     }
   }
@@ -527,19 +562,18 @@ public class ZooKeeperManager {
         writer.write("maxClientCnxns=" +
             GiraphConfiguration.DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS +
             "\n");
-        int minSessionTimeout = conf.getInt(
-            GiraphConfiguration.ZOOKEEPER_MIN_SESSION_TIMEOUT,
-            GiraphConfiguration.DEFAULT_ZOOKEEPER_MIN_SESSION_TIMEOUT);
-        writer.write("minSessionTimeout=" + minSessionTimeout + "\n");
+        writer.write("minSessionTimeout=" +
+            conf.getZooKeeperMinSessionTimeout() + "\n");
         writer.write("maxSessionTimeout=" +
-            GiraphConfiguration.DEFAULT_ZOOKEEPER_MAX_SESSION_TIMEOUT +
-            "\n");
+            conf.getZooKeeperMaxSessionTimeout() + "\n");
         writer.write("initLimit=" +
             GiraphConfiguration.DEFAULT_ZOOKEEPER_INIT_LIMIT + "\n");
         writer.write("syncLimit=" +
             GiraphConfiguration.DEFAULT_ZOOKEEPER_SYNC_LIMIT + "\n");
         writer.write("snapCount=" +
             GiraphConfiguration.DEFAULT_ZOOKEEPER_SNAP_COUNT + "\n");
+        writer.write("forceSync=" + conf.getZooKeeperForceSync() + "\n");
+        writer.write("skipACL=" + conf.getZooKeeperSkipAcl() + "\n");
         if (serverList.size() != 1) {
           writer.write("electionAlg=0\n");
           for (int i = 0; i < serverList.size(); ++i) {
@@ -624,17 +658,28 @@ public class ZooKeeperManager {
         }
         Runnable runnable = new Runnable() {
           public void run() {
+            LOG.info("run: Shutdown hook started.");
             synchronized (this) {
               if (zkProcess != null) {
                 LOG.warn("onlineZooKeeperServers: " +
                          "Forced a shutdown hook kill of the " +
                          "ZooKeeper process.");
                 zkProcess.destroy();
+                int exitCode = -1;
+                try {
+                  exitCode = zkProcess.waitFor();
+                } catch (InterruptedException e) {
+                  LOG.warn("run: Couldn't get exit code.");
+                }
+                LOG.info("onlineZooKeeperServers: ZooKeeper process exited " +
+                    "with " + exitCode + " (note that 143 " +
+                    "typically means killed).");
               }
             }
           }
         };
         Runtime.getRuntime().addShutdownHook(new Thread(runnable));
+        LOG.info("onlineZooKeeperServers: Shutdown hook added.");
       } catch (IOException e) {
         LOG.error("onlineZooKeeperServers: Failed to start " +
             "ZooKeeper process", e);
@@ -644,7 +689,8 @@ public class ZooKeeperManager {
       // Once the server is up and running, notify that this server is up
       // and running by dropping a ready stamp.
       int connectAttempts = 0;
-      final int maxConnectAttempts = 10;
+      final int maxConnectAttempts =
+          conf.getZookeeperConnectionAttempts();
       while (connectAttempts < maxConnectAttempts) {
         try {
           if (LOG.isInfoEnabled()) {
@@ -811,7 +857,7 @@ public class ZooKeeperManager {
     }
     synchronized (this) {
       if (zkProcess != null) {
-        int totalMapTasks = conf.getInt("mapred.map.tasks", -1);
+        int totalMapTasks = conf.getMapTasks();
         waitUntilAllTasksDone(totalMapTasks);
         zkProcess.destroy();
         int exitValue = -1;
@@ -850,4 +896,18 @@ public class ZooKeeperManager {
       return zkProcess != null;
     }
   }
+
+  /**
+   * Log the ZooKeeper output from the process (if it was started)
+   *
+   * @param level Log level to print at
+   */
+  public void logZooKeeperOutput(Level level) {
+    if (zkProcessCollector != null) {
+      LOG.log(level, "logZooKeeperOutput: Dumping up to last " +
+          StreamCollector.LAST_LINES_COUNT +
+          " lines of the ZooKeeper process STDOUT and STDERR.");
+      zkProcessCollector.dumpLastLines(level);
+    }
+  }
 }