You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by th...@apache.org on 2014/03/08 19:14:36 UTC

svn commit: r1575583 - in /hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql: Driver.java DriverContext.java exec/ConditionalTask.java exec/TaskRunner.java

Author: thejas
Date: Sat Mar  8 18:14:35 2014
New Revision: 1575583

URL: http://svn.apache.org/r1575583
Log:
HIVE-5901 : Query cancel should stop running MR tasks (Navis via Thejas Nair)

Modified:
    hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
    hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
    hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java
    hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java

Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=1575583&r1=1575582&r2=1575583&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Sat Mar  8 18:14:35 2014
@@ -19,6 +19,19 @@
 
 package org.apache.hadoop.hive.ql;
 
+import java.io.DataInput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -67,13 +80,6 @@ import org.apache.hadoop.mapred.ClusterS
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 
-import java.io.DataInput;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.*;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-
 public class Driver implements CommandProcessor {
 
   static final private String CLASS_NAME = Driver.class.getName();
@@ -88,6 +94,7 @@ public class Driver implements CommandPr
   private HiveConf conf;
   private DataInput resStream;
   private Context ctx;
+  private DriverContext driverCxt;
   private QueryPlan plan;
   private Schema schema;
   private String errorMessage;
@@ -97,8 +104,9 @@ public class Driver implements CommandPr
 
   // A limit on the number of threads that can be launched
   private int maxthreads;
-  private static final int SLEEP_TIME = 2000;
-  protected int tryCount = Integer.MAX_VALUE;
+  private int tryCount = Integer.MAX_VALUE;
+
+  private boolean destroyed;
 
   private String userName;
 
@@ -1199,14 +1207,13 @@ public class Driver implements CommandPr
       // At any time, at most maxthreads tasks can be running
       // The main thread polls the TaskRunners to check if they have finished.
 
-      Queue<Task<? extends Serializable>> runnable = new ConcurrentLinkedQueue<Task<? extends Serializable>>();
-      Map<TaskResult, TaskRunner> running = new HashMap<TaskResult, TaskRunner>();
-
-      DriverContext driverCxt = new DriverContext(runnable, ctx);
+      DriverContext driverCxt = new DriverContext(ctx);
       driverCxt.prepare(plan);
 
       ctx.setHDFSCleanup(true);
 
+      this.driverCxt = driverCxt; // for canceling the query (should be bound to session?)
+
       SessionState.get().setLastMapRedStatsList(new ArrayList<MapRedStats>());
       SessionState.get().setStackTraces(new HashMap<String, List<List<String>>>());
       SessionState.get().setLocalMapRedErrors(new HashMap<String, List<String>>());
@@ -1222,27 +1229,32 @@ public class Driver implements CommandPr
       perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TIME_TO_SUBMIT);
       perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RUN_TASKS);
       // Loop while you either have tasks running, or tasks queued up
-      while (running.size() != 0 || runnable.peek() != null) {
+      while (!destroyed && driverCxt.isRunning()) {
+
         // Launch upto maxthreads tasks
-        while (runnable.peek() != null && running.size() < maxthreads) {
-          Task<? extends Serializable> tsk = runnable.remove();
-          perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TASK + tsk.getName() + "." + tsk.getId());
-          launchTask(tsk, queryId, noName, running, jobname, jobs, driverCxt);
+        Task<? extends Serializable> task;
+        while ((task = driverCxt.getRunnable(maxthreads)) != null) {
+          perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TASK + task.getName() + "." + task.getId());
+          TaskRunner runner = launchTask(task, queryId, noName, jobname, jobs, driverCxt);
+          if (!runner.isRunning()) {
+            break;
+          }
         }
 
         // poll the Tasks to see which one completed
-        TaskResult tskRes = pollTasks(running.keySet());
-        TaskRunner tskRun = running.remove(tskRes);
-        Task<? extends Serializable> tsk = tskRun.getTask();
-        perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TASK + tsk.getName() + "." + tsk.getId());
+        TaskRunner tskRun = driverCxt.pollFinished();
+        if (tskRun == null) {
+          continue;
+        }
         hookContext.addCompleteTask(tskRun);
 
