You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this plain-text rendering.
Posted to commits@hive.apache.org by st...@apache.org on 2018/04/16 02:49:46 UTC
hive git commit: HIVE-18831: Differentiate errors that are thrown by Spark tasks (Sahil Takiar reviewed by Rui Li)
Repository: hive
Updated Branches:
refs/heads/master 204d715e3 -> 2f0b41bc6
HIVE-18831: Differentiate errors that are thrown by Spark tasks (Sahil Takiar reviewed by Rui Li)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2f0b41bc
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2f0b41bc
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2f0b41bc
Branch: refs/heads/master
Commit: 2f0b41bc6863333aa89876f70819cb2113c57091
Parents: 204d715
Author: Sahil Takiar <ta...@gmail.com>
Authored: Sun Apr 15 19:48:11 2018 -0700
Committer: Sahil Takiar <st...@cloudera.com>
Committed: Sun Apr 15 19:48:11 2018 -0700
----------------------------------------------------------------------
.../test/resources/testconfiguration.properties | 8 +-
.../hadoop/hive/cli/control/CliConfigs.java | 4 +-
.../org/apache/hadoop/hive/ql/ErrorMsg.java | 4 +-
.../org/apache/hadoop/hive/ql/exec/Task.java | 2 +-
.../hadoop/hive/ql/exec/spark/SparkTask.java | 84 +++++++---
.../exec/spark/status/LocalSparkJobMonitor.java | 2 +-
.../spark/status/RemoteSparkJobMonitor.java | 28 +---
.../ql/exec/spark/status/SparkJobStatus.java | 6 +-
.../spark/status/impl/LocalSparkJobStatus.java | 22 +--
.../spark/status/impl/RemoteSparkJobStatus.java | 14 +-
.../hive/ql/exec/spark/TestSparkTask.java | 125 ++++++++++++++-
.../queries/clientnegative/spark_task_failure.q | 9 ++
.../spark/spark_task_failure.q.out | 53 +++++++
.../apache/hive/spark/client/BaseProtocol.java | 18 ++-
.../hive/spark/client/JobResultSerializer.java | 113 +++++++++++++
.../hive/spark/client/SparkClientImpl.java | 4 +-
.../hive/spark/client/rpc/KryoMessageCodec.java | 6 +
.../spark/client/TestJobResultSerializer.java | 157 +++++++++++++++++++
.../hive/spark/client/TestSparkClient.java | 61 ++++++-
19 files changed, 630 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/2f0b41bc/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 2db98c9..f99d6a1 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1683,9 +1683,11 @@ spark.query.negative.files=groupby2_map_skew_multi_distinct.q,\
groupby2_multi_distinct.q,\
groupby3_map_skew_multi_distinct.q,\
groupby3_multi_distinct.q,\
- groupby_grouping_sets7.q,\
- spark_job_max_tasks.q,\
- spark_stage_max_tasks.q
+ groupby_grouping_sets7.q
+
+spark.only.query.negative.files=spark_job_max_tasks.q,\
+ spark_stage_max_tasks.q,\
+ spark_task_failure.q
spark.perf.disabled.query.files=query14.q,\
query64.q
http://git-wip-us.apache.org/repos/asf/hive/blob/2f0b41bc/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java b/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
index fa4de0f..8ee7fb9 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
@@ -358,9 +358,8 @@ public class CliConfigs {
setQueryDir("ql/src/test/queries/clientnegative");
excludesFrom(testConfigProps, "minimr.query.negative.files");
+ excludesFrom(testConfigProps, "spark.only.query.negative.files");
excludeQuery("authorization_uri_import.q");
- excludeQuery("spark_job_max_tasks.q");
- excludeQuery("spark_stage_max_tasks.q");
setResultsDir("ql/src/test/results/clientnegative");
setLogDir("itests/qtest/target/qfile-results/clientnegative");
@@ -595,6 +594,7 @@ public class CliConfigs {
setQueryDir("ql/src/test/queries/clientnegative");
includesFrom(testConfigProps, "spark.query.negative.files");
+ includesFrom(testConfigProps, "spark.only.query.negative.files");
setResultsDir("ql/src/test/results/clientnegative/spark");
setLogDir("itests/qtest-spark/target/qfile-results/clientnegative/spark");
http://git-wip-us.apache.org/repos/asf/hive/blob/2f0b41bc/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index f3e40eb..fde16f8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -584,8 +584,8 @@ public enum ErrorMsg {
//========================== 40000 range starts here ========================//
- SPARK_JOB_RUNTIME_ERROR(40001,
- "Spark job failed during runtime. Please check stacktrace for the root cause.")
+ SPARK_JOB_RUNTIME_ERROR(40001, "Spark job failed due to: {0}", true),
+ SPARK_TASK_RUNTIME_ERROR(40002, "Spark job failed due to task failures: {0}", true)
;
private int errorCode;
http://git-wip-us.apache.org/repos/asf/hive/blob/2f0b41bc/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
index 1e8857b..f329570 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
@@ -603,7 +603,7 @@ public abstract class Task<T extends Serializable> implements Serializable, Node
public void shutdown() {
}
- Throwable getException() {
+ public Throwable getException() {
return exception;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/2f0b41bc/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index 3083e30..bfa2da6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -26,15 +26,19 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.regex.Pattern;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
+
import com.google.common.base.Throwables;
import org.apache.hadoop.hive.common.metrics.common.Metrics;
import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsNames;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.DriverContext;
@@ -72,7 +76,6 @@ import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.hadoop.util.StringUtils;
import com.google.common.collect.Lists;
-import org.apache.spark.SparkException;
public class SparkTask extends Task<SparkWork> {
private static final String CLASS_NAME = SparkTask.class.getName();
@@ -153,7 +156,8 @@ public class SparkTask extends Task<SparkWork> {
// Get the final state of the Spark job and parses its job info
SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus();
- getSparkJobInfo(sparkJobStatus, rc);
+ getSparkJobInfo(sparkJobStatus);
+ setSparkException(sparkJobStatus, rc);
if (rc == 0) {
sparkStatistics = sparkJobStatus.getSparkStatistics();
@@ -457,7 +461,7 @@ public class SparkTask extends Task<SparkWork> {
return counters;
}
- private void getSparkJobInfo(SparkJobStatus sparkJobStatus, int rc) {
+ private void getSparkJobInfo(SparkJobStatus sparkJobStatus) {
try {
stageIds = new ArrayList<Integer>();
int[] ids = sparkJobStatus.getStageIds();
@@ -482,36 +486,68 @@ public class SparkTask extends Task<SparkWork> {
succeededTaskCount = sumComplete;
totalTaskCount = sumTotal;
failedTaskCount = sumFailed;
- if (rc != 0) {
- Throwable error = sparkJobStatus.getError();
- if (error != null) {
- if ((error instanceof InterruptedException) ||
- (error instanceof HiveException &&
- error.getCause() instanceof InterruptedException)) {
- LOG.info("Killing Spark job since query was interrupted");
- killJob();
- }
- HiveException he;
- if (isOOMError(error)) {
- he = new HiveException(error, ErrorMsg.SPARK_RUNTIME_OOM);
- } else {
- he = new HiveException(error, ErrorMsg.SPARK_JOB_RUNTIME_ERROR);
- }
- setException(he);
- }
- }
} catch (Exception e) {
LOG.error("Failed to get Spark job information", e);
}
}
+ @VisibleForTesting
+ void setSparkException(SparkJobStatus sparkJobStatus, int rc) {
+ if (rc != 0) {
+
+ // Set the Spark Job Exception
+ Throwable sparkJobException = sparkJobStatus.getSparkJobException();
+ if (sparkJobException != null) {
+ HiveException he;
+ if (isOOMError(sparkJobException)) {
+ he = new HiveException(sparkJobException, ErrorMsg.SPARK_RUNTIME_OOM);
+ } else if (isTaskFailure(sparkJobException)) {
+ he = new HiveException(sparkJobException, ErrorMsg.SPARK_TASK_RUNTIME_ERROR,
+ Throwables.getRootCause(sparkJobException).getMessage());
+ } else {
+ he = new HiveException(sparkJobException, ErrorMsg.SPARK_JOB_RUNTIME_ERROR,
+ Throwables.getRootCause(sparkJobException).getMessage());
+ }
+ setException(he);
+ }
+
+ // Set the Monitor Error
+ Throwable monitorError = sparkJobStatus.getMonitorError();
+ if (monitorError != null) {
+ if ((monitorError instanceof InterruptedException) ||
+ (monitorError instanceof HiveException &&
+ monitorError.getCause() instanceof InterruptedException)) {
+ LOG.info("Killing Spark job since query was interrupted");
+ killJob();
+ }
+
+ // Prefer to propagate errors from the Spark job rather than the monitor, as errors from
+ // the Spark job are likely to be more relevant
+ if (getException() == null) {
+ setException(monitorError);
+ }
+ }
+ }
+ }
+
+ /**
+ * Returns true if any throwable in the cause chain of {@code error} has a message matching
+ * the Spark task-failure pattern "Task.*in stage.*failed.*times". Throwables whose message is
+ * null are skipped, because Pattern.matcher(null) would throw a NullPointerException and mask
+ * the original job failure.
+ */
+ private boolean isTaskFailure(Throwable error) {
+ Pattern taskFailedPattern = Pattern.compile("Task.*in stage.*failed.*times");
+ while (error != null) {
+ String message = error.getMessage();
+ if (message != null && taskFailedPattern.matcher(message).find()) {
+ return true;
+ }
+ error = error.getCause();
+ }
+ return false;
+ }
+
private boolean isOOMError(Throwable error) {
while (error != null) {
if (error instanceof OutOfMemoryError) {
return true;
- } else if (error instanceof SparkException) {
- String sts = Throwables.getStackTraceAsString(error);
- return sts.contains("Container killed by YARN for exceeding memory limits");
+ } else if (error.getMessage().contains("Container killed by YARN for exceeding memory " +
+ "limits")) {
+ return true;
}
error = error.getCause();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/2f0b41bc/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
index 0525315..4ce9f53 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
@@ -128,7 +128,7 @@ public class LocalSparkJobMonitor extends SparkJobMonitor {
console.printError(msg, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
rc = 1;
done = true;
- sparkJobStatus.setError(e);
+ sparkJobStatus.setMonitorError(e);
} finally {
if (done) {
break;
http://git-wip-us.apache.org/repos/asf/hive/blob/2f0b41bc/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
index a132f74..98c228b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
@@ -77,7 +77,7 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor {
HiveException he = new HiveException(ErrorMsg.SPARK_JOB_MONITOR_TIMEOUT,
Long.toString(timeCount));
console.printError(he.getMessage());
- sparkJobStatus.setError(he);
+ sparkJobStatus.setMonitorError(he);
running = false;
done = true;
rc = 2;
@@ -147,29 +147,7 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor {
done = true;
break;
case FAILED:
- String detail = sparkJobStatus.getError().getMessage();
- StringBuilder errBuilder = new StringBuilder();
- errBuilder.append("Job failed with ");
- if (detail == null) {
- errBuilder.append("UNKNOWN reason");
- } else {
- // We SerDe the Throwable as String, parse it for the root cause
- final String CAUSE_CAPTION = "Caused by: ";
- int index = detail.lastIndexOf(CAUSE_CAPTION);
- if (index != -1) {
- String rootCause = detail.substring(index + CAUSE_CAPTION.length());
- index = rootCause.indexOf(System.getProperty("line.separator"));
- if (index != -1) {
- errBuilder.append(rootCause.substring(0, index));
- } else {
- errBuilder.append(rootCause);
- }
- } else {
- errBuilder.append(detail);
- }
- detail = System.getProperty("line.separator") + detail;
- }
- console.printError(errBuilder.toString(), detail);
+ LOG.error("Spark job[" + sparkJobStatus.getJobId() + "] failed", sparkJobStatus.getSparkJobException());
running = false;
done = true;
rc = 3;
@@ -202,7 +180,7 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor {
}
rc = 1;
done = true;
- sparkJobStatus.setError(finalException);
+ sparkJobStatus.setMonitorError(finalException);
} finally {
if (done) {
break;
http://git-wip-us.apache.org/repos/asf/hive/blob/2f0b41bc/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
index 8474afc..1e584f4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
@@ -47,7 +47,9 @@ public interface SparkJobStatus {
void cleanup();
- Throwable getError();
+ Throwable getMonitorError();
- void setError(Throwable e);
+ void setMonitorError(Throwable e);
+
+ Throwable getSparkJobException();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/2f0b41bc/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
index 03f8a0b..4368eb0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
@@ -22,6 +22,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import com.google.common.base.Throwables;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -181,10 +183,17 @@ public class LocalSparkJobStatus implements SparkJobStatus {
}
@Override
- public Throwable getError() {
- if (error != null) {
- return error;
- }
+ public Throwable getMonitorError() {
+ return error;
+ }
+
+ @Override
+ public void setMonitorError(Throwable e) {
+ this.error = e;
+ }
+
+ @Override
+ public Throwable getSparkJobException() {
if (future.isDone()) {
try {
future.get();
@@ -195,11 +204,6 @@ public class LocalSparkJobStatus implements SparkJobStatus {
return null;
}
- @Override
- public void setError(Throwable e) {
- this.error = e;
- }
-
private SparkJobInfo getJobInfo() {
return sparkContext.statusTracker().getJobInfo(jobId);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/2f0b41bc/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
index ff969e0..e4a53fb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
@@ -165,18 +165,20 @@ public class RemoteSparkJobStatus implements SparkJobStatus {
}
@Override
- public Throwable getError() {
- if (error != null) {
- return error;
- }
- return jobHandle.getError();
+ public Throwable getMonitorError() {
+ return error;
}
@Override
- public void setError(Throwable e) {
+ public void setMonitorError(Throwable e) {
this.error = e;
}
+ @Override
+ public Throwable getSparkJobException() {
+ return jobHandle.getError();
+ }
+
/**
* Indicates whether the remote context is active. SparkJobMonitor can use this to decide whether
* to stop monitoring.
http://git-wip-us.apache.org/repos/asf/hive/blob/2f0b41bc/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java
index 75b4151..368fa9f 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java
@@ -17,36 +17,48 @@
*/
package org.apache.hadoop.hive.ql.exec.spark;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ExecutionException;
import org.apache.hadoop.hive.common.metrics.common.Metrics;
import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
import org.apache.hadoop.hive.ql.exec.spark.status.RemoteSparkJobMonitor;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobStatus;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hive.spark.client.JobHandle.State;
+
+import org.apache.spark.SparkException;
+
import org.junit.Assert;
import org.junit.Test;
-import org.mockito.Mockito;
public class TestSparkTask {
@Test
public void sparkTask_updates_Metrics() throws IOException {
- Metrics mockMetrics = Mockito.mock(Metrics.class);
+ Metrics mockMetrics = mock(Metrics.class);
SparkTask sparkTask = new SparkTask();
sparkTask.updateTaskMetrics(mockMetrics);
@@ -89,7 +101,7 @@ public class TestSparkTask {
@Test
public void testRemoteSparkCancel() {
- RemoteSparkJobStatus jobSts = Mockito.mock(RemoteSparkJobStatus.class);
+ RemoteSparkJobStatus jobSts = mock(RemoteSparkJobStatus.class);
when(jobSts.getRemoteJobState()).thenReturn(State.CANCELLED);
when(jobSts.isRemoteActive()).thenReturn(true);
HiveConf hiveConf = new HiveConf();
@@ -111,6 +123,109 @@ public class TestSparkTask {
Assert.assertTrue(statsString.contains("1"));
}
+ @Test
+ public void testSetSparkExceptionWithJobError() {
+ SparkTask sparkTask = new SparkTask();
+ SparkJobStatus mockSparkJobStatus = mock(SparkJobStatus.class);
+
+ ExecutionException ee = new ExecutionException("Exception thrown by job",
+ new SparkException("Job aborted due to stage failure: Not a task or OOM error"));
+
+ when(mockSparkJobStatus.getSparkJobException()).thenReturn(ee);
+
+ sparkTask.setSparkException(mockSparkJobStatus, 3);
+
+ Assert.assertTrue(sparkTask.getException() instanceof HiveException);
+ Assert.assertEquals(((HiveException) sparkTask.getException()).getCanonicalErrorMsg(),
+ ErrorMsg.SPARK_JOB_RUNTIME_ERROR);
+ Assert.assertTrue(sparkTask.getException().getMessage().contains("Not a task or OOM error"));
+ }
+
+ @Test
+ public void testSetSparkExceptionWithTimeoutError() {
+ SparkTask sparkTask = new SparkTask();
+ SparkJobStatus mockSparkJobStatus = mock(SparkJobStatus.class);
+ when(mockSparkJobStatus.getMonitorError()).thenReturn(new HiveException(ErrorMsg
+ .SPARK_JOB_MONITOR_TIMEOUT, Long.toString(60)));
+
+ sparkTask.setSparkException(mockSparkJobStatus, 3);
+
+ Assert.assertTrue(sparkTask.getException() instanceof HiveException);
+ Assert.assertEquals(((HiveException) sparkTask.getException()).getCanonicalErrorMsg(),
+ ErrorMsg.SPARK_JOB_MONITOR_TIMEOUT);
+ Assert.assertTrue(sparkTask.getException().getMessage().contains("60s"));
+ }
+
+ @Test
+ public void testSetSparkExceptionWithOOMError() {
+ SparkTask sparkTask = new SparkTask();
+ SparkJobStatus mockSparkJobStatus = mock(SparkJobStatus.class);
+
+ ExecutionException jobError = new ExecutionException(
+ new SparkException("Container killed by YARN for exceeding memory limits"));
+ when(mockSparkJobStatus.getSparkJobException()).thenReturn(jobError);
+
+ sparkTask.setSparkException(mockSparkJobStatus, 3);
+
+ Assert.assertTrue(sparkTask.getException() instanceof HiveException);
+ Assert.assertEquals(((HiveException) sparkTask.getException()).getCanonicalErrorMsg(),
+ ErrorMsg.SPARK_RUNTIME_OOM);
+ }
+
+ @Test
+ public void testSparkExceptionAndMonitorError() {
+ SparkTask sparkTask = new SparkTask();
+ SparkJobStatus mockSparkJobStatus = mock(SparkJobStatus.class);
+ when(mockSparkJobStatus.getMonitorError()).thenReturn(new RuntimeException());
+ when(mockSparkJobStatus.getSparkJobException()).thenReturn(
+ new ExecutionException(new SparkException("")));
+
+ sparkTask.setSparkException(mockSparkJobStatus, 3);
+
+ Assert.assertTrue(sparkTask.getException() instanceof HiveException);
+ Assert.assertEquals(((HiveException) sparkTask.getException()).getCanonicalErrorMsg(),
+ ErrorMsg.SPARK_JOB_RUNTIME_ERROR);
+ }
+
+ @Test
+ public void testHandleInterruptedException() throws Exception {
+ HiveConf hiveConf = new HiveConf();
+
+ SparkTask sparkTask = new SparkTask();
+ sparkTask.setWork(mock(SparkWork.class));
+
+ DriverContext mockDriverContext = mock(DriverContext.class);
+
+ QueryState mockQueryState = mock(QueryState.class);
+ when(mockQueryState.getConf()).thenReturn(hiveConf);
+
+ sparkTask.initialize(mockQueryState, null, mockDriverContext, null);
+
+ SparkJobStatus mockSparkJobStatus = mock(SparkJobStatus.class);
+ when(mockSparkJobStatus.getMonitorError()).thenReturn(new InterruptedException());
+
+ SparkSession mockSparkSession = mock(SparkSession.class);
+ SparkJobRef mockSparkJobRef = mock(SparkJobRef.class);
+
+ when(mockSparkJobRef.monitorJob()).thenReturn(2);
+ when(mockSparkJobRef.getSparkJobStatus()).thenReturn(mockSparkJobStatus);
+ when(mockSparkSession.submit(any(), any())).thenReturn(mockSparkJobRef);
+
+ SessionState.start(hiveConf);
+ SessionState.get().setSparkSession(mockSparkSession);
+
+ sparkTask.execute(mockDriverContext);
+
+ verify(mockSparkJobRef, atLeastOnce()).cancelJob();
+
+ when(mockSparkJobStatus.getMonitorError()).thenReturn(
+ new HiveException(new InterruptedException()));
+
+ sparkTask.execute(mockDriverContext);
+
+ verify(mockSparkJobRef, atLeastOnce()).cancelJob();
+ }
+
private boolean isEmptySparkWork(SparkWork sparkWork) {
List<BaseWork> allWorks = sparkWork.getAllWork();
boolean allWorksIsEmtpy = true;
http://git-wip-us.apache.org/repos/asf/hive/blob/2f0b41bc/ql/src/test/queries/clientnegative/spark_task_failure.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientnegative/spark_task_failure.q b/ql/src/test/queries/clientnegative/spark_task_failure.q
new file mode 100644
index 0000000..7bb8c50
--- /dev/null
+++ b/ql/src/test/queries/clientnegative/spark_task_failure.q
@@ -0,0 +1,9 @@
+ADD FILE ../../data/scripts/error_script;
+
+EXPLAIN
+SELECT TRANSFORM(src.key, src.value) USING 'error_script' AS (tkey, tvalue)
+FROM src;
+
+SELECT TRANSFORM(src.key, src.value) USING 'error_script' AS (tkey, tvalue)
+FROM src;
+
http://git-wip-us.apache.org/repos/asf/hive/blob/2f0b41bc/ql/src/test/results/clientnegative/spark/spark_task_failure.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientnegative/spark/spark_task_failure.q.out b/ql/src/test/results/clientnegative/spark/spark_task_failure.q.out
new file mode 100644
index 0000000..1e68002
--- /dev/null
+++ b/ql/src/test/results/clientnegative/spark/spark_task_failure.q.out
@@ -0,0 +1,53 @@
+PREHOOK: query: EXPLAIN
+SELECT TRANSFORM(src.key, src.value) USING 'error_script' AS (tkey, tvalue)
+FROM src
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+SELECT TRANSFORM(src.key, src.value) USING 'error_script' AS (tkey, tvalue)
+FROM src
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-1
+ Spark
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: src
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key (type: string), value (type: string)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Transform Operator
+ command: error_script
+ output info:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ ListSink
+
+PREHOOK: query: SELECT TRANSFORM(src.key, src.value) USING 'error_script' AS (tkey, tvalue)
+FROM src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.spark.SparkTask. Spark job failed due to task failures: [Error 20003]: An error occurred when trying to close the Operator running your custom script.
http://git-wip-us.apache.org/repos/asf/hive/blob/2f0b41bc/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java b/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
index 4dbc490..6a988a4 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
@@ -23,9 +23,8 @@ import org.apache.hive.spark.client.metrics.Metrics;
import org.apache.hive.spark.client.rpc.RpcDispatcher;
import org.apache.hive.spark.counter.SparkCounters;
-import com.google.common.base.Throwables;
-abstract class BaseProtocol extends RpcDispatcher {
+public abstract class BaseProtocol extends RpcDispatcher {
protected static class CancelJob implements Serializable {
@@ -97,17 +96,17 @@ abstract class BaseProtocol extends RpcDispatcher {
}
- protected static class JobResult<T extends Serializable> implements Serializable {
+ public static class JobResult<T extends Serializable> implements Serializable {
final String id;
final T result;
- final String error;
+ final Throwable error;
final SparkCounters sparkCounters;
JobResult(String id, T result, Throwable error, SparkCounters sparkCounters) {
this.id = id;
this.result = result;
- this.error = error != null ? Throwables.getStackTraceAsString(error) : null;
+ this.error = error;
this.sparkCounters = sparkCounters;
}
@@ -115,6 +114,15 @@ abstract class BaseProtocol extends RpcDispatcher {
this(null, null, null, null);
}
+ @Override
+ public String toString() {
+ return "JobResult{" +
+ "id='" + id + '\'' +
+ ", result=" + result +
+ ", error=" + error +
+ ", sparkCounters=" + sparkCounters +
+ '}';
+ }
}
protected static class JobStarted implements Serializable {
http://git-wip-us.apache.org/repos/asf/hive/blob/2f0b41bc/spark-client/src/main/java/org/apache/hive/spark/client/JobResultSerializer.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/JobResultSerializer.java b/spark-client/src/main/java/org/apache/hive/spark/client/JobResultSerializer.java
new file mode 100644
index 0000000..97cbcf0
--- /dev/null
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/JobResultSerializer.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoException;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.Objects;
+import java.util.Set;
+
+
+/**
+ * A custom {@link Serializer} for serializing / deserializing {@link BaseProtocol.JobResult}
+ * objects. This class uses Java serialization to write / read the JobResult objects. This has
+ * the nice property that it is able to successfully serialize Java {@link Throwable}s. Whereas
+ * the Kryo serializer cannot (because certain objects in a Throwable don't have public zero-arg
+ * constructors.
+ *
+ * <p>
+ * Given that any developer can write a custom exception that contains non-serializable objects
+ * (e.g. objects that don't implement {@link java.io.Serializable}), this class needs to handle
+ * the case where the given Throwable cannot be serialized by Java. In this case, the
+ * serializer will recursively go through the {@link Throwable} and wrap all objects with a
+ * {@link RuntimeException} which is guaranteed to be serializable.
+ * </p>
+ */
+public class JobResultSerializer extends Serializer<BaseProtocol.JobResult<?>> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JobResultSerializer.class);
+
+ @Override
+ public BaseProtocol.JobResult<?> read(Kryo kryo, Input input, Class type) {
+ try {
+ return (BaseProtocol.JobResult<?>) new ObjectInputStream(input).readObject();
+ } catch (Exception e) {
+ throw new KryoException("Error during Java deserialization.", e);
+ }
+ }
+
+ @Override
+ public void write(Kryo kryo, Output output, BaseProtocol.JobResult<?> object) {
+ try {
+ safeWriteToOutput(output, object);
+ } catch (Exception e) {
+ LOG.warn("Unable to serialize JobResult object " + object, e);
+
+ BaseProtocol.JobResult<?> serializableJobResult = new BaseProtocol.JobResult<>(object.id,
+ object.result, convertToSerializableSparkException(object.error),
+ object.sparkCounters);
+ try {
+ safeWriteToOutput(output, serializableJobResult);
+ } catch (Exception ex) {
+ throw new KryoException("Error during Java serialization.", ex);
+ }
+ }
+ }
+
+ private void safeWriteToOutput(Output output,
+ BaseProtocol.JobResult<?> jobResult) throws IOException {
+ ByteArrayOutputStream boas = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(boas);
+
+ oos.writeObject(jobResult);
+ oos.flush();
+
+ output.write(boas.toByteArray());
+ output.flush();
+ }
+
+ @VisibleForTesting
+ static RuntimeException convertToSerializableSparkException(Throwable error) {
+ RuntimeException serializableThrowable = new RuntimeException(
+ error.getClass().getName() + ": " + Objects.toString(error.getMessage(), ""),
+ error.getCause() == null ? null : convertToSerializableSparkException(
+ error.getCause()));
+
+ serializableThrowable.setStackTrace(error.getStackTrace());
+
+ Arrays.stream(error.getSuppressed())
+ .map(JobResultSerializer::convertToSerializableSparkException)
+ .forEach(serializableThrowable::addSuppressed);
+
+ return serializableThrowable;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/2f0b41bc/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
index 665ed92..e4f72a3 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
@@ -38,7 +38,6 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
-import java.io.PrintStream;
import java.io.Serializable;
import java.io.Writer;
import java.net.URI;
@@ -64,7 +63,6 @@ import org.apache.hive.spark.client.rpc.Rpc;
import org.apache.hive.spark.client.rpc.RpcConfiguration;
import org.apache.hive.spark.client.rpc.RpcServer;
import org.apache.spark.SparkContext;
-import org.apache.spark.SparkException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -592,7 +590,7 @@ class SparkClientImpl implements SparkClient {
if (handle != null) {
LOG.info("Received result for {}", msg.id);
handle.setSparkCounters(msg.sparkCounters);
- Throwable error = msg.error != null ? new SparkException(msg.error) : null;
+ Throwable error = msg.error;
if (error == null) {
handle.setSuccess(msg.result);
} else {
http://git-wip-us.apache.org/repos/asf/hive/blob/2f0b41bc/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java
index 9e789cf..5454ec2 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java
@@ -23,7 +23,11 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
+import org.apache.hive.spark.client.BaseProtocol;
+import org.apache.hive.spark.client.JobResultSerializer;
+
import org.objenesis.strategy.StdInstantiatorStrategy;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,6 +35,7 @@ import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.ByteBufferInputStream;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
+
import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
@@ -60,6 +65,7 @@ class KryoMessageCodec extends ByteToMessageCodec<Object> {
kryo.register(klass, REG_ID_BASE + count);
count++;
}
+ kryo.register(BaseProtocol.JobResult.class, new JobResultSerializer(), count);
kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
return kryo;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/2f0b41bc/spark-client/src/test/java/org/apache/hive/spark/client/TestJobResultSerializer.java
----------------------------------------------------------------------
diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/TestJobResultSerializer.java b/spark-client/src/test/java/org/apache/hive/spark/client/TestJobResultSerializer.java
new file mode 100644
index 0000000..81e12c9
--- /dev/null
+++ b/spark-client/src/test/java/org/apache/hive/spark/client/TestJobResultSerializer.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+import com.google.common.base.Strings;
+import org.apache.hive.spark.counter.SparkCounters;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+
+
+public class TestJobResultSerializer {
+
+  @Test
+  public void testSerializableExceptionSingleBlankException() {
+    RuntimeException blankException = new RuntimeException();
+
+    RuntimeException serializableException =
+        JobResultSerializer.convertToSerializableSparkException(blankException);
+
+    assertException(serializableException, blankException);
+  }
+
+  @Test
+  public void testSerializableExceptionSingleException() {
+    RuntimeException exception = new RuntimeException("hello");
+
+    RuntimeException serializableException =
+        JobResultSerializer.convertToSerializableSparkException(exception);
+
+    assertException(serializableException, exception);
+  }
+
+  @Test
+  public void testSerializableExceptionNestedBlankException() {
+    RuntimeException nestedBlankException = new RuntimeException();
+    RuntimeException outerException = new RuntimeException(nestedBlankException);
+
+    RuntimeException serializableException =
+        JobResultSerializer.convertToSerializableSparkException(outerException);
+
+    assertNestedException(serializableException, outerException);
+  }
+
+  @Test
+  public void testSerializableExceptionNestedException() {
+    // Only the nested exception has a message.
+    assertNestedConversion(new RuntimeException(new RuntimeException("hello")));
+    // Only the outer exception has a message.
+    assertNestedConversion(new RuntimeException("hello", new RuntimeException()));
+    // Both exceptions have a message.
+    assertNestedConversion(new RuntimeException("hello", new RuntimeException("hello")));
+  }
+
+  @Test
+  public void testSerializableExceptionWithSuppressed() {
+    RuntimeException suppressed = new RuntimeException("suppressed");
+    RuntimeException exception = new RuntimeException("hello");
+    exception.addSuppressed(suppressed);
+
+    RuntimeException serializableException =
+        JobResultSerializer.convertToSerializableSparkException(exception);
+
+    assertException(serializableException, exception);
+    Assert.assertEquals(1, serializableException.getSuppressed().length);
+    assertException(serializableException.getSuppressed()[0], suppressed);
+  }
+
+  /** Converts {@code original} and checks that it and its direct cause were copied. */
+  private void assertNestedConversion(Throwable original) {
+    assertNestedException(
+        JobResultSerializer.convertToSerializableSparkException(original), original);
+  }
+
+  /**
+   * Asserts that {@code serializedException} carries the original class name and message
+   * (as {@code <class name>: <message or "">}) and the original stack trace.
+   */
+  private void assertException(Throwable serializedException, Throwable originalException) {
+    Assert.assertEquals(originalException.getClass().getName() + ": " + Strings.nullToEmpty(
+        originalException.getMessage()), serializedException.getMessage());
+    Assert.assertArrayEquals(originalException.getStackTrace(),
+        serializedException.getStackTrace());
+  }
+
+  /** Same checks as {@link #assertException}, applied to the exception and its direct cause. */
+  private void assertNestedException(Throwable serializedException, Throwable originalException) {
+    assertException(serializedException, originalException);
+    assertException(serializedException.getCause(), originalException.getCause());
+  }
+
+  @Test
+  public void testSerializeNonSerializableObject() {
+    Kryo kryo = new Kryo();
+    kryo.addDefaultSerializer(BaseProtocol.JobResult.class, new JobResultSerializer());
+
+    ByteArrayOutputStream boas = new ByteArrayOutputStream();
+    Output output = new Output(boas);
+
+    String id = "1";
+    String result = "result";
+    SparkCounters counters = new SparkCounters(null);
+
+    BaseProtocol.JobResult<String> jobResult = new BaseProtocol.JobResult<>(id, result,
+        new NonSerializableException("content"), counters);
+
+    kryo.writeClassAndObject(output, jobResult);
+    output.flush();
+
+    Input kryoIn = new Input(new ByteArrayInputStream(boas.toByteArray()));
+    Object deserializedObject = kryo.readClassAndObject(kryoIn);
+
+    Assert.assertTrue(deserializedObject instanceof BaseProtocol.JobResult);
+
+    // Safe: the object written above is a JobResult<String>.
+    @SuppressWarnings("unchecked")
+    BaseProtocol.JobResult<String> deserializedJobResult =
+        (BaseProtocol.JobResult<String>) deserializedObject;
+
+    Assert.assertEquals(id, deserializedJobResult.id);
+    Assert.assertEquals(result, deserializedJobResult.result);
+    Assert.assertEquals(counters.toString(), deserializedJobResult.sparkCounters.toString());
+    Assert.assertTrue(deserializedJobResult.error instanceof RuntimeException);
+    // The fallback conversion keeps the original exception's class name in the message.
+    Assert.assertTrue(deserializedJobResult.error.getMessage()
+        .startsWith(NonSerializableException.class.getName()));
+  }
+
+  /** Exception that Java serialization rejects because of its non-serializable field. */
+  private static final class NonSerializableException extends Exception {
+
+    private static final long serialVersionUID = 2548414562750016219L;
+
+    // Never read; present only so that Java serialization of this exception fails.
+    private final NonSerializableObject nonSerializableObject;
+
+    private NonSerializableException(String content) {
+      this.nonSerializableObject = new NonSerializableObject(content);
+    }
+  }
+
+  /** Plain holder that deliberately does not implement Serializable. */
+  private static final class NonSerializableObject {
+
+    private String content;
+
+    private NonSerializableObject(String content) {
+      this.content = content;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/2f0b41bc/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
----------------------------------------------------------------------
diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java b/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
index fb31c93..fdf882b 100644
--- a/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
+++ b/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
@@ -133,8 +133,36 @@ public class TestSparkClient {
handle.get(TIMEOUT, TimeUnit.SECONDS);
fail("Should have thrown an exception.");
} catch (ExecutionException ee) {
- assertTrue(ee.getCause() instanceof SparkException);
- assertTrue(ee.getCause().getMessage().contains("IllegalStateException: Hello"));
+ assertTrue(ee.getCause() instanceof IllegalStateException);
+ assertTrue(ee.getCause().getMessage().contains("Hello"));
+ }
+
+ // Try an invalid state transition on the handle. This ensures that the actual state
+ // change we're interested in actually happened, since internally the handle serializes
+ // state changes.
+ assertFalse(((JobHandleImpl<String>)handle).changeState(JobHandle.State.SENT));
+
+ verify(listener).onJobQueued(handle);
+ verify(listener).onJobStarted(handle);
+ verify(listener).onJobFailed(same(handle), any(Throwable.class));
+ }
+ });
+ }
+
+ @Test
+ public void testErrorJobNotSerializable() throws Exception {
+ runTest(new TestFunction() {
+ @Override
+ public void call(SparkClient client) throws Exception {
+ JobHandle.Listener<String> listener = newListener();
+ List<JobHandle.Listener<String>> listeners = Lists.newArrayList(listener);
+ JobHandle<String> handle = client.submit(new ErrorJobNotSerializable(), listeners);
+ try {
+ handle.get(TIMEOUT, TimeUnit.SECONDS);
+ fail("Should have thrown an exception.");
+ } catch (ExecutionException ee) {
+ assertTrue(ee.getCause() instanceof RuntimeException);
+ assertTrue(ee.getCause().getMessage().contains("Hello"));
}
// Try an invalid state transition on the handle. This ensures that the actual state
@@ -331,6 +359,35 @@ public class TestSparkClient {
}
+  /**
+   * A job that always fails with an exception Java serialization cannot handle, because the
+   * exception references an object that does not implement Serializable.
+   */
+  private static class ErrorJobNotSerializable implements Job<String> {
+
+    @Override
+    public String call(JobContext jc) throws NonSerializableException {
+      throw new NonSerializableException("Hello");
+    }
+
+    /** Plain holder that deliberately does not implement Serializable. */
+    private static final class NonSerializableObject {
+
+      String content;
+
+      private NonSerializableObject(String text) {
+        content = text;
+      }
+    }
+
+    /** Exception whose message is always "Hello" and which carries a non-serializable field. */
+    private static final class NonSerializableException extends Exception {
+
+      private static final long serialVersionUID = 2548414562750016219L;
+
+      // Never read; present only so that Java serialization of this exception fails.
+      private final NonSerializableObject nonSerializableObject;
+
+      private NonSerializableException(String content) {
+        super("Hello");
+        nonSerializableObject = new NonSerializableObject(content);
+      }
+    }
+  }
+
private static class SparkJob implements Job<Long> {
@Override