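You are viewing a plain-text version of this content. The canonical link for it was provided as a hyperlink ("here") in the original archive page and is not preserved in this text version.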
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/05/27 11:26:52 UTC

[kylin] 02/04: KYLIN-3363 fix wrong partition condition appended in JDBC Source

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch 2.3.x
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 260391f856e52519605e7d200d84610e69548224
Author: lidongsjtu <li...@apache.org>
AuthorDate: Sat May 26 22:44:30 2018 +0800

    KYLIN-3363 fix wrong partition condition appended in JDBC Source
---
 .../apache/kylin/source/jdbc/JdbcHiveMRInput.java  | 41 +++++++++++++---------
 1 file changed, 24 insertions(+), 17 deletions(-)

diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
index 457c832..bbaaa4a 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
@@ -30,6 +30,7 @@ import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.metadata.TableMetadataManager;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.PartitionDesc;
+import org.apache.kylin.metadata.model.SegmentRange;
 import org.apache.kylin.metadata.model.TableExtDesc;
 import org.apache.kylin.metadata.model.TableExtDesc.ColumnStats;
 import org.apache.kylin.metadata.model.TableRef;
@@ -53,7 +54,7 @@ public class JdbcHiveMRInput extends HiveMRInput {
         public BatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
             super(flatDesc);
         }
-        
+
         private KylinConfig getConfig() {
             return flatDesc.getDataModel().getConfig();
         }
@@ -140,20 +141,19 @@ public class JdbcHiveMRInput extends HiveMRInput {
             KylinConfig config = getConfig();
             PartitionDesc partitionDesc = flatDesc.getDataModel().getPartitionDesc();
             String partCol = null;
-            String partitionString = null;
 
             if (partitionDesc.isPartitioned()) {
                 partCol = partitionDesc.getPartitionDateColumn();//tablename.colname
-                partitionString = partitionDesc.getPartitionConditionBuilder().buildDateRangeCondition(partitionDesc,
-                        flatDesc.getSegment(), flatDesc.getSegRange());
             }
 
             String splitTable;
+            String splitTableAlias;
             String splitColumn;
             String splitDatabase;
             TblColRef splitColRef = determineSplitColumn();
             splitTable = splitColRef.getTableRef().getTableName();
-            splitColumn = splitColRef.getName();
+            splitTableAlias = splitColRef.getTableAlias();
+            splitColumn = splitColRef.getExpressionInSourceDB();
             splitDatabase = splitColRef.getColumnDesc().getTable().getDatabase();
 
             //using sqoop to extract data from jdbc source and dump them to hive
@@ -167,22 +167,29 @@ public class JdbcHiveMRInput extends HiveMRInput {
             String filedDelimiter = config.getJdbcSourceFieldDelimiter();
             int mapperNum = config.getSqoopMapperNum();
 
-            String bquery = String.format("SELECT min(%s), max(%s) FROM %s.%s", splitColumn, splitColumn, splitDatabase,
-                    splitTable);
-            if (partitionString != null) {
-                bquery += " WHERE " + partitionString;
+            String bquery = String.format("SELECT min(%s), max(%s) FROM \"%s\".%s as %s", splitColumn, splitColumn,
+                    splitDatabase, splitTable, splitTableAlias);
+            if (partitionDesc.isPartitioned()) {
+                SegmentRange segRange = flatDesc.getSegRange();
+                if (segRange != null && !segRange.isInfinite()) {
+                    if (partitionDesc.getPartitionDateColumnRef().getTableAlias().equals(splitTableAlias)
+                            && (partitionDesc.getPartitionTimeColumnRef() == null || partitionDesc
+                                    .getPartitionTimeColumnRef().getTableAlias().equals(splitTableAlias))) {
+                        bquery += " WHERE " + partitionDesc.getPartitionConditionBuilder()
+                                .buildDateRangeCondition(partitionDesc, flatDesc.getSegment(), segRange);
+                    }
+                }
             }
 
             //related to "kylin.engine.mr.config-override.mapreduce.job.queuename"
             String queueName = getSqoopJobQueueName(config);
-            String cmd = String.format(
-                    "%s/sqoop import -Dorg.apache.sqoop.splitter.allow_text_splitter=true "
-                            + "-Dmapreduce.job.queuename=%s "
-                            + "--connect \"%s\" --driver %s --username %s --password %s --query \"%s AND \\$CONDITIONS\" "
-                            + "--target-dir %s/%s --split-by %s.%s --boundary-query \"%s\" --null-string '' "
-                            + "--fields-terminated-by '%s' --num-mappers %d",
-                    sqoopHome, queueName, connectionUrl, driverClass, jdbcUser, jdbcPass, selectSql, jobWorkingDir, hiveTable,
-                    splitTable, splitColumn, bquery, filedDelimiter, mapperNum);
+            String cmd = String.format("%s/sqoop import -Dorg.apache.sqoop.splitter.allow_text_splitter=true "
+                    + "-Dmapreduce.job.queuename=%s "
+                    + "--connect \"%s\" --driver %s --username %s --password %s --query \"%s AND \\$CONDITIONS\" "
+                    + "--target-dir %s/%s --split-by %s.%s --boundary-query \"%s\" --null-string '' "
+                    + "--fields-terminated-by '%s' --num-mappers %d", sqoopHome, queueName, connectionUrl, driverClass,
+                    jdbcUser, jdbcPass, selectSql, jobWorkingDir, hiveTable, splitTable, splitColumn, bquery,
+                    filedDelimiter, mapperNum);
             logger.debug(String.format("sqoop cmd:%s", cmd));
             CmdStep step = new CmdStep();
             step.setCmd(cmd);

-- 
To stop receiving notification emails like this one, please contact
shaofengshi@apache.org.