-        int exitVal = tskRes.getExitVal();
+        Task<? extends Serializable> tsk = tskRun.getTask();
+        TaskResult result = tskRun.getTaskResult();
+
+        int exitVal = result.getExitVal();
         if (exitVal != 0) {
           if (tsk.ifRetryCmdWhenFail()) {
-            if (!running.isEmpty()) {
-              taskCleanup(running);
-            }
+            driverCxt.shutdown();
             // in case we decided to run everything in local mode, restore the
             // the jobtracker setting to its initial value
             ctx.restoreOriginalTracker();
@@ -1250,7 +1262,7 @@ public class Driver implements CommandPr
           }
           Task<? extends Serializable> backupTask = tsk.getAndInitBackupTask();
           if (backupTask != null) {
-            setErrorMsgAndDetail(exitVal, tskRes.getTaskError(), tsk);
+            setErrorMsgAndDetail(exitVal, result.getTaskError(), tsk);
             console.printError(errorMessage);
             errorMessage = "ATTEMPT: Execute BackupTask: " + backupTask.getClass().getName();
             console.printError(errorMessage);
@@ -1271,12 +1283,10 @@ public class Driver implements CommandPr
 
               perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.FAILURE_HOOK + ofh.getClass().getName());
             }
-            setErrorMsgAndDetail(exitVal, tskRes.getTaskError(), tsk);
+            setErrorMsgAndDetail(exitVal, result.getTaskError(), tsk);
             SQLState = "08S01";
             console.printError(errorMessage);
-            if (!running.isEmpty()) {
-              taskCleanup(running);
-            }
+            driverCxt.shutdown();
             // in case we decided to run everything in local mode, restore the
             // the jobtracker setting to its initial value
             ctx.restoreOriginalTracker();
@@ -1306,6 +1316,13 @@ public class Driver implements CommandPr
       // the jobtracker setting to its initial value
       ctx.restoreOriginalTracker();
 
+      if (driverCxt.isShutdown()) {
+        SQLState = "HY008";
+        errorMessage = "FAILED: Operation cancelled";
+        console.printError(errorMessage);
+        return 1000;
+      }
+
       // remove incomplete outputs.
       // Some incomplete outputs may be added at the beginning, for eg: for dynamic partitions.
       // remove them
@@ -1427,10 +1444,8 @@ public class Driver implements CommandPr
    * @param cxt
    *          the driver context
    */
-
-  public void launchTask(Task<? extends Serializable> tsk, String queryId, boolean noName,
-      Map<TaskResult, TaskRunner> running, String jobname, int jobs, DriverContext cxt) {
-
+  private TaskRunner launchTask(Task<? extends Serializable> tsk, String queryId, boolean noName,
+      String jobname, int jobs, DriverContext cxt) throws HiveException {
     if (SessionState.get() != null) {
       SessionState.get().getHiveHistory().startTask(queryId, tsk, tsk.getClass().getName());
     }
@@ -1447,8 +1462,7 @@ public class Driver implements CommandPr
     TaskResult tskRes = new TaskResult();
     TaskRunner tskRun = new TaskRunner(tsk, tskRes);
 
-    cxt.prepare(tskRun);
-
+    cxt.launching(tskRun);
     // Launch Task
     if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.EXECPARALLEL) && tsk.isMapRedTask()) {
       // Launch it in the parallel mode, as a separate thread only for MR tasks
@@ -1456,53 +1470,7 @@ public class Driver implements CommandPr
     } else {
       tskRun.runSequential();
     }
-    running.put(tskRes, tskRun);
-  }
-
-  /**
-   * Cleans up remaining tasks in case of failure
-   */
-  public void taskCleanup(Map<TaskResult, TaskRunner> running) {
-    for (Map.Entry<TaskResult, TaskRunner> entry : running.entrySet()) {
-      if (entry.getKey().isRunning()) {
-        Task<?> task = entry.getValue().getTask();
-        try {
-          task.shutdown();
-        } catch (Exception e) {
-          console.printError("Exception on shutting down task " + task.getId() + ": " + e);
-        }
-      }
-    }
-    running.clear();
-  }
-
-  /**
-   * Polls running tasks to see if a task has ended.
-   *
-   * @param results
-   *          Set of result objects for running tasks
-   * @return The result object for any completed/failed task
-   */
-
-  public TaskResult pollTasks(Set<TaskResult> results) {
-    Iterator<TaskResult> resultIterator = results.iterator();
-    while (true) {
-      while (resultIterator.hasNext()) {
-        TaskResult tskRes = resultIterator.next();
-        if (!tskRes.isRunning()) {
-          return tskRes;
-        }
-      }
-
-      // In this loop, nothing was found
-      // Sleep 10 seconds and restart
-      try {
-        Thread.sleep(SLEEP_TIME);
-      } catch (InterruptedException ie) {
-        // Do Nothing
-      }
-      resultIterator = results.iterator();
-    }
+    return tskRun;
   }
 
   public boolean isFetchingTable() {
@@ -1510,6 +1478,9 @@ public class Driver implements CommandPr
   }
 
   public boolean getResults(List res) throws IOException, CommandNeedRetryException {
+    if (destroyed) {
+      throw new IOException("FAILED: Operation cancelled");
+    }
     if (isFetchingTable()) {
       FetchTask ft = plan.getFetchTask();
       ft.setMaxRows(maxRows);
@@ -1597,6 +1568,10 @@ public class Driver implements CommandPr
           }
         }
       }
