You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2017/10/10 04:18:10 UTC

kudu git commit: java: replace bespoke minicluster implementation with control shell

Repository: kudu
Updated Branches:
  refs/heads/master 1680d23be -> 8455f0374


java: replace bespoke minicluster implementation with control shell

This patch replaces the Java client's bespoke MiniKuduCluster and MiniKdc
implementations with appropriate calls to the new CLI control shell. The
shell can communicate either in protobuf or JSON; I've opted to use the
former here since the Java client already understands protobuf, and doing
so yields more type safety.

Change-Id: Ia6340dadd6ff0397fffa23388a8cbbca3e26a618
Reviewed-on: http://gerrit.cloudera.org:8080/8194
Reviewed-by: Dan Burkert <da...@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/8455f037
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/8455f037
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/8455f037

Branch: refs/heads/master
Commit: 8455f0374eed60950679500d3faac70213b8ff2d
Parents: 1680d23
Author: Adar Dembo <ad...@cloudera.com>
Authored: Fri Sep 29 13:35:08 2017 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Tue Oct 10 04:17:55 2017 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/client/BaseKuduTest.java    | 101 +--
 .../java/org/apache/kudu/client/MiniKdc.java    | 450 -----------
 .../org/apache/kudu/client/MiniKuduCluster.java | 747 ++++++++-----------
 .../kudu/client/TestClientFailoverSupport.java  |   6 +-
 .../org/apache/kudu/client/TestMiniKdc.java     |  79 --
 .../apache/kudu/client/TestMiniKuduCluster.java |  57 +-
 .../kudu/client/TestMultipleLeaderFailover.java |   8 +-
 .../java/org/apache/kudu/client/TestUtils.java  | 140 ----
 java/kudu-client/src/test/resources/flags       |   5 -
 .../apache/kudu/spark/kudu/TestContext.scala    |   1 -
 10 files changed, 380 insertions(+), 1214 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/8455f037/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java b/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
index a0a5832..fc6f303 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
@@ -18,6 +18,7 @@ package org.apache.kudu.client;
 
 import static org.junit.Assert.fail;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -37,8 +38,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Common.HostPortPB;
 import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
