You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2023/01/06 11:10:11 UTC

[kylin] 06/12: Revert KYLIN-5372 because the fix cannot cover snapshot build scenarios

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 1edec930c1c6331d4c98452061c8820f2d70f4ae
Author: huangsheng <hu...@163.com>
AuthorDate: Fri Nov 4 16:37:34 2022 +0800

    Revert KYLIN-5372 because the fix cannot cover snapshot build scenarios
---
 .../engine/spark/application/SparkApplication.java |   5 -
 .../org/apache/spark/application/JobWorker.scala   |   8 +-
 .../apache/spark/application/TestJobMonitor.scala  |   2 +-
 .../apache/spark/application/TestJobWorker.scala   | 141 +--------------------
 4 files changed, 3 insertions(+), 153 deletions(-)

diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
index 2e8a503156..9cc144902d 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.AccessControlException;
 import org.apache.kylin.cluster.IClusterManager;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.exception.KylinException;
@@ -325,10 +324,6 @@ public abstract class SparkApplication implements Application {
     }
 
     protected void handleException(Exception e) throws Exception {
-        if (e instanceof AccessControlException) {
-            logger.error("Permission denied.", e);
-            throw new NoRetryException("Permission denied.");
-        }
         throw e;
     }
 
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala
index 77b8834f02..069ac171cc 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala
@@ -18,7 +18,6 @@
 
 package org.apache.spark.application
 
-
 import java.util.concurrent.Executors
 
 import org.apache.kylin.engine.spark.application.SparkApplication
@@ -49,6 +48,7 @@ class JobWorker(application: SparkApplication, args: Array[String], eventLoop: K
     execute()
   }
 
