You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2020/02/25 07:01:00 UTC
[kylin] branch 2.6.x updated: KYLIN-4046 Refine JDBC
Source(source.default=8)
This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch 2.6.x
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/2.6.x by this push:
new 3c79862 KYLIN-4046 Refine JDBC Source(source.default=8)
3c79862 is described below
commit 3c79862cf9c39fb6fc6e7a9c9a9dba9fa0cd4f63
Author: hit-lacus <hi...@126.com>
AuthorDate: Mon Jun 24 15:34:04 2019 +0800
KYLIN-4046 Refine JDBC Source(source.default=8)
Currently, the function of ingesting data from an RDBMS (kylin.source.default=8) into Kylin has some problems; in this patch, I want to:
1. fix case-sensitivity issues
2. fix weak dialect compatibility
3. fix misuse of the quote character
---
.../org/apache/kylin/common/SourceDialect.java | 56 +++-
.../java/org/apache/kylin/job/JoinedFlatTable.java | 4 +-
.../apache/kylin/metadata/model/PartitionDesc.java | 27 +-
.../org/apache/kylin/metadata/model/TblColRef.java | 18 ++
.../DefaultPartitionConditionBuilderTest.java | 8 +-
pom.xml | 3 +
.../org/apache/kylin/source/jdbc/JdbcDialect.java | 26 --
.../org/apache/kylin/source/jdbc/JdbcExplorer.java | 17 +-
.../kylin/source/jdbc/JdbcHiveInputBase.java | 344 +++++++++++++++++++--
.../apache/kylin/source/jdbc/JdbcTableReader.java | 15 +-
.../java/org/apache/kylin/source/jdbc/SqlUtil.java | 5 +-
.../source/jdbc/extensible/JdbcHiveInputBase.java | 2 +-
.../source/jdbc/metadata/DefaultJdbcMetadata.java | 6 +-
.../kylin/source/jdbc/metadata/IJdbcMetadata.java | 4 +
.../source/jdbc/metadata/JdbcMetadataFactory.java | 16 +-
.../source/jdbc/metadata/MySQLJdbcMetadata.java | 6 +
.../jdbc/metadata/SQLServerJdbcMetadata.java | 6 +
.../apache/kylin/source/jdbc/JdbcExplorerTest.java | 4 +-
.../kylin/source/jdbc/JdbcHiveInputBaseTest.java | 68 ++++
19 files changed, 534 insertions(+), 101 deletions(-)
diff --git a/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactoryTest.java b/core-common/src/main/java/org/apache/kylin/common/SourceDialect.java
similarity index 52%
rename from source-jdbc/src/test/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactoryTest.java
rename to core-common/src/main/java/org/apache/kylin/common/SourceDialect.java
index d9c7425..a87054d 100644
--- a/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactoryTest.java
+++ b/core-common/src/main/java/org/apache/kylin/common/SourceDialect.java
@@ -15,21 +15,45 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kylin.source.jdbc.metadata;
-
-import org.apache.kylin.source.jdbc.JdbcDialect;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class JdbcMetadataFactoryTest {
-
- @Test
- public void testGetJdbcMetadata() {
- Assert.assertTrue(
- JdbcMetadataFactory.getJdbcMetadata(JdbcDialect.DIALECT_MSSQL, null) instanceof SQLServerJdbcMetadata);
- Assert.assertTrue(
- JdbcMetadataFactory.getJdbcMetadata(JdbcDialect.DIALECT_MYSQL, null) instanceof MySQLJdbcMetadata);
- Assert.assertTrue(
- JdbcMetadataFactory.getJdbcMetadata(JdbcDialect.DIALECT_VERTICA, null) instanceof DefaultJdbcMetadata);
+
+package org.apache.kylin.common;
+
+/**
+ * Decide SQL pattern according to dialect from different data source
+ */
+public enum SourceDialect {
+ HIVE("hive"),
+
+ /**
+ * Support MySQL 5.7
+ */
+ MYSQL("mysql"),
+
+ /**
+ * Support Microsoft Sql Server 2017
+ */
+ SQL_SERVER("mssql"),
+
+ VERTICA("vertica"),
+
+ /**
+ * Others
+ */
+ UNKNOWN("unknown");
+
+ String source;
+
+ SourceDialect(String source) {
+ this.source = source;
+ }
+
+ public static SourceDialect getDialect(String name) {
+
+ for (SourceDialect dialect : SourceDialect.values()) {
+ if (dialect.source.equalsIgnoreCase(name)) {
+ return dialect;
+ }
+ }
+ return UNKNOWN;
}
}
diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
index 0d1cafb..d0a42cc 100644
--- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@ -241,7 +241,7 @@ public class JoinedFlatTable {
if (segRange != null && !segRange.isInfinite()) {
whereBuilder.append(" AND (");
String quotedPartitionCond = quoteIdentifierInSqlExpr(flatDesc,
- partDesc.getPartitionConditionBuilder().buildDateRangeCondition(partDesc, flatDesc.getSegment(), segRange));
+ partDesc.getPartitionConditionBuilder().buildDateRangeCondition(partDesc, flatDesc.getSegment(), segRange, null));
whereBuilder.append(quotedPartitionCond);
whereBuilder.append(")" + sep);
}
@@ -251,7 +251,7 @@ public class JoinedFlatTable {
sql.append(whereBuilder.toString());
}
- private static String colName(TblColRef col) {
+ public static String colName(TblColRef col) {
return colName(col, true);
}
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java
index 56ededb..f93996e 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java
@@ -20,6 +20,7 @@ package org.apache.kylin.metadata.model;
import java.io.Serializable;
import java.util.Locale;
+import java.util.function.Function;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.util.ClassUtil;
@@ -184,19 +185,26 @@ public class PartitionDesc implements Serializable {
// ============================================================================
public static interface IPartitionConditionBuilder {
- String buildDateRangeCondition(PartitionDesc partDesc, ISegment seg, SegmentRange segRange);
+ String buildDateRangeCondition(PartitionDesc partDesc, ISegment seg, SegmentRange segRange, Function<TblColRef, String> quoteFunc);
}
public static class DefaultPartitionConditionBuilder implements IPartitionConditionBuilder, Serializable {
@Override
- public String buildDateRangeCondition(PartitionDesc partDesc, ISegment seg, SegmentRange segRange) {
+ public String buildDateRangeCondition(PartitionDesc partDesc, ISegment seg, SegmentRange segRange, Function<TblColRef, String> quoteFunc) {
long startInclusive = (Long) segRange.start.v;
long endExclusive = (Long) segRange.end.v;
TblColRef partitionDateColumn = partDesc.getPartitionDateColumnRef();
TblColRef partitionTimeColumn = partDesc.getPartitionTimeColumnRef();
+ if (partitionDateColumn != null) {
+ partitionDateColumn.setQuotedFunc(quoteFunc);
+ }
+ if (partitionTimeColumn != null) {
+ partitionTimeColumn.setQuotedFunc(quoteFunc);
+ }
+
StringBuilder builder = new StringBuilder();
if (partDesc.partitionColumnIsYmdInt()) {
@@ -224,7 +232,7 @@ public class PartitionDesc implements Serializable {
private static void buildSingleColumnRangeCondAsTimeMillis(StringBuilder builder, TblColRef partitionColumn,
long startInclusive, long endExclusive) {
- String partitionColumnName = partitionColumn.getIdentity();
+ String partitionColumnName = partitionColumn.getQuotedIdentity();
builder.append(partitionColumnName + " >= " + startInclusive);
builder.append(" AND ");
builder.append(partitionColumnName + " < " + endExclusive);
@@ -232,7 +240,7 @@ public class PartitionDesc implements Serializable {
private static void buildSingleColumnRangeCondAsYmdInt(StringBuilder builder, TblColRef partitionColumn,
long startInclusive, long endExclusive, String partitionColumnDateFormat) {
- String partitionColumnName = partitionColumn.getIdentity();
+ String partitionColumnName = partitionColumn.getQuotedIdentity();
builder.append(partitionColumnName + " >= "
+ DateFormat.formatToDateStr(startInclusive, partitionColumnDateFormat));
builder.append(" AND ");
@@ -242,7 +250,7 @@ public class PartitionDesc implements Serializable {
private static void buildSingleColumnRangeCondition(StringBuilder builder, TblColRef partitionColumn,
long startInclusive, long endExclusive, String partitionColumnDateFormat) {
- String partitionColumnName = partitionColumn.getIdentity();
+ String partitionColumnName = partitionColumn.getQuotedIdentity();
if (endExclusive <= startInclusive) {
builder.append("1=0");
@@ -267,8 +275,8 @@ public class PartitionDesc implements Serializable {
private static void buildMultipleColumnRangeCondition(StringBuilder builder, TblColRef partitionDateColumn,
TblColRef partitionTimeColumn, long startInclusive, long endExclusive, String partitionColumnDateFormat,
String partitionColumnTimeFormat, boolean partitionDateColumnIsYmdInt) {
- String partitionDateColumnName = partitionDateColumn.getIdentity();
- String partitionTimeColumnName = partitionTimeColumn.getIdentity();
+ String partitionDateColumnName = partitionDateColumn.getQuotedIdentity();
+ String partitionTimeColumnName = partitionTimeColumn.getQuotedIdentity();
String singleQuotation = partitionDateColumnIsYmdInt ? "" : "'";
builder.append("(");
builder.append("(");
@@ -308,11 +316,14 @@ public class PartitionDesc implements Serializable {
public static class YearMonthDayPartitionConditionBuilder implements IPartitionConditionBuilder {
@Override
- public String buildDateRangeCondition(PartitionDesc partDesc, ISegment seg, SegmentRange segRange) {
+ public String buildDateRangeCondition(PartitionDesc partDesc, ISegment seg, SegmentRange segRange, Function<TblColRef, String> func) {
long startInclusive = (Long) segRange.start.v;
long endExclusive = (Long) segRange.end.v;
TblColRef partitionColumn = partDesc.getPartitionDateColumnRef();
+ if (partitionColumn != null) {
+ partitionColumn.setQuotedFunc(func);
+ }
String tableAlias = partitionColumn.getTableAlias();
String concatField = String.format(Locale.ROOT, "CONCAT(%s.YEAR,'-',%s.MONTH,'-',%s.DAY)", tableAlias,
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
index 918eedf..0dc08a9 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
@@ -23,6 +23,8 @@ import static com.google.common.base.Preconditions.checkArgument;
import java.io.Serializable;
import java.util.Locale;
+import java.util.function.Function;
+
import org.apache.commons.lang.StringUtils;
import org.apache.kylin.metadata.datatype.DataType;
@@ -120,6 +122,15 @@ public class TblColRef implements Serializable {
private String identity;
private String parserDescription;
+ /**
+ * Function used to get quoted identifier
+ */
+ private transient Function<TblColRef, String> quotedFunc;
+
+ public void setQuotedFunc(Function<TblColRef, String> quotedFunc) {
+ this.quotedFunc = quotedFunc;
+ }
+
TblColRef(ColumnDesc column) {
this.column = column;
}
@@ -238,6 +249,13 @@ public class TblColRef implements Serializable {
return identity;
}
+ public String getQuotedIdentity() {
+ if (quotedFunc == null)
+ return getIdentity();
+ else
+ return quotedFunc.apply(this);
+ }
+
@Override
public String toString() {
if (isInnerColumn() && parserDescription != null)
diff --git a/core-metadata/src/test/java/org/apache/kylin/metadata/model/DefaultPartitionConditionBuilderTest.java b/core-metadata/src/test/java/org/apache/kylin/metadata/model/DefaultPartitionConditionBuilderTest.java
index b536e29..438fb4a 100644
--- a/core-metadata/src/test/java/org/apache/kylin/metadata/model/DefaultPartitionConditionBuilderTest.java
+++ b/core-metadata/src/test/java/org/apache/kylin/metadata/model/DefaultPartitionConditionBuilderTest.java
@@ -53,12 +53,12 @@ public class DefaultPartitionConditionBuilderTest extends LocalFileMetadataTestC
partitionDesc.setPartitionDateColumn(col.getCanonicalName());
partitionDesc.setPartitionDateFormat("yyyy-MM-dd");
TSRange range = new TSRange(DateFormat.stringToMillis("2016-02-22"), DateFormat.stringToMillis("2016-02-23"));
- String condition = partitionConditionBuilder.buildDateRangeCondition(partitionDesc, null, range);
+ String condition = partitionConditionBuilder.buildDateRangeCondition(partitionDesc, null, range, null);
Assert.assertEquals("UNKNOWN_ALIAS.DATE_COLUMN >= '2016-02-22' AND UNKNOWN_ALIAS.DATE_COLUMN < '2016-02-23'",
condition);
range = new TSRange(0L, 0L);
- condition = partitionConditionBuilder.buildDateRangeCondition(partitionDesc, null, range);
+ condition = partitionConditionBuilder.buildDateRangeCondition(partitionDesc, null, range, null);
Assert.assertEquals("1=0", condition);
}
@@ -71,7 +71,7 @@ public class DefaultPartitionConditionBuilderTest extends LocalFileMetadataTestC
partitionDesc.setPartitionTimeFormat("HH");
TSRange range = new TSRange(DateFormat.stringToMillis("2016-02-22 00:00:00"),
DateFormat.stringToMillis("2016-02-23 01:00:00"));
- String condition = partitionConditionBuilder.buildDateRangeCondition(partitionDesc, null, range);
+ String condition = partitionConditionBuilder.buildDateRangeCondition(partitionDesc, null, range, null);
Assert.assertEquals("UNKNOWN_ALIAS.HOUR_COLUMN >= '00' AND UNKNOWN_ALIAS.HOUR_COLUMN < '01'", condition);
}
@@ -88,7 +88,7 @@ public class DefaultPartitionConditionBuilderTest extends LocalFileMetadataTestC
partitionDesc.setPartitionTimeFormat("H");
TSRange range = new TSRange(DateFormat.stringToMillis("2016-02-22 00:00:00"),
DateFormat.stringToMillis("2016-02-23 01:00:00"));
- String condition = partitionConditionBuilder.buildDateRangeCondition(partitionDesc, null, range);
+ String condition = partitionConditionBuilder.buildDateRangeCondition(partitionDesc, null, range, null);
Assert.assertEquals(
"((UNKNOWN_ALIAS.DATE_COLUMN = '2016-02-22' AND UNKNOWN_ALIAS.HOUR_COLUMN >= '0') OR (UNKNOWN_ALIAS.DATE_COLUMN > '2016-02-22')) AND ((UNKNOWN_ALIAS.DATE_COLUMN = '2016-02-23' AND UNKNOWN_ALIAS.HOUR_COLUMN < '1') OR (UNKNOWN_ALIAS.DATE_COLUMN < '2016-02-23'))",
condition);
diff --git a/pom.xml b/pom.xml
index 3438822..4657649 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1640,6 +1640,9 @@
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
+ <!-- Used to print files with unapproved licenses in the project to standard output -->
+ <consoleOutput>true</consoleOutput>
+
<!-- Exclude files/folders for apache release -->
<excludes>
<exclude>DEPENDENCIES</exclude>
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcDialect.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcDialect.java
deleted file mode 100644
index 7e5ecee..0000000
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcDialect.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.source.jdbc;
-
-public class JdbcDialect {
- public static final String DIALECT_VERTICA = "vertica";
- public static final String DIALECT_ORACLE = "oracle";
- public static final String DIALECT_MYSQL = "mysql";
- public static final String DIALECT_HIVE = "hive";
- public static final String DIALECT_MSSQL = "mssql";
-}
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcExplorer.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcExplorer.java
index 7eb4fa9..d728dcf 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcExplorer.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcExplorer.java
@@ -30,6 +30,7 @@ import java.util.UUID;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.SourceDialect;
import org.apache.kylin.common.util.DBUtils;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.RandomUtil;
@@ -50,7 +51,7 @@ public class JdbcExplorer implements ISourceMetadataExplorer, ISampleDataDeploye
private static final Logger logger = LoggerFactory.getLogger(JdbcExplorer.class);
private final KylinConfig config;
- private final String dialect;
+ private final SourceDialect dialect;
private final DBConnConf dbconf;
private final IJdbcMetadata jdbcMetadataDialect;
@@ -61,7 +62,7 @@ public class JdbcExplorer implements ISourceMetadataExplorer, ISampleDataDeploye
String jdbcUser = config.getJdbcSourceUser();
String jdbcPass = config.getJdbcSourcePass();
this.dbconf = new DBConnConf(driverClass, connectionUrl, jdbcUser, jdbcPass);
- this.dialect = config.getJdbcSourceDialect();
+ this.dialect = SourceDialect.getDialect(config.getJdbcSourceDialect());
this.jdbcMetadataDialect = JdbcMetadataFactory.getJdbcMetadata(dialect, dbconf);
}
@@ -117,7 +118,7 @@ public class JdbcExplorer implements ISourceMetadataExplorer, ISampleDataDeploye
}
private String getSqlDataType(String javaDataType) {
- if (JdbcDialect.DIALECT_VERTICA.equals(dialect) || JdbcDialect.DIALECT_MSSQL.equals(dialect)) {
+ if (SourceDialect.VERTICA.equals(dialect) || SourceDialect.MYSQL.equals(dialect)) {
if (javaDataType.toLowerCase(Locale.ROOT).equals("double")) {
return "float";
}
@@ -132,9 +133,9 @@ public class JdbcExplorer implements ISourceMetadataExplorer, ISampleDataDeploye
}
private String generateCreateSchemaSql(String schemaName) {
- if (JdbcDialect.DIALECT_VERTICA.equals(dialect) || JdbcDialect.DIALECT_MYSQL.equals(dialect)) {
+ if (SourceDialect.VERTICA.equals(dialect) || SourceDialect.MYSQL.equals(dialect)) {
return String.format(Locale.ROOT, "CREATE schema IF NOT EXISTS %s", schemaName);
- } else if (JdbcDialect.DIALECT_MSSQL.equals(dialect)) {
+ } else if (SourceDialect.SQL_SERVER.equals(dialect)) {
return String.format(Locale.ROOT,
"IF NOT EXISTS (SELECT name FROM sys.schemas WHERE name = N'%s') EXEC('CREATE SCHEMA"
+ " [%s] AUTHORIZATION [dbo]')",
@@ -151,13 +152,13 @@ public class JdbcExplorer implements ISourceMetadataExplorer, ISampleDataDeploye
}
private String generateLoadDataSql(String tableName, String tableFileDir) {
- if (JdbcDialect.DIALECT_VERTICA.equals(dialect)) {
+ if (SourceDialect.VERTICA.equals(dialect)) {
return String.format(Locale.ROOT, "copy %s from local '%s/%s.csv' delimiter as ',';", tableName,
tableFileDir, tableName);
- } else if (JdbcDialect.DIALECT_MYSQL.equals(dialect)) {
+ } else if (SourceDialect.MYSQL.equals(dialect)) {
return String.format(Locale.ROOT, "LOAD DATA INFILE '%s/%s.csv' INTO %s FIELDS TERMINATED BY ',';",
tableFileDir, tableName, tableName);
- } else if (JdbcDialect.DIALECT_MSSQL.equals(dialect)) {
+ } else if (SourceDialect.SQL_SERVER.equals(dialect)) {
return String.format(Locale.ROOT, "BULK INSERT %s FROM '%s/%s.csv' WITH(FIELDTERMINATOR = ',')", tableName,
tableFileDir, tableName);
} else {
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveInputBase.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveInputBase.java
index 20f2dcb..94594f3 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveInputBase.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveInputBase.java
@@ -18,28 +18,43 @@
package org.apache.kylin.source.jdbc;
import com.google.common.collect.Maps;
+import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.SourceDialect;
import org.apache.kylin.common.util.SourceConfigurationUtil;
import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.job.JoinedFlatTable;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.job.util.FlatTableSqlQuoteUtils;
import org.apache.kylin.metadata.TableMetadataManager;
+import org.apache.kylin.metadata.model.DataModelDesc;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.JoinDesc;
+import org.apache.kylin.metadata.model.JoinTableDesc;
import org.apache.kylin.metadata.model.PartitionDesc;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.TableExtDesc;
import org.apache.kylin.metadata.model.TableRef;
import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.source.hive.DBConnConf;
import org.apache.kylin.source.hive.HiveInputBase;
+import org.apache.kylin.source.jdbc.metadata.IJdbcMetadata;
+import org.apache.kylin.source.jdbc.metadata.JdbcMetadataFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
public class JdbcHiveInputBase extends HiveInputBase {
private static final Logger logger = LoggerFactory.getLogger(JdbcHiveInputBase.class);
@@ -47,9 +62,66 @@ public class JdbcHiveInputBase extends HiveInputBase {
private static final String DEFAULT_QUEUE = "default";
public static class JdbcBaseBatchCubingInputSide extends BaseBatchCubingInputSide {
+ private IJdbcMetadata jdbcMetadataDialect;
+ private DBConnConf dbconf;
+ private SourceDialect dialect;
+ private final Map<String, String> metaMap = new TreeMap<>();
public JdbcBaseBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
super(flatDesc);
+ KylinConfig config = KylinConfig.getInstanceFromEnv();
+ String connectionUrl = config.getJdbcSourceConnectionUrl();
+ String driverClass = config.getJdbcSourceDriver();
+ String jdbcUser = config.getJdbcSourceUser();
+ String jdbcPass = config.getJdbcSourcePass();
+ dbconf = new DBConnConf(driverClass, connectionUrl, jdbcUser, jdbcPass);
+ dialect = SourceDialect.getDialect(config.getJdbcSourceDialect());
+ jdbcMetadataDialect = JdbcMetadataFactory.getJdbcMetadata(dialect, dbconf);
+ calCachedJdbcMeta(metaMap, dbconf, jdbcMetadataDialect);
+ if (logger.isTraceEnabled()) {
+ StringBuilder dumpInfo = new StringBuilder();
+ metaMap.forEach((k, v) -> dumpInfo.append("CachedMetadata: ").append(k).append(" => ").append(v)
+ .append(System.lineSeparator()));
+ logger.trace(dumpInfo.toString());
+ }
+ }
+
+ /**
+ * Fetch and cache metadata from JDBC API, which will help to resolve
+ * case-sensitive problem of sql identifier
+ *
+ * @param metadataMap a Map which mapping upper case identifier to real/original identifier
+ */
+ public static void calCachedJdbcMeta(Map<String, String> metadataMap, DBConnConf dbconf,
+ IJdbcMetadata jdbcMetadataDialect) {
+ try (Connection connection = SqlUtil.getConnection(dbconf)) {
+ DatabaseMetaData databaseMetaData = connection.getMetaData();
+ for (String database : jdbcMetadataDialect.listDatabases()) {
+ metadataMap.put(database.toUpperCase(Locale.ROOT), database);
+ ResultSet tableRs = jdbcMetadataDialect.getTable(databaseMetaData, database, null);
+ while (tableRs.next()) {
+ String tableName = tableRs.getString("TABLE_NAME");
+ ResultSet colRs = jdbcMetadataDialect.listColumns(databaseMetaData, database, tableName);
+ while (colRs.next()) {
+ String colName = colRs.getString("COLUMN_NAME");
+ colName = database + "." + tableName + "." + colName;
+ metadataMap.put(colName.toUpperCase(Locale.ROOT), colName);
+ }
+ colRs.close();
+ tableName = database + "." + tableName;
+ metadataMap.put(tableName.toUpperCase(Locale.ROOT), tableName);
+ }
+ tableRs.close();
+ }
+ } catch (IllegalStateException e) {
+ if (SqlUtil.DRIVER_MISS.equalsIgnoreCase(e.getMessage())) {
+ logger.warn("Ignore JDBC Driver Missing in yarn node.", e);
+ } else {
+ throw e;
+ }
+ } catch (Exception e) {
+ throw new IllegalStateException("Error when connect to JDBC source " + dbconf.getUrl(), e);
+ }
}
protected KylinConfig getConfig() {
@@ -148,22 +220,19 @@ public class JdbcHiveInputBase extends HiveInputBase {
partCol = partitionDesc.getPartitionDateColumn();//tablename.colname
}
- String splitTable;
String splitTableAlias;
String splitColumn;
String splitDatabase;
TblColRef splitColRef = determineSplitColumn();
- splitTable = splitColRef.getTableRef().getTableName();
splitTableAlias = splitColRef.getTableAlias();
- splitColumn = JoinedFlatTable.getQuotedColExpressionInSourceDB(flatDesc, splitColRef);
+
+ splitColumn = getColumnIdentityQuoted(splitColRef, jdbcMetadataDialect, metaMap, true);
splitDatabase = splitColRef.getColumnDesc().getTable().getDatabase();
- //using sqoop to extract data from jdbc source and dump them to hive
- String selectSql = JoinedFlatTable.generateSelectDataStatement(flatDesc, true, new String[] { partCol });
+ String selectSql = generateSelectDataStatementRDBMS(flatDesc, true, new String[] { partCol },
+ jdbcMetadataDialect, metaMap);
selectSql = escapeQuotationInSql(selectSql);
-
-
String hiveTable = flatDesc.getTableName();
String connectionUrl = config.getJdbcSourceConnectionUrl();
String driverClass = config.getJdbcSourceDriver();
@@ -175,17 +244,19 @@ public class JdbcHiveInputBase extends HiveInputBase {
String filedDelimiter = config.getJdbcSourceFieldDelimiter();
int mapperNum = config.getSqoopMapperNum();
- String bquery = String.format(Locale.ROOT, "SELECT min(%s), max(%s) FROM %s.%s as %s", splitColumn,
- splitColumn, splitDatabase, splitTable, splitTableAlias);
+ String bquery = String.format(Locale.ROOT, "SELECT min(%s), max(%s) FROM %s.%s ", splitColumn, splitColumn,
+ getSchemaQuoted(metaMap, splitDatabase, jdbcMetadataDialect, true),
+ getTableIdentityQuoted(splitColRef.getTableRef(), metaMap, jdbcMetadataDialect, true));
if (partitionDesc.isPartitioned()) {
SegmentRange segRange = flatDesc.getSegRange();
if (segRange != null && !segRange.isInfinite()) {
if (partitionDesc.getPartitionDateColumnRef().getTableAlias().equals(splitTableAlias)
&& (partitionDesc.getPartitionTimeColumnRef() == null || partitionDesc
- .getPartitionTimeColumnRef().getTableAlias().equals(splitTableAlias))) {
- String quotedPartCond = FlatTableSqlQuoteUtils.quoteIdentifierInSqlExpr(flatDesc,
- partitionDesc.getPartitionConditionBuilder().buildDateRangeCondition(partitionDesc,
- flatDesc.getSegment(), segRange));
+ .getPartitionTimeColumnRef().getTableAlias().equals(splitTableAlias))) {
+
+ String quotedPartCond = partitionDesc.getPartitionConditionBuilder().buildDateRangeCondition(
+ partitionDesc, flatDesc.getSegment(), segRange,
+ col -> getTableColumnIdentityQuoted(col, jdbcMetadataDialect, metaMap, true));
bquery += " WHERE " + quotedPartCond;
}
}
@@ -195,14 +266,13 @@ public class JdbcHiveInputBase extends HiveInputBase {
// escape ` in cmd
splitColumn = escapeQuotationInSql(splitColumn);
- String cmd = String.format(Locale.ROOT,
- "%s/bin/sqoop import" + generateSqoopConfigArgString()
- + "--connect \"%s\" --driver %s --username %s --password \"%s\" --query \"%s AND \\$CONDITIONS\" "
- + "--target-dir %s/%s --split-by %s --boundary-query \"%s\" --null-string '%s' "
- + "--null-non-string '%s' --fields-terminated-by '%s' --num-mappers %d",
- sqoopHome, connectionUrl, driverClass, jdbcUser, jdbcPass, selectSql, jobWorkingDir, hiveTable,
- splitColumn, bquery, sqoopNullString, sqoopNullNonString, filedDelimiter, mapperNum);
- logger.debug(String.format(Locale.ROOT, "sqoop cmd:%s", cmd));
+ String cmd = String.format(Locale.ROOT, "%s/bin/sqoop import" + generateSqoopConfigArgString()
+ + "--connect \"%s\" --driver %s --username %s --password \"%s\" --query \"%s AND \\$CONDITIONS\" "
+ + "--target-dir %s/%s --split-by %s --boundary-query \"%s\" --null-string '%s' "
+ + "--null-non-string '%s' --fields-terminated-by '%s' --num-mappers %d", sqoopHome, connectionUrl,
+ driverClass, jdbcUser, jdbcPass, selectSql, jobWorkingDir, hiveTable, splitColumn, bquery,
+ sqoopNullString, sqoopNullNonString, filedDelimiter, mapperNum);
+ logger.debug("sqoop cmd : {}", cmd);
CmdStep step = new CmdStep();
step.setCmd(cmd);
step.setName(ExecutableConstants.STEP_NAME_SQOOP_TO_FLAT_HIVE_TABLE);
@@ -212,7 +282,7 @@ public class JdbcHiveInputBase extends HiveInputBase {
protected String generateSqoopConfigArgString() {
KylinConfig kylinConfig = getConfig();
Map<String, String> config = Maps.newHashMap();
- config.put("mapreduce.job.queuename", getSqoopJobQueueName(kylinConfig)); // override job queue from mapreduce config
+ config.put(MR_OVERRIDE_QUEUE_KEY, getSqoopJobQueueName(kylinConfig)); // override job queue from mapreduce config
config.putAll(SourceConfigurationUtil.loadSqoopConfiguration());
config.putAll(kylinConfig.getSqoopConfigOverride());
@@ -229,4 +299,232 @@ public class JdbcHiveInputBase extends HiveInputBase {
sqlExpr = sqlExpr.replaceAll("`", "\\\\`");
return sqlExpr;
}
+
+ private static String generateSelectDataStatementRDBMS(IJoinedFlatTableDesc flatDesc, boolean singleLine,
+ String[] skipAs, IJdbcMetadata metadata, Map<String, String> metaMap) {
+ SourceDialect dialect = metadata.getDialect();
+ final String sep = singleLine ? " " : "\n";
+
+ final List<String> skipAsList = (skipAs == null) ? new ArrayList<>() : Arrays.asList(skipAs);
+
+ StringBuilder sql = new StringBuilder();
+ sql.append("SELECT");
+ sql.append(sep);
+
+ for (int i = 0; i < flatDesc.getAllColumns().size(); i++) {
+ TblColRef col = flatDesc.getAllColumns().get(i);
+ if (i > 0) {
+ sql.append(",");
+ }
+ String colTotalName = String.format(Locale.ROOT, "%s.%s", col.getTableRef().getTableName(), col.getName());
+ if (skipAsList.contains(colTotalName)) {
+ sql.append(getTableColumnIdentityQuoted(col, metadata, metaMap, true)).append(sep);
+ } else {
+ sql.append(getTableColumnIdentityQuoted(col, metadata, metaMap, true)).append(" as ")
+ .append(quoteIdentifier(JoinedFlatTable.colName(col), dialect)).append(sep);
+ }
+ }
+ appendJoinStatement(flatDesc, sql, singleLine, metadata, metaMap);
+ appendWhereStatement(flatDesc, sql, singleLine, metadata, metaMap);
+ return sql.toString();
+ }
+
+ private static void appendJoinStatement(IJoinedFlatTableDesc flatDesc, StringBuilder sql, boolean singleLine,
+ IJdbcMetadata metadata, Map<String, String> metaMap) {
+ final String sep = singleLine ? " " : "\n";
+ Set<TableRef> dimTableCache = new HashSet<>();
+
+ DataModelDesc model = flatDesc.getDataModel();
+ sql.append(" FROM ")
+ .append(getSchemaQuoted(metaMap,
+ flatDesc.getDataModel().getRootFactTable().getTableDesc().getDatabase(), metadata, true))
+ .append(".")
+ .append(getTableIdentityQuoted(flatDesc.getDataModel().getRootFactTable(), metaMap, metadata, true));
+
+ sql.append(" ");
+ sql.append((getTableIdentityQuoted(flatDesc.getDataModel().getRootFactTable(), metaMap, metadata, true)))
+ .append(sep);
+
+ for (JoinTableDesc lookupDesc : model.getJoinTables()) {
+ JoinDesc join = lookupDesc.getJoin();
+ if (join != null && !join.getType().equals("")) {
+ TableRef dimTable = lookupDesc.getTableRef();
+ if (!dimTableCache.contains(dimTable)) {
+ TblColRef[] pk = join.getPrimaryKeyColumns();
+ TblColRef[] fk = join.getForeignKeyColumns();
+ if (pk.length != fk.length) {
+ throw new RuntimeException("Invalid join condition of lookup table:" + lookupDesc);
+ }
+ String joinType = join.getType().toUpperCase(Locale.ROOT);
+
+ sql.append(joinType).append(" JOIN ")
+ .append(getSchemaQuoted(metaMap, dimTable.getTableDesc().getDatabase(), metadata, true))
+ .append(".").append(getTableIdentityQuoted(dimTable, metaMap, metadata, true));
+
+ sql.append(" ");
+ sql.append(getTableIdentityQuoted(dimTable, metaMap, metadata, true)).append(sep);
+ sql.append("ON ");
+ for (int i = 0; i < pk.length; i++) {
+ if (i > 0) {
+ sql.append(" AND ");
+ }
+ sql.append(getTableColumnIdentityQuoted(fk[i], metadata, metaMap, true)).append(" = ")
+ .append(getTableColumnIdentityQuoted(pk[i], metadata, metaMap, true));
+ }
+ sql.append(sep);
+ dimTableCache.add(dimTable);
+ }
+ }
+ }
+ }
+
+ private static void appendWhereStatement(IJoinedFlatTableDesc flatDesc, StringBuilder sql, boolean singleLine,
+ IJdbcMetadata metadata, Map<String, String> metaMap) {
+ final String sep = singleLine ? " " : "\n";
+
+ StringBuilder whereBuilder = new StringBuilder();
+ whereBuilder.append("WHERE 1=1");
+
+ DataModelDesc model = flatDesc.getDataModel();
+ if (StringUtils.isNotEmpty(model.getFilterCondition())) {
+ whereBuilder.append(" AND (").append(model.getFilterCondition()).append(") ");
+ }
+
+ if (flatDesc.getSegment() != null) {
+ PartitionDesc partDesc = model.getPartitionDesc();
+ if (partDesc != null && partDesc.getPartitionDateColumn() != null) {
+ SegmentRange segRange = flatDesc.getSegRange();
+
+ if (segRange != null && !segRange.isInfinite()) {
+ whereBuilder.append(" AND (");
+ whereBuilder.append(partDesc.getPartitionConditionBuilder().buildDateRangeCondition(partDesc,
+ flatDesc.getSegment(), segRange,
+ col -> getTableColumnIdentityQuoted(col, metadata, metaMap, true)));
+ whereBuilder.append(")");
+ whereBuilder.append(sep);
+ }
+ }
+ }
+ sql.append(whereBuilder.toString());
+ }
+
+ /**
+ * @return {TABLE_NAME}.{COLUMN_NAME}
+ */
+ private static String getTableColumnIdentityQuoted(TblColRef col, IJdbcMetadata metadata,
+ Map<String, String> metaMap, boolean needQuote) {
+ String tblName = getTableIdentityQuoted(col.getTableRef(), metaMap, metadata, needQuote);
+ String colName = getColumnIdentityQuoted(col, metadata, metaMap, needQuote);
+ return tblName + "." + colName;
+ }
+
+ /**
+ * @return {SCHEMA_NAME}
+ */
+ static String getSchemaQuoted(Map<String, String> metaMap, String database, IJdbcMetadata metadata,
+ boolean needQuote) {
+ String databaseName = fetchValue(database, null, null, metaMap);
+ if (needQuote) {
+ return quoteIdentifier(databaseName, metadata.getDialect());
+ } else {
+ return databaseName;
+ }
+ }
+
+ /**
+ * @return {TABLE_NAME}
+ */
+ static String getTableIdentityQuoted(TableRef tableRef, Map<String, String> metaMap, IJdbcMetadata metadata,
+ boolean needQuote) {
+ String value = fetchValue(tableRef.getTableDesc().getDatabase(), tableRef.getTableDesc().getName(), null,
+ metaMap);
+ String[] res = value.split("\\.");
+ value = res[res.length - 1];
+ if (needQuote) {
+ return quoteIdentifier(value, metadata.getDialect());
+ } else {
+ return value;
+ }
+ }
+
+ /**
+ * @return {TABLE_NAME}
+ */
+ static String getTableIdentityQuoted(String database, String table, Map<String, String> metaMap,
+ IJdbcMetadata metadata, boolean needQuote) {
+ String value = fetchValue(database, table, null, metaMap);
+ String[] res = value.split("\\.");
+ value = res[res.length - 1];
+ if (needQuote) {
+ return quoteIdentifier(value, metadata.getDialect());
+ } else {
+ return value;
+ }
+ }
+
+ /**
+ * @return {COLUMN_NAME}
+ */
+ private static String getColumnIdentityQuoted(TblColRef tblColRef, IJdbcMetadata metadata,
+ Map<String, String> metaMap, boolean needQuote) {
+ String value = fetchValue(tblColRef.getTableRef().getTableDesc().getDatabase(),
+ tblColRef.getTableRef().getTableDesc().getName(), tblColRef.getName(), metaMap);
+ String[] res = value.split("\\.");
+ value = res[res.length - 1];
+ if (needQuote) {
+ return quoteIdentifier(value, metadata.getDialect());
+ } else {
+ return value;
+ }
+ }
+
+ /**
+ * Quote the identifier according to the SQL dialect. As far as I know,
+ * MySQL uses backtick(`), Oracle 11g uses double quotation("), and SQL Server 2017
+ * uses square brackets([ or ]) as the quote character.
+ *
+ * @param identifier something looks like tableA.columnB
+ */
+ static String quoteIdentifier(String identifier, SourceDialect dialect) {
+ if (KylinConfig.getInstanceFromEnv().enableHiveDdlQuote()) {
+ String[] identifierArray = identifier.split("\\.");
+ String quoted = "";
+ for (int i = 0; i < identifierArray.length; i++) {
+ switch (dialect) {
+ case SQL_SERVER:
+ identifierArray[i] = "[" + identifierArray[i] + "]";
+ break;
+ case MYSQL:
+ case HIVE:
+ identifierArray[i] = "`" + identifierArray[i] + "`";
+ break;
+ default:
+ String quote = KylinConfig.getInstanceFromEnv().getQuoteCharacter();
+ identifierArray[i] = quote + identifierArray[i] + quote;
+ }
+ }
+ quoted = String.join(".", identifierArray);
+ return quoted;
+ } else {
+ return identifier;
+ }
+ }
+
+ static String fetchValue(String database, String table, String column, Map<String, String> metadataMap) {
+ String key;
+ if (table == null && column == null) {
+ key = database;
+ } else if (column == null) {
+ key = database + "." + table;
+ } else {
+ key = database + "." + table + "." + column;
+ }
+ String val = metadataMap.get(key.toUpperCase(Locale.ROOT));
+ if (val == null) {
+ logger.warn("Not find for {} from metadata cache.", key);
+ return key;
+ } else {
+ return val;
+ }
+ }
}
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcTableReader.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcTableReader.java
index 3c2b4f9..1c689dd 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcTableReader.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcTableReader.java
@@ -23,10 +23,15 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Locale;
+import java.util.Map;
+import java.util.TreeMap;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.SourceDialect;
import org.apache.kylin.source.IReadableTable.TableReader;
import org.apache.kylin.source.hive.DBConnConf;
+import org.apache.kylin.source.jdbc.metadata.IJdbcMetadata;
+import org.apache.kylin.source.jdbc.metadata.JdbcMetadataFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,7 +66,15 @@ public class JdbcTableReader implements TableReader {
String jdbcPass = config.getJdbcSourcePass();
dbconf = new DBConnConf(driverClass, connectionUrl, jdbcUser, jdbcPass);
jdbcCon = SqlUtil.getConnection(dbconf);
- String sql = String.format(Locale.ROOT, "select * from %s.%s", dbName, tableName);
+ IJdbcMetadata meta = JdbcMetadataFactory
+ .getJdbcMetadata(SourceDialect.getDialect(config.getJdbcSourceDialect()), dbconf);
+
+ Map<String, String> metadataCache = new TreeMap<>();
+ JdbcHiveInputBase.JdbcBaseBatchCubingInputSide.calCachedJdbcMeta(metadataCache, dbconf, meta);
+ String database = JdbcHiveInputBase.getSchemaQuoted(metadataCache, dbName, meta, true);
+ String table = JdbcHiveInputBase.getTableIdentityQuoted(dbName, tableName, metadataCache, meta, true);
+
+ String sql = String.format(Locale.ROOT, "select * from %s.%s", database, table);
try {
statement = jdbcCon.createStatement();
rs = statement.executeQuery(sql);
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java
index 5242832..9299d78 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java
@@ -23,7 +23,6 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.Random;
-
import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.source.hive.DBConnConf;
import org.slf4j.Logger;
@@ -62,6 +61,7 @@ public class SqlUtil {
}
public static final int tryTimes = 5;
+ public static final String DRIVER_MISS = "DRIVER_MISS";
public static Connection getConnection(DBConnConf dbconf) {
if (dbconf.getUrl() == null)
@@ -70,7 +70,8 @@ public class SqlUtil {
try {
Class.forName(dbconf.getDriver());
} catch (Exception e) {
- logger.error("", e);
+ logger.error("Miss Driver", e);
+ throw new IllegalStateException(DRIVER_MISS);
}
boolean got = false;
int times = 0;
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveInputBase.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveInputBase.java
index 9fd6d30..fcafae2 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveInputBase.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveInputBase.java
@@ -94,7 +94,7 @@ public class JdbcHiveInputBase extends org.apache.kylin.source.jdbc.JdbcHiveInpu
.getPartitionTimeColumnRef().getTableAlias().equals(splitTableAlias))) {
String quotedPartCond = FlatTableSqlQuoteUtils.quoteIdentifierInSqlExpr(flatDesc,
partitionDesc.getPartitionConditionBuilder().buildDateRangeCondition(partitionDesc,
- flatDesc.getSegment(), segRange));
+ flatDesc.getSegment(), segRange, null));
bquery += " WHERE " + quotedPartCond;
}
}
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/DefaultJdbcMetadata.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/DefaultJdbcMetadata.java
index 0842199..b9c65fc 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/DefaultJdbcMetadata.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/DefaultJdbcMetadata.java
@@ -23,8 +23,8 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
-
import java.util.Locale;
+import org.apache.kylin.common.SourceDialect;
import org.apache.kylin.source.hive.DBConnConf;
import org.apache.kylin.source.jdbc.SqlUtil;
import org.slf4j.Logger;
@@ -74,4 +74,8 @@ public class DefaultJdbcMetadata implements IJdbcMetadata {
public ResultSet listColumns(final DatabaseMetaData dbmd, String schema, String table) throws SQLException {
return dbmd.getColumns(null, schema, table, null);
}
+
+ public SourceDialect getDialect() {
+ return SourceDialect.UNKNOWN;
+ }
}
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/IJdbcMetadata.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/IJdbcMetadata.java
index 169fe60..f41c3e8 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/IJdbcMetadata.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/IJdbcMetadata.java
@@ -17,12 +17,16 @@
*/
package org.apache.kylin.source.jdbc.metadata;
+import org.apache.kylin.common.SourceDialect;
+
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
public interface IJdbcMetadata {
+ SourceDialect getDialect();
+
List<String> listDatabases() throws SQLException;
List<String> listTables(String database) throws SQLException;
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactory.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactory.java
index ae4c0ff..498bc09 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactory.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactory.java
@@ -17,17 +17,19 @@
*/
package org.apache.kylin.source.jdbc.metadata;
-import java.util.Locale;
+import org.apache.kylin.common.SourceDialect;
import org.apache.kylin.source.hive.DBConnConf;
-import org.apache.kylin.source.jdbc.JdbcDialect;
-public abstract class JdbcMetadataFactory {
- public static IJdbcMetadata getJdbcMetadata(String dialect, final DBConnConf dbConnConf) {
- String jdbcDialect = (null == dialect) ? "" : dialect.toLowerCase(Locale.ROOT);
+public class JdbcMetadataFactory {
+
+ private JdbcMetadataFactory() {
+ }
+
+ public static IJdbcMetadata getJdbcMetadata(SourceDialect jdbcDialect, final DBConnConf dbConnConf) {
switch (jdbcDialect) {
- case (JdbcDialect.DIALECT_MSSQL):
+ case SQL_SERVER:
return new SQLServerJdbcMetadata(dbConnConf);
- case (JdbcDialect.DIALECT_MYSQL):
+ case MYSQL:
return new MySQLJdbcMetadata(dbConnConf);
default:
return new DefaultJdbcMetadata(dbConnConf);
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/MySQLJdbcMetadata.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/MySQLJdbcMetadata.java
index 54c2a03..e3c523c 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/MySQLJdbcMetadata.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/MySQLJdbcMetadata.java
@@ -24,6 +24,7 @@ import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.kylin.common.SourceDialect;
import org.apache.kylin.source.hive.DBConnConf;
import org.apache.kylin.source.jdbc.SqlUtil;
@@ -64,4 +65,9 @@ public class MySQLJdbcMetadata extends DefaultJdbcMetadata {
public ResultSet getTable(final DatabaseMetaData dbmd, String catalog, String table) throws SQLException {
return dbmd.getTables(catalog, null, table, null);
}
+
+ @Override
+ public SourceDialect getDialect() {
+ return SourceDialect.MYSQL;
+ }
}
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/SQLServerJdbcMetadata.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/SQLServerJdbcMetadata.java
index 5373672..696a350 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/SQLServerJdbcMetadata.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/SQLServerJdbcMetadata.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.SourceDialect;
import org.apache.kylin.source.hive.DBConnConf;
import org.apache.kylin.source.jdbc.SqlUtil;
@@ -59,4 +60,9 @@ public class SQLServerJdbcMetadata extends DefaultJdbcMetadata {
}
return new ArrayList<>(ret);
}
+
+ @Override
+ public SourceDialect getDialect() {
+ return SourceDialect.SQL_SERVER;
+ }
}
diff --git a/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/JdbcExplorerTest.java b/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/JdbcExplorerTest.java
index a0df4f4..ed3d181 100644
--- a/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/JdbcExplorerTest.java
+++ b/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/JdbcExplorerTest.java
@@ -18,7 +18,6 @@
package org.apache.kylin.source.jdbc;
import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@@ -35,6 +34,7 @@ import java.util.List;
import java.util.Locale;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.SourceDialect;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.metadata.model.ColumnDesc;
@@ -83,7 +83,7 @@ public class JdbcExplorerTest extends LocalFileMetadataTestCase {
PowerMockito.stub(PowerMockito.method(SqlUtil.class, "getConnection")).toReturn(connection);
PowerMockito.mockStatic(JdbcMetadataFactory.class);
- when(JdbcMetadataFactory.getJdbcMetadata(anyString(), any(DBConnConf.class))).thenReturn(jdbcMetadata);
+ when(JdbcMetadataFactory.getJdbcMetadata(any(SourceDialect.class), any(DBConnConf.class))).thenReturn(jdbcMetadata);
when(connection.getMetaData()).thenReturn(dbmd);
jdbcExplorer = spy(JdbcExplorer.class);
diff --git a/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/JdbcHiveInputBaseTest.java b/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/JdbcHiveInputBaseTest.java
new file mode 100644
index 0000000..f6415e6
--- /dev/null
+++ b/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/JdbcHiveInputBaseTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.source.jdbc;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.SourceDialect;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class JdbcHiveInputBaseTest extends LocalFileMetadataTestCase {
+
+ @BeforeClass
+ public static void setupClass() throws SQLException {
+ staticCreateTestMetadata();
+ KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ kylinConfig.setProperty("kylin.source.hive.quote-enabled", "true");
+ }
+
+ @Test
+ public void testFetchValue() {
+ Map<String, String> map = new HashMap<>();
+ String guess = JdbcHiveInputBase.fetchValue("DB_1", "TB_2", "COL_3", map);
+
+ // not found, return input value
+ assertEquals("DB_1.TB_2.COL_3", guess);
+ map.put("DB_1.TB_2.COL_3", "Db_1.Tb_2.Col_3");
+
+ guess = JdbcHiveInputBase.fetchValue("DB_1", "TB_2", "COL_3", map);
+ // found, return cached value
+ assertEquals("Db_1.Tb_2.Col_3", guess);
+ }
+
+ @Test
+ public void testQuoteIdentifier() {
+ String guess = JdbcHiveInputBase.quoteIdentifier("Tbl1.Col1", SourceDialect.MYSQL);
+ assertEquals("`Tbl1`.`Col1`", guess);
+ guess = JdbcHiveInputBase.quoteIdentifier("Tbl1.Col1", SourceDialect.SQL_SERVER);
+ assertEquals("[Tbl1].[Col1]", guess);
+ }
+
+ @AfterClass
+ public static void clenup() {
+ staticCleanupTestMetadata();
+ }
+}