You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/12/15 13:22:40 UTC

[kylin] branch master updated: KYLIN-3707 add configuration for setting isolation-level for sqoop

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new fb54f38  KYLIN-3707 add configuration for setting isolation-level for sqoop
fb54f38 is described below

commit fb54f38cdad26b776b6f5116cacfbc8470e19142
Author: woyumen4597 <wo...@gmail.com>
AuthorDate: Fri Dec 14 15:03:34 2018 +0800

    KYLIN-3707 add configuration for setting isolation-level for sqoop
---
 .../sdk/datasource/framework/JdbcConnector.java    |  4 +-
 .../framework/conv/DefaultConfiguer.java           |  9 ++--
 .../datasource/framework/conv/SqlConverter.java    |  6 +++
 .../framework/conv/SqlConverterTest.java           | 62 +++++++++++++++++-----
 .../source/jdbc/extensible/JdbcHiveInputBase.java  | 12 +++--
 5 files changed, 70 insertions(+), 23 deletions(-)

diff --git a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/JdbcConnector.java b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/JdbcConnector.java
index d849e6c..47ba6b3 100644
--- a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/JdbcConnector.java
+++ b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/JdbcConnector.java
@@ -37,7 +37,6 @@ import org.apache.kylin.sdk.datasource.framework.def.DataSourceDefProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 
 public class JdbcConnector implements Closeable {
@@ -175,8 +174,7 @@ public class JdbcConnector implements Closeable {
         return jdbcDs.getPropertyValue(key);
     }
 
-    @VisibleForTesting
-    SqlConverter getSqlConverter() {
+    public SqlConverter getSqlConverter() {
         return sqlConverter;
     }
 
diff --git a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/conv/DefaultConfiguer.java b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/conv/DefaultConfiguer.java
index 6d7fb6d..6c01a70 100644
--- a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/conv/DefaultConfiguer.java
+++ b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/conv/DefaultConfiguer.java
@@ -78,10 +78,6 @@ public class DefaultConfiguer implements SqlConverter.IConfigurer{
         if (this.adaptor == null) {
             return orig;
         }
-        // fix problem of case sensitive when generate sql.
-//        if (isCaseSensitive()) {
-//            orig = adaptor.fixCaseSensitiveSql(orig);
-//        }
         return adaptor.fixSql(orig);
     }
 
@@ -134,4 +130,9 @@ public class DefaultConfiguer implements SqlConverter.IConfigurer{
         }
         return adaptor.fixIdentifierCaseSensitve(orig);
     }
+
+    @Override
+    public String getTransactionIsolationLevel() {
+        return dsDef.getPropertyValue("transaction.isolation-level");
+    }
 }
diff --git a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/conv/SqlConverter.java b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/conv/SqlConverter.java
index d25c04f..e8302e8 100644
--- a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/conv/SqlConverter.java
+++ b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/conv/SqlConverter.java
@@ -110,5 +110,11 @@ public class SqlConverter {
         boolean enableQuote();
 
         String fixIdentifierCaseSensitve(String orig);
+
+        /**
+         * Only support following 3 types
+         * TRANSACTION_READ_COMMITTED,TRANSACTION_READ_UNCOMMITTED,TRANSACTION_READ_COMMITTED
+         */
+        String getTransactionIsolationLevel();
     }
 }
diff --git a/datasource-sdk/src/test/java/org/apache/kylin/sdk/datasource/framework/conv/SqlConverterTest.java b/datasource-sdk/src/test/java/org/apache/kylin/sdk/datasource/framework/conv/SqlConverterTest.java
index 94cc651..451be60 100644
--- a/datasource-sdk/src/test/java/org/apache/kylin/sdk/datasource/framework/conv/SqlConverterTest.java
+++ b/datasource-sdk/src/test/java/org/apache/kylin/sdk/datasource/framework/conv/SqlConverterTest.java
@@ -111,12 +111,20 @@ public class SqlConverterTest extends LocalFileMetadataTestCase {
             public String fixIdentifierCaseSensitve(String orig) {
                 return orig;
             }
+
+            @Override
+            public String getTransactionIsolationLevel() {
+                return null;
+            }
         }, master);
 
         // escape default keywords
