You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/12/15 13:22:40 UTC
[kylin] branch master updated: KYLIN-3707 add configuration for
setting isolation-level for sqoop
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new fb54f38 KYLIN-3707 add configuration for setting isolation-level for sqoop
fb54f38 is described below
commit fb54f38cdad26b776b6f5116cacfbc8470e19142
Author: woyumen4597 <wo...@gmail.com>
AuthorDate: Fri Dec 14 15:03:34 2018 +0800
KYLIN-3707 add configuration for setting isolation-level for sqoop
---
.../sdk/datasource/framework/JdbcConnector.java | 4 +-
.../framework/conv/DefaultConfiguer.java | 9 ++--
.../datasource/framework/conv/SqlConverter.java | 6 +++
.../framework/conv/SqlConverterTest.java | 62 +++++++++++++++++-----
.../source/jdbc/extensible/JdbcHiveInputBase.java | 12 +++--
5 files changed, 70 insertions(+), 23 deletions(-)
diff --git a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/JdbcConnector.java b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/JdbcConnector.java
index d849e6c..47ba6b3 100644
--- a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/JdbcConnector.java
+++ b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/JdbcConnector.java
@@ -37,7 +37,6 @@ import org.apache.kylin.sdk.datasource.framework.def.DataSourceDefProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
public class JdbcConnector implements Closeable {
@@ -175,8 +174,7 @@ public class JdbcConnector implements Closeable {
return jdbcDs.getPropertyValue(key);
}
- @VisibleForTesting
- SqlConverter getSqlConverter() {
+ public SqlConverter getSqlConverter() {
return sqlConverter;
}
diff --git a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/conv/DefaultConfiguer.java b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/conv/DefaultConfiguer.java
index 6d7fb6d..6c01a70 100644
--- a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/conv/DefaultConfiguer.java
+++ b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/conv/DefaultConfiguer.java
@@ -78,10 +78,6 @@ public class DefaultConfiguer implements SqlConverter.IConfigurer{
if (this.adaptor == null) {
return orig;
}
- // fix problem of case sensitive when generate sql.
-// if (isCaseSensitive()) {
-// orig = adaptor.fixCaseSensitiveSql(orig);
-// }
return adaptor.fixSql(orig);
}
@@ -134,4 +130,9 @@ public class DefaultConfiguer implements SqlConverter.IConfigurer{
}
return adaptor.fixIdentifierCaseSensitve(orig);
}
+
+ @Override
+ public String getTransactionIsolationLevel() {
+ return dsDef.getPropertyValue("transaction.isolation-level");
+ }
}
diff --git a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/conv/SqlConverter.java b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/conv/SqlConverter.java
index d25c04f..e8302e8 100644
--- a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/conv/SqlConverter.java
+++ b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/conv/SqlConverter.java
@@ -110,5 +110,11 @@ public class SqlConverter {
boolean enableQuote();
String fixIdentifierCaseSensitve(String orig);
+
+ /**
+ * Only support following 3 types
+ * TRANSACTION_READ_COMMITTED,TRANSACTION_READ_UNCOMMITTED,TRANSACTION_READ_COMMITTED
+ */
+ String getTransactionIsolationLevel();
}
}
diff --git a/datasource-sdk/src/test/java/org/apache/kylin/sdk/datasource/framework/conv/SqlConverterTest.java b/datasource-sdk/src/test/java/org/apache/kylin/sdk/datasource/framework/conv/SqlConverterTest.java
index 94cc651..451be60 100644
--- a/datasource-sdk/src/test/java/org/apache/kylin/sdk/datasource/framework/conv/SqlConverterTest.java
+++ b/datasource-sdk/src/test/java/org/apache/kylin/sdk/datasource/framework/conv/SqlConverterTest.java
@@ -111,12 +111,20 @@ public class SqlConverterTest extends LocalFileMetadataTestCase {
public String fixIdentifierCaseSensitve(String orig) {
return orig;
}
+
+ @Override
+ public String getTransactionIsolationLevel() {
+ return null;
+ }
}, master);
// escape default keywords
- Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".\"FACT\"", converter.convertSql("select * from \"DEFAULT\".FACT"));
- Assert.assertEquals("SELECT *\nFROM \"Default\".\"FACT\"", converter.convertSql("select * from \"Default\".FACT"));
- Assert.assertEquals("SELECT *\nFROM \"default\".\"FACT\"", converter.convertSql("select * from \"default\".FACT"));
+ Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".\"FACT\"",
+ converter.convertSql("select * from \"DEFAULT\".FACT"));
+ Assert.assertEquals("SELECT *\nFROM \"Default\".\"FACT\"",
+ converter.convertSql("select * from \"Default\".FACT"));
+ Assert.assertEquals("SELECT *\nFROM \"default\".\"FACT\"",
+ converter.convertSql("select * from \"default\".FACT"));
}
@Test
@@ -189,6 +197,11 @@ public class SqlConverterTest extends LocalFileMetadataTestCase {
public String fixIdentifierCaseSensitve(String orig) {
return orig;
}
+
+ @Override
+ public String getTransactionIsolationLevel() {
+ return null;
+ }
}, master);
// normal cases
@@ -203,17 +216,20 @@ public class SqlConverterTest extends LocalFileMetadataTestCase {
// escape default keywords
Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".\"FACT\"", converter.convertSql("select * from DEFAULT.FACT"));
- Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".\"FACT\"", converter.convertSql("select * from \"DEFAULT\".FACT"));
- Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".\"FACT\"", converter.convertSql("select * from \"Default\".FACT"));
- Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".\"FACT\"", converter.convertSql("select * from \"default\".FACT"));
+ Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".\"FACT\"",
+ converter.convertSql("select * from \"DEFAULT\".FACT"));
+ Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".\"FACT\"",
+ converter.convertSql("select * from \"Default\".FACT"));
+ Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".\"FACT\"",
+ converter.convertSql("select * from \"default\".FACT"));
// function mapping
Assert.assertEquals("SELECT EXTRACT(DOY FROM \"PART_DT\")\nFROM \"DEFAULT\".\"FACT\"",
converter.convertSql("select DAYOFYEAR(PART_DT) from \"DEFAULT\".FACT"));
Assert.assertEquals(
- "SELECT 12 * (EXTRACT(YEAR FROM \"DT1\") - EXTRACT(YEAR FROM \"DT2\")) + EXTRACT(MONTH FROM \"DT1\") - EXTRACT(MONTH FROM \"DT2\") - " +
- "CASE WHEN EXTRACT(DAY FROM \"DT2\") > EXTRACT(DAY FROM \"DT1\") THEN 1 ELSE 0 END\n" +
- "FROM \"DEFAULT\".\"FACT\"",
+ "SELECT 12 * (EXTRACT(YEAR FROM \"DT1\") - EXTRACT(YEAR FROM \"DT2\")) + EXTRACT(MONTH FROM \"DT1\") - EXTRACT(MONTH FROM \"DT2\") - "
+ + "CASE WHEN EXTRACT(DAY FROM \"DT2\") > EXTRACT(DAY FROM \"DT1\") THEN 1 ELSE 0 END\n"
+ + "FROM \"DEFAULT\".\"FACT\"",
converter.convertSql("select TIMESTAMPDIFF(month,DT2, DT1) from \"DEFAULT\".FACT"));
Assert.assertEquals("SELECT TRUNC(\"ID\")\nFROM \"DEFAULT\".\"FACT\"",
converter.convertSql("select cast(ID as INT) from \"DEFAULT\".FACT"));
@@ -221,12 +237,14 @@ public class SqlConverterTest extends LocalFileMetadataTestCase {
converter.convertSql("select 1 from a where 1 BETWEEN 0 and 2"));
Assert.assertEquals("SELECT \"CURRENT_DATE\", TEST_CURR_TIME()",
converter.convertSql("select CURRENT_DATE, CURRENT_TIME"));
- Assert.assertEquals("SELECT EXP(AVG(LN(EXTRACT(DOY FROM CAST('2018-03-20' AS DATE)))))\nFROM \"DEFAULT\".\"FACT\"",
+ Assert.assertEquals(
+ "SELECT EXP(AVG(LN(EXTRACT(DOY FROM CAST('2018-03-20' AS DATE)))))\nFROM \"DEFAULT\".\"FACT\"",
converter.convertSql(
"select exp(avg(ln(dayofyear(cast('2018-03-20' as date))))) from \"DEFAULT\".FACT"));
// over function
- Assert.assertEquals("SELECT STDDEVP(\"C1\") OVER (ORDER BY \"C1\")\nFROM \"TEST_SUITE\"\nFETCH NEXT 1 ROWS ONLY",
+ Assert.assertEquals(
+ "SELECT STDDEVP(\"C1\") OVER (ORDER BY \"C1\")\nFROM \"TEST_SUITE\"\nFETCH NEXT 1 ROWS ONLY",
converter.convertSql("select stddev_pop(c1) over(order by c1) from test_suite limit 1"));
// type mapping
@@ -332,6 +350,11 @@ public class SqlConverterTest extends LocalFileMetadataTestCase {
public String fixIdentifierCaseSensitve(String orig) {
return orig;
}
+
+ @Override
+ public String getTransactionIsolationLevel() {
+ return null;
+ }
}, master);
Assert.assertEquals("SELECT 1\nORDER BY 2\nOFFSET 0 ROWS\nFETCH NEXT 1 ROWS ONLY",
@@ -347,6 +370,7 @@ public class SqlConverterTest extends LocalFileMetadataTestCase {
Assert.assertEquals("SELECT 1\nORDER BY 1\nOFFSET 0 ROWS\nFETCH NEXT 1 ROWS ONLY",
converter.convertSql("SELECT 1 LIMIT 1"));
}
+
@Test
public void testConvertQuotedSqlWithEscape() throws SQLException {
DataSourceDefProvider provider = DataSourceDefProvider.getInstance();
@@ -417,6 +441,11 @@ public class SqlConverterTest extends LocalFileMetadataTestCase {
public String fixIdentifierCaseSensitve(String orig) {
return orig;
}
+
+ @Override
+ public String getTransactionIsolationLevel() {
+ return null;
+ }
}, master);
Assert.assertEquals("SELECT SUM(\"A\"), COUNT(\"A\") AS \"AB\"\nFROM \"DEFAULT\".\"CUBE\"",
@@ -425,8 +454,10 @@ public class SqlConverterTest extends LocalFileMetadataTestCase {
converter.convertSql("select A(), B(`A`), cast(`PRICE@@` as `DDD`) from DEFAULT.`CUBE`"));
Assert.assertEquals("SELECT A(), B(\"A\"), CAST(\"PRICE@@\" AS DDD)\nFROM \"DEFAULT\".\"CUBE\"",
converter.convertSql("select A(), B(\"A\"), cast(\"PRICE@@\" as \"DDD\") from \"DEFAULT\".\"CUBE\""));
- Assert.assertEquals("SELECT \"kylin_sales\".\"price_@@\", \"kylin_sales\".\"count\"\nFROM \"cube\".\"kylin_sales\"\nWHERE \"kylin_sales\".\"price_@@\" > 1 AND \"kylin_sales\".\"count\" < 50",
- converter.convertSql("select `kylin_sales`.`price_@@`, `kylin_sales`.`count` from `cube`.`kylin_sales` where `kylin_sales`.`price_@@` > 1 and `kylin_sales`.`count` < 50"));
+ Assert.assertEquals(
+ "SELECT \"kylin_sales\".\"price_@@\", \"kylin_sales\".\"count\"\nFROM \"cube\".\"kylin_sales\"\nWHERE \"kylin_sales\".\"price_@@\" > 1 AND \"kylin_sales\".\"count\" < 50",
+ converter.convertSql(
+ "select `kylin_sales`.`price_@@`, `kylin_sales`.`count` from `cube`.`kylin_sales` where `kylin_sales`.`price_@@` > 1 and `kylin_sales`.`count` < 50"));
Assert.assertEquals("SELECT COUNT(DISTINCT \"price_#@\")\nFROM \"cube\".\"kylin_sales\"",
converter.convertSql("select count(distinct `price_#@`) from `cube`.`kylin_sales`"));
@@ -502,6 +533,11 @@ public class SqlConverterTest extends LocalFileMetadataTestCase {
public String fixIdentifierCaseSensitve(String orig) {
return orig.toUpperCase(Locale.ROOT);
}
+
+ @Override
+ public String getTransactionIsolationLevel() {
+ return null;
+ }
}, master);
Assert.assertEquals("\"TEST\".\"AA\"", converter.convertColumn("`test`.`aa`", "`"));
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveInputBase.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveInputBase.java
index 10eb31e..ec69084 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveInputBase.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveInputBase.java
@@ -29,6 +29,7 @@ import org.apache.kylin.metadata.model.PartitionDesc;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.sdk.datasource.framework.JdbcConnector;
+import org.apache.kylin.sdk.datasource.framework.conv.SqlConverter;
import org.apache.kylin.source.jdbc.sqoop.SqoopCmdStep;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -82,15 +83,15 @@ public class JdbcHiveInputBase extends org.apache.kylin.source.jdbc.JdbcHiveInpu
String filedDelimiter = config.getJdbcSourceFieldDelimiter();
int mapperNum = config.getSqoopMapperNum();
- String bquery = String.format(Locale.ROOT, "SELECT min(%s), max(%s) FROM `%s`.%s as `%s`", splitColumn, splitColumn,
- splitDatabase, splitTable, splitTableAlias);
+ String bquery = String.format(Locale.ROOT, "SELECT min(%s), max(%s) FROM `%s`.%s as `%s`", splitColumn,
+ splitColumn, splitDatabase, splitTable, splitTableAlias);
bquery = dataSource.convertSql(bquery);
if (partitionDesc.isPartitioned()) {
SegmentRange segRange = flatDesc.getSegRange();
if (segRange != null && !segRange.isInfinite()) {
if (partitionDesc.getPartitionDateColumnRef().getTableAlias().equals(splitTableAlias)
&& (partitionDesc.getPartitionTimeColumnRef() == null || partitionDesc
- .getPartitionTimeColumnRef().getTableAlias().equals(splitTableAlias))) {
+ .getPartitionTimeColumnRef().getTableAlias().equals(splitTableAlias))) {
String quotedPartCond = FlatTableSqlQuoteUtils.quoteIdentifierInSqlExpr(flatDesc,
partitionDesc.getPartitionConditionBuilder().buildDateRangeCondition(partitionDesc,
flatDesc.getSegment(), segRange),
@@ -110,6 +111,11 @@ public class JdbcHiveInputBase extends org.apache.kylin.source.jdbc.JdbcHiveInpu
dataSource.getJdbcUrl(), dataSource.getJdbcDriver(), dataSource.getJdbcUser(),
dataSource.getJdbcPassword(), selectSql, jobWorkingDir, hiveTable, splitColumn, bquery,
filedDelimiter, mapperNum);
+ SqlConverter.IConfigurer configurer = dataSource.getSqlConverter().getConfigurer();
+ if (configurer.getTransactionIsolationLevel() != null) {
+ cmd = cmd + " --relaxed-isolation --metadata-transaction-isolation-level "
+ + configurer.getTransactionIsolationLevel();
+ }
logger.debug("sqoop cmd: {}", cmd);
SqoopCmdStep step = new SqoopCmdStep();