You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2020/10/29 03:30:46 UTC
[kylin] branch kylin-on-parquet-v2 updated: KYLIN-4800 Add canary
tool for sparder-context
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
new 1d43e0a KYLIN-4800 Add canary tool for sparder-context
1d43e0a is described below
commit 1d43e0a4ac81648f89666365e06e2519ae53271b
Author: yaqian.zhang <59...@qq.com>
AuthorDate: Wed Oct 28 09:09:02 2020 +0800
KYLIN-4800 Add canary tool for sparder-context
---
.../org/apache/kylin/common/KylinConfigBase.java | 21 +++
kylin-spark-project/kylin-spark-query/pom.xml | 7 +
.../kylin/query/monitor/SparderContextCanary.java | 145 +++++++++++++++++++++
.../org/apache/spark/sql/SparderContext.scala | 6 +
.../query/monitor/SparderContextCanaryTest.java | 94 +++++++++++++
5 files changed, 273 insertions(+)
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 1ffdc16..45afaa2 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2879,6 +2879,27 @@ public abstract class KylinConfigBase implements Serializable {
return Boolean.parseBoolean(this.getOptional("kylin.query.auto-sparder-context", "false"));
}
+ /**
+ * Sparder is considered unavailable when the check task is unresponsive for more than this time
+ */
+ public int getSparderCanaryErrorResponseMs() {
+ return Integer.parseInt(this.getOptional("kylin.canary.sparder-context-error-response-ms", "3000"));
+ }
+
+    /**
+     * Returns the accumulated error count at which the sparder context canary repairs (restarts) the
+     * sparder context. This is a trigger threshold for a repair, not a limit on how many restarts happen.
+     * Configured by {@code kylin.canary.sparder-context-threshold-to-restart-spark}; defaults to 3.
+     */
+    public int getThresholdToRestartSparder() {
+        return Integer.parseInt(this.getOptional("kylin.canary.sparder-context-threshold-to-restart-spark", "3"));
+    }
+
+    /**
+     * Returns the delay, in minutes, between two sparder context health checks.
+     * Configured by {@code kylin.canary.sparder-context-period-min}; defaults to 3.
+     */
+    public int getSparderCanaryPeriodMinutes() {
+        final String periodMinutes = this.getOptional("kylin.canary.sparder-context-period-min", "3");
+        return Integer.parseInt(periodMinutes);
+    }
+
// ============================================================================
// Spark with Kerberos
// ============================================================================
diff --git a/kylin-spark-project/kylin-spark-query/pom.xml b/kylin-spark-project/kylin-spark-query/pom.xml
index ee8ce74..59493eb 100644
--- a/kylin-spark-project/kylin-spark-query/pom.xml
+++ b/kylin-spark-project/kylin-spark-query/pom.xml
@@ -56,6 +56,13 @@
<type>test-jar</type>
</dependency>
<dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-spark-engine</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>${awaitility.version}</version>
diff --git a/kylin-spark-project/kylin-spark-query/src/main/java/org/apache/kylin/query/monitor/SparderContextCanary.java b/kylin-spark-project/kylin-spark-query/src/main/java/org/apache/kylin/query/monitor/SparderContextCanary.java
new file mode 100644
index 0000000..d0950c1
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-query/src/main/java/org/apache/kylin/query/monitor/SparderContextCanary.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.query.monitor;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.spark.api.java.JavaFutureAction;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.KylinSparkEnv;
+import org.apache.spark.sql.SparderContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class SparderContextCanary {
+ private static final Logger logger = LoggerFactory.getLogger(SparderContextCanary.class);
+ private static volatile boolean isStarted = false;
+
+ private static final int THRESHOLD_TO_RESTART_SPARK = KylinConfig.getInstanceFromEnv().getThresholdToRestartSparder();
+ private static final int PERIOD_MINUTES = KylinConfig.getInstanceFromEnv().getSparderCanaryPeriodMinutes();
+
+ private static volatile int errorAccumulated = 0;
+ private static volatile long lastResponseTime = -1;
+ private static volatile boolean sparderRestarting = false;
+
+ private SparderContextCanary() {
+ }
+
+ public static int getErrorAccumulated() {
+ return errorAccumulated;
+ }
+
+ @SuppressWarnings("unused")
+ public long getLastResponseTime() {
+ return lastResponseTime;
+ }
+
+ @SuppressWarnings("unused")
+ public boolean isSparderRestarting() {
+ return sparderRestarting;
+ }
+
+ public static void init() {
+ if (!isStarted) {
+ synchronized (SparderContextCanary.class) {
+ if (!isStarted) {
+ isStarted = true;
+ logger.info("Start monitoring Sparder");
+ Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(SparderContextCanary::monitor,
+ PERIOD_MINUTES, PERIOD_MINUTES, TimeUnit.MINUTES);
+ }
+ }
+ }
+ }
+
+ public static boolean isError() {
+ return errorAccumulated >= THRESHOLD_TO_RESTART_SPARK;
+ }
+
+ public static void monitor() {
+ try {
+ long startTime = System.currentTimeMillis();
+ // check sparder context
+ if (!SparderContext.isSparkAvailable()) {
+ logger.info("Sparder is unavailable, need to restart immediately.");
+ errorAccumulated = Math.max(errorAccumulated + 1, THRESHOLD_TO_RESTART_SPARK);
+ } else {
+ try {
+ JavaSparkContext jsc = JavaSparkContext.fromSparkContext(SparderContext.getSparkSession().sparkContext());
+ jsc.setLocalProperty("spark.scheduler.pool", "vip_tasks");
+
+ long t = System.currentTimeMillis();
+ long ret = numberCount(jsc).get(KylinConfig.getInstanceFromEnv().getSparderCanaryErrorResponseMs(),
+ TimeUnit.MILLISECONDS);
+ logger.info("SparderContextCanary numberCount returned successfully with value {}, takes {} ms.", ret,
+ (System.currentTimeMillis() - t));
+ // reset errorAccumulated once good context is confirmed
+ errorAccumulated = 0;
+ } catch (TimeoutException te) {
+ errorAccumulated++;
+ logger.error("SparderContextCanary numberCount timeout, didn't return in {} ms, error {} times.",
+ KylinConfig.getInstanceFromEnv().getSparderCanaryErrorResponseMs(), errorAccumulated);
+ } catch (ExecutionException ee) {
+ logger.error("SparderContextCanary numberCount occurs exception, need to restart immediately.", ee);
+ errorAccumulated = Math.max(errorAccumulated + 1, THRESHOLD_TO_RESTART_SPARK);
+ } catch (Exception e) {
+ errorAccumulated++;
+ logger.error("SparderContextCanary numberCount occurs exception.", e);
+ }
+ }
+
+ lastResponseTime = System.currentTimeMillis() - startTime;
+ logger.debug("Sparder context errorAccumulated:{}", errorAccumulated);
+
+ if (isError()) {
+ sparderRestarting = true;
+ try {
+ // Take repair action if error accumulated exceeds threshold
+ logger.warn("Repairing sparder context");
+ if ("true".equals(System.getProperty("spark.local"))) {
+ SparderContext.setSparkSession(KylinSparkEnv.getSparkSession());
+ } else {
+ SparderContext.restartSpark();
+ }
+ } catch (Throwable th) {
+ logger.error("Restart sparder context failed.", th);
+ }
+ sparderRestarting = false;
+ }
+ } catch (Throwable th) {
+ logger.error("Error when monitoring Sparder.", th);
+ }
+ }
+
+ // for canary
+ private static JavaFutureAction<Long> numberCount(JavaSparkContext jsc) {
+ List<Integer> list = new ArrayList<>();
+ for (int i = 0; i < 100; i++) {
+ list.add(i);
+ }
+
+ return jsc.parallelize(list).countAsync();
+ }
+}
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
index bd994e2..638a9ac 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicReference
import org.apache.commons.io.FileUtils
import org.apache.kylin.common.KylinConfig
+import org.apache.kylin.query.monitor.SparderContextCanary
import org.apache.kylin.spark.classloader.ClassLoaderUtils
import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
import org.apache.spark.sql.execution.datasource.KylinSourceStrategy
@@ -194,6 +195,11 @@ object SparderContext extends Logging {
logInfo("Initializing Spark, waiting for done.")
initializingThread.join()
}
+
+    // Monitor sparder only outside local mode. In Scala `!=` is value (equals) comparison and
+    // null-safe, whereas `ne` compares references and misses non-interned "true" strings.
+    if (System.getProperty("spark.local") != "true") {
+      SparderContextCanary.init()
+    }
}
}
diff --git a/kylin-spark-project/kylin-spark-query/src/test/java/org/apache/kylin/query/monitor/SparderContextCanaryTest.java b/kylin-spark-project/kylin-spark-query/src/test/java/org/apache/kylin/query/monitor/SparderContextCanaryTest.java
new file mode 100644
index 0000000..7a22892
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-query/src/test/java/org/apache/kylin/query/monitor/SparderContextCanaryTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.query.monitor;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.TempMetadataBuilder;
+import org.apache.kylin.engine.spark.LocalWithSparkSessionTest;
+import org.apache.kylin.job.exception.SchedulerException;
+import org.apache.spark.sql.KylinSparkEnv;
+import org.apache.spark.sql.SparderContext;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SparderContextCanaryTest extends LocalWithSparkSessionTest {
+ @Override
+ @Before
+ public void setup() throws SchedulerException {
+ super.setup();
+ SparderContext.setSparkSession(KylinSparkEnv.getSparkSession());
+ }
+
+ @After
+ public void after() {
+ super.after();
+ }
+
+ @Test
+ public void testSparderKilled() {
+ // first check should be good
+ Boolean ss = SparderContext.isSparkAvailable();
+ Assert.assertTrue(SparderContext.isSparkAvailable());
+
+ // stop sparder and check again, the sparder context should auto-restart
+ SparderContext.getSparkSession().stop();
+ Assert.assertFalse(SparderContext.isSparkAvailable());
+
+ SparderContextCanary.monitor();
+
+ Assert.assertTrue(SparderContext.isSparkAvailable());
+
+ SparderContextCanary.monitor();
+ Assert.assertEquals(0, SparderContextCanary.getErrorAccumulated());
+ }
+
+ @Test
+ public void testSparderTimeout() {
+ // first check should be GOOD
+ Assert.assertTrue(SparderContext.isSparkAvailable());
+
+ // set kylin.canary.sqlcontext-error-response-ms to 1
+ // And SparkContextCanary numberCount will timeout
+ Assert.assertEquals(0, SparderContextCanary.getErrorAccumulated());
+ System.setProperty("kylin.canary.sparder-context-error-response-ms", "1");
+ SparderContextCanary.monitor();
+
+ // errorAccumulated increase
+ Assert.assertEquals(1, SparderContextCanary.getErrorAccumulated());
+
+ // reach threshold to restart spark. Reset errorAccumulated.
+ SparderContextCanary.monitor();
+ Assert.assertEquals(2, SparderContextCanary.getErrorAccumulated());
+ SparderContextCanary.monitor();
+ Assert.assertEquals(3, SparderContextCanary.getErrorAccumulated());
+
+ Assert.assertTrue(SparderContext.isSparkAvailable());
+
+ System.clearProperty("kylin.canary.sparder-context-error-response-ms");
+
+ }
+
+ public void createTestMetadata() {
+ String tempMetadataDir = TempMetadataBuilder.prepareNLocalTempMetadata();
+ KylinConfig.setKylinConfigForLocalTest(tempMetadataDir);
+ getTestConfig().setProperty("kylin.query.security.acl-tcr-enabled", "false");
+ }
+}