You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2016/12/11 23:38:31 UTC

[2/2] twill git commit: (TWILL-138) Change Log level at Runtime

(TWILL-138) Change Log level at Runtime

- Added new methods in TwillPreparer for setting log levels
- Added new methods in TwillController for setting and resetting log levels
- Exposes log levels throuhg TwillResources for each logger that has been set through the TwillPreparer or TwillController

This closes #14 on Github

Signed-off-by: Terence Yim <ch...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/21a3734d
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/21a3734d
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/21a3734d

Branch: refs/heads/master
Commit: 21a3734d8bbfc3fa95173b83fbc6f7d07e0afc01
Parents: 6837f15
Author: yaojiefeng <ya...@cask.co>
Authored: Wed Oct 5 16:17:26 2016 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Sun Dec 11 15:38:04 2016 -0800

----------------------------------------------------------------------
 .../org/apache/twill/api/TwillController.java   |  49 ++++
 .../org/apache/twill/api/TwillPreparer.java     |  23 +-
 .../org/apache/twill/api/TwillRunResources.java |   8 +
 .../internal/DefaultTwillRunResources.java      |  32 ++-
 .../org/apache/twill/internal/Constants.java    |   3 +-
 .../twill/internal/AbstractTwillController.java |  27 +-
 .../twill/internal/AbstractTwillService.java    |  31 ++-
 .../twill/internal/ContainerLiveNodeData.java   |  22 +-
 .../twill/internal/DefaultResourceReport.java   |  10 +-
 .../twill/internal/TwillContainerLauncher.java  |  13 +-
 .../internal/TwillRuntimeSpecification.java     |  15 +-
 .../internal/json/ResourceReportCodec.java      |   8 +-
 .../internal/json/TwillRunResourcesCodec.java   |  26 +-
 .../json/TwillRuntimeSpecificationAdapter.java  |   3 +
 .../json/TwillRuntimeSpecificationCodec.java    |  22 +-
 .../twill/internal/state/MessageCodec.java      |   1 +
 .../twill/internal/state/SystemMessages.java    |  67 ++++-
 .../org/apache/twill/internal/ServiceMain.java  |   3 -
 .../appmaster/ApplicationMasterMain.java        |   7 +-
 .../appmaster/ApplicationMasterService.java     |  51 +++-
 .../internal/appmaster/RunningContainers.java   | 112 +++++++-
 .../internal/container/TwillContainerMain.java  |  56 +++-
 .../container/TwillContainerService.java        | 105 ++++++-
 .../apache/twill/yarn/YarnTwillPreparer.java    |  50 +++-
 .../twill/yarn/LogLevelChangeTestRun.java       | 278 +++++++++++++++++++
 .../org/apache/twill/yarn/LogLevelTestRun.java  |  39 ++-
 .../org/apache/twill/yarn/YarnTestSuite.java    |   1 +
 27 files changed, 938 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/twill/blob/21a3734d/twill-api/src/main/java/org/apache/twill/api/TwillController.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/TwillController.java b/twill-api/src/main/java/org/apache/twill/api/TwillController.java
index 08206f5..f5004ae 100644
--- a/twill-api/src/main/java/org/apache/twill/api/TwillController.java
+++ b/twill-api/src/main/java/org/apache/twill/api/TwillController.java
@@ -17,6 +17,7 @@
  */
 package org.apache.twill.api;
 
+import org.apache.twill.api.logging.LogEntry;
 import org.apache.twill.api.logging.LogHandler;
 import org.apache.twill.discovery.Discoverable;
 import org.apache.twill.discovery.ServiceDiscovered;
@@ -89,4 +90,52 @@ public interface TwillController extends ServiceController {
    * @return A {@link Future} that will be completed when the restart operation has been done.
    */
   Future<String> restartInstances(String runnable, int instanceId, int... moreInstanceIds);
+
+  /**
+   * Update the log levels for requested logger names for Twill applications running in a container.
+   * The log level for a logger name can be {@code null} except for the root logger, which will reset the log level for
+   * the specified logger.
+   *
+   * @param logLevels The {@link Map} contains the requested logger names and log levels that need to be updated.
+   * @return A {@link Future} that will be completed when the log level update has been done. It will carry the
+   *         {@link Map} of log levels as the result.
+   */
+  Future<Map<String, LogEntry.Level>> updateLogLevels(Map<String, LogEntry.Level> logLevels);
+
+  /**
+   * Update the log levels for requested logger names for a {@link TwillRunnable}.
+   * The log level for a logger name can be {@code null} except for the root logger, which will reset the log level for
+   * the specified logger.
+   *
+   * @param runnableName The name of the runnable to update the log level.
+   * @param logLevelsForRunnable The {@link Map} contains the requested logger name and log level that
+   *                             need to be updated.
+   * @return A {@link Future} that will be completed when the log level update has been done. It will carry the
+   *         {@link Map} of log levels as the result.
+   */
+  Future<Map<String, LogEntry.Level>> updateLogLevels(String runnableName,
+                                                      Map<String, LogEntry.Level> logLevelsForRunnable);
+
+  /**
+   * Reset the log levels of all runnables.
+   * The log levels will be the same as when the runnables start up.
+   *
+   * @param loggerNames The optional logger names to be reset for all runnables, if not provided, all log levels will
+   *                    be reset.
+   * @return A {@link Future} that will be completed when the set log level operation has been done. The future result
+   *         is the logger names provided in the parameter.
+   */
+  Future<String[]> resetLogLevels(String...loggerNames);
+
+  /**
+   * Reset the log levels of the given runnable.
+   * The log levels will be same as when the runnable starts up.
+   *
+   * @param loggerNames The optional logger names to be reset for the runnable, if not provided, all log levels will
+   *                    be reset.
+   * @return A {@link Future} that will be completed when the set log level operation has been done. The future result
+   *         is the logger names provided in the parameter.
+   */
+  Future<String[]> resetRunnableLogLevels(String runnableName, String...loggerNames);
+
 }

http://git-wip-us.apache.org/repos/asf/twill/blob/21a3734d/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java b/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java
index d7d5529..575ba8f 100644
--- a/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java
+++ b/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java
@@ -227,15 +227,36 @@ public interface TwillPreparer {
   TwillPreparer addSecureStore(SecureStore secureStore);
 
   /**
-   * Set the log level for Twill applications running in a container.
+   * Set the root log level for Twill applications in all containers.
    *
    * @param logLevel the {@link LogEntry.Level} that should be set.
    *                 The level match the {@code Logback} levels.
    * @return This {@link TwillPreparer}.
+   * @deprecated Use {@link #setLogLevels(Map)} with key {@link org.slf4j.Logger#ROOT_LOGGER_NAME} instead.
    */
+  @Deprecated
   TwillPreparer setLogLevel(LogEntry.Level logLevel);
 
   /**
+   * Set the log levels for requested logger names for Twill applications running in a container. The log level of any
+   * logger cannot be {@code null}, if there is {@code null} value, a {@link IllegalArgumentException} will be thrown.
+   *
+   * @param logLevels The {@link Map} contains the requested logger names and log levels that need to be set.
+   * @return This {@link TwillPreparer}.
+   */
+  TwillPreparer setLogLevels(Map<String, LogEntry.Level> logLevels);
+
+  /**
+   * Set the log levels for requested logger names for a {@link TwillRunnable}. The log level of any logger cannot be
+   * {@code null}, if there is {@code null} value, a {@link IllegalArgumentException} will be thrown.
+   *
+   * @param runnableName The name of the runnable to set the log level.
+   * @param logLevelsForRunnable The {@link Map} contains the requested logger names and log levels that need to be set.
+   * @return This {@link TwillPreparer}.
+   */
+  TwillPreparer setLogLevels(String runnableName, Map<String, LogEntry.Level> logLevelsForRunnable);
+
+  /**
    * Starts the application.
    * @return A {@link TwillController} for controlling the running application.
    */

http://git-wip-us.apache.org/repos/asf/twill/blob/21a3734d/twill-api/src/main/java/org/apache/twill/api/TwillRunResources.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/TwillRunResources.java b/twill-api/src/main/java/org/apache/twill/api/TwillRunResources.java
index 1a665ea..f721f47 100644
--- a/twill-api/src/main/java/org/apache/twill/api/TwillRunResources.java
+++ b/twill-api/src/main/java/org/apache/twill/api/TwillRunResources.java
@@ -19,6 +19,8 @@ package org.apache.twill.api;
 
 import org.apache.twill.api.logging.LogEntry;
 
+import java.util.Map;
+
 /**
  * Information about the container the {@link TwillRunnable}
  * is running in.
@@ -58,7 +60,13 @@ public interface TwillRunResources {
 
   /**
    * @return the enabled log level for the container where the runnable is running in.
+   * @deprecated Use {@link #getLogLevels()} to get the log levels map and get root level from the map instead.
    */
+  @Deprecated
   LogEntry.Level getLogLevel();
 
+  /**
+   * @return the enabled log level arguments for the container where the runnable is running in.
+   */
+  Map<String, LogEntry.Level> getLogLevels();
 }

http://git-wip-us.apache.org/repos/asf/twill/blob/21a3734d/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillRunResources.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillRunResources.java b/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillRunResources.java
index 93ef90e..83b973a 100644
--- a/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillRunResources.java
+++ b/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillRunResources.java
@@ -18,7 +18,14 @@
 package org.apache.twill.internal;
 
 import org.apache.twill.api.TwillRunResources;
+import org.apache.twill.api.logging.LogEntry;
 import org.apache.twill.api.logging.LogEntry.Level;
