You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2014/11/13 04:46:02 UTC

svn commit: r1639241 - in /hive/branches/spark: itests/pom.xml ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java

Author: xuefu
Date: Thu Nov 13 03:46:01 2014
New Revision: 1639241

URL: http://svn.apache.org/r1639241
Log:
HIVE-8780: insert1.q and ppd_join4.q hangs with hadoop-1 [Spark Branch] (Chengxiang via Xuefu)

Modified:
    hive/branches/spark/itests/pom.xml
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java

Modified: hive/branches/spark/itests/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/itests/pom.xml?rev=1639241&r1=1639240&r2=1639241&view=diff
==============================================================================
--- hive/branches/spark/itests/pom.xml (original)
+++ hive/branches/spark/itests/pom.xml Thu Nov 13 03:46:01 2014
@@ -49,6 +49,12 @@
         <module>hive-minikdc</module>
       </modules>
     </profile>
+    <profile>
+      <id>hadoop-1</id>
+      <modules>
+        <module>qtest-spark</module>
+      </modules>
+    </profile>
   </profiles>
 
 </project>

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java?rev=1639241&r1=1639240&r2=1639241&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java Thu Nov 13 03:46:01 2014
@@ -220,7 +220,7 @@ public class SparkClient implements Seri
     // As we always use foreach action to submit RDD graph, it would only trigger one job.
     int jobId = future.jobIds().get(0);
     SimpleSparkJobStatus sparkJobStatus =
-      new SimpleSparkJobStatus(jobId, jobStateListener, jobProgressListener, sparkCounters);
+      new SimpleSparkJobStatus(jobId, jobStateListener, jobProgressListener, sparkCounters, future);
     return new SparkJobRef(jobId, sparkJobStatus);
   }
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java?rev=1639241&r1=1639240&r2=1639241&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java Thu Nov 13 03:46:01 2014
@@ -29,6 +29,7 @@ import org.apache.hadoop.hive.ql.exec.sp
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobState;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress;
+import org.apache.spark.api.java.JavaFutureAction;
 import org.apache.spark.executor.InputMetrics;
 import org.apache.spark.executor.ShuffleReadMetrics;
 import org.apache.spark.executor.ShuffleWriteMetrics;
@@ -49,17 +50,20 @@ public class SimpleSparkJobStatus implem
   private JobStateListener jobStateListener;
   private JobProgressListener jobProgressListener;
   private SparkCounters sparkCounters;
+  private JavaFutureAction<Void> future;
 
   public SimpleSparkJobStatus(
     int jobId,
     JobStateListener stateListener,
     JobProgressListener progressListener,
-    SparkCounters sparkCounters) {
+    SparkCounters sparkCounters,
+    JavaFutureAction<Void> future) {
 
     this.jobId = jobId;
     this.jobStateListener = stateListener;
     this.jobProgressListener = progressListener;
     this.sparkCounters = sparkCounters;
+    this.future = future;
   }
 
   @Override
@@ -69,7 +73,14 @@ public class SimpleSparkJobStatus implem
 
   @Override
   public SparkJobState getState() {
-    return jobStateListener.getJobState(jobId);
+    // For a Spark job with empty source data, the job is never actually submitted, so we would
+    // never receive JobStart/JobEnd events in JobStateListener; use the JavaFutureAction to get
+    // the current job state instead.
+    if (future.isDone()) {
+      return SparkJobState.SUCCEEDED;
+    } else {
+      return jobStateListener.getJobState(jobId);
+    }
   }
 
   @Override
@@ -135,6 +146,10 @@ public class SimpleSparkJobStatus implem
     // add spark job metrics.
     String jobIdentifier = "Spark Job[" + jobId + "] Metrics";
     Map<String, List<TaskMetrics>> jobMetric = jobStateListener.getJobMetric(jobId);