+      if (driverCxt != null) {
+        driverCxt.shutdown();
+        driverCxt = null;
+      }
       if (ctx != null) {
         ctx.clear();
       }
@@ -1617,6 +1592,10 @@ public class Driver implements CommandPr
   }
 
   public void destroy() {
+    if (destroyed) {
+      return;
+    }
+    destroyed = true;
     if (ctx != null) {
       try {
         releaseLocks(ctx.getHiveLocks());

Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java?rev=1575583&r1=1575582&r2=1575583&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java Sat Mar  8 18:14:35 2014
@@ -26,16 +26,23 @@ import org.apache.hadoop.hive.ql.exec.St
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskRunner;
 import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Iterator;
 import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.session.SessionState;
 
 /**
  * DriverContext.
@@ -43,27 +50,103 @@ import java.util.Queue;
  */
 public class DriverContext {
 
-  Queue<Task<? extends Serializable>> runnable = new LinkedList<Task<? extends Serializable>>();
+  private static final Log LOG = LogFactory.getLog(Driver.class.getName());
+  private static final SessionState.LogHelper console = new SessionState.LogHelper(LOG);
+
+  private static final int SLEEP_TIME = 2000;
+
+  private Queue<Task<? extends Serializable>> runnable;
+  private Queue<TaskRunner> running;
 
   // how many jobs have been started
-  int curJobNo;
+  private int curJobNo;
 
-  Context ctx;
+  private Context ctx;
+  private boolean shutdown;
 
   final Map<String, StatsTask> statsTasks = new HashMap<String, StatsTask>(1);
 
   public DriverContext() {
-    this.runnable = null;
-    this.ctx = null;
   }
 
-  public DriverContext(Queue<Task<? extends Serializable>> runnable, Context ctx) {
-    this.runnable = runnable;
+  public DriverContext(Context ctx) {
+    this.runnable = new ConcurrentLinkedQueue<Task<? extends Serializable>>();
+    this.running = new LinkedBlockingQueue<TaskRunner>();
     this.ctx = ctx;
   }
 
-  public Queue<Task<? extends Serializable>> getRunnable() {
-    return runnable;
+  public synchronized boolean isShutdown() {
+    return shutdown;
+  }
+
+  public synchronized boolean isRunning() {
+    return !shutdown && (!running.isEmpty() || !runnable.isEmpty());
+  }
+
+  public synchronized void remove(Task<? extends Serializable> task) {
+    runnable.remove(task);
+  }
+
+  public synchronized void launching(TaskRunner runner) throws HiveException {
+    checkShutdown();
+    running.add(runner);
+  }
+
+  public synchronized Task<? extends Serializable> getRunnable(int maxthreads) throws HiveException {
+    checkShutdown();
+    if (runnable.peek() != null && running.size() < maxthreads) {
+      return runnable.remove();
+    }
+    return null;
+  }
+
+  /**
+   * Polls running tasks to see if a task has ended.
+   *
+   * @return The result object for any completed/failed task
+   */
+  public synchronized TaskRunner pollFinished() throws InterruptedException {
+    while (!shutdown) {
+      Iterator<TaskRunner> it = running.iterator();
+      while (it.hasNext()) {
+        TaskRunner runner = it.next();
+        if (runner != null && !runner.isRunning()) {
+          it.remove();
+          return runner;
+        }
+      }
+      wait(SLEEP_TIME);
+    }
+    return null;
+  }
+
+  private void checkShutdown() throws HiveException {
+    if (shutdown) {
+      throw new HiveException("FAILED: Operation cancelled");
+    }
+  }
+  /**
+   * Cleans up remaining tasks in case of failure
+   */
+  public synchronized void shutdown() {
+    LOG.warn("Shutting down query " + ctx.getCmd());
+    shutdown = true;
+    for (TaskRunner runner : running) {
+      if (runner.isRunning()) {
+        Task<?> task = runner.getTask();
+        LOG.warn("Shutting down task : " + task);
+        try {
+          task.shutdown();
+        } catch (Exception e) {
+          console.printError("Exception on shutting down task " + task.getId() + ": " + e);
+        }
+        Thread thread = runner.getRunner();
+        if (thread != null) {
+          thread.interrupt();
+        }
+      }
+    }
+    running.clear();
   }
 
   /**
@@ -80,9 +163,14 @@ public class DriverContext {
     return !tsk.getQueued() && !tsk.getInitialized() && tsk.isRunnable();
   }
 
-  public void addToRunnable(Task<? extends Serializable> tsk) {
+  public synchronized boolean addToRunnable(Task<? extends Serializable> tsk) throws HiveException {
+    if (runnable.contains(tsk)) {
+      return false;
+    }
+    checkShutdown();
     runnable.add(tsk);
     tsk.setQueued();
+    return true;
   }
 
   public int getCurJobNo() {

Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java?rev=1575583&r1=1575582&r2=1575583&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java Sat Mar  8 18:14:35 2014
@@ -22,9 +22,9 @@ import java.io.Serializable;
 import java.util.List;
 
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.ConditionalResolver;
 import org.apache.hadoop.hive.ql.plan.ConditionalWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
@@ -81,9 +81,19 @@ public class ConditionalTask extends Tas
     resTasks = resolver.getTasks(conf, resolverCtx);
     resolved = true;
 
+    try {
+      resolveTask(driverContext);
+    } catch (Exception e) {
+      setException(e);
+      return 1;
+    }
+    return 0;
+  }
+
+  private void resolveTask(DriverContext driverContext) throws HiveException {
     for (Task<? extends Serializable> tsk : getListTasks()) {
       if (!resTasks.contains(tsk)) {
-        driverContext.getRunnable().remove(tsk);
+        driverContext.remove(tsk);
         console.printInfo(tsk.getId() + " is filtered out by condition resolver.");
         if (tsk.isMapRedTask()) {
           driverContext.incCurJobNo(1);
@@ -98,13 +108,11 @@ public class ConditionalTask extends Tas
           }
         }
         // resolved task
-        if (!driverContext.getRunnable().contains(tsk)) {
+        if (driverContext.addToRunnable(tsk)) {
           console.printInfo(tsk.getId() + " is selected by condition resolver.");
-          driverContext.addToRunnable(tsk);
         }
       }
     }
-    return 0;
   }
 
 

Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java?rev=1575583&r1=1575582&r2=1575583&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java Sat Mar  8 18:14:35 2014
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.ql.session
  **/
 
 public class TaskRunner extends Thread {
+
   protected Task<? extends Serializable> tsk;
   protected TaskResult result;
   protected SessionState ss;
@@ -39,6 +40,8 @@ public class TaskRunner extends Thread {
     }
   };
 
+  protected Thread runner;
+
   public TaskRunner(Task<? extends Serializable> tsk, TaskResult result) {
     this.tsk = tsk;
     this.result = result;
@@ -49,10 +52,27 @@ public class TaskRunner extends Thread {
     return tsk;
   }
 
+  public TaskResult getTaskResult() {
+    return result;
+  }
+
+  public Thread getRunner() {
+    return runner;
+  }
+
+  public boolean isRunning() {
+    return result.isRunning();
+  }
+
   @Override
   public void run() {
-    SessionState.start(ss);
-    runSequential();
+    runner = Thread.currentThread();
+    try {
+      SessionState.start(ss);
+      runSequential();
+    } finally {
+      runner = null;
+    }
   }
 
   /**
@@ -64,6 +84,9 @@ public class TaskRunner extends Thread {
     try {
       exitVal = tsk.executeTask();
     } catch (Throwable t) {
+      if (tsk.getException() == null) {
+        tsk.setException(t);
+      }
       t.printStackTrace();
     }
     result.setExitVal(exitVal, tsk.getException());