You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2017/04/08 18:08:30 UTC
[07/24] twill git commit: (TWILL-122) Allow disabling log collection
(TWILL-122) Allow disabling log collection
- Introduced a new configuration twill.log.collection.enabled for
turning off log collection
- Refactor YarnTwillController and related class hierarchy to not
start the Kafka client when log collection is disabled
- Added Kafka zk connection string information in AM live node data
- Refactor KafkaAppender and ServiceMain configureLogger
- Log to StatusManager instead of Logger to avoid recursive logging
- Instead of resetting logback configuration, directly instantiate and
add the Kafka log appender to the logging context.
- Refactor ServiceMain, ApplicationMasterMain and TwillContainerMain to
simplify ZK Connection string construction
This closes #40 on Github.
Signed-off-by: Terence Yim <ch...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/3045b91b
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/3045b91b
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/3045b91b
Branch: refs/heads/site
Commit: 3045b91b0367448db0dd3146db3bd34c107eb4c5
Parents: e154bfe
Author: Terence Yim <ch...@apache.org>
Authored: Mon Mar 20 14:42:46 2017 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Thu Mar 23 15:13:47 2017 -0700
----------------------------------------------------------------------
.../main/java/org/apache/twill/api/Configs.java | 10 ++
.../twill/internal/AbstractTwillController.java | 36 +++++-
.../java/org/apache/twill/internal/EnvKeys.java | 2 -
.../internal/TwillRuntimeSpecification.java | 26 +++-
.../json/TwillRuntimeSpecificationCodec.java | 8 +-
.../twill/internal/logging/KafkaAppender.java | 30 ++---
.../apache/twill/internal/ControllerTest.java | 11 +-
.../org/apache/twill/internal/ServiceMain.java | 120 ++++++-------------
.../ApplicationMasterLiveNodeData.java | 15 ++-
.../appmaster/ApplicationMasterMain.java | 94 +++++++++++----
.../appmaster/ApplicationMasterService.java | 9 +-
.../internal/container/TwillContainerMain.java | 27 ++---
.../apache/twill/yarn/YarnTwillController.java | 8 +-
.../twill/yarn/YarnTwillControllerFactory.java | 2 +-
.../apache/twill/yarn/YarnTwillPreparer.java | 9 +-
.../twill/yarn/YarnTwillRunnerService.java | 3 +-
.../apache/twill/yarn/LogHandlerTestRun.java | 44 +++++++
17 files changed, 279 insertions(+), 175 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/twill/blob/3045b91b/twill-api/src/main/java/org/apache/twill/api/Configs.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/Configs.java b/twill-api/src/main/java/org/apache/twill/api/Configs.java
index 570eafe..1447a37 100644
--- a/twill-api/src/main/java/org/apache/twill/api/Configs.java
+++ b/twill-api/src/main/java/org/apache/twill/api/Configs.java
@@ -78,6 +78,11 @@ public final class Configs {
*/
public static final String YARN_AM_RESERVED_MEMORY_MB = "twill.yarn.am.reserved.memory.mb";
+ /**
+ * Setting for enabling log collection.
+ */
+ public static final String LOG_COLLECTION_ENABLED = "twill.log.collection.enabled";
+
private Keys() {
}
}
@@ -117,6 +122,11 @@ public final class Configs {
*/
public static final int YARN_AM_RESERVED_MEMORY_MB = 150;
+ /**
+ * Default to enable log collection.
+ */
+ public static final boolean LOG_COLLECTION_ENABLED = true;
+
private Defaults() {
}
http://git-wip-us.apache.org/repos/asf/twill/blob/3045b91b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
index 212411f..e49a2ad 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
@@ -71,21 +71,37 @@ public abstract class AbstractTwillController extends AbstractZKServiceControlle
private static final Logger LOG = LoggerFactory.getLogger(AbstractTwillController.class);
private static final Gson GSON = new Gson();
+ private final String appName;
+ private final RunId runId;
private final Queue<LogHandler> logHandlers;
private final KafkaClientService kafkaClient;
private ZKDiscoveryService discoveryServiceClient;
private Cancellable logCancellable;
- public AbstractTwillController(RunId runId, ZKClient zkClient, Iterable<LogHandler> logHandlers) {
+ public AbstractTwillController(String appName, RunId runId, ZKClient zkClient, boolean logCollectionEnabled,
+ Iterable<LogHandler> logHandlers) {
super(runId, zkClient);
+ this.appName = appName;
+ this.runId = runId;
this.logHandlers = new ConcurrentLinkedQueue<>();
- this.kafkaClient = new ZKKafkaClientService(ZKClients.namespace(zkClient, "/" + runId.getId() + "/kafka"));
- Iterables.addAll(this.logHandlers, logHandlers);
+
+ // When addressing TWILL-147, need to check if the given ZKClient is
+ // actually used by the Kafka used for log collection
+ if (logCollectionEnabled) {
+ this.kafkaClient = new ZKKafkaClientService(ZKClients.namespace(zkClient, "/" + runId.getId() + "/kafka"));
+ Iterables.addAll(this.logHandlers, logHandlers);
+ } else {
+ this.kafkaClient = null;
+ if (!Iterables.isEmpty(logHandlers)) {
+ LOG.warn("Log collection is disabled for application {} with runId {}. " +
+ "Adding log handler won't get any logs.", appName, runId);
+ }
+ }
}
@Override
protected synchronized void doStartUp() {
- if (!logHandlers.isEmpty()) {
+ if (kafkaClient != null && !logHandlers.isEmpty()) {
kafkaClient.startAndWait();
logCancellable = kafkaClient.getConsumer().prepare()
.addFromBeginning(Constants.LOG_TOPIC, 0)
@@ -101,12 +117,20 @@ public abstract class AbstractTwillController extends AbstractZKServiceControlle
if (discoveryServiceClient != null) {
discoveryServiceClient.close();
}
- // Safe to call stop no matter when state the KafkaClientService is in.
- kafkaClient.stopAndWait();
+ if (kafkaClient != null) {
+ // Safe to call stop no matter what state the KafkaClientService is in.
+ kafkaClient.stopAndWait();
+ }
}
@Override
public final synchronized void addLogHandler(LogHandler handler) {
+ if (kafkaClient == null) {
+ LOG.warn("Log collection is disabled for application {} with runId {}. " +
+ "Adding log handler won't get any logs.", appName, runId);
+ return;
+ }
+
logHandlers.add(handler);
if (logHandlers.size() == 1) {
kafkaClient.startAndWait();
http://git-wip-us.apache.org/repos/asf/twill/blob/3045b91b/twill-core/src/main/java/org/apache/twill/internal/EnvKeys.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/EnvKeys.java b/twill-core/src/main/java/org/apache/twill/internal/EnvKeys.java
index 6948f80..8f37f5e 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/EnvKeys.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/EnvKeys.java
@@ -31,8 +31,6 @@ public final class EnvKeys {
*/
public static final String TWILL_RUNNABLE_NAME = "TWILL_RUNNABLE_NAME";
- public static final String TWILL_LOG_KAFKA_ZK = "TWILL_LOG_KAFKA_ZK";
-
public static final String YARN_APP_ID = "YARN_APP_ID";
public static final String YARN_APP_ID_CLUSTER_TIME = "YARN_APP_ID_CLUSTER_TIME";
public static final String YARN_APP_ID_STR = "YARN_APP_ID_STR";
http://git-wip-us.apache.org/repos/asf/twill/blob/3045b91b/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java b/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java
index 965b203..831c831 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java
@@ -41,13 +41,14 @@ public class TwillRuntimeSpecification {
private final String rmSchedulerAddr;
private final Map<String, Map<String, String>> logLevels;
private final Map<String, Integer> maxRetries;
- private double minHeapRatio;
+ private final double minHeapRatio;
+ private final boolean logCollectionEnabled;
public TwillRuntimeSpecification(TwillSpecification twillSpecification, String fsUser, URI twillAppDir,
String zkConnectStr, RunId twillRunId, String twillAppName,
int reservedMemory, @Nullable String rmSchedulerAddr,
Map<String, Map<String, String>> logLevels, Map<String, Integer> maxRetries,
- double minHeapRatio) {
+ double minHeapRatio, boolean logCollectionEnabled) {
this.twillSpecification = twillSpecification;
this.fsUser = fsUser;
this.twillAppDir = twillAppDir;
@@ -59,6 +60,7 @@ public class TwillRuntimeSpecification {
this.logLevels = logLevels;
this.maxRetries = maxRetries;
this.minHeapRatio = minHeapRatio;
+ this.logCollectionEnabled = logCollectionEnabled;
}
public TwillSpecification getTwillSpecification() {
@@ -93,6 +95,13 @@ public class TwillRuntimeSpecification {
return minHeapRatio;
}
+ /**
+ * Returns whether log collection is enabled.
+ */
+ public boolean isLogCollectionEnabled() {
+ return logCollectionEnabled;
+ }
+
@Nullable
public String getRmSchedulerAddr() {
return rmSchedulerAddr;
@@ -105,4 +114,17 @@ public class TwillRuntimeSpecification {
public Map<String, Integer> getMaxRetries() {
return maxRetries;
}
+
+ /**
+ * Returns the ZK connection string for the Kafka used for log collection,
+ * or {@code null} if log collection is disabled.
+ */
+ @Nullable
+ public String getKafkaZKConnect() {
+ if (!isLogCollectionEnabled()) {
+ return null;
+ }
+ // When addressing TWILL-147, a field can be introduced to carry this value.
+ return String.format("%s/%s/%s/kafka", getZkConnectStr(), getTwillAppName(), getTwillAppRunId());
+ }
}
http://git-wip-us.apache.org/repos/asf/twill/blob/3045b91b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java
index f14fc18..5ff05e8 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java
@@ -25,7 +25,6 @@ import com.google.gson.JsonObject;
import com.google.gson.JsonParseException;
import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;
-import org.apache.twill.api.Configs;
import org.apache.twill.api.TwillSpecification;
import org.apache.twill.internal.RunIds;
import org.apache.twill.internal.TwillRuntimeSpecification;
@@ -51,6 +50,7 @@ final class TwillRuntimeSpecificationCodec implements JsonSerializer<TwillRuntim
private static final String TWILL_SPEC = "twillSpecification";
private static final String LOG_LEVELS = "logLevels";
private static final String MAX_RETRIES = "maxRetries";
+ private static final String LOG_COLLECTION_ENABLED = "logCollectionEnabled";
@Override
public JsonElement serialize(TwillRuntimeSpecification src, Type typeOfSrc, JsonSerializationContext context) {
@@ -71,6 +71,7 @@ final class TwillRuntimeSpecificationCodec implements JsonSerializer<TwillRuntim
context.serialize(src.getLogLevels(), new TypeToken<Map<String, Map<String, String>>>() { }.getType()));
json.add(MAX_RETRIES,
context.serialize(src.getMaxRetries(), new TypeToken<Map<String, Integer>>() { }.getType()));
+ json.addProperty(LOG_COLLECTION_ENABLED, src.isLogCollectionEnabled());
return json;
}
@@ -98,8 +99,7 @@ final class TwillRuntimeSpecificationCodec implements JsonSerializer<TwillRuntim
jsonObj.get(RM_SCHEDULER_ADDR).getAsString() : null,
logLevels,
maxRetries,
- jsonObj.has(HEAP_RESERVED_MIN_RATIO) ?
- jsonObj.get(HEAP_RESERVED_MIN_RATIO).getAsDouble()
- : Configs.Defaults.HEAP_RESERVED_MIN_RATIO);
+ jsonObj.get(HEAP_RESERVED_MIN_RATIO).getAsDouble(),
+ jsonObj.get(LOG_COLLECTION_ENABLED).getAsBoolean());
}
}
http://git-wip-us.apache.org/repos/asf/twill/blob/3045b91b/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java b/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java
index 90d7415..493c4ca 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java
@@ -18,7 +18,8 @@
package org.apache.twill.internal.logging;
import ch.qos.logback.classic.spi.ILoggingEvent;
-import ch.qos.logback.core.AppenderBase;
+import ch.qos.logback.core.Appender;
+import ch.qos.logback.core.UnsynchronizedAppenderBase;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
@@ -44,8 +45,6 @@ import org.apache.twill.zookeeper.RetryStrategies;
import org.apache.twill.zookeeper.ZKClientService;
import org.apache.twill.zookeeper.ZKClientServices;
import org.apache.twill.zookeeper.ZKClients;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.Collection;
@@ -61,11 +60,11 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
/**
- *
+ * A logback {@link Appender} for writing log events to Kafka.
*/
-public final class KafkaAppender extends AppenderBase<ILoggingEvent> {
+public final class KafkaAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {
- private static final Logger LOG = LoggerFactory.getLogger(KafkaAppender.class);
+ private static final String PUBLISH_THREAD_NAME = "kafka-logger";
private final AtomicReference<KafkaPublisher.Preparer> publisher;
private final Runnable flushTask;
@@ -146,7 +145,7 @@ public final class KafkaAppender extends AppenderBase<ILoggingEvent> {
Preconditions.checkNotNull(zkConnectStr);
eventConverter = new LogEventConverter(hostname, runnableName);
- scheduler = Executors.newSingleThreadScheduledExecutor(Threads.createDaemonThreadFactory("kafka-logger"));
+ scheduler = Executors.newSingleThreadScheduledExecutor(Threads.createDaemonThreadFactory(PUBLISH_THREAD_NAME));
zkClientService = ZKClientServices.delegate(
ZKClients.reWatchOnExpire(
@@ -162,14 +161,14 @@ public final class KafkaAppender extends AppenderBase<ILoggingEvent> {
Preconditions.checkState(Futures.getUnchecked(future) == Service.State.RUNNING,
"Service is not running.");
}
- LOG.info("Kafka client started: " + zkConnectStr);
+ addInfo("Kafka client started: " + zkConnectStr);
scheduler.scheduleWithFixedDelay(flushTask, 0, flushPeriod, TimeUnit.MILLISECONDS);
}
@Override
public void onFailure(Throwable t) {
// Fail to talk to kafka. Other than logging, what can be done?
- LOG.error("Failed to start kafka appender.", t);
+ addError("Failed to start kafka appender.", t);
}
}, Threads.SAME_THREAD_EXECUTOR);
@@ -187,7 +186,7 @@ public final class KafkaAppender extends AppenderBase<ILoggingEvent> {
try {
scheduler.submit(flushTask).get(2, TimeUnit.SECONDS);
} catch (Exception e) {
- LOG.error("Failed to force log flush in 2 seconds.", e);
+ addError("Failed to force log flush in 2 seconds.", e);
}
}
@@ -229,7 +228,7 @@ public final class KafkaAppender extends AppenderBase<ILoggingEvent> {
bufferedSize.addAndGet(-published);
return published;
} catch (ExecutionException e) {
- LOG.error("Failed to publish logs to Kafka.", e);
+ addError("Failed to publish logs to Kafka.", e);
TimeUnit.NANOSECONDS.sleep(backOffTime);
publishTimeout -= stopwatch.elapsedTime(timeoutUnit);
stopwatch.reset();
@@ -237,7 +236,7 @@ public final class KafkaAppender extends AppenderBase<ILoggingEvent> {
}
} while (publishTimeout > 0);
} catch (InterruptedException e) {
- LOG.warn("Logs publish to Kafka interrupted.", e);
+ addWarn("Logs publish to Kafka interrupted.", e);
}
return 0;
}
@@ -277,12 +276,9 @@ public final class KafkaAppender extends AppenderBase<ILoggingEvent> {
@Override
public void run() {
try {
- int published = publishLogs(2L, TimeUnit.SECONDS);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Published {} log messages to Kafka.", published);
- }
+ publishLogs(2L, TimeUnit.SECONDS);
} catch (Exception e) {
- LOG.error("Failed to push logs to Kafka. Log entries dropped.", e);
+ addError("Failed to push logs to Kafka. Log entries dropped.", e);
}
}
};
http://git-wip-us.apache.org/repos/asf/twill/blob/3045b91b/twill-core/src/test/java/org/apache/twill/internal/ControllerTest.java
----------------------------------------------------------------------
diff --git a/twill-core/src/test/java/org/apache/twill/internal/ControllerTest.java b/twill-core/src/test/java/org/apache/twill/internal/ControllerTest.java
index a5cb04b..c37cd00 100644
--- a/twill-core/src/test/java/org/apache/twill/internal/ControllerTest.java
+++ b/twill-core/src/test/java/org/apache/twill/internal/ControllerTest.java
@@ -62,7 +62,7 @@ public class ControllerTest {
Service service = createService(zkClientService, runId);
service.startAndWait();
- TwillController controller = getController(zkClientService, runId);
+ TwillController controller = getController(zkClientService, "testController", runId);
controller.sendCommand(Command.Builder.of("test").build()).get(2, TimeUnit.SECONDS);
controller.terminate().get(2, TimeUnit.SECONDS);
@@ -96,7 +96,7 @@ public class ControllerTest {
zkClientService.startAndWait();
final CountDownLatch runLatch = new CountDownLatch(1);
- TwillController controller = getController(zkClientService, runId);
+ TwillController controller = getController(zkClientService, "testControllerBefore", runId);
controller.onRunning(new Runnable() {
@Override
public void run() {
@@ -140,7 +140,7 @@ public class ControllerTest {
service.startAndWait();
final CountDownLatch runLatch = new CountDownLatch(1);
- TwillController controller = getController(zkClientService, runId);
+ TwillController controller = getController(zkClientService, "testControllerListener", runId);
controller.onRunning(new Runnable() {
@Override
public void run() {
@@ -185,8 +185,9 @@ public class ControllerTest {
};
}
- private TwillController getController(ZKClient zkClient, RunId runId) {
- AbstractTwillController controller = new AbstractTwillController(runId, zkClient, ImmutableList.<LogHandler>of()) {
+ private TwillController getController(ZKClient zkClient, String appName, RunId runId) {
+ AbstractTwillController controller = new AbstractTwillController(appName, runId,
+ zkClient, false, ImmutableList.<LogHandler>of()) {
@Override
public void kill() {
http://git-wip-us.apache.org/repos/asf/twill/blob/3045b91b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
index 727435c..0bf07e0 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
@@ -18,9 +18,6 @@
package org.apache.twill.internal;
import ch.qos.logback.classic.LoggerContext;
-import ch.qos.logback.classic.joran.JoranConfigurator;
-import ch.qos.logback.classic.util.ContextInitializer;
-import ch.qos.logback.core.joran.spi.JoranException;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Futures;
@@ -45,14 +42,13 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.ILoggerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.xml.sax.InputSource;
import java.io.File;
-import java.io.StringReader;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
/**
* Class for main method that starts a service.
@@ -70,7 +66,10 @@ public abstract class ServiceMain {
protected final void doMain(final Service mainService,
Service...prerequisites) throws ExecutionException, InterruptedException {
- configureLogger();
+ // Only configure the log collection if it is enabled.
+ if (getTwillRuntimeSpecification().isLogCollectionEnabled()) {
+ configureLogger();
+ }
Service requiredServices = new CompositeService(prerequisites);
Runtime.getRuntime().addShutdownHook(new Thread() {
@@ -121,14 +120,21 @@ public abstract class ServiceMain {
protected abstract String getHostname();
- protected abstract String getKafkaZKConnect();
+ /**
+ * Returns the {@link TwillRuntimeSpecification} for this application.
+ */
+ protected abstract TwillRuntimeSpecification getTwillRuntimeSpecification();
+ /**
+ * Returns the name of the runnable that is running inside this process.
+ */
+ @Nullable
protected abstract String getRunnableName();
/**
* Returns the {@link Location} for the application based on the app directory.
*/
- protected static Location createAppLocation(final Configuration conf, String fsUser, final URI appDir) {
+ protected final Location createAppLocation(final Configuration conf, String fsUser, final URI appDir) {
// Note: It's a little bit hacky based on the uri schema to create the LocationFactory, refactor it later.
try {
@@ -168,16 +174,19 @@ public abstract class ServiceMain {
/**
* Creates a {@link ZKClientService}.
*/
- protected static ZKClientService createZKClient(String zkConnectStr, String appName) {
+ protected final ZKClientService createZKClient() {
+ TwillRuntimeSpecification twillRuntimeSpec = getTwillRuntimeSpecification();
+
return ZKClientServices.delegate(
ZKClients.namespace(
ZKClients.reWatchOnExpire(
ZKClients.retryOnFailure(
- ZKClientService.Builder.of(zkConnectStr).build(),
+ ZKClientService.Builder.of(twillRuntimeSpec.getZkConnectStr()).build(),
RetryStrategies.fixDelay(1, TimeUnit.SECONDS)
)
- ), "/" + appName
- ));
+ ), "/" + twillRuntimeSpec.getTwillAppName()
+ )
+ );
}
private void configureLogger() {
@@ -188,84 +197,23 @@ public abstract class ServiceMain {
}
LoggerContext context = (LoggerContext) loggerFactory;
- context.reset();
- JoranConfigurator configurator = new JoranConfigurator();
- configurator.setContext(context);
-
- try {
- File twillLogback = new File(Constants.Files.RUNTIME_CONFIG_JAR, Constants.Files.LOGBACK_TEMPLATE);
- if (twillLogback.exists()) {
- configurator.doConfigure(twillLogback);
- }
- new ContextInitializer(context).autoConfig();
- } catch (JoranException e) {
- throw Throwables.propagate(e);
- }
- doConfigure(configurator, getLogConfig(getLoggerLevel(context.getLogger(Logger.ROOT_LOGGER_NAME))));
- }
-
- private void doConfigure(JoranConfigurator configurator, String config) {
- try {
- configurator.doConfigure(new InputSource(new StringReader(config)));
- } catch (Exception e) {
- throw Throwables.propagate(e);
- }
- }
-
- private String getLogConfig(String rootLevel) {
- return
- "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" +
- "<configuration>\n" +
- " <appender name=\"KAFKA\" class=\"" + KafkaAppender.class.getName() + "\">\n" +
- " <topic>" + Constants.LOG_TOPIC + "</topic>\n" +
- " <hostname>" + getHostname() + "</hostname>\n" +
- " <zookeeper>" + getKafkaZKConnect() + "</zookeeper>\n" +
- appendRunnable() +
- " </appender>\n" +
- " <logger name=\"org.apache.twill.internal.logging\" additivity=\"false\" />\n" +
- " <root level=\"" + rootLevel + "\">\n" +
- " <appender-ref ref=\"KAFKA\"/>\n" +
- " </root>\n" +
- "</configuration>";
- }
-
- private String appendRunnable() {
- // RunnableName for AM is null, so append runnable name to log config only if the name is not null.
- if (getRunnableName() == null) {
- return "";
- } else {
- return " <runnableName>" + getRunnableName() + "</runnableName>\n";
+ // Attach the KafkaAppender to the root logger
+ KafkaAppender kafkaAppender = new KafkaAppender();
+ kafkaAppender.setName("KAFKA");
+ kafkaAppender.setTopic(Constants.LOG_TOPIC);
+ kafkaAppender.setHostname(getHostname());
+ // The Kafka ZK connection shouldn't be null as this method only gets called if log collection is enabled
+ kafkaAppender.setZookeeper(getTwillRuntimeSpecification().getKafkaZKConnect());
+ String runnableName = getRunnableName();
+ if (runnableName != null) {
+ kafkaAppender.setRunnableName(runnableName);
}
- }
- /**
- * Return the right log level for the service.
- *
- * @param logger the {@link Logger} instance of the service context.
- * @return String of log level based on {@code slf4j} log levels.
- */
- private String getLoggerLevel(Logger logger) {
- if (logger instanceof ch.qos.logback.classic.Logger) {
- return ((ch.qos.logback.classic.Logger) logger).getLevel().toString();
- }
+ kafkaAppender.setContext(context);
+ kafkaAppender.start();
- if (logger.isTraceEnabled()) {
- return "TRACE";
- }
- if (logger.isDebugEnabled()) {
- return "DEBUG";
- }
- if (logger.isInfoEnabled()) {
- return "INFO";
- }
- if (logger.isWarnEnabled()) {
- return "WARN";
- }
- if (logger.isErrorEnabled()) {
- return "ERROR";
- }
- return "OFF";
+ context.getLogger(ch.qos.logback.classic.Logger.ROOT_LOGGER_NAME).addAppender(kafkaAppender);
}
/**
http://git-wip-us.apache.org/repos/asf/twill/blob/3045b91b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterLiveNodeData.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterLiveNodeData.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterLiveNodeData.java
index dd4d946..a38a163 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterLiveNodeData.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterLiveNodeData.java
@@ -20,6 +20,7 @@ package org.apache.twill.internal.appmaster;
import org.apache.twill.api.LocalFile;
import java.util.List;
+import javax.annotation.Nullable;
/**
* Represents data being stored in the live node of the application master.
@@ -30,13 +31,16 @@ public final class ApplicationMasterLiveNodeData {
private final long appIdClusterTime;
private final String containerId;
private final List<LocalFile> localFiles;
+ private final String kafkaZKConnect;
public ApplicationMasterLiveNodeData(int appId, long appIdClusterTime,
- String containerId, List<LocalFile> localFiles) {
+ String containerId, List<LocalFile> localFiles,
+ @Nullable String kafkaZKConnect) {
this.appId = appId;
this.appIdClusterTime = appIdClusterTime;
this.containerId = containerId;
this.localFiles = localFiles;
+ this.kafkaZKConnect = kafkaZKConnect;
}
public int getAppId() {
@@ -55,6 +59,15 @@ public final class ApplicationMasterLiveNodeData {
return localFiles;
}
+ /**
+ * @return the Kafka ZK connection string for the Kafka used for log collection;
+ * if log collection is turned off, a {@code null} value will be returned.
+ */
+ @Nullable
+ public String getKafkaZKConnect() {
+ return kafkaZKConnect;
+ }
+
@Override
public String toString() {
return "ApplicationMasterLiveNodeData{" +
http://git-wip-us.apache.org/repos/asf/twill/blob/3045b91b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
index 8dd5046..81c61ac 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
@@ -17,14 +17,15 @@
*/
package org.apache.twill.internal.appmaster;
+import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.Service;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.twill.api.RunId;
import org.apache.twill.internal.Constants;
-import org.apache.twill.internal.EnvKeys;
import org.apache.twill.internal.ServiceMain;
import org.apache.twill.internal.TwillRuntimeSpecification;
import org.apache.twill.internal.json.TwillRuntimeSpecificationAdapter;
@@ -45,21 +46,20 @@ import java.io.File;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
/**
* Main class for launching {@link ApplicationMasterService}.
*/
public final class ApplicationMasterMain extends ServiceMain {
- private final String kafkaZKConnect;
-
- private ApplicationMasterMain(String kafkaZKConnect) {
- this.kafkaZKConnect = kafkaZKConnect;
- }
+ private static final Logger LOG = LoggerFactory.getLogger(ApplicationMasterMain.class);
+ private final TwillRuntimeSpecification twillRuntimeSpec;
/**
* Starts the application master.
@@ -67,10 +67,18 @@ public final class ApplicationMasterMain extends ServiceMain {
public static void main(String[] args) throws Exception {
File twillSpec = new File(Constants.Files.RUNTIME_CONFIG_JAR, Constants.Files.TWILL_SPEC);
TwillRuntimeSpecification twillRuntimeSpec = TwillRuntimeSpecificationAdapter.create().fromJson(twillSpec);
- String zkConnect = twillRuntimeSpec.getZkConnectStr();
+
+ new ApplicationMasterMain(twillRuntimeSpec).doMain();
+ }
+
+ private ApplicationMasterMain(TwillRuntimeSpecification twillRuntimeSpec) {
+ this.twillRuntimeSpec = twillRuntimeSpec;
+ }
+
+ private void doMain() throws Exception {
RunId runId = twillRuntimeSpec.getTwillAppRunId();
- ZKClientService zkClientService = createZKClient(zkConnect, twillRuntimeSpec.getTwillAppName());
+ ZKClientService zkClientService = createZKClient();
Configuration conf = new YarnConfiguration(new HdfsConfiguration(new Configuration()));
setRMSchedulerAddress(conf, twillRuntimeSpec.getRmSchedulerAddr());
@@ -81,13 +89,22 @@ public final class ApplicationMasterMain extends ServiceMain {
twillRuntimeSpec.getTwillAppDir()));
TrackerService trackerService = new TrackerService(service);
- new ApplicationMasterMain(service.getKafkaZKConnect())
+ List<Service> prerequisites = Lists.newArrayList(
+ new YarnAMClientService(amClient, trackerService),
+ zkClientService,
+ new AppMasterTwillZKPathService(zkClientService, runId)
+ );
+
+ if (twillRuntimeSpec.isLogCollectionEnabled()) {
+ prerequisites.add(new ApplicationKafkaService(zkClientService, twillRuntimeSpec.getKafkaZKConnect()));
+ } else {
+ LOG.info("Log collection through kafka disabled");
+ }
+
+ new ApplicationMasterMain(twillRuntimeSpec)
.doMain(
service,
- new YarnAMClientService(amClient, trackerService),
- zkClientService,
- new AppMasterTwillZKPathService(zkClientService, runId),
- new ApplicationKafkaService(zkClientService, runId)
+ prerequisites.toArray(new Service[prerequisites.size()])
);
}
@@ -117,13 +134,15 @@ public final class ApplicationMasterMain extends ServiceMain {
}
@Override
- protected String getKafkaZKConnect() {
- return kafkaZKConnect;
+ protected TwillRuntimeSpecification getTwillRuntimeSpecification() {
+ return twillRuntimeSpec;
}
+ @Nullable
@Override
protected String getRunnableName() {
- return System.getenv(EnvKeys.TWILL_RUNNABLE_NAME);
+ // No runnable name for the AM
+ return null;
}
/**
@@ -135,13 +154,13 @@ public final class ApplicationMasterMain extends ServiceMain {
private static final Logger LOG = LoggerFactory.getLogger(ApplicationKafkaService.class);
private final ZKClient zkClient;
- private final String kafkaZKPath;
private final EmbeddedKafkaServer kafkaServer;
+ private final String kafkaZKPath;
- private ApplicationKafkaService(ZKClient zkClient, RunId runId) {
+ private ApplicationKafkaService(ZKClient zkClient, String kafkaZKConnect) {
this.zkClient = zkClient;
- this.kafkaZKPath = "/" + runId.getId() + "/kafka";
- this.kafkaServer = new EmbeddedKafkaServer(generateKafkaConfig(zkClient.getConnectString() + kafkaZKPath));
+ this.kafkaServer = new EmbeddedKafkaServer(generateKafkaConfig(kafkaZKConnect));
+ this.kafkaZKPath = kafkaZKConnect.substring(zkClient.getConnectString().length());
}
@Override
@@ -236,7 +255,7 @@ public final class ApplicationMasterMain extends ServiceMain {
private static final Logger LOG = LoggerFactory.getLogger(AppMasterTwillZKPathService.class);
private final ZKClient zkClient;
- public AppMasterTwillZKPathService(ZKClient zkClient, RunId runId) {
+ AppMasterTwillZKPathService(ZKClient zkClient, RunId runId) {
super(zkClient, runId);
this.zkClient = zkClient;
}
@@ -258,8 +277,7 @@ public final class ApplicationMasterMain extends ServiceMain {
// Try to delete children under /discovery. It may fail with NotEmptyException if there are other instances
// of the same app running that has discovery services running.
- List<String> children = zkClient.getChildren(Constants.DISCOVERY_PATH_PREFIX)
- .get(TIMEOUT_SECONDS, TimeUnit.SECONDS).getChildren();
+ List<String> children = getChildren(Constants.DISCOVERY_PATH_PREFIX);
List<OperationFuture<?>> deleteFutures = new ArrayList<>();
for (String child : children) {
String path = Constants.DISCOVERY_PATH_PREFIX + "/" + child;
@@ -272,8 +290,15 @@ public final class ApplicationMasterMain extends ServiceMain {
future.get();
} catch (ExecutionException e) {
if (e.getCause() instanceof KeeperException.NotEmptyException) {
+ // If any deletion of the service failed with not empty, it means there are other apps running,
+ // hence just return
return;
}
+ if (e.getCause() instanceof KeeperException.NoNodeException) {
+ // If the service node is gone, it may have been deleted by another app instance that is also shutting down,
+ // hence just keep going
+ continue;
+ }
throw e;
}
}
@@ -304,6 +329,29 @@ public final class ApplicationMasterMain extends ServiceMain {
if (e.getCause() instanceof KeeperException.NotEmptyException) {
return false;
}
+ if (e.getCause() instanceof KeeperException.NoNodeException) {
+ // If the node to be deleted was never created or is already gone, treat it the same as a successful deletion.
+ return true;
+ }
+ throw e;
+ }
+ }
+
+ /**
+ * Returns the list of child nodes under the given path.
+ *
+ * @param path path to get children
+ * @return the list of children or empty list if the path doesn't exist.
+ * @throws Exception if failed to get children
+ */
+ private List<String> getChildren(String path) throws Exception {
+ try {
+ return zkClient.getChildren(path).get(TIMEOUT_SECONDS, TimeUnit.SECONDS).getChildren();
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof KeeperException.NoNodeException) {
+ // If the node doesn't exist, return an empty list
+ return Collections.emptyList();
+ }
throw e;
}
}
http://git-wip-us.apache.org/repos/asf/twill/blob/3045b91b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index 368c7b8..0f647cd 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -157,7 +157,8 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
this.amLiveNode = new ApplicationMasterLiveNodeData(Integer.parseInt(System.getenv(EnvKeys.YARN_APP_ID)),
Long.parseLong(System.getenv(EnvKeys.YARN_APP_ID_CLUSTER_TIME)),
- amClient.getContainerId().toString(), getLocalizeFiles());
+ amClient.getContainerId().toString(), getLocalizeFiles(),
+ twillRuntimeSpec.getKafkaZKConnect());
this.expectedContainers = new ExpectedContainers(twillSpec);
this.runningContainers = createRunningContainers(amClient.getContainerId(), amClient.getHost());
@@ -657,8 +658,6 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
if (environments.containsKey(runnableName)) {
env.putAll(environments.get(runnableName));
}
- // Override with system env
- env.put(EnvKeys.TWILL_LOG_KAFKA_ZK, getKafkaZKConnect());
ProcessLauncher.PrepareLaunchContext launchContext = processLauncher.prepareLaunch(env,
amLiveNode.getLocalFiles(),
@@ -714,10 +713,6 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
return String.format("/%s/runnables/%s", runId.getId(), runnableName);
}
- String getKafkaZKConnect() {
- return String.format("%s/%s/kafka", zkClient.getConnectString(), runId.getId());
- }
-
/**
* Attempts to change the number of running instances.
*
http://git-wip-us.apache.org/repos/asf/twill/blob/3045b91b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
index a5efb41..2baaca1 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
@@ -66,20 +66,26 @@ public final class TwillContainerMain extends ServiceMain {
private static final Logger LOG = LoggerFactory.getLogger(TwillContainerMain.class);
+ private final TwillRuntimeSpecification twillRuntimeSpec;
+
/**
* Main method for launching a {@link TwillContainerService} which runs
* a {@link org.apache.twill.api.TwillRunnable}.
*/
public static void main(String[] args) throws Exception {
- new TwillContainerMain().doMain();
+ File twillSpecFile = new File(Constants.Files.RUNTIME_CONFIG_JAR, Constants.Files.TWILL_SPEC);
+ TwillRuntimeSpecification twillRuntimeSpec = TwillRuntimeSpecificationAdapter.create().fromJson(twillSpecFile);
+
+ new TwillContainerMain(twillRuntimeSpec).doMain();
+ }
+
+ private TwillContainerMain(TwillRuntimeSpecification twillRuntimeSpec) {
+ this.twillRuntimeSpec = twillRuntimeSpec;
}
private void doMain() throws Exception {
// Try to load the secure store from localized file, which AM requested RM to localize it for this container.
loadSecureStore();
- File twillSpecFile = new File(Constants.Files.RUNTIME_CONFIG_JAR, Constants.Files.TWILL_SPEC);
- TwillRuntimeSpecification twillRuntimeSpec = loadTwillSpec(twillSpecFile);
- String zkConnectStr = twillRuntimeSpec.getZkConnectStr();
RunId appRunId = twillRuntimeSpec.getTwillAppRunId();
RunId runId = RunIds.fromString(System.getenv(EnvKeys.TWILL_RUN_ID));
String runnableName = System.getenv(EnvKeys.TWILL_RUNNABLE_NAME);
@@ -98,7 +104,7 @@ public final class TwillContainerMain extends ServiceMain {
logLevels.putAll(dynamicLogLevels);
}
- ZKClientService zkClientService = createZKClient(zkConnectStr, twillRuntimeSpec.getTwillAppName());
+ ZKClientService zkClientService = createZKClient();
ZKDiscoveryService discoveryService = new ZKDiscoveryService(zkClientService);
ZKClient appRunZkClient = getAppRunZKClient(zkClientService, appRunId);
@@ -170,12 +176,6 @@ public final class TwillContainerMain extends ServiceMain {
return classLoader;
}
- private static TwillRuntimeSpecification loadTwillSpec(File specFile) throws IOException {
- try (Reader reader = Files.newReader(specFile, Charsets.UTF_8)) {
- return TwillRuntimeSpecificationAdapter.create().fromJson(reader);
- }
- }
-
private static Map<String, Map<String, String>> loadLogLevels() throws IOException {
File file = new File(Constants.Files.LOG_LEVELS);
if (file.exists()) {
@@ -198,8 +198,8 @@ public final class TwillContainerMain extends ServiceMain {
}
@Override
- protected String getKafkaZKConnect() {
- return System.getenv(EnvKeys.TWILL_LOG_KAFKA_ZK);
+ protected TwillRuntimeSpecification getTwillRuntimeSpecification() {
+ return twillRuntimeSpec;
}
@Override
@@ -207,7 +207,6 @@ public final class TwillContainerMain extends ServiceMain {
return System.getenv(EnvKeys.TWILL_RUNNABLE_NAME);
}
-
/**
* Simple service that force flushing logs on stop.
*/
http://git-wip-us.apache.org/repos/asf/twill/blob/3045b91b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
index 61306d6..1945731 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
@@ -77,7 +77,7 @@ final class YarnTwillController extends AbstractTwillController implements Twill
*/
YarnTwillController(String appName, RunId runId, ZKClient zkClient,
final ApplicationMasterLiveNodeData amLiveNodeData, final YarnAppClient yarnAppClient) {
- super(runId, zkClient, Collections.<LogHandler>emptyList());
+ super(appName, runId, zkClient, amLiveNodeData.getKafkaZKConnect() != null, Collections.<LogHandler>emptyList());
this.appName = appName;
this.amLiveNodeData = amLiveNodeData;
this.startUp = new Callable<ProcessController<YarnApplicationReport>>() {
@@ -91,10 +91,10 @@ final class YarnTwillController extends AbstractTwillController implements Twill
this.startTimeoutUnit = TimeUnit.SECONDS;
}
- YarnTwillController(String appName, RunId runId, ZKClient zkClient, Iterable<LogHandler> logHandlers,
- Callable<ProcessController<YarnApplicationReport>> startUp,
+ YarnTwillController(String appName, RunId runId, ZKClient zkClient, boolean logCollectionEnabled,
+ Iterable<LogHandler> logHandlers, Callable<ProcessController<YarnApplicationReport>> startUp,
long startTimeout, TimeUnit startTimeoutUnit) {
- super(runId, zkClient, logHandlers);
+ super(appName, runId, zkClient, logCollectionEnabled, logHandlers);
this.appName = appName;
this.startUp = startUp;
this.startTimeout = startTimeout;
http://git-wip-us.apache.org/repos/asf/twill/blob/3045b91b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillControllerFactory.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillControllerFactory.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillControllerFactory.java
index 61fb7cc..40de6a6 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillControllerFactory.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillControllerFactory.java
@@ -30,7 +30,7 @@ import java.util.concurrent.TimeUnit;
*/
interface YarnTwillControllerFactory {
- YarnTwillController create(RunId runId, Iterable<LogHandler> logHandlers,
+ YarnTwillController create(RunId runId, boolean logCollectionEnabled, Iterable<LogHandler> logHandlers,
Callable<ProcessController<YarnApplicationReport>> startUp,
long startTimeout, TimeUnit startTimeoutUnit);
}
http://git-wip-us.apache.org/repos/asf/twill/blob/3045b91b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
index 2d1edd0..4846fe3 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
@@ -419,7 +419,10 @@ final class YarnTwillPreparer implements TwillPreparer {
}
};
- YarnTwillController controller = controllerFactory.create(runId, logHandlers, submitTask, timeout, timeoutUnit);
+ boolean logCollectionEnabled = config.getBoolean(Configs.Keys.LOG_COLLECTION_ENABLED,
+ Configs.Defaults.LOG_COLLECTION_ENABLED);
+ YarnTwillController controller = controllerFactory.create(runId, logCollectionEnabled,
+ logHandlers, submitTask, timeout, timeoutUnit);
controller.start();
return controller;
} catch (Exception e) {
@@ -671,11 +674,13 @@ final class YarnTwillPreparer implements TwillPreparer {
}
TwillSpecification newTwillSpec = new DefaultTwillSpecification(spec.getName(), runtimeSpec, spec.getOrders(),
spec.getPlacementPolicies(), eventHandler);
+ boolean logCollectionEnabled = config.getBoolean(Configs.Keys.LOG_COLLECTION_ENABLED,
+ Configs.Defaults.LOG_COLLECTION_ENABLED);
TwillRuntimeSpecificationAdapter.create().toJson(
new TwillRuntimeSpecification(newTwillSpec, appLocation.getLocationFactory().getHomeLocation().getName(),
appLocation.toURI(), zkConnectString, runId, twillSpec.getName(),
getReservedMemory(), config.get(YarnConfiguration.RM_SCHEDULER_ADDRESS),
- logLevels, maxRetries, getMinHeapRatio()), writer);
+ logLevels, maxRetries, getMinHeapRatio(), logCollectionEnabled), writer);
}
LOG.debug("Done {}", targetFile);
}
http://git-wip-us.apache.org/repos/asf/twill/blob/3045b91b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
index d8e48de..d2b53b9 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
@@ -291,11 +291,12 @@ public final class YarnTwillRunnerService implements TwillRunnerService {
appLocation, twillClassPaths, jvmOptions,
locationCache, new YarnTwillControllerFactory() {
@Override
- public YarnTwillController create(RunId runId, Iterable<LogHandler> logHandlers,
+ public YarnTwillController create(RunId runId, boolean logCollectionEnabled, Iterable<LogHandler> logHandlers,
Callable<ProcessController<YarnApplicationReport>> startUp,
long startTimeout, TimeUnit startTimeoutUnit) {
ZKClient zkClient = ZKClients.namespace(zkClientService, "/" + appName);
YarnTwillController controller = listenController(new YarnTwillController(appName, runId, zkClient,
+ logCollectionEnabled,
logHandlers, startUp,
startTimeout, startTimeoutUnit));
synchronized (YarnTwillRunnerService.this) {
http://git-wip-us.apache.org/repos/asf/twill/blob/3045b91b/twill-yarn/src/test/java/org/apache/twill/yarn/LogHandlerTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/LogHandlerTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/LogHandlerTestRun.java
index ad0a837..902c146 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/LogHandlerTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/LogHandlerTestRun.java
@@ -17,26 +17,32 @@
*/
package org.apache.twill.yarn;
+import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.twill.api.AbstractTwillRunnable;
+import org.apache.twill.api.Configs;
import org.apache.twill.api.TwillController;
import org.apache.twill.api.TwillRunner;
import org.apache.twill.api.logging.LogEntry;
import org.apache.twill.api.logging.LogHandler;
import org.apache.twill.api.logging.LogThrowable;
import org.apache.twill.api.logging.PrinterLogHandler;
+import org.apache.twill.common.Threads;
+import org.apache.twill.discovery.ServiceDiscovered;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.PrintWriter;
+import java.util.Collections;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* Test for LogHandler able to receive logs from AM and runnable.
@@ -100,6 +106,42 @@ public class LogHandlerTestRun extends BaseYarnTest {
Assert.assertEquals("Exception", t.getMessage());
}
+ @Test
+ public void testDisableLogCollection() throws Exception {
+ final AtomicBoolean logReceived = new AtomicBoolean();
+
+ // Start the LogRunnable with log collection turned off
+ TwillRunner runner = getTwillRunner();
+ TwillController controller = runner.prepare(new LogRunnable())
+ .withConfiguration(Collections.singletonMap(Configs.Keys.LOG_COLLECTION_ENABLED, "false"))
+ .addLogHandler(new LogHandler() {
+ @Override
+ public void onLog(LogEntry logEntry) {
+ logReceived.set(true);
+ }
+ })
+ .start();
+
+ // Make sure the runnable gets executed
+ try {
+ final CountDownLatch latch = new CountDownLatch(1);
+ controller.discoverService("log").watchChanges(new ServiceDiscovered.ChangeListener() {
+ @Override
+ public void onChange(ServiceDiscovered serviceDiscovered) {
+ if (Iterables.size(serviceDiscovered) == 1) {
+ latch.countDown();
+ }
+ }
+ }, Threads.SAME_THREAD_EXECUTOR);
+ Assert.assertTrue(latch.await(120, TimeUnit.SECONDS));
+ } finally {
+ controller.terminate().get(120, TimeUnit.SECONDS);
+ }
+
+ // Should receive no log
+ Assert.assertFalse("Not expecting logs collected", logReceived.get());
+ }
+
/**
* TwillRunnable for the test case to simply emit one log line.
*/
@@ -121,6 +163,8 @@ public class LogHandlerTestRun extends BaseYarnTest {
} catch (Throwable t) {
LOG.error("Got exception", t);
}
+ // Announce so that test case knows the code reaches here.
+ getContext().announce("log", 12345);
Uninterruptibles.awaitUninterruptibly(stopLatch);
}