Posted to commits@zookeeper.apache.org by nk...@apache.org on 2020/10/13 12:43:21 UTC

[zookeeper] branch master updated: ZOOKEEPER-3948: Introduce a deterministic runtime behavior injection framework for ZooKeeperServer testing.

This is an automated email from the ASF dual-hosted git repository.

nkalmar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 4432f5b  ZOOKEEPER-3948: Introduce a deterministic runtime behavior injection framework for ZooKeeperServer testing.
4432f5b is described below

commit 4432f5b44359f56007f1ea5be5de2a9829c97e5c
Author: Michael Han <ha...@apache.org>
AuthorDate: Tue Oct 13 14:43:12 2020 +0200

    ZOOKEEPER-3948: Introduce a deterministic runtime behavior injection framework for ZooKeeperServer testing.
    
    We'd like to understand how applications built on top of ZooKeeper behave under various faulty conditions, which is important for building resilient end-to-end solutions and for keeping ZooKeeper from becoming a single point of failure. We'd also like to achieve this in both unit tests (in process) and integration tests (in and out of process). Traditional external fault injection mechanisms are non-deterministic, require non-trivial setup, and are hard to integrate with unit tests, so h [...]
    
    The basic idea here is to create a controllable ZooKeeperServer that accepts various control commands (such as delay request, drop request, eat request, expire session, shutdown, trigger leader election, and so on) and reacts to the incoming commands. The controllable server and the production server share the same underlying machinery (quorum peers, ZooKeeper server, etc.), but their code paths are separate, so this feature has no production impact.
    
    This controller system is currently composed of the following pieces:
    
    CommandClient: a convenience HTTP client that sends control commands to the controller service.
    CommandListener: an embedded HTTP server that listens for incoming commands and dispatches them to the controller service.
    Controller Service: the service responsible for creating the controllable ZK server and the controller.
    ZooKeeperServerController: the controller that changes the behavior of the ZK server at runtime.
    Controllable Cnx / Factory: a controllable connection (and its factory) that accepts behavior change requests.
    
    In the future, more control commands and controllable components can be added on top of this framework.
    
    This can be used either in unit tests / integration tests as an in-process embedded controllable ZooKeeper server, or as an out-of-process standalone controllable ZooKeeper process.
    
    Author: Michael Han <ha...@apache.org>
    
    Reviewers: Enrico Olivelli <eo...@apache.org>, Norbert Kalmar <nk...@apache.org>
    
    Closes #1467 from hanm/ZOOKEEPER-3948
---
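Note on the counted commands (fail requests, eat responses): the sketch below illustrates the counter convention that ControllableConnectionFactory in this diff appears to follow, where 0 means behave normally, a positive value affects the next N requests, and a negative value affects every request until reset. The class name FailureBudget and its method names are hypothetical and are not part of the commit.

```java
public class FailureBudget {
    // 0 = normal behavior, > 0 = fail the next N requests, < 0 = fail all requests.
    private long remaining = 0;

    public synchronized void failNext(long n) {
        remaining = n;
    }

    public synchronized void failAll() {
        remaining = -1;
    }

    public synchronized void reset() {
        remaining = 0;
    }

    // Returns true if the next request should fail, consuming one unit of a finite budget.
    public synchronized boolean shouldFailNext() {
        if (remaining == 0) {
            return false;
        }
        if (remaining > 0) {
            remaining--;
        }
        return true;
    }

    public static void main(String[] args) {
        FailureBudget budget = new FailureBudget();
        budget.failNext(2);
        System.out.println(budget.shouldFailNext()); // true
        System.out.println(budget.shouldFailNext()); // true
        System.out.println(budget.shouldFailNext()); // false, budget exhausted
    }
}
```

Because the budget is consumed one request at a time, a test can fail exactly N requests and then observe normal behavior again without timing-dependent waits, which is where the determinism comes from.
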
 pom.xml                                            |   5 +
 zookeeper-server/pom.xml                           |   5 +
 .../org/apache/zookeeper/server/NIOServerCnxn.java |   4 +-
 .../zookeeper/server/NIOServerCnxnFactory.java     |   2 +-
 .../zookeeper/server/controller/CommandClient.java | 131 +++++++
 .../server/controller/CommandListener.java         |  96 ++++++
 .../server/controller/ControlCommand.java          | 122 +++++++
 .../server/controller/ControllableConnection.java  |  81 +++++
 .../controller/ControllableConnectionFactory.java  | 125 +++++++
 .../server/controller/ControllerServerConfig.java  | 156 +++++++++
 .../server/controller/ControllerService.java       | 167 +++++++++
 .../controller/ZooKeeperServerController.java      | 168 +++++++++
 .../apache/zookeeper/server/quorum/QuorumPeer.java |  62 ++++
 .../server/controller/ControlCommandTest.java      |  85 +++++
 .../controller/ControllerClientServerTest.java     | 109 ++++++
 .../server/controller/ControllerConfigTest.java    | 150 ++++++++
 .../server/controller/ControllerTestBase.java      |  78 +++++
 .../ZooKeeperServerControllerEndToEndTest.java     | 384 +++++++++++++++++++++
 18 files changed, 1927 insertions(+), 3 deletions(-)
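
Note on the command URI format: control commands travel as the path of an HTTP request, in the form command/ACTION[/parameter]. The sketch below mirrors the build and parse logic of ControlCommand.createCommandUri and ControlCommand.parseUri from this diff without depending on ZooKeeper. The class CommandUriSketch and its methods are hypothetical, and actions are modeled as plain strings rather than the Action enum.

```java
import java.util.Locale;

public class CommandUriSketch {
    static final String ENDPOINT_PREFIX = "command/";

    // Build "command/ACTION" or "command/ACTION/parameter".
    public static String build(String action, String parameter) {
        boolean hasParam = parameter != null && !parameter.isEmpty();
        return ENDPOINT_PREFIX + action + (hasParam ? "/" + parameter : "");
    }

    // Parse a path into {ACTION, parameter}; parameter is null when absent.
    public static String[] parse(String uri) {
        if (uri == null || !uri.startsWith(ENDPOINT_PREFIX)) {
            throw new IllegalArgumentException("Missing required prefix: " + ENDPOINT_PREFIX);
        }
        String rest = uri.substring(ENDPOINT_PREFIX.length());
        int sep = rest.indexOf('/');
        String name = sep < 0 ? rest : rest.substring(0, sep);
        String param = sep < 0 ? null : rest.substring(sep + 1);
        // Action names are matched case-insensitively by upper-casing them.
        return new String[] {name.toUpperCase(Locale.ROOT), param};
    }

    public static void main(String[] args) {
        System.out.println(build("ADDDELAY", "500")); // command/ADDDELAY/500
        String[] parsed = parse("command/ping");
        System.out.println(parsed[0] + " " + parsed[1]); // PING null
    }
}
```

Since the listener is plain HTTP, any HTTP client can issue such a request against the controller port; CommandClient is a convenience wrapper around the same URI.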

diff --git a/pom.xml b/pom.xml
index 066b886..031e67f 100755
--- a/pom.xml
+++ b/pom.xml
@@ -602,6 +602,11 @@
         <version>${jetty.version}</version>
       </dependency>
       <dependency>
+        <groupId>org.eclipse.jetty</groupId>
+        <artifactId>jetty-client</artifactId>
+        <version>${jetty.version}</version>
+      </dependency>
+      <dependency>
         <groupId>io.dropwizard.metrics</groupId>
         <artifactId>metrics-core</artifactId>
         <version>${dropwizard.version}</version>
diff --git a/zookeeper-server/pom.xml b/zookeeper-server/pom.xml
index 296083f..68155fd 100755
--- a/zookeeper-server/pom.xml
+++ b/zookeeper-server/pom.xml
@@ -89,6 +89,11 @@
       <scope>provided</scope>
     </dependency>
     <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-client</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
       <scope>provided</scope>
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
index f967fff..fd29aef 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
@@ -69,7 +69,7 @@ public class NIOServerCnxn extends ServerCnxn {
 
     private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
 
-    private ByteBuffer incomingBuffer = lenBuffer;
+    protected ByteBuffer incomingBuffer = lenBuffer;
 
     private final Queue<ByteBuffer> outgoingBuffers = new LinkedBlockingQueue<ByteBuffer>();
 
@@ -381,7 +381,7 @@ public class NIOServerCnxn extends ServerCnxn {
         }
     }
 
-    private void readRequest() throws IOException {
+    protected void readRequest() throws IOException {
         zkServer.processPacket(this, incomingBuffer);
     }
 
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxnFactory.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxnFactory.java
index 17fa362..57495c1 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxnFactory.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxnFactory.java
@@ -315,7 +315,7 @@ public class NIOServerCnxnFactory extends ServerCnxnFactory {
      * If there is no worker thread pool, the SelectorThread performs the I/O
      * directly.
      */
-    class SelectorThread extends AbstractSelectThread {
+    public class SelectorThread extends AbstractSelectThread {
 
         private final int id;
         private final Queue<SocketChannel> acceptedQueue;
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/controller/CommandClient.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/controller/CommandClient.java
new file mode 100644
index 0000000..632ece8
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/controller/CommandClient.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.controller;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeUnit;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.api.ContentResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A convenience helper for sending controller commands to ControllerService.
+ */
+public class CommandClient {
+    private final int requestTimeoutInMs;
+    private static final int DEFAULT_TIMEOUT = 10000;
+    private static final Logger LOG = LoggerFactory.getLogger(CommandClient.class);
+    private final int hostPort;
+    private final String hostName;
+    private HttpClient client;
+    private boolean started = false;
+
+    /**
+     * Instantiate a client configured to send requests to localhost.
+     * @param localHostPort Port that the localhost CommandListener is listening on.
+     * @param requestTimeoutInMs Timeout in ms for synchronous requests.
+     */
+    public CommandClient(int localHostPort, int requestTimeoutInMs) {
+        this.client = new HttpClient();
+        this.requestTimeoutInMs = requestTimeoutInMs;
+        this.hostName = "localhost";
+        this.hostPort = localHostPort;
+    }
+
+    /**
+     * Instantiate a client configured to send requests to the specified host address.
+     * @param hostAddress The host address of the listening server.
+     * @param requestTimeoutInMs Timeout in ms for synchronous requests.
+     */
+    public CommandClient(InetSocketAddress hostAddress, int requestTimeoutInMs) {
+        this.client = new HttpClient();
+        this.requestTimeoutInMs = requestTimeoutInMs;
+        this.hostName = hostAddress.getHostName();
+        this.hostPort = hostAddress.getPort();
+    }
+
+    public CommandClient(int localhostPort) {
+        this(localhostPort, DEFAULT_TIMEOUT);
+    }
+
+    public synchronized void close() {
+        try {
+            if (client != null) {
+                client.stop();
+                client = null;
+            }
+        } catch (Exception ex) {
+            LOG.warn("Exception during shutdown", ex);
+        }
+    }
+
+    /**
+     * Send a command with no parameters to the server and wait for a response.
+     * Returns true if we received a good (200) response and false otherwise.
+     */
+    public boolean trySendCommand(ControlCommand.Action action) {
+        return trySendCommand(action, null);
+    }
+
+    /**
+     * Send a command with an optional command parameter to the server and wait for a response.
+     * @param action The command Action to send.
+     * @param commandParameter The command parameter, in the form of command/action/parameter.
+     * @return true if we received a good (200) response and false otherwise.
+     */
+    public boolean trySendCommand(ControlCommand.Action action, String commandParameter)  {
+        try {
+            if (!started) {
+                client.start();
+                started = true;
+            }
+            ContentResponse response = sendCommand(action, commandParameter);
+            LOG.info("Received {} response from the server", response);
+            return (response.getStatus() == 200);
+        } catch (InterruptedException | IOException ex) {
+            LOG.warn("Failed to get response from server", ex);
+        } catch (Exception ex) {
+            LOG.error("Unknown exception when sending command", ex);
+        }
+
+        return false;
+    }
+
+    /**
+     * Send a command and optional command parameter to the server and block until receiving
+     * a response.
+     *
+     * @param action The command Action to send.
+     * @param commandParameter The command parameter, in the form of command/action/parameter.
+     * @return The full response (status and body) from the CommandListener server.
+     */
+    public ContentResponse sendCommand(ControlCommand.Action action,
+                                       String commandParameter) throws Exception {
+        String command = String.format("%s%s:%s/%s", "http://",
+            this.hostName, this.hostPort, ControlCommand.createCommandUri(action, commandParameter));
+        ContentResponse response = this.client.newRequest(command).timeout(this.requestTimeoutInMs,
+            TimeUnit.MILLISECONDS).send();
+        LOG.info("Sent command {}", command);
+        LOG.info("Response body {}", new String(response.getContent(), StandardCharsets.UTF_8));
+        return response;
+    }
+}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/controller/CommandListener.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/controller/CommandListener.java
new file mode 100644
index 0000000..3f5bb62
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/controller/CommandListener.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.controller;
+
+import java.io.IOException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.zookeeper.server.ExitCode;
+import org.apache.zookeeper.util.ServiceUtils;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An HTTP server that listens for incoming controller commands sent from CommandClient (or any REST client of your
+ * choice) and dispatches each command to the ZooKeeperServerController for execution.
+ */
+public class CommandListener {
+    private static final Logger LOG = LoggerFactory.getLogger(CommandListener.class);
+
+    private ZooKeeperServerController controller;
+    private Server server;
+
+    public CommandListener(ZooKeeperServerController controller, ControllerServerConfig config) {
+        try {
+            this.controller = controller;
+
+            String host = config.getControllerAddress().getHostName();
+            int port = config.getControllerAddress().getPort();
+
+            server = new Server(port);
+            LOG.info("CommandListener server host: {} with port: {}", host, port);
+            server.setHandler(new CommandHandler());
+            server.start();
+        } catch (Exception ex) {
+            LOG.error("Failed to instantiate CommandListener.", ex);
+            ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
+        }
+    }
+
+    public void close() {
+        try {
+            if (server != null) {
+                server.stop();
+                server = null;
+            }
+        } catch (Exception ex) {
+            LOG.warn("Exception during shutdown CommandListener server", ex);
+        }
+    }
+
+    private class CommandHandler extends AbstractHandler {
+        @Override
+        public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
+                throws IOException {
+            // Extract command string from request path. Remove leading '/'.
+            String commandStr = request.getPathInfo().substring(1);
+            int responseCode;
+            response.setContentType("text/html;charset=utf-8");
+
+            try {
+                ControlCommand command = ControlCommand.parseUri(commandStr);
+                controller.processCommand(command);
+                baseRequest.setHandled(true);
+                responseCode = HttpServletResponse.SC_OK;
+            } catch (IllegalArgumentException ex) {
+                LOG.error("Bad argument or command", ex);
+                responseCode = HttpServletResponse.SC_BAD_REQUEST;
+            } catch (Exception ex) {
+                LOG.error("Failed processing the request", ex);
+                throw ex;
+            }
+            response.setStatus(responseCode);
+            response.getWriter().println(commandStr);
+            LOG.info("CommandListener processed command {} with response code {}", commandStr, responseCode);
+        }
+    }
+}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/controller/ControlCommand.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/controller/ControlCommand.java
new file mode 100644
index 0000000..8454daa
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/controller/ControlCommand.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.controller;
+
+/**
+ * Set of commands that this controller can execute. Commands are comprised
+ * of an action and an optional parameter specific to that action.
+ */
+public class ControlCommand {
+    /**
+     * Actions available to the controller.
+     */
+    public enum Action {
+        // Simple "are you there" ping to confirm the controller is up and running.
+        PING,
+        // Shutdown everything, including CommandListener, ControllerService, Controller and the ZooKeeperServer.
+        SHUTDOWN,
+        // Close a connection triggering a client disconnect (and then reconnect attempt).
+        // No parameter indicates close all connections. Optional parameter indicates a specific session id (as long).
+        CLOSECONNECTION,
+        // Expire a session.
+        EXPIRESESSION,
+        // Reject all future connections. No parameter required.
+        REJECTCONNECTIONS,
+        // Add latency to server replies.
+        // Optional parameter indicates time in milliseconds to delay
+        // (default = 1 second).
+        ADDDELAY,
+        // Fail requests.
+        // Optional parameter indicates how many requests to fail.
+        // (default = all requests until RESET).
+        FAILREQUESTS,
+        // Process requests but do not send a response.
+        // Optional parameter indicates how many responses to withhold.
+        // (default = all responses until RESET).
+        NORESPONSE,
+        // Restore normal server behavior.
+        // Undoes all the chaotic action commands
+        // (reject connections, add delay, fail requests, eat requests and so on...).
+        RESET,
+        // Force the quorum to elect a new leader.
+        ELECTNEWLEADER,
+        // More actions go here in the future...
+    }
+
+    public static final String ENDPOINT = "command";
+    public static final String ENDPOINT_PREFIX = ENDPOINT + "/";
+
+    private Action action;
+    public Action getAction() {
+        return action;
+    }
+
+    private String parameter;
+    protected String getParameter() {
+        return parameter;
+    }
+
+    public ControlCommand(Action action) {
+        this(action, null);
+    }
+
+    public ControlCommand(Action action, String param) {
+        this.action = action;
+        this.parameter = param;
+    }
+
+    /**
+     * Create a REST command uri.
+     * @param action The 'verb' of the command.
+     * @param parameter The optional parameter.
+     * @return A string to send to the server as the end of the Uri.
+     */
+    public static String createCommandUri(Action action, String parameter) {
+        return ENDPOINT_PREFIX + action.toString() + (parameter != null && !parameter.isEmpty() ? "/" + parameter : "");
+    }
+
+    /**
+     * Parse a Uri into the required Command action and parameter.
+     * @param commandUri the properly formatted Uri.
+     */
+    public static ControlCommand parseUri(String commandUri) {
+        if (commandUri == null) {
+            throw new IllegalArgumentException("commandUri can't be null.");
+        }
+
+        if (!commandUri.startsWith(ENDPOINT_PREFIX)) {
+            throw new IllegalArgumentException("Missing required prefix: " + ENDPOINT_PREFIX);
+        }
+
+        String uri = commandUri.substring(ENDPOINT_PREFIX.length());
+        String name;
+        String param;
+
+        int separatorIndex = uri.indexOf('/');
+        if (separatorIndex < 0) {
+            name = uri;
+            param = null;
+        } else {
+            name = uri.substring(0, separatorIndex);
+            param = uri.substring(separatorIndex + 1);
+        }
+
+        return new ControlCommand(Action.valueOf(name.toUpperCase()), param);
+    }
+}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/controller/ControllableConnection.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/controller/ControllableConnection.java
new file mode 100644
index 0000000..2238f35
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/controller/ControllableConnection.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.controller;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.Record;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.proto.ReplyHeader;
+import org.apache.zookeeper.proto.RequestHeader;
+import org.apache.zookeeper.server.ByteBufferInputStream;
+import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Extension of NIOServerCnxn which can inject changes per controller commands.
+ * Similar extensions can be implemented on top of NettyServerCnxn as well.
+ */
+@SuppressFBWarnings(value = "BC_UNCONFIRMED_CAST", justification = "factory is ControllableConnectionFactory type.")
+public class ControllableConnection extends NIOServerCnxn {
+    private static final Logger LOG = LoggerFactory.getLogger(ControllableConnection.class);
+    private final ControllableConnectionFactory controller;
+
+    public ControllableConnection(ZooKeeperServer zk, SocketChannel sock, SelectionKey sk, NIOServerCnxnFactory factory,
+                                  NIOServerCnxnFactory.SelectorThread selectorThread) throws IOException {
+        super(zk, sock, sk, factory, selectorThread);
+        controller = (ControllableConnectionFactory) factory;
+    }
+
+    @Override
+    public int sendResponse(ReplyHeader header, Record record, String tag) {
+        if (controller.shouldSendResponse()) {
+            try {
+                return super.sendResponse(header, record, tag);
+            } catch (IOException ex) {
+                LOG.warn("IO Exception occurred", ex);
+            }
+        } else {
+            LOG.warn("Controller is configured to NOT send a response back to the client.");
+        }
+        return -1;
+    }
+
+    @Override
+    protected void readRequest() throws IOException {
+        if (controller.shouldFailNextRequest()) {
+            ByteBuffer buffer = incomingBuffer.slice();
+            BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(buffer));
+            RequestHeader h = new RequestHeader();
+            h.deserialize(bia, "header");
+            super.sendResponse(new ReplyHeader(h.getXid(), 0, KeeperException.Code.APIERROR.intValue()),
+                    null, null);
+        } else {
+            controller.delayRequestIfNeeded();
+            super.readRequest();
+        }
+    }
+}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/controller/ControllableConnectionFactory.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/controller/ControllableConnectionFactory.java
new file mode 100644
index 0000000..15d9d61
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/controller/ControllableConnectionFactory.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.controller;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Extension of NIOServerCnxnFactory which can inject changes per controller commands.
+ * Similar extensions can be implemented on top of NettyServerCnxnFactory as well.
+ */
+@SuppressFBWarnings(value = "SWL_SLEEP_WITH_LOCK_HELD", justification = "no dead lock")
+public class ControllableConnectionFactory extends NIOServerCnxnFactory {
+    private static final Logger LOG = LoggerFactory.getLogger(ControllableConnectionFactory.class);
+    private long responseDelayInMs = 0;
+    private long remainingRequestsToFail = 0;
+    private long remainingResponsesToHold = 0;
+
+    public ControllableConnectionFactory() {
+    }
+
+    @Override
+    protected NIOServerCnxn createConnection(SocketChannel sock, SelectionKey sk, SelectorThread selectorThread)
+            throws IOException {
+        return new ControllableConnection(zkServer, sock, sk, this, selectorThread);
+    }
+
+    /**
+     * Called by the connection to delay processing requests from the client.
+     */
+    public synchronized void delayRequestIfNeeded() {
+        try {
+            if (responseDelayInMs > 0) {
+                Thread.sleep(responseDelayInMs);
+            }
+        } catch (InterruptedException ex) {
+            LOG.warn("Interrupted while delaying requests", ex);
+        }
+    }
+
+    /**
+     * Check if we should fail the next incoming request.
+     * If so, decrement the remaining requests to fail.
+     */
+    public synchronized boolean shouldFailNextRequest() {
+        if (remainingRequestsToFail == 0) {
+            return false;
+        }
+
+        // Value < 0 indicates fail all requests.
+        if (remainingRequestsToFail > 0) {
+            remainingRequestsToFail--;
+        }
+
+        return true;
+    }
+
+    /**
+     * Check if we should send a response to the latest processed request (true),
+     * or eat the response to mess with the client (false).
+     * If the response is eaten, decrement the remaining responses to eat.
+     */
+    public synchronized boolean shouldSendResponse() {
+        if (remainingResponsesToHold == 0) {
+            return true;
+        }
+
+        // Value < 0 indicates hold all the responses.
+        if (remainingResponsesToHold > 0) {
+            remainingResponsesToHold--;
+        }
+        return false;
+    }
+
+    public synchronized void delayResponses(long delayInMs) {
+        if (delayInMs < 0) {
+            throw new IllegalArgumentException("delay must be non-negative");
+        }
+        responseDelayInMs = delayInMs;
+    }
+
+    public synchronized void resetBadBehavior() {
+        responseDelayInMs = 0;
+        remainingRequestsToFail = 0;
+        remainingResponsesToHold = 0;
+    }
+
+    public synchronized void failAllFutureRequests() {
+        this.remainingRequestsToFail = -1;
+    }
+
+    public synchronized void failFutureRequests(long requestsToFail) {
+        this.remainingRequestsToFail = requestsToFail;
+    }
+
+    public synchronized void holdAllFutureResponses() {
+        this.remainingResponsesToHold = -1;
+    }
+
+    public synchronized void holdFutureResponses(long requestsToHold) {
+        this.remainingResponsesToHold = requestsToHold;
+    }
+}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/controller/ControllerServerConfig.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/controller/ControllerServerConfig.java
new file mode 100644
index 0000000..bbdc14f
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/controller/ControllerServerConfig.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.controller;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.zookeeper.server.ServerConfig;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
+
+/**
+ * Config for the ControllerService. Responsible for providing the minimum set of configurations
+ * that's required to spin up a single member ensemble.
+ */
+public class ControllerServerConfig extends QuorumPeerConfig {
+    public static final String CONTROLLER_PORT_KEY = "zookeeper.controllerPort";
+    public static final String CLIENT_PORT_KEY = "zookeeper.clientPortAddress";
+    private InetSocketAddress controllerAddress;
+    public InetSocketAddress getControllerAddress() {
+        return controllerAddress;
+    }
+
+    /**
+     * Instantiate a new config via a zk config file.
+     * @param configFile path to the configuration file
+     * @throws ConfigException
+     */
+    public ControllerServerConfig(String configFile) throws ConfigException {
+        parse(configFile);
+    }
+
+    /**
+     * Instantiate a config object with required parameters.
+     * @param hostAddress The address to bind to (likely loopback or localhost)
+     * @param controllerPort Port on which the controller listens for incoming control commands sent from CommandClient.
+     * @param zkServerPort Port the ZooKeeper server will listen on.
+     * @param dataDirPath Path to the data directory that ZooKeeperServer uses.
+     */
+    public ControllerServerConfig(InetAddress hostAddress, int controllerPort, int zkServerPort, String dataDirPath) {
+        controllerAddress = new InetSocketAddress(hostAddress, controllerPort);
+        clientPortAddress = new InetSocketAddress(hostAddress, zkServerPort);
+        dataDir = new File(dataDirPath);
+        dataLogDir = dataDir;
+        serverId = 0;
+    }
+
+    /**
+     * Instantiate a config object with required parameters.
+     * @param controllerPort Port on which the controller listens for incoming control commands sent from CommandClient.
+     * @param zkServerPort Port the ZooKeeper server will listen on.
+     * @param dataDirPath Path to the data directory that ZooKeeperServer uses.
+     */
+    public ControllerServerConfig(int controllerPort, int zkServerPort, String dataDirPath) {
+        this(InetAddress.getLoopbackAddress(), controllerPort, zkServerPort, dataDirPath);
+    }
+
+    public ServerConfig getZooKeeperServerConfig() {
+        ServerConfig serverConfig = new ServerConfig();
+        serverConfig.readFrom(this);
+        return serverConfig;
+    }
+
+    @Override
+    public void parse(String configFile) throws ConfigException {
+        super.parse(configFile);
+        for (String key : System.getProperties().stringPropertyNames()) {
+            if (CONTROLLER_PORT_KEY.equalsIgnoreCase(key)) {
+                setControllerAddress(System.getProperty(key));
+            }
+            if (CLIENT_PORT_KEY.equals(key)) {
+                setClientAddress(System.getProperty(key));
+            }
+        }
+
+        if (controllerAddress == null) {
+            throw new ConfigException("Missing required parameter " + CONTROLLER_PORT_KEY);
+        }
+
+        if (clientPortAddress == null) {
+            throw new ConfigException("Missing required parameter " + CLIENT_PORT_KEY);
+        }
+    }
+
+    private void setControllerAddress(String port) {
+        try {
+            controllerAddress = new InetSocketAddress(InetAddress.getLoopbackAddress(), Integer.parseInt(port));
+        } catch (NumberFormatException ex) {
+            throw new IllegalArgumentException("Invalid port", ex);
+        }
+    }
+
+    private void setClientAddress(String port) {
+        try {
+            clientPortAddress = new InetSocketAddress(InetAddress.getLoopbackAddress(), Integer.parseInt(port));
+        } catch (NumberFormatException ex) {
+            throw new IllegalArgumentException("Invalid port", ex);
+        }
+    }
+
+    /**
+     * Ensure config is acceptable by filling in default values for any missing quorum configuration
+     * (specifically in the case of a single machine ensemble)
+     *
+     * @throws IOException
+     */
+    public void ensureComplete() throws IOException {
+        if (this.quorumVerifier != null && this.quorumVerifier.getAllMembers().size() > 0) {
+            return;
+        }
+
+        // QuorumPeer requires a QuorumVerifier.
+        // We will use majority strategy with only this host in the quorum.
+        // We need to provide 2 more ports: one for elections and one for quorum communications.
+        // We will also mark this host as the leader.
+        // Bind to port 0 so the OS picks free ports; try-with-resources closes both sockets even on failure.
+        int quorumPort;
+        int electionPort;
+        try (ServerSocket randomSocket1 = new ServerSocket(0);
+             ServerSocket randomSocket2 = new ServerSocket(0)) {
+            quorumPort = randomSocket1.getLocalPort();
+            electionPort = randomSocket2.getLocalPort();
+        }
+
+        QuorumPeer.QuorumServer selfAsPeer = new QuorumPeer.QuorumServer(
+                0,
+                new InetSocketAddress(quorumPort),
+                new InetSocketAddress(electionPort),
+                this.clientPortAddress
+        );
+        Map<Long, QuorumPeer.QuorumServer> peers = new HashMap<>();
+        peers.put(selfAsPeer.id, selfAsPeer);
+        this.quorumVerifier = new QuorumMaj(peers);
+    }
+}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/controller/ControllerService.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/controller/ControllerService.java
new file mode 100644
index 0000000..8d71984
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/controller/ControllerService.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.controller;
+
+import java.io.IOException;
+import org.apache.zookeeper.server.ExitCode;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.apache.zookeeper.util.ServiceUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Main class which starts a ZooKeeperServer, a ZooKeeperServerController and the ControllerService.
+ * Tests should either invoke this class as the main target of a new JVM process OR explicitly
+ * start and stop a singleton of this class in their test process.
+ */
+public class ControllerService {
+    private static final Logger LOG = LoggerFactory.getLogger(ControllerService.class);
+
+    private ZooKeeperServerController controller;
+    private CommandListener listener;
+
+    protected QuorumPeerConfig config;
+    private ServerCnxnFactory serverCnxnFactory = null;
+    protected QuorumPeer quorumPeer = null;
+
+    /**
+     * Starts the ControllerService as a standalone app. Useful for out-of-process testing
+     * - such as during integration testing.
+     */
+    public static void main(String[] args) {
+        ControllerServerConfig config;
+        try {
+            if (args.length != 1) {
+                throw new IllegalArgumentException("Require config file as cmd line argument");
+            } else {
+                config = new ControllerServerConfig(args[0]);
+            }
+            new ControllerService().start(config);
+        } catch (Exception ex) {
+            System.err.println(ex.getMessage());
+            System.err.println("Usage: ControllerService configfile");
+            ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
+        }
+    }
+
+    /**
+     * Starts a new thread to run the controller (useful when this service is hosted in process
+     * - such as during unit testing).
+     */
+    public Thread start(ControllerServerConfig controllerConfig) {
+        this.config = controllerConfig;
+        final ControllerService svc = this;
+
+        Thread runner = new Thread(() -> {
+            try {
+                svc.run();
+            } catch (Exception e) {
+            }
+        });
+        runner.setDaemon(true);
+        runner.start();
+        return runner;
+    }
+
+    public synchronized void shutdown() {
+        if (listener != null) {
+            listener.close();
+            listener = null;
+        }
+
+        if (controller != null) {
+            controller.shutdown();
+            controller = null;
+        }
+    }
+
+    /**
+     * Initializes an instance of the ZooKeeperServer, the ZooKeeperServerController, and a new
+     * HTTP listener (CommandListener) for the controller.
+     */
+    protected void initService() throws IOException {
+        ControllerServerConfig controllerConfig = (ControllerServerConfig) config;
+        controllerConfig.ensureComplete();
+        this.controller = new ZooKeeperServerController(controllerConfig);
+        this.listener = new CommandListener(controller, controllerConfig);
+        this.serverCnxnFactory = controller.getCnxnFactory();
+    }
+
+    protected void runServices() {
+        this.controller.run();
+    }
+
+    protected void cleanup() {
+        if (listener != null) {
+            listener.close();
+            listener = null;
+        }
+    }
+
+    /**
+     * Runs the main loop for this application but does not exit the process.
+     */
+    public void initializeAndRun(String[] args) throws QuorumPeerConfig.ConfigException {
+        initConfig(args);
+        run();
+    }
+
+    /**
+     * Derived classes may override to do custom initialization of command line args.
+     */
+    protected void initConfig(String[] args) throws QuorumPeerConfig.ConfigException {
+        if (args.length == 1) {
+            config.parse(args[0]);
+        }
+    }
+
+    /**
+     * Run the app given a QuorumPeerConfig.
+     *
+     * @param config The quorum peer config.
+     */
+    public void runFromConfig(QuorumPeerConfig config) {
+        LOG.info("Starting quorum peer from peer config");
+        this.config = config;
+        run();
+    }
+
+    protected void run() {
+        try {
+            initService();
+        } catch (Exception ex) {
+            LOG.error("Failed to start ControllerService.", ex);
+            ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
+            return;
+        }
+        runServices();
+        cleanup();
+    }
+
+    /**
+     * Is the service up with all necessary initialization and port opening complete?
+     *
+     * @return true if the controller service is ready to use; false otherwise.
+     */
+    public boolean isReady() {
+        return controller != null && controller.isReady();
+    }
+}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/controller/ZooKeeperServerController.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/controller/ZooKeeperServerController.java
new file mode 100644
index 0000000..615db87
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/controller/ZooKeeperServerController.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.controller;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.IOException;
+import org.apache.zookeeper.server.ExitCode;
+import org.apache.zookeeper.server.ServerCnxn;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.apache.zookeeper.util.ServiceUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class which accepts commands to modify ZooKeeperServer state or Connection state at runtime for the purpose of
+ * single machine integration testing. Not meant to be used in production. It is recommended to use this in conjunction
+ * with the CommandListener HttpServer and CommandClient.
+ *
+ */
+@SuppressFBWarnings(value = "IS2_INCONSISTENT_SYNC", justification = "quorum peer is internally synchronized.")
+public class ZooKeeperServerController {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperServerController.class);
+    private static final long DEFAULT_DELAY_MS = 1000;
+
+    private QuorumPeer quorumPeer;
+    private ControllableConnectionFactory cnxnFactory;
+
+    public ZooKeeperServerController(QuorumPeerConfig config) throws IOException {
+        if (config == null) {
+            throw new IllegalArgumentException("ZooKeeperServerController requires a valid config!");
+        }
+
+        cnxnFactory = new ControllableConnectionFactory();
+        cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog());
+        quorumPeer = QuorumPeer.createFromConfig(config);
+        quorumPeer.setCnxnFactory(cnxnFactory);
+    }
+
+    public void run() {
+        try {
+            quorumPeer.start();
+            quorumPeer.join();
+        } catch (Exception ex) {
+            LOG.error("Fatal error starting quorum peer", ex);
+            ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
+        }
+    }
+
+    protected ServerCnxnFactory getCnxnFactory() {
+        return cnxnFactory;
+    }
+
+    public synchronized void shutdown() {
+        if (this.cnxnFactory != null) {
+            this.cnxnFactory.shutdown();
+            this.cnxnFactory = null;
+        }
+
+        if (this.quorumPeer != null && this.quorumPeer.isRunning()) {
+            this.quorumPeer.shutdown();
+            this.quorumPeer = null;
+        }
+    }
+
+    public synchronized boolean isReady() {
+        return this.cnxnFactory != null
+                && this.quorumPeer != null
+                && this.quorumPeer.isRunning()
+                && this.quorumPeer.getActiveServer() != null;
+    }
+
+    /**
+     * Process the command. An exception indicates errors. No exception indicates success.
+     */
+    public void processCommand(ControlCommand command) {
+        if (command == null) {
+            throw new IllegalArgumentException("Invalid command parameter!");
+        }
+
+        LOG.info("processing command {}{}", command.getAction(),
+                command.getParameter() == null ? "" : "[" + command.getParameter() + "]");
+
+        // Don't process command if we are shutting down or still initializing.
+        if (!isReady()) {
+            throw new IllegalStateException("Service is not ready. It has already been shut down or is still initializing.");
+        }
+
+        switch (command.getAction()) {
+            case PING:
+                // NO-OP
+                break;
+            case SHUTDOWN:
+                shutdown();
+                break;
+            case CLOSECONNECTION:
+                if (command.getParameter() == null) {
+                    cnxnFactory.closeAll(ServerCnxn.DisconnectReason.CLOSE_ALL_CONNECTIONS_FORCED);
+                } else {
+                    // A single parameter should be a session id as a long.
+                    // Parse failure exceptions will be sent to the caller.
+                    cnxnFactory.closeSession(Long.decode(command.getParameter()),
+                            ServerCnxn.DisconnectReason.CONNECTION_CLOSE_FORCED);
+                }
+                break;
+            case EXPIRESESSION:
+                // TODO: (hanm) implement once dependent feature is ready.
+                if (command.getParameter() == null) {
+                    // expireAllSessions();
+                } else {
+                    // A single parameter should be a session id as a long.
+                    // Parse failure exceptions will be sent to the caller.
+                    // expireSession(Long.decode(command.getParameter()));
+                }
+                break;
+            case REJECTCONNECTIONS:
+                // TODO: (hanm) implement once dependent feature is ready.
+                //cnxnFactory.rejectNewConnections();
+                break;
+            case ADDDELAY:
+                cnxnFactory.delayResponses(command.getParameter() == null
+                        ? DEFAULT_DELAY_MS : Long.decode(command.getParameter()));
+                break;
+            case NORESPONSE:
+                if (command.getParameter() == null) {
+                    cnxnFactory.holdAllFutureResponses();
+                } else {
+                    cnxnFactory.holdFutureResponses(Long.decode(command.getParameter()));
+                }
+                break;
+            case FAILREQUESTS:
+                if (command.getParameter() == null) {
+                    cnxnFactory.failAllFutureRequests();
+                } else {
+                    cnxnFactory.failFutureRequests(Long.decode(command.getParameter()));
+                }
+                break;
+            case RESET:
+                cnxnFactory.resetBadBehavior();
+                break;
+            case ELECTNEWLEADER:
+                quorumPeer.startLeaderElection();
+                break;
+            default:
+                throw new IllegalArgumentException("Unknown command: " + command);
+        }
+    }
+
+}
+
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
index e56c03d..ad5c8c3 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
@@ -2558,4 +2558,66 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
         }
     }
 
+    /**
+     * Create a new QuorumPeer and apply all the values per the already-parsed config.
+     *
+     * @param config The already-parsed quorum peer config to apply.
+     * @return A QuorumPeer instantiated with specified peer config. Note this peer
+     *         is not fully initialized; caller should finish initialization through
+     *         additional configurations (connection factory settings, etc).
+     *
+     * @throws IOException
+     */
+    public static QuorumPeer createFromConfig(QuorumPeerConfig config) throws IOException {
+        QuorumPeer quorumPeer = new QuorumPeer();
+        quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir()));
+        quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
+        quorumPeer.enableLocalSessionsUpgrading(config.isLocalSessionsUpgradingEnabled());
+        quorumPeer.setElectionType(config.getElectionAlg());
+        quorumPeer.setMyid(config.getServerId());
+        quorumPeer.setTickTime(config.getTickTime());
+        quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
+        quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
+        quorumPeer.setInitLimit(config.getInitLimit());
+        quorumPeer.setSyncLimit(config.getSyncLimit());
+        quorumPeer.setConnectToLearnerMasterLimit(config.getConnectToLearnerMasterLimit());
+        quorumPeer.setObserverMasterPort(config.getObserverMasterPort());
+        quorumPeer.setConfigFileName(config.getConfigFilename());
+        quorumPeer.setClientPortListenBacklog(config.getClientPortListenBacklog());
+        quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
+        quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
+        if (config.getLastSeenQuorumVerifier() != null) {
+            quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
+        }
+        quorumPeer.initConfigInZKDatabase();
+        quorumPeer.setSslQuorum(config.isSslQuorum());
+        quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
+        quorumPeer.setLearnerType(config.getPeerType());
+        quorumPeer.setSyncEnabled(config.getSyncEnabled());
+        quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
+        if (config.sslQuorumReloadCertFiles) {
+            quorumPeer.getX509Util().enableCertFileReloading();
+        }
+        quorumPeer.setMultiAddressEnabled(config.isMultiAddressEnabled());
+        quorumPeer.setMultiAddressReachabilityCheckEnabled(config.isMultiAddressReachabilityCheckEnabled());
+        quorumPeer.setMultiAddressReachabilityCheckTimeoutMs(config.getMultiAddressReachabilityCheckTimeoutMs());
+
+        // sets quorum sasl authentication configurations
+        quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
+        if (quorumPeer.isQuorumSaslAuthEnabled()) {
+            quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
+            quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
+            quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
+            quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
+            quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
+        }
+        quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
+
+        if (config.jvmPauseMonitorToRun) {
+            quorumPeer.setJvmPauseMonitor(new JvmPauseMonitor(config));
+        }
+
+        return quorumPeer;
+    }
+
 }
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/controller/ControlCommandTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/controller/ControlCommandTest.java
new file mode 100644
index 0000000..050bcef
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/controller/ControlCommandTest.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.controller;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ControlCommandTest {
+
+  @Test
+  public void verifyGeneratedUri() {
+    Assert.assertEquals("command/ping",
+        ControlCommand.createCommandUri(ControlCommand.Action.PING, null).toLowerCase());
+    Assert.assertEquals("command/ping",
+        ControlCommand.createCommandUri(ControlCommand.Action.PING, "").toLowerCase());
+    Assert.assertEquals("command/closeconnection/1234",
+        ControlCommand.createCommandUri(ControlCommand.Action.CLOSECONNECTION,
+            "1234").toLowerCase());
+  }
+
+  @Test
+  public void verifyParseChecksForNull() {
+    try {
+      ControlCommand.parseUri(null);
+      Assert.fail("Should have thrown for null.");
+    } catch (IllegalArgumentException ex) {
+    }
+  }
+
+  @Test
+  public void verifyParseChecksForPrefix() {
+    try {
+      ControlCommand.parseUri("ping");
+      Assert.fail("Should have thrown for missing command/ prefix.");
+    } catch (IllegalArgumentException ex) {
+    }
+  }
+
+  @Test
+  public void verifyParseCorrectlyFindsCommandWithNoParameter() {
+    Assert.assertEquals(ControlCommand.Action.PING,
+        ControlCommand.parseUri("command/ping").getAction());
+  }
+
+  @Test
+  public void verifyParseCorrectlyFindsCommandWithParameter() {
+    ControlCommand command = ControlCommand.parseUri("command/closeconnection/1234");
+    Assert.assertEquals(ControlCommand.Action.CLOSECONNECTION, command.getAction());
+    Assert.assertEquals("1234", command.getParameter());
+  }
+
+  @Test
+  public void verifyParseIllegalCommandWithNoParameter() {
+    try {
+      ControlCommand.parseUri("command/pings");
+      Assert.fail("Should have thrown for non existing command.");
+    } catch (IllegalArgumentException ex) {
+    }
+  }
+
+  @Test
+  public void verifyParseIllegalCommandWithParameter() {
+    try {
+      ControlCommand.parseUri("command/close_connection/1234");
+      Assert.fail("Should have thrown for non existing command.");
+    } catch (IllegalArgumentException ex) {
+    }
+  }
+}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/controller/ControllerClientServerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/controller/ControllerClientServerTest.java
new file mode 100644
index 0000000..3b9b92a
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/controller/ControllerClientServerTest.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.controller;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class ControllerClientServerTest extends ControllerTestBase {
+    @Test
+    public void verifyPingCommand() {
+        Assert.assertTrue(commandClient.trySendCommand(ControlCommand.Action.PING));
+    }
+
+    @Test
+    public void verifyCloseConnectionCommand() {
+        // Valid long session ids should be accepted.
+        Assert.assertTrue(commandClient.trySendCommand(ControlCommand.Action.CLOSECONNECTION, "0x1234"));
+        Assert.assertTrue(commandClient.trySendCommand(ControlCommand.Action.CLOSECONNECTION, "1234"));
+
+        // Invalid session id format should fail.
+        Assert.assertFalse(commandClient.trySendCommand(ControlCommand.Action.CLOSECONNECTION, "hanm"));
+
+        // No parameter should be accepted.
+        Assert.assertTrue(commandClient.trySendCommand(ControlCommand.Action.CLOSECONNECTION));
+    }
+
+    // TODO (hanm): this depends on the expiration session feature which
+    // is not part of this patch. This test will be enabled once that
+    // feature is upstreamed.
+    @Ignore
+    public void verifyExpireSessionCommand() {
+        // Valid long session ids should be accepted.
+        Assert.assertTrue(commandClient.trySendCommand(ControlCommand.Action.EXPIRESESSION, "0x1234"));
+
+        // Invalid session id format should fail.
+        Assert.assertFalse(commandClient.trySendCommand(ControlCommand.Action.EXPIRESESSION, "hanm"));
+
+        // No parameter should be accepted.
+        Assert.assertTrue(commandClient.trySendCommand(ControlCommand.Action.EXPIRESESSION));
+    }
+
+    @Test
+    public void verifyAddResetDelayCommands() {
+        // Valid longs should be parsed.
+        Assert.assertTrue(commandClient.trySendCommand(ControlCommand.Action.ADDDELAY, "0x1234"));
+
+        // Invalid longs should fail.
+        Assert.assertFalse(commandClient.trySendCommand(ControlCommand.Action.ADDDELAY, "hanm"));
+
+        // No parameter should be accepted.
+        Assert.assertTrue(commandClient.trySendCommand(ControlCommand.Action.ADDDELAY));
+
+        // Reset delay should work.
+        Assert.assertTrue(commandClient.trySendCommand(ControlCommand.Action.RESET));
+    }
+
+    @Test
+    public void verifyBadResponseCommands() {
+        // Valid longs should be parsed.
+        Assert.assertTrue(commandClient.trySendCommand(ControlCommand.Action.FAILREQUESTS, "0x1234"));
+
+        // Invalid longs should fail.
+        Assert.assertFalse(commandClient.trySendCommand(ControlCommand.Action.FAILREQUESTS, "hanm"));
+
+        // No parameter should be accepted.
+        Assert.assertTrue(commandClient.trySendCommand(ControlCommand.Action.FAILREQUESTS));
+
+        // Reset should work.
+        Assert.assertTrue(commandClient.trySendCommand(ControlCommand.Action.RESET));
+    }
+
+    @Test
+    public void verifyEatResponseCommands() {
+        // Valid longs should be parsed.
+        Assert.assertTrue(commandClient.trySendCommand(ControlCommand.Action.NORESPONSE, "0x1234"));
+
+        // Invalid longs should fail.
+        Assert.assertFalse(commandClient.trySendCommand(ControlCommand.Action.NORESPONSE, "hanm"));
+
+        // No parameter should be accepted.
+        Assert.assertTrue(commandClient.trySendCommand(ControlCommand.Action.NORESPONSE));
+
+        // Reset should work.
+        Assert.assertTrue(commandClient.trySendCommand(ControlCommand.Action.RESET));
+    }
+
+    @Test
+    public void verifyLeaderElectionCommand() {
+        Assert.assertTrue(commandClient.trySendCommand(ControlCommand.Action.ELECTNEWLEADER));
+    }
+
+}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/controller/ControllerConfigTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/controller/ControllerConfigTest.java
new file mode 100644
index 0000000..2a8ec82
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/controller/ControllerConfigTest.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.controller;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ControllerConfigTest {
+    File configFile;
+
+    private static final int AnyTickTime = 1234;
+    private static final int AnyPort = 1234;
+    private static final String AnyDataDir = "temp";
+
+    public static File createTempFile() throws IOException {
+        return File.createTempFile("temp", "cfg", new File(System.getProperty("user.dir")));
+    }
+
+    public static List<Integer> findNAvailablePorts(int n) throws IOException {
+        List<ServerSocket> openedSockets = new ArrayList<>();
+        List<Integer> ports = new ArrayList<>();
+
+        for (int i = 0; i < n; i++) {
+            ServerSocket randomSocket = new ServerSocket(0);
+            openedSockets.add(randomSocket);
+            ports.add(randomSocket.getLocalPort());
+        }
+
+        for (ServerSocket s : openedSockets) {
+            s.close();
+        }
+
+        return ports;
+    }
+
+    public static void writeRequiredControllerConfig(File file, int controllerPort, int zkServerPort, int adminServerPort) throws IOException {
+        PrintWriter writer = new PrintWriter(file);
+        writer.write("dataDir=anywhere\n");
+        writer.write("controllerPort=" + controllerPort + "\n");
+        writer.write("clientPort=" + zkServerPort + "\n");
+        writer.write("adminPort=" + adminServerPort + "\n");
+        writer.close();
+    }
+
+    @Before
+    public void init() throws IOException {
+        configFile = createTempFile();
+    }
+
+    private void writeFile(int portNumber) throws IOException {
+        FileWriter writer = new FileWriter(configFile);
+        writer.write("dataDir=somewhere\n");
+        writer.write("ignore=me\n");
+        writer.write("tickTime=" + AnyTickTime + "\n");
+        writer.write("controllerPort=" + portNumber + "\n");
+        writer.write("clientPort=" + portNumber + "\n");
+        writer.flush();
+        writer.close();
+    }
+
+    @After
+    public void cleanup() {
+        if (configFile != null) {
+            configFile.delete();
+        }
+    }
+
+    @Test
+    public void parseFileSucceeds() throws Exception {
+        writeFile(AnyPort);
+        ControllerServerConfig config = new ControllerServerConfig(configFile.getAbsolutePath());
+        Assert.assertEquals(AnyPort, config.getControllerAddress().getPort());
+        Assert.assertEquals(AnyPort, config.getClientPortAddress().getPort());
+        Assert.assertEquals(AnyTickTime, config.getTickTime());
+    }
+
+    @Test
+    public void parseFileFailsWithMissingPort() throws Exception {
+        FileWriter writer = new FileWriter(configFile);
+        writer.write("dataDir=somewhere\n");
+        writer.flush();
+        writer.close();
+        try {
+            ControllerServerConfig config = new ControllerServerConfig(configFile.getAbsolutePath());
+            Assert.fail("Should have thrown with missing server config");
+        } catch (QuorumPeerConfig.ConfigException ex) {
+        }
+    }
+
+    @Test public void parseMissingFileThrows() {
+        try {
+            ControllerServerConfig config = new ControllerServerConfig("DontLookHere.missing");
+            Assert.fail("should have thrown");
+        } catch (QuorumPeerConfig.ConfigException ex) {
+        }
+    }
+
+    @Test
+    public void parseInvalidPortThrows() throws QuorumPeerConfig.ConfigException {
+        try {
+            ControllerServerConfig config = new ControllerServerConfig(configFile.getAbsolutePath());
+            Assert.fail("should have thrown");
+        } catch (QuorumPeerConfig.ConfigException ex) {
+        }
+    }
+
+    @Test
+    public void validCtor() {
+        ControllerServerConfig config = new ControllerServerConfig(AnyPort, AnyPort, AnyDataDir);
+        Assert.assertEquals(AnyPort, config.getControllerAddress().getPort());
+        Assert.assertEquals(AnyPort, config.getClientPortAddress().getPort());
+        Assert.assertEquals(AnyDataDir, config.getDataDir().getName());
+    }
+
+    @Test
+    public void invalidCtor() {
+        try {
+            ControllerServerConfig config = new ControllerServerConfig(-10, -10, "no where");
+            Assert.fail("should have thrown");
+        } catch (IllegalArgumentException ex) {
+        }
+
+    }
+}
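
Note on findNAvailablePorts above: it binds each ServerSocket to port 0 so the operating system picks a free ephemeral port, and it keeps all sockets open until every port has been collected so that no port is handed out twice. A minimal standalone sketch of the same technique follows. The class name is hypothetical, and the try/finally and the unchecked exception wrapping are additions for illustration, not what the commit does. As with the original helper, a port can still be taken by another process between close() and later use.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.List;

// Hypothetical standalone variant of the port-finding helper.
final class FreePortSketch {

    private FreePortSketch() {
    }

    static List<Integer> findFreePorts(int n) {
        List<ServerSocket> opened = new ArrayList<>();
        List<Integer> ports = new ArrayList<>();
        try {
            for (int i = 0; i < n; i++) {
                // Port 0 asks the OS for any currently free port.
                ServerSocket socket = new ServerSocket(0);
                opened.add(socket);
                ports.add(socket.getLocalPort());
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        } finally {
            // Release every socket, even if one of the binds failed.
            for (ServerSocket socket : opened) {
                try {
                    socket.close();
                } catch (IOException ignored) {
                    // Nothing useful to do for a failed close in a test helper.
                }
            }
        }
        return ports;
    }
}
```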
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/controller/ControllerTestBase.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/controller/ControllerTestBase.java
new file mode 100644
index 0000000..70067c7
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/controller/ControllerTestBase.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.controller;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.io.File;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import org.apache.zookeeper.ZKTestCase;
+import org.junit.After;
+import org.junit.Before;
+
+public class ControllerTestBase extends ZKTestCase {
+
+    protected ControllerService controllerService;
+    protected CommandClient commandClient;
+    private File tempDirectory;
+    protected ControllerServerConfig config;
+
+    @Before
+    public void init() throws Exception {
+        List<Integer> openPorts = ControllerConfigTest.findNAvailablePorts(2);
+        File tmpFile = File.createTempFile("test", ".junit", testBaseDir);
+        tempDirectory = new File(tmpFile + ".dir");
+        assertFalse(tempDirectory.exists());
+        assertTrue(tempDirectory.mkdirs());
+
+        config = new ControllerServerConfig(openPorts.get(0), openPorts.get(1), tempDirectory.getAbsolutePath());
+        controllerService = new ControllerService();
+        controllerService.start(config);
+
+        int retries = 50;
+        // The controller needs to hold an election before it is ready to process requests.
+        // Poll until it's ready...
+        while (!controllerService.isReady()) {
+            Thread.sleep(100);
+            retries--;
+            if (retries < 0) {
+                throw new TimeoutException("Service didn't start up and finish elections.");
+            }
+        }
+
+        // Create a client which sends requests to localhost on the configured port.
+        commandClient = new CommandClient(config.getControllerAddress().getPort());
+    }
+
+    @After
+    public void cleanup() throws InterruptedException {
+        if (controllerService != null) {
+            controllerService.shutdown();
+        }
+
+        if (commandClient != null) {
+            commandClient.close();
+        }
+
+        if (tempDirectory != null) {
+            tempDirectory.delete();
+        }
+    }
+}
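
Note on the readiness loop in init() above: it polls controllerService.isReady() every 100 ms and gives up once the retry budget is spent. The same pattern can be factored into a small helper, sketched below. The class and method names are hypothetical, and the boolean return value (instead of throwing TimeoutException) is a simplification chosen for this sketch.

```java
import java.util.function.BooleanSupplier;

// Hypothetical polling helper, not part of the commit.
final class PollingSketch {

    private PollingSketch() {
    }

    // Checks the condition up to maxAttempts times, sleeping sleepMs between
    // checks. Returns true as soon as the condition holds, false otherwise.
    static boolean waitUntil(BooleanSupplier condition, int maxAttempts, long sleepMs) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (condition.getAsBoolean()) {
                return true;
            }
            try {
                Thread.sleep(sleepMs);
            } catch (InterruptedException ex) {
                // Preserve the interrupt for the caller and stop waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }
}
```

With such a helper, the loop above would read roughly as waitUntil(controllerService::isReady, 50, 100), followed by a failure if it returns false.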
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/controller/ZooKeeperServerControllerEndToEndTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/controller/ZooKeeperServerControllerEndToEndTest.java
new file mode 100644
index 0000000..2895afd
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/controller/ZooKeeperServerControllerEndToEndTest.java
@@ -0,0 +1,384 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.controller;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZooKeeperServerControllerEndToEndTest extends ControllerTestBase {
+    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperServerControllerEndToEndTest.class);
+    private ZooKeeper zkClient;
+    private static final String AnyPath = "/Any";
+    private static final byte[] AnyData = new byte[] {0x0, 0x1};
+
+    @After
+    @Override
+    public void cleanup() throws InterruptedException {
+        if (zkClient != null) {
+            zkClient.close();
+        }
+        super.cleanup();
+    }
+
+    private void initClient(Watcher watcher) throws IOException {
+        zkClient = new ZooKeeper("localhost:" + config.getClientPortAddress().getPort(), 10000, watcher);
+    }
+
+    @Test
+    public void verifyClientConnects() throws Exception {
+        // Basic validation: we can connect and get events.
+        BlockingStateWatcher watcher = new BlockingStateWatcher(Watcher.Event.KeeperState.SyncConnected);
+        this.initClient(watcher);
+        watcher.waitForEvent();
+    }
+
+    @Test
+    public void verifyClientDisconnectsAndReconnects() throws Exception {
+        // Setup: First connect to the server and wait for connected.
+        BlockingStateWatcher watcher = new BlockingStateWatcher(Watcher.Event.KeeperState.SyncConnected);
+        initClient(watcher);
+        watcher.waitForEvent();
+
+        // Force a disconnection through the controller and ensure we get the events in order:
+        // 1: Disconnected
+        // 2: SyncConnected
+        watcher.reset(
+                new Watcher.Event.KeeperState[] {
+                        Watcher.Event.KeeperState.Disconnected,
+                        Watcher.Event.KeeperState.SyncConnected
+                });
+        Assert.assertTrue(commandClient
+                .trySendCommand(ControlCommand.Action.CLOSECONNECTION, String.valueOf(zkClient.getSessionId())));
+        watcher.waitForEvent();
+    }
+
+    @Ignore
+    public void verifySessionExpiration() throws Exception {
+        // Setup: First connect to the server and wait for connected.
+        BlockingStateWatcher watcher = new BlockingStateWatcher(Watcher.Event.KeeperState.SyncConnected);
+        initClient(watcher);
+        watcher.waitForEvent();
+
+        // Force an expiration.
+        // 1: Disconnected
+        // 2: Expired
+        watcher.reset(
+                new Watcher.Event.KeeperState[] {
+                        Watcher.Event.KeeperState.Disconnected,
+                        Watcher.Event.KeeperState.Expired
+                });
+        Assert.assertTrue(commandClient
+                .trySendCommand(ControlCommand.Action.EXPIRESESSION, String.valueOf(zkClient.getSessionId())));
+        watcher.waitForEvent();
+    }
+
+    @Ignore
+    public void verifyGlobalSessionExpiration() throws Exception {
+        // Step 1: Connect.
+        BlockingStateWatcher stateWatcher = new BlockingStateWatcher(Watcher.Event.KeeperState.SyncConnected);
+        initClient(stateWatcher);
+        stateWatcher.waitForEvent();
+
+        // Step 2: Add an ephemeral node (upgrades session to global).
+        BlockingPathWatcher pathWatcher = new BlockingPathWatcher(AnyPath, Watcher.Event.EventType.NodeCreated);
+
+        zkClient.exists(AnyPath, pathWatcher);
+        Assert.assertEquals(AnyPath,
+                zkClient.create(AnyPath, AnyData, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL));
+        pathWatcher.waitForEvent();
+
+        // Force expire all sessions.
+        stateWatcher.reset(Watcher.Event.KeeperState.Expired);
+        Assert.assertTrue(commandClient.trySendCommand(ControlCommand.Action.EXPIRESESSION));
+        stateWatcher.waitForEvent();
+    }
+
+    @Ignore
+    public void verifyRejectAcceptSessions() throws Exception {
+        // Tell the server to reject new connections.
+        Assert.assertTrue(commandClient.trySendCommand(ControlCommand.Action.REJECTCONNECTIONS));
+        EventWaiter watcher = new BlockingStateWatcher(Watcher.Event.KeeperState.SyncConnected);
+        initClient(watcher);
+        try {
+            watcher.waitForEvent(100);
+            Assert.fail("should have failed connecting");
+        } catch (TimeoutException ex) {
+        }
+        // Now accept connections again. We should get a connection quickly.
+        Assert.assertTrue(commandClient.trySendCommand(ControlCommand.Action.RESET));
+        watcher.waitForEvent();
+    }
+
+    private long timedTransaction() throws Exception {
+        long startTime = System.currentTimeMillis();
+        zkClient.exists(AnyPath, false);
+        return System.currentTimeMillis() - startTime;
+    }
+
+    @Test
+    public void verifyAddDelay() throws Exception {
+        EventWaiter watcher = new BlockingStateWatcher(Watcher.Event.KeeperState.SyncConnected);
+
+        initClient(watcher);
+        watcher.waitForEvent();
+        timedTransaction();
+
+        // Add 200 ms of delay to each response.
+        Assert.assertTrue(commandClient.trySendCommand(ControlCommand.Action.ADDDELAY, String.valueOf(200)));
+        long delayedDuration = timedTransaction();
+
+        Assert.assertTrue(commandClient.trySendCommand(ControlCommand.Action.RESET));
+        long resetDuration = timedTransaction();
+
+        Assert.assertTrue(delayedDuration - resetDuration > 200);
+    }
+
+    @Test
+    public void verifyFailAllRequests() throws Exception {
+        // Step 1: Connect.
+        BlockingStateWatcher stateWatcher = new BlockingStateWatcher(Watcher.Event.KeeperState.SyncConnected);
+        initClient(stateWatcher);
+        stateWatcher.waitForEvent();
+
+        // Step 2: Tell the server to fail requests.
+        Assert.assertTrue(commandClient.trySendCommand(ControlCommand.Action.FAILREQUESTS));
+
+        try {
+            zkClient.exists(AnyPath, null);
+            Assert.fail("should have failed");
+        } catch (KeeperException ex) {
+        }
+
+        // The second request should also fail: we haven't reset.
+        try {
+            zkClient.exists(AnyPath, null);
+            Assert.fail("should still fail");
+        } catch (KeeperException ex) {
+        }
+
+        // Reset; future requests should succeed.
+        Assert.assertTrue(commandClient.trySendCommand(ControlCommand.Action.RESET));
+
+        zkClient.exists(AnyPath, null);
+    }
+
+    @Test
+    public void verifyFailRequestCount() throws Exception {
+        // Step 1: Connect.
+        BlockingStateWatcher stateWatcher = new BlockingStateWatcher(Watcher.Event.KeeperState.SyncConnected);
+        initClient(stateWatcher);
+        stateWatcher.waitForEvent();
+
+        // Step 2: Tell the server to fail 1 request.
+        Assert.assertTrue(commandClient.trySendCommand(ControlCommand.Action.FAILREQUESTS, "1"));
+
+        try {
+            zkClient.exists(AnyPath, null);
+            Assert.fail("should have failed");
+        } catch (KeeperException ex) {
+        }
+
+        // We have not reset, but only one request was set to fail, so this one should succeed.
+        zkClient.exists(AnyPath, null);
+    }
+
+    @Test
+    public void verifyServerEatsAllResponses() throws Exception {
+        // Step 1: Connect.
+        BlockingStateWatcher watcher = new BlockingStateWatcher(Watcher.Event.KeeperState.SyncConnected);
+        initClient(watcher);
+        watcher.waitForEvent();
+
+        // No data yet.
+        Assert.assertNull(zkClient.exists(AnyPath, null));
+
+        // Step 2: Tell the server to eat responses...nom...nom...nom....
+        Assert.assertTrue(commandClient.trySendCommand(ControlCommand.Action.NORESPONSE));
+
+        try {
+            BlockingPathWatcher pathWatcher = new BlockingPathWatcher(AnyPath, Watcher.Event.EventType.NodeCreated);
+            // This async call should succeed in setting the data, but never send a response.
+            zkClient.create(AnyPath, AnyData, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, pathWatcher, null);
+            pathWatcher.waitForEvent(500);
+            Assert.fail("should time out since the event should never come");
+        } catch (TimeoutException ex) {
+        }
+
+        // Re-enable responses.
+        Assert.assertTrue(commandClient.trySendCommand(ControlCommand.Action.RESET));
+        watcher.reset(Watcher.Event.KeeperState.SyncConnected);
+
+        try {
+            // Even though we get a good response, the client doesn't know about
+            // the transaction id (xid). This should terminate the connection and
+            // throw a KeeperException.
+            zkClient.exists(AnyPath, false);
+            Assert.fail("should have failed with bad xid");
+        } catch (KeeperException ex) {
+            // The client believes it has fallen behind so deems this a connection loss.
+            Assert.assertTrue(ex instanceof KeeperException.ConnectionLossException);
+        }
+
+        // The client should reconnect and be healthy after this.
+        watcher.waitForEvent();
+        Assert.assertNotNull(zkClient.exists(AnyPath, false));
+    }
+
+    /**
+     * Our watcher interface is called back on a potentially separate thread.
+     * Tests should be logically consolidated into a single method in the following format:
+     * for each action in my test
+     *     Setup test action
+     *     Kick off async action
+     *     await state change
+     *     verify state
+     *
+     * To enable this logical pattern, the watcher has an ordered list of states to wait on.
+     * When all the states have arrived (in order), the waiting test thread is unblocked.
+     */
+    private abstract class EventWaiter implements Watcher, AsyncCallback.StringCallback {
+        private final int DEFAULT_WAIT_DURATION = 10000;
+        private CountDownLatch eventNotification;
+
+        public EventWaiter() {
+            reset();
+        }
+
+        protected void reset() {
+            eventNotification = new CountDownLatch(1);
+        }
+
+        @Override
+        public void process(WatchedEvent event) {
+            // Logs only. Derived classes should override if required.
+            LOG.info("WatchedEvent: {}", event);
+        }
+
+        @Override
+        public void processResult(int rc, String path, Object ctx, String name) {
+            // Logs only. Derived classes should override if required.
+            LOG.info("StringCallback: {}, {}, {}, {}", rc, path, ctx, name);
+        }
+
+        public void notifyListener() {
+            eventNotification.countDown();
+        }
+
+        public void waitForEvent() throws InterruptedException, TimeoutException {
+            waitForEvent(DEFAULT_WAIT_DURATION);
+        }
+
+        public void waitForEvent(int waitDurationInMs) throws InterruptedException, TimeoutException {
+            // Wait up to waitDurationInMs and throw if we time out.
+            if (!eventNotification.await(waitDurationInMs, TimeUnit.MILLISECONDS)) {
+                throw new TimeoutException("Timed out waiting for event");
+            }
+        }
+    }
+
+    private class BlockingStateWatcher extends EventWaiter {
+        private Object lockMe = new Object();
+        private LinkedList<Event.KeeperState> statesToWaitFor;
+
+        public BlockingStateWatcher(Event.KeeperState stateToNotifyOn) {
+            reset(stateToNotifyOn);
+        }
+
+        @Override
+        public void process(WatchedEvent event) {
+            LOG.info("State transition: {}", event.getState());
+
+            boolean shouldNotify = false;
+            synchronized (lockMe) {
+                if (!statesToWaitFor.isEmpty() && statesToWaitFor.getFirst() == event.getState()) {
+                    statesToWaitFor.removeFirst();
+                    shouldNotify = statesToWaitFor.isEmpty();
+                }
+            }
+
+            if (shouldNotify) {
+                notifyListener();
+            }
+        }
+
+        public void reset(Event.KeeperState stateToNotifyOn) {
+            reset(new Event.KeeperState[] {stateToNotifyOn});
+        }
+
+        public void reset(Event.KeeperState[] orderedStatesToWaitOn) {
+            if (orderedStatesToWaitOn == null) {
+                throw new IllegalArgumentException("orderedStatesToWaitOn can't be null.");
+            }
+
+            if (orderedStatesToWaitOn.length <= 0) {
+                throw new IllegalArgumentException("orderedStatesToWaitOn length must be positive.");
+            }
+
+            synchronized (lockMe) {
+                super.reset();
+                statesToWaitFor = new LinkedList<>();
+
+                for (Event.KeeperState state : orderedStatesToWaitOn) {
+                    statesToWaitFor.add(state);
+                }
+            }
+        }
+    }
+
+    private class BlockingPathWatcher extends EventWaiter {
+        private String pathToNotifyOn;
+        private Event.EventType requiredEventType;
+
+        public BlockingPathWatcher(String pathToNotifyOn, Event.EventType requiredEventType) {
+            reset(pathToNotifyOn, requiredEventType);
+        }
+
+        public void reset(String pathToNotifyOn, Event.EventType requiredEventType) {
+            super.reset();
+            this.pathToNotifyOn = pathToNotifyOn;
+            this.requiredEventType = requiredEventType;
+        }
+
+        @Override
+        public void process(WatchedEvent event) {
+            LOG.info("WatchEvent {} for path {}", event.getType(), event.getPath());
+            if (pathToNotifyOn != null && event.getType() == requiredEventType
+                    && pathToNotifyOn.equalsIgnoreCase(event.getPath())) {
+                notifyListener();
+            }
+        }
+    }
+
+}
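
Note on the EventWaiter pattern above: the test thread blocks on a latch while a callback thread consumes an ordered list of expected states, and the latch is released only once the last expected state has arrived in order. The following is a minimal, ZooKeeper-free sketch of that idea. The class name, the generic event type, and the boolean-returning await are assumptions made for illustration; the commit's BlockingStateWatcher works on Watcher.Event.KeeperState and throws TimeoutException instead.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical generic version of the ordered-state waiter.
final class OrderedEventWaiter<E> {
    private final Deque<E> expected;
    private final CountDownLatch done = new CountDownLatch(1);

    @SafeVarargs
    OrderedEventWaiter(E... orderedEvents) {
        if (orderedEvents.length == 0) {
            throw new IllegalArgumentException("at least one event is required");
        }
        expected = new ArrayDeque<>(Arrays.asList(orderedEvents));
    }

    // Called from the callback thread. Events that do not match the head
    // of the queue are ignored, as in the watcher above.
    synchronized void onEvent(E event) {
        if (!expected.isEmpty() && expected.peekFirst().equals(event)) {
            expected.removeFirst();
            if (expected.isEmpty()) {
                done.countDown();
            }
        }
    }

    // Called from the test thread. Returns false on timeout or interrupt.
    boolean await(long timeoutMs) {
        try {
            return done.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A test would create the waiter with the expected sequence (for example Disconnected, then SyncConnected), trigger the action, and then call await with a timeout.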