Posted to commits@hive.apache.org by xu...@apache.org on 2015/01/22 19:51:31 UTC

svn commit: r1654003 - in /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark: ./ status/ status/impl/

Author: xuefu
Date: Thu Jan 22 18:51:30 2015
New Revision: 1654003

URL: http://svn.apache.org/r1654003
Log:
HIVE-9370: SparkJobMonitor timeout as sortByKey would launch extra Spark job before original job get submitted [Spark Branch] (Chengxiang via Xuefu)

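The underlying issue: sortByKey eagerly builds a RangePartitioner, which runs a small sampling job before the job Hive actually submitted, so the old SparkJobMonitor, watching the Spark-side job state, saw nothing for the original job and hit its timeout. This patch splits the monitor into local and remote variants and moves monitoring behind SparkJobRef.monitorJob(); the remote variant polls the remote client's JobHandle state instead, which covers the pre-submission window correctly. A minimal sketch of the caller-side flow after the change, condensed from the SparkTask diff below:

    SparkJobRef jobRef = sparkSession.submit(driverContext, sparkWork);
    int rc = jobRef.monitorJob();   // dispatches to Local- or RemoteSparkJobMonitor
    if (rc == 2) {                  // the monitor saw a submission timeout
      jobRef.cancelJob();
    }
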
Added:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobRef.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobRef.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java?rev=1654003&r1=1654002&r2=1654003&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java Thu Jan 22 18:51:30 2015
@@ -137,7 +137,7 @@ public class LocalHiveSparkClient implem
     int jobId = future.jobIds().get(0);
     LocalSparkJobStatus sparkJobStatus = new LocalSparkJobStatus(
       sc, jobId, jobMetricsListener, sparkCounters, plan.getCachedRDDIds(), future);
-    return new LocalSparkJobRef(Integer.toString(jobId), sparkJobStatus, sc);
+    return new LocalSparkJobRef(Integer.toString(jobId), hiveConf, sparkJobStatus, sc);
   }
 
   /**

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java?rev=1654003&r1=1654002&r2=1654003&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java Thu Jan 22 18:51:30 2015
@@ -77,10 +77,13 @@ public class RemoteHiveSparkClient imple
   private transient List<String> localJars = new ArrayList<String>();
   private transient List<String> localFiles = new ArrayList<String>();
 
+  private final transient long sparkClientTimeout;
+
   RemoteHiveSparkClient(HiveConf hiveConf, Map<String, String> conf) throws IOException, SparkException {
     this.hiveConf = hiveConf;
     sparkConf = HiveSparkClientFactory.generateSparkConf(conf);
     remoteClient = SparkClientFactory.createClient(conf, hiveConf);
+    sparkClientTimeout = hiveConf.getTimeVar(HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT, TimeUnit.SECONDS);
   }
 
   @Override
@@ -90,17 +93,14 @@ public class RemoteHiveSparkClient imple
 
   @Override
   public int getExecutorCount() throws Exception {
-    long timeout = hiveConf.getTimeVar(HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT, TimeUnit.SECONDS);
     Future<Integer> handler = remoteClient.getExecutorCount();
-    return handler.get(timeout, TimeUnit.SECONDS).intValue();
+    return handler.get(sparkClientTimeout, TimeUnit.SECONDS).intValue();
   }
 
   @Override
   public int getDefaultParallelism() throws Exception {
-    long timeout = hiveConf.getTimeVar(
-        HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT, TimeUnit.SECONDS);
     Future<Integer> handler = remoteClient.getDefaultParallelism();
-    return handler.get(timeout, TimeUnit.SECONDS);
+    return handler.get(sparkClientTimeout, TimeUnit.SECONDS);
   }
 
   @Override
@@ -119,11 +119,10 @@ public class RemoteHiveSparkClient imple
     byte[] scratchDirBytes = KryoSerializer.serialize(emptyScratchDir);
     byte[] sparkWorkBytes = KryoSerializer.serialize(sparkWork);
 
-    long timeout = hiveConf.getTimeVar(HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT, TimeUnit.SECONDS);
-
-    JobHandle<Serializable> jobHandle = remoteClient.submit(
-        new JobStatusJob(jobConfBytes, scratchDirBytes, sparkWorkBytes));
-    return new RemoteSparkJobRef(jobHandle, new RemoteSparkJobStatus(remoteClient, jobHandle, timeout));
+    JobStatusJob job = new JobStatusJob(jobConfBytes, scratchDirBytes, sparkWorkBytes);
+    JobHandle<Serializable> jobHandle = remoteClient.submit(job);
+    RemoteSparkJobStatus sparkJobStatus = new RemoteSparkJobStatus(remoteClient, jobHandle, sparkClientTimeout);
+    return new RemoteSparkJobRef(hiveConf, jobHandle, sparkJobStatus);
   }
 
   private void refreshLocalResources(SparkWork sparkWork, HiveConf conf) {

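Reading SPARK_CLIENT_FUTURE_TIMEOUT once in the constructor replaces the per-call lookups in getExecutorCount(), getDefaultParallelism(), and submit(). For illustration, a standalone sketch of the same lookup (the hive.spark.client.future.timeout property name is an assumption based on the usual ConfVars naming convention):

    HiveConf hiveConf = new HiveConf();
    hiveConf.set("hive.spark.client.future.timeout", "60s");  // assumed property key
    long sparkClientTimeout = hiveConf.getTimeVar(
        HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT, TimeUnit.SECONDS);
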
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java?rev=1654003&r1=1654002&r2=1654003&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java Thu Jan 22 18:51:30 2015
@@ -49,7 +49,7 @@ import org.apache.hadoop.hive.ql.exec.sp
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
-import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobMonitor;
+import org.apache.hadoop.hive.ql.exec.spark.status.LocalSparkJobMonitor;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
@@ -65,7 +65,6 @@ import org.apache.hadoop.hive.ql.plan.Op
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hadoop.hive.ql.plan.StatsWork;
-import org.apache.hadoop.hive.ql.plan.UnionWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.util.StringUtils;
@@ -102,23 +101,20 @@ public class SparkTask extends Task<Spar
       SparkJobRef jobRef = sparkSession.submit(driverContext, sparkWork);
       perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB);
 
+      rc = jobRef.monitorJob();
       SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus();
-      if (sparkJobStatus != null) {
-        SparkJobMonitor monitor = new SparkJobMonitor(conf, sparkJobStatus);
-        rc = monitor.startMonitor();
+      if (rc == 0) {
         sparkCounters = sparkJobStatus.getCounter();
-        if (rc == 0 ) {
-          // for RSC, we should get the counters after job has finished
-          SparkStatistics sparkStatistics = sparkJobStatus.getSparkStatistics();
-          if (LOG.isInfoEnabled() && sparkStatistics != null) {
-            LOG.info(String.format("=====Spark Job[%s] statistics=====", jobRef.getJobId()));
-            logSparkStatistic(sparkStatistics);
-          }
-        } else if (rc == 2) { // Cancel job if the monitor found job submission timeout.
-          jobRef.cancelJob();
+        // for RSC, we should get the counters after job has finished
+        SparkStatistics sparkStatistics = sparkJobStatus.getSparkStatistics();
+        if (LOG.isInfoEnabled() && sparkStatistics != null) {
+          LOG.info(String.format("=====Spark Job[%s] statistics=====", jobRef.getJobId()));
+          logSparkStatistic(sparkStatistics);
         }
-        sparkJobStatus.cleanup();
+      } else if (rc == 2) { // Cancel job if the monitor found job submission timeout.
+        jobRef.cancelJob();
       }
+      sparkJobStatus.cleanup();
     } catch (Exception e) {
       String msg = "Failed to execute spark task, with exception '" + Utilities.getNameMessage(e) + "'";
 

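For reference, the monitor return codes consumed here appear to be: 0 = finished successfully, 1 = monitoring failed with an exception, 2 = never submitted within the monitor timeout (the one case where the job is cancelled), 3 = job failed, 4 = job state unknown. A sketch of the consuming side, condensed from the diff above:

    int rc = jobRef.monitorJob();
    SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus();
    if (rc == 0) {
      sparkCounters = sparkJobStatus.getCounter();  // counters only after the job finishes
    } else if (rc == 2) {
      jobRef.cancelJob();                           // submission timed out
    }
    sparkJobStatus.cleanup();
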
Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java?rev=1654003&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java Thu Jan 22 18:51:30 2015
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.status;
+
+import java.util.Map;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.spark.JobExecutionStatus;
+
+/**
+ * LocalSparkJobMonitor monitors the status of a single Spark job in a loop until the job finishes, fails, or is killed.
+ * It prints the current job status to the console and sleeps the current thread between polls.
+ */
+public class LocalSparkJobMonitor extends SparkJobMonitor {
+
+  private SparkJobStatus sparkJobStatus;
+
+  public LocalSparkJobMonitor(HiveConf hiveConf, SparkJobStatus sparkJobStatus) {
+    super(hiveConf);
+    this.sparkJobStatus = sparkJobStatus;
+  }
+
+  public int startMonitor() {
+    boolean running = false;
+    boolean done = false;
+    int rc = 0;
+    JobExecutionStatus lastState = null;
+    Map<String, SparkStageProgress> lastProgressMap = null;
+
+    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_RUN_JOB);
+    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING);
+
+    long startTime = System.currentTimeMillis();
+
+    while (true) {
+      try {
+        JobExecutionStatus state = sparkJobStatus.getState();
+        if (LOG.isDebugEnabled()) {
+          console.printInfo("state = " + state);
+        }
+
+        if (state == null) {
+          long timeCount = (System.currentTimeMillis() - startTime)/1000;
+          if (timeCount > monitorTimeoutInteval) {
+            LOG.info("Job hasn't been submitted after " + timeCount + "s. Aborting it.");
+            console.printError("Status: " + state);
+            running = false;
+            done = true;
+            rc = 2;
+            break;
+          }
+        } else if (state != lastState || state == JobExecutionStatus.RUNNING) {
+          lastState = state;
+          Map<String, SparkStageProgress> progressMap = sparkJobStatus.getSparkStageProgress();
+
+          switch (state) {
+          case RUNNING:
+            if (!running) {
+              perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING);
+              // print job stages.
+              console.printInfo("\nQuery Hive on Spark job["
+                + sparkJobStatus.getJobId() + "] stages:");
+              for (int stageId : sparkJobStatus.getStageIds()) {
+                console.printInfo(Integer.toString(stageId));
+              }
+
+              console.printInfo("\nStatus: Running (Hive on Spark job["
+                + sparkJobStatus.getJobId() + "])");
+              running = true;
+
+              console.printInfo("Job Progress Format\nCurrentTime StageId_StageAttemptId: "
+                + "SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount [StageCost]");
+            }
+
+            printStatus(progressMap, lastProgressMap);
+            lastProgressMap = progressMap;
+            break;
+          case SUCCEEDED:
+            printStatus(progressMap, lastProgressMap);
+            lastProgressMap = progressMap;
+            double duration = (System.currentTimeMillis() - startTime) / 1000.0;
+            console.printInfo("Status: Finished successfully in "
+              + String.format("%.2f seconds", duration));
+            running = false;
+            done = true;
+            break;
+          case FAILED:
+            console.printError("Status: Failed");
+            running = false;
+            done = true;
+            rc = 3;
+            break;
+          case UNKNOWN:
+            console.printError("Status: Unknown");
+            running = false;
+            done = true;
+            rc = 4;
+            break;
+          }
+        }
+        if (!done) {
+          Thread.sleep(checkInterval);
+        }
+      } catch (Exception e) {
+        String msg = " with exception '" + Utilities.getNameMessage(e) + "'";
+        msg = "Failed to monitor Job[ " + sparkJobStatus.getJobId() + "]" + msg;
+
+        // Has to use full name to make sure it does not conflict with
+        // org.apache.commons.lang.StringUtils
+        LOG.error(msg, e);
+        console.printError(msg, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
+        rc = 1;
+        done = true;
+      } finally {
+        if (done) {
+          break;
+        }
+      }
+    }
+
+    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_RUN_JOB);
+    return rc;
+  }
+}
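
Here a null JobExecutionStatus is read as "not yet submitted", and the monitor gives up with rc = 2 once monitorTimeoutInteval seconds pass. A minimal sketch of how this class is driven, mirroring LocalSparkJobRef.monitorJob() further down:

    // localSparkJobStatus is assumed to be an already-constructed LocalSparkJobStatus.
    LocalSparkJobMonitor monitor = new LocalSparkJobMonitor(hiveConf, localSparkJobStatus);
    int rc = monitor.startMonitor();  // blocks, polling every checkInterval (1000 ms)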

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java?rev=1654003&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java Thu Jan 22 18:51:30 2015
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark.status;
+
+import java.util.Map;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobStatus;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hive.spark.client.JobHandle;
+import org.apache.spark.JobExecutionStatus;
+
+/**
+ * RemoteSparkJobMonitor monitors the status of an RSC remote job in a loop until the job finishes, fails, or is killed.
+ * It prints the current job status to the console and sleeps the current thread between polls.
+ */
+public class RemoteSparkJobMonitor extends SparkJobMonitor {
+
+  private RemoteSparkJobStatus sparkJobStatus;
+
+  public RemoteSparkJobMonitor(HiveConf hiveConf, RemoteSparkJobStatus sparkJobStatus) {
+    super(hiveConf);
+    this.sparkJobStatus = sparkJobStatus;
+  }
+
+  @Override
+  public int startMonitor() {
+    boolean running = false;
+    boolean done = false;
+    int rc = 0;
+    Map<String, SparkStageProgress> lastProgressMap = null;
+
+    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_RUN_JOB);
+    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING);
+
+    long startTime = System.currentTimeMillis();
+
+    while (true) {
+      try {
+        JobHandle.State state = sparkJobStatus.getRemoteJobState();
+        if (LOG.isDebugEnabled()) {
+          console.printInfo("state = " + state);
+        }
+
+        switch (state) {
+        case SENT:
+        case QUEUED:
+          long timeCount = (System.currentTimeMillis() - startTime) / 1000;
+          if (timeCount > monitorTimeoutInteval) {
+            LOG.info("Job hasn't been submitted after " + timeCount + "s. Aborting it.");
+            console.printError("Status: " + state);
+            running = false;
+            done = true;
+            rc = 2;
+          }
+          break;
+        case STARTED:
+          JobExecutionStatus sparkJobState = sparkJobStatus.getState();
+          if (sparkJobState == JobExecutionStatus.RUNNING) {
+            Map<String, SparkStageProgress> progressMap = sparkJobStatus.getSparkStageProgress();
+            if (!running) {
+              perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING);
+              // print job stages.
+              console.printInfo("\nQuery Hive on Spark job["
+                + sparkJobStatus.getJobId() + "] stages:");
+              for (int stageId : sparkJobStatus.getStageIds()) {
+                console.printInfo(Integer.toString(stageId));
+              }
+
+              console.printInfo("\nStatus: Running (Hive on Spark job["
+                + sparkJobStatus.getJobId() + "])");
+              running = true;
+
+              console.printInfo("Job Progress Format\nCurrentTime StageId_StageAttemptId: "
+                + "SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount [StageCost]");
+            }
+
+            printStatus(progressMap, lastProgressMap);
+            lastProgressMap = progressMap;
+          }
+          break;
+        case SUCCEEDED:
+          Map<String, SparkStageProgress> progressMap = sparkJobStatus.getSparkStageProgress();
+          printStatus(progressMap, lastProgressMap);
+          lastProgressMap = progressMap;
+          double duration = (System.currentTimeMillis() - startTime) / 1000.0;
+          console.printInfo("Status: Finished successfully in "
+            + String.format("%.2f seconds", duration));
+          running = false;
+          done = true;
+          break;
+        case FAILED:
+          console.printError("Status: Failed");
+          running = false;
+          done = true;
+          rc = 3;
+          break;
+        }
+
+        if (!done) {
+          Thread.sleep(checkInterval);
+        }
+      } catch (Exception e) {
+        String msg = " with exception '" + Utilities.getNameMessage(e) + "'";
+        msg = "Failed to monitor Job[ " + sparkJobStatus.getJobId() + "]" + msg;
+
+        // Has to use full name to make sure it does not conflict with
+        // org.apache.commons.lang.StringUtils
+        LOG.error(msg, e);
+        console.printError(msg, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
+        rc = 1;
+        done = true;
+      } finally {
+        if (done) {
+          break;
+        }
+      }
+    }
+
+    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_RUN_JOB);
+    return rc;
+  }
+}
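
This is the heart of the fix: submission is now judged from the remote client's JobHandle state rather than from the Spark-side JobExecutionStatus, so the extra sortByKey job can no longer make the original job look unsubmitted. Only the SENT/QUEUED window is subject to the timeout; condensed from startMonitor() above (timeCount, rc, and monitorTimeoutInteval as in the surrounding code):

    JobHandle.State state = sparkJobStatus.getRemoteJobState();
    boolean awaitingSubmission =
        state == JobHandle.State.SENT || state == JobHandle.State.QUEUED;
    if (awaitingSubmission && timeCount > monitorTimeoutInteval) {
      rc = 2;  // job never reached Spark within the timeout; the caller cancels it
    }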

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java?rev=1654003&r1=1654002&r2=1654003&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java Thu Jan 22 18:51:30 2015
@@ -15,8 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.hadoop.hive.ql.exec.spark.status;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.spark.status.impl.LocalSparkJobStatus;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.HashSet;
@@ -26,147 +34,27 @@ import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.log.PerfLogger;
-import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
-import org.apache.spark.JobExecutionStatus;
-
-/**
- * SparkJobMonitor monitor a single Spark job status in a loop until job finished/failed/killed.
- * It print current job status to console and sleep current thread between monitor interval.
- */
-public class SparkJobMonitor {
+abstract class SparkJobMonitor {
 
-  private static final String CLASS_NAME = SparkJobMonitor.class.getName();
-  private static final Log LOG = LogFactory.getLog(CLASS_NAME);
+  protected static final String CLASS_NAME = SparkJobMonitor.class.getName();
+  protected static final Log LOG = LogFactory.getLog(CLASS_NAME);
+  protected static SessionState.LogHelper console = new SessionState.LogHelper(LOG);
+  protected final PerfLogger perfLogger = PerfLogger.getPerfLogger();
+  protected final int checkInterval = 1000;
+  protected final long monitorTimeoutInteval;
 
-  private transient LogHelper console;
-  private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
-  private final long monitorTimeoutInteval;
-  private final int checkInterval = 1000;
+  private Set<String> completed = new HashSet<String>();
   private final int printInterval = 3000;
-  private final HiveConf hiveConf;
   private long lastPrintTime;
-  private Set<String> completed;
-
-  private SparkJobStatus sparkJobStatus;
 
-  public SparkJobMonitor(HiveConf hiveConf, SparkJobStatus sparkJobStatus) {
-    this.sparkJobStatus = sparkJobStatus;
-    this.hiveConf = hiveConf;
+  protected SparkJobMonitor(HiveConf hiveConf) {
     monitorTimeoutInteval = hiveConf.getTimeVar(HiveConf.ConfVars.SPARK_JOB_MONITOR_TIMEOUT, TimeUnit.SECONDS);
-    console = new LogHelper(LOG);
   }
 
-  public int startMonitor() {
-    completed = new HashSet<String>();
-
-    boolean running = false;
-    boolean done = false;
-    int rc = 0;
-    JobExecutionStatus lastState = null;
-    Map<String, SparkStageProgress> lastProgressMap = null;
-
-    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_RUN_JOB);
-    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING);
-
-    long startTime = System.currentTimeMillis();
-
-    while (true) {
-      try {
-        JobExecutionStatus state = sparkJobStatus.getState();
-        if (LOG.isDebugEnabled()) {
-          console.printInfo("state = " + state);
-        }
-
-        if (state == null) {
-          long timeCount = (System.currentTimeMillis() - startTime)/1000;
-          if (timeCount > monitorTimeoutInteval) {
-            LOG.info("Job hasn't been submitted after " + timeCount + "s. Aborting it.");
-            console.printError("Status: " + state);
-            running = false;
-            done = true;
-            rc = 2;
-            break;
-          }
-        } else if (state != lastState || state == JobExecutionStatus.RUNNING) {
-          lastState = state;
-          Map<String, SparkStageProgress> progressMap = sparkJobStatus.getSparkStageProgress();
-
-          switch (state) {
-          case RUNNING:
-            if (!running) {
-              perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING);
-              // print job stages.
-              console.printInfo("\nQuery Hive on Spark job["
-                + sparkJobStatus.getJobId() + "] stages:");
-              for (int stageId : sparkJobStatus.getStageIds()) {
-                console.printInfo(Integer.toString(stageId));
-              }
-
-              console.printInfo("\nStatus: Running (Hive on Spark job["
-                + sparkJobStatus.getJobId() + "])");
-              running = true;
-
-              console.printInfo("Job Progress Format\nCurrentTime StageId_StageAttemptId: "
-                + "SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount [StageCost]");
-            }
-
-            printStatus(progressMap, lastProgressMap);
-            lastProgressMap = progressMap;
-            break;
-          case SUCCEEDED:
-            printStatus(progressMap, lastProgressMap);
-            lastProgressMap = progressMap;
-            double duration = (System.currentTimeMillis() - startTime) / 1000.0;
-            console.printInfo("Status: Finished successfully in "
-              + String.format("%.2f seconds", duration));
-            running = false;
-            done = true;
-            break;
-          case FAILED:
-            console.printError("Status: Failed");
-            running = false;
-            done = true;
-            rc = 3;
-            break;
-          case UNKNOWN:
-            console.printError("Status: Unknown");
-            running = false;
-            done = true;
-            rc = 4;
-            break;
-          }
-        }
-        if (!done) {
-          Thread.sleep(checkInterval);
-        }
-      } catch (Exception e) {
-        String msg = " with exception '" + Utilities.getNameMessage(e) + "'";
-        msg = "Failed to monitor Job[ " + sparkJobStatus.getJobId() + "]" + msg;
-
-        // Has to use full name to make sure it does not conflict with
-        // org.apache.commons.lang.StringUtils
-        LOG.error(msg, e);
-        console.printError(msg, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
-        rc = 1;
-        done = true;
-      } finally {
-        if (done) {
-          break;
-        }
-      }
-    }
+  public abstract int startMonitor();
 
-    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_RUN_JOB);
-    return rc;
-  }
-
-  private void printStatus(Map<String, SparkStageProgress> progressMap,
-                           Map<String, SparkStageProgress> lastProgressMap) {
+  protected void printStatus(Map<String, SparkStageProgress> progressMap,
+    Map<String, SparkStageProgress> lastProgressMap) {
 
     // do not print duplicate status while still in middle of print interval.
     boolean isDuplicateState = isSameAsPreviousProgress(progressMap, lastProgressMap);
@@ -217,13 +105,13 @@ public class SparkJobMonitor {
           if (failed > 0) {
             /* tasks finished but some failed */
             reportBuffer.append(
-                String.format(
-                    "%s: %d(-%d)/%d Finished with failed tasks\t",
-                    stageName, complete, failed, total));
+              String.format(
+                "%s: %d(-%d)/%d Finished with failed tasks\t",
+                stageName, complete, failed, total));
           } else {
             if (complete == total) {
               reportBuffer.append(
-                  String.format("%s: %d/%d Finished\t", stageName, complete, total));
+                String.format("%s: %d/%d Finished\t", stageName, complete, total));
             } else {
               reportBuffer.append(String.format("%s: %d/%d\t", stageName, complete, total));
             }

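SparkJobMonitor is reduced to an abstract base holding the shared machinery: the log/console helpers, the PerfLogger, the polling and printing intervals, and printStatus(). Each execution mode supplies its own startMonitor(). A hypothetical further mode would plug in the same way (illustrative only; no such monitor exists in the patch, and the package-private base class requires the subclass to live in the same package):

    class OtherSparkJobMonitor extends SparkJobMonitor {
      OtherSparkJobMonitor(HiveConf hiveConf) {
        super(hiveConf);  // picks up monitorTimeoutInteval from the conf
      }
      @Override
      public int startMonitor() {
        // poll a status source, call printStatus(...) as progress arrives,
        // and return one of the rc codes described earlier
        return 0;
      }
    }
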
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java?rev=1654003&r1=1654002&r2=1654003&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java Thu Jan 22 18:51:30 2015
@@ -25,4 +25,5 @@ public interface SparkJobRef {
 
   public boolean cancelJob();
 
+  public int monitorJob();
 }
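
With monitorJob() added, SparkJobRef covers the whole job lifecycle, and SparkTask no longer needs to pick a monitor class itself. Reassembled from the diff context and the call sites in SparkTask (method order and exact signatures of the pre-existing methods are inferred):

    public interface SparkJobRef {
      String getJobId();
      SparkJobStatus getSparkJobStatus();
      boolean cancelJob();
      int monitorJob();
    }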

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobRef.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobRef.java?rev=1654003&r1=1654002&r2=1654003&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobRef.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobRef.java Thu Jan 22 18:51:30 2015
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark.status.impl;
 
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.spark.status.LocalSparkJobMonitor;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -24,11 +26,18 @@ import org.apache.spark.api.java.JavaSpa
 public class LocalSparkJobRef implements SparkJobRef {
 
   private final String jobId;
-  private final SparkJobStatus sparkJobStatus;
+  private final HiveConf hiveConf;
+  private final LocalSparkJobStatus sparkJobStatus;
   private final JavaSparkContext javaSparkContext;
 
-  public LocalSparkJobRef(String jobId, SparkJobStatus sparkJobStatus, JavaSparkContext javaSparkContext) {
+  public LocalSparkJobRef(
+    String jobId,
+    HiveConf hiveConf,
+    LocalSparkJobStatus sparkJobStatus,
+    JavaSparkContext javaSparkContext) {
+
     this.jobId = jobId;
+    this.hiveConf = hiveConf;
     this.sparkJobStatus = sparkJobStatus;
     this.javaSparkContext = javaSparkContext;
   }
@@ -49,4 +58,10 @@ public class LocalSparkJobRef implements
     javaSparkContext.sc().cancelJob(id);
     return true;
   }
+
+  @Override
+  public int monitorJob() {
+    LocalSparkJobMonitor localSparkJobMonitor = new LocalSparkJobMonitor(hiveConf, sparkJobStatus);
+    return localSparkJobMonitor.startMonitor();
+  }
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobRef.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobRef.java?rev=1654003&r1=1654002&r2=1654003&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobRef.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobRef.java Thu Jan 22 18:51:30 2015
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark.status.impl;
 
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.spark.status.RemoteSparkJobMonitor;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
 import org.apache.hive.spark.client.JobHandle;
@@ -26,12 +28,14 @@ import java.io.Serializable;
 public class RemoteSparkJobRef implements SparkJobRef {
 
   private final String jobId;
-  private final SparkJobStatus sparkJobStatus;
+  private final HiveConf hiveConf;
+  private final RemoteSparkJobStatus sparkJobStatus;
   private final JobHandle<Serializable> jobHandler;
 
-  public RemoteSparkJobRef(JobHandle<Serializable> jobHandler, SparkJobStatus sparkJobStatus) {
+  public RemoteSparkJobRef(HiveConf hiveConf, JobHandle<Serializable> jobHandler, RemoteSparkJobStatus sparkJobStatus) {
     this.jobHandler = jobHandler;
     this.jobId = jobHandler.getClientJobId();
+    this.hiveConf = hiveConf;
     this.sparkJobStatus = sparkJobStatus;
   }
 
@@ -49,4 +53,10 @@ public class RemoteSparkJobRef implement
   public boolean cancelJob() {
     return jobHandler.cancel(true);
   }
+
+  @Override
+  public int monitorJob() {
+    RemoteSparkJobMonitor remoteSparkJobMonitor = new RemoteSparkJobMonitor(hiveConf, sparkJobStatus);
+    return remoteSparkJobMonitor.startMonitor();
+  }
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java?rev=1654003&r1=1654002&r2=1654003&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java Thu Jan 22 18:51:30 2015
@@ -153,6 +153,10 @@ public class RemoteSparkJobStatus implem
     }
   }
 
+  public JobHandle.State getRemoteJobState() {
+    return jobHandle.getState();
+  }
+
   private static class GetJobInfoJob implements Job<SparkJobInfo> {
     private final String clientJobId;
     private final int sparkJobId;
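
getRemoteJobState() is a thin delegate to JobHandle.getState(); it is what lets RemoteSparkJobMonitor watch the pre-submission SENT/QUEUED window without waiting for a Spark-side job ID to exist:

    JobHandle.State state = sparkJobStatus.getRemoteJobState();  // as polled by the monitor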