Posted to commits@hive.apache.org by pr...@apache.org on 2016/07/15 22:49:19 UTC

hive git commit: HIVE-9756: LLAP: use log4j 2 for llap (log to separate files, etc.) (Prasanth Jayachandran reviewed by Siddharth Seth)

Repository: hive
Updated Branches:
  refs/heads/master 8c11d370f -> 04597681d


HIVE-9756: LLAP: use log4j 2 for llap (log to separate files, etc.) (Prasanth Jayachandran reviewed by Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/04597681
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/04597681
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/04597681

Branch: refs/heads/master
Commit: 04597681d296e7121cbd71af6408a11681bd4c80
Parents: 8c11d37
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Fri Jul 15 15:49:05 2016 -0700
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Fri Jul 15 15:49:05 2016 -0700

----------------------------------------------------------------------
 llap-server/bin/llap-daemon-env.sh              |   2 +-
 .../llap/daemon/impl/ContainerRunnerImpl.java   |  20 +-
 .../hive/llap/daemon/impl/LlapDaemon.java       |   2 +
 .../daemon/impl/StatsRecordingThreadPool.java   |  41 ++++
 .../llap/daemon/impl/TaskRunnerCallable.java    | 194 ++++++++++---------
 .../hive/llap/io/api/impl/LlapInputFormat.java  |  11 +-
 .../resources/llap-daemon-log4j2.properties     |  47 ++++-
 llap-server/src/main/resources/package.py       |   2 +
 llap-server/src/main/resources/templates.py     |   2 +-
 .../hive/llap/tezplugins/LlapTezUtils.java      |  14 +-
 10 files changed, 227 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/04597681/llap-server/bin/llap-daemon-env.sh
----------------------------------------------------------------------
diff --git a/llap-server/bin/llap-daemon-env.sh b/llap-server/bin/llap-daemon-env.sh
index 02c4315..14cab3d 100755
--- a/llap-server/bin/llap-daemon-env.sh
+++ b/llap-server/bin/llap-daemon-env.sh
@@ -32,7 +32,7 @@
 #export LLAP_DAEMON_USER_CLASSPATH=
 
 # Logger setup for LLAP daemon
-#export LLAP_DAEMON_LOGGER=RFA
+#export LLAP_DAEMON_LOGGER=query-routing
 
 # Log level for LLAP daemon
 #export LLAP_DAEMON_LOG_LEVEL=INFO

http://git-wip-us.apache.org/repos/asf/hive/blob/04597681/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 2f9dea0..103115e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexOrB
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
 import org.apache.hadoop.hive.llap.security.LlapSignerImpl;
 import org.apache.hadoop.hive.llap.tez.Converters;
+import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -62,6 +63,7 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
+import org.apache.log4j.MDC;
 import org.apache.log4j.NDC;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.TokenCache;
@@ -185,8 +187,19 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
         vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber());
 
     // This is the start of container-annotated logging.
-    // TODO Reduce the length of this string. Way too verbose at the moment.
-    NDC.push(fragmentIdString);
+    final String dagId = attemptId.getTaskID().getVertexID().getDAGId().toString();
+    final String queryId = vertex.getHiveQueryId();
+    final String fragId = LlapTezUtils.stripAttemptPrefix(fragmentIdString);
+    MDC.put("dagId", dagId);
+    MDC.put("queryId", queryId);
+    MDC.put("fragmentId", fragId);
+    // TODO: Ideally we want tez to use a CallableWithMdc that retains the MDC for threads
+    // created in its thread pool. For now, we push dagId, queryId and fragmentId onto the NDC,
+    // and the custom thread pool used for task execution and LLAP IO
+    // (StatsRecordingThreadPool) pops them via reflection and updates the MDC.
+    NDC.push(dagId);
+    NDC.push(queryId);
+    NDC.push(fragId);
     Scheduler.SubmissionState submissionState;
     SubmitWorkResponseProto.Builder responseBuilder = SubmitWorkResponseProto.newBuilder();
     try {
@@ -246,7 +259,8 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
         metrics.incrExecutorTotalRequestsHandled();
       }
     } finally {
-      NDC.pop();
+      MDC.clear();
+      NDC.clear();
     }
 
     responseBuilder.setSubmissionState(SubmissionStateProto.valueOf(submissionState.name()));

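A note on the hunk above: it pairs the MDC (a per-thread map that the routing appenders and %X layouts read) with the NDC (a per-thread stack that tez's CallableWithNdc clones into worker threads). A minimal sketch of that handoff, using the same org.apache.log4j bridge API with a hypothetical class name and values:

    import org.apache.log4j.MDC;
    import org.apache.log4j.NDC;

    public final class MdcNdcHandoffSketch {
      // Publish routing context for the current thread (MDC) and push the
      // same values onto the NDC stack so a worker thread, which clones the
      // NDC but not the MDC, can rebuild the MDC later.
      static void beforeSubmit(String dagId, String queryId, String fragId) {
        MDC.put("dagId", dagId);
        MDC.put("queryId", queryId);
        MDC.put("fragmentId", fragId);
        NDC.push(dagId);   // pushed first, popped last
        NDC.push(queryId);
        NDC.push(fragId);  // pushed last, popped first (see setMDCFromNDC below)
      }

      // Mirrors the finally block above: clear both so a pooled RPC handler
      // thread does not carry one fragment's context into the next request.
      static void afterSubmit() {
        MDC.clear();
        NDC.clear();
      }
    }
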
http://git-wip-us.apache.org/repos/asf/hive/blob/04597681/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index c7e9d32..91b8727 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -287,6 +287,8 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
     URL llap_l4j2 = LlapDaemon.class.getClassLoader().getResource(LOG4j2_PROPERTIES_FILE);
     if (llap_l4j2 != null) {
       final boolean async = LogUtils.checkAndSetAsyncLogging(conf);
+      // Required for the MDC-based routing appender so that child threads inherit the MDC context.
+      System.setProperty("isThreadContextMapInheritable", "true");
       Configurator.initialize("LlapDaemonLog4j2", llap_l4j2.toString());
       long end = System.currentTimeMillis();
       LOG.warn("LLAP daemon logging initialized from {} in {} ms. Async: {}",

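Log4j2 backs its ThreadContext map with a plain ThreadLocal by default, so threads spawned after the map is populated start empty; the isThreadContextMapInheritable property switches it to an InheritableThreadLocal. A small sketch of the difference, with a hypothetical key and value (the property must be set before any Log4j2 class initializes, as the code above does; it only helps threads created after the put, which is why pooled executor threads still need the NDC workaround):

    import org.apache.logging.log4j.ThreadContext;

    public final class InheritableContextSketch {
      public static void main(String[] args) throws InterruptedException {
        // Must run before Log4j2 initializes its ThreadContext map.
        System.setProperty("isThreadContextMapInheritable", "true");
        ThreadContext.put("queryId", "hive_q_1");
        // The child is created after the put, so it inherits the map.
        Thread child = new Thread(
            () -> System.out.println("child sees: " + ThreadContext.get("queryId")));
        child.start();
        child.join();  // prints "child sees: hive_q_1"
      }
    }
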
http://git-wip-us.apache.org/repos/asf/hive/blob/04597681/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java
index 9b3ce7e..363b9b1 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java
@@ -17,8 +17,10 @@
  */
 package org.apache.hadoop.hive.llap.daemon.impl;
 
+import java.lang.reflect.Field;
 import java.util.List;
 import java.util.Map;
+import java.util.Stack;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.FutureTask;
@@ -30,6 +32,9 @@ import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.llap.LlapUtil;
 import org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader;
+import org.apache.log4j.MDC;
+import org.apache.log4j.NDC;
+import org.apache.tez.common.CallableWithNdc;
 import org.apache.tez.common.counters.FileSystemCounter;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.runtime.task.TaskRunner2Callable;
@@ -100,10 +105,46 @@ public class StatsRecordingThreadPool extends ThreadPoolExecutor {
       // clone thread local file system statistics
       List<LlapUtil.StatisticsData> statsBefore = LlapUtil.cloneThreadLocalFileSystemStatistics();
 
+      setupMDCFromNDC(actualCallable);
       try {
         return actualCallable.call();
       } finally {
         updateFileSystemCounters(statsBefore, actualCallable);
+        MDC.clear();
+      }
+    }
+
+    private void setupMDCFromNDC(final Callable<V> actualCallable) {
+      if (actualCallable instanceof CallableWithNdc) {
+        CallableWithNdc callableWithNdc = (CallableWithNdc) actualCallable;
+        try {
+          // CallableWithNdc inherits from the NDC only when call() is invoked. CallableWithNdc
+          // has to be extended to provide access to its ndcStack, which is cloned during
+          // creation. Until then, we use reflection to access the private field.
+          // FIXME: HIVE-14243 is the follow-up to remove this reflection
+          Field field = callableWithNdc.getClass().getSuperclass().getDeclaredField("ndcStack");
+          field.setAccessible(true);
+          Stack ndcStack = (Stack) field.get(callableWithNdc);
+
+          final Stack clonedStack = (Stack) ndcStack.clone();
+          final String fragmentId = (String) clonedStack.pop();
+          final String queryId = (String) clonedStack.pop();
+          final String dagId = (String) clonedStack.pop();
+          MDC.put("dagId", dagId);
+          MDC.put("queryId", queryId);
+          MDC.put("fragmentId", fragmentId);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Received dagId: {} queryId: {} instanceType: {}",
+                dagId, queryId, actualCallable.getClass().getSimpleName());
+          }
+        } catch (Exception e) {
+          LOG.warn("Not setting up MDC as NDC stack cannot be accessed reflectively for" +
+                  " instance type: {} exception type: {}",
+              actualCallable.getClass().getSimpleName(), e.getClass().getSimpleName());
+        }
+      } else {
+        LOG.warn("Not setting up MDC as unknown callable instance type received: {}",
+            actualCallable.getClass().getSimpleName());
       }
     }
 

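The reflective read above targets the private ndcStack field that tez's CallableWithNdc clones at construction time, since no accessor exists yet (see HIVE-14243). A self-contained sketch of the same trick, with a hypothetical Holder class standing in for CallableWithNdc; the stack is cloned before popping so the original is left intact, and the pop order is the reverse of the push order in ContainerRunnerImpl:

    import java.lang.reflect.Field;
    import java.util.Stack;

    public final class PrivateStackReadSketch {
      // Hypothetical stand-in for tez's CallableWithNdc and its private stack.
      static class Holder {
        private final Stack<String> ndcStack = new Stack<>();
        Holder() { ndcStack.push("dag_1"); ndcStack.push("q_1"); ndcStack.push("frag_1"); }
      }

      public static void main(String[] args) throws Exception {
        Holder holder = new Holder();
        Field field = Holder.class.getDeclaredField("ndcStack");
        field.setAccessible(true);               // bypass the private modifier
        Stack ndcStack = (Stack) field.get(holder);
        Stack cloned = (Stack) ndcStack.clone(); // leave the original untouched
        System.out.println(cloned.pop());        // frag_1  (pushed last)
        System.out.println(cloned.pop());        // q_1
        System.out.println(cloned.pop());        // dag_1   (pushed first)
      }
    }
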
http://git-wip-us.apache.org/repos/asf/hive/blob/04597681/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index fb64f0b..87bd5c8 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Stack;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -50,6 +51,8 @@ import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.log4j.MDC;
+import org.apache.log4j.NDC;
 import org.apache.tez.common.CallableWithNdc;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.security.JobTokenIdentifier;
@@ -166,109 +169,126 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
 
   @Override
   protected TaskRunner2Result callInternal() throws Exception {
-    isStarted.set(true);
+    setMDCFromNDC();
 
-    this.startTime = System.currentTimeMillis();
-    this.threadName = Thread.currentThread().getName();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("canFinish: " + taskSpec.getTaskAttemptID() + ": " + canFinish());
-    }
-
-    // Unregister from the AMReporter, since the task is now running.
-    this.amReporter.unregisterTask(request.getAmHost(), request.getAmPort());
+    try {
+      isStarted.set(true);
 
-    synchronized (this) {
-      if (!shouldRunTask) {
-        LOG.info("Not starting task {} since it was killed earlier", taskSpec.getTaskAttemptID());
-        return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, null, false);
+      this.startTime = System.currentTimeMillis();
+      this.threadName = Thread.currentThread().getName();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("canFinish: " + taskSpec.getTaskAttemptID() + ": " + canFinish());
       }
-    }
 
-    // TODO This executor seems unnecessary. Here and TezChild
-    executor = new StatsRecordingThreadPool(1, 1,
-        0L, TimeUnit.MILLISECONDS,
-        new LinkedBlockingQueue<Runnable>(),
-        new ThreadFactoryBuilder()
-            .setDaemon(true)
-            .setNameFormat("TezTaskRunner")
-            .build());
-
-    // TODO Consolidate this code with TezChild.
-    runtimeWatch.start();
-    if (taskUgi == null) {
-      taskUgi = UserGroupInformation.createRemoteUser(vertex.getUser());
-    }
-    taskUgi.addCredentials(credentials);
-
-    Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<>();
-    serviceConsumerMetadata.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID,
-        TezCommonUtils.convertJobTokenToBytes(jobToken));
-    Multimap<String, String> startedInputsMap = createStartedInputMap(vertex);
-
-    UserGroupInformation taskOwner =
-        UserGroupInformation.createRemoteUser(vertex.getTokenIdentifier());
-    final InetSocketAddress address =
-        NetUtils.createSocketAddrForHost(request.getAmHost(), request.getAmPort());
-    SecurityUtil.setTokenService(jobToken, address);
-    taskOwner.addToken(jobToken);
-    umbilical = taskOwner.doAs(new PrivilegedExceptionAction<LlapTaskUmbilicalProtocol>() {
-      @Override
-      public LlapTaskUmbilicalProtocol run() throws Exception {
-        return RPC.getProxy(LlapTaskUmbilicalProtocol.class,
-            LlapTaskUmbilicalProtocol.versionID, address, conf);
-      }
-    });
-
-    String fragmentId = LlapTezUtils.stripAttemptPrefix(taskSpec.getTaskAttemptID().toString());
-    taskReporter = new LlapTaskReporter(
-        umbilical,
-        confParams.amHeartbeatIntervalMsMax,
-        confParams.amCounterHeartbeatInterval,
-        confParams.amMaxEventsPerHeartbeat,
-        new AtomicLong(0),
-        request.getContainerIdString(),
-        fragmentId,
-        initialEvent);
-
-    String attemptId = fragmentInfo.getFragmentIdentifierString();
-    IOContextMap.setThreadAttemptId(attemptId);
-    try {
+      // Unregister from the AMReporter, since the task is now running.
+      this.amReporter.unregisterTask(request.getAmHost(), request.getAmPort());
+
       synchronized (this) {
-        if (shouldRunTask) {
-          taskRunner = new TezTaskRunner2(conf, taskUgi, fragmentInfo.getLocalDirs(),
-              taskSpec,
-              vertex.getQueryIdentifier().getAppAttemptNumber(),
-              serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor,
-              objectRegistry,
-              pid,
-              executionContext, memoryAvailable, false, tezHadoopShim);
+        if (!shouldRunTask) {
+          LOG.info("Not starting task {} since it was killed earlier", taskSpec.getTaskAttemptID());
+          return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, null, false);
         }
       }
-      if (taskRunner == null) {
-        LOG.info("Not starting task {} since it was killed earlier", taskSpec.getTaskAttemptID());
-        return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, null, false);
-      }
 
+      // TODO This executor seems unnecessary. Here and TezChild
+      executor = new StatsRecordingThreadPool(1, 1,
+          0L, TimeUnit.MILLISECONDS,
+          new LinkedBlockingQueue<Runnable>(),
+          new ThreadFactoryBuilder()
+              .setDaemon(true)
+              .setNameFormat("TezTaskRunner")
+              .build());
+
+      // TODO Consolidate this code with TezChild.
+      runtimeWatch.start();
+      if (taskUgi == null) {
+        taskUgi = UserGroupInformation.createRemoteUser(vertex.getUser());
+      }
+      taskUgi.addCredentials(credentials);
+
+      Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<>();
+      serviceConsumerMetadata.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID,
+          TezCommonUtils.convertJobTokenToBytes(jobToken));
+      Multimap<String, String> startedInputsMap = createStartedInputMap(vertex);
+
+      UserGroupInformation taskOwner =
+          UserGroupInformation.createRemoteUser(vertex.getTokenIdentifier());
+      final InetSocketAddress address =
+          NetUtils.createSocketAddrForHost(request.getAmHost(), request.getAmPort());
+      SecurityUtil.setTokenService(jobToken, address);
+      taskOwner.addToken(jobToken);
+      umbilical = taskOwner.doAs(new PrivilegedExceptionAction<LlapTaskUmbilicalProtocol>() {
+        @Override
+        public LlapTaskUmbilicalProtocol run() throws Exception {
+          return RPC.getProxy(LlapTaskUmbilicalProtocol.class,
+              LlapTaskUmbilicalProtocol.versionID, address, conf);
+        }
+      });
+
+      String fragmentId = LlapTezUtils.stripAttemptPrefix(taskSpec.getTaskAttemptID().toString());
+      taskReporter = new LlapTaskReporter(
+          umbilical,
+          confParams.amHeartbeatIntervalMsMax,
+          confParams.amCounterHeartbeatInterval,
+          confParams.amMaxEventsPerHeartbeat,
+          new AtomicLong(0),
+          request.getContainerIdString(),
+          fragmentId,
+          initialEvent);
+
+      String attemptId = fragmentInfo.getFragmentIdentifierString();
+      IOContextMap.setThreadAttemptId(attemptId);
       try {
-        TaskRunner2Result result = taskRunner.run();
-        if (result.isContainerShutdownRequested()) {
-          LOG.warn("Unexpected container shutdown requested while running task. Ignoring");
+        synchronized (this) {
+          if (shouldRunTask) {
+            taskRunner = new TezTaskRunner2(conf, taskUgi, fragmentInfo.getLocalDirs(),
+                taskSpec,
+                vertex.getQueryIdentifier().getAppAttemptNumber(),
+                serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor,
+                objectRegistry,
+                pid,
+                executionContext, memoryAvailable, false, tezHadoopShim);
+          }
         }
-        isCompleted.set(true);
-        return result;
-      } finally {
-        FileSystem.closeAllForUGI(taskUgi);
-        LOG.info("ExecutionTime for Container: " + request.getContainerIdString() + "=" +
-            runtimeWatch.stop().elapsedMillis());
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("canFinish post completion: " + taskSpec.getTaskAttemptID() + ": " + canFinish());
+        if (taskRunner == null) {
+          LOG.info("Not starting task {} since it was killed earlier", taskSpec.getTaskAttemptID());
+          return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, null, false);
+        }
+
+        try {
+          TaskRunner2Result result = taskRunner.run();
+          if (result.isContainerShutdownRequested()) {
+            LOG.warn("Unexpected container shutdown requested while running task. Ignoring");
+          }
+          isCompleted.set(true);
+          return result;
+        } finally {
+          FileSystem.closeAllForUGI(taskUgi);
+          LOG.info("ExecutionTime for Container: " + request.getContainerIdString() + "=" +
+              runtimeWatch.stop().elapsedMillis());
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(
+                "canFinish post completion: " + taskSpec.getTaskAttemptID() + ": " + canFinish());
+          }
         }
+      } finally {
+        IOContextMap.clearThreadAttempt(attemptId);
       }
     } finally {
-      IOContextMap.clearThreadAttempt(attemptId);
+      MDC.clear();
     }
   }
 
+  private void setMDCFromNDC() {
+    final Stack<String> clonedNDC = NDC.cloneStack();
+    final String fragId = clonedNDC.pop();
+    final String queryId = clonedNDC.pop();
+    final String dagId = clonedNDC.pop();
+    MDC.put("dagId", dagId);
+    MDC.put("queryId", queryId);
+    MDC.put("fragmentId", fragId);
+  }
+
   /**
    * Attempt to kill a running task. If the task has not started running, it will not start.
    * If it's already running, a kill request will be sent to it.

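Most of the churn above is re-indentation: the method body moves into a try block so that the new finally { MDC.clear(); } always runs. On pooled threads that matters more than it looks; a sketch of the leak it prevents, with hypothetical query ids:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import org.apache.log4j.MDC;

    public final class MdcLeakSketch {
      public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1);  // one reused thread
        pool.submit(() -> MDC.put("queryId", "q_A")).get();      // task A forgets to clear
        pool.submit(() ->                                        // task B, same thread
            System.out.println("B sees: " + MDC.get("queryId"))).get();  // prints q_A
        pool.shutdown();
      }
    }

Without the clear, task B's log lines would carry task A's context and be routed to A's per-query file.
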
http://git-wip-us.apache.org/repos/asf/hive/blob/04597681/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
index cc4e10b..c5d0680 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
@@ -42,6 +42,7 @@ import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.ConsumerFeedback;
 import org.apache.hadoop.hive.llap.counters.FragmentCountersMap;
 import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
@@ -84,8 +85,10 @@ import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hive.common.util.HiveStringUtils;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
 
 public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowBatch>,
     VectorizedInputFormatInterface, SelfDescribingInputFormatInterface,
@@ -189,9 +192,14 @@ public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowB
       this.columnIds = includedCols;
       this.sarg = ConvertAstToSearchArg.createFromConf(job);
       this.columnNames = ColumnProjectionUtils.getReadColumnNames(job);
-      String fragmentId = LlapTezUtils.getFragmentId(job);
+      final String fragmentId = LlapTezUtils.getFragmentId(job);
+      final String dagId = LlapTezUtils.getDagId(job);
+      final String queryId = HiveConf.getVar(job, HiveConf.ConfVars.HIVEQUERYID);
+      MDC.put("dagId", dagId);
+      MDC.put("queryId", queryId);
       TezCounters taskCounters = null;
       if (fragmentId != null) {
+        MDC.put("fragmentId", fragmentId);
         taskCounters = FragmentCountersMap.getCountersForFragment(fragmentId);
         LOG.info("Received fragment id: {}", fragmentId);
       } else {
@@ -341,6 +349,7 @@ public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowB
       LlapIoImpl.LOG.info("Llap counters: {}" ,counters); // This is where counters are logged!
       feedback.stop();
       rethrowErrorIfAny();
+      MDC.clear();
     }
 
     private void rethrowErrorIfAny() throws IOException {

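Note that this file goes through org.slf4j.MDC rather than the org.apache.log4j.MDC used elsewhere in the patch; with the log4j-slf4j-impl binding on the classpath, both delegate to the same Log4j2 ThreadContext, so keys set here are visible to the $${ctx:queryId} routes and %X{fragmentId} layouts in the properties file below. A minimal sketch with hypothetical ids:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.slf4j.MDC;

    public final class Slf4jMdcSketch {
      private static final Logger LOG = LoggerFactory.getLogger(Slf4jMdcSketch.class);

      public static void main(String[] args) {
        MDC.put("queryId", "hive_q_42");    // read by the query-routing appender
        MDC.put("fragmentId", "frag_0_0");  // rendered by %X{fragmentId}
        LOG.info("reader opened");          // routed to hive_q_42.log
        MDC.clear();                        // avoid stale context on thread reuse
      }
    }
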
http://git-wip-us.apache.org/repos/asf/hive/blob/04597681/llap-server/src/main/resources/llap-daemon-log4j2.properties
----------------------------------------------------------------------
diff --git a/llap-server/src/main/resources/llap-daemon-log4j2.properties b/llap-server/src/main/resources/llap-daemon-log4j2.properties
index 7488ba2..1a0387c 100644
--- a/llap-server/src/main/resources/llap-daemon-log4j2.properties
+++ b/llap-server/src/main/resources/llap-daemon-log4j2.properties
@@ -25,26 +25,29 @@ property.llap.daemon.log.dir = .
 property.llap.daemon.log.file = llapdaemon.log
 property.llap.daemon.historylog.file = llapdaemon_history.log
 property.llap.daemon.log.maxfilesize = 256MB
-property.llap.daemon.log.maxbackupindex = 20
+property.llap.daemon.log.maxbackupindex = 240
 
 # list of all appenders
-appenders = console, RFA, HISTORYAPPENDER
+appenders = console, RFA, HISTORYAPPENDER, dag-routing, query-routing
 
 # console appender
 appender.console.type = Console
 appender.console.name = console
 appender.console.target = SYSTEM_ERR
 appender.console.layout.type = PatternLayout
-appender.console.layout.pattern = %d{ISO8601} %5p [%t%x] %c{2}: %m%n
+appender.console.layout.pattern = %d{ISO8601} %5p [%t (%X{fragmentId})] %c{2}: %m%n
 
 # rolling file appender
 appender.RFA.type = RollingRandomAccessFile
 appender.RFA.name = RFA
 appender.RFA.fileName = ${sys:llap.daemon.log.dir}/${sys:llap.daemon.log.file}
-appender.RFA.filePattern = ${sys:llap.daemon.log.dir}/${sys:llap.daemon.log.file}_%i
+appender.RFA.filePattern = ${sys:llap.daemon.log.dir}/${sys:llap.daemon.log.file}_%d{yyyy-MM-dd-HH}_%i
 appender.RFA.layout.type = PatternLayout
-appender.RFA.layout.pattern = %d{ISO8601} %5p [%t%x] %c{2}: %m%n
+appender.RFA.layout.pattern = %d{ISO8601} %-5p [%t (%X{fragmentId})] %c: %m%n
 appender.RFA.policies.type = Policies
+appender.RFA.policies.time.type = TimeBasedTriggeringPolicy
+appender.RFA.policies.time.interval = 1
+appender.RFA.policies.time.modulate = true
 appender.RFA.policies.size.type = SizeBasedTriggeringPolicy
 appender.RFA.policies.size.size = ${sys:llap.daemon.log.maxfilesize}
 appender.RFA.strategy.type = DefaultRolloverStrategy
@@ -63,6 +66,40 @@ appender.HISTORYAPPENDER.policies.size.size = ${sys:llap.daemon.log.maxfilesize}
 appender.HISTORYAPPENDER.strategy.type = DefaultRolloverStrategy
 appender.HISTORYAPPENDER.strategy.max = ${sys:llap.daemon.log.maxbackupindex}
 
+# dagId based routing file appender
+appender.dag-routing.type = Routing
+appender.dag-routing.name = dag-routing
+appender.dag-routing.routes.type = Routes
+appender.dag-routing.routes.pattern = $${ctx:dagId}
+# default route
+appender.dag-routing.routes.route-default.type = Route
+appender.dag-routing.routes.route-default.key = $${ctx:dagId}
+appender.dag-routing.routes.route-default.ref = RFA
+# dagId based route
+appender.dag-routing.routes.route-mdc.type = Route
+appender.dag-routing.routes.route-mdc.file-mdc.type = RandomAccessFile
+appender.dag-routing.routes.route-mdc.file-mdc.name = file-mdc
+appender.dag-routing.routes.route-mdc.file-mdc.fileName = ${sys:llap.daemon.log.dir}/${ctx:dagId}.log
+appender.dag-routing.routes.route-mdc.file-mdc.layout.type = PatternLayout
+appender.dag-routing.routes.route-mdc.file-mdc.layout.pattern = %d{ISO8601} %5p [%t (%X{fragmentId})] %c{2}: %m%n
+
+# queryId based routing file appender
+appender.query-routing.type = Routing
+appender.query-routing.name = query-routing
+appender.query-routing.routes.type = Routes
+appender.query-routing.routes.pattern = $${ctx:queryId}
+# default route
+appender.query-routing.routes.route-default.type = Route
+appender.query-routing.routes.route-default.key = $${ctx:queryId}
+appender.query-routing.routes.route-default.ref = RFA
+# queryId based route
+appender.query-routing.routes.route-mdc.type = Route
+appender.query-routing.routes.route-mdc.file-mdc.type = RandomAccessFile
+appender.query-routing.routes.route-mdc.file-mdc.name = file-mdc
+appender.query-routing.routes.route-mdc.file-mdc.fileName = ${sys:llap.daemon.log.dir}/${ctx:queryId}.log
+appender.query-routing.routes.route-mdc.file-mdc.layout.type = PatternLayout
+appender.query-routing.routes.route-mdc.file-mdc.layout.pattern = %d{ISO8601} %5p [%t (%X{fragmentId})] %c{2}: %m%n
+
 # list of all loggers
 loggers = NIOServerCnxn, ClientCnxnSocketNIO, DataNucleus, Datastore, JPOX, HistoryLogger, LlapIoImpl, LlapIoOrc, LlapIoCache, LlapIoLocking
 

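A note on the route pattern: the double $$ in $${ctx:queryId} defers the lookup to log-event time, and when an event carries no queryId the lookup resolves to the literal string ${ctx:queryId}, which is exactly the key of route-default, so unannotated events fall through to the shared RFA file. A sketch of the resulting behavior, assuming the configuration above is active and using a hypothetical query id:

    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;
    import org.apache.logging.log4j.ThreadContext;

    public final class QueryRoutingSketch {
      private static final Logger LOG = LogManager.getLogger(QueryRoutingSketch.class);

      public static void main(String[] args) {
        LOG.info("daemon starting");             // no queryId -> llapdaemon.log (RFA)
        ThreadContext.put("queryId", "hive_q_7");
        LOG.info("fragment running");            // -> hive_q_7.log via query-routing
        ThreadContext.remove("queryId");
        LOG.info("fragment done");               // back to llapdaemon.log
      }
    }
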
http://git-wip-us.apache.org/repos/asf/hive/blob/04597681/llap-server/src/main/resources/package.py
----------------------------------------------------------------------
diff --git a/llap-server/src/main/resources/package.py b/llap-server/src/main/resources/package.py
index 83fe918..a200414 100644
--- a/llap-server/src/main/resources/package.py
+++ b/llap-server/src/main/resources/package.py
@@ -71,6 +71,7 @@ def main(args):
 	parser.add_argument("--args", default="")
 	parser.add_argument("--name", default="llap0")
 	parser.add_argument("--loglevel", default="INFO")
+	parser.add_argument("--logger", default="query-routing")
 	parser.add_argument("--chaosmonkey", type=int, default=0)
 	parser.add_argument("--slider-am-container-mb", type=int, default=1024)
 	parser.add_argument("--slider-keytab-dir", default="")
@@ -120,6 +121,7 @@ def main(args):
 		"name" : resource.clusterName,
 		"daemon_args" : daemon_args,
 		"daemon_loglevel" : args.loglevel,
+		"daemon_logger" : args.logger,
 		"queue.string" : resource.queueString,
 		"monkey_interval" : args.chaosmonkey,
 		"monkey_percentage" : monkey_percentage,

http://git-wip-us.apache.org/repos/asf/hive/blob/04597681/llap-server/src/main/resources/templates.py
----------------------------------------------------------------------
diff --git a/llap-server/src/main/resources/templates.py b/llap-server/src/main/resources/templates.py
index 8baa927..505219a 100644
--- a/llap-server/src/main/resources/templates.py
+++ b/llap-server/src/main/resources/templates.py
@@ -74,7 +74,7 @@ appConfig = """
     "site.global.app_user": "yarn",
     "site.global.app_root": "${AGENT_WORK_ROOT}/app/install/",
     "site.global.app_tmp_dir": "${AGENT_WORK_ROOT}/tmp/",
-    "site.global.app_logger": "RFA",
+    "site.global.app_logger": "%(daemon_logger)s",
     "site.global.app_log_level": "%(daemon_loglevel)s",
     "site.global.additional_cp": "%(hadoop_home)s",
     "site.global.daemon_args": "%(daemon_args)s",

http://git-wip-us.apache.org/repos/asf/hive/blob/04597681/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java
index eda8862..e4af660 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java
@@ -14,24 +14,14 @@
 
 package org.apache.hadoop.hive.llap.tezplugins;
 
-import java.text.NumberFormat;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezTaskID;
-import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
-import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.input.MultiMRInput;
 
-import com.google.common.base.Joiner;
-
 @InterfaceAudience.Private
 public class LlapTezUtils {
   public static boolean isSourceOfInterest(String inputClassName) {
@@ -40,6 +30,10 @@ public class LlapTezUtils {
         MultiMRInput.class.getName()) || inputClassName.equals(MRInput.class.getName()));
   }
 
+  public static String getDagId(final JobConf job) {
+    return job.get(MRInput.TEZ_MAPREDUCE_DAG_ID);
+  }
+
   public static String getFragmentId(final JobConf job) {
     String taskAttemptId = job.get(MRInput.TEZ_MAPREDUCE_TASK_ATTEMPT_ID);
     if (taskAttemptId != null) {