You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2015/06/24 23:30:34 UTC

incubator-twill git commit: (TWILL-139) Added workaround to retry starting Kafka multiple times

Repository: incubator-twill
Updated Branches:
  refs/heads/feature/TWILL-139 [created] b9b08999f


(TWILL-139) Added workaround to retry starting Kafka multiple times

- If the race condition occurs, startup fails with a ZkTimeoutException
- Retry upon catching the timeout exception

Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/b9b08999
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/b9b08999
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/b9b08999

Branch: refs/heads/feature/TWILL-139
Commit: b9b08999fda30f239f8121efe67badfea3298edd
Parents: d864ed1
Author: Terence Yim <ch...@apache.org>
Authored: Wed Jun 24 14:30:28 2015 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Wed Jun 24 14:30:28 2015 -0700

----------------------------------------------------------------------
 .../internal/kafka/EmbeddedKafkaServer.java     | 68 ++++++++++++++++++--
 .../appmaster/ApplicationMasterMain.java        |  8 ++-
 2 files changed, 69 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/b9b08999/twill-core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java
index 0beab44..5095604 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java
@@ -17,9 +17,14 @@
  */
 package org.apache.twill.internal.kafka;
 
+import com.google.common.base.Throwables;
 import com.google.common.util.concurrent.AbstractIdleService;
 import kafka.server.KafkaConfig;
-import kafka.server.KafkaServerStartable;
+import kafka.server.KafkaServer;
+import kafka.utils.Time;
+import org.I0Itec.zkclient.exception.ZkTimeoutException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Properties;
 
@@ -29,20 +34,110 @@ import java.util.Properties;
  */
 public final class EmbeddedKafkaServer extends AbstractIdleService {
 
-  private final KafkaServerStartable server;
+  /**
+   * Property key for the maximum number of attempts made to start the Kafka server. Startup is
+   * only retried when an attempt fails with a ZooKeeper connection timeout (see TWILL-139).
+   */
+  public static final String START_TIMEOUT_RETRIES = "twill.kafka.start.timeout.retries";
+
+  private static final Logger LOG = LoggerFactory.getLogger(EmbeddedKafkaServer.class);
+  private static final String DEFAULT_START_TIMEOUT_RETRIES = "5";
+
+  private final int startTimeoutRetries;
+  private final KafkaConfig kafkaConfig;
+  // Set only after a successful startup attempt; stays null if startUp() never succeeded.
+  private KafkaServer server;
 
+  /**
+   * Creates an embedded Kafka server from the given broker properties. No server is created or
+   * started until this service is started.
+   *
+   * @param properties Kafka broker configuration; may also contain {@link #START_TIMEOUT_RETRIES}
+   */
   public EmbeddedKafkaServer(Properties properties) {
-    server = new KafkaServerStartable(new KafkaConfig(properties));
+    this.startTimeoutRetries = Integer.parseInt(properties.getProperty(START_TIMEOUT_RETRIES,
+                                                                       DEFAULT_START_TIMEOUT_RETRIES));
+    this.kafkaConfig = new KafkaConfig(properties);
   }
 
+  /**
+   * Starts Kafka, retrying with a fresh {@link KafkaServer} when an attempt fails with a ZooKeeper
+   * connection timeout, for at most {@link #START_TIMEOUT_RETRIES} attempts in total.
+   *
+   * @throws Exception the original failure if an attempt fails for any other reason
+   * @throws IllegalStateException if every attempt timed out connecting to ZooKeeper
+   */
   @Override
   protected void startUp() throws Exception {
-    server.startup();
+    int tries = 0;
+    Exception lastFailure = null;
+    do {
+      KafkaServer kafkaServer = createKafkaServer(kafkaConfig);
+      try {
+        kafkaServer.startup();
+        server = kafkaServer;
+      } catch (Exception e) {
+        // Release whatever the failed attempt acquired before retrying or giving up.
+        kafkaServer.shutdown();
+        kafkaServer.awaitShutdown();
+
+        Throwable rootCause = Throwables.getRootCause(e);
+        if (!(rootCause instanceof ZkTimeoutException)) {
+          // Not the TWILL-139 race, so retrying will not help; fail with the original error.
+          throw e;
+        }
+        // Potentially caused by race condition bug described in TWILL-139.
+        LOG.warn("Timeout when connecting to ZooKeeper from KafkaServer. Attempt number {}.",
+                 tries + 1, rootCause);
+        lastFailure = e;
+      }
+    } while (server == null && ++tries < startTimeoutRetries);
+
+    if (server == null) {
+      // Returning normally here would mark the service as started with no Kafka server running.
+      throw new IllegalStateException("Failed to start Kafka server after " + tries + " attempts.",
+                                      lastFailure);
+    }
   }
 
+  /**
+   * Stops the Kafka server, if one was started successfully, and waits for it to terminate.
+   */
   @Override
   protected void shutDown() throws Exception {
-    server.shutdown();
-    server.awaitShutdown();
+    if (server != null) {
+      server.shutdown();
+      server.awaitShutdown();
+    }
+  }
+
+  /**
+   * Creates a new, not yet started, {@link KafkaServer} whose {@link Time} uses the system clock.
+   */
+  private KafkaServer createKafkaServer(KafkaConfig kafkaConfig) {
+    return new KafkaServer(kafkaConfig, new Time() {
+
+      @Override
+      public long milliseconds() {
+        return System.currentTimeMillis();
+      }
+
+      @Override
+      public long nanoseconds() {
+        return System.nanoTime();
+      }
+
+      @Override
+      public void sleep(long ms) {
+        try {
+          Thread.sleep(ms);
+        } catch (InterruptedException e) {
+          // NOTE(review): Thread.interrupted() only clears the interrupt flag, which is already
+          // clear once InterruptedException is thrown, so the interrupt is swallowed here.
+          // Thread.currentThread().interrupt() would preserve it; confirm Kafka tolerates that.
+          Thread.interrupted();
+        }
+      }
+    });
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/b9b08999/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
index 561a68b..3e8cb93 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
@@ -179,7 +179,13 @@ public final class ApplicationMasterMain extends ServiceMain {
       prop.setProperty("log.flush.interval.ms", "1000");
       prop.setProperty("log.segment.bytes", "536870912");
       prop.setProperty("zookeeper.connect", kafkaZKConnect);
-      prop.setProperty("zookeeper.connection.timeout.ms", "1000000");
+      // Set the connection timeout to a relatively short time (3 seconds).
+      // It is only used by the org.I0Itec.zkclient.ZkClient inside KafkaServer
+      // to block and wait for the ZK connection to reach the SyncConnected state.
+      // However, due to a race condition (TWILL-139) in the ZK client library used by Kafka,
+      // the ZK client may hang until the connection timeout when ZK authentication is enabled.
+      // Setting it to a lower value allows the AM to retry multiple times if the race happens.
       prop.setProperty("zookeeper.connection.timeout.ms", "3000");
       prop.setProperty("default.replication.factor", "1");
       return prop;
     }