You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2017/09/01 19:38:59 UTC

kudu git commit: KUDU-1976. java: use ephemeral ports for MiniKDC

Repository: kudu
Updated Branches:
  refs/heads/master fa953d05e -> f09bd015f


KUDU-1976. java: use ephemeral ports for MiniKDC

This fixes an issue where Java tests would occasionally fail because they
could not kinit as a principal just after adding it. We initially interpreted
this as a race, but it turns out to be an issue with concurrent test execution
on the same slave:

- We were using TestUtils.findFreeUdpPort() to look for an open port. This was
  in turn calling TestUtils.getUniqueLocalhost() when it was testing binding
  to ports. Because it used the "unique localhost", it would always successfully
  bind on the first port it tried, since another concurrently-running test
  would not have bound that port on the same unique address.

- Meanwhile, when we actually started the KDC, we bound it to 0.0.0.0:<port>
  rather than the unique loopback. This is because the KDC does not actually
  support specifying a bind-address.

- Even worse, the KDC would successfully start up in this case and only log
  a warning that it had failed to bind to the requested port. So, we'd merrily
  continue on our way with a lame-duck KDC.

The tests would often pass despite this, since we'd create our KDC, add a user
to it, and then connect to the _other_ test's KDC. That KDC would likely also
have the appropriate user "testuser". However, there was a small window where
the other KDC would have started already but not yet created the "testuser"
user, in which case the test would fail.

This patch fixes the issue by following the same approach as the C++ tests:
we start the KDC on an ephemeral port and then use lsof to figure out which
port it picked.

Change-Id: I1e75d89616e6dbca6e8687eeb23f807c3c7eb952
Reviewed-on: http://gerrit.cloudera.org:8080/7850
Reviewed-by: Dan Burkert <da...@apache.org>
Tested-by: Alexey Serbin <as...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/f09bd015
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/f09bd015
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/f09bd015

Branch: refs/heads/master
Commit: f09bd015f142c0b254d6e274088b64897ce4ed9e
Parents: fa953d0
Author: Todd Lipcon <to...@apache.org>
Authored: Fri Aug 25 18:04:51 2017 -0700
Committer: Alexey Serbin <as...@cloudera.com>
Committed: Fri Sep 1 19:38:07 2017 +0000

----------------------------------------------------------------------
 .../java/org/apache/kudu/client/MiniKdc.java    | 115 ++++++++++++++-----
 .../java/org/apache/kudu/client/TestUtils.java  |  24 ----
 2 files changed, 87 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/f09bd015/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKdc.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKdc.java b/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKdc.java
index 75e927c..d625c30 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKdc.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKdc.java
@@ -22,17 +22,22 @@ import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.lang.reflect.Field;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import javax.annotation.concurrent.NotThreadSafe;
 
 import com.google.common.base.Charsets;
 import com.google.common.base.Joiner;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.io.CharStreams;