+import org.apache.kudu.client.LocatedTablet.Replica;
 import org.apache.kudu.master.Master;
 
 public class BaseKuduTest {
@@ -109,7 +112,6 @@ public class BaseKuduTest {
     miniCluster = miniClusterBuilder
         .numMasters(numMasters)
         .numTservers(numTabletServers)
-        .defaultTimeoutMs(DEFAULT_SLEEP)
         .build();
     masterAddresses = miniCluster.getMasterAddresses();
     masterHostPorts = miniCluster.getMasterHostPorts();
@@ -119,11 +121,6 @@ public class BaseKuduTest {
         .defaultAdminOperationTimeoutMs(DEFAULT_SLEEP)
         .build();
     syncClient = new KuduClient(client);
-
-    LOG.info("Waiting for tablet servers...");
-    if (!miniCluster.waitForTabletServers(numTabletServers)) {
-      fail(String.format("Couldn't get %d tablet servers running, aborting", numTabletServers));
-    }
   }
 
   protected static KuduTable createTable(String tableName, Schema schema,
@@ -358,8 +355,8 @@ public class BaseKuduTest {
       fail("Table " + table.getName() + " only has 1 tablet, please enable replication");
     }
 
-    Integer port = findLeaderTabletServerPort(tablet);
-    miniCluster.killTabletServerOnPort(port);
+    HostAndPort hp = findLeaderTabletServerHostPort(tablet);
+    miniCluster.killTabletServerOnHostPort(hp);
   }
 
   /**
@@ -372,17 +369,17 @@ public class BaseKuduTest {
    * @throws Exception
    */
   protected static void killTabletLeader(RemoteTablet tablet) throws Exception {
-    int port = findLeaderTabletServerPort(new LocatedTablet(tablet));
-    miniCluster.killTabletServerOnPort(port);
+    HostAndPort hp = findLeaderTabletServerHostPort(new LocatedTablet(tablet));
+    miniCluster.killTabletServerOnHostPort(hp);
   }
 
   /**
-   * Finds the RPC port of the given tablet's leader tablet server.
+   * Finds the host and RPC port of the given tablet's leader tserver.
    * @param tablet a LocatedTablet
-   * @return the RPC port of the given tablet's leader tablet server.
-   * @throws Exception
+   * @return the host and port of the given tablet's leader tserver
+   * @throws Exception if we are unable to find the leader tserver
    */
-  protected static int findLeaderTabletServerPort(LocatedTablet tablet)
+  protected static HostAndPort findLeaderTabletServerHostPort(LocatedTablet tablet)
       throws Exception {
     LocatedTablet.Replica leader = null;
     DeadlineTracker deadlineTracker = new DeadlineTracker();
@@ -399,47 +396,44 @@ public class BaseKuduTest {
         Thread.sleep(50);
       }
     }
-    return leader.getRpcPort();
+    return HostAndPort.fromParts(leader.getRpcHost(), leader.getRpcPort());
   }
 
   /**
    * Helper method to easily kill the leader master.
    *
    * This method is thread-safe.
-   * @throws Exception If there is an error finding or killing the leader master.
+   * @throws Exception if there is an error finding or killing the leader master.
    */
   protected static void killMasterLeader() throws Exception {
-    int leaderPort = findLeaderMasterPort();
-    miniCluster.killMasterOnPort(leaderPort);
+    HostAndPort hp = findLeaderMasterHostPort();
+    miniCluster.killMasterOnHostPort(hp);
   }
 
   /**
-   * Find the port of the leader master in order to retrieve it from the port to process map.
-   * @return The port of the leader master.
-   * @throws Exception If we are unable to find the leader master.
+   * Find the host and port of the leader master.
+   * @return the host and port of the leader master
+   * @throws Exception if we are unable to find the leader master
    */
-  protected static int findLeaderMasterPort() throws Exception {
+  protected static HostAndPort findLeaderMasterHostPort() throws Exception {
     Stopwatch sw = Stopwatch.createStarted();
-    int leaderPort = -1;
-    while (leaderPort == -1 && sw.elapsed(TimeUnit.MILLISECONDS) < DEFAULT_SLEEP) {
+    while (sw.elapsed(TimeUnit.MILLISECONDS) < DEFAULT_SLEEP) {
       Deferred<Master.GetTableLocationsResponsePB> masterLocD =
           client.getMasterTableLocationsPB(null);
       Master.GetTableLocationsResponsePB r = masterLocD.join(DEFAULT_SLEEP);
-      leaderPort = r.getTabletLocations(0)
+      HostPortPB pb = r.getTabletLocations(0)
           .getReplicas(0)
           .getTsInfo()
-          .getRpcAddresses(0)
-          .getPort();
-    }
-    if (leaderPort == -1) {
-      fail(String.format("No leader master found after %d ms", DEFAULT_SLEEP));
+          .getRpcAddresses(0);
+      if (pb.getPort() != -1) {
+        return HostAndPort.fromParts(pb.getHost(), pb.getPort());
+      }
     }
-    return leaderPort;
+    throw new IOException(String.format("No leader master found after %d ms", DEFAULT_SLEEP));
   }
 
   /**
    * Picks at random a tablet server that serves tablets from the passed table and restarts it.
-   * Waits between killing and restarting the process.
    * @param table table to query for a TS to restart
    * @throws Exception
    */
@@ -450,44 +444,31 @@ public class BaseKuduTest {
     }
 
     LocatedTablet tablet = tablets.get(0);
-
-    int port = tablet.getReplicas().get(
-        randomForTSRestart.nextInt(tablet.getReplicas().size())).getRpcPort();
-
-    miniCluster.killTabletServerOnPort(port);
-
-    Thread.sleep(1000);
-
-    miniCluster.restartDeadTabletServerOnPort(port);
+    Replica replica = tablet.getReplicas().get(randomForTSRestart.nextInt(tablet.getReplicas().size()));
+    HostAndPort hp = HostAndPort.fromParts(replica.getRpcHost(), replica.getRpcPort());
+    miniCluster.killTabletServerOnHostPort(hp);
+    miniCluster.restartDeadTabletServerOnHostPort(hp);
   }
 
   /**
    * Kills a tablet server that serves the given tablet's leader and restarts it.
-   * Waits between killing and restarting the process.
-   *
    * @param tablet a RemoteTablet which will get its leader killed and restarted
    * @throws Exception
    */
   protected static void restartTabletServer(RemoteTablet tablet) throws Exception {
-    int port = findLeaderTabletServerPort(new LocatedTablet(tablet));
-    miniCluster.killTabletServerOnPort(port);
-
-    Thread.sleep(1000);
-
-    miniCluster.restartDeadTabletServerOnPort(port);
+    HostAndPort hp = findLeaderTabletServerHostPort(new LocatedTablet(tablet));
+    miniCluster.killTabletServerOnHostPort(hp);
+    miniCluster.restartDeadTabletServerOnHostPort(hp);
   }
 
   /**
-   * Kills, sleeps, then restarts the leader master.
+   * Kills and restarts the leader master.
    * @throws Exception
    */
   protected static void restartLeaderMaster() throws Exception {
-    int master = findLeaderMasterPort();
-    miniCluster.killMasterOnPort(master);
-
-    Thread.sleep(1000);
-
-    miniCluster.restartDeadMasterOnPort(master);
+    HostAndPort hp = findLeaderMasterHostPort();
+    miniCluster.killMasterOnHostPort(hp);
+    miniCluster.restartDeadMasterOnHostPort(hp);
   }
 
   /**
@@ -503,15 +484,15 @@ public class BaseKuduTest {
    * Kills all tablet servers in the cluster.
    * @throws InterruptedException
    */
-  protected void killTabletServers() throws InterruptedException {
-    miniCluster.killTabletServers();
+  protected void killTabletServers() throws IOException {
+    miniCluster.killTservers();
   }
 
   /**
    * Restarts killed tablet servers in the cluster.
    * @throws Exception
    */
-  protected void restartTabletServers() throws Exception {
-    miniCluster.restartDeadTabletServers();
+  protected void restartTabletServers() throws IOException {
+    miniCluster.restartDeadTservers();
   }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/8455f037/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKdc.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKdc.java b/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKdc.java
deleted file mode 100644
index d625c30..0000000
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKdc.java
+++ /dev/null
@@ -1,450 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.kudu.client;
-
-import java.io.BufferedReader;
-import java.io.Closeable;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.lang.reflect.Field;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import javax.annotation.concurrent.NotThreadSafe;
-
-import com.google.common.base.Charsets;
-import com.google.common.base.Joiner;
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.io.CharStreams;
-import org.apache.commons.io.FileUtils;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A managed Kerberos Key Distribution Center.
- *
- * Provides utility functions to create users and services which can authenticate
- * to the KDC.
- *
- * The KDC is managed as an external process, using the krb5 binaries installed on the system.
- *
- * For debugging Kerberos client issues, it can be helpful to add
- * {@code -Dsun.security.krb5.debug=true} to the JVM properties.
- */
-@InterfaceAudience.Private
-@NotThreadSafe
-public class MiniKdc implements Closeable {
-  /**
-   * The '-Ffn' flag gets lsof to output something like:
-   *   p19730
-   *   f123
-   *   n*:41254
-   * The first line is the pid. We ignore it.
-   * The second line is the file descriptor number. We ignore it.
-   * The third line has the bind address and port.
-   *
-   * This regex just looks for the third line.
-   */
-  private static final Pattern LSOF_PATTERN = Pattern.compile(
-      "^n\\*:([0-9]+)$", Pattern.MULTILINE);
-
-  private static final Logger LOG = LoggerFactory.getLogger(MiniKuduCluster.class);
-
-  private final Options options;
-
-  private Process kdcProcess;
-
-  private Thread kdcProcessLogRedirector;
-
-  /**
-   * The ephemeral port that the KDC is bound to.
-   *
-   * This will be 0 if the KDC has not yet started.
-   */
-  private int kdcPort = 0;
-
-  /**
-   * Options for the MiniKdc.
-   */
-  public static class Options {
-    private final String realm;
-    private final Path dataRoot;
-
-    public Options(String realm, Path dataRoot) {
-      this.realm = realm;
-      this.dataRoot = dataRoot;
-    }
-
-    public String getRealm() {
-      return realm;
-    }
-
-    public Path getDataRoot() {
-      return dataRoot;
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(this)
-                        .add("realm", realm)
-                        .add("dataRoot", dataRoot)
-                        .toString();
-    }
-  }
-
-  /**
-   * Creates a MiniKdc with explicit options.
-   */
-  public MiniKdc(Options options) {
-    this.options = options;
-  }
-
-  /**
-   * Creates a MiniKdc with default options.
-   */
-  public static MiniKdc withDefaults() throws IOException {
-    return new MiniKdc(new Options(
-        "KRBTEST.COM",
-        Paths.get(TestUtils.getBaseDir(), "krb5kdc-" + System.currentTimeMillis())));
-  }
-  /**
-   * Start the MiniKdc.
-   */
-  public void start() throws IOException, InterruptedException {
-    Preconditions.checkState(kdcProcess == null);
-    LOG.debug("starting KDC {}", options);
-
-    File dataRootDir = options.dataRoot.toFile();
-    if (!dataRootDir.exists()) {
-      if (!dataRootDir.mkdir()) {
-        throw new RuntimeException(String.format("unable to create krb5 state directory: %s",
-                                                 dataRootDir));
-      }
-
-      createKdcConf();
-      createKrb5Conf();
-
-      // Create the KDC database using the kdb5_util tool.
-      checkReturnCode(
-          startProcessWithKrbEnv(
-              getBinaryPath("kdb5_util"),
-              "create",
-              "-s", // Stash the master password.
-              "-P", "masterpw", // Set a password.
-              "-W" // Use weak entropy (since we don't need real security).
-          ), "kdb5_util", true);
-    }
-
-    kdcProcess = startProcessWithKrbEnv(getBinaryPath("krb5kdc"),
-                                        "-n"); // Do not daemonize.
-    // Redirect the KDC output to SLF4J.
-    kdcProcessLogRedirector = new Thread(
-        new MiniKuduCluster.ProcessInputStreamLogPrinterRunnable(kdcProcess.getInputStream()),
-        "krb5kdc:" + options.dataRoot);
-    kdcProcessLogRedirector.setDaemon(true);
-    kdcProcessLogRedirector.start();
-
-    // The first time we start a KDC, we use an ephemeral port. Figure out what port
-    // the KDC actually bound to, and rewrite the configuration to refer to it.
-    kdcPort = waitForBoundUdpPort(kdcProcess);
-    createKdcConf();
-    createKrb5Conf();
-  }
-
-  /**
-   * Creates a new Kerberos user with the given username.
-   * @param username the new user
-   */
-  void createUserPrincipal(String username) throws IOException {
-    checkReturnCode(
-        startProcessWithKrbEnv(
-            getBinaryPath("kadmin.local"),
-            "-q",
-            String.format("add_principal -pw %s %s", username, username)
-        ), "kadmin.local", true);
-  }
-
-  /**
-   * Kinit a user with the mini KDC.
-   * @param username the user to kinit
-   */
-  void kinit(String username) throws IOException {
-    Process proc = startProcessWithKrbEnv(getBinaryPath("kinit"), username);
-    proc.getOutputStream().write(username.getBytes());
-    proc.getOutputStream().close();
-    checkReturnCode(proc, "kinit", true);
-  }
-
-  /**
-   * Returns the output from the 'klist' utility. This is useful for logging the
-   * local ticket cache state.
-   */
-  String klist() throws IOException {
-    Process proc = startProcessWithKrbEnv(getBinaryPath("klist"), "-A");
-    checkReturnCode(proc, "klist", false);
-    return CharStreams.toString(new InputStreamReader(proc.getInputStream()));
-  }
-
-  /**
-   * Creates a new service principal and associated keytab, returning its path.
-   * @param spn the desired service principal name (e.g. "kudu/foo.example.com").
-   *            If the principal already exists, its key will be reset and a new
-   *            keytab will be generated.
-   * @return the path to the new services' keytab file.
-   */
-  Path createServiceKeytab(String spn) throws IOException {
-    Path kt_path = options.dataRoot.resolve(spn.replace('/', '_') + ".keytab");
-    String kadmin = getBinaryPath("kadmin.local");
-    checkReturnCode(startProcessWithKrbEnv(kadmin,
-                                           "-q",
-                                           String.format("add_principal -randkey %s", spn)),
-                    "kadmin.local", true);
-
-    checkReturnCode(startProcessWithKrbEnv(kadmin,
-                                           "-q",
-                                           String.format("ktadd -k %s %s", kt_path, spn)),
-                    "kadmin.local", true);
-    return kt_path;
-  }
-
-  private void createKrb5Conf() throws IOException {
-    List<String> contents = ImmutableList.of(
-        "[logging]",
-        "   kdc = FILE:/dev/stderr",
-
-        "[libdefaults]",
-        "   default_realm = " + options.realm,
-        "   dns_lookup_kdc = false",
-        "   dns_lookup_realm = false",
-        "   forwardable = true",
-        "   renew_lifetime = 7d",
-        "   ticket_lifetime = 24h",
-
-        // Disable aes256, since Java does not support it without JCE, see
-        // https://docs.oracle.com/javase/8/docs/technotes/guides/security/jgss/jgss-features.html
-        "   default_tkt_enctypes = aes128-cts des3-cbc-sha1",
-        "   default_tgs_enctypes = aes128-cts des3-cbc-sha1",
-        "   permitted_enctypes = aes128-cts des3-cbc-sha1",
-
-        // In miniclusters, we start daemons on local loopback IPs that
-        // have no reverse DNS entries. So, disable reverse DNS.
-        "   rdns = false",
-        "   ignore_acceptor_hostname = true",
-
-        "[realms]",
-        options.realm + " = {",
-        "   kdc = 127.0.0.1:" + kdcPort,
-        "}");
-
-    Files.write(options.dataRoot.resolve("krb5.conf"), contents, Charsets.UTF_8);
-  }
-
-  private void createKdcConf() throws IOException {
-    List<String> contents = ImmutableList.of(
-        "[kdcdefaults]",
-        "   kdc_ports = " + kdcPort,
-        "   kdc_tcp_ports = \"\"",
-
-        "[realms]",
-        options.realm + " = {",
-        "   acl_file = " + options.dataRoot.resolve("kadm5.acl"),
-        "   admin_keytab = " + options.dataRoot.resolve("kadm5.keytab"),
-        "   database_name = " + options.dataRoot.resolve("principal"),
-        "   key_stash_file = " + options.dataRoot.resolve(".k5." + options.realm),
-        "   max_renewable_life = 7d 0h 0m 0s",
-        "}");
-
-    Files.write(options.dataRoot.resolve("kdc.conf"), contents, Charsets.UTF_8);
-  }
-
-  /**
-   * Stop the MiniKdc.
-   */
-  public void stop() throws IOException {
-    Preconditions.checkState(kdcProcess != null);
-    LOG.debug("stopping KDC {}", options);
-    try {
-      kdcProcess.destroy();
-      kdcProcess.waitFor();
-      kdcProcessLogRedirector.join();
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-    } finally {
-      kdcProcess = null;
-      kdcProcessLogRedirector = null;
-    }
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  public void close() throws IOException {
-    LOG.debug("closing KDC {}", options);
-    try {
-      if (kdcProcess != null) {
-        stop();
-      }
-    } finally {
-      FileUtils.deleteDirectory(options.dataRoot.toFile());
-    }
-  }
-
-  private static final List<String> KRB5_BINARY_PATHS = ImmutableList.of(
-      "/usr/local/opt/krb5/sbin", // Homebrew
-      "/usr/local/opt/krb5/bin", // Homebrew
-      "/opt/local/sbin", // Macports
-      "/opt/local/bin", // Macports
-      "/usr/lib/mit/sbin", // SLES
-      "/usr/sbin" // Linux
-  );
-
-  public Map<String, String> getEnvVars() {
-    return ImmutableMap.of(
-        "KRB5_CONFIG", options.dataRoot.resolve("krb5.conf").toString(),
-        "KRB5_KDC_PROFILE", options.dataRoot.resolve("kdc.conf").toString(),
-        "KRB5CCNAME", getTicketCachePath(),
-        "KUDU_ENABLE_KRB5_REALM_FIX", "yes"
-    );
-  }
-
-  /**
-   * @return the path of the Kerberos ticket/credential cache
-   */
-  public String getTicketCachePath() {
-    return options.dataRoot.resolve("krb5cc").toString();
-  }
-
-  private Process startProcessWithKrbEnv(String... argv) throws IOException {
-
-    ProcessBuilder procBuilder = new ProcessBuilder(argv);
-    procBuilder.environment().putAll(getEnvVars());
-    LOG.debug("executing '{}', env: '{}'",
-              Joiner.on(" ").join(procBuilder.command()),
-              Joiner.on(", ").withKeyValueSeparator("=").join(procBuilder.environment()));
-    return procBuilder.redirectErrorStream(true).start();
-  }
-
-  /**
-   * Waits for the process to exit, checking the return code. Any output to the
-   * process' stdout is optionally logged to SLF4J.
-   * @param process the process to check
-   * @param name the name of the process
-   * @param log whether to log the process' stdout.
-   */
-  private static void checkReturnCode(Process process, String name, boolean log) throws IOException {
-    int ret;
-    try {
-      ret = process.waitFor();
-      if (log) {
-        // Reading the output *after* waiting for the process to close can deadlock
-        // if the process overwhelms the output buffer, however none of the krb5
-        // utilities are known to do that.
-        try (BufferedReader in = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
-          String line;
-          while ((line = in.readLine()) != null) {
-            LOG.debug(line);
-          }
-        }
-      }
-    } catch (InterruptedException e) {
-      Thread.interrupted();
-      throw new IOException(String.format("process '%s' interrupted", name));
-    }
-    if (ret != 0) {
-      throw new IOException(String.format("process '%s' failed: %s", name, ret));
-    }
-  }
-
-  /**
-   * Return the process identifier (pid) of a running Process.
-   *
-   * This is Unix-specific, but since MiniKDC only supports Unix, not a big deal.
-   */
-  private static int getProcessPid(Process p) {
-    try {
-      Field pidField = p.getClass().getDeclaredField("pid");
-      pidField.setAccessible(true);
-      return (int)pidField.get(p);
-    } catch (IllegalAccessException | NoSuchFieldException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /**
-   * Wait for the given process to bind to a UDP port, and then return that port.
-   * This depends on 'lsof' being installed on the system.
-   *
-   * The provided Process must already be started.
-   */
-  private static int waitForBoundUdpPort(Process p) throws IOException, InterruptedException {
-    int pid = getProcessPid(p);
-    String lsof = getBinaryPath("lsof", ImmutableList.of("/sbin", "/usr/sbin"));
-    List<String> cmd = ImmutableList.of(
-        lsof, "-wbnP", "-Ffn",
-        "-p", "" + pid,
-        "-a", "-i", "4UDP");
-
-    Stopwatch sw = Stopwatch.createStarted();
-    for (int i = 0; ; i++) {
-      Process proc = new ProcessBuilder().command(cmd).start();
-      try {
-        checkReturnCode(proc, "lsof", false);
-        String lsofOutput = CharStreams.toString(new InputStreamReader(proc.getInputStream()));
-        Matcher m = LSOF_PATTERN.matcher(lsofOutput);
-        if (m.find()) {
-          return Integer.parseInt(m.group(1));
-        }
-      } catch (IOException ioe) {
-        // If lsof fails and we have already used up our timeout, re-throw.
-        if (sw.elapsed(TimeUnit.MILLISECONDS) > 5000) {
-          throw ioe;
-        }
-        Thread.sleep(10 * i);
-      }
-    }
-  }
-
-  private static String getBinaryPath(String executable) throws IOException {
-    return getBinaryPath(executable, KRB5_BINARY_PATHS);
-  }
-
-  private static String getBinaryPath(String executable,
-                                      List<String> searchPaths) throws IOException {
-    for (String path : searchPaths) {
-      File f = Paths.get(path).resolve(executable).toFile();
-      if (f.exists() && f.canExecute()) {
-        return f.getPath();
-      }
-    }
-
-    Process which = new ProcessBuilder().command("which", executable).start();
-    checkReturnCode(which, "which", false);
-    return CharStreams.toString(new InputStreamReader(which.getInputStream())).trim();
-  }
-}

http://git-wip-us.apache.org/repos/asf/kudu/blob/8455f037/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java b/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
index 67b3a00..3546d27 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
@@ -15,437 +15,344 @@
 package org.apache.kudu.client;
 
 import java.io.BufferedReader;
-import java.io.File;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
-import java.nio.file.Path;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.net.HostAndPort;
-import org.apache.commons.io.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import org.apache.kudu.util.NetUtil;
+import org.apache.kudu.Common.HostPortPB;
+import org.apache.kudu.tools.Tool.ControlShellRequestPB;
+import org.apache.kudu.tools.Tool.ControlShellResponsePB;
+import org.apache.kudu.tools.Tool.CreateClusterRequestPB;
+import org.apache.kudu.tools.Tool.DaemonIdentifierPB;
+import org.apache.kudu.tools.Tool.DaemonInfoPB;
+import org.apache.kudu.tools.Tool.GetKDCEnvVarsRequestPB;
+import org.apache.kudu.tools.Tool.GetMastersRequestPB;
+import org.apache.kudu.tools.Tool.GetTServersRequestPB;
+import org.apache.kudu.tools.Tool.StartClusterRequestPB;
+import org.apache.kudu.tools.Tool.StartDaemonRequestPB;
+import org.apache.kudu.tools.Tool.StopDaemonRequestPB;
 import org.apache.kudu.util.SecurityUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * Utility class to start and manipulate Kudu clusters. Relies on being IN the Kudu source code with
- * both the kudu-master and kudu-tserver binaries already compiled. {@link BaseKuduTest} should be
+ * Utility class to start and manipulate Kudu clusters. Depends on precompiled
+ * kudu, kudu-master, and kudu-tserver binaries. {@link BaseKuduTest} should be
  * extended instead of directly using this class in almost all cases.
  */
 public class MiniKuduCluster implements AutoCloseable {
 
   private static final Logger LOG = LoggerFactory.getLogger(MiniKuduCluster.class);
 
-  // TS and Master ports will be assigned starting with this one.
-  private static final int PORT_START = 64030;
+  // Control shell process.
+  private Process miniCluster;
 
-  // List of threads that print
-  private final List<Thread> processInputPrinters = new ArrayList<>();
+  // Request channel to the control shell.
+  private DataOutputStream miniClusterStdin;
 
-  // Map of ports to master servers.
-  private final Map<Integer, Process> masterProcesses = new ConcurrentHashMap<>();
+  // Response channel from the control shell.
+  private DataInputStream miniClusterStdout;
 
-  // Map of ports to tablet servers.
-  private final Map<Integer, Process> tserverProcesses = new ConcurrentHashMap<>();
+  // Thread that reads and logs stderr from the control shell.
+  private Thread miniClusterErrorPrinter;
 
-  // Map of ports to process command lines. Never removed from. Used to restart processes.
-  private final Map<Integer, List<String>> commandLines = new ConcurrentHashMap<>();
-
-  private final List<String> pathsToDelete = new ArrayList<>();
-  private final List<HostAndPort> masterHostPorts = new ArrayList<>();
-  private final List<Integer> tserverPorts = new ArrayList<>();
-  private final ImmutableList<String> extraTserverFlags;
-  private final ImmutableList<String> extraMasterFlags;
+  private class DaemonInfo {
+    DaemonIdentifierPB id;
+    boolean isRunning;
+  }
 
-  // Client we can use for common operations.
-  private KuduClient syncClient;
-  private final int defaultTimeoutMs;
+  // Map of master addresses to daemon information.
+  private final Map<HostAndPort, DaemonInfo> masters = Maps.newHashMap();
 
-  private String masterAddresses;
+  // Map of tserver addresses to daemon information.
+  private final Map<HostAndPort, DaemonInfo> tservers = Maps.newHashMap();
 
-  private final String bindHost = TestUtils.getUniqueLocalhost();
-  private Path keytab;
-  private MiniKdc miniKdc;
+  // Builder-provided cluster configuration state.
+  private final boolean enableKerberos;
+  private final int numMasters;
+  private final int numTservers;
+  private final ImmutableList<String> extraTserverFlags;
+  private final ImmutableList<String> extraMasterFlags;
 
-  private MiniKuduCluster(final int defaultTimeoutMs,
-                          final List<String> extraTserverFlags,
-                          final List<String> extraMasterFlags) {
-    this.defaultTimeoutMs = defaultTimeoutMs;
+  private MiniKuduCluster(boolean enableKerberos,
+      int numMasters,
+      int numTservers,
+      List<String> extraTserverFlags,
+      List<String> extraMasterFlags) {
+    this.enableKerberos = enableKerberos;
+    this.numMasters = numMasters;
+    this.numTservers = numTservers;
     this.extraTserverFlags = ImmutableList.copyOf(extraTserverFlags);
     this.extraMasterFlags = ImmutableList.copyOf(extraMasterFlags);
   }
 
   /**
-   * Enable Kerberos security for this cluster, start the MiniKdc, and log in
-   * the required subjects.
-   */
-  private void startKerberos() throws Exception {
-    miniKdc = MiniKdc.withDefaults();
-    miniKdc.start();
-
-    keytab = miniKdc.createServiceKeytab("kudu/" + bindHost);
-
-    miniKdc.createUserPrincipal("testuser");
-    miniKdc.kinit("testuser");
-    System.setProperty("java.security.krb5.conf",
-        miniKdc.getEnvVars().get("KRB5_CONFIG"));
-    System.setProperty(SecurityUtil.KUDU_TICKETCACHE_PROPERTY,
-        miniKdc.getEnvVars().get("KRB5CCNAME"));
-  }
-
-  /**
-   * Start the master and tablet server processes.
-   * @param numMasters the number of masters to start.
-   * @param numTservers the number of tablet servers to start.
+   * Sends a command to the control shell and receives its response.
+   * <p>
+   * The method is synchronized to prevent interleaving of requests and responses.
+   * @param req control shell request
+   * @return control shell response
+   * @throws IOException if there was some kind of transport error, or if the
+   *                     response indicates an error
    */
-  private void start(int numMasters, int numTservers) throws Exception {
-    startCluster(numMasters, numTservers);
-
-    KuduClient.KuduClientBuilder kuduClientBuilder =
-        new KuduClient.KuduClientBuilder(getMasterAddresses());
-    kuduClientBuilder.defaultAdminOperationTimeoutMs(defaultTimeoutMs);
-    kuduClientBuilder.defaultOperationTimeoutMs(defaultTimeoutMs);
-    syncClient = kuduClientBuilder.build();
-  }
-
-  /**
-   * Wait up to this instance's "default timeout" for an expected count of TS to
-   * connect to the master.
-   * @param expected How many TS are expected
-   * @return true if there are at least as many TS as expected, otherwise false
-   */
-  public boolean waitForTabletServers(int expected) throws Exception {
-    int count = 0;
-    Stopwatch stopwatch = Stopwatch.createStarted();
-    while (count < expected && stopwatch.elapsed(TimeUnit.MILLISECONDS) < defaultTimeoutMs) {
-      Thread.sleep(200);
-      count = syncClient.listTabletServers().getTabletServersCount();
-    }
-    return count >= expected;
+  private synchronized ControlShellResponsePB sendRequestToCluster(ControlShellRequestPB req)
+      throws IOException {
+    // Send the request's size (4 bytes, big endian) followed by the request.
+    LOG.debug("Request: {}", req);
+    miniClusterStdin.writeInt(req.getSerializedSize());
+    miniClusterStdin.write(req.toByteArray());
+    miniClusterStdin.flush();
+
+    // Read the response's size (4 bytes, big endian) followed by the response.
+    int respLength = miniClusterStdout.readInt();
+    byte[] respBody = new byte[respLength];
+    miniClusterStdout.readFully(respBody);
+    ControlShellResponsePB resp = ControlShellResponsePB.parseFrom(respBody);
+    LOG.debug("Response: {}", resp);
+
+    // Convert any error into an exception.
+    if (resp.hasError()) {
+      throw new NonRecoverableException(Status.fromPB(resp.getError()));
+    }
+    return resp;
   }
 
   /**
-   * Starts a Kudu cluster composed of the provided masters and tablet servers.
-   * @param numMasters how many masters to start
-   * @param numTservers how many tablet servers to start
+   * Starts this Kudu cluster.
+   * @throws IOException if something went wrong in transit
    */
-  private void startCluster(int numMasters, int numTservers) throws Exception {
+  private void start() throws IOException {
     Preconditions.checkArgument(numMasters > 0, "Need at least one master");
-    // The following props are set via kudu-client's pom.
-    String baseDirPath = TestUtils.getBaseDir();
-
-    LOG.info("Starting {} masters...", numMasters);
-    int startPort = startMasters(PORT_START, numMasters, baseDirPath, bindHost);
 
-    LOG.info("Starting {} tablet servers...", numTservers);
-    startTabletServers(startPort, numTservers, baseDirPath);
-  }
-
-  /**
-   * Start the specified number of masters with ports starting from the specified
-   * number. Finds free web and RPC ports up front for all of the masters first, then
-   * starts them on those ports.
-   *
-   * @param startPort the starting point of the port range for the masters
-   * @param numServers number of master servers to start
-   * @param baseDirPath the base directory where the mini cluster stores its data
-   * @return the next free port
-   * @throws Exception if we are unable to start the masters
-   */
-  private int startMasters(int startPort,
-                           int numServers,
-                           String baseDirPath,
-                           String bindHost) throws Exception {
-    if (numServers <= 0) {
-      return startPort;
-    }
-    // Get the list of web and RPC ports to use for the master consensus configuration:
-    // request NUM_MASTERS * 2 free ports as we want to also reserve the web
-    // ports for the consensus configuration.
-    final List<Integer> ports = TestUtils.findFreePorts(
-        startPort > 0 ? startPort : PORT_START, numServers * 2);
-    List<Integer> masterRpcPorts = Lists.newArrayListWithCapacity(numServers);
-    List<Integer> masterWebPorts = Lists.newArrayListWithCapacity(numServers);
-    for (int i = 0; i < numServers * 2; i++) {
-      if (i % 2 == 0) {
-        masterRpcPorts.add(ports.get(i));
-        masterHostPorts.add(HostAndPort.fromParts(bindHost, ports.get(i)));
-      } else {
-        masterWebPorts.add(ports.get(i));
+    // Start the control shell and the communication channel to it.
+    List<String> commandLine = Lists.newArrayList(
+        TestUtils.findBinary("kudu"),
+        "test",
+        "mini_cluster",
+        "--serialization=pb");
+    LOG.info("Starting process: {}", commandLine);
+    ProcessBuilder processBuilder = new ProcessBuilder(commandLine);
+    miniCluster = processBuilder.start();
+    miniClusterStdin = new DataOutputStream(miniCluster.getOutputStream());
+    miniClusterStdout = new DataInputStream(miniCluster.getInputStream());
+
+    // Set up a thread that logs stderr from the control shell; this will
+    // include all cluster logging.
+    ProcessInputStreamLogPrinterRunnable printer =
+        new ProcessInputStreamLogPrinterRunnable(miniCluster.getErrorStream());
+    miniClusterErrorPrinter = new Thread(printer);
+    miniClusterErrorPrinter.setDaemon(true);
+    miniClusterErrorPrinter.setName("cluster stderr printer");
+    miniClusterErrorPrinter.start();
+
+    // Create and start the cluster.
+    sendRequestToCluster(
+        ControlShellRequestPB.newBuilder()
+        .setCreateCluster(CreateClusterRequestPB.newBuilder()
+            .setNumMasters(numMasters)
+            .setNumTservers(numTservers)
+            .setEnableKerberos(enableKerberos)
+            .addAllExtraMasterFlags(extraMasterFlags)
+            .addAllExtraTserverFlags(extraTserverFlags)
+            .build())
+        .build());
+    sendRequestToCluster(
+        ControlShellRequestPB.newBuilder()
+        .setStartCluster(StartClusterRequestPB.newBuilder().build())
+        .build());
+
+    // If the cluster is Kerberized, retrieve the KDC's environment variables
+    // and adapt them into certain security-related system properties.
+    if (enableKerberos) {
+      ControlShellResponsePB resp = sendRequestToCluster(
+          ControlShellRequestPB.newBuilder()
+          .setGetKdcEnvVars(GetKDCEnvVarsRequestPB.newBuilder().build())
+          .build());
+      for (Map.Entry<String, String> e : resp.getGetKdcEnvVars().getEnvVarsMap().entrySet()) {
+        if (e.getKey().equals("KRB5_CONFIG")) {
+          System.setProperty("java.security.krb5.conf", e.getValue());
+        } else if (e.getKey().equals("KRB5CCNAME")) {
+          System.setProperty(SecurityUtil.KUDU_TICKETCACHE_PROPERTY, e.getValue());
+        }
       }
     }
-    masterAddresses = NetUtil.hostsAndPortsToString(masterHostPorts);
-    long now = System.currentTimeMillis();
-    for (int i = 0; i < numServers; i++) {
-      int port = masterRpcPorts.get(i);
-      String masterBaseDirPath = baseDirPath + "/master-" + i + "-" + now;
-      new File(masterBaseDirPath).mkdir();
-      String logDirPath = masterBaseDirPath + "/logs";
-      new File(logDirPath).mkdir();
-      String dataDirPath = masterBaseDirPath + "/data";
-      String flagsPath = TestUtils.getFlagsPath();
-      // The web port must be reserved in the call to findFreePorts above and specified
-      // to avoid the scenario where:
-      // 1) findFreePorts finds RPC ports a, b, c for the 3 masters.
-      // 2) start master 1 with RPC port and let it bind to any (specified as 0) web port.
-      // 3) master 1 happens to bind to port b for the web port, as master 2 hasn't been
-      // started yet and findFreePort(s) is "check-time-of-use" (it does not reserve the
-      // ports, only checks that when it was last called, these ports could be used).
-      List<String> commandLine = Lists.newArrayList(
-          TestUtils.findBinary("kudu-master"),
-          "--flagfile=" + flagsPath,
-          "--log_dir=" + logDirPath,
-          "--fs_wal_dir=" + dataDirPath,
-          "--fs_data_dirs=" + dataDirPath,
-          "--ipki_ca_key_size=1024",
-          "--ipki_server_key_size=1024",
-          "--tsk_num_rsa_bits=512",
-          "--webserver_interface=" + bindHost,
-          "--local_ip_for_outbound_sockets=" + bindHost,
-          "--rpc_bind_addresses=" + bindHost + ":" + port,
-          "--webserver_port=" + masterWebPorts.get(i),
-          "--raft_heartbeat_interval_ms=200"); // make leader elections faster for faster tests
-
-      if (numServers > 1) {
-        commandLine.add("--master_addresses=" + masterAddresses);
-      }
 
-      if (miniKdc != null) {
-        commandLine.add("--keytab_file=" + keytab);
-        commandLine.add("--principal=kudu/" + bindHost);
-        commandLine.add("--rpc_authentication=required");
-        commandLine.add("--superuser_acl=testuser");
-      }
-
-      commandLine.addAll(extraMasterFlags);
-
-      if (flagsPath.startsWith(baseDirPath)) {
-        // We made a temporary copy of the flags; delete them later.
-        pathsToDelete.add(flagsPath);
-      }
-      pathsToDelete.add(masterBaseDirPath);
-
-      masterProcesses.put(port, configureAndStartProcess(port, commandLine));
-      commandLines.put(port, commandLine);
+    // Initialize the maps of masters and tservers.
+    ControlShellResponsePB resp = sendRequestToCluster(
+        ControlShellRequestPB.newBuilder()
+        .setGetMasters(GetMastersRequestPB.newBuilder().build())
+        .build());
+    for (DaemonInfoPB info : resp.getGetMasters().getMastersList()) {
+      DaemonInfo d = new DaemonInfo();
+      d.id = info.getId();
+      d.isRunning = true;
+      masters.put(hostAndPortFromPB(info.getBoundRpcAddress()), d);
     }
-    // Return next port number.
-    return ports.get(ports.size() - 1) + 1;
-  }
-
-  /**
-   * Start the specified number of tablet servers with ports starting from the specified
-   * number. Finds free web and RPC ports up front for all of the tablet servers first,
-   * then starts them on those ports.
-   *
-   * @param startPort the starting point of the port range for the masters
-   * @param numServers number of tablet servers to start
-   * @param baseDirPath the base directory where the mini cluster stores its data
-   * @return the next free port
-   * @throws Exception if something fails
-   */
-  private int startTabletServers(int startPort,
-                                 int numServers,
-                                 String baseDirPath) throws Exception {
-    if (numServers <= 0) {
-      return startPort;
-    }
-    long now = System.currentTimeMillis();
-    final List<Integer> ports = TestUtils.findFreePorts(
-        startPort > 0 ? startPort : PORT_START, numServers * 2);
-    for (int i = 0; i < numServers; i++) {
-      int rpcPort = ports.get(i * 2);
-      tserverPorts.add(rpcPort);
-      String tsBaseDirPath = baseDirPath + "/ts-" + i + "-" + now;
-      new File(tsBaseDirPath).mkdir();
-      String logDirPath = tsBaseDirPath + "/logs";
-      new File(logDirPath).mkdir();
-      String dataDirPath = tsBaseDirPath + "/data";
-      String flagsPath = TestUtils.getFlagsPath();
-
-      List<String> commandLine = Lists.newArrayList(
-          TestUtils.findBinary("kudu-tserver"),
-          "--flagfile=" + flagsPath,
-          "--log_dir=" + logDirPath,
-          "--fs_wal_dir=" + dataDirPath,
-          "--fs_data_dirs=" + dataDirPath,
-          "--flush_threshold_mb=1",
-          "--ipki_server_key_size=1024",
-          "--tserver_master_addrs=" + masterAddresses,
-          "--webserver_interface=" + bindHost,
-          "--local_ip_for_outbound_sockets=" + bindHost,
-          "--webserver_port=" + (rpcPort + 1),
-          "--rpc_bind_addresses=" + bindHost + ":" + rpcPort);
-
-      if (miniKdc != null) {
-        commandLine.add("--keytab_file=" + keytab);
-        commandLine.add("--principal=kudu/" + bindHost);
-        commandLine.add("--rpc_authentication=required");
-        commandLine.add("--superuser_acl=testuser");
-      }
-
-      commandLine.addAll(extraTserverFlags);
-
-      if (flagsPath.startsWith(baseDirPath)) {
-        // We made a temporary copy of the flags; delete them later.
-        pathsToDelete.add(flagsPath);
-      }
-      pathsToDelete.add(tsBaseDirPath);
-
-      tserverProcesses.put(rpcPort, configureAndStartProcess(rpcPort, commandLine));
-      commandLines.put(rpcPort, commandLine);
+    resp = sendRequestToCluster(
+        ControlShellRequestPB.newBuilder()
+        .setGetTservers(GetTServersRequestPB.newBuilder().build())
+        .build());
+    for (DaemonInfoPB info : resp.getGetTservers().getTserversList()) {
+      DaemonInfo d = new DaemonInfo();
+      d.id = info.getId();
+      d.isRunning = true;
+      tservers.put(hostAndPortFromPB(info.getBoundRpcAddress()), d);
     }
-    // Return next port number.
-    return ports.get(ports.size() - 1) + 1;
   }
 
   /**
-   * Starts a process using the provided command and configures it to be daemon,
-   * redirects the stderr to stdout, and starts a thread that will read from the process' input
-   * stream and redirect that to LOG.
-   * @param port RPC port used to identify the process
-   * @param command process and options
-   * @return The started process
-   * @throws Exception Exception if an error prevents us from starting the process,
-   * or if we were able to start the process but noticed that it was then killed (in which case
-   * we'll log the exit value).
+   * Restarts a master identified by hostname and port. The master must already be dead.
+   * @param hp unique hostname and port identifying the master
+   * @throws IOException if the master is unknown or already running, or if the request fails
    */
-  private Process configureAndStartProcess(int port, List<String> command) throws Exception {
-    ProcessBuilder processBuilder = new ProcessBuilder(command);
-    processBuilder.redirectErrorStream(true);
-    if (miniKdc != null) {
-      processBuilder.environment().putAll(miniKdc.getEnvVars());
+  public void restartDeadMasterOnHostPort(HostAndPort hp) throws IOException {
+    DaemonInfo d = masters.get(hp);
+    if (d == null) {
+      throw new IOException(String.format("Master %s not found", hp));
     }
-    Process proc = processBuilder.start();
-    ProcessInputStreamLogPrinterRunnable printer =
-        new ProcessInputStreamLogPrinterRunnable(proc.getInputStream());
-    Thread thread = new Thread(printer);
-    thread.setDaemon(true);
-    thread.setName(Iterables.getLast(Splitter.on(File.separatorChar).split(command.get(0))) + ":" + port);
-    processInputPrinters.add(thread);
-    thread.start();
-
-    Thread.sleep(300);
-    try {
-      int ev = proc.exitValue();
-      throw new Exception(String.format(
-          "We tried starting a process (%s) but it exited with value=%s", command.get(0), ev));
-    } catch (IllegalThreadStateException ex) {
-      // This means the process is still alive, it's like reverse psychology.
-    }
-    return proc;
+    if (d.isRunning) {
+      throw new IOException(String.format("Master %s is already running", hp));
+    }
+    sendRequestToCluster(ControlShellRequestPB.newBuilder()
+        .setStartDaemon(StartDaemonRequestPB.newBuilder().setId(d.id).build())
+        .build());
+    d.isRunning = true;
   }
 
   /**
-   * Starts a previously killed master process on the specified port.
-   * @param port which port the master was listening on for RPCs
-   * @throws Exception
+   * Kills a master identified by hostname and port. Does nothing if the master
+   * was already dead.
+   * @param hp unique hostname and port identifying the master
+   * @throws IOException if the master is unknown or something went wrong in transit
    */
-  public void restartDeadMasterOnPort(int port) throws Exception {
-    restartDeadProcessOnPort(port, masterProcesses);
+  public void killMasterOnHostPort(HostAndPort hp) throws IOException {
+    DaemonInfo d = masters.get(hp);
+    if (d == null) {
+      throw new IOException(String.format("Master %s not found", hp));
+    }
+    if (!d.isRunning) {
+      return;
+    }
+    sendRequestToCluster(ControlShellRequestPB.newBuilder()
+        .setStopDaemon(StopDaemonRequestPB.newBuilder().setId(d.id).build())
+        .build());
+    d.isRunning = false;
   }
 
   /**
-   * Restart any master processes which are not currently running.
+   * Restarts a tserver identified by hostname and port. The tserver must already be dead.
+   * @param hp unique hostname and port identifying the tserver
+   * @throws IOException if the tserver is unknown or already running, or if the request fails
    */
-  public void restartDeadMasters() throws Exception {
-    for (HostAndPort hostAndPort : masterHostPorts) {
-      if (!masterProcesses.containsKey(hostAndPort.getPort())) {
-        restartDeadProcessOnPort(hostAndPort.getPort(), masterProcesses);
-      }
+  public void restartDeadTabletServerOnHostPort(HostAndPort hp) throws IOException {
+    DaemonInfo d = tservers.get(hp);
+    if (d == null) {
+      throw new IOException(String.format("Tserver %s not found", hp));
+    }
+    if (d.isRunning) {
+      throw new IOException(String.format("Tserver %s is already running", hp));
     }
+    sendRequestToCluster(ControlShellRequestPB.newBuilder()
+        .setStartDaemon(StartDaemonRequestPB.newBuilder().setId(d.id).build())
+        .build());
+    d.isRunning = true;
   }
 
-
   /**
-   * Starts a previously killed tablet server process on the specified port.
-   * @param port which port the TS was listening on for RPCs
-   * @throws Exception
+   * Kills a tserver identified by hostname and port. Does nothing if the tserver
+   * was already dead.
+   * @param hp unique hostname and port identifying the tserver
+   * @throws IOException if the tserver is unknown or something went wrong in transit
    */
-  public void restartDeadTabletServerOnPort(int port) throws Exception {
-    restartDeadProcessOnPort(port, tserverProcesses);
-  }
-
-  private void restartDeadProcessOnPort(int port, Map<Integer, Process> map) throws Exception {
-    if (!commandLines.containsKey(port)) {
-      String message = "Cannot start process on unknown port " + port;
-      LOG.warn(message);
-      throw new RuntimeException(message);
+  public void killTabletServerOnHostPort(HostAndPort hp) throws IOException {
+    DaemonInfo d = tservers.get(hp);
+    if (d == null) {
+      throw new IOException(String.format("Tserver %s not found", hp));
     }
-
-    if (map.containsKey(port)) {
-      String message = "Process already exists on port " + port;
-      LOG.warn(message);
-      throw new RuntimeException(message);
+    if (!d.isRunning) {
+      return;
     }
-
-    map.put(port, configureAndStartProcess(port, commandLines.get(port)));
+    sendRequestToCluster(ControlShellRequestPB.newBuilder()
+        .setStopDaemon(StopDaemonRequestPB.newBuilder().setId(d.id).build())
+        .build());
+    d.isRunning = false;
   }
 
   /**
-   * Kills the TS listening on the provided port. Doesn't do anything if the TS was already killed.
-   * @param port port on which the tablet server is listening on
-   * @throws InterruptedException
+   * Kills all masters not already stopped.
+   * @throws IOException if something went wrong in transit
    */
-  public void killTabletServerOnPort(int port) throws InterruptedException {
-    Process ts = tserverProcesses.remove(port);
-    if (ts == null) {
-      // The TS is already dead, good.
-      return;
+  public void killMasters() throws IOException {
+    List<HostAndPort> toKill = Lists.newArrayList();
+    for (Map.Entry<HostAndPort, DaemonInfo> e : masters.entrySet()) {
+      if (e.getValue().isRunning) {
+        toKill.add(e.getKey());
+      }
+    }
+    for (HostAndPort hp : toKill) {
+      killMasterOnHostPort(hp);
     }
-    LOG.info("Killing server at port " + port);
-    terminateAndWait(ts);
   }
 
   /**
-   * Kills all tablet servers.
-   * @throws InterruptedException
+   * Starts all currently stopped masters.
+   * @throws IOException if something went wrong in transit
    */
-  public void killTabletServers() throws InterruptedException {
-    for (Process tserver : tserverProcesses.values()) {
-      terminateAndWait(tserver);
+  public void restartDeadMasters() throws IOException {
+    List<HostAndPort> toRestart = Lists.newArrayList();
+    for (Map.Entry<HostAndPort, DaemonInfo> e : masters.entrySet()) {
+      if (!e.getValue().isRunning) {
+        toRestart.add(e.getKey());
+      }
+    }
+    for (HostAndPort hp : toRestart) {
+      restartDeadMasterOnHostPort(hp);
     }
-    tserverProcesses.clear();
   }
 
   /**
-   * Restarts any tablet servers which were previously killed.
+   * Kills all tservers not already stopped.
+   * @throws IOException if something went wrong in transit
    */
-  public void restartDeadTabletServers() throws Exception {
-    for (int port : tserverPorts) {
-      if (tserverProcesses.containsKey(port)) continue;
-      restartDeadTabletServerOnPort(port);
+  public void killTservers() throws IOException {
+    List<HostAndPort> toKill = Lists.newArrayList();
+    for (Map.Entry<HostAndPort, DaemonInfo> e : tservers.entrySet()) {
+      if (e.getValue().isRunning) {
+        toKill.add(e.getKey());
+      }
+    }
+    for (HostAndPort hp : toKill) {
+      killTabletServerOnHostPort(hp);
     }
   }
 
   /**
-   * Kills the master listening on the provided port. Doesn't do anything if the master was
-   * already killed.
-   * @param port port on which the master is listening on
-   * @throws InterruptedException
+   * Starts all currently stopped tservers.
+   * @throws IOException if something went wrong in transit
    */
-  public void killMasterOnPort(int port) throws InterruptedException {
-    Process master = masterProcesses.remove(port);
-    if (master == null) {
-      // The master is already dead, good.
-      return;
+  public void restartDeadTservers() throws IOException {
+    List<HostAndPort> toRestart = Lists.newArrayList();
+    for (Map.Entry<HostAndPort, DaemonInfo> e : tservers.entrySet()) {
+      if (!e.getValue().isRunning) {
+        toRestart.add(e.getKey());
+      }
+    }
+    for (HostAndPort hp : toRestart) {
+      restartDeadTabletServerOnHostPort(hp);
     }
-    LOG.info("Killing master at port " + port);
-    terminateAndWait(master);
   }
 
   /** {@override} */
@@ -455,119 +362,64 @@ public class MiniKuduCluster implements AutoCloseable {
   }
 
   /**
-   * Stops all the test processes; deletes the folders used to store data; deletes the flag file.
+   * Shuts down a Kudu cluster.
    */
   public void shutdown() {
-    boolean wasInterrupted = false;
-    wasInterrupted |= terminateAndWait(masterProcesses);
-    wasInterrupted |= terminateAndWait(tserverProcesses);
-
-    // Whether we were interrupted or not above we still destroyed all the processes, so the input
-    // printers will hit EOFs and stop.
-    for (Thread thread : processInputPrinters) {
+    // Closing stdin should cause the control shell process to terminate.
+    if (miniClusterStdin != null) {
       try {
-        thread.join();
-      } catch (InterruptedException e) {
-        wasInterrupted = true;
-        // Need to continue cleaning up.
-        LOG.info("ignoring request to interrupt; waiting for input printer {} to exit", thread);
+        miniClusterStdin.close();
+      } catch (IOException e) {
+        LOG.info("Caught exception while closing minicluster stdin", e);
       }
     }
-    processInputPrinters.clear();
-
-    for (String path : pathsToDelete) {
+    if (miniClusterStdout != null) {
       try {
-        File f = new File(path);
-        if (f.isDirectory()) {
-          FileUtils.deleteDirectory(f);
-        } else {
-          f.delete();
-        }
-      } catch (Exception e) {
-        LOG.warn(String.format("Could not delete path %s", path), e);
+        miniClusterStdout.close();
+      } catch (IOException e) {
+        LOG.info("Caught exception while closing minicluster stdout", e);
       }
     }
-
-    if (miniKdc != null) {
+    if (miniClusterErrorPrinter != null) {
       try {
-        miniKdc.close();
-      } catch (IOException e) {
-        LOG.warn("Unable to close MiniKdc", e);
+        miniClusterErrorPrinter.join();
+      } catch (InterruptedException e) {
+        LOG.info("Caught exception while closing minicluster stderr", e);
       }
     }
-
-    if (wasInterrupted) {
-      Thread.currentThread().interrupt();
-    }
-  }
-
-  private static void terminateAndWait(Process process) throws InterruptedException {
-    process.destroy();
-    process.waitFor();
-  }
-
-  /**
-   * Terminate and wait for exit of every process in the specified container.
-   *
-   * @param processes map of processes to terminate
-   * @return true if {@link InterruptedException} was received while waiting for processes'
-   *         termination, false otherwise
-   */
-  private static boolean terminateAndWait(Map<Integer, Process> processes) {
-    boolean wasInterrupted = false;
-    for (Process p : processes.values()) {
-      while (true) {
-        try {
-          terminateAndWait(p);
-          break;
-        } catch (InterruptedException e) {
-          wasInterrupted = true;
-          // Not being polite here: ignore the request to interrupt and continue cleaning up.
-          LOG.info("ignoring request to interrupt; waiting process {} to exit", p);
-        }
+    if (miniCluster != null) {
+      try {
+        miniCluster.waitFor(10, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        LOG.warn("Minicluster process did not exit after 10s, destroying");
+        miniCluster.destroyForcibly();
       }
     }
-    processes.clear();
-
-    return wasInterrupted;
   }
 
   /**
-   * Returns the comma-separated list of master addresses.
-   * @return master addresses
+   * @return comma-separated list of master addresses
    */
   public String getMasterAddresses() {
-    return masterAddresses;
+    return Joiner.on(',').join(masters.keySet());
   }
 
   /**
-   * Returns a list of master addresses.
-   * @return master addresses
+   * @return list of all masters, uniquely identified by hostname and port
    */
   public List<HostAndPort> getMasterHostPorts() {
-    return masterHostPorts;
+    return new ArrayList<>(masters.keySet());
   }
 
   /**
-   * Returns an unmodifiable map of all tablet servers in pairs of RPC port - > Process.
-   * @return an unmodifiable map of all tablet servers
+   * @return list of all tservers, uniquely identified by hostname and port
    */
-  @VisibleForTesting
-  Map<Integer, Process> getTabletServerProcesses() {
-    return Collections.unmodifiableMap(tserverProcesses);
+  public List<HostAndPort> getTserverHostPorts() {
+    return new ArrayList<>(tservers.keySet());
   }
 
   /**
-   * Returns an unmodifiable map of all masters in pairs of RPC port - > Process.
-   * @return an unmodifiable map of all masters
-   */
-  @VisibleForTesting
-  Map<Integer, Process> getMasterProcesses() {
-    return Collections.unmodifiableMap(masterProcesses);
-  }
-
-  /**
-   * Helper runnable that receives stdout and logs it along with the process' identifier.
+   * Helper runnable that receives stderr and logs it along with the process' identifier.
    */
   public static class ProcessInputStreamLogPrinterRunnable implements Runnable {
 
@@ -594,11 +446,21 @@ public class MiniKuduCluster implements AutoCloseable {
     }
   }
 
+  /**
+   * TODO(KUDU-2186): If used directly from {@link ProtobufHelper}, tests from
+   * other modules break when run by Gradle.
+   */
+  private static HostAndPort hostAndPortFromPB(HostPortPB hostPortPB) {
+    return HostAndPort.fromParts(hostPortPB.getHost(), hostPortPB.getPort());
+  }
+
+  /**
+   * Builder for {@link MiniKuduCluster}
+   */
   public static class MiniKuduClusterBuilder {
 
     private int numMasters = 1;
     private int numTservers = 3;
-    private int defaultTimeoutMs = 50000;
     private boolean enableKerberos = false;
     private final List<String> extraTserverFlags = new ArrayList<>();
     private final List<String> extraMasterFlags = new ArrayList<>();
@@ -614,17 +476,6 @@ public class MiniKuduCluster implements AutoCloseable {
     }
 
     /**
-     * Configures the internal client to use the given timeout for all operations. Also uses the
-     * timeout for tasks like waiting for tablet servers to check in with the master.
-     * @param defaultTimeoutMs timeout in milliseconds
-     * @return this instance
-     */
-    public MiniKuduClusterBuilder defaultTimeoutMs(int defaultTimeoutMs) {
-      this.defaultTimeoutMs = defaultTimeoutMs;
-      return this;
-    }
-
-    /**
      * Enables Kerberos on the mini cluster and acquire client credentials for this process.
      * @return this instance
      */
@@ -651,15 +502,19 @@ public class MiniKuduCluster implements AutoCloseable {
       return this;
     }
 
-    public MiniKuduCluster build() throws Exception {
+    /**
+     * Builds and starts a new {@link MiniKuduCluster} using builder state.
+     * @return the newly started {@link MiniKuduCluster}
+     * @throws IOException if something went wrong starting the cluster
+     */
+    public MiniKuduCluster build() throws IOException {
       MiniKuduCluster cluster =
-          new MiniKuduCluster(defaultTimeoutMs, extraTserverFlags, extraMasterFlags);
+          new MiniKuduCluster(enableKerberos,
+              numMasters, numTservers,
+              extraTserverFlags, extraMasterFlags);
       try {
-        if (enableKerberos) {
-          cluster.startKerberos();
-        }
-        cluster.start(numMasters, numTservers);
-      } catch (Exception e) {
+        cluster.start();
+      } catch (IOException e) {
         // MiniKuduCluster.close should not throw, so no need for a nested try/catch.
         cluster.close();
         throw e;

http://git-wip-us.apache.org/repos/asf/kudu/blob/8455f037/java/kudu-client/src/test/java/org/apache/kudu/client/TestClientFailoverSupport.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestClientFailoverSupport.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestClientFailoverSupport.java
index b1cb403..2eeb0cc 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestClientFailoverSupport.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestClientFailoverSupport.java
@@ -25,6 +25,8 @@ import org.apache.kudu.util.AssertHelpers.BooleanExpression;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import com.google.common.net.HostAndPort;
+
 public class TestClientFailoverSupport extends BaseKuduTest {
 
   @BeforeClass
@@ -75,8 +77,8 @@ public class TestClientFailoverSupport extends BaseKuduTest {
     for (int i = 0; i < TSERVER_LEADERS_TO_KILL; i++) {
       List<LocatedTablet> tablets = table.getTabletsLocations(DEFAULT_SLEEP);
       assertEquals(1, tablets.size());
-      final int leaderPort = findLeaderTabletServerPort(tablets.get(0));
-      miniCluster.killTabletServerOnPort(leaderPort);
+      HostAndPort hp = findLeaderTabletServerHostPort(tablets.get(0));
+      miniCluster.killTabletServerOnHostPort(hp);
     }
     killMasterLeader();
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/8455f037/java/kudu-client/src/test/java/org/apache/kudu/client/TestMiniKdc.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestMiniKdc.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestMiniKdc.java
deleted file mode 100644
index 3b8c21d..0000000
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestMiniKdc.java
+++ /dev/null
@@ -1,79 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.kudu.client;
-
-import static org.hamcrest.CoreMatchers.containsString;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-import javax.security.auth.Subject;
-
-import org.junit.Test;
-
-import org.apache.kudu.util.SecurityUtil;
-
-public class TestMiniKdc {
-
-  @Test
-  public void testBasicFunctionality() throws Exception {
-    try (MiniKdc kdc = MiniKdc.withDefaults()) {
-      kdc.start();
-
-      kdc.createUserPrincipal("alice");
-      kdc.kinit("alice");
-
-      kdc.stop();
-      kdc.start();
-
-      kdc.createUserPrincipal("bob");
-      kdc.kinit("bob");
-
-      kdc.createServiceKeytab("kudu/KRBTEST.COM");
-
-      String klist = kdc.klist();
-
-      assertFalse(klist.contains("alice@KRBTEST.COM"));
-      assertTrue(klist.contains("bob@KRBTEST.COM"));
-      assertTrue(klist.contains("krbtgt/KRBTEST.COM@KRBTEST.COM"));
-    }
-  }
-
-  /**
-   * Test that we can initialize a JAAS Subject from a user-provided TicketCache.
-   */
-  @Test
-  public void testGetKerberosSubject() throws Exception {
-    try (MiniKdc kdc = MiniKdc.withDefaults()) {
-      kdc.start();
-      kdc.createUserPrincipal("alice");
-      kdc.kinit("alice");
-      // Typically this would be picked up from the $KRB5CCNAME environment
-      // variable, or use a default. However, it's not easy to modify the
-      // environment in Java, so instead we override a system property.
-      System.setProperty(SecurityUtil.KUDU_TICKETCACHE_PROPERTY, kdc.getTicketCachePath());
-      Subject subj = SecurityUtil.getSubjectOrLogin();
-      assertThat(subj.toString(), containsString("alice"));
-    }
-  }
-
-  @Test
-  public void testStopClose() throws Exception {
-    // Test that closing a stopped KDC does not throw.
-    MiniKdc.withDefaults().close();
-  }
-}

http://git-wip-us.apache.org/repos/asf/kudu/blob/8455f037/java/kudu-client/src/test/java/org/apache/kudu/client/TestMiniKuduCluster.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestMiniKuduCluster.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestMiniKuduCluster.java
index d7ed783..6f7640e 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestMiniKuduCluster.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestMiniKuduCluster.java
@@ -20,80 +20,81 @@ import static org.junit.Assert.fail;
 import java.io.IOException;
 import java.net.Socket;
 
+import org.apache.kudu.client.KuduClient.KuduClientBuilder;
 import org.junit.Test;
 
+import com.google.common.net.HostAndPort;
+
 public class TestMiniKuduCluster {
 
   private static final int NUM_TABLET_SERVERS = 3;
-  private static final int DEFAULT_NUM_MASTERS = 1;
+  private static final int NUM_MASTERS = 1;
   private static final long SLEEP_TIME_MS = 10000;
 
   @Test(timeout = 50000)
   public void test() throws Exception {
     try (MiniKuduCluster cluster = new MiniKuduCluster.MiniKuduClusterBuilder()
-                                                      .numMasters(DEFAULT_NUM_MASTERS)
+                                                      .numMasters(NUM_MASTERS)
                                                       .numTservers(NUM_TABLET_SERVERS)
                                                       .build()) {
-      assertTrue(cluster.waitForTabletServers(NUM_TABLET_SERVERS));
-      assertEquals(DEFAULT_NUM_MASTERS, cluster.getMasterProcesses().size());
-      assertEquals(NUM_TABLET_SERVERS, cluster.getTabletServerProcesses().size());
+      assertEquals(NUM_MASTERS, cluster.getMasterHostPorts().size());
+      assertEquals(NUM_TABLET_SERVERS, cluster.getTserverHostPorts().size());
 
       {
         // Kill the master.
-        int masterPort = cluster.getMasterProcesses().keySet().iterator().next();
-        testPort(masterPort, true);
-        cluster.killMasterOnPort(masterPort);
+        HostAndPort masterHostPort = cluster.getMasterHostPorts().get(0);
+        testHostPort(masterHostPort, true);
+        cluster.killMasterOnHostPort(masterHostPort);
 
-        testPort(masterPort, false);
+        testHostPort(masterHostPort, false);
 
         // Restart the master.
-        cluster.restartDeadMasterOnPort(masterPort);
+        cluster.restartDeadMasterOnHostPort(masterHostPort);
 
         // Test we can reach it.
-        testPort(masterPort, true);
+        testHostPort(masterHostPort, true);
       }
 
       {
         // Kill the first TS.
-        int tsPort = cluster.getTabletServerProcesses().keySet().iterator().next();
-        testPort(tsPort, true);
-        cluster.killTabletServerOnPort(tsPort);
+        HostAndPort tsHostPort = cluster.getTserverHostPorts().get(0);
+        testHostPort(tsHostPort, true);
+        cluster.killTabletServerOnHostPort(tsHostPort);
 
-        testPort(tsPort, false);
+        testHostPort(tsHostPort, false);
 
         // Restart it.
-        cluster.restartDeadTabletServerOnPort(tsPort);
+        cluster.restartDeadTabletServerOnHostPort(tsHostPort);
 
-        testPort(tsPort, true);
+        testHostPort(tsHostPort, true);
       }
-
-      assertEquals(DEFAULT_NUM_MASTERS, cluster.getMasterProcesses().size());
-      assertEquals(NUM_TABLET_SERVERS, cluster.getTabletServerProcesses().size());
     }
   }
 
   @Test(timeout = 50000)
   public void testKerberos() throws Exception {
     try (MiniKuduCluster cluster = new MiniKuduCluster.MiniKuduClusterBuilder()
-                                                      .numMasters(DEFAULT_NUM_MASTERS)
+                                                      .numMasters(NUM_MASTERS)
                                                       .numTservers(NUM_TABLET_SERVERS)
                                                       .enableKerberos()
                                                       .build()) {
-      assertTrue(cluster.waitForTabletServers(NUM_TABLET_SERVERS));
+      KuduClient client = new KuduClientBuilder(cluster.getMasterAddresses()).build();
+      ListTablesResponse resp = client.getTablesList();
+      assertTrue(resp.getTablesList().isEmpty());
     }
   }
 
   /**
-   * Test whether the specified port is open or closed, waiting up to a certain time.
-   * @param port the port to test
+   * Test whether the specified host and port is open or closed, waiting up to a certain time.
+   * @param hp the host and port to test
    * @param testIsOpen true if we should want it to be open, false if we want it closed
    */
-  private static void testPort(int port,
-                               boolean testIsOpen) throws InterruptedException {
+  private static void testHostPort(HostAndPort hp,
+      boolean testIsOpen) throws InterruptedException {
     DeadlineTracker tracker = new DeadlineTracker();
     while (tracker.getElapsedMillis() < SLEEP_TIME_MS) {
       try {
-        Socket socket = new Socket(TestUtils.getUniqueLocalhost(), port);
+        Socket socket = new Socket(hp.getHost(), hp.getPort());
         socket.close();
         if (testIsOpen) {
           return;
@@ -105,6 +106,6 @@ public class TestMiniKuduCluster {
       }
       Thread.sleep(200);
     }
-    fail("Port " + port + " is still " + (testIsOpen ? "closed " : "open"));
+    fail("HostAndPort " + hp + " is still " + (testIsOpen ? "closed " : "open"));
   }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/8455f037/java/kudu-client/src/test/java/org/apache/kudu/client/TestMultipleLeaderFailover.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestMultipleLeaderFailover.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestMultipleLeaderFailover.java
index 526fdb3..8e8ebae 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestMultipleLeaderFailover.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestMultipleLeaderFailover.java
@@ -25,6 +25,8 @@ import org.apache.kudu.util.AssertHelpers.BooleanExpression;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import com.google.common.net.HostAndPort;
+
 public class TestMultipleLeaderFailover extends BaseKuduTest {
 
   @BeforeClass
@@ -74,8 +76,8 @@ public class TestMultipleLeaderFailover extends BaseKuduTest {
     for (int i = 0; i < NUM_ITERATIONS; i++) {
       List<LocatedTablet> tablets = table.getTabletsLocations(DEFAULT_SLEEP);
       assertEquals(1, tablets.size());
-      int leaderPort = findLeaderTabletServerPort(tablets.get(0));
-      miniCluster.killTabletServerOnPort(leaderPort);
+      HostAndPort hp = findLeaderTabletServerHostPort(tablets.get(0));
+      miniCluster.killTabletServerOnHostPort(hp);
 
       for (int j = 0; j < ROWS_PER_ITERATION; j++) {
         OperationResponse resp = session.apply(createBasicSchemaInsert(table, currentRows));
@@ -85,7 +87,7 @@ public class TestMultipleLeaderFailover extends BaseKuduTest {
         currentRows++;
       }
 
-      miniCluster.restartDeadTabletServerOnPort(leaderPort);
+      miniCluster.restartDeadTabletServerOnHostPort(hp);
       // Read your writes hasn't been enabled, so we need to use a helper function to poll.
       waitUntilRowCount(table, currentRows, DEFAULT_SLEEP);
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/8455f037/java/kudu-client/src/test/java/org/apache/kudu/client/TestUtils.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestUtils.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestUtils.java
index ec197f9..751cbd7 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestUtils.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestUtils.java
@@ -18,34 +18,17 @@ package org.apache.kudu.client;
 
 import java.io.File;
 import java.io.FileNotFoundException;
-import java.io.IOException;
 import java.io.UnsupportedEncodingException;
-import java.lang.management.ManagementFactory;
-import java.lang.management.RuntimeMXBean;
 import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-import java.net.DatagramSocket;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.SocketAddress;
-import java.net.SocketException;
 import java.net.URL;
 import java.net.URLDecoder;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.nio.file.StandardCopyOption;
-import java.util.List;
 import java.util.Set;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
-import com.sun.security.auth.module.UnixSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import sun.management.VMManagement;
 
 import org.apache.kudu.Common;
 import org.apache.kudu.consensus.Metadata;
@@ -64,33 +47,6 @@ public class TestUtils {
   private static final String BIN_DIR_PROP = "binDir";
 
   /**
-   * @return the path of the flags file to pass to daemon processes
-   * started by the tests
-   */
-  public static String getFlagsPath() {
-    URL u = BaseKuduTest.class.getResource("/flags");
-    if (u == null) {
-      throw new RuntimeException("Unable to find 'flags' file");
-    }
-    if (u.getProtocol() == "file") {
-      return urlToPath(u);
-    }
-    // If the flags are inside a JAR, extract them into our temporary
-    // test directory.
-    try {
-      // Somewhat unintuitively, createTempFile() actually creates the file,
-      // not just the path, so we have to use REPLACE_EXISTING below.
-      Path tmpFile = Files.createTempFile(
-          Paths.get(getBaseDir()), "kudu-flags", ".flags");
-      Files.copy(BaseKuduTest.class.getResourceAsStream("/flags"), tmpFile,
-          StandardCopyOption.REPLACE_EXISTING);
-      return tmpFile.toAbsolutePath().toString();
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to extract flags file into tmp", e);
-    }
-  }
-
-  /**
    * Return the path portion of a file URL, after decoding the escaped
    * components. This fixes issues when trying to build within a
    * working directory with special characters.
@@ -161,63 +117,6 @@ public class TestUtils {
   }
 
   /**
-   * @return the base directory within which we will store server data
-   */
-  public static String getBaseDir() {
-    String s = System.getenv("TEST_TMPDIR");
-    if (s == null) {
-      s = String.format("/tmp/kudutest-%d", new UnixSystem().getUid());
-    }
-    File f = new File(s);
-    f.mkdirs();
-    return f.getAbsolutePath();
-  }
-
-  /**
-   * Finds the next free port, starting with the one passed. Keep in mind the
-   * time-of-check-time-of-use nature of this method, the returned port might become occupied
-   * after it was checked for availability.
-   * @param startPort first port to be probed
-   * @return a currently usable port
-   * @throws IOException IOE is thrown if we can't close a socket we tried to open or if we run
-   * out of ports to try
-   */
-  public static int findFreePort(int startPort) throws IOException {
-    ServerSocket ss;
-    for(int i = startPort; i < 65536; i++) {
-      try {
-        ss = new ServerSocket();
-        SocketAddress address = new InetSocketAddress(getUniqueLocalhost(), i);
-        ss.bind(address);
-      } catch (IOException e) {
-        continue;
-      }
-      ss.close();
-      return i;
-    }
-    throw new IOException("Ran out of ports.");
-  }
-
-  /**
-   * Finds a specified number of parts, starting with one passed. Keep in mind the
-   * time-of-check-time-of-use nature of this method.
-   * @see {@link #findFreePort(int)}
-   * @param startPort First port to be probed.
-   * @param numPorts Number of ports to reserve.
-   * @return A list of currently usable ports.
-   * @throws IOException IOE Is thrown if we can't close a socket we tried to open or if run
-   * out of ports to try.
-   */
-  public static List<Integer> findFreePorts(int startPort, int numPorts) throws IOException {
-    List<Integer> ports = Lists.newArrayListWithCapacity(numPorts);
-    for (int i = 0; i < numPorts; i++) {
-      startPort = findFreePort(startPort);
-      ports.add(startPort++);
-    }
-    return ports;
-  }
-
-  /**
    * Gets the pid of a specified process. Relies on reflection and only works on
    * UNIX process, not guaranteed to work on JDKs other than Oracle and OpenJDK.
    * @param proc The specified process.
@@ -278,45 +177,6 @@ public class TestUtils {
   }
 
   /**
-   * This is used to generate unique loopback IPs for parallel test running.
-   * @return the local PID of this process
-   */
-  static int getPid() {
-    try {
-      RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
-      java.lang.reflect.Field jvm = runtime.getClass().getDeclaredField("jvm");
-      jvm.setAccessible(true);
-      VMManagement mgmt = (VMManagement)jvm.get(runtime);
-      Method pid_method = mgmt.getClass().getDeclaredMethod("getProcessId");
-      pid_method.setAccessible(true);
-
-      return (Integer)pid_method.invoke(mgmt);
-    } catch (Exception e) {
-      LOG.warn("Cannot get PID", e);
-      return 1;
-    }
-  }
-
-  /**
-   * The generated IP is based on pid, so this requires that the parallel tests
-   * run in separate VMs.
-   *
-   * On OSX, the above trick doesn't work, so we can't run parallel tests on OSX.
-   * Given that, we just return the normal localhost IP.
-   *
-   * @return a unique loopback IP address for this PID. This allows running
-   * tests in parallel, since 127.0.0.0/8 all act as loopbacks on Linux.
-   */
-  static String getUniqueLocalhost() {
-    if ("Mac OS X".equals(System.getProperty("os.name"))) {
-      return "127.0.0.1";
-    }
-
-    int pid = getPid();
-    return "127." + ((pid & 0xff00) >> 8) + "." + (pid & 0xff) + ".1";
-  }
-
-  /**
    * Get a PartitionPB with empty start and end keys.
    * @return a fake partition
    */

http://git-wip-us.apache.org/repos/asf/kudu/blob/8455f037/java/kudu-client/src/test/resources/flags
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/resources/flags b/java/kudu-client/src/test/resources/flags
deleted file mode 100644
index b72ff55..0000000
--- a/java/kudu-client/src/test/resources/flags
+++ /dev/null
@@ -1,5 +0,0 @@
---logtostderr
---never_fsync
---unlock_experimental_flags
---unlock_unsafe_flags
---log_preallocate_segments=false
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kudu/blob/8455f037/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala
index 6a0c08d..9ce0991 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala
@@ -75,7 +75,6 @@ trait TestContext extends BeforeAndAfterAll { self: Suite =>
     ss = SparkSession.builder().config(conf).getOrCreate()
 
     kuduClient = new KuduClientBuilder(miniCluster.getMasterAddresses).build()
-    assert(miniCluster.waitForTabletServers(1))
 
     kuduContext = new KuduContext(miniCluster.getMasterAddresses, ss.sparkContext)