+
   private def execute(): Unit = {
     pool.execute(new Runnable {
       override def run(): Unit = {
@@ -56,12 +56,6 @@ class JobWorker(application: SparkApplication, args: Array[String], eventLoop: K
           application.execute(args)
           eventLoop.post(JobSucceeded())
         } catch {
-          // Compatible with runtime exceptions thrown by the SparkApplication.execute(args: Array[String])
-          case runtimeException: RuntimeException =>
-            runtimeException.getCause match {
-              case noRetryException: NoRetryException => eventLoop.post(UnknownThrowable(noRetryException))
-              case throwable: Throwable => eventLoop.post(ResourceLack(throwable))
-            }
           case exception: NoRetryException => eventLoop.post(UnknownThrowable(exception))
           case throwable: Throwable => eventLoop.post(ResourceLack(throwable))
         }
diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobMonitor.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobMonitor.scala
index 1864d8749c..b9a1ff87c2 100644
--- a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobMonitor.scala
+++ b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobMonitor.scala
@@ -22,7 +22,6 @@ import java.util
 import java.util.concurrent.CountDownLatch
 import java.util.concurrent.atomic.AtomicBoolean
 import com.amazonaws.services.s3.model.AmazonS3Exception
-import org.apache.hadoop.security.AccessControlException
 import org.apache.kylin.cluster.{AvailableResource, IClusterManager, ResourceInfo}
 import org.apache.kylin.common.KylinConfig
 import org.apache.kylin.engine.spark.job.KylinBuildEnv
@@ -308,6 +307,7 @@ class TestJobMonitor extends SparderBaseFunSuite with BeforeAndAfterEach {
     }
   }
 
+
   test("post JobFailed event when receive class not found event") {
     withEventLoop { eventLoop =>
       Mockito.when(config.getSparkEngineMaxRetryTime).thenReturn(1)
diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobWorker.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobWorker.scala
index 64afc26b85..73035a24c3 100644
--- a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobWorker.scala
+++ b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobWorker.scala
@@ -18,10 +18,9 @@
 
 package org.apache.spark.application
 
-import org.apache.hadoop.security.AccessControlException
-
 import java.util.concurrent.CountDownLatch
 import java.util.concurrent.atomic.AtomicBoolean
+
 import org.apache.kylin.engine.spark.application.SparkApplication
 import org.apache.kylin.engine.spark.scheduler._
 import org.apache.spark.scheduler.KylinJobEventLoop
@@ -54,30 +53,6 @@ class TestJobWorker extends SparderBaseFunSuite with BeforeAndAfter {
     eventLoop.stop()
   }
 
-  test("post ResourceLack event when job failed with runtime exception for lack of resource") {
-    val eventLoop = new KylinJobEventLoop
-    eventLoop.start()
-    val worker = new JobWorker(new ResourceLackJobWithRuntimeException(), Array.empty, eventLoop)
-    val latch = new CountDownLatch(2)
-    val receiveResourceLack = new AtomicBoolean(false)
-    val listener = new KylinJobListener {
-      override def onReceive(event: KylinJobEvent): Unit = {
-        if (event.isInstanceOf[ResourceLack]) {
-          receiveResourceLack.getAndSet(true)
-        }
-        latch.countDown()
-      }
-    }
-    eventLoop.registerListener(listener)
-    eventLoop.post(RunJob())
-    // receive RunJob and ResourceLack
-    latch.await()
-    assert(receiveResourceLack.get())
-    eventLoop.unregisterListener(listener)
-    worker.stop()
-    eventLoop.stop()
-  }
-
   test("post JobSucceeded event when job succeeded") {
     val eventLoop = new KylinJobEventLoop
     eventLoop.start()
@@ -125,78 +100,6 @@ class TestJobWorker extends SparderBaseFunSuite with BeforeAndAfter {
     worker.stop()
     eventLoop.stop()
   }
-
-  test("post Permission denied event when PermissionDenied occurred with handle Exception function") {
-    val eventLoop = new KylinJobEventLoop
-    eventLoop.start()
-    val worker = new JobWorker(new PermissionDeniedJobWithHandleException(), Array.empty, eventLoop)
-    val latch = new CountDownLatch(2)
-    val receivePermissionDenied = new AtomicBoolean(false)
-    val listener = new KylinJobListener {
-      override def onReceive(event: KylinJobEvent): Unit = {
-        if (event.isInstanceOf[UnknownThrowable]) {
-          receivePermissionDenied.getAndSet(true)
-        }
-        latch.countDown()
-      }
-    }
-    eventLoop.registerListener(listener)
-    eventLoop.post(RunJob())
-    // receive RunJob and PermissionDenied
-    latch.await()
-    assert(receivePermissionDenied.get())
-    eventLoop.unregisterListener(listener)
-    worker.stop()
-    eventLoop.stop()
-  }
-
-  test("post Permission denied event when RuntimeException occurred") {
-    val eventLoop = new KylinJobEventLoop
-    eventLoop.start()
-    val worker = new JobWorker(new PermissionDeniedJobWithRuntimeException(), Array.empty, eventLoop)
-    val latch = new CountDownLatch(2)
-    val receivePermissionDenied = new AtomicBoolean(false)
-    val listener = new KylinJobListener {
-      override def onReceive(event: KylinJobEvent): Unit = {
-        if (event.isInstanceOf[UnknownThrowable]) {
-          receivePermissionDenied.getAndSet(true)
-        }
-        latch.countDown()
-      }
-    }
-    eventLoop.registerListener(listener)
-    eventLoop.post(RunJob())
-    // receive RunJob and PermissionDenied
-    latch.await()
-    assert(receivePermissionDenied.get())
-    eventLoop.unregisterListener(listener)
-    worker.stop()
-    eventLoop.stop()
-  }
-
-  test("post Permission denied event when AccessControlException occurred") {
-    val eventLoop = new KylinJobEventLoop
-    eventLoop.start()
-    val worker = new JobWorker(new PermissionDeniedJobWithNoRetryException(), Array.empty, eventLoop)
-    val latch = new CountDownLatch(2)
-    val receivePermissionDenied = new AtomicBoolean(false)
-    val listener = new KylinJobListener {
-      override def onReceive(event: KylinJobEvent): Unit = {
-        if (event.isInstanceOf[UnknownThrowable]) {
-          receivePermissionDenied.getAndSet(true)
-        }
-        latch.countDown()
-      }
-    }
-    eventLoop.registerListener(listener)
-    eventLoop.post(RunJob())
-    // receive RunJob and PermissionDenied
-    latch.await()
-    assert(receivePermissionDenied.get())
-    eventLoop.unregisterListener(listener)
-    worker.stop()
-    eventLoop.stop()
-  }
 }
 
 class UnknownThrowableJob extends SparkApplication {
@@ -207,35 +110,6 @@ class UnknownThrowableJob extends SparkApplication {
   override protected def doExecute(): Unit = {}
 }
 
-class PermissionDeniedJobWithHandleException extends SparkApplication {
-  override def execute(args: Array[String]): Unit = {
-    try {
-      throw new AccessControlException()
-    } catch {
-      case e : Exception => handleException(e)
-    }
-  }
-  override protected def doExecute(): Unit = {}
-}
-
-class PermissionDeniedJobWithRuntimeException extends SparkApplication {
-  override def execute(args: Array[String]): Unit = {
-    try {
-      throw new AccessControlException()
-    } catch {
-      case e : Exception => throw new RuntimeException("Error execute " + this.getClass.getName, new NoRetryException("Permission denied."))
-    }
-  }
-  override protected def doExecute(): Unit = {}
-}
-
-class PermissionDeniedJobWithNoRetryException extends SparkApplication {
-  override def execute(args: Array[String]): Unit = {
-      throw new NoRetryException("Permission Denied")
-  }
-  override protected def doExecute(): Unit = {}
-}
-
 class ResourceLackJob extends SparkApplication {
 
   override def execute(args: Array[String]): Unit = {
@@ -245,19 +119,6 @@ class ResourceLackJob extends SparkApplication {
   override protected def doExecute(): Unit = {}
 }
 
-class ResourceLackJobWithRuntimeException extends SparkApplication {
-
-  override def execute(args: Array[String]): Unit = {
-    try {
-      throw new Exception()
-    } catch {
-      case e: Exception => throw new RuntimeException("Error execute " + this.getClass.getName, e)
-    }
-  }
-
-  override protected def doExecute(): Unit = {}
-}
-
 class MockSucceedJob extends SparkApplication {
   override def execute(args: Array[String]): Unit = {}