Posted to commits@hive.apache.org by se...@apache.org on 2018/07/03 17:47:37 UTC

[09/46] hive git commit: HIVE-19176: Add HoS support to progress bar on Beeline client (Bharathkrishna Guruvayoor Murali, reviewed by Sahil Takiar)

HIVE-19176: Add HoS support to progress bar on Beeline client (Bharathkrishna Guruvayoor Murali, reviewed by Sahil Takiar)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e7d1781e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e7d1781e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e7d1781e

Branch: refs/heads/master-txnstats
Commit: e7d1781ec4662e088dcd6ffbe3f866738792ad9b
Parents: e19b861
Author: Bharathkrishna Guruvayoor Murali <bh...@cloudera.com>
Authored: Mon Jul 2 11:42:59 2018 -0700
Committer: Sahil Takiar <st...@cloudera.com>
Committed: Mon Jul 2 11:42:59 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   2 +-
 .../org/apache/hive/jdbc/HiveStatement.java     |   4 +-
 .../exec/spark/status/LocalSparkJobMonitor.java |   4 +-
 .../spark/status/RemoteSparkJobMonitor.java     |   4 +-
 .../ql/exec/spark/status/RenderStrategy.java    | 246 +++++++++++++++++++
 .../ql/exec/spark/status/SparkJobMonitor.java   | 157 +-----------
 .../hive/ql/exec/spark/TestSparkTask.java       |   1 +
 .../exec/spark/status/TestSparkJobMonitor.java  |  29 ++-
 .../org/apache/hive/service/ServiceUtils.java   |   5 +-
 .../cli/SparkProgressMonitorStatusMapper.java   |  52 ++++
 .../service/cli/thrift/ThriftCLIService.java    |   5 +-
 11 files changed, 349 insertions(+), 160 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/e7d1781e/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index a3dd53e..7ef22d6 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3780,7 +3780,7 @@ public class HiveConf extends Configuration {
         "hive.server2.in.place.progress",
         true,
         "Allows hive server 2 to send progress bar update information. This is currently available"
-            + " only if the execution engine is tez."),
+            + " only if the execution engine is tez or Spark."),
     TEZ_DAG_STATUS_CHECK_INTERVAL("hive.tez.dag.status.check.interval", "500ms",
       new TimeValidator(TimeUnit.MILLISECONDS), "Interval between subsequent DAG status invocation."),
     SPARK_EXEC_INPLACE_PROGRESS("hive.spark.exec.inplace.progress", true,
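
A minimal, illustrative sketch (not part of this patch) of enabling the two progress settings involved here: hive.server2.in.place.progress drives the Beeline/HiveServer2 path and hive.spark.exec.inplace.progress the Hive CLI path. The class name is hypothetical; the ConfVars constants are the ones this patch touches.

    import org.apache.hadoop.hive.conf.HiveConf;

    // Hypothetical snippet: enable the Spark progress bar for both client paths.
    public class ProgressConfSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "spark");
        // Beeline / HiveServer2 progress bar (description updated above).
        conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_INPLACE_PROGRESS, true);
        // Hive CLI in-place progress bar.
        conf.setBoolVar(HiveConf.ConfVars.SPARK_EXEC_INPLACE_PROGRESS, true);
        System.out.println(conf.getVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE));
      }
    }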

http://git-wip-us.apache.org/repos/asf/hive/blob/e7d1781e/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
index ad8d1a7..0b38f9c 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
@@ -376,7 +376,9 @@ public class HiveStatement implements java.sql.Statement {
          * essentially return after the HIVE_SERVER2_LONG_POLLING_TIMEOUT (a server config) expires
          */
         statusResp = client.GetOperationStatus(statusReq);
-        inPlaceUpdateStream.update(statusResp.getProgressUpdateResponse());
+        if(!isOperationComplete) {
+          inPlaceUpdateStream.update(statusResp.getProgressUpdateResponse());
+        }
         Utils.verifySuccessWithInfo(statusResp.getStatus());
         if (statusResp.isSetOperationState()) {
           switch (statusResp.getOperationState()) {
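
The new guard skips inPlaceUpdateStream.update(...) once the statement has already observed the operation as complete, so the client does not redraw the progress bar with a trailing progress payload. A simplified, hypothetical sketch of that pattern (the names below are illustrative, not part of the JDBC driver):

    import java.util.function.Consumer;

    // Hypothetical forwarder: once the operation is known to be complete, later
    // progress payloads are dropped instead of being rendered again.
    class ProgressForwarder<T> {
      private final Consumer<T> stream; // stands in for the in-place update stream
      private boolean operationComplete;

      ProgressForwarder(Consumer<T> stream) {
        this.stream = stream;
      }

      void onStatus(T progressUpdate, boolean completeNow) {
        if (!operationComplete) {
          stream.accept(progressUpdate); // mirrors inPlaceUpdateStream.update(...)
        }
        if (completeNow) {
          operationComplete = true;
        }
      }
    }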

http://git-wip-us.apache.org/repos/asf/hive/blob/e7d1781e/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
index 2a6c33b..aeef3c1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
@@ -89,11 +89,11 @@ public class LocalSparkJobMonitor extends SparkJobMonitor {
                 + "SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount [StageCost]");
             }
 
-            printStatus(progressMap, lastProgressMap);
+            updateFunction.printStatus(progressMap, lastProgressMap);
             lastProgressMap = progressMap;
             break;
           case SUCCEEDED:
-            printStatus(progressMap, lastProgressMap);
+            updateFunction.printStatus(progressMap, lastProgressMap);
             lastProgressMap = progressMap;
             double duration = (System.currentTimeMillis() - startTime) / 1000.0;
             console.printInfo("Status: Finished successfully in "

http://git-wip-us.apache.org/repos/asf/hive/blob/e7d1781e/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
index 560fb58..87b69cb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
@@ -131,13 +131,13 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor {
               }
             }
 
-            printStatus(progressMap, lastProgressMap);
+            updateFunction.printStatus(progressMap, lastProgressMap);
             lastProgressMap = progressMap;
           }
           break;
         case SUCCEEDED:
           Map<SparkStage, SparkStageProgress> progressMap = sparkJobStatus.getSparkStageProgress();
-          printStatus(progressMap, lastProgressMap);
+          updateFunction.printStatus(progressMap, lastProgressMap);
           lastProgressMap = progressMap;
           double duration = (System.currentTimeMillis() - startTime) / 1000.0;
           console.printInfo("Spark job[" + sparkJobStatus.getJobId() + "] finished successfully in "

http://git-wip-us.apache.org/repos/asf/hive/blob/e7d1781e/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RenderStrategy.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RenderStrategy.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RenderStrategy.java
new file mode 100644
index 0000000..67a3a9c
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RenderStrategy.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.status;
+
+import org.apache.hadoop.hive.common.log.InPlaceUpdate;
+import org.apache.hadoop.hive.common.log.ProgressMonitor;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Renders the progress bar for Hive on Spark job status.
+ * Based on the configuration, the appropriate render strategy is selected
+ * to show the progress bar on Beeline or the Hive CLI, as well as to log
+ * the report String.
+ */
+class RenderStrategy {
+
+  interface UpdateFunction {
+    void printStatus(Map<SparkStage, SparkStageProgress> progressMap,
+        Map<SparkStage, SparkStageProgress> lastProgressMap);
+  }
+
+  private abstract static class BaseUpdateFunction implements UpdateFunction {
+    protected final SparkJobMonitor monitor;
+    private final PerfLogger perfLogger;
+    private long lastPrintTime;
+    private static final int PRINT_INTERVAL = 3000;
+    private final Set<String> completed = new HashSet<String>();
+    private String lastReport = null;
+
+    BaseUpdateFunction(SparkJobMonitor monitor) {
+      this.monitor = monitor;
+      this.perfLogger = SessionState.getPerfLogger();
+    }
+
+    private String getReport(Map<SparkStage, SparkStageProgress> progressMap) {
+      StringBuilder reportBuffer = new StringBuilder();
+      SimpleDateFormat dt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+      String currentDate = dt.format(new Date());
+      reportBuffer.append(currentDate + "\t");
+
+      // Num of total and completed tasks
+      int sumTotal = 0;
+      int sumComplete = 0;
+
+      SortedSet<SparkStage> keys = new TreeSet<SparkStage>(progressMap.keySet());
+      for (SparkStage stage : keys) {
+        SparkStageProgress progress = progressMap.get(stage);
+        final int complete = progress.getSucceededTaskCount();
+        final int total = progress.getTotalTaskCount();
+        final int running = progress.getRunningTaskCount();
+        final int failed = progress.getFailedTaskCount();
+        sumTotal += total;
+        sumComplete += complete;
+        String s = stage.toString();
+        String stageName = "Stage-" + s;
+        if (total <= 0) {
+          reportBuffer.append(String.format("%s: -/-\t", stageName));
+        } else {
+          if (complete == total && !completed.contains(s)) {
+            completed.add(s);
+
+            if (!perfLogger.startTimeHasMethod(PerfLogger.SPARK_RUN_STAGE + s)) {
+              perfLogger.PerfLogBegin(SparkJobMonitor.CLASS_NAME, PerfLogger.SPARK_RUN_STAGE + s);
+            }
+            perfLogger.PerfLogEnd(SparkJobMonitor.CLASS_NAME, PerfLogger.SPARK_RUN_STAGE + s);
+          }
+          if (complete < total && (complete > 0 || running > 0 || failed > 0)) {
+            /* stage is started, but not complete */
+            if (!perfLogger.startTimeHasMethod(PerfLogger.SPARK_RUN_STAGE + s)) {
+              perfLogger.PerfLogBegin(SparkJobMonitor.CLASS_NAME, PerfLogger.SPARK_RUN_STAGE + s);
+            }
+            if (failed > 0) {
+              reportBuffer.append(
+                  String.format(
+                      "%s: %d(+%d,-%d)/%d\t", stageName, complete, running, failed, total));
+            } else {
+              reportBuffer.append(
+                  String.format("%s: %d(+%d)/%d\t", stageName, complete, running, total));
+            }
+          } else {
+            /* stage is waiting for input/slots or complete */
+            if (failed > 0) {
+              /* tasks finished but some failed */
+              reportBuffer.append(
+                  String.format(
+                      "%s: %d(-%d)/%d Finished with failed tasks\t",
+                      stageName, complete, failed, total));
+            } else {
+              if (complete == total) {
+                reportBuffer.append(
+                    String.format("%s: %d/%d Finished\t", stageName, complete, total));
+              } else {
+                reportBuffer.append(String.format("%s: %d/%d\t", stageName, complete, total));
+              }
+            }
+          }
+        }
+      }
+
+      if (SessionState.get() != null) {
+        final float progress = (sumTotal == 0) ? 1.0f : (float) sumComplete / (float) sumTotal;
+        SessionState.get().updateProgressedPercentage(progress);
+      }
+      return reportBuffer.toString();
+    }
+
+    private boolean isSameAsPreviousProgress(
+        Map<SparkStage, SparkStageProgress> progressMap,
+        Map<SparkStage, SparkStageProgress> lastProgressMap) {
+
+      if (lastProgressMap == null) {
+        return false;
+      }
+
+      if (progressMap.isEmpty()) {
+        return lastProgressMap.isEmpty();
+      } else {
+        if (lastProgressMap.isEmpty()) {
+          return false;
+        } else {
+          if (progressMap.size() != lastProgressMap.size()) {
+            return false;
+          }
+          for (Map.Entry<SparkStage, SparkStageProgress> entry : progressMap.entrySet()) {
+            if (!lastProgressMap.containsKey(entry.getKey())
+                || !progressMap.get(entry.getKey()).equals(lastProgressMap.get(entry.getKey()))) {
+              return false;
+            }
+          }
+        }
+      }
+      return true;
+    }
+
+
+    private boolean showReport(String report) {
+      return !report.equals(lastReport) || System.currentTimeMillis() >= lastPrintTime + PRINT_INTERVAL;
+    }
+
+    @Override
+    public void printStatus(Map<SparkStage, SparkStageProgress> progressMap,
+        Map<SparkStage, SparkStageProgress> lastProgressMap) {
+      // do not print duplicate status while still in middle of print interval.
+      boolean isDuplicateState = isSameAsPreviousProgress(progressMap, lastProgressMap);
+      boolean withinInterval = System.currentTimeMillis() <= lastPrintTime + PRINT_INTERVAL;
+      if (isDuplicateState && withinInterval) {
+        return;
+      }
+
+      String report = getReport(progressMap);
+      renderProgress(monitor.getProgressMonitor(progressMap));
+      if (showReport(report)) {
+        renderReport(report);
+        lastReport = report;
+        lastPrintTime = System.currentTimeMillis();
+      }
+    }
+
+    abstract void renderProgress(ProgressMonitor monitor);
+
+    abstract void renderReport(String report);
+  }
+
+  /**
+   * This is used to show the progress bar on Beeline while using HiveServer2.
+   */
+  static class LogToFileFunction extends BaseUpdateFunction {
+    private static final Logger LOGGER = LoggerFactory.getLogger(LogToFileFunction.class);
+    private boolean hiveServer2InPlaceProgressEnabled =
+        SessionState.get().getConf().getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_INPLACE_PROGRESS);
+
+    LogToFileFunction(SparkJobMonitor monitor) {
+      super(monitor);
+    }
+
+    @Override
+    void renderProgress(ProgressMonitor monitor) {
+      SessionState.get().updateProgressMonitor(monitor);
+    }
+
+    @Override
+    void renderReport(String report) {
+      if (hiveServer2InPlaceProgressEnabled) {
+        LOGGER.info(report);
+      } else {
+        monitor.console.printInfo(report);
+      }
+    }
+  }
+
+  /**
+   * This is used to show the progress bar on the Hive CLI.
+   */
+  static class InPlaceUpdateFunction extends BaseUpdateFunction {
+    /**
+     * Have to use the same instance to render; otherwise the number of lines printed earlier is lost
+     * and the screen will print the table again and again.
+     */
+    private final InPlaceUpdate inPlaceUpdate;
+
+    InPlaceUpdateFunction(SparkJobMonitor monitor) {
+      super(monitor);
+      inPlaceUpdate = new InPlaceUpdate(SessionState.LogHelper.getInfoStream());
+    }
+
+    @Override
+    void renderProgress(ProgressMonitor monitor) {
+      inPlaceUpdate.render(monitor);
+    }
+
+    @Override
+    void renderReport(String report) {
+      monitor.console.logInfo(report);
+    }
+  }
+}
+
+
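
In the report string above, each stage renders as complete(+running,-failed)/total, e.g. "Stage-3_1: 4(+1,-1)/6" for 4 succeeded, 1 running, and 1 failed out of 6 tasks, which is exactly what TestSparkJobMonitor asserts further down. printStatus additionally throttles output: a call is skipped when the progress map is unchanged and still within the 3-second interval, and the report line is re-emitted only when its text changed or the interval elapsed. A stripped-down sketch of that throttle-and-dedup logic (hypothetical class, same constant):

    // Illustrative reduction of BaseUpdateFunction.printStatus/showReport.
    class ThrottledReporter {
      private static final int PRINT_INTERVAL = 3000; // ms, as above
      private String lastReport;
      private long lastPrintTime;

      void maybePrint(String report, boolean sameAsPreviousProgress) {
        long now = System.currentTimeMillis();
        // Nothing changed and we are still inside the interval: skip entirely.
        if (sameAsPreviousProgress && now <= lastPrintTime + PRINT_INTERVAL) {
          return;
        }
        // Re-emit only when the text changed or the interval elapsed.
        if (!report.equals(lastReport) || now >= lastPrintTime + PRINT_INTERVAL) {
          System.out.println(report); // stands in for renderReport(report)
          lastReport = report;
          lastPrintTime = now;
        }
      }
    }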

http://git-wip-us.apache.org/repos/asf/hive/blob/e7d1781e/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
index 3531ac2..5fd0c02 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.exec.spark.status;
 
+import org.apache.hadoop.hive.common.log.ProgressMonitor;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.common.log.InPlaceUpdate;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
@@ -25,13 +26,7 @@ import org.apache.hadoop.hive.ql.session.SessionState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 
 abstract class SparkJobMonitor {
@@ -42,60 +37,27 @@ abstract class SparkJobMonitor {
   protected final PerfLogger perfLogger = SessionState.getPerfLogger();
   protected final int checkInterval = 1000;
   protected final long monitorTimeoutInterval;
-  private final InPlaceUpdate inPlaceUpdateFn;
-
-  private final Set<String> completed = new HashSet<String>();
-  private final int printInterval = 3000;
-  private long lastPrintTime;
-
+  final RenderStrategy.UpdateFunction updateFunction;
   protected long startTime;
 
   protected enum StageState {
-    PENDING,
-    RUNNING,
-    FINISHED
+    PENDING, RUNNING, FINISHED
   }
 
   protected final boolean inPlaceUpdate;
 
   protected SparkJobMonitor(HiveConf hiveConf) {
-    monitorTimeoutInterval = hiveConf.getTimeVar(
-        HiveConf.ConfVars.SPARK_JOB_MONITOR_TIMEOUT, TimeUnit.SECONDS);
+    monitorTimeoutInterval = hiveConf.getTimeVar(HiveConf.ConfVars.SPARK_JOB_MONITOR_TIMEOUT, TimeUnit.SECONDS);
     inPlaceUpdate = InPlaceUpdate.canRenderInPlace(hiveConf) && !SessionState.getConsole().getIsSilent();
     console = new SessionState.LogHelper(LOG);
-    inPlaceUpdateFn = new InPlaceUpdate(SessionState.LogHelper.getInfoStream());
+    updateFunction = updateFunction();
   }
 
   public abstract int startMonitor();
 
-  private void printStatusInPlace(Map<SparkStage, SparkStageProgress> progressMap) {
-    inPlaceUpdateFn.render(getProgressMonitor(progressMap));
-  }
-
-  protected void printStatus(Map<SparkStage, SparkStageProgress> progressMap,
-      Map<SparkStage, SparkStageProgress> lastProgressMap) {
-
-    // do not print duplicate status while still in middle of print interval.
-    boolean isDuplicateState = isSameAsPreviousProgress(progressMap, lastProgressMap);
-    boolean withinInterval = System.currentTimeMillis() <= lastPrintTime + printInterval;
-    if (isDuplicateState && withinInterval) {
-      return;
-    }
-
-    String report = getReport(progressMap);
-    if (inPlaceUpdate) {
-      printStatusInPlace(progressMap);
-      console.logInfo(report);
-    } else {
-      console.printInfo(report);
-    }
-
-    lastPrintTime = System.currentTimeMillis();
-  }
-
   protected int getTotalTaskCount(Map<SparkStage, SparkStageProgress> progressMap) {
     int totalTasks = 0;
-    for (SparkStageProgress progress: progressMap.values() ) {
+    for (SparkStageProgress progress : progressMap.values()) {
       totalTasks += progress.getTotalTaskCount();
     }
 
@@ -104,7 +66,7 @@ abstract class SparkJobMonitor {
 
   protected int getStageMaxTaskCount(Map<SparkStage, SparkStageProgress> progressMap) {
     int stageMaxTasks = 0;
-    for (SparkStageProgress progress: progressMap.values() ) {
+    for (SparkStageProgress progress : progressMap.values()) {
       int tasks = progress.getTotalTaskCount();
       if (tasks > stageMaxTasks) {
         stageMaxTasks = tasks;
@@ -114,107 +76,12 @@ abstract class SparkJobMonitor {
     return stageMaxTasks;
   }
 
-  private String getReport(Map<SparkStage, SparkStageProgress> progressMap) {
-    StringBuilder reportBuffer = new StringBuilder();
-    SimpleDateFormat dt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
-    String currentDate = dt.format(new Date());
-    reportBuffer.append(currentDate + "\t");
-
-    // Num of total and completed tasks
-    int sumTotal = 0;
-    int sumComplete = 0;
-
-    SortedSet<SparkStage> keys = new TreeSet<SparkStage>(progressMap.keySet());
-    for (SparkStage stage : keys) {
-      SparkStageProgress progress = progressMap.get(stage);
-      final int complete = progress.getSucceededTaskCount();
-      final int total = progress.getTotalTaskCount();
-      final int running = progress.getRunningTaskCount();
-      final int failed = progress.getFailedTaskCount();
-      sumTotal += total;
-      sumComplete += complete;
-      String s = stage.toString();
-      String stageName = "Stage-" + s;
-      if (total <= 0) {
-        reportBuffer.append(String.format("%s: -/-\t", stageName));
-      } else {
-        if (complete == total && !completed.contains(s)) {
-          completed.add(s);
-
-          if (!perfLogger.startTimeHasMethod(PerfLogger.SPARK_RUN_STAGE + s)) {
-            perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_RUN_STAGE + s);
-          }
-          perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_RUN_STAGE + s);
-        }
-        if (complete < total && (complete > 0 || running > 0 || failed > 0)) {
-          /* stage is started, but not complete */
-          if (!perfLogger.startTimeHasMethod(PerfLogger.SPARK_RUN_STAGE + s)) {
-            perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_RUN_STAGE + s);
-          }
-          if (failed > 0) {
-            reportBuffer.append(
-                String.format(
-                    "%s: %d(+%d,-%d)/%d\t", stageName, complete, running, failed, total));
-          } else {
-            reportBuffer.append(
-                String.format("%s: %d(+%d)/%d\t", stageName, complete, running, total));
-          }
-        } else {
-          /* stage is waiting for input/slots or complete */
-          if (failed > 0) {
-            /* tasks finished but some failed */
-            reportBuffer.append(
-                String.format(
-                    "%s: %d(-%d)/%d Finished with failed tasks\t",
-                    stageName, complete, failed, total));
-          } else {
-            if (complete == total) {
-              reportBuffer.append(
-                  String.format("%s: %d/%d Finished\t", stageName, complete, total));
-            } else {
-              reportBuffer.append(String.format("%s: %d/%d\t", stageName, complete, total));
-            }
-          }
-        }
-      }
-    }
-
-    if (SessionState.get() != null) {
-      final float progress = (sumTotal == 0) ? 1.0f : (float) sumComplete / (float) sumTotal;
-      SessionState.get().updateProgressedPercentage(progress);
-    }
-    return reportBuffer.toString();
-  }
-
-  private boolean isSameAsPreviousProgress(
-      Map<SparkStage, SparkStageProgress> progressMap,
-      Map<SparkStage, SparkStageProgress> lastProgressMap) {
-
-    if (lastProgressMap == null) {
-      return false;
-    }
-
-    if (progressMap.isEmpty()) {
-      return lastProgressMap.isEmpty();
-    } else {
-      if (lastProgressMap.isEmpty()) {
-        return false;
-      } else {
-        if (progressMap.size() != lastProgressMap.size()) {
-          return false;
-        }
-        for (SparkStage key : progressMap.keySet()) {
-          if (!lastProgressMap.containsKey(key)
-              || !progressMap.get(key).equals(lastProgressMap.get(key))) {
-            return false;
-          }
-        }
-      }
-    }
-    return true;
+  ProgressMonitor getProgressMonitor(Map<SparkStage, SparkStageProgress> progressMap) {
+    return new SparkProgressMonitor(progressMap, startTime);
   }
 
-  private SparkProgressMonitor getProgressMonitor(Map<SparkStage, SparkStageProgress> progressMap) {
-    return new SparkProgressMonitor(progressMap, startTime);
+  private RenderStrategy.UpdateFunction updateFunction() {
+    return inPlaceUpdate && !SessionState.get().isHiveServerQuery() ? new RenderStrategy.InPlaceUpdateFunction(
+        this) : new RenderStrategy.LogToFileFunction(this);
   }
 }
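
The formatting, dedup, and perf-logging code removed here is what moved into RenderStrategy; the monitor now only decides, once, which strategy to use. The selection mirrors updateFunction() above: Hive CLI queries that can render in place get InPlaceUpdateFunction, while HiveServer2 (Beeline) queries get LogToFileFunction, since for those the bar is drawn on the client from the progress update response and the report string is written to the log (when hive.server2.in.place.progress is on) rather than printed by the monitor. Restated as a standalone sketch (same package assumed, hypothetical wrapper class):

    import org.apache.hadoop.hive.ql.session.SessionState;

    // Illustrative restatement of SparkJobMonitor.updateFunction().
    class UpdateFunctionSelector {
      RenderStrategy.UpdateFunction select(SparkJobMonitor monitor, boolean inPlaceUpdate) {
        boolean cliQuery = !SessionState.get().isHiveServerQuery();
        return (inPlaceUpdate && cliQuery)
            ? new RenderStrategy.InPlaceUpdateFunction(monitor)
            : new RenderStrategy.LogToFileFunction(monitor);
      }
    }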

http://git-wip-us.apache.org/repos/asf/hive/blob/e7d1781e/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java
index 368fa9f..2017fc1 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java
@@ -105,6 +105,7 @@ public class TestSparkTask {
     when(jobSts.getRemoteJobState()).thenReturn(State.CANCELLED);
     when(jobSts.isRemoteActive()).thenReturn(true);
     HiveConf hiveConf = new HiveConf();
+    SessionState.start(hiveConf);
     RemoteSparkJobMonitor remoteSparkJobMonitor = new RemoteSparkJobMonitor(hiveConf, jobSts);
     Assert.assertEquals(remoteSparkJobMonitor.startMonitor(), 3);
   }
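
The added SessionState.start call is needed because, after this patch, the SparkJobMonitor constructor resolves its RenderStrategy.UpdateFunction through SessionState.get() (and LogToFileFunction also reads the session's HiveConf), so a session must exist before any monitor is built. A minimal ordering sketch with a hypothetical test name:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.session.SessionState;
    import org.junit.Test;

    public class SessionStateOrderingSketch {
      @Test
      public void sessionStartsBeforeMonitorConstruction() {
        HiveConf hiveConf = new HiveConf();
        // Must precede monitor construction: updateFunction() calls SessionState.get().
        SessionState.start(hiveConf);
        // ... construct RemoteSparkJobMonitor / SparkJobMonitor here, as the tests in this patch do ...
      }
    }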

http://git-wip-us.apache.org/repos/asf/hive/blob/e7d1781e/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/status/TestSparkJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/status/TestSparkJobMonitor.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/status/TestSparkJobMonitor.java
index e66354f..7257b32 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/status/TestSparkJobMonitor.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/status/TestSparkJobMonitor.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.exec.spark.status;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -40,22 +41,23 @@ public class TestSparkJobMonitor {
   private SparkJobMonitor monitor;
   private PrintStream curOut;
   private PrintStream curErr;
+  private RenderStrategy.InPlaceUpdateFunction updateFunction;
 
   @Before
   public void setUp() {
-    testConf = new HiveConf();
     curOut = System.out;
     curErr = System.err;
     System.setOut(new PrintStream(outContent));
     System.setErr(new PrintStream(errContent));
-
+    testConf = new HiveConf();
+    SessionState.start(testConf);
     monitor = new SparkJobMonitor(testConf) {
       @Override
       public int startMonitor() {
         return 0;
       }
     };
-
+    updateFunction = new RenderStrategy.InPlaceUpdateFunction(monitor);
   }
 
   private Map<SparkStage, SparkStageProgress> progressMap() {
@@ -72,12 +74,27 @@ public class TestSparkJobMonitor {
   }
 
   @Test
-  public void testGetReport() {
+  public void testProgress() {
     Map<SparkStage, SparkStageProgress> progressMap = progressMap();
-    monitor.printStatus(progressMap, null);
-    assertTrue(errContent.toString().contains(
+    updateFunction.printStatus(progressMap, null);
+    String testOutput = errContent.toString();
+    assertTrue(testOutput.contains(
         "Stage-1_0: 3(+1)/4\tStage-3_1: 4(+1,-1)/6\tStage-9_0: 5/5 Finished\tStage-10_2: 3(+2)/5\t"
             + "Stage-15_1: 3(+1)/4\tStage-15_2: 4/4 Finished\tStage-20_3: 1(+1,-1)/3\tStage-21_1: 2/2 Finished"));
+    String[] testStrings = new String[]{
+        "STAGES   ATTEMPT        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED",
+        "Stage-1 ......           0       RUNNING      4          3        1        0       0",
+        "Stage-3 .....            1       RUNNING      6          4        1        1       1",
+        "Stage-9 ........         0      FINISHED      5          5        0        0       0",
+        "Stage-10 ....            2       RUNNING      5          3        2        0       0",
+        "Stage-15 .....           1       RUNNING      4          3        1        0       0",
+        "Stage-15 .......         2      FINISHED      4          4        0        0       0",
+        "Stage-20 ..              3       RUNNING      3          1        1        1       1",
+        "Stage-21 .......         1      FINISHED      2          2        0        0       0",
+        "STAGES: 03/08    [===================>>-------] 75%   ELAPSED TIME:"};
+    for(String testString : testStrings) {
+      assertTrue(testOutput.contains(testString));
+    }
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/hive/blob/e7d1781e/service/src/java/org/apache/hive/service/ServiceUtils.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/ServiceUtils.java b/service/src/java/org/apache/hive/service/ServiceUtils.java
index 226e432..49fb5d5 100644
--- a/service/src/java/org/apache/hive/service/ServiceUtils.java
+++ b/service/src/java/org/apache/hive/service/ServiceUtils.java
@@ -69,8 +69,9 @@ public class ServiceUtils {
   }
 
   public static boolean canProvideProgressLog(HiveConf hiveConf) {
-    return "tez".equals(hiveConf.getVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE))
-        && hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_INPLACE_PROGRESS);
+    return ("tez".equals(hiveConf.getVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE)) || "spark"
+        .equals(hiveConf.getVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE))) && hiveConf
+        .getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_INPLACE_PROGRESS);
   }
 
 }
\ No newline at end of file
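
With the spark case added, the observable effect can be checked directly; an illustrative snippet (not from the patch), reusing the config sketch shown earlier:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hive.service.ServiceUtils;

    // Illustrative check: with engine=spark and in-place progress enabled,
    // HiveServer2 now reports that it can provide a progress log.
    public class CanProvideProgressLogSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "spark");
        conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_INPLACE_PROGRESS, true);
        System.out.println(ServiceUtils.canProvideProgressLog(conf)); // true after this change
      }
    }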

http://git-wip-us.apache.org/repos/asf/hive/blob/e7d1781e/service/src/java/org/apache/hive/service/cli/SparkProgressMonitorStatusMapper.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/SparkProgressMonitorStatusMapper.java b/service/src/java/org/apache/hive/service/cli/SparkProgressMonitorStatusMapper.java
new file mode 100644
index 0000000..c2a222e
--- /dev/null
+++ b/service/src/java/org/apache/hive/service/cli/SparkProgressMonitorStatusMapper.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli;
+
+import org.apache.hive.service.rpc.thrift.TJobExecutionStatus;
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * Maps the status of Spark stages to a job execution status.
+ */
+public class SparkProgressMonitorStatusMapper implements ProgressMonitorStatusMapper {
+  /**
+   * These states are taken from DAGStatus.State; we could not use that class here directly, as it is
+   * an optional dependency and we did not want to include it just for the enum.
+   */
+  enum SparkStatus {
+    PENDING, RUNNING, FINISHED
+
+  }
+
+  @Override
+  public TJobExecutionStatus forStatus(String status) {
+    if (StringUtils.isEmpty(status)) {
+      return TJobExecutionStatus.NOT_AVAILABLE;
+    }
+    SparkProgressMonitorStatusMapper.SparkStatus sparkStatus =
+        SparkProgressMonitorStatusMapper.SparkStatus.valueOf(status);
+    switch (sparkStatus) {
+    case PENDING:
+    case RUNNING:
+      return TJobExecutionStatus.IN_PROGRESS;
+    default:
+      return TJobExecutionStatus.COMPLETE;
+    }
+  }
+}
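
A brief usage sketch for the new mapper (not part of the patch); the status strings correspond to the SparkStatus values above:

    import org.apache.hive.service.cli.SparkProgressMonitorStatusMapper;

    // Illustrative only: PENDING and RUNNING map to IN_PROGRESS, FINISHED maps
    // to COMPLETE, and an empty (or null) status maps to NOT_AVAILABLE.
    public class SparkStatusMappingSketch {
      public static void main(String[] args) {
        SparkProgressMonitorStatusMapper mapper = new SparkProgressMonitorStatusMapper();
        System.out.println(mapper.forStatus("RUNNING"));  // IN_PROGRESS
        System.out.println(mapper.forStatus("FINISHED")); // COMPLETE
        System.out.println(mapper.forStatus(""));         // NOT_AVAILABLE
      }
    }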

http://git-wip-us.apache.org/repos/asf/hive/blob/e7d1781e/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
index 68fe8d8..259ca63 100644
--- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
@@ -21,6 +21,7 @@ package org.apache.hive.service.cli.thrift;
 import static com.google.common.base.Preconditions.checkArgument;
 
 import org.apache.hive.service.cli.OperationState;
+import org.apache.hive.service.cli.SparkProgressMonitorStatusMapper;
 import org.apache.hive.service.rpc.thrift.TSetClientInfoReq;
 import org.apache.hive.service.rpc.thrift.TSetClientInfoResp;
 
@@ -707,7 +708,9 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe
       if ("tez".equals(hiveConf.getVar(ConfVars.HIVE_EXECUTION_ENGINE))) {
         mapper = new TezProgressMonitorStatusMapper();
       }
-
+      if ("spark".equals(hiveConf.getVar(ConfVars.HIVE_EXECUTION_ENGINE))) {
+        mapper = new SparkProgressMonitorStatusMapper();
+      }
       TJobExecutionStatus executionStatus =
           mapper.forStatus(progressUpdate.status);
       resp.setProgressUpdateResponse(new TProgressUpdateResp(