You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ed...@apache.org on 2016/06/15 21:50:52 UTC

git commit: updated refs/heads/trunk to 916763e

Repository: giraph
Updated Branches:
  refs/heads/trunk 5068b6f00 -> 916763e74


GIRAPH-1068 Make Zookeeper accept 0 as a port number and let it choose any available free port

Summary:
We have a few use cases where having zookeeper bound to a specific port is very inconvenient:
1) Unit tests that run in parallel.
2) Shared clusters where multiple giraph instances can run on the same machines.

In theory we don't need to know which port zookeeper will run on; in most cases any available port is fine.
Picking any available port is already supported by the server socket, but not by the code that parses zookeeper configs (that code lives in zookeeper itself).
We don't have to parse configs, though, because we can run zookeeper in process, and in that case we have full control over how zookeeper is initialized.

For this task I want to allow 0 as the zookeeper port number, which will let us run zookeeper on any available port. I will also remove the "out of process" zookeeper runner, as it clearly provides no benefit to us.

Note: it will still be possible to use an external zookeeper if you have one running somewhere as a service.

Note 2: this change intentionally removes support for running multiple zk servers as part of the Giraph job; only a single zk server is supported. If you want to run multiple zookeepers, configure them separately and let Giraph use the existing zookeeper quorum.

Test Plan:
tested a few things:
picking any available port
-Dgiraph.zkServerPort=0
using external zookeeper:
-Dgiraph.zkList="hadoopXXX.YYY.facebook.com:22181"
using specified port:
-Dgiraph.zkServerPort=22128

Reviewers: majakabiljo, maja.kabiljo, dionysis.logothetis, avery.ching, heslami

Reviewed By: avery.ching, heslami

Differential Revision: https://reviews.facebook.net/D59109


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/916763e7
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/916763e7
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/916763e7

Branch: refs/heads/trunk
Commit: 916763e747b0f2d3bf2dcda76100dbc0d5714a9c
Parents: 5068b6f
Author: Sergey Edunov <ed...@fb.com>
Authored: Wed Jun 15 14:50:44 2016 -0700
Committer: Sergey Edunov <ed...@fb.com>
Committed: Wed Jun 15 14:50:44 2016 -0700

----------------------------------------------------------------------
 .../org/apache/giraph/bsp/BspInputFormat.java   |   3 +-
 .../java/org/apache/giraph/bsp/BspService.java  |   5 +-
 .../apache/giraph/conf/GiraphConfiguration.java |  18 --
 .../org/apache/giraph/conf/GiraphConstants.java |  38 +--
 .../apache/giraph/graph/GraphTaskManager.java   |  25 +-
 .../giraph/utils/InternalVertexRunner.java      | 145 ++--------
 .../apache/giraph/yarn/GiraphYarnClient.java    |  13 -
 .../giraph/zk/InProcessZooKeeperRunner.java     | 135 ++++++----
 .../giraph/zk/OutOfProcessZooKeeperRunner.java  | 227 ----------------
 .../org/apache/giraph/zk/ZooKeeperManager.java  | 263 ++++++-------------
 .../org/apache/giraph/zk/ZooKeeperRunner.java   |   5 +-
 .../org/apache/giraph/zk/ZookeeperConfig.java   | 105 ++++++++
 12 files changed, 322 insertions(+), 660 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/916763e7/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputFormat.java
