You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2020/10/29 03:30:46 UTC

[kylin] branch kylin-on-parquet-v2 updated: KYLIN-4800 Add canary tool for sparder-context

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new 1d43e0a  KYLIN-4800 Add canary tool for sparder-context
1d43e0a is described below

commit 1d43e0a4ac81648f89666365e06e2519ae53271b
Author: yaqian.zhang <59...@qq.com>
AuthorDate: Wed Oct 28 09:09:02 2020 +0800

    KYLIN-4800 Add canary tool for sparder-context
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  21 +++
 kylin-spark-project/kylin-spark-query/pom.xml      |   7 +
 .../kylin/query/monitor/SparderContextCanary.java  | 145 +++++++++++++++++++++
 .../org/apache/spark/sql/SparderContext.scala      |   6 +
 .../query/monitor/SparderContextCanaryTest.java    |  94 +++++++++++++
 5 files changed, 273 insertions(+)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 1ffdc16..45afaa2 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2879,6 +2879,27 @@ public abstract class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(this.getOptional("kylin.query.auto-sparder-context", "false"));
     }
 
+    /**
+     * Sparder is considered unavailable when the check task is unresponsive for more than this time
+     */
+    public int getSparderCanaryErrorResponseMs() {
+        return Integer.parseInt(this.getOptional("kylin.canary.sparder-context-error-response-ms", "3000"));
+    }
+
+    /**
+     * The maximum number of restart sparder when sparder is not available
+     */
+    public int getThresholdToRestartSparder() {
+        return Integer.parseInt(this.getOptional("kylin.canary.sparder-context-threshold-to-restart-spark", "3"));
+    }
+
+    /**
+     * Time period between two sparder health checks
+     */
+    public int getSparderCanaryPeriodMinutes() {
+        return Integer.parseInt(this.getOptional("kylin.canary.sparder-context-period-min", "3"));
+    }
+
     // ============================================================================
     // Spark with Kerberos
     // ============================================================================
diff --git a/kylin-spark-project/kylin-spark-query/pom.xml b/kylin-spark-project/kylin-spark-query/pom.xml
index ee8ce74..59493eb 100644
--- a/kylin-spark-project/kylin-spark-query/pom.xml
+++ b/kylin-spark-project/kylin-spark-query/pom.xml
@@ -56,6 +56,13 @@
             <type>test-jar</type>
         </dependency>
         <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-spark-engine</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.awaitility</groupId>
             <artifactId>awaitility</artifactId>
             <version>${awaitility.version}</version>
