You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by am...@apache.org on 2016/03/14 08:01:53 UTC
hive git commit: HIVE-4570 : Add more information to
GetOperationStatus in Hive Server2 when query is still executing (Rajat
Khandelwal, reviewed by Amareshwari)
Repository: hive
Updated Branches:
refs/heads/master ca165db8d -> de260b45d
HIVE-4570 : Add more information to GetOperationStatus in Hive Server2 when query is still executing (Rajat Khandelwal, reviewed by Amareshwari)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/de260b45
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/de260b45
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/de260b45
Branch: refs/heads/master
Commit: de260b45d7334dbc8b0e728ed141582e9e74a380
Parents: ca165db
Author: Rajat Khandelwal <pr...@apache.org>
Authored: Mon Mar 14 12:31:34 2016 +0530
Committer: Amareshwari Sriramadasu <am...@apache.org>
Committed: Mon Mar 14 12:31:34 2016 +0530
----------------------------------------------------------------------
.../cli/TestEmbeddedThriftBinaryCLIService.java | 1 +
.../service/cli/session/TestQueryDisplay.java | 7 +-
.../java/org/apache/hadoop/hive/ql/Driver.java | 11 +-
.../org/apache/hadoop/hive/ql/QueryDisplay.java | 133 +++++---
.../org/apache/hadoop/hive/ql/QueryPlan.java | 14 +
.../org/apache/hadoop/hive/ql/exec/Task.java | 75 +++--
.../hadoop/hive/ql/exec/mr/ExecDriver.java | 6 +
.../hadoop/hive/ql/exec/mr/MapRedTask.java | 8 +-
.../hadoop/hive/ql/history/HiveHistory.java | 2 +-
service-rpc/if/TCLIService.thrift | 9 +
.../gen/thrift/gen-cpp/TCLIService_types.cpp | 66 ++++
.../src/gen/thrift/gen-cpp/TCLIService_types.h | 28 +-
.../rpc/thrift/TGetOperationStatusResp.java | 312 ++++++++++++++++++-
service-rpc/src/gen/thrift/gen-php/Types.php | 69 ++++
.../src/gen/thrift/gen-py/TCLIService/ttypes.py | 41 ++-
.../gen/thrift/gen-rb/t_c_l_i_service_types.rb | 8 +-
.../org/apache/hive/tmpl/QueryProfileTmpl.jamon | 18 +-
.../hive/service/cli/OperationStatus.java | 20 +-
.../hive/service/cli/operation/Operation.java | 41 ++-
.../service/cli/operation/SQLOperation.java | 49 ++-
.../service/cli/thrift/ThriftCLIService.java | 3 +
.../cli/thrift/ThriftCLIServiceClient.java | 3 +-
.../apache/hive/service/cli/CLIServiceTest.java | 104 ++++++-
23 files changed, 902 insertions(+), 126 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/de260b45/itests/hive-unit/src/test/java/org/apache/hive/service/cli/TestEmbeddedThriftBinaryCLIService.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/TestEmbeddedThriftBinaryCLIService.java b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/TestEmbeddedThriftBinaryCLIService.java
index de66d9e..ac9b306 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/TestEmbeddedThriftBinaryCLIService.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/TestEmbeddedThriftBinaryCLIService.java
@@ -37,6 +37,7 @@ public class TestEmbeddedThriftBinaryCLIService extends CLIServiceTest {
public static void setUpBeforeClass() throws Exception {
service = new EmbeddedThriftBinaryCLIService();
HiveConf conf = new HiveConf();
+ conf.setBoolean("datanucleus.schema.autoCreateTables", true);
conf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict");
service.init(conf);
client = new ThriftCLIServiceClient(service);
http://git-wip-us.apache.org/repos/asf/hive/blob/de260b45/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java
index 9765b9d..418f71e 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java
@@ -23,15 +23,12 @@ import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hive.service.cli.OperationHandle;
import org.apache.hive.service.cli.SessionHandle;
-import org.apache.hive.service.cli.operation.ExecuteStatementOperation;
import org.apache.hive.service.cli.operation.SQLOperationDisplay;
import org.apache.hive.service.rpc.thrift.TProtocolVersion;
import org.apache.hive.service.server.HiveServer2;
-import org.apache.hive.service.servlet.QueryProfileServlet;
import org.apache.hive.tmpl.QueryProfileTmpl;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.Test;
import java.io.StringWriter;
@@ -155,8 +152,8 @@ public class TestQueryDisplay {
Assert.assertTrue(qDisplay1.getPerfLogStarts(QueryDisplay.Phase.COMPILATION).size() > 0);
Assert.assertTrue(qDisplay1.getPerfLogEnds(QueryDisplay.Phase.COMPILATION).size() > 0);
- Assert.assertEquals(qDisplay1.getTaskInfos().size(), 1);
- QueryDisplay.TaskInfo tInfo1 = qDisplay1.getTaskInfos().get(0);
+ Assert.assertEquals(qDisplay1.getTaskDisplays().size(), 2);
+ QueryDisplay.TaskDisplay tInfo1 = qDisplay1.getTaskDisplays().get(1);
Assert.assertEquals(tInfo1.getTaskId(), "Stage-0");
Assert.assertEquals(tInfo1.getTaskType(), StageType.DDL);
Assert.assertTrue(tInfo1.getBeginTime() > 0 && tInfo1.getBeginTime() <= System.currentTimeMillis());
http://git-wip-us.apache.org/repos/asf/hive/blob/de260b45/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index f0fda05..7327a42 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -496,7 +496,7 @@ public class Driver implements CommandProcessor {
schema = getSchema(sem, conf);
plan = new QueryPlan(queryStr, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN), queryId,
- SessionState.get().getHiveOperation(), schema);
+ SessionState.get().getHiveOperation(), schema, queryDisplay);
conf.setQueryString(queryStr);
@@ -1189,7 +1189,7 @@ public class Driver implements CommandProcessor {
private int compileInternal(String command) {
int ret;
final ReentrantLock compileLock = tryAcquireCompileLock(isParallelEnabled,
- command);
+ command);
if (compileLock == null) {
return ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCode();
}
@@ -1232,8 +1232,8 @@ public class Driver implements CommandProcessor {
final ReentrantLock compileLock = isParallelEnabled ?
SessionState.get().getCompileLock() : globalCompileLock;
long maxCompileLockWaitTime = HiveConf.getTimeVar(
- this.conf, ConfVars.HIVE_SERVER2_COMPILE_LOCK_TIMEOUT,
- TimeUnit.SECONDS);
+ this.conf, ConfVars.HIVE_SERVER2_COMPILE_LOCK_TIMEOUT,
+ TimeUnit.SECONDS);
if (maxCompileLockWaitTime > 0) {
try {
if (LOG.isDebugEnabled()) {
@@ -1576,7 +1576,6 @@ public class Driver implements CommandProcessor {
// Launch upto maxthreads tasks
Task<? extends Serializable> task;
while ((task = driverCxt.getRunnable(maxthreads)) != null) {
- queryDisplay.addTask(task);
TaskRunner runner = launchTask(task, queryId, noName, jobname, jobs, driverCxt);
if (!runner.isRunning()) {
break;
@@ -1589,7 +1588,7 @@ public class Driver implements CommandProcessor {
continue;
}
hookContext.addCompleteTask(tskRun);
- queryDisplay.setTaskCompleted(tskRun.getTask().getId(), tskRun.getTaskResult());
+ queryDisplay.setTaskResult(tskRun.getTask().getId(), tskRun.getTaskResult());
Task<? extends Serializable> tsk = tskRun.getTask();
TaskResult result = tskRun.getTaskResult();
http://git-wip-us.apache.org/repos/asf/hive/blob/de260b45/ql/src/java/org/apache/hadoop/hive/ql/QueryDisplay.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryDisplay.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryDisplay.java
index c87c825..467dab6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/QueryDisplay.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryDisplay.java
@@ -22,11 +22,12 @@ import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskResult;
import org.apache.hadoop.hive.ql.plan.api.StageType;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
+import java.io.Serializable;
+import java.util.*;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.annotate.JsonWriteNullProperties;
+import org.codehaus.jackson.annotate.JsonIgnore;
/**
* Some limited query information to save for WebUI.
@@ -41,39 +42,56 @@ public class QueryDisplay {
private String errorMessage;
private String queryId;
- private final Map<Phase, Map<String, Long>> hmsTimingMap = new HashMap();
- private final Map<Phase, Map<String, Long>> perfLogStartMap = new HashMap();
- private final Map<Phase, Map<String, Long>> perfLogEndMap = new HashMap();
+ private final Map<Phase, Map<String, Long>> hmsTimingMap = new HashMap<Phase, Map<String, Long>>();
+ private final Map<Phase, Map<String, Long>> perfLogStartMap = new HashMap<Phase, Map<String, Long>>();
+ private final Map<Phase, Map<String, Long>> perfLogEndMap = new HashMap<Phase, Map<String, Long>>();
+
+ private final LinkedHashMap<String, TaskDisplay> tasks = new LinkedHashMap<String, TaskDisplay>();
- private final LinkedHashMap<String, TaskInfo> tasks = new LinkedHashMap<String, TaskInfo>();
+ public synchronized <T extends Serializable> void updateTaskStatus(Task<T> tTask) {
+ if (!tasks.containsKey(tTask.getId())) {
+ tasks.put(tTask.getId(), new TaskDisplay(tTask));
+ }
+ tasks.get(tTask.getId()).updateStatus(tTask);
+ }
//Inner classes
- public static enum Phase {
+ public enum Phase {
COMPILATION,
EXECUTION,
}
- public static class TaskInfo {
+ @JsonWriteNullProperties(false)
+ @JsonIgnoreProperties(ignoreUnknown = true)
+ public static class TaskDisplay {
+
private Integer returnVal; //if set, determines that task is complete.
private String errorMsg;
- private long endTime;
- final long beginTime;
- final String taskId;
- final StageType taskType;
- final String name;
- final boolean requireLock;
- final boolean retryIfFail;
+ private Long beginTime;
+ private Long endTime;
+
+ private String taskId;
+ private String taskExternalHandle;
+
+ public Task.TaskState taskState;
+ private StageType taskType;
+ private String name;
+ private boolean requireLock;
+ private boolean retryIfFail;
+ // required for jackson
+ public TaskDisplay() {
- public TaskInfo (Task task) {
- beginTime = System.currentTimeMillis();
+ }
+ public TaskDisplay(Task task) {
taskId = task.getId();
+ taskExternalHandle = task.getExternalHandle();
taskType = task.getType();
name = task.getName();
requireLock = task.requireLock();
retryIfFail = task.ifRetryCmdWhenFail();
}
-
+ @JsonIgnore
public synchronized String getStatus() {
if (returnVal == null) {
return "Running";
@@ -84,67 +102,82 @@ public class QueryDisplay {
}
}
- public synchronized long getElapsedTime() {
- if (endTime == 0) {
+ public synchronized Long getElapsedTime() {
+ if (endTime == null) {
+ if (beginTime == null) {
+ return null;
+ }
return System.currentTimeMillis() - beginTime;
} else {
return endTime - beginTime;
}
}
+ public synchronized Integer getReturnValue() {
+ return returnVal;
+ }
+
public synchronized String getErrorMsg() {
return errorMsg;
}
- public synchronized long getEndTime() {
- return endTime;
+ public synchronized Long getBeginTime() {
+ return beginTime;
}
- //Following methods do not need to be synchronized, because they are final fields.
- public long getBeginTime() {
- return beginTime;
+ public synchronized Long getEndTime() {
+ return endTime;
}
- public String getTaskId() {
+ public synchronized String getTaskId() {
return taskId;
}
- public StageType getTaskType() {
+ public synchronized StageType getTaskType() {
return taskType;
}
- public String getName() {
+ public synchronized String getName() {
return name;
}
-
- public boolean isRequireLock() {
+ @JsonIgnore
+ public synchronized boolean isRequireLock() {
return requireLock;
}
-
- public boolean isRetryIfFail() {
+ @JsonIgnore
+ public synchronized boolean isRetryIfFail() {
return retryIfFail;
}
- }
- public synchronized void addTask(Task task) {
- tasks.put(task.getId(), new TaskInfo(task));
- }
+ public synchronized String getExternalHandle() {
+ return taskExternalHandle;
+ }
- public synchronized void setTaskCompleted(String taskId, TaskResult result) {
- TaskInfo taskInfo = tasks.get(taskId);
- if (taskInfo != null) {
- taskInfo.returnVal = result.getExitVal();
+ public synchronized <T extends Serializable> void updateStatus(Task<T> tTask) {
+ this.taskState = tTask.getTaskState();
+ switch(taskState) {
+ case RUNNING:
+ beginTime = System.currentTimeMillis();
+ break;
+ case FINISHED:
+ endTime = System.currentTimeMillis();
+ break;
+ }
+ }
+ }
+ public synchronized void setTaskResult(String taskId, TaskResult result) {
+ TaskDisplay taskDisplay = tasks.get(taskId);
+ if (taskDisplay != null) {
+ taskDisplay.returnVal = result.getExitVal();
if (result.getTaskError() != null) {
- taskInfo.errorMsg = result.getTaskError().toString();
+ taskDisplay.errorMsg = result.getTaskError().toString();
}
- taskInfo.endTime = System.currentTimeMillis();
}
}
-
- public synchronized List<TaskInfo> getTaskInfos() {
- List<TaskInfo> taskInfos = new ArrayList<TaskInfo>();
- taskInfos.addAll(tasks.values());
- return taskInfos;
+ public synchronized List<TaskDisplay> getTaskDisplays() {
+ List<TaskDisplay> taskDisplays = new ArrayList<TaskDisplay>();
+ taskDisplays.addAll(tasks.values());
+ return taskDisplays;
}
public synchronized void setQueryStr(String queryStr) {
http://git-wip-us.apache.org/repos/asf/hive/blob/de260b45/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
index 4933b34..ef0923d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
@@ -114,11 +114,25 @@ public class QueryPlan implements Serializable {
public QueryPlan(String queryString, BaseSemanticAnalyzer sem, Long startTime, String queryId,
HiveOperation operation, Schema resultSchema) {
+ this(queryString, sem, startTime, queryId, operation, resultSchema, null);
+ }
+ public QueryPlan(String queryString, BaseSemanticAnalyzer sem, Long startTime, String queryId,
+ HiveOperation operation, Schema resultSchema, QueryDisplay queryDisplay) {
this.queryString = queryString;
rootTasks = new ArrayList<Task<? extends Serializable>>(sem.getAllRootTasks());
reducerTimeStatsPerJobList = new ArrayList<ReducerTimeStatsPerJob>();
fetchTask = sem.getFetchTask();
+ if (queryDisplay != null) {
+ if (fetchTask != null) {
+ fetchTask.setQueryDisplay(queryDisplay);
+ }
+ if (rootTasks!= null) {
+ for (Task t : rootTasks) {
+ t.setQueryDisplay(queryDisplay);
+ }
+ }
+ }
// Note that inputs and outputs can be changed when the query gets executed
inputs = sem.getAllInputs();
outputs = sem.getAllOutputs();
http://git-wip-us.apache.org/repos/asf/hive/blob/de260b45/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
index e199e5e..6c677f5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
@@ -30,9 +30,7 @@ import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.CompilationOpContext;
-import org.apache.hadoop.hive.ql.DriverContext;
-import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.*;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -52,10 +50,6 @@ public abstract class Task<T extends Serializable> implements Serializable, Node
private static final long serialVersionUID = 1L;
public transient HashMap<String, Long> taskCounters;
public transient TaskHandle taskHandle;
- protected transient boolean started;
- protected transient boolean initialized;
- protected transient boolean isdone;
- protected transient boolean queued;
protected transient HiveConf conf;
protected transient LogHelper console;
protected transient QueryPlan queryPlan;
@@ -81,18 +75,32 @@ public abstract class Task<T extends Serializable> implements Serializable, Node
// created in case the mapjoin failed.
public static final int MAPJOIN_ONLY_NOBACKUP = 7;
public static final int CONVERTED_SORTMERGEJOIN = 8;
-
+ public QueryDisplay queryDisplay = null;
// Descendants tasks who subscribe feeds from this task
protected transient List<Task<? extends Serializable>> feedSubscribers;
protected String id;
protected T work;
-
+ private TaskState taskState = TaskState.CREATED;
private transient boolean fetchSource;
- public static enum FeedType {
+ public enum FeedType {
DYNAMIC_PARTITIONS, // list of dynamic partitions
}
+ public enum TaskState {
+ // Task data structures have been initialized
+ INITIALIZED,
+ // Task has been queued for execution by the driver
+ QUEUED,
+ // Task is currently running
+ RUNNING,
+ // Task has completed
+ FINISHED,
+ // Task is just created
+ CREATED,
+ // Task state is unknown
+ UNKNOWN
+ }
// Bean methods
@@ -108,10 +116,6 @@ public abstract class Task<T extends Serializable> implements Serializable, Node
private Throwable exception;
public Task() {
- isdone = false;
- started = false;
- initialized = false;
- queued = false;
this.taskCounters = new HashMap<String, Long>();
taskTag = Task.NO_TAG;
}
@@ -123,13 +127,25 @@ public abstract class Task<T extends Serializable> implements Serializable, Node
public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverContext,
CompilationOpContext opContext) {
this.queryPlan = queryPlan;
- isdone = false;
- started = false;
setInitialized();
this.conf = conf;
this.driverContext = driverContext;
console = new LogHelper(LOG);
}
+ public void setQueryDisplay(QueryDisplay queryDisplay) {
+ this.queryDisplay = queryDisplay;
+ }
+
+ private void updateStatusInQueryDisplay() {
+ if (queryDisplay != null) {
+ queryDisplay.updateTaskStatus(this);
+ }
+ }
+
+ private void setState(TaskState state) {
+ this.taskState = state;
+ updateStatusInQueryDisplay();
+ }
protected Hive getHive() {
try {
@@ -323,37 +339,36 @@ public abstract class Task<T extends Serializable> implements Serializable, Node
}
}
}
-
public void setStarted() {
- this.started = true;
+ setState(TaskState.RUNNING);
}
public boolean started() {
- return started;
+ return taskState == TaskState.RUNNING;
}
public boolean done() {
- return isdone;
+ return taskState == TaskState.FINISHED;
}
public void setDone() {
- isdone = true;
+ setState(TaskState.FINISHED);
}
public void setQueued() {
- queued = true;
+ setState(TaskState.QUEUED);
}
public boolean getQueued() {
- return queued;
+ return taskState == TaskState.QUEUED;
}
public void setInitialized() {
- initialized = true;
+ setState(TaskState.INITIALIZED);
}
public boolean getInitialized() {
- return initialized;
+ return taskState == TaskState.INITIALIZED;
}
public boolean isRunnable() {
@@ -391,6 +406,14 @@ public abstract class Task<T extends Serializable> implements Serializable, Node
return id;
}
+ public String getExternalHandle() {
+ return null;
+ }
+
+ public TaskState getTaskState() {
+ return taskState;
+ }
+
public boolean isMapRedTask() {
return false;
}
@@ -572,4 +595,6 @@ public abstract class Task<T extends Serializable> implements Serializable, Node
public boolean equals(Object obj) {
return toString().equals(String.valueOf(obj));
}
+
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/de260b45/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
index ce020a5..d164859 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
@@ -430,6 +430,7 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop
// Finally SUBMIT the JOB!
rj = jc.submitJob(job);
+ this.jobID = rj.getJobID();
returnVal = jobExecHelper.progress(rj, jc);
success = (returnVal == 0);
@@ -849,5 +850,10 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop
rj = null;
}
}
+
+ @Override
+ public String getExternalHandle() {
+ return this.jobID;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/de260b45/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
index 5bc3d9e..310356c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
@@ -363,25 +363,25 @@ public class MapRedTask extends ExecDriver implements Serializable {
@Override
public boolean mapStarted() {
boolean b = super.mapStarted();
- return runningViaChild ? isdone : b;
+ return runningViaChild ? done() : b;
}
@Override
public boolean reduceStarted() {
boolean b = super.reduceStarted();
- return runningViaChild ? isdone : b;
+ return runningViaChild ? done() : b;
}
@Override
public boolean mapDone() {
boolean b = super.mapDone();
- return runningViaChild ? isdone : b;
+ return runningViaChild ? done() : b;
}
@Override
public boolean reduceDone() {
boolean b = super.reduceDone();
- return runningViaChild ? isdone : b;
+ return runningViaChild ? done() : b;
}
/**
http://git-wip-us.apache.org/repos/asf/hive/blob/de260b45/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java b/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java
index 45cd533..687f551 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java
@@ -109,7 +109,7 @@ public interface HiveHistory {
};
/**
- * TaskInfo.
+ * TaskDisplay.
*
*/
public static class TaskInfo extends Info {
http://git-wip-us.apache.org/repos/asf/hive/blob/de260b45/service-rpc/if/TCLIService.thrift
----------------------------------------------------------------------
diff --git a/service-rpc/if/TCLIService.thrift b/service-rpc/if/TCLIService.thrift
index 0aa9d13..aa28b6e 100644
--- a/service-rpc/if/TCLIService.thrift
+++ b/service-rpc/if/TCLIService.thrift
@@ -977,6 +977,15 @@ struct TGetOperationStatusResp {
// Error message
5: optional string errorMessage
+
+ // List of statuses of sub tasks
+ 6: optional string taskStatus
+
+ // When was the operation started
+ 7: optional i64 operationStarted
+ // When was the operation completed
+ 8: optional i64 operationCompleted
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/de260b45/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp
----------------------------------------------------------------------
diff --git a/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp b/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp
index e62e8b7..3a27a60 100644
--- a/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp
+++ b/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp
@@ -7612,6 +7612,21 @@ void TGetOperationStatusResp::__set_errorMessage(const std::string& val) {
__isset.errorMessage = true;
}
+void TGetOperationStatusResp::__set_taskStatus(const std::string& val) {
+ this->taskStatus = val;
+__isset.taskStatus = true;
+}
+
+void TGetOperationStatusResp::__set_operationStarted(const int64_t val) {
+ this->operationStarted = val;
+__isset.operationStarted = true;
+}
+
+void TGetOperationStatusResp::__set_operationCompleted(const int64_t val) {
+ this->operationCompleted = val;
+__isset.operationCompleted = true;
+}
+
uint32_t TGetOperationStatusResp::read(::apache::thrift::protocol::TProtocol* iprot) {
apache::thrift::protocol::TInputRecursionTracker tracker(*iprot);
@@ -7676,6 +7691,30 @@ uint32_t TGetOperationStatusResp::read(::apache::thrift::protocol::TProtocol* ip
xfer += iprot->skip(ftype);
}
break;
+ case 6:
+ if (ftype == ::apache::thrift::protocol::T_STRING) {
+ xfer += iprot->readString(this->taskStatus);
+ this->__isset.taskStatus = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
+ case 7:
+ if (ftype == ::apache::thrift::protocol::T_I64) {
+ xfer += iprot->readI64(this->operationStarted);
+ this->__isset.operationStarted = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
+ case 8:
+ if (ftype == ::apache::thrift::protocol::T_I64) {
+ xfer += iprot->readI64(this->operationCompleted);
+ this->__isset.operationCompleted = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
default:
xfer += iprot->skip(ftype);
break;
@@ -7719,6 +7758,21 @@ uint32_t TGetOperationStatusResp::write(::apache::thrift::protocol::TProtocol* o
xfer += oprot->writeString(this->errorMessage);
xfer += oprot->writeFieldEnd();
}
+ if (this->__isset.taskStatus) {
+ xfer += oprot->writeFieldBegin("taskStatus", ::apache::thrift::protocol::T_STRING, 6);
+ xfer += oprot->writeString(this->taskStatus);
+ xfer += oprot->writeFieldEnd();
+ }
+ if (this->__isset.operationStarted) {
+ xfer += oprot->writeFieldBegin("operationStarted", ::apache::thrift::protocol::T_I64, 7);
+ xfer += oprot->writeI64(this->operationStarted);
+ xfer += oprot->writeFieldEnd();
+ }
+ if (this->__isset.operationCompleted) {
+ xfer += oprot->writeFieldBegin("operationCompleted", ::apache::thrift::protocol::T_I64, 8);
+ xfer += oprot->writeI64(this->operationCompleted);
+ xfer += oprot->writeFieldEnd();
+ }
xfer += oprot->writeFieldStop();
xfer += oprot->writeStructEnd();
return xfer;
@@ -7731,6 +7785,9 @@ void swap(TGetOperationStatusResp &a, TGetOperationStatusResp &b) {
swap(a.sqlState, b.sqlState);
swap(a.errorCode, b.errorCode);
swap(a.errorMessage, b.errorMessage);
+ swap(a.taskStatus, b.taskStatus);
+ swap(a.operationStarted, b.operationStarted);
+ swap(a.operationCompleted, b.operationCompleted);
swap(a.__isset, b.__isset);
}
@@ -7740,6 +7797,9 @@ TGetOperationStatusResp::TGetOperationStatusResp(const TGetOperationStatusResp&
sqlState = other263.sqlState;
errorCode = other263.errorCode;
errorMessage = other263.errorMessage;
+ taskStatus = other263.taskStatus;
+ operationStarted = other263.operationStarted;
+ operationCompleted = other263.operationCompleted;
__isset = other263.__isset;
}
TGetOperationStatusResp& TGetOperationStatusResp::operator=(const TGetOperationStatusResp& other264) {
@@ -7748,6 +7808,9 @@ TGetOperationStatusResp& TGetOperationStatusResp::operator=(const TGetOperationS
sqlState = other264.sqlState;
errorCode = other264.errorCode;
errorMessage = other264.errorMessage;
+ taskStatus = other264.taskStatus;
+ operationStarted = other264.operationStarted;
+ operationCompleted = other264.operationCompleted;
__isset = other264.__isset;
return *this;
}
@@ -7759,6 +7822,9 @@ void TGetOperationStatusResp::printTo(std::ostream& out) const {
out << ", " << "sqlState="; (__isset.sqlState ? (out << to_string(sqlState)) : (out << "<null>"));
out << ", " << "errorCode="; (__isset.errorCode ? (out << to_string(errorCode)) : (out << "<null>"));
out << ", " << "errorMessage="; (__isset.errorMessage ? (out << to_string(errorMessage)) : (out << "<null>"));
+ out << ", " << "taskStatus="; (__isset.taskStatus ? (out << to_string(taskStatus)) : (out << "<null>"));
+ out << ", " << "operationStarted="; (__isset.operationStarted ? (out << to_string(operationStarted)) : (out << "<null>"));
+ out << ", " << "operationCompleted="; (__isset.operationCompleted ? (out << to_string(operationCompleted)) : (out << "<null>"));
out << ")";
}
http://git-wip-us.apache.org/repos/asf/hive/blob/de260b45/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h
----------------------------------------------------------------------
diff --git a/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h b/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h
index 5364293..7f1d9dd 100644
--- a/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h
+++ b/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h
@@ -3408,11 +3408,14 @@ inline std::ostream& operator<<(std::ostream& out, const TGetOperationStatusReq&
}
typedef struct _TGetOperationStatusResp__isset {
- _TGetOperationStatusResp__isset() : operationState(false), sqlState(false), errorCode(false), errorMessage(false) {}
+ _TGetOperationStatusResp__isset() : operationState(false), sqlState(false), errorCode(false), errorMessage(false), taskStatus(false), operationStarted(false), operationCompleted(false) {}
bool operationState :1;
bool sqlState :1;
bool errorCode :1;
bool errorMessage :1;
+ bool taskStatus :1;
+ bool operationStarted :1;
+ bool operationCompleted :1;
} _TGetOperationStatusResp__isset;
class TGetOperationStatusResp {
@@ -3420,7 +3423,7 @@ class TGetOperationStatusResp {
TGetOperationStatusResp(const TGetOperationStatusResp&);
TGetOperationStatusResp& operator=(const TGetOperationStatusResp&);
- TGetOperationStatusResp() : operationState((TOperationState::type)0), sqlState(), errorCode(0), errorMessage() {
+ TGetOperationStatusResp() : operationState((TOperationState::type)0), sqlState(), errorCode(0), errorMessage(), taskStatus(), operationStarted(0), operationCompleted(0) {
}
virtual ~TGetOperationStatusResp() throw();
@@ -3429,6 +3432,9 @@ class TGetOperationStatusResp {
std::string sqlState;
int32_t errorCode;
std::string errorMessage;
+ std::string taskStatus;
+ int64_t operationStarted;
+ int64_t operationCompleted;
_TGetOperationStatusResp__isset __isset;
@@ -3442,6 +3448,12 @@ class TGetOperationStatusResp {
void __set_errorMessage(const std::string& val);
+ void __set_taskStatus(const std::string& val);
+
+ void __set_operationStarted(const int64_t val);
+
+ void __set_operationCompleted(const int64_t val);
+
bool operator == (const TGetOperationStatusResp & rhs) const
{
if (!(status == rhs.status))
@@ -3462,6 +3474,18 @@ class TGetOperationStatusResp {
return false;
else if (__isset.errorMessage && !(errorMessage == rhs.errorMessage))
return false;
+ if (__isset.taskStatus != rhs.__isset.taskStatus)
+ return false;
+ else if (__isset.taskStatus && !(taskStatus == rhs.taskStatus))
+ return false;
+ if (__isset.operationStarted != rhs.__isset.operationStarted)
+ return false;
+ else if (__isset.operationStarted && !(operationStarted == rhs.operationStarted))
+ return false;
+ if (__isset.operationCompleted != rhs.__isset.operationCompleted)
+ return false;
+ else if (__isset.operationCompleted && !(operationCompleted == rhs.operationCompleted))
+ return false;
return true;
}
bool operator != (const TGetOperationStatusResp &rhs) const {
http://git-wip-us.apache.org/repos/asf/hive/blob/de260b45/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TGetOperationStatusResp.java
----------------------------------------------------------------------
diff --git a/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TGetOperationStatusResp.java b/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TGetOperationStatusResp.java
index a7a8ebc..3049280 100644
--- a/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TGetOperationStatusResp.java
+++ b/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TGetOperationStatusResp.java
@@ -43,6 +43,9 @@ public class TGetOperationStatusResp implements org.apache.thrift.TBase<TGetOper
private static final org.apache.thrift.protocol.TField SQL_STATE_FIELD_DESC = new org.apache.thrift.protocol.TField("sqlState", org.apache.thrift.protocol.TType.STRING, (short)3);
private static final org.apache.thrift.protocol.TField ERROR_CODE_FIELD_DESC = new org.apache.thrift.protocol.TField("errorCode", org.apache.thrift.protocol.TType.I32, (short)4);
private static final org.apache.thrift.protocol.TField ERROR_MESSAGE_FIELD_DESC = new org.apache.thrift.protocol.TField("errorMessage", org.apache.thrift.protocol.TType.STRING, (short)5);
+ private static final org.apache.thrift.protocol.TField TASK_STATUS_FIELD_DESC = new org.apache.thrift.protocol.TField("taskStatus", org.apache.thrift.protocol.TType.STRING, (short)6);
+ private static final org.apache.thrift.protocol.TField OPERATION_STARTED_FIELD_DESC = new org.apache.thrift.protocol.TField("operationStarted", org.apache.thrift.protocol.TType.I64, (short)7);
+ private static final org.apache.thrift.protocol.TField OPERATION_COMPLETED_FIELD_DESC = new org.apache.thrift.protocol.TField("operationCompleted", org.apache.thrift.protocol.TType.I64, (short)8);
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
static {
@@ -55,6 +58,9 @@ public class TGetOperationStatusResp implements org.apache.thrift.TBase<TGetOper
private String sqlState; // optional
private int errorCode; // optional
private String errorMessage; // optional
+ private String taskStatus; // optional
+ private long operationStarted; // optional
+ private long operationCompleted; // optional
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -66,7 +72,10 @@ public class TGetOperationStatusResp implements org.apache.thrift.TBase<TGetOper
OPERATION_STATE((short)2, "operationState"),
SQL_STATE((short)3, "sqlState"),
ERROR_CODE((short)4, "errorCode"),
- ERROR_MESSAGE((short)5, "errorMessage");
+ ERROR_MESSAGE((short)5, "errorMessage"),
+ TASK_STATUS((short)6, "taskStatus"),
+ OPERATION_STARTED((short)7, "operationStarted"),
+ OPERATION_COMPLETED((short)8, "operationCompleted");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@@ -91,6 +100,12 @@ public class TGetOperationStatusResp implements org.apache.thrift.TBase<TGetOper
return ERROR_CODE;
case 5: // ERROR_MESSAGE
return ERROR_MESSAGE;
+ case 6: // TASK_STATUS
+ return TASK_STATUS;
+ case 7: // OPERATION_STARTED
+ return OPERATION_STARTED;
+ case 8: // OPERATION_COMPLETED
+ return OPERATION_COMPLETED;
default:
return null;
}
@@ -132,8 +147,10 @@ public class TGetOperationStatusResp implements org.apache.thrift.TBase<TGetOper
// isset id assignments
private static final int __ERRORCODE_ISSET_ID = 0;
+ private static final int __OPERATIONSTARTED_ISSET_ID = 1;
+ private static final int __OPERATIONCOMPLETED_ISSET_ID = 2;
private byte __isset_bitfield = 0;
- private static final _Fields optionals[] = {_Fields.OPERATION_STATE,_Fields.SQL_STATE,_Fields.ERROR_CODE,_Fields.ERROR_MESSAGE};
+ private static final _Fields optionals[] = {_Fields.OPERATION_STATE,_Fields.SQL_STATE,_Fields.ERROR_CODE,_Fields.ERROR_MESSAGE,_Fields.TASK_STATUS,_Fields.OPERATION_STARTED,_Fields.OPERATION_COMPLETED};
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -147,6 +164,12 @@ public class TGetOperationStatusResp implements org.apache.thrift.TBase<TGetOper
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
tmpMap.put(_Fields.ERROR_MESSAGE, new org.apache.thrift.meta_data.FieldMetaData("errorMessage", org.apache.thrift.TFieldRequirementType.OPTIONAL,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.TASK_STATUS, new org.apache.thrift.meta_data.FieldMetaData("taskStatus", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.OPERATION_STARTED, new org.apache.thrift.meta_data.FieldMetaData("operationStarted", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
+ tmpMap.put(_Fields.OPERATION_COMPLETED, new org.apache.thrift.meta_data.FieldMetaData("operationCompleted", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TGetOperationStatusResp.class, metaDataMap);
}
@@ -179,6 +202,11 @@ public class TGetOperationStatusResp implements org.apache.thrift.TBase<TGetOper
if (other.isSetErrorMessage()) {
this.errorMessage = other.errorMessage;
}
+ if (other.isSetTaskStatus()) {
+ this.taskStatus = other.taskStatus;
+ }
+ this.operationStarted = other.operationStarted;
+ this.operationCompleted = other.operationCompleted;
}
public TGetOperationStatusResp deepCopy() {
@@ -193,6 +221,11 @@ public class TGetOperationStatusResp implements org.apache.thrift.TBase<TGetOper
setErrorCodeIsSet(false);
this.errorCode = 0;
this.errorMessage = null;
+ this.taskStatus = null;
+ setOperationStartedIsSet(false);
+ this.operationStarted = 0;
+ setOperationCompletedIsSet(false);
+ this.operationCompleted = 0;
}
public TStatus getStatus() {
@@ -317,6 +350,73 @@ public class TGetOperationStatusResp implements org.apache.thrift.TBase<TGetOper
}
}
+ public String getTaskStatus() {
+ return this.taskStatus;
+ }
+
+ public void setTaskStatus(String taskStatus) {
+ this.taskStatus = taskStatus;
+ }
+
+ public void unsetTaskStatus() {
+ this.taskStatus = null;
+ }
+
+ /** Returns true if field taskStatus is set (has been assigned a value) and false otherwise */
+ public boolean isSetTaskStatus() {
+ return this.taskStatus != null;
+ }
+
+ public void setTaskStatusIsSet(boolean value) {
+ if (!value) {
+ this.taskStatus = null;
+ }
+ }
+
+ public long getOperationStarted() {
+ return this.operationStarted;
+ }
+
+ public void setOperationStarted(long operationStarted) {
+ this.operationStarted = operationStarted;
+ setOperationStartedIsSet(true);
+ }
+
+ public void unsetOperationStarted() {
+ __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __OPERATIONSTARTED_ISSET_ID);
+ }
+
+ /** Returns true if field operationStarted is set (has been assigned a value) and false otherwise */
+ public boolean isSetOperationStarted() {
+ return EncodingUtils.testBit(__isset_bitfield, __OPERATIONSTARTED_ISSET_ID);
+ }
+
+ public void setOperationStartedIsSet(boolean value) {
+ __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __OPERATIONSTARTED_ISSET_ID, value);
+ }
+
+ public long getOperationCompleted() {
+ return this.operationCompleted;
+ }
+
+ public void setOperationCompleted(long operationCompleted) {
+ this.operationCompleted = operationCompleted;
+ setOperationCompletedIsSet(true);
+ }
+
+ public void unsetOperationCompleted() {
+ __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __OPERATIONCOMPLETED_ISSET_ID);
+ }
+
+ /** Returns true if field operationCompleted is set (has been assigned a value) and false otherwise */
+ public boolean isSetOperationCompleted() {
+ return EncodingUtils.testBit(__isset_bitfield, __OPERATIONCOMPLETED_ISSET_ID);
+ }
+
+ public void setOperationCompletedIsSet(boolean value) {
+ __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __OPERATIONCOMPLETED_ISSET_ID, value);
+ }
+
public void setFieldValue(_Fields field, Object value) {
switch (field) {
case STATUS:
@@ -359,6 +459,30 @@ public class TGetOperationStatusResp implements org.apache.thrift.TBase<TGetOper
}
break;
+ case TASK_STATUS:
+ if (value == null) {
+ unsetTaskStatus();
+ } else {
+ setTaskStatus((String)value);
+ }
+ break;
+
+ case OPERATION_STARTED:
+ if (value == null) {
+ unsetOperationStarted();
+ } else {
+ setOperationStarted((Long)value);
+ }
+ break;
+
+ case OPERATION_COMPLETED:
+ if (value == null) {
+ unsetOperationCompleted();
+ } else {
+ setOperationCompleted((Long)value);
+ }
+ break;
+
}
}
@@ -379,6 +503,15 @@ public class TGetOperationStatusResp implements org.apache.thrift.TBase<TGetOper
case ERROR_MESSAGE:
return getErrorMessage();
+ case TASK_STATUS:
+ return getTaskStatus();
+
+ case OPERATION_STARTED:
+ return getOperationStarted();
+
+ case OPERATION_COMPLETED:
+ return getOperationCompleted();
+
}
throw new IllegalStateException();
}
@@ -400,6 +533,12 @@ public class TGetOperationStatusResp implements org.apache.thrift.TBase<TGetOper
return isSetErrorCode();
case ERROR_MESSAGE:
return isSetErrorMessage();
+ case TASK_STATUS:
+ return isSetTaskStatus();
+ case OPERATION_STARTED:
+ return isSetOperationStarted();
+ case OPERATION_COMPLETED:
+ return isSetOperationCompleted();
}
throw new IllegalStateException();
}
@@ -462,6 +601,33 @@ public class TGetOperationStatusResp implements org.apache.thrift.TBase<TGetOper
return false;
}
+ boolean this_present_taskStatus = true && this.isSetTaskStatus();
+ boolean that_present_taskStatus = true && that.isSetTaskStatus();
+ if (this_present_taskStatus || that_present_taskStatus) {
+ if (!(this_present_taskStatus && that_present_taskStatus))
+ return false;
+ if (!this.taskStatus.equals(that.taskStatus))
+ return false;
+ }
+
+ boolean this_present_operationStarted = true && this.isSetOperationStarted();
+ boolean that_present_operationStarted = true && that.isSetOperationStarted();
+ if (this_present_operationStarted || that_present_operationStarted) {
+ if (!(this_present_operationStarted && that_present_operationStarted))
+ return false;
+ if (this.operationStarted != that.operationStarted)
+ return false;
+ }
+
+ boolean this_present_operationCompleted = true && this.isSetOperationCompleted();
+ boolean that_present_operationCompleted = true && that.isSetOperationCompleted();
+ if (this_present_operationCompleted || that_present_operationCompleted) {
+ if (!(this_present_operationCompleted && that_present_operationCompleted))
+ return false;
+ if (this.operationCompleted != that.operationCompleted)
+ return false;
+ }
+
return true;
}
@@ -494,6 +660,21 @@ public class TGetOperationStatusResp implements org.apache.thrift.TBase<TGetOper
if (present_errorMessage)
list.add(errorMessage);
+ boolean present_taskStatus = true && (isSetTaskStatus());
+ list.add(present_taskStatus);
+ if (present_taskStatus)
+ list.add(taskStatus);
+
+ boolean present_operationStarted = true && (isSetOperationStarted());
+ list.add(present_operationStarted);
+ if (present_operationStarted)
+ list.add(operationStarted);
+
+ boolean present_operationCompleted = true && (isSetOperationCompleted());
+ list.add(present_operationCompleted);
+ if (present_operationCompleted)
+ list.add(operationCompleted);
+
return list.hashCode();
}
@@ -555,6 +736,36 @@ public class TGetOperationStatusResp implements org.apache.thrift.TBase<TGetOper
return lastComparison;
}
}
+ lastComparison = Boolean.valueOf(isSetTaskStatus()).compareTo(other.isSetTaskStatus());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetTaskStatus()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.taskStatus, other.taskStatus);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = Boolean.valueOf(isSetOperationStarted()).compareTo(other.isSetOperationStarted());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetOperationStarted()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.operationStarted, other.operationStarted);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = Boolean.valueOf(isSetOperationCompleted()).compareTo(other.isSetOperationCompleted());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetOperationCompleted()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.operationCompleted, other.operationCompleted);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
return 0;
}
@@ -618,6 +829,28 @@ public class TGetOperationStatusResp implements org.apache.thrift.TBase<TGetOper
}
first = false;
}
+ if (isSetTaskStatus()) {
+ if (!first) sb.append(", ");
+ sb.append("taskStatus:");
+ if (this.taskStatus == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.taskStatus);
+ }
+ first = false;
+ }
+ if (isSetOperationStarted()) {
+ if (!first) sb.append(", ");
+ sb.append("operationStarted:");
+ sb.append(this.operationStarted);
+ first = false;
+ }
+ if (isSetOperationCompleted()) {
+ if (!first) sb.append(", ");
+ sb.append("operationCompleted:");
+ sb.append(this.operationCompleted);
+ first = false;
+ }
sb.append(")");
return sb.toString();
}
@@ -711,6 +944,30 @@ public class TGetOperationStatusResp implements org.apache.thrift.TBase<TGetOper
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
+ case 6: // TASK_STATUS
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.taskStatus = iprot.readString();
+ struct.setTaskStatusIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ case 7: // OPERATION_STARTED
+ if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+ struct.operationStarted = iprot.readI64();
+ struct.setOperationStartedIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ case 8: // OPERATION_COMPLETED
+ if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+ struct.operationCompleted = iprot.readI64();
+ struct.setOperationCompletedIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
@@ -755,6 +1012,23 @@ public class TGetOperationStatusResp implements org.apache.thrift.TBase<TGetOper
oprot.writeFieldEnd();
}
}
+ if (struct.taskStatus != null) {
+ if (struct.isSetTaskStatus()) {
+ oprot.writeFieldBegin(TASK_STATUS_FIELD_DESC);
+ oprot.writeString(struct.taskStatus);
+ oprot.writeFieldEnd();
+ }
+ }
+ if (struct.isSetOperationStarted()) {
+ oprot.writeFieldBegin(OPERATION_STARTED_FIELD_DESC);
+ oprot.writeI64(struct.operationStarted);
+ oprot.writeFieldEnd();
+ }
+ if (struct.isSetOperationCompleted()) {
+ oprot.writeFieldBegin(OPERATION_COMPLETED_FIELD_DESC);
+ oprot.writeI64(struct.operationCompleted);
+ oprot.writeFieldEnd();
+ }
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -786,7 +1060,16 @@ public class TGetOperationStatusResp implements org.apache.thrift.TBase<TGetOper
if (struct.isSetErrorMessage()) {
optionals.set(3);
}
- oprot.writeBitSet(optionals, 4);
+ if (struct.isSetTaskStatus()) {
+ optionals.set(4);
+ }
+ if (struct.isSetOperationStarted()) {
+ optionals.set(5);
+ }
+ if (struct.isSetOperationCompleted()) {
+ optionals.set(6);
+ }
+ oprot.writeBitSet(optionals, 7);
if (struct.isSetOperationState()) {
oprot.writeI32(struct.operationState.getValue());
}
@@ -799,6 +1082,15 @@ public class TGetOperationStatusResp implements org.apache.thrift.TBase<TGetOper
if (struct.isSetErrorMessage()) {
oprot.writeString(struct.errorMessage);
}
+ if (struct.isSetTaskStatus()) {
+ oprot.writeString(struct.taskStatus);
+ }
+ if (struct.isSetOperationStarted()) {
+ oprot.writeI64(struct.operationStarted);
+ }
+ if (struct.isSetOperationCompleted()) {
+ oprot.writeI64(struct.operationCompleted);
+ }
}
@Override
@@ -807,7 +1099,7 @@ public class TGetOperationStatusResp implements org.apache.thrift.TBase<TGetOper
struct.status = new TStatus();
struct.status.read(iprot);
struct.setStatusIsSet(true);
- BitSet incoming = iprot.readBitSet(4);
+ BitSet incoming = iprot.readBitSet(7);
if (incoming.get(0)) {
struct.operationState = org.apache.hive.service.rpc.thrift.TOperationState.findByValue(iprot.readI32());
struct.setOperationStateIsSet(true);
@@ -824,6 +1116,18 @@ public class TGetOperationStatusResp implements org.apache.thrift.TBase<TGetOper
struct.errorMessage = iprot.readString();
struct.setErrorMessageIsSet(true);
}
+ if (incoming.get(4)) {
+ struct.taskStatus = iprot.readString();
+ struct.setTaskStatusIsSet(true);
+ }
+ if (incoming.get(5)) {
+ struct.operationStarted = iprot.readI64();
+ struct.setOperationStartedIsSet(true);
+ }
+ if (incoming.get(6)) {
+ struct.operationCompleted = iprot.readI64();
+ struct.setOperationCompletedIsSet(true);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/de260b45/service-rpc/src/gen/thrift/gen-php/Types.php
----------------------------------------------------------------------
diff --git a/service-rpc/src/gen/thrift/gen-php/Types.php b/service-rpc/src/gen/thrift/gen-php/Types.php
index 76805df..b7df50a 100644
--- a/service-rpc/src/gen/thrift/gen-php/Types.php
+++ b/service-rpc/src/gen/thrift/gen-php/Types.php
@@ -7416,6 +7416,18 @@ class TGetOperationStatusResp {
* @var string
*/
public $errorMessage = null;
+ /**
+ * @var string
+ */
+ public $taskStatus = null;
+ /**
+ * @var int
+ */
+ public $operationStarted = null;
+ /**
+ * @var int
+ */
+ public $operationCompleted = null;
public function __construct($vals=null) {
if (!isset(self::$_TSPEC)) {
@@ -7441,6 +7453,18 @@ class TGetOperationStatusResp {
'var' => 'errorMessage',
'type' => TType::STRING,
),
+ 6 => array(
+ 'var' => 'taskStatus',
+ 'type' => TType::STRING,
+ ),
+ 7 => array(
+ 'var' => 'operationStarted',
+ 'type' => TType::I64,
+ ),
+ 8 => array(
+ 'var' => 'operationCompleted',
+ 'type' => TType::I64,
+ ),
);
}
if (is_array($vals)) {
@@ -7459,6 +7483,15 @@ class TGetOperationStatusResp {
if (isset($vals['errorMessage'])) {
$this->errorMessage = $vals['errorMessage'];
}
+ if (isset($vals['taskStatus'])) {
+ $this->taskStatus = $vals['taskStatus'];
+ }
+ if (isset($vals['operationStarted'])) {
+ $this->operationStarted = $vals['operationStarted'];
+ }
+ if (isset($vals['operationCompleted'])) {
+ $this->operationCompleted = $vals['operationCompleted'];
+ }
}
}
@@ -7517,6 +7550,27 @@ class TGetOperationStatusResp {
$xfer += $input->skip($ftype);
}
break;
+ case 6:
+ if ($ftype == TType::STRING) {
+ $xfer += $input->readString($this->taskStatus);
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ break;
+ case 7:
+ if ($ftype == TType::I64) {
+ $xfer += $input->readI64($this->operationStarted);
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ break;
+ case 8:
+ if ($ftype == TType::I64) {
+ $xfer += $input->readI64($this->operationCompleted);
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ break;
default:
$xfer += $input->skip($ftype);
break;
@@ -7558,6 +7612,21 @@ class TGetOperationStatusResp {
$xfer += $output->writeString($this->errorMessage);
$xfer += $output->writeFieldEnd();
}
+ if ($this->taskStatus !== null) {
+ $xfer += $output->writeFieldBegin('taskStatus', TType::STRING, 6);
+ $xfer += $output->writeString($this->taskStatus);
+ $xfer += $output->writeFieldEnd();
+ }
+ if ($this->operationStarted !== null) {
+ $xfer += $output->writeFieldBegin('operationStarted', TType::I64, 7);
+ $xfer += $output->writeI64($this->operationStarted);
+ $xfer += $output->writeFieldEnd();
+ }
+ if ($this->operationCompleted !== null) {
+ $xfer += $output->writeFieldBegin('operationCompleted', TType::I64, 8);
+ $xfer += $output->writeI64($this->operationCompleted);
+ $xfer += $output->writeFieldEnd();
+ }
$xfer += $output->writeFieldStop();
$xfer += $output->writeStructEnd();
return $xfer;
http://git-wip-us.apache.org/repos/asf/hive/blob/de260b45/service-rpc/src/gen/thrift/gen-py/TCLIService/ttypes.py
----------------------------------------------------------------------
diff --git a/service-rpc/src/gen/thrift/gen-py/TCLIService/ttypes.py b/service-rpc/src/gen/thrift/gen-py/TCLIService/ttypes.py
index ef5f5f5..c691781 100644
--- a/service-rpc/src/gen/thrift/gen-py/TCLIService/ttypes.py
+++ b/service-rpc/src/gen/thrift/gen-py/TCLIService/ttypes.py
@@ -5635,6 +5635,9 @@ class TGetOperationStatusResp:
- sqlState
- errorCode
- errorMessage
+ - taskStatus
+ - operationStarted
+ - operationCompleted
"""
thrift_spec = (
@@ -5644,14 +5647,20 @@ class TGetOperationStatusResp:
(3, TType.STRING, 'sqlState', None, None, ), # 3
(4, TType.I32, 'errorCode', None, None, ), # 4
(5, TType.STRING, 'errorMessage', None, None, ), # 5
+ (6, TType.STRING, 'taskStatus', None, None, ), # 6
+ (7, TType.I64, 'operationStarted', None, None, ), # 7
+ (8, TType.I64, 'operationCompleted', None, None, ), # 8
)
- def __init__(self, status=None, operationState=None, sqlState=None, errorCode=None, errorMessage=None,):
+ def __init__(self, status=None, operationState=None, sqlState=None, errorCode=None, errorMessage=None, taskStatus=None, operationStarted=None, operationCompleted=None,):
self.status = status
self.operationState = operationState
self.sqlState = sqlState
self.errorCode = errorCode
self.errorMessage = errorMessage
+ self.taskStatus = taskStatus
+ self.operationStarted = operationStarted
+ self.operationCompleted = operationCompleted
def read(self, iprot):
if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -5688,6 +5697,21 @@ class TGetOperationStatusResp:
self.errorMessage = iprot.readString()
else:
iprot.skip(ftype)
+ elif fid == 6:
+ if ftype == TType.STRING:
+ self.taskStatus = iprot.readString()
+ else:
+ iprot.skip(ftype)
+ elif fid == 7:
+ if ftype == TType.I64:
+ self.operationStarted = iprot.readI64()
+ else:
+ iprot.skip(ftype)
+ elif fid == 8:
+ if ftype == TType.I64:
+ self.operationCompleted = iprot.readI64()
+ else:
+ iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
@@ -5718,6 +5742,18 @@ class TGetOperationStatusResp:
oprot.writeFieldBegin('errorMessage', TType.STRING, 5)
oprot.writeString(self.errorMessage)
oprot.writeFieldEnd()
+ if self.taskStatus is not None:
+ oprot.writeFieldBegin('taskStatus', TType.STRING, 6)
+ oprot.writeString(self.taskStatus)
+ oprot.writeFieldEnd()
+ if self.operationStarted is not None:
+ oprot.writeFieldBegin('operationStarted', TType.I64, 7)
+ oprot.writeI64(self.operationStarted)
+ oprot.writeFieldEnd()
+ if self.operationCompleted is not None:
+ oprot.writeFieldBegin('operationCompleted', TType.I64, 8)
+ oprot.writeI64(self.operationCompleted)
+ oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
@@ -5734,6 +5770,9 @@ class TGetOperationStatusResp:
value = (value * 31) ^ hash(self.sqlState)
value = (value * 31) ^ hash(self.errorCode)
value = (value * 31) ^ hash(self.errorMessage)
+ value = (value * 31) ^ hash(self.taskStatus)
+ value = (value * 31) ^ hash(self.operationStarted)
+ value = (value * 31) ^ hash(self.operationCompleted)
return value
def __repr__(self):
http://git-wip-us.apache.org/repos/asf/hive/blob/de260b45/service-rpc/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb
----------------------------------------------------------------------
diff --git a/service-rpc/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb b/service-rpc/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb
index f004ec4..07ed97c 100644
--- a/service-rpc/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb
+++ b/service-rpc/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb
@@ -1471,13 +1471,19 @@ class TGetOperationStatusResp
SQLSTATE = 3
ERRORCODE = 4
ERRORMESSAGE = 5
+ TASKSTATUS = 6
+ OPERATIONSTARTED = 7
+ OPERATIONCOMPLETED = 8
FIELDS = {
STATUS => {:type => ::Thrift::Types::STRUCT, :name => 'status', :class => ::TStatus},
OPERATIONSTATE => {:type => ::Thrift::Types::I32, :name => 'operationState', :optional => true, :enum_class => ::TOperationState},
SQLSTATE => {:type => ::Thrift::Types::STRING, :name => 'sqlState', :optional => true},
ERRORCODE => {:type => ::Thrift::Types::I32, :name => 'errorCode', :optional => true},
- ERRORMESSAGE => {:type => ::Thrift::Types::STRING, :name => 'errorMessage', :optional => true}
+ ERRORMESSAGE => {:type => ::Thrift::Types::STRING, :name => 'errorMessage', :optional => true},
+ TASKSTATUS => {:type => ::Thrift::Types::STRING, :name => 'taskStatus', :optional => true},
+ OPERATIONSTARTED => {:type => ::Thrift::Types::I64, :name => 'operationStarted', :optional => true},
+ OPERATIONCOMPLETED => {:type => ::Thrift::Types::I64, :name => 'operationCompleted', :optional => true}
}
def struct_fields; FIELDS; end
http://git-wip-us.apache.org/repos/asf/hive/blob/de260b45/service/src/jamon/org/apache/hive/tmpl/QueryProfileTmpl.jamon
----------------------------------------------------------------------
diff --git a/service/src/jamon/org/apache/hive/tmpl/QueryProfileTmpl.jamon b/service/src/jamon/org/apache/hive/tmpl/QueryProfileTmpl.jamon
index c513689..8d51a73 100644
--- a/service/src/jamon/org/apache/hive/tmpl/QueryProfileTmpl.jamon
+++ b/service/src/jamon/org/apache/hive/tmpl/QueryProfileTmpl.jamon
@@ -176,16 +176,16 @@ org.apache.hive.service.cli.operation.SQLOperationDisplay;
<th>Retry If Fail</th>
</tr>
- <%if sod.getQueryDisplay() != null && sod.getQueryDisplay().getTaskInfos() != null %>
- <%for QueryDisplay.TaskInfo taskInfo : sod.getQueryDisplay().getTaskInfos() %>
+ <%if sod.getQueryDisplay() != null && sod.getQueryDisplay().getTaskDisplays() != null %>
+ <%for QueryDisplay.TaskDisplay taskDisplay : sod.getQueryDisplay().getTaskDisplays() %>
<tr>
- <td><% taskInfo.getTaskId() + ":" + taskInfo.getTaskType() %></td>
- <td><% taskInfo.getStatus() %></td>
- <td><% new Date(taskInfo.getBeginTime()) %>
- <td><% taskInfo.getEndTime() == 0 ? "" : new Date(taskInfo.getEndTime()) %></td>
- <td><% taskInfo.getElapsedTime()/1000 %> (s) </td>
- <td><% taskInfo.isRequireLock() %></td>
- <td><% taskInfo.isRetryIfFail() %></td>
+ <td><% taskDisplay.getTaskId() + ":" + taskDisplay.getTaskType() %></td>
+ <td><% taskDisplay.getStatus() %></td>
+ <td><% taskDisplay.getBeginTime() == null ? "" : new Date(taskDisplay.getBeginTime()) %></td>
+ <td><% taskDisplay.getEndTime() == null ? "" : new Date(taskDisplay.getEndTime()) %></td>
+ <td><% taskDisplay.getElapsedTime() == null ? "" : taskDisplay.getElapsedTime()/1000 %> (s) </td>
+ <td><% taskDisplay.isRequireLock() %></td>
+ <td><% taskDisplay.isRetryIfFail() %></td>
</tr>
</%for>
</%if>
http://git-wip-us.apache.org/repos/asf/hive/blob/de260b45/service/src/java/org/apache/hive/service/cli/OperationStatus.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/OperationStatus.java b/service/src/java/org/apache/hive/service/cli/OperationStatus.java
index e45b828..5e24d38 100644
--- a/service/src/java/org/apache/hive/service/cli/OperationStatus.java
+++ b/service/src/java/org/apache/hive/service/cli/OperationStatus.java
@@ -25,10 +25,16 @@ package org.apache.hive.service.cli;
public class OperationStatus {
private final OperationState state;
+ private final String taskStatus;
+ private final long operationStarted;
+ private final long operationCompleted;
private final HiveSQLException operationException;
- public OperationStatus(OperationState state, HiveSQLException operationException) {
+ public OperationStatus(OperationState state, String taskStatus, long operationStarted, long operationCompleted, HiveSQLException operationException) {
this.state = state;
+ this.taskStatus = taskStatus;
+ this.operationStarted = operationStarted;
+ this.operationCompleted = operationCompleted;
this.operationException = operationException;
}
@@ -36,6 +42,18 @@ public class OperationStatus {
return state;
}
+ public String getTaskStatus() {
+ return taskStatus;
+ }
+
+ public long getOperationStarted() {
+ return operationStarted;
+ }
+
+ public long getOperationCompleted() {
+ return operationCompleted;
+ }
+
public HiveSQLException getOperationException() {
return operationException;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/de260b45/service/src/java/org/apache/hive/service/cli/operation/Operation.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/operation/Operation.java b/service/src/java/org/apache/hive/service/cli/operation/Operation.java
index 22f725c..d9a273b 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/Operation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/Operation.java
@@ -77,6 +77,9 @@ public abstract class Operation {
private volatile long lastAccessTime;
private final long beginTime;
+ protected long operationStart;
+ protected long operationComplete;
+
protected static final EnumSet<FetchOrientation> DEFAULT_FETCH_ORIENTATION_SET =
EnumSet.of(FetchOrientation.FETCH_NEXT,FetchOrientation.FETCH_FIRST);
@@ -137,7 +140,13 @@ public abstract class Operation {
}
public OperationStatus getStatus() {
- return new OperationStatus(state, operationException);
+ String taskStatus = null;
+ try {
+ taskStatus = getTaskStatus();
+ } catch (HiveSQLException sqlException) {
+ LOG.error("Error getting task status for " + opHandle.toString(), sqlException);
+ }
+ return new OperationStatus(state, taskStatus, operationStart, operationComplete, operationException);
}
public boolean hasResultSet() {
@@ -346,6 +355,10 @@ public abstract class Operation {
return getNextRowSet(FetchOrientation.FETCH_NEXT, DEFAULT_FETCH_MAX_ROWS);
}
+ public String getTaskStatus() throws HiveSQLException {
+ return null;
+ }
+
/**
* Verify if the given fetch orientation is part of the default orientation types.
* @param orientation
@@ -431,5 +444,31 @@ public abstract class Operation {
}
protected void onNewState(OperationState state, OperationState prevState) {
+ switch(state) {
+ case RUNNING:
+ markOperationStartTime();
+ break;
+ case ERROR:
+ case FINISHED:
+ case CANCELED:
+ markOperationCompletedTime();
+ break;
+ }
+ }
+
+ public long getOperationComplete() {
+ return operationComplete;
+ }
+
+ public long getOperationStart() {
+ return operationStart;
+ }
+
+ protected void markOperationStartTime() {
+ operationStart = System.currentTimeMillis();
+ }
+
+ protected void markOperationCompletedTime() {
+ operationComplete = System.currentTimeMillis();
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/de260b45/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
index 100dc6a..04d816a 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
@@ -18,17 +18,10 @@
package org.apache.hive.service.cli.operation;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.io.Serializable;
-import java.io.UnsupportedEncodingException;
+import java.io.*;
import java.security.PrivilegedExceptionAction;
import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
@@ -44,6 +37,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Schema;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.QueryDisplay;
import org.apache.hadoop.hive.ql.exec.ExplainTask;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -69,6 +63,9 @@ import org.apache.hive.service.cli.RowSetFactory;
import org.apache.hive.service.cli.TableSchema;
import org.apache.hive.service.cli.session.HiveSession;
import org.apache.hive.service.server.ThreadWithGarbageCleanup;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
/**
* SQLOperation.
@@ -128,7 +125,6 @@ public class SQLOperation extends ExecuteStatementOperation {
*/
public void prepare(HiveConf sqlOperationConf) throws HiveSQLException {
setState(OperationState.RUNNING);
-
try {
driver = new Driver(sqlOperationConf, getParentSession().getUserName());
sqlOpDisplay.setQueryDisplay(driver.getQueryDisplay());
@@ -387,6 +383,38 @@ public class SQLOperation extends ExecuteStatementOperation {
}
}
+ /**
+  * Serializes the task display entries of the current query to a JSON string.
+  *
+  * @return the JSON form of {@code driver.getQueryDisplay().getTaskDisplays()}, or
+  *         {@code null} when the driver is not initialized or it reports no task displays
+  * @throws HiveSQLException if the task displays cannot be written as JSON
+  */
+ @Override
+ public String getTaskStatus() throws HiveSQLException {
+   if (driver == null) {
+     // Driver not initialized
+     return null;
+   }
+   List<QueryDisplay.TaskDisplay> statuses = driver.getQueryDisplay().getTaskDisplays();
+   if (statuses == null) {
+     // Nothing to report yet
+     return null;
+   }
+   try {
+     // No close() is needed: ByteArrayOutputStream.close() has no effect, and skipping it
+     // avoids throwing from a finally block, which could hide the real failure.
+     ByteArrayOutputStream out = new ByteArrayOutputStream();
+     new ObjectMapper().writeValue(out, statuses);
+     return out.toString("UTF-8");
+   } catch (IOException e) {
+     // JsonGenerationException, JsonMappingException and UnsupportedEncodingException are
+     // all IOException subclasses, so one handler wraps every failure.
+     throw new HiveSQLException(e);
+   }
+ }
+
private RowSet decode(List<Object> rows, RowSet rowSet) throws Exception {
if (driver.isFetchingTable()) {
return prepareFromRow(rows, rowSet);
@@ -508,6 +536,7 @@ public class SQLOperation extends ExecuteStatementOperation {
@Override
protected void onNewState(OperationState state, OperationState prevState) {
+ super.onNewState(state, prevState);
currentSQLStateScope = setMetrics(currentSQLStateScope, MetricsConstant.SQL_OPERATION_PREFIX,
MetricsConstant.COMPLETED_SQL_OPERATION_PREFIX, state);
http://git-wip-us.apache.org/repos/asf/hive/blob/de260b45/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
index 8dff264..62fcde5 100644
--- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
@@ -679,6 +679,9 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe
new OperationHandle(req.getOperationHandle()));
resp.setOperationState(operationStatus.getState().toTOperationState());
HiveSQLException opException = operationStatus.getOperationException();
+ resp.setTaskStatus(operationStatus.getTaskStatus());
+ resp.setOperationStarted(operationStatus.getOperationStarted());
+ resp.setOperationCompleted(operationStatus.getOperationCompleted());
if (opException != null) {
resp.setSqlState(opException.getSQLState());
resp.setErrorCode(opException.getErrorCode());
http://git-wip-us.apache.org/repos/asf/hive/blob/de260b45/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java
index 5f01165..ccce6dc 100644
--- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java
+++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java
@@ -361,7 +361,8 @@ public class ThriftCLIServiceClient extends CLIServiceClient {
if (opState == OperationState.ERROR) {
opException = new HiveSQLException(resp.getErrorMessage(), resp.getSqlState(), resp.getErrorCode());
}
- return new OperationStatus(opState, opException);
+ return new OperationStatus(opState, resp.getTaskStatus(), resp.getOperationStarted(),
+ resp.getOperationCompleted(), opException);
} catch (HiveSQLException e) {
throw e;
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/hive/blob/de260b45/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
----------------------------------------------------------------------
diff --git a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
index e78181a..e145eb4 100644
--- a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
+++ b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
@@ -18,11 +18,9 @@
package org.apache.hive.service.cli;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
+import java.io.ByteArrayInputStream;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
@@ -36,15 +34,19 @@ import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.QueryDisplay;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHook;
import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.session.SessionState;
+
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -169,6 +171,9 @@ public abstract class CLIServiceTest {
// Blocking execute
queryString = "SELECT ID+1 FROM TEST_EXEC";
opHandle = client.executeStatement(sessionHandle, queryString, confOverlay);
+
+ OperationStatus opStatus = client.getOperationStatus(opHandle);
+ checkOperationTimes(opHandle, opStatus);
// Expect query to be completed now
assertEquals("Query should be finished",
OperationState.FINISHED, client.getOperationStatus(opHandle).getState());
@@ -266,6 +271,10 @@ public abstract class CLIServiceTest {
opHandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay);
System.out.println("Cancelling " + opHandle);
client.cancelOperation(opHandle);
+
+ OperationStatus operationStatus = client.getOperationStatus(opHandle);
+ checkOperationTimes(opHandle, operationStatus);
+
state = client.getOperationStatus(opHandle).getState();
System.out.println(opHandle + " after cancelling, state= " + state);
assertEquals("Query should be cancelled", OperationState.CANCELED, state);
@@ -489,7 +498,7 @@ public abstract class CLIServiceTest {
SessionState.get().setIsHiveServerQuery(true); // Pretend we are in HS2.
String queryString = "SET " + HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname
- + " = false";
+ + " = false";
client.executeStatement(sessionHandle, queryString, confOverlay);
return sessionHandle;
}
@@ -620,4 +629,89 @@ public abstract class CLIServiceTest {
client.closeOperation(opHandle);
client.closeSession(sessionHandle);
}
+
+ /**
+  * Runs a query asynchronously and, on every poll until the operation reaches a terminal
+  * state, checks the operation times and verifies that the reported task status is JSON that
+  * deserializes into a list of {@link QueryDisplay.TaskDisplay} consistent with each task's
+  * state.
+  */
+ @Test
+ public void testTaskStatus() throws Exception {
+   HashMap<String, String> confOverlay = new HashMap<String, String>();
+   String tableName = "TEST_EXEC_ASYNC";
+   String columnDefinitions = "(ID STRING)";
+
+   // Open a session and set up the test data
+   SessionHandle sessionHandle = setupTestData(tableName, columnDefinitions, confOverlay);
+   assertNotNull(sessionHandle);
+   // nonblocking execute
+   String select = "SELECT ID + ' ' FROM TEST_EXEC_ASYNC";
+   OperationHandle ophandle =
+       client.executeStatementAsync(sessionHandle, select, confOverlay);
+
+   // The mapper and the target type do not change between polls, so build them once.
+   ObjectMapper mapper = new ObjectMapper();
+   TypeReference<List<QueryDisplay.TaskDisplay>> taskListType =
+       new TypeReference<List<QueryDisplay.TaskDisplay>>(){};
+   int count = 0;
+   while (true) {
+     OperationStatus status = client.getOperationStatus(ophandle);
+     checkOperationTimes(ophandle, status);
+     OperationState state = status.getState();
+     System.out.println("Polling: " + ophandle + " count=" + (++count)
+         + " state=" + state);
+
+     String jsonTaskStatus = status.getTaskStatus();
+     assertNotNull(jsonTaskStatus);
+     ByteArrayInputStream in = new ByteArrayInputStream(jsonTaskStatus.getBytes("UTF-8"));
+     List<QueryDisplay.TaskDisplay> taskStatuses = mapper.readValue(in, taskListType);
+     checkTaskStatuses(taskStatuses);
+     System.out.println("task statuses: " + jsonTaskStatus); // TaskDisplay doesn't have a toString, using json
+     if (OperationState.CANCELED == state || state == OperationState.CLOSED
+         || state == OperationState.FINISHED
+         || state == OperationState.ERROR) {
+       break;
+     }
+     Thread.sleep(1000);
+   }
+
+   // Release the operation and the session, as the preceding test in this class does.
+   client.closeOperation(ophandle);
+   client.closeSession(sessionHandle);
+ }
+
+ /**
+  * Asserts that every task display carries exactly the fields its state implies: nothing
+  * before it runs, begin and elapsed time while running, and begin, end and elapsed time once
+  * finished. Any other state fails the test.
+  *
+  * @param taskDisplays task displays deserialized from the operation status; must not be null
+  */
+ private void checkTaskStatuses(List<QueryDisplay.TaskDisplay> taskDisplays) {
+   assertNotNull(taskDisplays);
+   for (QueryDisplay.TaskDisplay taskDisplay: taskDisplays) {
+     switch (taskDisplay.taskState) {
+       case INITIALIZED:
+       case QUEUED:
+         assertNull(taskDisplay.getBeginTime());
+         assertNull(taskDisplay.getEndTime());
+         assertNull(taskDisplay.getElapsedTime());
+         assertNull(taskDisplay.getErrorMsg());
+         assertNull(taskDisplay.getReturnValue());
+         break;
+       case RUNNING:
+         assertNotNull(taskDisplay.getBeginTime());
+         assertNull(taskDisplay.getEndTime());
+         assertNotNull(taskDisplay.getElapsedTime());
+         assertNull(taskDisplay.getErrorMsg());
+         assertNull(taskDisplay.getReturnValue());
+         break;
+       case FINISHED:
+         assertNotNull(taskDisplay.getBeginTime());
+         assertNotNull(taskDisplay.getEndTime());
+         assertNotNull(taskDisplay.getElapsedTime());
+         break;
+       case UNKNOWN:
+       default:
+         // Report the state itself so the failure message names the offending value.
+         fail("unknown task status: " + taskDisplay.taskState);
+     }
+   }
+ }
+
+
+ /**
+  * Asserts that the status reports a non-zero start time and, when the operation is in a
+  * terminal state (CANCELED, CLOSED, FINISHED or ERROR), a non-zero completion time that is
+  * not earlier than the start time.
+  *
+  * @param operationHandle handle of the operation, used only for the diagnostic output
+  * @param status status to verify
+  */
+ private void checkOperationTimes(OperationHandle operationHandle, OperationStatus status) {
+   OperationState state = status.getState();
+   long started = status.getOperationStarted();
+   assertFalse(started == 0);
+
+   boolean terminal = state == OperationState.CANCELED
+       || state == OperationState.CLOSED
+       || state == OperationState.FINISHED
+       || state == OperationState.ERROR;
+   if (!terminal) {
+     // Completion time is only meaningful once the operation has ended.
+     return;
+   }
+
+   long completed = status.getOperationCompleted();
+   System.out.println("##OP " + operationHandle.getHandleIdentifier() + " STATE:" + state
+       + " START:" + started
+       + " END:" + completed);
+   assertFalse(completed == 0);
+   assertTrue(completed - started >= 0);
+ }
}