You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2016/10/04 16:25:36 UTC
[09/16] twill git commit: (TWILL-173) Have EmbeddedKafkaServer restart multiple times on bind failure
(TWILL-173) Have EmbeddedKafkaServer restart multiple times on bind failure
- Due to a potential race condition between random port generation and the actual binding, there is a possibility that the binding will fail.
Signed-off-by: Terence Yim <ch...@apache.org>
This closes #9 on Github
Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/93c523d0
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/93c523d0
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/93c523d0
Branch: refs/heads/site
Commit: 93c523d022cfd021d5ad7f1cffa87b460cd5cd1b
Parents: 2d617d2
Author: Terence Yim <ch...@apache.org>
Authored: Wed Aug 31 14:38:19 2016 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Thu Sep 1 10:08:38 2016 -0700
----------------------------------------------------------------------
.../internal/kafka/EmbeddedKafkaServer.java | 42 +++++++++++++++++---
.../appmaster/ApplicationMasterMain.java | 6 ---
.../org/apache/twill/yarn/LogLevelTestRun.java | 2 +-
3 files changed, 37 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/twill/blob/93c523d0/twill-core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java
index cd86dcb..be6121d 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java
@@ -17,16 +17,21 @@
*/
package org.apache.twill.internal.kafka;
+import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.AbstractIdleService;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.Time;
import org.I0Itec.zkclient.exception.ZkTimeoutException;
+import org.apache.twill.internal.utils.Networks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.net.BindException;
import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
/**
* A {@link com.google.common.util.concurrent.Service} implementation for running an instance of Kafka server in
@@ -34,25 +39,27 @@ import java.util.Properties;
*/
public final class EmbeddedKafkaServer extends AbstractIdleService {
- public static final String START_TIMEOUT_RETRIES = "twill.kafka.start.timeout.retries";
+ public static final String START_RETRIES = "twill.kafka.start.timeout.retries";
private static final Logger LOG = LoggerFactory.getLogger(EmbeddedKafkaServer.class);
- private static final String DEFAULT_START_TIMEOUT_RETRIES = "5";
+ private static final String DEFAULT_START_RETRIES = "5";
private final int startTimeoutRetries;
- private final KafkaConfig kafkaConfig;
+ private final Properties properties;
private KafkaServer server;
public EmbeddedKafkaServer(Properties properties) {
- this.startTimeoutRetries = Integer.parseInt(properties.getProperty(START_TIMEOUT_RETRIES,
- DEFAULT_START_TIMEOUT_RETRIES));
- this.kafkaConfig = new KafkaConfig(properties);
+ this.startTimeoutRetries = Integer.parseInt(properties.getProperty(START_RETRIES,
+ DEFAULT_START_RETRIES));
+ this.properties = new Properties();
+ this.properties.putAll(properties);
}
@Override
protected void startUp() throws Exception {
int tries = 0;
do {
+ KafkaConfig kafkaConfig = createKafkaConfig(properties);
KafkaServer kafkaServer = createKafkaServer(kafkaConfig);
try {
kafkaServer.startup();
@@ -65,9 +72,14 @@ public final class EmbeddedKafkaServer extends AbstractIdleService {
if (rootCause instanceof ZkTimeoutException) {
// Potentially caused by race condition bug described in TWILL-139.
LOG.warn("Timeout when connecting to ZooKeeper from KafkaServer. Attempt number {}.", tries, rootCause);
+ } else if (rootCause instanceof BindException) {
+ LOG.warn("Kafka failed to bind to port {}. Attempt number {}.", kafkaConfig.port(), tries, rootCause);
} else {
throw e;
}
+
+ // Do a random sleep of < 200ms
+ TimeUnit.MILLISECONDS.sleep(new Random().nextInt(200) + 1L);
}
} while (server == null && ++tries < startTimeoutRetries);
@@ -107,4 +119,22 @@ public final class EmbeddedKafkaServer extends AbstractIdleService {
}
});
}
+
+ /**
+ * Creates a new {@link KafkaConfig} from the given {@link Properties}. If the {@code "port"} property is missing
+ * or is equals to {@code "0"}, a random port will be generated.
+ */
+ private KafkaConfig createKafkaConfig(Properties properties) {
+ Properties prop = new Properties();
+ prop.putAll(properties);
+
+ String port = prop.getProperty("port");
+ if (port == null || "0".equals(port)) {
+ int randomPort = Networks.getRandomPort();
+ Preconditions.checkState(randomPort > 0, "Failed to get random port.");
+ prop.setProperty("port", Integer.toString(randomPort));
+ }
+
+ return new KafkaConfig(prop);
+ }
}
http://git-wip-us.apache.org/repos/asf/twill/blob/93c523d0/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
index 38a2463..e708fb8 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
@@ -17,7 +17,6 @@
*/
package org.apache.twill.internal.appmaster;
-import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Futures;
import org.apache.hadoop.conf.Configuration;
@@ -30,7 +29,6 @@ import org.apache.twill.internal.RunIds;
import org.apache.twill.internal.ServiceMain;
import org.apache.twill.internal.kafka.EmbeddedKafkaServer;
import org.apache.twill.internal.logging.Loggings;
-import org.apache.twill.internal.utils.Networks;
import org.apache.twill.internal.yarn.VersionDetectYarnAMClientFactory;
import org.apache.twill.internal.yarn.YarnAMClient;
import org.apache.twill.zookeeper.OperationFuture;
@@ -168,12 +166,8 @@ public final class ApplicationMasterMain extends ServiceMain {
}
private Properties generateKafkaConfig(String kafkaZKConnect) {
- int port = Networks.getRandomPort();
- Preconditions.checkState(port > 0, "Failed to get random port.");
-
Properties prop = new Properties();
prop.setProperty("log.dir", new File("kafka-logs").getAbsolutePath());
- prop.setProperty("port", Integer.toString(port));
prop.setProperty("broker.id", "1");
prop.setProperty("socket.send.buffer.bytes", "1048576");
prop.setProperty("socket.receive.buffer.bytes", "1048576");
http://git-wip-us.apache.org/repos/asf/twill/blob/93c523d0/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelTestRun.java
index 7990a5b..e4f789a 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelTestRun.java
@@ -112,7 +112,7 @@ public class LogLevelTestRun extends BaseYarnTest {
}, Threads.SAME_THREAD_EXECUTOR);
Assert.assertTrue(running.await(200, TimeUnit.SECONDS));
- LogEntry.Level logLevel = waitForLogLevel(controller, LogLevelTestRunnable.class.getSimpleName(), 5L,
+ LogEntry.Level logLevel = waitForLogLevel(controller, LogLevelTestRunnable.class.getSimpleName(), 30L,
TimeUnit.SECONDS);
// Verify we got DEBUG log level.