diff --git a/kylin-spark-project/kylin-spark-query/src/main/java/org/apache/kylin/query/monitor/SparderContextCanary.java b/kylin-spark-project/kylin-spark-query/src/main/java/org/apache/kylin/query/monitor/SparderContextCanary.java
new file mode 100644
index 0000000..d0950c1
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-query/src/main/java/org/apache/kylin/query/monitor/SparderContextCanary.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.query.monitor;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.spark.api.java.JavaFutureAction;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.KylinSparkEnv;
+import org.apache.spark.sql.SparderContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Periodic health check ("canary") for the Spark session held by {@link SparderContext}.
+ *
+ * <p>Each check submits a small count job and waits for its result with a timeout.
+ * Failed checks increase an error counter and a successful check resets it to zero.
+ * Once the counter reaches the configured threshold, the canary tries to repair the
+ * Spark session.
+ *
+ * <p>All state is static and the class cannot be instantiated.
+ */
+public class SparderContextCanary {
+    private static final Logger logger = LoggerFactory.getLogger(SparderContextCanary.class);
+    private static volatile boolean isStarted = false;
+
+    // Both settings are read once, when this class is initialized.
+    private static final int THRESHOLD_TO_RESTART_SPARK = KylinConfig.getInstanceFromEnv().getThresholdToRestartSparder();
+    private static final int PERIOD_MINUTES = KylinConfig.getInstanceFromEnv().getSparderCanaryPeriodMinutes();
+
+    private static volatile int errorAccumulated = 0;
+    private static volatile long lastResponseTime = -1;
+    private static volatile boolean sparderRestarting = false;
+
+    private SparderContextCanary() {
+    }
+
+    /**
+     * @return the number of errors accumulated since the last successful check
+     */
+    public static int getErrorAccumulated() {
+        return errorAccumulated;
+    }
+
+    /**
+     * Static because the class has a private constructor: as an instance method
+     * this accessor could never be invoked.
+     *
+     * @return duration of the most recent check in milliseconds, or -1 if no check
+     *         has completed yet
+     */
+    @SuppressWarnings("unused")
+    public static long getLastResponseTime() {
+        return lastResponseTime;
+    }
+
+    /**
+     * Static for the same reason as {@link #getLastResponseTime()}.
+     *
+     * @return true while a repair of the sparder context is in progress
+     */
+    @SuppressWarnings("unused")
+    public static boolean isSparderRestarting() {
+        return sparderRestarting;
+    }
+
+    /**
+     * Starts the periodic monitoring task. Only the first call has an effect;
+     * later calls return without doing anything.
+     */
+    public static void init() {
+        if (!isStarted) {
+            synchronized (SparderContextCanary.class) {
+                if (!isStarted) {
+                    isStarted = true;
+                    logger.info("Start monitoring Sparder");
+                    // The executor is never shut down, so it must run on a daemon
+                    // thread; otherwise it would keep the JVM from exiting.
+                    Executors.newSingleThreadScheduledExecutor(SparderContextCanary::newCanaryThread)
+                            .scheduleWithFixedDelay(SparderContextCanary::monitor, PERIOD_MINUTES, PERIOD_MINUTES,
+                                    TimeUnit.MINUTES);
+                }
+            }
+        }
+    }
+
+    /**
+     * @return true when the accumulated error count has reached the restart threshold
+     */
+    public static boolean isError() {
+        return errorAccumulated >= THRESHOLD_TO_RESTART_SPARK;
+    }
+
+    /**
+     * Runs one health check and, if the error threshold is reached, a repair.
+     * Never throws: every failure is logged instead.
+     */
+    public static void monitor() {
+        try {
+            long startTime = System.currentTimeMillis();
+            // check sparder context
+            if (!SparderContext.isSparkAvailable()) {
+                logger.info("Sparder is unavailable, need to restart immediately.");
+                // Jump straight to (at least) the threshold so the repair below runs in this round.
+                errorAccumulated = Math.max(errorAccumulated + 1, THRESHOLD_TO_RESTART_SPARK);
+            } else {
+                // -1 only shows up in the log if reading the configuration itself failed.
+                int timeoutMs = -1;
+                try {
+                    // Read the timeout once so the logged value is the one actually applied.
+                    timeoutMs = KylinConfig.getInstanceFromEnv().getSparderCanaryErrorResponseMs();
+
+                    JavaSparkContext jsc = JavaSparkContext.fromSparkContext(SparderContext.getSparkSession().sparkContext());
+                    jsc.setLocalProperty("spark.scheduler.pool", "vip_tasks");
+
+                    long t = System.currentTimeMillis();
+                    long ret = numberCount(jsc).get(timeoutMs, TimeUnit.MILLISECONDS);
+                    logger.info("SparderContextCanary numberCount returned successfully with value {}, takes {} ms.", ret,
+                            (System.currentTimeMillis() - t));
+                    // reset errorAccumulated once good context is confirmed
+                    errorAccumulated = 0;
+                } catch (TimeoutException te) {
+                    errorAccumulated++;
+                    logger.error("SparderContextCanary numberCount timeout, didn't return in {} ms, error {} times.",
+                            timeoutMs, errorAccumulated);
+                } catch (ExecutionException ee) {
+                    logger.error("SparderContextCanary numberCount occurs exception, need to restart immediately.", ee);
+                    // A failed job is treated like an unavailable context: repair in this round.
+                    errorAccumulated = Math.max(errorAccumulated + 1, THRESHOLD_TO_RESTART_SPARK);
+                } catch (Exception e) {
+                    errorAccumulated++;
+                    logger.error("SparderContextCanary numberCount occurs exception.", e);
+                }
+            }
+
+            lastResponseTime = System.currentTimeMillis() - startTime;
+            logger.debug("Sparder context errorAccumulated:{}", errorAccumulated);
+
+            if (isError()) {
+                repairSparder();
+            }
+        } catch (Throwable th) {
+            logger.error("Error when monitoring Sparder.", th);
+        }
+    }
+
+    // Takes the repair action; a failure is logged and does not propagate.
+    private static void repairSparder() {
+        sparderRestarting = true;
+        try {
+            // Take repair action if error accumulated exceeds threshold
+            logger.warn("Repairing sparder context");
+            if ("true".equals(System.getProperty("spark.local"))) {
+                SparderContext.setSparkSession(KylinSparkEnv.getSparkSession());
+            } else {
+                SparderContext.restartSpark();
+            }
+        } catch (Throwable th) {
+            logger.error("Restart sparder context failed.", th);
+        } finally {
+            sparderRestarting = false;
+        }
+    }
+
+    // Creates the named daemon thread on which the periodic check runs.
+    private static Thread newCanaryThread(Runnable task) {
+        Thread thread = new Thread(task, "sparder-context-canary");
+        thread.setDaemon(true);
+        return thread;
+    }
+
+    // for canary: asynchronously counts the integers 0..99 on the given context
+    private static JavaFutureAction<Long> numberCount(JavaSparkContext jsc) {
+        List<Integer> list = new ArrayList<>(100);
+        for (int i = 0; i < 100; i++) {
+            list.add(i);
+        }
+
+        return jsc.parallelize(list).countAsync();
+    }
+}
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
index bd994e2..638a9ac 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicReference
 
 import org.apache.commons.io.FileUtils
 import org.apache.kylin.common.KylinConfig
