You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/09/24 12:57:21 UTC
[kylin] 03/04: KYLIN-3546 Add column which belongs to root fact
table in join relation but lost
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch 2.5.x
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 807dafcf3610f5b8210b6ad98626af5aa1af0363
Author: hit-lacus <hi...@126.com>
AuthorDate: Mon Sep 24 15:34:30 2018 +0800
KYLIN-3546 Add column which belongs to root fact table in join relation but lost
---
.../apache/kylin/source/kafka/KafkaInputBase.java | 24 +++++++++++++++++++++-
1 file changed, 23 insertions(+), 1 deletion(-)
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaInputBase.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaInputBase.java
index a624f8f..cb2e14c 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaInputBase.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaInputBase.java
@@ -18,8 +18,11 @@
package org.apache.kylin.source.kafka;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Set;
+import com.google.common.collect.Sets;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.mr.JobBuilderSupport;
@@ -32,7 +35,10 @@ import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.metadata.model.DataModelDesc;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.metadata.model.JoinDesc;
+import org.apache.kylin.metadata.model.JoinTableDesc;
import org.apache.kylin.metadata.model.SegmentRange;
+import org.apache.kylin.metadata.model.TableRef;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.source.hive.CreateFlatHiveTableStep;
import org.apache.kylin.source.hive.GarbageCollectionStep;
@@ -89,7 +95,23 @@ public class KafkaInputBase {
@Override
public List<TblColRef> getAllColumns() {
- return flatDesc.getFactColumns();
+ final Set<TblColRef> factTableColumnSet = Sets.newHashSet();
+ TableRef rootFactTable = getDataModel().getRootFactTable();
+ for (TblColRef colRef : flatDesc.getAllColumns()) {
+ if (colRef.getTableRef().equals(rootFactTable)) {
+ factTableColumnSet.add(colRef);
+ }
+ }
+ // Add column which belongs to root fact table in join relation but lost
+ for (JoinTableDesc joinTableDesc : getDataModel().getJoinTables()) {
+ JoinDesc join = joinTableDesc.getJoin();
+ for (TblColRef colRef : join.getForeignKeyColumns()) {
+ if (colRef.getTableRef().equals(rootFactTable)) {
+ factTableColumnSet.add(colRef);
+ }
+ }
+ }
+ return new LinkedList<>(factTableColumnSet);
}
@Override