Posted to commits@kylin.apache.org by xx...@apache.org on 2022/12/28 07:59:07 UTC

[kylin] branch kylin5 updated: KYLIN-5372 If the job hits a permission exception, fail the job instead of retrying (#29259)

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin5 by this push:
     new 0f81c131fc KYLIN-5372 If the job hits a permission exception, fail the job instead of retrying (#29259)
0f81c131fc is described below

commit 0f81c131fc2e11b654b7dd2e7290213e41a6ff90
Author: huangsheng <hu...@163.com>
AuthorDate: Sat Oct 29 10:48:23 2022 +0800

    KYLIN-5372 If the job hits a permission exception, fail the job instead of retrying (#29259)
---
 .../engine/spark/application/SparkApplication.java |   5 +
 .../org/apache/spark/application/JobWorker.scala   |   8 +-
 .../apache/spark/application/TestJobMonitor.scala  |   2 +-
 .../apache/spark/application/TestJobWorker.scala   | 141 ++++++++++++++++++++-
 4 files changed, 153 insertions(+), 3 deletions(-)
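
In short, the retry decision now works like this: a NoRetryException, whether thrown directly or carried as the cause of a RuntimeException, makes the job fail outright (posted as UnknownThrowable), while every other failure is still treated as a transient resource problem and retried (posted as ResourceLack). A minimal, self-contained Scala sketch of that classification, using simplified stand-ins rather than the real Kylin classes:

    object RetryDecisionSketch {
      // Stand-ins for the real org.apache.kylin.engine.spark.scheduler types.
      class NoRetryException(msg: String) extends Exception(msg)
      sealed trait JobEvent
      case class ResourceLack(t: Throwable) extends JobEvent      // scheduler retries the job
      case class UnknownThrowable(t: Throwable) extends JobEvent  // job fails, no retry

      def classify(t: Throwable): JobEvent = t match {
        // execute() may wrap the real cause in a RuntimeException, so unwrap it first.
        case r: RuntimeException => r.getCause match {
          case n: NoRetryException => UnknownThrowable(n)
          case cause => ResourceLack(if (cause != null) cause else r)
        }
        case n: NoRetryException => UnknownThrowable(n)
        case other => ResourceLack(other)
      }

      def main(args: Array[String]): Unit = {
        println(classify(new RuntimeException("wrapped", new NoRetryException("Permission denied."))))
        println(classify(new Exception("transient failure")))
      }
    }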

diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
index 9cc144902d..2e8a503156 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.AccessControlException;
 import org.apache.kylin.cluster.IClusterManager;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.exception.KylinException;
@@ -324,6 +325,10 @@ public abstract class SparkApplication implements Application {
     }
 
     protected void handleException(Exception e) throws Exception {
+        if (e instanceof AccessControlException) {
+            logger.error("Permission denied.", e);
+            throw new NoRetryException("Permission denied.");
+        }
         throw e;
     }
 
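The Java-side change makes handleException the single funnel for this: a Hadoop
AccessControlException is logged and rethrown as NoRetryException("Permission denied."),
and anything else propagates unchanged. A small Scala sketch of that contract, with
local stand-ins for the Hadoop and Kylin exception types:

    object HandleExceptionSketch {
      class AccessControlException(msg: String) extends Exception(msg) // stand-in for the Hadoop class
      class NoRetryException(msg: String) extends Exception(msg)       // stand-in for the Kylin class

      @throws[Exception]
      def handleException(e: Exception): Unit = e match {
        case _: AccessControlException => throw new NoRetryException("Permission denied.")
        case other => throw other
      }

      def main(args: Array[String]): Unit = {
        try handleException(new AccessControlException("no HDFS write access"))
        catch { case n: NoRetryException => println("fail fast: " + n.getMessage) }
      }
    }
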
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala
index 069ac171cc..77b8834f02 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala
@@ -18,6 +18,7 @@
 
 package org.apache.spark.application
 
+
 import java.util.concurrent.Executors
 
 import org.apache.kylin.engine.spark.application.SparkApplication
@@ -48,7 +49,6 @@ class JobWorker(application: SparkApplication, args: Array[String], eventLoop: K
     execute()
   }
 
-
   private def execute(): Unit = {
     pool.execute(new Runnable {
       override def run(): Unit = {
@@ -56,6 +56,12 @@ class JobWorker(application: SparkApplication, args: Array[String], eventLoop: K
           application.execute(args)
           eventLoop.post(JobSucceeded())
         } catch {
+          // Compatible with runtime exceptions thrown by the SparkApplication.execute(args: Array[String])
+          case runtimeException: RuntimeException =>
+            runtimeException.getCause match {
+              case noRetryException: NoRetryException => eventLoop.post(UnknownThrowable(noRetryException))
+              case throwable: Throwable => eventLoop.post(ResourceLack(throwable))
+            }
           case exception: NoRetryException => eventLoop.post(UnknownThrowable(exception))
           case throwable: Throwable => eventLoop.post(ResourceLack(throwable))
         }
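
Ordering matters in the match above: Scala tries cases top to bottom, so the new
RuntimeException arm has to sit before the generic Throwable arm, or a wrapped
NoRetryException would be posted as ResourceLack and retried. The wrapped shape it
unwraps is exactly what the tests below construct (import path assumed from the
wildcard scheduler import used in those tests):

    import org.apache.kylin.engine.spark.scheduler.NoRetryException

    object UnwrapCheck {
      def main(args: Array[String]): Unit = {
        // A RuntimeException carrying a NoRetryException as its cause.
        val wrapped = new RuntimeException("step failed", new NoRetryException("Permission denied."))
        assert(wrapped.getCause.isInstanceOf[NoRetryException])
      }
    }
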
diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobMonitor.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobMonitor.scala
index b9a1ff87c2..1864d8749c 100644
--- a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobMonitor.scala
+++ b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobMonitor.scala
@@ -22,6 +22,7 @@ import java.util
 import java.util.concurrent.CountDownLatch
 import java.util.concurrent.atomic.AtomicBoolean
 import com.amazonaws.services.s3.model.AmazonS3Exception
+import org.apache.hadoop.security.AccessControlException
 import org.apache.kylin.cluster.{AvailableResource, IClusterManager, ResourceInfo}
 import org.apache.kylin.common.KylinConfig
 import org.apache.kylin.engine.spark.job.KylinBuildEnv
@@ -307,7 +308,6 @@ class TestJobMonitor extends SparderBaseFunSuite with BeforeAndAfterEach {
     }
   }
 
-
   test("post JobFailed event when receive class not found event") {
     withEventLoop { eventLoop =>
       Mockito.when(config.getSparkEngineMaxRetryTime).thenReturn(1)
diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobWorker.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobWorker.scala
index 73035a24c3..64afc26b85 100644
--- a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobWorker.scala
+++ b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobWorker.scala
@@ -18,9 +18,10 @@
 
 package org.apache.spark.application
 
+import org.apache.hadoop.security.AccessControlException
+
 import java.util.concurrent.CountDownLatch
 import java.util.concurrent.atomic.AtomicBoolean
-
 import org.apache.kylin.engine.spark.application.SparkApplication
 import org.apache.kylin.engine.spark.scheduler._
 import org.apache.spark.scheduler.KylinJobEventLoop
@@ -53,6 +54,30 @@ class TestJobWorker extends SparderBaseFunSuite with BeforeAndAfter {
     eventLoop.stop()
   }
 
+  test("post ResourceLack event when job failed with runtime exception for lack of resource") {
+    val eventLoop = new KylinJobEventLoop
+    eventLoop.start()
+    val worker = new JobWorker(new ResourceLackJobWithRuntimeException(), Array.empty, eventLoop)
+    val latch = new CountDownLatch(2)
+    val receiveResourceLack = new AtomicBoolean(false)
+    val listener = new KylinJobListener {
+      override def onReceive(event: KylinJobEvent): Unit = {
+        if (event.isInstanceOf[ResourceLack]) {
+          receiveResourceLack.getAndSet(true)
+        }
+        latch.countDown()
+      }
+    }
+    eventLoop.registerListener(listener)
+    eventLoop.post(RunJob())
+    // receive RunJob and ResourceLack
+    latch.await()
+    assert(receiveResourceLack.get())
+    eventLoop.unregisterListener(listener)
+    worker.stop()
+    eventLoop.stop()
+  }
+
   test("post JobSucceeded event when job succeeded") {
     val eventLoop = new KylinJobEventLoop
     eventLoop.start()
@@ -100,6 +125,78 @@ class TestJobWorker extends SparderBaseFunSuite with BeforeAndAfter {
     worker.stop()
     eventLoop.stop()
   }
+
+  test("post Permission denied event when PermissionDenied occurred with handle Exception function") {
+    val eventLoop = new KylinJobEventLoop
+    eventLoop.start()
+    val worker = new JobWorker(new PermissionDeniedJobWithHandleException(), Array.empty, eventLoop)
+    val latch = new CountDownLatch(2)
+    val receivePermissionDenied = new AtomicBoolean(false)
+    val listener = new KylinJobListener {
+      override def onReceive(event: KylinJobEvent): Unit = {
+        if (event.isInstanceOf[UnknownThrowable]) {
+          receivePermissionDenied.getAndSet(true)
+        }
+        latch.countDown()
+      }
+    }
+    eventLoop.registerListener(listener)
+    eventLoop.post(RunJob())
+    // receive RunJob and PermissionDenied
+    latch.await()
+    assert(receivePermissionDenied.get())
+    eventLoop.unregisterListener(listener)
+    worker.stop()
+    eventLoop.stop()
+  }
+
+  test("post Permission denied event when RuntimeException occurred") {
+    val eventLoop = new KylinJobEventLoop
+    eventLoop.start()
+    val worker = new JobWorker(new PermissionDeniedJobWithRuntimeException(), Array.empty, eventLoop)
+    val latch = new CountDownLatch(2)
+    val receivePermissionDenied = new AtomicBoolean(false)
+    val listener = new KylinJobListener {
+      override def onReceive(event: KylinJobEvent): Unit = {
+        if (event.isInstanceOf[UnknownThrowable]) {
+          receivePermissionDenied.getAndSet(true)
+        }
+        latch.countDown()
+      }
+    }
+    eventLoop.registerListener(listener)
+    eventLoop.post(RunJob())
+    // receive RunJob and PermissionDenied
+    latch.await()
+    assert(receivePermissionDenied.get())
+    eventLoop.unregisterListener(listener)
+    worker.stop()
+    eventLoop.stop()
+  }
+
+  test("post Permission denied event when AccessControlException occurred") {
+    val eventLoop = new KylinJobEventLoop
+    eventLoop.start()
+    val worker = new JobWorker(new PermissionDeniedJobWithNoRetryException(), Array.empty, eventLoop)
+    val latch = new CountDownLatch(2)
+    val receivePermissionDenied = new AtomicBoolean(false)
+    val listener = new KylinJobListener {
+      override def onReceive(event: KylinJobEvent): Unit = {
+        if (event.isInstanceOf[UnknownThrowable]) {
+          receivePermissionDenied.getAndSet(true)
+        }
+        latch.countDown()
+      }
+    }
+    eventLoop.registerListener(listener)
+    eventLoop.post(RunJob())
+    // receive RunJob and PermissionDenied
+    latch.await()
+    assert(receivePermissionDenied.get())
+    eventLoop.unregisterListener(listener)
+    worker.stop()
+    eventLoop.stop()
+  }
 }
 
 class UnknownThrowableJob extends SparkApplication {
@@ -110,6 +207,35 @@ class UnknownThrowableJob extends SparkApplication {
   override protected def doExecute(): Unit = {}
 }
 
+class PermissionDeniedJobWithHandleException extends SparkApplication {
+  override def execute(args: Array[String]): Unit = {
+    try {
+      throw new AccessControlException()
+    } catch {
+      case e : Exception => handleException(e)
+    }
+  }
+  override protected def doExecute(): Unit = {}
+}
+
+class PermissionDeniedJobWithRuntimeException extends SparkApplication {
+  override def execute(args: Array[String]): Unit = {
+    try {
+      throw new AccessControlException()
+    } catch {
+      case e : Exception => throw new RuntimeException("Error execute " + this.getClass.getName, new NoRetryException("Permission denied."))
+    }
+  }
+  override protected def doExecute(): Unit = {}
+}
+
+class PermissionDeniedJobWithNoRetryException extends SparkApplication {
+  override def execute(args: Array[String]): Unit = {
+      throw new NoRetryException("Permission Denied")
+  }
+  override protected def doExecute(): Unit = {}
+}
+
 class ResourceLackJob extends SparkApplication {
 
   override def execute(args: Array[String]): Unit = {
@@ -119,6 +245,19 @@ class ResourceLackJob extends SparkApplication {
   override protected def doExecute(): Unit = {}
 }
 
+class ResourceLackJobWithRuntimeException extends SparkApplication {
+
+  override def execute(args: Array[String]): Unit = {
+    try {
+      throw new Exception()
+    } catch {
+      case e: Exception => throw new RuntimeException("Error execute " + this.getClass.getName, e)
+    }
+  }
+
+  override protected def doExecute(): Unit = {}
+}
+
 class MockSucceedJob extends SparkApplication {
   override def execute(args: Array[String]): Unit = {}