You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/11/23 10:25:05 UTC
[2/2] incubator-kylin git commit: KYLIN-942 Code review
KYLIN-942 Code review
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/e62e0b3e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/e62e0b3e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/e62e0b3e
Branch: refs/heads/2.x-staging
Commit: e62e0b3ef19622c35b70840d652baf49ce97e1ac
Parents: b52c1bb
Author: Li, Yang <ya...@ebay.com>
Authored: Fri Nov 20 11:24:47 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Mon Nov 23 17:23:50 2015 +0800
----------------------------------------------------------------------
.../kylin/cube/CubeCapabilityChecker.java | 10 +-
.../cube/inmemcubing/InMemCubeBuilder.java | 15 +-
.../InMemCubeBuilderInputConverter.java | 14 +-
.../cube/inmemcubing/InMemCubeBuilderUtils.java | 10 +-
.../org/apache/kylin/cube/model/CubeDesc.java | 55 +------
.../model/validation/rule/FunctionRule.java | 2 +-
.../cube/inmemcubing/InMemCubeBuilderTest.java | 12 +-
.../apache/kylin/metadata/model/ColumnDesc.java | 1 +
.../kylin/metadata/model/DatabaseDesc.java | 1 +
.../kylin/metadata/model/FunctionDesc.java | 58 ++++++--
.../kylin/metadata/model/MeasureDesc.java | 12 ++
.../kylin/metadata/model/ParameterDesc.java | 41 ++---
.../kylin/storage/cache/StorageMockUtils.java | 2 -
.../engine/mr/steps/BaseCuboidMapperBase.java | 24 +--
.../mr/steps/MergeCuboidFromStorageMapper.java | 129 ++++++++--------
.../engine/mr/steps/MergeCuboidMapper.java | 144 +++++++++---------
.../localmeta/cube/test_kylin_cube_topn.json | 10 --
.../cube/test_kylin_cube_topn_left_join.json | 10 --
.../cube_desc/test_kylin_cube_topn_desc.json | 148 ------------------
.../test_kylin_cube_topn_left_join_desc.json | 149 -------------------
.../test_kylin_cube_without_slr_desc.json | 11 +-
...t_kylin_cube_without_slr_left_join_desc.json | 11 +-
.../kylin/invertedindex/model/IIDesc.java | 14 +-
.../invertedindex/model/IIKeyValueCodec.java | 1 -
.../kylin/query/relnode/OLAPAggregateRel.java | 2 +-
.../storage/hbase/cube/v1/CubeStorageQuery.java | 82 ++++------
.../cube/v1/SerializedHBaseTupleIterator.java | 22 ++-
.../storage/hbase/cube/v2/CubeStorageQuery.java | 90 +++++------
.../endpoint/EndpointTupleIterator.java | 1 -
.../storage/hbase/steps/RowValueDecoder.java | 12 ++
30 files changed, 369 insertions(+), 724 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
index 3bb246a..343bf11 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
@@ -132,15 +132,15 @@ public class CubeCapabilityChecker {
for (MeasureDesc measure : cubeDesc.getMeasures()) {
if (measure.getFunction().isTopN()) {
List<TblColRef> cols = measure.getFunction().getParameter().getColRefs();
- TblColRef displayCol = cols.get(cols.size() - 1);
- if (digest.groupbyColumns.contains(displayCol)) {
- dimensionColumnsCopy.remove(displayCol);
+ TblColRef literalCol = cols.get(cols.size() - 1);
+ if (digest.groupbyColumns.contains(literalCol)) {
+ dimensionColumnsCopy.remove(literalCol);
if (isMatchedWithDimensions(dimensionColumnsCopy, cube)) {
- if (measure.getFunction().isCompatible(onlyFunction)) {
+ if (measure.getFunction().isTopNCompatibleSum(onlyFunction)) {
return true;
}
}
- dimensionColumnsCopy.add(displayCol);
+ dimensionColumnsCopy.add(literalCol);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
index a179d70..e9d940a 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -17,7 +17,12 @@
package org.apache.kylin.cube.inmemcubing;
import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -25,7 +30,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kylin.common.topn.Counter;
import org.apache.kylin.common.topn.TopNCounter;
-import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.MemoryBudgetController;
import org.apache.kylin.common.util.Pair;
@@ -43,8 +47,6 @@ import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.GridTable;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.metadata.measure.DoubleMutable;
-import org.apache.kylin.metadata.measure.LongMutable;
-import org.apache.kylin.metadata.measure.MeasureCodec;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
@@ -59,14 +61,12 @@ import com.google.common.collect.Lists;
public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
private static Logger logger = LoggerFactory.getLogger(InMemCubeBuilder.class);
- private static final LongMutable ONE = new LongMutable(1l);
static final double BASE_CUBOID_CACHE_OVERSIZE_FACTOR = 0.1;
private final CuboidScheduler cuboidScheduler;
private final long baseCuboidId;
private final int totalCuboidCount;
private final CubeJoinedFlatTableDesc intermediateTableDesc;
- private final MeasureCodec measureCodec;
private final String[] metricsAggrFuncs;
private final MeasureDesc[] measureDescs;
private final int measureCount;
@@ -87,7 +87,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
this.baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
this.totalCuboidCount = cuboidScheduler.getCuboidCount();
this.intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
- this.measureCodec = new MeasureCodec(cubeDesc.getMeasures());
this.measureCount = cubeDesc.getMeasures().size();
this.measureDescs = cubeDesc.getMeasures().toArray(new MeasureDesc[measureCount]);
@@ -510,7 +509,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
this.input = input;
this.record = new GTRecord(info);
this.inMemCubeBuilderInputConverter = new InMemCubeBuilderInputConverter(cubeDesc,
- InMemCubeBuilderUtils.createTopNDisplayColDictionaryMap(cubeDesc, intermediateTableDesc, dictionaryMap),
+ InMemCubeBuilderUtils.createTopNLiteralColDictionaryMap(cubeDesc, intermediateTableDesc, dictionaryMap),
info);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
index d9099ce..69a9fc9 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
@@ -47,18 +47,18 @@ public class InMemCubeBuilderInputConverter {
private final MeasureCodec measureCodec;
private final int measureCount;
private final ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
- private final Map<Integer, Dictionary<String>> topNDisplayColDictMap;
+ private final Map<Integer, Dictionary<String>> topNLiteralColDictMap;
private final GTInfo gtInfo;
- public InMemCubeBuilderInputConverter(CubeDesc cubeDesc, Map<Integer, Dictionary<String>> topNDisplayColDictMap, GTInfo gtInfo) {
+ public InMemCubeBuilderInputConverter(CubeDesc cubeDesc, Map<Integer, Dictionary<String>> topNLiteralColDictMap, GTInfo gtInfo) {
this.cubeDesc = cubeDesc;
this.gtInfo = gtInfo;
this.intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
this.measureCount = cubeDesc.getMeasures().size();
this.measureDescs = cubeDesc.getMeasures().toArray(new MeasureDesc[measureCount]);
this.measureCodec = new MeasureCodec(cubeDesc.getMeasures());
- this.topNDisplayColDictMap = Preconditions.checkNotNull(topNDisplayColDictMap, "topNDisplayColDictMap cannot be null");
+ this.topNLiteralColDictMap = Preconditions.checkNotNull(topNLiteralColDictMap, "topNLiteralColDictMap cannot be null");
}
public final GTRecord convert(List<String> row) {
@@ -102,13 +102,13 @@ public class InMemCubeBuilderInputConverter {
} else if (function.isTopN()) {
// encode the key column with dict, and get the counter column;
int keyColIndex = flatTableIdx[flatTableIdx.length - 1];
- Dictionary<String> displayColDict = topNDisplayColDictMap.get(keyColIndex);
- int keyColEncoded = displayColDict.getIdFromValue(row.get(keyColIndex));
+ Dictionary<String> literalColDict = topNLiteralColDictMap.get(keyColIndex);
+ int keyColEncoded = literalColDict.getIdFromValue(row.get(keyColIndex));
valueBuf.clear();
- valueBuf.putInt(displayColDict.getSizeOfId());
+ valueBuf.putInt(literalColDict.getSizeOfId());
valueBuf.putInt(keyColEncoded);
if (flatTableIdx.length == 1) {
- // only displayCol, use 1.0 as counter
+ // only literalCol, use 1.0 as counter
valueBuf.putDouble(1.0);
} else {
// get the counter column value
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderUtils.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderUtils.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderUtils.java
index f0ee372..9d819a4 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderUtils.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderUtils.java
@@ -35,19 +35,19 @@ import java.util.Map;
*/
public final class InMemCubeBuilderUtils {
- public static final HashMap<Integer, Dictionary<String>> createTopNDisplayColDictionaryMap(CubeDesc cubeDesc, CubeJoinedFlatTableDesc intermediateTableDesc, Map<TblColRef, Dictionary<?>> dictionaryMap) {
+ public static final HashMap<Integer, Dictionary<String>> createTopNLiteralColDictionaryMap(CubeDesc cubeDesc, CubeJoinedFlatTableDesc intermediateTableDesc, Map<TblColRef, Dictionary<?>> dictionaryMap) {
HashMap<Integer, Dictionary<String>> result = Maps.newHashMap();
for (int measureIdx = 0; measureIdx < cubeDesc.getMeasures().size(); measureIdx++) {
MeasureDesc measureDesc = cubeDesc.getMeasures().get(measureIdx);
FunctionDesc func = measureDesc.getFunction();
if (func.isTopN()) {
int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[measureIdx];
- int displayColIdx = flatTableIdx[flatTableIdx.length - 1];
- TblColRef displayCol = func.getParameter().getColRefs().get(flatTableIdx.length - 1);
+ int literalColIdx = flatTableIdx[flatTableIdx.length - 1];
+ TblColRef literalCol = func.getTopNLiteralColumn();
@SuppressWarnings("unchecked")
- Dictionary<String> dictionary = (Dictionary<String>) dictionaryMap.get(displayCol);
+ Dictionary<String> dictionary = (Dictionary<String>) dictionaryMap.get(literalCol);
//Preconditions.checkNotNull(dictionary);//FIXME disable check since dictionary is null when building empty segment
- result.put(displayColIdx, dictionary);
+ result.put(literalColIdx, dictionary);
}
}
return result;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index 2250945..ef563ed 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -50,7 +50,6 @@ import org.apache.kylin.metadata.model.IEngineAware;
import org.apache.kylin.metadata.model.IStorageAware;
import org.apache.kylin.metadata.model.JoinDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.ParameterDesc;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TblColRef;
@@ -130,7 +129,6 @@ public class CubeDesc extends RootPersistentEntity {
private LinkedHashSet<TblColRef> allColumns = new LinkedHashSet<TblColRef>();
private LinkedHashSet<TblColRef> dimensionColumns = new LinkedHashSet<TblColRef>();
- private LinkedHashSet<TblColRef> measureDisplayColumns = new LinkedHashSet<TblColRef>();
private Map<TblColRef, DeriveInfo> derivedToHostMap = Maps.newHashMap();
private Map<Array<TblColRef>, List<DeriveInfo>> hostToDerivedMap = Maps.newHashMap();
@@ -414,7 +412,7 @@ public class CubeDesc extends RootPersistentEntity {
}
return calculateSignature().equals(getSignature());
}
-
+
public String calculateSignature() {
MessageDigest md = null;
try {
@@ -646,37 +644,12 @@ public class CubeDesc extends RootPersistentEntity {
m.setDependentMeasureRef(m.getDependentMeasureRef().toUpperCase());
}
- FunctionDesc f = m.getFunction();
- f.setExpression(f.getExpression().toUpperCase());
- f.initReturnDataType();
-
- ParameterDesc p = f.getParameter();
- p.normalizeColumnValue();
-
- ArrayList<TblColRef> colRefs = Lists.newArrayList();
- if (p.isColumnType()) {
- for (String cName : p.getValue().split("\\s*,\\s*")) {
- ColumnDesc sourceColumn = factTable.findColumnByName(cName);
- TblColRef colRef = new TblColRef(sourceColumn);
- colRefs.add(colRef);
- allColumns.add(colRef);
- }
- }
-
- // for topN
- if (StringUtils.isNotEmpty(p.getDisplayColumn())) {
- ColumnDesc sourceColumn = factTable.findColumnByName(p.getDisplayColumn());
- TblColRef colRef = new TblColRef(sourceColumn);
- colRefs.add(colRef);
- measureDisplayColumns.add(colRef);
- allColumns.add(colRef);
- }
-
- if (colRefs.isEmpty() == false)
- p.setColRefs(colRefs);
+ FunctionDesc func = m.getFunction();
+ func.init(factTable);
+ allColumns.addAll(func.getParameter().getColRefs());
// verify holistic count distinct as a dependent measure
- if (m.getFunction().isHolisticCountDistinct() && StringUtils.isBlank(m.getDependentMeasureRef())) {
+ if (func.isHolisticCountDistinct() && StringUtils.isBlank(m.getDependentMeasureRef())) {
throw new IllegalStateException(m + " is a holistic count distinct but it has no DependentMeasureRef defined!");
}
}
@@ -844,24 +817,10 @@ public class CubeDesc extends RootPersistentEntity {
}
}
- for (TblColRef colRef : measureDisplayColumns) {
- if (!result.contains(colRef))
- result.add(colRef);
+ for (MeasureDesc measure : measures) {
+ result.addAll(measure.getColumnsNeedDictionary());
}
return result;
}
- public LinkedHashSet<TblColRef> getMeasureDisplayColumns() {
- return measureDisplayColumns;
- }
-
-
- public boolean hasMeasureUsingDictionary() {
- for (MeasureDesc measureDesc : this.getMeasures()) {
- if (measureDesc.getFunction().isTopN())
- return true;
- }
-
- return false;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java
index 80bd2f7..1920fc7 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java
@@ -91,7 +91,7 @@ public class FunctionRule implements IValidatorRule<CubeDesc> {
if (StringUtils.equalsIgnoreCase(FunctionDesc.PARAMETER_TYPE_COLUMN, type)) {
validateColumnParameter(context, cube, value);
- } else if (StringUtils.equals(FunctionDesc.PARAMTER_TYPE_CONSTANT, type)) {
+ } else if (StringUtils.equals(FunctionDesc.PARAMETER_TYPE_CONSTANT, type)) {
validateCostantParameter(context, cube, value);
}
validateReturnType(context, cube, func);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java
index e3fb30e..f853b08 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java
@@ -188,13 +188,13 @@ public class InMemCubeBuilderTest extends LocalFileMetadataTestCase {
FunctionDesc func = measureDesc.getFunction();
if (func.isTopN()) {
int[] flatTableIdx = flatTableDesc.getMeasureColumnIndexes()[measureIdx];
- int displayColIdx = flatTableIdx[flatTableIdx.length - 1];
- TblColRef displayCol = func.getParameter().getColRefs().get(flatTableIdx.length - 1);
- logger.info("Building dictionary for " + displayCol);
- List<byte[]> valueList = readValueList(flatTable, nColumns, displayColIdx);
- Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(displayCol.getType(), new IterableDictionaryValueEnumerator(valueList));
+ int literalColIdx = flatTableIdx[flatTableIdx.length - 1];
+ TblColRef literalCol = func.getTopNLiteralColumn();
+ logger.info("Building dictionary for " + literalCol);
+ List<byte[]> valueList = readValueList(flatTable, nColumns, literalColIdx);
+ Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(literalCol.getType(), new IterableDictionaryValueEnumerator(valueList));
- result.put(displayCol, dict);
+ result.put(literalCol, dict);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
index 12371ce..6162477 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
@@ -30,6 +30,7 @@ import java.io.Serializable;
* Column Metadata from Source. All name should be uppercase.
* <p/>
*/
+@SuppressWarnings("serial")
@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
public class ColumnDesc implements Serializable {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/core-metadata/src/main/java/org/apache/kylin/metadata/model/DatabaseDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DatabaseDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DatabaseDesc.java
index 215e86c..6b8447d 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DatabaseDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DatabaseDesc.java
@@ -27,6 +27,7 @@ import java.util.Set;
/**
* @author xjiang
*/
+@SuppressWarnings("serial")
public class DatabaseDesc implements Serializable {
private String name;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
index d10f395..b8cefa2 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
@@ -18,11 +18,13 @@
package org.apache.kylin.metadata.model;
+import java.util.ArrayList;
import java.util.Collection;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Lists;
/**
*/
@@ -36,9 +38,9 @@ public class FunctionDesc {
public static final String FUNC_COUNT_DISTINCT = "COUNT_DISTINCT";
public static final String FUNC_TOP_N = "TOP_N";
- public static final String PARAMTER_TYPE_CONSTANT = "constant";
+ public static final String PARAMETER_TYPE_CONSTANT = "constant";
public static final String PARAMETER_TYPE_COLUMN = "column";
-
+
@JsonProperty("expression")
private String expression;
@JsonProperty("parameter")
@@ -49,6 +51,26 @@ public class FunctionDesc {
private DataType returnDataType;
private boolean isDimensionAsMetric = false;
+ public void init(TableDesc factTable) {
+ expression = expression.toUpperCase();
+ returnDataType = DataType.getInstance(returnType);
+
+ for (ParameterDesc p = parameter; p != null; p = p.getNextParameter()) {
+ p.setValue(p.getValue().toUpperCase());
+ }
+
+ ArrayList<TblColRef> colRefs = Lists.newArrayList();
+ for (ParameterDesc p = parameter; p != null; p = p.getNextParameter()) {
+ if (p.isColumnType()) {
+ ColumnDesc sourceColumn = factTable.findColumnByName(p.getValue());
+ TblColRef colRef = new TblColRef(sourceColumn);
+ colRefs.add(colRef);
+ }
+ }
+
+ parameter.setColRefs(colRefs);
+ }
+
public String getRewriteFieldName() {
if (isSum()) {
return getParameter().getValue();
@@ -161,11 +183,6 @@ public class FunctionDesc {
public void setReturnType(String returnType) {
this.returnType = returnType;
- this.initReturnDataType();
- }
-
- // Jackson does not provide object post-processing currently
- public void initReturnDataType() {
this.returnDataType = DataType.getInstance(returnType);
}
@@ -225,13 +242,32 @@ public class FunctionDesc {
return "FunctionDesc [expression=" + expression + ", parameter=" + parameter + ", returnType=" + returnType + "]";
}
- public boolean isCompatible(FunctionDesc another) {
- if (another == null) {
+ // cols[0] is numeric (e.g. GMV), cols[1] is literal (e.g. SELLER)
+ public TblColRef getTopNNumericColumn() {
+ if (isTopN() == false)
+ throw new IllegalStateException();
+
+ return parameter.getColRefs().get(0);
+ }
+
+ // cols[0] is numeric (e.g. GMV), cols[1] is literal (e.g. SELLER)
+ public TblColRef getTopNLiteralColumn() {
+ if (isTopN() == false)
+ throw new IllegalStateException();
+
+ return parameter.getColRefs().get(1);
+ }
+
+ public boolean isTopNCompatibleSum(FunctionDesc sum) {
+ if (isTopN() == false)
+ throw new IllegalStateException();
+
+ if (sum == null) {
return false;
}
- if (this.isTopN() && another.isSum()) {
- if (this.getParameter().getColRefs().get(0).equals(another.getParameter().getColRefs().get(0)))
+ if (this.isTopN() && sum.isSum()) {
+ if (this.getParameter().getColRefs().get(0).equals(sum.getParameter().getColRefs().get(0)))
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java
index 1561b1f..618d25a 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java
@@ -18,6 +18,9 @@
package org.apache.kylin.metadata.model;
+import java.util.Collections;
+import java.util.List;
+
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -37,6 +40,15 @@ public class MeasureDesc {
@JsonProperty("dependent_measure_ref")
private String dependentMeasureRef;
+ public List<TblColRef> getColumnsNeedDictionary() {
+ // measure could store literal values using dictionary encoding to save space, like TopN
+ if (function.isTopN()) {
+ return Collections.singletonList(function.getTopNLiteralColumn());
+ } else {
+ return Collections.emptyList();
+ }
+ }
+
public int getId() {
return id;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
index 9773b84..2cf4374 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
@@ -19,11 +19,8 @@
package org.apache.kylin.metadata.model;
import java.io.UnsupportedEncodingException;
-import java.util.Arrays;
import java.util.List;
-import org.apache.commons.lang.StringUtils;
-
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -33,15 +30,13 @@ import com.fasterxml.jackson.annotation.JsonProperty;
@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
public class ParameterDesc {
- public static final String COLUMN_TYPE = "column";
-
@JsonProperty("type")
private String type;
@JsonProperty("value")
private String value;
- @JsonProperty("displaycolumn")
- private String displayColumn;
+ @JsonProperty("next_parameter")
+ private ParameterDesc nextParameter;
private List<TblColRef> colRefs;
@@ -65,14 +60,6 @@ public class ParameterDesc {
this.value = value;
}
- public String getDisplayColumn() {
- return displayColumn;
- }
-
- public void setDisplayColumn(String displayColumn) {
- this.displayColumn = displayColumn;
- }
-
public List<TblColRef> getColRefs() {
return colRefs;
}
@@ -80,19 +67,17 @@ public class ParameterDesc {
public void setColRefs(List<TblColRef> colRefs) {
this.colRefs = colRefs;
}
+
+ public ParameterDesc getNextParameter() {
+ return nextParameter;
+ }
- public boolean isColumnType() {
- return COLUMN_TYPE.equals(type);
+ public void setNextParameter(ParameterDesc nextParameter) {
+ this.nextParameter = nextParameter;
}
- public void normalizeColumnValue() {
- if (isColumnType()) {
- String values[] = value.split("\\s*,\\s*");
- for (int i = 0; i < values.length; i++)
- values[i] = values[i].toUpperCase();
- Arrays.sort(values);
- value = StringUtils.join(values, ",");
- }
+ public boolean isColumnType() {
+ return FunctionDesc.PARAMETER_TYPE_COLUMN.equals(type);
}
@Override
@@ -102,7 +87,7 @@ public class ParameterDesc {
ParameterDesc that = (ParameterDesc) o;
- if (displayColumn != null ? !displayColumn.equals(that.displayColumn) : that.displayColumn != null) return false;
+ if (nextParameter != null ? !nextParameter.equals(that.nextParameter) : that.nextParameter != null) return false;
if (type != null ? !type.equals(that.type) : that.type != null) return false;
if (value != null ? !value.equals(that.value) : that.value != null) return false;
@@ -113,13 +98,13 @@ public class ParameterDesc {
public int hashCode() {
int result = type != null ? type.hashCode() : 0;
result = 31 * result + (value != null ? value.hashCode() : 0);
- result = 31 * result + (displayColumn != null ? displayColumn.hashCode() : 0);
+ result = 31 * result + (nextParameter != null ? nextParameter.hashCode() : 0);
return result;
}
@Override
public String toString() {
- return "ParameterDesc [type=" + type + ", value=" + value + ", displayColumn=" + displayColumn + "]";
+ return "ParameterDesc [type=" + type + ", value=" + value + ", nextParam=" + nextParameter + "]";
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/core-storage/src/test/java/org/apache/kylin/storage/cache/StorageMockUtils.java
----------------------------------------------------------------------
diff --git a/core-storage/src/test/java/org/apache/kylin/storage/cache/StorageMockUtils.java b/core-storage/src/test/java/org/apache/kylin/storage/cache/StorageMockUtils.java
index 2b5ceee..2898f93 100644
--- a/core-storage/src/test/java/org/apache/kylin/storage/cache/StorageMockUtils.java
+++ b/core-storage/src/test/java/org/apache/kylin/storage/cache/StorageMockUtils.java
@@ -117,7 +117,6 @@ public class StorageMockUtils {
return compareFilter;
}
- @SuppressWarnings("unused")
public static TupleFilter buildAndFilter(List<TblColRef> columns) {
CompareTupleFilter compareFilter1 = buildFilter1(columns.get(0));
CompareTupleFilter compareFilter2 = buildFilter2(columns.get(1));
@@ -127,7 +126,6 @@ public class StorageMockUtils {
return andFilter;
}
- @SuppressWarnings("unused")
public static TupleFilter buildOrFilter(List<TblColRef> columns) {
CompareTupleFilter compareFilter1 = buildFilter1(columns.get(0));
CompareTupleFilter compareFilter2 = buildFilter2(columns.get(1));
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
index ed1fd4a..44311c2 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
@@ -57,7 +57,7 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL
private Text outputKey = new Text();
private Text outputValue = new Text();
private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
- private Map<Integer, Dictionary<String>> topNDisplayColDictMap;
+ private Map<Integer, Dictionary<String>> topNLiteralColDictMap;
@Override
protected void setup(Context context) throws IOException {
@@ -92,21 +92,21 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL
int colCount = cubeDesc.getRowkey().getRowKeyColumns().length;
keyBytesBuf = new byte[colCount][];
- initTopNDisplayColDictionaryMap();
+ initTopNLiteralColDictionaryMap();
initNullBytes();
}
- private void initTopNDisplayColDictionaryMap() {
- topNDisplayColDictMap = Maps.newHashMap();
+ private void initTopNLiteralColDictionaryMap() {
+ topNLiteralColDictMap = Maps.newHashMap();
for (int measureIdx = 0; measureIdx < measures.length; measureIdx++) {
MeasureDesc measureDesc = cubeDesc.getMeasures().get(measureIdx);
FunctionDesc func = measureDesc.getFunction();
if (func.isTopN()) {
int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[measureIdx];
- int displayColIdx = flatTableIdx[flatTableIdx.length - 1];
- TblColRef displayCol = func.getParameter().getColRefs().get(flatTableIdx.length - 1);
- Dictionary<String> dictionary = (Dictionary<String>)cubeSegment.getDictionary(displayCol);
- topNDisplayColDictMap.put(displayColIdx, dictionary);
+ int literalColIdx = flatTableIdx[flatTableIdx.length - 1];
+ TblColRef literalCol = func.getTopNLiteralColumn();
+ Dictionary<String> dictionary = (Dictionary<String>)cubeSegment.getDictionary(literalCol);
+ topNLiteralColDictMap.put(literalColIdx, dictionary);
}
}
}
@@ -174,13 +174,13 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL
else if(func.isTopN()) {
// encode the key column with dict, and get the counter column;
int keyColIndex = flatTableIdx[flatTableIdx.length - 1];
- Dictionary<String> displayColDict = topNDisplayColDictMap.get(keyColIndex);
- int keyColEncoded = displayColDict.getIdFromValue(Bytes.toString(splitBuffers[keyColIndex].value));
+ Dictionary<String> literalColDict = topNLiteralColDictMap.get(keyColIndex);
+ int keyColEncoded = literalColDict.getIdFromValue(Bytes.toString(splitBuffers[keyColIndex].value));
valueBuf.clear();
- valueBuf.putInt(displayColDict.getSizeOfId());
+ valueBuf.putInt(literalColDict.getSizeOfId());
valueBuf.putInt(keyColEncoded);
if (flatTableIdx.length == 1) {
- // only displayCol, use 1.0 as counter
+ // only literalCol, use 1.0 as counter
valueBuf.putDouble(1.0);
} else {
// get the counter column value
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
index 286ff02..fc616fa 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
@@ -23,9 +23,6 @@ import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.io.Text;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.topn.Counter;
import org.apache.kylin.common.topn.TopNCounter;
@@ -51,7 +48,6 @@ import org.apache.kylin.engine.mr.MRUtil;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.metadata.measure.MeasureCodec;
-import org.apache.kylin.metadata.model.ColumnDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
@@ -59,6 +55,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
/**
* @author shaoshi
@@ -83,29 +80,14 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By
private RowKeySplitter rowKeySplitter;
private RowKeyEncoderProvider rowKeyEncoderProvider;
- private HashMap<TblColRef, Boolean> dictsNeedMerging = new HashMap<TblColRef, Boolean>();
+ private HashMap<TblColRef, Boolean> dimensionsNeedDict = new HashMap<TblColRef, Boolean>();
+ private List<MeasureDesc> measureDescs;
private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
private MeasureCodec codec;
private ByteArrayWritable outputValue = new ByteArrayWritable();
- private List<MeasureDesc> measuresDescs;
- private Integer[] measureIdxUsingDict;
-
- private Boolean checkNeedMerging(TblColRef col) throws IOException {
- Boolean ret = dictsNeedMerging.get(col);
- if (ret != null)
- return ret;
- else {
- ret = cubeDesc.getRowkey().isUseDictionary(col);
- if (ret) {
- String dictTable = DictionaryManager.getInstance(config).decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col).getTable();
- ret = cubeDesc.getFactTable().equalsIgnoreCase(dictTable);
- }
- dictsNeedMerging.put(col, ret);
- return ret;
- }
- }
+ private List<Integer> topNMeasureIdx;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
@@ -130,17 +112,14 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By
rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
rowKeyEncoderProvider = new RowKeyEncoderProvider(mergedCubeSegment);
- measuresDescs = cubeDesc.getMeasures();
- codec = new MeasureCodec(cubeDesc.getMeasures());
- if (cubeDesc.hasMeasureUsingDictionary()) {
- List<Integer> measuresUsingDict = Lists.newArrayList();
- for (int i = 0; i < measuresDescs.size(); i++) {
- if (measuresDescs.get(i).getFunction().isTopN()) {
- // so far only TopN uses dic
- measuresUsingDict.add(i);
- }
+ measureDescs = cubeDesc.getMeasures();
+ codec = new MeasureCodec(measureDescs);
+
+ topNMeasureIdx = Lists.newArrayList();
+ for (int i = 0; i < measureDescs.size(); i++) {
+ if (measureDescs.get(i).getFunction().isTopN()) {
+ topNMeasureIdx.add(i);
}
- measureIdxUsingDict = measuresUsingDict.toArray(new Integer[measuresUsingDict.size()]);
}
}
@@ -213,11 +192,9 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By
rowkeyEncoder.encode(new ByteArray(newKeyBodyBuf, 0, bufOffset), newKeyBuf);
outputKey.set(newKeyBuf.array(), 0, fullKeySize);
- // encode measure if it uses dictionary
- if (cubeDesc.hasMeasureUsingDictionary()) {
- reEncodeMeasure(value);
- }
-
+ // re-encode measures if dictionary is used
+ reEncodeMeasure(value);
+
valueBuf.clear();
codec.encode(value, valueBuf);
outputValue.set(valueBuf.array(), 0, valueBuf.position());
@@ -225,46 +202,60 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By
context.write(outputKey, outputValue);
}
+ private Boolean checkNeedMerging(TblColRef col) throws IOException {
+ Boolean ret = dimensionsNeedDict.get(col);
+ if (ret != null)
+ return ret;
+ else {
+ ret = cubeDesc.getRowkey().isUseDictionary(col);
+ if (ret) {
+ String dictTable = DictionaryManager.getInstance(config).decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col).getTable();
+ ret = cubeDesc.getFactTable().equalsIgnoreCase(dictTable);
+ }
+ dimensionsNeedDict.put(col, ret);
+ return ret;
+ }
+ }
+
+ @SuppressWarnings("unchecked")
private void reEncodeMeasure(Object[] measureObjs) throws IOException, InterruptedException {
+ // currently only topN uses dictionary in measure obj
+ if (topNMeasureIdx.isEmpty())
+ return;
+
int bufOffset = 0;
- for (int idx : measureIdxUsingDict) {
- // only TopN measure uses dic
+ for (int idx : topNMeasureIdx) {
TopNCounter<ByteArray> topNCounters = (TopNCounter<ByteArray>) measureObjs[idx];
- MeasureDesc measureDesc = measuresDescs.get(idx);
- String displayCol = measureDesc.getFunction().getParameter().getDisplayColumn().toUpperCase();
- if (StringUtils.isNotEmpty(displayCol)) {
- ColumnDesc sourceColumn = cubeDesc.getFactTableDesc().findColumnByName(displayCol);
- TblColRef colRef = new TblColRef(sourceColumn);
- DictionaryManager dictMgr = DictionaryManager.getInstance(config);
- Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(colRef));
- Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(colRef));
+ MeasureDesc measureDesc = measureDescs.get(idx);
+ TblColRef colRef = measureDesc.getFunction().getTopNLiteralColumn();
+ DictionaryManager dictMgr = DictionaryManager.getInstance(config);
+ Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(colRef));
+ Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(colRef));
+
+ int topNSize = topNCounters.size();
+ while (sourceDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || //
+ mergedDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || //
+ mergedDict.getSizeOfId() * topNSize > newKeyBodyBuf.length - bufOffset) {
+ byte[] oldBuf = newKeyBodyBuf;
+ newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length];
+ System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length);
+ }
- int topNSize = topNCounters.size();
- while (sourceDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || //
- mergedDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || //
- mergedDict.getSizeOfId() * topNSize > newKeyBodyBuf.length - bufOffset) {
- byte[] oldBuf = newKeyBodyBuf;
- newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length];
- System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length);
+ for (Counter<ByteArray> c : topNCounters) {
+ int idInSourceDict = BytesUtil.readUnsigned(c.getItem().array(), c.getItem().offset(), c.getItem().length());
+ int idInMergedDict;
+ int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, bufOffset);
+ if (size < 0) {
+ idInMergedDict = mergedDict.nullId();
+ } else {
+ idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBodyBuf, bufOffset, size);
}
- for (Counter<ByteArray> c : topNCounters) {
- int idInSourceDict = BytesUtil.readUnsigned(c.getItem().array(), c.getItem().offset(), c.getItem().length());
- int idInMergedDict;
- int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, bufOffset);
- if (size < 0) {
- idInMergedDict = mergedDict.nullId();
- } else {
- idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBodyBuf, bufOffset, size);
- }
-
- BytesUtil.writeUnsigned(idInMergedDict, newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
- c.getItem().set(newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
- bufOffset += mergedDict.getSizeOfId();
- }
+ BytesUtil.writeUnsigned(idInMergedDict, newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
+ c.getItem().set(newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
+ bufOffset += mergedDict.getSizeOfId();
}
}
-
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
index 68d1481..6c2679e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
@@ -18,8 +18,13 @@
package org.apache.kylin.engine.mr.steps;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.StringUtils;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.kylin.common.KylinConfig;
@@ -43,17 +48,11 @@ import org.apache.kylin.engine.mr.KylinMapper;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.metadata.measure.MeasureCodec;
-import org.apache.kylin.metadata.model.ColumnDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
+import com.google.common.collect.Lists;
/**
* @author ysong1, honma
@@ -76,29 +75,16 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
private RowKeySplitter rowKeySplitter;
private RowKeyEncoderProvider rowKeyEncoderProvider;
- private HashMap<TblColRef, Boolean> dictsNeedMerging = new HashMap<TblColRef, Boolean>();
- private List<MeasureDesc> measuresDescs;
+ private HashMap<TblColRef, Boolean> dimensionsNeedDict = new HashMap<TblColRef, Boolean>();
+
+ // for re-encode measures that use dictionary
+ private List<Integer> topNMeasureIdx;
+ private List<MeasureDesc> measureDescs;
private MeasureCodec codec;
private Object[] measureObjs;
- private Integer[] measureIdxUsingDict;
private ByteBuffer valueBuf;
private Text outputValue;
- private Boolean checkNeedMerging(TblColRef col) throws IOException {
- Boolean ret = dictsNeedMerging.get(col);
- if (ret != null)
- return ret;
- else {
- ret = cubeDesc.getRowkey().isUseDictionary(col);
- if (ret) {
- String dictTable = DictionaryManager.getInstance(config).decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col).getTable();
- ret = cubeDesc.getFactTable().equalsIgnoreCase(dictTable);
- }
- dictsNeedMerging.put(col, ret);
- return ret;
- }
- }
-
@Override
protected void setup(Context context) throws IOException, InterruptedException {
super.bindCurrentConfiguration(context.getConfiguration());
@@ -124,20 +110,16 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
rowKeyEncoderProvider = new RowKeyEncoderProvider(mergedCubeSegment);
- if (cubeDesc.hasMeasureUsingDictionary()) {
- measuresDescs = cubeDesc.getMeasures();
- codec = new MeasureCodec(measuresDescs);
- measureObjs = new Object[measuresDescs.size()];
- List<Integer> measuresUsingDict = Lists.newArrayList();
- for (int i = 0; i < measuresDescs.size(); i++) {
- if (measuresDescs.get(i).getFunction().isTopN()) {
- // so far only TopN uses dic
- measuresUsingDict.add(i);
- }
+ measureDescs = cubeDesc.getMeasures();
+ codec = new MeasureCodec(measureDescs);
+ measureObjs = new Object[measureDescs.size()];
+ valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+ outputValue = new Text();
+ topNMeasureIdx = Lists.newArrayList();
+ for (int i = 0; i < measureDescs.size(); i++) {
+ if (measureDescs.get(i).getFunction().isTopN()) {
+ topNMeasureIdx.add(i);
}
- measureIdxUsingDict = measuresUsingDict.toArray(new Integer[measuresUsingDict.size()]);
- valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
- outputValue = new Text();
}
}
@@ -231,60 +213,70 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
rowkeyEncoder.encode(new ByteArray(newKeyBodyBuf, 0, bufOffset), newKeyBuf);
outputKey.set(newKeyBuf.array(), 0, fullKeySize);
- // encode measure if it uses dictionary
- if (cubeDesc.hasMeasureUsingDictionary()) {
+ // re-encode measures if dictionary is used
+ if (topNMeasureIdx.size() > 0) {
codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), measureObjs);
reEncodeMeasure(measureObjs);
valueBuf.clear();
codec.encode(measureObjs, valueBuf);
outputValue.set(valueBuf.array(), 0, valueBuf.position());
value = outputValue;
- }
-
+ }
+
context.write(outputKey, value);
}
+ private Boolean checkNeedMerging(TblColRef col) throws IOException {
+ Boolean ret = dimensionsNeedDict.get(col);
+ if (ret != null)
+ return ret;
+ else {
+ ret = cubeDesc.getRowkey().isUseDictionary(col);
+ if (ret) {
+ String dictTable = DictionaryManager.getInstance(config).decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col).getTable();
+ ret = cubeDesc.getFactTable().equalsIgnoreCase(dictTable);
+ }
+ dimensionsNeedDict.put(col, ret);
+ return ret;
+ }
+ }
+
+ @SuppressWarnings("unchecked")
private void reEncodeMeasure(Object[] measureObjs) throws IOException, InterruptedException {
int bufOffset = 0;
- for (int idx : measureIdxUsingDict) {
+ for (int idx : topNMeasureIdx) {
// only TopN measure uses dic
TopNCounter<ByteArray> topNCounters = (TopNCounter<ByteArray>) measureObjs[idx];
- MeasureDesc measureDesc = measuresDescs.get(idx);
- String displayCol = measureDesc.getFunction().getParameter().getDisplayColumn().toUpperCase();
- if (StringUtils.isNotEmpty(displayCol)) {
- ColumnDesc sourceColumn = cubeDesc.getFactTableDesc().findColumnByName(displayCol);
- TblColRef colRef = new TblColRef(sourceColumn);
- DictionaryManager dictMgr = DictionaryManager.getInstance(config);
- Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(colRef));
- Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(colRef));
+ MeasureDesc measureDesc = measureDescs.get(idx);
+ TblColRef colRef = measureDesc.getFunction().getTopNLiteralColumn();
+ DictionaryManager dictMgr = DictionaryManager.getInstance(config);
+ Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(colRef));
+ Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(colRef));
+
+ int topNSize = topNCounters.size();
+ while (sourceDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || //
+ mergedDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || //
+ mergedDict.getSizeOfId() * topNSize > newKeyBodyBuf.length - bufOffset) {
+ byte[] oldBuf = newKeyBodyBuf;
+ newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length];
+ System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length);
+ }
- int topNSize = topNCounters.size();
- while (sourceDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || //
- mergedDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || //
- mergedDict.getSizeOfId() * topNSize > newKeyBodyBuf.length - bufOffset) {
- byte[] oldBuf = newKeyBodyBuf;
- newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length];
- System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length);
+ for (Counter<ByteArray> c : topNCounters) {
+ int idInSourceDict = BytesUtil.readUnsigned(c.getItem().array(), c.getItem().offset(), c.getItem().length());
+ int idInMergedDict;
+ int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, bufOffset);
+ if (size < 0) {
+ idInMergedDict = mergedDict.nullId();
+ } else {
+ idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBodyBuf, bufOffset, size);
}
- for (Counter<ByteArray> c : topNCounters) {
- int idInSourceDict = BytesUtil.readUnsigned(c.getItem().array(), c.getItem().offset(), c.getItem().length());
- int idInMergedDict;
- int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, bufOffset);
- if (size < 0) {
- idInMergedDict = mergedDict.nullId();
- } else {
- idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBodyBuf, bufOffset, size);
- }
-
- BytesUtil.writeUnsigned(idInMergedDict, newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
- c.getItem().set(newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
- bufOffset += mergedDict.getSizeOfId();
- }
+ BytesUtil.writeUnsigned(idInMergedDict, newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
+ c.getItem().set(newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
+ bufOffset += mergedDict.getSizeOfId();
}
}
-
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/examples/test_case_data/localmeta/cube/test_kylin_cube_topn.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube/test_kylin_cube_topn.json b/examples/test_case_data/localmeta/cube/test_kylin_cube_topn.json
deleted file mode 100644
index 903fc15..0000000
--- a/examples/test_case_data/localmeta/cube/test_kylin_cube_topn.json
+++ /dev/null
@@ -1,10 +0,0 @@
-{
- "uuid" : "33354455-a33e-4b69-83dd-0bb8b1f8c53b",
- "last_modified" : 0,
- "name" : "test_kylin_cube_topn",
- "owner" : null,
- "version" : null,
- "descriptor" : "test_kylin_cube_topn_desc",
- "segments" : [ ],
- "create_time" : null
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/examples/test_case_data/localmeta/cube/test_kylin_cube_topn_left_join.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube/test_kylin_cube_topn_left_join.json b/examples/test_case_data/localmeta/cube/test_kylin_cube_topn_left_join.json
deleted file mode 100644
index 6f57561..0000000
--- a/examples/test_case_data/localmeta/cube/test_kylin_cube_topn_left_join.json
+++ /dev/null
@@ -1,10 +0,0 @@
-{
- "uuid" : "44454455-a33e-4b69-83dd-0bb8b1f8c53b",
- "last_modified" : 0,
- "name" : "test_kylin_cube_topn_left_join",
- "owner" : null,
- "version" : null,
- "descriptor" : "test_kylin_cube_topn_left_join_desc",
- "segments" : [ ],
- "create_time" : null
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_desc.json b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_desc.json
deleted file mode 100644
index fddbb10..0000000
--- a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_desc.json
+++ /dev/null
@@ -1,148 +0,0 @@
-{
- "uuid": "4334a905-1fc6-4f67-985c-38fa5aeafd92",
- "name": "test_kylin_cube_topn_desc",
- "description": null,
- "dimensions": [
- {
- "id": 0,
- "name": "CAL_DT",
- "table": "EDW.TEST_CAL_DT",
- "column": null,
- "derived": [
- "WEEK_BEG_DT"
- ],
- "hierarchy": false
- }
- ],
- "measures": [
- {
- "id": 1,
- "name": "GMV_SUM",
- "function": {
- "expression": "SUM",
- "parameter": {
- "type": "column",
- "value": "PRICE"
- },
- "returntype": "decimal(19,4)"
- },
- "dependent_measure_ref": null
- },
- {
- "id": 2,
- "name": "GMV_MIN",
- "function": {
- "expression": "MIN",
- "parameter": {
- "type": "column",
- "value": "PRICE"
- },
- "returntype": "decimal(19,4)"
- },
- "dependent_measure_ref": null
- },
- {
- "id": 3,
- "name": "GMV_MAX",
- "function": {
- "expression": "MAX",
- "parameter": {
- "type": "column",
- "value": "PRICE"
- },
- "returntype": "decimal(19,4)"
- },
- "dependent_measure_ref": null
- },
- {
- "id": 4,
- "name": "TRANS_CNT",
- "function": {
- "expression": "COUNT",
- "parameter": {
- "type": "constant",
- "value": "1"
- },
- "returntype": "bigint"
- },
- "dependent_measure_ref": null
- },
- {
- "id": 5,
- "name": "ITEM_COUNT_SUM",
- "function": {
- "expression": "SUM",
- "parameter": {
- "type": "column",
- "value": "ITEM_COUNT"
- },
- "returntype": "bigint"
- },
- "dependent_measure_ref": null
- },
- {
- "id": 6,
- "name": "TOP_SELLER",
- "function": {
- "expression": "TOP_N",
- "parameter": {
- "type": "column",
- "value": "PRICE",
- "displaycolumn": "SELLER_ID"
- },
- "returntype": "topn(100)"
- },
- "dependent_measure_ref": null
- }
- ],
- "rowkey": {
- "rowkey_columns": [
- {
- "column": "cal_dt",
- "length": 0,
- "dictionary": "true",
- "mandatory": false
- }
- ],
- "aggregation_groups": [
- [
- "cal_dt"
- ]
- ]
- },
- "last_modified": 1422435345330,
- "model_name": "test_kylin_inner_join_model_desc",
- "null_string": null,
- "hbase_mapping": {
- "column_family": [
- {
- "name": "f1",
- "columns": [
- {
- "qualifier": "m",
- "measure_refs": [
- "gmv_sum",
- "gmv_min",
- "gmv_max",
- "trans_cnt",
- "item_count_sum"
- ]
- }
- ]
- }, {
- "name": "f2",
- "columns": [
- {
- "qualifier": "m",
- "measure_refs": [
- "top_seller"
- ]
- }
- ]
- }
- ]
- },
- "notify_list": null,
- "engine_type": 2,
- "storage_type": 2
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_left_join_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_left_join_desc.json b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_left_join_desc.json
deleted file mode 100644
index 6aecaae..0000000
--- a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_left_join_desc.json
+++ /dev/null
@@ -1,149 +0,0 @@
-{
- "uuid": "5445a905-1fc6-4f67-985c-38fa5aeafd92",
- "name": "test_kylin_cube_topn_left_join_desc",
- "description": null,
- "dimensions": [
- {
- "id": 0,
- "name": "CAL_DT",
- "table": "EDW.TEST_CAL_DT",
- "column": null,
- "derived": [
- "WEEK_BEG_DT"
- ],
- "hierarchy": false
- }
- ],
- "measures": [
- {
- "id": 1,
- "name": "GMV_SUM",
- "function": {
- "expression": "SUM",
- "parameter": {
- "type": "column",
- "value": "PRICE"
- },
- "returntype": "decimal(19,4)"
- },
- "dependent_measure_ref": null
- },
- {
- "id": 2,
- "name": "GMV_MIN",
- "function": {
- "expression": "MIN",
- "parameter": {
- "type": "column",
- "value": "PRICE"
- },
- "returntype": "decimal(19,4)"
- },
- "dependent_measure_ref": null
- },
- {
- "id": 3,
- "name": "GMV_MAX",
- "function": {
- "expression": "MAX",
- "parameter": {
- "type": "column",
- "value": "PRICE"
- },
- "returntype": "decimal(19,4)"
- },
- "dependent_measure_ref": null
- },
- {
- "id": 4,
- "name": "TRANS_CNT",
- "function": {
- "expression": "COUNT",
- "parameter": {
- "type": "constant",
- "value": "1"
- },
- "returntype": "bigint"
- },
- "dependent_measure_ref": null
- },
- {
- "id": 5,
- "name": "ITEM_COUNT_SUM",
- "function": {
- "expression": "SUM",
- "parameter": {
- "type": "column",
- "value": "ITEM_COUNT"
- },
- "returntype": "bigint"
- },
- "dependent_measure_ref": null
- },
- {
- "id": 6,
- "name": "TOP_SELLER",
- "function": {
- "expression": "TOP_N",
- "parameter": {
- "type": "column",
- "value": "PRICE",
- "displaycolumn": "SELLER_ID"
- },
- "returntype": "topn(100)"
- },
- "dependent_measure_ref": null
- }
- ],
- "rowkey": {
- "rowkey_columns": [
- {
- "column": "cal_dt",
- "length": 0,
- "dictionary": "true",
- "mandatory": false
- }
- ],
- "aggregation_groups": [
- [
- "cal_dt"
- ]
- ]
- },
- "last_modified": 1422435345330,
- "model_name": "test_kylin_left_join_model_desc",
- "null_string": null,
- "hbase_mapping": {
- "column_family": [
- {
- "name": "f1",
- "columns": [
- {
- "qualifier": "m",
- "measure_refs": [
- "gmv_sum",
- "gmv_min",
- "gmv_max",
- "trans_cnt",
- "item_count_sum"
- ]
- }
- ]
- },
- {
- "name": "f2",
- "columns": [
- {
- "qualifier": "m",
- "measure_refs": [
- "top_seller"
- ]
- }
- ]
- }
- ]
- },
- "notify_list": null,
- "engine_type": 2,
- "storage_type": 2
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json
index 3f9957b..1e007c3 100644
--- a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json
+++ b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json
@@ -156,7 +156,11 @@
"expression": "COUNT_DISTINCT",
"parameter": {
"type": "column",
- "value": "LSTG_FORMAT_NAME,SELLER_ID"
+ "value": "LSTG_FORMAT_NAME",
+ "next_parameter": {
+ "type": "column",
+ "value": "SELLER_ID"
+ }
},
"returntype": "hllc(10)"
},
@@ -170,7 +174,10 @@
"parameter": {
"type": "column",
"value": "PRICE",
- "displaycolumn": "SELLER_ID"
+ "next_parameter": {
+ "type": "column",
+ "value": "SELLER_ID"
+ }
},
"returntype": "topn(100)"
},
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json
index 907e338..73a58f0 100644
--- a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json
+++ b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json
@@ -156,7 +156,11 @@
"expression": "COUNT_DISTINCT",
"parameter": {
"type": "column",
- "value": "LSTG_FORMAT_NAME,SELLER_ID"
+ "value": "LSTG_FORMAT_NAME",
+ "next_parameter": {
+ "type": "column",
+ "value": "SELLER_ID"
+ }
},
"returntype": "hllc(10)"
},
@@ -170,7 +174,10 @@
"parameter": {
"type": "column",
"value": "PRICE",
- "displaycolumn": "SELLER_ID"
+ "next_parameter": {
+ "type": "column",
+ "value": "SELLER_ID"
+ }
},
"returntype": "topn(100)"
},
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
index 71737dc..452e3a3 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
@@ -35,7 +35,16 @@ import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.common.util.StringUtil;
import org.apache.kylin.metadata.MetadataConstants;
import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.*;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.DimensionDesc;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.IEngineAware;
+import org.apache.kylin.metadata.model.IStorageAware;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.ParameterDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
@@ -49,6 +58,7 @@ import com.google.common.collect.Sets;
/**
* @author yangli9
*/
+@SuppressWarnings("serial")
@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
public class IIDesc extends RootPersistentEntity {
@@ -231,7 +241,7 @@ public class IIDesc extends RootPersistentEntity {
/**
*
- * @param hllType represents the presision
+ * @param hllType represents the precision
*/
private MeasureDesc makeHLLMeasure(ColumnDesc columnDesc, String hllType) {
String columnName = columnDesc.getName();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
index 7e54a98..e17133f 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
@@ -19,7 +19,6 @@
package org.apache.kylin.invertedindex.model;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
index e950911..cbc0c56 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
@@ -210,7 +210,7 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
if (!column.isInnerColumn()) {
parameter = new ParameterDesc();
parameter.setValue(column.getName());
- parameter.setType("column");
+ parameter.setType(FunctionDesc.PARAMETER_TYPE_COLUMN);
parameter.setColRefs(Arrays.asList(column));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
index f84e4e6..4d34943 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
@@ -87,32 +87,20 @@ public class CubeStorageQuery implements ICachableStorageQuery {
private final CubeInstance cubeInstance;
private final CubeDesc cubeDesc;
private final String uuid;
- private Collection<TblColRef> topNColumns;
public CubeStorageQuery(CubeInstance cube) {
this.cubeInstance = cube;
this.cubeDesc = cube.getDescriptor();
this.uuid = cube.getUuid();
- this.topNColumns = Lists.newArrayList();
- for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
- if (measureDesc.getFunction().isTopN()) {
- List<TblColRef> colRefs = measureDesc.getFunction().getParameter().getColRefs();
- topNColumns.add(colRefs.get(colRefs.size() - 1));
- }
- }
}
@Override
public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
- // check whether this is a TopN query;
- checkAndRewriteTopN(context, sqlDigest, returnTupleInfo);
+ // check whether this is a TopN query
+ checkAndRewriteTopN(sqlDigest);
Collection<TblColRef> groups = sqlDigest.groupbyColumns;
- TblColRef topNCol = extractTopNCol(groups);
- if (topNCol != null)
- groups.remove(topNCol);
-
TupleFilter filter = sqlDigest.filter;
// build dimension & metrics
@@ -156,15 +144,15 @@ public class CubeStorageQuery implements ICachableStorageQuery {
// check involved measures, build value decoder for each each family:column
List<RowValueDecoder> valueDecoders = translateAggregation(cubeDesc.getHBaseMapping(), metrics, context);
- //memory hungry distinct count are pushed down to coprocessor, no need to set threshold any more
- //setThreshold(dimensionsD, valueDecoders, context); // set cautious threshold to prevent out of memory
+ // memory-hungry distinct counts are pushed down to the coprocessor, no need to set threshold any more
+ // setThreshold(dimensionsD, valueDecoders, context); // set cautious threshold to prevent out of memory
setCoprocessor(groupsCopD, valueDecoders, context); // enable coprocessor if beneficial
setLimit(filter, context);
HConnection conn = HBaseConnection.get(context.getConnUrl());
- return new SerializedHBaseTupleIterator(conn, scans, cubeInstance, dimensionsD, filterD, groupsCopD, topNCol, valueDecoders, context, returnTupleInfo);
- //Notice we're passing filterD down to storage instead of flatFilter
+ // notice we're passing filterD down to storage instead of flatFilter
+ return new SerializedHBaseTupleIterator(conn, scans, cubeInstance, dimensionsD, filterD, groupsCopD, valueDecoders, context, returnTupleInfo);
}
@Override
@@ -196,11 +184,6 @@ public class CubeStorageQuery implements ICachableStorageQuery {
continue;
}
- // skip topN display col
- if (topNColumns.contains(column)) {
- continue;
- }
-
dimensions.add(column);
}
}
@@ -767,48 +750,33 @@ public class CubeStorageQuery implements ICachableStorageQuery {
ObserverEnabler.enableCoprocessorIfBeneficial(cubeInstance, groupsCopD, valueDecoders, context);
}
- private void checkAndRewriteTopN(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
- Collection<TblColRef> groups = sqlDigest.groupbyColumns;
- TblColRef topNDisplayCol = extractTopNCol(groups);
- boolean hasTopN = topNDisplayCol != null;
-
- if (hasTopN == false)
+ private void checkAndRewriteTopN(SQLDigest sqlDigest) {
+ FunctionDesc topnFunc = null;
+ TblColRef topnLiteralCol = null;
+ for (MeasureDesc measure : cubeDesc.getMeasures()) {
+ FunctionDesc func = measure.getFunction();
+ if (func.isTopN() && sqlDigest.groupbyColumns.contains(func.getTopNLiteralColumn())) {
+ topnFunc = func;
+ topnLiteralCol = func.getTopNLiteralColumn();
+ }
+ }
+
+ // if TopN is not involved
+ if (topnFunc == null)
return;
if (sqlDigest.aggregations.size() != 1) {
throw new IllegalStateException("When query with topN, only one metrics is allowed.");
}
- FunctionDesc functionDesc = sqlDigest.aggregations.iterator().next();
- if (functionDesc.isSum() == false) {
+ FunctionDesc origFunc = sqlDigest.aggregations.iterator().next();
+ if (origFunc.isSum() == false) {
throw new IllegalStateException("When query with topN, only SUM function is allowed.");
}
- FunctionDesc rewriteFunction = null;
- // replace the SUM to the TopN function
- for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
- if (measureDesc.getFunction().isCompatible(functionDesc) && topNDisplayCol.getName().equalsIgnoreCase(measureDesc.getFunction().getParameter().getDisplayColumn())) {
- rewriteFunction = measureDesc.getFunction();
- break;
- }
- }
-
- if (rewriteFunction == null) {
- throw new IllegalStateException("Didn't find topN measure for " + functionDesc);
- }
-
- sqlDigest.aggregations = Lists.newArrayList(rewriteFunction);
- logger.info("Rewrite function " + functionDesc + " to " + rewriteFunction);
+ sqlDigest.aggregations = Lists.newArrayList(topnFunc);
+ sqlDigest.groupbyColumns.remove(topnLiteralCol);
+ sqlDigest.metricColumns.add(topnLiteralCol);
+ logger.info("Rewrite function " + origFunc + " to " + topnFunc);
}
-
- private TblColRef extractTopNCol(Collection<TblColRef> colRefs) {
- for (TblColRef colRef : colRefs) {
- if (topNColumns.contains(colRef)) {
- return colRef;
- }
- }
-
- return null;
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java
index 831cadb..0983689 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.client.HConnection;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.tuple.ITuple;
import org.apache.kylin.metadata.tuple.ITupleIterator;
@@ -58,7 +59,7 @@ public class SerializedHBaseTupleIterator implements ITupleIterator {
private ITuple next;
public SerializedHBaseTupleIterator(HConnection conn, List<HBaseKeyRange> segmentKeyRanges, CubeInstance cube, //
- Set<TblColRef> dimensions, TupleFilter filter, Set<TblColRef> groupBy, TblColRef topNCol, List<RowValueDecoder> rowValueDecoders, //
+ Set<TblColRef> dimensions, TupleFilter filter, Set<TblColRef> groupBy, List<RowValueDecoder> rowValueDecoders, //
StorageContext context, TupleInfo returnTupleInfo) {
this.context = context;
@@ -67,14 +68,9 @@ public class SerializedHBaseTupleIterator implements ITupleIterator {
this.segmentIteratorList = new ArrayList<CubeSegmentTupleIterator>(segmentKeyRanges.size());
Map<CubeSegment, List<HBaseKeyRange>> rangesMap = makeRangesMap(segmentKeyRanges);
- boolean useTopN = topNCol != null;
+
for (Map.Entry<CubeSegment, List<HBaseKeyRange>> entry : rangesMap.entrySet()) {
- CubeSegmentTupleIterator segIter;
- if (useTopN)
- segIter = new CubeSegmentTopNTupleIterator(entry.getKey(), entry.getValue(), conn, dimensions, filter, groupBy, topNCol, rowValueDecoders, context, returnTupleInfo);
- else
- segIter = new CubeSegmentTupleIterator(entry.getKey(), entry.getValue(), conn, dimensions, filter, groupBy, rowValueDecoders, context, returnTupleInfo);
- this.segmentIteratorList.add(segIter);
+ this.segmentIteratorList.add(newCubeSegmentTupleIterator(entry.getKey(), entry.getValue(), conn, dimensions, filter, groupBy, rowValueDecoders, context, returnTupleInfo));
}
this.segmentIteratorIterator = this.segmentIteratorList.iterator();
@@ -85,6 +81,16 @@ public class SerializedHBaseTupleIterator implements ITupleIterator {
}
}
+ private CubeSegmentTupleIterator newCubeSegmentTupleIterator(CubeSegment seg, List<HBaseKeyRange> keyRange, HConnection conn, Set<TblColRef> dimensions, TupleFilter filter, Set<TblColRef> groupBy, List<RowValueDecoder> rowValueDecoders, StorageContext context2, TupleInfo returnTupleInfo) {
+ MeasureDesc topN = RowValueDecoder.findTopN(rowValueDecoders);
+ if (topN != null) {
+ TblColRef topNCol = topN.getFunction().getTopNLiteralColumn();
+ return new CubeSegmentTopNTupleIterator(seg, keyRange, conn, dimensions, filter, groupBy, topNCol, rowValueDecoders, context, returnTupleInfo);
+ } else {
+ return new CubeSegmentTupleIterator(seg, keyRange, conn, dimensions, filter, groupBy, rowValueDecoders, context, returnTupleInfo);
+ }
+ }
+
private Map<CubeSegment, List<HBaseKeyRange>> makeRangesMap(List<HBaseKeyRange> segmentKeyRanges) {
Map<CubeSegment, List<HBaseKeyRange>> map = Maps.newHashMap();
for (HBaseKeyRange range : segmentKeyRanges) {