You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2020/02/07 14:26:03 UTC

[kylin] 08/44: KYLIN-4197 Set deadline for ClientEnvExtractor (#994)

This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch 3.0.x
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 753e1a85cb7d5cde660e66ae11449bca0b786883
Author: Xiaoxiang Yu <hi...@126.com>
AuthorDate: Tue Dec 17 18:15:19 2019 +0800

    KYLIN-4197 Set deadline for ClientEnvExtractor (#994)
    
    * KYLIN-4197 Set deadline for ClientEnvExtractor
    
    * Add configuration for lambda mode.
---
 build/bin/kylin-port-replace-util.sh               |  2 +-
 .../org/apache/kylin/common/KylinConfigBase.java   |  4 ++
 .../rest/controller/StreamingV2Controller.java     | 11 +++-
 .../org/apache/kylin/tool/ClientEnvExtractor.java  | 52 ++++++++++++-----
 .../apache/kylin/tool/ClientEnvExtractorTest.java  | 68 ++++++++++++++++++++++
 5 files changed, 119 insertions(+), 18 deletions(-)

diff --git a/build/bin/kylin-port-replace-util.sh b/build/bin/kylin-port-replace-util.sh
index 0d6f005..28a53db 100755
--- a/build/bin/kylin-port-replace-util.sh
+++ b/build/bin/kylin-port-replace-util.sh
@@ -89,7 +89,7 @@ then
     #replace kylin.stream.node for Streaming Coordinator
     stream_node="kylin.stream.node=`hostname -f`:$new_kylin_port"
     echo "Using new kylin.stream.node: $stream_node"
-    line_count=$(awk '$0 ~ /kylin.stream.node/ {print $0}' ${KYLIN_CONFIG_FILE} | wc -l)
+    line_count=$(awk '$0 ~ /^kylin.stream.node/ {print $0}' ${KYLIN_CONFIG_FILE} | wc -l)
     if [[ $line_count -eq 0 ]]; then
         echo "kylin.stream.node=`hostname -f`:7070" >> ${KYLIN_CONFIG_FILE}
     fi
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index d44d944..4c8d437 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2289,6 +2289,10 @@ public abstract class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(getOptional("kylin.stream.auto-resubmit-after-discard-enabled", "true"));
     }
 
+    public String getHiveDatabaseLambdaCube() {
+        return this.getOptional("kylin.stream.hive.database-for-lambda-cube", DEFAULT);
+    }
+
     // ============================================================================
     // Health Check CLI
     // ============================================================================
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingV2Controller.java b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingV2Controller.java
index cfd7086..846616e 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingV2Controller.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingV2Controller.java
@@ -188,9 +188,11 @@ public class StreamingV2Controller extends BasicController {
         // validate the compatibility for input table schema and the underline hive table schema
         if (tableDesc.getSourceType() == ISourceAware.ID_KAFKA_HIVE) {
             List<FieldSchema> fields;
+            String db = tableDesc.getDatabase();
             try {
                 HiveMetaStoreClient metaStoreClient = new HiveMetaStoreClient(new HiveConf());
-                fields = metaStoreClient.getFields(KylinConfig.getInstanceFromEnv().getHiveDatabaseForIntermediateTable(), tableDesc.getName());
+                fields = metaStoreClient.getFields(db, tableDesc.getName());
+                logger.info("Checking the {} in {}", tableDesc.getName(), db);
             } catch (NoSuchObjectException noObjectException) {
                 logger.info("table not exist in hive meta store for table:" + tableDesc.getIdentity(),
                         noObjectException);
@@ -209,10 +211,12 @@ public class StreamingV2Controller extends BasicController {
             for (ColumnDesc columnDesc : tableDesc.getColumns()) {
                 FieldSchema fieldSchema = fieldSchemaMap.get(columnDesc.getName().toUpperCase(Locale.ROOT));
                 if (fieldSchema == null) {
+                    // Partition column cannot be fetched via Hive Metadata API.
                     if (!TimeDerivedColumnType.isTimeDerivedColumn(columnDesc.getName())) {
-                        incompatibleMsgs.add("column not exist in hive table:" + columnDesc.getName());
+                        incompatibleMsgs.add("Column not exist in hive table:" + columnDesc.getName());
                         continue;
                     } else {
+                        logger.info("Column not exist in hive table: {}.", columnDesc.getName());
                         continue;
                     }
                 }
@@ -486,6 +490,7 @@ public class StreamingV2Controller extends BasicController {
 
     private TableDesc deserializeTableDesc(StreamingRequestV2 streamingRequest) {
         TableDesc desc = null;
+        String db = KylinConfig.getInstanceFromEnv().getHiveDatabaseLambdaCube();
         try {
             logger.debug("Saving TableDesc " + streamingRequest.getTableData());
             desc = JsonUtil.readValue(streamingRequest.getTableData(), TableDesc.class);
@@ -502,7 +507,7 @@ public class StreamingV2Controller extends BasicController {
 
         String[] dbTable = HadoopUtil.parseHiveTableName(desc.getName());
         desc.setName(dbTable[1]);
-        desc.setDatabase(dbTable[0]);
+        desc.setDatabase(db);
         desc.getIdentity();
         return desc;
     }
diff --git a/tool/src/main/java/org/apache/kylin/tool/ClientEnvExtractor.java b/tool/src/main/java/org/apache/kylin/tool/ClientEnvExtractor.java
index 78d02c5..80741ea 100644
--- a/tool/src/main/java/org/apache/kylin/tool/ClientEnvExtractor.java
+++ b/tool/src/main/java/org/apache/kylin/tool/ClientEnvExtractor.java
@@ -22,6 +22,12 @@ package org.apache.kylin.tool;
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.Charset;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
@@ -38,10 +44,12 @@ public class ClientEnvExtractor extends AbstractInfoExtractor {
     private static final Logger logger = LoggerFactory.getLogger(ClientEnvExtractor.class);
     private KylinConfig kylinConfig;
     private CliCommandExecutor cmdExecutor;
+    private ExecutorService executorService;
+    int maxWaitSeconds = 120;
 
     public ClientEnvExtractor() throws IOException {
         super();
-
+        executorService = Executors.newFixedThreadPool(1);
         packageType = "client";
         kylinConfig = KylinConfig.getInstanceFromEnv();
         cmdExecutor = kylinConfig.getCliCommandExecutor();
@@ -64,6 +72,7 @@ public class ClientEnvExtractor extends AbstractInfoExtractor {
         addShellOutput("hbase version", "hbase", "version");
         addShellOutput("hive --version", "hive", "version");
         addShellOutput("beeline --version", "hive", "beeline_version");
+        executorService.shutdownNow();
     }
 
     private void addLocalFile(String src, String destDir) {
@@ -83,20 +92,35 @@ public class ClientEnvExtractor extends AbstractInfoExtractor {
         }
     }
 
-    private void addShellOutput(String cmd, String destDir, String filename) {
-        try {
-            File destDirFile = null;
-            if (!StringUtils.isEmpty(destDir)) {
-                destDirFile = new File(exportDir, destDir);
-                FileUtils.forceMkdir(destDirFile);
-            } else {
-                destDirFile = exportDir;
+    void addShellOutput(String cmd, String destDir, String filename) {
+        Future f = executorService.submit(() -> {
+            try {
+                File destDirFile = null;
+                if (!StringUtils.isEmpty(destDir)) {
+                    destDirFile = new File(exportDir, destDir);
+                    FileUtils.forceMkdir(destDirFile);
+                } else {
+                    destDirFile = exportDir;
+                }
+                Pair<Integer, String> result = cmdExecutor.execute(cmd);
+                String output = result.getSecond();
+                FileUtils.writeStringToFile(new File(destDirFile, filename), output, Charset.defaultCharset());
+            } catch (IOException e) {
+                logger.warn("Failed to run command: " + cmd + ".", e);
             }
-            Pair<Integer, String> result = cmdExecutor.execute(cmd);
-            String output = result.getSecond();
-            FileUtils.writeStringToFile(new File(destDirFile, filename), output, Charset.defaultCharset());
-        } catch (Exception e) {
-            logger.warn("Failed to run command: " + cmd + ".", e);
+        });
+
+        try {
+            // assume most shell should return in two minutes
+            f.get(maxWaitSeconds, TimeUnit.SECONDS);
+        } catch (TimeoutException timeoutException) {
+            logger.error("Timeout for \"{}\" in {} seconds.", cmd, maxWaitSeconds);
+            executorService.shutdownNow();
+            executorService = Executors.newFixedThreadPool(1);
+        } catch (ExecutionException runtimeException) {
+            logger.error("Runtime error: {}", runtimeException.getLocalizedMessage());
+        } catch (InterruptedException otherException) {
+            // Ignore
         }
     }
 }
diff --git a/tool/src/test/java/org/apache/kylin/tool/ClientEnvExtractorTest.java b/tool/src/test/java/org/apache/kylin/tool/ClientEnvExtractorTest.java
new file mode 100644
index 0000000..d2b31b0
--- /dev/null
+++ b/tool/src/test/java/org/apache/kylin/tool/ClientEnvExtractorTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.kylin.tool;
+
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+
+public class ClientEnvExtractorTest extends HBaseMetadataTestCase {
+
+    @Rule
+    public TemporaryFolder folder = new TemporaryFolder();
+
+    @Before
+    public void setup() throws Exception {
+        super.createTestMetadata();
+    }
+
+    @After
+    public void after() throws Exception {
+        super.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testNormal() throws IOException {
+        File f = folder.newFolder("ClientEnvExtractorTest_testNormal");
+        ClientEnvExtractor executor = new ClientEnvExtractor();
+        executor.addShellOutput("pwd", f.getAbsolutePath(), "testNormal");
+    }
+
+    @Test(timeout = 5000)
+    public void testTimeout() throws IOException {
+        File f = folder.newFolder("ClientEnvExtractorTest_testTimeout");
+        ClientEnvExtractor executor = new ClientEnvExtractor();
+        executor.maxWaitSeconds = 2;
+        executor.addShellOutput("sleep 1000", f.getAbsolutePath(), "testTimeout");
+        executor.addShellOutput("pwd", f.getAbsolutePath(), "pwd");
+    }
+
+    @Test
+    public void testError() throws IOException {
+        File f = folder.newFolder("ClientEnvExtractorTest_testError");
+        ClientEnvExtractor executor = new ClientEnvExtractor();
+        executor.addShellOutput("CMD_NEVER_EXISTS", f.getAbsolutePath(), "testError");
+    }
+}
\ No newline at end of file