+import org.slf4j.Logger;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
 
 /**
  *  Straightforward implementation of {@link org.apache.twill.api.TwillRunResources}.
@@ -30,17 +37,25 @@ public class DefaultTwillRunResources implements TwillRunResources {
   private final int memoryMB;
   private final String host;
   private final Integer debugPort;
-  private final Level logLevel;
+  private final Map<String, LogEntry.Level> logLevels;
 
+  /**
+   * Constructor to create an instance of {@link DefaultTwillRunResources} with empty log levels.
+   */
   public DefaultTwillRunResources(int instanceId, String containerId, int cores, int memoryMB,
-                                  String host, Integer debugPort, Level logLevel) {
+                                  String host, Integer debugPort) {
+    this(instanceId, containerId, cores, memoryMB, host, debugPort, Collections.<String, Level>emptyMap());
+  }
+
+  public DefaultTwillRunResources(int instanceId, String containerId, int cores, int memoryMB,
+                                  String host, Integer debugPort, Map<String, LogEntry.Level> logLevels) {
     this.instanceId = instanceId;
     this.containerId = containerId;
     this.virtualCores = cores;
     this.memoryMB = memoryMB;
     this.host = host;
     this.debugPort = debugPort;
-    this.logLevel = logLevel;
+    this.logLevels = new HashMap<>(logLevels);
   }
 
   /**
@@ -90,8 +105,15 @@ public class DefaultTwillRunResources implements TwillRunResources {
   }
 
   @Override
+  @Deprecated
+  @Nullable
   public Level getLogLevel() {
-    return logLevel;
+    return getLogLevels().get(Logger.ROOT_LOGGER_NAME);
+  }
+
+  @Override
+  public Map<String, LogEntry.Level> getLogLevels() {
+    return logLevels;
   }
 
   @Override
@@ -129,7 +151,7 @@ public class DefaultTwillRunResources implements TwillRunResources {
       ", memoryMB=" + memoryMB +
       ", host='" + host + '\'' +
       ", debugPort=" + debugPort +
-      ", logLevel=" + logLevel +
+      ", logLevels=" + logLevels +
       '}';
   }
 }

http://git-wip-us.apache.org/repos/asf/twill/blob/21a3734d/twill-common/src/main/java/org/apache/twill/internal/Constants.java
----------------------------------------------------------------------
diff --git a/twill-common/src/main/java/org/apache/twill/internal/Constants.java b/twill-common/src/main/java/org/apache/twill/internal/Constants.java
index 8a33962..40b988f 100644
--- a/twill-common/src/main/java/org/apache/twill/internal/Constants.java
+++ b/twill-common/src/main/java/org/apache/twill/internal/Constants.java
@@ -52,7 +52,7 @@ public final class Constants {
   public static final String RESTART_RUNNABLES_INSTANCES = "restartRunnablesInstances";
 
   /**
-   * Common ZK paths constants
+   * Common ZK paths constants.
    */
   public static final String DISCOVERY_PATH_PREFIX = "/discoverable";
   public static final String INSTANCES_PATH_PREFIX = "/instances";
@@ -77,6 +77,7 @@ public final class Constants {
     public static final String LOGBACK_TEMPLATE = "logback-template.xml";
     public static final String JVM_OPTIONS = "jvm.opts";
     public static final String CREDENTIALS = "credentials.store";
+    public static final String LOG_LEVELS = "logLevel.json";
 
     private Files() {
     }

http://git-wip-us.apache.org/repos/asf/twill/blob/21a3734d/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
index e60f6a3..5a1c5b3 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
@@ -19,6 +19,7 @@ package org.apache.twill.internal;
 
 import com.google.common.base.Charsets;
 import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
@@ -59,6 +60,7 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Future;
 
 /**
  * A abstract base class for {@link TwillController} implementation that uses Zookeeper to controller a
@@ -136,8 +138,8 @@ public abstract class AbstractTwillController extends AbstractZKServiceControlle
   }
 
   @Override
-  public final ListenableFuture<Set<String>> restartInstances(Map<String,
-      ? extends Set<Integer>> runnableToInstanceIds) {
+  public final ListenableFuture<Set<String>> restartInstances(
+    Map<String, ? extends Set<Integer>> runnableToInstanceIds) {
     Map<String, String> runnableToStringInstanceIds =
       Maps.transformEntries(runnableToInstanceIds, new Maps.EntryTransformer<String, Set<Integer>, String>() {
         @Override
@@ -170,6 +172,27 @@ public abstract class AbstractTwillController extends AbstractZKServiceControlle
     });
   }
 
+  @Override
+  public Future<Map<String, LogEntry.Level>> updateLogLevels(Map<String, LogEntry.Level> logLevels) {
+    return sendMessage(SystemMessages.updateLogLevels(logLevels), logLevels);
+  }
+
+  @Override
+  public Future<Map<String, LogEntry.Level>> updateLogLevels(String runnableName,
+                                                             Map<String, LogEntry.Level> runnableLogLevels) {
+    Preconditions.checkNotNull(runnableName);
+    return sendMessage(SystemMessages.updateLogLevels(runnableName, runnableLogLevels), runnableLogLevels);
+  }
+
+  @Override
+  public Future<String[]> resetLogLevels(String...loggerNames) {
+    return sendMessage(SystemMessages.resetLogLevels(Sets.newHashSet(loggerNames)), loggerNames);
+  }
+  @Override
+  public Future<String[]> resetRunnableLogLevels(String runnableName, String...loggerNames) {
+    return sendMessage(SystemMessages.resetLogLevels(runnableName, Sets.newHashSet(loggerNames)), loggerNames);
+  }
+
   private void validateInstanceIds(String runnable, Set<Integer> instanceIds) {
     ResourceReport resourceReport = getResourceReport();
     if (resourceReport == null) {

http://git-wip-us.apache.org/repos/asf/twill/blob/21a3734d/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
index 1717117..fc5b907 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
@@ -25,6 +25,7 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.Service;
 import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
 import com.google.gson.JsonObject;
 import org.apache.twill.api.RunId;
 import org.apache.twill.common.Cancellable;
@@ -84,7 +85,7 @@ import java.util.concurrent.TimeUnit;
 public abstract class AbstractTwillService extends AbstractExecutionThreadService implements MessageCallback {
 
   private static final Logger LOG = LoggerFactory.getLogger(AbstractTwillService.class);
-  private static final Gson GSON = new Gson();
+  private static final Gson GSON = new GsonBuilder().serializeNulls().create();
 
   protected final ZKClient zkClient;
   protected final RunId runId;
@@ -197,16 +198,21 @@ public abstract class AbstractTwillService extends AbstractExecutionThreadServic
     }
   }
 
+  /**
+   * Update the live node for the runnable.
+   *
+   * @return A {@link OperationFuture} that will be completed when the update is done.
+   */
+  protected final OperationFuture<?> updateLiveNode() {
+    String liveNode = getLiveNodePath();
+    LOG.info("Update live node {}{}", zkClient.getConnectString(), liveNode);
+    return zkClient.setData(liveNode, toJson(getLiveNodeJsonObject()));
+  }
+
   private OperationFuture<String> createLiveNode() {
     String liveNode = getLiveNodePath();
     LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNode);
-
-    JsonObject content = new JsonObject();
-    Object liveNodeData = getLiveNodeData();
-    if (liveNodeData != null) {
-      content.add("data", GSON.toJsonTree(liveNodeData));
-    }
-    return ZKOperations.ignoreError(zkClient.create(liveNode, toJson(content), CreateMode.EPHEMERAL),
+    return ZKOperations.ignoreError(zkClient.create(liveNode, toJson(getLiveNodeJsonObject()), CreateMode.EPHEMERAL),
                                     KeeperException.NodeExistsException.class, liveNode);
   }
 
@@ -368,6 +374,15 @@ public abstract class AbstractTwillService extends AbstractExecutionThreadServic
     return String.format("%s/%s", Constants.INSTANCES_PATH_PREFIX, runId.getId());
   }
 
+  private JsonObject getLiveNodeJsonObject() {
+    JsonObject content = new JsonObject();
+    Object liveNodeData = getLiveNodeData();
+    if (liveNodeData != null) {
+      content.add("data", GSON.toJsonTree(liveNodeData));
+    }
+    return content;
+  }
+
   private <T> byte[] toJson(T obj) {
     return GSON.toJson(obj).getBytes(Charsets.UTF_8);
   }