index 8f88c80..310f66c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspInputFormat.java
@@ -55,8 +55,7 @@ public class BspInputFormat extends InputFormat<Text, Text> {
     // if this is a YARN job, separate ZK should already be running
     boolean isYarnJob = GiraphConstants.IS_PURE_YARN_JOB.get(conf);
     if (splitMasterWorker && !isYarnJob) {
-      int zkServers = GiraphConstants.ZOOKEEPER_SERVER_COUNT.get(conf);
-      maxTasks += zkServers;
+      maxTasks += 1;
     }
     if (LOG.isDebugEnabled()) {
       LOG.debug("getMaxTasks: Max workers = " + maxWorkers +

http://git-wip-us.apache.org/repos/asf/giraph/blob/916763e7/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
index c73f441..fc0fa95 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
@@ -89,9 +89,6 @@ public abstract class BspService<I extends WritableComparable,
   public static final String MASTER_ELECTION_DIR = "/_masterElectionDir";
   /** Superstep scope */
   public static final String SUPERSTEP_DIR = "/_superstepDir";
-  /** Where the merged aggregators are located */
-  public static final String MERGED_AGGREGATOR_DIR =
-      "/_mergedAggregatorDir";
   /** Healthy workers register here. */
   public static final String WORKER_HEALTHY_DIR = "/_workerHealthyDir";
   /** Unhealthy workers register here. */
@@ -275,7 +272,7 @@ public abstract class BspService<I extends WritableComparable,
         getCheckpointBasePath(getConfiguration(), getJobId());
 
     masterElectionPath = basePath + MASTER_ELECTION_DIR;
-    String serverPortList = conf.getZookeeperList();
+    String serverPortList = graphTaskManager.getZookeeperList();
     haltComputationPath = basePath + HALT_COMPUTATION_NODE;
     memoryObserverPath = basePath + MEMORY_OBSERVER_DIR;
     getContext().getCounter(GiraphConstants.ZOOKEEPER_HALT_NODE_COUNTER_GROUP,

http://git-wip-us.apache.org/repos/asf/giraph/blob/916763e7/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index df79b7f..7f1cb2b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -834,16 +834,6 @@ public class GiraphConfiguration extends Configuration
     LOCAL_TEST_MODE.set(this, flag);
   }
 
-  /**
-   * The number of server tasks in our ZK quorum for
-   * this job run.
-   *
-   * @return the number of ZK servers in the quorum
-   */
-  public int getZooKeeperServerCount() {
-    return ZOOKEEPER_SERVER_COUNT.get(this);
-  }
-
   public int getZooKeeperSessionTimeout() {
     return ZOOKEEPER_SESSION_TIMEOUT.get(this);
   }
@@ -913,14 +903,6 @@ public class GiraphConfiguration extends Configuration
     return ZOOKEEPER_MAX_SESSION_TIMEOUT.get(this);
   }
 
-  public boolean getZooKeeperForceSync() {
-    return ZOOKEEPER_FORCE_SYNC.get(this);
-  }
-
-  public boolean getZooKeeperSkipAcl() {
-    return ZOOKEEPER_SKIP_ACL.get(this);
-  }
-
   /**
    * Get the number of map tasks in this job
    *

http://git-wip-us.apache.org/repos/asf/giraph/blob/916763e7/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index da0a453..c592a12 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -533,11 +533,6 @@ public interface GiraphConstants {
       new IntConfOption("giraph.zkServerlistPollMsecs", SECONDS.toMillis(3),
           "Polling interval to check for the ZooKeeper server data");
 
-  /** Number of nodes (not tasks) to run Zookeeper on */
-  IntConfOption ZOOKEEPER_SERVER_COUNT =
-      new IntConfOption("giraph.zkServerCount", 1,
-          "Number of nodes (not tasks) to run Zookeeper on");
-
   /** ZooKeeper port to use */
   IntConfOption ZOOKEEPER_SERVER_PORT =
       new IntConfOption("giraph.zkServerPort", 22181, "ZooKeeper port to use");
@@ -558,15 +553,6 @@ public interface GiraphConstants {
           "Msecs to wait before retrying a failed ZooKeeper op due to " +
           "connection loss.");
 
-  /**
-   * Should start zookeeper inside master java process or separately?
-   * In process by default.
-   */
-  BooleanConfOption ZOOKEEEPER_RUNS_IN_PROCESS = new BooleanConfOption(
-      "giraph.zkRunsInProcess",
-      true, "If true run zookeeper in master process, if false starts " +
-      "separate process for zookeeper");
-
   /** TCP backlog (defaults to number of workers) */
   IntConfOption TCP_BACKLOG = new IntConfOption("giraph.tcpBacklog", 1,
       "TCP backlog (defaults to number of workers)");
@@ -898,13 +884,6 @@ public interface GiraphConstants {
    */
   String PARTITION_VERTEX_KEY_SPACE_SIZE = "giraph.vertexKeySpaceSize";
 
-  /** Java opts passed to ZooKeeper startup */
-  StrConfOption ZOOKEEPER_JAVA_OPTS =
-      new StrConfOption("giraph.zkJavaOpts",
-          "-Xmx512m -XX:ParallelGCThreads=4 -XX:+UseConcMarkSweepGC " +
-          "-XX:CMSInitiatingOccupancyFraction=70 -XX:MaxGCPauseMillis=100",
-          "Java opts passed to ZooKeeper startup");
-
   /**
    *  How often to checkpoint (i.e. 0, means no checkpoint,
    *  1 means every superstep, 2 is every two supersteps, etc.).
@@ -1004,14 +983,12 @@ public interface GiraphConstants {
 
   /** Default ZooKeeper tick time. */
   int DEFAULT_ZOOKEEPER_TICK_TIME = 6000;
-  /** Default ZooKeeper init limit (in ticks). */
-  int DEFAULT_ZOOKEEPER_INIT_LIMIT = 10;
-  /** Default ZooKeeper sync limit (in ticks). */
-  int DEFAULT_ZOOKEEPER_SYNC_LIMIT = 5;
-  /** Default ZooKeeper snap count. */
-  int DEFAULT_ZOOKEEPER_SNAP_COUNT = 50000;
   /** Default ZooKeeper maximum client connections. */
   int DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS = 10000;
+  /** Number of snapshots to be retained after purge */
+  int ZOOKEEPER_SNAP_RETAIN_COUNT = 3;
+  /** Zookeeper purge interval in hours */
+  int ZOOKEEPER_PURGE_INTERVAL = 1;
   /** ZooKeeper minimum session timeout */
   IntConfOption ZOOKEEPER_MIN_SESSION_TIMEOUT =
       new IntConfOption("giraph.zKMinSessionTimeout", MINUTES.toMillis(10),
@@ -1020,13 +997,6 @@ public interface GiraphConstants {
   IntConfOption ZOOKEEPER_MAX_SESSION_TIMEOUT =
       new IntConfOption("giraph.zkMaxSessionTimeout", MINUTES.toMillis(15),
           "ZooKeeper maximum session timeout");
-  /** ZooKeeper force sync */
-  BooleanConfOption ZOOKEEPER_FORCE_SYNC =
-      new BooleanConfOption("giraph.zKForceSync", false,
-          "ZooKeeper force sync");
-  /** ZooKeeper skip ACLs */
-  BooleanConfOption ZOOKEEPER_SKIP_ACL =
-      new BooleanConfOption("giraph.ZkSkipAcl", true, "ZooKeeper skip ACLs");
 
   /**
    * Whether to use SASL with DIGEST and Hadoop Job Tokens to authenticate

http://git-wip-us.apache.org/repos/asf/giraph/blob/916763e7/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 87d5248..725d327 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -256,11 +256,6 @@ end[PURE_YARN]*/
     context
         .setStatus("setup: Connected to Zookeeper service " + serverPortList);
     this.graphFunctions = determineGraphFunctions(conf, zkManager);
-    // Sometimes it takes a while to get multiple ZooKeeper servers up
-    if (conf.getZooKeeperServerCount() > 1) {
-      Thread.sleep(GiraphConstants.DEFAULT_ZOOKEEPER_INIT_LIMIT *
-        GiraphConstants.DEFAULT_ZOOKEEPER_TICK_TIME);
-    }
     try {
       instantiateBspService();
     } catch (IOException e) {
@@ -447,7 +442,7 @@ end[PURE_YARN]*/
       done = true;
       return true;
     }
-    zkManager.onlineZooKeeperServers();
+    zkManager.onlineZooKeeperServer();
     String serverPortList = zkManager.getZooKeeperServerPortString();
     conf.setZookeeperList(serverPortList);
     createZooKeeperCounter(serverPortList);
@@ -597,8 +592,7 @@ end[PURE_YARN]*/
       }
     } else {
       if (zkAlreadyProvided) {
-        int masterCount = conf.getZooKeeperServerCount();
-        if (taskPartition < masterCount) {
+        if (taskPartition == 0) {
           functions = GraphFunctions.MASTER_ONLY;
         } else {
           functions = GraphFunctions.WORKER_ONLY;
@@ -1064,6 +1058,21 @@ end[PURE_YARN]*/
   }
 
   /**
+   * Returns a list of zookeeper servers to connect to.
+   * If the port is set to 0 and Giraph is starting a single
+   * ZooKeeper server, then Zookeeper will pick its own port.
+   * Otherwise, the ZooKeeper port set by the user will be used.
+   * @return host:port,host:port for each zookeeper
+   */
+  public String getZookeeperList() {
+    if (zkManager != null) {
+      return zkManager.getZooKeeperServerPortString();
+    } else {
+      return conf.getZookeeperList();
+    }
+  }
+
+  /**
    * Default handler for uncaught exceptions.
    * It will do the best to clean up and then will terminate current giraph job.
    */

http://git-wip-us.apache.org/repos/asf/giraph/blob/916763e7/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
index bb2865c..90a5859 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
@@ -26,23 +26,18 @@ import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.io.formats.GiraphFileInputFormat;
 import org.apache.giraph.io.formats.InMemoryVertexOutputFormat;
 import org.apache.giraph.job.GiraphJob;
+import org.apache.giraph.zk.InProcessZooKeeperRunner;
+import org.apache.giraph.zk.ZookeeperConfig;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.log4j.Logger;
-import org.apache.zookeeper.server.ServerConfig;
-import org.apache.zookeeper.server.ZooKeeperServerMain;
-import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
 
 import java.io.File;
 import java.io.IOException;
-import java.net.ServerSocket;
-import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import java.net.InetSocketAddress;
 
 /**
  * A base class for running internal tests on a vertex
@@ -56,10 +51,6 @@ import java.util.concurrent.TimeUnit;
  */
 @SuppressWarnings("unchecked")
 public class InternalVertexRunner {
-  /** Range of ZooKeeper ports to use for tests */
-  public static final int LOCAL_ZOOKEEPER_PORT_FROM = 22182;
-  /** Range of ZooKeeper ports to use for tests */
-  public static final int LOCAL_ZOOKEEPER_PORT_TO = 65535;
 
   /** Logger */
   private static final Logger LOG =
@@ -85,42 +76,30 @@ public class InternalVertexRunner {
   }
 
   /**
-   * Run the standalone ZooKeeper process and the job.
+   * Run the ZooKeeper in-process and the job.
    *
-   * @param quorumPeerConfig Quorum peer configuration
+   * @param zookeeperConfig Quorum peer configuration
    * @param giraphJob Giraph job to run
    * @return True if successful, false otherwise
    */
-  private static boolean runZooKeeperAndJob(QuorumPeerConfig quorumPeerConfig,
-                                            GiraphJob giraphJob) {
-    final InternalZooKeeper zookeeper = new InternalZooKeeper();
-    final ServerConfig zkConfig = new ServerConfig();
-    zkConfig.readFrom(quorumPeerConfig);
-
-    ExecutorService executorService = Executors.newSingleThreadExecutor();
-    executorService.execute(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          zookeeper.runFromConfig(zkConfig);
-        } catch (IOException e) {
-          throw new RuntimeException(e);
-        }
-      }
-    });
+  private static boolean runZooKeeperAndJob(
+      final ZookeeperConfig zookeeperConfig,
+      GiraphJob giraphJob) throws IOException {
+    final InProcessZooKeeperRunner.ZooKeeperServerRunner zookeeper =
+        new InProcessZooKeeperRunner.ZooKeeperServerRunner();
+
+    int port = zookeeper.start(zookeeperConfig);
+
+    LOG.info("Started test zookeeper on port " + port);
+    GiraphConstants.ZOOKEEPER_LIST.set(giraphJob.getConfiguration(),
+        "localhost:" + port);
     try {
       return giraphJob.run(true);
     } catch (InterruptedException |
         ClassNotFoundException | IOException e) {
       LOG.error("runZooKeeperAndJob: Got exception on running", e);
     } finally {
-      zookeeper.end();
-      executorService.shutdown();
-      try {
-        executorService.awaitTermination(1, TimeUnit.MINUTES);
-      } catch (InterruptedException e) {
-        LOG.error("runZooKeeperAndJob: Interrupted on waiting", e);
-      }
+      zookeeper.stop();
     }
 
     return false;
@@ -192,13 +171,9 @@ public class InternalVertexRunner {
       FileUtils.writeLines(edgeInputFile, edgeInputData);
     }
 
-    int localZookeeperPort = findAvailablePort();
-
     conf.setWorkerConfiguration(1, 1, 100.0f);
     GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false);
     GiraphConstants.LOCAL_TEST_MODE.set(conf, true);
-    conf.setZookeeperList("localhost:" +
-          String.valueOf(localZookeeperPort));
 
     conf.set(GiraphConstants.ZOOKEEPER_DIR, zkDir.toString());
     GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(conf,
@@ -226,10 +201,7 @@ public class InternalVertexRunner {
         new Path(outputDir.toString()));
 
     // Configure a local zookeeper instance
-    Properties zkProperties = configLocalZooKeeper(zkDir, localZookeeperPort);
-
-    QuorumPeerConfig qpConfig = new QuorumPeerConfig();
-    qpConfig.parseProperties(zkProperties);
+    ZookeeperConfig qpConfig = configLocalZooKeeper(zkDir);
 
     boolean success = runZooKeeperAndJob(qpConfig, job);
     if (!success) {
@@ -308,27 +280,17 @@ public class InternalVertexRunner {
 
     InMemoryVertexInputFormat.setGraph(graph);
 
-    int localZookeeperPort = findAvailablePort();
-
     conf.setWorkerConfiguration(1, 1, 100.0f);
     GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false);
     GiraphConstants.LOCAL_TEST_MODE.set(conf, true);
-    GiraphConstants.ZOOKEEPER_LIST.set(conf, "localhost:" +
-          String.valueOf(localZookeeperPort));
+    GiraphConstants.ZOOKEEPER_SERVER_PORT.set(conf, 0);
 
     conf.set(GiraphConstants.ZOOKEEPER_DIR, zkDir.toString());
     GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(conf,
         zkMgrDir.toString());
     GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, checkpointsDir);
 
-    // Configure a local zookeeper instance
-    Properties zkProperties = configLocalZooKeeper(zkDir, localZookeeperPort);
-
-    QuorumPeerConfig qpConfig = new QuorumPeerConfig();
-    qpConfig.parseProperties(zkProperties);
-
-    runZooKeeperAndJob(qpConfig, job);
-
+    runZooKeeperAndJob(configLocalZooKeeper(zkDir), job);
   }
 
   /**
@@ -391,66 +353,15 @@ public class InternalVertexRunner {
    * Configuration options for running local ZK.
    *
    * @param zkDir directory for ZK to hold files in.
-   * @param zookeeperPort port zookeeper will listen on
-   * @return Properties configured for local ZK.
+   * @return zookeeper configuration object
    */
-  private static Properties configLocalZooKeeper(File zkDir,
-                                                 int zookeeperPort) {
-    Properties zkProperties = new Properties();
-    zkProperties.setProperty("tickTime", "2000");
-    zkProperties.setProperty("dataDir", zkDir.getAbsolutePath());
-    zkProperties.setProperty("clientPort",
-        String.valueOf(zookeeperPort));
-    zkProperties.setProperty("maxClientCnxns", "10000");
-    zkProperties.setProperty("minSessionTimeout", "10000");
-    zkProperties.setProperty("maxSessionTimeout", "100000");
-    zkProperties.setProperty("initLimit", "10");
-    zkProperties.setProperty("syncLimit", "5");
-    zkProperties.setProperty("snapCount", "50000");
-    return zkProperties;
+  private static ZookeeperConfig configLocalZooKeeper(File zkDir) {
+    ZookeeperConfig config = new ZookeeperConfig();
+    config.setMaxSessionTimeout(100000);
+    config.setMinSessionTimeout(10000);
+    config.setClientPortAddress(new InetSocketAddress("localhost", 0));
+    config.setDataDir(zkDir.getAbsolutePath());
+    return config;
   }
 
-  /**
-   * Scans for available port. Returns first port where
-   * we can open server socket.
-   * Note: if another process opened port with SO_REUSEPORT then this
-   * function may return port that is in use. It actually happens
-   * with NetCat on Mac.
-   * @return available port
-   */
-  private static int findAvailablePort() {
-    for (int port = LOCAL_ZOOKEEPER_PORT_FROM;
-         port < LOCAL_ZOOKEEPER_PORT_TO; port++) {
-      ServerSocket ss = null;
-      try {
-        ss = new ServerSocket(port);
-        ss.setReuseAddress(true);
-        return port;
-      } catch (IOException e) {
-        LOG.info("findAvailablePort: port " + port + " is in use.");
-      } finally {
-        if (ss != null && !ss.isClosed()) {
-          try {
-            ss.close();
-          } catch (IOException e) {
-            LOG.info("findAvailablePort: can't close test socket", e);
-          }
-        }
-      }
-    }
-    throw new RuntimeException("No port found in the range [ " +
-        LOCAL_ZOOKEEPER_PORT_FROM + ", " + LOCAL_ZOOKEEPER_PORT_TO + ")");
-  }
-
-  /**
-   * Extension of {@link ZooKeeperServerMain} that allows programmatic shutdown
-   */
-  private static class InternalZooKeeper extends ZooKeeperServerMain {
-    /**
-     * Shutdown the ZooKeeper instance.
-     */
-    void end() {
-      shutdown();
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/916763e7/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnClient.java b/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnClient.java
index 9f5924d..fb4c18d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnClient.java
@@ -444,19 +444,6 @@ public class GiraphYarnClient {
   }
 
   /**
-   * Check if the job's configuration is for a local run. These can all be
-   * removed as we expand the functionality of the "pure YARN" Giraph profile.
-   */
-  private void checkJobLocalZooKeeperSupported() {
-    final boolean isZkExternal = giraphConf.isZookeeperExternal();
-    final String checkZkList = giraphConf.getZookeeperList();
-    if (!isZkExternal || checkZkList.isEmpty()) {
-      throw new IllegalArgumentException("Giraph on YARN does not currently" +
-          "support Giraph-managed ZK instances: use a standalone ZooKeeper.");
-    }
-  }
-
-  /**
    * Register all local jar files from GiraphConstants.GIRAPH_YARN_LIBJARS
    * in the LocalResources map, copy to HDFS on that same registered path.
    * @param map the LocalResources list to populate.

http://git-wip-us.apache.org/repos/asf/giraph/blob/916763e7/giraph-core/src/main/java/org/apache/giraph/zk/InProcessZooKeeperRunner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/zk/InProcessZooKeeperRunner.java b/giraph-core/src/main/java/org/apache/giraph/zk/InProcessZooKeeperRunner.java
index 5556216..9502c24 100644
--- a/giraph-core/src/main/java/org/apache/giraph/zk/InProcessZooKeeperRunner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/zk/InProcessZooKeeperRunner.java
@@ -18,15 +18,17 @@
 package org.apache.giraph.zk;
 
 import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.conf.GiraphConstants;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.jmx.ManagedUtil;
 import org.apache.zookeeper.server.DatadirCleanupManager;
-import org.apache.zookeeper.server.ServerConfig;
-import org.apache.zookeeper.server.ZooKeeperServerMain;
-import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.quorum.QuorumPeerMain;
 
 import javax.management.JMException;
+import java.io.File;
 import java.io.IOException;
 
 /**
@@ -45,21 +47,13 @@ public class InProcessZooKeeperRunner
   private QuorumRunner quorumRunner = new QuorumRunner();
 
   @Override
-  public void start(String zkDir, final String configFilePath) {
-    Thread zkThread = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          quorumRunner.start(configFilePath);
-        } catch (IOException e) {
-          LOG.error("Unable to start zookeeper", e);
-        } catch (QuorumPeerConfig.ConfigException e) {
-          LOG.error("Invalid config, zookeeper failed", e);
-        }
-      }
-    });
-    zkThread.setDaemon(true);
-    zkThread.start();
+  public int start(String zkDir, final ZookeeperConfig config) {
+    try {
+      return quorumRunner.start(config);
+    } catch (IOException e) {
+      LOG.error("Unable to start zookeeper", e);
+    }
+    return -1;
   }
 
   @Override
@@ -89,30 +83,21 @@ public class InProcessZooKeeperRunner
 
     /**
      * Starts quorum and/or zookeeper service.
-     * @param configFilePath quorum and zookeeper configuration
-     * @throws IOException
-     * @throws QuorumPeerConfig.ConfigException if config
-     * is not formatted properly
+     * @param config quorum and zookeeper configuration
+     * @return zookeeper port
+     * @throws IOException if can't start zookeeper
      */
-    public void start(String configFilePath) throws IOException,
-        QuorumPeerConfig.ConfigException {
-      QuorumPeerConfig quorumPeerConfig = new QuorumPeerConfig();
-      quorumPeerConfig.parse(configFilePath);
+    public int start(ZookeeperConfig config) throws IOException {
       // Start and schedule the the purge task
       DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
-          quorumPeerConfig
-          .getDataDir(), quorumPeerConfig.getDataLogDir(), quorumPeerConfig
-          .getSnapRetainCount(), quorumPeerConfig.getPurgeInterval());
+          config
+          .getDataDir(), config.getDataLogDir(),
+          GiraphConstants.ZOOKEEPER_SNAP_RETAIN_COUNT,
+          GiraphConstants.ZOOKEEPER_PURGE_INTERVAL);
       purgeMgr.start();
 
-      if (quorumPeerConfig.getServers().size() > 0) {
-        runFromConfig(quorumPeerConfig);
-      } else {
-        serverRunner = new ZooKeeperServerRunner();
-        serverRunner.start(configFilePath);
-      }
-
-      LOG.info("Initialization ended");
+      serverRunner = new ZooKeeperServerRunner();
+      return serverRunner.start(config);
     }
 
     /**
@@ -134,35 +119,89 @@ public class InProcessZooKeeperRunner
   /**
    * Wrapper around zookeeper service.
    */
-  private static class ZooKeeperServerRunner extends ZooKeeperServerMain {
+  public static class ZooKeeperServerRunner  {
+    /**
+     * Reference to zookeeper factory.
+     */
+    private ServerCnxnFactory cnxnFactory;
+    /**
+     * Reference to zookeeper server.
+     */
+    private ZooKeeperServer zkServer;
 
     /**
      * Start zookeeper service.
-     * @param configFilePath zookeeper configuration file
-     * @throws QuorumPeerConfig.ConfigException if config file is not
+     * @param config zookeeper configuration
      * formatted properly
+     * @return the port zookeeper has started on.
      * @throws IOException
      */
-    public void start(String configFilePath) throws
-        QuorumPeerConfig.ConfigException, IOException {
-      LOG.warn("Either no config or no quorum defined in config, running " +
-          " in standalone mode");
+    public int start(ZookeeperConfig config) throws IOException {
+      LOG.warn("Either no config or no quorum defined in config, " +
+          "running in process");
       try {
         ManagedUtil.registerLog4jMBeans();
       } catch (JMException e) {
         LOG.warn("Unable to register log4j JMX control", e);
       }
 
-      ServerConfig serverConfig = new ServerConfig();
-      serverConfig.parse(configFilePath);
-      runFromConfig(serverConfig);
+      runFromConfig(config);
+      Thread zkThread = new Thread(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            cnxnFactory.join();
+            if (zkServer.isRunning()) {
+              zkServer.shutdown();
+            }
+          } catch (InterruptedException e) {
+            LOG.error(e.getMessage(), e);
+          }
+
+        }
+      });
+      zkThread.setDaemon(true);
+      zkThread.start();
+      return zkServer.getClientPort();
     }
 
+
+    /**
+     * Run from a ServerConfig.
+     * @param config ServerConfig to use.
+     * @throws IOException
+     */
+    public void runFromConfig(ZookeeperConfig config) throws IOException {
+      LOG.info("Starting server");
+      try {
+        // Note that this thread isn't going to be doing anything else,
+        // so rather than spawning another thread, we will just call
+        // run() in this thread.
+        // create a file logger url from the command line args
+        zkServer = new ZooKeeperServer();
+
+        FileTxnSnapLog ftxn = new FileTxnSnapLog(new
+            File(config.getDataLogDir()), new File(config.getDataDir()));
+        zkServer.setTxnLogFactory(ftxn);
+        zkServer.setTickTime(GiraphConstants.DEFAULT_ZOOKEEPER_TICK_TIME);
+        zkServer.setMinSessionTimeout(config.getMinSessionTimeout());
+        zkServer.setMaxSessionTimeout(config.getMaxSessionTimeout());
+        cnxnFactory = ServerCnxnFactory.createFactory();
+        cnxnFactory.configure(config.getClientPortAddress(),
+            GiraphConstants.DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS);
+        cnxnFactory.startup(zkServer);
+      } catch (InterruptedException e) {
+        // warn, but generally this is ok
+        LOG.warn("Server interrupted", e);
+      }
+    }
+
+
     /**
      * Stop zookeeper service.
      */
     public void stop() {
-      shutdown();
+      cnxnFactory.shutdown();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/916763e7/giraph-core/src/main/java/org/apache/giraph/zk/OutOfProcessZooKeeperRunner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/zk/OutOfProcessZooKeeperRunner.java b/giraph-core/src/main/java/org/apache/giraph/zk/OutOfProcessZooKeeperRunner.java
deleted file mode 100644
index c86a199..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/zk/OutOfProcessZooKeeperRunner.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.zk;
-
-import com.google.common.collect.Lists;
-import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
-import org.apache.giraph.conf.GiraphConstants;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.server.quorum.QuorumPeerMain;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.nio.charset.Charset;
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * Zookeeper wrapper that starts zookeeper in the separate process (old way).
- */
-public class OutOfProcessZooKeeperRunner
-    extends DefaultImmutableClassesGiraphConfigurable
-    implements ZooKeeperRunner {
-
-  /** Class logger */
-  private static final Logger LOG =
-      Logger.getLogger(OutOfProcessZooKeeperRunner.class);
-
-  /** ZooKeeper process */
-  private Process zkProcess;
-  /** Thread that gets the zkProcess output */
-  private StreamCollector zkProcessCollector = null;
-  /** Synchronization lock for zkProcess */
-  private final Object processLock = new Object();
-
-  @Override
-  public void start(String zkDir, String configFilePath) {
-    try {
-      ProcessBuilder processBuilder = new ProcessBuilder();
-      List<String> commandList = Lists.newArrayList();
-      String javaHome = System.getProperty("java.home");
-      if (javaHome == null) {
-        throw new IllegalArgumentException(
-            "onlineZooKeeperServers: java.home is not set!");
-      }
-      commandList.add(javaHome + "/bin/java");
-      commandList.add("-cp");
-      commandList.add(System.getProperty("java.class.path"));
-      String zkJavaOptsString =
-          GiraphConstants.ZOOKEEPER_JAVA_OPTS.get(getConf());
-      String[] zkJavaOptsArray = zkJavaOptsString.split(" ");
-      if (zkJavaOptsArray != null) {
-        commandList.addAll(Arrays.asList(zkJavaOptsArray));
-      }
-      commandList.add(QuorumPeerMain.class.getName());
-      commandList.add(configFilePath);
-      processBuilder.command(commandList);
-      File execDirectory = new File(zkDir);
-      processBuilder.directory(execDirectory);
-      processBuilder.redirectErrorStream(true);
-      if (LOG.isInfoEnabled()) {
-        LOG.info("onlineZooKeeperServers: Attempting to " +
-            "start ZooKeeper server with command " + commandList +
-            " in directory " + execDirectory.toString());
-      }
-      synchronized (processLock) {
-        zkProcess = processBuilder.start();
-        zkProcessCollector =
-            new StreamCollector(zkProcess.getInputStream());
-        zkProcessCollector.start();
-      }
-      Runnable runnable = new Runnable() {
-        public void run() {
-          LOG.info("run: Shutdown hook started.");
-          synchronized (processLock) {
-            if (zkProcess != null) {
-              LOG.warn("onlineZooKeeperServers: " +
-                  "Forced a shutdown hook kill of the " +
-                  "ZooKeeper process.");
-              zkProcess.destroy();
-              int exitCode = -1;
-              try {
-                exitCode = zkProcess.waitFor();
-              } catch (InterruptedException e) {
-                LOG.warn("run: Couldn't get exit code.");
-              }
-              LOG.info("onlineZooKeeperServers: ZooKeeper process exited " +
-                  "with " + exitCode + " (note that 143 " +
-                  "typically means killed).");
-            }
-          }
-        }
-      };
-      Runtime.getRuntime().addShutdownHook(new Thread(runnable));
-      LOG.info("onlineZooKeeperServers: Shutdown hook added.");
-    } catch (IOException e) {
-      LOG.error("onlineZooKeeperServers: Failed to start " +
-          "ZooKeeper process", e);
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public void stop() {
-    zkProcess.destroy();
-    int exitValue = -1;
-    try {
-      zkProcessCollector.join();
-      exitValue = zkProcess.waitFor();
-    } catch (InterruptedException e) {
-      LOG.warn("offlineZooKeeperServers: " +
-              "InterruptedException, but continuing ",
-          e);
-    }
-    if (LOG.isInfoEnabled()) {
-      LOG.info("offlineZooKeeperServers: waitFor returned " +
-          exitValue);
-    }
-  }
-
-  @Override
-  public void cleanup() {
-    logZooKeeperOutput(Level.WARN);
-  }
-
-
-  /**
-   * Collects the output of a stream and dumps it to the log.
-   */
-  private static class StreamCollector extends Thread {
-    /** Number of last lines to keep */
-    private static final int LAST_LINES_COUNT = 100;
-    /** Class logger */
-    private static final Logger LOG = Logger.getLogger(StreamCollector.class);
-    /** Buffered reader of input stream */
-    private final BufferedReader bufferedReader;
-    /** Last lines (help to debug failures) */
-    private final LinkedList<String> lastLines = Lists.newLinkedList();
-    /**
-     * Constructor.
-     *
-     * @param is InputStream to dump to LOG.info
-     */
-    public StreamCollector(final InputStream is) {
-      super(StreamCollector.class.getName());
-      setDaemon(true);
-      InputStreamReader streamReader = new InputStreamReader(is,
-          Charset.defaultCharset());
-      bufferedReader = new BufferedReader(streamReader);
-    }
-
-    @Override
-    public void run() {
-      readLines();
-    }
-
-    /**
-     * Read all the lines from the bufferedReader.
-     */
-    private synchronized void readLines() {
-      String line;
-      try {
-        while ((line = bufferedReader.readLine()) != null) {
-          if (lastLines.size() > LAST_LINES_COUNT) {
-            lastLines.removeFirst();
-          }
-          lastLines.add(line);
-
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("readLines: " + line);
-          }
-        }
-      } catch (IOException e) {
-        LOG.error("readLines: Ignoring IOException", e);
-      }
-    }
-
-    /**
-     * Dump the last n lines of the collector.  Likely used in
-     * the case of failure.
-     *
-     * @param level Log level to dump with
-     */
-    public synchronized void dumpLastLines(Level level) {
-      // Get any remaining lines
-      readLines();
-      // Dump the lines to the screen
-      for (String line : lastLines) {
-        LOG.log(level, line);
-      }
-    }
-  }
-
-
-  /**
-   * Log the zookeeper output from the process (if it was started)
-   *
-   * @param level Log level to print at
-   */
-  public void logZooKeeperOutput(Level level) {
-    if (zkProcessCollector != null) {
-      LOG.log(level, "logZooKeeperOutput: Dumping up to last " +
-          StreamCollector.LAST_LINES_COUNT +
-          " lines of the ZooKeeper process STDOUT and STDERR.");
-      zkProcessCollector.dumpLastLines(level);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/916763e7/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
index 63b521c..cac4315 100644
--- a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
@@ -18,8 +18,6 @@
 
 package org.apache.giraph.zk;
 
-import com.google.common.collect.Maps;
-import com.google.common.io.Closeables;
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.commons.io.FileUtils;
 import org.apache.giraph.conf.GiraphConstants;
@@ -34,19 +32,15 @@ import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.Logger;
 
 import java.io.File;
-import java.io.FileWriter;
 import java.io.IOException;
-import java.io.Writer;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import static com.google.common.base.Preconditions.checkState;
 import static org.apache.giraph.conf.GiraphConstants.BASE_ZNODE_KEY;
 import static org.apache.giraph.conf.GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY;
 
@@ -85,8 +79,6 @@ public class ZooKeeperManager {
   private final Path myClosedPath;
   /** Polling msecs timeout */
   private final int pollMsecs;
-  /** Server count */
-  private final int serverCount;
   /** File system */
   private final FileSystem fs;
   /** Zookeeper wrapper */
@@ -94,22 +86,19 @@ public class ZooKeeperManager {
   /** ZooKeeper local file system directory */
   private final String zkDir;
-  /** ZooKeeper config file path */
+  /** ZooKeeper server configuration */
-  private final String configFilePath;
-  /** ZooKeeper server list */
-  private final Map<String, Integer> zkServerPortMap = Maps.newTreeMap();
+  private final ZookeeperConfig config;
+  /** ZooKeeper server host */
+  private String zkServerHost;
+  /** ZooKeeper server task */
+  private int zkServerTask;
   /** ZooKeeper base port */
-  private final int zkBasePort;
+  private int zkBasePort;
   /** Final ZooKeeper server port list (for clients) */
   private String zkServerPortString;
   /** My hostname */
   private String myHostname = null;
   /** Job id, to ensure uniqueness */
   private final String jobId;
-  /**
-   * Default local ZooKeeper prefix directory to use (where ZooKeeper server
-   * files will go)
-   */
-  private final String zkDirDefault;
   /** Time object for tracking timeouts */
   private final Time time = SystemTime.get();
 
@@ -145,8 +134,8 @@ public class ZooKeeperManager {
     myClosedPath = new Path(taskDirectory,
         (new ComputationDoneName(taskPartition)).getName());
     pollMsecs = GiraphConstants.ZOOKEEPER_SERVERLIST_POLL_MSECS.get(conf);
-    serverCount = GiraphConstants.ZOOKEEPER_SERVER_COUNT.get(conf);
     String jobLocalDir = conf.get("job.local.dir");
+    String zkDirDefault;
     if (jobLocalDir != null) { // for non-local jobs
       zkDirDefault = jobLocalDir +
           "/_bspZooKeeper";
@@ -155,7 +144,7 @@ public class ZooKeeperManager {
               ZOOKEEPER_MANAGER_DIRECTORY.getDefaultValue();
     }
     zkDir = conf.get(GiraphConstants.ZOOKEEPER_DIR, zkDirDefault);
-    configFilePath = zkDir + "/zoo.cfg";
+    config = new ZookeeperConfig();
     zkBasePort = GiraphConstants.ZOOKEEPER_SERVER_PORT.get(conf);
 
     myHostname = conf.getLocalHostname();
@@ -315,58 +304,32 @@ public class ZooKeeperManager {
    */
   private void createZooKeeperServerList() throws IOException,
       InterruptedException {
-    int candidateRetrievalAttempt = 0;
-    Map<String, Integer> hostnameTaskMap = Maps.newTreeMap();
-    while (true) {
+    String host = null;
+    int port = 0;
+    while (host == null) {
       FileStatus [] fileStatusArray = fs.listStatus(taskDirectory);
-      hostnameTaskMap.clear();
       if (fileStatusArray.length > 0) {
-        for (FileStatus fileStatus : fileStatusArray) {
-          String[] hostnameTaskArray =
-              fileStatus.getPath().getName().split(
-                  HOSTNAME_TASK_SEPARATOR);
-          if (hostnameTaskArray.length != 2) {
-            throw new RuntimeException(
-                "getZooKeeperServerList: Task 0 failed " +
-                    "to parse " +
-                    fileStatus.getPath().getName());
-          }
-          if (!hostnameTaskMap.containsKey(hostnameTaskArray[0])) {
-            hostnameTaskMap.put(hostnameTaskArray[0],
-                Integer.valueOf(hostnameTaskArray[1]));
-          }
-        }
-        if (LOG.isInfoEnabled()) {
-          LOG.info("getZooKeeperServerList: Got " +
-              hostnameTaskMap.keySet() + " " +
-              hostnameTaskMap.size() + " hosts from " +
-              fileStatusArray.length + " candidates when " +
-              serverCount + " required (polling period is " +
-              pollMsecs + ") on attempt " +
-              candidateRetrievalAttempt);
-        }
-
-        if (hostnameTaskMap.size() >= serverCount) {
-          break;
-        }
-        ++candidateRetrievalAttempt;
+        checkState(fileStatusArray.length == 1,
+            "createZooKeeperServerList: too many " +
+            "status files found " + Arrays.toString(fileStatusArray));
+        FileStatus fileStatus = fileStatusArray[0];
+        String[] hostnameTaskArray =
+            fileStatus.getPath().getName().split(
+                HOSTNAME_TASK_SEPARATOR);
+        checkState(hostnameTaskArray.length == 2,
+            "createZooKeeperServerList: Task 0 failed " +
+            "to parse " + fileStatus.getPath().getName());
+        host = hostnameTaskArray[0];
+        port = Integer.parseInt(hostnameTaskArray[1]);
         Thread.sleep(pollMsecs);
       }
     }
-    StringBuffer serverListFile =
-        new StringBuffer(ZOOKEEPER_SERVER_LIST_FILE_PREFIX);
-    int numServers = 0;
-    for (Map.Entry<String, Integer> hostnameTask :
-      hostnameTaskMap.entrySet()) {
-      serverListFile.append(hostnameTask.getKey() +
-          HOSTNAME_TASK_SEPARATOR + hostnameTask.getValue() +
-          HOSTNAME_TASK_SEPARATOR);
-      if (++numServers == serverCount) {
-        break;
-      }
-    }
+    String serverListFile =
+        ZOOKEEPER_SERVER_LIST_FILE_PREFIX + host +
+        HOSTNAME_TASK_SEPARATOR + port +
+        HOSTNAME_TASK_SEPARATOR;
     Path serverListPath =
-        new Path(baseDirectory, serverListFile.toString());
+        new Path(baseDirectory, serverListFile);
     if (LOG.isInfoEnabled()) {
       LOG.info("createZooKeeperServerList: Creating the final " +
           "ZooKeeper file '" + serverListPath + "'");
@@ -432,33 +395,25 @@ public class ZooKeeperManager {
 
     }
 
-    List<String> serverHostList = Arrays.asList(serverListFile.substring(
+    String[] serverHostList = serverListFile.substring(
         ZOOKEEPER_SERVER_LIST_FILE_PREFIX.length()).split(
-            HOSTNAME_TASK_SEPARATOR));
+            HOSTNAME_TASK_SEPARATOR);
     if (LOG.isInfoEnabled()) {
-      LOG.info("getZooKeeperServerList: Found " + serverHostList + " " +
-          serverHostList.size() +
+      LOG.info("getZooKeeperServerList: Found " +
+          Arrays.toString(serverHostList) +
           " hosts in filename '" + serverListFile + "'");
     }
-    if (serverHostList.size() != serverCount * 2) {
-      throw new IllegalStateException(
-          "getZooKeeperServerList: Impossible " +
-              " that " + serverHostList.size() +
-              " != 2 * " +
-              serverCount + " asked for.");
-    }
 
-    for (int i = 0; i < serverHostList.size(); i += 2) {
-      zkServerPortMap.put(serverHostList.get(i),
-        Integer.parseInt(serverHostList.get(i + 1)));
-    }
-    zkServerPortString = "";
-    for (String server : zkServerPortMap.keySet()) {
-      if (zkServerPortString.length() > 0) {
-        zkServerPortString += ",";
-      }
-      zkServerPortString += server + ":" + zkBasePort;
-    }
+    zkServerHost = serverHostList[0];
+    zkServerTask = Integer.parseInt(serverHostList[1]);
+    updateZkPortString();
+  }
+
+  /**
+   * Recompute the "host:port" string clients use to reach ZooKeeper.
+   */
+  private void updateZkPortString() {
+    zkServerPortString = String.valueOf(zkServerHost) + ':' + zkBasePort;
   }
 
   /**
@@ -473,96 +428,32 @@ public class ZooKeeperManager {
-   * Whoever is elected to be a ZooKeeper server must generate a config file
+   * Whoever is elected to be a ZooKeeper server must generate its config
    * locally.
    *
-   * @param serverList List of ZooKeeper servers.
    */
-  private void generateZooKeeperConfigFile(List<String> serverList) {
+  private void generateZooKeeperConfig() {
     if (LOG.isInfoEnabled()) {
-      LOG.info("generateZooKeeperConfigFile: Creating file " +
-          configFilePath + " in " + zkDir + " with base port " +
+      LOG.info("generateZooKeeperConfig: with base port " +
           zkBasePort);
     }
-    try {
-      File zkDirFile = new File(this.zkDir);
-      boolean mkDirRet = zkDirFile.mkdirs();
-      if (LOG.isInfoEnabled()) {
-        LOG.info("generateZooKeeperConfigFile: Make directory of " +
-            zkDirFile.getName() + " = " + mkDirRet);
-      }
-      File configFile = new File(configFilePath);
-      boolean deletedRet = configFile.delete();
-      if (LOG.isInfoEnabled()) {
-        LOG.info("generateZooKeeperConfigFile: Delete of " +
-            configFile.getName() + " = " + deletedRet);
-      }
-      if (!configFile.createNewFile()) {
-        throw new IllegalStateException(
-            "generateZooKeeperConfigFile: Failed to " +
-                "create config file " + configFile.getName());
-      }
-      // Make writable by everybody
-      if (!configFile.setWritable(true, false)) {
-        throw new IllegalStateException(
-            "generateZooKeeperConfigFile: Failed to make writable " +
-                configFile.getName());
-      }
-
-      Writer writer = null;
-      try {
-        writer = new FileWriter(configFilePath);
-        writer.write("tickTime=" +
-            GiraphConstants.DEFAULT_ZOOKEEPER_TICK_TIME + "\n");
-        writer.write("dataDir=" + this.zkDir + "\n");
-        writer.write("clientPort=" + zkBasePort + "\n");
-        writer.write("maxClientCnxns=" +
-            GiraphConstants.DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS +
-            "\n");
-        writer.write("minSessionTimeout=" +
-            conf.getZooKeeperMinSessionTimeout() + "\n");
-        writer.write("maxSessionTimeout=" +
-            conf.getZooKeeperMaxSessionTimeout() + "\n");
-        writer.write("initLimit=" +
-            GiraphConstants.DEFAULT_ZOOKEEPER_INIT_LIMIT + "\n");
-        writer.write("syncLimit=" +
-            GiraphConstants.DEFAULT_ZOOKEEPER_SYNC_LIMIT + "\n");
-        writer.write("snapCount=" +
-            GiraphConstants.DEFAULT_ZOOKEEPER_SNAP_COUNT + "\n");
-        writer.write("forceSync=" +
-            (conf.getZooKeeperForceSync() ? "yes" : "no") + "\n");
-        writer.write("skipACL=" +
-            (conf.getZooKeeperSkipAcl() ? "yes" : "no") + "\n");
-        if (serverList.size() != 1) {
-          writer.write("electionAlg=0\n");
-          for (int i = 0; i < serverList.size(); ++i) {
-            writer.write("server." + i + "=" + serverList.get(i) +
-                ":" + (zkBasePort + 1) +
-                ":" + (zkBasePort + 2) + "\n");
-            if (myHostname.equals(serverList.get(i))) {
-              Writer myidWriter = null;
-              try {
-                myidWriter = new FileWriter(zkDir + "/myid");
-                myidWriter.write(i + "\n");
-              } finally {
-                Closeables.close(myidWriter, true);
-              }
-            }
-          }
-        }
-      } finally {
-        Closeables.close(writer, true);
-      }
-    } catch (IOException e) {
-      throw new IllegalStateException(
-          "generateZooKeeperConfigFile: Failed to write file", e);
+    File zkDirFile = new File(this.zkDir);
+    boolean mkDirRet = zkDirFile.mkdirs();
+    if (LOG.isInfoEnabled()) {
+      LOG.info("generateZooKeeperConfig: Make directory of " +
+          zkDirFile.getName() + " = " + mkDirRet);
     }
+    // Populate the config that is later handed to ZooKeeperRunner.start().
+    config.setDataDir(zkDir);
+    config.setDataLogDir(zkDir);
+    config.setClientPortAddress(new InetSocketAddress(zkBasePort));
+    config.setMinSessionTimeout(conf.getZooKeeperMinSessionTimeout());
+    config.setMaxSessionTimeout(conf.getZooKeeperMaxSessionTimeout());
   }
 
   /**
    * If this task has been selected, online a ZooKeeper server.  Otherwise,
    * wait until this task knows that the ZooKeeper servers have been onlined.
    */
-  public void onlineZooKeeperServers() {
-    Integer taskId = zkServerPortMap.get(myHostname);
-    if ((taskId != null) && (taskId == taskPartition)) {
+  public void onlineZooKeeperServer() {
+    if (zkServerTask == taskPartition) {
       File zkDirFile = new File(this.zkDir);
       try {
         if (LOG.isInfoEnabled()) {
@@ -574,10 +465,14 @@ public class ZooKeeperManager {
         LOG.warn("onlineZooKeeperServers: Failed to delete " +
             "directory " + this.zkDir, e);
       }
-      generateZooKeeperConfigFile(new ArrayList<>(zkServerPortMap.keySet()));
+      generateZooKeeperConfig();
       synchronized (this) {
         zkRunner = createRunner();
-        zkRunner.start(zkDir, configFilePath);
+        int port = zkRunner.start(zkDir, config);
+        if (port > 0) {
+          zkBasePort = port;
+          updateZkPortString();
+        }
       }
 
       // Once the server is up and running, notify that this server is up
@@ -630,7 +525,8 @@ public class ZooKeeperManager {
       }
       Path myReadyPath = new Path(
           serverDirectory, myHostname +
-          HOSTNAME_TASK_SEPARATOR + taskPartition);
+          HOSTNAME_TASK_SEPARATOR + taskPartition +
+          HOSTNAME_TASK_SEPARATOR + zkBasePort);
       try {
         if (LOG.isInfoEnabled()) {
           LOG.info("onlineZooKeeperServers: Creating my filestamp " +
@@ -642,39 +538,37 @@ public class ZooKeeperManager {
             "task failed) to create filestamp " + myReadyPath, e);
       }
     } else {
-      List<String> foundList = new ArrayList<>();
       int readyRetrievalAttempt = 0;
+      String foundServer = null;
       while (true) {
         try {
           FileStatus [] fileStatusArray =
               fs.listStatus(serverDirectory);
-          foundList.clear();
           if ((fileStatusArray != null) &&
               (fileStatusArray.length > 0)) {
             for (int i = 0; i < fileStatusArray.length; ++i) {
               String[] hostnameTaskArray =
                   fileStatusArray[i].getPath().getName().split(
                       HOSTNAME_TASK_SEPARATOR);
-              if (hostnameTaskArray.length != 2) {
+              if (hostnameTaskArray.length != 3) {
                 throw new RuntimeException(
                     "getZooKeeperServerList: Task 0 failed " +
                         "to parse " +
                         fileStatusArray[i].getPath().getName());
               }
-              foundList.add(hostnameTaskArray[0]);
+              foundServer = hostnameTaskArray[0];
+              zkBasePort = Integer.parseInt(hostnameTaskArray[2]);
+              updateZkPortString();
             }
             if (LOG.isInfoEnabled()) {
               LOG.info("onlineZooKeeperServers: Got " +
-                  foundList + " " +
-                  foundList.size() + " hosts from " +
-                  fileStatusArray.length +
-                  " ready servers when " +
-                  serverCount +
-                  " required (polling period is " +
+                  foundServer + " on port " +
+                  zkBasePort +
+                  " (polling period is " +
                   pollMsecs + ") on attempt " +
                   readyRetrievalAttempt);
             }
-            if (foundList.containsAll(zkServerPortMap.keySet())) {
+            if (zkServerHost.equals(foundServer)) {
               break;
             }
           } else {
@@ -808,12 +702,7 @@ public class ZooKeeperManager {
    * @return either in process or out of process wrapper.
    */
   private ZooKeeperRunner createRunner() {
-    ZooKeeperRunner runner;
-    if (GiraphConstants.ZOOKEEEPER_RUNS_IN_PROCESS.get(conf)) {
-      runner = new InProcessZooKeeperRunner();
-    } else {
-      runner = new OutOfProcessZooKeeperRunner();
-    }
+    ZooKeeperRunner runner = new InProcessZooKeeperRunner();
     runner.setConf(conf);
     return runner;
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/916763e7/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperRunner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperRunner.java b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperRunner.java
index 4c13a25..2797047 100644
--- a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperRunner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperRunner.java
@@ -30,9 +30,10 @@ public interface ZooKeeperRunner extends ImmutableClassesGiraphConfigurable {
    * Starts zookeeper service in specified working directory with
-   * specified config file.
+   * specified configuration.
    * @param zkDir working directory
-   * @param configFilePath path to the config file
+   * @param config zookeeper configuration
+   * @return port zookeeper listens on (OS-chosen if port 0 was requested)
    */
-  void start(String zkDir, String configFilePath);
+  int start(String zkDir, ZookeeperConfig config);
 
   /**
    * Stops zookeeper.

http://git-wip-us.apache.org/repos/asf/giraph/blob/916763e7/giraph-core/src/main/java/org/apache/giraph/zk/ZookeeperConfig.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/zk/ZookeeperConfig.java b/giraph-core/src/main/java/org/apache/giraph/zk/ZookeeperConfig.java
new file mode 100644
index 0000000..2aeb192
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/zk/ZookeeperConfig.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.zk;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Mutable, in-memory ZooKeeper server configuration.
+ * Originally copied from zookeeper sources to allow
+ * modification on the fly instead of reading from disk.
+ */
+public class ZookeeperConfig {
+
+  /** Zookeeper server address */
+  private InetSocketAddress clientPortAddress;
+  /** Snapshot log dir */
+  private String dataDir;
+  /** Transaction log dir */
+  private String dataLogDir;
+  /** minimum session timeout in milliseconds (-1 if not set) */
+  private int minSessionTimeout = -1;
+  /** maximum session timeout in milliseconds (-1 if not set) */
+  private int maxSessionTimeout = -1;
+
+  /**
+   * Get zookeeper server address
+   * @return zookeeper server address
+   */
+  public InetSocketAddress getClientPortAddress() { return clientPortAddress; }
+
+  /**
+   * Snapshot dir
+   * @return snapshot dir path
+   */
+  public String getDataDir() { return dataDir; }
+
+  /**
+   * Transaction dir
+   * @return transaction dir path, or the snapshot dir if none was set
+   */
+  public String getDataLogDir() {
+    if (dataLogDir == null) {
+      return dataDir;
+    }
+    return dataLogDir;
+  }
+
+  /**
+   * Minimum session timeout
+   * @return minimum session timeout in milliseconds, -1 if not set
+   */
+  public int getMinSessionTimeout() { return minSessionTimeout; }
+
+  /**
+   * Maximum session timeout
+   * @return maximum session timeout in milliseconds, -1 if not set
+   */
+  public int getMaxSessionTimeout() { return maxSessionTimeout; }
+
+  /**
+   * Set snapshot log dir
+   * @param dataDir snapshot log dir path
+   */
+  public void setDataDir(String dataDir) {
+    this.dataDir = dataDir;
+  }
+
+  /**
+   * Set transaction log dir; when null, the snapshot dir is used instead
+   * @param dataLogDir transaction log dir path
+   */
+  public void setDataLogDir(String dataLogDir) {
+    this.dataLogDir = dataLogDir;
+  }
+
+  /**
+   * Set zookeeper server address
+   * @param clientPortAddress server address
+   */
+  public void setClientPortAddress(InetSocketAddress clientPortAddress) {
+    this.clientPortAddress = clientPortAddress;
+  }
+
+  /**
+   * Set minimum session timeout in milliseconds
+   * @param minSessionTimeout min session timeout
+   */
+  public void setMinSessionTimeout(int minSessionTimeout) {
+    this.minSessionTimeout = minSessionTimeout;
+  }
+
+  /**
+   * Set maximum session timeout in milliseconds
+   * @param maxSessionTimeout max session timeout
+   */
+  public void setMaxSessionTimeout(int maxSessionTimeout) {
+    this.maxSessionTimeout = maxSessionTimeout;
+  }
+
+}