You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2016/12/11 23:38:31 UTC
[2/2] twill git commit: (TWILL-138) Change Log level at Runtime
(TWILL-138) Change Log level at Runtime
- Added new methods in TwillPreparer for setting log levels
- Added new methods in TwillController for setting and resetting log levels
- Exposes log levels through TwillRunResources for each logger that has been set through the TwillPreparer or TwillController
This closes #14 on Github
Signed-off-by: Terence Yim <ch...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/21a3734d
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/21a3734d
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/21a3734d
Branch: refs/heads/master
Commit: 21a3734d8bbfc3fa95173b83fbc6f7d07e0afc01
Parents: 6837f15
Author: yaojiefeng <ya...@cask.co>
Authored: Wed Oct 5 16:17:26 2016 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Sun Dec 11 15:38:04 2016 -0800
----------------------------------------------------------------------
.../org/apache/twill/api/TwillController.java | 49 ++++
.../org/apache/twill/api/TwillPreparer.java | 23 +-
.../org/apache/twill/api/TwillRunResources.java | 8 +
.../internal/DefaultTwillRunResources.java | 32 ++-
.../org/apache/twill/internal/Constants.java | 3 +-
.../twill/internal/AbstractTwillController.java | 27 +-
.../twill/internal/AbstractTwillService.java | 31 ++-
.../twill/internal/ContainerLiveNodeData.java | 22 +-
.../twill/internal/DefaultResourceReport.java | 10 +-
.../twill/internal/TwillContainerLauncher.java | 13 +-
.../internal/TwillRuntimeSpecification.java | 15 +-
.../internal/json/ResourceReportCodec.java | 8 +-
.../internal/json/TwillRunResourcesCodec.java | 26 +-
.../json/TwillRuntimeSpecificationAdapter.java | 3 +
.../json/TwillRuntimeSpecificationCodec.java | 22 +-
.../twill/internal/state/MessageCodec.java | 1 +
.../twill/internal/state/SystemMessages.java | 67 ++++-
.../org/apache/twill/internal/ServiceMain.java | 3 -
.../appmaster/ApplicationMasterMain.java | 7 +-
.../appmaster/ApplicationMasterService.java | 51 +++-
.../internal/appmaster/RunningContainers.java | 112 +++++++-
.../internal/container/TwillContainerMain.java | 56 +++-
.../container/TwillContainerService.java | 105 ++++++-
.../apache/twill/yarn/YarnTwillPreparer.java | 50 +++-
.../twill/yarn/LogLevelChangeTestRun.java | 278 +++++++++++++++++++
.../org/apache/twill/yarn/LogLevelTestRun.java | 39 ++-
.../org/apache/twill/yarn/YarnTestSuite.java | 1 +
27 files changed, 938 insertions(+), 124 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/twill/blob/21a3734d/twill-api/src/main/java/org/apache/twill/api/TwillController.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/TwillController.java b/twill-api/src/main/java/org/apache/twill/api/TwillController.java
index 08206f5..f5004ae 100644
--- a/twill-api/src/main/java/org/apache/twill/api/TwillController.java
+++ b/twill-api/src/main/java/org/apache/twill/api/TwillController.java
@@ -17,6 +17,7 @@
*/
package org.apache.twill.api;
+import org.apache.twill.api.logging.LogEntry;
import org.apache.twill.api.logging.LogHandler;
import org.apache.twill.discovery.Discoverable;
import org.apache.twill.discovery.ServiceDiscovered;
@@ -89,4 +90,52 @@ public interface TwillController extends ServiceController {
* @return A {@link Future} that will be completed when the restart operation has been done.
*/
Future<String> restartInstances(String runnable, int instanceId, int... moreInstanceIds);
+
+ /**
+ * Update the log levels for requested logger names for Twill applications running in a container.
+ * The log level for a logger name can be {@code null}, which will reset the log level for that logger.
+ * The root logger is the exception: its log level cannot be {@code null}.
+ *
+ * @param logLevels The {@link Map} contains the requested logger names and log levels that need to be updated.
+ * @return A {@link Future} that will be completed when the log level update has been done. It will carry the
+ * {@link Map} of log levels as the result.
+ */
+ Future<Map<String, LogEntry.Level>> updateLogLevels(Map<String, LogEntry.Level> logLevels);
+
+ /**
+ * Update the log levels for requested logger names for a {@link TwillRunnable}.
+ * The log level for a logger name can be {@code null}, which will reset the log level for that logger.
+ * The root logger is the exception: its log level cannot be {@code null}.
+ *
+ * @param runnableName The name of the runnable to update the log level.
+ * @param logLevelsForRunnable The {@link Map} contains the requested logger name and log level that
+ * need to be updated.
+ * @return A {@link Future} that will be completed when the log level update has been done. It will carry the
+ * {@link Map} of log levels as the result.
+ */
+ Future<Map<String, LogEntry.Level>> updateLogLevels(String runnableName,
+ Map<String, LogEntry.Level> logLevelsForRunnable);
+
+ /**
+ * Reset the log levels of all runnables.
+ * The log levels will be the same as when the runnables start up.
+ *
+ * @param loggerNames The optional logger names to be reset for all runnables, if not provided, all log levels will
+ * be reset.
+ * @return A {@link Future} that will be completed when the reset log level operation has been done. The future result
+ * is the logger names provided in the parameter.
+ */
+ Future<String[]> resetLogLevels(String...loggerNames);
+
+ /**
+ * Reset the log levels of the given runnable.
+ * The log levels will be same as when the runnable starts up.
+ *
+ * @param loggerNames The optional logger names to be reset for the runnable, if not provided, all log levels will
+ * be reset.
+ * @return A {@link Future} that will be completed when the reset log level operation has been done. The future result
+ * is the logger names provided in the parameter.
+ */
+ Future<String[]> resetRunnableLogLevels(String runnableName, String...loggerNames);
+
}
http://git-wip-us.apache.org/repos/asf/twill/blob/21a3734d/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java b/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java
index d7d5529..575ba8f 100644
--- a/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java
+++ b/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java
@@ -227,15 +227,36 @@ public interface TwillPreparer {
TwillPreparer addSecureStore(SecureStore secureStore);
/**
- * Set the log level for Twill applications running in a container.
+ * Set the root log level for Twill applications in all containers.
*
* @param logLevel the {@link LogEntry.Level} that should be set.
* The level match the {@code Logback} levels.
* @return This {@link TwillPreparer}.
+ * @deprecated Use {@link #setLogLevels(Map)} with key {@link org.slf4j.Logger#ROOT_LOGGER_NAME} instead.
*/
+ @Deprecated
TwillPreparer setLogLevel(LogEntry.Level logLevel);
/**
+ * Set the log levels for requested logger names for Twill applications running in a container. The log level of any
+ * logger cannot be {@code null}; if any value is {@code null}, an {@link IllegalArgumentException} will be thrown.
+ *
+ * @param logLevels The {@link Map} contains the requested logger names and log levels that need to be set.
+ * @return This {@link TwillPreparer}.
+ */
+ TwillPreparer setLogLevels(Map<String, LogEntry.Level> logLevels);
+
+ /**
+ * Set the log levels for requested logger names for a {@link TwillRunnable}. The log level of any logger cannot be
+ * {@code null}; if any value is {@code null}, an {@link IllegalArgumentException} will be thrown.
+ *
+ * @param runnableName The name of the runnable to set the log level.
+ * @param logLevelsForRunnable The {@link Map} contains the requested logger names and log levels that need to be set.
+ * @return This {@link TwillPreparer}.
+ */
+ TwillPreparer setLogLevels(String runnableName, Map<String, LogEntry.Level> logLevelsForRunnable);
+
+ /**
* Starts the application.
* @return A {@link TwillController} for controlling the running application.
*/
http://git-wip-us.apache.org/repos/asf/twill/blob/21a3734d/twill-api/src/main/java/org/apache/twill/api/TwillRunResources.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/TwillRunResources.java b/twill-api/src/main/java/org/apache/twill/api/TwillRunResources.java
index 1a665ea..f721f47 100644
--- a/twill-api/src/main/java/org/apache/twill/api/TwillRunResources.java
+++ b/twill-api/src/main/java/org/apache/twill/api/TwillRunResources.java
@@ -19,6 +19,8 @@ package org.apache.twill.api;
import org.apache.twill.api.logging.LogEntry;
+import java.util.Map;
+
/**
* Information about the container the {@link TwillRunnable}
* is running in.
@@ -58,7 +60,13 @@ public interface TwillRunResources {
/**
* @return the enabled log level for the container where the runnable is running in.
+ * @deprecated Use {@link #getLogLevels()} to get the log levels map and get root level from the map instead.
*/
+ @Deprecated
LogEntry.Level getLogLevel();
+ /**
+ * @return the enabled log level arguments for the container where the runnable is running in.
+ */
+ Map<String, LogEntry.Level> getLogLevels();
}
http://git-wip-us.apache.org/repos/asf/twill/blob/21a3734d/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillRunResources.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillRunResources.java b/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillRunResources.java
index 93ef90e..83b973a 100644
--- a/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillRunResources.java
+++ b/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillRunResources.java
@@ -18,7 +18,14 @@
package org.apache.twill.internal;
import org.apache.twill.api.TwillRunResources;
+import org.apache.twill.api.logging.LogEntry;
import org.apache.twill.api.logging.LogEntry.Level;
+import org.slf4j.Logger;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
/**
* Straightforward implementation of {@link org.apache.twill.api.TwillRunResources}.
@@ -30,17 +37,25 @@ public class DefaultTwillRunResources implements TwillRunResources {
private final int memoryMB;
private final String host;
private final Integer debugPort;
- private final Level logLevel;
+ private final Map<String, LogEntry.Level> logLevels;
+ /**
+ * Constructor to create an instance of {@link DefaultTwillRunResources} with empty log levels.
+ */
public DefaultTwillRunResources(int instanceId, String containerId, int cores, int memoryMB,
- String host, Integer debugPort, Level logLevel) {
+ String host, Integer debugPort) {
+ this(instanceId, containerId, cores, memoryMB, host, debugPort, Collections.<String, Level>emptyMap());
+ }
+
+ public DefaultTwillRunResources(int instanceId, String containerId, int cores, int memoryMB,
+ String host, Integer debugPort, Map<String, LogEntry.Level> logLevels) {
this.instanceId = instanceId;
this.containerId = containerId;
this.virtualCores = cores;
this.memoryMB = memoryMB;
this.host = host;
this.debugPort = debugPort;
- this.logLevel = logLevel;
+ this.logLevels = new HashMap<>(logLevels);
}
/**
@@ -90,8 +105,15 @@ public class DefaultTwillRunResources implements TwillRunResources {
}
@Override
+ @Deprecated
+ @Nullable
public Level getLogLevel() {
- return logLevel;
+ return getLogLevels().get(Logger.ROOT_LOGGER_NAME);
+ }
+
+ @Override
+ public Map<String, LogEntry.Level> getLogLevels() {
+ return logLevels;
}
@Override
@@ -129,7 +151,7 @@ public class DefaultTwillRunResources implements TwillRunResources {
", memoryMB=" + memoryMB +
", host='" + host + '\'' +
", debugPort=" + debugPort +
- ", logLevel=" + logLevel +
+ ", logLevels=" + logLevels +
'}';
}
}
http://git-wip-us.apache.org/repos/asf/twill/blob/21a3734d/twill-common/src/main/java/org/apache/twill/internal/Constants.java
----------------------------------------------------------------------
diff --git a/twill-common/src/main/java/org/apache/twill/internal/Constants.java b/twill-common/src/main/java/org/apache/twill/internal/Constants.java
index 8a33962..40b988f 100644
--- a/twill-common/src/main/java/org/apache/twill/internal/Constants.java
+++ b/twill-common/src/main/java/org/apache/twill/internal/Constants.java
@@ -52,7 +52,7 @@ public final class Constants {
public static final String RESTART_RUNNABLES_INSTANCES = "restartRunnablesInstances";
/**
- * Common ZK paths constants
+ * Common ZK paths constants.
*/
public static final String DISCOVERY_PATH_PREFIX = "/discoverable";
public static final String INSTANCES_PATH_PREFIX = "/instances";
@@ -77,6 +77,7 @@ public final class Constants {
public static final String LOGBACK_TEMPLATE = "logback-template.xml";
public static final String JVM_OPTIONS = "jvm.opts";
public static final String CREDENTIALS = "credentials.store";
+ public static final String LOG_LEVELS = "logLevel.json";
private Files() {
}
http://git-wip-us.apache.org/repos/asf/twill/blob/21a3734d/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
index e60f6a3..5a1c5b3 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
@@ -19,6 +19,7 @@ package org.apache.twill.internal;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
@@ -59,6 +60,7 @@ import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Future;
/**
* A abstract base class for {@link TwillController} implementation that uses Zookeeper to controller a
@@ -136,8 +138,8 @@ public abstract class AbstractTwillController extends AbstractZKServiceControlle
}
@Override
- public final ListenableFuture<Set<String>> restartInstances(Map<String,
- ? extends Set<Integer>> runnableToInstanceIds) {
+ public final ListenableFuture<Set<String>> restartInstances(
+ Map<String, ? extends Set<Integer>> runnableToInstanceIds) {
Map<String, String> runnableToStringInstanceIds =
Maps.transformEntries(runnableToInstanceIds, new Maps.EntryTransformer<String, Set<Integer>, String>() {
@Override
@@ -170,6 +172,27 @@ public abstract class AbstractTwillController extends AbstractZKServiceControlle
});
}
+ @Override
+ public Future<Map<String, LogEntry.Level>> updateLogLevels(Map<String, LogEntry.Level> logLevels) {
+ return sendMessage(SystemMessages.updateLogLevels(logLevels), logLevels);
+ }
+
+ @Override
+ public Future<Map<String, LogEntry.Level>> updateLogLevels(String runnableName,
+ Map<String, LogEntry.Level> runnableLogLevels) {
+ Preconditions.checkNotNull(runnableName);
+ return sendMessage(SystemMessages.updateLogLevels(runnableName, runnableLogLevels), runnableLogLevels);
+ }
+
+ @Override
+ public Future<String[]> resetLogLevels(String...loggerNames) {
+ return sendMessage(SystemMessages.resetLogLevels(Sets.newHashSet(loggerNames)), loggerNames);
+ }
+ @Override
+ public Future<String[]> resetRunnableLogLevels(String runnableName, String...loggerNames) {
+ return sendMessage(SystemMessages.resetLogLevels(runnableName, Sets.newHashSet(loggerNames)), loggerNames);
+ }
+
private void validateInstanceIds(String runnable, Set<Integer> instanceIds) {
ResourceReport resourceReport = getResourceReport();
if (resourceReport == null) {
http://git-wip-us.apache.org/repos/asf/twill/blob/21a3734d/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
index 1717117..fc5b907 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
@@ -25,6 +25,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
import com.google.gson.JsonObject;
import org.apache.twill.api.RunId;
import org.apache.twill.common.Cancellable;
@@ -84,7 +85,7 @@ import java.util.concurrent.TimeUnit;
public abstract class AbstractTwillService extends AbstractExecutionThreadService implements MessageCallback {
private static final Logger LOG = LoggerFactory.getLogger(AbstractTwillService.class);
- private static final Gson GSON = new Gson();
+ private static final Gson GSON = new GsonBuilder().serializeNulls().create();
protected final ZKClient zkClient;
protected final RunId runId;
@@ -197,16 +198,21 @@ public abstract class AbstractTwillService extends AbstractExecutionThreadServic
}
}
+ /**
+ * Update the live node for the runnable.
+ *
+ * @return A {@link OperationFuture} that will be completed when the update is done.
+ */
+ protected final OperationFuture<?> updateLiveNode() {
+ String liveNode = getLiveNodePath();
+ LOG.info("Update live node {}{}", zkClient.getConnectString(), liveNode);
+ return zkClient.setData(liveNode, toJson(getLiveNodeJsonObject()));
+ }
+
private OperationFuture<String> createLiveNode() {
String liveNode = getLiveNodePath();
LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNode);
-
- JsonObject content = new JsonObject();
- Object liveNodeData = getLiveNodeData();
- if (liveNodeData != null) {
- content.add("data", GSON.toJsonTree(liveNodeData));
- }
- return ZKOperations.ignoreError(zkClient.create(liveNode, toJson(content), CreateMode.EPHEMERAL),
+ return ZKOperations.ignoreError(zkClient.create(liveNode, toJson(getLiveNodeJsonObject()), CreateMode.EPHEMERAL),
KeeperException.NodeExistsException.class, liveNode);
}
@@ -368,6 +374,15 @@ public abstract class AbstractTwillService extends AbstractExecutionThreadServic
return String.format("%s/%s", Constants.INSTANCES_PATH_PREFIX, runId.getId());
}
+ private JsonObject getLiveNodeJsonObject() {
+ JsonObject content = new JsonObject();
+ Object liveNodeData = getLiveNodeData();
+ if (liveNodeData != null) {
+ content.add("data", GSON.toJsonTree(liveNodeData));
+ }
+ return content;
+ }
+
private <T> byte[] toJson(T obj) {
return GSON.toJson(obj).getBytes(Charsets.UTF_8);
}
http://git-wip-us.apache.org/repos/asf/twill/blob/21a3734d/twill-core/src/main/java/org/apache/twill/internal/ContainerLiveNodeData.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/ContainerLiveNodeData.java b/twill-core/src/main/java/org/apache/twill/internal/ContainerLiveNodeData.java
index 4c6f32e..64025b5 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/ContainerLiveNodeData.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/ContainerLiveNodeData.java
@@ -17,19 +17,25 @@
*/
package org.apache.twill.internal;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+
/**
*
*/
public final class ContainerLiveNodeData {
-
private final String containerId;
private final String host;
private final String debugPort;
+ private final Map<String, String> logLevels;
- public ContainerLiveNodeData(String containerId, String host, String debugPort) {
+ public ContainerLiveNodeData(String containerId, String host, String debugPort,
+ Map<String, String> logLevels) {
this.containerId = containerId;
this.host = host;
this.debugPort = debugPort;
+ this.logLevels = new HashMap<>(logLevels);
}
public String getContainerId() {
@@ -43,4 +49,16 @@ public final class ContainerLiveNodeData {
public String getDebugPort() {
return debugPort;
}
+
+ public Map<String, String> getLogLevels() {
+ return logLevels;
+ }
+
+ public void setLogLevel(String loggerName, @Nullable String logLevel) {
+ logLevels.put(loggerName, logLevel);
+ }
+
+ public void removeLogLevel(String loggerName) {
+ logLevels.remove(loggerName);
+ }
}
http://git-wip-us.apache.org/repos/asf/twill/blob/21a3734d/twill-core/src/main/java/org/apache/twill/internal/DefaultResourceReport.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/DefaultResourceReport.java b/twill-core/src/main/java/org/apache/twill/internal/DefaultResourceReport.java
index 08c93f8..6133db9 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/DefaultResourceReport.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/DefaultResourceReport.java
@@ -19,13 +19,13 @@ package org.apache.twill.internal;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;
import org.apache.twill.api.ResourceReport;
import org.apache.twill.api.TwillRunResources;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
@@ -41,12 +41,8 @@ public final class DefaultResourceReport implements ResourceReport {
private final AtomicReference<List<String>> services;
public DefaultResourceReport(String applicationId, TwillRunResources masterResources) {
- this(applicationId, masterResources, ImmutableMap.<String, Collection<TwillRunResources>>of());
- }
-
- public DefaultResourceReport(String applicationId, TwillRunResources masterResources,
- Map<String, Collection<TwillRunResources>> resources) {
- this(applicationId, masterResources, resources, ImmutableList.<String>of());
+ this(applicationId, masterResources, Collections.<String, Collection<TwillRunResources>>emptyMap(),
+ Collections.<String>emptyList());
}
public DefaultResourceReport(String applicationId, TwillRunResources masterResources,
http://git-wip-us.apache.org/repos/asf/twill/blob/21a3734d/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
index 53c378a..ff8ddb2 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
@@ -43,6 +43,7 @@ import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
/**
* This class helps launching a container.
@@ -82,9 +83,11 @@ public final class TwillContainerLauncher {
* @param instanceId The Twill instance Id.
* @param mainClass The main class to run in the container.
* @param classPath The class path to load classes for the container.
+ * @param logLevelLocation The log level file location for the container to localize.
* @return instance of {@link TwillContainerController} to control the container run.
*/
- public TwillContainerController start(RunId runId, int instanceId, Class<?> mainClass, String classPath) {
+ public TwillContainerController start(RunId runId, int instanceId, Class<?> mainClass, String classPath,
+ @Nullable Location logLevelLocation) {
// Clean up zookeeper path in case this is a retry and there are old messages and state there.
Futures.getUnchecked(ZKOperations.ignoreError(
ZKOperations.recursiveDelete(zkClient, "/" + runId), KeeperException.NoNodeException.class, null));
@@ -92,7 +95,7 @@ public final class TwillContainerLauncher {
// Adds all file to be localized to container
launchContext.addResources(runtimeSpec.getLocalFiles());
- // Optionally localize secure store.
+ // Optionally localize secure store and log level file.
try {
if (secureStoreLocation != null && secureStoreLocation.exists()) {
launchContext.addResources(new DefaultLocalFile(Constants.Files.CREDENTIALS,
@@ -100,6 +103,12 @@ public final class TwillContainerLauncher {
secureStoreLocation.lastModified(),
secureStoreLocation.length(), false, null));
}
+ if (logLevelLocation != null && logLevelLocation.exists()) {
+ launchContext.addResources(new DefaultLocalFile(Constants.Files.LOG_LEVELS,
+ logLevelLocation.toURI(),
+ logLevelLocation.lastModified(),
+ logLevelLocation.length(), false, null));
+ }
} catch (IOException e) {
LOG.warn("Failed to launch container with secure store {}.", secureStoreLocation);
}
http://git-wip-us.apache.org/repos/asf/twill/blob/21a3734d/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java b/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java
index a48133f..387b01d 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java
@@ -20,9 +20,9 @@ package org.apache.twill.internal;
import org.apache.twill.api.RunId;
import org.apache.twill.api.TwillApplication;
import org.apache.twill.api.TwillSpecification;
-import org.apache.twill.api.logging.LogEntry;
import java.net.URI;
+import java.util.Map;
import javax.annotation.Nullable;
/**
@@ -39,12 +39,12 @@ public class TwillRuntimeSpecification {
private final String twillAppName;
private final int reservedMemory;
private final String rmSchedulerAddr;
- private final LogEntry.Level logLevel;
+ private final Map<String, Map<String, String>> logLevels;
public TwillRuntimeSpecification(TwillSpecification twillSpecification, String fsUser, URI twillAppDir,
String zkConnectStr, RunId twillRunId, String twillAppName,
int reservedMemory, @Nullable String rmSchedulerAddr,
- @Nullable LogEntry.Level logLevel) {
+ Map<String, Map<String, String>> logLevels) {
this.twillSpecification = twillSpecification;
this.fsUser = fsUser;
this.twillAppDir = twillAppDir;
@@ -53,7 +53,7 @@ public class TwillRuntimeSpecification {
this.twillAppName = twillAppName;
this.reservedMemory = reservedMemory;
this.rmSchedulerAddr = rmSchedulerAddr;
- this.logLevel = logLevel;
+ this.logLevels = logLevels;
}
public TwillSpecification getTwillSpecification() {
@@ -72,7 +72,7 @@ public class TwillRuntimeSpecification {
return zkConnectStr;
}
- public RunId getTwillRunId() {
+ public RunId getTwillAppRunId() {
return twillRunId;
}
@@ -89,8 +89,7 @@ public class TwillRuntimeSpecification {
return rmSchedulerAddr;
}
- @Nullable
- public LogEntry.Level getLogLevel() {
- return logLevel;
+ public Map<String, Map<String, String>> getLogLevels() {
+ return logLevels;
}
}
http://git-wip-us.apache.org/repos/asf/twill/blob/21a3734d/twill-core/src/main/java/org/apache/twill/internal/json/ResourceReportCodec.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/ResourceReportCodec.java b/twill-core/src/main/java/org/apache/twill/internal/json/ResourceReportCodec.java
index 475871c..63d8b80 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/json/ResourceReportCodec.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/json/ResourceReportCodec.java
@@ -27,6 +27,7 @@ import com.google.gson.JsonSerializer;
import com.google.gson.reflect.TypeToken;
import org.apache.twill.api.ResourceReport;
import org.apache.twill.api.TwillRunResources;
+import org.apache.twill.api.logging.LogEntry;
import org.apache.twill.internal.DefaultResourceReport;
import java.lang.reflect.Type;
@@ -45,12 +46,11 @@ public final class ResourceReportCodec implements JsonSerializer<ResourceReport>
JsonObject json = new JsonObject();
json.addProperty("appMasterId", src.getApplicationId());
- json.add("appMasterResources", context.serialize(
- src.getAppMasterResources(), new TypeToken<TwillRunResources>() { }.getType()));
+ json.add("appMasterResources", context.serialize(src.getAppMasterResources(), TwillRunResources.class));
json.add("runnableResources", context.serialize(
src.getResources(), new TypeToken<Map<String, Collection<TwillRunResources>>>() { }.getType()));
json.add("services", context.serialize(
- src.getServices(), new TypeToken<List<String>>() {}.getType()));
+ src.getServices(), new TypeToken<List<String>>() { }.getType()));
return json;
}
@@ -64,7 +64,7 @@ public final class ResourceReportCodec implements JsonSerializer<ResourceReport>
Map<String, Collection<TwillRunResources>> resources = context.deserialize(
jsonObj.get("runnableResources"), new TypeToken<Map<String, Collection<TwillRunResources>>>() { }.getType());
List<String> services = context.deserialize(
- jsonObj.get("services"), new TypeToken<List<String>>() {}.getType());
+ jsonObj.get("services"), new TypeToken<List<String>>() { }.getType());
return new DefaultResourceReport(appMasterId, masterResources, resources, services);
}
http://git-wip-us.apache.org/repos/asf/twill/blob/21a3734d/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java
index 7dea371..bb4d435 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java
@@ -24,11 +24,13 @@ import com.google.gson.JsonObject;
import com.google.gson.JsonParseException;
import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;
+import com.google.gson.reflect.TypeToken;
import org.apache.twill.api.TwillRunResources;
import org.apache.twill.api.logging.LogEntry;
import org.apache.twill.internal.DefaultTwillRunResources;
import java.lang.reflect.Type;
+import java.util.Map;
/**
* Codec for serializing and deserializing a {@link org.apache.twill.api.TwillRunResources} object using json.
@@ -41,7 +43,7 @@ public final class TwillRunResourcesCodec implements JsonSerializer<TwillRunReso
private static final String MEMORY_MB = "memoryMB";
private static final String VIRTUAL_CORES = "virtualCores";
private static final String DEBUG_PORT = "debugPort";
- private static final String LOG_LEVEL = "logLevel";
+ private static final String LOG_LEVELS = "logLevels";
@Override
public JsonElement serialize(TwillRunResources src, Type typeOfSrc, JsonSerializationContext context) {
@@ -55,9 +57,8 @@ public final class TwillRunResourcesCodec implements JsonSerializer<TwillRunReso
if (src.getDebugPort() != null) {
json.addProperty(DEBUG_PORT, src.getDebugPort());
}
- if (src.getLogLevel() != null) {
- json.addProperty(LOG_LEVEL, src.getLogLevel().toString());
- }
+ json.add(LOG_LEVELS, context.serialize(src.getLogLevels(),
+ new TypeToken<Map<String, LogEntry.Level>>() { }.getType()));
return json;
}
@@ -66,13 +67,14 @@ public final class TwillRunResourcesCodec implements JsonSerializer<TwillRunReso
public TwillRunResources deserialize(JsonElement json, Type typeOfT,
JsonDeserializationContext context) throws JsonParseException {
JsonObject jsonObj = json.getAsJsonObject();
- return new DefaultTwillRunResources(jsonObj.get("instanceId").getAsInt(),
- jsonObj.get("containerId").getAsString(),
- jsonObj.get("virtualCores").getAsInt(),
- jsonObj.get("memoryMB").getAsInt(),
- jsonObj.get("host").getAsString(),
- jsonObj.has("debugPort") ? jsonObj.get("debugPort").getAsInt() : null,
- jsonObj.has("logLevel") ? LogEntry.Level.valueOf(
- jsonObj.get("logLevel").getAsString()) : LogEntry.Level.INFO);
+ Map<String, LogEntry.Level> logLevels =
+ context.deserialize(jsonObj.get(LOG_LEVELS), new TypeToken<Map<String, LogEntry.Level>>() { }.getType());
+ return new DefaultTwillRunResources(jsonObj.get(INSTANCE_ID).getAsInt(),
+ jsonObj.get(CONTAINER_ID).getAsString(),
+ jsonObj.get(VIRTUAL_CORES).getAsInt(),
+ jsonObj.get(MEMORY_MB).getAsInt(),
+ jsonObj.get(HOST).getAsString(),
+ jsonObj.has(DEBUG_PORT) ? jsonObj.get(DEBUG_PORT).getAsInt() : null,
+ logLevels);
}
}
http://git-wip-us.apache.org/repos/asf/twill/blob/21a3734d/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationAdapter.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationAdapter.java b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationAdapter.java
index ae9747d..4df9081 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationAdapter.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationAdapter.java
@@ -46,6 +46,7 @@ import java.io.Writer;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Map;
+import javax.annotation.Nullable;
/**
*
@@ -108,6 +109,7 @@ public final class TwillRuntimeSpecificationAdapter {
@SuppressWarnings("unchecked")
@Override
+ @Nullable
public <T> TypeAdapter<T> create(Gson gson, TypeToken<T> type) {
Class<?> rawType = type.getRawType();
if (!Map.class.isAssignableFrom(rawType)) {
@@ -141,6 +143,7 @@ public final class TwillRuntimeSpecificationAdapter {
}
@Override
+ @Nullable
public Map<String, V> read(JsonReader reader) throws IOException {
if (reader.peek() == JsonToken.NULL) {
reader.nextNull();
http://git-wip-us.apache.org/repos/asf/twill/blob/21a3734d/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java
index 6100c99..133371d 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java
@@ -26,12 +26,12 @@ import com.google.gson.JsonParseException;
import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;
import org.apache.twill.api.TwillSpecification;
-import org.apache.twill.api.logging.LogEntry;
import org.apache.twill.internal.RunIds;
import org.apache.twill.internal.TwillRuntimeSpecification;
import java.lang.reflect.Type;
import java.net.URI;
+import java.util.Map;
/**
* Codec for serializing and deserializing a {@link TwillRuntimeSpecification} object using json.
@@ -46,8 +46,8 @@ final class TwillRuntimeSpecificationCodec implements JsonSerializer<TwillRuntim
private static final String TWILL_APP_NAME = "twillAppName";
private static final String RESERVED_MEMORY = "reservedMemory";
private static final String RM_SCHEDULER_ADDR = "rmSchedulerAddr";
- private static final String LOG_LEVEL = "logLevel";
private static final String TWILL_SPEC = "twillSpecification";
+ private static final String LOG_LEVELS = "logLevels";
@Override
public JsonElement serialize(TwillRuntimeSpecification src, Type typeOfSrc, JsonSerializationContext context) {
@@ -55,17 +55,16 @@ final class TwillRuntimeSpecificationCodec implements JsonSerializer<TwillRuntim
json.addProperty(FS_USER, src.getFsUser());
json.addProperty(TWILL_APP_DIR, src.getTwillAppDir().toASCIIString());
json.addProperty(ZK_CONNECT_STR, src.getZkConnectStr());
- json.addProperty(TWILL_RUNID, src.getTwillRunId().getId());
+ json.addProperty(TWILL_RUNID, src.getTwillAppRunId().getId());
json.addProperty(TWILL_APP_NAME, src.getTwillAppName());
json.addProperty(RESERVED_MEMORY, src.getReservedMemory());
if (src.getRmSchedulerAddr() != null) {
json.addProperty(RM_SCHEDULER_ADDR, src.getRmSchedulerAddr());
}
- if (src.getLogLevel() != null) {
- json.addProperty(LOG_LEVEL, src.getLogLevel().name());
- }
- json.add(TWILL_SPEC, context.serialize(src.getTwillSpecification(),
- new TypeToken<TwillSpecification>() { }.getType()));
+ json.add(TWILL_SPEC,
+ context.serialize(src.getTwillSpecification(), new TypeToken<TwillSpecification>() { }.getType()));
+ json.add(LOG_LEVELS,
+ context.serialize(src.getLogLevels(), new TypeToken<Map<String, Map<String, String>>>() { }.getType()));
return json;
}
@@ -76,6 +75,8 @@ final class TwillRuntimeSpecificationCodec implements JsonSerializer<TwillRuntim
TwillSpecification twillSpecification = context.deserialize(
jsonObj.get(TWILL_SPEC), new TypeToken<TwillSpecification>() { }.getType());
+ Map<String, Map<String, String>> logLevels =
+ context.deserialize(jsonObj.get(LOG_LEVELS), new TypeToken<Map<String, Map<String, String>>>() { }.getType());
return new TwillRuntimeSpecification(twillSpecification,
jsonObj.get(FS_USER).getAsString(),
URI.create(jsonObj.get(TWILL_APP_DIR).getAsString()),
@@ -84,8 +85,7 @@ final class TwillRuntimeSpecificationCodec implements JsonSerializer<TwillRuntim
jsonObj.get(TWILL_APP_NAME).getAsString(),
jsonObj.get(RESERVED_MEMORY).getAsInt(),
jsonObj.has(RM_SCHEDULER_ADDR) ?
- jsonObj.get(RM_SCHEDULER_ADDR).getAsString() : null,
- jsonObj.has(LOG_LEVEL) ?
- LogEntry.Level.valueOf(jsonObj.get(LOG_LEVEL).getAsString()) : null);
+ jsonObj.get(RM_SCHEDULER_ADDR).getAsString() : null,
+ logLevels);
}
}
http://git-wip-us.apache.org/repos/asf/twill/blob/21a3734d/twill-core/src/main/java/org/apache/twill/internal/state/MessageCodec.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/state/MessageCodec.java b/twill-core/src/main/java/org/apache/twill/internal/state/MessageCodec.java
index 60c5da1..79a53d2 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/state/MessageCodec.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/state/MessageCodec.java
@@ -40,6 +40,7 @@ public final class MessageCodec {
private static final Type OPTIONS_TYPE = new TypeToken<Map<String, String>>() { }.getType();
private static final Gson GSON = new GsonBuilder()
+ .serializeNulls()
.registerTypeAdapter(Message.class, new MessageAdapter())
.registerTypeAdapter(Command.class, new CommandAdapter())
.create();
http://git-wip-us.apache.org/repos/asf/twill/blob/21a3734d/twill-core/src/main/java/org/apache/twill/internal/state/SystemMessages.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/state/SystemMessages.java b/twill-core/src/main/java/org/apache/twill/internal/state/SystemMessages.java
index 4c4bb8b..6a7a9c0 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/state/SystemMessages.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/state/SystemMessages.java
@@ -17,15 +17,24 @@
*/
package org.apache.twill.internal.state;
+import ch.qos.logback.classic.Logger;
+import com.google.common.base.Functions;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
import org.apache.twill.api.Command;
+import org.apache.twill.api.logging.LogEntry;
-import com.google.common.base.Preconditions;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
/**
* Collection of predefined system messages.
*/
public final class SystemMessages {
+ public static final String LOG_LEVEL = "LOG_LEVEL";
+ public static final String RESET_LOG_LEVEL = "RESET_LOG_LEVEL";
public static final Command STOP_COMMAND = Command.Builder.of("stop").build();
public static final Message SECURE_STORE_UPDATED = new SimpleMessage(
Message.Type.SYSTEM, Message.Scope.APPLICATION, null, Command.Builder.of("secureStoreUpdated").build());
@@ -68,6 +77,62 @@ public final class SystemMessages {
return new SimpleMessage(Message.Type.SYSTEM, Message.Scope.RUNNABLES, null, updateCommand);
}
+ /**
+ * Helper method to get System {@link Message} for updating the log levels for all runnables.
+ *
+ * @param logLevels The log levels to be updated.
+ * @return An instance of System {@link Message} to update the log levels.
+ */
+ public static Message updateLogLevels(Map<String, LogEntry.Level> logLevels) {
+ return updateLogLevels(null, logLevels);
+ }
+
+ /**
+ * Helper method to get System {@link Message} for updating the log levels for one or all runnables.
+ *
+ * @param runnableName The name of the runnable to update the log levels for, or null to apply to all runnables.
+ * @param logLevels The log levels to be updated.
+ * @return An instance of System {@link Message} to update the log levels.
+ */
+ public static Message updateLogLevels(@Nullable String runnableName, Map<String, LogEntry.Level> logLevels) {
+ Preconditions.checkNotNull(logLevels);
+ Preconditions.checkArgument(!(logLevels.containsKey(Logger.ROOT_LOGGER_NAME)
+ && logLevels.get(Logger.ROOT_LOGGER_NAME) == null));
+ Map<String, String> options = convertLogEntryToString(logLevels);
+ return new SimpleMessage(Message.Type.SYSTEM,
+ runnableName == null ? Message.Scope.ALL_RUNNABLE : Message.Scope.RUNNABLE,
+ runnableName, Command.Builder.of(LOG_LEVEL).addOptions(options).build());
+ }
+
+ /**
+ * Helper method to get System {@link Message} for resetting the log levels for all runnables.
+ */
+ public static Message resetLogLevels(Set<String> loggerNames) {
+ return resetLogLevels(null, loggerNames);
+ }
+
+ /**
+ * Helper method to get System {@link Message} for resetting log levels for one or all runnables.
+ *
+ * @param runnableName The name of the runnable to reset the log levels for, or null to apply to all runnables.
+ * @return An instance of System {@link Message} to reset the log levels.
+ */
+ public static Message resetLogLevels(@Nullable String runnableName, Set<String> loggerNames) {
+ return new SimpleMessage(Message.Type.SYSTEM,
+ runnableName == null ? Message.Scope.ALL_RUNNABLE : Message.Scope.RUNNABLE,
+ runnableName, Command.Builder.of(RESET_LOG_LEVEL)
+ .addOptions(Maps.uniqueIndex(loggerNames, Functions.toStringFunction())).build());
+ }
+
+ private static Map<String, String> convertLogEntryToString(Map<String, LogEntry.Level> logLevels) {
+ return Maps.transformEntries(logLevels, new Maps.EntryTransformer<String, LogEntry.Level, String>() {
+ @Override
+ public String transformEntry(String loggerName, LogEntry.Level level) {
+ return level == null ? null : level.name();
+ }
+ });
+ }
+
private SystemMessages() {
}
}
http://git-wip-us.apache.org/repos/asf/twill/blob/21a3734d/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
index d6bbbff..7e858ec 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
@@ -28,8 +28,6 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.twill.api.RunId;
import org.apache.twill.filesystem.FileContextLocationFactory;
@@ -52,7 +50,6 @@ import org.xml.sax.InputSource;
import java.io.File;
import java.io.StringReader;
import java.net.URI;
-import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
http://git-wip-us.apache.org/repos/asf/twill/blob/21a3734d/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
index 2ec1a14..3f68bc3 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
@@ -68,7 +68,7 @@ public final class ApplicationMasterMain extends ServiceMain {
File twillSpec = new File(Constants.Files.TWILL_SPEC);
TwillRuntimeSpecification twillRuntimeSpec = TwillRuntimeSpecificationAdapter.create().fromJson(twillSpec);
String zkConnect = twillRuntimeSpec.getZkConnectStr();
- RunId runId = twillRuntimeSpec.getTwillRunId();
+ RunId runId = twillRuntimeSpec.getTwillAppRunId();
ZKClientService zkClientService = createZKClient(zkConnect, twillRuntimeSpec.getTwillAppName());
Configuration conf = new YarnConfiguration(new HdfsConfiguration(new Configuration()));
@@ -76,8 +76,9 @@ public final class ApplicationMasterMain extends ServiceMain {
final YarnAMClient amClient = new VersionDetectYarnAMClientFactory(conf).create();
ApplicationMasterService service =
- new ApplicationMasterService(runId, zkClientService, twillRuntimeSpec, amClient, createAppLocation(
- conf, twillRuntimeSpec.getFsUser(), twillRuntimeSpec.getTwillAppDir()));
+ new ApplicationMasterService(runId, zkClientService, twillRuntimeSpec, amClient,
+ createAppLocation(conf, twillRuntimeSpec.getFsUser(),
+ twillRuntimeSpec.getTwillAppDir()));
TrackerService trackerService = new TrackerService(service);
new ApplicationMasterMain(service.getKafkaZKConnect())
http://git-wip-us.apache.org/repos/asf/twill/blob/21a3734d/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index 063bed2..9faadc2 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -59,7 +59,6 @@ import org.apache.twill.api.TwillRunResources;
import org.apache.twill.api.TwillSpecification;
import org.apache.twill.common.Threads;
import org.apache.twill.filesystem.Location;
-import org.apache.twill.internal.Configs;
import org.apache.twill.internal.Constants;
import org.apache.twill.internal.ContainerInfo;
import org.apache.twill.internal.DefaultTwillRunResources;
@@ -72,6 +71,7 @@ import org.apache.twill.internal.json.JvmOptionsCodec;
import org.apache.twill.internal.json.LocalFileCodec;
import org.apache.twill.internal.json.TwillRuntimeSpecificationAdapter;
import org.apache.twill.internal.state.Message;
+import org.apache.twill.internal.state.SystemMessages;
import org.apache.twill.internal.utils.Instances;
import org.apache.twill.internal.yarn.AbstractYarnTwillService;
import org.apache.twill.internal.yarn.YarnAMClient;
@@ -198,9 +198,10 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
appMasterContainerId.toString(),
Integer.parseInt(System.getenv(EnvKeys.YARN_CONTAINER_VIRTUAL_CORES)),
Integer.parseInt(System.getenv(EnvKeys.YARN_CONTAINER_MEMORY_MB)),
- appMasterHost, null, null);
+ appMasterHost, null);
String appId = appMasterContainerId.getApplicationAttemptId().getApplicationId().toString();
- return new RunningContainers(appId, appMasterResources, zkClient);
+ return new RunningContainers(appId, appMasterResources, zkClient,
+ applicationLocation, twillSpec.getRunnables().keySet());
}
private ExpectedContainers initExpectedContainers(TwillSpecification twillSpec) {
@@ -320,6 +321,10 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
return result;
}
+ if (handleLogLevelMessages(message, completion)) {
+ return result;
+ }
+
// Replicate messages to all runnables
if (message.getScope() == Message.Scope.ALL_RUNNABLE) {
runningContainers.sendToAll(message, completion);
@@ -649,7 +654,8 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
String runnableName = provisionRequest.getRuntimeSpec().getName();
LOG.info("Starting runnable {} with {}", runnableName, processLauncher);
- LOG.debug("Log level for Twill runnable {} is {}", runnableName, twillRuntimeSpec.getLogLevel());
+ LOG.debug("Log level for Twill runnable {} is {}", runnableName,
+ twillRuntimeSpec.getLogLevels().get(runnableName).get(Logger.ROOT_LOGGER_NAME));
int containerCount = expectedContainers.getExpected(runnableName);
@@ -669,8 +675,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
ZKClients.namespace(zkClient, getZKNamespace(runnableName)),
containerCount, jvmOpts, reservedMemory, getSecureStoreLocation());
- runningContainers.start(runnableName, processLauncher.getContainerInfo(), launcher,
- twillRuntimeSpec.getLogLevel());
+ runningContainers.start(runnableName, processLauncher.getContainerInfo(), launcher);
// Need to call complete to workaround bug in YARN AMRMClient
if (provisionRequest.containerAcquired()) {
@@ -725,7 +730,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
* @return {@code true} if the message requests a change in the number of running instances of a runnable,
* {@code false} otherwise.
*/
- private boolean handleSetInstances(final Message message, final Runnable completion) {
+ private boolean handleSetInstances(Message message, Runnable completion) {
if (message.getType() != Message.Type.SYSTEM || message.getScope() != Message.Scope.RUNNABLE) {
return false;
}
@@ -860,7 +865,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
*
* @return {@code true} if the message requests restarting some instances and {@code false} otherwise.
*/
- private boolean handleRestartRunnablesInstances(final Message message, final Runnable completion) {
+ private boolean handleRestartRunnablesInstances(Message message, Runnable completion) {
LOG.debug("Check if it should process a restart runnable instances.");
if (message.getType() != Message.Type.SYSTEM) {
@@ -951,4 +956,34 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
}
};
}
+
+ /**
+ * Attempt to change the log levels of a runnable or of all runnables.
+ *
+ * @return {@code true} if the message requests changing log levels and {@code false} otherwise.
+ */
+ private boolean handleLogLevelMessages(Message message, Runnable completion) {
+ Message.Scope scope = message.getScope();
+ if (message.getType() != Message.Type.SYSTEM ||
+ (scope != Message.Scope.RUNNABLE && scope != Message.Scope.ALL_RUNNABLE)) {
+ return false;
+ }
+
+ String command = message.getCommand().getCommand();
+ if (!command.equals(SystemMessages.LOG_LEVEL) && !command.equals(SystemMessages.RESET_LOG_LEVEL)) {
+ return false;
+ }
+
+ if (scope == Message.Scope.ALL_RUNNABLE) {
+ runningContainers.sendToAll(message, completion);
+ } else {
+ final String runnableName = message.getRunnableName();
+ if (runnableName == null || !twillSpec.getRunnables().containsKey(runnableName)) {
+ LOG.info("Unknown runnable {}", runnableName);
+ return false;
+ }
+ runningContainers.sendToRunnable(runnableName, message, completion);
+ }
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/twill/blob/21a3734d/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
index 2d49d02..e6234c3 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
@@ -17,6 +17,7 @@
*/
package org.apache.twill.internal.appmaster;
+import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashBasedTable;
@@ -29,26 +30,31 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multiset;
import com.google.common.collect.Table;
+import com.google.common.hash.Hashing;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.twill.api.ResourceReport;
import org.apache.twill.api.RunId;
import org.apache.twill.api.TwillRunResources;
import org.apache.twill.api.logging.LogEntry;
+import org.apache.twill.filesystem.Location;
+import org.apache.twill.internal.Constants;
import org.apache.twill.internal.ContainerExitCodes;
import org.apache.twill.internal.ContainerInfo;
import org.apache.twill.internal.ContainerLiveNodeData;
import org.apache.twill.internal.DefaultResourceReport;
import org.apache.twill.internal.DefaultTwillRunResources;
-import org.apache.twill.internal.EnvKeys;
import org.apache.twill.internal.RunIds;
import org.apache.twill.internal.TwillContainerController;
import org.apache.twill.internal.TwillContainerLauncher;
import org.apache.twill.internal.container.TwillContainerMain;
import org.apache.twill.internal.state.Message;
+import org.apache.twill.internal.state.SystemMessages;
import org.apache.twill.internal.yarn.YarnContainerStatus;
import org.apache.twill.zookeeper.NodeChildren;
import org.apache.twill.zookeeper.ZKClient;
@@ -56,18 +62,24 @@ import org.apache.twill.zookeeper.ZKOperations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
+import java.util.Collections;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import javax.annotation.Nullable;
/**
* A helper class for ApplicationMasterService to keep track of running containers and to interact
@@ -98,8 +110,12 @@ final class RunningContainers {
private final Condition containerChange;
private final ZKClient zkClient;
private final Multimap<String, ContainerInfo> containerStats;
+ private final Location applicationLocation;
+ private final Set<String> runnableNames;
+ private final Map<String, Map<String, String>> logLevels;
- RunningContainers(String appId, TwillRunResources appMasterResources, ZKClient zookeeperClient) {
+ RunningContainers(String appId, TwillRunResources appMasterResources, ZKClient zookeeperClient,
+ Location applicationLocation, Set<String> runnableNames) {
containers = HashBasedTable.create();
runnableInstances = Maps.newHashMap();
completedContainerCount = Maps.newHashMap();
@@ -109,6 +125,9 @@ final class RunningContainers {
resourceReport = new DefaultResourceReport(appId, appMasterResources);
zkClient = zookeeperClient;
containerStats = HashMultimap.create();
+ this.applicationLocation = applicationLocation;
+ this.runnableNames = runnableNames;
+ this.logLevels = new TreeMap<>();
}
/**
@@ -126,22 +145,22 @@ final class RunningContainers {
/**
* Start a container for a runnable.
*/
- void start(String runnableName, ContainerInfo containerInfo, TwillContainerLauncher launcher,
- LogEntry.Level logLevel) {
+ void start(String runnableName, ContainerInfo containerInfo, TwillContainerLauncher launcher) {
containerLock.lock();
try {
int instanceId = getStartInstanceId(runnableName);
RunId runId = getRunId(runnableName, instanceId);
TwillContainerController controller = launcher.start(runId, instanceId,
- TwillContainerMain.class, "$HADOOP_CONF_DIR");
+ TwillContainerMain.class, "$HADOOP_CONF_DIR",
+ saveLogLevels());
containers.put(runnableName, containerInfo.getId(), controller);
TwillRunResources resources = new DynamicTwillRunResources(instanceId,
containerInfo.getId(),
containerInfo.getVirtualCores(),
containerInfo.getMemoryMB(),
containerInfo.getHost().getHostName(),
- controller,
- logLevel);
+ controller);
+
resourceReport.addRunResources(runnableName, resources);
containerStats.put(runnableName, containerInfo);
@@ -298,10 +317,12 @@ final class RunningContainers {
void sendToAll(Message message, Runnable completion) {
containerLock.lock();
try {
+ for (String runnableName : runnableNames) {
+ checkAndUpdateLogLevels(message, runnableName);
+ }
if (containers.isEmpty()) {
completion.run();
}
-
// Sends the command to all running containers
AtomicInteger count = new AtomicInteger(containers.size());
for (Map.Entry<String, Map<String, TwillContainerController>> entry : containers.rowMap().entrySet()) {
@@ -318,6 +339,7 @@ final class RunningContainers {
List<TwillContainerController> controllers;
containerLock.lock();
try {
+ checkAndUpdateLogLevels(message, runnableName);
controllers = new ArrayList<>(containers.row(runnableName).values());
} finally {
containerLock.unlock();
@@ -575,18 +597,77 @@ final class RunningContainers {
return false;
}
+ private void checkAndUpdateLogLevels(Message message, String runnableName) {
+ String command = message.getCommand().getCommand();
+ if (message.getType() != Message.Type.SYSTEM || (!SystemMessages.LOG_LEVEL.equals(command) &&
+ !SystemMessages.RESET_LOG_LEVEL.equals(command))) {
+ return;
+ }
+
+ // Log levels are stored in tree maps to keep the ordering stable, since we compute the md5 of their JSON form
+ Map<String, String> messageOptions = message.getCommand().getOptions();
+ Map<String, String> runnableLogLevels = logLevels.get(runnableName);
+
+ if (SystemMessages.RESET_LOG_LEVEL.equals(command)) {
+ // Reset case, remove the log levels that were set before
+ if (runnableLogLevels != null) {
+ if (messageOptions.isEmpty()) {
+ logLevels.remove(runnableName);
+ } else {
+ runnableLogLevels.keySet().removeAll(messageOptions.keySet());
+ }
+ }
+ return;
+ }
+
+ if (runnableLogLevels == null) {
+ runnableLogLevels = new TreeMap<>();
+ logLevels.put(runnableName, runnableLogLevels);
+ }
+ runnableLogLevels.putAll(messageOptions);
+ }
+
+ private Location saveLogLevels() {
+ LOG.debug("save the log level file");
+ try {
+ Gson gson = new GsonBuilder().serializeNulls().create();
+ String jsonStr = gson.toJson(logLevels);
+ String fileName = Hashing.md5().hashString(jsonStr) + "." + Constants.Files.LOG_LEVELS;
+ Location location = applicationLocation.append(fileName);
+ if (!location.exists()) {
+ try (Writer writer = new OutputStreamWriter(location.getOutputStream(), Charsets.UTF_8)) {
+ writer.write(jsonStr);
+ }
+ }
+ LOG.debug("Done saving the log level file");
+ return location;
+ } catch (IOException e) {
+ LOG.error("Failed to save the log level file.");
+ return null;
+ }
+ }
+
/**
* A helper class that overrides the debug port and log levels with the live info from the container controller.
*/
- private static class DynamicTwillRunResources extends DefaultTwillRunResources {
+ private static final class DynamicTwillRunResources extends DefaultTwillRunResources {
+ private static final Function<String, LogEntry.Level> LOG_LEVEL_CONVERTER = new Function<String, LogEntry.Level>() {
+ @Nullable
+ @Override
+ public LogEntry.Level apply(@Nullable String logLevel) {
+ // The logLevel is always a valid LogEntry.Level enum, because that's what the user can set through the
+ // Twill Preparer or Controller API
+ return logLevel == null ? null : LogEntry.Level.valueOf(logLevel);
+ }
+ };
private final TwillContainerController controller;
private Integer dynamicDebugPort = null;
private DynamicTwillRunResources(int instanceId, String containerId,
int cores, int memoryMB, String host,
- TwillContainerController controller, LogEntry.Level logLevel) {
- super(instanceId, containerId, cores, memoryMB, host, null, logLevel);
+ TwillContainerController controller) {
+ super(instanceId, containerId, cores, memoryMB, host, null);
this.controller = controller;
}
@@ -605,5 +686,14 @@ final class RunningContainers {
}
return dynamicDebugPort;
}
+
+ @Override
+ public synchronized Map<String, LogEntry.Level> getLogLevels() {
+ ContainerLiveNodeData liveData = controller.getLiveNodeData();
+ if (liveData != null) {
+ return Maps.transformValues(liveData.getLogLevels(), LOG_LEVEL_CONVERTER);
+ }
+ return Collections.emptyMap();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/twill/blob/21a3734d/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
index 451db69..807e50f 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
@@ -19,9 +19,11 @@ package org.apache.twill.internal.container;
import com.google.common.base.Charsets;
import com.google.common.io.Files;
+import com.google.common.reflect.TypeToken;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.AbstractService;
-import com.google.common.util.concurrent.Service;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.security.Credentials;
@@ -29,7 +31,6 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.twill.api.RunId;
import org.apache.twill.api.TwillRunnableSpecification;
-import org.apache.twill.api.logging.LogEntry;
import org.apache.twill.discovery.ZKDiscoveryService;
import org.apache.twill.internal.Arguments;
import org.apache.twill.internal.BasicTwillContext;
@@ -54,6 +55,8 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.Reader;
+import java.util.HashMap;
+import java.util.Map;
/**
* The main class for launching a {@link TwillContainerService}.
@@ -61,25 +64,35 @@ import java.io.Reader;
public final class TwillContainerMain extends ServiceMain {
private static final Logger LOG = LoggerFactory.getLogger(TwillContainerMain.class);
- private static LogEntry.Level logLevel;
+
+ private final Map<String, String> logLevels = new HashMap<>();
/**
* Main method for launching a {@link TwillContainerService} which runs
* a {@link org.apache.twill.api.TwillRunnable}.
*/
- public static void main(final String[] args) throws Exception {
+ public static void main(String[] args) throws Exception {
+ new TwillContainerMain().doMain(args);
+ }
+
+ private void doMain(String[] args) throws Exception {
// Try to load the secure store from localized file, which AM requested RM to localize it for this container.
loadSecureStore();
-
File twillSpecFile = new File(Constants.Files.TWILL_SPEC);
TwillRuntimeSpecification twillRuntimeSpec = loadTwillSpec(twillSpecFile);
String zkConnectStr = twillRuntimeSpec.getZkConnectStr();
- RunId appRunId = twillRuntimeSpec.getTwillRunId();
+ RunId appRunId = twillRuntimeSpec.getTwillAppRunId();
RunId runId = RunIds.fromString(System.getenv(EnvKeys.TWILL_RUN_ID));
String runnableName = System.getenv(EnvKeys.TWILL_RUNNABLE_NAME);
- int instanceId = Integer.parseInt(System.getenv(EnvKeys.TWILL_INSTANCE_ID));
- int instanceCount = Integer.parseInt(System.getenv(EnvKeys.TWILL_INSTANCE_COUNT));
- logLevel = twillRuntimeSpec.getLogLevel();
+ int instanceId = Integer.valueOf(System.getenv(EnvKeys.TWILL_INSTANCE_ID));
+ int instanceCount = Integer.valueOf(System.getenv(EnvKeys.TWILL_INSTANCE_COUNT));
+ Map<String, String> defaultLogLevels = twillRuntimeSpec.getLogLevels().get(runnableName);
+ Map<String, String> dynamicLogLevels = loadLogLevels().get(runnableName);
+
+ logLevels.putAll(defaultLogLevels);
+ if (dynamicLogLevels != null) {
+ logLevels.putAll(dynamicLogLevels);
+ }
ZKClientService zkClientService = createZKClient(zkConnectStr, twillRuntimeSpec.getTwillAppName());
ZKDiscoveryService discoveryService = new ZKDiscoveryService(zkClientService);
@@ -100,11 +113,12 @@ public final class TwillContainerMain extends ServiceMain {
ZKClient containerZKClient = getContainerZKClient(zkClientService, appRunId, runnableName);
Configuration conf = new YarnConfiguration(new HdfsConfiguration(new Configuration()));
- Service service = new TwillContainerService(context, containerInfo, containerZKClient,
- runId, runnableSpec, getClassLoader(),
- createAppLocation(conf, twillRuntimeSpec.getFsUser(),
- twillRuntimeSpec.getTwillAppDir()));
- new TwillContainerMain().doMain(
+ TwillContainerService service = new TwillContainerService(context, containerInfo, containerZKClient,
+ runId, runnableSpec, getClassLoader(),
+ createAppLocation(conf, twillRuntimeSpec.getFsUser(),
+ twillRuntimeSpec.getTwillAppDir()),
+ defaultLogLevels, logLevels);
+ doMain(
service,
zkClientService,
new LogFlushService(),
@@ -115,7 +129,8 @@ public final class TwillContainerMain extends ServiceMain {
@Override
protected String getLoggerLevel(Logger logger) {
- return logLevel == null ? super.getLoggerLevel(logger) : logLevel.name();
+ String logLevel = logLevels.get(Logger.ROOT_LOGGER_NAME);
+ return logLevel == null ? super.getLoggerLevel(logger) : logLevel;
}
private static void loadSecureStore() throws IOException {
@@ -163,6 +178,17 @@ public final class TwillContainerMain extends ServiceMain {
}
}
+ /**
+  * Loads the log level overrides from the {@link Constants.Files#LOG_LEVELS} file, if that file exists.
+  *
+  * @return a map from runnable name to a map of logger name to log level; an empty map if the file
+  *         does not exist or contains no JSON document. Never {@code null}.
+  * @throws IOException if the file exists but cannot be read
+  */
+ private static Map<String, Map<String, String>> loadLogLevels() throws IOException {
+   File file = new File(Constants.Files.LOG_LEVELS);
+   if (!file.exists()) {
+     return new HashMap<>();
+   }
+   try (Reader reader = Files.newReader(file, Charsets.UTF_8)) {
+     Gson gson = new GsonBuilder().serializeNulls().create();
+     Map<String, Map<String, String>> levels =
+       gson.fromJson(reader, new TypeToken<Map<String, Map<String, String>>>() { }.getType());
+     // Gson returns null when the reader is already at EOF (e.g. an empty file). The caller indexes
+     // into the result right away, so return an empty map rather than null.
+     return levels == null ? new HashMap<String, Map<String, String>>() : levels;
+   }
+ }
+
private static Arguments decodeArgs() throws IOException {
return ArgumentsCodec.decode(Files.newReaderSupplier(new File(Constants.Files.ARGUMENTS), Charsets.UTF_8));
}
http://git-wip-us.apache.org/repos/asf/twill/blob/21a3734d/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
index 96031ce..cd62578 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
@@ -17,7 +17,10 @@
*/
package org.apache.twill.internal.container;
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.LoggerContext;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
@@ -25,20 +28,27 @@ import org.apache.twill.api.Command;
import org.apache.twill.api.RunId;
import org.apache.twill.api.TwillRunnable;
import org.apache.twill.api.TwillRunnableSpecification;
+import org.apache.twill.api.logging.LogEntry;
import org.apache.twill.common.Threads;
import org.apache.twill.filesystem.Location;
import org.apache.twill.internal.BasicTwillContext;
import org.apache.twill.internal.ContainerInfo;
import org.apache.twill.internal.ContainerLiveNodeData;
import org.apache.twill.internal.state.Message;
+import org.apache.twill.internal.state.SystemMessages;
import org.apache.twill.internal.utils.Instances;
import org.apache.twill.internal.yarn.AbstractYarnTwillService;
import org.apache.twill.zookeeper.ZKClient;
+import org.slf4j.ILoggerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import javax.annotation.Nullable;
/**
* This class act as a yarn container and run a {@link org.apache.twill.api.TwillRunnable}.
@@ -51,29 +61,35 @@ public final class TwillContainerService extends AbstractYarnTwillService {
private final ClassLoader classLoader;
private final BasicTwillContext context;
private final ContainerLiveNodeData containerLiveNodeData;
+ private final Map<String, String> oldLogLevels;
+ private final Map<String, String> defaultLogLevels;
private ExecutorService commandExecutor;
private TwillRunnable runnable;
- public TwillContainerService(BasicTwillContext context, ContainerInfo containerInfo, ZKClient zkClient,
- RunId runId, TwillRunnableSpecification specification, ClassLoader classLoader,
- Location applicationLocation) {
+ TwillContainerService(BasicTwillContext context, ContainerInfo containerInfo, ZKClient zkClient,
+ RunId runId, TwillRunnableSpecification specification, ClassLoader classLoader,
+ Location applicationLocation, Map<String, String> defaultLogLevels,
+ Map<String, String> logLevels) {
super(zkClient, runId, applicationLocation);
this.specification = specification;
this.classLoader = classLoader;
- this.containerLiveNodeData = createLiveNodeData(containerInfo);
+ this.defaultLogLevels = ImmutableMap.copyOf(defaultLogLevels);
+ this.oldLogLevels = new HashMap<>(defaultLogLevels);
+ this.containerLiveNodeData = createLiveNodeData(
+ containerInfo, isLoggerContext() ? logLevels : Collections.<String, String>emptyMap());
this.context = context;
}
- private ContainerLiveNodeData createLiveNodeData(ContainerInfo containerInfo) {
+ private ContainerLiveNodeData createLiveNodeData(ContainerInfo containerInfo,
+ Map<String, String> logLevels) {
// if debugging is enabled, log the port and register it in service discovery.
String debugPort = System.getProperty("twill.debug.port");
if (debugPort != null) {
LOG.info("JVM is listening for debugger on port {}", debugPort);
}
return new ContainerLiveNodeData(containerInfo.getId(),
- containerInfo.getHost().getCanonicalHostName(),
- debugPort);
+ containerInfo.getHost().getCanonicalHostName(), debugPort, logLevels);
}
@Override
@@ -96,6 +112,46 @@ public final class TwillContainerService extends AbstractYarnTwillService {
context.setInstanceCount(Integer.parseInt(command.getOptions().get("count")));
}
+ String commandStr = command.getCommand();
+ if (message.getType() == Message.Type.SYSTEM &&
+ (SystemMessages.LOG_LEVEL.equals(commandStr) || SystemMessages.RESET_LOG_LEVEL.equals(commandStr))) {
+ if (SystemMessages.RESET_LOG_LEVEL.equals(commandStr)) {
+ Map<String, String> loggerNames = command.getOptions();
+ // Reset
+ for (Map.Entry<String, String> entry : oldLogLevels.entrySet()) {
+ String loggerName = entry.getKey();
+ // logger name is empty if we are resetting all loggers.
+ if (loggerNames.isEmpty() || loggerNames.containsKey(loggerName)) {
+ String oldLogLevel = entry.getValue();
+
+ setLogLevel(loggerName, oldLogLevel);
+ if (oldLogLevel == null || !defaultLogLevels.containsKey(loggerName)) {
+ containerLiveNodeData.removeLogLevel(loggerName);
+ } else {
+ containerLiveNodeData.setLogLevel(loggerName, oldLogLevel);
+ }
+ }
+ }
+ } else {
+ // Set log levels
+ for (Map.Entry<String, String> entry : command.getOptions().entrySet()) {
+ String loggerName = entry.getKey();
+ String logLevel = entry.getValue();
+
+ // Setting the log level in logging system as well as in the container live node
+ String oldLogLevel = setLogLevel(loggerName, logLevel);
+ containerLiveNodeData.setLogLevel(loggerName, logLevel);
+
+ if (!oldLogLevels.containsKey(loggerName)) {
+ String defaultLogLevel = defaultLogLevels.get(loggerName);
+ oldLogLevels.put(loggerName, defaultLogLevel == null ? oldLogLevel : defaultLogLevel);
+ }
+ }
+ }
+
+ updateLiveNode();
+ }
+
commandExecutor.execute(new Runnable() {
@Override
@@ -114,6 +170,10 @@ public final class TwillContainerService extends AbstractYarnTwillService {
@SuppressWarnings("unchecked")
@Override
protected void doStart() throws Exception {
+ for (Map.Entry<String, String> entry : containerLiveNodeData.getLogLevels().entrySet()) {
+ setLogLevel(entry.getKey(), entry.getValue());
+ }
+
commandExecutor = Executors.newSingleThreadExecutor(
Threads.createDaemonThreadFactory("runnable-command-executor"));
@@ -152,4 +212,35 @@ public final class TwillContainerService extends AbstractYarnTwillService {
LOG.error("Exception when stopping runnable.", t);
}
}
+
+ /**
+  * Tells whether the SLF4J logger factory in use is a logback {@link LoggerContext}.
+  */
+ private boolean isLoggerContext() {
+   return LoggerFactory.getILoggerFactory() instanceof LoggerContext;
+ }
+
+ /**
+  * Applies a log level to the named logger and reports the level the logger had before the change.
+  *
+  * @param loggerName name of the logger to modify
+  * @param logLevel name of the level to apply, resolved through {@code Level.toLevel} with
+  *                 {@code ERROR} as the fallback; {@code null} clears the level set on the logger
+  * @return the level that was set on the logger before this call, or {@code null} if no level was set
+  *         on it or if the logging implementation is not logback (in which case nothing is changed)
+  */
+ @Nullable
+ private String setLogLevel(String loggerName, @Nullable String logLevel) {
+   ILoggerFactory factory = LoggerFactory.getILoggerFactory();
+   if (factory instanceof LoggerContext) {
+     ch.qos.logback.classic.Logger target = ((LoggerContext) factory).getLogger(loggerName);
+
+     // Capture the level before touching the logger so it can be handed back to the caller.
+     Level levelBefore = target.getLevel();
+     LogEntry.Level previous = null;
+     if (levelBefore != null) {
+       previous = LogEntry.Level.valueOf(levelBefore.toString());
+     }
+     LOG.debug("Log level of {} changed from {} to {}", loggerName, previous, logLevel);
+
+     Level levelAfter = null;
+     if (logLevel != null) {
+       levelAfter = Level.toLevel(logLevel, Level.ERROR);
+     }
+     target.setLevel(levelAfter);
+
+     return previous == null ? null : previous.name();
+   }
+
+   LOG.error("LoggerFactory is not a logback LoggerContext, cannot make the log level change");
+   return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/twill/blob/21a3734d/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
index 811aa04..2aa24ab 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
@@ -130,11 +130,11 @@ final class YarnTwillPreparer implements TwillPreparer {
private final List<String> applicationClassPaths = Lists.newArrayList();
private final Credentials credentials;
private final int reservedMemory;
+ private final Map<String, Map<String, String>> logLevels = new HashMap<>();
private String schedulerQueue;
private String extraOptions;
private JvmOptions.DebugOptions debugOptions = JvmOptions.DebugOptions.NO_DEBUG;
private ClassAcceptor classAcceptor;
- private LogEntry.Level logLevel;
YarnTwillPreparer(YarnConfiguration yarnConfig, TwillSpecification twillSpec,
YarnAppClient yarnAppClient, String zkConnectString,
@@ -151,8 +151,8 @@ final class YarnTwillPreparer implements TwillPreparer {
this.reservedMemory = yarnConfig.getInt(Configs.Keys.JAVA_RESERVED_MEMORY_MB,
Configs.Defaults.JAVA_RESERVED_MEMORY_MB);
this.extraOptions = extraOptions;
- this.logLevel = logLevel;
this.classAcceptor = new ClassAcceptor();
+ saveLogLevels(logLevel);
}
@Override
@@ -296,8 +296,27 @@ final class YarnTwillPreparer implements TwillPreparer {
@Override
public TwillPreparer setLogLevel(LogEntry.Level logLevel) {
- Preconditions.checkNotNull(logLevel);
- this.logLevel = logLevel;
+ return setLogLevels(ImmutableMap.of(Logger.ROOT_LOGGER_NAME, logLevel));
+ }
+
+ @Override
+ public TwillPreparer setLogLevels(Map<String, LogEntry.Level> logLevels) {
+ Preconditions.checkNotNull(logLevels);
+ for (String runnableName : twillSpec.getRunnables().keySet()) {
+ saveLogLevels(runnableName, logLevels);
+ }
+ return this;
+ }
+
+ @Override
+ public TwillPreparer setLogLevels(String runnableName, Map<String, LogEntry.Level> runnableLogLevels) {
+   Preconditions.checkNotNull(runnableName);
+   Preconditions.checkArgument(twillSpec.getRunnables().containsKey(runnableName),
+                               "Runnable %s is not defined in the application.", runnableName);
+   Preconditions.checkNotNull(runnableLogLevels);
+   // Validate the levels supplied by the caller. The preparer's own "logLevels" field is keyed by
+   // runnable name and never holds a null value, so checking it here would verify nothing.
+   Preconditions.checkArgument(!(runnableLogLevels.containsKey(Logger.ROOT_LOGGER_NAME)
+                                 && runnableLogLevels.get(Logger.ROOT_LOGGER_NAME) == null),
+                               "Log level of the root logger cannot be null.");
+   saveLogLevels(runnableName, runnableLogLevels);
    return this;
  }
@@ -310,7 +329,6 @@ final class YarnTwillPreparer implements TwillPreparer {
new Callable<ProcessController<YarnApplicationReport>>() {
@Override
public ProcessController<YarnApplicationReport> call() throws Exception {
- String fsUser = locationFactory.getHomeLocation().getName();
// Local files needed by AM
Map<String, LocalFile> localFiles = Maps.newHashMap();
@@ -339,8 +357,6 @@ final class YarnTwillPreparer implements TwillPreparer {
// org.apache.twill.internal.appmaster.ApplicationMasterMain
// false
- LOG.debug("Log level is set to {} for the Twill application.", logLevel);
-
int memory = Resources.computeMaxHeapSize(appMasterInfo.getMemoryMB(),
Constants.APP_MASTER_RESERVED_MEMORY_MB, Constants.HEAP_MIN_RATIO);
return launcher.prepareLaunch(ImmutableMap.<String, String>of(), localFiles.values(), credentials)
@@ -384,6 +400,24 @@ final class YarnTwillPreparer implements TwillPreparer {
}
}
+ /**
+  * Records the given level as the root logger level of every runnable in the application,
+  * replacing whatever levels were previously saved for each of them.
+  *
+  * @param level the root logger level; must not be {@code null}
+  */
+ private void saveLogLevels(LogEntry.Level level) {
+   String rootLevelName = Preconditions.checkNotNull(level).name();
+   Map<String, String> rootOnly = new HashMap<>();
+   rootOnly.put(Logger.ROOT_LOGGER_NAME, rootLevelName);
+   // One map instance is registered under every runnable name.
+   for (String name : twillSpec.getRunnables().keySet()) {
+     logLevels.put(name, rootOnly);
+   }
+ }
+
+ /**
+  * Records the given log levels for one runnable, replacing any levels previously saved for it.
+  *
+  * @param runnableName name of the runnable the levels apply to
+  * @param logLevels map from logger name to the level to use; no value may be {@code null}
+  * @throws IllegalArgumentException if any logger is mapped to a {@code null} level
+  */
+ private void saveLogLevels(String runnableName, Map<String, LogEntry.Level> logLevels) {
+   Map<String, String> newLevels = new HashMap<>();
+   for (Map.Entry<String, LogEntry.Level> entry : logLevels.entrySet()) {
+     // Guava's Preconditions substitutes only %s placeholders; an SLF4J-style "{}" is left verbatim.
+     Preconditions.checkArgument(entry.getValue() != null, "Log level cannot be null for logger %s", entry.getKey());
+     newLevels.put(entry.getKey(), entry.getValue().name());
+   }
+   // The parameter shadows the field, hence the explicit "this".
+   this.logLevels.put(runnableName, newLevels);
+ }
+
private Credentials createCredentials() {
Credentials credentials = new Credentials();
@@ -521,7 +555,7 @@ final class YarnTwillPreparer implements TwillPreparer {
new TwillRuntimeSpecification(newTwillSpec, locationFactory.getHomeLocation().getName(),
getAppLocation().toURI(), zkConnectString, runId, twillSpec.getName(),
reservedMemory, yarnConfig.get(YarnConfiguration.RM_SCHEDULER_ADDRESS),
- logLevel), writer);
+ logLevels), writer);
}
LOG.debug("Done {}", Constants.Files.TWILL_SPEC);