You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by an...@apache.org on 2017/11/13 05:08:26 UTC
hive git commit: HIVE-17615: Task.executeTask has to be thread safe
for parallel execution (Anishek Agarwal reviewed by Daniel Dai)
Repository: hive
Updated Branches:
refs/heads/master 67888cfe1 -> 25a6f4cf3
HIVE-17615: Task.executeTask has to be thread safe for parallel execution (Anishek Agarwal reviewed by Daniel Dai)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/25a6f4cf
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/25a6f4cf
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/25a6f4cf
Branch: refs/heads/master
Commit: 25a6f4cf30345f47deeab84fe657f709c8a6168a
Parents: 67888cf
Author: Anishek Agarwal <an...@gmail.com>
Authored: Mon Nov 13 10:38:15 2017 +0530
Committer: Anishek Agarwal <an...@gmail.com>
Committed: Mon Nov 13 10:38:15 2017 +0530
----------------------------------------------------------------------
ql/src/java/org/apache/hadoop/hive/ql/Driver.java | 3 +--
ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java | 13 ++++++-------
.../org/apache/hadoop/hive/ql/exec/TaskRunner.java | 6 +++---
.../apache/hadoop/hive/ql/history/HiveHistoryImpl.java | 4 ++--
.../hive/ql/parse/TestMacroSemanticAnalyzer.java | 2 +-
5 files changed, 13 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/25a6f4cf/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 766cbeb..017373c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -2244,8 +2244,7 @@ public class Driver implements CommandProcessor {
console.printInfo("Launching Job " + cxt.getCurJobNo() + " out of " + jobs);
}
tsk.initialize(queryState, plan, cxt, ctx.getOpContext());
- TaskResult tskRes = new TaskResult();
- TaskRunner tskRun = new TaskRunner(tsk, tskRes);
+ TaskRunner tskRun = new TaskRunner(tsk);
cxt.launching(tskRun);
// Launch Task
http://git-wip-us.apache.org/repos/asf/hive/blob/25a6f4cf/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
index 75603ab..1f0487f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
@@ -25,13 +25,13 @@ import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.QueryDisplay;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.history.HiveHistory;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.api.StageType;
-import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
@@ -196,17 +196,16 @@ public abstract class Task<T extends Serializable> implements Serializable, Node
*
* @return return value of execute()
*/
- public int executeTask() {
+ public int executeTask(HiveHistory hiveHistory) {
try {
- SessionState ss = SessionState.get();
this.setStarted();
- if (ss != null) {
- ss.getHiveHistory().logPlanProgress(queryPlan);
+ if (hiveHistory != null) {
+ hiveHistory.logPlanProgress(queryPlan);
}
int retval = execute(driverContext);
this.setDone();
- if (ss != null) {
- ss.getHiveHistory().logPlanProgress(queryPlan);
+ if (hiveHistory != null) {
+ hiveHistory.logPlanProgress(queryPlan);
}
return retval;
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/hive/blob/25a6f4cf/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java
index eddc31e..3c988dd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java
@@ -46,9 +46,9 @@ public class TaskRunner extends Thread {
private static transient final Logger LOG = LoggerFactory.getLogger(TaskRunner.class);
- public TaskRunner(Task<? extends Serializable> tsk, TaskResult result) {
+ public TaskRunner(Task<? extends Serializable> tsk) {
this.tsk = tsk;
- this.result = result;
+ this.result = new TaskResult();
ss = SessionState.get();
}
@@ -94,7 +94,7 @@ public class TaskRunner extends Thread {
public void runSequential() {
int exitVal = -101;
try {
- exitVal = tsk.executeTask();
+ exitVal = tsk.executeTask(ss == null ? null : ss.getHiveHistory());
} catch (Throwable t) {
if (tsk.getException() == null) {
tsk.setException(t);
http://git-wip-us.apache.org/repos/asf/hive/blob/25a6f4cf/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryImpl.java
index c23d202..83d87f9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryImpl.java
@@ -308,12 +308,12 @@ public class HiveHistoryImpl implements HiveHistory{
new ThreadLocal<Map<String, String>>() {
@Override
protected Map<String,String> initialValue() {
- return new HashMap<String,String>();
+ return new HashMap<>();
}
};
@Override
- public void logPlanProgress(QueryPlan plan) throws IOException {
+ public synchronized void logPlanProgress(QueryPlan plan) throws IOException {
if (plan != null) {
Map<String,String> ctrmap = ctrMapFactory.get();
ctrmap.put("plan", plan.toString());
http://git-wip-us.apache.org/repos/asf/hive/blob/25a6f4cf/ql/src/test/org/apache/hadoop/hive/ql/parse/TestMacroSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestMacroSemanticAnalyzer.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestMacroSemanticAnalyzer.java
index deba1d5..d00df78 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestMacroSemanticAnalyzer.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestMacroSemanticAnalyzer.java
@@ -57,7 +57,7 @@ public class TestMacroSemanticAnalyzer {
List<Task<? extends Serializable>> rootTasks = analyzer.getRootTasks();
Assert.assertEquals(1, rootTasks.size());
for(Task<? extends Serializable> task : rootTasks) {
- Assert.assertEquals(0, task.executeTask());
+ Assert.assertEquals(0, task.executeTask(null));
}
}
@Test