-        Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".\"FACT\"", converter.convertSql("select * from \"DEFAULT\".FACT"));
-        Assert.assertEquals("SELECT *\nFROM \"Default\".\"FACT\"", converter.convertSql("select * from \"Default\".FACT"));
-        Assert.assertEquals("SELECT *\nFROM \"default\".\"FACT\"", converter.convertSql("select * from \"default\".FACT"));
+        Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".\"FACT\"",
+                converter.convertSql("select * from \"DEFAULT\".FACT"));
+        Assert.assertEquals("SELECT *\nFROM \"Default\".\"FACT\"",
+                converter.convertSql("select * from \"Default\".FACT"));
+        Assert.assertEquals("SELECT *\nFROM \"default\".\"FACT\"",
+                converter.convertSql("select * from \"default\".FACT"));
     }
 
     @Test
@@ -189,6 +197,11 @@ public class SqlConverterTest extends LocalFileMetadataTestCase {
             public String fixIdentifierCaseSensitve(String orig) {
                 return orig;
             }
+
+            @Override
+            public String getTransactionIsolationLevel() {
+                return null;
+            }
         }, master);
 
         // normal cases
@@ -203,17 +216,20 @@ public class SqlConverterTest extends LocalFileMetadataTestCase {
 
         // escape default keywords
         Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".\"FACT\"", converter.convertSql("select * from DEFAULT.FACT"));
-        Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".\"FACT\"", converter.convertSql("select * from \"DEFAULT\".FACT"));
-        Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".\"FACT\"", converter.convertSql("select * from \"Default\".FACT"));
-        Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".\"FACT\"", converter.convertSql("select * from \"default\".FACT"));
+        Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".\"FACT\"",
+                converter.convertSql("select * from \"DEFAULT\".FACT"));
+        Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".\"FACT\"",
+                converter.convertSql("select * from \"Default\".FACT"));
+        Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".\"FACT\"",
+                converter.convertSql("select * from \"default\".FACT"));
 
         // function mapping
         Assert.assertEquals("SELECT EXTRACT(DOY FROM \"PART_DT\")\nFROM \"DEFAULT\".\"FACT\"",
                 converter.convertSql("select DAYOFYEAR(PART_DT) from \"DEFAULT\".FACT"));
         Assert.assertEquals(
-                "SELECT 12 * (EXTRACT(YEAR FROM \"DT1\") - EXTRACT(YEAR FROM \"DT2\")) + EXTRACT(MONTH FROM \"DT1\") - EXTRACT(MONTH FROM \"DT2\") - " +
-                        "CASE WHEN EXTRACT(DAY FROM \"DT2\") > EXTRACT(DAY FROM \"DT1\") THEN 1 ELSE 0 END\n" +
-                        "FROM \"DEFAULT\".\"FACT\"",
+                "SELECT 12 * (EXTRACT(YEAR FROM \"DT1\") - EXTRACT(YEAR FROM \"DT2\")) + EXTRACT(MONTH FROM \"DT1\") - EXTRACT(MONTH FROM \"DT2\") - "
+                        + "CASE WHEN EXTRACT(DAY FROM \"DT2\") > EXTRACT(DAY FROM \"DT1\") THEN 1 ELSE 0 END\n"
+                        + "FROM \"DEFAULT\".\"FACT\"",
                 converter.convertSql("select TIMESTAMPDIFF(month,DT2,      DT1) from \"DEFAULT\".FACT"));
         Assert.assertEquals("SELECT TRUNC(\"ID\")\nFROM \"DEFAULT\".\"FACT\"",
                 converter.convertSql("select cast(ID as INT) from \"DEFAULT\".FACT"));
@@ -221,12 +237,14 @@ public class SqlConverterTest extends LocalFileMetadataTestCase {
                 converter.convertSql("select 1 from a where 1 BETWEEN 0 and 2"));
         Assert.assertEquals("SELECT \"CURRENT_DATE\", TEST_CURR_TIME()",
                 converter.convertSql("select CURRENT_DATE, CURRENT_TIME"));
