You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this plain text copy.
Posted to commits@hive.apache.org by an...@apache.org on 2017/11/13 05:08:26 UTC

hive git commit: HIVE-17615: Task.executeTask has to be thread safe for parallel execution (Anishek Agarwal reviewed by Daniel Dai)

Repository: hive
Updated Branches:
  refs/heads/master 67888cfe1 -> 25a6f4cf3


HIVE-17615: Task.executeTask has to be thread safe for parallel execution (Anishek Agarwal reviewed by Daniel Dai)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/25a6f4cf
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/25a6f4cf
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/25a6f4cf

Branch: refs/heads/master
Commit: 25a6f4cf30345f47deeab84fe657f709c8a6168a
Parents: 67888cf
Author: Anishek Agarwal <an...@gmail.com>
Authored: Mon Nov 13 10:38:15 2017 +0530
Committer: Anishek Agarwal <an...@gmail.com>
Committed: Mon Nov 13 10:38:15 2017 +0530

----------------------------------------------------------------------
 ql/src/java/org/apache/hadoop/hive/ql/Driver.java      |  3 +--
 ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java   | 13 ++++++-------
 .../org/apache/hadoop/hive/ql/exec/TaskRunner.java     |  6 +++---
 .../apache/hadoop/hive/ql/history/HiveHistoryImpl.java |  4 ++--
 .../hive/ql/parse/TestMacroSemanticAnalyzer.java       |  2 +-
 5 files changed, 13 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/25a6f4cf/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 766cbeb..017373c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -2244,8 +2244,7 @@ public class Driver implements CommandProcessor {
       console.printInfo("Launching Job " + cxt.getCurJobNo() + " out of " + jobs);
     }
     tsk.initialize(queryState, plan, cxt, ctx.getOpContext());
-    TaskResult tskRes = new TaskResult();
-    TaskRunner tskRun = new TaskRunner(tsk, tskRes);
+    TaskRunner tskRun = new TaskRunner(tsk);
 
     cxt.launching(tskRun);
     // Launch Task

http://git-wip-us.apache.org/repos/asf/hive/blob/25a6f4cf/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
index 75603ab..1f0487f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
@@ -25,13 +25,13 @@ import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.QueryDisplay;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.history.HiveHistory;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
-import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
@@ -196,17 +196,16 @@ public abstract class Task<T extends Serializable> implements Serializable, Node
    *
    * @return return value of execute()
    */
-  public int executeTask() {
+  public int executeTask(HiveHistory hiveHistory) {
     try {
-      SessionState ss = SessionState.get();
       this.setStarted();
-      if (ss != null) {
-        ss.getHiveHistory().logPlanProgress(queryPlan);
+      if (hiveHistory != null) {
+        hiveHistory.logPlanProgress(queryPlan);
       }
       int retval = execute(driverContext);
       this.setDone();
-      if (ss != null) {
-        ss.getHiveHistory().logPlanProgress(queryPlan);
+      if (hiveHistory != null) {
+        hiveHistory.logPlanProgress(queryPlan);
       }
       return retval;
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/hive/blob/25a6f4cf/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java
index eddc31e..3c988dd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java
@@ -46,9 +46,9 @@ public class TaskRunner extends Thread {
 
   private static transient final Logger LOG = LoggerFactory.getLogger(TaskRunner.class);
 
-  public TaskRunner(Task<? extends Serializable> tsk, TaskResult result) {
+  public TaskRunner(Task<? extends Serializable> tsk) {
     this.tsk = tsk;
-    this.result = result;
+    this.result = new TaskResult();
     ss = SessionState.get();
   }
 
@@ -94,7 +94,7 @@ public class TaskRunner extends Thread {
   public void runSequential() {
     int exitVal = -101;
     try {
-      exitVal = tsk.executeTask();
+      exitVal = tsk.executeTask(ss == null ? null : ss.getHiveHistory());
     } catch (Throwable t) {
       if (tsk.getException() == null) {
         tsk.setException(t);

http://git-wip-us.apache.org/repos/asf/hive/blob/25a6f4cf/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryImpl.java
index c23d202..83d87f9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryImpl.java
@@ -308,12 +308,12 @@ public class HiveHistoryImpl implements HiveHistory{
       new ThreadLocal<Map<String, String>>() {
     @Override
     protected Map<String,String> initialValue() {
-      return new HashMap<String,String>();
+      return new HashMap<>();
     }
   };
 
   @Override
-  public void logPlanProgress(QueryPlan plan) throws IOException {
+  public synchronized void logPlanProgress(QueryPlan plan) throws IOException {
     if (plan != null) {
       Map<String,String> ctrmap = ctrMapFactory.get();
       ctrmap.put("plan", plan.toString());

http://git-wip-us.apache.org/repos/asf/hive/blob/25a6f4cf/ql/src/test/org/apache/hadoop/hive/ql/parse/TestMacroSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestMacroSemanticAnalyzer.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestMacroSemanticAnalyzer.java
index deba1d5..d00df78 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestMacroSemanticAnalyzer.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestMacroSemanticAnalyzer.java
@@ -57,7 +57,7 @@ public class TestMacroSemanticAnalyzer {
     List<Task<? extends Serializable>> rootTasks = analyzer.getRootTasks();
     Assert.assertEquals(1, rootTasks.size());
     for(Task<? extends Serializable> task : rootTasks) {
-      Assert.assertEquals(0, task.executeTask());
+      Assert.assertEquals(0, task.executeTask(null));
     }
   }
   @Test