You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by am...@apache.org on 2016/04/30 04:47:01 UTC
hive git commit: HIVE-13421 : Propagate job progress in operation status
Repository: hive
Updated Branches:
refs/heads/master 9179178e4 -> 9e1fa0ce6
HIVE-13421 : Propagate job progress in operation status
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9e1fa0ce
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9e1fa0ce
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9e1fa0ce
Branch: refs/heads/master
Commit: 9e1fa0ce6f2003300640f0bee9b267e33d714084
Parents: 9179178
Author: Rajat Khandelwal <pr...@apache.org>
Authored: Sat Apr 30 08:16:51 2016 +0530
Committer: Amareshwari Sriramadasu <am...@apache.org>
Committed: Sat Apr 30 08:16:51 2016 +0530
----------------------------------------------------------------------
.../service/cli/session/TestQueryDisplay.java | 4 ++--
.../java/org/apache/hadoop/hive/ql/Driver.java | 13 +++++++++++--
.../org/apache/hadoop/hive/ql/QueryDisplay.java | 19 +++++++++++++++++--
.../org/apache/hadoop/hive/ql/QueryPlan.java | 16 +---------------
.../hadoop/hive/ql/exec/ConditionalTask.java | 1 +
.../org/apache/hadoop/hive/ql/exec/Task.java | 14 ++++++++++++--
.../hadoop/hive/ql/exec/mr/ExecDriver.java | 2 +-
.../hive/ql/exec/mr/HadoopJobExecHelper.java | 6 ++++--
.../apache/hive/service/cli/CLIServiceTest.java | 4 +++-
9 files changed, 52 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/9e1fa0ce/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java
index 98581e0..cc18ce7 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java
@@ -154,8 +154,8 @@ public class TestQueryDisplay {
Assert.assertTrue(qDisplay1.getPerfLogStarts(QueryDisplay.Phase.COMPILATION).size() > 0);
Assert.assertTrue(qDisplay1.getPerfLogEnds(QueryDisplay.Phase.COMPILATION).size() > 0);
- Assert.assertEquals(qDisplay1.getTaskDisplays().size(), 2);
- QueryDisplay.TaskDisplay tInfo1 = qDisplay1.getTaskDisplays().get(1);
+ Assert.assertEquals(qDisplay1.getTaskDisplays().size(), 1);
+ QueryDisplay.TaskDisplay tInfo1 = qDisplay1.getTaskDisplays().get(0);
Assert.assertEquals(tInfo1.getTaskId(), "Stage-0");
Assert.assertEquals(tInfo1.getTaskType(), StageType.DDL);
Assert.assertTrue(tInfo1.getBeginTime() > 0 && tInfo1.getBeginTime() <= System.currentTimeMillis());
http://git-wip-us.apache.org/repos/asf/hive/blob/9e1fa0ce/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index dad43fb..32d2cb2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -450,7 +450,7 @@ public class Driver implements CommandProcessor {
schema = getSchema(sem, conf);
plan = new QueryPlan(queryStr, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN), queryId,
- queryState.getHiveOperation(), schema, queryDisplay);
+ queryState.getHiveOperation(), schema);
conf.setQueryString(queryStr);
@@ -1507,7 +1507,7 @@ public class Driver implements CommandProcessor {
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.PRE_HOOK + peh.getClass().getName());
}
}
-
+ setQueryDisplays(plan.getRootTasks());
int mrJobs = Utilities.getMRTasks(plan.getRootTasks()).size();
int jobs = mrJobs
+ Utilities.getTezTasks(plan.getRootTasks()).size()
@@ -1739,6 +1739,15 @@ public class Driver implements CommandProcessor {
return (0);
}
+ private void setQueryDisplays(List<Task<? extends Serializable>> tasks) {
+ if (tasks != null) {
+ for (Task<? extends Serializable> task : tasks) {
+ task.setQueryDisplay(queryDisplay);
+ setQueryDisplays(task.getDependentTasks());
+ }
+ }
+ }
+
private void logMrWarning(int mrJobs) {
if (mrJobs <= 0 || !("mr".equals(HiveConf.getVar(conf, ConfVars.HIVE_EXECUTION_ENGINE)))) {
return;
http://git-wip-us.apache.org/repos/asf/hive/blob/9e1fa0ce/ql/src/java/org/apache/hadoop/hive/ql/QueryDisplay.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryDisplay.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryDisplay.java
index d582bc0..703e997 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/QueryDisplay.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryDisplay.java
@@ -79,6 +79,8 @@ public class QueryDisplay {
private String name;
private boolean requireLock;
private boolean retryIfFail;
+ private String statusMessage;
+
// required for jackson
public TaskDisplay() {
@@ -158,15 +160,28 @@ public class QueryDisplay {
if (externalHandle == null && tTask.getExternalHandle() != null) {
this.externalHandle = tTask.getExternalHandle();
}
+ setStatusMessage(tTask.getStatusMessage());
switch (taskState) {
case RUNNING:
- beginTime = System.currentTimeMillis();
+ if (beginTime == null) {
+ beginTime = System.currentTimeMillis();
+ }
break;
case FINISHED:
- endTime = System.currentTimeMillis();
+ if (endTime == null) {
+ endTime = System.currentTimeMillis();
+ }
break;
}
}
+
+ public synchronized String getStatusMessage() {
+ return statusMessage;
+ }
+
+ public synchronized void setStatusMessage(String statusMessage) {
+ this.statusMessage = statusMessage;
+ }
}
public synchronized void setTaskResult(String taskId, TaskResult result) {
TaskDisplay taskDisplay = tasks.get(taskId);
http://git-wip-us.apache.org/repos/asf/hive/blob/9e1fa0ce/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
index ef0923d..e8c8ae6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
@@ -113,26 +113,12 @@ public class QueryPlan implements Serializable {
}
public QueryPlan(String queryString, BaseSemanticAnalyzer sem, Long startTime, String queryId,
- HiveOperation operation, Schema resultSchema) {
- this(queryString, sem, startTime, queryId, operation, resultSchema, null);
- }
- public QueryPlan(String queryString, BaseSemanticAnalyzer sem, Long startTime, String queryId,
- HiveOperation operation, Schema resultSchema, QueryDisplay queryDisplay) {
+ HiveOperation operation, Schema resultSchema) {
this.queryString = queryString;
rootTasks = new ArrayList<Task<? extends Serializable>>(sem.getAllRootTasks());
reducerTimeStatsPerJobList = new ArrayList<ReducerTimeStatsPerJob>();
fetchTask = sem.getFetchTask();
- if (queryDisplay != null) {
- if (fetchTask != null) {
- fetchTask.setQueryDisplay(queryDisplay);
- }
- if (rootTasks!= null) {
- for (Task t : rootTasks) {
- t.setQueryDisplay(queryDisplay);
- }
- }
- }
// Note that inputs and outputs can be changed when the query gets executed
inputs = sem.getAllInputs();
outputs = sem.getAllOutputs();
http://git-wip-us.apache.org/repos/asf/hive/blob/9e1fa0ce/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java
index c96c813..52cb445 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java
@@ -200,6 +200,7 @@ public class ConditionalTask extends Task<ConditionalWork> implements Serializab
public boolean addDependentTask(Task<? extends Serializable> dependent) {
boolean ret = false;
if (getListTasks() != null) {
+ ret = true;
for (Task<? extends Serializable> tsk : getListTasks()) {
ret = ret & tsk.addDependentTask(dependent);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9e1fa0ce/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
index 897af5e..34bdafd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
@@ -83,8 +83,18 @@ public abstract class Task<T extends Serializable> implements Serializable, Node
protected String id;
protected T work;
private TaskState taskState = TaskState.CREATED;
+ private String statusMessage;
private transient boolean fetchSource;
+ public void setStatusMessage(String statusMessage) {
+ this.statusMessage = statusMessage;
+ updateStatusInQueryDisplay();
+ }
+
+ public String getStatusMessage() {
+ return statusMessage;
+ }
+
public enum FeedType {
DYNAMIC_PARTITIONS, // list of dynamic partitions
}
@@ -138,13 +148,13 @@ public abstract class Task<T extends Serializable> implements Serializable, Node
this.queryDisplay = queryDisplay;
}
- private void updateStatusInQueryDisplay() {
+ protected void updateStatusInQueryDisplay() {
if (queryDisplay != null) {
queryDisplay.updateTaskStatus(this);
}
}
- private void setState(TaskState state) {
+ protected void setState(TaskState state) {
this.taskState = state;
updateStatusInQueryDisplay();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9e1fa0ce/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
index 639b0da..926f6e8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
@@ -432,7 +432,7 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop
// Finally SUBMIT the JOB!
rj = jc.submitJob(job);
this.jobID = rj.getJobID();
-
+ updateStatusInQueryDisplay();
returnVal = jobExecHelper.progress(rj, jc);
success = (returnVal == 0);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/hive/blob/9e1fa0ce/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
index 760ba6c..11f5cfd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
@@ -367,12 +367,14 @@ public class HadoopJobExecHelper {
}
}
console.printInfo(output);
+ task.setStatusMessage(output);
reportTime = System.currentTimeMillis();
}
if (cpuMsec > 0) {
- console.printInfo("MapReduce Total cumulative CPU time: "
- + Utilities.formatMsecToStr(cpuMsec));
+ String status = "MapReduce Total cumulative CPU time: " + Utilities.formatMsecToStr(cpuMsec);
+ console.printInfo(status);
+ task.setStatusMessage(status);
}
boolean success;
http://git-wip-us.apache.org/repos/asf/hive/blob/9e1fa0ce/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
----------------------------------------------------------------------
diff --git a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
index ff7e9a4..fb8ee4c 100644
--- a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
+++ b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
@@ -641,7 +641,8 @@ public abstract class CLIServiceTest {
SessionHandle sessionHandle = setupTestData(tableName, columnDefinitions, confOverlay);
assertNotNull(sessionHandle);
// nonblocking execute
- String select = "SELECT ID + ' ' FROM TEST_EXEC_ASYNC";
+ String select = "select a.id, b.id from (SELECT ID + ' ' `ID` FROM TEST_EXEC_ASYNC) a full outer join "
+ + "(SELECT ID + ' ' `ID` FROM TEST_EXEC_ASYNC) b on a.ID=b.ID";
OperationHandle ophandle =
client.executeStatementAsync(sessionHandle, select, confOverlay);
@@ -697,6 +698,7 @@ public abstract class CLIServiceTest {
case FINISHED:
if (taskDisplay.getTaskType() == StageType.MAPRED || taskDisplay.getTaskType() == StageType.MAPREDLOCAL) {
assertNotNull(taskDisplay.getExternalHandle());
+ assertNotNull(taskDisplay.getStatusMessage());
}
assertNotNull(taskDisplay.getBeginTime());
assertNotNull(taskDisplay.getEndTime());