You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2019/12/17 10:15:28 UTC
[kylin] branch master updated: KYLIN-4197 Set deadline for
ClientEnvExtractor (#994)
This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new 38a3605 KYLIN-4197 Set deadline for ClientEnvExtractor (#994)
38a3605 is described below
commit 38a3605dee5aaf57339c486dbfe8dd445dfe4bf6
Author: Xiaoxiang Yu <hi...@126.com>
AuthorDate: Tue Dec 17 18:15:19 2019 +0800
KYLIN-4197 Set deadline for ClientEnvExtractor (#994)
* KYLIN-4197 Set deadline for ClientEnvExtractor
* Add configuration for lambda mode.
---
build/bin/kylin-port-replace-util.sh | 2 +-
.../org/apache/kylin/common/KylinConfigBase.java | 4 ++
.../rest/controller/StreamingV2Controller.java | 11 +++-
.../org/apache/kylin/tool/ClientEnvExtractor.java | 52 ++++++++++++-----
.../apache/kylin/tool/ClientEnvExtractorTest.java | 68 ++++++++++++++++++++++
5 files changed, 119 insertions(+), 18 deletions(-)
diff --git a/build/bin/kylin-port-replace-util.sh b/build/bin/kylin-port-replace-util.sh
index 0d6f005..28a53db 100755
--- a/build/bin/kylin-port-replace-util.sh
+++ b/build/bin/kylin-port-replace-util.sh
@@ -89,7 +89,7 @@ then
#replace kylin.stream.node for Streaming Coordinator
stream_node="kylin.stream.node=`hostname -f`:$new_kylin_port"
echo "Using new kylin.stream.node: $stream_node"
- line_count=$(awk '$0 ~ /kylin.stream.node/ {print $0}' ${KYLIN_CONFIG_FILE} | wc -l)
+ line_count=$(awk '$0 ~ /^kylin.stream.node/ {print $0}' ${KYLIN_CONFIG_FILE} | wc -l)
if [[ $line_count -eq 0 ]]; then
echo "kylin.stream.node=`hostname -f`:7070" >> ${KYLIN_CONFIG_FILE}
fi
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index d44d944..4c8d437 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2289,6 +2289,10 @@ public abstract class KylinConfigBase implements Serializable {
return Boolean.parseBoolean(getOptional("kylin.stream.auto-resubmit-after-discard-enabled", "true"));
}
+ public String getHiveDatabaseLambdaCube() {
+ return this.getOptional("kylin.stream.hive.database-for-lambda-cube", DEFAULT);
+ }
+
// ============================================================================
// Health Check CLI
// ============================================================================
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingV2Controller.java b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingV2Controller.java
index 45e0cda..2843cac 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingV2Controller.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingV2Controller.java
@@ -189,9 +189,11 @@ public class StreamingV2Controller extends BasicController {
// validate the compatibility for input table schema and the underline hive table schema
if (tableDesc.getSourceType() == ISourceAware.ID_KAFKA_HIVE) {
List<FieldSchema> fields;
+ String db = tableDesc.getDatabase();
try {
HiveMetaStoreClient metaStoreClient = new HiveMetaStoreClient(new HiveConf());
- fields = metaStoreClient.getFields(KylinConfig.getInstanceFromEnv().getHiveDatabaseForIntermediateTable(), tableDesc.getName());
+ fields = metaStoreClient.getFields(db, tableDesc.getName());
+ logger.info("Checking the {} in {}", tableDesc.getName(), db);
} catch (NoSuchObjectException noObjectException) {
logger.info("table not exist in hive meta store for table:" + tableDesc.getIdentity(),
noObjectException);
@@ -210,10 +212,12 @@ public class StreamingV2Controller extends BasicController {
for (ColumnDesc columnDesc : tableDesc.getColumns()) {
FieldSchema fieldSchema = fieldSchemaMap.get(columnDesc.getName().toUpperCase(Locale.ROOT));
if (fieldSchema == null) {
+ // Partition column cannot be fetched via Hive Metadata API.
if (!TimeDerivedColumnType.isTimeDerivedColumn(columnDesc.getName())) {
- incompatibleMsgs.add("column not exist in hive table:" + columnDesc.getName());
+ incompatibleMsgs.add("Column not exist in hive table:" + columnDesc.getName());
continue;
} else {
+ logger.info("Column not exist in hive table: {}.", columnDesc.getName());
continue;
}
}
@@ -487,6 +491,7 @@ public class StreamingV2Controller extends BasicController {
private TableDesc deserializeTableDesc(StreamingRequestV2 streamingRequest) {
TableDesc desc = null;
+ String db = KylinConfig.getInstanceFromEnv().getHiveDatabaseLambdaCube();
try {
logger.debug("Saving TableDesc " + streamingRequest.getTableData());
desc = JsonUtil.readValue(streamingRequest.getTableData(), TableDesc.class);
@@ -504,7 +509,7 @@ public class StreamingV2Controller extends BasicController {
Preconditions.checkNotNull(desc, "Failed to deserialize from TableDesc definition");
String[] dbTable = HadoopUtil.parseHiveTableName(desc.getName());
desc.setName(dbTable[1]);
- desc.setDatabase(dbTable[0]);
+ desc.setDatabase(db);
desc.getIdentity();
return desc;
}
diff --git a/tool/src/main/java/org/apache/kylin/tool/ClientEnvExtractor.java b/tool/src/main/java/org/apache/kylin/tool/ClientEnvExtractor.java
index 78d02c5..80741ea 100644
--- a/tool/src/main/java/org/apache/kylin/tool/ClientEnvExtractor.java
+++ b/tool/src/main/java/org/apache/kylin/tool/ClientEnvExtractor.java
@@ -22,6 +22,12 @@ package org.apache.kylin.tool;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
@@ -38,10 +44,12 @@ public class ClientEnvExtractor extends AbstractInfoExtractor {
private static final Logger logger = LoggerFactory.getLogger(ClientEnvExtractor.class);
private KylinConfig kylinConfig;
private CliCommandExecutor cmdExecutor;
+ private ExecutorService executorService;
+ int maxWaitSeconds = 120;
public ClientEnvExtractor() throws IOException {
super();
-
+ executorService = Executors.newFixedThreadPool(1);
packageType = "client";
kylinConfig = KylinConfig.getInstanceFromEnv();
cmdExecutor = kylinConfig.getCliCommandExecutor();
@@ -64,6 +72,7 @@ public class ClientEnvExtractor extends AbstractInfoExtractor {
addShellOutput("hbase version", "hbase", "version");
addShellOutput("hive --version", "hive", "version");
addShellOutput("beeline --version", "hive", "beeline_version");
+ executorService.shutdownNow();
}
private void addLocalFile(String src, String destDir) {
@@ -83,20 +92,35 @@ public class ClientEnvExtractor extends AbstractInfoExtractor {
}
}
- private void addShellOutput(String cmd, String destDir, String filename) {
- try {
- File destDirFile = null;
- if (!StringUtils.isEmpty(destDir)) {
- destDirFile = new File(exportDir, destDir);
- FileUtils.forceMkdir(destDirFile);
- } else {
- destDirFile = exportDir;
+ void addShellOutput(String cmd, String destDir, String filename) {
+ Future f = executorService.submit(() -> {
+ try {
+ File destDirFile = null;
+ if (!StringUtils.isEmpty(destDir)) {
+ destDirFile = new File(exportDir, destDir);
+ FileUtils.forceMkdir(destDirFile);
+ } else {
+ destDirFile = exportDir;
+ }
+ Pair<Integer, String> result = cmdExecutor.execute(cmd);
+ String output = result.getSecond();
+ FileUtils.writeStringToFile(new File(destDirFile, filename), output, Charset.defaultCharset());
+ } catch (IOException e) {
+ logger.warn("Failed to run command: " + cmd + ".", e);
}
- Pair<Integer, String> result = cmdExecutor.execute(cmd);
- String output = result.getSecond();
- FileUtils.writeStringToFile(new File(destDirFile, filename), output, Charset.defaultCharset());
- } catch (Exception e) {
- logger.warn("Failed to run command: " + cmd + ".", e);
+ });
+
+ try {
+ // assume most shell should return in two minutes
+ f.get(maxWaitSeconds, TimeUnit.SECONDS);
+ } catch (TimeoutException timeoutException) {
+ logger.error("Timeout for \"{}\" in {} seconds.", cmd, maxWaitSeconds);
+ executorService.shutdownNow();
+ executorService = Executors.newFixedThreadPool(1);
+ } catch (ExecutionException runtimeException) {
+ logger.error("Runtime error: {}", runtimeException.getLocalizedMessage());
+ } catch (InterruptedException otherException) {
+ // Ignore
}
}
}
diff --git a/tool/src/test/java/org/apache/kylin/tool/ClientEnvExtractorTest.java b/tool/src/test/java/org/apache/kylin/tool/ClientEnvExtractorTest.java
new file mode 100644
index 0000000..d2b31b0
--- /dev/null
+++ b/tool/src/test/java/org/apache/kylin/tool/ClientEnvExtractorTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.kylin.tool;
+
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+
+public class ClientEnvExtractorTest extends HBaseMetadataTestCase {
+
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder();
+
+ @Before
+ public void setup() throws Exception {
+ super.createTestMetadata();
+ }
+
+ @After
+ public void after() throws Exception {
+ super.cleanupTestMetadata();
+ }
+
+ @Test
+ public void testNormal() throws IOException {
+ File f = folder.newFolder("ClientEnvExtractorTest_testNormal");
+ ClientEnvExtractor executor = new ClientEnvExtractor();
+ executor.addShellOutput("pwd", f.getAbsolutePath(), "testNormal");
+ }
+
+ @Test(timeout = 5000)
+ public void testTimeout() throws IOException {
+ File f = folder.newFolder("ClientEnvExtractorTest_testTimeout");
+ ClientEnvExtractor executor = new ClientEnvExtractor();
+ executor.maxWaitSeconds = 2;
+ executor.addShellOutput("sleep 1000", f.getAbsolutePath(), "testTimeout");
+ executor.addShellOutput("pwd", f.getAbsolutePath(), "pwd");
+ }
+
+ @Test
+ public void testError() throws IOException {
+ File f = folder.newFolder("ClientEnvExtractorTest_testError");
+ ClientEnvExtractor executor = new ClientEnvExtractor();
+ executor.addShellOutput("CMD_NEVER_EXISTS", f.getAbsolutePath(), "testError");
+ }
+}
\ No newline at end of file