You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2015/06/24 23:30:34 UTC
incubator-twill git commit: (TWILL-139) Added workaround to retry
starting Kafka multiple times
Repository: incubator-twill
Updated Branches:
refs/heads/feature/TWILL-139 [created] b9b08999f
(TWILL-139) Added workaround to retry starting Kafka multiple times
- If the race condition occurs, the startup will fail with a ZkTimeoutException
- Retry upon catching the timeout exception
Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/b9b08999
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/b9b08999
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/b9b08999
Branch: refs/heads/feature/TWILL-139
Commit: b9b08999fda30f239f8121efe67badfea3298edd
Parents: d864ed1
Author: Terence Yim <ch...@apache.org>
Authored: Wed Jun 24 14:30:28 2015 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Wed Jun 24 14:30:28 2015 -0700
----------------------------------------------------------------------
.../internal/kafka/EmbeddedKafkaServer.java | 68 ++++++++++++++++++--
.../appmaster/ApplicationMasterMain.java | 8 ++-
2 files changed, 69 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/b9b08999/twill-core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java
index 0beab44..5095604 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java
@@ -17,9 +17,14 @@
*/
package org.apache.twill.internal.kafka;
+import com.google.common.base.Throwables;
import com.google.common.util.concurrent.AbstractIdleService;
import kafka.server.KafkaConfig;
-import kafka.server.KafkaServerStartable;
+import kafka.server.KafkaServer;
+import kafka.utils.Time;
+import org.I0Itec.zkclient.exception.ZkTimeoutException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Properties;
@@ -29,20 +34,71 @@ import java.util.Properties;
*/
public final class EmbeddedKafkaServer extends AbstractIdleService {
- private final KafkaServerStartable server;
+ public static final String START_TIMEOUT_RETRIES = "twill.kafka.start.timeout.retries";
+
+ private static final Logger LOG = LoggerFactory.getLogger(EmbeddedKafkaServer.class);
+ private static final String DEFAULT_START_TIMEOUT_RETRIES = "5";
+
+ private final int startTimeoutRetries;
+ private final KafkaConfig kafkaConfig;
+ private KafkaServer server;
public EmbeddedKafkaServer(Properties properties) {
- server = new KafkaServerStartable(new KafkaConfig(properties));
+ this.startTimeoutRetries = Integer.parseInt(properties.getProperty(START_TIMEOUT_RETRIES,
+ DEFAULT_START_TIMEOUT_RETRIES));
+ this.kafkaConfig = new KafkaConfig(properties);
}
@Override
protected void startUp() throws Exception {
- server.startup();
+ int tries = 0;
+ do {
+ KafkaServer kafkaServer = createKafkaServer(kafkaConfig);
+ try {
+ kafkaServer.startup();
+ server = kafkaServer;
+ } catch (Exception e) {
+ kafkaServer.shutdown();
+ kafkaServer.awaitShutdown();
+
+ Throwable rootCause = Throwables.getRootCause(e);
+ if (rootCause instanceof ZkTimeoutException) {
+ // Potentially caused by race condition bug described in TWILL-139.
+ LOG.warn("Timeout when connecting to ZooKeeper from KafkaServer. Attempt number {}.", tries, rootCause);
+ }
+ }
+ } while (server == null && ++tries < startTimeoutRetries);
}
@Override
protected void shutDown() throws Exception {
- server.shutdown();
- server.awaitShutdown();
+ if (server != null) {
+ server.shutdown();
+ server.awaitShutdown();
+ }
+ }
+
+ private KafkaServer createKafkaServer(KafkaConfig kafkaConfig) {
+ return new KafkaServer(kafkaConfig, new Time() {
+
+ @Override
+ public long milliseconds() {
+ return System.currentTimeMillis();
+ }
+
+ @Override
+ public long nanoseconds() {
+ return System.nanoTime();
+ }
+
+ @Override
+ public void sleep(long ms) {
+ try {
+ Thread.sleep(ms);
+ } catch (InterruptedException e) {
+ Thread.interrupted();
+ }
+ }
+ });
}
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/b9b08999/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
index 561a68b..3e8cb93 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
@@ -179,7 +179,13 @@ public final class ApplicationMasterMain extends ServiceMain {
prop.setProperty("log.flush.interval.ms", "1000");
prop.setProperty("log.segment.bytes", "536870912");
prop.setProperty("zookeeper.connect", kafkaZKConnect);
- prop.setProperty("zookeeper.connection.timeout.ms", "1000000");
+ // Set the connection timeout to a relatively short time (3 seconds).
+ // It is only used by the org.I0Itec.zkclient.ZkClient inside KafkaServer
+ // to block and wait for the ZK connection to reach the SyncConnected state.
+ // However, due to the race condition described in TWILL-139 in the ZK client library used by Kafka,
+ // when ZK authentication is enabled, the ZK client may hang until the connection timeout expires.
+ // Setting it to a lower value allows the AM to retry multiple times if the race happens.
+ prop.setProperty("zookeeper.connection.timeout.ms", "3000");
prop.setProperty("default.replication.factor", "1");
return prop;
}