+    if (jobMetric == null) {
+      return null;
+    }
+
     Map<String, Long> flatJobMetric = combineJobLevelMetrics(jobMetric);
     for (Map.Entry<String, Long> entry : flatJobMetric.entrySet()) {
       sparkStatisticsBuilder.add(jobIdentifier, entry.getKey(), Long.toString(entry.getValue()));
@@ -170,31 +185,35 @@ public class SimpleSparkJobStatus implem
     boolean shuffleWriteMetricExist = false;
 
     for (List<TaskMetrics> stageMetric : jobMetric.values()) {
-      for (TaskMetrics taskMetrics : stageMetric) {
-        executorDeserializeTime += taskMetrics.executorDeserializeTime();
-        executorRunTime += taskMetrics.executorRunTime();
-        resultSize += taskMetrics.resultSize();
-        jvmGCTime += taskMetrics.jvmGCTime();
-        resultSerializationTime += taskMetrics.resultSerializationTime();
-        memoryBytesSpilled += taskMetrics.memoryBytesSpilled();
-        diskBytesSpilled += taskMetrics.diskBytesSpilled();
-        if (!taskMetrics.inputMetrics().isEmpty()) {
-          inputMetricExist = true;
-          bytesRead += taskMetrics.inputMetrics().get().bytesRead();
-        }
-        Option<ShuffleReadMetrics> shuffleReadMetricsOption = taskMetrics.shuffleReadMetrics();
-        if (!shuffleReadMetricsOption.isEmpty()) {
-          shuffleReadMetricExist = true;
-          remoteBlocksFetched += shuffleReadMetricsOption.get().remoteBlocksFetched();
-          localBlocksFetched += shuffleReadMetricsOption.get().localBlocksFetched();
-          fetchWaitTime += shuffleReadMetricsOption.get().fetchWaitTime();
-          remoteBytesRead += shuffleReadMetricsOption.get().remoteBytesRead();
-        }
-        Option<ShuffleWriteMetrics> shuffleWriteMetricsOption = taskMetrics.shuffleWriteMetrics();
-        if (!shuffleWriteMetricsOption.isEmpty()) {
-          shuffleWriteMetricExist = true;
-          shuffleBytesWritten += shuffleWriteMetricsOption.get().shuffleBytesWritten();
-          shuffleWriteTime += shuffleWriteMetricsOption.get().shuffleWriteTime();
+      if (stageMetric != null) {
+        for (TaskMetrics taskMetrics : stageMetric) {
+          if (taskMetrics != null) {
+            executorDeserializeTime += taskMetrics.executorDeserializeTime();
+            executorRunTime += taskMetrics.executorRunTime();
+            resultSize += taskMetrics.resultSize();
+            jvmGCTime += taskMetrics.jvmGCTime();
+            resultSerializationTime += taskMetrics.resultSerializationTime();
+            memoryBytesSpilled += taskMetrics.memoryBytesSpilled();
+            diskBytesSpilled += taskMetrics.diskBytesSpilled();
+            if (!taskMetrics.inputMetrics().isEmpty()) {
+              inputMetricExist = true;
+              bytesRead += taskMetrics.inputMetrics().get().bytesRead();
+            }
+            Option<ShuffleReadMetrics> shuffleReadMetricsOption = taskMetrics.shuffleReadMetrics();
+            if (!shuffleReadMetricsOption.isEmpty()) {
+              shuffleReadMetricExist = true;
+              remoteBlocksFetched += shuffleReadMetricsOption.get().remoteBlocksFetched();
+              localBlocksFetched += shuffleReadMetricsOption.get().localBlocksFetched();
+              fetchWaitTime += shuffleReadMetricsOption.get().fetchWaitTime();
+              remoteBytesRead += shuffleReadMetricsOption.get().remoteBytesRead();
+            }
+            Option<ShuffleWriteMetrics> shuffleWriteMetricsOption = taskMetrics.shuffleWriteMetrics();
+            if (!shuffleWriteMetricsOption.isEmpty()) {
+              shuffleWriteMetricExist = true;
+              shuffleBytesWritten += shuffleWriteMetricsOption.get().shuffleBytesWritten();
+              shuffleWriteTime += shuffleWriteMetricsOption.get().shuffleWriteTime();
+            }
+          }
         }
       }
     }