You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2023/02/07 07:09:40 UTC
[kylin] 11/15: KYLIN-5409 avoid permission denied job retry
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit b34babb851439bca4a068aed21d2ba5d5f40914f
Author: huangsheng <hu...@163.com>
AuthorDate: Thu Nov 24 15:54:40 2022 +0800
KYLIN-5409 avoid permission denied job retry
---
.../engine/spark/application/SparkApplication.java | 17 ++++
.../apache/spark/application/TestJobWorker.scala | 110 +++++++++++++++++++++
2 files changed, 127 insertions(+)
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
index 42117ac398..5bc553d353 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
@@ -60,6 +60,7 @@ import org.apache.kylin.metadata.model.PartitionDesc;
import org.apache.kylin.query.pushdown.SparkSubmitter;
import org.apache.kylin.query.util.PushDownUtil;
import org.apache.spark.SparkConf;
+import org.apache.spark.SparkException;
import org.apache.spark.application.NoRetryException;
import org.apache.spark.launcher.SparkLauncher;
import org.apache.spark.sql.KylinSession;
@@ -323,16 +324,32 @@ public abstract class SparkApplication implements Application {
executeFinish();
}
}
+
protected void handleException(Exception e) throws Exception {
if (e instanceof AccessControlException) {
interceptAccessControlException(e);
}
if (e instanceof RuntimeException && e.getCause() instanceof AccessControlException) {
interceptAccessControlException(e.getCause());
+ } else if (e instanceof RuntimeException && e.getCause() instanceof SparkException) {
+ Throwable rootCause = extractRealRootCauseFromSparkException(e); // peel nested SparkException layers
+ if (rootCause instanceof AccessControlException) {
+ interceptAccessControlException(rootCause); // pass the unwrapped cause, like the branch above
+ }
}
throw e;
}
+ // Extracts the real root cause of a failed Spark job by unwrapping nested SparkException layers.
+ // Used to intercept jobs that failed on a permission error so they are not retried pointlessly.
+ protected Throwable extractRealRootCauseFromSparkException(Exception e) {
+ return unwrapSparkExceptions(e.getCause());
+ }
+ // Returns the first throwable in the cause chain that is not a SparkException (null if the chain ends first).
+ private static Throwable unwrapSparkExceptions(Throwable t) {
+ return t instanceof SparkException ? unwrapSparkExceptions(t.getCause()) : t;
+ }
+
// Permission exception will not be retried. Simply let the job fail.
protected void interceptAccessControlException(Throwable e) throws NoRetryException{
logger.error("Permission denied.", e);
diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobWorker.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobWorker.scala
index c1cfa95387..06284a83e3 100644
--- a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobWorker.scala
+++ b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobWorker.scala
@@ -24,6 +24,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import org.apache.kylin.engine.spark.application.SparkApplication
import org.apache.kylin.engine.spark.scheduler._
+import org.apache.spark.SparkException
import org.apache.spark.scheduler.KylinJobEventLoop
import org.apache.spark.sql.common.SparderBaseFunSuite
import org.scalatest.BeforeAndAfter
@@ -222,6 +223,54 @@ class TestJobWorker extends SparderBaseFunSuite with BeforeAndAfter {
eventLoop.stop()
}
+ test("post Permission denied event when PermissionDenied occurred with Spark Exception wraped") {
+ val eventLoop = new KylinJobEventLoop
+ eventLoop.start()
+ val worker = new JobWorker(new HandlePermissionDeniedJobWithSparkExceptionWraped(), Array.empty, eventLoop)
+ val latch = new CountDownLatch(2)
+ val receivePermissionDenied = new AtomicBoolean(false)
+ val listener = new KylinJobListener {
+ override def onReceive(event: KylinJobEvent): Unit = {
+ if (event.isInstanceOf[UnknownThrowable]) receivePermissionDenied.set(true)
+ latch.countDown()
+ }
+ }
+ eventLoop.registerListener(listener)
+ try { // teardown below must run even when an assertion fails, so threads do not leak
+ eventLoop.post(RunJob())
+ assert(latch.await(30, TimeUnit.SECONDS), "timed out waiting for RunJob and UnknownThrowable events")
+ assert(receivePermissionDenied.get())
+ } finally {
+ eventLoop.unregisterListener(listener)
+ worker.stop()
+ eventLoop.stop()
+ }
+ }
+
+ test("post Permission denied event when PermissionDenied occurred with multiple Spark Exception wraped") {
+ val eventLoop = new KylinJobEventLoop
+ eventLoop.start()
+ val worker = new JobWorker(new HandlePermissionDeniedJobWithMultipleSparkExceptionWraped(), Array.empty, eventLoop)
+ val latch = new CountDownLatch(2)
+ val receivePermissionDenied = new AtomicBoolean(false)
+ val listener = new KylinJobListener {
+ override def onReceive(event: KylinJobEvent): Unit = {
+ if (event.isInstanceOf[UnknownThrowable]) receivePermissionDenied.set(true)
+ latch.countDown()
+ }
+ }
+ eventLoop.registerListener(listener)
+ try { // teardown below must run even when an assertion fails, so threads do not leak
+ eventLoop.post(RunJob())
+ assert(latch.await(30, TimeUnit.SECONDS), "timed out waiting for RunJob and UnknownThrowable events")
+ assert(receivePermissionDenied.get())
+ } finally {
+ eventLoop.unregisterListener(listener)
+ worker.stop()
+ eventLoop.stop()
+ }
+ }
+
test("post ResourceLack event when job failed for lack of resource with RuntimeException wraped") {
val eventLoop = new KylinJobEventLoop
eventLoop.start()
@@ -246,6 +295,30 @@ class TestJobWorker extends SparderBaseFunSuite with BeforeAndAfter {
eventLoop.stop()
}
+ test("post ResourceLack event when job failed for lack of resource with Spark Exception wraped") {
+ val eventLoop = new KylinJobEventLoop
+ eventLoop.start()
+ val worker = new JobWorker(new HandleResourceLackJobWithSparkExceptionWraped(), Array.empty, eventLoop)
+ val latch = new CountDownLatch(2)
+ val receiveResourceLack = new AtomicBoolean(false)
+ val listener = new KylinJobListener {
+ override def onReceive(event: KylinJobEvent): Unit = {
+ if (event.isInstanceOf[ResourceLack]) receiveResourceLack.set(true)
+ latch.countDown()
+ }
+ }
+ eventLoop.registerListener(listener)
+ try { // teardown below must run even when an assertion fails, so threads do not leak
+ eventLoop.post(RunJob())
+ assert(latch.await(30, TimeUnit.SECONDS), "timed out waiting for RunJob and ResourceLack events")
+ assert(receiveResourceLack.get())
+ } finally {
+ eventLoop.unregisterListener(listener)
+ worker.stop()
+ eventLoop.stop()
+ }
+ }
+
test("post Permission denied event when RuntimeException occurred") {
val eventLoop = new KylinJobEventLoop
eventLoop.start()
@@ -338,6 +411,7 @@ class ResourceLackJobWithNonAccessControlException extends SparkApplication {
override protected def doExecute(): Unit = {}
}
+
class HandlePermissionDeniedJobWithRuntimeExceptionWraped extends SparkApplication {
override def execute(args: Array[String]): Unit = {
try {
@@ -349,6 +423,42 @@ class HandlePermissionDeniedJobWithRuntimeExceptionWraped extends SparkApplicati
override protected def doExecute(): Unit = {}
}
+class HandlePermissionDeniedJobWithSparkExceptionWraped extends SparkApplication {
+ // Simulates a permission failure surfacing as RuntimeException(SparkException(AccessControlException)).
+ override def execute(args: Array[String]): Unit = {
+ val denied = new AccessControlException()
+ val sparkFailure = new SparkException("PermissionDenied", denied)
+ handleException(new RuntimeException(sparkFailure))
+ }
+
+ override protected def doExecute(): Unit = {}
+}
+
+class HandlePermissionDeniedJobWithMultipleSparkExceptionWraped extends SparkApplication {
+ // Simulates a permission failure buried under two SparkException layers
+ // ("Exception thrown in awaitResult" -> "Job aborted") inside a RuntimeException,
+ // so handleException has to unwrap more than one Spark layer to find it.
+ override def execute(args: Array[String]): Unit = {
+ val denied = new AccessControlException()
+ val jobAborted = new SparkException("Job aborted", denied)
+ val awaitFailure = new SparkException("Exception thrown in awaitResult", jobAborted)
+ handleException(new RuntimeException(awaitFailure))
+ }
+
+ override protected def doExecute(): Unit = {}
+}
+
+class HandleResourceLackJobWithSparkExceptionWraped extends SparkApplication {
+ // Simulates a generic (non-permission) failure wrapped as RuntimeException(SparkException(Exception)).
+ override def execute(args: Array[String]): Unit = {
+ val generic = new Exception()
+ val sparkFailure = new SparkException("Exception", generic)
+ handleException(new RuntimeException(sparkFailure))
+ }
+
+ override protected def doExecute(): Unit = {}
+}
+
class HandleResourceLackJobWithRuntimeExceptionWraped extends SparkApplication {
override def execute(args: Array[String]): Unit = {
try {