+import org.apache.kylin.query.monitor.SparderContextCanary
 import org.apache.kylin.spark.classloader.ClassLoaderUtils
 import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
 import org.apache.spark.sql.execution.datasource.KylinSourceStrategy
@@ -194,6 +195,11 @@ object SparderContext extends Logging {
         logInfo("Initializing Spark, waiting for done.")
         initializingThread.join()
       }
+
+      // Compare by value: `ne` is reference inequality, so it would be true for
+      // any "true" string that is not the very same object as the literal.
+      if (System.getProperty("spark.local") != "true") {
+        //monitor sparder
+        SparderContextCanary.init()
+      }
     }
   }
 
diff --git a/kylin-spark-project/kylin-spark-query/src/test/java/org/apache/kylin/query/monitor/SparderContextCanaryTest.java b/kylin-spark-project/kylin-spark-query/src/test/java/org/apache/kylin/query/monitor/SparderContextCanaryTest.java
new file mode 100644
index 0000000..7a22892
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-query/src/test/java/org/apache/kylin/query/monitor/SparderContextCanaryTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.query.monitor;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.TempMetadataBuilder;
+import org.apache.kylin.engine.spark.LocalWithSparkSessionTest;
+import org.apache.kylin.job.exception.SchedulerException;
+import org.apache.spark.sql.KylinSparkEnv;
+import org.apache.spark.sql.SparderContext;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SparderContextCanaryTest extends LocalWithSparkSessionTest {
+    @Override
+    @Before
+    public void setup() throws SchedulerException {
+        super.setup();
+        SparderContext.setSparkSession(KylinSparkEnv.getSparkSession());
+    }
+
+    @After
+    public void after() {
+        super.after();
+    }
+
+    @Test
+    public void testSparderKilled() {
+        // first check should be good
+        Boolean ss = SparderContext.isSparkAvailable();
+        Assert.assertTrue(SparderContext.isSparkAvailable());
+
+        // stop sparder and check again, the sparder context should auto-restart
+        SparderContext.getSparkSession().stop();
+        Assert.assertFalse(SparderContext.isSparkAvailable());
+
+        SparderContextCanary.monitor();
+
+        Assert.assertTrue(SparderContext.isSparkAvailable());
+
+        SparderContextCanary.monitor();
+        Assert.assertEquals(0, SparderContextCanary.getErrorAccumulated());
+    }
+
+    @Test
+    public void testSparderTimeout() {
+        // first check should be GOOD
+        Assert.assertTrue(SparderContext.isSparkAvailable());
+
+        // set kylin.canary.sqlcontext-error-response-ms to 1
+        // And SparkContextCanary numberCount will timeout
+        Assert.assertEquals(0, SparderContextCanary.getErrorAccumulated());
+        System.setProperty("kylin.canary.sparder-context-error-response-ms", "1");
+        SparderContextCanary.monitor();
+
+        // errorAccumulated increase
+        Assert.assertEquals(1, SparderContextCanary.getErrorAccumulated());
+
+        // reach threshold to restart spark. Reset errorAccumulated.
+        SparderContextCanary.monitor();
+        Assert.assertEquals(2, SparderContextCanary.getErrorAccumulated());
+        SparderContextCanary.monitor();
+        Assert.assertEquals(3, SparderContextCanary.getErrorAccumulated());
+
+        Assert.assertTrue(SparderContext.isSparkAvailable());
+
+        System.clearProperty("kylin.canary.sparder-context-error-response-ms");
+
+    }
+
+    public void createTestMetadata() {
+        String tempMetadataDir = TempMetadataBuilder.prepareNLocalTempMetadata();
+        KylinConfig.setKylinConfigForLocalTest(tempMetadataDir);
+        getTestConfig().setProperty("kylin.query.security.acl-tcr-enabled", "false");
+    }
+}