You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/08/20 13:47:32 UTC
kylin git commit: KYLIN-1966 Refactor IJoinedFlatTableDesc
Repository: kylin
Updated Branches:
refs/heads/master ddb59106b -> 889c544ad
KYLIN-1966 Refactor IJoinedFlatTableDesc
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/889c544a
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/889c544a
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/889c544a
Branch: refs/heads/master
Commit: 889c544adf650963598381ce905aa8f86cb8ef86
Parents: ddb5910
Author: Yang Li <li...@apache.org>
Authored: Sat Aug 20 21:47:18 2016 +0800
Committer: Yang Li <li...@apache.org>
Committed: Sat Aug 20 21:47:18 2016 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/cube/CubeSegment.java | 2 +-
.../InMemCubeBuilderInputConverter.java | 2 +-
.../cube/model/CubeJoinedFlatTableDesc.java | 86 +++++++++---------
.../org/apache/kylin/cube/util/CubingUtils.java | 4 +-
.../org/apache/kylin/job/JoinedFlatTable.java | 93 ++++++++++----------
.../apache/kylin/job/JoinedFlatTableTest.java | 2 +-
.../kylin/metadata/model/DataModelDesc.java | 1 +
.../metadata/model/IJoinedFlatTableDesc.java | 10 ++-
.../metadata/model/IntermediateColumnDesc.java | 60 -------------
.../apache/kylin/metadata/model/LookupDesc.java | 12 +++
.../kylin/engine/mr/BatchMergeJobBuilder2.java | 3 -
.../org/apache/kylin/engine/mr/IMRInput.java | 13 +--
.../java/org/apache/kylin/engine/mr/MRUtil.java | 6 +-
.../engine/mr/steps/BaseCuboidMapperBase.java | 2 +-
.../mr/steps/FactDistinctColumnsMapperBase.java | 2 +-
.../apache/kylin/engine/spark/SparkCubing.java | 4 +-
.../inmemcubing/ITInMemCubeBuilderTest.java | 8 +-
.../kylin/rest/controller/CubeController.java | 3 +-
.../apache/kylin/source/hive/HiveMRInput.java | 38 +++-----
.../kylin/source/kafka/KafkaStreamingInput.java | 12 +--
20 files changed, 139 insertions(+), 224 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/889c544a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index 4697c63..0797ab3 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -531,7 +531,7 @@ public class CubeSegment implements Comparable<CubeSegment>, IBuildable {
}
public IJoinedFlatTableDesc getJoinedFlatTableDesc() {
- return new CubeJoinedFlatTableDesc(this.getCubeDesc(), this);
+ return new CubeJoinedFlatTableDesc(this);
}
public String getIndexPath() {
http://git-wip-us.apache.org/repos/asf/kylin/blob/889c544a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
index 3ce635b..607f6bb 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
@@ -50,7 +50,7 @@ public class InMemCubeBuilderInputConverter {
public InMemCubeBuilderInputConverter(CubeDesc cubeDesc, Map<TblColRef, Dictionary<String>> dictionaryMap, GTInfo gtInfo) {
this.gtInfo = gtInfo;
- this.intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
+ this.intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc);
this.measureCount = cubeDesc.getMeasures().size();
this.measureDescs = cubeDesc.getMeasures().toArray(new MeasureDesc[measureCount]);
this.measureIngesters = MeasureIngester.create(cubeDesc.getMeasures());
http://git-wip-us.apache.org/repos/asf/kylin/blob/889c544a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
index e8856ef..ebc903a 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
@@ -28,7 +28,6 @@ import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.metadata.model.DataModelDesc;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.IntermediateColumnDesc;
import org.apache.kylin.metadata.model.JoinDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
@@ -48,13 +47,21 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
private int[] rowKeyColumnIndexes; // the column index on flat table
private int[][] measureColumnIndexes; // [i] is the i.th measure related column index on flat table
- private List<IntermediateColumnDesc> columnList = Lists.newArrayList();
-
- private Map<String, Integer> columnIndexMap;
+ private List<TblColRef> columnList = Lists.newArrayList();
+ private Map<TblColRef, Integer> columnIndexMap;
private List<JoinDesc> cubeJoins;
- public CubeJoinedFlatTableDesc(CubeDesc cubeDesc, CubeSegment cubeSegment) {
+
+ public CubeJoinedFlatTableDesc(CubeDesc cubeDesc) {
+ this(cubeDesc, null);
+ }
+
+ public CubeJoinedFlatTableDesc(CubeSegment cubeSegment) {
+ this(cubeSegment.getCubeDesc(), cubeSegment);
+ }
+
+ private CubeJoinedFlatTableDesc(CubeDesc cubeDesc, CubeSegment cubeSegment /* can be null */) {
this.cubeDesc = cubeDesc;
this.cubeSegment = cubeSegment;
this.columnIndexMap = Maps.newHashMap();
@@ -62,16 +69,8 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
parseCubeDesc();
}
- /**
- * @return the cubeSegment
- */
- public CubeSegment getCubeSegment() {
- return cubeSegment;
- }
-
// check what columns from hive tables are required, and index them
private void parseCubeDesc() {
- int rowkeyColCount = cubeDesc.getRowkey().getRowKeyColumns().length;
long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
@@ -83,19 +82,20 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
int columnIndex = 0;
for (TblColRef col : cubeDesc.listDimensionColumnsExcludingDerived(false)) {
- columnIndexMap.put(colName(col.getCanonicalName()), columnIndex);
- columnList.add(new IntermediateColumnDesc(String.valueOf(columnIndex), col));
+ columnIndexMap.put(col, columnIndex);
+ columnList.add(col);
columnIndex++;
}
// build index for rowkey columns
List<TblColRef> cuboidColumns = baseCuboid.getColumns();
+ int rowkeyColCount = cubeDesc.getRowkey().getRowKeyColumns().length;
rowKeyColumnIndexes = new int[rowkeyColCount];
for (int i = 0; i < rowkeyColCount; i++) {
- String colName = colName(cuboidColumns.get(i).getCanonicalName());
- Integer dimIdx = columnIndexMap.get(colName);
+ TblColRef col = cuboidColumns.get(i);
+ Integer dimIdx = columnIndexMap.get(col);
if (dimIdx == null) {
- throw new RuntimeException("Can't find column " + colName);
+ throw new RuntimeException("Can't find column " + col);
}
rowKeyColumnIndexes[i] = dimIdx;
}
@@ -112,11 +112,11 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
measureColumnIndexes[i] = new int[colRefs.size()];
for (int j = 0; j < colRefs.size(); j++) {
TblColRef c = colRefs.get(j);
- measureColumnIndexes[i][j] = contains(columnList, c);
+ measureColumnIndexes[i][j] = columnList.indexOf(c);
if (measureColumnIndexes[i][j] < 0) {
measureColumnIndexes[i][j] = columnIndex;
- columnIndexMap.put(colName(c.getCanonicalName()), columnIndex);
- columnList.add(new IntermediateColumnDesc(String.valueOf(columnIndex), c));
+ columnIndexMap.put(c, columnIndex);
+ columnList.add(c);
columnIndex++;
}
}
@@ -126,16 +126,16 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
if (cubeDesc.getDictionaries() != null) {
for (DictionaryDesc dictDesc : cubeDesc.getDictionaries()) {
TblColRef c = dictDesc.getColumnRef();
- if (contains(columnList, c) < 0) {
- columnIndexMap.put(colName(c.getCanonicalName()), columnIndex);
- columnList.add(new IntermediateColumnDesc(String.valueOf(columnIndex), c));
+ if (columnList.indexOf(c) < 0) {
+ columnIndexMap.put(c, columnIndex);
+ columnList.add(c);
columnIndex++;
}
if (dictDesc.getResuseColumnRef() != null) {
c = dictDesc.getResuseColumnRef();
- if (contains(columnList, c) < 0) {
- columnIndexMap.put(colName(c.getCanonicalName()), columnIndex);
- columnList.add(new IntermediateColumnDesc(String.valueOf(columnIndex), c));
+ if (columnList.indexOf(c) < 0) {
+ columnIndexMap.put(c, columnIndex);
+ columnList.add(c);
columnIndex++;
}
}
@@ -151,16 +151,6 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
}
}
- private int contains(List<IntermediateColumnDesc> columnList, TblColRef c) {
- for (int i = 0; i < columnList.size(); i++) {
- IntermediateColumnDesc col = columnList.get(i);
-
- if (col.isSameAs(c.getTable(), c.getName()))
- return i;
- }
- return -1;
- }
-
// sanity check the input record (in bytes) matches what's expected
public void sanityCheck(BytesSplitter bytesSplitter) {
if (columnCount != bytesSplitter.getBufferSize()) {
@@ -188,7 +178,7 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
}
@Override
- public List<IntermediateColumnDesc> getColumnList() {
+ public List<TblColRef> getAllColumns() {
return columnList;
}
@@ -197,11 +187,6 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
return cubeDesc.getModel();
}
- @Override
- public List<JoinDesc> getUsedJoinsSet() {
- return cubeJoins;
- }
-
private static String colName(String canonicalColName) {
return canonicalColName.replace(".", "_");
}
@@ -215,4 +200,19 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
return index.intValue();
}
+ @Override
+ public long getSourceOffsetStart() {
+ return cubeSegment.getSourceOffsetStart();
+ }
+
+ @Override
+ public long getSourceOffsetEnd() {
+ return cubeSegment.getSourceOffsetEnd();
+ }
+
+ @Override
+ public TblColRef getDistributedBy() {
+ return cubeDesc.getDistributedByColumn();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/889c544a/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java b/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java
index 0be39b4..b7f79e1 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java
@@ -75,7 +75,7 @@ public class CubingUtils {
private static Logger logger = LoggerFactory.getLogger(CubingUtils.class);
public static Map<Long, HyperLogLogPlusCounter> sampling(CubeDesc cubeDesc, Iterable<List<String>> streams) {
- CubeJoinedFlatTableDesc intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
+ CubeJoinedFlatTableDesc flatDesc = new CubeJoinedFlatTableDesc(cubeDesc);
final int rowkeyLength = cubeDesc.getRowkey().getRowKeyColumns().length;
final List<Long> allCuboidIds = new CuboidScheduler(cubeDesc).getAllCuboidIds();
final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
@@ -125,7 +125,7 @@ public class CubingUtils {
//generate hash for each row key column
for (int i = 0; i < rowkeyLength; i++) {
Hasher hc = hf.newHasher();
- final String cell = row.get(intermediateTableDesc.getRowKeyColumnIndexes()[i]);
+ final String cell = row.get(flatDesc.getRowKeyColumnIndexes()[i]);
if (cell != null) {
row_hashcodes[i].set(hc.putString(cell).hash().asBytes());
} else {
http://git-wip-us.apache.org/repos/asf/kylin/blob/889c544a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
index cb3735a..b39265d 100644
--- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@ -28,13 +28,9 @@ import java.util.Set;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.metadata.model.DataModelDesc;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.IntermediateColumnDesc;
import org.apache.kylin.metadata.model.JoinDesc;
import org.apache.kylin.metadata.model.LookupDesc;
import org.apache.kylin.metadata.model.PartitionDesc;
@@ -42,6 +38,9 @@ import org.apache.kylin.metadata.model.TblColRef;
import org.w3c.dom.Document;
import org.w3c.dom.NodeList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
/**
*
*/
@@ -79,24 +78,24 @@ public class JoinedFlatTable {
return buffer.toString();
}
- public static String generateCreateTableStatement(IJoinedFlatTableDesc intermediateTableDesc, String storageDfsDir) {
+ public static String generateCreateTableStatement(IJoinedFlatTableDesc flatDesc, String storageDfsDir) {
StringBuilder ddl = new StringBuilder();
- ddl.append("CREATE EXTERNAL TABLE IF NOT EXISTS " + intermediateTableDesc.getTableName() + "\n");
+ ddl.append("CREATE EXTERNAL TABLE IF NOT EXISTS " + flatDesc.getTableName() + "\n");
ddl.append("(" + "\n");
- for (int i = 0; i < intermediateTableDesc.getColumnList().size(); i++) {
- IntermediateColumnDesc col = intermediateTableDesc.getColumnList().get(i);
+ for (int i = 0; i < flatDesc.getAllColumns().size(); i++) {
+ TblColRef col = flatDesc.getAllColumns().get(i);
if (i > 0) {
ddl.append(",");
}
- ddl.append(colName(col.getCanonicalName()) + " " + getHiveDataType(col.getDataType()) + "\n");
+ ddl.append(colName(col.getCanonicalName()) + " " + getHiveDataType(col.getDatatype()) + "\n");
}
ddl.append(")" + "\n");
ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\177'" + "\n");
ddl.append("STORED AS SEQUENCEFILE" + "\n");
- ddl.append("LOCATION '" + getTableDir(intermediateTableDesc, storageDfsDir) + "';").append("\n");
+ ddl.append("LOCATION '" + getTableDir(flatDesc, storageDfsDir) + "';").append("\n");
// ddl.append("TBLPROPERTIES ('serialization.null.format'='\\\\N')" +
// ";\n");
return ddl.toString();
@@ -115,32 +114,32 @@ public class JoinedFlatTable {
return sql.toString();
}
- public static String generateSelectDataStatement(IJoinedFlatTableDesc intermediateTableDesc) {
+ public static String generateSelectDataStatement(IJoinedFlatTableDesc flatDesc) {
StringBuilder sql = new StringBuilder();
sql.append("SELECT" + "\n");
String tableAlias;
- Map<String, String> tableAliasMap = buildTableAliasMap(intermediateTableDesc.getDataModel());
- for (int i = 0; i < intermediateTableDesc.getColumnList().size(); i++) {
- IntermediateColumnDesc col = intermediateTableDesc.getColumnList().get(i);
+ Map<String, String> tableAliasMap = buildTableAliasMap(flatDesc.getDataModel());
+ for (int i = 0; i < flatDesc.getAllColumns().size(); i++) {
+ TblColRef col = flatDesc.getAllColumns().get(i);
if (i > 0) {
sql.append(",");
}
- tableAlias = tableAliasMap.get(col.getTableName());
- sql.append(tableAlias + "." + col.getColumnName() + "\n");
+ tableAlias = tableAliasMap.get(col.getTable());
+ sql.append(tableAlias + "." + col.getName() + "\n");
}
- appendJoinStatement(intermediateTableDesc, sql, tableAliasMap);
- appendWhereStatement(intermediateTableDesc, sql, tableAliasMap);
- appendDistributeStatement(intermediateTableDesc, sql, tableAliasMap);
+ appendJoinStatement(flatDesc, sql, tableAliasMap);
+ appendWhereStatement(flatDesc, sql, tableAliasMap);
+ appendDistributeStatement(flatDesc, sql, tableAliasMap);
return sql.toString();
}
- public static String generateCountDataStatement(IJoinedFlatTableDesc intermediateTableDesc, final String outputDir) {
- final Map<String, String> tableAliasMap = buildTableAliasMap(intermediateTableDesc.getDataModel());
+ public static String generateCountDataStatement(IJoinedFlatTableDesc flatDesc, final String outputDir) {
+ final Map<String, String> tableAliasMap = buildTableAliasMap(flatDesc.getDataModel());
final StringBuilder sql = new StringBuilder();
- final String factTbl = intermediateTableDesc.getDataModel().getFactTable();
+ final String factTbl = flatDesc.getDataModel().getFactTable();
sql.append("dfs -mkdir -p " + outputDir + ";\n");
sql.append("INSERT OVERWRITE DIRECTORY '" + outputDir + "' SELECT count(*) FROM " + factTbl + " " + tableAliasMap.get(factTbl) + "\n");
- appendWhereStatement(intermediateTableDesc, sql, tableAliasMap);
+ appendWhereStatement(flatDesc, sql, tableAliasMap);
return sql.toString();
}
@@ -172,12 +171,12 @@ public class JoinedFlatTable {
tableAliasMap.put(table, alias);
}
- private static void appendJoinStatement(IJoinedFlatTableDesc intermediateTableDesc, StringBuilder sql, Map<String, String> tableAliasMap) {
- List<JoinDesc> cubeJoins = intermediateTableDesc.getUsedJoinsSet();
+ private static void appendJoinStatement(IJoinedFlatTableDesc flatDesc, StringBuilder sql, Map<String, String> tableAliasMap) {
+ List<JoinDesc> cubeJoins = getUsedJoinsSet(flatDesc);
Set<String> dimTableCache = new HashSet<String>();
- DataModelDesc dataModelDesc = intermediateTableDesc.getDataModel();
+ DataModelDesc dataModelDesc = flatDesc.getDataModel();
String factTableName = dataModelDesc.getFactTable();
String factTableAlias = tableAliasMap.get(factTableName);
sql.append("FROM " + factTableName + " as " + factTableAlias + " \n");
@@ -212,13 +211,25 @@ public class JoinedFlatTable {
}
}
- private static void appendDistributeStatement(IJoinedFlatTableDesc intermediateTableDesc, StringBuilder sql, Map<String, String> tableAliasMap) {
- if (!(intermediateTableDesc instanceof CubeJoinedFlatTableDesc)) {
- return;//TODO: for now only cube segments support distribution
+ private static List<JoinDesc> getUsedJoinsSet(IJoinedFlatTableDesc flatDesc) {
+ Set<String> usedTableIdentities = Sets.newHashSet();
+ for (TblColRef col : flatDesc.getAllColumns()) {
+ usedTableIdentities.add(col.getTable());
+ }
+
+ List<JoinDesc> result = Lists.newArrayList();
+ for (LookupDesc lookup : flatDesc.getDataModel().getLookups()) {
+ String table = lookup.getTableDesc().getIdentity();
+ if (usedTableIdentities.contains(table)) {
+ result.add(lookup.getJoin());
+ }
}
- CubeJoinedFlatTableDesc desc = (CubeJoinedFlatTableDesc) intermediateTableDesc;
+
+ return result;
+ }
- TblColRef distDcol = desc.getCubeDesc().getDistributedByColumn();
+ private static void appendDistributeStatement(IJoinedFlatTableDesc flatDesc, StringBuilder sql, Map<String, String> tableAliasMap) {
+ TblColRef distDcol = flatDesc.getDistributedBy();
if (distDcol != null) {
String tblAlias = tableAliasMap.get(distDcol.getTable());
@@ -228,30 +239,22 @@ public class JoinedFlatTable {
}
}
- private static void appendWhereStatement(IJoinedFlatTableDesc intermediateTableDesc, StringBuilder sql, Map<String, String> tableAliasMap) {
- if (!(intermediateTableDesc instanceof CubeJoinedFlatTableDesc)) {
- return;//TODO: for now only cube segments support filter and partition
- }
- CubeJoinedFlatTableDesc desc = (CubeJoinedFlatTableDesc) intermediateTableDesc;
-
+ private static void appendWhereStatement(IJoinedFlatTableDesc flatDesc, StringBuilder sql, Map<String, String> tableAliasMap) {
boolean hasCondition = false;
StringBuilder whereBuilder = new StringBuilder();
whereBuilder.append("WHERE");
- CubeDesc cubeDesc = desc.getCubeDesc();
- DataModelDesc model = cubeDesc.getModel();
+ DataModelDesc model = flatDesc.getDataModel();
if (model.getFilterCondition() != null && model.getFilterCondition().equals("") == false) {
whereBuilder.append(" (").append(model.getFilterCondition()).append(") ");
hasCondition = true;
}
- CubeSegment cubeSegment = desc.getCubeSegment();
-
- if (null != cubeSegment) {
- PartitionDesc partDesc = model.getPartitionDesc();
- long dateStart = cubeSegment.getDateRangeStart();
- long dateEnd = cubeSegment.getDateRangeEnd();
+ PartitionDesc partDesc = model.getPartitionDesc();
+ if (partDesc != null && partDesc.getPartitionDateColumn() != null) {
+ long dateStart = flatDesc.getSourceOffsetStart();
+ long dateEnd = flatDesc.getSourceOffsetEnd();
if (!(dateStart == 0 && dateEnd == Long.MAX_VALUE)) {
whereBuilder.append(hasCondition ? " AND (" : " (");
http://git-wip-us.apache.org/repos/asf/kylin/blob/889c544a/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java
----------------------------------------------------------------------
diff --git a/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java b/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java
index 697d392..17a7178 100644
--- a/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java
+++ b/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java
@@ -51,7 +51,7 @@ public class JoinedFlatTableTest extends LocalFileMetadataTestCase {
this.createTestMetadata();
cube = CubeManager.getInstance(getTestConfig()).getCube("test_kylin_cube_with_slr_ready");
cubeSegment = cube.getSegments().get(0);
- intermediateTableDesc = new CubeJoinedFlatTableDesc(cube.getDescriptor(), cubeSegment);
+ intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeSegment);
}
@After
http://git-wip-us.apache.org/repos/asf/kylin/blob/889c544a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
index 5f39049..7f5edfe 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
@@ -270,6 +270,7 @@ public class DataModelDesc extends RootPersistentEntity {
if (dimTable == null) {
throw new IllegalStateException("Table " + lookup.getTable() + " does not exist for " + this);
}
+ lookup.setTableDesc(dimTable);
this.lookupTableDescs.add(dimTable);
JoinDesc join = lookup.getJoin();
http://git-wip-us.apache.org/repos/asf/kylin/blob/889c544a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
index 5088c9c..55ea71f 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
@@ -26,9 +26,13 @@ public interface IJoinedFlatTableDesc {
String getTableName();
- List<IntermediateColumnDesc> getColumnList();
-
DataModelDesc getDataModel();
+
+ List<TblColRef> getAllColumns();
- List<JoinDesc> getUsedJoinsSet();
+ long getSourceOffsetStart();
+
+ long getSourceOffsetEnd();
+
+ TblColRef getDistributedBy();
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/889c544a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IntermediateColumnDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IntermediateColumnDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IntermediateColumnDesc.java
deleted file mode 100644
index 10a3795..0000000
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IntermediateColumnDesc.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.metadata.model;
-
-/**
- */
-public class IntermediateColumnDesc {
- private String id;
- private TblColRef colRef;
-
- public IntermediateColumnDesc(String id, TblColRef colRef) {
- this.id = id;
- this.colRef = colRef;
- }
-
- public String getId() {
- return id;
- }
-
- public TblColRef getColRef() {
- return this.colRef;
- }
-
- public String getColumnName() {
- return colRef.getName();
- }
-
- public String getDataType() {
- return colRef.getDatatype();
- }
-
- public String getTableName() {
- return colRef.getTable();
- }
-
- public boolean isSameAs(String tableName, String columnName) {
- return colRef.isSameAs(tableName, columnName);
- }
-
- public String getCanonicalName() {
- return colRef.getCanonicalName();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/889c544a/core-metadata/src/main/java/org/apache/kylin/metadata/model/LookupDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/LookupDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/LookupDesc.java
index 057a1e4..c85612a 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/LookupDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/LookupDesc.java
@@ -30,6 +30,8 @@ public class LookupDesc {
@JsonProperty("join")
private JoinDesc join;
+
+ private TableDesc tableDesc;
public String getTable() {
return table;
@@ -47,4 +49,14 @@ public class LookupDesc {
this.join = join;
}
+ public TableDesc getTableDesc() {
+ return tableDesc;
+ }
+
+ void setTableDesc(TableDesc tableDesc) {
+ this.tableDesc = tableDesc;
+ }
+
+
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/889c544a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
index b504dbf..289cd48 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
@@ -34,12 +34,10 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
private static final Logger logger = LoggerFactory.getLogger(BatchMergeJobBuilder2.class);
private final IMROutput2.IMRBatchMergeOutputSide2 outputSide;
- private final IMRInput.IMRBatchMergeInputSide inputSide;
public BatchMergeJobBuilder2(CubeSegment mergeSegment, String submitter) {
super(mergeSegment, submitter);
this.outputSide = MRUtil.getBatchMergeOutputSide2(seg);
- this.inputSide = MRUtil.getBatchMergeInputSide(seg);
}
public CubingJob build() {
@@ -57,7 +55,6 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
}
// Phase 1: Merge Dictionary
- inputSide.addStepPhase1_MergeDictionary(result);
result.addTask(createMergeDictionaryStep(mergingSegmentIds));
result.addTask(createMergeStatisticsStep(cubeSegment, mergingSegmentIds, getStatisticsPath(jobId)));
outputSide.addStepPhase1_MergeDictionary(result);
http://git-wip-us.apache.org/repos/asf/kylin/blob/889c544a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
index 6e01877..582052f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
@@ -19,8 +19,8 @@
package org.apache.kylin.engine.mr;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.TableDesc;
/**
@@ -29,14 +29,11 @@ import org.apache.kylin.metadata.model.TableDesc;
public interface IMRInput {
/** Return a helper to participate in batch cubing job flow. */
- public IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg);
+ public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc);
/** Return an InputFormat that reads from specified table. */
public IMRTableInputFormat getTableInputFormat(TableDesc table);
- /** Return a helper to participate in batch cubing merge job flow. */
- public IMRBatchMergeInputSide getBatchMergeInputSide(CubeSegment seg);
-
/**
* Utility that configures mapper to read from a table.
*/
@@ -70,10 +67,4 @@ public interface IMRInput {
public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
}
- public interface IMRBatchMergeInputSide {
-
- /** Add step that executes before merge dictionary and before merge cube. */
- public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow);
-
- }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/889c544a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
index 14fdd93..877358b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
@@ -37,7 +37,7 @@ import org.apache.kylin.storage.StorageFactory;
public class MRUtil {
public static IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
- return SourceFactory.createEngineAdapter(seg, IMRInput.class).getBatchCubingInputSide(seg);
+ return SourceFactory.createEngineAdapter(seg, IMRInput.class).getBatchCubingInputSide(seg.getJoinedFlatTableDesc());
}
public static IMRTableInputFormat getTableInputFormat(String tableName) {
@@ -68,10 +68,6 @@ public class MRUtil {
return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchMergeOutputSide(seg);
}
- public static IMRInput.IMRBatchMergeInputSide getBatchMergeInputSide(CubeSegment seg) {
- return SourceFactory.createEngineAdapter(seg, IMRInput.class).getBatchMergeInputSide(seg);
- }
-
// use this method instead of ToolRunner.run() because ToolRunner.run() is not thread-safe
// Refer to: http://stackoverflow.com/questions/22462665/is-hadoops-toorunner-thread-safe
public static int runMRJob(Tool tool, String[] args) throws Exception {
http://git-wip-us.apache.org/repos/asf/kylin/blob/889c544a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
index 85e3cc7..4786505 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
@@ -102,7 +102,7 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL
long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
- intermediateTableDesc = new CubeJoinedFlatTableDesc(cube.getDescriptor(), cubeSegment);
+ intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeSegment);
bytesSplitter = new BytesSplitter(200, 16384);
rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid);
http://git-wip-us.apache.org/repos/asf/kylin/blob/889c544a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
index a91d333..20259cb 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
@@ -72,7 +72,7 @@ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
- intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
+ intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc);
dictionaryColumnIndex = new int[factDictCols.size()];
for (int i = 0; i < factDictCols.size(); i++) {
TblColRef colRef = factDictCols.get(i);
http://git-wip-us.apache.org/repos/asf/kylin/blob/889c544a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
index 69629b2..3ccbcc8 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
@@ -177,7 +177,7 @@ public class SparkCubing extends AbstractApplication {
final String[] columns = intermediateTable.columns();
final CubeDesc cubeDesc = cubeInstance.getDescriptor();
final HashMap<Integer, TblColRef> tblColRefMap = Maps.newHashMap();
- final CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
+ final CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cubeDesc);
final List<TblColRef> baseCuboidColumn = Cuboid.findById(cubeDesc, Cuboid.getBaseCuboidId(cubeDesc)).getColumns();
final long start = System.currentTimeMillis();
final RowKeyDesc rowKey = cubeDesc.getRowkey();
@@ -248,7 +248,7 @@ public class SparkCubing extends AbstractApplication {
zeroValue.put(id, new HyperLogLogPlusCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision()));
}
- CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
+ CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cubeDesc);
final int[] rowKeyColumnIndexes = flatTableDesc.getRowKeyColumnIndexes();
final int nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;
http://git-wip-us.apache.org/repos/asf/kylin/blob/889c544a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
index 169daa4..87b222e 100644
--- a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
@@ -149,8 +149,8 @@ public class ITInMemCubeBuilderTest extends LocalFileMetadataTestCase {
}
static void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<List<String>> queue, int count, long randSeed) throws IOException, InterruptedException {
- CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cube.getDescriptor(), null);
- int nColumns = flatTableDesc.getColumnList().size();
+ CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cube.getDescriptor());
+ int nColumns = flatTableDesc.getAllColumns().size();
@SuppressWarnings("unchecked")
Set<String>[] distinctSets = new Set[nColumns];
@@ -190,8 +190,8 @@ public class ITInMemCubeBuilderTest extends LocalFileMetadataTestCase {
static Map<TblColRef, Dictionary<String>> getDictionaryMap(CubeInstance cube, String flatTable) throws IOException {
Map<TblColRef, Dictionary<String>> result = Maps.newHashMap();
CubeDesc desc = cube.getDescriptor();
- CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(desc, null);
- int nColumns = flatTableDesc.getColumnList().size();
+ CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(desc);
+ int nColumns = flatTableDesc.getAllColumns().size();
List<TblColRef> columns = Cuboid.getBaseCuboid(desc).getColumns();
for (int c = 0; c < columns.size(); c++) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/889c544a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index 7932211..4e56f74 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -134,9 +134,8 @@ public class CubeController extends BasicController {
@ResponseBody
public GeneralResponse getSql(@PathVariable String cubeName, @PathVariable String segmentName) {
CubeInstance cube = cubeService.getCubeManager().getCube(cubeName);
- CubeDesc cubeDesc = cube.getDescriptor();
CubeSegment cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.READY);
- CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, cubeSegment);
+ CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cubeSegment);
String sql = JoinedFlatTable.generateSelectDataStatement(flatTableDesc);
GeneralResponse repsonse = new GeneralResponse();
http://git-wip-us.apache.org/repos/asf/kylin/blob/889c544a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 9ec0f02..e3d7879 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -28,9 +28,6 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.engine.mr.IMRInput;
import org.apache.kylin.engine.mr.JobBuilderSupport;
@@ -56,8 +53,8 @@ import com.google.common.collect.Sets;
public class HiveMRInput implements IMRInput {
@Override
- public IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
- return new BatchCubingInputSide(seg);
+ public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+ return new BatchCubingInputSide(flatDesc);
}
@Override
@@ -65,16 +62,6 @@ public class HiveMRInput implements IMRInput {
return new HiveTableInputFormat(table.getIdentity());
}
- @Override
- public IMRBatchMergeInputSide getBatchMergeInputSide(CubeSegment seg) {
- return new IMRBatchMergeInputSide() {
- @Override
- public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
- // doing nothing
- }
- };
- }
-
public static class HiveTableInputFormat implements IMRTableInputFormat {
final String dbName;
final String tableName;
@@ -111,14 +98,12 @@ public class HiveMRInput implements IMRInput {
public static class BatchCubingInputSide implements IMRBatchCubingInputSide {
final JobEngineConfig conf;
- final CubeSegment seg;
- final IJoinedFlatTableDesc flatHiveTableDesc;
+ final IJoinedFlatTableDesc flatDesc;
String hiveViewIntermediateTables = "";
- public BatchCubingInputSide(CubeSegment seg) {
+ public BatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
this.conf = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
- this.seg = seg;
- this.flatHiveTableDesc = seg.getJoinedFlatTableDesc();
+ this.flatDesc = flatDesc;
}
@Override
@@ -127,8 +112,8 @@ public class HiveMRInput implements IMRInput {
final String rowCountOutputDir = JobBuilderSupport.getJobWorkingDir(conf, jobFlow.getId()) + "/row_count";
- jobFlow.addTask(createCountHiveTableStep(conf, flatHiveTableDesc, jobFlow.getId(), rowCountOutputDir));
- jobFlow.addTask(createFlatHiveTableStep(conf, flatHiveTableDesc, jobFlow.getId(), cubeName, rowCountOutputDir));
+ jobFlow.addTask(createCountHiveTableStep(conf, flatDesc, jobFlow.getId(), rowCountOutputDir));
+ jobFlow.addTask(createFlatHiveTableStep(conf, flatDesc, jobFlow.getId(), cubeName, rowCountOutputDir));
AbstractExecutable task = createLookupHiveViewMaterializationStep(jobFlow.getId());
if (task != null) {
jobFlow.addTask(task);
@@ -155,13 +140,10 @@ public class HiveMRInput implements IMRInput {
HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- CubeManager cubeMgr = CubeManager.getInstance(kylinConfig);
- String cubeName = seg.getRealization().getName();
- CubeDesc cubeDesc = cubeMgr.getCube(cubeName).getDescriptor();
MetadataManager metadataManager = MetadataManager.getInstance(kylinConfig);
final Set<TableDesc> lookupViewsTables = Sets.newHashSet();
- for (LookupDesc lookupDesc : cubeDesc.getModel().getLookups()) {
+ for (LookupDesc lookupDesc : flatDesc.getDataModel().getLookups()) {
TableDesc tableDesc = metadataManager.getTableDesc(lookupDesc.getTable());
if (TableDesc.TABLE_TYPE_VIRTUAL_VIEW.equalsIgnoreCase(tableDesc.getTableType())) {
lookupViewsTables.add(tableDesc);
@@ -216,7 +198,7 @@ public class HiveMRInput implements IMRInput {
GarbageCollectionStep step = new GarbageCollectionStep();
step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
step.setIntermediateTableIdentity(getIntermediateTableIdentity());
- step.setExternalDataPath(JoinedFlatTable.getTableDir(flatHiveTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobFlow.getId())));
+ step.setExternalDataPath(JoinedFlatTable.getTableDir(flatDesc, JobBuilderSupport.getJobWorkingDir(conf, jobFlow.getId())));
step.setHiveViewIntermediateTableIdentities(hiveViewIntermediateTables);
jobFlow.addTask(step);
}
@@ -227,7 +209,7 @@ public class HiveMRInput implements IMRInput {
}
private String getIntermediateTableIdentity() {
- return conf.getConfig().getHiveDatabaseForIntermediateTable() + "." + flatHiveTableDesc.getTableName();
+ return conf.getConfig().getHiveDatabaseForIntermediateTable() + "." + flatDesc.getTableName();
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/889c544a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
index fe3fe0a..e055d9e 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
@@ -40,8 +40,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import javax.annotation.Nullable;
-
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.StreamingBatch;
@@ -52,7 +50,6 @@ import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
import org.apache.kylin.engine.streaming.IStreamingInput;
import org.apache.kylin.engine.streaming.StreamingConfig;
import org.apache.kylin.engine.streaming.StreamingManager;
-import org.apache.kylin.metadata.model.IntermediateColumnDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.realization.RealizationType;
import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
@@ -62,7 +59,6 @@ import org.apache.kylin.source.kafka.util.KafkaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Function;
import com.google.common.collect.Lists;
import kafka.cluster.Broker;
@@ -94,13 +90,7 @@ public class KafkaStreamingInput implements IStreamingInput {
try {
final KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(kylinConfig);
final KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(streaming);
- List<TblColRef> columns = Lists.transform(new CubeJoinedFlatTableDesc(cube.getDescriptor(), null).getColumnList(), new Function<IntermediateColumnDesc, TblColRef>() {
- @Nullable
- @Override
- public TblColRef apply(IntermediateColumnDesc input) {
- return input.getColRef();
- }
- });
+ List<TblColRef> columns = new CubeJoinedFlatTableDesc(cube.getDescriptor()).getAllColumns();
final StreamingParser streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getParserProperties(), columns);
final ExecutorService executorService = Executors.newCachedThreadPool();