You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by da...@apache.org on 2017/09/05 17:50:45 UTC
kudu git commit: feat: add task interruption checking for RowIterator
Repository: kudu
Updated Branches:
refs/heads/master d81153867 -> ba3cceea6
feat: add task interruption checking for RowIterator
Once we submit a Spark job to a Spark cluster, we cannot cancel or
stop the kudu-spark job immediately. With task interruption checking
in place, we can kill the job immediately by clicking the "kill" link
on the Spark Web UI.
Change-Id: I0b4284f2c0a40cd7ba8cf2b76e0403592552c814
Reviewed-on: http://gerrit.cloudera.org:8080/7753
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <da...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/ba3cceea
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/ba3cceea
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/ba3cceea
Branch: refs/heads/master
Commit: ba3cceea699a8a1416a58e7ae705944c0ad87ea8
Parents: d811538
Author: caiconghui <ca...@xiaomi.com>
Authored: Mon Aug 21 10:05:36 2017 +0800
Committer: Dan Burkert <da...@apache.org>
Committed: Tue Sep 5 17:49:58 2017 +0000
----------------------------------------------------------------------
.../src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala | 3 +++
1 file changed, 3 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/ba3cceea/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
index 68c98b6..c1b45eb 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
@@ -87,6 +87,9 @@ private class RowIterator(private val scanner: KuduScanner) extends Iterator[Row
override def hasNext: Boolean = {
while ((currentIterator != null && !currentIterator.hasNext && scanner.hasMoreRows) ||
(scanner.hasMoreRows && currentIterator == null)) {
+ if (TaskContext.get().isInterrupted()) {
+ throw new RuntimeException("Kudu task interrupted")
+ }
currentIterator = scanner.nextRows()
}
currentIterator.hasNext