You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by yc...@apache.org on 2018/02/13 15:25:11 UTC
hive git commit: HIVE-18671: Lock not released after Hive on Spark query was cancelled (Yongzhi Chen, reviewed by Aihua Xu)
Repository: hive
Updated Branches:
refs/heads/master 1d15990ad -> 9a02aa86b
HIVE-18671: Lock not released after Hive on Spark query was cancelled (Yongzhi Chen, reviewed by Aihua Xu)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9a02aa86
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9a02aa86
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9a02aa86
Branch: refs/heads/master
Commit: 9a02aa86b9fe4b68681ba1c7129d5028f24791c9
Parents: 1d15990
Author: Yongzhi Chen <yc...@apache.org>
Authored: Tue Feb 13 10:03:53 2018 -0500
Committer: Yongzhi Chen <yc...@apache.org>
Committed: Tue Feb 13 10:24:34 2018 -0500
----------------------------------------------------------------------
.../ql/exec/spark/status/RemoteSparkJobMonitor.java | 6 ++++++
.../hadoop/hive/ql/exec/spark/TestSparkTask.java | 16 ++++++++++++++++
2 files changed, 22 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/9a02aa86/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
index 22f7024..fc4e4de 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
@@ -174,6 +174,12 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor {
done = true;
rc = 3;
break;
+ case CANCELLED:
+ console.printInfo("Status: Cancelled");
+ running = false;
+ done = true;
+ rc = 3;
+ break;
}
if (!done) {
http://git-wip-us.apache.org/repos/asf/hive/blob/9a02aa86/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java
index 928ecc0..435c6b6 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.spark;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
@@ -27,10 +28,14 @@ import java.util.List;
import org.apache.hadoop.hive.common.metrics.common.Metrics;
import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.spark.status.RemoteSparkJobMonitor;
+import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobStatus;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hive.spark.client.JobHandle.State;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
@@ -81,6 +86,17 @@ public class TestSparkTask {
Assert.assertEquals(child1.getParentTasks().size(), 0);
}
+ @Test
+ public void testRemoteSparkCancel() {
+ RemoteSparkJobStatus jobSts = Mockito.mock(RemoteSparkJobStatus.class);
+ when(jobSts.getRemoteJobState()).thenReturn(State.CANCELLED);
+ when(jobSts.isRemoteActive()).thenReturn(true);
+ HiveConf hiveConf = new HiveConf();
+ RemoteSparkJobMonitor remoteSparkJobMonitor = new RemoteSparkJobMonitor(hiveConf, jobSts);
+ Assert.assertEquals(remoteSparkJobMonitor.startMonitor(), 3);
+ }
+
+
private boolean isEmptySparkWork(SparkWork sparkWork) {
List<BaseWork> allWorks = sparkWork.getAllWork();
boolean allWorksIsEmtpy = true;