You are viewing a plain-text version of this content. The canonical link to the original archive page is not included in this plain-text copy.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/12/02 04:15:33 UTC
[1/2] kylin git commit: KYLIN-976 Extract influenceCapabilityCheck()
Repository: kylin
Updated Branches:
refs/heads/KYLIN-976 dac248804 -> 63f376a5a
KYLIN-976 Extract influenceCapabilityCheck()
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/6c85ead9
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/6c85ead9
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/6c85ead9
Branch: refs/heads/KYLIN-976
Commit: 6c85ead9595df519f60279d92e6d2b009894df00
Parents: dac2488
Author: Li, Yang <ya...@ebay.com>
Authored: Mon Nov 30 17:55:08 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Mon Nov 30 17:55:08 2015 +0800
----------------------------------------------------------------------
.../kylin/cube/CubeCapabilityChecker.java | 215 ++++++++-----------
.../apache/kylin/cube/CubeDimensionDeriver.java | 46 ----
.../org/apache/kylin/cube/CubeInstance.java | 23 +-
.../org/apache/kylin/measure/MeasureType.java | 13 +-
.../kylin/measure/basic/BasicMeasureType.java | 9 -
.../kylin/measure/hllc/HLLCMeasureType.java | 7 -
.../kylin/measure/topn/TopNMeasureType.java | 46 +++-
.../metadata/realization/CapabilityResult.java | 63 ++++++
.../metadata/realization/IRealization.java | 18 +-
.../kylin/storage/hybrid/HybridInstance.java | 32 +--
.../storage/hybrid/HybridStorageQuery.java | 2 +-
.../apache/kylin/invertedindex/IIInstance.java | 29 ++-
.../apache/kylin/query/routing/Candidate.java | 100 +++++++++
.../apache/kylin/query/routing/QueryRouter.java | 56 ++++-
.../apache/kylin/query/routing/RoutingRule.java | 22 +-
.../AdjustForWeaklyMatchedRealization.java | 99 ---------
.../RoutingRules/RealizationPriorityRule.java | 59 -----
.../RoutingRules/RealizationSortRule.java | 65 ------
.../RemoveUncapableRealizationsRule.java | 41 ----
.../routing/rules/RealizationSortRule.java | 34 +++
.../rules/RemoveUncapableRealizationsRule.java | 44 ++++
.../apache/kylin/query/test/ITIIQueryTest.java | 7 +-
22 files changed, 499 insertions(+), 531 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/6c85ead9/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
index 343bf11..0c1b3c9 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
@@ -18,158 +18,104 @@
package org.apache.kylin.cube;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.DimensionDesc;
+import org.apache.kylin.measure.MeasureType;
+import org.apache.kylin.measure.basic.BasicMeasureType;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.JoinDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.CapabilityResult;
+import org.apache.kylin.metadata.realization.CapabilityResult.CapabilityInfluence;
import org.apache.kylin.metadata.realization.SQLDigest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
+import com.google.common.collect.Sets;
/**
*/
public class CubeCapabilityChecker {
private static final Logger logger = LoggerFactory.getLogger(CubeCapabilityChecker.class);
- public static boolean check(CubeInstance cube, SQLDigest digest, boolean allowWeakMatch) {
+ public static CapabilityResult check(CubeInstance cube, SQLDigest digest) {
+ CapabilityResult result = new CapabilityResult();
+ result.capable = false;
// retrieve members from olapContext
- Collection<TblColRef> dimensionColumns = CubeDimensionDeriver.getDimensionColumns(digest);
- Collection<FunctionDesc> functions = digest.aggregations;
- Collection<TblColRef> metricsColumns = digest.metricColumns;
+ Collection<TblColRef> dimensionColumns = getDimensionColumns(digest);
+ Collection<FunctionDesc> aggrFunctions = digest.aggregations;
Collection<JoinDesc> joins = digest.joinDescs;
- boolean hasTopN = hasTopNMeasure(cube.getDescriptor());
// match dimensions & aggregations & joins
- boolean isOnline = cube.isReady();
-
- boolean matchDimensions = isMatchedWithDimensions(dimensionColumns, cube);
- boolean matchAggregation = isMatchedWithAggregations(functions, cube);
- boolean matchJoin = isMatchedWithJoins(joins, cube);
-
- // Some cubes are not "perfectly" match, but still save them in case of usage
- if (allowWeakMatch && isOnline && matchDimensions && !matchAggregation && matchJoin) {
- // sometimes metrics are indeed dimensions
- // e.g. select min(cal_dt) from ..., where cal_dt is actually a dimension
- if (isWeaklyMatchedWithAggregations(functions, metricsColumns, cube)) {
- logger.info("Weakly matched cube found " + cube.getName());
- return true;
- }
- }
+ Collection<TblColRef> unmatchedDimensions = unmatchedDimensions(dimensionColumns, cube);
+ Collection<FunctionDesc> unmatchedAggregations = unmatchedAggregations(aggrFunctions, cube);
+ boolean isJoinMatch = isJoinMatch(joins, cube);
- // for topn, the group column can come from measure
- if (hasTopN & matchJoin && !matchDimensions && functions.size() == 1) {
- boolean matchedTopN = isMatchedWithTopN(dimensionColumns, cube, digest);
- matchDimensions = matchedTopN;
- matchAggregation = matchedTopN;
+ if (!isJoinMatch) {
+ logger.info("Exclude cube " + cube.getName() + " because unmatched joins");
+ return result;
}
- if (!isOnline || !matchDimensions || !matchAggregation || !matchJoin) {
- logger.info("Exclude cube " + cube.getName() + " because " + " isOnlne=" + isOnline + ",matchDimensions=" + matchDimensions + ",matchAggregation=" + matchAggregation + ",matchJoin=" + matchJoin);
- return false;
+ // try custom measure types
+ if (!unmatchedDimensions.isEmpty() || !unmatchedAggregations.isEmpty()) {
+ tryCustomMeasureTypes(unmatchedDimensions, unmatchedAggregations, digest, cube, result);
}
-
- return true;
- }
-
- /**
- * Check whether the cube can match the sql digest with TopN measure
- * @param cube
- * @param digest
- * @return
- */
- public static boolean isMatchedWithTopN(CubeInstance cube, SQLDigest digest) {
-
- boolean hasTopN = hasTopNMeasure(cube.getDescriptor());
- if (hasTopN == false) {
- return false;
+ // try dimension-as-measure
+ if (!unmatchedAggregations.isEmpty()) {
+ tryDimensionAsMeasures(unmatchedAggregations, digest, cube, result);
}
- Collection<TblColRef> dimensionColumns = CubeDimensionDeriver.getDimensionColumns(digest);
-
- boolean matchDimensions = isMatchedWithDimensions(dimensionColumns, cube);
-
- if (matchDimensions == true) {
- return false;
- }
-
- Collection<FunctionDesc> functions = digest.aggregations;
- if (functions == null || functions.size() != 1) {
- // topN only allow one measure
- return false;
+ if (!unmatchedDimensions.isEmpty()) {
+ logger.info("Exclude cube " + cube.getName() + " because unmatched dimensions");
+ return result;
}
-
- return isMatchedWithTopN(dimensionColumns, cube, digest);
- }
-
- /**
- * This is the method for internal, consumer need do the check as the public method did
- * @param dimensionColumns
- * @param cube
- * @param digest
- * @return
- */
- private static boolean isMatchedWithTopN(Collection<TblColRef> dimensionColumns, CubeInstance cube, SQLDigest digest) {
-
- CubeDesc cubeDesc = cube.getDescriptor();
- Collection<FunctionDesc> functions = digest.aggregations;
- FunctionDesc onlyFunction = functions.iterator().next();
- if (onlyFunction.isSum() == false) {
- // topN only support SUM expression
- return false;
+ if (!unmatchedAggregations.isEmpty()) {
+ logger.info("Exclude cube " + cube.getName() + " because unmatched aggregations");
+ return result;
}
- Collection<TblColRef> dimensionColumnsCopy = new ArrayList<TblColRef>(dimensionColumns);
- for (MeasureDesc measure : cubeDesc.getMeasures()) {
- if (measure.getFunction().isTopN()) {
- List<TblColRef> cols = measure.getFunction().getParameter().getColRefs();
- TblColRef literalCol = cols.get(cols.size() - 1);
- if (digest.groupbyColumns.contains(literalCol)) {
- dimensionColumnsCopy.remove(literalCol);
- if (isMatchedWithDimensions(dimensionColumnsCopy, cube)) {
- if (measure.getFunction().isTopNCompatibleSum(onlyFunction)) {
- return true;
- }
- }
- dimensionColumnsCopy.add(literalCol);
- }
- }
- }
-
- return false;
+ // cost will be minded by caller
+ result.capable = true;
+ return result;
}
- private static boolean hasTopNMeasure(CubeDesc cubeDesc) {
- for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
- if (measureDesc.getFunction().isTopN())
- return true;
- }
+ private static Collection<TblColRef> getDimensionColumns(SQLDigest sqlDigest) {
+ Collection<TblColRef> groupByColumns = sqlDigest.groupbyColumns;
+ Collection<TblColRef> filterColumns = sqlDigest.filterColumns;
- return false;
+ Collection<TblColRef> dimensionColumns = new HashSet<TblColRef>();
+ dimensionColumns.addAll(groupByColumns);
+ dimensionColumns.addAll(filterColumns);
+ return dimensionColumns;
}
- private static boolean isMatchedWithDimensions(Collection<TblColRef> dimensionColumns, CubeInstance cube) {
+ private static Set<TblColRef> unmatchedDimensions(Collection<TblColRef> dimensionColumns, CubeInstance cube) {
+ HashSet<TblColRef> result = Sets.newHashSet(dimensionColumns);
CubeDesc cubeDesc = cube.getDescriptor();
- boolean matchAgg = cubeDesc.listDimensionColumnsIncludingDerived().containsAll(dimensionColumns);
- return matchAgg;
+ result.removeAll(cubeDesc.listDimensionColumnsIncludingDerived());
+ return result;
}
- private static boolean isMatchedWithAggregations(Collection<FunctionDesc> aggregations, CubeInstance cube) {
+ private static Set<FunctionDesc> unmatchedAggregations(Collection<FunctionDesc> aggregations, CubeInstance cube) {
+ HashSet<FunctionDesc> result = Sets.newHashSet(aggregations);
CubeDesc cubeDesc = cube.getDescriptor();
- boolean matchAgg = cubeDesc.listAllFunctions().containsAll(aggregations);
- return matchAgg;
+ result.removeAll(cubeDesc.listAllFunctions());
+ return result;
}
- private static boolean isMatchedWithJoins(Collection<JoinDesc> joins, CubeInstance cube) {
+ private static boolean isJoinMatch(Collection<JoinDesc> joins, CubeInstance cube) {
CubeDesc cubeDesc = cube.getDescriptor();
List<JoinDesc> cubeJoins = new ArrayList<JoinDesc>(cubeDesc.getDimensions().size());
@@ -204,30 +150,55 @@ public class CubeCapabilityChecker {
return true;
}
- private static boolean isWeaklyMatchedWithAggregations(Collection<FunctionDesc> aggregations, Collection<TblColRef> metricColumns, CubeInstance cube) {
+ private static void tryDimensionAsMeasures(Collection<FunctionDesc> unmatchedAggregations, SQLDigest digest, CubeInstance cube, CapabilityResult result) {
CubeDesc cubeDesc = cube.getDescriptor();
Collection<FunctionDesc> cubeFuncs = cubeDesc.listAllFunctions();
- boolean matched = true;
- for (FunctionDesc functionDesc : aggregations) {
- if (cubeFuncs.contains(functionDesc))
+ Iterator<FunctionDesc> it = unmatchedAggregations.iterator();
+ while (it.hasNext()) {
+ FunctionDesc functionDesc = it.next();
+
+ if (cubeFuncs.contains(functionDesc)) {
+ it.remove();
continue;
+ }
// only inverted-index cube does not have count, and let calcite handle in this case
- if (functionDesc.isCount())
+ if (functionDesc.isCount()) {
+ it.remove();
continue;
+ }
- if (functionDesc.isCountDistinct()) // calcite can not handle distinct count
- matched = false;
-
- TblColRef col = null;
- if (functionDesc.getParameter().getColRefs().size() > 0)
- col = functionDesc.getParameter().getColRefs().get(0);
+ // calcite can not handle distinct count
+ if (functionDesc.isCountDistinct()) {
+ continue;
+ }
- if (col == null || !cubeDesc.listDimensionColumnsIncludingDerived().contains(col)) {
- matched = false;
+ // calcite can do aggregation from columns on-the-fly
+ List<TblColRef> neededCols = functionDesc.getParameter().getColRefs();
+ if (neededCols.size() > 0 && cubeDesc.listDimensionColumnsIncludingDerived().containsAll(neededCols)) {
+ result.influences.add(new CapabilityResult.DimensionAsMeasure(functionDesc));
+ it.remove();
+ continue;
}
}
- return matched;
}
+
+ // custom measure types can cover unmatched dimensions or measures
+ private static void tryCustomMeasureTypes(Collection<TblColRef> unmatchedDimensions, Collection<FunctionDesc> unmatchedAggregations, SQLDigest digest, CubeInstance cube, CapabilityResult result) {
+ CubeDesc cubeDesc = cube.getDescriptor();
+ for (MeasureDesc measure : cubeDesc.getMeasures()) {
+ if (unmatchedDimensions.isEmpty() && unmatchedAggregations.isEmpty())
+ break;
+
+ MeasureType measureType = MeasureType.create(measure.getFunction());
+ if (measureType instanceof BasicMeasureType)
+ continue;
+
+ CapabilityInfluence inf = measureType.influenceCapabilityCheck(unmatchedDimensions, unmatchedAggregations, digest, measure);
+ if (inf != null)
+ result.influences.add(inf);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6c85ead9/core-cube/src/main/java/org/apache/kylin/cube/CubeDimensionDeriver.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeDimensionDeriver.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeDimensionDeriver.java
deleted file mode 100644
index 138d01e..0000000
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeDimensionDeriver.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.cube;
-
-import java.util.Collection;
-import java.util.HashSet;
-
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.realization.SQLDigest;
-
-/**
- *
- * the unified logic for defining a sql's dimension
- */
-public class CubeDimensionDeriver {
-
- public static Collection<TblColRef> getDimensionColumns(SQLDigest sqlDigest) {
- Collection<TblColRef> groupByColumns = sqlDigest.groupbyColumns;
- Collection<TblColRef> filterColumns = sqlDigest.filterColumns;
-
- Collection<MeasureDesc> sortMeasures = sqlDigest.sortMeasures;
- Collection<SQLDigest.OrderEnum> sortOrders = sqlDigest.sortOrders;
-
- Collection<TblColRef> dimensionColumns = new HashSet<TblColRef>();
- dimensionColumns.addAll(groupByColumns);
- dimensionColumns.addAll(filterColumns);
- return dimensionColumns;
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6c85ead9/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
index 2208136..81c7909 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
@@ -32,6 +32,8 @@ import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.CapabilityResult;
+import org.apache.kylin.metadata.realization.CapabilityResult.CapabilityInfluence;
import org.apache.kylin.metadata.realization.IRealization;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.metadata.realization.RealizationType;
@@ -339,12 +341,20 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
}
@Override
- public boolean isCapable(SQLDigest digest) {
- return CubeCapabilityChecker.check(this, digest, true);
+ public CapabilityResult isCapable(SQLDigest digest) {
+ CapabilityResult result = CubeCapabilityChecker.check(this, digest);
+ if (result.capable) {
+ result.cost = getCost(digest);
+ for (CapabilityInfluence i : result.influences) {
+ result.cost *= (i.suggestCostMultiplier() == 0) ? 1.0 : i.suggestCostMultiplier();
+ }
+ } else {
+ result.cost = -1;
+ }
+ return result;
}
- @Override
- public int getCost(SQLDigest digest) {
+ private int getCost(SQLDigest digest) {
int calculatedCost = cost;
calculatedCost += getAllDimensions().size() * COST_WEIGHT_DIMENSION + getMeasures().size() * COST_WEIGHT_MEASURE;
@@ -357,11 +367,6 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
}
}
- if (CubeCapabilityChecker.isMatchedWithTopN(this, digest)) {
- // this is topN query
- calculatedCost = calculatedCost / 3;
- }
-
return calculatedCost;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6c85ead9/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
index 4fe59c0..8e7de6f 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
@@ -18,6 +18,7 @@
package org.apache.kylin.measure;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -29,6 +30,8 @@ import org.apache.kylin.metadata.datatype.DataTypeSerializer;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.CapabilityResult.CapabilityInfluence;
+import org.apache.kylin.metadata.realization.SQLDigest;
import com.google.common.collect.Maps;
@@ -83,12 +86,18 @@ abstract public class MeasureType {
abstract public MeasureAggregator<?> newAggregator();
- abstract public List<TblColRef> getColumnsNeedDictionary(MeasureDesc measureDesc);
-
+ public List<TblColRef> getColumnsNeedDictionary(MeasureDesc measureDesc) {
+ return null;
+ }
+
/* ============================================================================
* Cube Selection
* ---------------------------------------------------------------------------- */
+ public CapabilityInfluence influenceCapabilityCheck(Collection<TblColRef> unmatchedDimensions, Collection<FunctionDesc> unmatchedAggregations, SQLDigest digest, MeasureDesc measureDesc) {
+ return null;
+ }
+
/* ============================================================================
* Query
* ---------------------------------------------------------------------------- */
http://git-wip-us.apache.org/repos/asf/kylin/blob/6c85ead9/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
index fe53bab..f314870 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
@@ -18,8 +18,6 @@
package org.apache.kylin.measure.basic;
-import java.util.List;
-
import org.apache.kylin.measure.MeasureAggregator;
import org.apache.kylin.measure.MeasureIngester;
import org.apache.kylin.measure.MeasureType;
@@ -30,7 +28,6 @@ import org.apache.kylin.metadata.datatype.DoubleSerializer;
import org.apache.kylin.metadata.datatype.LongSerializer;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.TblColRef;
public class BasicMeasureType extends MeasureType {
@@ -119,10 +116,4 @@ public class BasicMeasureType extends MeasureType {
return FunctionDesc.FUNC_MIN.equalsIgnoreCase(funcName);
}
- @Override
- public List<TblColRef> getColumnsNeedDictionary(MeasureDesc measureDesc) {
- // TODO Auto-generated method stub
- return null;
- }
-
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6c85ead9/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
index 2ad7630..4a73478 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
@@ -18,7 +18,6 @@
package org.apache.kylin.measure.hllc;
-import java.util.List;
import java.util.Map;
import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
@@ -90,10 +89,4 @@ public class HLLCMeasureType extends MeasureType {
return new LDCAggregator();
}
- @Override
- public List<TblColRef> getColumnsNeedDictionary(MeasureDesc measureDesc) {
- // TODO Auto-generated method stub
- return null;
- }
-
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6c85ead9/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
index 561f9f1..d6c5a6f 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
@@ -18,6 +18,7 @@
package org.apache.kylin.measure.topn;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -33,8 +34,11 @@ import org.apache.kylin.measure.MeasureType;
import org.apache.kylin.measure.hllc.HLLCSerializer;
import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.metadata.datatype.DataTypeSerializer;
+import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.CapabilityResult.CapabilityInfluence;
+import org.apache.kylin.metadata.realization.SQLDigest;
public class TopNMeasureType extends MeasureType {
@@ -43,9 +47,9 @@ public class TopNMeasureType extends MeasureType {
public TopNMeasureType(DataType dataType) {
if ("topn".equals(dataType.getName()) == false)
throw new IllegalArgumentException();
-
+
this.dataType = dataType;
-
+
if (this.dataType.getPrecision() < 1 || this.dataType.getPrecision() > 1000)
throw new IllegalArgumentException("TopN precision must be between 1 and 1000");
}
@@ -59,11 +63,11 @@ public class TopNMeasureType extends MeasureType {
public Class<? extends DataTypeSerializer<?>> getAggregationDataSeralizer() {
return HLLCSerializer.class;
}
-
+
@Override
public void validate(MeasureDesc measureDesc) throws IllegalArgumentException {
// TODO Auto-generated method stub
-
+
}
@SuppressWarnings("rawtypes")
@@ -74,10 +78,10 @@ public class TopNMeasureType extends MeasureType {
public TopNCounter valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
if (values.length != 2)
throw new IllegalArgumentException();
-
+
double counter = values[0] == null ? 0 : Double.parseDouble(values[0]);
String literal = values[1];
-
+
// encode literal using dictionary
TblColRef literalCol = measureDesc.getFunction().getTopNLiteralColumn();
Dictionary<String> dictionary = dictionaryMap.get(literalCol);
@@ -90,7 +94,7 @@ public class TopNMeasureType extends MeasureType {
topNCounter.offer(key, counter);
return topNCounter;
}
-
+
@SuppressWarnings("unchecked")
@Override
public TopNCounter reEncodeDictionary(TopNCounter value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDicts, Map<TblColRef, Dictionary<String>> newDicts) {
@@ -135,4 +139,32 @@ public class TopNMeasureType extends MeasureType {
return Collections.singletonList(literalCol);
}
+ @Override
+ public CapabilityInfluence influenceCapabilityCheck(Collection<TblColRef> unmatchedDimensions, Collection<FunctionDesc> unmatchedAggregations, SQLDigest digest, MeasureDesc topN) {
+ // TopN measure can (and only can) provide one numeric measure and one literal dimension
+ // e.g. select seller, sum(gmv) from ... group by seller order by 2 desc limit 100
+
+ // check digest requires only one measure
+ if (digest.aggregations.size() != 1)
+ return null;
+
+ // the measure function must be SUM
+ FunctionDesc onlyFunction = digest.aggregations.iterator().next();
+ if (onlyFunction.isSum() == false)
+ return null;
+
+ TblColRef literalCol = topN.getFunction().getTopNLiteralColumn();
+ if (unmatchedDimensions.contains(literalCol) && topN.getFunction().isTopNCompatibleSum(onlyFunction)) {
+ unmatchedDimensions.remove(literalCol);
+ unmatchedAggregations.remove(onlyFunction);
+ return new CapabilityInfluence() {
+ @Override
+ public double suggestCostMultiplier() {
+ return 0.3; // make sure TopN get ahead of other matched realizations
+ }
+ };
+ } else
+ return null;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6c85ead9/core-metadata/src/main/java/org/apache/kylin/metadata/realization/CapabilityResult.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/CapabilityResult.java b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/CapabilityResult.java
new file mode 100644
index 0000000..cc5bae7
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/CapabilityResult.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.realization;
+
+import java.util.List;
+
+import org.apache.kylin.metadata.model.FunctionDesc;
+
+import com.google.common.collect.Lists;
+
+public class CapabilityResult {
+
+ /** Is capable or not */
+ public boolean capable;
+
+ /** The smaller the cost, the more capable the realization */
+ public int cost;
+
+ /**
+ * For info purpose, marker objects to indicate all special features
+ * (dimension-as-measure, topN etc.) that have decided the capability.
+ */
+ public List<CapabilityInfluence> influences = Lists.newArrayListWithCapacity(1);
+
+ public static interface CapabilityInfluence {
+ /** Suggest a multiplier to influence query cost */
+ double suggestCostMultiplier();
+ }
+
+ public static class DimensionAsMeasure implements CapabilityInfluence {
+
+ final FunctionDesc function;
+
+ public DimensionAsMeasure(FunctionDesc function) {
+ this.function = function;
+ }
+
+ @Override
+ public double suggestCostMultiplier() {
+ return 1.0;
+ }
+
+ public FunctionDesc getMeasureFunction() {
+ return function;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6c85ead9/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java
index ccfa956..1857c85 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java
@@ -26,26 +26,14 @@ import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
public interface IRealization extends IStorageAware {
-
- public boolean isCapable(SQLDigest digest);
-
+
/**
- * Given the features of a query, return an integer indicating how capable the realization
- * is to answer the query.
- *
- * @return -1 if the realization cannot fulfill the query;
- * or a number between 0-100 if the realization can answer the query, the smaller
- * the number, the more efficient the realization.
- * Especially,
- * 0 - means the realization has the exact result pre-calculated, no less no more;
- * 100 - means the realization will scan the full table with little or no indexing.
+ * Given the features of a query, check how capable the realization is to answer the query.
*/
- public int getCost(SQLDigest digest);
+ public CapabilityResult isCapable(SQLDigest digest);
/**
* Get whether this specific realization is a cube or InvertedIndex
- *
- * @return
*/
public RealizationType getType();
http://git-wip-us.apache.org/repos/asf/kylin/blob/6c85ead9/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
index 2a6e5d3..98330ec 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
@@ -29,6 +29,7 @@ import org.apache.kylin.metadata.model.DataModelDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.project.RealizationEntry;
+import org.apache.kylin.metadata.realization.CapabilityResult;
import org.apache.kylin.metadata.realization.IRealization;
import org.apache.kylin.metadata.realization.RealizationRegistry;
import org.apache.kylin.metadata.realization.RealizationType;
@@ -152,24 +153,23 @@ public class HybridInstance extends RootPersistentEntity implements IRealization
}
@Override
- public boolean isCapable(SQLDigest digest) {
+ public CapabilityResult isCapable(SQLDigest digest) {
+ CapabilityResult result = new CapabilityResult();
+ result.cost = Integer.MAX_VALUE;
+
for (IRealization realization : getRealizations()) {
- if (realization.isCapable(digest))
- return true;
- }
- return false;
- }
-
- @Override
- public int getCost(SQLDigest digest) {
- cost = Integer.MAX_VALUE;
- for (IRealization realization : this.getRealizations()) {
- if (realization.isCapable(digest))
- cost = Math.min(cost, realization.getCost(digest));
+ CapabilityResult child = realization.isCapable(digest);
+ if (child.capable) {
+ result.capable = true;
+ result.cost = Math.min(result.cost, child.cost);
+ result.influences.addAll(child.influences);
+ }
}
-
- // Make hybrid always win its children
- return cost - 1;
+
+ if (result.cost > 0)
+ result.cost--; // let hybrid win its children
+
+ return result;
}
@Override
http://git-wip-us.apache.org/repos/asf/kylin/blob/6c85ead9/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageQuery.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageQuery.java b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageQuery.java
index 49f9fd9..8911c0e 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageQuery.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageQuery.java
@@ -49,7 +49,7 @@ public class HybridStorageQuery implements IStorageQuery {
public ITupleIterator search(final StorageContext context, final SQLDigest sqlDigest, final TupleInfo returnTupleInfo) {
List<ITupleIterator> tupleIterators = Lists.newArrayList();
for (int i = 0; i < realizations.length; i++) {
- if (realizations[i].isReady() && realizations[i].isCapable(sqlDigest)) {
+ if (realizations[i].isReady() && realizations[i].isCapable(sqlDigest).capable) {
ITupleIterator dataIterator = storageEngines[i].search(context, sqlDigest, returnTupleInfo);
tupleIterators.add(dataIterator);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6c85ead9/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
index 0ba07ee..114904a 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
@@ -26,7 +26,14 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.RootPersistentEntity;
import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.metadata.model.*;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.IBuildable;
+import org.apache.kylin.metadata.model.IStorageAware;
+import org.apache.kylin.metadata.model.LookupDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.CapabilityResult;
import org.apache.kylin.metadata.realization.IRealization;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.metadata.realization.RealizationType;
@@ -270,15 +277,21 @@ public class IIInstance extends RootPersistentEntity implements IRealization, IB
}
@Override
- public boolean isCapable(SQLDigest digest) {
- if (!digest.factTable.equalsIgnoreCase(this.getFactTable()))
- return false;
-
- return IICapabilityChecker.check(this, digest);
+ public CapabilityResult isCapable(SQLDigest digest) {
+ CapabilityResult result = new CapabilityResult();
+
+ if (!digest.factTable.equalsIgnoreCase(this.getFactTable())) {
+ result.capable = false;
+ } else {
+ result.capable = IICapabilityChecker.check(this, digest);
+ if (result.capable)
+ result.cost = getCost(digest);
+ }
+
+ return result;
}
- @Override
- public int getCost(SQLDigest digest) {
+ private int getCost(SQLDigest digest) {
int calculatedCost = cost;
for (LookupDesc lookupDesc : this.getDescriptor().getModel().getLookups()) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/6c85ead9/query/src/main/java/org/apache/kylin/query/routing/Candidate.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/Candidate.java b/query/src/main/java/org/apache/kylin/query/routing/Candidate.java
new file mode 100644
index 0000000..5e4dcf6
--- /dev/null
+++ b/query/src/main/java/org/apache/kylin/query/routing/Candidate.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.query.routing;
+
+import java.util.Map;
+
+import org.apache.kylin.metadata.realization.CapabilityResult;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.metadata.realization.SQLDigest;
+import org.apache.kylin.storage.hybrid.HybridInstance;
+
+import com.google.common.collect.Maps;
+
+public class Candidate implements Comparable<Candidate> {
+
+ static final Map<RealizationType, Integer> PRIORITIES = Maps.newHashMap();
+
+ static {
+ PRIORITIES.put(RealizationType.HYBRID, 0);
+ PRIORITIES.put(RealizationType.CUBE, 0);
+ PRIORITIES.put(RealizationType.INVERTED_INDEX, 1);
+ }
+
+ /** for test only */
+ public static void setPriorities(Map<RealizationType, Integer> priorities) {
+ PRIORITIES.clear();
+ PRIORITIES.putAll(priorities);
+ }
+
+ // ============================================================================
+
+ IRealization realization;
+ SQLDigest sqlDigest;
+ int priority;
+ CapabilityResult capability;
+
+ public Candidate(IRealization realization, SQLDigest sqlDigest) {
+ this.realization = realization;
+ this.sqlDigest = sqlDigest;
+ this.priority = PRIORITIES.get(realization.getType());
+ }
+
+ public IRealization getRealization() {
+ return realization;
+ }
+
+ public SQLDigest getSqlDigest() {
+ return sqlDigest;
+ }
+
+ public int getPriority() {
+ return priority;
+ }
+
+ public CapabilityResult getCapability() {
+ return capability;
+ }
+
+ public void setCapability(CapabilityResult capability) {
+ this.capability = capability;
+ }
+
+ @Override
+ public int compareTo(Candidate o) {
+ int comp = this.priority - o.priority;
+ if (comp != 0) {
+ return comp;
+ }
+
+ comp = this.capability.cost - o.capability.cost;
+ if (comp != 0) {
+ return comp;
+ }
+
+ if (this.realization instanceof HybridInstance)
+ return -1;
+ else if (o.realization instanceof HybridInstance)
+ return 1;
+ else
+ return 0;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6c85ead9/query/src/main/java/org/apache/kylin/query/routing/QueryRouter.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/QueryRouter.java b/query/src/main/java/org/apache/kylin/query/routing/QueryRouter.java
index 7f2bb49..7493e08 100644
--- a/query/src/main/java/org/apache/kylin/query/routing/QueryRouter.java
+++ b/query/src/main/java/org/apache/kylin/query/routing/QueryRouter.java
@@ -19,10 +19,17 @@
package org.apache.kylin.query.routing;
import java.util.List;
+import java.util.Set;
import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.project.ProjectManager;
+import org.apache.kylin.metadata.realization.CapabilityResult;
+import org.apache.kylin.metadata.realization.CapabilityResult.CapabilityInfluence;
+import org.apache.kylin.metadata.realization.CapabilityResult.DimensionAsMeasure;
import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.metadata.realization.SQLDigest;
import org.apache.kylin.query.relnode.OLAPContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,20 +49,51 @@ public class QueryRouter {
logger.info("The project manager's reference is " + prjMgr);
String factTableName = olapContext.firstTableScan.getTableName();
String projectName = olapContext.olapSchema.getProjectName();
- List<IRealization> realizations = Lists.newArrayList(prjMgr.getRealizationsByTable(projectName, factTableName));
- logger.info("Find candidates by table " + factTableName + " and project=" + projectName + " : " + StringUtils.join(realizations, ","));
+ Set<IRealization> realizations = prjMgr.getRealizationsByTable(projectName, factTableName);
+ SQLDigest sqlDigest = olapContext.getSQLDigest();
- //rule based realization selection, rules might reorder realizations or remove specific realization
- RoutingRule.applyRules(realizations, olapContext);
+ List<Candidate> candidates = Lists.newArrayListWithCapacity(realizations.size());
+ for (IRealization real : realizations) {
+ if (real.isReady())
+ candidates.add(new Candidate(real, sqlDigest));
+ }
+
+ logger.info("Find candidates by table " + factTableName + " and project=" + projectName + " : " + StringUtils.join(candidates, ","));
- if (realizations.size() == 0) {
- throw new NoRealizationFoundException("Can't find any realization. Please confirm with providers. SQL digest: " + olapContext.getSQLDigest().toString());
+ // rule based realization selection, rules might reorder realizations or remove specific realization
+ RoutingRule.applyRules(candidates);
+
+ if (candidates.size() == 0) {
+ throw new NoRealizationFoundException("Can't find any realization. Please confirm with providers. SQL digest: " + sqlDigest.toString());
}
+ Candidate chosen = candidates.get(0);
+ adjustForDimensionAsMeasure(chosen, olapContext);
+
logger.info("The realizations remaining: ");
- logger.info(RoutingRule.getPrintableText(realizations));
- logger.info("The realization being chosen: " + realizations.get(0).getName());
+ logger.info(RoutingRule.getPrintableText(candidates));
+ logger.info("The realization being chosen: " + chosen.realization.getName());
- return realizations.get(0);
+ return chosen.realization;
}
+
+ private static void adjustForDimensionAsMeasure(Candidate chosen, OLAPContext olapContext) {
+ CapabilityResult capability = chosen.getCapability();
+ for (CapabilityInfluence inf : capability.influences) {
+ // convert the metric to dimension
+ if (inf instanceof DimensionAsMeasure) {
+ FunctionDesc functionDesc = ((DimensionAsMeasure) inf).getMeasureFunction();
+ functionDesc.setDimensionAsMetric(true);
+ olapContext.rewriteFields.remove(functionDesc.getRewriteFieldName());
+ for (TblColRef col : functionDesc.getParameter().getColRefs()) {
+ if (col != null) {
+ olapContext.metricsColumns.remove(col);
+ olapContext.groupByColumns.add(col);
+ }
+ }
+ logger.info("Adjust DimensionAsMeasure for " + functionDesc);
+ }
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6c85ead9/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java
index 1cd55d4..715f6d1 100644
--- a/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java
+++ b/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java
@@ -23,10 +23,8 @@ import java.util.List;
import org.apache.kylin.metadata.realization.IRealization;
import org.apache.kylin.metadata.realization.RealizationType;
-import org.apache.kylin.query.relnode.OLAPContext;
-import org.apache.kylin.query.routing.RoutingRules.AdjustForWeaklyMatchedRealization;
-import org.apache.kylin.query.routing.RoutingRules.RealizationSortRule;
-import org.apache.kylin.query.routing.RoutingRules.RemoveUncapableRealizationsRule;
+import org.apache.kylin.query.routing.rules.RealizationSortRule;
+import org.apache.kylin.query.routing.rules.RemoveUncapableRealizationsRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,23 +42,23 @@ public abstract class RoutingRule {
static {
rules.add(new RemoveUncapableRealizationsRule());
rules.add(new RealizationSortRule());
- rules.add(new AdjustForWeaklyMatchedRealization());//this rule might modify olapcontext content, better put it at last
}
- public static void applyRules(List<IRealization> realizations, OLAPContext olapContext) {
+ public static void applyRules(List<Candidate> candidates) {
for (RoutingRule rule : rules) {
- logger.info("Realizations order before: " + getPrintableText(realizations));
+ logger.info("Realizations order before: " + getPrintableText(candidates));
logger.info("Applying rule : " + rule);
- rule.apply(realizations, olapContext);
- logger.info("Realizations order after: " + getPrintableText(realizations));
+ rule.apply(candidates);
+ logger.info("Realizations order after: " + getPrintableText(candidates));
logger.info("===================================================");
}
}
- public static String getPrintableText(List<IRealization> realizations) {
+ public static String getPrintableText(List<Candidate> candidates) {
StringBuffer sb = new StringBuffer();
sb.append("[");
- for (IRealization r : realizations) {
+ for (Candidate candidate : candidates) {
+ IRealization r = candidate.realization;
sb.append(r.getName());
sb.append(",");
}
@@ -108,6 +106,6 @@ public abstract class RoutingRule {
return this.getClass().toString();
}
- public abstract void apply(List<IRealization> realizations, OLAPContext olapContext);
+ public abstract void apply(List<Candidate> candidates);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6c85ead9/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/AdjustForWeaklyMatchedRealization.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/AdjustForWeaklyMatchedRealization.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/AdjustForWeaklyMatchedRealization.java
deleted file mode 100644
index 250f2a6..0000000
--- a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/AdjustForWeaklyMatchedRealization.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.query.routing.RoutingRules;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.kylin.cube.CubeCapabilityChecker;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.realization.IRealization;
-import org.apache.kylin.query.relnode.OLAPContext;
-import org.apache.kylin.query.routing.RoutingRule;
-import org.apache.kylin.storage.hybrid.HybridInstance;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-public class AdjustForWeaklyMatchedRealization extends RoutingRule {
- private static final Logger logger = LoggerFactory.getLogger(AdjustForWeaklyMatchedRealization.class);
-
- @Override
- public void apply(List<IRealization> realizations, OLAPContext olapContext) {
- if (realizations.size() > 0) {
- IRealization first = realizations.get(0);
-
- if (first instanceof HybridInstance) {
- HybridInstance hybrid = (HybridInstance) first;
- first = hybrid.getLatestRealization();
- }
-
- if (first instanceof CubeInstance) {
- CubeInstance cube = (CubeInstance) first;
- adjustOLAPContextIfNecessary(cube, olapContext);
- }
-
- if (first instanceof IIInstance) {
- IIInstance ii = (IIInstance) first;
- adjustOLAPContextIfNecessary(ii, olapContext);
- }
- }
- }
-
- private static void adjustOLAPContextIfNecessary(IIInstance ii, OLAPContext olapContext) {
- IIDesc iiDesc = ii.getDescriptor();
- Collection<FunctionDesc> iiFuncs = iiDesc.listAllFunctions();
- convertAggregationToDimension(olapContext, iiFuncs);
- }
-
- private static void adjustOLAPContextIfNecessary(CubeInstance cube, OLAPContext olapContext) {
- if (CubeCapabilityChecker.check(cube, olapContext.getSQLDigest(), false))
- return;
-
- CubeDesc cubeDesc = cube.getDescriptor();
- Collection<FunctionDesc> cubeFuncs = cubeDesc.listAllFunctions();
- convertAggregationToDimension(olapContext, cubeFuncs);
- }
-
- private static void convertAggregationToDimension(OLAPContext olapContext, Collection<FunctionDesc> availableAggregations) {
- Iterator<FunctionDesc> it = olapContext.aggregations.iterator();
- while (it.hasNext()) {
- FunctionDesc functionDesc = it.next();
- if (!availableAggregations.contains(functionDesc)) {
- // try to convert the metric to dimension to see if it works
- TblColRef col = functionDesc.getParameter().getColRefs().get(0);
- functionDesc.setDimensionAsMetric(true);
- olapContext.rewriteFields.remove(functionDesc.getRewriteFieldName());
- if (col != null) {
- olapContext.metricsColumns.remove(col);
- olapContext.groupByColumns.add(col);
- }
- logger.info("Adjust OLAPContext for " + functionDesc);
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6c85ead9/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationPriorityRule.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationPriorityRule.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationPriorityRule.java
deleted file mode 100644
index 6e196d9..0000000
--- a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationPriorityRule.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.query.routing.RoutingRules;
-
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.kylin.metadata.realization.IRealization;
-import org.apache.kylin.metadata.realization.RealizationType;
-import org.apache.kylin.query.relnode.OLAPContext;
-import org.apache.kylin.query.routing.RoutingRule;
-
-import com.google.common.collect.Maps;
-
-/**
- */
-public class RealizationPriorityRule extends RoutingRule {
- static Map<RealizationType, Integer> priorities = Maps.newHashMap();
-
- static {
- priorities.put(RealizationType.HYBRID, 0);
- priorities.put(RealizationType.CUBE, 0);
- priorities.put(RealizationType.INVERTED_INDEX, 1);
- }
-
- public static void setPriorities(Map<RealizationType, Integer> priorities) {
- RealizationPriorityRule.priorities = priorities;
- }
-
- public void apply(List<IRealization> realizations, OLAPContext olapContext) {
-
- Collections.sort(realizations, new Comparator<IRealization>() {
- @Override
- public int compare(IRealization o1, IRealization o2) {
- int i1 = priorities.get(o1.getType());
- int i2 = priorities.get(o2.getType());
- return i1 - i2;
- }
- });
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6c85ead9/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationSortRule.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationSortRule.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationSortRule.java
deleted file mode 100644
index d101288..0000000
--- a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationSortRule.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.query.routing.RoutingRules;
-
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-
-import org.apache.kylin.metadata.realization.IRealization;
-import org.apache.kylin.query.relnode.OLAPContext;
-import org.apache.kylin.query.routing.RoutingRule;
-import org.apache.kylin.storage.hybrid.HybridInstance;
-
-/**
- */
-public class RealizationSortRule extends RoutingRule {
- @Override
- public void apply(List<IRealization> realizations, final OLAPContext olapContext) {
-
- // sort cube candidates, 0) the priority 1) the cost indicator, 2) the lesser header
- // columns the better, 3) the lesser body columns the better 4) the larger date range the better
-
- Collections.sort(realizations, new Comparator<IRealization>() {
- @Override
- public int compare(IRealization o1, IRealization o2) {
- int i1 = RealizationPriorityRule.priorities.get(o1.getType());
- int i2 = RealizationPriorityRule.priorities.get(o2.getType());
- int comp = i1 - i2;
- if (comp != 0) {
- return comp;
- }
-
- comp = o1.getCost(olapContext.getSQLDigest()) - o2.getCost(olapContext.getSQLDigest());
- if (comp != 0) {
- return comp;
- }
-
- if (o1 instanceof HybridInstance)
- return -1;
- else if (o2 instanceof HybridInstance)
- return 1;
-
- return 0;
- }
- });
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6c85ead9/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RemoveUncapableRealizationsRule.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RemoveUncapableRealizationsRule.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RemoveUncapableRealizationsRule.java
deleted file mode 100644
index c710ff1..0000000
--- a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RemoveUncapableRealizationsRule.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.query.routing.RoutingRules;
-
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.kylin.metadata.realization.IRealization;
-import org.apache.kylin.query.relnode.OLAPContext;
-import org.apache.kylin.query.routing.RoutingRule;
-
-/**
- */
-public class RemoveUncapableRealizationsRule extends RoutingRule {
- @Override
- public void apply(List<IRealization> realizations, OLAPContext olapContext) {
- for (Iterator<IRealization> iterator = realizations.iterator(); iterator.hasNext();) {
- IRealization realization = iterator.next();
- if (!realization.isCapable(olapContext.getSQLDigest())) {
- iterator.remove();
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6c85ead9/query/src/main/java/org/apache/kylin/query/routing/rules/RealizationSortRule.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/rules/RealizationSortRule.java b/query/src/main/java/org/apache/kylin/query/routing/rules/RealizationSortRule.java
new file mode 100644
index 0000000..d3c67d7
--- /dev/null
+++ b/query/src/main/java/org/apache/kylin/query/routing/rules/RealizationSortRule.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.query.routing.rules;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.kylin.query.routing.Candidate;
+import org.apache.kylin.query.routing.RoutingRule;
+
+/**
+ * Routing rule that reorders the remaining candidates best-first, using the natural ordering
+ * defined by {@link Candidate#compareTo(Candidate)}.
+ */
+public class RealizationSortRule extends RoutingRule {
+
+    @Override
+    public void apply(final List<Candidate> candidates) {
+        // Collections.sort is stable: equally ranked candidates keep their incoming order.
+        Collections.sort(candidates);
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6c85ead9/query/src/main/java/org/apache/kylin/query/routing/rules/RemoveUncapableRealizationsRule.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/rules/RemoveUncapableRealizationsRule.java b/query/src/main/java/org/apache/kylin/query/routing/rules/RemoveUncapableRealizationsRule.java
new file mode 100644
index 0000000..576b47f
--- /dev/null
+++ b/query/src/main/java/org/apache/kylin/query/routing/rules/RemoveUncapableRealizationsRule.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.query.routing.rules;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.kylin.metadata.realization.CapabilityResult;
+import org.apache.kylin.query.routing.Candidate;
+import org.apache.kylin.query.routing.RoutingRule;
+
+/**
+ * Routing rule that runs each candidate's capability check against its SQL digest. Candidates
+ * that cannot answer the query are removed from the list. Survivors have their
+ * {@link CapabilityResult} recorded for later rules (e.g. sorting by cost).
+ */
+public class RemoveUncapableRealizationsRule extends RoutingRule {
+    @Override
+    public void apply(List<Candidate> candidates) {
+        Iterator<Candidate> it = candidates.iterator();
+        while (it.hasNext()) {
+            Candidate current = it.next();
+            CapabilityResult result = current.getRealization().isCapable(current.getSqlDigest());
+
+            if (!result.capable) {
+                it.remove();
+                continue;
+            }
+            current.setCapability(result);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6c85ead9/query/src/test/java/org/apache/kylin/query/test/ITIIQueryTest.java
----------------------------------------------------------------------
diff --git a/query/src/test/java/org/apache/kylin/query/test/ITIIQueryTest.java b/query/src/test/java/org/apache/kylin/query/test/ITIIQueryTest.java
index 78b3d1c..344433a 100644
--- a/query/src/test/java/org/apache/kylin/query/test/ITIIQueryTest.java
+++ b/query/src/test/java/org/apache/kylin/query/test/ITIIQueryTest.java
@@ -23,10 +23,9 @@ import java.util.Collection;
import java.util.Map;
import org.apache.kylin.metadata.realization.RealizationType;
-import org.apache.kylin.query.routing.RoutingRules.RealizationPriorityRule;
+import org.apache.kylin.query.routing.Candidate;
import org.junit.AfterClass;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -45,7 +44,7 @@ public class ITIIQueryTest extends ITKylinQueryTest {
priorities.put(RealizationType.INVERTED_INDEX, 0);
priorities.put(RealizationType.CUBE, 1);
priorities.put(RealizationType.HYBRID, 1);
- RealizationPriorityRule.setPriorities(priorities);
+ Candidate.setPriorities(priorities);
}
@@ -57,7 +56,7 @@ public class ITIIQueryTest extends ITKylinQueryTest {
priorities.put(RealizationType.INVERTED_INDEX, 1);
priorities.put(RealizationType.CUBE, 0);
priorities.put(RealizationType.HYBRID, 0);
- RealizationPriorityRule.setPriorities(priorities);
+ Candidate.setPriorities(priorities);
}
@Parameterized.Parameters
[2/2] kylin git commit: KYLIN-976 Extract Rewrite related methods
Posted by li...@apache.org.
KYLIN-976 Extract Rewrite related methods
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/63f376a5
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/63f376a5
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/63f376a5
Branch: refs/heads/KYLIN-976
Commit: 63f376a5aaea84fa66e02ef27b50f11c008790b8
Parents: 6c85ead
Author: Li, Yang <ya...@ebay.com>
Authored: Tue Dec 1 11:39:31 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Tue Dec 1 11:39:31 2015 +0800
----------------------------------------------------------------------
.../kylin/cube/CubeCapabilityChecker.java | 2 +-
.../org/apache/kylin/cube/model/CubeDesc.java | 2 +-
.../apache/kylin/measure/MeasureIngester.java | 6 +-
.../org/apache/kylin/measure/MeasureType.java | 10 +-
.../kylin/measure/basic/BasicMeasureType.java | 11 ++
.../kylin/measure/basic/BigDecimalIngester.java | 5 -
.../kylin/measure/basic/DoubleIngester.java | 5 -
.../kylin/measure/basic/LongIngester.java | 5 -
.../kylin/measure/hllc/HLLCMeasureType.java | 16 +-
.../measure/hllc/HLLDistinctCountAggFunc.java | 153 +++++++++++++++++++
.../kylin/measure/topn/TopNMeasureType.java | 10 ++
.../kylin/metadata/model/FunctionDesc.java | 39 +++--
.../mr/steps/MergeCuboidFromStorageMapper.java | 2 +-
.../engine/mr/steps/MergeCuboidMapper.java | 2 +-
.../kylin/query/relnode/OLAPAggregateRel.java | 13 +-
.../apache/kylin/query/schema/OLAPTable.java | 10 +-
.../query/sqlfunc/HLLDistinctCountAggFunc.java | 152 ------------------
17 files changed, 237 insertions(+), 206 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
index 0c1b3c9..624bb0b 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
@@ -191,7 +191,7 @@ public class CubeCapabilityChecker {
if (unmatchedDimensions.isEmpty() && unmatchedAggregations.isEmpty())
break;
- MeasureType measureType = MeasureType.create(measure.getFunction());
+ MeasureType measureType = measure.getFunction().getMeasureType();
if (measureType instanceof BasicMeasureType)
continue;
http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index 3e8ee13..050aef2 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -819,7 +819,7 @@ public class CubeDesc extends RootPersistentEntity {
}
for (MeasureDesc measure : measures) {
- MeasureType aggrType = MeasureType.create(measure.getFunction());
+ MeasureType aggrType = measure.getFunction().getMeasureType();
result.addAll(aggrType.getColumnsNeedDictionary(measure));
}
return result;
http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java
index 9c7b406..bc387fe 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java
@@ -28,7 +28,7 @@ import org.apache.kylin.metadata.model.TblColRef;
abstract public class MeasureIngester<V> {
public static MeasureIngester<?> create(MeasureDesc measure) {
- return MeasureType.create(measure.getFunction()).newIngester();
+ return measure.getFunction().getMeasureType().newIngester();
}
public static MeasureIngester<?>[] create(Collection<MeasureDesc> measures) {
@@ -42,5 +42,7 @@ abstract public class MeasureIngester<V> {
abstract public V valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap);
- abstract public V reEncodeDictionary(V value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDicts, Map<TblColRef, Dictionary<String>> newDicts);
+ public V reEncodeDictionary(V value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDicts, Map<TblColRef, Dictionary<String>> newDicts) {
+ throw new UnsupportedOperationException();
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
index 8e7de6f..0891d1e 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
@@ -45,10 +45,6 @@ abstract public class MeasureType {
factoryRegistry.put(FunctionDesc.FUNC_TOP_N, new TopNMeasureFactory());
}
- public static MeasureType create(FunctionDesc function) {
- return create(function.getExpression(), function.getReturnType());
- }
-
public static MeasureType create(String funcName, String dataType) {
funcName = funcName.toUpperCase();
dataType = dataType.toLowerCase();
@@ -102,6 +98,12 @@ abstract public class MeasureType {
* Query
* ---------------------------------------------------------------------------- */
+ // TODO support user defined calcite aggr function
+
+ abstract public boolean needRewrite();
+
+ abstract public Class<?> getRewriteAggregationFunctionClass();
+
/* ============================================================================
* Storage
* ---------------------------------------------------------------------------- */
http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
index f314870..a6b36bb 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
@@ -115,5 +115,16 @@ public class BasicMeasureType extends MeasureType {
private boolean isMin() {
return FunctionDesc.FUNC_MIN.equalsIgnoreCase(funcName);
}
+ // SUM is the only basic measure that needs no query-time rewrite
+ @Override
+ public boolean needRewrite() {
+ return !isSum();
+ }
+
+ @Override
+ public Class<?> getRewriteAggregationFunctionClass() {
+ // null: no custom aggregate class, so OLAPAggregateRel keeps the original aggregation
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java
index ea1495c..721ba00 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java
@@ -38,9 +38,4 @@ public class BigDecimalIngester extends MeasureIngester<BigDecimal> {
else
return new BigDecimal(values[0]);
}
-
- @Override
- public BigDecimal reEncodeDictionary(BigDecimal value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDict, Map<TblColRef, Dictionary<String>> newDict) {
- throw new UnsupportedOperationException();
- }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java
index aaa754a..70ca727 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java
@@ -43,9 +43,4 @@ public class DoubleIngester extends MeasureIngester<DoubleMutable> {
l.set(Double.parseDouble(values[0]));
return l;
}
-
- @Override
- public DoubleMutable reEncodeDictionary(DoubleMutable value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDict, Map<TblColRef, Dictionary<String>> newDict) {
- throw new UnsupportedOperationException();
- }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java
index bdc1704..2547162 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java
@@ -43,9 +43,4 @@ public class LongIngester extends MeasureIngester<LongMutable> {
l.set(Long.parseLong(values[0]));
return l;
}
-
- @Override
- public LongMutable reEncodeDictionary(LongMutable value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDict, Map<TblColRef, Dictionary<String>> newDict) {
- throw new UnsupportedOperationException();
- }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
index 4a73478..fd71c00 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
@@ -73,11 +73,6 @@ public class HLLCMeasureType extends MeasureType {
hllc.add(v == null ? "__nUlL__" : v);
return hllc;
}
-
- @Override
- public HyperLogLogPlusCounter reEncodeDictionary(HyperLogLogPlusCounter value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDict, Map<TblColRef, Dictionary<String>> newDict) {
- throw new UnsupportedOperationException();
- }
};
}
@@ -89,4 +84,15 @@ public class HLLCMeasureType extends MeasureType {
return new LDCAggregator();
}
+ @Override
+ public boolean needRewrite() {
+ return true;
+ }
+ /** Count distinct over HLLC measures must merge the stored counters, so expose the HLL aggregate. */
+ @Override
+ public Class<?> getRewriteAggregationFunctionClass() {
+ // same-package aggregate; replaces the former hard-coded isCountDistinct() branch in OLAPAggregateRel
+ return HLLDistinctCountAggFunc.class;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLDistinctCountAggFunc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLDistinctCountAggFunc.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLDistinctCountAggFunc.java
new file mode 100644
index 0000000..b00da5f
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLDistinctCountAggFunc.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.hllc;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Aggregate that merges HyperLogLogPlusCounter values to answer count distinct queries.
+ * @author xjiang
+ */
+public class HLLDistinctCountAggFunc {
+
+ private static final Logger logger = LoggerFactory.getLogger(HLLDistinctCountAggFunc.class);
+
+ public static HyperLogLogPlusCounter init() {
+ return null;
+ }
+ // Long values are pre-computed (holistic) counts; anything else must be a HyperLogLogPlusCounter
+ public static HyperLogLogPlusCounter initAdd(Object v) {
+ if (v instanceof Long) { // holistic case
+ long l = (Long) v;
+ return new FixedValueHLLCMockup(l);
+ } else {
+ HyperLogLogPlusCounter c = (HyperLogLogPlusCounter) v;
+ return new HyperLogLogPlusCounter(c);
+ }
+ }
+ // a null counter starts a new group; HLL inputs are wrapped in a new counter or merged, never reused
+ public static HyperLogLogPlusCounter add(HyperLogLogPlusCounter counter, Object v) {
+ if (v instanceof Long) { // holistic case
+ long l = (Long) v;
+ if (counter == null) {
+ return new FixedValueHLLCMockup(l);
+ } else {
+ if (!(counter instanceof FixedValueHLLCMockup))
+ throw new IllegalStateException("counter is not FixedValueHLLCMockup");
+
+ ((FixedValueHLLCMockup) counter).set(l);
+ return counter;
+ }
+ } else {
+ HyperLogLogPlusCounter c = (HyperLogLogPlusCounter) v;
+ if (counter == null) {
+ return new HyperLogLogPlusCounter(c);
+ } else {
+ counter.merge(c);
+ return counter;
+ }
+ }
+ }
+ // merging partial aggregates follows exactly the same rules as add()
+ public static HyperLogLogPlusCounter merge(HyperLogLogPlusCounter counter0, Object counter1) {
+ return add(counter0, counter1);
+ }
+ // an empty group (null counter) counts as 0
+ public static long result(HyperLogLogPlusCounter counter) {
+ return counter == null ? 0L : counter.getCountEstimate();
+ }
+
+ @SuppressWarnings("serial")
+ private static class FixedValueHLLCMockup extends HyperLogLogPlusCounter {
+ // the fixed count; null after clear(), negated once conflicting holistic values were seen
+ private Long value = null;
+
+ FixedValueHLLCMockup(long value) {
+ this.value = value;
+ }
+
+ public void set(long value) {
+ if (this.value == null) {
+ this.value = value;
+ } else {
+ long oldValue = Math.abs(this.value.longValue());
+ long take = Math.max(oldValue, value);
+ logger.warn("Error to aggregate holistic count distinct, old value " + oldValue + ", new value " + value + ", taking " + take);
+ this.value = -take; // make it obvious that this value is wrong
+ }
+ }
+
+ @Override
+ public void clear() {
+ this.value = null;
+ }
+
+ @Override
+ protected void add(long hash) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void merge(HyperLogLogPlusCounter another) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getCountEstimate() {
+ return value == null ? 0L : value; // a cleared mockup reports 0 (like result(null)) instead of NPE on unboxing
+ }
+
+ @Override
+ public void writeRegisters(ByteBuffer out) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void readRegisters(ByteBuffer in) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = super.hashCode();
+ result = prime * result + (value == null ? 0 : value.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (!super.equals(obj))
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ FixedValueHLLCMockup other = (FixedValueHLLCMockup) obj;
+ if (value == null ? other.value != null : !value.equals(other.value))
+ return false;
+ return true;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
index d6c5a6f..77319be 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
@@ -167,4 +167,14 @@ public class TopNMeasureType extends MeasureType {
return null;
}
+ @Override
+ public boolean needRewrite() {
+ return false;
+ }
+ // TopN is neither rewritten nor mapped to a custom aggregate function (null class)
+ @Override
+ public Class<?> getRewriteAggregationFunctionClass() {
+ return null;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
index 0c36873..17debda 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
@@ -21,6 +21,7 @@ package org.apache.kylin.metadata.model;
import java.util.ArrayList;
import java.util.Collection;
+import org.apache.kylin.measure.MeasureType;
import org.apache.kylin.metadata.datatype.DataType;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
@@ -51,6 +52,7 @@ public class FunctionDesc {
private String returnType;
private DataType returnDataType;
+ private MeasureType measureType;
private boolean isDimensionAsMetric = false;
public void init(TableDesc factTable) {
@@ -77,6 +79,23 @@ public class FunctionDesc {
setReturnType(colRefs.get(0).getDatatype());
}
}
+ /** Lazily creates and caches the MeasureType from expression and return type; null for a dimension used as metric. */
+ public MeasureType getMeasureType() {
+ if (isDimensionAsMetric)
+ return null;
+
+ if (measureType == null) {
+ measureType = MeasureType.create(getExpression(), getReturnType());
+ }
+ return measureType;
+ }
+ /** Whether the query layer must rewrite this function; always false for a dimension used as metric. */
+ public boolean needRewrite() {
+ if (isDimensionAsMetric)
+ return false;
+
+ return getMeasureType().needRewrite();
+ }
public String getRewriteFieldName() {
if (isSum()) {
@@ -88,14 +107,19 @@ public class FunctionDesc {
}
}
- public boolean needRewrite() {
- return !isSum() && !isDimensionAsMetric() && !isTopN();
+ public DataType getRewriteFieldType() {
+ if (isCountDistinct() || isTopN())
+ return DataType.ANY;
+ else if (isSum() || isMax() || isMin())
+ return parameter.getColRefs().get(0).getType();
+ else
+ return returnDataType;
}
public ColumnDesc newFakeRewriteColumn(TableDesc sourceTable) {
ColumnDesc fakeCol = new ColumnDesc();
fakeCol.setName(getRewriteFieldName());
- fakeCol.setDatatype(getSQLType().toString());
+ fakeCol.setDatatype(getRewriteFieldType().toString());
if (isCount())
fakeCol.setNullable(false);
fakeCol.init(sourceTable);
@@ -179,15 +203,6 @@ public class FunctionDesc {
return count;
}
- public DataType getSQLType() {
- if (isCountDistinct() || isTopN())
- return DataType.ANY;
- else if (isSum() || isMax() || isMin())
- return parameter.getColRefs().get(0).getType();
- else
- return returnDataType;
- }
-
public String getReturnType() {
return returnType;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
index b4682dd..bb0c073 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
@@ -122,7 +122,7 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By
dictMeasures = Lists.newArrayList();
for (int i = 0; i < measureDescs.size(); i++) {
MeasureDesc measureDesc = measureDescs.get(i);
- MeasureType measureType = MeasureType.create(measureDesc.getFunction());
+ MeasureType measureType = measureDesc.getFunction().getMeasureType();
if (measureType.getColumnsNeedDictionary(measureDesc).isEmpty() == false) {
dictMeasures.add(new Pair<Integer, MeasureIngester>(i, measureType.newIngester()));
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
index 4fc7236..401237b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
@@ -124,7 +124,7 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
dictMeasures = Lists.newArrayList();
for (int i = 0; i < measureDescs.size(); i++) {
MeasureDesc measureDesc = measureDescs.get(i);
- MeasureType measureType = MeasureType.create(measureDesc.getFunction());
+ MeasureType measureType = measureDesc.getFunction().getMeasureType();
if (measureType.getColumnsNeedDictionary(measureDesc).isEmpty() == false) {
dictMeasures.add(new Pair<Integer, MeasureIngester>(i, measureType.newIngester()));
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
index cbc0c56..9aa70ca 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
@@ -59,7 +59,6 @@ import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.ParameterDesc;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.query.sqlfunc.HLLDistinctCountAggFunc;
import com.google.common.base.Preconditions;
@@ -301,10 +300,10 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
// rebuild function
RelDataType fieldType = aggCall.getType();
SqlAggFunction newAgg = aggCall.getAggregation();
- if (func.isCountDistinct()) {
- newAgg = createHyperLogLogAggFunction(fieldType);
- } else if (func.isCount()) {
+ if (func.isCount()) {
newAgg = SqlStdOperatorTable.SUM0;
+ } else if (func.getMeasureType().getRewriteAggregationFunctionClass() != null) {
+ newAgg = createCustomAggFunction(fieldType, func.getMeasureType().getRewriteAggregationFunctionClass());
}
// rebuild aggregate call
@@ -312,10 +311,10 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
return newAggCall;
}
- private SqlAggFunction createHyperLogLogAggFunction(RelDataType returnType) {
+ private SqlAggFunction createCustomAggFunction(RelDataType returnType, Class<?> customAggFuncClz) {
RelDataTypeFactory typeFactory = getCluster().getTypeFactory();
- SqlIdentifier sqlIdentifier = new SqlIdentifier("HLL_COUNT", new SqlParserPos(1, 1));
- AggregateFunction aggFunction = AggregateFunctionImpl.create(HLLDistinctCountAggFunc.class);
+ SqlIdentifier sqlIdentifier = new SqlIdentifier(customAggFuncClz.getSimpleName(), new SqlParserPos(1, 1));
+ AggregateFunction aggFunction = AggregateFunctionImpl.create(customAggFuncClz);
List<RelDataType> argTypes = new ArrayList<RelDataType>();
List<SqlTypeFamily> typeFamilies = new ArrayList<SqlTypeFamily>();
for (FunctionParameter o : aggFunction.getParameters()) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java b/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java
index 94a91bf..a8789ea 100644
--- a/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java
+++ b/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java
@@ -184,11 +184,11 @@ public class OLAPTable extends AbstractQueryableTable implements TranslatableTab
HashSet<ColumnDesc> updateColumns = Sets.newHashSet();
for (MeasureDesc m : mgr.listEffectiveMeasures(olapSchema.getProjectName(), sourceTable.getIdentity())) {
if (m.getFunction().isSum()) {
- FunctionDesc functionDesc = m.getFunction();
- if (functionDesc.getReturnDataType() != functionDesc.getSQLType() && //
- functionDesc.getReturnDataType().isBigInt() && //
- functionDesc.getSQLType().isIntegerFamily()) {
- updateColumns.add(functionDesc.getParameter().getColRefs().get(0).getColumnDesc());
+ FunctionDesc func = m.getFunction();
+ if (func.getReturnDataType() != func.getRewriteFieldType() && //
+ func.getReturnDataType().isBigInt() && //
+ func.getRewriteFieldType().isIntegerFamily()) {
+ updateColumns.add(func.getParameter().getColRefs().get(0).getColumnDesc());
}
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/query/src/main/java/org/apache/kylin/query/sqlfunc/HLLDistinctCountAggFunc.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/sqlfunc/HLLDistinctCountAggFunc.java b/query/src/main/java/org/apache/kylin/query/sqlfunc/HLLDistinctCountAggFunc.java
deleted file mode 100644
index 7881c42..0000000
--- a/query/src/main/java/org/apache/kylin/query/sqlfunc/HLLDistinctCountAggFunc.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.query.sqlfunc;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author xjiang
- */
-public class HLLDistinctCountAggFunc {
-
- private static final Logger logger = LoggerFactory.getLogger(HLLDistinctCountAggFunc.class);
-
- public static HyperLogLogPlusCounter init() {
- return null;
- }
-
- public static HyperLogLogPlusCounter initAdd(Object v) {
- if (v instanceof Long) { // holistic case
- long l = (Long) v;
- return new FixedValueHLLCMockup(l);
- } else {
- HyperLogLogPlusCounter c = (HyperLogLogPlusCounter) v;
- return new HyperLogLogPlusCounter(c);
- }
- }
-
- public static HyperLogLogPlusCounter add(HyperLogLogPlusCounter counter, Object v) {
- if (v instanceof Long) { // holistic case
- long l = (Long) v;
- if (counter == null) {
- return new FixedValueHLLCMockup(l);
- } else {
- if (!(counter instanceof FixedValueHLLCMockup))
- throw new IllegalStateException("counter is not FixedValueHLLCMockup");
-
- ((FixedValueHLLCMockup) counter).set(l);
- return counter;
- }
- } else {
- HyperLogLogPlusCounter c = (HyperLogLogPlusCounter) v;
- if (counter == null) {
- return new HyperLogLogPlusCounter(c);
- } else {
- counter.merge(c);
- return counter;
- }
- }
- }
-
- public static HyperLogLogPlusCounter merge(HyperLogLogPlusCounter counter0, Object counter1) {
- return add(counter0, counter1);
- }
-
- public static long result(HyperLogLogPlusCounter counter) {
- return counter == null ? 0L : counter.getCountEstimate();
- }
-
- private static class FixedValueHLLCMockup extends HyperLogLogPlusCounter {
-
- private Long value = null;
-
- FixedValueHLLCMockup(long value) {
- this.value = value;
- }
-
- public void set(long value) {
- if (this.value == null) {
- this.value = value;
- } else {
- long oldValue = Math.abs(this.value.longValue());
- long take = Math.max(oldValue, value);
- logger.warn("Error to aggregate holistic count distinct, old value " + oldValue + ", new value " + value + ", taking " + take);
- this.value = -take; // make it obvious that this value is wrong
- }
- }
-
- @Override
- public void clear() {
- this.value = null;
- }
-
- @Override
- protected void add(long hash) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void merge(HyperLogLogPlusCounter another) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public long getCountEstimate() {
- return value;
- }
-
- @Override
- public void writeRegisters(ByteBuffer out) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void readRegisters(ByteBuffer in) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = super.hashCode();
- result = prime * result + (int) (value ^ (value >>> 32));
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (!super.equals(obj))
- return false;
- if (getClass() != obj.getClass())
- return false;
- FixedValueHLLCMockup other = (FixedValueHLLCMockup) obj;
- if (!value.equals(other.value))
- return false;
- return true;
- }
- }
-
-}