You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2017/04/08 18:08:30 UTC

[07/24] twill git commit: (TWILL-122) Allow disabling log collection

(TWILL-122) Allow disabling log collection

- Introduced a new configuration twill.log.collection.enabled for
  turning off log collection
- Refactor YarnTwillController and related class hierarchy to not
  start the Kafka client when log collection is disabled
- Added Kafka zk connection string information in AM live node data
- Refactor KafkaAppender and ServiceMain configureLogger
  - Log to StatusManager instead of Logger to avoid recursive logging
  - Instead of resetting logback configuration, directly instantiate and
    add the Kafka log appender to the logging context.
- Refactor ServiceMain, ApplicationMasterMain and TwillContainerMain to
  simplify ZK Connection string construction

This closes #40 on Github.

Signed-off-by: Terence Yim <ch...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/3045b91b
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/3045b91b
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/3045b91b

Branch: refs/heads/site
Commit: 3045b91b0367448db0dd3146db3bd34c107eb4c5
Parents: e154bfe
Author: Terence Yim <ch...@apache.org>
Authored: Mon Mar 20 14:42:46 2017 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Thu Mar 23 15:13:47 2017 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/twill/api/Configs.java |  10 ++
 .../twill/internal/AbstractTwillController.java |  36 +++++-
 .../java/org/apache/twill/internal/EnvKeys.java |   2 -
 .../internal/TwillRuntimeSpecification.java     |  26 +++-
 .../json/TwillRuntimeSpecificationCodec.java    |   8 +-
 .../twill/internal/logging/KafkaAppender.java   |  30 ++---
 .../apache/twill/internal/ControllerTest.java   |  11 +-
 .../org/apache/twill/internal/ServiceMain.java  | 120 ++++++-------------
 .../ApplicationMasterLiveNodeData.java          |  15 ++-
 .../appmaster/ApplicationMasterMain.java        |  94 +++++++++++----
 .../appmaster/ApplicationMasterService.java     |   9 +-
 .../internal/container/TwillContainerMain.java  |  27 ++---
 .../apache/twill/yarn/YarnTwillController.java  |   8 +-
 .../twill/yarn/YarnTwillControllerFactory.java  |   2 +-
 .../apache/twill/yarn/YarnTwillPreparer.java    |   9 +-
 .../twill/yarn/YarnTwillRunnerService.java      |   3 +-
 .../apache/twill/yarn/LogHandlerTestRun.java    |  44 +++++++
 17 files changed, 279 insertions(+), 175 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/twill/blob/3045b91b/twill-api/src/main/java/org/apache/twill/api/Configs.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/Configs.java b/twill-api/src/main/java/org/apache/twill/api/Configs.java
index 570eafe..1447a37 100644
--- a/twill-api/src/main/java/org/apache/twill/api/Configs.java
+++ b/twill-api/src/main/java/org/apache/twill/api/Configs.java
@@ -78,6 +78,11 @@ public final class Configs {
      */
     public static final String YARN_AM_RESERVED_MEMORY_MB = "twill.yarn.am.reserved.memory.mb";
 
+    /**
+     * Setting for enabling log collection.
+     */
+    public static final String LOG_COLLECTION_ENABLED = "twill.log.collection.enabled";
+
     private Keys() {
     }
   }
