You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2014/11/13 04:46:02 UTC
svn commit: r1639241 - in /hive/branches/spark: itests/pom.xml
ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java
Author: xuefu
Date: Thu Nov 13 03:46:01 2014
New Revision: 1639241
URL: http://svn.apache.org/r1639241
Log:
HIVE-8780: insert1.q and ppd_join4.q hangs with hadoop-1 [Spark Branch] (Chengxiang via Xuefu)
Modified:
hive/branches/spark/itests/pom.xml
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java
Modified: hive/branches/spark/itests/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/itests/pom.xml?rev=1639241&r1=1639240&r2=1639241&view=diff
==============================================================================
--- hive/branches/spark/itests/pom.xml (original)
+++ hive/branches/spark/itests/pom.xml Thu Nov 13 03:46:01 2014
@@ -49,6 +49,12 @@
<module>hive-minikdc</module>
</modules>
</profile>
+ <profile>
+ <id>hadoop-1</id>
+ <modules>
+ <module>qtest-spark</module>
+ </modules>
+ </profile>
</profiles>
</project>
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java?rev=1639241&r1=1639240&r2=1639241&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java Thu Nov 13 03:46:01 2014
@@ -220,7 +220,7 @@ public class SparkClient implements Seri
// As we always use foreach action to submit RDD graph, it would only trigger one job.
int jobId = future.jobIds().get(0);
SimpleSparkJobStatus sparkJobStatus =
- new SimpleSparkJobStatus(jobId, jobStateListener, jobProgressListener, sparkCounters);
+ new SimpleSparkJobStatus(jobId, jobStateListener, jobProgressListener, sparkCounters, future);
return new SparkJobRef(jobId, sparkJobStatus);
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java?rev=1639241&r1=1639240&r2=1639241&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java Thu Nov 13 03:46:01 2014
@@ -29,6 +29,7 @@ import org.apache.hadoop.hive.ql.exec.sp
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobState;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress;
+import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.executor.InputMetrics;
import org.apache.spark.executor.ShuffleReadMetrics;
import org.apache.spark.executor.ShuffleWriteMetrics;
@@ -49,17 +50,20 @@ public class SimpleSparkJobStatus implem
private JobStateListener jobStateListener;
private JobProgressListener jobProgressListener;
private SparkCounters sparkCounters;
+ private JavaFutureAction<Void> future;
public SimpleSparkJobStatus(
int jobId,
JobStateListener stateListener,
JobProgressListener progressListener,
- SparkCounters sparkCounters) {
+ SparkCounters sparkCounters,
+ JavaFutureAction<Void> future) {
this.jobId = jobId;
this.jobStateListener = stateListener;
this.jobProgressListener = progressListener;
this.sparkCounters = sparkCounters;
+ this.future = future;
}
@Override
@@ -69,7 +73,14 @@ public class SimpleSparkJobStatus implem
@Override
public SparkJobState getState() {
- return jobStateListener.getJobState(jobId);
+ // For a Spark job with empty source data, the job is not actually submitted, so we would
+ // never receive JobStart/JobEnd events in JobStateListener; use the JavaFutureAction to
+ // determine the current job state instead.
+ if (future.isDone()) {
+ return SparkJobState.SUCCEEDED;
+ } else {
+ return jobStateListener.getJobState(jobId);
+ }
}
@Override
@@ -135,6 +146,10 @@ public class SimpleSparkJobStatus implem
// add spark job metrics.
String jobIdentifier = "Spark Job[" + jobId + "] Metrics";
Map<String, List<TaskMetrics>> jobMetric = jobStateListener.getJobMetric(jobId);
+ if (jobMetric == null) {
+ return null;
+ }
+
Map<String, Long> flatJobMetric = combineJobLevelMetrics(jobMetric);
for (Map.Entry<String, Long> entry : flatJobMetric.entrySet()) {
sparkStatisticsBuilder.add(jobIdentifier, entry.getKey(), Long.toString(entry.getValue()));
@@ -170,31 +185,35 @@ public class SimpleSparkJobStatus implem
boolean shuffleWriteMetricExist = false;
for (List<TaskMetrics> stageMetric : jobMetric.values()) {
- for (TaskMetrics taskMetrics : stageMetric) {
- executorDeserializeTime += taskMetrics.executorDeserializeTime();
- executorRunTime += taskMetrics.executorRunTime();
- resultSize += taskMetrics.resultSize();
- jvmGCTime += taskMetrics.jvmGCTime();
- resultSerializationTime += taskMetrics.resultSerializationTime();
- memoryBytesSpilled += taskMetrics.memoryBytesSpilled();
- diskBytesSpilled += taskMetrics.diskBytesSpilled();
- if (!taskMetrics.inputMetrics().isEmpty()) {
- inputMetricExist = true;
- bytesRead += taskMetrics.inputMetrics().get().bytesRead();
- }
- Option<ShuffleReadMetrics> shuffleReadMetricsOption = taskMetrics.shuffleReadMetrics();
- if (!shuffleReadMetricsOption.isEmpty()) {
- shuffleReadMetricExist = true;
- remoteBlocksFetched += shuffleReadMetricsOption.get().remoteBlocksFetched();
- localBlocksFetched += shuffleReadMetricsOption.get().localBlocksFetched();
- fetchWaitTime += shuffleReadMetricsOption.get().fetchWaitTime();
- remoteBytesRead += shuffleReadMetricsOption.get().remoteBytesRead();
- }
- Option<ShuffleWriteMetrics> shuffleWriteMetricsOption = taskMetrics.shuffleWriteMetrics();
- if (!shuffleWriteMetricsOption.isEmpty()) {
- shuffleWriteMetricExist = true;
- shuffleBytesWritten += shuffleWriteMetricsOption.get().shuffleBytesWritten();
- shuffleWriteTime += shuffleWriteMetricsOption.get().shuffleWriteTime();
+ if (stageMetric != null) {
+ for (TaskMetrics taskMetrics : stageMetric) {
+ if (taskMetrics != null) {
+ executorDeserializeTime += taskMetrics.executorDeserializeTime();
+ executorRunTime += taskMetrics.executorRunTime();
+ resultSize += taskMetrics.resultSize();
+ jvmGCTime += taskMetrics.jvmGCTime();
+ resultSerializationTime += taskMetrics.resultSerializationTime();
+ memoryBytesSpilled += taskMetrics.memoryBytesSpilled();
+ diskBytesSpilled += taskMetrics.diskBytesSpilled();
+ if (!taskMetrics.inputMetrics().isEmpty()) {
+ inputMetricExist = true;
+ bytesRead += taskMetrics.inputMetrics().get().bytesRead();
+ }
+ Option<ShuffleReadMetrics> shuffleReadMetricsOption = taskMetrics.shuffleReadMetrics();
+ if (!shuffleReadMetricsOption.isEmpty()) {
+ shuffleReadMetricExist = true;
+ remoteBlocksFetched += shuffleReadMetricsOption.get().remoteBlocksFetched();
+ localBlocksFetched += shuffleReadMetricsOption.get().localBlocksFetched();
+ fetchWaitTime += shuffleReadMetricsOption.get().fetchWaitTime();
+ remoteBytesRead += shuffleReadMetricsOption.get().remoteBytesRead();
+ }
+ Option<ShuffleWriteMetrics> shuffleWriteMetricsOption = taskMetrics.shuffleWriteMetrics();
+ if (!shuffleWriteMetricsOption.isEmpty()) {
+ shuffleWriteMetricExist = true;
+ shuffleBytesWritten += shuffleWriteMetricsOption.get().shuffleBytesWritten();
+ shuffleWriteTime += shuffleWriteMetricsOption.get().shuffleWriteTime();
+ }
+ }
}
}
}