You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/12/23 03:26:41 UTC
[37/50] [abbrv] kylin git commit: KYLIN-976 Support Custom
Aggregation Types
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
index fd86138..f225fe2 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
@@ -59,7 +59,6 @@ import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.ParameterDesc;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.query.sqlfunc.HLLDistinctCountAggFunc;
import com.google.common.base.Preconditions;
@@ -74,7 +73,6 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
AGGR_FUNC_MAP.put("$SUM0", "SUM");
AGGR_FUNC_MAP.put("COUNT", "COUNT");
AGGR_FUNC_MAP.put("COUNT_DISTINCT", "COUNT_DISTINCT");
- AGGR_FUNC_MAP.put("HLL_COUNT", "COUNT_DISTINCT");
AGGR_FUNC_MAP.put("MAX", "MAX");
AGGR_FUNC_MAP.put("MIN", "MIN");
}
@@ -301,10 +299,10 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
// rebuild function
RelDataType fieldType = aggCall.getType();
SqlAggFunction newAgg = aggCall.getAggregation();
- if (func.isCountDistinct()) {
- newAgg = createHyperLogLogAggFunction(fieldType);
- } else if (func.isCount()) {
+ if (func.isCount()) {
newAgg = SqlStdOperatorTable.SUM0;
+ } else if (func.getMeasureType().getRewriteCalciteAggrFunctionClass() != null) {
+ newAgg = createCustomAggFunction(func.getExpression(), fieldType, func.getMeasureType().getRewriteCalciteAggrFunctionClass());
}
// rebuild aggregate call
@@ -312,10 +310,10 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
return newAggCall;
}
- private SqlAggFunction createHyperLogLogAggFunction(RelDataType returnType) {
+ private SqlAggFunction createCustomAggFunction(String funcName, RelDataType returnType, Class<?> customAggFuncClz) {
RelDataTypeFactory typeFactory = getCluster().getTypeFactory();
- SqlIdentifier sqlIdentifier = new SqlIdentifier("HLL_COUNT", new SqlParserPos(1, 1));
- AggregateFunction aggFunction = AggregateFunctionImpl.create(HLLDistinctCountAggFunc.class);
+ SqlIdentifier sqlIdentifier = new SqlIdentifier(funcName, new SqlParserPos(1, 1));
+ AggregateFunction aggFunction = AggregateFunctionImpl.create(customAggFuncClz);
List<RelDataType> argTypes = new ArrayList<RelDataType>();
List<SqlTypeFamily> typeFamilies = new ArrayList<SqlTypeFamily>();
for (FunctionParameter o : aggFunction.getParameters()) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/main/java/org/apache/kylin/query/routing/Candidate.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/Candidate.java b/query/src/main/java/org/apache/kylin/query/routing/Candidate.java
new file mode 100644
index 0000000..22608a0
--- /dev/null
+++ b/query/src/main/java/org/apache/kylin/query/routing/Candidate.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.query.routing;
+
+import java.util.Map;
+
+import org.apache.kylin.metadata.realization.CapabilityResult;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.metadata.realization.SQLDigest;
+import org.apache.kylin.storage.hybrid.HybridInstance;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Pairs a realization with the SQL digest it is evaluated against, the routing
+ * priority of its realization type and, once assigned through
+ * {@link #setCapability(CapabilityResult)}, its capability result.
+ *
+ * Candidates order by priority, then by capability cost, then hybrid
+ * realizations before non-hybrid ones.
+ */
+public class Candidate implements Comparable<Candidate> {
+
+    static final Map<RealizationType, Integer> PRIORITIES = Maps.newHashMap();
+
+    static {
+        PRIORITIES.put(RealizationType.HYBRID, 0);
+        PRIORITIES.put(RealizationType.CUBE, 0);
+        PRIORITIES.put(RealizationType.INVERTED_INDEX, 1);
+    }
+
+    /** for test only */
+    public static void setPriorities(Map<RealizationType, Integer> priorities) {
+        PRIORITIES.clear();
+        PRIORITIES.putAll(priorities);
+    }
+
+    // ============================================================================
+
+    IRealization realization;
+    SQLDigest sqlDigest;
+    int priority;
+    CapabilityResult capability;
+
+    /**
+     * @param realization the realization being considered; its type must have an entry in PRIORITIES
+     * @param sqlDigest the digest of the query being routed
+     * @throws NullPointerException if no priority is registered for the realization's type
+     */
+    public Candidate(IRealization realization, SQLDigest sqlDigest) {
+        this.realization = realization;
+        this.sqlDigest = sqlDigest;
+
+        // Look the priority up as an Integer first: auto-unboxing a missing entry
+        // would fail with a message-less NullPointerException.
+        Integer typePriority = PRIORITIES.get(realization.getType());
+        if (typePriority == null) {
+            throw new NullPointerException("No routing priority registered for realization type " + realization.getType());
+        }
+        this.priority = typePriority;
+    }
+
+    public IRealization getRealization() {
+        return realization;
+    }
+
+    public SQLDigest getSqlDigest() {
+        return sqlDigest;
+    }
+
+    public int getPriority() {
+        return priority;
+    }
+
+    public CapabilityResult getCapability() {
+        return capability;
+    }
+
+    public void setCapability(CapabilityResult capability) {
+        this.capability = capability;
+    }
+
+    /**
+     * Orders by priority, then capability cost, then hybrid before non-hybrid.
+     * Reads {@code capability.cost} of both candidates, so both must have had a
+     * capability assigned before they are compared.
+     */
+    @Override
+    public int compareTo(Candidate o) {
+        int comp = compareInts(this.priority, o.priority);
+        if (comp != 0) {
+            return comp;
+        }
+
+        comp = compareInts(this.capability.cost, o.capability.cost);
+        if (comp != 0) {
+            return comp;
+        }
+
+        // Two hybrids (or two non-hybrids) are equal at this point. Returning -1
+        // whenever "this" is a hybrid would make a.compareTo(b) and b.compareTo(a)
+        // both negative for two hybrids, violating the Comparable contract.
+        boolean thisHybrid = this.realization instanceof HybridInstance;
+        boolean otherHybrid = o.realization instanceof HybridInstance;
+        if (thisHybrid == otherHybrid) {
+            return 0;
+        }
+        return thisHybrid ? -1 : 1;
+    }
+
+    /** Three-way int comparison that cannot overflow, unlike {@code a - b}. */
+    private static int compareInts(int a, int b) {
+        return (a < b) ? -1 : ((a > b) ? 1 : 0);
+    }
+
+    /** Renders as the wrapped realization so that joined candidate lists are readable in logs. */
+    @Override
+    public String toString() {
+        return realization.toString();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/main/java/org/apache/kylin/query/routing/QueryRouter.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/QueryRouter.java b/query/src/main/java/org/apache/kylin/query/routing/QueryRouter.java
index 7fdc725..7493e08 100644
--- a/query/src/main/java/org/apache/kylin/query/routing/QueryRouter.java
+++ b/query/src/main/java/org/apache/kylin/query/routing/QueryRouter.java
@@ -6,23 +6,30 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- */
+*/
package org.apache.kylin.query.routing;
import java.util.List;
+import java.util.Set;
import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.project.ProjectManager;
+import org.apache.kylin.metadata.realization.CapabilityResult;
+import org.apache.kylin.metadata.realization.CapabilityResult.CapabilityInfluence;
+import org.apache.kylin.metadata.realization.CapabilityResult.DimensionAsMeasure;
import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.metadata.realization.SQLDigest;
import org.apache.kylin.query.relnode.OLAPContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,22 +46,54 @@ public class QueryRouter {
public static IRealization selectRealization(OLAPContext olapContext) throws NoRealizationFoundException {
ProjectManager prjMgr = ProjectManager.getInstance(olapContext.olapSchema.getConfig());
+ logger.info("The project manager's reference is " + prjMgr);
String factTableName = olapContext.firstTableScan.getTableName();
String projectName = olapContext.olapSchema.getProjectName();
- List<IRealization> realizations = Lists.newArrayList(prjMgr.getRealizationsByTable(projectName, factTableName));
- logger.info("Find candidates by table " + factTableName + " and project=" + projectName + " : " + StringUtils.join(realizations, ","));
+ Set<IRealization> realizations = prjMgr.getRealizationsByTable(projectName, factTableName);
+ SQLDigest sqlDigest = olapContext.getSQLDigest();
+
+ List<Candidate> candidates = Lists.newArrayListWithCapacity(realizations.size());
+ for (IRealization real : realizations) {
+ if (real.isReady())
+ candidates.add(new Candidate(real, sqlDigest));
+ }
- //rule based realization selection, rules might reorder realizations or remove specific realization
- RoutingRule.applyRules(realizations, olapContext);
+ logger.info("Find candidates by table " + factTableName + " and project=" + projectName + " : " + StringUtils.join(candidates, ","));
- if (realizations.size() == 0) {
- throw new NoRealizationFoundException("Can't find any realization. Please confirm with providers. SQL digest: " + olapContext.getSQLDigest().toString());
+ // rule based realization selection, rules might reorder realizations or remove specific realization
+ RoutingRule.applyRules(candidates);
+
+ if (candidates.size() == 0) {
+ throw new NoRealizationFoundException("Can't find any realization. Please confirm with providers. SQL digest: " + sqlDigest.toString());
}
+ Candidate chosen = candidates.get(0);
+ adjustForDimensionAsMeasure(chosen, olapContext);
+
logger.info("The realizations remaining: ");
- logger.info(RoutingRule.getPrintableText(realizations));
- logger.info("The realization being chosen: " + realizations.get(0).getName());
+ logger.info(RoutingRule.getPrintableText(candidates));
+ logger.info("The realization being chosen: " + chosen.realization.getName());
- return realizations.get(0);
+ return chosen.realization;
}
+
+    /**
+     * Applies every {@link DimensionAsMeasure} influence recorded in the chosen
+     * candidate's capability result to the OLAP context: the measure function is
+     * flagged as dimension-as-metric, its rewrite field is dropped, and each
+     * non-null parameter column is moved from the metrics columns to the
+     * group-by columns.
+     *
+     * @param chosen the candidate selected by routing; its capability is read
+     * @param olapContext the context whose rewrite fields and column sets are updated
+     */
+    private static void adjustForDimensionAsMeasure(Candidate chosen, OLAPContext olapContext) {
+        for (CapabilityInfluence influence : chosen.getCapability().influences) {
+            // only dimension-as-measure influences need the context adjusted
+            if (!(influence instanceof DimensionAsMeasure)) {
+                continue;
+            }
+
+            // convert the metric to dimension
+            FunctionDesc measureFunc = ((DimensionAsMeasure) influence).getMeasureFunction();
+            measureFunc.setDimensionAsMetric(true);
+            olapContext.rewriteFields.remove(measureFunc.getRewriteFieldName());
+
+            for (TblColRef column : measureFunc.getParameter().getColRefs()) {
+                if (column == null) {
+                    continue;
+                }
+                olapContext.metricsColumns.remove(column);
+                olapContext.groupByColumns.add(column);
+            }
+
+            logger.info("Adjust DimensionAsMeasure for " + measureFunc);
+        }
+    }
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java
index 230950e..715f6d1 100644
--- a/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java
+++ b/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java
@@ -6,15 +6,15 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- */
+*/
package org.apache.kylin.query.routing;
@@ -23,17 +23,14 @@ import java.util.List;
import org.apache.kylin.metadata.realization.IRealization;
import org.apache.kylin.metadata.realization.RealizationType;
-import org.apache.kylin.query.relnode.OLAPContext;
-import org.apache.kylin.query.routing.RoutingRules.AdjustForWeeklyMatchedRealization;
-import org.apache.kylin.query.routing.RoutingRules.RealizationSortRule;
-import org.apache.kylin.query.routing.RoutingRules.RemoveUncapableRealizationsRule;
+import org.apache.kylin.query.routing.rules.RealizationSortRule;
+import org.apache.kylin.query.routing.rules.RemoveUncapableRealizationsRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
/**
- * Created by Hongbin Ma(Binmahone) on 1/5/15.
*/
public abstract class RoutingRule {
private static final Logger logger = LoggerFactory.getLogger(QueryRouter.class);
@@ -45,26 +42,23 @@ public abstract class RoutingRule {
static {
rules.add(new RemoveUncapableRealizationsRule());
rules.add(new RealizationSortRule());
- rules.add(new AdjustForWeeklyMatchedRealization());//this rule might modify olapcontext content, better put it at last
}
- public static void applyRules(List<IRealization> realizations, OLAPContext olapContext) {
+ public static void applyRules(List<Candidate> candidates) {
for (RoutingRule rule : rules) {
- logger.info("Initial realizations order:");
- logger.info(getPrintableText(realizations));
- logger.info("Applying rule " + rule);
-
- rule.apply(realizations, olapContext);
-
- logger.info(getPrintableText(realizations));
+ logger.info("Realizations order before: " + getPrintableText(candidates));
+ logger.info("Applying rule : " + rule);
+ rule.apply(candidates);
+ logger.info("Realizations order after: " + getPrintableText(candidates));
logger.info("===================================================");
}
}
- public static String getPrintableText(List<IRealization> realizations) {
+ public static String getPrintableText(List<Candidate> candidates) {
StringBuffer sb = new StringBuffer();
sb.append("[");
- for (IRealization r : realizations) {
+ for (Candidate candidate : candidates) {
+ IRealization r = candidate.realization;
sb.append(r.getName());
sb.append(",");
}
@@ -112,6 +106,6 @@ public abstract class RoutingRule {
return this.getClass().toString();
}
- public abstract void apply(List<IRealization> realizations, OLAPContext olapContext);
+ public abstract void apply(List<Candidate> candidates);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/AdjustForWeeklyMatchedRealization.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/AdjustForWeeklyMatchedRealization.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/AdjustForWeeklyMatchedRealization.java
deleted file mode 100644
index f457c7d..0000000
--- a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/AdjustForWeeklyMatchedRealization.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.query.routing.RoutingRules;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.kylin.cube.CubeCapabilityChecker;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.realization.IRealization;
-import org.apache.kylin.query.relnode.OLAPContext;
-import org.apache.kylin.query.routing.RoutingRule;
-import org.apache.kylin.storage.hybrid.HybridInstance;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-public class AdjustForWeeklyMatchedRealization extends RoutingRule {
- private static final Logger logger = LoggerFactory.getLogger(AdjustForWeeklyMatchedRealization.class);
-
- @Override
- public void apply(List<IRealization> realizations, OLAPContext olapContext) {
- if (realizations.size() > 0) {
- IRealization first = realizations.get(0);
-
- if (first instanceof HybridInstance) {
- HybridInstance hybrid = (HybridInstance) first;
-
- if (hybrid.getRealizations()[0] instanceof CubeInstance)
- first = hybrid.getRealizations()[0];
- }
-
- if (first instanceof CubeInstance) {
- CubeInstance cube = (CubeInstance) first;
- adjustOLAPContextIfNecessary(cube, olapContext);
- }
-
- if (first instanceof IIInstance) {
- IIInstance ii = (IIInstance) first;
- adjustOLAPContextIfNecessary(ii, olapContext);
- }
- }
- }
-
- private static void adjustOLAPContextIfNecessary(IIInstance ii, OLAPContext olapContext) {
- IIDesc iiDesc = ii.getDescriptor();
- Collection<FunctionDesc> iiFuncs = iiDesc.listAllFunctions();
- convertAggregationToDimension(olapContext, iiFuncs, iiDesc.getFactTableName());
- }
-
- private static void adjustOLAPContextIfNecessary(CubeInstance cube, OLAPContext olapContext) {
- if (CubeCapabilityChecker.check(cube, olapContext.getSQLDigest(), false))
- return;
-
- CubeDesc cubeDesc = cube.getDescriptor();
- Collection<FunctionDesc> cubeFuncs = cubeDesc.listAllFunctions();
- convertAggregationToDimension(olapContext, cubeFuncs, cubeDesc.getFactTable());
- }
-
- private static void convertAggregationToDimension(OLAPContext olapContext, Collection<FunctionDesc> availableAggregations, String factTableName) {
- Iterator<FunctionDesc> it = olapContext.aggregations.iterator();
- while (it.hasNext()) {
- FunctionDesc functionDesc = it.next();
- if (!availableAggregations.contains(functionDesc)) {
- // try to convert the metric to dimension to see if it works
- TblColRef col = functionDesc.getParameter().getColRefs().get(0);
- functionDesc.setDimensionAsMetric(true);
- olapContext.rewriteFields.remove(functionDesc.getRewriteFieldName());
- if (col != null) {
- olapContext.metricsColumns.remove(col);
- olapContext.groupByColumns.add(col);
- }
- logger.info("Adjust OLAPContext for " + functionDesc);
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/CubesSortRule.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/CubesSortRule.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/CubesSortRule.java
deleted file mode 100644
index 1271344..0000000
--- a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/CubesSortRule.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.query.routing.RoutingRules;
-
-import java.util.Comparator;
-import java.util.List;
-
-import org.apache.kylin.common.util.PartialSorter;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.metadata.realization.IRealization;
-import org.apache.kylin.metadata.realization.RealizationType;
-import org.apache.kylin.query.relnode.OLAPContext;
-import org.apache.kylin.query.routing.RoutingRule;
-
-/**
- * Created by Hongbin Ma(Binmahone) on 1/5/15.
- */
-public class CubesSortRule extends RoutingRule {
- @Override
- public void apply(List<IRealization> realizations, OLAPContext olapContext) {
-
- // sort cube candidates, 0) the cost indicator, 1) the lesser header
- // columns the better, 2) the lesser body columns the better
- List<Integer> items = super.findRealizationsOf(realizations, RealizationType.CUBE);
- PartialSorter.partialSort(realizations, items, new Comparator<IRealization>() {
- @Override
- public int compare(IRealization o1, IRealization o2) {
- CubeInstance c1 = (CubeInstance) o1;
- CubeInstance c2 = (CubeInstance) o2;
- int comp = 0;
- comp = c1.getCost() - c2.getCost();
- if (comp != 0) {
- return comp;
- }
-
- CubeDesc schema1 = c1.getDescriptor();
- CubeDesc schema2 = c2.getDescriptor();
-
- comp = schema1.listDimensionColumnsIncludingDerived().size() - schema2.listDimensionColumnsIncludingDerived().size();
- if (comp != 0)
- return comp;
-
- comp = schema1.getMeasures().size() - schema2.getMeasures().size();
- return comp;
- }
- });
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationPriorityRule.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationPriorityRule.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationPriorityRule.java
deleted file mode 100644
index 7513efd..0000000
--- a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationPriorityRule.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.query.routing.RoutingRules;
-
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.kylin.metadata.realization.IRealization;
-import org.apache.kylin.metadata.realization.RealizationType;
-import org.apache.kylin.query.relnode.OLAPContext;
-import org.apache.kylin.query.routing.RoutingRule;
-
-import com.google.common.collect.Maps;
-
-/**
- * Created by Hongbin Ma(Binmahone) on 1/5/15.
- */
-public class RealizationPriorityRule extends RoutingRule {
- static Map<RealizationType, Integer> priorities = Maps.newHashMap();
-
- static {
- priorities.put(RealizationType.CUBE, 1);
- priorities.put(RealizationType.HYBRID, 1);
- priorities.put(RealizationType.INVERTED_INDEX, 2);
- }
-
- public static void setPriorities(Map<RealizationType, Integer> priorities) {
- RealizationPriorityRule.priorities = priorities;
- }
-
- public void apply(List<IRealization> realizations, OLAPContext olapContext) {
-
- Collections.sort(realizations, new Comparator<IRealization>() {
- @Override
- public int compare(IRealization o1, IRealization o2) {
- int i1 = priorities.get(o1.getType());
- int i2 = priorities.get(o2.getType());
- return i1 - i2;
- }
- });
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationSortRule.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationSortRule.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationSortRule.java
deleted file mode 100644
index 8a1a228..0000000
--- a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationSortRule.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.query.routing.RoutingRules;
-
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-
-import org.apache.kylin.metadata.realization.IRealization;
-import org.apache.kylin.query.relnode.OLAPContext;
-import org.apache.kylin.query.routing.RoutingRule;
-import org.apache.kylin.storage.hybrid.HybridInstance;
-
-/**
- */
-public class RealizationSortRule extends RoutingRule {
- @Override
- public void apply(List<IRealization> realizations, final OLAPContext olapContext) {
-
- // sort cube candidates, 0) the priority 1) the cost indicator, 2) the lesser header
- // columns the better, 3) the lesser body columns the better 4) the larger date range the better
-
- Collections.sort(realizations, new Comparator<IRealization>() {
- @Override
- public int compare(IRealization o1, IRealization o2) {
- int i1 = RealizationPriorityRule.priorities.get(o1.getType());
- int i2 = RealizationPriorityRule.priorities.get(o2.getType());
- int comp = i1 - i2;
- if (comp != 0) {
- return comp;
- }
-
- comp = o1.getCost(olapContext.getSQLDigest()) - o2.getCost(olapContext.getSQLDigest());
- if (comp != 0) {
- return comp;
- }
-
- if (o1 instanceof HybridInstance)
- return -1;
- else if (o2 instanceof HybridInstance)
- return 1;
-
- return 0;
- }
- });
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RemoveUncapableRealizationsRule.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RemoveUncapableRealizationsRule.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RemoveUncapableRealizationsRule.java
deleted file mode 100644
index 40b0491..0000000
--- a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RemoveUncapableRealizationsRule.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.query.routing.RoutingRules;
-
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.kylin.metadata.realization.IRealization;
-import org.apache.kylin.query.relnode.OLAPContext;
-import org.apache.kylin.query.routing.RoutingRule;
-
-/**
- * Created by Hongbin Ma(Binmahone) on 1/5/15.
- */
-public class RemoveUncapableRealizationsRule extends RoutingRule {
- @Override
- public void apply(List<IRealization> realizations, OLAPContext olapContext) {
- for (Iterator<IRealization> iterator = realizations.iterator(); iterator.hasNext();) {
- IRealization realization = iterator.next();
- if (!realization.isCapable(olapContext.getSQLDigest())) {
- iterator.remove();
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/SimpleQueryMoreColumnsCubeFirstRule.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/SimpleQueryMoreColumnsCubeFirstRule.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/SimpleQueryMoreColumnsCubeFirstRule.java
deleted file mode 100644
index b3628b3..0000000
--- a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/SimpleQueryMoreColumnsCubeFirstRule.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.query.routing.RoutingRules;
-
-import java.util.Comparator;
-import java.util.List;
-
-import org.apache.kylin.common.util.PartialSorter;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.metadata.realization.IRealization;
-import org.apache.kylin.metadata.realization.RealizationType;
-import org.apache.kylin.query.relnode.OLAPContext;
-import org.apache.kylin.query.routing.RoutingRule;
-
-/**
- * Created by Hongbin Ma(Binmahone) on 1/5/15.
- */
-public class SimpleQueryMoreColumnsCubeFirstRule extends RoutingRule {
- @Override
- public void apply(List<IRealization> realizations, OLAPContext olapContext) {
- List<Integer> itemIndexes = super.findRealizationsOf(realizations, RealizationType.CUBE);
-
- if (olapContext.isSimpleQuery()) {
- PartialSorter.partialSort(realizations, itemIndexes, new Comparator<IRealization>() {
- @Override
- public int compare(IRealization o1, IRealization o2) {
- CubeInstance c1 = (CubeInstance) o1;
- CubeInstance c2 = (CubeInstance) o2;
- return c1.getDescriptor().listDimensionColumnsIncludingDerived().size() - c2.getDescriptor().listDimensionColumnsIncludingDerived().size();
- }
- });
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/main/java/org/apache/kylin/query/routing/rules/RealizationSortRule.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/rules/RealizationSortRule.java b/query/src/main/java/org/apache/kylin/query/routing/rules/RealizationSortRule.java
new file mode 100644
index 0000000..d3c67d7
--- /dev/null
+++ b/query/src/main/java/org/apache/kylin/query/routing/rules/RealizationSortRule.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.query.routing.rules;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.kylin.query.routing.Candidate;
+import org.apache.kylin.query.routing.RoutingRule;
+
+/** Routing rule that sorts the candidate list in place via Collections.sort, i.e. by Candidate's natural ordering (the ordering criteria live in Candidate).
+ */
+public class RealizationSortRule extends RoutingRule {
+ @Override
+ public void apply(List<Candidate> candidates) {
+ Collections.sort(candidates);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/main/java/org/apache/kylin/query/routing/rules/RemoveUncapableRealizationsRule.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/rules/RemoveUncapableRealizationsRule.java b/query/src/main/java/org/apache/kylin/query/routing/rules/RemoveUncapableRealizationsRule.java
new file mode 100644
index 0000000..576b47f
--- /dev/null
+++ b/query/src/main/java/org/apache/kylin/query/routing/rules/RemoveUncapableRealizationsRule.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.query.routing.rules;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.kylin.metadata.realization.CapabilityResult;
+import org.apache.kylin.query.routing.Candidate;
+import org.apache.kylin.query.routing.RoutingRule;
+
+/** Routing rule that filters the candidate list in place: each candidate's realization is asked isCapable(sqlDigest); a capable candidate stores the returned CapabilityResult via setCapability, any other candidate is removed through the iterator.
+ */
+public class RemoveUncapableRealizationsRule extends RoutingRule {
+ @Override
+ public void apply(List<Candidate> candidates) {
+ for (Iterator<Candidate> iterator = candidates.iterator(); iterator.hasNext();) {
+ Candidate candidate = iterator.next();
+
+ CapabilityResult capability = candidate.getRealization().isCapable(candidate.getSqlDigest());
+ if (capability.capable)
+ candidate.setCapability(capability);
+ else
+ iterator.remove();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java b/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java
index bbf1024..8b1ad29 100644
--- a/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java
+++ b/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java
@@ -175,7 +175,7 @@ public class OLAPTable extends AbstractQueryableTable implements TranslatableTab
metFields.add(fieldName);
ColumnDesc fakeCountCol = new ColumnDesc();
fakeCountCol.setName(fieldName);
- fakeCountCol.setDatatype(func.getSQLType().toString());
+ fakeCountCol.setDatatype(func.getRewriteFieldType().toString());
if (func.isCount())
fakeCountCol.setNullable(false);
fakeCountCol.init(sourceTable);
@@ -188,11 +188,11 @@ public class OLAPTable extends AbstractQueryableTable implements TranslatableTab
HashSet<ColumnDesc> updateColumns = Sets.newHashSet();
for (MeasureDesc m : mgr.listEffectiveMeasures(olapSchema.getProjectName(), sourceTable.getIdentity(), false)) {
if (m.getFunction().isSum()) {
- FunctionDesc functionDesc = m.getFunction();
- if (functionDesc.getReturnDataType() != functionDesc.getSQLType() && //
- functionDesc.getReturnDataType().isBigInt() && //
- functionDesc.getSQLType().isIntegerFamily()) {
- updateColumns.add(functionDesc.getParameter().getColRefs().get(0).getColumnDesc());
+ FunctionDesc func = m.getFunction();
+ if (//func.getReturnDataType() != func.getRewriteFieldType() && //
+ func.getReturnDataType().isBigInt() && //
+ func.getRewriteFieldType().isIntegerFamily()) {
+ updateColumns.add(func.getParameter().getColRefs().get(0).getColumnDesc());
}
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/main/java/org/apache/kylin/query/sqlfunc/HLLDistinctCountAggFunc.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/sqlfunc/HLLDistinctCountAggFunc.java b/query/src/main/java/org/apache/kylin/query/sqlfunc/HLLDistinctCountAggFunc.java
deleted file mode 100644
index 9716ff6..0000000
--- a/query/src/main/java/org/apache/kylin/query/sqlfunc/HLLDistinctCountAggFunc.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.query.sqlfunc;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author xjiang
- */
-public class HLLDistinctCountAggFunc {
-
- private static final Logger logger = LoggerFactory.getLogger(HLLDistinctCountAggFunc.class);
-
- public static HyperLogLogPlusCounter init() {
- return null;
- }
-
- public static HyperLogLogPlusCounter initAdd(Object v) {
- if (v instanceof Long) { // holistic case
- long l = (Long) v;
- return new FixedValueHLLCMockup(l);
- } else {
- HyperLogLogPlusCounter c = (HyperLogLogPlusCounter) v;
- return new HyperLogLogPlusCounter(c);
- }
- }
-
- public static HyperLogLogPlusCounter add(HyperLogLogPlusCounter counter, Object v) {
- if (v instanceof Long) { // holistic case
- long l = (Long) v;
- if (counter == null) {
- return new FixedValueHLLCMockup(l);
- } else {
- if (!(counter instanceof FixedValueHLLCMockup))
- throw new IllegalStateException("counter is not FixedValueHLLCMockup");
-
- ((FixedValueHLLCMockup) counter).set(l);
- return counter;
- }
- } else {
- HyperLogLogPlusCounter c = (HyperLogLogPlusCounter) v;
- if (counter == null) {
- return new HyperLogLogPlusCounter(c);
- } else {
- counter.merge(c);
- return counter;
- }
- }
- }
-
- public static HyperLogLogPlusCounter merge(HyperLogLogPlusCounter counter0, Object counter1) {
- return add(counter0, counter1);
- }
-
- public static long result(HyperLogLogPlusCounter counter) {
- return counter == null ? 0L : counter.getCountEstimate();
- }
-
- private static class FixedValueHLLCMockup extends HyperLogLogPlusCounter {
-
- private Long value = null;
-
- FixedValueHLLCMockup(long value) {
- this.value = value;
- }
-
- public void set(long value) {
- if (this.value == null) {
- this.value = value;
- } else {
- long oldValue = Math.abs(this.value.longValue());
- long take = Math.max(oldValue, value);
- logger.warn("Error to aggregate holistic count distinct, old value " + oldValue + ", new value " + value + ", taking " + take);
- this.value = -take; // make it obvious that this value is wrong
- }
- }
-
- @Override
- public void clear() {
- this.value = null;
- }
-
- @Override
- protected void add(long hash) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void merge(HyperLogLogPlusCounter another) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public long getCountEstimate() {
- return value;
- }
-
- @Override
- public void writeRegisters(ByteBuffer out) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void readRegisters(ByteBuffer in) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = super.hashCode();
- result = prime * result + (int) (value ^ (value >>> 32));
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (!super.equals(obj))
- return false;
- if (getClass() != obj.getClass())
- return false;
- FixedValueHLLCMockup other = (FixedValueHLLCMockup) obj;
- if (!value.equals(other.value))
- return false;
- return true;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/test/java/org/apache/kylin/query/test/IIQueryTest.java
----------------------------------------------------------------------
diff --git a/query/src/test/java/org/apache/kylin/query/test/IIQueryTest.java b/query/src/test/java/org/apache/kylin/query/test/IIQueryTest.java
index 9356663..e8c03ae 100644
--- a/query/src/test/java/org/apache/kylin/query/test/IIQueryTest.java
+++ b/query/src/test/java/org/apache/kylin/query/test/IIQueryTest.java
@@ -21,7 +21,7 @@ package org.apache.kylin.query.test;
import java.util.Map;
import org.apache.kylin.metadata.realization.RealizationType;
-import org.apache.kylin.query.routing.RoutingRules.RealizationPriorityRule;
+import org.apache.kylin.query.routing.Candidate;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -35,10 +35,10 @@ public class IIQueryTest extends KylinQueryTest {
KylinQueryTest.setUp();//invoke super class
Map<RealizationType, Integer> priorities = Maps.newHashMap();
- priorities.put(RealizationType.INVERTED_INDEX, 1);
- priorities.put(RealizationType.CUBE, 2);
- priorities.put(RealizationType.HYBRID, 2);
- RealizationPriorityRule.setPriorities(priorities);
+ priorities.put(RealizationType.INVERTED_INDEX, 0);
+ priorities.put(RealizationType.CUBE, 1);
+ priorities.put(RealizationType.HYBRID, 1);
+ Candidate.setPriorities(priorities);
}
@@ -47,10 +47,10 @@ public class IIQueryTest extends KylinQueryTest {
KylinQueryTest.tearDown();//invoke super class
Map<RealizationType, Integer> priorities = Maps.newHashMap();
- priorities.put(RealizationType.INVERTED_INDEX, 2);
- priorities.put(RealizationType.CUBE, 1);
- priorities.put(RealizationType.HYBRID, 1);
- RealizationPriorityRule.setPriorities(priorities);
+ priorities.put(RealizationType.INVERTED_INDEX, 1);
+ priorities.put(RealizationType.CUBE, 0);
+ priorities.put(RealizationType.HYBRID, 0);
+ Candidate.setPriorities(priorities);
}
@Test
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/test/java/org/apache/kylin/query/test/KylinQueryTest.java
----------------------------------------------------------------------
diff --git a/query/src/test/java/org/apache/kylin/query/test/KylinQueryTest.java b/query/src/test/java/org/apache/kylin/query/test/KylinQueryTest.java
index 84f1042..f9e575b 100644
--- a/query/src/test/java/org/apache/kylin/query/test/KylinQueryTest.java
+++ b/query/src/test/java/org/apache/kylin/query/test/KylinQueryTest.java
@@ -150,7 +150,7 @@ public class KylinQueryTest extends KylinTestBase {
@Test
public void testSingleExecuteQuery() throws Exception {
- String queryFileName = "src/test/resources/query/sql/query39.sql";
+ String queryFileName = "src/test/resources/query/sql/query00.sql";
File sqlFile = new File(queryFileName);
String sql = getTextFromFile(sqlFile);
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/test/java/org/apache/kylin/query/test/KylinTestBase.java
----------------------------------------------------------------------
diff --git a/query/src/test/java/org/apache/kylin/query/test/KylinTestBase.java b/query/src/test/java/org/apache/kylin/query/test/KylinTestBase.java
index 4ca9b47..95a11f8 100644
--- a/query/src/test/java/org/apache/kylin/query/test/KylinTestBase.java
+++ b/query/src/test/java/org/apache/kylin/query/test/KylinTestBase.java
@@ -209,7 +209,7 @@ public class KylinTestBase {
if (needSort) {
queryTable = new SortedTable(queryTable, columnNames);
}
- // printResult(queryTable);
+ printResult(queryTable);
return queryTable;
}
@@ -265,7 +265,7 @@ public class KylinTestBase {
if (needSort) {
queryTable = new SortedTable(queryTable, columnNames);
}
- // printResult(queryTable);
+ printResult(queryTable);
return queryTable;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/server/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/JobService.java b/server/src/main/java/org/apache/kylin/rest/service/JobService.java
index 25f245b..0903f83 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -26,7 +26,6 @@ import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.model.CubeBuildTypeEnum;
@@ -143,13 +142,8 @@ public class JobService extends BasicService {
builder.setSubmitter(submitter);
if (buildType == CubeBuildTypeEnum.BUILD) {
- if (cube.getDescriptor().hasHolisticCountDistinctMeasures() && cube.getSegments().size() > 0) {
- Pair<CubeSegment, CubeSegment> segs = getCubeManager().appendAndMergeSegments(cube, endDate);
- job = builder.buildAndMergeJob(segs.getFirst(), segs.getSecond());
- } else {
- CubeSegment newSeg = getCubeManager().appendSegments(cube, endDate);
- job = builder.buildJob(newSeg);
- }
+ CubeSegment newSeg = getCubeManager().appendSegments(cube, endDate);
+ job = builder.buildJob(newSeg);
} else if (buildType == CubeBuildTypeEnum.MERGE) {
CubeSegment newSeg = getCubeManager().mergeSegments(cube, startDate, endDate, forceMergeEmptySeg);
job = builder.mergeJob(newSeg);
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/storage/src/main/java/org/apache/kylin/storage/filter/BitMapFilterEvaluator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/filter/BitMapFilterEvaluator.java b/storage/src/main/java/org/apache/kylin/storage/filter/BitMapFilterEvaluator.java
index 49f1c32..f529145 100644
--- a/storage/src/main/java/org/apache/kylin/storage/filter/BitMapFilterEvaluator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/filter/BitMapFilterEvaluator.java
@@ -20,7 +20,7 @@ package org.apache.kylin.storage.filter;
import java.util.List;
-import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.metadata.filter.CompareTupleFilter;
import org.apache.kylin.metadata.filter.LogicalTupleFilter;
import org.apache.kylin.metadata.filter.TupleFilter;
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/storage/src/main/java/org/apache/kylin/storage/hbase/ColumnValueRange.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/ColumnValueRange.java b/storage/src/main/java/org/apache/kylin/storage/hbase/ColumnValueRange.java
index 9ae4a6d..7533800 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/ColumnValueRange.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/ColumnValueRange.java
@@ -24,7 +24,7 @@ import java.util.Iterator;
import java.util.Set;
import org.apache.kylin.cube.kv.RowKeyColumnOrder;
-import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
import org.apache.kylin.metadata.model.TblColRef;
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java
index fa4ccd7..9627efb 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java
@@ -22,11 +22,11 @@ import java.io.IOException;
import java.text.MessageFormat;
import java.util.BitSet;
import java.util.Collection;
-import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.NoSuchElementException;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTableInterface;
@@ -42,6 +42,7 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.StorageException;
import org.apache.kylin.common.util.Array;
import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
@@ -51,19 +52,21 @@ import org.apache.kylin.cube.kv.RowKeyDecoder;
import org.apache.kylin.cube.kv.RowValueDecoder;
import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.measure.MeasureType;
import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.tuple.ITupleIterator;
+import org.apache.kylin.metadata.tuple.Tuple;
+import org.apache.kylin.metadata.tuple.TupleInfo;
import org.apache.kylin.storage.StorageContext;
import org.apache.kylin.storage.hbase.coprocessor.observer.ObserverEnabler;
-import org.apache.kylin.storage.tuple.Tuple;
-import org.apache.kylin.storage.tuple.Tuple.IDerivedColumnFiller;
-import org.apache.kylin.storage.tuple.TupleInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
/**
* @author xjiang
@@ -78,7 +81,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
private final Collection<TblColRef> dimensions;
private final TupleFilter filter;
private final Collection<TblColRef> groupBy;
- private final Collection<RowValueDecoder> rowValueDecoders;
+ private final List<RowValueDecoder> rowValueDecoders;
private final StorageContext context;
private final String tableName;
private final HTableInterface table;
@@ -88,12 +91,21 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
private Scan scan;
private ResultScanner scanner;
private Iterator<Result> resultIterator;
+ private CubeTupleConverter cubeTupleConverter;
private TupleInfo tupleInfo;
- private Tuple tuple;
+ private Tuple oneTuple;
+ private Tuple next;
private int scanCount;
private int scanCountDelta;
- public CubeSegmentTupleIterator(CubeSegment cubeSeg, Collection<HBaseKeyRange> keyRanges, HConnection conn, Collection<TblColRef> dimensions, TupleFilter filter, Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context) {
+ final List<MeasureType<?>> measureTypes;
+ final List<MeasureType.IAdvMeasureFiller> advMeasureFillers;
+ final List<Pair<Integer, Integer>> advMeasureIndexInRV;// first => index into rowValueDecoders, second => index of the measure within that decoder's values
+
+ private int advMeasureRowsRemaining;
+ private int advMeasureRowIndex;
+
+ public CubeSegmentTupleIterator(CubeSegment cubeSeg, List<HBaseKeyRange> keyRanges, HConnection conn, Collection<TblColRef> dimensions, TupleFilter filter, Collection<TblColRef> groupBy, List<RowValueDecoder> rowValueDecoders, StorageContext context) {
this.cube = cubeSeg.getCubeInstance();
this.cubeSeg = cubeSeg;
this.dimensions = dimensions;
@@ -103,14 +115,22 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
this.context = context;
this.tableName = cubeSeg.getStorageLocationIdentifier();
this.rowKeyDecoder = new RowKeyDecoder(this.cubeSeg);
+
+ measureTypes = Lists.newArrayList();
+ advMeasureFillers = Lists.newArrayListWithCapacity(1);
+ advMeasureIndexInRV = Lists.newArrayListWithCapacity(1);
+
+ this.cubeTupleConverter = new CubeTupleConverter();
+ this.tupleInfo = buildTupleInfo(keyRanges.get(0).getCuboid());
+ this.oneTuple = new Tuple(this.tupleInfo);
+ this.rangeIterator = keyRanges.iterator();
+
try {
this.table = conn.getTable(tableName);
} catch (Throwable t) {
throw new StorageException("Error when open connection to table " + tableName, t);
}
- this.rangeIterator = keyRanges.iterator();
- scanNextRange();
}
@Override
@@ -121,7 +141,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
private void closeScanner() {
flushScanCountDelta();
-
+
if (logger.isDebugEnabled() && scan != null) {
logger.debug("Scan " + scan.toString());
byte[] metricsBytes = scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA);
@@ -150,59 +170,90 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
}
}
+ private void flushScanCountDelta() {
+ context.increaseTotalScanCount(scanCountDelta);
+ scanCountDelta = 0;
+ }
+
@Override
- public boolean hasNext() {
- return rangeIterator.hasNext() || resultIterator.hasNext();
+ public void remove() {
+ throw new UnsupportedOperationException();
}
@Override
- public Tuple next() {
- // get next result from hbase
- Result result = null;
- while (hasNext()) {
- if (resultIterator.hasNext()) {
- result = this.resultIterator.next();
- scanCount++;
- if (++scanCountDelta >= 1000)
- flushScanCountDelta();
- break;
- } else {
- scanNextRange();
+ public boolean hasNext() {
+
+ if (next != null)
+ return true;
+
+ // consume any rows left over from the advanced measure fillers
+ if (advMeasureRowsRemaining > 0) {
+ for (MeasureType.IAdvMeasureFiller filler : advMeasureFillers) {
+ filler.fillTuplle(oneTuple, advMeasureRowIndex);
}
+ advMeasureRowIndex++;
+ advMeasureRowsRemaining--;
+ next = oneTuple;
+ return true;
}
- if (result == null) {
- return null;
+
+ if (resultIterator == null) {
+ if (rangeIterator.hasNext() == false)
+ return false;
+
+ resultIterator = doScan(rangeIterator.next());
+ }
+
+ if (resultIterator.hasNext() == false) {
+ closeScanner();
+ resultIterator = null;
+ return hasNext();
}
- // translate result to tuple
+
+ Result result = resultIterator.next();
+ scanCount++;
+ if (++scanCountDelta >= 1000)
+ flushScanCountDelta();
+
+ // translate into tuple
+ List<MeasureType.IAdvMeasureFiller> retFillers = null;
try {
- translateResult(result, this.tuple);
+ retFillers = translateResult(result, oneTuple);
} catch (IOException e) {
- throw new IllegalStateException("Can't translate result " + result, e);
+ throw new RuntimeException(e);
}
- return this.tuple;
- }
- private void flushScanCountDelta() {
- context.increaseTotalScanCount(scanCountDelta);
- scanCountDelta = 0;
- }
+ // the simple case
+ if (retFillers == null) {
+ next = oneTuple;
+ return true;
+ }
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
+ // advanced measure filling, like TopN, will produce multiple tuples out of one record
+ advMeasureRowsRemaining = -1;
+ for (MeasureType.IAdvMeasureFiller filler : retFillers) {
+ if (advMeasureRowsRemaining < 0)
+ advMeasureRowsRemaining = filler.getNumOfRows();
+ if (advMeasureRowsRemaining != filler.getNumOfRows())
+ throw new IllegalStateException();
+ }
+ if (advMeasureRowsRemaining < 0)
+ throw new IllegalStateException();
- private void scanNextRange() {
- if (this.rangeIterator.hasNext()) {
- closeScanner();
- HBaseKeyRange keyRange = this.rangeIterator.next();
- this.tupleInfo = buildTupleInfo(keyRange.getCuboid());
- this.tuple = new Tuple(this.tupleInfo);
+ advMeasureRowIndex = 0;
+ return hasNext();
+ }
- this.resultIterator = doScan(keyRange);
- } else {
- this.resultIterator = Collections.<Result> emptyList().iterator();
+ @Override
+ public Tuple next() {
+ if (next == null) {
+ hasNext();
+ if (next == null)
+ throw new NoSuchElementException();
}
+ Tuple r = next;
+ next = null;
+ return r;
}
private final Iterator<Result> doScan(HBaseKeyRange keyRange) {
@@ -272,7 +323,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
scan.setCacheBlocks(true);
// cache less when there are memory hungry measures
- if (RowValueDecoder.hasMemHungryCountDistinct(rowValueDecoders)) {
+ if (RowValueDecoder.hasMemHungryMeasures(rowValueDecoders)) {
scan.setCaching(scan.getCaching() / 10);
}
}
@@ -306,6 +357,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
}
private TupleInfo buildTupleInfo(Cuboid cuboid) {
+
TupleInfo info = new TupleInfo();
int index = 0;
rowKeyDecoder.setCuboid(cuboid);
@@ -331,21 +383,48 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
info.setField(derivedField, derivedCol, derivedCol.getType().getName(), index++);
}
// add filler
- info.addDerivedColumnFiller(Tuple.newDerivedColumnFiller(rowColumns, hostCols, deriveInfo, info, CubeManager.getInstance(this.cube.getConfig()), cubeSeg));
+ cubeTupleConverter.addDerivedColumnFiller(CubeTupleConverter.newDerivedColumnFiller(rowColumns, hostCols, deriveInfo, info, CubeManager.getInstance(this.cube.getConfig()), cubeSeg));
}
}
- for (RowValueDecoder rowValueDecoder : this.rowValueDecoders) {
- List<String> names = rowValueDecoder.getNames();
+ for (int i = 0; i < rowValueDecoders.size(); i++) {
+ RowValueDecoder rowValueDecoder = rowValueDecoders.get(i);
+ List<String> measureNames = rowValueDecoder.getNames();
MeasureDesc[] measures = rowValueDecoder.getMeasures();
- for (int i = 0; i < measures.length; i++) {
- String dataType = measures[i].getFunction().getSQLType().getName();
- info.setField(names.get(i), null, dataType, index++);
+
+ BitSet projectionIndex = rowValueDecoder.getProjectionIndex();
+ for (int mi = projectionIndex.nextSetBit(0); mi >= 0; mi = projectionIndex.nextSetBit(mi + 1)) {
+ FunctionDesc aggrFunc = measures[mi].getFunction();
+ String dataType = measures[mi].getFunction().getRewriteFieldType().getName();
+ info.setField(measureNames.get(mi), null, dataType, index++);
+
+ MeasureType<?> measureType = aggrFunc.getMeasureType();
+ if (measureType.needAdvancedTupleFilling()) {
+ Map<TblColRef, Dictionary<String>> dictionaryMap = buildDictionaryMap(measureType.getColumnsNeedDictionary(aggrFunc));
+ advMeasureFillers.add(measureType.getAdvancedTupleFiller(aggrFunc, tupleInfo, dictionaryMap));
+ advMeasureIndexInRV.add(Pair.newPair(i, mi));
+ measureTypes.add(null);
+ } else {
+ measureTypes.add(measureType);
+ }
}
+ // for (int i = 0; i < measures.length; i++) {
+ // String dataType = measures[i].getFunction().getRewriteFieldType().getName();
+ // info.setField(measureNames.get(i), null, dataType, index++);
+ // }
}
return info;
}
+ // load only the needed dictionaries: maps each requested column to cubeSeg.getDictionary(col)
+ private Map<TblColRef, Dictionary<String>> buildDictionaryMap(List<TblColRef> columnsNeedDictionary) {
+ Map<TblColRef, Dictionary<String>> result = Maps.newHashMap();
+ for (TblColRef col : columnsNeedDictionary) {
+ result.put(col, cubeSeg.getDictionary(col));
+ }
+ return result;
+ }
+
private String getFieldName(TblColRef column, Map<TblColRef, String> aliasMap) {
String name = null;
if (aliasMap != null) {
@@ -357,7 +436,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
return name;
}
- private void translateResult(Result res, Tuple tuple) throws IOException {
+ private List<MeasureType.IAdvMeasureFiller> translateResult(Result res, Tuple tuple) throws IOException {
// groups
byte[] rowkey = res.getRow();
rowKeyDecoder.decode(rowkey);
@@ -373,11 +452,12 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
}
// derived
- for (IDerivedColumnFiller filler : tupleInfo.getDerivedColumnFillers()) {
+ for (CubeTupleConverter.IDerivedColumnFiller filler : cubeTupleConverter.getDerivedColumnFillers()) {
filler.fillDerivedColumns(dimensionValues, tuple);
}
// aggregations
+ int measureIndex = 0;
for (RowValueDecoder rowValueDecoder : this.rowValueDecoders) {
HBaseColumnDesc hbaseColumn = rowValueDecoder.getHBaseColumn();
String columnFamily = hbaseColumn.getColumnFamilyName();
@@ -389,8 +469,23 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
Object[] measureValues = rowValueDecoder.getValues();
BitSet projectionIndex = rowValueDecoder.getProjectionIndex();
for (int i = projectionIndex.nextSetBit(0); i >= 0; i = projectionIndex.nextSetBit(i + 1)) {
- tuple.setMeasureValue(measureNames.get(i), measureValues[i]);
+ if (measureTypes.get(measureIndex) != null) {
+ measureTypes.get(measureIndex).fillTupleSimply(tuple, tupleInfo.getFieldIndex(measureNames.get(i)), measureValues[i]);
+ }
+ }
+ measureIndex++;
+ }
+
+ // advanced measure filling, due to possible row split, will complete at caller side
+ if (advMeasureFillers.isEmpty()) {
+ return null;
+ } else {
+ for (int i = 0; i < advMeasureFillers.size(); i++) {
+ Pair<Integer, Integer> metricLocation = advMeasureIndexInRV.get(i);
+ Object measureValue = rowValueDecoders.get(metricLocation.getFirst()).getValues()[metricLocation.getSecond()];
+ advMeasureFillers.get(i).reload(measureValue);
}
+ return advMeasureFillers;
}
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java
index bbf088e..e612eb1 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java
@@ -35,6 +35,7 @@ import java.util.TreeSet;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.kylin.common.persistence.HBaseConnection;
import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
@@ -45,8 +46,8 @@ import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
import org.apache.kylin.cube.model.HBaseColumnDesc;
import org.apache.kylin.cube.model.HBaseMappingDesc;
-import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.dict.lookup.LookupStringTable;
+import org.apache.kylin.measure.MeasureType;
import org.apache.kylin.metadata.filter.ColumnTupleFilter;
import org.apache.kylin.metadata.filter.CompareTupleFilter;
import org.apache.kylin.metadata.filter.LogicalTupleFilter;
@@ -87,6 +88,8 @@ public class CubeStorageEngine implements IStorageEngine {
@Override
public ITupleIterator search(StorageContext context, SQLDigest sqlDigest) {
+ // allow custom measures hack
+ notifyBeforeStorageQuery(sqlDigest);
Collection<TblColRef> groups = sqlDigest.groupbyColumns;
TupleFilter filter = sqlDigest.filter;
@@ -354,12 +357,6 @@ public class CubeStorageEngine implements IStorageEngine {
for (HBaseColumnDesc hbCol : hbCols) {
bestHBCol = hbCol;
bestIndex = hbCol.findMeasureIndex(aggrFunc);
- MeasureDesc measure = hbCol.getMeasures()[bestIndex];
- // criteria for holistic measure: Exact Aggregation && Exact Cuboid
- if (measure.isHolisticCountDistinct() && context.isExactAggregation()) {
- logger.info("Holistic count distinct chosen for " + aggrFunc);
- break;
- }
}
RowValueDecoder codec = codecMap.get(bestHBCol);
@@ -628,7 +625,7 @@ public class CubeStorageEngine implements IStorageEngine {
}
private void setThreshold(Collection<TblColRef> dimensions, List<RowValueDecoder> valueDecoders, StorageContext context) {
- if (RowValueDecoder.hasMemHungryCountDistinct(valueDecoders) == false) {
+ if (RowValueDecoder.hasMemHungryMeasures(valueDecoders) == false) {
return;
}
@@ -638,7 +635,7 @@ public class CubeStorageEngine implements IStorageEngine {
BitSet projectionIndex = decoder.getProjectionIndex();
for (int i = projectionIndex.nextSetBit(0); i >= 0; i = projectionIndex.nextSetBit(i + 1)) {
FunctionDesc func = measures[i].getFunction();
- rowSizeEst += func.getReturnDataType().getSpaceEstimate();
+ rowSizeEst += func.getReturnDataType().getStorageBytesEstimate();
}
}
@@ -665,4 +662,13 @@ public class CubeStorageEngine implements IStorageEngine {
ObserverEnabler.enableCoprocessorIfBeneficial(cubeInstance, groupsCopD, valueDecoders, context);
}
+
+ private void notifyBeforeStorageQuery(SQLDigest sqlDigest) { // calls adjustSqlDigest on the measure type of every measure in cubeDesc
+ for (MeasureDesc measure : cubeDesc.getMeasures()) { // iterates all measures of the cube, without filtering by what the query references
+ MeasureType<?> measureType = measure.getFunction().getMeasureType(); // NOTE(review): no null check here; assumes every function has a measure type - confirm
+ measureType.adjustSqlDigest(measure, sqlDigest); // NOTE(review): nothing is returned, so presumably sqlDigest is modified in place - confirm
+ }
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/storage/src/main/java/org/apache/kylin/storage/hbase/CubeTupleConverter.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeTupleConverter.java b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeTupleConverter.java
new file mode 100644
index 0000000..11987a4
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeTupleConverter.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.kylin.common.util.Array;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.dict.lookup.LookupStringTable;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.tuple.Tuple;
+import org.apache.kylin.metadata.tuple.TupleInfo;
+
+public class CubeTupleConverter { // keeps a list of derived-column fillers and offers a static factory that builds one per DeriveInfo
+
+ private final List<IDerivedColumnFiller> derivedColumnFillers; // fillers in the order they were added
+
+ public CubeTupleConverter() {
+ derivedColumnFillers = new ArrayList<IDerivedColumnFiller>();
+ }
+
+ public void addDerivedColumnFiller(IDerivedColumnFiller derivedColumnFiller) {
+ derivedColumnFillers.add(derivedColumnFiller);
+ }
+
+ public List<IDerivedColumnFiller> getDerivedColumnFillers() {
+ return derivedColumnFillers; // returns the internal list itself, not a copy
+ }
+
+ // ============================================================================
+
+ public static IDerivedColumnFiller newDerivedColumnFiller(List<TblColRef> rowColumns, TblColRef[] hostCols, CubeDesc.DeriveInfo deriveInfo, TupleInfo tupleInfo, CubeManager cubeMgr, CubeSegment cubeSegment) { // builds the filler that matches deriveInfo.type
+
+ int[] hostIndex = new int[hostCols.length]; // position of each host column within rowColumns
+ for (int i = 0; i < hostCols.length; i++) {
+ hostIndex[i] = rowColumns.indexOf(hostCols[i]); // indexOf yields -1 when the host column is not in rowColumns; not checked here
+ }
+ String[] derivedFieldNames = new String[deriveInfo.columns.length]; // tuple field name of each derived column
+ for (int i = 0; i < deriveInfo.columns.length; i++) {
+ derivedFieldNames[i] = tupleInfo.getFieldName(deriveInfo.columns[i]);
+ }
+
+ switch (deriveInfo.type) {
+ case LOOKUP:
+ LookupStringTable lookupTable = cubeMgr.getLookupTable(cubeSegment, deriveInfo.dimension);
+ return new LookupFiller(hostIndex, lookupTable, deriveInfo, derivedFieldNames);
+ case PK_FK:
+ // composite keys are split (see CubeDesc.initDimensionColumns()), hence only index 0 of the host and derived arrays is read here
+ return new PKFKFiller(hostIndex[0], derivedFieldNames[0]);
+ default:
+ throw new IllegalArgumentException(); // any deriveInfo.type other than LOOKUP or PK_FK is rejected (no message attached)
+ }
+ }
+
+ public interface IDerivedColumnFiller { // writes derived-column values into a tuple, given the values of one row
+ public void fillDerivedColumns(List<String> rowValues, Tuple tuple); // NOTE(review): rowValues is presumably parallel to the rowColumns given to the factory - confirm with callers
+ }
+
+ static class PKFKFiller implements IDerivedColumnFiller { // copies the host column's value unchanged into the derived field
+ final int hostIndex;
+ final String derivedFieldName;
+
+ public PKFKFiller(int hostIndex, String derivedFieldName) {
+ this.hostIndex = hostIndex;
+ this.derivedFieldName = derivedFieldName;
+ }
+
+ @Override
+ public void fillDerivedColumns(List<String> rowValues, Tuple tuple) {
+ String value = rowValues.get(hostIndex);
+ tuple.setDimensionValue(derivedFieldName, value);
+ }
+ }
+
+ static class LookupFiller implements IDerivedColumnFiller { // fetches derived values from a lookup table row keyed by the host-column values
+
+ final int[] hostIndex;
+ final int hostLen;
+ final Array<String> lookupKey; // key buffer allocated once and overwritten on every fillDerivedColumns call
+ final LookupStringTable lookupTable;
+ final int[] derivedIndex; // zero-based column index of each derived column, used to index into a lookup row
+ final int derivedLen;
+ final String[] derivedFieldNames;
+
+ public LookupFiller(int[] hostIndex, LookupStringTable lookupTable, CubeDesc.DeriveInfo deriveInfo, String[] derivedFieldNames) {
+ this.hostIndex = hostIndex;
+ this.hostLen = hostIndex.length;
+ this.lookupKey = new Array<String>(new String[hostLen]);
+ this.lookupTable = lookupTable;
+ this.derivedIndex = new int[deriveInfo.columns.length];
+ this.derivedLen = derivedIndex.length;
+ this.derivedFieldNames = derivedFieldNames;
+
+ for (int i = 0; i < derivedLen; i++) {
+ derivedIndex[i] = deriveInfo.columns[i].getColumn().getZeroBasedIndex();
+ }
+ }
+
+ @Override
+ public void fillDerivedColumns(List<String> rowValues, Tuple tuple) {
+ for (int i = 0; i < hostLen; i++) {
+ lookupKey.data[i] = rowValues.get(hostIndex[i]); // load the host-column values of this row into the shared key buffer
+ }
+
+ String[] lookupRow = lookupTable.getRow(lookupKey);
+
+ if (lookupRow != null) { // a row came back: copy each derived column out of it
+ for (int i = 0; i < derivedLen; i++) {
+ String value = lookupRow[derivedIndex[i]];
+ tuple.setDimensionValue(derivedFieldNames[i], value);
+ }
+ } else { // getRow returned null: every derived field is set to null
+ for (int i = 0; i < derivedLen; i++) {
+ tuple.setDimensionValue(derivedFieldNames[i], null);
+ }
+ }
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java
index a115753..d188a44 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java
@@ -52,7 +52,7 @@ public class SerializedHBaseTupleIterator implements ITupleIterator {
private ITupleIterator segmentIterator;
private int scanCount;
- public SerializedHBaseTupleIterator(HConnection conn, List<HBaseKeyRange> segmentKeyRanges, CubeInstance cube, Collection<TblColRef> dimensions, TupleFilter filter, Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context) {
+ public SerializedHBaseTupleIterator(HConnection conn, List<HBaseKeyRange> segmentKeyRanges, CubeInstance cube, Collection<TblColRef> dimensions, TupleFilter filter, Collection<TblColRef> groupBy, List<RowValueDecoder> rowValueDecoders, StorageContext context) {
this.context = context;
int limit = context.getLimit();
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/AggregationCache.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/AggregationCache.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/AggregationCache.java
index 47a011a..26f95b7 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/AggregationCache.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/AggregationCache.java
@@ -18,11 +18,12 @@
package org.apache.kylin.storage.hbase.coprocessor;
-import java.util.SortedMap;
-import org.apache.kylin.metadata.measure.MeasureAggregator;
import com.google.common.collect.Maps;
+import org.apache.kylin.measure.MeasureAggregator;
+
+import java.util.SortedMap;
/**
* Created by Hongbin Ma(Binmahone) on 11/27/14.
@@ -60,7 +61,7 @@ public abstract class AggregationCache {
rowMemBytes = 0;
MeasureAggregator[] measureAggregators = aggBufMap.get(aggBufMap.firstKey());
for (MeasureAggregator agg : measureAggregators) {
- rowMemBytes += agg.getMemBytes();
+ rowMemBytes += agg.getMemBytesEstimate();
}
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorFilter.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorFilter.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorFilter.java
index b3e2d31..2f6bf67 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorFilter.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorFilter.java
@@ -24,7 +24,7 @@ import java.util.Set;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.cube.kv.RowKeyColumnIO;
-import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.dict.ISegment;
import org.apache.kylin.metadata.filter.*;
import org.apache.kylin.metadata.filter.TupleFilterSerializer.Decorator;
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorTupleFilterTranslator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorTupleFilterTranslator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorTupleFilterTranslator.java
index 6affc18..bd2b912 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorTupleFilterTranslator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorTupleFilterTranslator.java
@@ -19,7 +19,7 @@
package org.apache.kylin.storage.hbase.coprocessor;
import org.apache.kylin.cube.kv.RowKeyColumnIO;
-import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.metadata.filter.ColumnTupleFilter;
import org.apache.kylin.metadata.filter.CompareTupleFilter;
import org.apache.kylin.metadata.filter.ConstantTupleFilter;
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationCache.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationCache.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationCache.java
index 83e84a9..c33d192 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationCache.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationCache.java
@@ -21,7 +21,7 @@ package org.apache.kylin.storage.hbase.coprocessor.endpoint;
import java.util.Map;
import java.util.Set;
-import org.apache.kylin.metadata.measure.MeasureAggregator;
+import org.apache.kylin.measure.MeasureAggregator;
import org.apache.kylin.storage.hbase.coprocessor.AggrKey;
import org.apache.kylin.storage.hbase.coprocessor.AggregationCache;
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java
index 028da1e..3b4a9b2 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java
@@ -29,9 +29,10 @@ import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.invertedindex.index.RawTableRecord;
import org.apache.kylin.invertedindex.index.TableRecordInfo;
import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
-import org.apache.kylin.metadata.measure.MeasureAggregator;
+import org.apache.kylin.measure.MeasureAggregator;
+import org.apache.kylin.measure.hllc.HLLCMeasureType;
+import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.metadata.measure.fixedlen.FixedLenMeasureCodec;
-import org.apache.kylin.metadata.model.DataType;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.storage.hbase.coprocessor.CoprocessorConstants;
@@ -91,7 +92,7 @@ public class EndpointAggregators {
throw new IllegalStateException("Column " + functionDesc.getParameter().getValue() + " is not found in II");
}
- if (functionDesc.isCountDistinct()) {
+ if (HLLCMeasureType.isCountDistinct(functionDesc)) {
metricInfos[i] = new MetricInfo(MetricType.DistinctCount, index, functionDesc.getReturnDataType().getPrecision());
} else {
metricInfos[i] = new MetricInfo(MetricType.Normal, index);
@@ -143,10 +144,10 @@ public class EndpointAggregators {
MeasureAggregator[] aggrs = new MeasureAggregator[funcNames.length];
for (int i = 0; i < aggrs.length; i++) {
if (metricInfos[i].type == MetricType.DistinctCount) {
- aggrs[i] = MeasureAggregator.create(funcNames[i], dataTypes[i]);
+ aggrs[i] = MeasureAggregator.create(funcNames[i], DataType.getInstance(dataTypes[i]));
} else {
//all other fixed length measures can be aggregated as long
- aggrs[i] = MeasureAggregator.create(funcNames[i], "long");
+ aggrs[i] = MeasureAggregator.create(funcNames[i], DataType.getInstance("long"));
}
}
return aggrs;