You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2016/07/15 22:49:19 UTC
hive git commit: HIVE-9756: LLAP: use log4j 2 for llap (log to
separate files, etc.) (Prasanth Jayachandran reviewed by Siddharth Seth)
Repository: hive
Updated Branches:
refs/heads/master 8c11d370f -> 04597681d
HIVE-9756: LLAP: use log4j 2 for llap (log to separate files, etc.) (Prasanth Jayachandran reviewed by Siddharth Seth)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/04597681
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/04597681
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/04597681
Branch: refs/heads/master
Commit: 04597681d296e7121cbd71af6408a11681bd4c80
Parents: 8c11d37
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Fri Jul 15 15:49:05 2016 -0700
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Fri Jul 15 15:49:05 2016 -0700
----------------------------------------------------------------------
llap-server/bin/llap-daemon-env.sh | 2 +-
.../llap/daemon/impl/ContainerRunnerImpl.java | 20 +-
.../hive/llap/daemon/impl/LlapDaemon.java | 2 +
.../daemon/impl/StatsRecordingThreadPool.java | 41 ++++
.../llap/daemon/impl/TaskRunnerCallable.java | 194 ++++++++++---------
.../hive/llap/io/api/impl/LlapInputFormat.java | 11 +-
.../resources/llap-daemon-log4j2.properties | 47 ++++-
llap-server/src/main/resources/package.py | 2 +
llap-server/src/main/resources/templates.py | 2 +-
.../hive/llap/tezplugins/LlapTezUtils.java | 14 +-
10 files changed, 227 insertions(+), 108 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/04597681/llap-server/bin/llap-daemon-env.sh
----------------------------------------------------------------------
diff --git a/llap-server/bin/llap-daemon-env.sh b/llap-server/bin/llap-daemon-env.sh
index 02c4315..14cab3d 100755
--- a/llap-server/bin/llap-daemon-env.sh
+++ b/llap-server/bin/llap-daemon-env.sh
@@ -32,7 +32,7 @@
#export LLAP_DAEMON_USER_CLASSPATH=
# Logger setup for LLAP daemon
-#export LLAP_DAEMON_LOGGER=RFA
+#export LLAP_DAEMON_LOGGER=query-routing
# Log level for LLAP daemon
#export LLAP_DAEMON_LOG_LEVEL=INFO
http://git-wip-us.apache.org/repos/asf/hive/blob/04597681/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 2f9dea0..103115e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexOrB
import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
import org.apache.hadoop.hive.llap.security.LlapSignerImpl;
import org.apache.hadoop.hive.llap.tez.Converters;
+import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
@@ -62,6 +63,7 @@ import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
+import org.apache.log4j.MDC;
import org.apache.log4j.NDC;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.TokenCache;
@@ -185,8 +187,19 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber());
// This is the start of container-annotated logging.
- // TODO Reduce the length of this string. Way too verbose at the moment.
- NDC.push(fragmentIdString);
+ final String dagId = attemptId.getTaskID().getVertexID().getDAGId().toString();
+ final String queryId = vertex.getHiveQueryId();
+ final String fragId = LlapTezUtils.stripAttemptPrefix(fragmentIdString);
+ MDC.put("dagId", dagId);
+ MDC.put("queryId", queryId);
+ MDC.put("fragmentId", fragId);
+ // TODO: Ideally we want tez to use CallableWithMdc that retains the MDC for threads created in
+ // thread pool. For now, we will push both dagId and queryId into NDC and the custom thread
+ // pool that we use for task execution and llap io (StatsRecordingThreadPool) will pop them
+ // using reflection and update the MDC.
+ NDC.push(dagId);
+ NDC.push(queryId);
+ NDC.push(fragId);
Scheduler.SubmissionState submissionState;
SubmitWorkResponseProto.Builder responseBuilder = SubmitWorkResponseProto.newBuilder();
try {
@@ -246,7 +259,8 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
metrics.incrExecutorTotalRequestsHandled();
}
} finally {
- NDC.pop();
+ MDC.clear();
+ NDC.clear();
}
responseBuilder.setSubmissionState(SubmissionStateProto.valueOf(submissionState.name()));
http://git-wip-us.apache.org/repos/asf/hive/blob/04597681/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index c7e9d32..91b8727 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -287,6 +287,8 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
URL llap_l4j2 = LlapDaemon.class.getClassLoader().getResource(LOG4j2_PROPERTIES_FILE);
if (llap_l4j2 != null) {
final boolean async = LogUtils.checkAndSetAsyncLogging(conf);
+ // required for MDC based routing appender so that child threads can inherit the MDC context
+ System.setProperty("isThreadContextMapInheritable", "true");
Configurator.initialize("LlapDaemonLog4j2", llap_l4j2.toString());
long end = System.currentTimeMillis();
LOG.warn("LLAP daemon logging initialized from {} in {} ms. Async: {}",
http://git-wip-us.apache.org/repos/asf/hive/blob/04597681/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java
index 9b3ce7e..363b9b1 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java
@@ -17,8 +17,10 @@
*/
package org.apache.hadoop.hive.llap.daemon.impl;
+import java.lang.reflect.Field;
import java.util.List;
import java.util.Map;
+import java.util.Stack;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
@@ -30,6 +32,9 @@ import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.llap.LlapUtil;
import org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader;
+import org.apache.log4j.MDC;
+import org.apache.log4j.NDC;
+import org.apache.tez.common.CallableWithNdc;
import org.apache.tez.common.counters.FileSystemCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.runtime.task.TaskRunner2Callable;
@@ -100,10 +105,46 @@ public class StatsRecordingThreadPool extends ThreadPoolExecutor {
// clone thread local file system statistics
List<LlapUtil.StatisticsData> statsBefore = LlapUtil.cloneThreadLocalFileSystemStatistics();
+ setupMDCFromNDC(actualCallable);
try {
return actualCallable.call();
} finally {
updateFileSystemCounters(statsBefore, actualCallable);
+ MDC.clear();
+ }
+ }
+
+ private void setupMDCFromNDC(final Callable<V> actualCallable) {
+ if (actualCallable instanceof CallableWithNdc) {
+ CallableWithNdc callableWithNdc = (CallableWithNdc) actualCallable;
+ try {
+ // CallableWithNdc inherits from NDC only when call() is invoked. CallableWithNdc has to be
+ // extended to provide access to its ndcStack that is cloned during creation. Until then,
+ // we will use reflection to access the private field.
+ // FIXME: HIVE-14243 follow-up to remove this reflection
+ Field field = callableWithNdc.getClass().getSuperclass().getDeclaredField("ndcStack");
+ field.setAccessible(true);
+ Stack ndcStack = (Stack) field.get(callableWithNdc);
+
+ final Stack clonedStack = (Stack) ndcStack.clone();
+ final String fragmentId = (String) clonedStack.pop();
+ final String queryId = (String) clonedStack.pop();
+ final String dagId = (String) clonedStack.pop();
+ MDC.put("dagId", dagId);
+ MDC.put("queryId", queryId);
+ MDC.put("fragmentId", fragmentId);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received dagId: {} queryId: {} instanceType: {}",
+ dagId, queryId, actualCallable.getClass().getSimpleName());
+ }
+ } catch (Exception e) {
+ LOG.warn("Not setting up MDC as NDC stack cannot be accessed reflectively for" +
+ " instance type: {} exception type: {}",
+ actualCallable.getClass().getSimpleName(), e.getClass().getSimpleName());
+ }
+ } else {
+ LOG.warn("Not setting up MDC as unknown callable instance type received: {}",
+ actualCallable.getClass().getSimpleName());
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/04597681/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index fb64f0b..87bd5c8 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.Map;
+import java.util.Stack;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -50,6 +51,8 @@ import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.log4j.MDC;
+import org.apache.log4j.NDC;
import org.apache.tez.common.CallableWithNdc;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.security.JobTokenIdentifier;
@@ -166,109 +169,126 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
@Override
protected TaskRunner2Result callInternal() throws Exception {
- isStarted.set(true);
+ setMDCFromNDC();
- this.startTime = System.currentTimeMillis();
- this.threadName = Thread.currentThread().getName();
- if (LOG.isDebugEnabled()) {
- LOG.debug("canFinish: " + taskSpec.getTaskAttemptID() + ": " + canFinish());
- }
-
- // Unregister from the AMReporter, since the task is now running.
- this.amReporter.unregisterTask(request.getAmHost(), request.getAmPort());
+ try {
+ isStarted.set(true);
- synchronized (this) {
- if (!shouldRunTask) {
- LOG.info("Not starting task {} since it was killed earlier", taskSpec.getTaskAttemptID());
- return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, null, false);
+ this.startTime = System.currentTimeMillis();
+ this.threadName = Thread.currentThread().getName();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("canFinish: " + taskSpec.getTaskAttemptID() + ": " + canFinish());
}
- }
- // TODO This executor seems unnecessary. Here and TezChild
- executor = new StatsRecordingThreadPool(1, 1,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>(),
- new ThreadFactoryBuilder()
- .setDaemon(true)
- .setNameFormat("TezTaskRunner")
- .build());
-
- // TODO Consolidate this code with TezChild.
- runtimeWatch.start();
- if (taskUgi == null) {
- taskUgi = UserGroupInformation.createRemoteUser(vertex.getUser());
- }
- taskUgi.addCredentials(credentials);
-
- Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<>();
- serviceConsumerMetadata.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID,
- TezCommonUtils.convertJobTokenToBytes(jobToken));
- Multimap<String, String> startedInputsMap = createStartedInputMap(vertex);
-
- UserGroupInformation taskOwner =
- UserGroupInformation.createRemoteUser(vertex.getTokenIdentifier());
- final InetSocketAddress address =
- NetUtils.createSocketAddrForHost(request.getAmHost(), request.getAmPort());
- SecurityUtil.setTokenService(jobToken, address);
- taskOwner.addToken(jobToken);
- umbilical = taskOwner.doAs(new PrivilegedExceptionAction<LlapTaskUmbilicalProtocol>() {
- @Override
- public LlapTaskUmbilicalProtocol run() throws Exception {
- return RPC.getProxy(LlapTaskUmbilicalProtocol.class,
- LlapTaskUmbilicalProtocol.versionID, address, conf);
- }
- });
-
- String fragmentId = LlapTezUtils.stripAttemptPrefix(taskSpec.getTaskAttemptID().toString());
- taskReporter = new LlapTaskReporter(
- umbilical,
- confParams.amHeartbeatIntervalMsMax,
- confParams.amCounterHeartbeatInterval,
- confParams.amMaxEventsPerHeartbeat,
- new AtomicLong(0),
- request.getContainerIdString(),
- fragmentId,
- initialEvent);
-
- String attemptId = fragmentInfo.getFragmentIdentifierString();
- IOContextMap.setThreadAttemptId(attemptId);
- try {
+ // Unregister from the AMReporter, since the task is now running.
+ this.amReporter.unregisterTask(request.getAmHost(), request.getAmPort());
+
synchronized (this) {
- if (shouldRunTask) {
- taskRunner = new TezTaskRunner2(conf, taskUgi, fragmentInfo.getLocalDirs(),
- taskSpec,
- vertex.getQueryIdentifier().getAppAttemptNumber(),
- serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor,
- objectRegistry,
- pid,
- executionContext, memoryAvailable, false, tezHadoopShim);
+ if (!shouldRunTask) {
+ LOG.info("Not starting task {} since it was killed earlier", taskSpec.getTaskAttemptID());
+ return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, null, false);
}
}
- if (taskRunner == null) {
- LOG.info("Not starting task {} since it was killed earlier", taskSpec.getTaskAttemptID());
- return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, null, false);
- }
+ // TODO This executor seems unnecessary. Here and TezChild
+ executor = new StatsRecordingThreadPool(1, 1,
+ 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat("TezTaskRunner")
+ .build());
+
+ // TODO Consolidate this code with TezChild.
+ runtimeWatch.start();
+ if (taskUgi == null) {
+ taskUgi = UserGroupInformation.createRemoteUser(vertex.getUser());
+ }
+ taskUgi.addCredentials(credentials);
+
+ Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<>();
+ serviceConsumerMetadata.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID,
+ TezCommonUtils.convertJobTokenToBytes(jobToken));
+ Multimap<String, String> startedInputsMap = createStartedInputMap(vertex);
+
+ UserGroupInformation taskOwner =
+ UserGroupInformation.createRemoteUser(vertex.getTokenIdentifier());
+ final InetSocketAddress address =
+ NetUtils.createSocketAddrForHost(request.getAmHost(), request.getAmPort());
+ SecurityUtil.setTokenService(jobToken, address);
+ taskOwner.addToken(jobToken);
+ umbilical = taskOwner.doAs(new PrivilegedExceptionAction<LlapTaskUmbilicalProtocol>() {
+ @Override
+ public LlapTaskUmbilicalProtocol run() throws Exception {
+ return RPC.getProxy(LlapTaskUmbilicalProtocol.class,
+ LlapTaskUmbilicalProtocol.versionID, address, conf);
+ }
+ });
+
+ String fragmentId = LlapTezUtils.stripAttemptPrefix(taskSpec.getTaskAttemptID().toString());
+ taskReporter = new LlapTaskReporter(
+ umbilical,
+ confParams.amHeartbeatIntervalMsMax,
+ confParams.amCounterHeartbeatInterval,
+ confParams.amMaxEventsPerHeartbeat,
+ new AtomicLong(0),
+ request.getContainerIdString(),
+ fragmentId,
+ initialEvent);
+
+ String attemptId = fragmentInfo.getFragmentIdentifierString();
+ IOContextMap.setThreadAttemptId(attemptId);
try {
- TaskRunner2Result result = taskRunner.run();
- if (result.isContainerShutdownRequested()) {
- LOG.warn("Unexpected container shutdown requested while running task. Ignoring");
+ synchronized (this) {
+ if (shouldRunTask) {
+ taskRunner = new TezTaskRunner2(conf, taskUgi, fragmentInfo.getLocalDirs(),
+ taskSpec,
+ vertex.getQueryIdentifier().getAppAttemptNumber(),
+ serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor,
+ objectRegistry,
+ pid,
+ executionContext, memoryAvailable, false, tezHadoopShim);
+ }
}
- isCompleted.set(true);
- return result;
- } finally {
- FileSystem.closeAllForUGI(taskUgi);
- LOG.info("ExecutionTime for Container: " + request.getContainerIdString() + "=" +
- runtimeWatch.stop().elapsedMillis());
- if (LOG.isDebugEnabled()) {
- LOG.debug("canFinish post completion: " + taskSpec.getTaskAttemptID() + ": " + canFinish());
+ if (taskRunner == null) {
+ LOG.info("Not starting task {} since it was killed earlier", taskSpec.getTaskAttemptID());
+ return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, null, false);
+ }
+
+ try {
+ TaskRunner2Result result = taskRunner.run();
+ if (result.isContainerShutdownRequested()) {
+ LOG.warn("Unexpected container shutdown requested while running task. Ignoring");
+ }
+ isCompleted.set(true);
+ return result;
+ } finally {
+ FileSystem.closeAllForUGI(taskUgi);
+ LOG.info("ExecutionTime for Container: " + request.getContainerIdString() + "=" +
+ runtimeWatch.stop().elapsedMillis());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "canFinish post completion: " + taskSpec.getTaskAttemptID() + ": " + canFinish());
+ }
}
+ } finally {
+ IOContextMap.clearThreadAttempt(attemptId);
}
} finally {
- IOContextMap.clearThreadAttempt(attemptId);
+ MDC.clear();
}
}
+ private void setMDCFromNDC() {
+ final Stack<String> clonedNDC = NDC.cloneStack();
+ final String fragId = clonedNDC.pop();
+ final String queryId = clonedNDC.pop();
+ final String dagId = clonedNDC.pop();
+ MDC.put("dagId", dagId);
+ MDC.put("queryId", queryId);
+ MDC.put("fragmentId", fragId);
+ }
+
/**
* Attempt to kill a running task. If the task has not started running, it will not start.
* If it's already running, a kill request will be sent to it.
http://git-wip-us.apache.org/repos/asf/hive/blob/04597681/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
index cc4e10b..c5d0680 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
@@ -42,6 +42,7 @@ import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.ConsumerFeedback;
import org.apache.hadoop.hive.llap.counters.FragmentCountersMap;
import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
@@ -84,8 +85,10 @@ import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hive.common.util.HiveStringUtils;
import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.runtime.api.impl.TaskSpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowBatch>,
VectorizedInputFormatInterface, SelfDescribingInputFormatInterface,
@@ -189,9 +192,14 @@ public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowB
this.columnIds = includedCols;
this.sarg = ConvertAstToSearchArg.createFromConf(job);
this.columnNames = ColumnProjectionUtils.getReadColumnNames(job);
- String fragmentId = LlapTezUtils.getFragmentId(job);
+ final String fragmentId = LlapTezUtils.getFragmentId(job);
+ final String dagId = LlapTezUtils.getDagId(job);
+ final String queryId = HiveConf.getVar(job, HiveConf.ConfVars.HIVEQUERYID);
+ MDC.put("dagId", dagId);
+ MDC.put("queryId", queryId);
TezCounters taskCounters = null;
if (fragmentId != null) {
+ MDC.put("fragmentId", fragmentId);
taskCounters = FragmentCountersMap.getCountersForFragment(fragmentId);
LOG.info("Received fragment id: {}", fragmentId);
} else {
@@ -341,6 +349,7 @@ public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowB
LlapIoImpl.LOG.info("Llap counters: {}" ,counters); // This is where counters are logged!
feedback.stop();
rethrowErrorIfAny();
+ MDC.clear();
}
private void rethrowErrorIfAny() throws IOException {
http://git-wip-us.apache.org/repos/asf/hive/blob/04597681/llap-server/src/main/resources/llap-daemon-log4j2.properties
----------------------------------------------------------------------
diff --git a/llap-server/src/main/resources/llap-daemon-log4j2.properties b/llap-server/src/main/resources/llap-daemon-log4j2.properties
index 7488ba2..1a0387c 100644
--- a/llap-server/src/main/resources/llap-daemon-log4j2.properties
+++ b/llap-server/src/main/resources/llap-daemon-log4j2.properties
@@ -25,26 +25,29 @@ property.llap.daemon.log.dir = .
property.llap.daemon.log.file = llapdaemon.log
property.llap.daemon.historylog.file = llapdaemon_history.log
property.llap.daemon.log.maxfilesize = 256MB
-property.llap.daemon.log.maxbackupindex = 20
+property.llap.daemon.log.maxbackupindex = 240
# list of all appenders
-appenders = console, RFA, HISTORYAPPENDER
+appenders = console, RFA, HISTORYAPPENDER, dag-routing, query-routing
# console appender
appender.console.type = Console
appender.console.name = console
appender.console.target = SYSTEM_ERR
appender.console.layout.type = PatternLayout
-appender.console.layout.pattern = %d{ISO8601} %5p [%t%x] %c{2}: %m%n
+appender.console.layout.pattern = %d{ISO8601} %5p [%t (%X{fragmentId})] %c{2}: %m%n
# rolling file appender
appender.RFA.type = RollingRandomAccessFile
appender.RFA.name = RFA
appender.RFA.fileName = ${sys:llap.daemon.log.dir}/${sys:llap.daemon.log.file}
-appender.RFA.filePattern = ${sys:llap.daemon.log.dir}/${sys:llap.daemon.log.file}_%i
+appender.RFA.filePattern = ${sys:llap.daemon.log.dir}/${sys:llap.daemon.log.file}_%d{yyyy-MM-dd-HH}_%i
appender.RFA.layout.type = PatternLayout
-appender.RFA.layout.pattern = %d{ISO8601} %5p [%t%x] %c{2}: %m%n
+appender.RFA.layout.pattern = %d{ISO8601} %-5p [%t (%X{fragmentId})] %c: %m%n
appender.RFA.policies.type = Policies
+appender.RFA.policies.time.type = TimeBasedTriggeringPolicy
+appender.RFA.policies.time.interval = 1
+appender.RFA.policies.time.modulate = true
appender.RFA.policies.size.type = SizeBasedTriggeringPolicy
appender.RFA.policies.size.size = ${sys:llap.daemon.log.maxfilesize}
appender.RFA.strategy.type = DefaultRolloverStrategy
@@ -63,6 +66,40 @@ appender.HISTORYAPPENDER.policies.size.size = ${sys:llap.daemon.log.maxfilesize}
appender.HISTORYAPPENDER.strategy.type = DefaultRolloverStrategy
appender.HISTORYAPPENDER.strategy.max = ${sys:llap.daemon.log.maxbackupindex}
+# dagId based routing file appender
+appender.dag-routing.type = Routing
+appender.dag-routing.name = dag-routing
+appender.dag-routing.routes.type = Routes
+appender.dag-routing.routes.pattern = $${ctx:dagId}
+# default route
+appender.dag-routing.routes.route-default.type = Route
+appender.dag-routing.routes.route-default.key = $${ctx:dagId}
+appender.dag-routing.routes.route-default.ref = RFA
+# dagId based route
+appender.dag-routing.routes.route-mdc.type = Route
+appender.dag-routing.routes.route-mdc.file-mdc.type = RandomAccessFile
+appender.dag-routing.routes.route-mdc.file-mdc.name = file-mdc
+appender.dag-routing.routes.route-mdc.file-mdc.fileName = ${sys:llap.daemon.log.dir}/${ctx:dagId}.log
+appender.dag-routing.routes.route-mdc.file-mdc.layout.type = PatternLayout
+appender.dag-routing.routes.route-mdc.file-mdc.layout.pattern = %d{ISO8601} %5p [%t (%X{fragmentId})] %c{2}: %m%n
+
+# queryId based routing file appender
+appender.query-routing.type = Routing
+appender.query-routing.name = query-routing
+appender.query-routing.routes.type = Routes
+appender.query-routing.routes.pattern = $${ctx:queryId}
+# default route
+appender.query-routing.routes.route-default.type = Route
+appender.query-routing.routes.route-default.key = $${ctx:queryId}
+appender.query-routing.routes.route-default.ref = RFA
+# queryId based route
+appender.query-routing.routes.route-mdc.type = Route
+appender.query-routing.routes.route-mdc.file-mdc.type = RandomAccessFile
+appender.query-routing.routes.route-mdc.file-mdc.name = file-mdc
+appender.query-routing.routes.route-mdc.file-mdc.fileName = ${sys:llap.daemon.log.dir}/${ctx:queryId}.log
+appender.query-routing.routes.route-mdc.file-mdc.layout.type = PatternLayout
+appender.query-routing.routes.route-mdc.file-mdc.layout.pattern = %d{ISO8601} %5p [%t (%X{fragmentId})] %c{2}: %m%n
+
# list of all loggers
loggers = NIOServerCnxn, ClientCnxnSocketNIO, DataNucleus, Datastore, JPOX, HistoryLogger, LlapIoImpl, LlapIoOrc, LlapIoCache, LlapIoLocking
http://git-wip-us.apache.org/repos/asf/hive/blob/04597681/llap-server/src/main/resources/package.py
----------------------------------------------------------------------
diff --git a/llap-server/src/main/resources/package.py b/llap-server/src/main/resources/package.py
index 83fe918..a200414 100644
--- a/llap-server/src/main/resources/package.py
+++ b/llap-server/src/main/resources/package.py
@@ -71,6 +71,7 @@ def main(args):
parser.add_argument("--args", default="")
parser.add_argument("--name", default="llap0")
parser.add_argument("--loglevel", default="INFO")
+ parser.add_argument("--logger", default="query-routing")
parser.add_argument("--chaosmonkey", type=int, default=0)
parser.add_argument("--slider-am-container-mb", type=int, default=1024)
parser.add_argument("--slider-keytab-dir", default="")
@@ -120,6 +121,7 @@ def main(args):
"name" : resource.clusterName,
"daemon_args" : daemon_args,
"daemon_loglevel" : args.loglevel,
+ "daemon_logger" : args.logger,
"queue.string" : resource.queueString,
"monkey_interval" : args.chaosmonkey,
"monkey_percentage" : monkey_percentage,
http://git-wip-us.apache.org/repos/asf/hive/blob/04597681/llap-server/src/main/resources/templates.py
----------------------------------------------------------------------
diff --git a/llap-server/src/main/resources/templates.py b/llap-server/src/main/resources/templates.py
index 8baa927..505219a 100644
--- a/llap-server/src/main/resources/templates.py
+++ b/llap-server/src/main/resources/templates.py
@@ -74,7 +74,7 @@ appConfig = """
"site.global.app_user": "yarn",
"site.global.app_root": "${AGENT_WORK_ROOT}/app/install/",
"site.global.app_tmp_dir": "${AGENT_WORK_ROOT}/tmp/",
- "site.global.app_logger": "RFA",
+ "site.global.app_logger": "%(daemon_logger)s",
"site.global.app_log_level": "%(daemon_loglevel)s",
"site.global.additional_cp": "%(hadoop_home)s",
"site.global.daemon_args": "%(daemon_args)s",
http://git-wip-us.apache.org/repos/asf/hive/blob/04597681/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java
index eda8862..e4af660 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java
@@ -14,24 +14,14 @@
package org.apache.hadoop.hive.llap.tezplugins;
-import java.text.NumberFormat;
-
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezTaskID;
-import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
-import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.mapreduce.input.MRInputLegacy;
import org.apache.tez.mapreduce.input.MultiMRInput;
-import com.google.common.base.Joiner;
-
@InterfaceAudience.Private
public class LlapTezUtils {
public static boolean isSourceOfInterest(String inputClassName) {
@@ -40,6 +30,10 @@ public class LlapTezUtils {
MultiMRInput.class.getName()) || inputClassName.equals(MRInput.class.getName()));
}
+ public static String getDagId(final JobConf job) {
+ return job.get(MRInput.TEZ_MAPREDUCE_DAG_ID);
+ }
+
public static String getFragmentId(final JobConf job) {
String taskAttemptId = job.get(MRInput.TEZ_MAPREDUCE_TASK_ATTEMPT_ID);
if (taskAttemptId != null) {