@@ -117,6 +122,11 @@ public final class Configs {
      */
     public static final int YARN_AM_RESERVED_MEMORY_MB = 150;
 
+    /**
+     * Default to enable log collection.
+     */
+    public static final boolean LOG_COLLECTION_ENABLED = true;
+
 
     private Defaults() {
     }

http://git-wip-us.apache.org/repos/asf/twill/blob/3045b91b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
index 212411f..e49a2ad 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
@@ -71,21 +71,37 @@ public abstract class AbstractTwillController extends AbstractZKServiceControlle
   private static final Logger LOG = LoggerFactory.getLogger(AbstractTwillController.class);
   private static final Gson GSON = new Gson();
 
+  private final String appName;
+  private final RunId runId;
   private final Queue<LogHandler> logHandlers;
   private final KafkaClientService kafkaClient;
   private ZKDiscoveryService discoveryServiceClient;
   private Cancellable logCancellable;
 
-  public AbstractTwillController(RunId runId, ZKClient zkClient, Iterable<LogHandler> logHandlers) {
+  public AbstractTwillController(String appName, RunId runId, ZKClient zkClient, boolean logCollectionEnabled,
+                                 Iterable<LogHandler> logHandlers) {
     super(runId, zkClient);
+    this.appName = appName;
+    this.runId = runId;
     this.logHandlers = new ConcurrentLinkedQueue<>();
-    this.kafkaClient = new ZKKafkaClientService(ZKClients.namespace(zkClient, "/" + runId.getId() + "/kafka"));
-    Iterables.addAll(this.logHandlers, logHandlers);
+
+    // When addressing TWILL-147, need to check if the given ZKClient is
+    // actually used by the Kafka used for log collection
+    if (logCollectionEnabled) {
+      this.kafkaClient = new ZKKafkaClientService(ZKClients.namespace(zkClient, "/" + runId.getId() + "/kafka"));
+      Iterables.addAll(this.logHandlers, logHandlers);
+    } else {
+      this.kafkaClient = null;
+      if (!Iterables.isEmpty(logHandlers)) {
+        LOG.warn("Log collection is disabled for application {} with runId {}. " +
+                   "Adding log handler won't get any logs.", appName, runId);
+      }
+    }
   }
 
   @Override
   protected synchronized void doStartUp() {
-    if (!logHandlers.isEmpty()) {
+    if (kafkaClient != null && !logHandlers.isEmpty()) {
       kafkaClient.startAndWait();
       logCancellable = kafkaClient.getConsumer().prepare()
                                   .addFromBeginning(Constants.LOG_TOPIC, 0)
@@ -101,12 +117,20 @@ public abstract class AbstractTwillController extends AbstractZKServiceControlle
     if (discoveryServiceClient != null) {
       discoveryServiceClient.close();
     }
-    // Safe to call stop no matter when state the KafkaClientService is in.
-    kafkaClient.stopAndWait();
+    if (kafkaClient != null) {
+      // Safe to call stop no matter what state the KafkaClientService is in.
+      kafkaClient.stopAndWait();
+    }
   }
 
   @Override
   public final synchronized void addLogHandler(LogHandler handler) {
+    if (kafkaClient == null) {
+      LOG.warn("Log collection is disabled for application {} with runId {}. " +
+                 "Adding log handler won't get any logs.", appName, runId);
+      return;
+    }
+
     logHandlers.add(handler);
     if (logHandlers.size() == 1) {
       kafkaClient.startAndWait();

http://git-wip-us.apache.org/repos/asf/twill/blob/3045b91b/twill-core/src/main/java/org/apache/twill/internal/EnvKeys.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/EnvKeys.java b/twill-core/src/main/java/org/apache/twill/internal/EnvKeys.java
index 6948f80..8f37f5e 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/EnvKeys.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/EnvKeys.java
@@ -31,8 +31,6 @@ public final class EnvKeys {
    */
   public static final String TWILL_RUNNABLE_NAME = "TWILL_RUNNABLE_NAME";
 
-  public static final String TWILL_LOG_KAFKA_ZK = "TWILL_LOG_KAFKA_ZK";
-
   public static final String YARN_APP_ID = "YARN_APP_ID";
   public static final String YARN_APP_ID_CLUSTER_TIME = "YARN_APP_ID_CLUSTER_TIME";
   public static final String YARN_APP_ID_STR = "YARN_APP_ID_STR";

http://git-wip-us.apache.org/repos/asf/twill/blob/3045b91b/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java b/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java
index 965b203..831c831 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java
@@ -41,13 +41,14 @@ public class TwillRuntimeSpecification {
   private final String rmSchedulerAddr;
   private final Map<String, Map<String, String>> logLevels;
   private final Map<String, Integer> maxRetries;
-  private double minHeapRatio;
+  private final double minHeapRatio;
+  private final boolean logCollectionEnabled;
 
   public TwillRuntimeSpecification(TwillSpecification twillSpecification, String fsUser, URI twillAppDir,
                                    String zkConnectStr, RunId twillRunId, String twillAppName,
                                    int reservedMemory, @Nullable String rmSchedulerAddr,
                                    Map<String, Map<String, String>> logLevels, Map<String, Integer> maxRetries,
-                                   double minHeapRatio) {
+                                   double minHeapRatio, boolean logCollectionEnabled) {
     this.twillSpecification = twillSpecification;
     this.fsUser = fsUser;
     this.twillAppDir = twillAppDir;
@@ -59,6 +60,7 @@ public class TwillRuntimeSpecification {
     this.logLevels = logLevels;
     this.maxRetries = maxRetries;
     this.minHeapRatio = minHeapRatio;
+    this.logCollectionEnabled = logCollectionEnabled;
   }
 
   public TwillSpecification getTwillSpecification() {
@@ -93,6 +95,13 @@ public class TwillRuntimeSpecification {
     return minHeapRatio;
   }
 
+  /**
+   * Returns whether log collection is enabled.
+   */
+  public boolean isLogCollectionEnabled() {
+    return logCollectionEnabled;
+  }
+
   @Nullable
   public String getRmSchedulerAddr() {
     return rmSchedulerAddr;
@@ -105,4 +114,17 @@ public class TwillRuntimeSpecification {
   public Map<String, Integer> getMaxRetries() {
     return maxRetries;
   }
+
+  /**
+   * Returns the ZK connection string for the Kafka used for log collection,
+   * or {@code null} if log collection is disabled.
+   */
+  @Nullable
+  public String getKafkaZKConnect() {
+    if (!isLogCollectionEnabled()) {
+      return null;
+    }
+    // When addressing TWILL-147, a field can be introduced to carry this value.
+    return String.format("%s/%s/%s/kafka", getZkConnectStr(), getTwillAppName(), getTwillAppRunId());
+  }
 }

http://git-wip-us.apache.org/repos/asf/twill/blob/3045b91b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java
index f14fc18..5ff05e8 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java
@@ -25,7 +25,6 @@ import com.google.gson.JsonObject;
 import com.google.gson.JsonParseException;
 import com.google.gson.JsonSerializationContext;
 import com.google.gson.JsonSerializer;
-import org.apache.twill.api.Configs;
 import org.apache.twill.api.TwillSpecification;
 import org.apache.twill.internal.RunIds;
 import org.apache.twill.internal.TwillRuntimeSpecification;
@@ -51,6 +50,7 @@ final class TwillRuntimeSpecificationCodec implements JsonSerializer<TwillRuntim
   private static final String TWILL_SPEC = "twillSpecification";
   private static final String LOG_LEVELS = "logLevels";
   private static final String MAX_RETRIES = "maxRetries";
+  private static final String LOG_COLLECTION_ENABLED = "logCollectionEnabled";
 
   @Override
   public JsonElement serialize(TwillRuntimeSpecification src, Type typeOfSrc, JsonSerializationContext context) {
@@ -71,6 +71,7 @@ final class TwillRuntimeSpecificationCodec implements JsonSerializer<TwillRuntim
              context.serialize(src.getLogLevels(), new TypeToken<Map<String, Map<String, String>>>() { }.getType()));
     json.add(MAX_RETRIES,
              context.serialize(src.getMaxRetries(), new TypeToken<Map<String, Integer>>() { }.getType()));
+    json.addProperty(LOG_COLLECTION_ENABLED, src.isLogCollectionEnabled());
 
     return json;
   }
@@ -98,8 +99,7 @@ final class TwillRuntimeSpecificationCodec implements JsonSerializer<TwillRuntim
                                          jsonObj.get(RM_SCHEDULER_ADDR).getAsString() : null,
                                          logLevels,
                                          maxRetries,
-                                         jsonObj.has(HEAP_RESERVED_MIN_RATIO) ?
-                                         jsonObj.get(HEAP_RESERVED_MIN_RATIO).getAsDouble()
-                                         : Configs.Defaults.HEAP_RESERVED_MIN_RATIO);
+                                         jsonObj.get(HEAP_RESERVED_MIN_RATIO).getAsDouble(),
+                                         jsonObj.get(LOG_COLLECTION_ENABLED).getAsBoolean());
   }
 }

http://git-wip-us.apache.org/repos/asf/twill/blob/3045b91b/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java b/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java
index 90d7415..493c4ca 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java
@@ -18,7 +18,8 @@
 package org.apache.twill.internal.logging;
 
 import ch.qos.logback.classic.spi.ILoggingEvent;
-import ch.qos.logback.core.AppenderBase;
+import ch.qos.logback.core.Appender;
+import ch.qos.logback.core.UnsynchronizedAppenderBase;
 import com.google.common.base.Charsets;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
@@ -44,8 +45,6 @@ import org.apache.twill.zookeeper.RetryStrategies;
 import org.apache.twill.zookeeper.ZKClientService;
 import org.apache.twill.zookeeper.ZKClientServices;
 import org.apache.twill.zookeeper.ZKClients;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 import java.util.Collection;
@@ -61,11 +60,11 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
- *
+ * A logback {@link Appender} for writing log events to Kafka.
  */
-public final class KafkaAppender extends AppenderBase<ILoggingEvent> {
+public final class KafkaAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {
 
-  private static final Logger LOG = LoggerFactory.getLogger(KafkaAppender.class);
+  private static final String PUBLISH_THREAD_NAME = "kafka-logger";
 
   private final AtomicReference<KafkaPublisher.Preparer> publisher;
   private final Runnable flushTask;
@@ -146,7 +145,7 @@ public final class KafkaAppender extends AppenderBase<ILoggingEvent> {
     Preconditions.checkNotNull(zkConnectStr);
 
     eventConverter = new LogEventConverter(hostname, runnableName);
-    scheduler = Executors.newSingleThreadScheduledExecutor(Threads.createDaemonThreadFactory("kafka-logger"));
+    scheduler = Executors.newSingleThreadScheduledExecutor(Threads.createDaemonThreadFactory(PUBLISH_THREAD_NAME));
 
     zkClientService = ZKClientServices.delegate(
       ZKClients.reWatchOnExpire(
@@ -162,14 +161,14 @@ public final class KafkaAppender extends AppenderBase<ILoggingEvent> {
           Preconditions.checkState(Futures.getUnchecked(future) == Service.State.RUNNING,
                                    "Service is not running.");
         }
-        LOG.info("Kafka client started: " + zkConnectStr);
+        addInfo("Kafka client started: " + zkConnectStr);
         scheduler.scheduleWithFixedDelay(flushTask, 0, flushPeriod, TimeUnit.MILLISECONDS);
       }
 
       @Override
       public void onFailure(Throwable t) {
         // Fail to talk to kafka. Other than logging, what can be done?
-        LOG.error("Failed to start kafka appender.", t);
+        addError("Failed to start kafka appender.", t);
       }
     }, Threads.SAME_THREAD_EXECUTOR);
 
@@ -187,7 +186,7 @@ public final class KafkaAppender extends AppenderBase<ILoggingEvent> {
     try {
       scheduler.submit(flushTask).get(2, TimeUnit.SECONDS);
     } catch (Exception e) {
-      LOG.error("Failed to force log flush in 2 seconds.", e);
+      addError("Failed to force log flush in 2 seconds.", e);
     }
   }
 
@@ -229,7 +228,7 @@ public final class KafkaAppender extends AppenderBase<ILoggingEvent> {
           bufferedSize.addAndGet(-published);
           return published;
         } catch (ExecutionException e) {
-          LOG.error("Failed to publish logs to Kafka.", e);
+          addError("Failed to publish logs to Kafka.", e);
           TimeUnit.NANOSECONDS.sleep(backOffTime);
           publishTimeout -= stopwatch.elapsedTime(timeoutUnit);
           stopwatch.reset();
@@ -237,7 +236,7 @@ public final class KafkaAppender extends AppenderBase<ILoggingEvent> {
         }
       } while (publishTimeout > 0);
     } catch (InterruptedException e) {
-      LOG.warn("Logs publish to Kafka interrupted.", e);
+      addWarn("Logs publish to Kafka interrupted.", e);
     }
     return 0;
   }
@@ -277,12 +276,9 @@ public final class KafkaAppender extends AppenderBase<ILoggingEvent> {
       @Override
       public void run() {
         try {
-          int published = publishLogs(2L, TimeUnit.SECONDS);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Published {} log messages to Kafka.", published);
-          }
+          publishLogs(2L, TimeUnit.SECONDS);
         } catch (Exception e) {
-          LOG.error("Failed to push logs to Kafka. Log entries dropped.", e);
+          addError("Failed to push logs to Kafka. Log entries dropped.", e);
         }
       }
     };

http://git-wip-us.apache.org/repos/asf/twill/blob/3045b91b/twill-core/src/test/java/org/apache/twill/internal/ControllerTest.java
----------------------------------------------------------------------
diff --git a/twill-core/src/test/java/org/apache/twill/internal/ControllerTest.java b/twill-core/src/test/java/org/apache/twill/internal/ControllerTest.java
index a5cb04b..c37cd00 100644
--- a/twill-core/src/test/java/org/apache/twill/internal/ControllerTest.java
+++ b/twill-core/src/test/java/org/apache/twill/internal/ControllerTest.java
@@ -62,7 +62,7 @@ public class ControllerTest {
       Service service = createService(zkClientService, runId);
       service.startAndWait();
 
-      TwillController controller = getController(zkClientService, runId);
+      TwillController controller = getController(zkClientService, "testController", runId);
       controller.sendCommand(Command.Builder.of("test").build()).get(2, TimeUnit.SECONDS);
       controller.terminate().get(2, TimeUnit.SECONDS);
 
@@ -96,7 +96,7 @@ public class ControllerTest {
       zkClientService.startAndWait();
 
       final CountDownLatch runLatch = new CountDownLatch(1);
-      TwillController controller = getController(zkClientService, runId);
+      TwillController controller = getController(zkClientService, "testControllerBefore", runId);
       controller.onRunning(new Runnable() {
         @Override
         public void run() {
@@ -140,7 +140,7 @@ public class ControllerTest {
       service.startAndWait();
 
       final CountDownLatch runLatch = new CountDownLatch(1);
-      TwillController controller = getController(zkClientService, runId);
+      TwillController controller = getController(zkClientService, "testControllerListener", runId);
       controller.onRunning(new Runnable() {
         @Override
         public void run() {
@@ -185,8 +185,9 @@ public class ControllerTest {
     };
   }
 
-  private TwillController getController(ZKClient zkClient, RunId runId) {
-    AbstractTwillController controller = new AbstractTwillController(runId, zkClient, ImmutableList.<LogHandler>of()) {
+  private TwillController getController(ZKClient zkClient, String appName, RunId runId) {
+    AbstractTwillController controller = new AbstractTwillController(appName, runId,
+                                                                     zkClient, false, ImmutableList.<LogHandler>of()) {
 
       @Override
       public void kill() {

http://git-wip-us.apache.org/repos/asf/twill/blob/3045b91b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
index 727435c..0bf07e0 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
@@ -18,9 +18,6 @@
 package org.apache.twill.internal;
 
 import ch.qos.logback.classic.LoggerContext;
-import ch.qos.logback.classic.joran.JoranConfigurator;
-import ch.qos.logback.classic.util.ContextInitializer;
-import ch.qos.logback.core.joran.spi.JoranException;
 import com.google.common.base.Throwables;
 import com.google.common.util.concurrent.AbstractIdleService;
 import com.google.common.util.concurrent.Futures;
@@ -45,14 +42,13 @@ import org.apache.zookeeper.KeeperException;
 import org.slf4j.ILoggerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.xml.sax.InputSource;
 
 import java.io.File;
-import java.io.StringReader;
 import java.net.URI;
 import java.security.PrivilegedExceptionAction;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
 
 /**
  * Class for main method that starts a service.
@@ -70,7 +66,10 @@ public abstract class ServiceMain {
 
   protected final void doMain(final Service mainService,
                               Service...prerequisites) throws ExecutionException, InterruptedException {
-    configureLogger();
+    // Only configure the log collection if it is enabled.
+    if (getTwillRuntimeSpecification().isLogCollectionEnabled()) {
+      configureLogger();
+    }
 
     Service requiredServices = new CompositeService(prerequisites);
     Runtime.getRuntime().addShutdownHook(new Thread() {
@@ -121,14 +120,21 @@ public abstract class ServiceMain {
 
   protected abstract String getHostname();
 
-  protected abstract String getKafkaZKConnect();
+  /**
+   * Returns the {@link TwillRuntimeSpecification} for this application.
+   */
+  protected abstract TwillRuntimeSpecification getTwillRuntimeSpecification();
 
+  /**
+   * Returns the name of the runnable that is running inside this process.
+   */
+  @Nullable
   protected abstract String getRunnableName();
 
   /**
    * Returns the {@link Location} for the application based on the app directory.
    */
-  protected static Location createAppLocation(final Configuration conf, String fsUser, final URI appDir) {
+  protected final Location createAppLocation(final Configuration conf, String fsUser, final URI appDir) {
     // Note: It's a little bit hacky based on the uri schema to create the LocationFactory, refactor it later.
 
     try {
@@ -168,16 +174,19 @@ public abstract class ServiceMain {
   /**
    * Creates a {@link ZKClientService}.
    */
-  protected static ZKClientService createZKClient(String zkConnectStr, String appName) {
+  protected final ZKClientService createZKClient() {
+    TwillRuntimeSpecification twillRuntimeSpec = getTwillRuntimeSpecification();
+
     return ZKClientServices.delegate(
       ZKClients.namespace(
         ZKClients.reWatchOnExpire(
           ZKClients.retryOnFailure(
-            ZKClientService.Builder.of(zkConnectStr).build(),
+            ZKClientService.Builder.of(twillRuntimeSpec.getZkConnectStr()).build(),
             RetryStrategies.fixDelay(1, TimeUnit.SECONDS)
           )
-        ), "/" + appName
-      ));
+        ), "/" + twillRuntimeSpec.getTwillAppName()
+      )
+    );
   }
 
   private void configureLogger() {
@@ -188,84 +197,23 @@ public abstract class ServiceMain {
     }
 
     LoggerContext context = (LoggerContext) loggerFactory;
-    context.reset();
-    JoranConfigurator configurator = new JoranConfigurator();
-    configurator.setContext(context);
-
-    try {
-      File twillLogback = new File(Constants.Files.RUNTIME_CONFIG_JAR, Constants.Files.LOGBACK_TEMPLATE);
-      if (twillLogback.exists()) {
-        configurator.doConfigure(twillLogback);
-      }
-      new ContextInitializer(context).autoConfig();
-    } catch (JoranException e) {
-      throw Throwables.propagate(e);
-    }
-    doConfigure(configurator, getLogConfig(getLoggerLevel(context.getLogger(Logger.ROOT_LOGGER_NAME))));
-  }
-
-  private void doConfigure(JoranConfigurator configurator, String config) {
-    try {
-      configurator.doConfigure(new InputSource(new StringReader(config)));
-    } catch (Exception e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  private String getLogConfig(String rootLevel) {
-    return
-      "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" +
-      "<configuration>\n" +
-      "    <appender name=\"KAFKA\" class=\"" + KafkaAppender.class.getName() + "\">\n" +
-      "        <topic>" + Constants.LOG_TOPIC + "</topic>\n" +
-      "        <hostname>" + getHostname() + "</hostname>\n" +
-      "        <zookeeper>" + getKafkaZKConnect() + "</zookeeper>\n" +
-      appendRunnable() +
-      "    </appender>\n" +
-      "    <logger name=\"org.apache.twill.internal.logging\" additivity=\"false\" />\n" +
-      "    <root level=\"" + rootLevel + "\">\n" +
-      "        <appender-ref ref=\"KAFKA\"/>\n" +
-      "    </root>\n" +
-      "</configuration>";
-  }
-
 
-  private String appendRunnable() {
-    // RunnableName for AM is null, so append runnable name to log config only if the name is not null.
-    if (getRunnableName() == null) {
-     return "";
-    } else {
-      return "        <runnableName>" + getRunnableName() + "</runnableName>\n";
+    // Attach the KafkaAppender to the root logger
+    KafkaAppender kafkaAppender = new KafkaAppender();
+    kafkaAppender.setName("KAFKA");
+    kafkaAppender.setTopic(Constants.LOG_TOPIC);
+    kafkaAppender.setHostname(getHostname());
+    // The Kafka ZK Connection shouldn't be null as this method only gets called if log collection is enabled
+    kafkaAppender.setZookeeper(getTwillRuntimeSpecification().getKafkaZKConnect());
+    String runnableName = getRunnableName();
+    if (runnableName != null) {
+      kafkaAppender.setRunnableName(runnableName);
     }
-  }
 
-  /**
-   * Return the right log level for the service.
-   *
-   * @param logger the {@link Logger} instance of the service context.
-   * @return String of log level based on {@code slf4j} log levels.
-   */
-  private String getLoggerLevel(Logger logger) {
-    if (logger instanceof ch.qos.logback.classic.Logger) {
-      return ((ch.qos.logback.classic.Logger) logger).getLevel().toString();
-    }
+    kafkaAppender.setContext(context);
+    kafkaAppender.start();
 
-    if (logger.isTraceEnabled()) {
-      return "TRACE";
-    }
-    if (logger.isDebugEnabled()) {
-      return "DEBUG";
-    }
-    if (logger.isInfoEnabled()) {
-      return "INFO";
-    }
-    if (logger.isWarnEnabled()) {
-      return "WARN";
-    }
-    if (logger.isErrorEnabled()) {
-      return "ERROR";
-    }
-    return "OFF";
+    context.getLogger(ch.qos.logback.classic.Logger.ROOT_LOGGER_NAME).addAppender(kafkaAppender);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/twill/blob/3045b91b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterLiveNodeData.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterLiveNodeData.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterLiveNodeData.java
index dd4d946..a38a163 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterLiveNodeData.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterLiveNodeData.java
@@ -20,6 +20,7 @@ package org.apache.twill.internal.appmaster;
 import org.apache.twill.api.LocalFile;
 
 import java.util.List;
+import javax.annotation.Nullable;
 
 /**
  * Represents data being stored in the live node of the application master.
@@ -30,13 +31,16 @@ public final class ApplicationMasterLiveNodeData {
   private final long appIdClusterTime;
   private final String containerId;
   private final List<LocalFile> localFiles;
+  private final String kafkaZKConnect;
 
   public ApplicationMasterLiveNodeData(int appId, long appIdClusterTime,
-                                       String containerId, List<LocalFile> localFiles) {
+                                       String containerId, List<LocalFile> localFiles,
+                                       @Nullable String kafkaZKConnect) {
     this.appId = appId;
     this.appIdClusterTime = appIdClusterTime;
     this.containerId = containerId;
     this.localFiles = localFiles;
+    this.kafkaZKConnect = kafkaZKConnect;
   }
 
   public int getAppId() {
@@ -55,6 +59,15 @@ public final class ApplicationMasterLiveNodeData {
     return localFiles;
   }
 
+  /**
+   * @return the Kafka ZK connection string for the Kafka used for log collection;
+   *         if log collection is turned off, a {@code null} value will be returned.
+   */
+  @Nullable
+  public String getKafkaZKConnect() {
+    return kafkaZKConnect;
+  }
+
   @Override
   public String toString() {
     return "ApplicationMasterLiveNodeData{" +

http://git-wip-us.apache.org/repos/asf/twill/blob/3045b91b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
index 8dd5046..81c61ac 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
@@ -17,14 +17,15 @@
  */
 package org.apache.twill.internal.appmaster;
 
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.AbstractIdleService;
 import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.Service;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.twill.api.RunId;
 import org.apache.twill.internal.Constants;
-import org.apache.twill.internal.EnvKeys;
 import org.apache.twill.internal.ServiceMain;
 import org.apache.twill.internal.TwillRuntimeSpecification;
 import org.apache.twill.internal.json.TwillRuntimeSpecificationAdapter;
@@ -45,21 +46,20 @@ import java.io.File;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
 
 /**
  * Main class for launching {@link ApplicationMasterService}.
  */
 public final class ApplicationMasterMain extends ServiceMain {
 
-  private final String kafkaZKConnect;
-
-  private ApplicationMasterMain(String kafkaZKConnect) {
-    this.kafkaZKConnect = kafkaZKConnect;
-  }
+  private static final Logger LOG = LoggerFactory.getLogger(ApplicationMasterMain.class);
+  private final TwillRuntimeSpecification twillRuntimeSpec;
 
   /**
    * Starts the application master.
@@ -67,10 +67,18 @@ public final class ApplicationMasterMain extends ServiceMain {
   public static void main(String[] args) throws Exception {
     File twillSpec = new File(Constants.Files.RUNTIME_CONFIG_JAR, Constants.Files.TWILL_SPEC);
     TwillRuntimeSpecification twillRuntimeSpec = TwillRuntimeSpecificationAdapter.create().fromJson(twillSpec);
-    String zkConnect = twillRuntimeSpec.getZkConnectStr();
+
+    new ApplicationMasterMain(twillRuntimeSpec).doMain();
+  }
+
+  private ApplicationMasterMain(TwillRuntimeSpecification twillRuntimeSpec) {
+    this.twillRuntimeSpec = twillRuntimeSpec;
+  }
+
+  private void doMain() throws Exception {
     RunId runId = twillRuntimeSpec.getTwillAppRunId();
 
-    ZKClientService zkClientService = createZKClient(zkConnect, twillRuntimeSpec.getTwillAppName());
+    ZKClientService zkClientService = createZKClient();
     Configuration conf = new YarnConfiguration(new HdfsConfiguration(new Configuration()));
     setRMSchedulerAddress(conf, twillRuntimeSpec.getRmSchedulerAddr());
 
@@ -81,13 +89,22 @@ public final class ApplicationMasterMain extends ServiceMain {
                                                      twillRuntimeSpec.getTwillAppDir()));
     TrackerService trackerService = new TrackerService(service);
 
-    new ApplicationMasterMain(service.getKafkaZKConnect())
+    List<Service> prerequisites = Lists.newArrayList(
+      new YarnAMClientService(amClient, trackerService),
+      zkClientService,
+      new AppMasterTwillZKPathService(zkClientService, runId)
+    );
+
+    if (twillRuntimeSpec.isLogCollectionEnabled()) {
+      prerequisites.add(new ApplicationKafkaService(zkClientService, twillRuntimeSpec.getKafkaZKConnect()));
+    } else {
+      LOG.info("Log collection through kafka disabled");
+    }
+
+    new ApplicationMasterMain(twillRuntimeSpec)
       .doMain(
         service,
-        new YarnAMClientService(amClient, trackerService),
-        zkClientService,
-        new AppMasterTwillZKPathService(zkClientService, runId),
-        new ApplicationKafkaService(zkClientService, runId)
+        prerequisites.toArray(new Service[prerequisites.size()])
       );
   }
 
@@ -117,13 +134,15 @@ public final class ApplicationMasterMain extends ServiceMain {
   }
 
   @Override
-  protected String getKafkaZKConnect() {
-    return kafkaZKConnect;
+  protected TwillRuntimeSpecification getTwillRuntimeSpecification() {
+    return twillRuntimeSpec;
   }
 
+  @Nullable
   @Override
   protected String getRunnableName() {
-    return System.getenv(EnvKeys.TWILL_RUNNABLE_NAME);
+    // No runnable name for the AM
+    return null;
   }
 
   /**
@@ -135,13 +154,13 @@ public final class ApplicationMasterMain extends ServiceMain {
     private static final Logger LOG = LoggerFactory.getLogger(ApplicationKafkaService.class);
 
     private final ZKClient zkClient;
-    private final String kafkaZKPath;
     private final EmbeddedKafkaServer kafkaServer;
+    private final String kafkaZKPath;
 
-    private ApplicationKafkaService(ZKClient zkClient, RunId runId) {
+    private ApplicationKafkaService(ZKClient zkClient, String kafkaZKConnect) {
       this.zkClient = zkClient;
-      this.kafkaZKPath = "/" + runId.getId() + "/kafka";
-      this.kafkaServer = new EmbeddedKafkaServer(generateKafkaConfig(zkClient.getConnectString() + kafkaZKPath));
+      this.kafkaServer = new EmbeddedKafkaServer(generateKafkaConfig(kafkaZKConnect));
+      this.kafkaZKPath = kafkaZKConnect.substring(zkClient.getConnectString().length());
     }
 
     @Override
@@ -236,7 +255,7 @@ public final class ApplicationMasterMain extends ServiceMain {
     private static final Logger LOG = LoggerFactory.getLogger(AppMasterTwillZKPathService.class);
     private final ZKClient zkClient;
 
-    public AppMasterTwillZKPathService(ZKClient zkClient, RunId runId) {
+    AppMasterTwillZKPathService(ZKClient zkClient, RunId runId) {
       super(zkClient, runId);
       this.zkClient = zkClient;
     }
@@ -258,8 +277,7 @@ public final class ApplicationMasterMain extends ServiceMain {
 
       // Try to delete children under /discovery. It may fail with NotEmptyException if there are other instances
       // of the same app running that has discovery services running.
-      List<String> children = zkClient.getChildren(Constants.DISCOVERY_PATH_PREFIX)
-                                      .get(TIMEOUT_SECONDS, TimeUnit.SECONDS).getChildren();
+      List<String> children = getChildren(Constants.DISCOVERY_PATH_PREFIX);
       List<OperationFuture<?>> deleteFutures = new ArrayList<>();
       for (String child : children) {
         String path = Constants.DISCOVERY_PATH_PREFIX + "/" + child;
@@ -272,8 +290,15 @@ public final class ApplicationMasterMain extends ServiceMain {
           future.get();
         } catch (ExecutionException e) {
           if (e.getCause() instanceof KeeperException.NotEmptyException) {
+            // If any deletion of the service failed with not empty, it means there are other apps running,
+            // hence just return
             return;
           }
+          if (e.getCause() instanceof KeeperException.NoNodeException) {
+            // If the service node is gone, it may have been deleted by another app instance that is also
+            // shutting down, hence just keep going
+            continue;
+          }
           throw e;
         }
       }
@@ -304,6 +329,29 @@ public final class ApplicationMasterMain extends ServiceMain {
         if (e.getCause() instanceof KeeperException.NotEmptyException) {
           return false;
         }
+        if (e.getCause() instanceof KeeperException.NoNodeException) {
+          // If the node to be deleted was not created or is already gone, it is the same as a successful delete.
+          return true;
+        }
+        throw e;
+      }
+    }
+
+    /**
+     * Returns the list of child nodes under the given path.
+     *
+     * @param path path to get children
+     * @return the list of children or empty list if the path doesn't exist.
+     * @throws Exception if failed to get children
+     */
+    private List<String> getChildren(String path) throws Exception {
+      try {
+        return zkClient.getChildren(path).get(TIMEOUT_SECONDS, TimeUnit.SECONDS).getChildren();
+      } catch (ExecutionException e) {
+        if (e.getCause() instanceof KeeperException.NoNodeException) {
+          // If the node doesn't exist, return an empty list
+          return Collections.emptyList();
+        }
         throw e;
       }
     }

http://git-wip-us.apache.org/repos/asf/twill/blob/3045b91b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index 368c7b8..0f647cd 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -157,7 +157,8 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
 
     this.amLiveNode = new ApplicationMasterLiveNodeData(Integer.parseInt(System.getenv(EnvKeys.YARN_APP_ID)),
                                                         Long.parseLong(System.getenv(EnvKeys.YARN_APP_ID_CLUSTER_TIME)),
-                                                        amClient.getContainerId().toString(), getLocalizeFiles());
+                                                        amClient.getContainerId().toString(), getLocalizeFiles(),
+                                                        twillRuntimeSpec.getKafkaZKConnect());
 
     this.expectedContainers = new ExpectedContainers(twillSpec);
     this.runningContainers = createRunningContainers(amClient.getContainerId(), amClient.getHost());
@@ -657,8 +658,6 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
       if (environments.containsKey(runnableName)) {
         env.putAll(environments.get(runnableName));
       }
-      // Override with system env
-      env.put(EnvKeys.TWILL_LOG_KAFKA_ZK, getKafkaZKConnect());
 
       ProcessLauncher.PrepareLaunchContext launchContext = processLauncher.prepareLaunch(env,
                                                                                          amLiveNode.getLocalFiles(),
@@ -714,10 +713,6 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
     return String.format("/%s/runnables/%s", runId.getId(), runnableName);
   }
 
-  String getKafkaZKConnect() {
-    return String.format("%s/%s/kafka", zkClient.getConnectString(), runId.getId());
-  }
-
   /**
    * Attempts to change the number of running instances.
    *

http://git-wip-us.apache.org/repos/asf/twill/blob/3045b91b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
index a5efb41..2baaca1 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
@@ -66,20 +66,26 @@ public final class TwillContainerMain extends ServiceMain {
 
   private static final Logger LOG = LoggerFactory.getLogger(TwillContainerMain.class);
 
+  private final TwillRuntimeSpecification twillRuntimeSpec;
+
   /**
    * Main method for launching a {@link TwillContainerService} which runs
    * a {@link org.apache.twill.api.TwillRunnable}.
    */
   public static void main(String[] args) throws Exception {
-    new TwillContainerMain().doMain();
+    File twillSpecFile = new File(Constants.Files.RUNTIME_CONFIG_JAR, Constants.Files.TWILL_SPEC);
+    TwillRuntimeSpecification twillRuntimeSpec = TwillRuntimeSpecificationAdapter.create().fromJson(twillSpecFile);
+
+    new TwillContainerMain(twillRuntimeSpec).doMain();
+  }
+
+  private TwillContainerMain(TwillRuntimeSpecification twillRuntimeSpec) {
+    this.twillRuntimeSpec = twillRuntimeSpec;
   }
 
   private void doMain() throws Exception {
     // Try to load the secure store from localized file, which AM requested RM to localize it for this container.
     loadSecureStore();
-    File twillSpecFile = new File(Constants.Files.RUNTIME_CONFIG_JAR, Constants.Files.TWILL_SPEC);
-    TwillRuntimeSpecification twillRuntimeSpec = loadTwillSpec(twillSpecFile);
-    String zkConnectStr = twillRuntimeSpec.getZkConnectStr();
     RunId appRunId = twillRuntimeSpec.getTwillAppRunId();
     RunId runId = RunIds.fromString(System.getenv(EnvKeys.TWILL_RUN_ID));
     String runnableName = System.getenv(EnvKeys.TWILL_RUNNABLE_NAME);
@@ -98,7 +104,7 @@ public final class TwillContainerMain extends ServiceMain {
       logLevels.putAll(dynamicLogLevels);
     }
 
-    ZKClientService zkClientService = createZKClient(zkConnectStr, twillRuntimeSpec.getTwillAppName());
+    ZKClientService zkClientService = createZKClient();
     ZKDiscoveryService discoveryService = new ZKDiscoveryService(zkClientService);
 
     ZKClient appRunZkClient = getAppRunZKClient(zkClientService, appRunId);
@@ -170,12 +176,6 @@ public final class TwillContainerMain extends ServiceMain {
     return classLoader;
   }
 
-  private static TwillRuntimeSpecification loadTwillSpec(File specFile) throws IOException {
-    try (Reader reader = Files.newReader(specFile, Charsets.UTF_8)) {
-      return TwillRuntimeSpecificationAdapter.create().fromJson(reader);
-    }
-  }
-
   private static Map<String, Map<String, String>> loadLogLevels() throws IOException {
     File file = new File(Constants.Files.LOG_LEVELS);
     if (file.exists()) {
@@ -198,8 +198,8 @@ public final class TwillContainerMain extends ServiceMain {
   }
 
   @Override
-  protected String getKafkaZKConnect() {
-    return System.getenv(EnvKeys.TWILL_LOG_KAFKA_ZK);
+  protected TwillRuntimeSpecification getTwillRuntimeSpecification() {
+    return twillRuntimeSpec;
   }
 
   @Override
@@ -207,7 +207,6 @@ public final class TwillContainerMain extends ServiceMain {
     return System.getenv(EnvKeys.TWILL_RUNNABLE_NAME);
   }
 
-
   /**
    * Simple service that force flushing logs on stop.
    */

http://git-wip-us.apache.org/repos/asf/twill/blob/3045b91b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
index 61306d6..1945731 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
@@ -77,7 +77,7 @@ final class YarnTwillController extends AbstractTwillController implements Twill
    */
   YarnTwillController(String appName, RunId runId, ZKClient zkClient,
                       final ApplicationMasterLiveNodeData amLiveNodeData, final YarnAppClient yarnAppClient) {
-    super(runId, zkClient, Collections.<LogHandler>emptyList());
+    super(appName, runId, zkClient, amLiveNodeData.getKafkaZKConnect() != null, Collections.<LogHandler>emptyList());
     this.appName = appName;
     this.amLiveNodeData = amLiveNodeData;
     this.startUp = new Callable<ProcessController<YarnApplicationReport>>() {
@@ -91,10 +91,10 @@ final class YarnTwillController extends AbstractTwillController implements Twill
     this.startTimeoutUnit = TimeUnit.SECONDS;
   }
 
-  YarnTwillController(String appName, RunId runId, ZKClient zkClient, Iterable<LogHandler> logHandlers,
-                      Callable<ProcessController<YarnApplicationReport>> startUp,
+  YarnTwillController(String appName, RunId runId, ZKClient zkClient, boolean logCollectionEnabled,
+                      Iterable<LogHandler> logHandlers, Callable<ProcessController<YarnApplicationReport>> startUp,
                       long startTimeout, TimeUnit startTimeoutUnit) {
-    super(runId, zkClient, logHandlers);
+    super(appName, runId, zkClient, logCollectionEnabled, logHandlers);
     this.appName = appName;
     this.startUp = startUp;
     this.startTimeout = startTimeout;

http://git-wip-us.apache.org/repos/asf/twill/blob/3045b91b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillControllerFactory.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillControllerFactory.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillControllerFactory.java
index 61fb7cc..40de6a6 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillControllerFactory.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillControllerFactory.java
@@ -30,7 +30,7 @@ import java.util.concurrent.TimeUnit;
  */
 interface YarnTwillControllerFactory {
 
-  YarnTwillController create(RunId runId, Iterable<LogHandler> logHandlers,
+  YarnTwillController create(RunId runId, boolean logCollectionEnabled, Iterable<LogHandler> logHandlers,
                              Callable<ProcessController<YarnApplicationReport>> startUp,
                              long startTimeout, TimeUnit startTimeoutUnit);
 }

http://git-wip-us.apache.org/repos/asf/twill/blob/3045b91b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
index 2d1edd0..4846fe3 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
@@ -419,7 +419,10 @@ final class YarnTwillPreparer implements TwillPreparer {
           }
         };
 
-      YarnTwillController controller = controllerFactory.create(runId, logHandlers, submitTask, timeout, timeoutUnit);
+      boolean logCollectionEnabled = config.getBoolean(Configs.Keys.LOG_COLLECTION_ENABLED,
+                                                       Configs.Defaults.LOG_COLLECTION_ENABLED);
+      YarnTwillController controller = controllerFactory.create(runId, logCollectionEnabled,
+                                                                logHandlers, submitTask, timeout, timeoutUnit);
       controller.start();
       return controller;
     } catch (Exception e) {
@@ -671,11 +674,13 @@ final class YarnTwillPreparer implements TwillPreparer {
       }
       TwillSpecification newTwillSpec = new DefaultTwillSpecification(spec.getName(), runtimeSpec, spec.getOrders(),
                                                                       spec.getPlacementPolicies(), eventHandler);
+      boolean logCollectionEnabled = config.getBoolean(Configs.Keys.LOG_COLLECTION_ENABLED,
+                                                       Configs.Defaults.LOG_COLLECTION_ENABLED);
       TwillRuntimeSpecificationAdapter.create().toJson(
         new TwillRuntimeSpecification(newTwillSpec, appLocation.getLocationFactory().getHomeLocation().getName(),
                                       appLocation.toURI(), zkConnectString, runId, twillSpec.getName(),
                                       getReservedMemory(), config.get(YarnConfiguration.RM_SCHEDULER_ADDRESS),
-                                      logLevels, maxRetries, getMinHeapRatio()), writer);
+                                      logLevels, maxRetries, getMinHeapRatio(), logCollectionEnabled), writer);
     }
     LOG.debug("Done {}", targetFile);
   }

http://git-wip-us.apache.org/repos/asf/twill/blob/3045b91b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
index d8e48de..d2b53b9 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
@@ -291,11 +291,12 @@ public final class YarnTwillRunnerService implements TwillRunnerService {
                                  appLocation, twillClassPaths, jvmOptions,
                                  locationCache, new YarnTwillControllerFactory() {
       @Override
-      public YarnTwillController create(RunId runId, Iterable<LogHandler> logHandlers,
+      public YarnTwillController create(RunId runId, boolean logCollectionEnabled, Iterable<LogHandler> logHandlers,
                                         Callable<ProcessController<YarnApplicationReport>> startUp,
                                         long startTimeout, TimeUnit startTimeoutUnit) {
         ZKClient zkClient = ZKClients.namespace(zkClientService, "/" + appName);
         YarnTwillController controller = listenController(new YarnTwillController(appName, runId, zkClient,
+                                                                                  logCollectionEnabled,
                                                                                   logHandlers, startUp,
                                                                                   startTimeout, startTimeoutUnit));
         synchronized (YarnTwillRunnerService.this) {

http://git-wip-us.apache.org/repos/asf/twill/blob/3045b91b/twill-yarn/src/test/java/org/apache/twill/yarn/LogHandlerTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/LogHandlerTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/LogHandlerTestRun.java
index ad0a837..902c146 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/LogHandlerTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/LogHandlerTestRun.java
@@ -17,26 +17,32 @@
  */
 package org.apache.twill.yarn;
 
+import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.twill.api.AbstractTwillRunnable;
+import org.apache.twill.api.Configs;
 import org.apache.twill.api.TwillController;
 import org.apache.twill.api.TwillRunner;
 import org.apache.twill.api.logging.LogEntry;
 import org.apache.twill.api.logging.LogHandler;
 import org.apache.twill.api.logging.LogThrowable;
 import org.apache.twill.api.logging.PrinterLogHandler;
+import org.apache.twill.common.Threads;
+import org.apache.twill.discovery.ServiceDiscovered;
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.PrintWriter;
+import java.util.Collections;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Test for LogHandler able to receive logs from AM and runnable.
@@ -100,6 +106,42 @@ public class LogHandlerTestRun extends BaseYarnTest {
     Assert.assertEquals("Exception", t.getMessage());
   }
 
+  @Test
+  public void testDisableLogCollection() throws Exception {
+    final AtomicBoolean logReceived = new AtomicBoolean();
+
+    // Start the LogRunnable with log collection turned off
+    TwillRunner runner = getTwillRunner();
+    TwillController controller = runner.prepare(new LogRunnable())
+      .withConfiguration(Collections.singletonMap(Configs.Keys.LOG_COLLECTION_ENABLED, "false"))
+      .addLogHandler(new LogHandler() {
+        @Override
+        public void onLog(LogEntry logEntry) {
+          logReceived.set(true);
+        }
+      })
+      .start();
+
+    // Make sure the runnable gets executed
+    try {
+      final CountDownLatch latch = new CountDownLatch(1);
+      controller.discoverService("log").watchChanges(new ServiceDiscovered.ChangeListener() {
+        @Override
+        public void onChange(ServiceDiscovered serviceDiscovered) {
+          if (Iterables.size(serviceDiscovered) == 1) {
+            latch.countDown();
+          }
+        }
+      }, Threads.SAME_THREAD_EXECUTOR);
+      Assert.assertTrue(latch.await(120, TimeUnit.SECONDS));
+    } finally {
+      controller.terminate().get(120, TimeUnit.SECONDS);
+    }
+
+    // Should receive no log
+    Assert.assertFalse("Not expecting logs collected", logReceived.get());
+  }
+
   /**
    * TwillRunnable for the test case to simply emit one log line.
    */
@@ -121,6 +163,8 @@ public class LogHandlerTestRun extends BaseYarnTest {
       } catch (Throwable t) {
         LOG.error("Got exception", t);
       }
+      // Announce so that test case knows the code reaches here.
+      getContext().announce("log", 12345);
 
       Uninterruptibles.awaitUninterruptibly(stopLatch);
     }