You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2017/02/24 01:08:41 UTC
[09/50] [abbrv] hive git commit: HIVE-15915: Emit progress percentage
in getting operation status (Jimmy Xiang, reviewed by Xuefu Zhang)
HIVE-15915: Emit progress percentage in getting operation status (Jimmy Xiang, reviewed by Xuefu Zhang)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1677ed95
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1677ed95
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1677ed95
Branch: refs/heads/hive-14535
Commit: 1677ed956edb660c02b0d15253d486e2b0b626ba
Parents: bb4d8db
Author: Jimmy Xiang <jx...@apache.org>
Authored: Tue Feb 14 10:27:14 2017 -0800
Committer: Jimmy Xiang <jx...@apache.org>
Committed: Fri Feb 17 09:48:37 2017 -0800
----------------------------------------------------------------------
.../hive/ql/exec/mr/HadoopJobExecHelper.java | 6 +++-
.../ql/exec/spark/status/SparkJobMonitor.java | 12 +++++++
.../hadoop/hive/ql/session/SessionState.java | 36 +++++++++++++++++++-
.../service/cli/thrift/ThriftCLIService.java | 24 +++++++++++--
4 files changed, 74 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/1677ed95/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
index 41887d7..3c07197 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
@@ -92,6 +92,11 @@ public class HadoopJobExecHelper {
reduceProgress = reduceProgress == 100 ? (int)Math.floor(rj.reduceProgress() * 100) : reduceProgress;
task.taskCounters.put("CNTR_NAME_" + task.getId() + "_MAP_PROGRESS", Long.valueOf(mapProgress));
task.taskCounters.put("CNTR_NAME_" + task.getId() + "_REDUCE_PROGRESS", Long.valueOf(reduceProgress));
+
+ if (SessionState.get() != null) {
+ final float progress = (rj.mapProgress() + rj.reduceProgress()) * 0.5f;
+ SessionState.get().updateProgressedPercentage(progress);
+ }
}
/**
@@ -196,7 +201,6 @@ public class HadoopJobExecHelper {
}
}
- @SuppressWarnings("deprecation")
public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) {
if (ctrs == null) {
// hadoop might return null if it cannot locate the job.
http://git-wip-us.apache.org/repos/asf/hive/blob/1677ed95/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
index cf0162d..0b224f2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
@@ -179,6 +179,10 @@ abstract class SparkJobMonitor {
String currentDate = dt.format(new Date());
reportBuffer.append(currentDate + "\t");
+ // Num of total and completed tasks
+ int sumTotal = 0;
+ int sumComplete = 0;
+
SortedSet<String> keys = new TreeSet<String>(progressMap.keySet());
for (String s : keys) {
SparkStageProgress progress = progressMap.get(s);
@@ -186,6 +190,9 @@ abstract class SparkJobMonitor {
final int total = progress.getTotalTaskCount();
final int running = progress.getRunningTaskCount();
final int failed = progress.getFailedTaskCount();
+ sumTotal += total;
+ sumComplete += complete;
+
String stageName = "Stage-" + s;
if (total <= 0) {
reportBuffer.append(String.format("%s: -/-\t", stageName));
@@ -230,6 +237,11 @@ abstract class SparkJobMonitor {
}
}
}
+
+ if (SessionState.get() != null) {
+ final float progress = (sumTotal == 0) ? 1.0f : (float) sumComplete / (float) sumTotal;
+ SessionState.get().updateProgressedPercentage(progress);
+ }
return reportBuffer.toString();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1677ed95/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index 3e01e92..ba2c9c3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.common.log.ProgressMonitor;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.ObjectStore;
@@ -69,7 +70,6 @@ import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
import org.apache.hadoop.hive.ql.lockmgr.LockException;
import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
import org.apache.hadoop.hive.ql.log.PerfLogger;
-import org.apache.hadoop.hive.common.log.ProgressMonitor;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
@@ -1748,6 +1748,40 @@ public class SessionState {
return StringUtils.join(preReloadableAuxJars, ',');
}
+ public void updateProgressedPercentage(final double percentage) {
+ this.progressMonitor = new ProgressMonitor() {
+ @Override
+ public List<String> headers() {
+ return null;
+ }
+
+ @Override
+ public List<List<String>> rows() {
+ return null;
+ }
+
+ @Override
+ public String footerSummary() {
+ return null;
+ }
+
+ @Override
+ public long startTime() {
+ return 0;
+ }
+
+ @Override
+ public String executionStatus() {
+ return null;
+ }
+
+ @Override
+ public double progressedPercentage() {
+ return percentage;
+ }
+ };
+ }
+
public void updateProgressMonitor(ProgressMonitor progressMonitor) {
this.progressMonitor = progressMonitor;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1677ed95/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
index e09d9fe..211b33b 100644
--- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
@@ -18,6 +18,8 @@
package org.apache.hive.service.cli.thrift;
+import static com.google.common.base.Preconditions.checkArgument;
+
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
@@ -27,9 +29,11 @@ import java.util.concurrent.TimeUnit;
import javax.security.auth.login.LoginException;
+import org.apache.hadoop.hive.common.ServerUtils;
+import org.apache.hadoop.hive.common.log.ProgressMonitor;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.common.ServerUtils;
+import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.shims.HadoopShims.KerberosNameShim;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hive.service.AbstractService;
@@ -46,11 +50,13 @@ import org.apache.hive.service.cli.HiveSQLException;
import org.apache.hive.service.cli.JobProgressUpdate;
import org.apache.hive.service.cli.OperationHandle;
import org.apache.hive.service.cli.OperationStatus;
+import org.apache.hive.service.cli.OperationType;
import org.apache.hive.service.cli.ProgressMonitorStatusMapper;
import org.apache.hive.service.cli.RowSet;
import org.apache.hive.service.cli.SessionHandle;
import org.apache.hive.service.cli.TableSchema;
import org.apache.hive.service.cli.TezProgressMonitorStatusMapper;
+import org.apache.hive.service.cli.operation.Operation;
import org.apache.hive.service.cli.session.SessionManager;
import org.apache.hive.service.rpc.thrift.TCLIService;
import org.apache.hive.service.rpc.thrift.TCancelDelegationTokenReq;
@@ -91,6 +97,7 @@ import org.apache.hive.service.rpc.thrift.TGetTablesReq;
import org.apache.hive.service.rpc.thrift.TGetTablesResp;
import org.apache.hive.service.rpc.thrift.TGetTypeInfoReq;
import org.apache.hive.service.rpc.thrift.TGetTypeInfoResp;
+import org.apache.hive.service.rpc.thrift.TJobExecutionStatus;
import org.apache.hive.service.rpc.thrift.TOpenSessionReq;
import org.apache.hive.service.rpc.thrift.TOpenSessionResp;
import org.apache.hive.service.rpc.thrift.TProgressUpdateResp;
@@ -431,6 +438,13 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe
return sessionHandle;
}
+ private double getProgressedPercentage(OperationHandle opHandle) throws HiveSQLException {
+ checkArgument(OperationType.EXECUTE_STATEMENT.equals(opHandle.getOperationType()));
+ Operation operation = cliService.getSessionManager().getOperationManager().getOperation(opHandle);
+ SessionState state = operation.getParentSession().getSessionState();
+ ProgressMonitor monitor = state.getProgressMonitor();
+ return monitor == null ? 0.0 : monitor.progressedPercentage();
+ }
private String getDelegationToken(String userName)
throws HiveSQLException, LoginException, IOException {
@@ -646,11 +660,13 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe
mapper = new TezProgressMonitorStatusMapper();
}
+ TJobExecutionStatus executionStatus =
+ mapper.forStatus(progressUpdate.status);
resp.setProgressUpdateResponse(new TProgressUpdateResp(
progressUpdate.headers(),
progressUpdate.rows(),
progressUpdate.progressedPercentage,
- mapper.forStatus(progressUpdate.status),
+ executionStatus,
progressUpdate.footerSummary,
progressUpdate.startTimeMillis
));
@@ -659,6 +675,10 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe
resp.setErrorCode(opException.getErrorCode());
resp.setErrorMessage(org.apache.hadoop.util.StringUtils.
stringifyException(opException));
+ } else if (executionStatus == TJobExecutionStatus.NOT_AVAILABLE
+ && OperationType.EXECUTE_STATEMENT.equals(operationHandle.getOperationType())) {
+ resp.getProgressUpdateResponse().setProgressedPercentage(
+ getProgressedPercentage(operationHandle));
}
resp.setStatus(OK_STATUS);
} catch (Exception e) {