@@ -55,9 +60,19 @@ import org.slf4j.LoggerFactory;
 @InterfaceAudience.Private
 @NotThreadSafe
 public class MiniKdc implements Closeable {
-
-  // The KDC port will be assigned starting from this value.
-  private static final int PORT_START = 64530;
+  /**
+   * The '-Ffn' flag gets lsof to output something like:
+   *   p19730
+   *   f123
+   *   n*:41254
+   * The first line is the pid. We ignore it.
+   * The second line is the file descriptor number. We ignore it.
+   * The third line has the bind address and port.
+   *
+   * This regex just looks for the third line.
+   */
+  private static final Pattern LSOF_PATTERN = Pattern.compile(
+      "^n\\*:([0-9]+)$", Pattern.MULTILINE);
 
   private static final Logger LOG = LoggerFactory.getLogger(MiniKuduCluster.class);
 
@@ -68,18 +83,22 @@ public class MiniKdc implements Closeable {
   private Thread kdcProcessLogRedirector;
 
   /**
+   * The ephemeral port that the KDC is bound to.
+   *
+   * This will be 0 if the KDC has not yet started.
+   */
+  private int kdcPort = 0;
+
+  /**
    * Options for the MiniKdc.
    */
   public static class Options {
     private final String realm;
     private final Path dataRoot;
-    private final int port;
 
-    public Options(String realm, Path dataRoot, int port) {
-      Preconditions.checkArgument(port > 0);
+    public Options(String realm, Path dataRoot) {
       this.realm = realm;
       this.dataRoot = dataRoot;
-      this.port = port;
     }
 
     public String getRealm() {
@@ -90,17 +109,12 @@ public class MiniKdc implements Closeable {
       return dataRoot;
     }
 
-    public int getPort() {
-      return port;
-    }
-
     /** {@inheritDoc} */
     @Override
     public String toString() {
       return MoreObjects.toStringHelper(this)
                         .add("realm", realm)
                         .add("dataRoot", dataRoot)
-                        .add("port", port)
                         .toString();
     }
   }
@@ -116,16 +130,14 @@ public class MiniKdc implements Closeable {
    * Creates a MiniKdc with default options.
    */
   public static MiniKdc withDefaults() throws IOException {
-    return new MiniKdc(
-        new Options("KRBTEST.COM",
-                    Paths.get(TestUtils.getBaseDir(), "krb5kdc-" + System.currentTimeMillis()),
-                    TestUtils.findFreeUdpPort(PORT_START)));
+    return new MiniKdc(new Options(
+        "KRBTEST.COM",
+        Paths.get(TestUtils.getBaseDir(), "krb5kdc-" + System.currentTimeMillis())));
   }
-
   /**
    * Start the MiniKdc.
    */
-  public void start() throws IOException {
+  public void start() throws IOException, InterruptedException {
     Preconditions.checkState(kdcProcess == null);
     LOG.debug("starting KDC {}", options);
 
@@ -152,20 +164,18 @@ public class MiniKdc implements Closeable {
 
     kdcProcess = startProcessWithKrbEnv(getBinaryPath("krb5kdc"),
                                         "-n"); // Do not daemonize.
-
     // Redirect the KDC output to SLF4J.
     kdcProcessLogRedirector = new Thread(
         new MiniKuduCluster.ProcessInputStreamLogPrinterRunnable(kdcProcess.getInputStream()),
-        "krb5kdc:" + options.port);
+        "krb5kdc:" + options.dataRoot);
     kdcProcessLogRedirector.setDaemon(true);
     kdcProcessLogRedirector.start();
 
-    // The C++ MiniKdc defaults to binding the KDC to an ephemeral port, which
-    // it then finds using lsof at this point. Java is unable to do that since
-    // the Process API does not expose the subprocess PID. As a result, this
-    // MiniKdc doesn't support binding to an ephemeral port, and we use the
-    // race-prone TestUtils.findFreePort instead. The upside is that we
-    // don't have to rewrite the config files.
+    // The first time we start a KDC, we use an ephemeral port. Figure out what port
+    // the KDC actually bound to, and rewrite the configuration to refer to it.
+    kdcPort = waitForBoundUdpPort(kdcProcess);
+    createKdcConf();
+    createKrb5Conf();
   }
 
   /**
@@ -250,7 +260,7 @@ public class MiniKdc implements Closeable {
 
         "[realms]",
         options.realm + " = {",
-        "   kdc = 127.0.0.1:" + options.port,
+        "   kdc = 127.0.0.1:" + kdcPort,
         "}");
 
     Files.write(options.dataRoot.resolve("krb5.conf"), contents, Charsets.UTF_8);
@@ -259,7 +269,7 @@ public class MiniKdc implements Closeable {
   private void createKdcConf() throws IOException {
     List<String> contents = ImmutableList.of(
         "[kdcdefaults]",
-        "   kdc_ports = " + options.port,
+        "   kdc_ports = " + kdcPort,
         "   kdc_tcp_ports = \"\"",
 
         "[realms]",
@@ -371,6 +381,55 @@ public class MiniKdc implements Closeable {
     }
   }
 
+  /**
+   * Return the process identifier (pid) of a running Process.
+   *
+   * This is Unix-specific, but since MiniKDC only supports Unix, not a big deal.
+   */
+  private static int getProcessPid(Process p) {
+    try {
+      Field pidField = p.getClass().getDeclaredField("pid");
+      pidField.setAccessible(true);
+      return (int)pidField.get(p);
+    } catch (IllegalAccessException | NoSuchFieldException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Wait for the given process to bind to a UDP port, and then return that port.
+   * This depends on 'lsof' being installed on the system.
+   *
+   * The provided Process must already be started.
+   */
+  private static int waitForBoundUdpPort(Process p) throws IOException, InterruptedException {
+    int pid = getProcessPid(p);
+    String lsof = getBinaryPath("lsof", ImmutableList.of("/sbin", "/usr/sbin"));
+    List<String> cmd = ImmutableList.of(
+        lsof, "-wbnP", "-Ffn",
+        "-p", "" + pid,
+        "-a", "-i", "4UDP");
+
+    Stopwatch sw = Stopwatch.createStarted();
+    for (int i = 0; ; i++) {
+      Process proc = new ProcessBuilder().command(cmd).start();
+      try {
+        checkReturnCode(proc, "lsof", false);
+        String lsofOutput = CharStreams.toString(new InputStreamReader(proc.getInputStream()));
+        Matcher m = LSOF_PATTERN.matcher(lsofOutput);
+        if (m.find()) {
+          return Integer.parseInt(m.group(1));
+        }
+      } catch (IOException ioe) {
+        // If lsof fails and we have already used up our timeout, re-throw.
+        if (sw.elapsed(TimeUnit.MILLISECONDS) > 5000) {
+          throw ioe;
+        }
+        Thread.sleep(10 * i);
+      }
+    }
+  }
+
   private static String getBinaryPath(String executable) throws IOException {
     return getBinaryPath(executable, KRB5_BINARY_PATHS);
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/f09bd015/java/kudu-client/src/test/java/org/apache/kudu/client/TestUtils.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestUtils.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestUtils.java
index e3e0622..ec197f9 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestUtils.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestUtils.java
@@ -199,30 +199,6 @@ public class TestUtils {
   }
 
   /**
-   * Finds the next free UDP port, starting with the one passed. Keep in mind the
-   * time-of-check-time-of-use nature of this method, the returned port might become occupied
-   * after it was checked for availability.
-   * @param startPort first port to be probed
-   * @return a currently usable port
-   * @throws IOException IOE is thrown if we can't close a socket we tried to open or if we run
-   * out of ports to try
-   */
-  public static int findFreeUdpPort(int startPort) throws IOException {
-    DatagramSocket ds;
-    for (int i = startPort; i < 65536; i++) {
-      try {
-        SocketAddress address = new InetSocketAddress(getUniqueLocalhost(), i);
-        ds = new DatagramSocket(address);
-      } catch (SocketException e) {
-        continue;
-      }
-      ds.close();
-      return i;
-    }
-    throw new IOException("Ran out of ports");
-  }
-
-  /**
    * Finds a specified number of parts, starting with one passed. Keep in mind the
    * time-of-check-time-of-use nature of this method.
    * @see {@link #findFreePort(int)}