You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/09/03 12:45:56 UTC
[02/50] [abbrv] kylin git commit: KYLIN-1966 further refactor,
decouple flat table and cube
KYLIN-1966 further refactor, decouple flat table and cube
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/29ba46be
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/29ba46be
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/29ba46be
Branch: refs/heads/1.5.x-CDH5.7
Commit: 29ba46beee3c9330c54a23265fdc183333f750d8
Parents: 1b34c38
Author: Yang Li <li...@apache.org>
Authored: Mon Aug 22 08:37:12 2016 +0800
Committer: Yang Li <li...@apache.org>
Committed: Mon Aug 22 08:37:12 2016 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/cube/CubeSegment.java | 8 +-
.../inmemcubing/AbstractInMemCubeBuilder.java | 7 +-
.../cube/inmemcubing/DoggedCubeBuilder.java | 7 +-
.../cube/inmemcubing/InMemCubeBuilder.java | 7 +-
.../InMemCubeBuilderInputConverter.java | 15 +-
.../org/apache/kylin/cube/model/CubeDesc.java | 3 +-
.../cube/model/CubeJoinedFlatTableDesc.java | 42 +-----
.../cube/model/CubeJoinedFlatTableEnrich.java | 140 +++++++++++++++++++
.../org/apache/kylin/cube/util/CubingUtils.java | 7 +-
.../org/apache/kylin/engine/EngineFactory.java | 12 ++
.../apache/kylin/engine/IBatchCubingEngine.java | 8 ++
.../apache/kylin/job/JoinedFlatTableTest.java | 13 +-
.../metadata/model/IJoinedFlatTableDesc.java | 2 +
.../kylin/engine/mr/MRBatchCubingEngine.java | 13 ++
.../kylin/engine/mr/MRBatchCubingEngine2.java | 13 ++
.../java/org/apache/kylin/engine/mr/MRUtil.java | 5 +-
.../engine/mr/steps/BaseCuboidMapperBase.java | 7 +-
.../mr/steps/FactDistinctColumnsMapperBase.java | 7 +-
.../engine/mr/steps/InMemCuboidMapper.java | 5 +-
.../engine/spark/SparkBatchCubingEngine.java | 12 ++
.../apache/kylin/engine/spark/SparkCubing.java | 29 ++--
.../engine/spark/SparkCubingJobBuilder.java | 5 +-
engine-streaming/pom.xml | 4 +
.../streaming/cube/StreamingCubeBuilder.java | 9 +-
.../ITDoggedCubeBuilderStressTest.java | 5 +-
.../inmemcubing/ITDoggedCubeBuilderTest.java | 7 +-
.../inmemcubing/ITInMemCubeBuilderTest.java | 19 +--
.../kylin/rest/controller/CubeController.java | 5 +-
.../kylin/source/kafka/KafkaStreamingInput.java | 4 +-
29 files changed, 310 insertions(+), 110 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index 0797ab3..aaa88f1 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -35,18 +35,16 @@ import org.apache.kylin.common.util.ShardingHash;
import org.apache.kylin.cube.kv.CubeDimEncMap;
import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.IBuildable;
-import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.realization.IRealization;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
import com.fasterxml.jackson.annotation.JsonBackReference;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -530,10 +528,6 @@ public class CubeSegment implements Comparable<CubeSegment>, IBuildable {
return cubeInstance;
}
- public IJoinedFlatTableDesc getJoinedFlatTableDesc() {
- return new CubeJoinedFlatTableDesc(this);
- }
-
public String getIndexPath() {
return indexPath;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
index d417d11..0bfaab3 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
@@ -28,6 +28,7 @@ import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.GTScanRequestBuilder;
import org.apache.kylin.gridtable.GridTable;
import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,18 +40,22 @@ abstract public class AbstractInMemCubeBuilder {
private static Logger logger = LoggerFactory.getLogger(AbstractInMemCubeBuilder.class);
+ final protected IJoinedFlatTableDesc flatDesc;
final protected CubeDesc cubeDesc;
final protected Map<TblColRef, Dictionary<String>> dictionaryMap;
protected int taskThreadCount = 4;
protected int reserveMemoryMB = 100;
- public AbstractInMemCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
+ public AbstractInMemCubeBuilder(CubeDesc cubeDesc, IJoinedFlatTableDesc flatDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
+ if (flatDesc == null)
+ throw new NullPointerException();
if (cubeDesc == null)
throw new NullPointerException();
if (dictionaryMap == null)
throw new IllegalArgumentException("dictionary cannot be null");
+ this.flatDesc = flatDesc;
this.cubeDesc = cubeDesc;
this.dictionaryMap = dictionaryMap;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
index 15f2241..69f1f82 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
@@ -38,6 +38,7 @@ import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.gridtable.GTScanRequestBuilder;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.measure.MeasureAggregators;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,8 +55,8 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
private int splitRowThreshold = Integer.MAX_VALUE;
private int unitRows = 1000;
- public DoggedCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
- super(cubeDesc, dictionaryMap);
+ public DoggedCubeBuilder(CubeDesc cubeDesc, IJoinedFlatTableDesc flatDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
+ super(cubeDesc, flatDesc, dictionaryMap);
// check memory more often if a single row is big
if (cubeDesc.hasMemoryHungryMeasures())
@@ -270,7 +271,7 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
RuntimeException exception;
public SplitThread() {
- this.builder = new InMemCubeBuilder(cubeDesc, dictionaryMap);
+ this.builder = new InMemCubeBuilder(cubeDesc, flatDesc, dictionaryMap);
this.builder.setConcurrentThreads(taskThreadCount);
this.builder.setReserveMemoryMB(reserveMemoryMB);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
index 36d1296..e4908b8 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -48,6 +48,7 @@ import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.measure.topn.Counter;
import org.apache.kylin.measure.topn.TopNCounter;
import org.apache.kylin.metadata.datatype.DoubleMutable;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
@@ -86,8 +87,8 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
private Object[] totalSumForSanityCheck;
private ICuboidCollector resultCollector;
- public InMemCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
- super(cubeDesc, dictionaryMap);
+ public InMemCubeBuilder(CubeDesc cubeDesc, IJoinedFlatTableDesc flatDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
+ super(cubeDesc, flatDesc, dictionaryMap);
this.cuboidScheduler = new CuboidScheduler(cubeDesc);
this.baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
this.totalCuboidCount = cuboidScheduler.getCuboidCount();
@@ -514,7 +515,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
this.info = info;
this.input = input;
this.record = new GTRecord(info);
- this.inMemCubeBuilderInputConverter = new InMemCubeBuilderInputConverter(cubeDesc, dictionaryMap, info);
+ this.inMemCubeBuilderInputConverter = new InMemCubeBuilderInputConverter(cubeDesc, flatDesc, dictionaryMap, info);
}
@Override
http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
index 607f6bb..ab44f63 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
@@ -23,11 +23,12 @@ import java.util.Map;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.measure.MeasureIngester;
import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.ParameterDesc;
import org.apache.kylin.metadata.model.TblColRef;
@@ -40,7 +41,7 @@ public class InMemCubeBuilderInputConverter {
public static final byte[] HIVE_NULL = Bytes.toBytes("\\N");
- private final CubeJoinedFlatTableDesc intermediateTableDesc;
+ private final CubeJoinedFlatTableEnrich flatDesc;
private final MeasureDesc[] measureDescs;
private final MeasureIngester<?>[] measureIngesters;
private final int measureCount;
@@ -48,9 +49,9 @@ public class InMemCubeBuilderInputConverter {
private final GTInfo gtInfo;
protected List<byte[]> nullBytes;
- public InMemCubeBuilderInputConverter(CubeDesc cubeDesc, Map<TblColRef, Dictionary<String>> dictionaryMap, GTInfo gtInfo) {
+ public InMemCubeBuilderInputConverter(CubeDesc cubeDesc, IJoinedFlatTableDesc flatDesc, Map<TblColRef, Dictionary<String>> dictionaryMap, GTInfo gtInfo) {
this.gtInfo = gtInfo;
- this.intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc);
+ this.flatDesc = new CubeJoinedFlatTableEnrich(flatDesc, cubeDesc);
this.measureCount = cubeDesc.getMeasures().size();
this.measureDescs = cubeDesc.getMeasures().toArray(new MeasureDesc[measureCount]);
this.measureIngesters = MeasureIngester.create(cubeDesc.getMeasures());
@@ -74,11 +75,11 @@ public class InMemCubeBuilderInputConverter {
}
private Object[] buildKey(List<String> row) {
- int keySize = intermediateTableDesc.getRowKeyColumnIndexes().length;
+ int keySize = flatDesc.getRowKeyColumnIndexes().length;
Object[] key = new Object[keySize];
for (int i = 0; i < keySize; i++) {
- key[i] = row.get(intermediateTableDesc.getRowKeyColumnIndexes()[i]);
+ key[i] = row.get(flatDesc.getRowKeyColumnIndexes()[i]);
if (key[i] != null && isNull(Bytes.toBytes((String) key[i]))) {
key[i] = null;
}
@@ -98,7 +99,7 @@ public class InMemCubeBuilderInputConverter {
private Object buildValueOf(int idxOfMeasure, List<String> row) {
MeasureDesc measure = measureDescs[idxOfMeasure];
FunctionDesc function = measure.getFunction();
- int[] colIdxOnFlatTable = intermediateTableDesc.getMeasureColumnIndexes()[idxOfMeasure];
+ int[] colIdxOnFlatTable = flatDesc.getMeasureColumnIndexes()[idxOfMeasure];
int paramCount = function.getParameterCount();
String[] inputToMeasure = new String[paramCount];
http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index 2c83972..30290cd 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -78,7 +78,7 @@ import com.google.common.collect.Sets;
*/
@SuppressWarnings("serial")
@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class CubeDesc extends RootPersistentEntity {
+public class CubeDesc extends RootPersistentEntity implements IEngineAware {
private static final Logger logger = LoggerFactory.getLogger(CubeDesc.class);
public static class CannotFilterExtendedColumnException extends RuntimeException {
@@ -1004,6 +1004,7 @@ public class CubeDesc extends RootPersistentEntity {
this.storageType = storageType;
}
+ @Override
public int getEngineType() {
return engineType;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
index 1f9d772..6aeb617 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
@@ -23,7 +23,6 @@ import java.util.Map;
import org.apache.kylin.common.util.BytesSplitter;
import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.metadata.model.DataModelDesc;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
@@ -42,8 +41,6 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
private final CubeSegment cubeSegment;
private int columnCount;
- private int[] rowKeyColumnIndexes; // the column index on flat table
- private int[][] measureColumnIndexes; // [i] is the i.th measure related column index on flat table
private List<TblColRef> columnList = Lists.newArrayList();
private Map<TblColRef, Integer> columnIndexMap;
@@ -65,9 +62,6 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
// check what columns from hive tables are required, and index them
private void parseCubeDesc() {
- long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
- Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
-
if (cubeSegment == null) {
this.tableName = "kylin_intermediate_" + cubeDesc.getName();
} else {
@@ -81,34 +75,15 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
columnIndex++;
}
- // build index for rowkey columns
- List<TblColRef> cuboidColumns = baseCuboid.getColumns();
- int rowkeyColCount = cubeDesc.getRowkey().getRowKeyColumns().length;
- rowKeyColumnIndexes = new int[rowkeyColCount];
- for (int i = 0; i < rowkeyColCount; i++) {
- TblColRef col = cuboidColumns.get(i);
- Integer dimIdx = columnIndexMap.get(col);
- if (dimIdx == null) {
- throw new RuntimeException("Can't find column " + col);
- }
- rowKeyColumnIndexes[i] = dimIdx;
- }
-
List<MeasureDesc> measures = cubeDesc.getMeasures();
int measureSize = measures.size();
- measureColumnIndexes = new int[measureSize][];
for (int i = 0; i < measureSize; i++) {
FunctionDesc func = measures.get(i).getFunction();
List<TblColRef> colRefs = func.getParameter().getColRefs();
- if (colRefs == null) {
- measureColumnIndexes[i] = null;
- } else {
- measureColumnIndexes[i] = new int[colRefs.size()];
+ if (colRefs != null) {
for (int j = 0; j < colRefs.size(); j++) {
TblColRef c = colRefs.get(j);
- measureColumnIndexes[i][j] = columnList.indexOf(c);
- if (measureColumnIndexes[i][j] < 0) {
- measureColumnIndexes[i][j] = columnIndex;
+ if (columnList.indexOf(c) < 0) {
columnIndexMap.put(c, columnIndex);
columnList.add(c);
columnIndex++;
@@ -148,18 +123,6 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
// TODO: check data types here
}
- public CubeDesc getCubeDesc() {
- return cubeDesc;
- }
-
- public int[] getRowKeyColumnIndexes() {
- return rowKeyColumnIndexes;
- }
-
- public int[][] getMeasureColumnIndexes() {
- return measureColumnIndexes;
- }
-
@Override
public String getTableName() {
return tableName;
@@ -175,6 +138,7 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
return cubeDesc.getModel();
}
+ @Override
public int getColumnIndex(TblColRef colRef) {
Integer index = columnIndexMap.get(colRef);
if (index == null)
http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
new file mode 100644
index 0000000..5212859
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cube.model;
+
+import java.util.List;
+
+import org.apache.kylin.common.util.BytesSplitter;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+/**
+ * Wraps an IJoinedFlatTableDesc and adds cube-specific indexes: the flat table positions of the base cuboid rowkey columns and of each measure's parameter columns.
+ */
+public class CubeJoinedFlatTableEnrich implements IJoinedFlatTableDesc {
+
+ private CubeDesc cubeDesc; // cube whose base cuboid rowkey and measures are indexed
+ private IJoinedFlatTableDesc flatDesc; // wrapped flat table; every IJoinedFlatTableDesc method delegates to it
+ private int[] rowKeyColumnIndexes; // [i] is the flat table column index of the i.th rowkey column of the base cuboid
+ private int[][] measureColumnIndexes; // [i][j] is the flat table column index of the j.th parameter column of the i.th measure; [i] is null when that measure's parameter has no column refs
+
+ public CubeJoinedFlatTableEnrich(IJoinedFlatTableDesc flatDesc, CubeDesc cubeDesc) {
+ // reference comparison (!=) is deliberate: it assumes both models come from the same metadata object cache
+ if (cubeDesc.getModel() != flatDesc.getDataModel())
+ throw new IllegalArgumentException(); // cube and flat table must be built on the same data model instance
+
+ this.cubeDesc = cubeDesc;
+ this.flatDesc = flatDesc;
+ parseCubeDesc(); // eagerly compute the rowkey and measure column indexes
+ }
+
+ // resolve the flat table index of every base cuboid rowkey column and every measure parameter column
+ private void parseCubeDesc() {
+ long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+ Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
+
+ // build index for rowkey columns: the first rowkeyColCount columns of the base cuboid, in order
+ List<TblColRef> cuboidColumns = baseCuboid.getColumns();
+ int rowkeyColCount = cubeDesc.getRowkey().getRowKeyColumns().length;
+ rowKeyColumnIndexes = new int[rowkeyColCount];
+ for (int i = 0; i < rowkeyColCount; i++) {
+ TblColRef col = cuboidColumns.get(i);
+ rowKeyColumnIndexes[i] = flatDesc.getColumnIndex(col); // NOTE(review): handling of a column absent from flatDesc is up to the wrapped impl — confirm it fails loudly
+ }
+
+ List<MeasureDesc> measures = cubeDesc.getMeasures();
+ int measureSize = measures.size();
+ measureColumnIndexes = new int[measureSize][]; // one entry per measure, in cubeDesc.getMeasures() order
+ for (int i = 0; i < measureSize; i++) {
+ FunctionDesc func = measures.get(i).getFunction();
+ List<TblColRef> colRefs = func.getParameter().getColRefs();
+ if (colRefs == null) { // parameter references no columns
+ measureColumnIndexes[i] = null;
+ } else {
+ measureColumnIndexes[i] = new int[colRefs.size()];
+ for (int j = 0; j < colRefs.size(); j++) {
+ TblColRef c = colRefs.get(j);
+ measureColumnIndexes[i][j] = flatDesc.getColumnIndex(c);
+ }
+ }
+ }
+ }
+
+ // sanity check: the split input record must have exactly one field per flat table column
+ public void sanityCheck(BytesSplitter bytesSplitter) {
+ int columnCount = flatDesc.getAllColumns().size();
+ if (columnCount != bytesSplitter.getBufferSize()) {
+ throw new IllegalArgumentException("Expect " + columnCount + " columns, but see " + bytesSplitter.getBufferSize() + " -- " + bytesSplitter);
+ }
+
+ // TODO: check data types here
+ }
+
+ public CubeDesc getCubeDesc() {
+ return cubeDesc;
+ }
+
+ public int[] getRowKeyColumnIndexes() { // exposes the internal array, not a copy
+ return rowKeyColumnIndexes;
+ }
+
+ public int[][] getMeasureColumnIndexes() { // exposes the internal array, not a copy
+ return measureColumnIndexes;
+ }
+
+ @Override // the IJoinedFlatTableDesc methods below all delegate to the wrapped flatDesc
+ public String getTableName() {
+ return flatDesc.getTableName();
+ }
+
+ @Override
+ public List<TblColRef> getAllColumns() {
+ return flatDesc.getAllColumns();
+ }
+
+ @Override
+ public DataModelDesc getDataModel() {
+ return flatDesc.getDataModel();
+ }
+
+ @Override
+ public int getColumnIndex(TblColRef colRef) {
+ return flatDesc.getColumnIndex(colRef);
+ }
+
+ @Override
+ public long getSourceOffsetStart() {
+ return flatDesc.getSourceOffsetStart();
+ }
+
+ @Override
+ public long getSourceOffsetEnd() {
+ return flatDesc.getSourceOffsetEnd();
+ }
+
+ @Override
+ public TblColRef getDistributedBy() {
+ return flatDesc.getDistributedBy();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java b/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java
index b7f79e1..aa4610f 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java
@@ -48,12 +48,13 @@ import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.cuboid.CuboidScheduler;
import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
import org.apache.kylin.dict.DictionaryGenerator;
import org.apache.kylin.dict.DictionaryInfo;
import org.apache.kylin.dict.DictionaryManager;
import org.apache.kylin.dict.IterableDictionaryValueEnumerator;
import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.source.ReadableTable;
import org.slf4j.Logger;
@@ -74,8 +75,8 @@ public class CubingUtils {
private static Logger logger = LoggerFactory.getLogger(CubingUtils.class);
- public static Map<Long, HyperLogLogPlusCounter> sampling(CubeDesc cubeDesc, Iterable<List<String>> streams) {
- CubeJoinedFlatTableDesc flatDesc = new CubeJoinedFlatTableDesc(cubeDesc);
+ public static Map<Long, HyperLogLogPlusCounter> sampling(CubeDesc cubeDesc, IJoinedFlatTableDesc flatDescIn, Iterable<List<String>> streams) {
+ final CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(flatDescIn, cubeDesc);
final int rowkeyLength = cubeDesc.getRowkey().getRowKeyColumns().length;
final List<Long> allCuboidIds = new CuboidScheduler(cubeDesc).getAllCuboidIds();
final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
index 79d2f81..7044a3e 100644
--- a/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
+++ b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
@@ -23,8 +23,10 @@ import java.util.Map;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.ImplementationSwitch;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.metadata.model.IEngineAware;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
public class EngineFactory {
@@ -46,6 +48,16 @@ public class EngineFactory {
return streamingEngines.get(aware.getEngineType());
}
+ /** Mark deprecated to indicate for test purpose only */
+ @Deprecated
+ public static IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeDesc cubeDesc) {
+ return batchEngine(cubeDesc).getJoinedFlatTableDesc(cubeDesc);
+ }
+
+ public static IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeSegment newSegment) {
+ return batchEngine(newSegment).getJoinedFlatTableDesc(newSegment);
+ }
+
/** Build a new cube segment, typically its time range appends to the end of current cube. */
public static DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
return batchEngine(newSegment).createBatchCubingJob(newSegment, submitter);
http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java b/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
index 556893c..754dbde 100644
--- a/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
+++ b/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
@@ -19,9 +19,17 @@
package org.apache.kylin.engine;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
public interface IBatchCubingEngine {
+
+ /** Mark deprecated to indicate for test purpose only */
+ @Deprecated
+ public IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeDesc cubeDesc);
+
+ public IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeSegment newSegment);
/** Build a new cube segment, typically its time range appends to the end of current cube. */
public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter);
http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java
----------------------------------------------------------------------
diff --git a/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java b/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java
index 17a7178..0faf22a 100644
--- a/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java
+++ b/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java
@@ -27,8 +27,9 @@ import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
@@ -42,7 +43,7 @@ import org.junit.Test;
public class JoinedFlatTableTest extends LocalFileMetadataTestCase {
CubeInstance cube = null;
- CubeJoinedFlatTableDesc intermediateTableDesc = null;
+ IJoinedFlatTableDesc flatTableDesc = null;
String fakeJobUUID = "abc-def";
CubeSegment cubeSegment = null;
@@ -51,7 +52,7 @@ public class JoinedFlatTableTest extends LocalFileMetadataTestCase {
this.createTestMetadata();
cube = CubeManager.getInstance(getTestConfig()).getCube("test_kylin_cube_with_slr_ready");
cubeSegment = cube.getSegments().get(0);
- intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeSegment);
+ flatTableDesc = EngineFactory.getJoinedFlatTableDesc(cubeSegment);
}
@After
@@ -61,7 +62,7 @@ public class JoinedFlatTableTest extends LocalFileMetadataTestCase {
@Test
public void testGenCreateTableDDL() {
- String ddl = JoinedFlatTable.generateCreateTableStatement(intermediateTableDesc, "/tmp");
+ String ddl = JoinedFlatTable.generateCreateTableStatement(flatTableDesc, "/tmp");
System.out.println(ddl);
System.out.println("The length for the ddl is " + ddl.length());
@@ -69,14 +70,14 @@ public class JoinedFlatTableTest extends LocalFileMetadataTestCase {
@Test
public void testGenDropTableDDL() {
- String ddl = JoinedFlatTable.generateDropTableStatement(intermediateTableDesc);
+ String ddl = JoinedFlatTable.generateDropTableStatement(flatTableDesc);
System.out.println(ddl);
assertEquals(101, ddl.length());
}
@Test
public void testGenerateInsertSql() throws IOException {
- String sqls = JoinedFlatTable.generateInsertDataStatement(intermediateTableDesc, new JobEngineConfig(KylinConfig.getInstanceFromEnv()));
+ String sqls = JoinedFlatTable.generateInsertDataStatement(flatTableDesc, new JobEngineConfig(KylinConfig.getInstanceFromEnv()));
System.out.println(sqls);
int length = sqls.length();
http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
index 55ea71f..f3a4107 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
@@ -29,6 +29,8 @@ public interface IJoinedFlatTableDesc {
DataModelDesc getDataModel();
List<TblColRef> getAllColumns();
+
+ int getColumnIndex(TblColRef colRef);
long getSourceOffsetStart();
http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
index 5198db1..681c545 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
@@ -19,12 +19,25 @@
package org.apache.kylin.engine.mr;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
import org.apache.kylin.engine.IBatchCubingEngine;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
public class MRBatchCubingEngine implements IBatchCubingEngine {
@Override
+ public IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeDesc cubeDesc) {
+ return new CubeJoinedFlatTableDesc(cubeDesc);
+ }
+
+ @Override
+ public IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeSegment newSegment) {
+ return new CubeJoinedFlatTableDesc(newSegment);
+ }
+
+ @Override
public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
return new BatchCubingJobBuilder(newSegment, submitter).build();
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
index b3af7d7..d9fdcb9 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
@@ -19,12 +19,25 @@
package org.apache.kylin.engine.mr;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
import org.apache.kylin.engine.IBatchCubingEngine;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
public class MRBatchCubingEngine2 implements IBatchCubingEngine {
@Override
+ public IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeDesc cubeDesc) {
+ return new CubeJoinedFlatTableDesc(cubeDesc);
+ }
+
+ @Override
+ public IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeSegment newSegment) {
+ return new CubeJoinedFlatTableDesc(newSegment);
+ }
+
+ @Override
public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
return new BatchCubingJobBuilder2(newSegment, submitter).build();
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
index 877358b..2c3b77f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
import org.apache.kylin.engine.mr.IMROutput.IMRBatchCubingOutputSide;
@@ -30,6 +31,7 @@ import org.apache.kylin.engine.mr.IMROutput.IMRBatchMergeOutputSide;
import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
import org.apache.kylin.engine.mr.IMROutput2.IMRBatchMergeOutputSide2;
import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.source.SourceFactory;
import org.apache.kylin.storage.StorageFactory;
@@ -37,7 +39,8 @@ import org.apache.kylin.storage.StorageFactory;
public class MRUtil {
public static IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
- return SourceFactory.createEngineAdapter(seg, IMRInput.class).getBatchCubingInputSide(seg.getJoinedFlatTableDesc());
+ IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(seg);
+ return SourceFactory.createEngineAdapter(seg, IMRInput.class).getBatchCubingInputSide(flatDesc);
}
public static IMRTableInputFormat getTableInputFormat(String tableName) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
index 4786505..588b087 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
@@ -37,7 +37,8 @@ import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
+import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.engine.mr.KylinMapper;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
@@ -65,7 +66,7 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL
protected CubeDesc cubeDesc;
protected CubeSegment cubeSegment;
protected List<byte[]> nullBytes;
- protected CubeJoinedFlatTableDesc intermediateTableDesc;
+ protected CubeJoinedFlatTableEnrich intermediateTableDesc;
protected String intermediateTableRowDelimiter;
protected byte byteRowDelimiter;
protected int counter;
@@ -102,7 +103,7 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL
long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
- intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeSegment);
+ intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
bytesSplitter = new BytesSplitter(200, 16384);
rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid);
http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
index 20259cb..3fa966d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
@@ -30,7 +30,8 @@ import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
+import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
import org.apache.kylin.engine.mr.KylinMapper;
import org.apache.kylin.engine.mr.MRUtil;
@@ -54,7 +55,7 @@ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
protected Text outputValue = new Text();
protected int errorRecordCounter = 0;
- protected CubeJoinedFlatTableDesc intermediateTableDesc;
+ protected CubeJoinedFlatTableEnrich intermediateTableDesc;
protected int[] dictionaryColumnIndex;
@Override
@@ -72,7 +73,7 @@ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
- intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc);
+ intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSeg), cubeDesc);
dictionaryColumnIndex = new int[factDictCols.size()];
for (int i = 0; i < factDictCols.size(); i++) {
TblColRef colRef = factDictCols.get(i);
http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
index 24c37ce..1d90d01 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
@@ -39,12 +39,14 @@ import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder;
import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.engine.mr.ByteArrayWritable;
import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
import org.apache.kylin.engine.mr.KylinMapper;
import org.apache.kylin.engine.mr.MRUtil;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -79,6 +81,7 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr
String segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
cubeSegment = cube.getSegmentById(segmentID);
flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSegment).getFlatTableInputFormat();
+ IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cubeSegment);
Map<TblColRef, Dictionary<String>> dictionaryMap = Maps.newHashMap();
@@ -92,7 +95,7 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr
dictionaryMap.put(col, cubeSegment.getDictionary(col));
}
- DoggedCubeBuilder cubeBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
+ DoggedCubeBuilder cubeBuilder = new DoggedCubeBuilder(cube.getDescriptor(), flatDesc, dictionaryMap);
cubeBuilder.setReserveMemoryMB(calculateReserveMB(context.getConfiguration()));
ExecutorService executorService = Executors.newSingleThreadExecutor();
http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine.java
index 4eec233..08ed207 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine.java
@@ -18,8 +18,10 @@
package org.apache.kylin.engine.spark;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.IBatchCubingEngine;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
/**
*/
@@ -52,4 +54,14 @@ public class SparkBatchCubingEngine implements IBatchCubingEngine {
public Class<?> getStorageInterface() {
return null;
}
+
+ @Override
+ public IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeDesc cubeDesc) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeSegment newSegment) {
+ throw new UnsupportedOperationException();
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
index 3ccbcc8..5c2def2 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
@@ -69,12 +69,13 @@ import org.apache.kylin.cube.inmemcubing.AbstractInMemCubeBuilder;
import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder;
import org.apache.kylin.cube.kv.CubeDimEncMap;
import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
import org.apache.kylin.cube.model.DimensionDesc;
import org.apache.kylin.cube.model.RowKeyDesc;
import org.apache.kylin.cube.util.CubingUtils;
import org.apache.kylin.dict.DictionaryGenerator;
import org.apache.kylin.dict.IterableDictionaryValueEnumerator;
+import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.CubeStatsReader;
import org.apache.kylin.engine.spark.cube.BufferedCuboidWriter;
@@ -84,6 +85,7 @@ import org.apache.kylin.measure.BufferedMeasureEncoder;
import org.apache.kylin.measure.MeasureAggregators;
import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
@@ -175,9 +177,10 @@ public class SparkCubing extends AbstractApplication {
final CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
final String[] columns = intermediateTable.columns();
+ final CubeSegment seg = cubeInstance.getSegmentById(segmentId);
final CubeDesc cubeDesc = cubeInstance.getDescriptor();
final HashMap<Integer, TblColRef> tblColRefMap = Maps.newHashMap();
- final CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cubeDesc);
+ final CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(seg), cubeDesc);
final List<TblColRef> baseCuboidColumn = Cuboid.findById(cubeDesc, Cuboid.getBaseCuboidId(cubeDesc)).getColumns();
final long start = System.currentTimeMillis();
final RowKeyDesc rowKey = cubeDesc.getRowkey();
@@ -186,7 +189,7 @@ public class SparkCubing extends AbstractApplication {
if (!rowKey.isUseDictionary(col)) {
continue;
}
- final int rowKeyColumnIndex = flatTableDesc.getRowKeyColumnIndexes()[i];
+ final int rowKeyColumnIndex = flatDesc.getRowKeyColumnIndexes()[i];
tblColRefMap.put(rowKeyColumnIndex, col);
}
@@ -228,18 +231,19 @@ public class SparkCubing extends AbstractApplication {
})));
}
final long end = System.currentTimeMillis();
- CubingUtils.writeDictionary(cubeInstance.getSegmentById(segmentId), dictionaryMap, start, end);
+ CubingUtils.writeDictionary(seg, dictionaryMap, start, end);
try {
CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance);
- cubeBuilder.setToUpdateSegs(cubeInstance.getSegmentById(segmentId));
+ cubeBuilder.setToUpdateSegs(seg);
cubeManager.updateCube(cubeBuilder);
} catch (IOException e) {
throw new RuntimeException("Failed to deal with the request: " + e.getLocalizedMessage());
}
}
- private Map<Long, HyperLogLogPlusCounter> sampling(final JavaRDD<List<String>> rowJavaRDD, final String cubeName) throws Exception {
+ private Map<Long, HyperLogLogPlusCounter> sampling(final JavaRDD<List<String>> rowJavaRDD, final String cubeName, String segmentId) throws Exception {
CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
+ CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
CubeDesc cubeDesc = cubeInstance.getDescriptor();
CuboidScheduler cuboidScheduler = new CuboidScheduler(cubeDesc);
List<Long> allCuboidIds = cuboidScheduler.getAllCuboidIds();
@@ -248,9 +252,9 @@ public class SparkCubing extends AbstractApplication {
zeroValue.put(id, new HyperLogLogPlusCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision()));
}
- CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cubeDesc);
+ CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
- final int[] rowKeyColumnIndexes = flatTableDesc.getRowKeyColumnIndexes();
+ final int[] rowKeyColumnIndexes = flatDesc.getRowKeyColumnIndexes();
final int nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;
final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
final Map<Long, Integer[]> allCuboidsBitSet = Maps.newHashMapWithExpectedSize(allCuboidIds.size());
@@ -318,9 +322,7 @@ public class SparkCubing extends AbstractApplication {
return samplingResult;
}
- /*
- return hfile location
- */
+ /** return hfile location */
private String build(JavaRDD<List<String>> javaRDD, final String cubeName, final String segmentId, final byte[][] splitKeys) throws Exception {
CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
CubeDesc cubeDesc = cubeInstance.getDescriptor();
@@ -365,7 +367,8 @@ public class SparkCubing extends AbstractApplication {
LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue();
System.out.println("load properties finished");
- AbstractInMemCubeBuilder inMemCubeBuilder = new DoggedCubeBuilder(cubeInstance.getDescriptor(), dictionaryMap);
+ IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cubeSegment);
+ AbstractInMemCubeBuilder inMemCubeBuilder = new DoggedCubeBuilder(cubeInstance.getDescriptor(), flatDesc, dictionaryMap);
final SparkCuboidWriter sparkCuboidWriter = new BufferedCuboidWriter(new DefaultTupleConverter(cubeInstance.getSegmentById(segmentId), columnLengthMap));
Executors.newCachedThreadPool().submit(inMemCubeBuilder.buildAsRunnable(blockingQueue, sparkCuboidWriter));
try {
@@ -611,7 +614,7 @@ public class SparkCubing extends AbstractApplication {
}
});
- final Map<Long, HyperLogLogPlusCounter> samplingResult = sampling(rowJavaRDD, cubeName);
+ final Map<Long, HyperLogLogPlusCounter> samplingResult = sampling(rowJavaRDD, cubeName, segmentId);
final byte[][] splitKeys = createHTable(cubeName, segmentId, samplingResult);
final String hfile = build(rowJavaRDD, cubeName, segmentId, splitKeys);
http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java
index 05246f4..edd9460 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java
@@ -18,6 +18,7 @@
package org.apache.kylin.engine.spark;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.engine.mr.CubingJob;
import org.apache.kylin.engine.mr.IMRInput;
import org.apache.kylin.engine.mr.IMROutput2;
@@ -48,11 +49,11 @@ public class SparkCubingJobBuilder extends JobBuilderSupport {
}
public DefaultChainedExecutable build() {
- final CubingJob result = CubingJob.createBuildJob((CubeSegment) seg, submitter, config);
+ final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
final String jobId = result.getId();
inputSide.addStepPhase1_CreateFlatTable(result);
- final IJoinedFlatTableDesc joinedFlatTableDesc = seg.getJoinedFlatTableDesc();
+ final IJoinedFlatTableDesc joinedFlatTableDesc = EngineFactory.getJoinedFlatTableDesc(seg);
final String tableName = joinedFlatTableDesc.getTableName();
logger.info("intermediate table:" + tableName);
http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/engine-streaming/pom.xml
----------------------------------------------------------------------
diff --git a/engine-streaming/pom.xml b/engine-streaming/pom.xml
index e25a133..095871c 100644
--- a/engine-streaming/pom.xml
+++ b/engine-streaming/pom.xml
@@ -45,6 +45,10 @@
<groupId>org.apache.kylin</groupId>
<artifactId>kylin-core-storage</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-core-job</artifactId>
+ </dependency>
<!-- Env & Test -->
<dependency>
http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
index 465a983..180f0b8 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
@@ -55,9 +55,11 @@ import org.apache.kylin.cube.CubeUpdate;
import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
import org.apache.kylin.cube.inmemcubing.InMemCubeBuilder;
import org.apache.kylin.cube.util.CubingUtils;
+import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.engine.streaming.StreamingBatchBuilder;
import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
import org.apache.kylin.metadata.model.IBuildable;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
@@ -84,8 +86,10 @@ public class StreamingCubeBuilder implements StreamingBatchBuilder {
try {
CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
+ final IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cubeInstance.getDescriptor());
+
LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue<List<String>>();
- InMemCubeBuilder inMemCubeBuilder = new InMemCubeBuilder(cubeInstance.getDescriptor(), dictionaryMap);
+ InMemCubeBuilder inMemCubeBuilder = new InMemCubeBuilder(cubeInstance.getDescriptor(), flatDesc, dictionaryMap);
final Future<?> future = Executors.newCachedThreadPool().submit(inMemCubeBuilder.buildAsRunnable(blockingQueue, cuboidWriter));
processedRowCount = streamingBatch.getMessages().size();
for (StreamingMessage streamingMessage : streamingBatch.getMessages()) {
@@ -129,9 +133,10 @@ public class StreamingCubeBuilder implements StreamingBatchBuilder {
public Map<Long, HyperLogLogPlusCounter> sampling(StreamingBatch streamingBatch) {
final CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
+ final IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cubeInstance.getDescriptor());
long start = System.currentTimeMillis();
- final Map<Long, HyperLogLogPlusCounter> samplingResult = CubingUtils.sampling(cubeInstance.getDescriptor(), Lists.transform(streamingBatch.getMessages(), new Function<StreamingMessage, List<String>>() {
+ final Map<Long, HyperLogLogPlusCounter> samplingResult = CubingUtils.sampling(cubeInstance.getDescriptor(), flatDesc, Lists.transform(streamingBatch.getMessages(), new Function<StreamingMessage, List<String>>() {
@Nullable
@Override
public List<String> apply(@Nullable StreamingMessage input) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java
index ef69d57..4d23979 100644
--- a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java
@@ -31,7 +31,9 @@ import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -79,7 +81,8 @@ public class ITDoggedCubeBuilderStressTest extends LocalFileMetadataTestCase {
ExecutorService executorService = Executors.newSingleThreadExecutor();
long randSeed = System.currentTimeMillis();
- DoggedCubeBuilder doggedBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
+ IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cube.getDescriptor());
+ DoggedCubeBuilder doggedBuilder = new DoggedCubeBuilder(cube.getDescriptor(), flatDesc, dictionaryMap);
doggedBuilder.setConcurrentThreads(THREADS);
{
http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java
index 8923744..8827dff 100644
--- a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java
@@ -40,7 +40,9 @@ import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -87,7 +89,8 @@ public class ITDoggedCubeBuilderTest extends LocalFileMetadataTestCase {
ExecutorService executorService = Executors.newSingleThreadExecutor();
long randSeed = System.currentTimeMillis();
- DoggedCubeBuilder doggedBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
+ IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cube.getDescriptor());
+ DoggedCubeBuilder doggedBuilder = new DoggedCubeBuilder(cube.getDescriptor(), flatDesc, dictionaryMap);
doggedBuilder.setConcurrentThreads(THREADS);
doggedBuilder.setSplitRowThreshold(SPLIT_ROWS);
FileRecordWriter doggedResult = new FileRecordWriter();
@@ -99,7 +102,7 @@ public class ITDoggedCubeBuilderTest extends LocalFileMetadataTestCase {
doggedResult.close();
}
- InMemCubeBuilder inmemBuilder = new InMemCubeBuilder(cube.getDescriptor(), dictionaryMap);
+ InMemCubeBuilder inmemBuilder = new InMemCubeBuilder(cube.getDescriptor(), flatDesc, dictionaryMap);
inmemBuilder.setConcurrentThreads(THREADS);
FileRecordWriter inmemResult = new FileRecordWriter();
http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
index 87b222e..6612375 100644
--- a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
@@ -42,11 +42,13 @@ import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
import org.apache.kylin.dict.DictionaryGenerator;
import org.apache.kylin.dict.IterableDictionaryValueEnumerator;
+import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.junit.After;
@@ -109,7 +111,8 @@ public class ITInMemCubeBuilderTest extends LocalFileMetadataTestCase {
private void testBuildInner() throws Exception {
- InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(cube.getDescriptor(), dictionaryMap);
+ IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cube.getDescriptor());
+ InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(cube.getDescriptor(), flatDesc, dictionaryMap);
//DoggedCubeBuilder cubeBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
cubeBuilder.setConcurrentThreads(nThreads);
@@ -149,8 +152,8 @@ public class ITInMemCubeBuilderTest extends LocalFileMetadataTestCase {
}
static void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<List<String>> queue, int count, long randSeed) throws IOException, InterruptedException {
- CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cube.getDescriptor());
- int nColumns = flatTableDesc.getAllColumns().size();
+ IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cube.getDescriptor());
+ int nColumns = flatDesc.getAllColumns().size();
@SuppressWarnings("unchecked")
Set<String>[] distinctSets = new Set[nColumns];
@@ -190,15 +193,15 @@ public class ITInMemCubeBuilderTest extends LocalFileMetadataTestCase {
static Map<TblColRef, Dictionary<String>> getDictionaryMap(CubeInstance cube, String flatTable) throws IOException {
Map<TblColRef, Dictionary<String>> result = Maps.newHashMap();
CubeDesc desc = cube.getDescriptor();
- CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(desc);
- int nColumns = flatTableDesc.getAllColumns().size();
+ CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(desc), desc);
+ int nColumns = flatDesc.getAllColumns().size();
List<TblColRef> columns = Cuboid.getBaseCuboid(desc).getColumns();
for (int c = 0; c < columns.size(); c++) {
TblColRef col = columns.get(c);
if (desc.getRowkey().isUseDictionary(col)) {
logger.info("Building dictionary for " + col);
- List<byte[]> valueList = readValueList(flatTable, nColumns, flatTableDesc.getRowKeyColumnIndexes()[c]);
+ List<byte[]> valueList = readValueList(flatTable, nColumns, flatDesc.getRowKeyColumnIndexes()[c]);
Dictionary<String> dict = DictionaryGenerator.buildDictionary(col.getType(), new IterableDictionaryValueEnumerator(valueList));
result.put(col, dict);
}
@@ -211,7 +214,7 @@ public class ITInMemCubeBuilderTest extends LocalFileMetadataTestCase {
if (dictCols.isEmpty())
continue;
- int[] flatTableIdx = flatTableDesc.getMeasureColumnIndexes()[measureIdx];
+ int[] flatTableIdx = flatDesc.getMeasureColumnIndexes()[measureIdx];
List<TblColRef> paramCols = func.getParameter().getColRefs();
for (int i = 0; i < paramCols.size(); i++) {
TblColRef col = paramCols.get(i);
http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index 4e56f74..57b0965 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -34,11 +34,12 @@ import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.model.CubeBuildTypeEnum;
import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
import org.apache.kylin.dimension.DimensionEncodingFactory;
+import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.engine.streaming.StreamingConfig;
import org.apache.kylin.job.JobInstance;
import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.rest.exception.BadRequestException;
@@ -135,7 +136,7 @@ public class CubeController extends BasicController {
public GeneralResponse getSql(@PathVariable String cubeName, @PathVariable String segmentName) {
CubeInstance cube = cubeService.getCubeManager().getCube(cubeName);
CubeSegment cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.READY);
- CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cubeSegment);
+ IJoinedFlatTableDesc flatTableDesc = EngineFactory.getJoinedFlatTableDesc(cubeSegment);
String sql = JoinedFlatTable.generateSelectDataStatement(flatTableDesc);
GeneralResponse repsonse = new GeneralResponse();
http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
index e055d9e..c3bdb75 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
@@ -46,7 +46,7 @@ import org.apache.kylin.common.util.StreamingBatch;
import org.apache.kylin.common.util.StreamingMessage;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.engine.streaming.IStreamingInput;
import org.apache.kylin.engine.streaming.StreamingConfig;
import org.apache.kylin.engine.streaming.StreamingManager;
@@ -90,7 +90,7 @@ public class KafkaStreamingInput implements IStreamingInput {
try {
final KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(kylinConfig);
final KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(streaming);
- List<TblColRef> columns = new CubeJoinedFlatTableDesc(cube.getDescriptor()).getAllColumns();
+ List<TblColRef> columns = EngineFactory.getJoinedFlatTableDesc(cube.getDescriptor()).getAllColumns();
final StreamingParser streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getParserProperties(), columns);
final ExecutorService executorService = Executors.newCachedThreadPool();