You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2016/10/04 16:25:36 UTC

[09/16] twill git commit: (TWILL-173) Have EmbeddedKafkaServer restart multiple times on bind failure

(TWILL-173) Have EmbeddedKafkaServer restart multiple times on bind failure

- Due to a potential race condition between random port generation and the actual binding, the binding may fail.

Signed-off-by: Terence Yim <ch...@apache.org>

This closes #9 on GitHub


Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/93c523d0
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/93c523d0
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/93c523d0

Branch: refs/heads/site
Commit: 93c523d022cfd021d5ad7f1cffa87b460cd5cd1b
Parents: 2d617d2
Author: Terence Yim <ch...@apache.org>
Authored: Wed Aug 31 14:38:19 2016 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Thu Sep 1 10:08:38 2016 -0700

----------------------------------------------------------------------
 .../internal/kafka/EmbeddedKafkaServer.java     | 42 +++++++++++++++++---
 .../appmaster/ApplicationMasterMain.java        |  6 ---
 .../org/apache/twill/yarn/LogLevelTestRun.java  |  2 +-
 3 files changed, 37 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/twill/blob/93c523d0/twill-core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java
index cd86dcb..be6121d 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java
@@ -17,16 +17,21 @@
  */
 package org.apache.twill.internal.kafka;
 
+import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.util.concurrent.AbstractIdleService;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
 import kafka.utils.Time;
 import org.I0Itec.zkclient.exception.ZkTimeoutException;
+import org.apache.twill.internal.utils.Networks;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.BindException;
 import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
 
 /**
  * A {@link com.google.common.util.concurrent.Service} implementation for running an instance of Kafka server in
@@ -34,25 +39,27 @@ import java.util.Properties;
  */
 public final class EmbeddedKafkaServer extends AbstractIdleService {
 
-  public static final String START_TIMEOUT_RETRIES = "twill.kafka.start.timeout.retries";
+  public static final String START_RETRIES = "twill.kafka.start.timeout.retries";
 
   private static final Logger LOG = LoggerFactory.getLogger(EmbeddedKafkaServer.class);
-  private static final String DEFAULT_START_TIMEOUT_RETRIES = "5";
+  private static final String DEFAULT_START_RETRIES = "5";
 
   private final int startTimeoutRetries;
-  private final KafkaConfig kafkaConfig;
+  private final Properties properties;
   private KafkaServer server;
 
   public EmbeddedKafkaServer(Properties properties) {
-    this.startTimeoutRetries = Integer.parseInt(properties.getProperty(START_TIMEOUT_RETRIES,
-                                                                       DEFAULT_START_TIMEOUT_RETRIES));
-    this.kafkaConfig = new KafkaConfig(properties);
+    this.startTimeoutRetries = Integer.parseInt(properties.getProperty(START_RETRIES,
+                                                                       DEFAULT_START_RETRIES));
+    this.properties = new Properties();
+    this.properties.putAll(properties);
   }
 
   @Override
   protected void startUp() throws Exception {
     int tries = 0;
     do {
+      KafkaConfig kafkaConfig = createKafkaConfig(properties);
       KafkaServer kafkaServer = createKafkaServer(kafkaConfig);
       try {
         kafkaServer.startup();
@@ -65,9 +72,14 @@ public final class EmbeddedKafkaServer extends AbstractIdleService {
         if (rootCause instanceof ZkTimeoutException) {
           // Potentially caused by race condition bug described in TWILL-139.
           LOG.warn("Timeout when connecting to ZooKeeper from KafkaServer. Attempt number {}.", tries, rootCause);
+        } else if (rootCause instanceof BindException) {
+          LOG.warn("Kafka failed to bind to port {}. Attempt number {}.", kafkaConfig.port(), tries, rootCause);
         } else {
           throw e;
         }
+
+        // Do a random sleep of < 200ms
+        TimeUnit.MILLISECONDS.sleep(new Random().nextInt(200) + 1L);
       }
     } while (server == null && ++tries < startTimeoutRetries);
 
@@ -107,4 +119,22 @@ public final class EmbeddedKafkaServer extends AbstractIdleService {
       }
     });
   }
+
+  /**
+   * Creates a new {@link KafkaConfig} from the given {@link Properties}. If the {@code "port"} property is missing
+   * or is equal to {@code "0"}, a random port will be generated.
+   */
+  private KafkaConfig createKafkaConfig(Properties properties) {
+    Properties prop = new Properties();
+    prop.putAll(properties);
+
+    String port = prop.getProperty("port");
+    if (port == null || "0".equals(port)) {
+      int randomPort = Networks.getRandomPort();
+      Preconditions.checkState(randomPort > 0, "Failed to get random port.");
+      prop.setProperty("port", Integer.toString(randomPort));
+    }
+
+    return new KafkaConfig(prop);
+  }
 }

http://git-wip-us.apache.org/repos/asf/twill/blob/93c523d0/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
index 38a2463..e708fb8 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
@@ -17,7 +17,6 @@
  */
 package org.apache.twill.internal.appmaster;
 
-import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.AbstractIdleService;
 import com.google.common.util.concurrent.Futures;
 import org.apache.hadoop.conf.Configuration;
@@ -30,7 +29,6 @@ import org.apache.twill.internal.RunIds;
 import org.apache.twill.internal.ServiceMain;
 import org.apache.twill.internal.kafka.EmbeddedKafkaServer;
 import org.apache.twill.internal.logging.Loggings;
-import org.apache.twill.internal.utils.Networks;
 import org.apache.twill.internal.yarn.VersionDetectYarnAMClientFactory;
 import org.apache.twill.internal.yarn.YarnAMClient;
 import org.apache.twill.zookeeper.OperationFuture;
@@ -168,12 +166,8 @@ public final class ApplicationMasterMain extends ServiceMain {
     }
 
     private Properties generateKafkaConfig(String kafkaZKConnect) {
-      int port = Networks.getRandomPort();
-      Preconditions.checkState(port > 0, "Failed to get random port.");
-
       Properties prop = new Properties();
       prop.setProperty("log.dir", new File("kafka-logs").getAbsolutePath());
-      prop.setProperty("port", Integer.toString(port));
       prop.setProperty("broker.id", "1");
       prop.setProperty("socket.send.buffer.bytes", "1048576");
       prop.setProperty("socket.receive.buffer.bytes", "1048576");

http://git-wip-us.apache.org/repos/asf/twill/blob/93c523d0/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelTestRun.java
index 7990a5b..e4f789a 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelTestRun.java
@@ -112,7 +112,7 @@ public class LogLevelTestRun extends BaseYarnTest {
     }, Threads.SAME_THREAD_EXECUTOR);
     Assert.assertTrue(running.await(200, TimeUnit.SECONDS));
 
-    LogEntry.Level logLevel = waitForLogLevel(controller, LogLevelTestRunnable.class.getSimpleName(), 5L,
+    LogEntry.Level logLevel = waitForLogLevel(controller, LogLevelTestRunnable.class.getSimpleName(), 30L,
                                               TimeUnit.SECONDS);
 
     // Verify we got DEBUG log level.