You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/05/27 11:26:52 UTC
[kylin] 02/04: KYLIN-3363 fix wrong partition condition appended in JDBC Source
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch 2.3.x
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 260391f856e52519605e7d200d84610e69548224
Author: lidongsjtu <li...@apache.org>
AuthorDate: Sat May 26 22:44:30 2018 +0800
KYLIN-3363 fix wrong partition condition appended in JDBC Source
---
.../apache/kylin/source/jdbc/JdbcHiveMRInput.java | 41 +++++++++++++---------
1 file changed, 24 insertions(+), 17 deletions(-)
diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
index 457c832..bbaaa4a 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
@@ -30,6 +30,7 @@ import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.metadata.TableMetadataManager;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.PartitionDesc;
+import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.TableExtDesc;
import org.apache.kylin.metadata.model.TableExtDesc.ColumnStats;
import org.apache.kylin.metadata.model.TableRef;
@@ -53,7 +54,7 @@ public class JdbcHiveMRInput extends HiveMRInput {
public BatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
super(flatDesc);
}
-
+
private KylinConfig getConfig() {
return flatDesc.getDataModel().getConfig();
}
@@ -140,20 +141,19 @@ public class JdbcHiveMRInput extends HiveMRInput {
KylinConfig config = getConfig();
PartitionDesc partitionDesc = flatDesc.getDataModel().getPartitionDesc();
String partCol = null;
- String partitionString = null;
if (partitionDesc.isPartitioned()) {
partCol = partitionDesc.getPartitionDateColumn();//tablename.colname
- partitionString = partitionDesc.getPartitionConditionBuilder().buildDateRangeCondition(partitionDesc,
- flatDesc.getSegment(), flatDesc.getSegRange());
}
String splitTable;
+ String splitTableAlias;
String splitColumn;
String splitDatabase;
TblColRef splitColRef = determineSplitColumn();
splitTable = splitColRef.getTableRef().getTableName();
- splitColumn = splitColRef.getName();
+ splitTableAlias = splitColRef.getTableAlias();
+ splitColumn = splitColRef.getExpressionInSourceDB();
splitDatabase = splitColRef.getColumnDesc().getTable().getDatabase();
//using sqoop to extract data from jdbc source and dump them to hive
@@ -167,22 +167,29 @@ public class JdbcHiveMRInput extends HiveMRInput {
String filedDelimiter = config.getJdbcSourceFieldDelimiter();
int mapperNum = config.getSqoopMapperNum();
- String bquery = String.format("SELECT min(%s), max(%s) FROM %s.%s", splitColumn, splitColumn, splitDatabase,
- splitTable);
- if (partitionString != null) {
- bquery += " WHERE " + partitionString;
+ String bquery = String.format("SELECT min(%s), max(%s) FROM \"%s\".%s as %s", splitColumn, splitColumn,
+ splitDatabase, splitTable, splitTableAlias);
+ if (partitionDesc.isPartitioned()) {
+ SegmentRange segRange = flatDesc.getSegRange();
+ if (segRange != null && !segRange.isInfinite()) {
+ if (partitionDesc.getPartitionDateColumnRef().getTableAlias().equals(splitTableAlias)
+ && (partitionDesc.getPartitionTimeColumnRef() == null || partitionDesc
+ .getPartitionTimeColumnRef().getTableAlias().equals(splitTableAlias))) {
+ bquery += " WHERE " + partitionDesc.getPartitionConditionBuilder()
+ .buildDateRangeCondition(partitionDesc, flatDesc.getSegment(), segRange);
+ }
+ }
}
//related to "kylin.engine.mr.config-override.mapreduce.job.queuename"
String queueName = getSqoopJobQueueName(config);
- String cmd = String.format(
- "%s/sqoop import -Dorg.apache.sqoop.splitter.allow_text_splitter=true "
- + "-Dmapreduce.job.queuename=%s "
- + "--connect \"%s\" --driver %s --username %s --password %s --query \"%s AND \\$CONDITIONS\" "
- + "--target-dir %s/%s --split-by %s.%s --boundary-query \"%s\" --null-string '' "
- + "--fields-terminated-by '%s' --num-mappers %d",
- sqoopHome, queueName, connectionUrl, driverClass, jdbcUser, jdbcPass, selectSql, jobWorkingDir, hiveTable,
- splitTable, splitColumn, bquery, filedDelimiter, mapperNum);
+ String cmd = String.format("%s/sqoop import -Dorg.apache.sqoop.splitter.allow_text_splitter=true "
+ + "-Dmapreduce.job.queuename=%s "
+ + "--connect \"%s\" --driver %s --username %s --password %s --query \"%s AND \\$CONDITIONS\" "
+ + "--target-dir %s/%s --split-by %s.%s --boundary-query \"%s\" --null-string '' "
+ + "--fields-terminated-by '%s' --num-mappers %d", sqoopHome, queueName, connectionUrl, driverClass,
+ jdbcUser, jdbcPass, selectSql, jobWorkingDir, hiveTable, splitTable, splitColumn, bquery,
+ filedDelimiter, mapperNum);
logger.debug(String.format("sqoop cmd:%s", cmd));
CmdStep step = new CmdStep();
step.setCmd(cmd);
--
To stop receiving notification emails like this one, please contact
shaofengshi@apache.org.