Posted to commits@hive.apache.org by ss...@apache.org on 2015/05/20 07:38:48 UTC

hive git commit: HIVE-10767. LLAP: Improve the way task finishable information is processed. (Siddharth Seth)

Repository: hive
Updated Branches:
  refs/heads/llap 97fa2202f -> ece61d033


HIVE-10767. LLAP: Improve the way task finishable information is processed. (Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ece61d03
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ece61d03
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ece61d03

Branch: refs/heads/llap
Commit: ece61d0336d4679aa5d25526e56168f64761b701
Parents: 97fa220
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue May 19 22:38:24 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Tue May 19 22:38:24 2015 -0700

----------------------------------------------------------------------
 .../llap/daemon/FragmentCompletionHandler.java  |  22 ++
 .../hive/llap/daemon/impl/AMReporter.java       |   2 +
 .../llap/daemon/impl/ContainerRunnerImpl.java   |  61 +++---
 .../impl/EvictingPriorityBlockingQueue.java     |   4 +
 .../llap/daemon/impl/QueryFragmentInfo.java     | 149 +++++++++++++
 .../hadoop/hive/llap/daemon/impl/QueryInfo.java | 128 +++++++++++
 .../hive/llap/daemon/impl/QueryTracker.java     | 212 ++++++++++++-------
 .../llap/daemon/impl/TaskExecutorService.java   |  25 ++-
 .../llap/daemon/impl/TaskRunnerCallable.java    |  84 +++-----
 .../daemon/impl/TestTaskExecutorService.java    |   9 +-
 10 files changed, 525 insertions(+), 171 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/ece61d03/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/FragmentCompletionHandler.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/FragmentCompletionHandler.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/FragmentCompletionHandler.java
new file mode 100644
index 0000000..511347a
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/FragmentCompletionHandler.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.daemon;
+
+import org.apache.hadoop.hive.llap.daemon.impl.QueryFragmentInfo;
+
+public interface FragmentCompletionHandler {
+
+  void fragmentComplete(QueryFragmentInfo fragmentInfo);
+}
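
The new FragmentCompletionHandler is a single-method callback: elsewhere in this patch, ContainerRunnerImpl implements it and forwards to QueryTracker.fragmentComplete, so executors can report completion without depending on the tracker directly. A minimal stand-alone implementation of the same shape (stand-in types, illustrative only, not part of the patch):

    import java.util.Collections;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    // Illustrative handler: keep a live-fragment set and drop entries on
    // completion, mirroring how QueryInfo unregisters a QueryFragmentInfo.
    class InMemoryCompletionHandler<F> {
      private final Set<F> liveFragments =
          Collections.newSetFromMap(new ConcurrentHashMap<F, Boolean>());

      void register(F fragment) {
        liveFragments.add(fragment);
      }

      void fragmentComplete(F fragment) {
        liveFragments.remove(fragment);
      }

      int liveCount() {
        return liveFragments.size();
      }
    }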

http://git-wip-us.apache.org/repos/asf/hive/blob/ece61d03/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
index 39b3634..ea2d77a 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
@@ -55,6 +55,8 @@ import org.slf4j.LoggerFactory;
  */
 public class AMReporter extends AbstractService {
 
+  // TODO In case of a failure to heartbeat, tasks for the specific DAG should ideally be KILLED
+
     /*
   registrations and un-registrations will happen as and when tasks are submitted or are removed.
   reference counting is likely required.

http://git-wip-us.apache.org/repos/asf/hive/blob/ece61d03/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 3fd7920..a208bdd 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -20,13 +20,13 @@ import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.llap.LlapNodeId;
 import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
+import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
 import org.apache.hadoop.hive.llap.daemon.HistoryLogger;
 import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
@@ -34,7 +34,6 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentS
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GroupInputSpecProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
@@ -57,11 +56,11 @@ import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
 
 import com.google.common.base.Preconditions;
 
-public class ContainerRunnerImpl extends AbstractService implements ContainerRunner {
+// TODO Convert this to a CompositeService
+public class ContainerRunnerImpl extends AbstractService implements ContainerRunner, FragmentCompletionHandler {
 
-  public static final String THREAD_NAME_FORMAT_PREFIX = "ContainerExecutor ";
-  public static final String THREAD_NAME_FORMAT = THREAD_NAME_FORMAT_PREFIX + "%d";
   private static final Logger LOG = Logger.getLogger(ContainerRunnerImpl.class);
+  public static final String THREAD_NAME_FORMAT_PREFIX = "ContainerExecutor ";
 
   private volatile AMReporter amReporter;
   private final QueryTracker queryTracker;
@@ -74,10 +73,6 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
   private final TaskRunnerCallable.ConfParams confParams;
   private final KilledTaskHandler killedTaskHandler = new KilledTaskHandlerImpl();
 
-  // Map of dagId to vertices and associated state.
-  private final ConcurrentMap<String, ConcurrentMap<String, SourceStateProto>> sourceCompletionMap = new ConcurrentHashMap<>();
-  // TODO Support for removing queued containers, interrupting / killing specific containers
-
   public ContainerRunnerImpl(Configuration conf, int numExecutors, int waitQueueSize,
       boolean enablePreemption, String[] localDirsBase, int localShufflePort,
       AtomicReference<InetSocketAddress> localAddress,
@@ -114,9 +109,14 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
     );
   }
 
+  public void serviceInit(Configuration conf) {
+    queryTracker.init(conf);
+  }
+
   @Override
   public void serviceStart() {
     // The node id will only be available at this point, since the server has been started in LlapDaemon
+    queryTracker.start();
     LlapNodeId llapNodeId = LlapNodeId.getInstance(localAddress.get().getHostName(),
         localAddress.get().getPort());
     this.amReporter = new AMReporter(llapNodeId, conf);
@@ -130,7 +130,7 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
       amReporter.stop();
       amReporter = null;
     }
-    queryTracker.shutdown();
+    queryTracker.stop();
     super.serviceStop();
   }
 
@@ -151,7 +151,7 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
             request.getFragmentSpec().getAttemptNumber();
     NDC.push(ndcContextString);
     try {
-      Map<String, String> env = new HashMap<String, String>();
+      Map<String, String> env = new HashMap<>();
       // TODO What else is required in this environment map.
       env.putAll(localEnv);
       env.put(ApplicationConstants.Environment.USER.name(), request.getUser());
@@ -161,13 +161,13 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
           fragmentSpec.getTaskAttemptIdString());
       int dagIdentifier = taskAttemptId.getTaskID().getVertexID().getDAGId().getId();
 
-      queryTracker
+      QueryFragmentInfo fragmentInfo = queryTracker
           .registerFragment(null, request.getApplicationIdString(), fragmentSpec.getDagName(),
               dagIdentifier,
               fragmentSpec.getVertexName(), fragmentSpec.getFragmentNumber(),
-              fragmentSpec.getAttemptNumber(), request.getUser());
+              fragmentSpec.getAttemptNumber(), request.getUser(), request.getFragmentSpec());
 
-      String []localDirs = queryTracker.getLocalDirs(null, fragmentSpec.getDagName(), request.getUser());
+      String[] localDirs = fragmentInfo.getLocalDirs();
       Preconditions.checkNotNull(localDirs);
 
       if (LOG.isDebugEnabled()) {
@@ -190,11 +190,17 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
           .registerDag(request.getApplicationIdString(), dagIdentifier, jobToken,
               request.getUser(), localDirs);
 
-      ConcurrentMap<String, SourceStateProto> sourceCompletionMap = getSourceCompletionMap(request.getFragmentSpec().getDagName());
-      TaskRunnerCallable callable = new TaskRunnerCallable(request, new Configuration(getConfig()),
-          new ExecutionContextImpl(localAddress.get().getHostName()), env, localDirs,
-          credentials, memoryPerExecutor, amReporter, sourceCompletionMap, confParams, metrics, killedTaskHandler);
-      executorService.schedule(callable);
+      TaskRunnerCallable callable = new TaskRunnerCallable(request, fragmentInfo, new Configuration(getConfig()),
+          new ExecutionContextImpl(localAddress.get().getHostName()), env,
+          credentials, memoryPerExecutor, amReporter, confParams, metrics, killedTaskHandler,
+          this);
+      try {
+        executorService.schedule(callable);
+      } catch (RejectedExecutionException e) {
+        // Stop tracking the fragment and re-throw the error.
+        fragmentComplete(fragmentInfo);
+        throw e;
+      }
       metrics.incrExecutorTotalRequestsHandled();
       metrics.incrExecutorNumQueuedRequests();
     } finally {
@@ -205,8 +211,8 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
   @Override
   public void sourceStateUpdated(SourceStateUpdatedRequestProto request) {
     LOG.info("Processing state update: " + stringifySourceStateUpdateRequest(request));
-    ConcurrentMap<String, SourceStateProto> dagMap = getSourceCompletionMap(request.getDagName());
-    dagMap.put(request.getSrcName(), request.getState());
+    queryTracker.registerSourceStateChange(request.getDagName(), request.getSrcName(),
+        request.getState());
   }
 
   @Override
@@ -280,14 +286,9 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
     return sb.toString();
   }
 
-  private ConcurrentMap<String, SourceStateProto> getSourceCompletionMap(String dagName) {
-    ConcurrentMap<String, SourceStateProto> dagMap = sourceCompletionMap.get(dagName);
-    if (dagMap == null) {
-      dagMap = new ConcurrentHashMap<>();
-      ConcurrentMap<String, SourceStateProto> old = sourceCompletionMap.putIfAbsent(dagName, dagMap);
-      dagMap = (old != null) ? old : dagMap;
-    }
-    return dagMap;
+  @Override
+  public void fragmentComplete(QueryFragmentInfo fragmentInfo) {
+    queryTracker.fragmentComplete(fragmentInfo);
   }
 
   private class KilledTaskHandlerImpl implements KilledTaskHandler {
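
The reworked submitWork establishes one invariant: a fragment registered with the QueryTracker is always unregistered, even when the wait queue rejects the task. The same pattern, reduced to a sketch over a plain ExecutorService (names here are hypothetical):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.RejectedExecutionException;

    class SubmitWithCleanup {
      private final ExecutorService executor = Executors.newFixedThreadPool(4);

      // On rejection, run the cleanup hook (analogous to fragmentComplete)
      // and then surface the rejection to the caller, as submitWork does.
      void submit(Runnable work, Runnable unregister) {
        try {
          executor.execute(work);
        } catch (RejectedExecutionException e) {
          unregister.run();
          throw e;
        }
      }
    }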

http://git-wip-us.apache.org/repos/asf/hive/blob/ece61d03/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
index 926835b..ab3a130 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
@@ -65,6 +65,10 @@ public class EvictingPriorityBlockingQueue<E> {
     deque.remove(e);
   }
 
+  public synchronized int size() {
+    return deque.size();
+  }
+
   @Override
   public synchronized String toString() {
     return deque.toString();

http://git-wip-us.apache.org/repos/asf/hive/blob/ece61d03/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java
new file mode 100644
index 0000000..f6cd8ab
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.daemon.impl;
+
+import java.io.IOException;
+import java.util.List;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto;
+import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class QueryFragmentInfo {
+
+  private static final Logger LOG = LoggerFactory.getLogger(QueryFragmentInfo.class);
+
+  private final QueryInfo queryInfo;
+  private final String vertexName;
+  private final int fragmentNumber;
+  private final int attemptNumber;
+  private final FragmentSpecProto fragmentSpec;
+
+  public QueryFragmentInfo(QueryInfo queryInfo, String vertexName, int fragmentNumber,
+                           int attemptNumber,
+                           FragmentSpecProto fragmentSpec) {
+    Preconditions.checkNotNull(queryInfo);
+    Preconditions.checkNotNull(vertexName);
+    Preconditions.checkNotNull(fragmentSpec);
+    this.queryInfo = queryInfo;
+    this.vertexName = vertexName;
+    this.fragmentNumber = fragmentNumber;
+    this.attemptNumber = attemptNumber;
+    this.fragmentSpec = fragmentSpec;
+  }
+
+  // Only meant for use by the QueryTracker
+  QueryInfo getQueryInfo() {
+    return this.queryInfo;
+  }
+
+  public FragmentSpecProto getFragmentSpec() {
+    return fragmentSpec;
+  }
+
+  public String getVertexName() {
+    return vertexName;
+  }
+
+  public int getFragmentNumber() {
+    return fragmentNumber;
+  }
+
+  public int getAttemptNumber() {
+    return attemptNumber;
+  }
+
+  /**
+   * Check whether a task can run to completion or may end up blocking on its sources.
+   * This is currently determined by looking up source state.
+   * TODO: Eventually, this should look up the Hive Processor to figure out whether
+   * it's reached a state where it can finish - especially in cases of failures
+   * after data has been fetched.
+   *
+   * @return true if the task can finish, false otherwise
+   */
+  public boolean canFinish() {
+    List<IOSpecProto> inputSpecList = fragmentSpec.getInputSpecsList();
+    boolean canFinish = true;
+    if (inputSpecList != null && !inputSpecList.isEmpty()) {
+      for (IOSpecProto inputSpec : inputSpecList) {
+        if (isSourceOfInterest(inputSpec)) {
+          // Look up the state in the map.
+          LlapDaemonProtocolProtos.SourceStateProto state = queryInfo.getSourceStateMap()
+              .get(inputSpec.getConnectedVertexName());
+          if (state != null && state == LlapDaemonProtocolProtos.SourceStateProto.S_SUCCEEDED) {
+            continue;
+          } else {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Cannot finish due to source: " + inputSpec.getConnectedVertexName());
+            }
+            canFinish = false;
+            break;
+          }
+        }
+      }
+    }
+    return canFinish;
+  }
+
+  /**
+   * Get, and create if required, the local dirs for a fragment.
+   * @return the fragment's local directories
+   * @throws IOException if the directories cannot be created
+   */
+  public String[] getLocalDirs() throws IOException {
+    return queryInfo.getLocalDirs();
+  }
+
+  private boolean isSourceOfInterest(IOSpecProto inputSpec) {
+    String inputClassName = inputSpec.getIoDescriptor().getClassName();
+    // MRInput is not of interest since it'll always be ready.
+    return !inputClassName.equals(MRInputLegacy.class.getName());
+  }
+
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    QueryFragmentInfo that = (QueryFragmentInfo) o;
+
+    if (fragmentNumber != that.fragmentNumber) {
+      return false;
+    }
+    if (attemptNumber != that.attemptNumber) {
+      return false;
+    }
+    return vertexName.equals(that.vertexName);
+
+  }
+
+  @Override
+  public int hashCode() {
+    int result = vertexName.hashCode();
+    result = 31 * result + fragmentNumber;
+    result = 31 * result + attemptNumber;
+    return result;
+  }
+}
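
canFinish() reduces to a single rule: every input source other than MRInput must have reported S_SUCCEEDED. The same check re-expressed over plain collections, as a sketch of the rule rather than of the class's actual protobuf dependencies:

    import java.util.List;
    import java.util.Map;

    class CanFinishSketch {
      enum SourceState { RUNNING, SUCCEEDED }

      // True only if every source of interest has reported SUCCEEDED;
      // an unknown source counts as not ready.
      static boolean canFinish(List<String> sourcesOfInterest,
                               Map<String, SourceState> states) {
        for (String source : sourcesOfInterest) {
          if (states.get(source) != SourceState.SUCCEEDED) {
            return false;
          }
        }
        return true;
      }
    }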

http://git-wip-us.apache.org/repos/asf/hive/blob/ece61d03/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
new file mode 100644
index 0000000..efa18cd
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.daemon.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto;
+
+public class QueryInfo {
+  private final String queryId;
+  private final String appIdString;
+  private final String dagName;
+  private final int dagIdentifier;
+  private final String user;
+  private final String[] localDirsBase;
+  private final FileSystem localFs;
+  private String[] localDirs;
+  // Map of states for different vertices.
+
+  private final Set<QueryFragmentInfo> knownFragments =
+      Collections.newSetFromMap(new ConcurrentHashMap<QueryFragmentInfo, Boolean>());
+
+  private final ConcurrentMap<String, SourceStateProto> sourceStateMap;
+
+
+  public QueryInfo(String queryId, String appIdString, String dagName, int dagIdentifier,
+                   String user, ConcurrentMap<String, SourceStateProto> sourceStateMap,
+                   String[] localDirsBase, FileSystem localFs) {
+    this.queryId = queryId;
+    this.appIdString = appIdString;
+    this.dagName = dagName;
+    this.dagIdentifier = dagIdentifier;
+    this.sourceStateMap = sourceStateMap;
+    this.user = user;
+    this.localDirsBase = localDirsBase;
+    this.localFs = localFs;
+  }
+
+  public String getQueryId() {
+    return queryId;
+  }
+
+  public String getAppIdString() {
+    return appIdString;
+  }
+
+  public String getDagName() {
+    return dagName;
+  }
+
+  public int getDagIdentifier() {
+    return dagIdentifier;
+  }
+
+  public String getUser() {
+    return user;
+  }
+
+  public ConcurrentMap<String, SourceStateProto> getSourceStateMap() {
+    return sourceStateMap;
+  }
+
+  public QueryFragmentInfo registerFragment(String vertexName, int fragmentNumber, int attemptNumber, FragmentSpecProto fragmentSpec) {
+    QueryFragmentInfo fragmentInfo = new QueryFragmentInfo(this, vertexName, fragmentNumber, attemptNumber,
+        fragmentSpec);
+    knownFragments.add(fragmentInfo);
+    return fragmentInfo;
+  }
+
+  public void unregisterFragment(QueryFragmentInfo fragmentInfo) {
+    knownFragments.remove(fragmentInfo);
+  }
+
+  private synchronized void createLocalDirs() throws IOException {
+    if (localDirs == null) {
+      localDirs = new String[localDirsBase.length];
+      for (int i = 0; i < localDirsBase.length; i++) {
+        localDirs[i] = createAppSpecificLocalDir(localDirsBase[i], appIdString, user, dagIdentifier);
+        localFs.mkdirs(new Path(localDirs[i]));
+      }
+    }
+  }
+
+  /**
+   * Get, and create if required, the local dirs for a query.
+   * @return the query's local directories
+   * @throws IOException if the directories cannot be created
+   */
+  public synchronized String[] getLocalDirs() throws IOException {
+    if (localDirs == null) {
+      createLocalDirs();
+    }
+    return localDirs;
+  }
+
+  public synchronized String[] getLocalDirsNoCreate() {
+    return this.localDirs;
+  }
+
+  private static String createAppSpecificLocalDir(String baseDir, String applicationIdString,
+                                                  String user, int dagIdentifier) {
+    // TODO This is broken for secure clusters. The app will not have permission to create these directories.
+    // May work via Slider - since the directory would already exist. Otherwise may need a custom shuffle handler.
+    // TODO This should be the process user - and not the user on behalf of whom the query is being submitted.
+    return baseDir + File.separator + "usercache" + File.separator + user + File.separator +
+        "appcache" + File.separator + applicationIdString + File.separator + dagIdentifier;
+  }
+}
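
For illustration, createAppSpecificLocalDir lays directories out as base/usercache/<user>/appcache/<appId>/<dagIdentifier>. A tiny runnable example of that layout, with all values made up:

    import java.io.File;

    class LocalDirLayoutExample {
      public static void main(String[] args) {
        String s = File.separator;
        // baseDir, user, appId and dagIdentifier below are illustrative only.
        String dir = "/grid/0" + s + "usercache" + s + "hive" + s + "appcache"
            + s + "application_1432000000000_0001" + s + 3;
        // Prints: /grid/0/usercache/hive/appcache/application_1432000000000_0001/3
        System.out.println(dir);
      }
    }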

http://git-wip-us.apache.org/repos/asf/hive/blob/ece61d03/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index 5c8116e..90ad923 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -16,19 +16,28 @@ package org.apache.hadoop.hive.llap.daemon.impl;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto;
 import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler;
+import org.apache.hadoop.service.CompositeService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
 import java.io.IOException;
+import java.util.Collections;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 
 /**
  * Tracks queries running within a daemon
  */
-public class QueryTracker {
+public class QueryTracker extends CompositeService {
 
   private static final Logger LOG = LoggerFactory.getLogger(QueryTracker.class);
   private final QueryFileCleaner queryFileCleaner;
@@ -39,107 +48,162 @@ public class QueryTracker {
   private final String[] localDirsBase;
   private final FileSystem localFs;
 
+  // TODO At the moment there's no way of knowing whether a query is running or not.
+  // A race is possible between dagComplete and registerFragment - where the registerFragment
+  // is processed after the dagComplete.
+  // May need to keep track of completed dags for a certain time duration to avoid this.
+  // Alternatively - send an explicit dag start message before any other message is processed.
+  // Multiple threads communicating from a single AM get in the way of this.
+
+  // Keeps track of completed dags. Assumes dag names are unique across AMs.
+  private final Set<String> completedDagMap = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+
+
+  private final Lock lock = new ReentrantLock();
+  private final ConcurrentMap<String, ReadWriteLock> dagSpecificLocks = new ConcurrentHashMap<>();
+
+  // Tracks various maps for dag completions. This is set up here since stateChange messages
+  // may be processed by a thread which ends up executing before a task.
+  private final ConcurrentMap<String, ConcurrentMap<String, SourceStateProto>> sourceCompletionMap = new ConcurrentHashMap<>();
+
   public QueryTracker(Configuration conf, String[] localDirsBase) {
+    super("QueryTracker");
     this.localDirsBase = localDirsBase;
     try {
       localFs = FileSystem.getLocal(conf);
     } catch (IOException e) {
       throw new RuntimeException("Failed to setup local filesystem instance", e);
     }
+
     queryFileCleaner = new QueryFileCleaner(conf, localFs);
-    queryFileCleaner.init(conf);
-    queryFileCleaner.start();
+    addService(queryFileCleaner);
   }
 
 
-  void registerFragment(String queryId, String appIdString, String dagName, int dagIdentifier,
+
+  /**
+   * Register a new fragment for a specific query
+   * @param queryId
+   * @param appIdString
+   * @param dagName
+   * @param dagIdentifier
+   * @param vertexName
+   * @param fragmentNumber
+   * @param attemptNumber
+   * @param user
+   * @param fragmentSpec
+   * @return the QueryFragmentInfo for the registered fragment
+   * @throws IOException
+   */
+  QueryFragmentInfo registerFragment(String queryId, String appIdString, String dagName, int dagIdentifier,
                         String vertexName, int fragmentNumber, int attemptNumber,
-                        String user) throws
+                        String user, FragmentSpecProto fragmentSpec) throws
       IOException {
-    QueryInfo queryInfo = queryInfoMap.get(dagName);
-    if (queryInfo == null) {
-      queryInfo = new QueryInfo(queryId, appIdString, dagName, dagIdentifier, user);
-      queryInfoMap.putIfAbsent(dagName, queryInfo);
+    ReadWriteLock dagLock = getDagLock(dagName);
+    dagLock.readLock().lock();
+    try {
+      if (!completedDagMap.contains(dagName)) {
+        QueryInfo queryInfo = queryInfoMap.get(dagName);
+        if (queryInfo == null) {
+          queryInfo = new QueryInfo(queryId, appIdString, dagName, dagIdentifier, user,
+              getSourceCompletionMap(dagName), localDirsBase, localFs);
+          queryInfoMap.putIfAbsent(dagName, queryInfo);
+        }
+        return queryInfo.registerFragment(vertexName, fragmentNumber, attemptNumber, fragmentSpec);
+      } else {
+        // Cleanup the dag lock here, since it may have been created after the query completed
+        dagSpecificLocks.remove(dagName);
+        throw new RuntimeException(
+            "Dag " + dagName + " already complete. Rejecting fragment [" + vertexName + ", " + fragmentNumber +
+                ", " + attemptNumber);
+      }
+    } finally {
+      dagLock.readLock().unlock();
     }
-    // TODO Start tracking individual fragments, so that taskKilled etc messages
-    // can be routed through this layer to simplify the interfaces.
   }
 
-  String[] getLocalDirs(String queryId, String dagName, String user) throws IOException {
+  /**
+   * Indicate to the tracker that a fragment is complete. This is reported from internal execution within the daemon.
+   * @param fragmentInfo
+   */
+  void fragmentComplete(QueryFragmentInfo fragmentInfo) {
+    String dagName = fragmentInfo.getQueryInfo().getDagName();
     QueryInfo queryInfo = queryInfoMap.get(dagName);
-    return queryInfo.getLocalDirs();
+    if (queryInfo == null) {
+      // Possible because a queryComplete message from the AM can come in first - KILL / SUCCESSFUL,
+      // before the fragmentComplete is reported
+      LOG.info("Ignoring fragmentComplete message for unknown query");
+    } else {
+      queryInfo.unregisterFragment(fragmentInfo);
+    }
   }
 
+  /**
+   * Register completion for a query
+   * @param queryId
+   * @param dagName
+   * @param deleteDelay
+   */
   void queryComplete(String queryId, String dagName, long deleteDelay) {
-    LOG.info("Processing queryComplete for dagName={} with deleteDelay={} seconds", dagName, deleteDelay);
-    QueryInfo queryInfo = queryInfoMap.remove(dagName);
-    if (queryInfo == null) {
-      LOG.warn("Ignoring query complete for unknown dag: {}", dagName);
-    }
-    String []localDirs = queryInfo.getLocalDirsNoCreate();
-    if (localDirs != null) {
-      for (String localDir : localDirs) {
-        queryFileCleaner.cleanupDir(localDir, deleteDelay);
-        ShuffleHandler.get().unregisterDag(localDir, dagName, queryInfo.dagIdentifier);
+    ReadWriteLock dagLock = getDagLock(dagName);
+    dagLock.writeLock().lock();
+    try {
+      completedDagMap.add(dagName);
+      LOG.info("Processing queryComplete for dagName={} with deleteDelay={} seconds", dagName,
+          deleteDelay);
+      QueryInfo queryInfo = queryInfoMap.remove(dagName);
+      if (queryInfo == null) {
+        LOG.warn("Ignoring query complete for unknown dag: {}", dagName);
+        return;
       }
+      String[] localDirs = queryInfo.getLocalDirsNoCreate();
+      if (localDirs != null) {
+        for (String localDir : localDirs) {
+          queryFileCleaner.cleanupDir(localDir, deleteDelay);
+          ShuffleHandler.get().unregisterDag(localDir, dagName, queryInfo.getDagIdentifier());
+        }
+      }
+      sourceCompletionMap.remove(dagName);
+      dagSpecificLocks.remove(dagName);
+      // TODO HIVE-10762 Issue a kill message to all running fragments for this container.
+      // TODO HIVE-10535 Cleanup map join cache
+    } finally {
+      dagLock.writeLock().unlock();
     }
-    // TODO HIVE-10535 Cleanup map join cache
   }
 
-  void shutdown() {
-    queryFileCleaner.stop();
+  /**
+   * Register an update to a source within an executing dag
+   * @param dagName
+   * @param sourceName
+   * @param sourceState
+   */
+  void registerSourceStateChange(String dagName, String sourceName, SourceStateProto sourceState) {
+    getSourceCompletionMap(dagName).put(sourceName, sourceState);
+    // TODO HIVE-10758 source completion notifications
   }
 
 
-  private class QueryInfo {
-
-    private final String queryId;
-    private final String appIdString;
-    private final String dagName;
-    private final int dagIdentifier;
-    private final String user;
-    private String[] localDirs;
-
-    public QueryInfo(String queryId, String appIdString, String dagName, int dagIdentifier,
-                     String user) {
-      this.queryId = queryId;
-      this.appIdString = appIdString;
-      this.dagName = dagName;
-      this.dagIdentifier = dagIdentifier;
-      this.user = user;
-    }
-
-
-
-
-    private synchronized void createLocalDirs() throws IOException {
-      if (localDirs == null) {
-        localDirs = new String[localDirsBase.length];
-        for (int i = 0; i < localDirsBase.length; i++) {
-          localDirs[i] = createAppSpecificLocalDir(localDirsBase[i], appIdString, user, dagIdentifier);
-          localFs.mkdirs(new Path(localDirs[i]));
-        }
-      }
-    }
-
-    private synchronized String[] getLocalDirs() throws IOException {
-      if (localDirs == null) {
-        createLocalDirs();
+  private ReadWriteLock getDagLock(String dagName) {
+    lock.lock();
+    try {
+      ReadWriteLock dagLock = dagSpecificLocks.get(dagName);
+      if (dagLock == null) {
+        dagLock = new ReentrantReadWriteLock();
+        dagSpecificLocks.put(dagName, dagLock);
       }
-      return localDirs;
-    }
-
-    private synchronized String[] getLocalDirsNoCreate() {
-      return this.localDirs;
+      return dagLock;
+    } finally {
+      lock.unlock();
     }
   }
 
-  private static String createAppSpecificLocalDir(String baseDir, String applicationIdString,
-                                                  String user, int dagIdentifier) {
-    // TODO This is broken for secure clusters. The app will not have permission to create these directories.
-    // May work via Slider - since the directory would already exist. Otherwise may need a custom shuffle handler.
-    // TODO This should be the process user - and not the user on behalf of whom the query is being submitted.
-    return baseDir + File.separator + "usercache" + File.separator + user + File.separator +
-        "appcache" + File.separator + applicationIdString + File.separator + dagIdentifier;
+  private ConcurrentMap<String, SourceStateProto> getSourceCompletionMap(String dagName) {
+    ConcurrentMap<String, SourceStateProto> dagMap = sourceCompletionMap.get(dagName);
+    if (dagMap == null) {
+      dagMap = new ConcurrentHashMap<>();
+      ConcurrentMap<String, SourceStateProto> old =
+          sourceCompletionMap.putIfAbsent(dagName, dagMap);
+      dagMap = (old != null) ? old : dagMap;
+    }
+    return dagMap;
   }
 }
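
Concurrency in the new QueryTracker rests on the per-dag ReentrantReadWriteLock returned by getDagLock: registerFragment takes the read lock, so many registrations proceed in parallel, while queryComplete takes the write lock to exclude all of them while it marks the dag complete and tears down state. On Java 8+ the registry portion could be written with computeIfAbsent; a sketch of that variant, not the patch's code:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class DagLockRegistry {
      private final ConcurrentMap<String, ReadWriteLock> locks =
          new ConcurrentHashMap<>();

      // Atomically create-or-fetch the per-dag lock, replacing the
      // explicit lock()/unlock() dance around dagSpecificLocks.
      ReadWriteLock get(String dagName) {
        return locks.computeIfAbsent(dagName, k -> new ReentrantReadWriteLock());
      }

      void remove(String dagName) {
        locks.remove(dagName);
      }
    }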

http://git-wip-us.apache.org/repos/asf/hive/blob/ece61d03/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index 599c759..18daa75 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -108,6 +108,11 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
     executionCompletionExecutorService = MoreExecutors.listeningDecorator(executionCompletionExecutorServiceRaw);
     ListenableFuture<?> future = waitQueueExecutorService.submit(new WaitQueueWorker());
     Futures.addCallback(future, new WaitQueueWorkerCallback());
+
+    LOG.info("TaskExecutorService started with parameters: "
+            + "numExecutors=" + numExecutors
+            + ", waitQueueSize=" + waitQueueSize
+            + ", enablePreemption=" + enablePreemption);
   }
 
   /**
@@ -134,11 +139,20 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
           // if the task cannot finish and if no slots are available then don't schedule it.
           // TODO: Event notifications that change canFinish state should notify waitLock
           synchronized (waitLock) {
-            // KKK Is this a tight loop when there's only finishable tasks available ?
-            if (!task.canFinish() && numSlotsAvailable.get() == 0) {
-              waitLock.wait();
+            boolean shouldWait = false;
+            if (task.canFinish()) {
+              if (numSlotsAvailable.get() == 0 && preemptionQueue.isEmpty()) {
+                shouldWait = true;
+              }
+            } else {
+              if (numSlotsAvailable.get() == 0) {
+                shouldWait = true;
+              }
+            }
+            if (shouldWait) {
               // Another task at a higher priority may have come in during the wait. Lookup the
               // queue again to pick up the task at the highest priority.
+              waitLock.wait();
               continue;
             }
           }
@@ -174,7 +188,7 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
 
     @Override
     public void onFailure(Throwable t) {
-      LOG.error("Wait queue scheduler worker exited with failure!");
+      LOG.error("Wait queue scheduler worker exited with failure!", t);
     }
   }
 
@@ -237,7 +251,8 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
         if (pRequest != null) {
 
           if (isInfoEnabled) {
-            LOG.info("Invoking kill task for {} due to pre-emption.", pRequest.getRequestId());
+            LOG.info("Invoking kill task for {} due to pre-emption to run {}",
+                pRequest.getRequestId(), task.getRequestId());
           }
 
           // The task will either be killed or is already in the process of completing, which will
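
The rewritten wait condition in the queue worker separates two cases: a finishable task waits only when no slot is free and the preemption queue is empty (otherwise it can displace a non-finishable task), while a non-finishable task waits whenever no slot is free, since it is never allowed to preempt. That decision, isolated as a pure function (a sketch, not the service's code):

    class WaitDecisionSketch {
      // True when the wait-queue worker should block on waitLock.
      static boolean shouldWait(boolean canFinish, int slotsAvailable,
                                boolean preemptionQueueEmpty) {
        if (canFinish) {
          // Finishable work can preempt, so it waits only if there is
          // neither a free slot nor a preemption victim.
          return slotsAvailable == 0 && preemptionQueueEmpty;
        }
        // Non-finishable work never preempts; it needs a genuinely free slot.
        return slotsAvailable == 0;
      }
    }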

http://git-wip-us.apache.org/repos/asf/hive/blob/ece61d03/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index 94512d6..166dac5 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -21,9 +21,7 @@ import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -31,9 +29,9 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.CallableWithNdc;
+import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
 import org.apache.hadoop.hive.llap.daemon.HistoryLogger;
 import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
@@ -50,9 +48,7 @@ import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.TokenCache;
 import org.apache.tez.dag.api.TezConstants;
-import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.runtime.api.ExecutionContext;
-import org.apache.tez.runtime.api.impl.InputSpec;
 import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
 import org.apache.tez.runtime.internals.api.TaskReporterInterface;
@@ -76,9 +72,8 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
  */
 public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
   private static final Logger LOG = LoggerFactory.getLogger(TaskRunnerCallable.class);
-  private final LlapDaemonProtocolProtos.SubmitWorkRequestProto request;
+  private final SubmitWorkRequestProto request;
   private final Configuration conf;
-  private final String[] localDirs;
   private final Map<String, String> envMap;
   private final String pid = null;
   private final ObjectRegistryImpl objectRegistry;
@@ -88,16 +83,17 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
   private final ConfParams confParams;
   private final Token<JobTokenIdentifier> jobToken;
   private final AMReporter amReporter;
-  private final ConcurrentMap<String, LlapDaemonProtocolProtos.SourceStateProto> sourceCompletionMap;
   private final TaskSpec taskSpec;
+  private final QueryFragmentInfo fragmentInfo;
   private final KilledTaskHandler killedTaskHandler;
+  private final FragmentCompletionHandler fragmentCompletionHandler;
   private volatile TezTaskRunner2 taskRunner;
   private volatile TaskReporterInterface taskReporter;
   private volatile ListeningExecutorService executor;
   private LlapTaskUmbilicalProtocol umbilical;
   private volatile long startTime;
   private volatile String threadName;
-  private LlapDaemonExecutorMetrics metrics;
+  private final LlapDaemonExecutorMetrics metrics;
   private final String requestId;
   private boolean shouldRunTask = true;
   final Stopwatch runtimeWatch = new Stopwatch();
@@ -105,20 +101,20 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
   private final AtomicBoolean isCompleted = new AtomicBoolean(false);
   private final AtomicBoolean killInvoked = new AtomicBoolean(false);
 
-  TaskRunnerCallable(LlapDaemonProtocolProtos.SubmitWorkRequestProto request, Configuration conf,
-      ExecutionContext executionContext, Map<String, String> envMap,
-      String[] localDirs, Credentials credentials,
-      long memoryAvailable, AMReporter amReporter,
-      ConcurrentMap<String, LlapDaemonProtocolProtos.SourceStateProto> sourceCompletionMap,
-      ConfParams confParams, LlapDaemonExecutorMetrics metrics,
-      KilledTaskHandler killedTaskHandler) {
+  TaskRunnerCallable(SubmitWorkRequestProto request, QueryFragmentInfo fragmentInfo,
+                     Configuration conf,
+                     ExecutionContext executionContext, Map<String, String> envMap,
+                     Credentials credentials,
+                     long memoryAvailable, AMReporter amReporter,
+                     ConfParams confParams, LlapDaemonExecutorMetrics metrics,
+                     KilledTaskHandler killedTaskHandler,
+                     FragmentCompletionHandler fragmentCompleteHandler) {
     this.request = request;
+    this.fragmentInfo = fragmentInfo;
     this.conf = conf;
     this.executionContext = executionContext;
     this.envMap = envMap;
-    this.localDirs = localDirs;
     this.objectRegistry = new ObjectRegistryImpl();
-    this.sourceCompletionMap = sourceCompletionMap;
     this.credentials = credentials;
     this.memoryAvailable = memoryAvailable;
     this.confParams = confParams;
@@ -133,6 +129,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
     this.metrics = metrics;
     this.requestId = getTaskAttemptId(request);
     this.killedTaskHandler = killedTaskHandler;
+    this.fragmentCompletionHandler = fragmentCompleteHandler;
   }
 
   @Override
@@ -189,7 +186,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
 
     synchronized (this) {
       if (shouldRunTask) {
-        taskRunner = new TezTaskRunner2(conf, taskUgi, localDirs,
+        taskRunner = new TezTaskRunner2(conf, taskUgi, fragmentInfo.getLocalDirs(),
             taskSpec,
             request.getAppAttemptNumber(),
             serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor,
@@ -240,6 +237,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
             boolean killed = taskRunner.killTask();
             if (killed) {
               // Sending a kill message to the AM right here. Don't need to wait for the task to complete.
+              LOG.info("Kill request for task {} completed. Informing AM", taskSpec.getTaskAttemptID());
               reportTaskKilled();
             } else {
               LOG.info("Kill request for task {} did not complete because the task is already complete",
@@ -268,43 +266,8 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
             taskSpec.getTaskAttemptID());
   }
 
-  /**
-   * Check whether a task can run to completion or may end up blocking on it's sources.
-   * This currently happens via looking up source state.
-   * TODO: Eventually, this should lookup the Hive Processor to figure out whether
-   * it's reached a state where it can finish - especially in cases of failures
-   * after data has been fetched.
-   *
-   * @return
-   */
   public boolean canFinish() {
-    List<InputSpec> inputSpecList = taskSpec.getInputs();
-    boolean canFinish = true;
-    if (inputSpecList != null && !inputSpecList.isEmpty()) {
-      for (InputSpec inputSpec : inputSpecList) {
-        if (isSourceOfInterest(inputSpec)) {
-          // Lookup the state in the map.
-          LlapDaemonProtocolProtos.SourceStateProto state = sourceCompletionMap
-              .get(inputSpec.getSourceVertexName());
-          if (state != null && state == LlapDaemonProtocolProtos.SourceStateProto.S_SUCCEEDED) {
-            continue;
-          } else {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Cannot finish due to source: " + inputSpec.getSourceVertexName());
-            }
-            canFinish = false;
-            break;
-          }
-        }
-      }
-    }
-    return canFinish;
-  }
-
-  private boolean isSourceOfInterest(InputSpec inputSpec) {
-    String inputClassName = inputSpec.getInputDescriptor().getClassName();
-    // MRInput is not of interest since it'll always be ready.
-    return !inputClassName.equals(MRInputLegacy.class.getName());
+    return fragmentInfo.canFinish();
   }
 
   private Multimap<String, String> createStartedInputMap(FragmentSpecProto fragmentSpec) {
@@ -371,10 +334,10 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
 
   final class TaskRunnerCallback implements FutureCallback<TaskRunner2Result> {
 
-    private final LlapDaemonProtocolProtos.SubmitWorkRequestProto request;
+    private final SubmitWorkRequestProto request;
     private final TaskRunnerCallable taskRunnerCallable;
 
-    TaskRunnerCallback(LlapDaemonProtocolProtos.SubmitWorkRequestProto request,
+    TaskRunnerCallback(SubmitWorkRequestProto request,
         TaskRunnerCallable taskRunnerCallable) {
       this.request = request;
       this.taskRunnerCallable = taskRunnerCallable;
@@ -385,6 +348,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
     @Override
     public void onSuccess(TaskRunner2Result result) {
       isCompleted.set(true);
+
       switch(result.getEndReason()) {
         // Only the KILLED case requires a message to be sent out to the AM.
         case SUCCESS:
@@ -413,6 +377,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
           metrics.incrExecutorTotalExecutionFailed();
           break;
       }
+      fragmentCompletionHandler.fragmentComplete(fragmentInfo);
 
       taskRunnerCallable.shutdown();
       HistoryLogger
@@ -427,8 +392,9 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
 
     @Override
     public void onFailure(Throwable t) {
-      isCompleted.set(true);
       LOG.error("TezTaskRunner execution failed for : " + getTaskIdentifierString(request), t);
+      isCompleted.set(true);
+      fragmentCompletionHandler.fragmentComplete(fragmentInfo);
       // TODO HIVE-10236 Report a fatal error over the umbilical
       taskRunnerCallable.shutdown();
       HistoryLogger
@@ -458,7 +424,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
   }
 
   public static String getTaskIdentifierString(
-      LlapDaemonProtocolProtos.SubmitWorkRequestProto request) {
+      SubmitWorkRequestProto request) {
     StringBuilder sb = new StringBuilder();
     sb.append("AppId=").append(request.getApplicationIdString())
         .append(", containerId=").append(request.getContainerIdString())

http://git-wip-us.apache.org/repos/asf/hive/blob/ece61d03/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
index f0e53a7..a2e9501 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
@@ -25,6 +25,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.PriorityBlockingQueue;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
 import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto;
@@ -52,15 +53,17 @@ public class TestTaskExecutorService {
 
     public MockRequest(LlapDaemonProtocolProtos.SubmitWorkRequestProto requestProto,
         boolean canFinish, int workTime) {
-      super(requestProto, conf, new ExecutionContextImpl("localhost"), null, null, cred, 0, null,
-          null, null, null, mock(KilledTaskHandler.class));
+      super(requestProto, mock(QueryFragmentInfo.class), conf,
+          new ExecutionContextImpl("localhost"), null, cred, 0, null, null, null,
+          mock(KilledTaskHandler.class), mock(
+          FragmentCompletionHandler.class));
       this.workTime = workTime;
       this.canFinish = canFinish;
     }
 
     @Override
     protected TaskRunner2Result callInternal() {
-      System.out.println(requestId + " is executing..");
+      System.out.println(super.getRequestId() + " is executing..");
       try {
         Thread.sleep(workTime);
       } catch (InterruptedException e) {