You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2023/01/06 11:10:11 UTC
[kylin] 06/12: Revert KYLIN-5372 because this fix cannot override snapshot build scenarios
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 1edec930c1c6331d4c98452061c8820f2d70f4ae
Author: huangsheng <hu...@163.com>
AuthorDate: Fri Nov 4 16:37:34 2022 +0800
Revert KYLIN-5372 because this fix cannot override snapshot build scenarios
---
.../engine/spark/application/SparkApplication.java | 5 -
.../org/apache/spark/application/JobWorker.scala | 8 +-
.../apache/spark/application/TestJobMonitor.scala | 2 +-
.../apache/spark/application/TestJobWorker.scala | 141 +--------------------
4 files changed, 3 insertions(+), 153 deletions(-)
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
index 2e8a503156..9cc144902d 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.AccessControlException;
import org.apache.kylin.cluster.IClusterManager;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.exception.KylinException;
@@ -325,10 +324,6 @@ public abstract class SparkApplication implements Application {
}
protected void handleException(Exception e) throws Exception {
- if (e instanceof AccessControlException) {
- logger.error("Permission denied.", e);
- throw new NoRetryException("Permission denied.");
- }
throw e;
}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala
index 77b8834f02..069ac171cc 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala
@@ -18,7 +18,6 @@
package org.apache.spark.application
-
import java.util.concurrent.Executors
import org.apache.kylin.engine.spark.application.SparkApplication
@@ -49,6 +48,7 @@ class JobWorker(application: SparkApplication, args: Array[String], eventLoop: K
execute()
}
+
private def execute(): Unit = {
pool.execute(new Runnable {
override def run(): Unit = {
@@ -56,12 +56,6 @@ class JobWorker(application: SparkApplication, args: Array[String], eventLoop: K
application.execute(args)
eventLoop.post(JobSucceeded())
} catch {
- // Compatible with runtime exceptions thrown by the SparkApplication.execute(args: Array[String])
- case runtimeException: RuntimeException =>
- runtimeException.getCause match {
- case noRetryException: NoRetryException => eventLoop.post(UnknownThrowable(noRetryException))
- case throwable: Throwable => eventLoop.post(ResourceLack(throwable))
- }
case exception: NoRetryException => eventLoop.post(UnknownThrowable(exception))
case throwable: Throwable => eventLoop.post(ResourceLack(throwable))
}
diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobMonitor.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobMonitor.scala
index 1864d8749c..b9a1ff87c2 100644
--- a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobMonitor.scala
+++ b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobMonitor.scala
@@ -22,7 +22,6 @@ import java.util
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean
import com.amazonaws.services.s3.model.AmazonS3Exception
-import org.apache.hadoop.security.AccessControlException
import org.apache.kylin.cluster.{AvailableResource, IClusterManager, ResourceInfo}
import org.apache.kylin.common.KylinConfig
import org.apache.kylin.engine.spark.job.KylinBuildEnv
@@ -308,6 +307,7 @@ class TestJobMonitor extends SparderBaseFunSuite with BeforeAndAfterEach {
}
}
+
test("post JobFailed event when receive class not found event") {
withEventLoop { eventLoop =>
Mockito.when(config.getSparkEngineMaxRetryTime).thenReturn(1)
diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobWorker.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobWorker.scala
index 64afc26b85..73035a24c3 100644
--- a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobWorker.scala
+++ b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobWorker.scala
@@ -18,10 +18,9 @@
package org.apache.spark.application
-import org.apache.hadoop.security.AccessControlException
-
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean
+
import org.apache.kylin.engine.spark.application.SparkApplication
import org.apache.kylin.engine.spark.scheduler._
import org.apache.spark.scheduler.KylinJobEventLoop
@@ -54,30 +53,6 @@ class TestJobWorker extends SparderBaseFunSuite with BeforeAndAfter {
eventLoop.stop()
}
- test("post ResourceLack event when job failed with runtime exception for lack of resource") {
- val eventLoop = new KylinJobEventLoop
- eventLoop.start()
- val worker = new JobWorker(new ResourceLackJobWithRuntimeException(), Array.empty, eventLoop)
- val latch = new CountDownLatch(2)
- val receiveResourceLack = new AtomicBoolean(false)
- val listener = new KylinJobListener {
- override def onReceive(event: KylinJobEvent): Unit = {
- if (event.isInstanceOf[ResourceLack]) {
- receiveResourceLack.getAndSet(true)
- }
- latch.countDown()
- }
- }
- eventLoop.registerListener(listener)
- eventLoop.post(RunJob())
- // receive RunJob and ResourceLack
- latch.await()
- assert(receiveResourceLack.get())
- eventLoop.unregisterListener(listener)
- worker.stop()
- eventLoop.stop()
- }
-
test("post JobSucceeded event when job succeeded") {
val eventLoop = new KylinJobEventLoop
eventLoop.start()
@@ -125,78 +100,6 @@ class TestJobWorker extends SparderBaseFunSuite with BeforeAndAfter {
worker.stop()
eventLoop.stop()
}
-
- test("post Permission denied event when PermissionDenied occurred with handle Exception function") {
- val eventLoop = new KylinJobEventLoop
- eventLoop.start()
- val worker = new JobWorker(new PermissionDeniedJobWithHandleException(), Array.empty, eventLoop)
- val latch = new CountDownLatch(2)
- val receivePermissionDenied = new AtomicBoolean(false)
- val listener = new KylinJobListener {
- override def onReceive(event: KylinJobEvent): Unit = {
- if (event.isInstanceOf[UnknownThrowable]) {
- receivePermissionDenied.getAndSet(true)
- }
- latch.countDown()
- }
- }
- eventLoop.registerListener(listener)
- eventLoop.post(RunJob())
- // receive RunJob and PermissionDenied
- latch.await()
- assert(receivePermissionDenied.get())
- eventLoop.unregisterListener(listener)
- worker.stop()
- eventLoop.stop()
- }
-
- test("post Permission denied event when RuntimeException occurred") {
- val eventLoop = new KylinJobEventLoop
- eventLoop.start()
- val worker = new JobWorker(new PermissionDeniedJobWithRuntimeException(), Array.empty, eventLoop)
- val latch = new CountDownLatch(2)
- val receivePermissionDenied = new AtomicBoolean(false)
- val listener = new KylinJobListener {
- override def onReceive(event: KylinJobEvent): Unit = {
- if (event.isInstanceOf[UnknownThrowable]) {
- receivePermissionDenied.getAndSet(true)
- }
- latch.countDown()
- }
- }
- eventLoop.registerListener(listener)
- eventLoop.post(RunJob())
- // receive RunJob and PermissionDenied
- latch.await()
- assert(receivePermissionDenied.get())
- eventLoop.unregisterListener(listener)
- worker.stop()
- eventLoop.stop()
- }
-
- test("post Permission denied event when AccessControlException occurred") {
- val eventLoop = new KylinJobEventLoop
- eventLoop.start()
- val worker = new JobWorker(new PermissionDeniedJobWithNoRetryException(), Array.empty, eventLoop)
- val latch = new CountDownLatch(2)
- val receivePermissionDenied = new AtomicBoolean(false)
- val listener = new KylinJobListener {
- override def onReceive(event: KylinJobEvent): Unit = {
- if (event.isInstanceOf[UnknownThrowable]) {
- receivePermissionDenied.getAndSet(true)
- }
- latch.countDown()
- }
- }
- eventLoop.registerListener(listener)
- eventLoop.post(RunJob())
- // receive RunJob and PermissionDenied
- latch.await()
- assert(receivePermissionDenied.get())
- eventLoop.unregisterListener(listener)
- worker.stop()
- eventLoop.stop()
- }
}
class UnknownThrowableJob extends SparkApplication {
@@ -207,35 +110,6 @@ class UnknownThrowableJob extends SparkApplication {
override protected def doExecute(): Unit = {}
}
-class PermissionDeniedJobWithHandleException extends SparkApplication {
- override def execute(args: Array[String]): Unit = {
- try {
- throw new AccessControlException()
- } catch {
- case e : Exception => handleException(e)
- }
- }
- override protected def doExecute(): Unit = {}
-}
-
-class PermissionDeniedJobWithRuntimeException extends SparkApplication {
- override def execute(args: Array[String]): Unit = {
- try {
- throw new AccessControlException()
- } catch {
- case e : Exception => throw new RuntimeException("Error execute " + this.getClass.getName, new NoRetryException("Permission denied."))
- }
- }
- override protected def doExecute(): Unit = {}
-}
-
-class PermissionDeniedJobWithNoRetryException extends SparkApplication {
- override def execute(args: Array[String]): Unit = {
- throw new NoRetryException("Permission Denied")
- }
- override protected def doExecute(): Unit = {}
-}
-
class ResourceLackJob extends SparkApplication {
override def execute(args: Array[String]): Unit = {
@@ -245,19 +119,6 @@ class ResourceLackJob extends SparkApplication {
override protected def doExecute(): Unit = {}
}
-class ResourceLackJobWithRuntimeException extends SparkApplication {
-
- override def execute(args: Array[String]): Unit = {
- try {
- throw new Exception()
- } catch {
- case e: Exception => throw new RuntimeException("Error execute " + this.getClass.getName, e)
- }
- }
-
- override protected def doExecute(): Unit = {}
-}
-
class MockSucceedJob extends SparkApplication {
override def execute(args: Array[String]): Unit = {}