You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by th...@apache.org on 2014/03/08 19:14:36 UTC
svn commit: r1575583 - in
/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql:
Driver.java DriverContext.java exec/ConditionalTask.java exec/TaskRunner.java
Author: thejas
Date: Sat Mar 8 18:14:35 2014
New Revision: 1575583
URL: http://svn.apache.org/r1575583
Log:
HIVE-5901 : Query cancel should stop running MR tasks (Navis via Thejas Nair)
Modified:
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=1575583&r1=1575582&r2=1575583&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Sat Mar 8 18:14:35 2014
@@ -19,6 +19,19 @@
package org.apache.hadoop.hive.ql;
+import java.io.DataInput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -67,13 +80,6 @@ import org.apache.hadoop.mapred.ClusterS
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
-import java.io.DataInput;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.*;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-
public class Driver implements CommandProcessor {
static final private String CLASS_NAME = Driver.class.getName();
@@ -88,6 +94,7 @@ public class Driver implements CommandPr
private HiveConf conf;
private DataInput resStream;
private Context ctx;
+ private DriverContext driverCxt;
private QueryPlan plan;
private Schema schema;
private String errorMessage;
@@ -97,8 +104,9 @@ public class Driver implements CommandPr
// A limit on the number of threads that can be launched
private int maxthreads;
- private static final int SLEEP_TIME = 2000;
- protected int tryCount = Integer.MAX_VALUE;
+ private int tryCount = Integer.MAX_VALUE;
+
+ private boolean destroyed;
private String userName;
@@ -1199,14 +1207,13 @@ public class Driver implements CommandPr
// At any time, at most maxthreads tasks can be running
// The main thread polls the TaskRunners to check if they have finished.
- Queue<Task<? extends Serializable>> runnable = new ConcurrentLinkedQueue<Task<? extends Serializable>>();
- Map<TaskResult, TaskRunner> running = new HashMap<TaskResult, TaskRunner>();
-
- DriverContext driverCxt = new DriverContext(runnable, ctx);
+ DriverContext driverCxt = new DriverContext(ctx);
driverCxt.prepare(plan);
ctx.setHDFSCleanup(true);
+ this.driverCxt = driverCxt; // for canceling the query (should be bound to session?)
+
SessionState.get().setLastMapRedStatsList(new ArrayList<MapRedStats>());
SessionState.get().setStackTraces(new HashMap<String, List<List<String>>>());
SessionState.get().setLocalMapRedErrors(new HashMap<String, List<String>>());
@@ -1222,27 +1229,32 @@ public class Driver implements CommandPr
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TIME_TO_SUBMIT);
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RUN_TASKS);
// Loop while you either have tasks running, or tasks queued up
- while (running.size() != 0 || runnable.peek() != null) {
+ while (!destroyed && driverCxt.isRunning()) {
+
// Launch upto maxthreads tasks
- while (runnable.peek() != null && running.size() < maxthreads) {
- Task<? extends Serializable> tsk = runnable.remove();
- perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TASK + tsk.getName() + "." + tsk.getId());
- launchTask(tsk, queryId, noName, running, jobname, jobs, driverCxt);
+ Task<? extends Serializable> task;
+ while ((task = driverCxt.getRunnable(maxthreads)) != null) {
+ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TASK + task.getName() + "." + task.getId());
+ TaskRunner runner = launchTask(task, queryId, noName, jobname, jobs, driverCxt);
+ if (!runner.isRunning()) {
+ break;
+ }
}
// poll the Tasks to see which one completed
- TaskResult tskRes = pollTasks(running.keySet());
- TaskRunner tskRun = running.remove(tskRes);
- Task<? extends Serializable> tsk = tskRun.getTask();
- perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TASK + tsk.getName() + "." + tsk.getId());
+ TaskRunner tskRun = driverCxt.pollFinished();
+ if (tskRun == null) {
+ continue;
+ }
hookContext.addCompleteTask(tskRun);
- int exitVal = tskRes.getExitVal();
+ Task<? extends Serializable> tsk = tskRun.getTask();
+ TaskResult result = tskRun.getTaskResult();
+
+ int exitVal = result.getExitVal();
if (exitVal != 0) {
if (tsk.ifRetryCmdWhenFail()) {
- if (!running.isEmpty()) {
- taskCleanup(running);
- }
+ driverCxt.shutdown();
// in case we decided to run everything in local mode, restore the
// the jobtracker setting to its initial value
ctx.restoreOriginalTracker();
@@ -1250,7 +1262,7 @@ public class Driver implements CommandPr
}
Task<? extends Serializable> backupTask = tsk.getAndInitBackupTask();
if (backupTask != null) {
- setErrorMsgAndDetail(exitVal, tskRes.getTaskError(), tsk);
+ setErrorMsgAndDetail(exitVal, result.getTaskError(), tsk);
console.printError(errorMessage);
errorMessage = "ATTEMPT: Execute BackupTask: " + backupTask.getClass().getName();
console.printError(errorMessage);
@@ -1271,12 +1283,10 @@ public class Driver implements CommandPr
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.FAILURE_HOOK + ofh.getClass().getName());
}
- setErrorMsgAndDetail(exitVal, tskRes.getTaskError(), tsk);
+ setErrorMsgAndDetail(exitVal, result.getTaskError(), tsk);
SQLState = "08S01";
console.printError(errorMessage);
- if (!running.isEmpty()) {
- taskCleanup(running);
- }
+ driverCxt.shutdown();
// in case we decided to run everything in local mode, restore the
// the jobtracker setting to its initial value
ctx.restoreOriginalTracker();
@@ -1306,6 +1316,13 @@ public class Driver implements CommandPr
// the jobtracker setting to its initial value
ctx.restoreOriginalTracker();
+ if (driverCxt.isShutdown()) {
+ SQLState = "HY008";
+ errorMessage = "FAILED: Operation cancelled";
+ console.printError(errorMessage);
+ return 1000;
+ }
+
// remove incomplete outputs.
// Some incomplete outputs may be added at the beginning, for eg: for dynamic partitions.
// remove them
@@ -1427,10 +1444,8 @@ public class Driver implements CommandPr
* @param cxt
* the driver context
*/
-
- public void launchTask(Task<? extends Serializable> tsk, String queryId, boolean noName,
- Map<TaskResult, TaskRunner> running, String jobname, int jobs, DriverContext cxt) {
-
+ private TaskRunner launchTask(Task<? extends Serializable> tsk, String queryId, boolean noName,
+ String jobname, int jobs, DriverContext cxt) throws HiveException {
if (SessionState.get() != null) {
SessionState.get().getHiveHistory().startTask(queryId, tsk, tsk.getClass().getName());
}
@@ -1447,8 +1462,7 @@ public class Driver implements CommandPr
TaskResult tskRes = new TaskResult();
TaskRunner tskRun = new TaskRunner(tsk, tskRes);
- cxt.prepare(tskRun);
-
+ cxt.launching(tskRun);
// Launch Task
if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.EXECPARALLEL) && tsk.isMapRedTask()) {
// Launch it in the parallel mode, as a separate thread only for MR tasks
@@ -1456,53 +1470,7 @@ public class Driver implements CommandPr
} else {
tskRun.runSequential();
}
- running.put(tskRes, tskRun);
- }
-
- /**
- * Cleans up remaining tasks in case of failure
- */
- public void taskCleanup(Map<TaskResult, TaskRunner> running) {
- for (Map.Entry<TaskResult, TaskRunner> entry : running.entrySet()) {
- if (entry.getKey().isRunning()) {
- Task<?> task = entry.getValue().getTask();
- try {
- task.shutdown();
- } catch (Exception e) {
- console.printError("Exception on shutting down task " + task.getId() + ": " + e);
- }
- }
- }
- running.clear();
- }
-
- /**
- * Polls running tasks to see if a task has ended.
- *
- * @param results
- * Set of result objects for running tasks
- * @return The result object for any completed/failed task
- */
-
- public TaskResult pollTasks(Set<TaskResult> results) {
- Iterator<TaskResult> resultIterator = results.iterator();
- while (true) {
- while (resultIterator.hasNext()) {
- TaskResult tskRes = resultIterator.next();
- if (!tskRes.isRunning()) {
- return tskRes;
- }
- }
-
- // In this loop, nothing was found
- // Sleep 10 seconds and restart
- try {
- Thread.sleep(SLEEP_TIME);
- } catch (InterruptedException ie) {
- // Do Nothing
- }
- resultIterator = results.iterator();
- }
+ return tskRun;
}
public boolean isFetchingTable() {
@@ -1510,6 +1478,9 @@ public class Driver implements CommandPr
}
public boolean getResults(List res) throws IOException, CommandNeedRetryException {
+ if (destroyed) {
+ throw new IOException("FAILED: Operation cancelled");
+ }
if (isFetchingTable()) {
FetchTask ft = plan.getFetchTask();
ft.setMaxRows(maxRows);
@@ -1597,6 +1568,10 @@ public class Driver implements CommandPr
}
}
}
+ if (driverCxt != null) {
+ driverCxt.shutdown();
+ driverCxt = null;
+ }
if (ctx != null) {
ctx.clear();
}
@@ -1617,6 +1592,10 @@ public class Driver implements CommandPr
}
public void destroy() {
+ if (destroyed) {
+ return;
+ }
+ destroyed = true;
if (ctx != null) {
try {
releaseLocks(ctx.getHiveLocks());
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java?rev=1575583&r1=1575582&r2=1575583&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java Sat Mar 8 18:14:35 2014
@@ -26,16 +26,23 @@ import org.apache.hadoop.hive.ql.exec.St
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskRunner;
import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Iterator;
import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.session.SessionState;
/**
* DriverContext.
@@ -43,27 +50,103 @@ import java.util.Queue;
*/
public class DriverContext {
- Queue<Task<? extends Serializable>> runnable = new LinkedList<Task<? extends Serializable>>();
+ private static final Log LOG = LogFactory.getLog(Driver.class.getName());
+ private static final SessionState.LogHelper console = new SessionState.LogHelper(LOG);
+
+ private static final int SLEEP_TIME = 2000;
+
+ private Queue<Task<? extends Serializable>> runnable;
+ private Queue<TaskRunner> running;
// how many jobs have been started
- int curJobNo;
+ private int curJobNo;
- Context ctx;
+ private Context ctx;
+ private boolean shutdown;
final Map<String, StatsTask> statsTasks = new HashMap<String, StatsTask>(1);
public DriverContext() {
- this.runnable = null;
- this.ctx = null;
}
- public DriverContext(Queue<Task<? extends Serializable>> runnable, Context ctx) {
- this.runnable = runnable;
+ public DriverContext(Context ctx) {
+ this.runnable = new ConcurrentLinkedQueue<Task<? extends Serializable>>();
+ this.running = new LinkedBlockingQueue<TaskRunner>();
this.ctx = ctx;
}
- public Queue<Task<? extends Serializable>> getRunnable() {
- return runnable;
+ public synchronized boolean isShutdown() {
+ return shutdown;
+ }
+
+ public synchronized boolean isRunning() {
+ return !shutdown && (!running.isEmpty() || !runnable.isEmpty());
+ }
+
+ public synchronized void remove(Task<? extends Serializable> task) {
+ runnable.remove(task);
+ }
+
+ public synchronized void launching(TaskRunner runner) throws HiveException {
+ checkShutdown();
+ running.add(runner);
+ }
+
+ public synchronized Task<? extends Serializable> getRunnable(int maxthreads) throws HiveException {
+ checkShutdown();
+ if (runnable.peek() != null && running.size() < maxthreads) {
+ return runnable.remove();
+ }
+ return null;
+ }
+
+ /**
+ * Polls running tasks to see if a task has ended.
+ *
+ * @return The runner of any completed/failed task, or null if this context has been shut down
+ */
+ public synchronized TaskRunner pollFinished() throws InterruptedException {
+ while (!shutdown) {
+ Iterator<TaskRunner> it = running.iterator();
+ while (it.hasNext()) {
+ TaskRunner runner = it.next();
+ if (runner != null && !runner.isRunning()) {
+ it.remove();
+ return runner;
+ }
+ }
+ wait(SLEEP_TIME);
+ }
+ return null;
+ }
+
+ private void checkShutdown() throws HiveException {
+ if (shutdown) {
+ throw new HiveException("FAILED: Operation cancelled");
+ }
+ }
+ /**
+ * Cleans up remaining tasks in case of failure or query cancellation
+ */
+ public synchronized void shutdown() {
+ LOG.warn("Shutting down query " + ctx.getCmd());
+ shutdown = true;
+ for (TaskRunner runner : running) {
+ if (runner.isRunning()) {
+ Task<?> task = runner.getTask();
+ LOG.warn("Shutting down task : " + task);
+ try {
+ task.shutdown();
+ } catch (Exception e) {
+ console.printError("Exception on shutting down task " + task.getId() + ": " + e);
+ }
+ Thread thread = runner.getRunner();
+ if (thread != null) {
+ thread.interrupt();
+ }
+ }
+ }
+ running.clear();
}
/**
@@ -80,9 +163,14 @@ public class DriverContext {
return !tsk.getQueued() && !tsk.getInitialized() && tsk.isRunnable();
}
- public void addToRunnable(Task<? extends Serializable> tsk) {
+ public synchronized boolean addToRunnable(Task<? extends Serializable> tsk) throws HiveException {
+ if (runnable.contains(tsk)) {
+ return false;
+ }
+ checkShutdown();
runnable.add(tsk);
tsk.setQueued();
+ return true;
}
public int getCurJobNo() {
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java?rev=1575583&r1=1575582&r2=1575583&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java Sat Mar 8 18:14:35 2014
@@ -22,9 +22,9 @@ import java.io.Serializable;
import java.util.List;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.ConditionalResolver;
import org.apache.hadoop.hive.ql.plan.ConditionalWork;
import org.apache.hadoop.hive.ql.plan.api.StageType;
@@ -81,9 +81,19 @@ public class ConditionalTask extends Tas
resTasks = resolver.getTasks(conf, resolverCtx);
resolved = true;
+ try {
+ resolveTask(driverContext);
+ } catch (Exception e) {
+ setException(e);
+ return 1;
+ }
+ return 0;
+ }
+
+ private void resolveTask(DriverContext driverContext) throws HiveException {
for (Task<? extends Serializable> tsk : getListTasks()) {
if (!resTasks.contains(tsk)) {
- driverContext.getRunnable().remove(tsk);
+ driverContext.remove(tsk);
console.printInfo(tsk.getId() + " is filtered out by condition resolver.");
if (tsk.isMapRedTask()) {
driverContext.incCurJobNo(1);
@@ -98,13 +108,11 @@ public class ConditionalTask extends Tas
}
}
// resolved task
- if (!driverContext.getRunnable().contains(tsk)) {
+ if (driverContext.addToRunnable(tsk)) {
console.printInfo(tsk.getId() + " is selected by condition resolver.");
- driverContext.addToRunnable(tsk);
}
}
}
- return 0;
}
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java?rev=1575583&r1=1575582&r2=1575583&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java Sat Mar 8 18:14:35 2014
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.ql.session
**/
public class TaskRunner extends Thread {
+
protected Task<? extends Serializable> tsk;
protected TaskResult result;
protected SessionState ss;
@@ -39,6 +40,8 @@ public class TaskRunner extends Thread {
}
};
+ protected Thread runner;
+
public TaskRunner(Task<? extends Serializable> tsk, TaskResult result) {
this.tsk = tsk;
this.result = result;
@@ -49,10 +52,27 @@ public class TaskRunner extends Thread {
return tsk;
}
+ public TaskResult getTaskResult() {
+ return result;
+ }
+
+ public Thread getRunner() {
+ return runner;
+ }
+
+ public boolean isRunning() {
+ return result.isRunning();
+ }
+
@Override
public void run() {
- SessionState.start(ss);
- runSequential();
+ runner = Thread.currentThread();
+ try {
+ SessionState.start(ss);
+ runSequential();
+ } finally {
+ runner = null;
+ }
}
/**
@@ -64,6 +84,9 @@ public class TaskRunner extends Thread {
try {
exitVal = tsk.executeTask();
} catch (Throwable t) {
+ if (tsk.getException() == null) {
+ tsk.setException(t);
+ }
t.printStackTrace();
}
result.setExitVal(exitVal, tsk.getException());