You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jx...@apache.org on 2017/02/17 17:48:54 UTC

hive git commit: HIVE-15915: Emit progress percentage in getting operation status (Jimmy Xiang, reviewed by Xuefu Zhang)

Repository: hive
Updated Branches:
  refs/heads/master bb4d8db50 -> 1677ed956


HIVE-15915: Emit progress percentage in getting operation status (Jimmy Xiang, reviewed by Xuefu Zhang)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1677ed95
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1677ed95
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1677ed95

Branch: refs/heads/master
Commit: 1677ed956edb660c02b0d15253d486e2b0b626ba
Parents: bb4d8db
Author: Jimmy Xiang <jx...@apache.org>
Authored: Tue Feb 14 10:27:14 2017 -0800
Committer: Jimmy Xiang <jx...@apache.org>
Committed: Fri Feb 17 09:48:37 2017 -0800

----------------------------------------------------------------------
 .../hive/ql/exec/mr/HadoopJobExecHelper.java    |  6 +++-
 .../ql/exec/spark/status/SparkJobMonitor.java   | 12 +++++++
 .../hadoop/hive/ql/session/SessionState.java    | 36 +++++++++++++++++++-
 .../service/cli/thrift/ThriftCLIService.java    | 24 +++++++++++--
 4 files changed, 74 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/1677ed95/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
