You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ss...@apache.org on 2015/05/20 07:38:48 UTC
hive git commit: HIVE-10767. LLAP: Improve the way task finishable
information is processed. (Siddharth Seth)
Repository: hive
Updated Branches:
refs/heads/llap 97fa2202f -> ece61d033
HIVE-10767. LLAP: Improve the way task finishable information is processed. (Siddharth Seth)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ece61d03
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ece61d03
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ece61d03
Branch: refs/heads/llap
Commit: ece61d0336d4679aa5d25526e56168f64761b701
Parents: 97fa220
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue May 19 22:38:24 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Tue May 19 22:38:24 2015 -0700
----------------------------------------------------------------------
.../llap/daemon/FragmentCompletionHandler.java | 22 ++
.../hive/llap/daemon/impl/AMReporter.java | 2 +
.../llap/daemon/impl/ContainerRunnerImpl.java | 61 +++---
.../impl/EvictingPriorityBlockingQueue.java | 4 +
.../llap/daemon/impl/QueryFragmentInfo.java | 149 +++++++++++++
.../hadoop/hive/llap/daemon/impl/QueryInfo.java | 128 +++++++++++
.../hive/llap/daemon/impl/QueryTracker.java | 212 ++++++++++++-------
.../llap/daemon/impl/TaskExecutorService.java | 25 ++-
.../llap/daemon/impl/TaskRunnerCallable.java | 84 +++-----
.../daemon/impl/TestTaskExecutorService.java | 9 +-
10 files changed, 525 insertions(+), 171 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/ece61d03/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/FragmentCompletionHandler.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/FragmentCompletionHandler.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/FragmentCompletionHandler.java
new file mode 100644
index 0000000..511347a
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/FragmentCompletionHandler.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.daemon;
+
+import org.apache.hadoop.hive.llap.daemon.impl.QueryFragmentInfo;
+
+public interface FragmentCompletionHandler {
+
+ void fragmentComplete(QueryFragmentInfo fragmentInfo);
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/ece61d03/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
index 39b3634..ea2d77a 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
@@ -55,6 +55,8 @@ import org.slf4j.LoggerFactory;
*/
public class AMReporter extends AbstractService {
+ // TODO In case of a failure to heartbeat, tasks for the specific DAG should ideally be KILLED
+
/*
registrations and un-registrations will happen as and when tasks are submitted or are removed.
reference counting is likely required.
http://git-wip-us.apache.org/repos/asf/hive/blob/ece61d03/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 3fd7920..a208bdd 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -20,13 +20,13 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.llap.LlapNodeId;
import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
+import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
import org.apache.hadoop.hive.llap.daemon.HistoryLogger;
import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
@@ -34,7 +34,6 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentS
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GroupInputSpecProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
@@ -57,11 +56,11 @@ import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
import com.google.common.base.Preconditions;
-public class ContainerRunnerImpl extends AbstractService implements ContainerRunner {
+// TODO Convert this to a CompositeService
+public class ContainerRunnerImpl extends AbstractService implements ContainerRunner, FragmentCompletionHandler {
- public static final String THREAD_NAME_FORMAT_PREFIX = "ContainerExecutor ";
- public static final String THREAD_NAME_FORMAT = THREAD_NAME_FORMAT_PREFIX + "%d";
private static final Logger LOG = Logger.getLogger(ContainerRunnerImpl.class);
+ public static final String THREAD_NAME_FORMAT_PREFIX = "ContainerExecutor ";
private volatile AMReporter amReporter;
private final QueryTracker queryTracker;
@@ -74,10 +73,6 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
private final TaskRunnerCallable.ConfParams confParams;
private final KilledTaskHandler killedTaskHandler = new KilledTaskHandlerImpl();
- // Map of dagId to vertices and associated state.
- private final ConcurrentMap<String, ConcurrentMap<String, SourceStateProto>> sourceCompletionMap = new ConcurrentHashMap<>();
- // TODO Support for removing queued containers, interrupting / killing specific containers
-
public ContainerRunnerImpl(Configuration conf, int numExecutors, int waitQueueSize,
boolean enablePreemption, String[] localDirsBase, int localShufflePort,
AtomicReference<InetSocketAddress> localAddress,
@@ -114,9 +109,14 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
);
}
+ public void serviceInit(Configuration conf) {
+ queryTracker.init(conf);
+ }
+
@Override
public void serviceStart() {
// The node id will only be available at this point, since the server has been started in LlapDaemon
+ queryTracker.start();
LlapNodeId llapNodeId = LlapNodeId.getInstance(localAddress.get().getHostName(),
localAddress.get().getPort());
this.amReporter = new AMReporter(llapNodeId, conf);
@@ -130,7 +130,7 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
amReporter.stop();
amReporter = null;
}
- queryTracker.shutdown();
+ queryTracker.stop();
super.serviceStop();
}
@@ -151,7 +151,7 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
request.getFragmentSpec().getAttemptNumber();
NDC.push(ndcContextString);
try {
- Map<String, String> env = new HashMap<String, String>();
+ Map<String, String> env = new HashMap<>();
// TODO What else is required in this environment map.
env.putAll(localEnv);
env.put(ApplicationConstants.Environment.USER.name(), request.getUser());
@@ -161,13 +161,13 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
fragmentSpec.getTaskAttemptIdString());
int dagIdentifier = taskAttemptId.getTaskID().getVertexID().getDAGId().getId();
- queryTracker
+ QueryFragmentInfo fragmentInfo = queryTracker
.registerFragment(null, request.getApplicationIdString(), fragmentSpec.getDagName(),
dagIdentifier,
fragmentSpec.getVertexName(), fragmentSpec.getFragmentNumber(),
- fragmentSpec.getAttemptNumber(), request.getUser());
+ fragmentSpec.getAttemptNumber(), request.getUser(), request.getFragmentSpec());
- String []localDirs = queryTracker.getLocalDirs(null, fragmentSpec.getDagName(), request.getUser());
+ String []localDirs = fragmentInfo.getLocalDirs();
Preconditions.checkNotNull(localDirs);
if (LOG.isDebugEnabled()) {
@@ -190,11 +190,17 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
.registerDag(request.getApplicationIdString(), dagIdentifier, jobToken,
request.getUser(), localDirs);
- ConcurrentMap<String, SourceStateProto> sourceCompletionMap = getSourceCompletionMap(request.getFragmentSpec().getDagName());
- TaskRunnerCallable callable = new TaskRunnerCallable(request, new Configuration(getConfig()),
- new ExecutionContextImpl(localAddress.get().getHostName()), env, localDirs,
- credentials, memoryPerExecutor, amReporter, sourceCompletionMap, confParams, metrics, killedTaskHandler);
- executorService.schedule(callable);
+ TaskRunnerCallable callable = new TaskRunnerCallable(request, fragmentInfo, new Configuration(getConfig()),
+ new ExecutionContextImpl(localAddress.get().getHostName()), env,
+ credentials, memoryPerExecutor, amReporter, confParams, metrics, killedTaskHandler,
+ this);
+ try {
+ executorService.schedule(callable);
+ } catch (RejectedExecutionException e) {
+ // Stop tracking the fragment and re-throw the error.
+ fragmentComplete(fragmentInfo);
+ throw e;
+ }
metrics.incrExecutorTotalRequestsHandled();
metrics.incrExecutorNumQueuedRequests();
} finally {
@@ -205,8 +211,8 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
@Override
public void sourceStateUpdated(SourceStateUpdatedRequestProto request) {
LOG.info("Processing state update: " + stringifySourceStateUpdateRequest(request));
- ConcurrentMap<String, SourceStateProto> dagMap = getSourceCompletionMap(request.getDagName());
- dagMap.put(request.getSrcName(), request.getState());
+ queryTracker.registerSourceStateChange(request.getDagName(), request.getSrcName(),
+ request.getState());
}
@Override
@@ -280,14 +286,9 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
return sb.toString();
}
- private ConcurrentMap<String, SourceStateProto> getSourceCompletionMap(String dagName) {
- ConcurrentMap<String, SourceStateProto> dagMap = sourceCompletionMap.get(dagName);
- if (dagMap == null) {
- dagMap = new ConcurrentHashMap<>();
- ConcurrentMap<String, SourceStateProto> old = sourceCompletionMap.putIfAbsent(dagName, dagMap);
- dagMap = (old != null) ? old : dagMap;
- }
- return dagMap;
+ @Override
+ public void fragmentComplete(QueryFragmentInfo fragmentInfo) {
+ queryTracker.fragmentComplete(fragmentInfo);
}
private class KilledTaskHandlerImpl implements KilledTaskHandler {
http://git-wip-us.apache.org/repos/asf/hive/blob/ece61d03/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
index 926835b..ab3a130 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
@@ -65,6 +65,10 @@ public class EvictingPriorityBlockingQueue<E> {
deque.remove(e);
}
+ public synchronized int size() {
+ return deque.size();
+ }
+
@Override
public synchronized String toString() {
return deque.toString();
http://git-wip-us.apache.org/repos/asf/hive/blob/ece61d03/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java
new file mode 100644
index 0000000..f6cd8ab
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.daemon.impl;
+
+import java.io.IOException;
+import java.util.List;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto;
+import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class QueryFragmentInfo {
+
+ private static final Logger LOG = LoggerFactory.getLogger(QueryFragmentInfo.class);
+
+ private final QueryInfo queryInfo;
+ private final String vertexName;
+ private final int fragmentNumber;
+ private final int attemptNumber;
+ private final FragmentSpecProto fragmentSpec;
+
+ public QueryFragmentInfo(QueryInfo queryInfo, String vertexName, int fragmentNumber,
+ int attemptNumber,
+ FragmentSpecProto fragmentSpec) {
+ Preconditions.checkNotNull(queryInfo);
+ Preconditions.checkNotNull(vertexName);
+ Preconditions.checkNotNull(fragmentSpec);
+ this.queryInfo = queryInfo;
+ this.vertexName = vertexName;
+ this.fragmentNumber = fragmentNumber;
+ this.attemptNumber = attemptNumber;
+ this.fragmentSpec = fragmentSpec;
+ }
+
+ // Only meant for use by the QueryTracker
+ QueryInfo getQueryInfo() {
+ return this.queryInfo;
+ }
+
+ public FragmentSpecProto getFragmentSpec() {
+ return fragmentSpec;
+ }
+
+ public String getVertexName() {
+ return vertexName;
+ }
+
+ public int getFragmentNumber() {
+ return fragmentNumber;
+ }
+
+ public int getAttemptNumber() {
+ return attemptNumber;
+ }
+
+ /**
+ * Check whether a task can run to completion or may end up blocking on its sources.
+ * This currently happens via looking up source state.
+ * TODO: Eventually, this should lookup the Hive Processor to figure out whether
+ * it's reached a state where it can finish - especially in cases of failures
+ * after data has been fetched.
+ *
+ * @return true if the task can finish, false otherwise
+ */
+ public boolean canFinish() {
+ List<IOSpecProto> inputSpecList = fragmentSpec.getInputSpecsList();
+ boolean canFinish = true;
+ if (inputSpecList != null && !inputSpecList.isEmpty()) {
+ for (IOSpecProto inputSpec : inputSpecList) {
+ if (isSourceOfInterest(inputSpec)) {
+ // Lookup the state in the map.
+ LlapDaemonProtocolProtos.SourceStateProto state = queryInfo.getSourceStateMap()
+ .get(inputSpec.getConnectedVertexName());
+ if (state != null && state == LlapDaemonProtocolProtos.SourceStateProto.S_SUCCEEDED) {
+ continue;
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cannot finish due to source: " + inputSpec.getConnectedVertexName());
+ }
+ canFinish = false;
+ break;
+ }
+ }
+ }
+ }
+ return canFinish;
+ }
+
+ /**
+ * Get, and create if required, local-dirs for a fragment
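+ * @return the local directories of the query this fragment belongs to
+ * @throws IOException if creating the local directories fails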
+ */
+ public String[] getLocalDirs() throws IOException {
+ return queryInfo.getLocalDirs();
+ }
+
+ private boolean isSourceOfInterest(IOSpecProto inputSpec) {
+ String inputClassName = inputSpec.getIoDescriptor().getClassName();
+ // MRInput is not of interest since it'll always be ready.
+ return !inputClassName.equals(MRInputLegacy.class.getName());
+ }
+
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ QueryFragmentInfo that = (QueryFragmentInfo) o;
+
+ if (fragmentNumber != that.fragmentNumber) {
+ return false;
+ }
+ if (attemptNumber != that.attemptNumber) {
+ return false;
+ }
+ return vertexName.equals(that.vertexName);
+
+ }
+
+ @Override
+ public int hashCode() {
+ int result = vertexName.hashCode();
+ result = 31 * result + fragmentNumber;
+ result = 31 * result + attemptNumber;
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/ece61d03/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
new file mode 100644
index 0000000..efa18cd
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.daemon.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto;
+
+public class QueryInfo {
+ private final String queryId;
+ private final String appIdString;
+ private final String dagName;
+ private final int dagIdentifier;
+ private final String user;
+ private final String[] localDirsBase;
+ private final FileSystem localFs;
+ private String[] localDirs;
+ // Map of states for different vertices.
+
+ private final Set<QueryFragmentInfo> knownFragments =
+ Collections.newSetFromMap(new ConcurrentHashMap<QueryFragmentInfo, Boolean>());
+
+ private final ConcurrentMap<String, SourceStateProto> sourceStateMap;
+
+
+ public QueryInfo(String queryId, String appIdString, String dagName, int dagIdentifier,
+ String user, ConcurrentMap<String, SourceStateProto> sourceStateMap,
+ String[] localDirsBase, FileSystem localFs) {
+ this.queryId = queryId;
+ this.appIdString = appIdString;
+ this.dagName = dagName;
+ this.dagIdentifier = dagIdentifier;
+ this.sourceStateMap = sourceStateMap;
+ this.user = user;
+ this.localDirsBase = localDirsBase;
+ this.localFs = localFs;
+ }
+
+ public String getQueryId() {
+ return queryId;
+ }
+
+ public String getAppIdString() {
+ return appIdString;
+ }
+
+ public String getDagName() {
+ return dagName;
+ }
+
+ public int getDagIdentifier() {
+ return dagIdentifier;
+ }
+
+ public String getUser() {
+ return user;
+ }
+
+ public ConcurrentMap<String, SourceStateProto> getSourceStateMap() {
+ return sourceStateMap;
+ }
+
+ public QueryFragmentInfo registerFragment(String vertexName, int fragmentNumber, int attemptNumber, FragmentSpecProto fragmentSpec) {
+ QueryFragmentInfo fragmentInfo = new QueryFragmentInfo(this, vertexName, fragmentNumber, attemptNumber,
+ fragmentSpec);
+ knownFragments.add(fragmentInfo);
+ return fragmentInfo;
+ }
+
+ public void unregisterFragment(QueryFragmentInfo fragmentInfo) {
+ knownFragments.remove(fragmentInfo);
+ }
+
+ private synchronized void createLocalDirs() throws IOException {
+ if (localDirs == null) {
+ localDirs = new String[localDirsBase.length];
+ for (int i = 0; i < localDirsBase.length; i++) {
+ localDirs[i] = createAppSpecificLocalDir(localDirsBase[i], appIdString, user, dagIdentifier);
+ localFs.mkdirs(new Path(localDirs[i]));
+ }
+ }
+ }
+
+ /**
+ * Get, and create if required, local-dirs for a query
+ * @return the local directories for this query
+ * @throws IOException if creating the directories on the local filesystem fails
+ */
+ public synchronized String[] getLocalDirs() throws IOException {
+ if (localDirs == null) {
+ createLocalDirs();
+ }
+ return localDirs;
+ }
+
+ public synchronized String[] getLocalDirsNoCreate() {
+ return this.localDirs;
+ }
+
+ private static String createAppSpecificLocalDir(String baseDir, String applicationIdString,
+ String user, int dagIdentifier) {
+ // TODO This is broken for secure clusters. The app will not have permission to create these directories.
+ // May work via Slider - since the directory would already exist. Otherwise may need a custom shuffle handler.
+ // TODO This should be the process user - and not the user on behalf of whom the query is being submitted.
+ return baseDir + File.separator + "usercache" + File.separator + user + File.separator +
+ "appcache" + File.separator + applicationIdString + File.separator + dagIdentifier;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/ece61d03/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index 5c8116e..90ad923 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -16,19 +16,28 @@ package org.apache.hadoop.hive.llap.daemon.impl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto;
import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler;
+import org.apache.hadoop.service.CompositeService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
import java.io.IOException;
+import java.util.Collections;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
/**
* Tracks queries running within a daemon
*/
-public class QueryTracker {
+public class QueryTracker extends CompositeService {
private static final Logger LOG = LoggerFactory.getLogger(QueryTracker.class);
private final QueryFileCleaner queryFileCleaner;
@@ -39,107 +48,162 @@ public class QueryTracker {
private final String[] localDirsBase;
private final FileSystem localFs;
+ // TODO At the moment there's no way of knowing whether a query is running or not.
+ // A race is possible between dagComplete and registerFragment - where the registerFragment
+ // is processed after the dag completes.
+ // May need to keep track of completed dags for a certain time duration to avoid this.
+ // Alternatively - send in an explicit dag start message before any other message is processed.
+ // Multiple threads communicating from a single AM get in the way of this.
+
+ // Keeps track of completed dags. Assumes dag names are unique across AMs.
+ private final Set<String> completedDagMap = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+
+
+ private final Lock lock = new ReentrantLock();
+ private final ConcurrentMap<String, ReadWriteLock> dagSpecificLocks = new ConcurrentHashMap<>();
+
+ // Tracks various maps for dagCompletions. This is set up here since stateChange messages
+ // may be processed by a thread which ends up executing before a task.
+ private final ConcurrentMap<String, ConcurrentMap<String, SourceStateProto>> sourceCompletionMap = new ConcurrentHashMap<>();
+
public QueryTracker(Configuration conf, String[] localDirsBase) {
+ super("QueryTracker");
this.localDirsBase = localDirsBase;
try {
localFs = FileSystem.getLocal(conf);
} catch (IOException e) {
throw new RuntimeException("Failed to setup local filesystem instance", e);
}
+
queryFileCleaner = new QueryFileCleaner(conf, localFs);
- queryFileCleaner.init(conf);
- queryFileCleaner.start();
+ addService(queryFileCleaner);
}
- void registerFragment(String queryId, String appIdString, String dagName, int dagIdentifier,
+
+ /**
+ * Register a new fragment for a specific query
+ * @param queryId
+ * @param appIdString
+ * @param dagName
+ * @param dagIdentifier
+ * @param vertexName
+ * @param fragmentNumber
+ * @param attemptNumber
+ * @param user
+ * @throws IOException
+ */
+ QueryFragmentInfo registerFragment(String queryId, String appIdString, String dagName, int dagIdentifier,
String vertexName, int fragmentNumber, int attemptNumber,
- String user) throws
+ String user, FragmentSpecProto fragmentSpec) throws
IOException {
- QueryInfo queryInfo = queryInfoMap.get(dagName);
- if (queryInfo == null) {
- queryInfo = new QueryInfo(queryId, appIdString, dagName, dagIdentifier, user);
- queryInfoMap.putIfAbsent(dagName, queryInfo);
+ ReadWriteLock dagLock = getDagLock(dagName);
+ dagLock.readLock().lock();
+ try {
+ if (!completedDagMap.contains(dagName)) {
+ QueryInfo queryInfo = queryInfoMap.get(dagName);
+ if (queryInfo == null) {
+ queryInfo = new QueryInfo(queryId, appIdString, dagName, dagIdentifier, user,
+ getSourceCompletionMap(dagName), localDirsBase, localFs);
+ queryInfoMap.putIfAbsent(dagName, queryInfo);
+ }
+ return queryInfo.registerFragment(vertexName, fragmentNumber, attemptNumber, fragmentSpec);
+ } else {
+ // Cleanup the dag lock here, since it may have been created after the query completed
+ dagSpecificLocks.remove(dagName);
+ throw new RuntimeException(
+ "Dag " + dagName + " already complete. Rejecting fragment [" + vertexName + ", " + fragmentNumber +
+ ", " + attemptNumber);
+ }
+ } finally {
+ dagLock.readLock().unlock();
}
- // TODO Start tracking individual fragments, so that taskKilled etc messages
- // can be routed through this layer to simplify the interfaces.
}
- String[] getLocalDirs(String queryId, String dagName, String user) throws IOException {
+ /**
+ * Indicate to the tracker that a fragment is complete. This is from internal execution within the daemon
+ * @param fragmentInfo
+ */
+ void fragmentComplete(QueryFragmentInfo fragmentInfo) {
+ String dagName = fragmentInfo.getQueryInfo().getDagName();
QueryInfo queryInfo = queryInfoMap.get(dagName);
- return queryInfo.getLocalDirs();
+ if (queryInfo == null) {
+ // Possible because a queryComplete message from the AM can come in first - KILL / SUCCESSFUL,
+ // before the fragmentComplete is reported
+ LOG.info("Ignoring fragmentComplete message for unknown query");
+ } else {
+ queryInfo.unregisterFragment(fragmentInfo);
+ }
}
+ /**
+ * Register completion for a query
+ * @param queryId
+ * @param dagName
+ * @param deleteDelay
+ */
void queryComplete(String queryId, String dagName, long deleteDelay) {
- LOG.info("Processing queryComplete for dagName={} with deleteDelay={} seconds", dagName, deleteDelay);
- QueryInfo queryInfo = queryInfoMap.remove(dagName);
- if (queryInfo == null) {
- LOG.warn("Ignoring query complete for unknown dag: {}", dagName);
- }
- String []localDirs = queryInfo.getLocalDirsNoCreate();
- if (localDirs != null) {
- for (String localDir : localDirs) {
- queryFileCleaner.cleanupDir(localDir, deleteDelay);
- ShuffleHandler.get().unregisterDag(localDir, dagName, queryInfo.dagIdentifier);
+ ReadWriteLock dagLock = getDagLock(dagName);
+ dagLock.writeLock().lock();
+ try {
+ completedDagMap.add(dagName);
+ LOG.info("Processing queryComplete for dagName={} with deleteDelay={} seconds", dagName,
+ deleteDelay);
+ completedDagMap.add(dagName);
+ QueryInfo queryInfo = queryInfoMap.remove(dagName);
+ if (queryInfo == null) {
+ LOG.warn("Ignoring query complete for unknown dag: {}", dagName);
}
+ String[] localDirs = queryInfo.getLocalDirsNoCreate();
+ if (localDirs != null) {
+ for (String localDir : localDirs) {
+ queryFileCleaner.cleanupDir(localDir, deleteDelay);
+ ShuffleHandler.get().unregisterDag(localDir, dagName, queryInfo.getDagIdentifier());
+ }
+ }
+ sourceCompletionMap.remove(dagName);
+ dagSpecificLocks.remove(dagName);
+ // TODO HIVE-10762 Issue a kill message to all running fragments for this container.
+ // TODO HIVE-10535 Cleanup map join cache
+ } finally {
+ dagLock.writeLock().unlock();
}
- // TODO HIVE-10535 Cleanup map join cache
}
- void shutdown() {
- queryFileCleaner.stop();
+ /**
+ * Register an update to a source within an executing dag
+ * @param dagName
+ * @param sourceName
+ * @param sourceState
+ */
+ void registerSourceStateChange(String dagName, String sourceName, SourceStateProto sourceState) {
+ getSourceCompletionMap(dagName).put(sourceName, sourceState);
+ // TODO HIVE-10758 source completion notifications
}
- private class QueryInfo {
-
- private final String queryId;
- private final String appIdString;
- private final String dagName;
- private final int dagIdentifier;
- private final String user;
- private String[] localDirs;
-
- public QueryInfo(String queryId, String appIdString, String dagName, int dagIdentifier,
- String user) {
- this.queryId = queryId;
- this.appIdString = appIdString;
- this.dagName = dagName;
- this.dagIdentifier = dagIdentifier;
- this.user = user;
- }
-
-
-
-
- private synchronized void createLocalDirs() throws IOException {
- if (localDirs == null) {
- localDirs = new String[localDirsBase.length];
- for (int i = 0; i < localDirsBase.length; i++) {
- localDirs[i] = createAppSpecificLocalDir(localDirsBase[i], appIdString, user, dagIdentifier);
- localFs.mkdirs(new Path(localDirs[i]));
- }
- }
- }
-
- private synchronized String[] getLocalDirs() throws IOException {
- if (localDirs == null) {
- createLocalDirs();
+ private ReadWriteLock getDagLock(String dagName) {
+ lock.lock();
+ try {
+ ReadWriteLock dagLock = dagSpecificLocks.get(dagName);
+ if (dagLock == null) {
+ dagLock = new ReentrantReadWriteLock();
+ dagSpecificLocks.put(dagName, dagLock);
}
- return localDirs;
- }
-
- private synchronized String[] getLocalDirsNoCreate() {
- return this.localDirs;
+ return dagLock;
+ } finally {
+ lock.unlock();
}
}
- private static String createAppSpecificLocalDir(String baseDir, String applicationIdString,
- String user, int dagIdentifier) {
- // TODO This is broken for secure clusters. The app will not have permission to create these directories.
- // May work via Slider - since the directory would already exist. Otherwise may need a custom shuffle handler.
- // TODO This should be the process user - and not the user on behalf of whom the query is being submitted.
- return baseDir + File.separator + "usercache" + File.separator + user + File.separator +
- "appcache" + File.separator + applicationIdString + File.separator + dagIdentifier;
+ private ConcurrentMap<String, SourceStateProto> getSourceCompletionMap(String dagName) {
+ ConcurrentMap<String, SourceStateProto> dagMap = sourceCompletionMap.get(dagName);
+ if (dagMap == null) {
+ dagMap = new ConcurrentHashMap<>();
+ ConcurrentMap<String, SourceStateProto> old =
+ sourceCompletionMap.putIfAbsent(dagName, dagMap);
+ dagMap = (old != null) ? old : dagMap;
+ }
+ return dagMap;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ece61d03/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index 599c759..18daa75 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -108,6 +108,11 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
executionCompletionExecutorService = MoreExecutors.listeningDecorator(executionCompletionExecutorServiceRaw);
ListenableFuture<?> future = waitQueueExecutorService.submit(new WaitQueueWorker());
Futures.addCallback(future, new WaitQueueWorkerCallback());
+
+ LOG.info("TaskExecutorService started with parameters: "
+ + "numExecutors=" + numExecutors
+ + ", waitQueueSize=" + waitQueueSize
+ + ", enablePreemption=" + enablePreemption);
}
/**
@@ -134,11 +139,20 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
// if the task cannot finish and if no slots are available then don't schedule it.
// TODO: Event notifications that change canFinish state should notify waitLock
synchronized (waitLock) {
- // KKK Is this a tight loop when there's only finishable tasks available ?
- if (!task.canFinish() && numSlotsAvailable.get() == 0) {
- waitLock.wait();
+ boolean shouldWait = false;
+ if (task.canFinish()) {
+ if (numSlotsAvailable.get() == 0 && preemptionQueue.isEmpty()) {
+ shouldWait = true;
+ }
+ } else {
+ if (numSlotsAvailable.get() == 0) {
+ shouldWait = true;
+ }
+ }
+ if (shouldWait) {
// Another task at a higher priority may have come in during the wait. Lookup the
// queue again to pick up the task at the highest priority.
+ waitLock.wait();
continue;
}
}
@@ -174,7 +188,7 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
@Override
public void onFailure(Throwable t) {
- LOG.error("Wait queue scheduler worker exited with failure!");
+ LOG.error("Wait queue scheduler worker exited with failure!", t);
}
}
@@ -237,7 +251,8 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
if (pRequest != null) {
if (isInfoEnabled) {
- LOG.info("Invoking kill task for {} due to pre-emption.", pRequest.getRequestId());
+ LOG.info("Invoking kill task for {} due to pre-emption to run {}",
+ pRequest.getRequestId(), task.getRequestId());
}
// The task will either be killed or is already in the process of completing, which will
http://git-wip-us.apache.org/repos/asf/hive/blob/ece61d03/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index 94512d6..166dac5 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -21,9 +21,7 @@ import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -31,9 +29,9 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.CallableWithNdc;
+import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
import org.apache.hadoop.hive.llap.daemon.HistoryLogger;
import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
@@ -50,9 +48,7 @@ import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.TokenCache;
import org.apache.tez.dag.api.TezConstants;
-import org.apache.tez.mapreduce.input.MRInputLegacy;
import org.apache.tez.runtime.api.ExecutionContext;
-import org.apache.tez.runtime.api.impl.InputSpec;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
import org.apache.tez.runtime.internals.api.TaskReporterInterface;
@@ -76,9 +72,8 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
*/
public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
private static final Logger LOG = LoggerFactory.getLogger(TaskRunnerCallable.class);
- private final LlapDaemonProtocolProtos.SubmitWorkRequestProto request;
+ private final SubmitWorkRequestProto request;
private final Configuration conf;
- private final String[] localDirs;
private final Map<String, String> envMap;
private final String pid = null;
private final ObjectRegistryImpl objectRegistry;
@@ -88,16 +83,17 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
private final ConfParams confParams;
private final Token<JobTokenIdentifier> jobToken;
private final AMReporter amReporter;
- private final ConcurrentMap<String, LlapDaemonProtocolProtos.SourceStateProto> sourceCompletionMap;
private final TaskSpec taskSpec;
+ private final QueryFragmentInfo fragmentInfo;
private final KilledTaskHandler killedTaskHandler;
+ private final FragmentCompletionHandler fragmentCompletionHanler;
private volatile TezTaskRunner2 taskRunner;
private volatile TaskReporterInterface taskReporter;
private volatile ListeningExecutorService executor;
private LlapTaskUmbilicalProtocol umbilical;
private volatile long startTime;
private volatile String threadName;
- private LlapDaemonExecutorMetrics metrics;
+ private final LlapDaemonExecutorMetrics metrics;
private final String requestId;
private boolean shouldRunTask = true;
final Stopwatch runtimeWatch = new Stopwatch();
@@ -105,20 +101,20 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
private final AtomicBoolean isCompleted = new AtomicBoolean(false);
private final AtomicBoolean killInvoked = new AtomicBoolean(false);
- TaskRunnerCallable(LlapDaemonProtocolProtos.SubmitWorkRequestProto request, Configuration conf,
- ExecutionContext executionContext, Map<String, String> envMap,
- String[] localDirs, Credentials credentials,
- long memoryAvailable, AMReporter amReporter,
- ConcurrentMap<String, LlapDaemonProtocolProtos.SourceStateProto> sourceCompletionMap,
- ConfParams confParams, LlapDaemonExecutorMetrics metrics,
- KilledTaskHandler killedTaskHandler) {
+ TaskRunnerCallable(SubmitWorkRequestProto request, QueryFragmentInfo fragmentInfo,
+ Configuration conf,
+ ExecutionContext executionContext, Map<String, String> envMap,
+ Credentials credentials,
+ long memoryAvailable, AMReporter amReporter,
+ ConfParams confParams, LlapDaemonExecutorMetrics metrics,
+ KilledTaskHandler killedTaskHandler,
+ FragmentCompletionHandler fragmentCompleteHandler) {
this.request = request;
+ this.fragmentInfo = fragmentInfo;
this.conf = conf;
this.executionContext = executionContext;
this.envMap = envMap;
- this.localDirs = localDirs;
this.objectRegistry = new ObjectRegistryImpl();
- this.sourceCompletionMap = sourceCompletionMap;
this.credentials = credentials;
this.memoryAvailable = memoryAvailable;
this.confParams = confParams;
@@ -133,6 +129,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
this.metrics = metrics;
this.requestId = getTaskAttemptId(request);
this.killedTaskHandler = killedTaskHandler;
+ this.fragmentCompletionHanler = fragmentCompleteHandler;
}
@Override
@@ -189,7 +186,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
synchronized (this) {
if (shouldRunTask) {
- taskRunner = new TezTaskRunner2(conf, taskUgi, localDirs,
+ taskRunner = new TezTaskRunner2(conf, taskUgi, fragmentInfo.getLocalDirs(),
taskSpec,
request.getAppAttemptNumber(),
serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor,
@@ -240,6 +237,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
boolean killed = taskRunner.killTask();
if (killed) {
// Sending a kill message to the AM right here. Don't need to wait for the task to complete.
+ LOG.info("Kill request for task {} completed. Informing AM", taskSpec.getTaskAttemptID());
reportTaskKilled();
} else {
LOG.info("Kill request for task {} did not complete because the task is already complete",
@@ -268,43 +266,8 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
taskSpec.getTaskAttemptID());
}
- /**
- * Check whether a task can run to completion or may end up blocking on it's sources.
- * This currently happens via looking up source state.
- * TODO: Eventually, this should lookup the Hive Processor to figure out whether
- * it's reached a state where it can finish - especially in cases of failures
- * after data has been fetched.
- *
- * @return
- */
public boolean canFinish() {
- List<InputSpec> inputSpecList = taskSpec.getInputs();
- boolean canFinish = true;
- if (inputSpecList != null && !inputSpecList.isEmpty()) {
- for (InputSpec inputSpec : inputSpecList) {
- if (isSourceOfInterest(inputSpec)) {
- // Lookup the state in the map.
- LlapDaemonProtocolProtos.SourceStateProto state = sourceCompletionMap
- .get(inputSpec.getSourceVertexName());
- if (state != null && state == LlapDaemonProtocolProtos.SourceStateProto.S_SUCCEEDED) {
- continue;
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Cannot finish due to source: " + inputSpec.getSourceVertexName());
- }
- canFinish = false;
- break;
- }
- }
- }
- }
- return canFinish;
- }
-
- private boolean isSourceOfInterest(InputSpec inputSpec) {
- String inputClassName = inputSpec.getInputDescriptor().getClassName();
- // MRInput is not of interest since it'll always be ready.
- return !inputClassName.equals(MRInputLegacy.class.getName());
+ return fragmentInfo.canFinish();
}
private Multimap<String, String> createStartedInputMap(FragmentSpecProto fragmentSpec) {
@@ -371,10 +334,10 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
final class TaskRunnerCallback implements FutureCallback<TaskRunner2Result> {
- private final LlapDaemonProtocolProtos.SubmitWorkRequestProto request;
+ private final SubmitWorkRequestProto request;
private final TaskRunnerCallable taskRunnerCallable;
- TaskRunnerCallback(LlapDaemonProtocolProtos.SubmitWorkRequestProto request,
+ TaskRunnerCallback(SubmitWorkRequestProto request,
TaskRunnerCallable taskRunnerCallable) {
this.request = request;
this.taskRunnerCallable = taskRunnerCallable;
@@ -385,6 +348,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
@Override
public void onSuccess(TaskRunner2Result result) {
isCompleted.set(true);
+
switch(result.getEndReason()) {
// Only the KILLED case requires a message to be sent out to the AM.
case SUCCESS:
@@ -413,6 +377,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
metrics.incrExecutorTotalExecutionFailed();
break;
}
+ fragmentCompletionHanler.fragmentComplete(fragmentInfo);
taskRunnerCallable.shutdown();
HistoryLogger
@@ -427,8 +392,9 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
@Override
public void onFailure(Throwable t) {
- isCompleted.set(true);
LOG.error("TezTaskRunner execution failed for : " + getTaskIdentifierString(request), t);
+ isCompleted.set(true);
+ fragmentCompletionHanler.fragmentComplete(fragmentInfo);
// TODO HIVE-10236 Report a fatal error over the umbilical
taskRunnerCallable.shutdown();
HistoryLogger
@@ -458,7 +424,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
}
public static String getTaskIdentifierString(
- LlapDaemonProtocolProtos.SubmitWorkRequestProto request) {
+ SubmitWorkRequestProto request) {
StringBuilder sb = new StringBuilder();
sb.append("AppId=").append(request.getApplicationIdString())
.append(", containerId=").append(request.getContainerIdString())
http://git-wip-us.apache.org/repos/asf/hive/blob/ece61d03/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
index f0e53a7..a2e9501 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
@@ -25,6 +25,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto;
@@ -52,15 +53,17 @@ public class TestTaskExecutorService {
public MockRequest(LlapDaemonProtocolProtos.SubmitWorkRequestProto requestProto,
boolean canFinish, int workTime) {
- super(requestProto, conf, new ExecutionContextImpl("localhost"), null, null, cred, 0, null,
- null, null, null, mock(KilledTaskHandler.class));
+ super(requestProto, mock(QueryFragmentInfo.class), conf,
+ new ExecutionContextImpl("localhost"), null, cred, 0, null, null, null,
+ mock(KilledTaskHandler.class), mock(
+ FragmentCompletionHandler.class));
this.workTime = workTime;
this.canFinish = canFinish;
}
@Override
protected TaskRunner2Result callInternal() {
- System.out.println(requestId + " is executing..");
+ System.out.println(super.getRequestId() + " is executing..");
try {
Thread.sleep(workTime);
} catch (InterruptedException e) {