http://git-wip-us.apache.org/repos/asf/twill/blob/21a3734d/twill-core/src/main/java/org/apache/twill/internal/ContainerLiveNodeData.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/ContainerLiveNodeData.java b/twill-core/src/main/java/org/apache/twill/internal/ContainerLiveNodeData.java
index 4c6f32e..64025b5 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/ContainerLiveNodeData.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/ContainerLiveNodeData.java
@@ -17,19 +17,25 @@
  */
 package org.apache.twill.internal;
 
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+
 /**
  *
  */
 public final class ContainerLiveNodeData {
-
   private final String containerId;
   private final String host;
   private final String debugPort;
+  private final Map<String, String> logLevels;
 
-  public ContainerLiveNodeData(String containerId, String host, String debugPort) {
+  public ContainerLiveNodeData(String containerId, String host, String debugPort,
+                               Map<String, String> logLevels) {
     this.containerId = containerId;
     this.host = host;
     this.debugPort = debugPort;
+    this.logLevels = new HashMap<>(logLevels);
   }
 
   public String getContainerId() {
@@ -43,4 +49,16 @@ public final class ContainerLiveNodeData {
   public String getDebugPort() {
     return debugPort;
   }
+
+  public Map<String, String> getLogLevels() {
+    return logLevels;
+  }
+
+  public void setLogLevel(String loggerName, @Nullable String logLevel) {
+    logLevels.put(loggerName, logLevel);
+  }
+
+  public void removeLogLevel(String loggerName) {
+    logLevels.remove(loggerName);
+  }
 }

http://git-wip-us.apache.org/repos/asf/twill/blob/21a3734d/twill-core/src/main/java/org/apache/twill/internal/DefaultResourceReport.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/DefaultResourceReport.java b/twill-core/src/main/java/org/apache/twill/internal/DefaultResourceReport.java
index 08c93f8..6133db9 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/DefaultResourceReport.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/DefaultResourceReport.java
@@ -19,13 +19,13 @@ package org.apache.twill.internal;
 
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Multimaps;
 import com.google.common.collect.SetMultimap;
 import org.apache.twill.api.ResourceReport;
 import org.apache.twill.api.TwillRunResources;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
@@ -41,12 +41,8 @@ public final class DefaultResourceReport implements ResourceReport {
   private final AtomicReference<List<String>> services;
 
   public DefaultResourceReport(String applicationId, TwillRunResources masterResources) {
-    this(applicationId, masterResources, ImmutableMap.<String, Collection<TwillRunResources>>of());
-  }
-
-  public DefaultResourceReport(String applicationId, TwillRunResources masterResources,
-                               Map<String, Collection<TwillRunResources>> resources) {
-    this(applicationId, masterResources, resources, ImmutableList.<String>of());
+    this(applicationId, masterResources, Collections.<String, Collection<TwillRunResources>>emptyMap(),
+         Collections.<String>emptyList());
   }
 
   public DefaultResourceReport(String applicationId, TwillRunResources masterResources,

http://git-wip-us.apache.org/repos/asf/twill/blob/21a3734d/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
index 53c378a..ff8ddb2 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
@@ -43,6 +43,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
 
 /**
  * This class helps launching a container.
@@ -82,9 +83,11 @@ public final class TwillContainerLauncher {
    * @param instanceId The Twill instance Id.
    * @param mainClass The main class to run in the container.
    * @param classPath The class path to load classes for the container.
+   * @param logLevelLocation The log level file location for the container to localize.
    * @return instance of {@link TwillContainerController} to control the container run.
    */
-  public TwillContainerController start(RunId runId, int instanceId, Class<?> mainClass, String classPath) {
+  public TwillContainerController start(RunId runId, int instanceId, Class<?> mainClass, String classPath,
+                                        @Nullable Location logLevelLocation) {
     // Clean up zookeeper path in case this is a retry and there are old messages and state there.
     Futures.getUnchecked(ZKOperations.ignoreError(
       ZKOperations.recursiveDelete(zkClient, "/" + runId), KeeperException.NoNodeException.class, null));
@@ -92,7 +95,7 @@ public final class TwillContainerLauncher {
     // Adds all file to be localized to container
     launchContext.addResources(runtimeSpec.getLocalFiles());
 
-    // Optionally localize secure store.
+    // Optionally localize secure store and log level file.
     try {
       if (secureStoreLocation != null && secureStoreLocation.exists()) {
         launchContext.addResources(new DefaultLocalFile(Constants.Files.CREDENTIALS,
@@ -100,6 +103,12 @@ public final class TwillContainerLauncher {
                                                         secureStoreLocation.lastModified(),
                                                         secureStoreLocation.length(), false, null));
       }
+      if (logLevelLocation != null && logLevelLocation.exists()) {
+        launchContext.addResources(new DefaultLocalFile(Constants.Files.LOG_LEVELS,
+                                                        logLevelLocation.toURI(),
+                                                        logLevelLocation.lastModified(),
+                                                        logLevelLocation.length(), false, null));
+      }
     } catch (IOException e) {
       LOG.warn("Failed to launch container with secure store {}.", secureStoreLocation);
     }

http://git-wip-us.apache.org/repos/asf/twill/blob/21a3734d/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java b/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java
index a48133f..387b01d 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java
@@ -20,9 +20,9 @@ package org.apache.twill.internal;
 import org.apache.twill.api.RunId;
 import org.apache.twill.api.TwillApplication;
 import org.apache.twill.api.TwillSpecification;
-import org.apache.twill.api.logging.LogEntry;
 
 import java.net.URI;
+import java.util.Map;
 import javax.annotation.Nullable;
 
 /**
@@ -39,12 +39,12 @@ public class TwillRuntimeSpecification {
   private final String twillAppName;
   private final int reservedMemory;
   private final String rmSchedulerAddr;
-  private final LogEntry.Level logLevel;
+  private final Map<String, Map<String, String>> logLevels;
 
   public TwillRuntimeSpecification(TwillSpecification twillSpecification, String fsUser, URI twillAppDir,
                                    String zkConnectStr, RunId twillRunId, String twillAppName,
                                    int reservedMemory, @Nullable String rmSchedulerAddr,
-                                   @Nullable LogEntry.Level logLevel) {
+                                   Map<String, Map<String, String>> logLevels) {
     this.twillSpecification = twillSpecification;
     this.fsUser = fsUser;
     this.twillAppDir = twillAppDir;
@@ -53,7 +53,7 @@ public class TwillRuntimeSpecification {
     this.twillAppName = twillAppName;
     this.reservedMemory = reservedMemory;
     this.rmSchedulerAddr = rmSchedulerAddr;
-    this.logLevel = logLevel;
+    this.logLevels = logLevels;
   }
 
   public TwillSpecification getTwillSpecification() {
@@ -72,7 +72,7 @@ public class TwillRuntimeSpecification {
     return zkConnectStr;
   }
 
-  public RunId getTwillRunId() {
+  public RunId getTwillAppRunId() {
     return twillRunId;
   }
 
@@ -89,8 +89,7 @@ public class TwillRuntimeSpecification {
     return rmSchedulerAddr;
   }
 
-  @Nullable
-  public LogEntry.Level getLogLevel() {
-    return logLevel;
+  public Map<String, Map<String, String>> getLogLevels() {
+    return logLevels;
   }
 }

http://git-wip-us.apache.org/repos/asf/twill/blob/21a3734d/twill-core/src/main/java/org/apache/twill/internal/json/ResourceReportCodec.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/ResourceReportCodec.java b/twill-core/src/main/java/org/apache/twill/internal/json/ResourceReportCodec.java
index 475871c..63d8b80 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/json/ResourceReportCodec.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/json/ResourceReportCodec.java
@@ -27,6 +27,7 @@ import com.google.gson.JsonSerializer;
 import com.google.gson.reflect.TypeToken;
 import org.apache.twill.api.ResourceReport;
 import org.apache.twill.api.TwillRunResources;
+import org.apache.twill.api.logging.LogEntry;
 import org.apache.twill.internal.DefaultResourceReport;
 
 import java.lang.reflect.Type;
@@ -45,12 +46,11 @@ public final class ResourceReportCodec implements JsonSerializer<ResourceReport>
     JsonObject json = new JsonObject();
 
     json.addProperty("appMasterId", src.getApplicationId());
-    json.add("appMasterResources", context.serialize(
-      src.getAppMasterResources(), new TypeToken<TwillRunResources>() { }.getType()));
+    json.add("appMasterResources", context.serialize(src.getAppMasterResources(), TwillRunResources.class));
     json.add("runnableResources", context.serialize(
       src.getResources(), new TypeToken<Map<String, Collection<TwillRunResources>>>() { }.getType()));
     json.add("services", context.serialize(
-      src.getServices(), new TypeToken<List<String>>() {}.getType()));
+      src.getServices(), new TypeToken<List<String>>() { }.getType()));
     return json;
   }
 
@@ -64,7 +64,7 @@ public final class ResourceReportCodec implements JsonSerializer<ResourceReport>
     Map<String, Collection<TwillRunResources>> resources = context.deserialize(
       jsonObj.get("runnableResources"), new TypeToken<Map<String, Collection<TwillRunResources>>>() { }.getType());
     List<String> services = context.deserialize(
-      jsonObj.get("services"), new TypeToken<List<String>>() {}.getType());
+      jsonObj.get("services"), new TypeToken<List<String>>() { }.getType());
 
     return new DefaultResourceReport(appMasterId, masterResources, resources, services);
   }

http://git-wip-us.apache.org/repos/asf/twill/blob/21a3734d/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java
index 7dea371..bb4d435 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java
@@ -24,11 +24,13 @@ import com.google.gson.JsonObject;
 import com.google.gson.JsonParseException;
 import com.google.gson.JsonSerializationContext;
 import com.google.gson.JsonSerializer;
+import com.google.gson.reflect.TypeToken;
 import org.apache.twill.api.TwillRunResources;
 import org.apache.twill.api.logging.LogEntry;
 import org.apache.twill.internal.DefaultTwillRunResources;
 
 import java.lang.reflect.Type;
+import java.util.Map;
 
 /**
  * Codec for serializing and deserializing a {@link org.apache.twill.api.TwillRunResources} object using json.
@@ -41,7 +43,7 @@ public final class TwillRunResourcesCodec implements JsonSerializer<TwillRunReso
   private static final String MEMORY_MB = "memoryMB";
   private static final String VIRTUAL_CORES = "virtualCores";
   private static final String DEBUG_PORT = "debugPort";
-  private static final String LOG_LEVEL = "logLevel";
+  private static final String LOG_LEVELS = "logLevels";
 
   @Override
   public JsonElement serialize(TwillRunResources src, Type typeOfSrc, JsonSerializationContext context) {
@@ -55,9 +57,8 @@ public final class TwillRunResourcesCodec implements JsonSerializer<TwillRunReso
     if (src.getDebugPort() != null) {
       json.addProperty(DEBUG_PORT, src.getDebugPort());
     }
-    if (src.getLogLevel() != null) {
-      json.addProperty(LOG_LEVEL, src.getLogLevel().toString());
-    }
+    json.add(LOG_LEVELS, context.serialize(src.getLogLevels(),
+                                           new TypeToken<Map<String, LogEntry.Level>>() { }.getType()));
 
     return json;
   }
@@ -66,13 +67,14 @@ public final class TwillRunResourcesCodec implements JsonSerializer<TwillRunReso
   public TwillRunResources deserialize(JsonElement json, Type typeOfT,
                                            JsonDeserializationContext context) throws JsonParseException {
     JsonObject jsonObj = json.getAsJsonObject();
-    return new DefaultTwillRunResources(jsonObj.get("instanceId").getAsInt(),
-                                        jsonObj.get("containerId").getAsString(),
-                                        jsonObj.get("virtualCores").getAsInt(),
-                                        jsonObj.get("memoryMB").getAsInt(),
-                                        jsonObj.get("host").getAsString(),
-                                        jsonObj.has("debugPort") ? jsonObj.get("debugPort").getAsInt() : null,
-                                        jsonObj.has("logLevel") ? LogEntry.Level.valueOf(
-                                          jsonObj.get("logLevel").getAsString()) : LogEntry.Level.INFO);
+    Map<String, LogEntry.Level> logLevels =
+      context.deserialize(jsonObj.get(LOG_LEVELS), new TypeToken<Map<String, LogEntry.Level>>() { }.getType());
+    return new DefaultTwillRunResources(jsonObj.get(INSTANCE_ID).getAsInt(),
+                                        jsonObj.get(CONTAINER_ID).getAsString(),
+                                        jsonObj.get(VIRTUAL_CORES).getAsInt(),
+                                        jsonObj.get(MEMORY_MB).getAsInt(),
+                                        jsonObj.get(HOST).getAsString(),
+                                        jsonObj.has(DEBUG_PORT) ? jsonObj.get(DEBUG_PORT).getAsInt() : null,
+                                        logLevels);
   }
 }

http://git-wip-us.apache.org/repos/asf/twill/blob/21a3734d/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationAdapter.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationAdapter.java b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationAdapter.java
index ae9747d..4df9081 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationAdapter.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationAdapter.java
@@ -46,6 +46,7 @@ import java.io.Writer;
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
 import java.util.Map;
+import javax.annotation.Nullable;
 
 /**
  *
@@ -108,6 +109,7 @@ public final class TwillRuntimeSpecificationAdapter {
 
     @SuppressWarnings("unchecked")
     @Override
+    @Nullable
     public <T> TypeAdapter<T> create(Gson gson, TypeToken<T> type) {
       Class<?> rawType = type.getRawType();
       if (!Map.class.isAssignableFrom(rawType)) {
@@ -141,6 +143,7 @@ public final class TwillRuntimeSpecificationAdapter {
         }
 
         @Override
+        @Nullable
         public Map<String, V> read(JsonReader reader) throws IOException {
           if (reader.peek() == JsonToken.NULL) {
             reader.nextNull();

http://git-wip-us.apache.org/repos/asf/twill/blob/21a3734d/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java
index 6100c99..133371d 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java
@@ -26,12 +26,12 @@ import com.google.gson.JsonParseException;
 import com.google.gson.JsonSerializationContext;
 import com.google.gson.JsonSerializer;
 import org.apache.twill.api.TwillSpecification;
-import org.apache.twill.api.logging.LogEntry;
 import org.apache.twill.internal.RunIds;
 import org.apache.twill.internal.TwillRuntimeSpecification;
 
 import java.lang.reflect.Type;
 import java.net.URI;
+import java.util.Map;
 
 /**
  * Codec for serializing and deserializing a {@link TwillRuntimeSpecification} object using json.
@@ -46,8 +46,8 @@ final class TwillRuntimeSpecificationCodec implements JsonSerializer<TwillRuntim
   private static final String TWILL_APP_NAME = "twillAppName";
   private static final String RESERVED_MEMORY = "reservedMemory";
   private static final String RM_SCHEDULER_ADDR = "rmSchedulerAddr";
-  private static final String LOG_LEVEL = "logLevel";
   private static final String TWILL_SPEC = "twillSpecification";
+  private static final String LOG_LEVELS = "logLevels";
 
   @Override
   public JsonElement serialize(TwillRuntimeSpecification src, Type typeOfSrc, JsonSerializationContext context) {
@@ -55,17 +55,16 @@ final class TwillRuntimeSpecificationCodec implements JsonSerializer<TwillRuntim
     json.addProperty(FS_USER, src.getFsUser());
     json.addProperty(TWILL_APP_DIR, src.getTwillAppDir().toASCIIString());
     json.addProperty(ZK_CONNECT_STR, src.getZkConnectStr());
-    json.addProperty(TWILL_RUNID, src.getTwillRunId().getId());
+    json.addProperty(TWILL_RUNID, src.getTwillAppRunId().getId());
     json.addProperty(TWILL_APP_NAME, src.getTwillAppName());
     json.addProperty(RESERVED_MEMORY, src.getReservedMemory());
     if (src.getRmSchedulerAddr() != null) {
       json.addProperty(RM_SCHEDULER_ADDR, src.getRmSchedulerAddr());
     }
-    if (src.getLogLevel() != null) {
-      json.addProperty(LOG_LEVEL, src.getLogLevel().name());
-    }
-    json.add(TWILL_SPEC, context.serialize(src.getTwillSpecification(),
-                                           new TypeToken<TwillSpecification>() { }.getType()));
+    json.add(TWILL_SPEC,
+             context.serialize(src.getTwillSpecification(), new TypeToken<TwillSpecification>() { }.getType()));
+    json.add(LOG_LEVELS,
+             context.serialize(src.getLogLevels(), new TypeToken<Map<String, Map<String, String>>>() { }.getType()));
     return json;
   }
 
@@ -76,6 +75,8 @@ final class TwillRuntimeSpecificationCodec implements JsonSerializer<TwillRuntim
 
     TwillSpecification twillSpecification = context.deserialize(
       jsonObj.get(TWILL_SPEC), new TypeToken<TwillSpecification>() { }.getType());
+    Map<String, Map<String, String>> logLevels =
+      context.deserialize(jsonObj.get(LOG_LEVELS), new TypeToken<Map<String, Map<String, String>>>() { }.getType());
     return new TwillRuntimeSpecification(twillSpecification,
                                          jsonObj.get(FS_USER).getAsString(),
                                          URI.create(jsonObj.get(TWILL_APP_DIR).getAsString()),
@@ -84,8 +85,7 @@ final class TwillRuntimeSpecificationCodec implements JsonSerializer<TwillRuntim
                                          jsonObj.get(TWILL_APP_NAME).getAsString(),
                                          jsonObj.get(RESERVED_MEMORY).getAsInt(),
                                          jsonObj.has(RM_SCHEDULER_ADDR) ?
-                                                  jsonObj.get(RM_SCHEDULER_ADDR).getAsString() : null,
-                                         jsonObj.has(LOG_LEVEL) ?
-                                           LogEntry.Level.valueOf(jsonObj.get(LOG_LEVEL).getAsString()) : null);
+                                           jsonObj.get(RM_SCHEDULER_ADDR).getAsString() : null,
+                                         logLevels);
   }
 }

http://git-wip-us.apache.org/repos/asf/twill/blob/21a3734d/twill-core/src/main/java/org/apache/twill/internal/state/MessageCodec.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/state/MessageCodec.java b/twill-core/src/main/java/org/apache/twill/internal/state/MessageCodec.java
index 60c5da1..79a53d2 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/state/MessageCodec.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/state/MessageCodec.java
@@ -40,6 +40,7 @@ public final class MessageCodec {
 
   private static final Type OPTIONS_TYPE = new TypeToken<Map<String, String>>() { }.getType();
   private static final Gson GSON = new GsonBuilder()
+                                        .serializeNulls()
                                         .registerTypeAdapter(Message.class, new MessageAdapter())
                                         .registerTypeAdapter(Command.class, new CommandAdapter())
                                         .create();

http://git-wip-us.apache.org/repos/asf/twill/blob/21a3734d/twill-core/src/main/java/org/apache/twill/internal/state/SystemMessages.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/state/SystemMessages.java b/twill-core/src/main/java/org/apache/twill/internal/state/SystemMessages.java
index 4c4bb8b..6a7a9c0 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/state/SystemMessages.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/state/SystemMessages.java
@@ -17,15 +17,24 @@
  */
 package org.apache.twill.internal.state;
 
+import ch.qos.logback.classic.Logger;
+import com.google.common.base.Functions;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
 import org.apache.twill.api.Command;
+import org.apache.twill.api.logging.LogEntry;
 
-import com.google.common.base.Preconditions;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
 
 /**
  * Collection of predefined system messages.
  */
 public final class SystemMessages {
 
+  public static final String LOG_LEVEL = "LOG_LEVEL";
+  public static final String RESET_LOG_LEVEL = "RESET_LOG_LEVEL";
   public static final Command STOP_COMMAND = Command.Builder.of("stop").build();
   public static final Message SECURE_STORE_UPDATED = new SimpleMessage(
     Message.Type.SYSTEM, Message.Scope.APPLICATION, null, Command.Builder.of("secureStoreUpdated").build());
@@ -68,6 +77,62 @@ public final class SystemMessages {
     return new SimpleMessage(Message.Type.SYSTEM, Message.Scope.RUNNABLES, null, updateCommand);
   }
 
+  /**
+   * Helper method to get System {@link Message} for updating the log levels for all runnables.
+   *
+   * @param logLevels The log levels to be updated.
+   * @return An instance of System {@link Message} to update the log levels.
+   */
+  public static Message updateLogLevels(Map<String, LogEntry.Level> logLevels) {
+    return updateLogLevels(null, logLevels);
+  }
+
+  /**
+   * Helper method to get System {@link Message} for updating the log levels for one or all runnables.
+   *
+   * @param runnableName The name of the runnable to update the log level, null if apply to all runnables.
+   * @param logLevels The log levels to be updated.
+   * @return An instance of System {@link Message} to update the log levels.
+   */
+  public static Message updateLogLevels(@Nullable String runnableName, Map<String, LogEntry.Level> logLevels) {
+    Preconditions.checkNotNull(logLevels);
+    Preconditions.checkArgument(!(logLevels.containsKey(Logger.ROOT_LOGGER_NAME)
+                                  && logLevels.get(Logger.ROOT_LOGGER_NAME) == null));
+    Map<String, String> options = convertLogEntryToString(logLevels);
+    return new SimpleMessage(Message.Type.SYSTEM,
+                             runnableName == null ? Message.Scope.ALL_RUNNABLE : Message.Scope.RUNNABLE,
+                             runnableName, Command.Builder.of(LOG_LEVEL).addOptions(options).build());
+  }
+
+  /**
+   * Helper method to get System {@link Message} for resetting the log levels for all runnables.
+   */
+  public static Message resetLogLevels(Set<String> loggerNames) {
+    return resetLogLevels(null, loggerNames);
+  }
+
+  /**
+   * Helper method to get System {@link Message} for resetting log levels for one or all runnables.
+   *
+   * @param runnableName The name of the runnable to set the log level, null if apply to all runnables.
+   * @return An instance of System {@link Message} to reset the log levels.
+   */
+   public static Message resetLogLevels(@Nullable String runnableName, Set<String> loggerNames) {
+     return new SimpleMessage(Message.Type.SYSTEM,
+                              runnableName == null ? Message.Scope.ALL_RUNNABLE : Message.Scope.RUNNABLE,
+                              runnableName, Command.Builder.of(RESET_LOG_LEVEL)
+                                .addOptions(Maps.uniqueIndex(loggerNames, Functions.toStringFunction())).build());
+   }
+
+  private static Map<String, String> convertLogEntryToString(Map<String, LogEntry.Level> logLevels) {
+    return Maps.transformEntries(logLevels, new Maps.EntryTransformer<String, LogEntry.Level, String>() {
+      @Override
+      public String transformEntry(String loggerName, LogEntry.Level level) {
+        return level == null ? null : level.name();
+      }
+    });
+  }
+
   private SystemMessages() {
   }
 }

http://git-wip-us.apache.org/repos/asf/twill/blob/21a3734d/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
index d6bbbff..7e858ec 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
@@ -28,8 +28,6 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.Service;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.twill.api.RunId;
 import org.apache.twill.filesystem.FileContextLocationFactory;
@@ -52,7 +50,6 @@ import org.xml.sax.InputSource;
 import java.io.File;
 import java.io.StringReader;
 import java.net.URI;
-import java.security.PrivilegedAction;
 import java.security.PrivilegedExceptionAction;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;

http://git-wip-us.apache.org/repos/asf/twill/blob/21a3734d/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
index 2ec1a14..3f68bc3 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
@@ -68,7 +68,7 @@ public final class ApplicationMasterMain extends ServiceMain {
     File twillSpec = new File(Constants.Files.TWILL_SPEC);
     TwillRuntimeSpecification twillRuntimeSpec = TwillRuntimeSpecificationAdapter.create().fromJson(twillSpec);
     String zkConnect = twillRuntimeSpec.getZkConnectStr();
-    RunId runId = twillRuntimeSpec.getTwillRunId();
+    RunId runId = twillRuntimeSpec.getTwillAppRunId();
 
     ZKClientService zkClientService = createZKClient(zkConnect, twillRuntimeSpec.getTwillAppName());
     Configuration conf = new YarnConfiguration(new HdfsConfiguration(new Configuration()));
@@ -76,8 +76,9 @@ public final class ApplicationMasterMain extends ServiceMain {
 
     final YarnAMClient amClient = new VersionDetectYarnAMClientFactory(conf).create();
     ApplicationMasterService service =
-      new ApplicationMasterService(runId, zkClientService, twillRuntimeSpec, amClient, createAppLocation(
-        conf, twillRuntimeSpec.getFsUser(), twillRuntimeSpec.getTwillAppDir()));
+      new ApplicationMasterService(runId, zkClientService, twillRuntimeSpec, amClient,
+                                   createAppLocation(conf, twillRuntimeSpec.getFsUser(),
+                                                     twillRuntimeSpec.getTwillAppDir()));
     TrackerService trackerService = new TrackerService(service);
 
     new ApplicationMasterMain(service.getKafkaZKConnect())

http://git-wip-us.apache.org/repos/asf/twill/blob/21a3734d/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index 063bed2..9faadc2 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -59,7 +59,6 @@ import org.apache.twill.api.TwillRunResources;
 import org.apache.twill.api.TwillSpecification;
 import org.apache.twill.common.Threads;
 import org.apache.twill.filesystem.Location;
-import org.apache.twill.internal.Configs;
 import org.apache.twill.internal.Constants;
 import org.apache.twill.internal.ContainerInfo;
 import org.apache.twill.internal.DefaultTwillRunResources;
@@ -72,6 +71,7 @@ import org.apache.twill.internal.json.JvmOptionsCodec;
 import org.apache.twill.internal.json.LocalFileCodec;
 import org.apache.twill.internal.json.TwillRuntimeSpecificationAdapter;
 import org.apache.twill.internal.state.Message;
+import org.apache.twill.internal.state.SystemMessages;
 import org.apache.twill.internal.utils.Instances;
 import org.apache.twill.internal.yarn.AbstractYarnTwillService;
 import org.apache.twill.internal.yarn.YarnAMClient;
@@ -198,9 +198,10 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
       appMasterContainerId.toString(),
       Integer.parseInt(System.getenv(EnvKeys.YARN_CONTAINER_VIRTUAL_CORES)),
       Integer.parseInt(System.getenv(EnvKeys.YARN_CONTAINER_MEMORY_MB)),
-      appMasterHost, null, null);
+      appMasterHost, null);
     String appId = appMasterContainerId.getApplicationAttemptId().getApplicationId().toString();
-    return new RunningContainers(appId, appMasterResources, zkClient);
+    return new RunningContainers(appId, appMasterResources, zkClient,
+                                 applicationLocation, twillSpec.getRunnables().keySet());
   }
 
   private ExpectedContainers initExpectedContainers(TwillSpecification twillSpec) {
@@ -320,6 +321,10 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
       return result;
     }
 
+    if (handleLogLevelMessages(message, completion)) {
+      return result;
+    }
+
     // Replicate messages to all runnables
     if (message.getScope() == Message.Scope.ALL_RUNNABLE) {
       runningContainers.sendToAll(message, completion);
@@ -649,7 +654,8 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
       String runnableName = provisionRequest.getRuntimeSpec().getName();
       LOG.info("Starting runnable {} with {}", runnableName, processLauncher);
 
-      LOG.debug("Log level for Twill runnable {} is {}", runnableName, twillRuntimeSpec.getLogLevel());
+      LOG.debug("Log level for Twill runnable {} is {}", runnableName,
+                twillRuntimeSpec.getLogLevels().get(runnableName).get(Logger.ROOT_LOGGER_NAME));
 
       int containerCount = expectedContainers.getExpected(runnableName);
 
@@ -669,8 +675,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
         ZKClients.namespace(zkClient, getZKNamespace(runnableName)),
         containerCount, jvmOpts, reservedMemory, getSecureStoreLocation());
 
-      runningContainers.start(runnableName, processLauncher.getContainerInfo(), launcher,
-                              twillRuntimeSpec.getLogLevel());
+      runningContainers.start(runnableName, processLauncher.getContainerInfo(), launcher);
 
       // Need to call complete to workaround bug in YARN AMRMClient
       if (provisionRequest.containerAcquired()) {
@@ -725,7 +730,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
    * @return {@code true} if the message does requests for changes in number of running instances of a runnable,
    * {@code false} otherwise.
    */
-  private boolean handleSetInstances(final Message message, final Runnable completion) {
+  private boolean handleSetInstances(Message message, Runnable completion) {
     if (message.getType() != Message.Type.SYSTEM || message.getScope() != Message.Scope.RUNNABLE) {
       return false;
     }
@@ -860,7 +865,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
    *
    * @return {@code true} if the message requests restarting some instances and {@code false} otherwise.
    */
-  private boolean handleRestartRunnablesInstances(final Message message, final Runnable completion) {
+  private boolean handleRestartRunnablesInstances(Message message, Runnable completion) {
     LOG.debug("Check if it should process a restart runnable instances.");
 
     if (message.getType() != Message.Type.SYSTEM) {
@@ -951,4 +956,34 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
       }
     };
   }
+
+  /**
+   * Attempt to change the log level from a runnable or all runnables.
+   *
+   * @return {@code true} if the message requests changing log levels and {@code false} otherwise.
+   */
+  private boolean handleLogLevelMessages(Message message, Runnable completion) {
+    Message.Scope scope = message.getScope();
+    if (message.getType() != Message.Type.SYSTEM ||
+      (scope != Message.Scope.RUNNABLE && scope != Message.Scope.ALL_RUNNABLE)) {
+      return false;
+    }
+
+    String command = message.getCommand().getCommand();
+    if (!command.equals(SystemMessages.LOG_LEVEL) && !command.equals(SystemMessages.RESET_LOG_LEVEL)) {
+      return false;
+    }
+
+    if (scope == Message.Scope.ALL_RUNNABLE) {
+      runningContainers.sendToAll(message, completion);
+    } else {
+      final String runnableName = message.getRunnableName();
+      if (runnableName == null || !twillSpec.getRunnables().containsKey(runnableName)) {
+        LOG.info("Unknown runnable {}", runnableName);
+        return false;
+      }
+      runningContainers.sendToRunnable(runnableName, message, completion);
+    }
+    return true;
+  }
 }

http://git-wip-us.apache.org/repos/asf/twill/blob/21a3734d/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
index 2d49d02..e6234c3 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
@@ -17,6 +17,7 @@
  */
 package org.apache.twill.internal.appmaster;
 
+import com.google.common.base.Charsets;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.HashBasedTable;
@@ -29,26 +30,31 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Multiset;
 import com.google.common.collect.Table;
+import com.google.common.hash.Hashing;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.Service;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.twill.api.ResourceReport;
 import org.apache.twill.api.RunId;
 import org.apache.twill.api.TwillRunResources;
 import org.apache.twill.api.logging.LogEntry;
+import org.apache.twill.filesystem.Location;
+import org.apache.twill.internal.Constants;
 import org.apache.twill.internal.ContainerExitCodes;
 import org.apache.twill.internal.ContainerInfo;
 import org.apache.twill.internal.ContainerLiveNodeData;
 import org.apache.twill.internal.DefaultResourceReport;
 import org.apache.twill.internal.DefaultTwillRunResources;
-import org.apache.twill.internal.EnvKeys;
 import org.apache.twill.internal.RunIds;
 import org.apache.twill.internal.TwillContainerController;
 import org.apache.twill.internal.TwillContainerLauncher;
 import org.apache.twill.internal.container.TwillContainerMain;
 import org.apache.twill.internal.state.Message;
+import org.apache.twill.internal.state.SystemMessages;
 import org.apache.twill.internal.yarn.YarnContainerStatus;
 import org.apache.twill.zookeeper.NodeChildren;
 import org.apache.twill.zookeeper.ZKClient;
@@ -56,18 +62,24 @@ import org.apache.twill.zookeeper.ZKOperations;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Deque;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import javax.annotation.Nullable;
 
 /**
  * A helper class for ApplicationMasterService to keep track of running containers and to interact
@@ -98,8 +110,12 @@ final class RunningContainers {
   private final Condition containerChange;
   private final ZKClient zkClient;
   private final Multimap<String, ContainerInfo> containerStats;
+  private final Location applicationLocation;
+  private final Set<String> runnableNames;
+  private final Map<String, Map<String, String>> logLevels;
 
-  RunningContainers(String appId, TwillRunResources appMasterResources, ZKClient zookeeperClient) {
+  RunningContainers(String appId, TwillRunResources appMasterResources, ZKClient zookeeperClient,
+                    Location applicationLocation, Set<String> runnableNames) {
     containers = HashBasedTable.create();
     runnableInstances = Maps.newHashMap();
     completedContainerCount = Maps.newHashMap();
@@ -109,6 +125,9 @@ final class RunningContainers {
     resourceReport = new DefaultResourceReport(appId, appMasterResources);
     zkClient = zookeeperClient;
     containerStats = HashMultimap.create();
+    this.applicationLocation = applicationLocation;
+    this.runnableNames = runnableNames;
+    this.logLevels = new TreeMap<>();
   }
 
   /**
@@ -126,22 +145,22 @@ final class RunningContainers {
   /**
    * Start a container for a runnable.
    */
-  void start(String runnableName, ContainerInfo containerInfo, TwillContainerLauncher launcher,
-             LogEntry.Level logLevel) {
+  void start(String runnableName, ContainerInfo containerInfo, TwillContainerLauncher launcher) {
     containerLock.lock();
     try {
       int instanceId = getStartInstanceId(runnableName);
       RunId runId = getRunId(runnableName, instanceId);
       TwillContainerController controller = launcher.start(runId, instanceId,
-                                                           TwillContainerMain.class, "$HADOOP_CONF_DIR");
+                                                           TwillContainerMain.class, "$HADOOP_CONF_DIR",
+                                                           saveLogLevels());
       containers.put(runnableName, containerInfo.getId(), controller);
       TwillRunResources resources = new DynamicTwillRunResources(instanceId,
                                                                  containerInfo.getId(),
                                                                  containerInfo.getVirtualCores(),
                                                                  containerInfo.getMemoryMB(),
                                                                  containerInfo.getHost().getHostName(),
-                                                                 controller,
-                                                                 logLevel);
+                                                                 controller);
+
       resourceReport.addRunResources(runnableName, resources);
       containerStats.put(runnableName, containerInfo);
 
@@ -298,10 +317,12 @@ final class RunningContainers {
   void sendToAll(Message message, Runnable completion) {
     containerLock.lock();
     try {
+      for (String runnableName : runnableNames) {
+        checkAndUpdateLogLevels(message, runnableName);
+      }
       if (containers.isEmpty()) {
         completion.run();
       }
-
       // Sends the command to all running containers
       AtomicInteger count = new AtomicInteger(containers.size());
       for (Map.Entry<String, Map<String, TwillContainerController>> entry : containers.rowMap().entrySet()) {
@@ -318,6 +339,7 @@ final class RunningContainers {
     List<TwillContainerController> controllers;
     containerLock.lock();
     try {
+      checkAndUpdateLogLevels(message, runnableName);
       controllers = new ArrayList<>(containers.row(runnableName).values());
     } finally {
       containerLock.unlock();
@@ -575,18 +597,77 @@ final class RunningContainers {
     return false;
   }
 
+  private void checkAndUpdateLogLevels(Message message, String runnableName) {
+    String command = message.getCommand().getCommand();
+    if (message.getType() != Message.Type.SYSTEM || (!SystemMessages.LOG_LEVEL.equals(command) &&
+      !SystemMessages.RESET_LOG_LEVEL.equals(command))) {
+      return;
+    }
+
+    // Need to copy to a tree map to maintain the ordering since we compute the md5 of the tree
+    Map<String, String> messageOptions = message.getCommand().getOptions();
+    Map<String, String> runnableLogLevels = logLevels.get(runnableName);
+
+    if (SystemMessages.RESET_LOG_LEVEL.equals(command)) {
+      // Reset case, remove the log levels that were set before
+      if (runnableLogLevels != null) {
+        if (messageOptions.isEmpty()) {
+          logLevels.remove(runnableName);
+        } else {
+          runnableLogLevels.keySet().removeAll(messageOptions.keySet());
+        }
+      }
+      return;
+    }
+
+    if (runnableLogLevels == null) {
+      runnableLogLevels = new TreeMap<>();
+      logLevels.put(runnableName, runnableLogLevels);
+    }
+    runnableLogLevels.putAll(messageOptions);
+  }
+
+  private Location saveLogLevels() {
+    LOG.debug("save the log level file");
+    try {
+      Gson gson = new GsonBuilder().serializeNulls().create();
+      String jsonStr = gson.toJson(logLevels);
+      String fileName = Hashing.md5().hashString(jsonStr) + "." + Constants.Files.LOG_LEVELS;
+      Location location = applicationLocation.append(fileName);
+      if (!location.exists()) {
+        try (Writer writer = new OutputStreamWriter(location.getOutputStream(), Charsets.UTF_8)) {
+          writer.write(jsonStr);
+        }
+      }
+      LOG.debug("Done saving the log level file");
+      return location;
+    } catch (IOException e) {
+      LOG.error("Failed to save the log level file.");
+      return null;
+    }
+  }
+
   /**
    * A helper class that overrides the debug port of the resources with the live info from the container controller.
    */
-  private static class DynamicTwillRunResources extends DefaultTwillRunResources {
+  private static final class DynamicTwillRunResources extends DefaultTwillRunResources {
 
+    private static final Function<String, LogEntry.Level> LOG_LEVEL_CONVERTER = new Function<String, LogEntry.Level>() {
+      @Nullable
+      @Override
+      public LogEntry.Level apply(@Nullable String logLevel) {
+        // The logLevel is always a valid LogEntry.Level enum, because that's what the user can set through the
+        // Twill Preparer or Controller API
+        return logLevel == null ? null : LogEntry.Level.valueOf(logLevel);
+      }
+    };
     private final TwillContainerController controller;
     private Integer dynamicDebugPort = null;
 
     private DynamicTwillRunResources(int instanceId, String containerId,
                                      int cores, int memoryMB, String host,
-                                     TwillContainerController controller, LogEntry.Level logLevel) {
-      super(instanceId, containerId, cores, memoryMB, host, null, logLevel);
+                                     TwillContainerController controller) {
+      super(instanceId, containerId, cores, memoryMB, host, null);
       this.controller = controller;
     }
 
@@ -605,5 +686,14 @@ final class RunningContainers {
       }
       return dynamicDebugPort;
     }
+
+    @Override
+    public synchronized Map<String, LogEntry.Level> getLogLevels() {
+      ContainerLiveNodeData liveData = controller.getLiveNodeData();
+      if (liveData != null) {
+        return Maps.transformValues(liveData.getLogLevels(), LOG_LEVEL_CONVERTER);
+      }
+      return Collections.emptyMap();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/twill/blob/21a3734d/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
index 451db69..807e50f 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
@@ -19,9 +19,11 @@ package org.apache.twill.internal.container;
 
 import com.google.common.base.Charsets;
 import com.google.common.io.Files;
+import com.google.common.reflect.TypeToken;
 import com.google.common.util.concurrent.AbstractIdleService;
 import com.google.common.util.concurrent.AbstractService;
-import com.google.common.util.concurrent.Service;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.security.Credentials;
@@ -29,7 +31,6 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.twill.api.RunId;
 import org.apache.twill.api.TwillRunnableSpecification;
-import org.apache.twill.api.logging.LogEntry;
 import org.apache.twill.discovery.ZKDiscoveryService;
 import org.apache.twill.internal.Arguments;
 import org.apache.twill.internal.BasicTwillContext;
@@ -54,6 +55,8 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.Reader;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * The main class for launching a {@link TwillContainerService}.
@@ -61,25 +64,35 @@ import java.io.Reader;
 public final class TwillContainerMain extends ServiceMain {
 
   private static final Logger LOG = LoggerFactory.getLogger(TwillContainerMain.class);
-  private static LogEntry.Level logLevel;
+
+  private final Map<String, String> logLevels = new HashMap<>();
 
   /**
    * Main method for launching a {@link TwillContainerService} which runs
    * a {@link org.apache.twill.api.TwillRunnable}.
    */
-  public static void main(final String[] args) throws Exception {
+  public static void main(String[] args) throws Exception {
+    new TwillContainerMain().doMain(args);
+  }
+
+  private void doMain(String[] args) throws Exception {
     // Try to load the secure store from localized file, which AM requested RM to localize it for this container.
     loadSecureStore();
-
     File twillSpecFile = new File(Constants.Files.TWILL_SPEC);
     TwillRuntimeSpecification twillRuntimeSpec = loadTwillSpec(twillSpecFile);
     String zkConnectStr = twillRuntimeSpec.getZkConnectStr();
-    RunId appRunId = twillRuntimeSpec.getTwillRunId();
+    RunId appRunId = twillRuntimeSpec.getTwillAppRunId();
     RunId runId = RunIds.fromString(System.getenv(EnvKeys.TWILL_RUN_ID));
     String runnableName = System.getenv(EnvKeys.TWILL_RUNNABLE_NAME);
-    int instanceId = Integer.parseInt(System.getenv(EnvKeys.TWILL_INSTANCE_ID));
-    int instanceCount = Integer.parseInt(System.getenv(EnvKeys.TWILL_INSTANCE_COUNT));
-    logLevel = twillRuntimeSpec.getLogLevel();
+    int instanceId = Integer.valueOf(System.getenv(EnvKeys.TWILL_INSTANCE_ID));
+    int instanceCount = Integer.valueOf(System.getenv(EnvKeys.TWILL_INSTANCE_COUNT));
+    Map<String, String> defaultLogLevels = twillRuntimeSpec.getLogLevels().get(runnableName);
+    Map<String, String> dynamicLogLevels = loadLogLevels().get(runnableName);
+
+    logLevels.putAll(defaultLogLevels);
+    if (dynamicLogLevels != null) {
+      logLevels.putAll(dynamicLogLevels);
+    }
 
     ZKClientService zkClientService = createZKClient(zkConnectStr, twillRuntimeSpec.getTwillAppName());
     ZKDiscoveryService discoveryService = new ZKDiscoveryService(zkClientService);
@@ -100,11 +113,12 @@ public final class TwillContainerMain extends ServiceMain {
 
     ZKClient containerZKClient = getContainerZKClient(zkClientService, appRunId, runnableName);
     Configuration conf = new YarnConfiguration(new HdfsConfiguration(new Configuration()));
-    Service service = new TwillContainerService(context, containerInfo, containerZKClient,
-                                                runId, runnableSpec, getClassLoader(),
-                                                createAppLocation(conf, twillRuntimeSpec.getFsUser(),
-                                                                  twillRuntimeSpec.getTwillAppDir()));
-    new TwillContainerMain().doMain(
+    TwillContainerService service = new TwillContainerService(context, containerInfo, containerZKClient,
+                                                              runId, runnableSpec, getClassLoader(),
+                                                              createAppLocation(conf, twillRuntimeSpec.getFsUser(),
+                                                                                twillRuntimeSpec.getTwillAppDir()),
+                                                              defaultLogLevels, logLevels);
+    doMain(
       service,
       zkClientService,
       new LogFlushService(),
@@ -115,7 +129,8 @@ public final class TwillContainerMain extends ServiceMain {
 
   @Override
   protected String getLoggerLevel(Logger logger) {
-    return logLevel == null ? super.getLoggerLevel(logger) : logLevel.name();
+    String logLevel = logLevels.get(Logger.ROOT_LOGGER_NAME);
+    return logLevel == null ? super.getLoggerLevel(logger) : logLevel;
   }
 
   private static void loadSecureStore() throws IOException {
@@ -163,6 +178,17 @@ public final class TwillContainerMain extends ServiceMain {
     }
   }
 
+  private static Map<String, Map<String, String>> loadLogLevels() throws IOException {
+    File file = new File(Constants.Files.LOG_LEVELS);
+    if (file.exists()) {
+      try (Reader reader = Files.newReader(file, Charsets.UTF_8)) {
+        Gson gson = new GsonBuilder().serializeNulls().create();
+        return gson.fromJson(reader, new TypeToken<Map<String, Map<String, String>>>() { }.getType());
+      }
+    }
+    return new HashMap<>();
+  }
+
   private static Arguments decodeArgs() throws IOException {
     return ArgumentsCodec.decode(Files.newReaderSupplier(new File(Constants.Files.ARGUMENTS), Charsets.UTF_8));
   }

http://git-wip-us.apache.org/repos/asf/twill/blob/21a3734d/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
index 96031ce..cd62578 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
@@ -17,7 +17,10 @@
  */
 package org.apache.twill.internal.container;
 
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.LoggerContext;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
@@ -25,20 +28,27 @@ import org.apache.twill.api.Command;
 import org.apache.twill.api.RunId;
 import org.apache.twill.api.TwillRunnable;
 import org.apache.twill.api.TwillRunnableSpecification;
+import org.apache.twill.api.logging.LogEntry;
 import org.apache.twill.common.Threads;
 import org.apache.twill.filesystem.Location;
 import org.apache.twill.internal.BasicTwillContext;
 import org.apache.twill.internal.ContainerInfo;
 import org.apache.twill.internal.ContainerLiveNodeData;
 import org.apache.twill.internal.state.Message;
+import org.apache.twill.internal.state.SystemMessages;
 import org.apache.twill.internal.utils.Instances;
 import org.apache.twill.internal.yarn.AbstractYarnTwillService;
 import org.apache.twill.zookeeper.ZKClient;
+import org.slf4j.ILoggerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import javax.annotation.Nullable;
 
 /**
  * This class act as a yarn container and run a {@link org.apache.twill.api.TwillRunnable}.
@@ -51,29 +61,35 @@ public final class TwillContainerService extends AbstractYarnTwillService {
   private final ClassLoader classLoader;
   private final BasicTwillContext context;
   private final ContainerLiveNodeData containerLiveNodeData;
+  private final Map<String, String> oldLogLevels;
+  private final Map<String, String> defaultLogLevels;
   private ExecutorService commandExecutor;
   private TwillRunnable runnable;
 
-  public TwillContainerService(BasicTwillContext context, ContainerInfo containerInfo, ZKClient zkClient,
-                               RunId runId, TwillRunnableSpecification specification, ClassLoader classLoader,
-                               Location applicationLocation) {
+  TwillContainerService(BasicTwillContext context, ContainerInfo containerInfo, ZKClient zkClient,
+                        RunId runId, TwillRunnableSpecification specification, ClassLoader classLoader,
+                        Location applicationLocation, Map<String, String> defaultLogLevels,
+                        Map<String, String> logLevels) {
     super(zkClient, runId, applicationLocation);
 
     this.specification = specification;
     this.classLoader = classLoader;
-    this.containerLiveNodeData = createLiveNodeData(containerInfo);
+    this.defaultLogLevels = ImmutableMap.copyOf(defaultLogLevels);
+    this.oldLogLevels = new HashMap<>(defaultLogLevels);
+    this.containerLiveNodeData = createLiveNodeData(
+      containerInfo, isLoggerContext() ? logLevels : Collections.<String, String>emptyMap());
     this.context = context;
   }
 
-  private ContainerLiveNodeData createLiveNodeData(ContainerInfo containerInfo) {
+  private ContainerLiveNodeData createLiveNodeData(ContainerInfo containerInfo,
+                                                   Map<String, String> logLevels) {
     // if debugging is enabled, log the port and register it in service discovery.
     String debugPort = System.getProperty("twill.debug.port");
     if (debugPort != null) {
       LOG.info("JVM is listening for debugger on port {}", debugPort);
     }
     return new ContainerLiveNodeData(containerInfo.getId(),
-                                     containerInfo.getHost().getCanonicalHostName(),
-                                     debugPort);
+                                     containerInfo.getHost().getCanonicalHostName(), debugPort, logLevels);
   }
 
   @Override
@@ -96,6 +112,46 @@ public final class TwillContainerService extends AbstractYarnTwillService {
       context.setInstanceCount(Integer.parseInt(command.getOptions().get("count")));
     }
 
+    String commandStr = command.getCommand();
+    if (message.getType() == Message.Type.SYSTEM &&
+      (SystemMessages.LOG_LEVEL.equals(commandStr) || SystemMessages.RESET_LOG_LEVEL.equals(commandStr))) {
+      if (SystemMessages.RESET_LOG_LEVEL.equals(commandStr)) {
+        Map<String, String> loggerNames = command.getOptions();
+        // Reset
+        for (Map.Entry<String, String> entry : oldLogLevels.entrySet()) {
+          String loggerName = entry.getKey();
+          // logger name is empty if we are resetting all loggers.
+          if (loggerNames.isEmpty() || loggerNames.containsKey(loggerName)) {
+            String oldLogLevel = entry.getValue();
+
+            setLogLevel(loggerName, oldLogLevel);
+            if (oldLogLevel == null || !defaultLogLevels.containsKey(loggerName)) {
+              containerLiveNodeData.removeLogLevel(loggerName);
+            } else {
+              containerLiveNodeData.setLogLevel(loggerName, oldLogLevel);
+            }
+          }
+        }
+      } else {
+        // Set log levels
+        for (Map.Entry<String, String> entry : command.getOptions().entrySet()) {
+          String loggerName = entry.getKey();
+          String logLevel = entry.getValue();
+
+          // Setting the log level in logging system as well as in the container live node
+          String oldLogLevel = setLogLevel(loggerName, logLevel);
+          containerLiveNodeData.setLogLevel(loggerName, logLevel);
+
+          if (!oldLogLevels.containsKey(loggerName)) {
+            String defaultLogLevel = defaultLogLevels.get(loggerName);
+            oldLogLevels.put(loggerName, defaultLogLevel == null ? oldLogLevel : defaultLogLevel);
+          }
+        }
+      }
+
+      updateLiveNode();
+    }
+
     commandExecutor.execute(new Runnable() {
 
       @Override
@@ -114,6 +170,10 @@ public final class TwillContainerService extends AbstractYarnTwillService {
   @SuppressWarnings("unchecked")
   @Override
   protected void doStart() throws Exception {
+    for (Map.Entry<String, String> entry : containerLiveNodeData.getLogLevels().entrySet()) {
+      setLogLevel(entry.getKey(), entry.getValue());
+    }
+
     commandExecutor = Executors.newSingleThreadExecutor(
       Threads.createDaemonThreadFactory("runnable-command-executor"));
 
@@ -152,4 +212,35 @@ public final class TwillContainerService extends AbstractYarnTwillService {
       LOG.error("Exception when stopping runnable.", t);
     }
   }
+
+  private boolean isLoggerContext() {
+    ILoggerFactory loggerFactory = LoggerFactory.getILoggerFactory();
+    return loggerFactory instanceof LoggerContext;
+  }
+
+  /**
+   * Set the log level for the requested logger name.
+   *
+   * @param loggerName name of the logger
+   * @param logLevel the log level to set to.
+   * @return the current log level of the given logger. If there is no log level configured for the given logger or
+   *         if the logging implementation is not logback, {@code null} will be returned
+   */
+  @Nullable
+  private String setLogLevel(String loggerName, @Nullable String logLevel) {
+    ILoggerFactory loggerFactory = LoggerFactory.getILoggerFactory();
+    if (!(loggerFactory instanceof LoggerContext)) {
+      LOG.error("LoggerFactory is not a logback LoggerContext, cannot make the log level change");
+      return null;
+    }
+    LoggerContext loggerContext = (LoggerContext) loggerFactory;
+
+    ch.qos.logback.classic.Logger logger = loggerContext.getLogger(loggerName);
+    LogEntry.Level oldLogLevel = logger.getLevel() == null ? null :
+      LogEntry.Level.valueOf(logger.getLevel().toString());
+    LOG.debug("Log level of {} changed from {} to {}", loggerName, oldLogLevel, logLevel);
+    logger.setLevel(logLevel == null ? null : Level.toLevel(logLevel, Level.ERROR));
+
+    return oldLogLevel == null ? null : oldLogLevel.name();
+  }
 }

http://git-wip-us.apache.org/repos/asf/twill/blob/21a3734d/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
index 811aa04..2aa24ab 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
@@ -130,11 +130,11 @@ final class YarnTwillPreparer implements TwillPreparer {
   private final List<String> applicationClassPaths = Lists.newArrayList();
   private final Credentials credentials;
   private final int reservedMemory;
+  private final Map<String, Map<String, String>> logLevels = new HashMap<>();
   private String schedulerQueue;
   private String extraOptions;
   private JvmOptions.DebugOptions debugOptions = JvmOptions.DebugOptions.NO_DEBUG;
   private ClassAcceptor classAcceptor;
-  private LogEntry.Level logLevel;
 
   YarnTwillPreparer(YarnConfiguration yarnConfig, TwillSpecification twillSpec,
                     YarnAppClient yarnAppClient, String zkConnectString,
@@ -151,8 +151,8 @@ final class YarnTwillPreparer implements TwillPreparer {
     this.reservedMemory = yarnConfig.getInt(Configs.Keys.JAVA_RESERVED_MEMORY_MB,
                                             Configs.Defaults.JAVA_RESERVED_MEMORY_MB);
     this.extraOptions = extraOptions;
-    this.logLevel = logLevel;
     this.classAcceptor = new ClassAcceptor();
+    saveLogLevels(logLevel);
   }
 
   @Override
@@ -296,8 +296,27 @@ final class YarnTwillPreparer implements TwillPreparer {
 
   @Override
   public TwillPreparer setLogLevel(LogEntry.Level logLevel) {
-    Preconditions.checkNotNull(logLevel);
-    this.logLevel = logLevel;
+    return setLogLevels(ImmutableMap.of(Logger.ROOT_LOGGER_NAME, logLevel));
+  }
+
+  @Override
+  public TwillPreparer setLogLevels(Map<String, LogEntry.Level> logLevels) {
+    Preconditions.checkNotNull(logLevels);
+    for (String runnableName : twillSpec.getRunnables().keySet()) {
+      saveLogLevels(runnableName, logLevels);
+    }
+    return this;
+  }
+
+  @Override
+  public TwillPreparer setLogLevels(String runnableName, Map<String, LogEntry.Level> runnableLogLevels) {
+    Preconditions.checkNotNull(runnableName);
+    Preconditions.checkArgument(twillSpec.getRunnables().containsKey(runnableName),
+                                "Runnable %s is not defined in the application.", runnableName);
+    Preconditions.checkNotNull(runnableLogLevels);
+    Preconditions.checkArgument(!(logLevels.containsKey(Logger.ROOT_LOGGER_NAME)
+      && logLevels.get(Logger.ROOT_LOGGER_NAME) == null));
+    saveLogLevels(runnableName, runnableLogLevels);
     return this;
   }
 
@@ -310,7 +329,6 @@ final class YarnTwillPreparer implements TwillPreparer {
         new Callable<ProcessController<YarnApplicationReport>>() {
         @Override
         public ProcessController<YarnApplicationReport> call() throws Exception {
-          String fsUser = locationFactory.getHomeLocation().getName();
 
           // Local files needed by AM
           Map<String, LocalFile> localFiles = Maps.newHashMap();
@@ -339,8 +357,6 @@ final class YarnTwillPreparer implements TwillPreparer {
           //     org.apache.twill.internal.appmaster.ApplicationMasterMain
           //     false
 
-          LOG.debug("Log level is set to {} for the Twill application.", logLevel);
-
           int memory = Resources.computeMaxHeapSize(appMasterInfo.getMemoryMB(),
                                                     Constants.APP_MASTER_RESERVED_MEMORY_MB, Constants.HEAP_MIN_RATIO);
           return launcher.prepareLaunch(ImmutableMap.<String, String>of(), localFiles.values(), credentials)
@@ -384,6 +400,24 @@ final class YarnTwillPreparer implements TwillPreparer {
     }
   }
 
+  private void saveLogLevels(LogEntry.Level level) {
+    Preconditions.checkNotNull(level);
+    Map<String, String> appLogLevels = new HashMap<>();
+    appLogLevels.put(Logger.ROOT_LOGGER_NAME, level.name());
+    for (String runnableName : twillSpec.getRunnables().keySet()) {
+      this.logLevels.put(runnableName, appLogLevels);
+    }
+  }
+
+  private void saveLogLevels(String runnableName, Map<String, LogEntry.Level> logLevels) {
+    Map<String, String> newLevels = new HashMap<>();
+    for (Map.Entry<String, LogEntry.Level> entry : logLevels.entrySet()) {
+      Preconditions.checkArgument(entry.getValue() != null, "Log level cannot be null for logger {}", entry.getKey());
+      newLevels.put(entry.getKey(), entry.getValue().name());
+    }
+    this.logLevels.put(runnableName, newLevels);
+  }
+
   private Credentials createCredentials() {
     Credentials credentials = new Credentials();
 
@@ -521,7 +555,7 @@ final class YarnTwillPreparer implements TwillPreparer {
         new TwillRuntimeSpecification(newTwillSpec, locationFactory.getHomeLocation().getName(),
                                       getAppLocation().toURI(), zkConnectString, runId, twillSpec.getName(),
                                       reservedMemory, yarnConfig.get(YarnConfiguration.RM_SCHEDULER_ADDRESS),
-                                      logLevel), writer);
+                                      logLevels), writer);
     }
     LOG.debug("Done {}", Constants.Files.TWILL_SPEC);