Posted to commits@kylin.apache.org by xx...@apache.org on 2023/02/07 07:09:40 UTC

[kylin] 11/15: KYLIN-5409 avoid permission denied job retry

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit b34babb851439bca4a068aed21d2ba5d5f40914f
Author: huangsheng <hu...@163.com>
AuthorDate: Thu Nov 24 15:54:40 2022 +0800

    KYLIN-5409 avoid permission denied job retry
---
 .../engine/spark/application/SparkApplication.java |  17 ++++
 .../apache/spark/application/TestJobWorker.scala   | 110 +++++++++++++++++++++
 2 files changed, 127 insertions(+)
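
Why the change, in short: an AccessControlException thrown inside a Spark task typically reaches the driver wrapped in one or more SparkException layers (and those in a RuntimeException), so a plain instanceof check on the exception or its immediate cause never matches, and the scheduler retries a job that can never succeed. The patch below therefore walks the cause chain past every SparkException before deciding. A minimal, self-contained sketch of that rule (assuming the AccessControlException in play is org.apache.hadoop.security.AccessControlException; the actual import sits outside the hunks shown):

    import org.apache.hadoop.security.AccessControlException;
    import org.apache.spark.SparkException;

    public class UnwrapSketch {
        public static void main(String[] args) {
            // The nesting the new tests reproduce:
            // RuntimeException -> SparkException -> SparkException -> root cause.
            Exception e = new RuntimeException(
                    new SparkException("Exception thrown in awaitResult",
                            new SparkException("Job aborted",
                                    new AccessControlException("Permission denied"))));
            // Mirror of extractRealRootCauseFromSparkException below:
            // peel every SparkException layer off the cause chain.
            Throwable rootCause = e.getCause();
            while (rootCause instanceof SparkException) {
                rootCause = rootCause.getCause();
            }
            // Prints "true": the buried AccessControlException is found,
            // so the job can fail fast instead of being retried.
            System.out.println(rootCause instanceof AccessControlException);
        }
    }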

diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
index 42117ac398..5bc553d353 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
@@ -60,6 +60,7 @@ import org.apache.kylin.metadata.model.PartitionDesc;
 import org.apache.kylin.query.pushdown.SparkSubmitter;
 import org.apache.kylin.query.util.PushDownUtil;
 import org.apache.spark.SparkConf;
+import org.apache.spark.SparkException;
 import org.apache.spark.application.NoRetryException;
 import org.apache.spark.launcher.SparkLauncher;
 import org.apache.spark.sql.KylinSession;
@@ -323,16 +324,32 @@ public abstract class SparkApplication implements Application {
             executeFinish();
         }
     }
+
     protected void handleException(Exception e) throws Exception {
         if (e instanceof AccessControlException) {
             interceptAccessControlException(e);
         }
         if (e instanceof RuntimeException && e.getCause() instanceof AccessControlException) {
             interceptAccessControlException(e.getCause());
+        } else if (e instanceof RuntimeException && e.getCause() instanceof SparkException) {
+            Throwable rootCause = extractRealRootCauseFromSparkException(e);
+            if (rootCause instanceof AccessControlException) {
+                interceptAccessControlException(rootCause);
+            }
         }
         throw e;
     }
 
+    // Extract the real root cause that made the Spark job fail.
+    // For example, a Spark job that fails because of a permission exception is intercepted here,
+    // so an unnecessary retry does not waste resources.
+    protected Throwable extractRealRootCauseFromSparkException(Exception e) {
+        Throwable rootCause = e.getCause();
+        while (rootCause instanceof SparkException) {
+            rootCause = rootCause.getCause();
+        }
+        return rootCause;
+    }
+
     // Permission exception will not be retried. Simply let the job fail.
     protected void interceptAccessControlException(Throwable e) throws NoRetryException{
         logger.error("Permission denied.", e);
diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobWorker.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobWorker.scala
index c1cfa95387..06284a83e3 100644
--- a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobWorker.scala
+++ b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobWorker.scala
@@ -24,6 +24,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.concurrent.atomic.AtomicBoolean
 import org.apache.kylin.engine.spark.application.SparkApplication
 import org.apache.kylin.engine.spark.scheduler._
+import org.apache.spark.SparkException
 import org.apache.spark.scheduler.KylinJobEventLoop
 import org.apache.spark.sql.common.SparderBaseFunSuite
 import org.scalatest.BeforeAndAfter
@@ -222,6 +223,54 @@ class TestJobWorker extends SparderBaseFunSuite with BeforeAndAfter {
     eventLoop.stop()
   }
 
+  test("post Permission denied event when PermissionDenied occurred with Spark Exception wraped") {
+    val eventLoop = new KylinJobEventLoop
+    eventLoop.start()
+    val worker = new JobWorker(new HandlePermissionDeniedJobWithSparkExceptionWraped(), Array.empty, eventLoop)
+    val latch = new CountDownLatch(2)
+    val receivePermissionDenied = new AtomicBoolean(false)
+    val listener = new KylinJobListener {
+      override def onReceive(event: KylinJobEvent): Unit = {
+        if (event.isInstanceOf[UnknownThrowable]) {
+          receivePermissionDenied.getAndSet(true)
+        }
+        latch.countDown()
+      }
+    }
+    eventLoop.registerListener(listener)
+    eventLoop.post(RunJob())
+    // receive RunJob and the UnknownThrowable posted for the permission-denied failure
+    latch.await(30, TimeUnit.SECONDS)
+    assert(receivePermissionDenied.get())
+    eventLoop.unregisterListener(listener)
+    worker.stop()
+    eventLoop.stop()
+  }
+
+  test("post Permission denied event when PermissionDenied occurred with multiple Spark Exception wraped") {
+    val eventLoop = new KylinJobEventLoop
+    eventLoop.start()
+    val worker = new JobWorker(new HandlePermissionDeniedJobWithMultipleSparkExceptionWraped(), Array.empty, eventLoop)
+    val latch = new CountDownLatch(2)
+    val receivePermissionDenied = new AtomicBoolean(false)
+    val listener = new KylinJobListener {
+      override def onReceive(event: KylinJobEvent): Unit = {
+        if (event.isInstanceOf[UnknownThrowable]) {
+          receivePermissionDenied.getAndSet(true)
+        }
+        latch.countDown()
+      }
+    }
+    eventLoop.registerListener(listener)
+    eventLoop.post(RunJob())
+    // receive RunJob and the UnknownThrowable posted for the permission-denied failure
+    latch.await(30, TimeUnit.SECONDS)
+    assert(receivePermissionDenied.get())
+    eventLoop.unregisterListener(listener)
+    worker.stop()
+    eventLoop.stop()
+  }
+
   test("post ResourceLack event when job failed for lack of resource with RuntimeException wraped") {
     val eventLoop = new KylinJobEventLoop
     eventLoop.start()
@@ -246,6 +295,30 @@ class TestJobWorker extends SparderBaseFunSuite with BeforeAndAfter {
     eventLoop.stop()
   }
 
+  test("post ResourceLack event when job failed for lack of resource with Spark Exception wraped") {
+    val eventLoop = new KylinJobEventLoop
+    eventLoop.start()
+    val worker = new JobWorker(new HandleResourceLackJobWithSparkExceptionWraped(), Array.empty, eventLoop)
+    val latch = new CountDownLatch(2)
+    val receivePermissionDenied = new AtomicBoolean(false)
+    val listener = new KylinJobListener {
+      override def onReceive(event: KylinJobEvent): Unit = {
+        if (event.isInstanceOf[ResourceLack]) {
+          receivePermissionDenied.getAndSet(true)
+        }
+        latch.countDown()
+      }
+    }
+    eventLoop.registerListener(listener)
+    eventLoop.post(RunJob())
+    // receive RunJob and PermissionDenied
+    latch.await(30, TimeUnit.SECONDS)
+    assert(receivePermissionDenied.get())
+    eventLoop.unregisterListener(listener)
+    worker.stop()
+    eventLoop.stop()
+  }
+
   test("post Permission denied event when RuntimeException occurred") {
     val eventLoop = new KylinJobEventLoop
     eventLoop.start()
@@ -338,6 +411,7 @@ class ResourceLackJobWithNonAccessControlException extends SparkApplication {
   override protected def doExecute(): Unit = {}
 }
 
+
 class HandlePermissionDeniedJobWithRuntimeExceptionWraped extends SparkApplication {
   override def execute(args: Array[String]): Unit = {
     try {
@@ -349,6 +423,42 @@ class HandlePermissionDeniedJobWithRuntimeExceptionWraped extends SparkApplicati
   override protected def doExecute(): Unit = {}
 }
 
+class HandlePermissionDeniedJobWithSparkExceptionWraped extends SparkApplication {
+  override def execute(args: Array[String]): Unit = {
+    try {
+      throw new AccessControlException()
+    } catch {
+      case e: Exception => handleException(new RuntimeException(new SparkException("PermissionDenied", e)))
+    }
+  }
+  override protected def doExecute(): Unit = {}
+}
+
+class HandlePermissionDeniedJobWithMultipleSparkExceptionWraped extends SparkApplication {
+  override def execute(args: Array[String]): Unit = {
+    try {
+      throw new AccessControlException()
+    } catch {
+      case e: Exception => handleException(new RuntimeException(
+        new SparkException("Exception thrown in awaitResult", new SparkException("Job aborted", e))
+      ))
+    }
+  }
+
+  override protected def doExecute(): Unit = {}
+}
+
+class HandleResourceLackJobWithSparkExceptionWraped extends SparkApplication {
+  override def execute(args: Array[String]): Unit = {
+    try {
+      throw new Exception()
+    } catch {
+      case e: Exception => handleException(new RuntimeException(new SparkException("Exception", e)))
+    }
+  }
+  override protected def doExecute(): Unit = {}
+}
+
 class HandleResourceLackJobWithRuntimeExceptionWraped extends SparkApplication {
   override def execute(args: Array[String]): Unit = {
     try {