You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/11/26 06:16:19 UTC
[1/4] kylin git commit: KYLIN-3044 support SQLServer as kylin data
source [Forced Update!]
Repository: kylin
Updated Branches:
refs/heads/KYLIN-2966 85d7e6260 -> b40d0e06f (forced update)
http://git-wip-us.apache.org/repos/asf/kylin/blob/cac1f8bb/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/SQLServerJdbcMetadataTest.java
----------------------------------------------------------------------
diff --git a/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/SQLServerJdbcMetadataTest.java b/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/SQLServerJdbcMetadataTest.java
new file mode 100644
index 0000000..a5516ab
--- /dev/null
+++ b/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/SQLServerJdbcMetadataTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.source.jdbc.metadata;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.kylin.source.hive.DBConnConf;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SQLServerJdbcMetadataTest extends DefaultJdbcMetadataTest {
+
+ @Before
+ public void setup() {
+ dbConnConf = new DBConnConf();
+ dbConnConf.setUrl("jdbc:sqlserver://fakehost:1433;database=testdb");
+ dbConnConf.setDriver("com.microsoft.sqlserver.jdbc.SQLServerDriver");
+ dbConnConf.setUser("user");
+ dbConnConf.setPass("pass");
+ jdbcMetadata = new SQLServerJdbcMetadata(dbConnConf);
+
+ setupProperties();
+ }
+
+ @Test
+ public void testListDatabases() throws SQLException {
+ ResultSet rs = mock(ResultSet.class);
+ when(rs.next()).thenReturn(true).thenReturn(true).thenReturn(false);
+ when(rs.getString("TABLE_SCHEM")).thenReturn("schema1").thenReturn("schema2");
+ when(rs.getString("TABLE_CATALOG")).thenReturn("catalog1").thenReturn("testdb");
+
+ when(connection.getCatalog()).thenReturn("testdb");
+ when(connection.getMetaData()).thenReturn(dbmd);
+ when(dbmd.getSchemas("testdb", "%")).thenReturn(rs);
+
+ List<String> dbs = jdbcMetadata.listDatabases();
+
+ Assert.assertEquals(1, dbs.size());
+ Assert.assertEquals("schema2", dbs.get(0));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testListDatabasesWithoutSpecificDB() throws SQLException {
+ when(connection.getCatalog()).thenReturn("");
+ jdbcMetadata.listDatabases();
+ }
+}
[4/4] kylin git commit: KYLIN-2966 add pushdown jdbc columntype
mapping
Posted by li...@apache.org.
KYLIN-2966 add pushdown jdbc columntype mapping
This closes #82
Signed-off-by: Li Yang <li...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/b40d0e06
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/b40d0e06
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/b40d0e06
Branch: refs/heads/KYLIN-2966
Commit: b40d0e06f8dddf0916a40eb01a49417d4b65826b
Parents: 695971a
Author: zhaiyuyong <zh...@126.com>
Authored: Wed Oct 25 17:06:04 2017 +0800
Committer: Li Yang <li...@apache.org>
Committed: Sun Nov 26 14:16:04 2017 +0800
----------------------------------------------------------------------
.../query/adhoc/PushDownRunnerJdbcImpl.java | 45 +++++++++++++++++++-
1 file changed, 44 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/b40d0e06/query/src/main/java/org/apache/kylin/query/adhoc/PushDownRunnerJdbcImpl.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/adhoc/PushDownRunnerJdbcImpl.java b/query/src/main/java/org/apache/kylin/query/adhoc/PushDownRunnerJdbcImpl.java
index 503e273..7283d66 100644
--- a/query/src/main/java/org/apache/kylin/query/adhoc/PushDownRunnerJdbcImpl.java
+++ b/query/src/main/java/org/apache/kylin/query/adhoc/PushDownRunnerJdbcImpl.java
@@ -23,6 +23,7 @@ import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
+import java.sql.Types;
import java.util.LinkedList;
import java.util.List;
@@ -66,7 +67,7 @@ public class PushDownRunnerJdbcImpl implements IPushDownRunner {
columnMetas.add(new SelectedColumnMeta(metaData.isAutoIncrement(i), metaData.isCaseSensitive(i), false,
metaData.isCurrency(i), metaData.isNullable(i), false, metaData.getColumnDisplaySize(i),
metaData.getColumnLabel(i), metaData.getColumnName(i), null, null, null,
- metaData.getPrecision(i), metaData.getScale(i), metaData.getColumnType(i),
+ metaData.getPrecision(i), metaData.getScale(i), toSqlType(metaData.getColumnTypeName(i)),
metaData.getColumnTypeName(i), metaData.isReadOnly(i), false, false));
}
} finally {
@@ -76,6 +77,48 @@ public class PushDownRunnerJdbcImpl implements IPushDownRunner {
}
}
+ // calcite does not understand all java SqlTypes, for example LONGNVARCHAR -16, thus need this mapping (KYLIN-2966)
+ public static int toSqlType(String type) throws SQLException {
+ if ("string".equalsIgnoreCase(type)) {
+ return Types.VARCHAR;
+ } else if ("varchar".equalsIgnoreCase(type)) {
+ return Types.VARCHAR;
+ } else if ("char".equalsIgnoreCase(type)) {
+ return Types.CHAR;
+ } else if ("float".equalsIgnoreCase(type)) {
+ return Types.FLOAT;
+ } else if ("real".equalsIgnoreCase(type)) {
+ return Types.REAL;
+ } else if ("double".equalsIgnoreCase(type)) {
+ return Types.DOUBLE;
+ } else if ("boolean".equalsIgnoreCase(type)) {
+ return Types.BOOLEAN;
+ } else if ("tinyint".equalsIgnoreCase(type)) {
+ return Types.TINYINT;
+ } else if ("smallint".equalsIgnoreCase(type)) {
+ return Types.SMALLINT;
+ } else if ("int".equalsIgnoreCase(type)) {
+ return Types.INTEGER;
+ } else if ("bigint".equalsIgnoreCase(type)) {
+ return Types.BIGINT;
+ } else if ("date".equalsIgnoreCase(type)) {
+ return Types.DATE;
+ } else if ("timestamp".equalsIgnoreCase(type)) {
+ return Types.TIMESTAMP;
+ } else if ("decimal".equalsIgnoreCase(type)) {
+ return Types.DECIMAL;
+ } else if ("binary".equalsIgnoreCase(type)) {
+ return Types.BINARY;
+ } else if ("map".equalsIgnoreCase(type)) {
+ return Types.JAVA_OBJECT;
+ } else if ("array".equalsIgnoreCase(type)) {
+ return Types.ARRAY;
+ } else if ("struct".equalsIgnoreCase(type)) {
+ return Types.STRUCT;
+ }
+ throw new SQLException("Unrecognized column type: " + type);
+ }
+
@Override
public void executeUpdate(String sql) throws Exception {
Statement statement = null;
[3/4] kylin git commit: KYLIN-3044 minor code review
Posted by li...@apache.org.
KYLIN-3044 minor code review
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/695971a2
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/695971a2
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/695971a2
Branch: refs/heads/KYLIN-2966
Commit: 695971a2a27837bbf10f3fb0c4079ce11333951d
Parents: cac1f8b
Author: Li Yang <li...@apache.org>
Authored: Sun Nov 26 13:04:27 2017 +0800
Committer: Li Yang <li...@apache.org>
Committed: Sun Nov 26 13:04:27 2017 +0800
----------------------------------------------------------------------
.../org/apache/kylin/common/KylinConfigBase.java | 2 +-
.../java/org/apache/kylin/job/JoinedFlatTable.java | 3 ++-
.../org/apache/kylin/source/jdbc/JdbcExplorer.java | 2 +-
.../apache/kylin/source/jdbc/JdbcHiveMRInput.java | 15 ++++++++-------
.../java/org/apache/kylin/source/jdbc/SqlUtil.java | 2 +-
.../org/apache/kylin/source/jdbc/SqlUtilTest.java | 6 +++---
6 files changed, 16 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/695971a2/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index b2368ce..1a93dd4 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -752,7 +752,7 @@ abstract public class KylinConfigBase implements Serializable {
return Integer.parseInt(getOptional("kylin.source.jdbc.sqoop-mapper-num", "4"));
}
- public String getFieldDelimiter() {
+ public String getSourceFieldDelimiter() {
return getOptional("kylin.source.jdbc.field-delimiter", "|");
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/695971a2/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
index 0ab3d3d..d136ec6 100644
--- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@ -85,7 +85,8 @@ public class JoinedFlatTable {
public static String generateCreateTableStatement(IJoinedFlatTableDesc flatDesc, String storageDfsDir,
String format) {
- return generateCreateTableStatement(flatDesc, storageDfsDir, format, "|");
+ String fieldDelimiter = flatDesc.getDataModel().getConfig().getSourceFieldDelimiter();
+ return generateCreateTableStatement(flatDesc, storageDfsDir, format, fieldDelimiter);
}
public static String generateDropTableStatement(IJoinedFlatTableDesc flatDesc) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/695971a2/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcExplorer.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcExplorer.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcExplorer.java
index 1278128..2827b3d 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcExplorer.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcExplorer.java
@@ -112,7 +112,7 @@ public class JdbcExplorer implements ISourceMetadataExplorer, ISampleDataDeploye
ColumnDesc cdesc = new ColumnDesc();
cdesc.setName(cname.toUpperCase());
- String kylinType = SqlUtil.jdbcTypetoKylinDataType(type);
+ String kylinType = SqlUtil.jdbcTypeToKylinDataType(type);
int precision = (SqlUtil.isPrecisionApplicable(kylinType) && csize > 0) ? csize : -1;
int scale = (SqlUtil.isScaleApplicable(kylinType) && digits > 0) ? digits : -1;
http://git-wip-us.apache.org/repos/asf/kylin/blob/695971a2/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
index e83518a..15259cc 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
@@ -21,7 +21,6 @@ package org.apache.kylin.source.jdbc;
import java.util.List;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.job.JoinedFlatTable;
import org.apache.kylin.job.constant.ExecutableConstants;
@@ -51,6 +50,10 @@ public class JdbcHiveMRInput extends HiveMRInput {
public BatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
super(flatDesc);
}
+
+ private KylinConfig getConfig() {
+ return flatDesc.getDataModel().getConfig();
+ }
@Override
protected void addStepPhase1_DoCreateFlatTable(DefaultChainedExecutable jobFlow) {
@@ -64,8 +67,7 @@ public class JdbcHiveMRInput extends HiveMRInput {
private AbstractExecutable createFlatHiveTableFromFiles(String hiveInitStatements, String jobWorkingDir) {
final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc);
- KylinConfig config = KylinConfig.getInstanceFromEnv();
- String filedDelimiter = config.getFieldDelimiter();
+ String filedDelimiter = getConfig().getSourceFieldDelimiter();
// Sqoop does not support exporting SEQUENCEFILE to Hive now (SQOOP-869)
final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir,
"TEXTFILE", filedDelimiter);
@@ -97,7 +99,7 @@ public class JdbcHiveMRInput extends HiveMRInput {
return partitionDesc.getPartitionDateColumnRef();
}
TblColRef splitColumn = null;
- TableMetadataManager tblManager = TableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
+ TableMetadataManager tblManager = TableMetadataManager.getInstance(getConfig());
long maxCardinality = 0;
for (TableRef tableRef : flatDesc.getDataModel().getAllTables()) {
TableExtDesc tableExtDesc = tblManager.getTableExt(tableRef.getTableDesc());
@@ -123,8 +125,7 @@ public class JdbcHiveMRInput extends HiveMRInput {
}
private AbstractExecutable createSqoopToFlatHiveStep(String jobWorkingDir, String cubeName) {
- KylinConfig config = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName)
- .getConfig();
+ KylinConfig config = getConfig();
PartitionDesc partitionDesc = flatDesc.getDataModel().getPartitionDesc();
String partCol = null;
String partitionString = null;
@@ -151,7 +152,7 @@ public class JdbcHiveMRInput extends HiveMRInput {
String jdbcUser = config.getJdbcSourceUser();
String jdbcPass = config.getJdbcSourcePass();
String sqoopHome = config.getSqoopHome();
- String filedDelimiter = config.getFieldDelimiter();
+ String filedDelimiter = config.getSourceFieldDelimiter();
int mapperNum = config.getSqoopMapperNum();
String bquery = String.format("SELECT min(%s), max(%s) FROM %s.%s", splitColumn, splitColumn, splitDatabase,
http://git-wip-us.apache.org/repos/asf/kylin/blob/695971a2/source-hive/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java
index 79fab7d..715bb99 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java
@@ -96,7 +96,7 @@ public class SqlUtil {
return con;
}
- public static String jdbcTypetoKylinDataType(int sqlType) {
+ public static String jdbcTypeToKylinDataType(int sqlType) {
String result = "any";
switch (sqlType) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/695971a2/source-hive/src/test/java/org/apache/kylin/source/jdbc/SqlUtilTest.java
----------------------------------------------------------------------
diff --git a/source-hive/src/test/java/org/apache/kylin/source/jdbc/SqlUtilTest.java b/source-hive/src/test/java/org/apache/kylin/source/jdbc/SqlUtilTest.java
index d952675..7a7fd33 100644
--- a/source-hive/src/test/java/org/apache/kylin/source/jdbc/SqlUtilTest.java
+++ b/source-hive/src/test/java/org/apache/kylin/source/jdbc/SqlUtilTest.java
@@ -27,9 +27,9 @@ public class SqlUtilTest {
@Test
public void testJdbcTypetoKylinDataType() {
this.getClass().getClassLoader().toString();
- Assert.assertEquals("double", SqlUtil.jdbcTypetoKylinDataType(Types.FLOAT));
- Assert.assertEquals("varchar", SqlUtil.jdbcTypetoKylinDataType(Types.NVARCHAR));
- Assert.assertEquals("any", SqlUtil.jdbcTypetoKylinDataType(Types.ARRAY));
+ Assert.assertEquals("double", SqlUtil.jdbcTypeToKylinDataType(Types.FLOAT));
+ Assert.assertEquals("varchar", SqlUtil.jdbcTypeToKylinDataType(Types.NVARCHAR));
+ Assert.assertEquals("any", SqlUtil.jdbcTypeToKylinDataType(Types.ARRAY));
}
@Test
[2/4] kylin git commit: KYLIN-3044 support SQLServer as kylin data
source
Posted by li...@apache.org.
KYLIN-3044 support SQLServer as kylin data source
Signed-off-by: Li Yang <li...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/cac1f8bb
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/cac1f8bb
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/cac1f8bb
Branch: refs/heads/KYLIN-2966
Commit: cac1f8bb48239fe5a49ca720575915f3507a4010
Parents: d837e18
Author: etherge <et...@163.com>
Authored: Wed Nov 22 00:26:49 2017 +0800
Committer: Li Yang <li...@apache.org>
Committed: Sun Nov 26 13:03:47 2017 +0800
----------------------------------------------------------------------
.../apache/kylin/common/KylinConfigBase.java | 31 +-
.../main/resources/kylin-defaults.properties | 9 +
.../org/apache/kylin/job/JoinedFlatTable.java | 45 +--
pom.xml | 2 +-
source-hive/pom.xml | 25 ++
.../apache/kylin/source/jdbc/JdbcDialect.java | 26 ++
.../apache/kylin/source/jdbc/JdbcExplorer.java | 288 ++++++++-----------
.../kylin/source/jdbc/JdbcHiveMRInput.java | 129 +++++++--
.../kylin/source/jdbc/JdbcTableReader.java | 32 ++-
.../org/apache/kylin/source/jdbc/SqlUtil.java | 140 ++++++---
.../jdbc/metadata/DefaultJdbcMetadata.java | 76 +++++
.../source/jdbc/metadata/IJdbcMetadata.java | 33 +++
.../jdbc/metadata/JdbcMetadataFactory.java | 35 +++
.../source/jdbc/metadata/MySQLJdbcMetadata.java | 69 +++++
.../jdbc/metadata/SQLServerJdbcMetadata.java | 61 ++++
.../kylin/source/jdbc/JdbcExplorerTest.java | 156 ++++++++++
.../apache/kylin/source/jdbc/SqlUtilTest.java | 46 +++
.../jdbc/metadata/DefaultJdbcMetadataTest.java | 126 ++++++++
.../jdbc/metadata/JdbcMetadataFactoryTest.java | 35 +++
.../jdbc/metadata/MySQLJdbcMetadataTest.java | 104 +++++++
.../metadata/SQLServerJdbcMetadataTest.java | 68 +++++
21 files changed, 1258 insertions(+), 278 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/cac1f8bb/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 3d67ee3..b2368ce 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -287,7 +287,7 @@ abstract public class KylinConfigBase implements Serializable {
r.putAll(getPropertiesByPrefix("kylin.metadata.resource-store-provider.")); // note the naming convention -- http://kylin.apache.org/development/coding_naming_convention.html
return r;
}
-
+
public String getDataModelImpl() {
return getOptional("kylin.metadata.data-model-impl", null);
}
@@ -295,7 +295,7 @@ abstract public class KylinConfigBase implements Serializable {
public String getDataModelManagerImpl() {
return getOptional("kylin.metadata.data-model-manager-impl", null);
}
-
+
public String[] getRealizationProviders() {
return getOptionalStringArray("kylin.metadata.realization-providers", //
new String[] { "org.apache.kylin.cube.CubeManager", "org.apache.kylin.storage.hybrid.HybridManager" });
@@ -314,7 +314,7 @@ abstract public class KylinConfigBase implements Serializable {
"org.apache.kylin.storage.hbase.util.ZookeeperDistributedLock$Factory");
return (DistributedLockFactory) ClassUtil.newInstance(clsName);
}
-
+
public String getHBaseMappingAdapter() {
return getOptional("kylin.metadata.hbasemapping-adapter");
}
@@ -431,11 +431,11 @@ abstract public class KylinConfigBase implements Serializable {
// ============================================================================
// Cube Planner
// ============================================================================
-
+
public boolean isCubePlannerEnabled() {
return Boolean.parseBoolean(getOptional("kylin.cube.cubeplanner.enabled", "false"));
}
-
+
public boolean isCubePlannerEnabledForExistingCube() {
return Boolean.parseBoolean(getOptional("kylin.cube.cubeplanner.enabled-for-existing-cube", "false"));
}
@@ -724,23 +724,23 @@ abstract public class KylinConfigBase implements Serializable {
// SOURCE.JDBC
// ============================================================================
- public String getJdbcConnectionUrl() {
+ public String getJdbcSourceConnectionUrl() {
return getOptional("kylin.source.jdbc.connection-url");
}
- public String getJdbcDriver() {
+ public String getJdbcSourceDriver() {
return getOptional("kylin.source.jdbc.driver");
}
- public String getJdbcDialect() {
+ public String getJdbcSourceDialect() {
return getOptional("kylin.source.jdbc.dialect");
}
- public String getJdbcUser() {
+ public String getJdbcSourceUser() {
return getOptional("kylin.source.jdbc.user");
}
- public String getJdbcPass() {
+ public String getJdbcSourcePass() {
return getOptional("kylin.source.jdbc.pass");
}
@@ -748,6 +748,14 @@ abstract public class KylinConfigBase implements Serializable {
return getOptional("kylin.source.jdbc.sqoop-home");
}
+ public int getSqoopMapperNum() {
+ return Integer.parseInt(getOptional("kylin.source.jdbc.sqoop-mapper-num", "4"));
+ }
+
+ public String getFieldDelimiter() {
+ return getOptional("kylin.source.jdbc.field-delimiter", "|");
+ }
+
// ============================================================================
// STORAGE.HBASE
// ============================================================================
@@ -1008,7 +1016,6 @@ abstract public class KylinConfigBase implements Serializable {
return Boolean.parseBoolean(getOptional("kylin.engine.mr.build-uhc-dict", "true"));
}
-
public boolean isBuildDictInReducerEnabled() {
return Boolean.parseBoolean(getOptional("kylin.engine.mr.build-dict-in-reducer", "true"));
}
@@ -1279,7 +1286,7 @@ abstract public class KylinConfigBase implements Serializable {
public int getServerUserCacheMaxEntries() {
return Integer.valueOf(this.getOptional("kylin.server.auth-user-cache.max-entries", "100"));
}
-
+
public String getExternalAclProvider() {
return getOptional("kylin.server.external-acl-provider", "");
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/cac1f8bb/core-common/src/main/resources/kylin-defaults.properties
----------------------------------------------------------------------
diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties
index 1602087..475deb3 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -290,3 +290,12 @@ kylin.engine.spark-conf.spark.hadoop.yarn.timeline-service.enabled=false
#kylin.query.pushdown.jdbc.pool-max-total=8
#kylin.query.pushdown.jdbc.pool-max-idle=8
#kylin.query.pushdown.jdbc.pool-min-idle=0
+
+### JDBC Data Source
+#kylin.source.jdbc.connection-url=
+#kylin.source.jdbc.driver=
+#kylin.source.jdbc.dialect=
+#kylin.source.jdbc.user=
+#kylin.source.jdbc.pass=
+#kylin.source.jdbc.sqoop-home=
+#kylin.source.jdbc.field-delimiter=|
http://git-wip-us.apache.org/repos/asf/kylin/blob/cac1f8bb/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
index 9593718..0ab3d3d 100644
--- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.job.engine.JobEngineConfig;
@@ -59,7 +60,7 @@ public class JoinedFlatTable {
}
public static String generateCreateTableStatement(IJoinedFlatTableDesc flatDesc, String storageDfsDir,
- String format) {
+ String format, String filedDelimiter) {
StringBuilder ddl = new StringBuilder();
ddl.append("CREATE EXTERNAL TABLE IF NOT EXISTS " + flatDesc.getTableName() + "\n");
@@ -74,7 +75,7 @@ public class JoinedFlatTable {
}
ddl.append(")" + "\n");
if ("TEXTFILE".equals(format)) {
- ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY ','" + "\n");
+ ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '" + filedDelimiter + "'\n");
}
ddl.append("STORED AS " + format + "\n");
ddl.append("LOCATION '" + getTableDir(flatDesc, storageDfsDir) + "';").append("\n");
@@ -82,6 +83,11 @@ public class JoinedFlatTable {
return ddl.toString();
}
+ public static String generateCreateTableStatement(IJoinedFlatTableDesc flatDesc, String storageDfsDir,
+ String format) {
+ return generateCreateTableStatement(flatDesc, storageDfsDir, format, "|");
+ }
+
public static String generateDropTableStatement(IJoinedFlatTableDesc flatDesc) {
StringBuilder ddl = new StringBuilder();
ddl.append("DROP TABLE IF EXISTS " + flatDesc.getTableName() + ";").append("\n");
@@ -94,7 +100,7 @@ public class JoinedFlatTable {
if (null == segment) {
kylinConfig = KylinConfig.getInstanceFromEnv();
} else {
- kylinConfig = ((CubeSegment) flatDesc.getSegment()).getConfig();
+ kylinConfig = (flatDesc.getSegment()).getConfig();
}
if (kylinConfig.isAdvancedFlatTableUsed()) {
@@ -210,15 +216,12 @@ public class JoinedFlatTable {
private static void appendWhereStatement(IJoinedFlatTableDesc flatDesc, StringBuilder sql, boolean singleLine) {
final String sep = singleLine ? " " : "\n";
- boolean hasCondition = false;
StringBuilder whereBuilder = new StringBuilder();
- whereBuilder.append("WHERE");
+ whereBuilder.append("WHERE 1=1");
DataModelDesc model = flatDesc.getDataModel();
-
- if (model.getFilterCondition() != null && model.getFilterCondition().equals("") == false) {
- whereBuilder.append(" (").append(model.getFilterCondition()).append(") ");
- hasCondition = true;
+ if (StringUtils.isNotEmpty(model.getFilterCondition())) {
+ whereBuilder.append(" AND (").append(model.getFilterCondition()).append(") ");
}
if (flatDesc.getSegment() != null) {
@@ -227,18 +230,15 @@ public class JoinedFlatTable {
SegmentRange segRange = flatDesc.getSegRange();
if (segRange != null && !segRange.isInfinite()) {
- whereBuilder.append(hasCondition ? " AND (" : " (");
+ whereBuilder.append(" AND (");
whereBuilder.append(
partDesc.getPartitionConditionBuilder().buildDateRangeCondition(partDesc, segRange));
whereBuilder.append(")" + sep);
- hasCondition = true;
}
}
}
- if (hasCondition) {
- sql.append(whereBuilder.toString());
- }
+ sql.append(whereBuilder.toString());
}
private static String colName(TblColRef col) {
@@ -246,10 +246,19 @@ public class JoinedFlatTable {
}
private static String getHiveDataType(String javaDataType) {
- String hiveDataType = javaDataType.toLowerCase().startsWith("varchar") ? "string" : javaDataType;
- hiveDataType = javaDataType.toLowerCase().startsWith("integer") ? "int" : hiveDataType;
+ String originDataType = javaDataType.toLowerCase();
+ String hiveDataType;
+ if (originDataType.startsWith("varchar")) {
+ hiveDataType = "string";
+ } else if (originDataType.startsWith("integer")) {
+ hiveDataType = "int";
+ } else if (originDataType.startsWith("bigint")) {
+ hiveDataType = "bigint";
+ } else {
+ hiveDataType = originDataType;
+ }
- return hiveDataType.toLowerCase();
+ return hiveDataType;
}
public static String generateRedistributeFlatTableStatement(IJoinedFlatTableDesc flatDesc) {
@@ -267,4 +276,4 @@ public class JoinedFlatTable {
return sql.toString();
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/cac1f8bb/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c85974a..7a40076 100644
--- a/pom.xml
+++ b/pom.xml
@@ -95,7 +95,7 @@
<jetty.version>9.2.20.v20161216</jetty.version>
<jamm.version>0.3.1</jamm.version>
<mockito.version>2.7.14</mockito.version>
-
+ <powermock.version>1.7.0</powermock.version>
<!-- Commons -->
<commons-lang3.version>3.4</commons-lang3.version>
http://git-wip-us.apache.org/repos/asf/kylin/blob/cac1f8bb/source-hive/pom.xml
----------------------------------------------------------------------
diff --git a/source-hive/pom.xml b/source-hive/pom.xml
index 9a4d537..b9f87ee 100644
--- a/source-hive/pom.xml
+++ b/source-hive/pom.xml
@@ -77,6 +77,31 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-junit4</artifactId>
+ <version>${powermock.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-core</artifactId>
+ <version>${powermock.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-mockito</artifactId>
+ <version>${powermock.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <!-- PowerMock (powermock-api-mockito) conflicts with newer versions of Mockito, so an OLDER Mockito version is pinned here -->
+ <version>1.10.19</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/kylin/blob/cac1f8bb/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcDialect.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcDialect.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcDialect.java
new file mode 100644
index 0000000..7e5ecee
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcDialect.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.jdbc;
+
+public class JdbcDialect {
+ public static final String DIALECT_VERTICA = "vertica";
+ public static final String DIALECT_ORACLE = "oracle";
+ public static final String DIALECT_MYSQL = "mysql";
+ public static final String DIALECT_HIVE = "hive";
+ public static final String DIALECT_MSSQL = "mssql";
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/cac1f8bb/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcExplorer.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcExplorer.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcExplorer.java
index 736cf2e..1278128 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcExplorer.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcExplorer.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
+import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -31,74 +32,153 @@ import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.DBUtils;
import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.ISourceAware;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TableExtDesc;
import org.apache.kylin.source.ISampleDataDeployer;
import org.apache.kylin.source.ISourceMetadataExplorer;
import org.apache.kylin.source.hive.DBConnConf;
+import org.apache.kylin.source.jdbc.metadata.IJdbcMetadata;
+import org.apache.kylin.source.jdbc.metadata.JdbcMetadataFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
public class JdbcExplorer implements ISourceMetadataExplorer, ISampleDataDeployer {
private static final Logger logger = LoggerFactory.getLogger(JdbcExplorer.class);
-
- public static final String DIALECT_VERTICA="vertica";
- public static final String DIALECT_ORACLE="oracle";
- public static final String DIALECT_MYSQL="mysql";
- public static final String DIALECT_HIVE="hive";
-
- public static final String TABLE_TYPE_TABLE="TABLE";
- public static final String TABLE_TYPE_VIEW="VIEW";
-
- private KylinConfig config;
- private DBConnConf dbconf;
- private String dialect;
+
+ private final KylinConfig config;
+ private final String dialect;
+ private final DBConnConf dbconf;
+ private final IJdbcMetadata jdbcMetadataDialect;
public JdbcExplorer() {
config = KylinConfig.getInstanceFromEnv();
- String connectionUrl = config.getJdbcConnectionUrl();
- String driverClass = config.getJdbcDriver();
- String jdbcUser = config.getJdbcUser();
- String jdbcPass = config.getJdbcPass();
- dbconf = new DBConnConf(driverClass, connectionUrl, jdbcUser, jdbcPass);
- this.dialect = config.getJdbcDialect();
+ String connectionUrl = config.getJdbcSourceConnectionUrl();
+ String driverClass = config.getJdbcSourceDriver();
+ String jdbcUser = config.getJdbcSourceUser();
+ String jdbcPass = config.getJdbcSourcePass();
+ this.dbconf = new DBConnConf(driverClass, connectionUrl, jdbcUser, jdbcPass);
+ this.dialect = config.getJdbcSourceDialect();
+ this.jdbcMetadataDialect = JdbcMetadataFactory.getJdbcMetadata(dialect, dbconf);
+ }
+
+    /**
+     * Lists the databases of the JDBC source by delegating to the
+     * dialect-specific metadata implementation.
+     *
+     * @return the list returned by the dialect implementation
+     * @throws SQLException if the dialect implementation fails
+     */
+    @Override
+    public List<String> listDatabases() throws SQLException {
+        final List<String> databases = jdbcMetadataDialect.listDatabases();
+        return databases;
+    }
+
+    /**
+     * Lists the tables of one schema by delegating to the dialect-specific
+     * metadata implementation.
+     *
+     * @param schema schema whose tables are listed
+     * @return the list returned by the dialect implementation
+     * @throws SQLException if the dialect implementation fails
+     */
+    @Override
+    public List<String> listTables(String schema) throws SQLException {
+        final List<String> tables = jdbcMetadataDialect.listTables(schema);
+        return tables;
+    }
+
+    /**
+     * Loads the metadata of one JDBC source table and converts it into Kylin descriptors.
+     *
+     * <p>Database, table and column names are upper-cased. The table type is read from the
+     * <code>TABLE_TYPE</code> column of the dialect's table lookup; column name, JDBC type, size,
+     * decimal digits, ordinal position and remarks are read from the dialect's column listing.
+     * Precision and scale are set to -1 when they do not apply to the mapped Kylin type or are
+     * not positive.
+     *
+     * @param database schema that contains the table
+     * @param table name of the table to load
+     * @param prj project the returned <code>TableExtDesc</code> is initialised with
+     * @return the table descriptor paired with a new extended descriptor
+     * @throws SQLException if reading the JDBC metadata fails
+     * @throws RuntimeException if the table lookup returns no row
+     */
+    @Override
+    public Pair<TableDesc, TableExtDesc> loadTableMetadata(String database, String table, String prj)
+            throws SQLException {
+        TableDesc tableDesc = new TableDesc();
+        tableDesc.setDatabase(database.toUpperCase());
+        tableDesc.setName(table.toUpperCase());
+        tableDesc.setUuid(UUID.randomUUID().toString());
+        tableDesc.setLastModified(0);
+        tableDesc.setSourceType(ISourceAware.ID_JDBC);
+
+        List<ColumnDesc> columns = new ArrayList<>();
+        Connection con = SqlUtil.getConnection(dbconf);
+        // A single try/finally covers every use of the connection, so it is also released when
+        // getMetaData() fails, when the table lookup fails, or when the table is not found.
+        try {
+            DatabaseMetaData dbmd = con.getMetaData();
+
+            try (ResultSet rs = jdbcMetadataDialect.getTable(dbmd, database, table)) {
+                String tableType = null;
+                // If several rows match, the type of the last row wins.
+                while (rs.next()) {
+                    tableType = rs.getString("TABLE_TYPE");
+                }
+                if (tableType == null) {
+                    throw new RuntimeException(String.format("table %s not found in schema:%s", table, database));
+                }
+                tableDesc.setTableType(tableType);
+            }
+
+            try (ResultSet rs = jdbcMetadataDialect.listColumns(dbmd, database, table)) {
+                while (rs.next()) {
+                    String cname = rs.getString("COLUMN_NAME");
+                    int type = rs.getInt("DATA_TYPE");
+                    int csize = rs.getInt("COLUMN_SIZE");
+                    int digits = rs.getInt("DECIMAL_DIGITS");
+                    int pos = rs.getInt("ORDINAL_POSITION");
+                    String remarks = rs.getString("REMARKS");
+
+                    ColumnDesc cdesc = new ColumnDesc();
+                    cdesc.setName(cname.toUpperCase());
+
+                    String kylinType = SqlUtil.jdbcTypetoKylinDataType(type);
+                    // -1 marks "not applicable" for the Kylin data type.
+                    int precision = (SqlUtil.isPrecisionApplicable(kylinType) && csize > 0) ? csize : -1;
+                    int scale = (SqlUtil.isScaleApplicable(kylinType) && digits > 0) ? digits : -1;
+
+                    cdesc.setDatatype(new DataType(kylinType, precision, scale).toString());
+                    cdesc.setId(String.valueOf(pos));
+                    cdesc.setComment(remarks);
+                    columns.add(cdesc);
+                }
+            }
+        } finally {
+            DBUtils.closeQuietly(con);
+        }
+
+        tableDesc.setColumns(columns.toArray(new ColumnDesc[columns.size()]));
+
+        TableExtDesc tableExtDesc = new TableExtDesc();
+        tableExtDesc.setIdentity(tableDesc.getIdentity());
+        tableExtDesc.setUuid(UUID.randomUUID().toString());
+        tableExtDesc.setLastModified(0);
+        tableExtDesc.init(prj);
+
+        return Pair.newPair(tableDesc, tableExtDesc);
 }
-
+
private String getSqlDataType(String javaDataType) {
- if (DIALECT_VERTICA.equals(dialect)){
- if (javaDataType.toLowerCase().equals("double")){
+ if (JdbcDialect.DIALECT_VERTICA.equals(dialect) || JdbcDialect.DIALECT_MSSQL.equals(dialect)) {
+ if (javaDataType.toLowerCase().equals("double")) {
return "float";
}
}
return javaDataType.toLowerCase();
}
-
+
@Override
public void createSampleDatabase(String database) throws Exception {
executeSQL(generateCreateSchemaSql(database));
}
- private String generateCreateSchemaSql(String schemaName){
- if (DIALECT_VERTICA.equals(dialect)){
+ private String generateCreateSchemaSql(String schemaName) {
+ if (JdbcDialect.DIALECT_VERTICA.equals(dialect) || JdbcDialect.DIALECT_MYSQL.equals(dialect)) {
return String.format("CREATE schema IF NOT EXISTS %s", schemaName);
- }else{
+ } else if (JdbcDialect.DIALECT_MSSQL.equals(dialect)) {
+ return String.format("IF NOT EXISTS (SELECT name FROM sys.schemas WHERE name = N'%s') EXEC('CREATE SCHEMA"
+ + " [%s] AUTHORIZATION [dbo]')", schemaName, schemaName);
+ } else {
logger.error(String.format("unsupported dialect %s.", dialect));
return null;
}
}
-
+
@Override
public void loadSampleData(String tableName, String tmpDataDir) throws Exception {
executeSQL(generateLoadDataSql(tableName, tmpDataDir));
}
private String generateLoadDataSql(String tableName, String tableFileDir) {
- if (DIALECT_VERTICA.equals(dialect)){
- return String.format("copy %s from local '%s/%s.csv' delimiter as ',';", tableName, tableFileDir, tableName);
- }else{
+ if (JdbcDialect.DIALECT_VERTICA.equals(dialect)) {
+ return String.format("copy %s from local '%s/%s.csv' delimiter as ',';", tableName, tableFileDir,
+ tableName);
+ } else if (JdbcDialect.DIALECT_MYSQL.equals(dialect)) {
+ return String.format("LOAD DATA INFILE '%s/%s.csv' INTO %s FIELDS TERMINATED BY ',';", tableFileDir,
+ tableName, tableName);
+ } else if (JdbcDialect.DIALECT_MSSQL.equals(dialect)) {
+ return String.format("BULK INSERT %s FROM '%s/%s.csv' WITH(FIELDTERMINATOR = ',')", tableName, tableFileDir,
+ tableName);
+ } else {
logger.error(String.format("unsupported dialect %s.", dialect));
return null;
}
@@ -111,7 +191,8 @@ public class JdbcExplorer implements ISourceMetadataExplorer, ISampleDataDeploye
private String[] generateCreateTableSql(TableDesc tableDesc) {
logger.info(String.format("gen create table sql:%s", tableDesc));
- String tableIdentity = String.format("%s.%s", tableDesc.getDatabase().toUpperCase(), tableDesc.getName()).toUpperCase();
+ String tableIdentity = String.format("%s.%s", tableDesc.getDatabase().toUpperCase(), tableDesc.getName())
+ .toUpperCase();
String dropsql = "DROP TABLE IF EXISTS " + tableIdentity;
String dropsql2 = "DROP VIEW IF EXISTS " + tableIdentity;
@@ -147,157 +228,20 @@ public class JdbcExplorer implements ISourceMetadataExplorer, ISampleDataDeploye
return new String[] { dropView, dropTable, createSql };
}
- private void executeSQL(String sql) throws CommandNeedRetryException, IOException {
+ private void executeSQL(String sql) throws CommandNeedRetryException, IOException, SQLException {
Connection con = SqlUtil.getConnection(dbconf);
logger.info(String.format(sql));
SqlUtil.execUpdateSQL(con, sql);
- SqlUtil.closeResources(con, null);
+ DBUtils.closeQuietly(con);
}
- private void executeSQL(String[] sqls) throws CommandNeedRetryException, IOException {
+ private void executeSQL(String[] sqls) throws CommandNeedRetryException, IOException, SQLException {
Connection con = SqlUtil.getConnection(dbconf);
- for (String sql : sqls){
+ for (String sql : sqls) {
logger.info(String.format(sql));
SqlUtil.execUpdateSQL(con, sql);
}
- SqlUtil.closeResources(con, null);
- }
-
- @Override
- public List<String> listDatabases() throws Exception {
- Connection con = SqlUtil.getConnection(dbconf);
- DatabaseMetaData dbmd = con.getMetaData();
- ResultSet rs = dbmd.getSchemas();
- List<String> ret = new ArrayList<String>();
- /*
- The schema columns are:
- - TABLE_SCHEM String => schema name
- - TABLE_CATALOG String => catalog name (may be null)
- */
- while (rs.next()){
- String schema = rs.getString(1);
- String catalog = rs.getString(2);
- logger.info(String.format("%s,%s", schema, catalog));
- ret.add(schema);
- }
- SqlUtil.closeResources(con, null);
- return ret;
- }
-
- @Override
- public List<String> listTables(String database) throws Exception {
- Connection con = SqlUtil.getConnection(dbconf);
- DatabaseMetaData dbmd = con.getMetaData();
- ResultSet rs = dbmd.getTables(null, database, null, null);
- List<String> ret = new ArrayList<String>();
- /*
- - TABLE_CAT String => table catalog (may be null)
- - TABLE_SCHEM String => table schema (may be null)
- - TABLE_NAME String => table name
- - TABLE_TYPE String => table type. Typical types are "TABLE", "VIEW", "SYSTEM TABLE", "GLOBAL
- TEMPORARY", "LOCAL TEMPORARY", "ALIAS", "SYNONYM".
- - REMARKS String => explanatory comment on the table
- - TYPE_CAT String => the types catalog (may be null)
- - TYPE_SCHEM String => the types schema (may be null)
- - TYPE_NAME String => type name (may be null)
- - SELF_REFERENCING_COL_NAME String => name of the designated "identifier" column of a typed
- table (may be null)
- - REF_GENERATION String => specifies how values in SELF_REFERENCING_COL_NAME are created.
- Values are "SYSTEM", "USER", "DERIVED". (may be null)
- */
- while (rs.next()){
- String catalog = rs.getString(1);
- String schema = rs.getString(2);
- String name = rs.getString(3);
- String type = rs.getString(4);
- logger.info(String.format("%s,%s,%s,%s", schema, catalog, name, type));
- ret.add(name);
- }
- SqlUtil.closeResources(con, null);
- return ret;
- }
-
- @Override
- public Pair<TableDesc, TableExtDesc> loadTableMetadata(String database, String table, String prj) throws Exception {
-
- TableDesc tableDesc = new TableDesc();
- tableDesc.setDatabase(database.toUpperCase());
- tableDesc.setName(table.toUpperCase());
- tableDesc.setUuid(UUID.randomUUID().toString());
- tableDesc.setLastModified(0);
-
- Connection con = SqlUtil.getConnection(dbconf);
- DatabaseMetaData dbmd = con.getMetaData();
- ResultSet rs = dbmd.getTables(null, database, table, null);
- String tableType=null;
- while (rs.next()){
- tableType = rs.getString(4);
- }
- DBUtils.closeQuietly(rs);
- if (tableType!=null){
- tableDesc.setTableType(tableType);
- }else{
- logger.error(String.format("table %s not found in schema:%s", table, database));
- }
- /*
- - 1. TABLE_CAT String => table catalog (may be null)
- - 2. TABLE_SCHEM String => table schema (may be null)
- - 3. TABLE_NAME String => table name
- - 4. COLUMN_NAME String => column name
- - 5. DATA_TYPE int => SQL type from java.sql.Types
- - 6. TYPE_NAME String => Data source dependent type name, for a UDT the type name is fully qualified
- - 7. COLUMN_SIZE int => column size.
- - 8. BUFFER_LENGTH is not used.
- - 9. DECIMAL_DIGITS int => the number of fractional digits. Null is returned for data types where DECIMAL_DIGITS is not applicable.
- - 10.NUM_PREC_RADIX int => Radix (typically either 10 or 2)
- - 11.NULLABLE int => is NULL allowed.
- - columnNoNulls - might not allow NULL values
- - columnNullable - definitely allows NULL values
- - columnNullableUnknown - nullability unknown
- - 12.REMARKS String => comment describing column (may be null)
- - 13.COLUMN_DEF String => default value for the column, which should be interpreted as a string when the value is enclosed in single quotes (may be null)
- - 14.SQL_DATA_TYPE int => unused
- - 15.SQL_DATETIME_SUB int => unused
- - 16.CHAR_OCTET_LENGTH int => for char types the maximum number of bytes in the column
- - 17.ORDINAL_POSITION int => index of column in table (starting at 1)
- - 18.IS_NULLABLE String => ISO rules are used to determine the nullability for a column.
- - YES --- if the column can include NULLs
- - NO --- if the column cannot include NULLs
- - empty string --- if the nullability for the column is unknown
- */
- List<ColumnDesc> columns = new ArrayList<ColumnDesc>();
- rs = dbmd.getColumns(null, database, table, null);
- while (rs.next()){
- String tname = rs.getString(3);
- String cname = rs.getString(4);
- int type=rs.getInt(5);
- String typeName=rs.getString(6);
- int csize=rs.getInt(7);
- int digits = rs.getInt(9);
- int nullable = rs.getInt(11);
- String comment = rs.getString(12);
- int pos = rs.getInt(17);
- logger.info(String.format("%s,%s,%d,%d,%d,%d,%s,%d", tname, cname, type, csize, digits, nullable, comment, pos));
-
- ColumnDesc cdesc = new ColumnDesc();
- cdesc.setName(cname.toUpperCase());
- // use "double" in kylin for "float"
- cdesc.setDatatype(typeName);
- cdesc.setId(String.valueOf(pos));
- columns.add(cdesc);
- }
- DBUtils.closeQuietly(rs);
DBUtils.closeQuietly(con);
-
- tableDesc.setColumns(columns.toArray(new ColumnDesc[columns.size()]));
-
- TableExtDesc tableExtDesc = new TableExtDesc();
- tableExtDesc.setIdentity(tableDesc.getIdentity());
- tableExtDesc.setUuid(UUID.randomUUID().toString());
- tableExtDesc.setLastModified(0);
- tableExtDesc.init(prj);
-
- return Pair.newPair(tableDesc, tableExtDesc);
}
@Override
http://git-wip-us.apache.org/repos/asf/kylin/blob/cac1f8bb/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
index ddd38db..e83518a 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
@@ -18,6 +18,8 @@
package org.apache.kylin.source.jdbc;
+import java.util.List;
+
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
@@ -25,21 +27,27 @@ import org.apache.kylin.job.JoinedFlatTable;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.TableMetadataManager;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.PartitionDesc;
+import org.apache.kylin.metadata.model.TableExtDesc;
+import org.apache.kylin.metadata.model.TableExtDesc.ColumnStats;
+import org.apache.kylin.metadata.model.TableRef;
+import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.source.hive.HiveMRInput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JdbcHiveMRInput extends HiveMRInput {
-
+
private static final Logger logger = LoggerFactory.getLogger(JdbcHiveMRInput.class);
-
+
public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
return new BatchCubingInputSide(flatDesc);
}
public static class BatchCubingInputSide extends HiveMRInput.BatchCubingInputSide {
-
+
public BatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
super(flatDesc);
}
@@ -49,42 +57,123 @@ public class JdbcHiveMRInput extends HiveMRInput {
final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
final String jobWorkingDir = getJobWorkingDir(jobFlow);
-
+
jobFlow.addTask(createSqoopToFlatHiveStep(jobWorkingDir, cubeName));
jobFlow.addTask(createFlatHiveTableFromFiles(hiveInitStatements, jobWorkingDir));
}
private AbstractExecutable createFlatHiveTableFromFiles(String hiveInitStatements, String jobWorkingDir) {
final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc);
- final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir, "TEXTFILE");
-
+ KylinConfig config = KylinConfig.getInstanceFromEnv();
+ String filedDelimiter = config.getFieldDelimiter();
+ // Sqoop does not support exporting SEQUENCEFILE to Hive yet (see SQOOP-869), so TEXTFILE is used
+ final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir,
+ "TEXTFILE", filedDelimiter);
+
HiveCmdStep step = new HiveCmdStep();
step.setCmd(hiveInitStatements + dropTableHql + createTableHql);
return step;
}
-
+
+        /**
+         * Chooses the split-by column for sqoop. Candidates are tried in this order:
+         * 1. the ClusteredBy column
+         * 2. the DistributedBy column
+         * 3. the partition date column, if the model is partitioned
+         * 4. the column with the highest cardinality in the table column stats
+         * 5. the first integer-family column of the flat table
+         * 6. the first column of the flat table
+         *
+         * @return A column reference <code>TblColRef</code> for sqoop split-by
+         */
+        private TblColRef determineSplitColumn() {
+            if (null != flatDesc.getClusterBy()) {
+                return flatDesc.getClusterBy();
+            }
+            if (null != flatDesc.getDistributedBy()) {
+                return flatDesc.getDistributedBy();
+            }
+            PartitionDesc partitionDesc = flatDesc.getDataModel().getPartitionDesc();
+            if (partitionDesc.isPartitioned()) {
+                return partitionDesc.getPartitionDateColumnRef();
+            }
+            TblColRef splitColumn = null;
+            TableMetadataManager tblManager = TableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
+            long maxCardinality = 0;
+            for (TableRef tableRef : flatDesc.getDataModel().getAllTables()) {
+                TableExtDesc tableExtDesc = tblManager.getTableExt(tableRef.getTableDesc());
+                List<ColumnStats> columnStatses = tableExtDesc.getColumnStats();
+                if (!columnStatses.isEmpty()) {
+                    for (TblColRef colRef : tableRef.getColumns()) {
+                        long cardinality = columnStatses.get(colRef.getColumnDesc().getZeroBasedIndex())
+                                .getCardinality();
+                        // Keep the running maximum up to date; otherwise every column with a
+                        // cardinality above 0 would replace the previous pick and the last one would win.
+                        if (cardinality > maxCardinality) {
+                            maxCardinality = cardinality;
+                            splitColumn = colRef;
+                        }
+                    }
+                }
+            }
+            if (null == splitColumn) {
+                // No usable stats: fall back to the first integer-family column, then to the first column.
+                for (TblColRef colRef : flatDesc.getAllColumns()) {
+                    if (colRef.getType().isIntegerFamily()) {
+                        return colRef;
+                    }
+                }
+                splitColumn = flatDesc.getAllColumns().get(0);
+            }
+
+            return splitColumn;
+        }
+
private AbstractExecutable createSqoopToFlatHiveStep(String jobWorkingDir, String cubeName) {
- KylinConfig config = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName).getConfig();
- String partCol = flatDesc.getDataModel().getPartitionDesc().getPartitionDateColumn();//tablename.colname
+ KylinConfig config = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName)
+ .getConfig();
+ PartitionDesc partitionDesc = flatDesc.getDataModel().getPartitionDesc();
+ String partCol = null;
+ String partitionString = null;
+
+ if (partitionDesc.isPartitioned()) {
+ partCol = partitionDesc.getPartitionDateColumn();//tablename.colname
+ partitionString = partitionDesc.getPartitionConditionBuilder().buildDateRangeCondition(partitionDesc,
+ flatDesc.getSegRange());
+ }
+
+ String splitTable;
+ String splitColumn;
+ String splitDatabase;
+ TblColRef splitColRef = determineSplitColumn();
+ splitTable = splitColRef.getTableRef().getTableName();
+ splitColumn = splitColRef.getName();
+ splitDatabase = splitColRef.getColumnDesc().getTable().getDatabase();
+
//using sqoop to extract data from jdbc source and dump them to hive
- String selectSql = JoinedFlatTable.generateSelectDataStatement(flatDesc, true, new String[]{partCol});
+ String selectSql = JoinedFlatTable.generateSelectDataStatement(flatDesc, true, new String[] { partCol });
String hiveTable = flatDesc.getTableName();
- String connectionUrl = config.getJdbcConnectionUrl();
- String driverClass = config.getJdbcDriver();
- String jdbcUser = config.getJdbcUser();
- String jdbcPass = config.getJdbcPass();
+ String connectionUrl = config.getJdbcSourceConnectionUrl();
+ String driverClass = config.getJdbcSourceDriver();
+ String jdbcUser = config.getJdbcSourceUser();
+ String jdbcPass = config.getJdbcSourcePass();
String sqoopHome = config.getSqoopHome();
- String cmd= String.format(String.format("%s/sqoop import "
- + "--connect %s --driver %s --username %s --password %s --query \"%s AND \\$CONDITIONS\" "
- + "--target-dir %s/%s --split-by %s", sqoopHome, connectionUrl, driverClass, jdbcUser,
- jdbcPass, selectSql, jobWorkingDir, hiveTable, partCol));
- logger.info(String.format("sqoop cmd:%s", cmd));
+ String filedDelimiter = config.getFieldDelimiter();
+ int mapperNum = config.getSqoopMapperNum();
+
+ String bquery = String.format("SELECT min(%s), max(%s) FROM %s.%s", splitColumn, splitColumn, splitDatabase,
+ splitTable);
+ if (partitionString != null) {
+ bquery += " WHERE " + partitionString;
+ }
+
+ String cmd = String.format(String.format(
+ "%s/sqoop import -Dorg.apache.sqoop.splitter.allow_text_splitter=true "
+ + "--connect \"%s\" --driver %s --username %s --password %s --query \"%s AND \\$CONDITIONS\" "
+ + "--target-dir %s/%s --split-by %s.%s --boundary-query \"%s\" --null-string '' "
+ + "--fields-terminated-by '%s' --num-mappers %d",
+ sqoopHome, connectionUrl, driverClass, jdbcUser, jdbcPass, selectSql, jobWorkingDir, hiveTable,
+ splitTable, splitColumn, bquery, filedDelimiter, mapperNum));
+ logger.debug(String.format("sqoop cmd:%s", cmd));
CmdStep step = new CmdStep();
step.setCmd(cmd);
step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
return step;
}
-
+
@Override
protected void addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable jobFlow) {
// skip
http://git-wip-us.apache.org/repos/asf/kylin/blob/cac1f8bb/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcTableReader.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcTableReader.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcTableReader.java
index b8865d6..e2616b7 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcTableReader.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcTableReader.java
@@ -23,6 +23,7 @@ import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.source.IReadableTable.TableReader;
import org.apache.kylin.source.hive.DBConnConf;
@@ -30,23 +31,22 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * An implementation of TableReader with HCatalog for Hive table.
+ * An implementation of TableReader with JDBC.
*/
public class JdbcTableReader implements TableReader {
private static final Logger logger = LoggerFactory.getLogger(JdbcTableReader.class);
-
+
private String dbName;
private String tableName;
private DBConnConf dbconf;
- private String dialect;
private Connection jdbcCon;
private Statement statement;
private ResultSet rs;
private int colCount;
/**
- * Constructor for reading whole hive table
+ * Constructor for reading whole jdbc table
* @param dbName
* @param tableName
* @throws IOException
@@ -55,22 +55,20 @@ public class JdbcTableReader implements TableReader {
this.dbName = dbName;
this.tableName = tableName;
KylinConfig config = KylinConfig.getInstanceFromEnv();
- String connectionUrl = config.getJdbcConnectionUrl();
- String driverClass = config.getJdbcDriver();
- String jdbcUser = config.getJdbcUser();
- String jdbcPass = config.getJdbcPass();
+ String connectionUrl = config.getJdbcSourceConnectionUrl();
+ String driverClass = config.getJdbcSourceDriver();
+ String jdbcUser = config.getJdbcSourceUser();
+ String jdbcPass = config.getJdbcSourcePass();
dbconf = new DBConnConf(driverClass, connectionUrl, jdbcUser, jdbcPass);
- this.dialect = config.getJdbcDialect();
jdbcCon = SqlUtil.getConnection(dbconf);
String sql = String.format("select * from %s.%s", dbName, tableName);
try {
statement = jdbcCon.createStatement();
rs = statement.executeQuery(sql);
colCount = rs.getMetaData().getColumnCount();
- }catch(SQLException e){
+ } catch (SQLException e) {
throw new IOException(String.format("error while exec %s", sql), e);
}
-
}
@Override
@@ -85,11 +83,17 @@ public class JdbcTableReader implements TableReader {
@Override
public String[] getRow() {
String[] ret = new String[colCount];
- for (int i=1; i<=colCount; i++){
+ for (int i = 1; i <= colCount; i++) {
try {
Object o = rs.getObject(i);
- ret[i-1] = (o == null? null:o.toString());
- }catch(Exception e){
+ String result;
+ if (null == o || o instanceof byte[]) {
+ result = null;
+ } else {
+ result = o.toString();
+ }
+ ret[i - 1] = result;
+ } catch (Exception e) {
logger.error("", e);
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/cac1f8bb/source-hive/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java
index a112d87..79fab7d 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java
@@ -21,87 +21,145 @@ package org.apache.kylin.source.jdbc;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
+import java.sql.Types;
import java.util.Random;
-import javax.sql.DataSource;
-
-import org.slf4j.LoggerFactory;
+import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.source.hive.DBConnConf;
import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class SqlUtil {
private static final Logger logger = LoggerFactory.getLogger(SqlUtil.class);
- public static void closeResources(Connection con, Statement statement){
- try{
- if (statement!=null && !statement.isClosed()){
+ public static void closeResources(Connection con, Statement statement) {
+ try {
+ if (statement != null && !statement.isClosed()) {
statement.close();
}
- }catch(Exception e){
+ } catch (Exception e) {
logger.error("", e);
}
-
- try{
- if (con!=null && !con.isClosed()){
+
+ try {
+ if (con != null && !con.isClosed()) {
con.close();
}
- }catch(Exception e){
+ } catch (Exception e) {
logger.error("", e);
}
}
-
-
- public static void execUpdateSQL(String sql, DataSource ds){
- Connection con = null;
- try{
- con = ds.getConnection();
- execUpdateSQL(con, sql);
- }catch(Exception e){
- logger.error("", e);
- }finally{
- closeResources(con, null);
- }
- }
-
- public static void execUpdateSQL(Connection db, String sql){
- Statement statement=null;
- try{
+
+ public static void execUpdateSQL(Connection db, String sql) {
+ Statement statement = null;
+ try {
statement = db.createStatement();
- statement.executeUpdate(sql);
- }catch(Exception e){
+ statement.executeUpdate(sql);
+ } catch (Exception e) {
logger.error("", e);
- }finally{
+ } finally {
closeResources(null, statement);
}
}
-
- public static int tryTimes=10;
- public static Connection getConnection(DBConnConf dbconf){
- if (dbconf.getUrl()==null)
+
+ public static int tryTimes = 5;
+
+ public static Connection getConnection(DBConnConf dbconf) {
+ if (dbconf.getUrl() == null)
return null;
Connection con = null;
try {
Class.forName(dbconf.getDriver());
- }catch(Exception e){
+ } catch (Exception e) {
logger.error("", e);
}
- boolean got=false;
- int times=0;
+ boolean got = false;
+ int times = 0;
Random r = new Random();
- while(!got && times<tryTimes){
+ while (!got && times < tryTimes) {
times++;
try {
con = DriverManager.getConnection(dbconf.getUrl(), dbconf.getUser(), dbconf.getPass());
got = true;
- }catch(Exception e){
+ } catch (Exception e) {
logger.warn("while use:" + dbconf, e);
try {
int rt = r.nextInt(10);
- Thread.sleep(rt*1000);
+ Thread.sleep(rt * 1000);
} catch (InterruptedException e1) {
}
}
}
+ if (null == con) {
+ throw new RuntimeException("Can not connect to the data source.");
+ }
return con;
}
+
+    /**
+     * Maps a JDBC type code from {@link java.sql.Types} to the name of a Kylin data type.
+     *
+     * <p>National character types are mapped like their non-national counterparts:
+     * <code>NCHAR</code> to "char", <code>NVARCHAR</code> and <code>LONGNVARCHAR</code> to "varchar".
+     *
+     * @param sqlType a constant from {@link java.sql.Types}
+     * @return the Kylin type name, or "any" when the code has no mapping
+     */
+    public static String jdbcTypetoKylinDataType(int sqlType) {
+        switch (sqlType) {
+        case Types.CHAR:
+        case Types.NCHAR:
+            return "char";
+        case Types.VARCHAR:
+        case Types.NVARCHAR:
+        case Types.LONGVARCHAR:
+        case Types.LONGNVARCHAR:
+            return "varchar";
+        case Types.NUMERIC:
+        case Types.DECIMAL:
+            return "decimal";
+        case Types.BIT:
+        case Types.BOOLEAN:
+            return "boolean";
+        case Types.TINYINT:
+            return "tinyint";
+        case Types.SMALLINT:
+            return "smallint";
+        case Types.INTEGER:
+            return "integer";
+        case Types.BIGINT:
+            return "bigint";
+        case Types.REAL:
+        case Types.FLOAT:
+        case Types.DOUBLE:
+            return "double";
+        case Types.BINARY:
+        case Types.VARBINARY:
+        case Types.LONGVARBINARY:
+            return "byte";
+        case Types.DATE:
+            return "date";
+        case Types.TIME:
+            return "time";
+        case Types.TIMESTAMP:
+            return "timestamp";
+        default:
+            // unmapped JDBC types fall back to "any"
+            return "any";
+        }
+    }
+
+    /**
+     * Tells whether a precision is meaningful for the given Kylin type name.
+     *
+     * @param typeName Kylin data type name
+     * @return true when the type is in <code>DataType.STRING_FAMILY</code> or
+     *         {@link #isScaleApplicable(String)} holds for it
+     */
+    public static boolean isPrecisionApplicable(String typeName) {
+        if (DataType.STRING_FAMILY.contains(typeName)) {
+            return true;
+        }
+        return isScaleApplicable(typeName);
+    }
+
+    /**
+     * Tells whether a scale is meaningful for the given Kylin type name.
+     *
+     * @param typeName Kylin data type name
+     * @return true when the type is in <code>DataType.NUMBER_FAMILY</code> but not in
+     *         <code>DataType.INTEGER_FAMILY</code>
+     */
+    public static boolean isScaleApplicable(String typeName) {
+        final boolean numeric = DataType.NUMBER_FAMILY.contains(typeName);
+        final boolean integral = DataType.INTEGER_FAMILY.contains(typeName);
+        return numeric && !integral;
+    }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/cac1f8bb/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/DefaultJdbcMetadata.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/DefaultJdbcMetadata.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/DefaultJdbcMetadata.java
new file mode 100644
index 0000000..f4ffc23
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/DefaultJdbcMetadata.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.jdbc.metadata;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.kylin.source.hive.DBConnConf;
+import org.apache.kylin.source.jdbc.SqlUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generic {@link IJdbcMetadata} implementation built on the standard
+ * {@link DatabaseMetaData} calls. The "database" of the interface is passed as
+ * the JDBC schema argument and the catalog argument is always {@code null}.
+ */
+public class DefaultJdbcMetadata implements IJdbcMetadata {
+    private static final Logger logger = LoggerFactory.getLogger(DefaultJdbcMetadata.class);
+    protected DBConnConf dbconf;
+
+    /**
+     * @param dbConnConf connection settings handed to {@code SqlUtil.getConnection}
+     */
+    public DefaultJdbcMetadata(DBConnConf dbConnConf) {
+        this.dbconf = dbConnConf;
+    }
+
+    /**
+     * Lists the schema names reported by {@link DatabaseMetaData#getSchemas()}.
+     *
+     * @return the {@code TABLE_SCHEM} value of every returned row, in result order
+     * @throws SQLException if the metadata query fails
+     */
+    @Override
+    public List<String> listDatabases() throws SQLException {
+        List<String> ret = new ArrayList<>();
+        try (Connection con = SqlUtil.getConnection(dbconf); ResultSet rs = con.getMetaData().getSchemas()) {
+            while (rs.next()) {
+                String schema = rs.getString("TABLE_SCHEM");
+                String catalog = rs.getString("TABLE_CATALOG");
+                // Parameterized form: the message is only assembled when INFO is enabled.
+                logger.info("{},{}", schema, catalog);
+                ret.add(schema);
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * Lists the tables of one schema.
+     *
+     * @param schema schema name passed to {@link DatabaseMetaData#getTables}
+     * @return the {@code TABLE_NAME} value of every returned row, in result order
+     * @throws SQLException if the metadata query fails
+     */
+    @Override
+    public List<String> listTables(String schema) throws SQLException {
+        List<String> ret = new ArrayList<>();
+        try (Connection con = SqlUtil.getConnection(dbconf);
+                ResultSet rs = con.getMetaData().getTables(null, schema, null, null)) {
+            while (rs.next()) {
+                String name = rs.getString("TABLE_NAME");
+                ret.add(name);
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * Delegates to {@link DatabaseMetaData#getTables} with a {@code null} catalog
+     * and no type filter. The result set is returned open.
+     */
+    @Override
+    public ResultSet getTable(final DatabaseMetaData dbmd, String schema, String table) throws SQLException {
+        return dbmd.getTables(null, schema, table, null);
+    }
+
+    /**
+     * Delegates to {@link DatabaseMetaData#getColumns} with a {@code null} catalog
+     * and no column-name filter. The result set is returned open.
+     */
+    @Override
+    public ResultSet listColumns(final DatabaseMetaData dbmd, String schema, String table) throws SQLException {
+        return dbmd.getColumns(null, schema, table, null);
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/cac1f8bb/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/IJdbcMetadata.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/IJdbcMetadata.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/IJdbcMetadata.java
new file mode 100644
index 0000000..169fe60
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/IJdbcMetadata.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.jdbc.metadata;
+
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+
+/**
+ * Dialect-specific access to JDBC metadata. What "database" means is left to
+ * the implementation: in this patch the default implementation treats it as a
+ * JDBC schema and the MySQL implementation treats it as a JDBC catalog.
+ */
+public interface IJdbcMetadata {
+ /**
+  * Lists the names of the databases visible to the implementation.
+  *
+  * @return database names (schema or catalog names, depending on the dialect)
+  * @throws SQLException if the underlying metadata query fails
+  */
+ List<String> listDatabases() throws SQLException;
+
+ /**
+  * Lists the table names found in one database.
+  *
+  * @param database database (schema or catalog) name to look in
+  * @return table names
+  * @throws SQLException if the underlying metadata query fails
+  */
+ List<String> listTables(String database) throws SQLException;
+
+ /**
+  * Queries the metadata rows describing a single table. The implementations
+  * in this patch forward to {@code DatabaseMetaData.getTables} and return the
+  * result set without closing it.
+  * NOTE(review): presumably the caller is expected to close it; confirm against callers.
+  *
+  * @param dbmd metadata handle to run the query on
+  * @param database database (schema or catalog) name
+  * @param table table name
+  * @return the result set produced by the metadata query
+  * @throws SQLException if the underlying metadata query fails
+  */
+ ResultSet getTable(final DatabaseMetaData dbmd, String database, String table) throws SQLException;
+
+ /**
+  * Queries the metadata rows describing the columns of a table. The
+  * implementations in this patch forward to {@code DatabaseMetaData.getColumns}
+  * and return the result set without closing it.
+  *
+  * @param dbmd metadata handle to run the query on
+  * @param database database (schema or catalog) name
+  * @param table table name
+  * @return the result set produced by the metadata query
+  * @throws SQLException if the underlying metadata query fails
+  */
+ ResultSet listColumns(final DatabaseMetaData dbmd, String database, String table) throws SQLException;
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/cac1f8bb/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactory.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactory.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactory.java
new file mode 100644
index 0000000..4100f79
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.jdbc.metadata;
+
+import org.apache.kylin.source.hive.DBConnConf;
+import org.apache.kylin.source.jdbc.JdbcDialect;
+
+/**
+ * Chooses the {@link IJdbcMetadata} implementation for a JDBC dialect name.
+ */
+public abstract class JdbcMetadataFactory {
+    /**
+     * Creates the metadata accessor for the given dialect.
+     *
+     * @param dialect dialect name; it is lower-cased before being compared with
+     *            the {@code JdbcDialect} constants, and {@code null} is treated
+     *            as an empty name
+     * @param dbConnConf connection settings passed to the created accessor
+     * @return a {@link SQLServerJdbcMetadata} for {@code JdbcDialect.DIALECT_MSSQL},
+     *         a {@link MySQLJdbcMetadata} for {@code JdbcDialect.DIALECT_MYSQL},
+     *         otherwise a {@link DefaultJdbcMetadata}
+     */
+    public static IJdbcMetadata getJdbcMetadata(String dialect, final DBConnConf dbConnConf) {
+        // Locale.ROOT keeps the lower-casing independent of the JVM default locale
+        // (e.g. the Turkish dotless-i mapping), so dialect names match everywhere.
+        String jdbcDialect = (null == dialect) ? "" : dialect.toLowerCase(java.util.Locale.ROOT);
+        switch (jdbcDialect) {
+        case JdbcDialect.DIALECT_MSSQL:
+            return new SQLServerJdbcMetadata(dbConnConf);
+        case JdbcDialect.DIALECT_MYSQL:
+            return new MySQLJdbcMetadata(dbConnConf);
+        default:
+            return new DefaultJdbcMetadata(dbConnConf);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/cac1f8bb/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/MySQLJdbcMetadata.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/MySQLJdbcMetadata.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/MySQLJdbcMetadata.java
new file mode 100644
index 0000000..6404fd6
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/MySQLJdbcMetadata.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.jdbc.metadata;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.kylin.source.hive.DBConnConf;
+import org.apache.kylin.source.jdbc.SqlUtil;
+
+/**
+ * MySQL variant of {@link DefaultJdbcMetadata}: the "database" of the interface
+ * is passed as the JDBC catalog argument and the schema argument is always
+ * {@code null}.
+ */
+public class MySQLJdbcMetadata extends DefaultJdbcMetadata {
+    public MySQLJdbcMetadata(DBConnConf dbConnConf) {
+        super(dbConnConf);
+    }
+
+    /**
+     * Lists the catalog names reported by {@link DatabaseMetaData#getCatalogs()}.
+     *
+     * @return the {@code TABLE_CAT} value of every returned row, in result order
+     * @throws SQLException if the metadata query fails
+     */
+    @Override
+    public List<String> listDatabases() throws SQLException {
+        try (Connection conn = SqlUtil.getConnection(dbconf);
+                ResultSet catalogs = conn.getMetaData().getCatalogs()) {
+            return readColumn(catalogs, "TABLE_CAT");
+        }
+    }
+
+    /**
+     * Lists the tables of one catalog.
+     *
+     * @param catalog catalog name passed to {@link DatabaseMetaData#getTables}
+     * @return the {@code TABLE_NAME} value of every returned row, in result order
+     * @throws SQLException if the metadata query fails
+     */
+    @Override
+    public List<String> listTables(String catalog) throws SQLException {
+        try (Connection conn = SqlUtil.getConnection(dbconf);
+                ResultSet tables = conn.getMetaData().getTables(catalog, null, null, null)) {
+            return readColumn(tables, "TABLE_NAME");
+        }
+    }
+
+    /** Delegates to {@link DatabaseMetaData#getTables} with the catalog set and a {@code null} schema. */
+    @Override
+    public ResultSet getTable(final DatabaseMetaData dbmd, String catalog, String table) throws SQLException {
+        return dbmd.getTables(catalog, null, table, null);
+    }
+
+    /** Delegates to {@link DatabaseMetaData#getColumns} with the catalog set and a {@code null} schema. */
+    @Override
+    public ResultSet listColumns(final DatabaseMetaData dbmd, String catalog, String table) throws SQLException {
+        return dbmd.getColumns(catalog, null, table, null);
+    }
+
+    /** Drains the result set, collecting the string value of one column from each row. */
+    private static List<String> readColumn(ResultSet rows, String column) throws SQLException {
+        List<String> values = new ArrayList<>();
+        while (rows.next()) {
+            values.add(rows.getString(column));
+        }
+        return values;
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/cac1f8bb/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/SQLServerJdbcMetadata.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/SQLServerJdbcMetadata.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/SQLServerJdbcMetadata.java
new file mode 100644
index 0000000..1a34b37
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/SQLServerJdbcMetadata.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.jdbc.metadata;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.source.hive.DBConnConf;
+import org.apache.kylin.source.jdbc.SqlUtil;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * SQL Server variant of {@link DefaultJdbcMetadata}: lists the schemas of the
+ * database the connection is bound to.
+ */
+public class SQLServerJdbcMetadata extends DefaultJdbcMetadata {
+    public SQLServerJdbcMetadata(DBConnConf dbConnConf) {
+        super(dbConnConf);
+    }
+
+    /**
+     * Lists the schemas whose {@code TABLE_CATALOG} equals the catalog of the
+     * current connection.
+     *
+     * @return matching {@code TABLE_SCHEM} values, in result order
+     * @throws SQLException if the metadata query fails
+     * @throws IllegalArgumentException if the connection reports no catalog
+     */
+    @Override
+    public List<String> listDatabases() throws SQLException {
+        List<String> schemas = new ArrayList<>();
+        try (Connection conn = SqlUtil.getConnection(dbconf)) {
+            final String currentDb = conn.getCatalog();
+            Preconditions.checkArgument(StringUtils.isNotEmpty(currentDb),
+                    "SQL Server needs a specific database in connection string.");
+
+            try (ResultSet rows = conn.getMetaData().getSchemas(currentDb, "%")) {
+                while (rows.next()) {
+                    String schemaName = rows.getString("TABLE_SCHEM");
+                    String owningCatalog = rows.getString("TABLE_CATALOG");
+                    // Rows reported under another (or no) catalog are dropped; the
+                    // original author's note describes this as skipping system schemas.
+                    if (!currentDb.equals(owningCatalog)) {
+                        continue;
+                    }
+                    schemas.add(schemaName);
+                }
+            }
+        }
+        return schemas;
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/cac1f8bb/source-hive/src/test/java/org/apache/kylin/source/jdbc/JdbcExplorerTest.java
----------------------------------------------------------------------
diff --git a/source-hive/src/test/java/org/apache/kylin/source/jdbc/JdbcExplorerTest.java b/source-hive/src/test/java/org/apache/kylin/source/jdbc/JdbcExplorerTest.java
new file mode 100644
index 0000000..b269329
--- /dev/null
+++ b/source-hive/src/test/java/org/apache/kylin/source/jdbc/JdbcExplorerTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.source.jdbc;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TableExtDesc;
+import org.apache.kylin.source.hive.DBConnConf;
+import org.apache.kylin.source.jdbc.metadata.DefaultJdbcMetadata;
+import org.apache.kylin.source.jdbc.metadata.IJdbcMetadata;
+import org.apache.kylin.source.jdbc.metadata.JdbcMetadataFactory;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+/**
+ * Unit test for {@link JdbcExplorer}. {@code SqlUtil.getConnection} and
+ * {@code JdbcMetadataFactory.getJdbcMetadata} are replaced through PowerMock, so
+ * the explorer runs against a mocked {@link IJdbcMetadata} and never opens a
+ * real JDBC connection.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ JdbcMetadataFactory.class, SqlUtil.class })
+public class JdbcExplorerTest extends LocalFileMetadataTestCase {
+    private JdbcExplorer jdbcExplorer;
+    // Re-created for every test in setup(), so these are instance fields rather than statics.
+    private Connection connection;
+    private DatabaseMetaData dbmd;
+    private IJdbcMetadata jdbcMetadata;
+
+    /** Creates the local test metadata and points the JDBC source settings at a fake Vertica endpoint. */
+    @BeforeClass
+    public static void setupClass() throws SQLException {
+        staticCreateTestMetadata();
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        kylinConfig.setProperty("kylin.source.jdbc.connection-url", "jdbc:vertica://fakehost:1433/database");
+        kylinConfig.setProperty("kylin.source.jdbc.driver", "com.vertica.jdbc.Driver");
+        kylinConfig.setProperty("kylin.source.jdbc.user", "user");
+        kylinConfig.setProperty("kylin.source.jdbc.pass", "");
+        kylinConfig.setProperty("kylin.source.jdbc.dialect", "vertica");
+    }
+
+    /** Builds fresh mocks and wires the static factories to return them before each test. */
+    @Before
+    public void setup() throws SQLException {
+        connection = mock(Connection.class);
+        dbmd = mock(DatabaseMetaData.class);
+        jdbcMetadata = mock(DefaultJdbcMetadata.class);
+
+        PowerMockito.stub(PowerMockito.method(SqlUtil.class, "getConnection")).toReturn(connection);
+        PowerMockito.mockStatic(JdbcMetadataFactory.class);
+
+        when(JdbcMetadataFactory.getJdbcMetadata(anyString(), any(DBConnConf.class))).thenReturn(jdbcMetadata);
+        when(connection.getMetaData()).thenReturn(dbmd);
+
+        jdbcExplorer = spy(JdbcExplorer.class);
+    }
+
+    /** The explorer must return exactly what the metadata accessor lists, calling it once. */
+    @Test
+    public void testListDatabases() throws SQLException {
+        List<String> databases = new ArrayList<>();
+        databases.add("DB1");
+        databases.add("DB2");
+        when(jdbcMetadata.listDatabases()).thenReturn(databases);
+
+        List<String> result = jdbcExplorer.listDatabases();
+
+        verify(jdbcMetadata, times(1)).listDatabases();
+        Assert.assertEquals(databases, result);
+    }
+
+    /** The explorer must return exactly the tables the metadata accessor lists for the database. */
+    @Test
+    public void testListTables() throws SQLException {
+        List<String> tables = new ArrayList<>();
+        tables.add("T1");
+        tables.add("T2");
+        String databaseName = "testDb";
+        when(jdbcMetadata.listTables(databaseName)).thenReturn(tables);
+
+        List<String> result = jdbcExplorer.listTables(databaseName);
+        verify(jdbcMetadata, times(1)).listTables(databaseName);
+        Assert.assertEquals(tables, result);
+    }
+
+    /** Feeds one table row and three column rows through mocked result sets and checks the descriptors built from them. */
+    @Test
+    public void testLoadTableMetadata() throws SQLException {
+        String tableName = "tb1";
+        String databaseName = "testdb";
+        // rs1: a single row describing the table itself
+        ResultSet rs1 = mock(ResultSet.class);
+        when(rs1.next()).thenReturn(true).thenReturn(false);
+        when(rs1.getString("TABLE_TYPE")).thenReturn("TABLE");
+
+        // rs2: three column rows; each chained thenReturn supplies the value for the next row
+        ResultSet rs2 = mock(ResultSet.class);
+        when(rs2.next()).thenReturn(true).thenReturn(true).thenReturn(true).thenReturn(false);
+        when(rs2.getString("COLUMN_NAME")).thenReturn("COL1").thenReturn("COL2").thenReturn("COL3");
+        when(rs2.getInt("DATA_TYPE")).thenReturn(Types.VARCHAR).thenReturn(Types.INTEGER).thenReturn(Types.DECIMAL);
+        when(rs2.getInt("COLUMN_SIZE")).thenReturn(128).thenReturn(10).thenReturn(19);
+        when(rs2.getInt("DECIMAL_DIGITS")).thenReturn(0).thenReturn(0).thenReturn(4);
+        when(rs2.getInt("ORDINAL_POSITION")).thenReturn(1).thenReturn(3).thenReturn(2);
+        when(rs2.getString("REMARKS")).thenReturn("comment1").thenReturn("comment2").thenReturn("comment3");
+
+        when(jdbcMetadata.getTable(dbmd, databaseName, tableName)).thenReturn(rs1);
+        when(jdbcMetadata.listColumns(dbmd, databaseName, tableName)).thenReturn(rs2);
+
+        Pair<TableDesc, TableExtDesc> result = jdbcExplorer.loadTableMetadata(databaseName, tableName, "proj");
+        TableDesc tableDesc = result.getFirst();
+        ColumnDesc columnDesc = tableDesc.getColumns()[1];
+
+        Assert.assertEquals(databaseName.toUpperCase(), tableDesc.getDatabase());
+        Assert.assertEquals(3, tableDesc.getColumnCount());
+        Assert.assertEquals("TABLE", tableDesc.getTableType());
+        Assert.assertEquals("COL2", columnDesc.getName());
+        Assert.assertEquals("integer", columnDesc.getTypeName());
+        Assert.assertEquals("comment2", columnDesc.getComment());
+        Assert.assertEquals(databaseName.toUpperCase() + "." + tableName.toUpperCase(),
+                result.getSecond().getIdentity());
+    }
+
+    /** Removes the local test metadata created in {@link #setupClass()}. */
+    @AfterClass
+    public static void cleanup() {
+        staticCleanupTestMetadata();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/cac1f8bb/source-hive/src/test/java/org/apache/kylin/source/jdbc/SqlUtilTest.java
----------------------------------------------------------------------
diff --git a/source-hive/src/test/java/org/apache/kylin/source/jdbc/SqlUtilTest.java b/source-hive/src/test/java/org/apache/kylin/source/jdbc/SqlUtilTest.java
new file mode 100644
index 0000000..d952675
--- /dev/null
+++ b/source-hive/src/test/java/org/apache/kylin/source/jdbc/SqlUtilTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.source.jdbc;
+
+import java.sql.Types;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit test for the static type helpers of {@link SqlUtil}.
+ */
+public class SqlUtilTest {
+
+    /** Spot-checks a widened type, a national character type and the "any" fallback. */
+    @Test
+    public void testJdbcTypetoKylinDataType() {
+        Assert.assertEquals("double", SqlUtil.jdbcTypetoKylinDataType(Types.FLOAT));
+        Assert.assertEquals("varchar", SqlUtil.jdbcTypetoKylinDataType(Types.NVARCHAR));
+        Assert.assertEquals("any", SqlUtil.jdbcTypetoKylinDataType(Types.ARRAY));
+    }
+
+    /** Precision is rejected for "boolean" and accepted for "varchar". */
+    @Test
+    public void testIsPrecisionApplicable() {
+        Assert.assertFalse(SqlUtil.isPrecisionApplicable("boolean"));
+        Assert.assertTrue(SqlUtil.isPrecisionApplicable("varchar"));
+    }
+
+    /** Scale is rejected for "varchar" and accepted for "decimal". */
+    @Test
+    public void testIsScaleApplicable() {
+        Assert.assertFalse(SqlUtil.isScaleApplicable("varchar"));
+        Assert.assertTrue(SqlUtil.isScaleApplicable("decimal"));
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/cac1f8bb/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/DefaultJdbcMetadataTest.java
----------------------------------------------------------------------
diff --git a/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/DefaultJdbcMetadataTest.java b/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/DefaultJdbcMetadataTest.java
new file mode 100644
index 0000000..43d467d
--- /dev/null
+++ b/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/DefaultJdbcMetadataTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.source.jdbc.metadata;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.kylin.source.hive.DBConnConf;
+import org.apache.kylin.source.jdbc.SqlUtil;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+// Unit test for DefaultJdbcMetadata. SqlUtil is statically mocked, so no real
+// JDBC connection is opened. The fields and setupProperties() are protected so
+// that dialect-specific tests extending this class can reuse them.
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(SqlUtil.class)
+public class DefaultJdbcMetadataTest {
+ protected DBConnConf dbConnConf;
+ protected Connection connection;
+ protected DatabaseMetaData dbmd;
+ protected IJdbcMetadata jdbcMetadata;
+
+ // Builds a connection config pointing at a fake Vertica endpoint and the
+ // implementation under test, then installs the mocks.
+ @Before
+ public void setup() {
+ dbConnConf = new DBConnConf();
+ dbConnConf.setUrl("jdbc:vertica://fakehost:1433/database");
+ dbConnConf.setDriver("com.vertica.jdbc.Driver");
+ dbConnConf.setUser("user");
+ dbConnConf.setPass("pass");
+ jdbcMetadata = new DefaultJdbcMetadata(dbConnConf);
+
+ setupProperties();
+ }
+
+ // Creates the Connection/DatabaseMetaData mocks and makes
+ // SqlUtil.getConnection(dbConnConf) return the mocked connection.
+ protected void setupProperties() {
+ connection = mock(Connection.class);
+ dbmd = mock(DatabaseMetaData.class);
+
+ PowerMockito.mockStatic(SqlUtil.class);
+ when(SqlUtil.getConnection(dbConnConf)).thenReturn(connection);
+ }
+
+ // Two schema rows from getSchemas() must come back as two database names, in order.
+ @Test
+ public void testListDatabases() throws SQLException {
+ ResultSet rs = mock(ResultSet.class);
+ when(rs.next()).thenReturn(true).thenReturn(true).thenReturn(false);
+ when(rs.getString("TABLE_SCHEM")).thenReturn("schema1").thenReturn("schema2");
+ when(rs.getString("TABLE_CATALOG")).thenReturn("catalog1").thenReturn("catalog2");
+
+ when(connection.getMetaData()).thenReturn(dbmd);
+ when(dbmd.getSchemas()).thenReturn(rs);
+
+ List<String> dbs = jdbcMetadata.listDatabases();
+
+ Assert.assertEquals(2, dbs.size());
+ Assert.assertEquals("schema1", dbs.get(0));
+ }
+
+ // Three table rows from getTables(null, schema, null, null) must be returned in result order.
+ @Test
+ public void testListTables() throws SQLException {
+ ResultSet rs = mock(ResultSet.class);
+ when(rs.next()).thenReturn(true).thenReturn(true).thenReturn(true).thenReturn(false);
+ when(rs.getString("TABLE_NAME")).thenReturn("KYLIN_SALES").thenReturn("CAT_DT").thenReturn("KYLIN_CAT");
+
+ String schema = "testschema";
+ when(connection.getMetaData()).thenReturn(dbmd);
+ when(dbmd.getTables(null, schema, null, null)).thenReturn(rs);
+
+ List<String> tables = jdbcMetadata.listTables(schema);
+
+ Assert.assertEquals(3, tables.size());
+ Assert.assertEquals("CAT_DT", tables.get(1));
+ }
+
+ // getTable must forward to getTables with a null catalog and hand back the same result set.
+ @Test
+ public void testGetTable() throws SQLException {
+ String schema = "testSchema";
+ String table = "testTable";
+ ResultSet rs = mock(ResultSet.class);
+ when(dbmd.getTables(null, schema, table, null)).thenReturn(rs);
+
+ ResultSet result = jdbcMetadata.getTable(dbmd, schema, table);
+
+ verify(dbmd, times(1)).getTables(null, schema, table, null);
+ Assert.assertEquals(rs, result);
+ }
+
+ // listColumns must forward to getColumns with a null catalog and hand back the same result set.
+ @Test
+ public void testListColumns() throws SQLException {
+ String schema = "testSchema";
+ String table = "testTable";
+ ResultSet rs = mock(ResultSet.class);
+ when(dbmd.getColumns(null, schema, table, null)).thenReturn(rs);
+
+ ResultSet result = jdbcMetadata.listColumns(dbmd, schema, table);
+
+ verify(dbmd, times(1)).getColumns(null, schema, table, null);
+ Assert.assertEquals(rs, result);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/cac1f8bb/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactoryTest.java
----------------------------------------------------------------------
diff --git a/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactoryTest.java b/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactoryTest.java
new file mode 100644
index 0000000..d9c7425
--- /dev/null
+++ b/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactoryTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.source.jdbc.metadata;
+
+import org.apache.kylin.source.jdbc.JdbcDialect;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit test for {@link JdbcMetadataFactory}: checks which implementation is
+ * chosen for each dialect constant.
+ */
+public class JdbcMetadataFactoryTest {
+
+    /** MSSQL and MySQL get their dedicated classes; Vertica gets a {@link DefaultJdbcMetadata}. */
+    @Test
+    public void testGetJdbcMetadata() {
+        IJdbcMetadata forMssql = JdbcMetadataFactory.getJdbcMetadata(JdbcDialect.DIALECT_MSSQL, null);
+        Assert.assertTrue(forMssql instanceof SQLServerJdbcMetadata);
+
+        IJdbcMetadata forMysql = JdbcMetadataFactory.getJdbcMetadata(JdbcDialect.DIALECT_MYSQL, null);
+        Assert.assertTrue(forMysql instanceof MySQLJdbcMetadata);
+
+        IJdbcMetadata forVertica = JdbcMetadataFactory.getJdbcMetadata(JdbcDialect.DIALECT_VERTICA, null);
+        Assert.assertTrue(forVertica instanceof DefaultJdbcMetadata);
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/cac1f8bb/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/MySQLJdbcMetadataTest.java
----------------------------------------------------------------------
diff --git a/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/MySQLJdbcMetadataTest.java b/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/MySQLJdbcMetadataTest.java
new file mode 100644
index 0000000..d0cb6c4
--- /dev/null
+++ b/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/MySQLJdbcMetadataTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.source.jdbc.metadata;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.kylin.source.hive.DBConnConf;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class MySQLJdbcMetadataTest extends DefaultJdbcMetadataTest {
+    /** Creates the MySQL metadata under test from a dummy connection config, then calls setupProperties(). */
+    @Before
+    public void setup() {
+        DBConnConf mysqlConf = new DBConnConf();
+        mysqlConf.setUrl("jdbc:mysql://fakehost:1433/database");
+        mysqlConf.setDriver("com.mysql.jdbc.Driver");
+        mysqlConf.setUser("user");
+        mysqlConf.setPass("pass");
+        dbConnConf = mysqlConf;
+        jdbcMetadata = new MySQLJdbcMetadata(mysqlConf);
+        setupProperties();
+    }
+
+    /** Two stubbed TABLE_CAT rows from getCatalogs() must yield two databases, "catalog1" first. */
+    @Test
+    public void testListDatabases() throws SQLException {
+        ResultSet catalogRows = mock(ResultSet.class);
+        when(catalogRows.next()).thenReturn(true, true, false);
+        when(catalogRows.getString("TABLE_CAT")).thenReturn("catalog1", "catalog2");
+        when(connection.getMetaData()).thenReturn(dbmd);
+        when(dbmd.getCatalogs()).thenReturn(catalogRows);
+
+        List<String> databases = jdbcMetadata.listDatabases();
+
+        Assert.assertEquals(2, databases.size());
+        Assert.assertEquals("catalog1", databases.get(0));
+    }
+
+    /** Three stubbed TABLE_NAME rows for the catalog must yield three tables, with "CAT_DT" at index 1. */
+    @Test
+    public void testListTables() throws SQLException {
+        String catalog = "testCatalog";
+        ResultSet tableRows = mock(ResultSet.class);
+        when(tableRows.next()).thenReturn(true, true, true, false);
+        when(tableRows.getString("TABLE_NAME")).thenReturn("KYLIN_SALES", "CAT_DT", "KYLIN_CAT");
+        when(connection.getMetaData()).thenReturn(dbmd);
+        when(dbmd.getTables(catalog, null, null, null)).thenReturn(tableRows);
+
+        List<String> tableNames = jdbcMetadata.listTables(catalog);
+
+        Assert.assertEquals(3, tableNames.size());
+        Assert.assertEquals("CAT_DT", tableNames.get(1));
+    }
+
+    /** getTable() must call dbmd.getTables(catalog, null, table, null) exactly once and return its result. */
+    @Test
+    public void testGetTable() throws SQLException {
+        final String catalog = "testSchema";
+        final String table = "testTable";
+        ResultSet expected = mock(ResultSet.class);
+        when(dbmd.getTables(catalog, null, table, null)).thenReturn(expected);
+
+        ResultSet actual = jdbcMetadata.getTable(dbmd, catalog, table);
+        verify(dbmd, times(1)).getTables(catalog, null, table, null);
+        Assert.assertEquals(expected, actual);
+    }
+
+    /** listColumns() must call dbmd.getColumns(catalog, null, table, null) exactly once and return its result. */
+    @Test
+    public void testListColumns() throws SQLException {
+        final String catalog = "testSchema";
+        final String table = "testTable";
+        ResultSet expected = mock(ResultSet.class);
+        when(dbmd.getColumns(catalog, null, table, null)).thenReturn(expected);
+
+        ResultSet actual = jdbcMetadata.listColumns(dbmd, catalog, table);
+        verify(dbmd, times(1)).getColumns(catalog, null, table, null);
+        Assert.assertEquals(expected, actual);
+    }
+}