-        Assert.assertEquals("SELECT EXP(AVG(LN(EXTRACT(DOY FROM CAST('2018-03-20' AS DATE)))))\nFROM \"DEFAULT\".\"FACT\"",
+        Assert.assertEquals(
+                "SELECT EXP(AVG(LN(EXTRACT(DOY FROM CAST('2018-03-20' AS DATE)))))\nFROM \"DEFAULT\".\"FACT\"",
                 converter.convertSql(
                         "select exp(avg(ln(dayofyear(cast('2018-03-20' as date))))) from \"DEFAULT\".FACT"));
 
         // over function
-        Assert.assertEquals("SELECT STDDEVP(\"C1\") OVER (ORDER BY \"C1\")\nFROM \"TEST_SUITE\"\nFETCH NEXT 1 ROWS ONLY",
+        Assert.assertEquals(
+                "SELECT STDDEVP(\"C1\") OVER (ORDER BY \"C1\")\nFROM \"TEST_SUITE\"\nFETCH NEXT 1 ROWS ONLY",
                 converter.convertSql("select stddev_pop(c1) over(order by c1) from test_suite limit 1"));
 
         // type mapping
@@ -332,6 +350,11 @@ public class SqlConverterTest extends LocalFileMetadataTestCase {
             public String fixIdentifierCaseSensitve(String orig) {
                 return orig;
             }
+
+            @Override
+            public String getTransactionIsolationLevel() {
+                return null;
+            }
         }, master);
 
         Assert.assertEquals("SELECT 1\nORDER BY 2\nOFFSET 0 ROWS\nFETCH NEXT 1 ROWS ONLY",
@@ -347,6 +370,7 @@ public class SqlConverterTest extends LocalFileMetadataTestCase {
         Assert.assertEquals("SELECT 1\nORDER BY 1\nOFFSET 0 ROWS\nFETCH NEXT 1 ROWS ONLY",
                 converter.convertSql("SELECT 1 LIMIT 1"));
     }
+
     @Test
     public void testConvertQuotedSqlWithEscape() throws SQLException {
         DataSourceDefProvider provider = DataSourceDefProvider.getInstance();
@@ -417,6 +441,11 @@ public class SqlConverterTest extends LocalFileMetadataTestCase {
             public String fixIdentifierCaseSensitve(String orig) {
                 return orig;
             }
+
+            @Override
+            public String getTransactionIsolationLevel() {
+                return null;
+            }
         }, master);
 
         Assert.assertEquals("SELECT SUM(\"A\"), COUNT(\"A\") AS \"AB\"\nFROM \"DEFAULT\".\"CUBE\"",
@@ -425,8 +454,10 @@ public class SqlConverterTest extends LocalFileMetadataTestCase {
                 converter.convertSql("select A(), B(`A`), cast(`PRICE@@` as `DDD`) from DEFAULT.`CUBE`"));
         Assert.assertEquals("SELECT A(), B(\"A\"), CAST(\"PRICE@@\" AS DDD)\nFROM \"DEFAULT\".\"CUBE\"",
                 converter.convertSql("select A(), B(\"A\"), cast(\"PRICE@@\" as \"DDD\") from \"DEFAULT\".\"CUBE\""));
-        Assert.assertEquals("SELECT \"kylin_sales\".\"price_@@\", \"kylin_sales\".\"count\"\nFROM \"cube\".\"kylin_sales\"\nWHERE \"kylin_sales\".\"price_@@\" > 1 AND \"kylin_sales\".\"count\" < 50",
-                converter.convertSql("select `kylin_sales`.`price_@@`, `kylin_sales`.`count` from `cube`.`kylin_sales` where `kylin_sales`.`price_@@` > 1 and `kylin_sales`.`count` < 50"));
+        Assert.assertEquals(
+                "SELECT \"kylin_sales\".\"price_@@\", \"kylin_sales\".\"count\"\nFROM \"cube\".\"kylin_sales\"\nWHERE \"kylin_sales\".\"price_@@\" > 1 AND \"kylin_sales\".\"count\" < 50",
+                converter.convertSql(
+                        "select `kylin_sales`.`price_@@`, `kylin_sales`.`count` from `cube`.`kylin_sales` where `kylin_sales`.`price_@@` > 1 and `kylin_sales`.`count` < 50"));
         Assert.assertEquals("SELECT COUNT(DISTINCT \"price_#@\")\nFROM \"cube\".\"kylin_sales\"",
                 converter.convertSql("select count(distinct `price_#@`) from `cube`.`kylin_sales`"));
 
@@ -502,6 +533,11 @@ public class SqlConverterTest extends LocalFileMetadataTestCase {
             public String fixIdentifierCaseSensitve(String orig) {
                 return orig.toUpperCase(Locale.ROOT);
             }
+
+            @Override
+            public String getTransactionIsolationLevel() {
+                return null;
+            }
         }, master);
 
         Assert.assertEquals("\"TEST\".\"AA\"", converter.convertColumn("`test`.`aa`", "`"));
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveInputBase.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveInputBase.java
index 10eb31e..ec69084 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveInputBase.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveInputBase.java
@@ -29,6 +29,7 @@ import org.apache.kylin.metadata.model.PartitionDesc;
 import org.apache.kylin.metadata.model.SegmentRange;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.sdk.datasource.framework.JdbcConnector;
+import org.apache.kylin.sdk.datasource.framework.conv.SqlConverter;
 import org.apache.kylin.source.jdbc.sqoop.SqoopCmdStep;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -82,15 +83,15 @@ public class JdbcHiveInputBase extends org.apache.kylin.source.jdbc.JdbcHiveInpu
             String filedDelimiter = config.getJdbcSourceFieldDelimiter();
             int mapperNum = config.getSqoopMapperNum();
 
-            String bquery = String.format(Locale.ROOT, "SELECT min(%s), max(%s) FROM `%s`.%s as `%s`", splitColumn, splitColumn,
-                    splitDatabase, splitTable, splitTableAlias);
+            String bquery = String.format(Locale.ROOT, "SELECT min(%s), max(%s) FROM `%s`.%s as `%s`", splitColumn,
+                    splitColumn, splitDatabase, splitTable, splitTableAlias);
             bquery = dataSource.convertSql(bquery);
             if (partitionDesc.isPartitioned()) {
                 SegmentRange segRange = flatDesc.getSegRange();
                 if (segRange != null && !segRange.isInfinite()) {
                     if (partitionDesc.getPartitionDateColumnRef().getTableAlias().equals(splitTableAlias)
                             && (partitionDesc.getPartitionTimeColumnRef() == null || partitionDesc
-                            .getPartitionTimeColumnRef().getTableAlias().equals(splitTableAlias))) {
+                                    .getPartitionTimeColumnRef().getTableAlias().equals(splitTableAlias))) {
                         String quotedPartCond = FlatTableSqlQuoteUtils.quoteIdentifierInSqlExpr(flatDesc,
                                 partitionDesc.getPartitionConditionBuilder().buildDateRangeCondition(partitionDesc,
                                         flatDesc.getSegment(), segRange),
@@ -110,6 +111,11 @@ public class JdbcHiveInputBase extends org.apache.kylin.source.jdbc.JdbcHiveInpu
                     dataSource.getJdbcUrl(), dataSource.getJdbcDriver(), dataSource.getJdbcUser(),
                     dataSource.getJdbcPassword(), selectSql, jobWorkingDir, hiveTable, splitColumn, bquery,
                     filedDelimiter, mapperNum);
+            SqlConverter.IConfigurer configurer = dataSource.getSqlConverter().getConfigurer();
+            if (configurer.getTransactionIsolationLevel() != null) {
+                cmd = cmd + " --relaxed-isolation --metadata-transaction-isolation-level "
+                        + configurer.getTransactionIsolationLevel();
+            }
             logger.debug("sqoop cmd: {}", cmd);
 
             SqoopCmdStep step = new SqoopCmdStep();