You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ss...@apache.org on 2016/01/25 00:36:55 UTC

[3/8] hive git commit: HIVE-12448. Change to tracking of dag status via dagIdentifier instead of dag name. (Siddharth Seth, reviewed by Sergey Shelukhin) (cherry picked from commit 16d495809382cf1db54ab26ff3a7ba5d57caa9b2)

http://git-wip-us.apache.org/repos/asf/hive/blob/94e8761a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/HistoryLogger.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/HistoryLogger.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/HistoryLogger.java
index 3c9ad24..f1fc285 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/HistoryLogger.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/HistoryLogger.java
@@ -27,6 +27,7 @@ public class HistoryLogger {
   private static final String HISTORY_START_TIME = "StartTime";
   private static final String HISTORY_END_TIME = "EndTime";
   private static final String HISTORY_DAG_NAME = "DagName";
+  private static final String HISTORY_DAG_ID = "DagId";
   private static final String HISTORY_VERTEX_NAME = "VertexName";
   private static final String HISTORY_TASK_ID = "TaskId";
   private static final String HISTORY_ATTEMPT_ID = "TaskAttemptId";
@@ -41,29 +42,30 @@ public class HistoryLogger {
 
   public static void logFragmentStart(String applicationIdStr, String containerIdStr,
                                       String hostname,
-                                      String dagName, String vertexName, int taskId,
+                                      String dagName, int dagIdentifier, String vertexName, int taskId,
                                       int attemptId) {
     HISTORY_LOGGER.info(
-        constructFragmentStartString(applicationIdStr, containerIdStr, hostname, dagName,
+        constructFragmentStartString(applicationIdStr, containerIdStr, hostname, dagName, dagIdentifier,
             vertexName, taskId, attemptId));
   }
 
   public static void logFragmentEnd(String applicationIdStr, String containerIdStr, String hostname,
-                                    String dagName, String vertexName, int taskId, int attemptId,
+                                    String dagName, int dagIdentifier, String vertexName, int taskId, int attemptId,
                                     String threadName, long startTime, boolean failed) {
     HISTORY_LOGGER.info(constructFragmentEndString(applicationIdStr, containerIdStr, hostname,
-        dagName, vertexName, taskId, attemptId, threadName, startTime, failed));
+        dagName, dagIdentifier, vertexName, taskId, attemptId, threadName, startTime, failed));
   }
 
 
   private static String constructFragmentStartString(String applicationIdStr, String containerIdStr,
-                                                     String hostname, String dagName,
+                                                     String hostname, String dagName, int dagIdentifier,
                                                      String vertexName, int taskId, int attemptId) {
     HistoryLineBuilder lb = new HistoryLineBuilder(EVENT_TYPE_FRAGMENT_START);
     lb.addHostName(hostname);
     lb.addAppid(applicationIdStr);
     lb.addContainerId(containerIdStr);
     lb.addDagName(dagName);
+    lb.addDagId(dagIdentifier);
     lb.addVertexName(vertexName);
     lb.addTaskId(taskId);
     lb.addTaskAttemptId(attemptId);
@@ -72,7 +74,7 @@ public class HistoryLogger {
   }
 
   private static String constructFragmentEndString(String applicationIdStr, String containerIdStr,
-                                                   String hostname, String dagName,
+                                                   String hostname, String dagName, int dagIdentifier,
                                                    String vertexName, int taskId, int attemptId,
                                                    String threadName, long startTime, boolean succeeded) {
     HistoryLineBuilder lb = new HistoryLineBuilder(EVENT_TYPE_FRAGMENT_END);
@@ -80,6 +82,7 @@ public class HistoryLogger {
     lb.addAppid(applicationIdStr);
     lb.addContainerId(containerIdStr);
     lb.addDagName(dagName);
+    lb.addDagId(dagIdentifier);
     lb.addVertexName(vertexName);
     lb.addTaskId(taskId);
     lb.addTaskAttemptId(attemptId);
@@ -113,6 +116,10 @@ public class HistoryLogger {
       return setKeyValue(HISTORY_DAG_NAME, dagName);
     }
 
+    HistoryLineBuilder addDagId(int dagId) {
+      return setKeyValue(HISTORY_DAG_ID, String.valueOf(dagId));
+    }
+
     HistoryLineBuilder addVertexName(String vertexName) {
       return setKeyValue(HISTORY_VERTEX_NAME, vertexName);
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/94e8761a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/KilledTaskHandler.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/KilledTaskHandler.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/KilledTaskHandler.java
index 7cb433b..e2caec2 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/KilledTaskHandler.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/KilledTaskHandler.java
@@ -14,6 +14,7 @@
 
 package org.apache.hadoop.hive.llap.daemon;
 
+import org.apache.hadoop.hive.llap.daemon.impl.QueryIdentifier;
 import org.apache.hadoop.security.token.Token;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -24,6 +25,6 @@ public interface KilledTaskHandler {
   // inferred from this.
   // Passing in parameters until there's some dag information stored and tracked in the daemon.
   void taskKilled(String amLocation, int port, String user,
-                  Token<JobTokenIdentifier> jobToken, String queryId, String dagName,
+                  Token<JobTokenIdentifier> jobToken, QueryIdentifier queryIdentifier,
                   TezTaskAttemptID taskAttemptId);
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/94e8761a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/QueryFailedHandler.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/QueryFailedHandler.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/QueryFailedHandler.java
index 4e62a68..7f9553d 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/QueryFailedHandler.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/QueryFailedHandler.java
@@ -14,7 +14,9 @@
 
 package org.apache.hadoop.hive.llap.daemon;
 
+import org.apache.hadoop.hive.llap.daemon.impl.QueryIdentifier;
+
 public interface QueryFailedHandler {
 
-  public void queryFailed(String queryId, String dagName);
+  public void queryFailed(QueryIdentifier queryIdentifier);
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/94e8761a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
index f6711d8..d1ec715 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
@@ -172,9 +172,9 @@ public class AMReporter extends AbstractService {
   }
 
   public void registerTask(String amLocation, int port, String user,
-                           Token<JobTokenIdentifier> jobToken, String queryId, String dagName) {
+                           Token<JobTokenIdentifier> jobToken, QueryIdentifier queryIdentifier) {
     if (LOG.isTraceEnabled()) {
-      LOG.trace("Registering for heartbeat: " + amLocation + ":" + port + " for dagName=" + dagName);
+      LOG.trace("Registering for heartbeat: " + amLocation + ":" + port + " for queryIdentifier=" + queryIdentifier);
     }
     AMNodeInfo amNodeInfo;
     synchronized (knownAppMasters) {
@@ -182,7 +182,7 @@ public class AMReporter extends AbstractService {
       amNodeInfo = knownAppMasters.get(amNodeId);
       if (amNodeInfo == null) {
         amNodeInfo =
-            new AMNodeInfo(amNodeId, user, jobToken, dagName, retryPolicy, retryTimeout, socketFactory,
+            new AMNodeInfo(amNodeId, user, jobToken, queryIdentifier, retryPolicy, retryTimeout, socketFactory,
                 conf);
         knownAppMasters.put(amNodeId, amNodeInfo);
         // Add to the queue only the first time this is registered, and on
@@ -190,7 +190,7 @@ public class AMReporter extends AbstractService {
         amNodeInfo.setNextHeartbeatTime(System.currentTimeMillis() + heartbeatInterval);
         pendingHeartbeatQueeu.add(amNodeInfo);
       }
-      amNodeInfo.setCurrentDagName(dagName);
+      amNodeInfo.setCurrentQueryIdentifier(queryIdentifier);
       amNodeInfo.incrementAndGetTaskCount();
     }
   }
@@ -214,12 +214,12 @@ public class AMReporter extends AbstractService {
   }
 
   public void taskKilled(String amLocation, int port, String user, Token<JobTokenIdentifier> jobToken,
-                         final String queryId, final String dagName, final TezTaskAttemptID taskAttemptId) {
+                         final QueryIdentifier queryIdentifier, final TezTaskAttemptID taskAttemptId) {
     // Not re-using the connection for the AM heartbeat - which may or may not be open by this point.
     // knownAppMasters is used for sending heartbeats for queued tasks. Killed messages use a new connection.
     LlapNodeId amNodeId = LlapNodeId.getInstance(amLocation, port);
     AMNodeInfo amNodeInfo =
-        new AMNodeInfo(amNodeId, user, jobToken, dagName, retryPolicy, retryTimeout, socketFactory,
+        new AMNodeInfo(amNodeId, user, jobToken, queryIdentifier, retryPolicy, retryTimeout, socketFactory,
             conf);
 
     // Even if the service hasn't started up. It's OK to make this invocation since this will
@@ -251,8 +251,8 @@ public class AMReporter extends AbstractService {
             synchronized (knownAppMasters) {
               if (LOG.isDebugEnabled()) {
                 LOG.debug(
-                    "Removing am {} with last associated dag{} from heartbeat with taskCount={}, amFailed={}",
-                    amNodeInfo.amNodeId, amNodeInfo.getCurrentDagName(), amNodeInfo.getTaskCount(),
+                    "Removing am {} with last associated dag {} from heartbeat with taskCount={}, amFailed={}",
+                    amNodeInfo.amNodeId, amNodeInfo.getCurrentQueryIdentifier(), amNodeInfo.getTaskCount(),
                     amNodeInfo.hasAmFailed(), amNodeInfo);
               }
               knownAppMasters.remove(amNodeInfo.amNodeId);
@@ -272,11 +272,11 @@ public class AMReporter extends AbstractService {
 
               @Override
               public void onFailure(Throwable t) {
-                String currentDagName = amNodeInfo.getCurrentDagName();
+                QueryIdentifier currentQueryIdentifier = amNodeInfo.getCurrentQueryIdentifier();
                 amNodeInfo.setAmFailed(true);
                 LOG.warn("Heartbeat failed to AM {}. Killing all other tasks for the query={}",
-                    amNodeInfo.amNodeId, currentDagName, t);
-                queryFailedHandler.queryFailed(null, currentDagName);
+                    amNodeInfo.amNodeId, currentQueryIdentifier, t);
+                queryFailedHandler.queryFailed(currentQueryIdentifier);
               }
             });
           }
@@ -339,11 +339,11 @@ public class AMReporter extends AbstractService {
           amNodeInfo.getUmbilical().nodeHeartbeat(new Text(nodeId.getHostname()),
               nodeId.getPort());
         } catch (IOException e) {
-          String currentDagName = amNodeInfo.getCurrentDagName();
+          QueryIdentifier currentQueryIdentifier = amNodeInfo.getCurrentQueryIdentifier();
           amNodeInfo.setAmFailed(true);
           LOG.warn("Failed to communicated with AM at {}. Killing remaining fragments for query {}",
-              amNodeInfo.amNodeId, currentDagName, e);
-          queryFailedHandler.queryFailed(null, currentDagName);
+              amNodeInfo.amNodeId, currentQueryIdentifier, e);
+          queryFailedHandler.queryFailed(currentQueryIdentifier);
         } catch (InterruptedException e) {
           if (!isShutdown.get()) {
             LOG.warn("Interrupted while trying to send heartbeat to AM {}", amNodeInfo.amNodeId, e);
@@ -370,21 +370,21 @@ public class AMReporter extends AbstractService {
     private final long timeout;
     private final SocketFactory socketFactory;
     private final AtomicBoolean amFailed = new AtomicBoolean(false);
-    private String currentDagName;
+    private QueryIdentifier currentQueryIdentifier;
     private LlapTaskUmbilicalProtocol umbilical;
     private long nextHeartbeatTime;
 
 
     public AMNodeInfo(LlapNodeId amNodeId, String user,
                       Token<JobTokenIdentifier> jobToken,
-                      String currentDagName,
+                      QueryIdentifier currentQueryIdentifier,
                       RetryPolicy retryPolicy,
                       long timeout,
                       SocketFactory socketFactory,
                       Configuration conf) {
       this.user = user;
       this.jobToken = jobToken;
-      this.currentDagName = currentDagName;
+      this.currentQueryIdentifier = currentQueryIdentifier;
       this.retryPolicy = retryPolicy;
       this.timeout = timeout;
       this.socketFactory = socketFactory;
@@ -439,12 +439,12 @@ public class AMReporter extends AbstractService {
       return taskCount.get();
     }
 
-    public synchronized String getCurrentDagName() {
-      return currentDagName;
+    public synchronized QueryIdentifier getCurrentQueryIdentifier() {
+      return currentQueryIdentifier;
     }
 
-    public synchronized void setCurrentDagName(String currentDagName) {
-      this.currentDagName = currentDagName;
+    public synchronized void setCurrentQueryIdentifier(QueryIdentifier queryIdentifier) {
+      this.currentQueryIdentifier = queryIdentifier;
     }
 
     synchronized void setNextHeartbeatTime(long nextTime) {

http://git-wip-us.apache.org/repos/asf/hive/blob/94e8761a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 0d85671..b0bf844 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -149,7 +149,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
   @Override
   public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws IOException {
     HistoryLogger.logFragmentStart(request.getApplicationIdString(), request.getContainerIdString(),
-        localAddress.get().getHostName(), request.getFragmentSpec().getDagName(),
+        localAddress.get().getHostName(), request.getFragmentSpec().getDagName(), request.getFragmentSpec().getDagId(),
         request.getFragmentSpec().getVertexName(), request.getFragmentSpec().getFragmentNumber(),
         request.getFragmentSpec().getAttemptNumber());
     if (LOG.isInfoEnabled()) {
@@ -172,8 +172,10 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
           fragmentSpec.getFragmentIdentifierString());
       int dagIdentifier = taskAttemptId.getTaskID().getVertexID().getDAGId().getId();
 
+      QueryIdentifier queryIdentifier = new QueryIdentifier(request.getApplicationIdString(), dagIdentifier);
+
       QueryFragmentInfo fragmentInfo = queryTracker
-          .registerFragment(null, request.getApplicationIdString(), fragmentSpec.getDagName(),
+          .registerFragment(queryIdentifier, request.getApplicationIdString(), fragmentSpec.getDagName(),
               dagIdentifier,
               fragmentSpec.getVertexName(), fragmentSpec.getFragmentNumber(),
               fragmentSpec.getAttemptNumber(), request.getUser(), request.getFragmentSpec());
@@ -239,28 +241,37 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
 
     @Override
     public void initializeHook(TezProcessor source) {
-      queryTracker.registerDagQueryId(source.getContext().getDAGName(),
+      queryTracker.registerDagQueryId(
+          new QueryIdentifier(source.getContext().getApplicationId().toString(),
+              source.getContext().getDagIdentifier()),
           HiveConf.getVar(source.getConf(), HiveConf.ConfVars.HIVEQUERYID));
     }
   }
 
   @Override
-  public SourceStateUpdatedResponseProto sourceStateUpdated(SourceStateUpdatedRequestProto request) {
+  public SourceStateUpdatedResponseProto sourceStateUpdated(
+      SourceStateUpdatedRequestProto request) {
     LOG.info("Processing state update: " + stringifySourceStateUpdateRequest(request));
-    queryTracker.registerSourceStateChange(request.getDagName(), request.getSrcName(),
+    queryTracker.registerSourceStateChange(
+        new QueryIdentifier(request.getQueryIdentifier().getAppIdentifier(),
+            request.getQueryIdentifier().getDagIdentifier()), request.getSrcName(),
         request.getState());
     return SourceStateUpdatedResponseProto.getDefaultInstance();
   }
 
   @Override
   public QueryCompleteResponseProto queryComplete(QueryCompleteRequestProto request) {
-    LOG.info("Processing queryComplete notification for {}", request.getDagName());
+    QueryIdentifier queryIdentifier =
+        new QueryIdentifier(request.getQueryIdentifier().getAppIdentifier(),
+            request.getQueryIdentifier().getDagIdentifier());
+    LOG.info("Processing queryComplete notification for {}", queryIdentifier);
     List<QueryFragmentInfo> knownFragments =
-        queryTracker.queryComplete(null, request.getDagName(), request.getDeleteDelay());
-    LOG.info("DBG: Pending fragment count for completed query {} = {}", request.getDagName(),
+        queryTracker
+            .queryComplete(queryIdentifier, request.getDeleteDelay());
+    LOG.info("DBG: Pending fragment count for completed query {} = {}", queryIdentifier,
         knownFragments.size());
     for (QueryFragmentInfo fragmentInfo : knownFragments) {
-      LOG.info("DBG: Issuing killFragment for completed query {} {}", request.getDagName(),
+      LOG.info("DBG: Issuing killFragment for completed query {} {}", queryIdentifier,
           fragmentInfo.getFragmentIdentifierString());
       executorService.killFragment(fragmentInfo.getFragmentIdentifierString());
     }
@@ -276,7 +287,9 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
 
   private String stringifySourceStateUpdateRequest(SourceStateUpdatedRequestProto request) {
     StringBuilder sb = new StringBuilder();
-    sb.append("dagName=").append(request.getDagName())
+    QueryIdentifier queryIdentifier = new QueryIdentifier(request.getQueryIdentifier().getAppIdentifier(),
+        request.getQueryIdentifier().getDagIdentifier());
+    sb.append("queryIdentifier=").append(queryIdentifier)
         .append(", ").append("sourceName=").append(request.getSrcName())
         .append(", ").append("state=").append(request.getState());
     return sb.toString();
@@ -342,14 +355,14 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
   }
 
   @Override
-  public void queryFailed(String queryId, String dagName) {
-    LOG.info("Processing query failed notification for {}", dagName);
+  public void queryFailed(QueryIdentifier queryIdentifier) {
+    LOG.info("Processing query failed notification for {}", queryIdentifier);
     List<QueryFragmentInfo> knownFragments =
-        queryTracker.queryComplete(queryId, dagName, -1);
-    LOG.info("DBG: Pending fragment count for failed query {} = {}", dagName,
+        queryTracker.queryComplete(queryIdentifier, -1);
+    LOG.info("DBG: Pending fragment count for failed query {} = {}", queryIdentifier,
         knownFragments.size());
     for (QueryFragmentInfo fragmentInfo : knownFragments) {
-      LOG.info("DBG: Issuing killFragment for failed query {} {}", dagName,
+      LOG.info("DBG: Issuing killFragment for failed query {} {}", queryIdentifier,
           fragmentInfo.getFragmentIdentifierString());
       executorService.killFragment(fragmentInfo.getFragmentIdentifierString());
     }
@@ -359,9 +372,9 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
 
     @Override
     public void taskKilled(String amLocation, int port, String user,
-                           Token<JobTokenIdentifier> jobToken, String queryId, String dagName,
+                           Token<JobTokenIdentifier> jobToken, QueryIdentifier queryIdentifier,
                            TezTaskAttemptID taskAttemptId) {
-      amReporter.taskKilled(amLocation, port, user, jobToken, queryId, dagName, taskAttemptId);
+      amReporter.taskKilled(amLocation, port, user, jobToken, queryIdentifier, taskAttemptId);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/94e8761a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index 4f1299d..98951e1 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -435,8 +435,8 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
   private class QueryFailedHandlerProxy implements QueryFailedHandler {
 
     @Override
-    public void queryFailed(String queryId, String dagName) {
-      containerRunner.queryFailed(queryId, dagName);
+    public void queryFailed(QueryIdentifier queryIdentifier) {
+      containerRunner.queryFailed(queryIdentifier);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/94e8761a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryIdentifier.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryIdentifier.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryIdentifier.java
new file mode 100644
index 0000000..96e77e4
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryIdentifier.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.daemon.impl;
+
+/**
+ * An identifier for a query, which is unique. Composed of an application identifier and a DAG identifier.
+ */
+public final class QueryIdentifier {
+
+  private final String appIdentifier; // application identifier string; not null-checked in this class
+  private final int dagIdentifier; // integer identifier of the DAG
+
+  // Stores both components exactly as given; no validation is performed.
+  public QueryIdentifier(String appIdentifier, int dagIdentifier) {
+    this.appIdentifier = appIdentifier;
+    this.dagIdentifier = dagIdentifier;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || !getClass().isAssignableFrom(o.getClass())) { // class is final: exact type match
+      return false;
+    }
+
+    QueryIdentifier that = (QueryIdentifier) o;
+
+    if (dagIdentifier != that.dagIdentifier) { // compare the int component first
+      return false;
+    }
+    return appIdentifier.equals(that.appIdentifier); // NPE if this.appIdentifier is null
+
+  }
+
+  @Override
+  public int hashCode() {
+    int result = appIdentifier.hashCode(); // NPE if appIdentifier is null
+    result = 31 * result + dagIdentifier; // combines both fields, matching equals()
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return "QueryIdentifier{" +
+        "appIdentifier='" + appIdentifier + '\'' +
+        ", dagIdentifier=" + dagIdentifier +
+        '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/94e8761a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
index 27f2d4c..8bec95f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
@@ -37,7 +37,7 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentS
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto;
 
 public class QueryInfo {
-  private final String queryId;
+  private final QueryIdentifier queryIdentifier;
   private final String appIdString;
   private final String dagName;
   private final int dagIdentifier;
@@ -54,10 +54,10 @@ public class QueryInfo {
 
   private final FinishableStateTracker finishableStateTracker = new FinishableStateTracker();
 
-  public QueryInfo(String queryId, String appIdString, String dagName, int dagIdentifier,
+  public QueryInfo(QueryIdentifier queryIdentifier, String appIdString, String dagName, int dagIdentifier,
                    String user, ConcurrentMap<String, SourceStateProto> sourceStateMap,
                    String[] localDirsBase, FileSystem localFs) {
-    this.queryId = queryId;
+    this.queryIdentifier = queryIdentifier;
     this.appIdString = appIdString;
     this.dagName = dagName;
     this.dagIdentifier = dagIdentifier;
@@ -67,18 +67,14 @@ public class QueryInfo {
     this.localFs = localFs;
   }
 
-  public String getQueryId() {
-    return queryId;
+  public QueryIdentifier getQueryIdentifier() {
+    return queryIdentifier;
   }
 
   public String getAppIdString() {
     return appIdString;
   }
 
-  public String getDagName() {
-    return dagName;
-  }
-
   public int getDagIdentifier() {
     return dagIdentifier;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/94e8761a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index 6deaefc..0676edd 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -56,8 +56,7 @@ public class QueryTracker extends AbstractService {
 
   private final ScheduledExecutorService executorService;
 
-  // TODO Make use if the query id for cachin when this is available.
-  private final ConcurrentHashMap<String, QueryInfo> queryInfoMap = new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<QueryIdentifier, QueryInfo> queryInfoMap = new ConcurrentHashMap<>();
 
   private final String[] localDirsBase;
   private final FileSystem localFs;
@@ -70,22 +69,25 @@ public class QueryTracker extends AbstractService {
   // Alternately - send in an explicit dag start message before any other message is processed.
   // Multiple threads communicating from a single AM gets in the way of this.
 
-  // Keeps track of completed dags. Assumes dag names are unique across AMs.
-  private final Set<String> completedDagMap = Collections.newSetFromMap(
-      new ConcurrentHashMap<String, Boolean>());
+  // Keeps track of completed DAGS. QueryIdentifiers need to be unique across applications.
+  private final Set<QueryIdentifier> completedDagMap =
+      Collections.newSetFromMap(new ConcurrentHashMap<QueryIdentifier, Boolean>());
 
 
   private final Lock lock = new ReentrantLock();
-  private final ConcurrentMap<String, ReadWriteLock> dagSpecificLocks = new ConcurrentHashMap<>();
+  private final ConcurrentMap<QueryIdentifier, ReadWriteLock> dagSpecificLocks = new ConcurrentHashMap<>();
 
   // Tracks various maps for dagCompletions. This is setup here since stateChange messages
   // may be processed by a thread which ends up executing before a task.
-  private final ConcurrentMap<String, ConcurrentMap<String, SourceStateProto>>
-    sourceCompletionMap = new ConcurrentHashMap<>();
+  private final ConcurrentMap<QueryIdentifier, ConcurrentMap<String, SourceStateProto>>
+      sourceCompletionMap = new ConcurrentHashMap<>();
 
-  // Tracks queryId by dagName. This can only be set when config is parsed in TezProcessor,
+  // Tracks HiveQueryId by QueryIdentifier. This can only be set when config is parsed in TezProcessor.
   // all the other existing code passes queryId equal to 0 everywhere.
-  private final ConcurrentHashMap<String, String> dagNameToQueryId = new ConcurrentHashMap<>();
+  // If we switch the runtime and move to parsing the payload in the AM - the actual hive queryId could
+  // be sent over the wire from the AM, and will take the place of AppId+dagId in QueryIdentifier.
+  private final ConcurrentHashMap<QueryIdentifier, String> queryIdentifierToHiveQueryId =
+      new ConcurrentHashMap<>();
 
   public QueryTracker(Configuration conf, String[] localDirsBase) {
     super("QueryTracker");
@@ -107,7 +109,7 @@ public class QueryTracker extends AbstractService {
 
   /**
    * Register a new fragment for a specific query
-   * @param queryId
+   * @param queryIdentifier
    * @param appIdString
    * @param dagName
    * @param dagIdentifier
@@ -117,23 +119,23 @@ public class QueryTracker extends AbstractService {
    * @param user
    * @throws IOException
    */
-  QueryFragmentInfo registerFragment(String queryId, String appIdString, String dagName,
+  QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String appIdString, String dagName,
       int dagIdentifier, String vertexName, int fragmentNumber, int attemptNumber, String user,
       FragmentSpecProto fragmentSpec) throws IOException {
-    ReadWriteLock dagLock = getDagLock(dagName);
+    ReadWriteLock dagLock = getDagLock(queryIdentifier);
     dagLock.readLock().lock();
     try {
-      if (!completedDagMap.contains(dagName)) {
-        QueryInfo queryInfo = queryInfoMap.get(dagName);
+      if (!completedDagMap.contains(queryIdentifier)) {
+        QueryInfo queryInfo = queryInfoMap.get(queryIdentifier);
         if (queryInfo == null) {
-          queryInfo = new QueryInfo(queryId, appIdString, dagName, dagIdentifier, user,
-              getSourceCompletionMap(dagName), localDirsBase, localFs);
-          queryInfoMap.putIfAbsent(dagName, queryInfo);
+          queryInfo = new QueryInfo(queryIdentifier, appIdString, dagName, dagIdentifier, user,
+              getSourceCompletionMap(queryIdentifier), localDirsBase, localFs);
+          queryInfoMap.putIfAbsent(queryIdentifier, queryInfo);
         }
         return queryInfo.registerFragment(vertexName, fragmentNumber, attemptNumber, fragmentSpec);
       } else {
         // Cleanup the dag lock here, since it may have been created after the query completed
-        dagSpecificLocks.remove(dagName);
+        dagSpecificLocks.remove(queryIdentifier);
         throw new RuntimeException(
             "Dag " + dagName + " already complete. Rejecting fragment ["
                 + vertexName + ", " + fragmentNumber + ", " + attemptNumber + "]");
@@ -148,12 +150,12 @@ public class QueryTracker extends AbstractService {
    * @param fragmentInfo
    */
   void fragmentComplete(QueryFragmentInfo fragmentInfo) {
-    String dagName = fragmentInfo.getQueryInfo().getDagName();
-    QueryInfo queryInfo = queryInfoMap.get(dagName);
+    QueryIdentifier qId = fragmentInfo.getQueryInfo().getQueryIdentifier();
+    QueryInfo queryInfo = queryInfoMap.get(qId);
     if (queryInfo == null) {
       // Possible because a queryComplete message from the AM can come in first - KILL / SUCCESSFUL,
       // before the fragmentComplete is reported
-      LOG.info("Ignoring fragmentComplete message for unknown query");
+      LOG.info("Ignoring fragmentComplete message for unknown query: {}", qId);
     } else {
       queryInfo.unregisterFragment(fragmentInfo);
     }
@@ -161,42 +163,40 @@ public class QueryTracker extends AbstractService {
 
   /**
    * Register completion for a query
-   * @param queryId
-   * @param dagName
+   * @param queryIdentifier identifier of the query (DAG) that has completed
    * @param deleteDelay
    */
-  List<QueryFragmentInfo> queryComplete(String queryId, String dagName, long deleteDelay) {
+  List<QueryFragmentInfo> queryComplete(QueryIdentifier queryIdentifier, long deleteDelay) {
     if (deleteDelay == -1) {
       deleteDelay = defaultDeleteDelaySeconds;
     }
-    ReadWriteLock dagLock = getDagLock(dagName);
+    ReadWriteLock dagLock = getDagLock(queryIdentifier);
     dagLock.writeLock().lock();
     try {
-      rememberCompletedDag(dagName);
-      LOG.info("Processing queryComplete for dagName={} with deleteDelay={} seconds",
-          dagName, deleteDelay);
-      QueryInfo queryInfo = queryInfoMap.remove(dagName);
+      rememberCompletedDag(queryIdentifier);
+      LOG.info("Processing queryComplete for queryIdentifier={} with deleteDelay={} seconds", queryIdentifier,
+          deleteDelay);
+      QueryInfo queryInfo = queryInfoMap.remove(queryIdentifier);
       if (queryInfo == null) {
-        LOG.warn("Ignoring query complete for unknown dag: {}", dagName);
+        LOG.warn("Ignoring query complete for unknown dag: {}", queryIdentifier);
         return Collections.emptyList();
       }
       String[] localDirs = queryInfo.getLocalDirsNoCreate();
       if (localDirs != null) {
         for (String localDir : localDirs) {
           cleanupDir(localDir, deleteDelay);
-          ShuffleHandler.get().unregisterDag(localDir, dagName, queryInfo.getDagIdentifier());
+          ShuffleHandler.get().unregisterDag(localDir, queryInfo.getAppIdString(), queryInfo.getDagIdentifier());
         }
       }
       // Clearing this before sending a kill is OK, since canFinish will change to false.
       // Ideally this should be a state machine where kills are issued to the executor,
       // and the structures are cleaned up once all tasks complete. New requests, however,
       // should not be allowed after a query complete is received.
-      sourceCompletionMap.remove(dagName);
-      String savedQueryId = dagNameToQueryId.remove(dagName);
-      queryId = queryId == null ? savedQueryId : queryId;
-      dagSpecificLocks.remove(dagName);
-      if (queryId != null) {
-        ObjectCacheFactory.removeLlapQueryCache(queryId);
+      sourceCompletionMap.remove(queryIdentifier);
+      String savedQueryId = queryIdentifierToHiveQueryId.remove(queryIdentifier);
+      dagSpecificLocks.remove(queryIdentifier);
+      if (savedQueryId != null) {
+        ObjectCacheFactory.removeLlapQueryCache(savedQueryId);
       }
       return queryInfo.getRegisteredFragments();
     } finally {
@@ -206,24 +206,24 @@ public class QueryTracker extends AbstractService {
 
 
 
-  public void rememberCompletedDag(String dagName) {
-    if (completedDagMap.add(dagName)) {
+  public void rememberCompletedDag(QueryIdentifier queryIdentifier) {
+    if (completedDagMap.add(queryIdentifier)) {
       // We will remember completed DAG for an hour to avoid execution out-of-order fragments.
-      executorService.schedule(new DagMapCleanerCallable(dagName), 1, TimeUnit.HOURS);
+      executorService.schedule(new DagMapCleanerCallable(queryIdentifier), 1, TimeUnit.HOURS);
     } else {
-      LOG.warn("Couldn't add {} to completed dag set", dagName);
+      LOG.warn("Couldn't add {} to completed dag set", queryIdentifier);
     }
   }
 
   /**
    * Register an update to a source within an executing dag
-   * @param dagName
+   * @param queryIdentifier identifier of the executing query (DAG) that owns the source
    * @param sourceName
    * @param sourceState
    */
-  void registerSourceStateChange(String dagName, String sourceName, SourceStateProto sourceState) {
-    getSourceCompletionMap(dagName).put(sourceName, sourceState);
-    QueryInfo queryInfo = queryInfoMap.get(dagName);
+  void registerSourceStateChange(QueryIdentifier queryIdentifier, String sourceName, SourceStateProto sourceState) {
+    getSourceCompletionMap(queryIdentifier).put(sourceName, sourceState);
+    QueryInfo queryInfo = queryInfoMap.get(queryIdentifier);
     if (queryInfo != null) {
       queryInfo.sourceStateUpdated(sourceName);
     } else {
@@ -233,13 +233,13 @@ public class QueryTracker extends AbstractService {
   }
 
 
-  private ReadWriteLock getDagLock(String dagName) {
+  private ReadWriteLock getDagLock(QueryIdentifier queryIdentifier) {
     lock.lock();
     try {
-      ReadWriteLock dagLock = dagSpecificLocks.get(dagName);
+      ReadWriteLock dagLock = dagSpecificLocks.get(queryIdentifier);
       if (dagLock == null) {
         dagLock = new ReentrantReadWriteLock();
-        dagSpecificLocks.put(dagName, dagLock);
+        dagSpecificLocks.put(queryIdentifier, dagLock);
       }
       return dagLock;
     } finally {
@@ -247,20 +247,20 @@ public class QueryTracker extends AbstractService {
     }
   }
 
-  private ConcurrentMap<String, SourceStateProto> getSourceCompletionMap(String dagName) {
-    ConcurrentMap<String, SourceStateProto> dagMap = sourceCompletionMap.get(dagName);
+  private ConcurrentMap<String, SourceStateProto> getSourceCompletionMap(QueryIdentifier queryIdentifier) {
+    ConcurrentMap<String, SourceStateProto> dagMap = sourceCompletionMap.get(queryIdentifier);
     if (dagMap == null) {
       dagMap = new ConcurrentHashMap<>();
       ConcurrentMap<String, SourceStateProto> old =
-          sourceCompletionMap.putIfAbsent(dagName, dagMap);
+          sourceCompletionMap.putIfAbsent(queryIdentifier, dagMap);
       dagMap = (old != null) ? old : dagMap;
     }
     return dagMap;
   }
 
-  public void registerDagQueryId(String dagName, String queryId) {
-    if (queryId == null) return;
-    dagNameToQueryId.putIfAbsent(dagName, queryId);
+  public void registerDagQueryId(QueryIdentifier queryIdentifier, String hiveQueryIdString) {
+    if (hiveQueryIdString == null) return;
+    queryIdentifierToHiveQueryId.putIfAbsent(queryIdentifier, hiveQueryIdString);
   }
 
   @Override
@@ -302,15 +302,15 @@ public class QueryTracker extends AbstractService {
   }
 
   private class DagMapCleanerCallable extends CallableWithNdc<Void> {
-    private final String dagName;
+    private final QueryIdentifier queryIdentifier;
 
-    private DagMapCleanerCallable(String dagName) {
-      this.dagName = dagName;
+    private DagMapCleanerCallable(QueryIdentifier queryIdentifier) {
+      this.queryIdentifier = queryIdentifier;
     }
 
     @Override
     protected Void callInternal() {
-      completedDagMap.remove(dagName);
+      completedDagMap.remove(queryIdentifier);
       return null;
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/94e8761a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index f03a2ff..b60f71f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -131,7 +131,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
     // Register with the AMReporter when the callable is setup. Unregister once it starts running.
     if (jobToken != null) {
     this.amReporter.registerTask(request.getAmHost(), request.getAmPort(),
-        request.getUser(), jobToken, null, request.getFragmentSpec().getDagName());
+        request.getUser(), jobToken, fragmentInfo.getQueryInfo().getQueryIdentifier());
     }
     this.metrics = metrics;
     this.requestId = request.getFragmentSpec().getFragmentIdentifierString();
@@ -297,9 +297,8 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
    */
   public void reportTaskKilled() {
     killedTaskHandler
-        .taskKilled(request.getAmHost(), request.getAmPort(), request.getUser(), jobToken, null,
-            taskSpec.getDAGName(),
-            taskSpec.getTaskAttemptID());
+        .taskKilled(request.getAmHost(), request.getAmPort(), request.getUser(), jobToken,
+            fragmentInfo.getQueryInfo().getQueryIdentifier(), taskSpec.getTaskAttemptID());
   }
 
   public boolean canFinish() {
@@ -428,6 +427,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
       HistoryLogger
           .logFragmentEnd(request.getApplicationIdString(), request.getContainerIdString(),
               executionContext.getHostName(), request.getFragmentSpec().getDagName(),
+              fragmentInfo.getQueryInfo().getDagIdentifier(),
               request.getFragmentSpec().getVertexName(),
               request.getFragmentSpec().getFragmentNumber(),
               request.getFragmentSpec().getAttemptNumber(), taskRunnerCallable.threadName,
@@ -445,6 +445,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
       HistoryLogger
           .logFragmentEnd(request.getApplicationIdString(), request.getContainerIdString(),
               executionContext.getHostName(), request.getFragmentSpec().getDagName(),
+              fragmentInfo.getQueryInfo().getDagIdentifier(),
               request.getFragmentSpec().getVertexName(),
               request.getFragmentSpec().getFragmentNumber(),
               request.getFragmentSpec().getAttemptNumber(), taskRunnerCallable.threadName,

http://git-wip-us.apache.org/repos/asf/hive/blob/94e8761a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java
index 7428a6a..f61d62f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java
@@ -262,5 +262,4 @@ public class Converters {
         throw new RuntimeException("Unexpected state: " + state);
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/94e8761a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index 5c370ee..ae7d291 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.llap.LlapNodeId;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
@@ -85,7 +86,8 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
   private static final boolean isDebugEnabed = LOG.isDebugEnabled();
 
   private final SubmitWorkRequestProto BASE_SUBMIT_WORK_REQUEST;
-  private final ConcurrentMap<String, ByteBuffer> credentialMap;
+
+  private final ConcurrentMap<QueryIdentifierProto, ByteBuffer> credentialMap;
 
   // Tracks containerIds and taskAttemptIds, so can be kept independent of the running DAG.
   // When DAG specific cleanup happens, it'll be better to link this to a DAG though.
@@ -104,7 +106,8 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
   private final ConcurrentMap<LlapNodeId, PingingNodeInfo> pingedNodeMap = new ConcurrentHashMap<>();
 
 
-  private volatile String currentDagName;
+  private volatile int currentDagId;
+  private volatile QueryIdentifierProto currentQueryIdentifierProto;
 
   public LlapTaskCommunicator(
       TaskCommunicatorContext taskCommunicatorContext) {
@@ -226,8 +229,9 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
                                          int priority)  {
     super.registerRunningTaskAttempt(containerId, taskSpec, additionalResources, credentials,
         credentialsChanged, priority);
-    if (taskSpec.getDAGName() != currentDagName) {
-      resetCurrentDag(taskSpec.getDAGName());
+    int dagId = taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId();
+    if (currentQueryIdentifierProto == null || (dagId != currentQueryIdentifierProto.getDagIdentifier())) {
+      resetCurrentDag(dagId);
     }
 
 
@@ -251,7 +255,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
     nodesForQuery.add(nodeId);
 
     sourceStateTracker.registerTaskForStateUpdates(host, port, taskSpec.getInputs());
-    FragmentRuntimeInfo fragmentRuntimeInfo = sourceStateTracker.getFragmentRuntimeInfo(taskSpec.getDAGName(),
+    FragmentRuntimeInfo fragmentRuntimeInfo = sourceStateTracker.getFragmentRuntimeInfo(
         taskSpec.getVertexName(), taskSpec.getTaskAttemptID().getTaskID().getId(), priority);
     SubmitWorkRequestProto requestProto;
 
@@ -349,7 +353,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
     // NodeId can be null if the task gets unregistered due to failure / being killed by the daemon itself
     if (nodeId != null) {
       TerminateFragmentRequestProto request =
-          TerminateFragmentRequestProto.newBuilder().setDagName(currentDagName)
+          TerminateFragmentRequestProto.newBuilder().setQueryIdentifier(currentQueryIdentifierProto)
               .setFragmentIdentifierString(taskAttemptId.toString()).build();
       communicator.sendTerminateFragment(request, nodeId.getHostname(), nodeId.getPort(),
           new LlapDaemonProtocolClientProxy.ExecuteRequestCallback<TerminateFragmentResponseProto>() {
@@ -370,12 +374,16 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
     }
   }
 
+
+
+
   @Override
-  public void dagComplete(final String dagName) {
-    QueryCompleteRequestProto request = QueryCompleteRequestProto.newBuilder().setDagName(
-        dagName).setDeleteDelay(deleteDelayOnDagComplete).build();
+  public void dagComplete(final int dagIdentifier) {
+    QueryCompleteRequestProto request = QueryCompleteRequestProto.newBuilder()
+        .setQueryIdentifier(constructQueryIdentifierProto(dagIdentifier))
+        .setDeleteDelay(deleteDelayOnDagComplete).build();
     for (final LlapNodeId llapNodeId : nodesForQuery) {
-      LOG.info("Sending dagComplete message for {}, to {}", dagName, llapNodeId);
+      LOG.info("Sending dagComplete message for {}, to {}", dagIdentifier, llapNodeId);
       communicator.sendQueryComplete(request, llapNodeId.getHostname(), llapNodeId.getPort(),
           new LlapDaemonProtocolClientProxy.ExecuteRequestCallback<LlapDaemonProtocolProtos.QueryCompleteResponseProto>() {
             @Override
@@ -384,7 +392,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
 
             @Override
             public void indicateError(Throwable t) {
-              LOG.warn("Failed to indicate dag complete dagId={} to node {}", dagName, llapNodeId);
+              LOG.warn("Failed to indicate dag complete dagId={} to node {}", dagIdentifier, llapNodeId);
             }
           });
     }
@@ -495,12 +503,12 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
     }
   }
 
-  private void resetCurrentDag(String newDagName) {
+  private void resetCurrentDag(int newDagId) {
     // Working on the assumption that a single DAG runs at a time per AM.
-    currentDagName = newDagName;
-    sourceStateTracker.resetState(newDagName);
+    currentQueryIdentifierProto = constructQueryIdentifierProto(newDagId);
+    sourceStateTracker.resetState(newDagId);
     nodesForQuery.clear();
-    LOG.info("CurrentDag set to: " + newDagName);
+    LOG.info("CurrentDagId set to: " + newDagId + ", name=" + getContext().getCurrentDagName());
     // TODO Is it possible for heartbeats to come in from lost tasks - those should be told to die, which
     // is likely already happening.
   }
@@ -518,10 +526,12 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
     // Credentials can change across DAGs. Ideally construct only once per DAG.
     taskCredentials.addAll(getContext().getCredentials());
 
-    ByteBuffer credentialsBinary = credentialMap.get(taskSpec.getDAGName());
+    Preconditions.checkState(currentQueryIdentifierProto.getDagIdentifier() ==
+        taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId());
+    ByteBuffer credentialsBinary = credentialMap.get(currentQueryIdentifierProto);
     if (credentialsBinary == null) {
       credentialsBinary = serializeCredentials(getContext().getCredentials());
-      credentialMap.putIfAbsent(taskSpec.getDAGName(), credentialsBinary.duplicate());
+      credentialMap.putIfAbsent(currentQueryIdentifierProto, credentialsBinary.duplicate());
     } else {
       credentialsBinary = credentialsBinary.duplicate();
     }
@@ -736,4 +746,10 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
     }
 
   }
+
+  private QueryIdentifierProto constructQueryIdentifierProto(int dagIdentifier) {
+    return QueryIdentifierProto.newBuilder()
+        .setAppIdentifier(getContext().getCurrentAppIdentifier()).setDagIdentifier(dagIdentifier)
+        .build();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/94e8761a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java
index 066fae5..628fe9c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java
@@ -24,6 +24,8 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.hadoop.hive.llap.daemon.impl.QueryIdentifier;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.llap.LlapNodeId;
@@ -45,28 +47,33 @@ public class SourceStateTracker {
   private final TaskCommunicatorContext taskCommunicatorContext;
   private final LlapTaskCommunicator taskCommunicator;
 
+  private final QueryIdentifierProto BASE_QUERY_IDENTIFIER;
+
   // Tracks vertices for which notifications have been registered
   private final Set<String> notificationRegisteredVertices = new HashSet<>();
 
   private final Map<String, SourceInfo> sourceInfoMap = new HashMap<>();
   private final Map<LlapNodeId, NodeInfo> nodeInfoMap = new HashMap<>();
 
-  private volatile String currentDagName;
+  private volatile QueryIdentifierProto currentQueryIdentifier;
 
   public SourceStateTracker(TaskCommunicatorContext taskCommunicatorContext,
                             LlapTaskCommunicator taskCommunicator) {
     this.taskCommunicatorContext = taskCommunicatorContext;
     this.taskCommunicator = taskCommunicator;
+    BASE_QUERY_IDENTIFIER = QueryIdentifierProto.newBuilder()
+        .setAppIdentifier(taskCommunicatorContext.getCurrentAppIdentifier()).build();
   }
 
   /**
    * To be invoked after each DAG completes.
    */
-  public synchronized void resetState(String newDagName) {
+  public synchronized void resetState(int newDagId) {
     sourceInfoMap.clear();
     nodeInfoMap.clear();
     notificationRegisteredVertices.clear();
-    this.currentDagName = newDagName;
+    this.currentQueryIdentifier =
+        QueryIdentifierProto.newBuilder(BASE_QUERY_IDENTIFIER).setDagIdentifier(newDagId).build();
   }
 
   /**
@@ -139,16 +146,16 @@ public class SourceStateTracker {
   }
 
 
+  // Assumes serialized DAGs within an AM, and a reset of structures after each DAG completes.
   /**
    * Constructs FragmentRuntimeInfo for scheduling within LLAP daemons.
    * Also caches state based on state updates.
-   * @param dagName
    * @param vertexName
    * @param fragmentNumber
    * @param priority
    * @return
    */
-  public synchronized FragmentRuntimeInfo getFragmentRuntimeInfo(String dagName, String vertexName, int fragmentNumber,
+  public synchronized FragmentRuntimeInfo getFragmentRuntimeInfo(String vertexName, int fragmentNumber,
                                                                  int priority) {
     FragmentRuntimeInfo.Builder builder = FragmentRuntimeInfo.newBuilder();
     maybeRegisterForVertexUpdates(vertexName);
@@ -282,9 +289,8 @@ public class SourceStateTracker {
 
   void sendStateUpdateToNode(LlapNodeId nodeId, String sourceName, VertexState state) {
     taskCommunicator.sendStateUpdate(nodeId.getHostname(), nodeId.getPort(),
-        SourceStateUpdatedRequestProto.newBuilder().setDagName(currentDagName).setSrcName(
-            sourceName)
-            .setState(Converters.fromVertexState(state)).build());
+        SourceStateUpdatedRequestProto.newBuilder().setQueryIdentifier(currentQueryIdentifier)
+            .setSrcName(sourceName).setState(Converters.fromVertexState(state)).build());
   }
 
 

http://git-wip-us.apache.org/repos/asf/hive/blob/94e8761a/llap-server/src/protobuf/LlapDaemonProtocol.proto
----------------------------------------------------------------------
diff --git a/llap-server/src/protobuf/LlapDaemonProtocol.proto b/llap-server/src/protobuf/LlapDaemonProtocol.proto
index a2d944f..944c96c 100644
--- a/llap-server/src/protobuf/LlapDaemonProtocol.proto
+++ b/llap-server/src/protobuf/LlapDaemonProtocol.proto
@@ -50,6 +50,7 @@ message GroupInputSpecProto {
 message FragmentSpecProto {
   optional string fragment_identifier_string = 1;
   optional string dag_name = 2;
+  optional int32 dag_id = 11;
   optional string vertex_name = 3;
   optional EntityDescriptorProto processor_descriptor = 4;
   repeated IOSpecProto input_specs = 5;
@@ -74,6 +75,11 @@ enum SourceStateProto {
   S_RUNNING = 2;
 }
 
+message QueryIdentifierProto {
+  optional string app_identifier = 1;
+  optional int32 dag_identifier = 2;
+}
+
 message SubmitWorkRequestProto {
   optional string container_id_string = 1;
   optional string am_host = 2;
@@ -98,7 +104,7 @@ message SubmitWorkResponseProto {
 }
 
 message SourceStateUpdatedRequestProto {
-  optional string dag_name = 1;
+  optional QueryIdentifierProto query_identifier = 1;
   optional string src_name = 2;
   optional SourceStateProto state = 3;
 }
@@ -108,17 +114,16 @@ message SourceStateUpdatedResponseProto {
 
 message QueryCompleteRequestProto {
   optional string query_id = 1;
-  optional string dag_name = 2;
-  optional int64 delete_delay = 3 [default = 0];
+  optional QueryIdentifierProto query_identifier = 2;
+  optional int64 delete_delay = 4 [default = 0];
 }
 
 message QueryCompleteResponseProto {
 }
 
 message TerminateFragmentRequestProto {
-  optional string query_id = 1;
-  optional string dag_name = 2;
-  optional string fragment_identifier_string = 7;
+  optional QueryIdentifierProto query_identifier = 1;
+  optional string fragment_identifier_string = 2;
 }
 
 message TerminateFragmentResponseProto {

http://git-wip-us.apache.org/repos/asf/hive/blob/94e8761a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
index 38af07e..ef49714 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
@@ -16,6 +16,7 @@ package org.apache.hadoop.hive.llap.daemon.impl;
 
 import static org.mockito.Mockito.mock;
 
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Condition;
@@ -25,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
 import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
 import org.apache.hadoop.security.Credentials;
@@ -48,18 +50,36 @@ public class TaskExecutorTestHelpers {
     SubmitWorkRequestProto
         requestProto = createSubmitWorkRequestProto(fragmentNum, parallelism,
         startTime);
-    MockRequest mockRequest = new MockRequest(requestProto, canFinish, workTime);
+    QueryFragmentInfo queryFragmentInfo = createQueryFragmentInfo(requestProto.getFragmentSpec());
+    MockRequest mockRequest = new MockRequest(requestProto, queryFragmentInfo, canFinish, workTime);
     return mockRequest;
   }
 
   public static TaskExecutorService.TaskWrapper createTaskWrapper(
       SubmitWorkRequestProto request, boolean canFinish, int workTime) {
-    MockRequest mockRequest = new MockRequest(request, canFinish, workTime);
+    QueryFragmentInfo queryFragmentInfo = createQueryFragmentInfo(request.getFragmentSpec());
+    MockRequest mockRequest = new MockRequest(request, queryFragmentInfo, canFinish, workTime);
     TaskExecutorService.TaskWrapper
         taskWrapper = new TaskExecutorService.TaskWrapper(mockRequest, null);
     return taskWrapper;
   }
 
+  public static QueryFragmentInfo createQueryFragmentInfo(FragmentSpecProto fragmentSpecProto) {
+    QueryInfo queryInfo = createQueryInfo();
+    QueryFragmentInfo fragmentInfo =
+        new QueryFragmentInfo(queryInfo, "fakeVertexName", fragmentSpecProto.getFragmentNumber(), 0,
+            fragmentSpecProto);
+    return fragmentInfo;
+  }
+
+  public static QueryInfo createQueryInfo() {
+    QueryIdentifier queryIdentifier = new QueryIdentifier("fake_app_id_string", 1);
+    QueryInfo queryInfo =
+        new QueryInfo(queryIdentifier, "fake_app_id_string", "fake_dag_name", 1, "fakeUser",
+            new ConcurrentHashMap<String, LlapDaemonProtocolProtos.SourceStateProto>(),
+            new String[0], null);
+    return queryInfo;
+  }
 
   public static SubmitWorkRequestProto createSubmitWorkRequestProto(
       int fragmentNumber, int selfAndUpstreamParallelism,
@@ -80,7 +100,7 @@ public class TaskExecutorTestHelpers {
     return SubmitWorkRequestProto
         .newBuilder()
         .setFragmentSpec(
-            LlapDaemonProtocolProtos.FragmentSpecProto
+            FragmentSpecProto
                 .newBuilder()
                 .setAttemptNumber(0)
                 .setDagName("MockDag")
@@ -119,9 +139,9 @@ public class TaskExecutorTestHelpers {
     private boolean shouldSleep = true;
     private final Condition finishedCondition = lock.newCondition();
 
-    public MockRequest(SubmitWorkRequestProto requestProto,
+    public MockRequest(SubmitWorkRequestProto requestProto, QueryFragmentInfo fragmentInfo,
                        boolean canFinish, long workTime) {
-      super(requestProto, mock(QueryFragmentInfo.class), new Configuration(),
+      super(requestProto, fragmentInfo, new Configuration(),
           new ExecutionContextImpl("localhost"), null, new Credentials(), 0, null, null, mock(
               LlapDaemonExecutorMetrics.class),
           mock(KilledTaskHandler.class), mock(

http://git-wip-us.apache.org/repos/asf/hive/blob/94e8761a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestQueryIdentifier.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestQueryIdentifier.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestQueryIdentifier.java
new file mode 100644
index 0000000..39a3865
--- /dev/null
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestQueryIdentifier.java
@@ -0,0 +1,48 @@
+package org.apache.hadoop.hive.llap.daemon.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+import org.junit.Test;
+
+public class TestQueryIdentifier {
+  /** QueryIdentifier equality must depend on both the app id string and the dag id. */
+  @Test (timeout = 5000)
+  public void testEquality() {
+    // Two app ids crossed with two dag ids give four identifiers that must all differ.
+    String appIdString1 = "app1";
+    String appIdString2 = "app2";
+
+    int dagId1 = 1;
+    int dagId2 = 2;
+
+    QueryIdentifier[] queryIdentifiers = new QueryIdentifier[4];
+
+    queryIdentifiers[0] = new QueryIdentifier(appIdString1, dagId1);
+    queryIdentifiers[1] = new QueryIdentifier(appIdString1, dagId2);
+    queryIdentifiers[2] = new QueryIdentifier(appIdString2, dagId1);
+    queryIdentifiers[3] = new QueryIdentifier(appIdString2, dagId2);
+    // Pairwise check: each identifier is equal to itself and unequal to the other three.
+    for (int i = 0 ; i < 4 ; i++) {
+      for (int j = 0 ; j < 4 ; j++) {
+        if (i == j) {
+          assertEquals(queryIdentifiers[i], queryIdentifiers[j]);
+        } else {
+          assertNotEquals(queryIdentifiers[i], queryIdentifiers[j]);
+        }
+      }
+    }
+    // Freshly constructed instances with the same arguments must equal the array entries.
+    QueryIdentifier q11 = new QueryIdentifier(appIdString1, dagId1);
+    QueryIdentifier q12 = new QueryIdentifier(appIdString1, dagId2);
+    QueryIdentifier q21 = new QueryIdentifier(appIdString2, dagId1);
+    QueryIdentifier q22 = new QueryIdentifier(appIdString2, dagId2);
+
+    assertEquals(queryIdentifiers[0], q11);
+    assertEquals(queryIdentifiers[1], q12);
+    assertEquals(queryIdentifiers[2], q21);
+    assertEquals(queryIdentifiers[3], q22);
+
+
+  }
+}