index 41887d7..3c07197 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
@@ -92,6 +92,11 @@ public class HadoopJobExecHelper {
     reduceProgress = reduceProgress == 100 ? (int)Math.floor(rj.reduceProgress() * 100) : reduceProgress;
     task.taskCounters.put("CNTR_NAME_" + task.getId() + "_MAP_PROGRESS", Long.valueOf(mapProgress));
     task.taskCounters.put("CNTR_NAME_" + task.getId() + "_REDUCE_PROGRESS", Long.valueOf(reduceProgress));
+
+    if (SessionState.get() != null) {
+      final float progress = (rj.mapProgress() + rj.reduceProgress()) * 0.5f;
+      SessionState.get().updateProgressedPercentage(progress);
+    }
   }
 
   /**
@@ -196,7 +201,6 @@ public class HadoopJobExecHelper {
     }
   }
 
-  @SuppressWarnings("deprecation")
   public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) {
     if (ctrs == null) {
       // hadoop might return null if it cannot locate the job.

http://git-wip-us.apache.org/repos/asf/hive/blob/1677ed95/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
index cf0162d..0b224f2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
@@ -179,6 +179,10 @@ abstract class SparkJobMonitor {
     String currentDate = dt.format(new Date());
     reportBuffer.append(currentDate + "\t");
 
+    // Num of total and completed tasks
+    int sumTotal = 0;
+    int sumComplete = 0;
+
     SortedSet<String> keys = new TreeSet<String>(progressMap.keySet());
     for (String s : keys) {
       SparkStageProgress progress = progressMap.get(s);
@@ -186,6 +190,9 @@ abstract class SparkJobMonitor {
       final int total = progress.getTotalTaskCount();
       final int running = progress.getRunningTaskCount();
       final int failed = progress.getFailedTaskCount();
+      sumTotal += total;
+      sumComplete += complete;
+
       String stageName = "Stage-" + s;
       if (total <= 0) {
         reportBuffer.append(String.format("%s: -/-\t", stageName));
@@ -230,6 +237,11 @@ abstract class SparkJobMonitor {
         }
       }
     }
+
+    if (SessionState.get() != null) {
+      final float progress = (sumTotal == 0) ? 1.0f : (float) sumComplete / (float) sumTotal;
+      SessionState.get().updateProgressedPercentage(progress);
+    }
     return reportBuffer.toString();
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/1677ed95/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index 3e01e92..ba2c9c3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.common.log.ProgressMonitor;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.ObjectStore;
@@ -69,7 +70,6 @@ import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.lockmgr.LockException;
 import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
-import org.apache.hadoop.hive.common.log.ProgressMonitor;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveUtils;
@@ -1748,6 +1748,40 @@ public class SessionState {
     return StringUtils.join(preReloadableAuxJars, ',');
   }
 
+  public void updateProgressedPercentage(final double percentage) {
+    this.progressMonitor = new ProgressMonitor() {
+      @Override
+      public List<String> headers() {
+        return null;
+      }
+
+      @Override
+      public List<List<String>> rows() {
+        return null;
+      }
+
+      @Override
+      public String footerSummary() {
+        return null;
+      }
+
+      @Override
+      public long startTime() {
+        return 0;
+      }
+
+      @Override
+      public String executionStatus() {
+        return null;
+      }
+
+      @Override
+      public double progressedPercentage() {
+        return percentage;
+      }
+    };
+  }
+
   public void updateProgressMonitor(ProgressMonitor progressMonitor) {
     this.progressMonitor = progressMonitor;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/1677ed95/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
index e09d9fe..211b33b 100644
--- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
@@ -18,6 +18,8 @@
 
 package org.apache.hive.service.cli.thrift;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
@@ -27,9 +29,11 @@ import java.util.concurrent.TimeUnit;
 
 import javax.security.auth.login.LoginException;
 
+import org.apache.hadoop.hive.common.ServerUtils;
+import org.apache.hadoop.hive.common.log.ProgressMonitor;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.common.ServerUtils;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.shims.HadoopShims.KerberosNameShim;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hive.service.AbstractService;
@@ -46,11 +50,13 @@ import org.apache.hive.service.cli.HiveSQLException;
 import org.apache.hive.service.cli.JobProgressUpdate;
 import org.apache.hive.service.cli.OperationHandle;
 import org.apache.hive.service.cli.OperationStatus;
+import org.apache.hive.service.cli.OperationType;
 import org.apache.hive.service.cli.ProgressMonitorStatusMapper;
 import org.apache.hive.service.cli.RowSet;
 import org.apache.hive.service.cli.SessionHandle;
 import org.apache.hive.service.cli.TableSchema;
 import org.apache.hive.service.cli.TezProgressMonitorStatusMapper;
+import org.apache.hive.service.cli.operation.Operation;
 import org.apache.hive.service.cli.session.SessionManager;
 import org.apache.hive.service.rpc.thrift.TCLIService;
 import org.apache.hive.service.rpc.thrift.TCancelDelegationTokenReq;
@@ -91,6 +97,7 @@ import org.apache.hive.service.rpc.thrift.TGetTablesReq;
 import org.apache.hive.service.rpc.thrift.TGetTablesResp;
 import org.apache.hive.service.rpc.thrift.TGetTypeInfoReq;
 import org.apache.hive.service.rpc.thrift.TGetTypeInfoResp;
+import org.apache.hive.service.rpc.thrift.TJobExecutionStatus;
 import org.apache.hive.service.rpc.thrift.TOpenSessionReq;
 import org.apache.hive.service.rpc.thrift.TOpenSessionResp;
 import org.apache.hive.service.rpc.thrift.TProgressUpdateResp;
@@ -431,6 +438,13 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe
     return sessionHandle;
   }
 
+  private double getProgressedPercentage(OperationHandle opHandle) throws HiveSQLException {
+    checkArgument(OperationType.EXECUTE_STATEMENT.equals(opHandle.getOperationType()));
+    Operation operation = cliService.getSessionManager().getOperationManager().getOperation(opHandle);
+    SessionState state = operation.getParentSession().getSessionState();
+    ProgressMonitor monitor = state.getProgressMonitor();
+    return monitor == null ? 0.0 : monitor.progressedPercentage();
+  }
 
   private String getDelegationToken(String userName)
       throws HiveSQLException, LoginException, IOException {
@@ -646,11 +660,13 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe
         mapper = new TezProgressMonitorStatusMapper();
       }
 
+      TJobExecutionStatus executionStatus =
+          mapper.forStatus(progressUpdate.status);
       resp.setProgressUpdateResponse(new TProgressUpdateResp(
           progressUpdate.headers(),
           progressUpdate.rows(),
           progressUpdate.progressedPercentage,
-          mapper.forStatus(progressUpdate.status),
+          executionStatus,
           progressUpdate.footerSummary,
           progressUpdate.startTimeMillis
       ));
@@ -659,6 +675,10 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe
         resp.setErrorCode(opException.getErrorCode());
         resp.setErrorMessage(org.apache.hadoop.util.StringUtils.
             stringifyException(opException));
+      } else if (executionStatus == TJobExecutionStatus.NOT_AVAILABLE
+          && OperationType.EXECUTE_STATEMENT.equals(operationHandle.getOperationType())) {
+        resp.getProgressUpdateResponse().setProgressedPercentage(
+            getProgressedPercentage(operationHandle));
       }
       resp.setStatus(OK_STATUS);
     } catch (Exception e) {