You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2018/06/30 12:22:55 UTC
[kylin] 09/12: KYLIN-3370 enhance segment pruning
This is an automated email from the ASF dual-hosted git repository.
liyang pushed a commit to branch sync
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 18d8663f5e0902ca8093b8e3d6ca89993c1b5d3f
Author: ZhansShaoxiong <sh...@gmail.com>
AuthorDate: Mon Jun 18 10:55:51 2018 +0800
KYLIN-3370 enhance segment pruning
---
.../java/org/apache/kylin/cube/CubeSegment.java | 12 +
.../org/apache/kylin/cube/DimensionRangeInfo.java | 104 ++++++++
.../apache/kylin/cube/common/SegmentPruner.java | 168 +++++++++++++
.../kylin/cube/gridtable/ScanRangePlannerBase.java | 4 +-
.../apache/kylin/cube/DimensionRangeInfoTest.java | 86 +++++++
.../kylin/cube/common/SegmentPrunerTest.java | 186 ++++++++++++++
.../apache/kylin/metadata/filter/TupleFilter.java | 28 ++-
.../kylin/metadata/datatype/DataTypeOrderTest.java | 57 +++++
.../kylin/metadata/filter/TupleFilterTest.java | 23 ++
.../storage/gtrecord/CubeScanRangePlanner.java | 30 +--
.../storage/gtrecord/GTCubeStorageQueryBase.java | 9 +-
.../kylin/storage/gtrecord/DictGridTableTest.java | 69 +++---
.../mr/steps/CalculateStatsFromBaseCuboidJob.java | 1 -
.../mr/steps/FactDistinctColumnPartitioner.java | 5 +-
.../engine/mr/steps/FactDistinctColumnsJob.java | 2 +-
.../engine/mr/steps/FactDistinctColumnsMapper.java | 43 +---
.../mr/steps/FactDistinctColumnsMapperBase.java | 20 +-
.../mr/steps/FactDistinctColumnsReducer.java | 90 ++++---
.../steps/FactDistinctColumnsReducerMapping.java | 87 +++----
.../mr/steps/UpdateCubeInfoAfterBuildStep.java | 83 ++++---
.../mr/steps/UpdateCubeInfoAfterMergeStep.java | 16 +-
.../FactDistinctColumnsReducerMappingTest.java | 12 +-
.../cube/ssb_cube_with_dimention_range.json | 110 +++++++++
.../cube_desc/ssb_cube_with_dimention_range.json | 269 +++++++++++++++++++++
.../kylin/provision/BuildCubeWithEngine.java | 86 +++++--
25 files changed, 1312 insertions(+), 288 deletions(-)
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index d49c273..75c90a4 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -119,6 +119,10 @@ public class CubeSegment implements IBuildable, ISegment, Serializable {
@JsonInclude(JsonInclude.Include.NON_EMPTY)
private Map<String, String> additionalInfo = new LinkedHashMap<String, String>();
+ @JsonProperty("dimension_range_info_map")
+ @JsonInclude(JsonInclude.Include.NON_EMPTY)
+ private Map<String, DimensionRangeInfo> dimensionRangeInfoMap = Maps.newHashMap();
+
private Map<Long, Short> cuboidBaseShards = Maps.newConcurrentMap(); // cuboid id ==> base(starting) shard for this cuboid
// lazy init
@@ -575,4 +579,12 @@ public class CubeSegment implements IBuildable, ISegment, Serializable {
public void setSourcePartitionOffsetStart(Map<Integer, Long> sourcePartitionOffsetStart) {
this.sourcePartitionOffsetStart = sourcePartitionOffsetStart;
}
+
+ /**
+  * Returns the value ranges recorded for this segment's dimensions, keyed by column identity.
+  * The segment's own map instance is returned, not a copy.
+  */
+ public Map<String, DimensionRangeInfo> getDimensionRangeInfoMap() {
+ return dimensionRangeInfoMap;
+ }
+
+ /**
+  * Replaces the value ranges recorded for this segment's dimensions.
+  *
+  * @param dimensionRangeInfoMap ranges keyed by column identity; {@code null} is stored as an
+  *            empty map so that readers can look up a column without a null check
+  */
+ public void setDimensionRangeInfoMap(Map<String, DimensionRangeInfo> dimensionRangeInfoMap) {
+     this.dimensionRangeInfoMap = dimensionRangeInfoMap == null ? Maps.<String, DimensionRangeInfo> newHashMap()
+             : dimensionRangeInfoMap;
+ }
}
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/DimensionRangeInfo.java b/core-cube/src/main/java/org/apache/kylin/cube/DimensionRangeInfo.java
new file mode 100644
index 0000000..e36ca96
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/DimensionRangeInfo.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cube;
+
+import java.util.Map;
+
+import org.apache.kylin.metadata.datatype.DataTypeOrder;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Maps;
+
+ /**
+  * The [min, max] value range of one dimension column, with both bounds kept as strings.
+  * <p>
+  * Both bounds are {@code null} when the column is all null or the segment has 0 records.
+  * The two-argument constructor rejects a range that has only one bound set.
+  */
+ @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
+ public class DimensionRangeInfo {
+
+     private static final Logger logger = LoggerFactory.getLogger(DimensionRangeInfo.class);
+
+     /**
+      * Merges two range maps column by column.
+      * <p>
+      * Only columns that have a range in both maps appear in the result. When one side has no
+      * value (both bounds null), the other side's range is taken as is; otherwise the merged
+      * range spans the smaller min and the larger max according to the column's data type order.
+      *
+      * @param model the data model used to resolve each column id to its data type
+      * @param m1 ranges of the first segment, keyed by column id
+      * @param m2 ranges of the second segment, keyed by column id
+      * @return a new map holding the merged ranges; never {@code null}
+      */
+     public static Map<String, DimensionRangeInfo> mergeRangeMap(DataModelDesc model, Map<String, DimensionRangeInfo> m1,
+             Map<String, DimensionRangeInfo> m2) {
+
+         if (!m1.keySet().equals(m2.keySet())) {
+             logger.warn("Merging incompatible maps of DimensionRangeInfo, keys in m1 {}, keys in m2 {}", m1.keySet(),
+                     m2.keySet());
+         }
+
+         Map<String, DimensionRangeInfo> result = Maps.newHashMap();
+
+         for (Map.Entry<String, DimensionRangeInfo> entry : m1.entrySet()) {
+             String colId = entry.getKey();
+             DimensionRangeInfo r1 = entry.getValue();
+             DimensionRangeInfo r2 = m2.get(colId);
+
+             // a column missing (or mapped to null) on either side cannot be merged
+             if (r1 == null || r2 == null)
+                 continue;
+
+             DimensionRangeInfo newR;
+             if (r1.hasNoValue()) {
+                 newR = r2; // when r1 is all null or has 0 records
+             } else if (r2.hasNoValue()) {
+                 newR = r1; // when r2 is all null or has 0 records
+             } else {
+                 DataTypeOrder order = model.findColumn(colId).getType().getOrder();
+                 String newMin = order.min(r1.getMin(), r2.getMin());
+                 String newMax = order.max(r1.getMax(), r2.getMax());
+                 newR = new DimensionRangeInfo(newMin, newMax);
+             }
+
+             result.put(colId, newR);
+         }
+
+         return result;
+     }
+
+     // ============================================================================
+
+     @JsonProperty("min")
+     private String min;
+
+     @JsonProperty("max")
+     private String max;
+
+     /** Creates a range with both bounds unset; the fields are annotated for JSON binding. */
+     public DimensionRangeInfo() {
+     }
+
+     /**
+      * Creates a range from its two bounds.
+      *
+      * @param min the smallest value, or {@code null} together with {@code max}
+      * @param max the largest value, or {@code null} together with {@code min}
+      * @throws IllegalStateException if exactly one of the bounds is {@code null}
+      */
+     public DimensionRangeInfo(String min, String max) {
+         if ((min == null) != (max == null))
+             throw new IllegalStateException(
+                     "min and max must be both null or both non-null, but got min=" + min + ", max=" + max);
+
+         this.min = min;
+         this.max = max;
+     }
+
+     /** Returns the lower bound, or {@code null} if the range has no value. */
+     public String getMin() {
+         return min;
+     }
+
+     /** Returns the upper bound, or {@code null} if the range has no value. */
+     public String getMax() {
+         return max;
+     }
+
+     /** True when neither bound is set. */
+     private boolean hasNoValue() {
+         return min == null && max == null;
+     }
+
+     @Override
+     public String toString() {
+         return "[" + min + ", " + max + "]";
+     }
+
+ }
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/common/SegmentPruner.java b/core-cube/src/main/java/org/apache/kylin/cube/common/SegmentPruner.java
new file mode 100644
index 0000000..098f334
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/common/SegmentPruner.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.common;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.DimensionRangeInfo;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.datatype.DataTypeOrder;
+import org.apache.kylin.metadata.filter.CompareTupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.PartitionDesc;
+import org.apache.kylin.metadata.model.SegmentRange.TSRange;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+ /**
+  * Decides, per cube segment, whether a query filter can possibly match any record in it.
+  * <p>
+  * Only compare filters that must hold for every result row are considered. Each is tested
+  * against the segment's recorded dimension range or, when no range is recorded for the
+  * partition date column, against a range deduced from the segment's time range.
+  */
+ public class SegmentPruner {
+     private static final Logger logger = LoggerFactory.getLogger(SegmentPruner.class);
+
+     private final Set<CompareTupleFilter> mustTrueCompares;
+
+     /**
+      * @param filter the query filter; {@code null} means no filter, in which case no segment
+      *            is pruned by range
+      */
+     public SegmentPruner(TupleFilter filter) {
+         this.mustTrueCompares = filter == null ? Collections.<CompareTupleFilter> emptySet()
+                 : filter.findMustTrueCompareFilters();
+     }
+
+     /** Returns the READY segments of the cube that pass {@link #check(CubeSegment)}. */
+     public List<CubeSegment> listSegmentsForQuery(CubeInstance cube) {
+         List<CubeSegment> r = new ArrayList<>();
+         for (CubeSegment seg : cube.getSegments(SegmentStatusEnum.READY)) {
+             if (check(seg))
+                 r.add(seg);
+         }
+         return r;
+     }
+
+     /**
+      * Tests one segment against the filter.
+      *
+      * @return {@code false} if the segment can be skipped, {@code true} if it has to be scanned
+      */
+     public boolean check(CubeSegment seg) {
+
+         if (seg.getInputRecords() == 0) {
+             if (seg.getConfig().isSkippingEmptySegments()) {
+                 logger.debug("Prune segment {} due to 0 input record", seg);
+                 return false;
+             } else {
+                 logger.debug("Insist scan of segment {} having 0 input record", seg);
+             }
+         }
+
+         Map<String, DimensionRangeInfo> segDimRangInfoMap = seg.getDimensionRangeInfoMap();
+         for (CompareTupleFilter comp : mustTrueCompares) {
+             TblColRef col = comp.getColumn();
+
+             DimensionRangeInfo dimRangeInfo = segDimRangInfoMap.get(col.getIdentity());
+             if (dimRangeInfo == null)
+                 dimRangeInfo = tryDeduceRangeFromPartitionCol(seg, col);
+             if (dimRangeInfo == null)
+                 continue; // nothing known about this column, cannot prune on it
+
+             String minVal = dimRangeInfo.getMin();
+             String maxVal = dimRangeInfo.getMax();
+
+             if (!satisfy(comp, minVal, maxVal)) {
+                 logger.debug("Prune segment {} due to given filter", seg);
+                 return false;
+             }
+         }
+
+         logger.debug("Pruner passed on segment {}", seg);
+         return true;
+     }
+
+     /**
+      * Deduces the range of the partition date column from the segment's time range.
+      *
+      * @return the deduced range, or {@code null} if the column is not the partition date column,
+      *         the model is not partitioned, or the time range is missing or open-ended
+      */
+     private DimensionRangeInfo tryDeduceRangeFromPartitionCol(CubeSegment seg, TblColRef col) {
+         DataModelDesc model = seg.getModel();
+         PartitionDesc part = model.getPartitionDesc();
+
+         if (!part.isPartitioned())
+             return null;
+         if (!col.equals(part.getPartitionDateColumnRef()))
+             return null;
+
+         // deduce the dim range from TSRange
+         TSRange tsRange = seg.getTSRange();
+         if (tsRange == null || tsRange.start.isMin || tsRange.end.isMax)
+             return null; // DimensionRangeInfo cannot express infinite
+
+         String min = tsRangeToStr(tsRange.start.v, part);
+         String max = tsRangeToStr(tsRange.end.v - 1, part); // note the -1, end side is exclusive
+         return new DimensionRangeInfo(min, max);
+     }
+
+     /** Formats a timestamp the way values of the partition date column are written. */
+     private String tsRangeToStr(long ts, PartitionDesc part) {
+         String value;
+         DataType partitionColType = part.getPartitionDateColumnRef().getType();
+         if (partitionColType.isDate()) {
+             value = DateFormat.formatToDateStr(ts);
+         } else if (partitionColType.isTimeFamily()) {
+             value = DateFormat.formatToTimeWithoutMilliStr(ts);
+         } else if (partitionColType.isStringFamily() || partitionColType.isIntegerFamily()) {//integer like 20160101
+             String partitionDateFormat = part.getPartitionDateFormat();
+             if (StringUtils.isEmpty(partitionDateFormat)) {
+                 value = "" + ts;
+             } else {
+                 value = DateFormat.formatToDateStr(ts, partitionDateFormat);
+             }
+         } else {
+             throw new RuntimeException("Type " + partitionColType + " is not valid partition column type");
+         }
+         return value;
+     }
+
+     /**
+      * Tells whether a compare filter can match some value within [minVal, maxVal].
+      * Answers {@code true} (do not prune) whenever the question cannot be decided.
+      */
+     private boolean satisfy(CompareTupleFilter comp, String minVal, String maxVal) {
+
+         // Both bounds null means the column is all null or the segment has no records. There is
+         // no range to test against, and the outcome of comparing with null would depend on how
+         // DataTypeOrder orders null, so keep the segment rather than risk a wrong prune.
+         if (minVal == null && maxVal == null)
+             return true;
+
+         TblColRef col = comp.getColumn();
+         DataTypeOrder order = col.getType().getOrder();
+         String filterVal = toString(comp.getFirstValue());
+
+         switch (comp.getOperator()) {
+         case EQ:
+         case IN: {
+             @SuppressWarnings("unchecked")
+             Set<String> filterVals = (Set<String>) comp.getValues();
+             if (filterVals == null || filterVals.isEmpty())
+                 return true; // no constant to compare with
+             String filterMin = order.min(filterVals);
+             String filterMax = order.max(filterVals);
+             // the filter's [min, max] must overlap the segment's [min, max]
+             return order.compare(filterMin, maxVal) <= 0 && order.compare(minVal, filterMax) <= 0;
+         }
+         case LT:
+             return filterVal == null || order.compare(minVal, filterVal) < 0;
+         case LTE:
+             return filterVal == null || order.compare(minVal, filterVal) <= 0;
+         case GT:
+             return filterVal == null || order.compare(maxVal, filterVal) > 0;
+         case GTE:
+             return filterVal == null || order.compare(maxVal, filterVal) >= 0;
+         case NEQ:
+         case NOTIN:
+         case ISNULL:
+         case ISNOTNULL:
+         default:
+             // a min/max range cannot rule these out
+             return true;
+         }
+     }
+
+     private String toString(Object v) {
+         return v == null ? null : v.toString();
+     }
+ }
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/ScanRangePlannerBase.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/ScanRangePlannerBase.java
index 811d512..97e4dc3 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/ScanRangePlannerBase.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/ScanRangePlannerBase.java
@@ -29,7 +29,6 @@ import java.util.Set;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.common.util.Pair;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.metadata.expression.TupleExpression;
@@ -49,14 +48,13 @@ public abstract class ScanRangePlannerBase {
//GT
protected GTInfo gtInfo;
protected TupleFilter gtFilter;
- protected Pair<ByteArray, ByteArray> gtStartAndEnd;
- protected TblColRef gtPartitionCol;
protected ImmutableBitSet gtDimensions;
protected ImmutableBitSet gtAggrGroups;
protected ImmutableBitSet gtAggrMetrics;
protected String[] gtAggrFuncs;
protected TupleFilter havingFilter;
protected boolean isPartitionColUsingDatetimeEncoding = true;
+ protected int onlyShardId = -1;
protected RecordComparator rangeStartComparator;
protected RecordComparator rangeEndComparator;
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/DimensionRangeInfoTest.java b/core-cube/src/test/java/org/apache/kylin/cube/DimensionRangeInfoTest.java
new file mode 100644
index 0000000..94acf9f
--- /dev/null
+++ b/core-cube/src/test/java/org/apache/kylin/cube/DimensionRangeInfoTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cube;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.DataModelManager;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+ /** Tests {@code DimensionRangeInfo.mergeRangeMap} on a single date column. */
+ public class DimensionRangeInfoTest extends LocalFileMetadataTestCase {
+
+     private static final String COL_ID = "TEST_KYLIN_FACT.CAL_DT";
+
+     @Before
+     public void setUp() throws Exception {
+         this.createTestMetadata();
+     }
+
+     @After
+     public void after() throws Exception {
+         this.cleanupTestMetadata();
+     }
+
+     /** Builds a range map that holds only the test column with the given bounds. */
+     private static Map<String, DimensionRangeInfo> singleColumn(String min, String max) {
+         Map<String, DimensionRangeInfo> ranges = new HashMap<>();
+         ranges.put(COL_ID, new DimensionRangeInfo(min, max));
+         return ranges;
+     }
+
+     @Test
+     public void testMergeRangeMap() {
+         DataModelDesc model = DataModelManager.getInstance(getTestConfig()).getDataModelDesc("ci_inner_join_model");
+
+         // two adjacent ranges merge into their union
+         Map<String, DimensionRangeInfo> firstHalf = singleColumn("2012-01-01", "2012-05-31");
+         Map<String, DimensionRangeInfo> secondHalf = singleColumn("2012-06-01", "2013-06-30");
+         DimensionRangeInfo union = DimensionRangeInfo.mergeRangeMap(model, firstHalf, secondHalf).get(COL_ID);
+         Assert.assertEquals("2012-01-01", union.getMin());
+         Assert.assertEquals("2013-06-30", union.getMax());
+
+         // a column that is missing on one side is dropped from the result
+         Map<String, DimensionRangeInfo> withColumn = singleColumn("2012-01-01", "2012-05-31");
+         Map<String, DimensionRangeInfo> withoutColumn = new HashMap<>();
+         Assert.assertTrue(DimensionRangeInfo.mergeRangeMap(model, withColumn, withoutColumn).isEmpty());
+
+         // null min/max (empty segment, or all-null column) yields the other side's range
+         Map<String, DimensionRangeInfo> noValue = singleColumn(null, null);
+         Map<String, DimensionRangeInfo> hasValue = singleColumn("2012-06-01", "2013-06-30");
+         DimensionRangeInfo kept = DimensionRangeInfo.mergeRangeMap(model, noValue, hasValue).get(COL_ID);
+         Assert.assertEquals("2012-06-01", kept.getMin());
+         Assert.assertEquals("2013-06-30", kept.getMax());
+     }
+ }
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/common/SegmentPrunerTest.java b/core-cube/src/test/java/org/apache/kylin/cube/common/SegmentPrunerTest.java
new file mode 100644
index 0000000..7bf7a87
--- /dev/null
+++ b/core-cube/src/test/java/org/apache/kylin/cube/common/SegmentPrunerTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.common;
+
+import static org.apache.kylin.metadata.filter.TupleFilter.compare;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.common.util.SetAndUnsetSystemProp;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.metadata.filter.ConstantTupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
+import org.apache.kylin.metadata.model.SegmentRange.TSRange;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+public class SegmentPrunerTest extends LocalFileMetadataTestCase {
+ private CubeInstance cube;
+
+ @Before
+ public void setUp() {
+ this.createTestMetadata();
+ cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube("ssb_cube_with_dimention_range");
+ }
+
+ @After
+ public void after() {
+ this.cleanupTestMetadata();
+ }
+
+ @Test
+ public void testEmptySegment() {
+ CubeSegment seg = cube.getFirstSegment();
+ TblColRef col = cube.getModel().findColumn("CUSTOMER.C_NATION");
+
+ // a normal hit
+ TupleFilter f = compare(col, FilterOperatorEnum.EQ, "CHINA");
+ SegmentPruner segmentPruner = new SegmentPruner(f);
+ Assert.assertTrue(segmentPruner.check(seg));
+
+ // make the segment empty, it should be pruned
+ seg.setInputRecords(0);
+ Assert.assertFalse(segmentPruner.check(seg));
+ }
+
+ @Test
+ public void testDimensionRangeCheck() {
+ CubeSegment cubeSegment = cube.getSegments().getFirstSegment();
+
+ //integer
+ TblColRef qtyCol = cube.getModel().findColumn("V_LINEORDER.LO_QUANTITY");
+ TupleFilter constFilter_LO_QUANTITY0 = new ConstantTupleFilter(Sets.newHashSet("8", "18", "28"));//between min and max value
+ TupleFilter constFilter_LO_QUANTITY1 = new ConstantTupleFilter("1");//min value
+ TupleFilter constFilter_LO_QUANTITY2 = new ConstantTupleFilter("50");//max value
+ TupleFilter constFilter_LO_QUANTITY3 = new ConstantTupleFilter("0");//lt min value
+ TupleFilter constFilter_LO_QUANTITY4 = new ConstantTupleFilter("200");//gt max value
+ TupleFilter constFilter_LO_QUANTITY5 = new ConstantTupleFilter(Sets.newHashSet("51", "52", "53"));//gt max values
+
+ // is null
+ {
+ TupleFilter f = compare(qtyCol, FilterOperatorEnum.ISNULL);
+ SegmentPruner segmentPruner = new SegmentPruner(f);
+ Assert.assertTrue(segmentPruner.check(cubeSegment));
+ }
+
+ //lt min value
+ {
+ TupleFilter f = compare(qtyCol, FilterOperatorEnum.LT, constFilter_LO_QUANTITY1);
+ SegmentPruner segmentPruner = new SegmentPruner(f);
+ Assert.assertFalse(segmentPruner.check(cubeSegment));
+ }
+
+ //lte min value
+ {
+ TupleFilter f = compare(qtyCol, FilterOperatorEnum.LTE, constFilter_LO_QUANTITY1);
+ SegmentPruner segmentPruner = new SegmentPruner(f);
+ Assert.assertTrue(segmentPruner.check(cubeSegment));
+ }
+
+ //lt max value
+ {
+ TupleFilter f = compare(qtyCol, FilterOperatorEnum.LT, constFilter_LO_QUANTITY2);
+ SegmentPruner segmentPruner = new SegmentPruner(f);
+ Assert.assertTrue(segmentPruner.check(cubeSegment));
+ }
+
+ //gt max value
+ {
+ TupleFilter f = compare(qtyCol, FilterOperatorEnum.GT, constFilter_LO_QUANTITY2);
+ SegmentPruner segmentPruner = new SegmentPruner(f);
+ Assert.assertFalse(segmentPruner.check(cubeSegment));
+ }
+
+ //gte max value
+ {
+ TupleFilter f = compare(qtyCol, FilterOperatorEnum.GTE, constFilter_LO_QUANTITY2);
+ SegmentPruner segmentPruner = new SegmentPruner(f);
+ Assert.assertTrue(segmentPruner.check(cubeSegment));
+ }
+
+ //gt min value
+ {
+ TupleFilter f = compare(qtyCol, FilterOperatorEnum.GT, constFilter_LO_QUANTITY1);
+ SegmentPruner segmentPruner = new SegmentPruner(f);
+ Assert.assertTrue(segmentPruner.check(cubeSegment));
+ }
+
+ //in over-max values
+ {
+ TupleFilter f = compare(qtyCol, FilterOperatorEnum.IN, constFilter_LO_QUANTITY5);
+ SegmentPruner segmentPruner = new SegmentPruner(f);
+ Assert.assertFalse(segmentPruner.check(cubeSegment));
+ }
+
+ //in normal values
+ {
+ TupleFilter f = compare(qtyCol, FilterOperatorEnum.IN, constFilter_LO_QUANTITY0);
+ SegmentPruner segmentPruner = new SegmentPruner(f);
+ Assert.assertTrue(segmentPruner.check(cubeSegment));
+ }
+
+ //lte under-min value
+ {
+ TupleFilter f = compare(qtyCol, FilterOperatorEnum.LTE, constFilter_LO_QUANTITY3);
+ SegmentPruner segmentPruner = new SegmentPruner(f);
+ Assert.assertFalse(segmentPruner.check(cubeSegment));
+ }
+
+ //gte over-max value
+ {
+ TupleFilter f = compare(qtyCol, FilterOperatorEnum.GTE, constFilter_LO_QUANTITY4);
+ SegmentPruner segmentPruner = new SegmentPruner(f);
+ Assert.assertFalse(segmentPruner.check(cubeSegment));
+ }
+ }
+
+ @Test
+ public void testLegacyCubeSeg() {
+ // legacy cube segments does not have DimensionRangeInfo, but with TSRange can do some pruning
+ CubeInstance cube = CubeManager.getInstance(getTestConfig())
+ .getCube("test_kylin_cube_without_slr_left_join_ready_2_segments");
+
+ TblColRef col = cube.getModel().findColumn("TEST_KYLIN_FACT.CAL_DT");
+ CubeSegment seg = cube.getSegments(SegmentStatusEnum.READY).get(0);
+ TSRange tsRange = seg.getTSRange();
+ long start = tsRange.start.v;
+
+ try (SetAndUnsetSystemProp sns = new SetAndUnsetSystemProp("kylin.query.skip-empty-segments", "false")) {
+ {
+ TupleFilter f = compare(col, FilterOperatorEnum.LTE, start);
+ SegmentPruner segmentPruner = new SegmentPruner(f);
+ Assert.assertTrue(segmentPruner.check(seg));
+ }
+ {
+ TupleFilter f = compare(col, FilterOperatorEnum.LT, start);
+ SegmentPruner segmentPruner = new SegmentPruner(f);
+ Assert.assertFalse(segmentPruner.check(seg));
+ }
+ }
+ }
+}
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
index 16ea8ee..43b204a 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
@@ -32,6 +32,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
/**
*
@@ -92,7 +93,7 @@ public abstract class TupleFilter {
public static CompareTupleFilter compare(TblColRef col, FilterOperatorEnum op, Object val) {
CompareTupleFilter r = new CompareTupleFilter(op);
r.addChild(new ColumnTupleFilter(col));
- r.addChild(new ConstantTupleFilter(val));
+ r.addChild(val instanceof ConstantTupleFilter ? (ConstantTupleFilter) val : new ConstantTupleFilter(val));
return r;
}
@@ -309,6 +310,31 @@ public abstract class TupleFilter {
}
}
+ /**
+  * Collects the compare filters that must evaluate to true for this filter to be true:
+  * this filter itself if it is a compare filter on a column, or such compare filters
+  * reachable through AND nodes only.
+  *
+  * @return a new set holding the collected filters, possibly empty
+  */
+ public Set<CompareTupleFilter> findMustTrueCompareFilters() {
+     Set<CompareTupleFilter> mustTrue = Sets.newHashSet();
+     findMustTrueCompareFilters(this, mustTrue);
+     return mustTrue;
+ }
+
+ /**
+  * Recursive step: adds {@code filter} to {@code result} if it is a compare filter with a
+  * column, descends into the children of an AND node, and ignores every other filter.
+  */
+ private void findMustTrueCompareFilters(TupleFilter filter, Set<CompareTupleFilter> result) {
+     if (filter instanceof CompareTupleFilter) {
+         CompareTupleFilter comp = (CompareTupleFilter) filter;
+         if (comp.getColumn() != null) {
+             result.add(comp);
+         }
+         return;
+     }
+
+     // only AND guarantees that every child holds; other logical nodes are not descended into
+     boolean isAnd = filter instanceof LogicalTupleFilter && filter.getOperator() == FilterOperatorEnum.AND;
+     if (!isAnd) {
+         return;
+     }
+
+     for (TupleFilter child : filter.getChildren()) {
+         findMustTrueCompareFilters(child, result);
+     }
+ }
+
public abstract boolean isEvaluable();
public abstract boolean evaluate(IEvaluatableTuple tuple, IFilterCodeSystem<?> cs);
diff --git a/core-metadata/src/test/java/org/apache/kylin/metadata/datatype/DataTypeOrderTest.java b/core-metadata/src/test/java/org/apache/kylin/metadata/datatype/DataTypeOrderTest.java
new file mode 100644
index 0000000..7666ffe
--- /dev/null
+++ b/core-metadata/src/test/java/org/apache/kylin/metadata/datatype/DataTypeOrderTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.datatype;
+
+import com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Set;
+
+ /** Checks min/max of {@code DataTypeOrder} for integer, double, date and varchar columns. */
+ public class DataTypeOrderTest {
+     @Test
+     public void testDataTypeOrder() {
+         // integers are ordered by numeric value
+         DataTypeOrder intOrder = DataType.getType("integer").getOrder();
+         Set<String> integers = Sets.newHashSet("100000", "2", "1000", "100", "77", "10", "9", "2000000", "-10000", "0");
+         Assert.assertEquals("2000000", intOrder.max(integers));
+         Assert.assertEquals("-10000", intOrder.min(integers));
+
+         // doubles, in mixed notations
+         DataTypeOrder doubleOrder = DataType.getType("double").getOrder();
+         Set<String> doubles = Sets.newHashSet("1.1", "-299.5", "100000", "1.000", "4.000000001", "0.00", "-1000000.231231", "8000000",
+                 "10", "10.00");
+         Assert.assertEquals("8000000", doubleOrder.max(doubles));
+         Assert.assertEquals("-1000000.231231", doubleOrder.min(doubles));
+
+         // dates, given as date strings, timestamps with and without millis, and epoch millis
+         DataTypeOrder dateOrder = DataType.getType("date").getOrder();
+         Set<String> datetimes = Sets.newHashSet("2010-01-02", "2888-08-09", "2018-05-26", "1527512082000", "2010-02-03 23:59:59",
+                 "2000-12-12 12:00:00", "1970-01-19 00:18:32", "1998-12-02", "2018-05-28 10:00:00.255", "1995-09-20 20:00:00.220");
+         Assert.assertEquals("2888-08-09", dateOrder.max(datetimes));
+         Assert.assertEquals("1970-01-19 00:18:32", dateOrder.min(datetimes));
+
+         // strings, including null and the empty string
+         DataTypeOrder stringOrder = new DataType("varchar", 256, 10).getOrder();
+         Set<String> strings = Sets.newHashSet(null, "", "中国", "China No.1", "神兽麒麟", "Rocket", "Apache Kylin", "google", "NULL",
+                 "empty");
+         Assert.assertEquals("神兽麒麟", stringOrder.max(strings));
+         Assert.assertEquals("", stringOrder.min(strings));
+     }
+ }
diff --git a/core-metadata/src/test/java/org/apache/kylin/metadata/filter/TupleFilterTest.java b/core-metadata/src/test/java/org/apache/kylin/metadata/filter/TupleFilterTest.java
index b1e8f62..95609eb 100644
--- a/core-metadata/src/test/java/org/apache/kylin/metadata/filter/TupleFilterTest.java
+++ b/core-metadata/src/test/java/org/apache/kylin/metadata/filter/TupleFilterTest.java
@@ -116,4 +116,27 @@ public class TupleFilterTest {
r.put(col2, v2);
return r;
}
+
+ @Test
+ public void testMustTrueTupleFilter() {
+     // root AND with two logical children: a nested AND and an OR
+     TupleFilter rootAnd = new LogicalTupleFilter(TupleFilter.FilterOperatorEnum.AND);
+     TupleFilter nestedAnd = new LogicalTupleFilter(TupleFilter.FilterOperatorEnum.AND);
+     TupleFilter nestedOr = new LogicalTupleFilter(TupleFilter.FilterOperatorEnum.OR);
+     rootAnd.addChild(nestedAnd);
+     rootAnd.addChild(nestedOr);
+
+     // without any compare filter there is nothing to find
+     Set<CompareTupleFilter> found = rootAnd.findMustTrueCompareFilters();
+     Assert.assertTrue(found.isEmpty());
+
+     TupleFilter underAnd = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.GT);
+     underAnd.addChild(new ColumnTupleFilter(TblColRef.newInnerColumn("test1", TblColRef.InnerDataTypeEnum.LITERAL)));
+     TupleFilter underOr = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.GT);
+     underOr.addChild(new ColumnTupleFilter(TblColRef.newInnerColumn("test2", TblColRef.InnerDataTypeEnum.LITERAL)));
+     nestedAnd.addChild(underAnd);
+     nestedOr.addChild(underOr);
+
+     // only the compare filter reachable through ANDs is reported
+     Assert.assertEquals(Sets.newHashSet(underAnd), rootAnd.findMustTrueCompareFilters());
+
+     // a compare filter reports itself
+     Assert.assertEquals(Sets.newHashSet(underOr), underOr.findMustTrueCompareFilters());
+ }
+
}
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
index 5f86a45..229ef01 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
@@ -31,7 +31,6 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.common.FuzzyValueCombination;
import org.apache.kylin.cube.cuboid.Cuboid;
@@ -39,7 +38,6 @@ import org.apache.kylin.cube.gridtable.CubeGridTable;
import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
import org.apache.kylin.cube.gridtable.RecordComparators;
import org.apache.kylin.cube.gridtable.ScanRangePlannerBase;
-import org.apache.kylin.cube.gridtable.SegmentGTStartAndEnd;
import org.apache.kylin.cube.kv.CubeDimEncMap;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.gridtable.GTInfo;
@@ -137,15 +135,9 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
this.gtDynColumns = new ImmutableBitSet(tmpGtDynCols);
this.gtRtAggrMetrics = mapping.makeGridTableColumns(tmpRtAggrMetrics);
- if (cubeSegment.getModel().getPartitionDesc().isPartitioned()) {
- int index = mapping.getIndexOf(cubeSegment.getModel().getPartitionDesc().getPartitionDateColumnRef());
- if (index >= 0) {
- SegmentGTStartAndEnd segmentGTStartAndEnd = new SegmentGTStartAndEnd(cubeSegment, gtInfo);
- this.gtStartAndEnd = segmentGTStartAndEnd.getSegmentStartAndEnd(index);
- this.isPartitionColUsingDatetimeEncoding = segmentGTStartAndEnd.isUsingDatetimeEncoding(index);
- this.gtPartitionCol = gtInfo.colRef(index);
- }
- }
+ this.gtAggrGroups = mapping.makeGridTableColumns(replaceDerivedColumns(groupByPushDown, cubeSegment.getCubeDesc()));
+ this.gtAggrMetrics = mapping.makeGridTableColumns(metrics);
+ this.gtAggrFuncs = mapping.makeAggrFuncs(metrics);
}
protected StorageContext context;
@@ -153,7 +145,7 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
/**
* Construct GTScanRangePlanner with incomplete information. For UT only.
*/
- public CubeScanRangePlanner(GTInfo info, Pair<ByteArray, ByteArray> gtStartAndEnd, TblColRef gtPartitionCol, TupleFilter gtFilter) {
+ public CubeScanRangePlanner(GTInfo info, TblColRef gtPartitionCol, TupleFilter gtFilter) {
this.maxScanRanges = KylinConfig.getInstanceFromEnv().getQueryStorageVisitScanRangeMax();
this.maxFuzzyKeysPerSplit = KylinConfig.getInstanceFromEnv().getQueryScanFuzzyKeyMax();
@@ -170,8 +162,6 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
this.rangeStartEndComparator = RecordComparators.getRangeStartEndComparator(comp);
this.gtFilter = gtFilter;
- this.gtStartAndEnd = gtStartAndEnd;
- this.gtPartitionCol = gtPartitionCol;
}
public GTScanRequest planScanRequest() {
@@ -237,18 +227,6 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
Map<Integer, Set<ByteArray>> fuzzyValues = Maps.newHashMap();
for (ColumnRange range : andDimRanges) {
- if (gtPartitionCol != null && range.column.equals(gtPartitionCol)) {
- int beginCompare = rangeStartEndComparator.comparator.compare(range.begin, gtStartAndEnd.getSecond());
- int endCompare = rangeStartEndComparator.comparator.compare(gtStartAndEnd.getFirst(), range.end);
-
- if ((isPartitionColUsingDatetimeEncoding && endCompare <= 0 && beginCompare < 0) || (!isPartitionColUsingDatetimeEncoding && endCompare <= 0 && beginCompare <= 0)) {
- //segment range is [Closed,Open), but segmentStartAndEnd.getSecond() might be rounded when using dict encoding, so use <= when has equals in condition.
- } else {
- logger.debug("Pre-check partition col filter failed, partitionColRef {}, segment start {}, segment end {}, range begin {}, range end {}", //
- gtPartitionCol, makeReadable(gtStartAndEnd.getFirst()), makeReadable(gtStartAndEnd.getSecond()), makeReadable(range.begin), makeReadable(range.end));
- return null;
- }
- }
int col = range.column.getColumnDesc().getZeroBasedIndex();
if (!gtInfo.getPrimaryKey().get(col))
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index 352a868..269833f 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -33,6 +33,7 @@ import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.RawQueryLastHacker;
+import org.apache.kylin.cube.common.SegmentPruner;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
import org.apache.kylin.cube.gridtable.CuboidToGridTableMappingExt;
@@ -87,14 +88,10 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
GTCubeStorageQueryRequest request = getStorageQueryRequest(context, sqlDigest, returnTupleInfo);
List<CubeSegmentScanner> scanners = Lists.newArrayList();
- for (CubeSegment cubeSeg : cubeInstance.getSegments(SegmentStatusEnum.READY)) {
+ SegmentPruner segPruner = new SegmentPruner(sqlDigest.filter);
+ for (CubeSegment cubeSeg : segPruner.listSegmentsForQuery(cubeInstance)) {
CubeSegmentScanner scanner;
- if (cubeDesc.getConfig().isSkippingEmptySegments() && cubeSeg.getInputRecords() == 0) {
- logger.info("Skip cube segment {} because its input record is 0", cubeSeg);
- continue;
- }
-
scanner = new CubeSegmentScanner(cubeSeg, request.getCuboid(), request.getDimensions(), //
request.getGroups(), request.getDynGroups(), request.getDynGroupExprs(), //
request.getMetrics(), request.getDynFuncs(), //
diff --git a/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
index 073c12c..08bcb65 100644
--- a/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
+++ b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
@@ -32,7 +32,6 @@ import org.apache.kylin.common.util.BytesSerializer;
import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.gridtable.CubeCodeSystem;
import org.apache.kylin.dict.NumberDictionaryForestBuilder;
import org.apache.kylin.dict.StringBytesConverter;
@@ -121,13 +120,11 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
ByteArray segmentStart = enc(info, 0, "2015-01-14");
ByteArray segmentStartX = enc(info, 0, "2015-01-14 00:00:00");//when partition col is dict encoded, time format will be free
- ByteArray segmentEnd = enc(info, 0, "2015-01-15");
assertEquals(segmentStart, segmentStartX);
{
LogicalTupleFilter filter = and(timeComp0, ageComp1);
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
- info.colRef(0), filter);
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
List<GTScanRange> r = planner.planScanRanges();
assertEquals(1, r.size());//scan range are [close,close]
assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString());
@@ -136,64 +133,57 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
}
{
LogicalTupleFilter filter = and(timeComp2, ageComp1);
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
- info.colRef(0), filter);
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
List<GTScanRange> r = planner.planScanRanges();
- assertEquals(0, r.size());
+ assertEquals(1, r.size());
}
{
LogicalTupleFilter filter = and(timeComp4, ageComp1);
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
- info.colRef(0), filter);
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
List<GTScanRange> r = planner.planScanRanges();
- assertEquals(0, r.size());
+ assertEquals(1, r.size());
}
{
LogicalTupleFilter filter = and(timeComp5, ageComp1);
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
- info.colRef(0), filter);
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
List<GTScanRange> r = planner.planScanRanges();
- assertEquals(0, r.size());
+ assertEquals(1, r.size());
}
{
LogicalTupleFilter filter = or(and(timeComp2, ageComp1), and(timeComp1, ageComp1),
and(timeComp6, ageComp1));
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
- info.colRef(0), filter);
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
List<GTScanRange> r = planner.planScanRanges();
- assertEquals(1, r.size());
- assertEquals("[1421193600000, 10]-[null, 10]", r.get(0).toString());
+ assertEquals(2, r.size());
+ assertEquals("[1421193600000, 10]-[null, 10]", r.get(1).toString());
assertEquals("[[null, 10, null, null, null], [1421193600000, 10, null, null, null]]",
- r.get(0).fuzzyKeys.toString());
+ r.get(1).fuzzyKeys.toString());
}
{
LogicalTupleFilter filter = or(and(timeComp3, ageComp3), and(timeComp7, ageComp1));
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter);
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
List<GTScanRange> r = planner.planScanRanges();
assertEquals("[[0, 10]-[1421280000000, 30]]", r.toString());
}
{
LogicalTupleFilter filter = or(timeComp2, timeComp1, timeComp6);
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
- info.colRef(0), filter);
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
List<GTScanRange> r = planner.planScanRanges();
- assertEquals(1, r.size());
- assertEquals("[1421193600000, null]-[null, null]", r.get(0).toString());
- assertEquals(0, r.get(0).fuzzyKeys.size());
+ assertEquals(2, r.size());
+ assertEquals("[1421193600000, null]-[null, null]", r.get(1).toString());
+ assertEquals(0, r.get(1).fuzzyKeys.size());
}
{
//skip FALSE filter
LogicalTupleFilter filter = and(ageComp1, ConstantTupleFilter.FALSE);
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
- info.colRef(0), filter);
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
List<GTScanRange> r = planner.planScanRanges();
assertEquals(0, r.size());
}
{
//TRUE or FALSE filter
LogicalTupleFilter filter = or(ConstantTupleFilter.TRUE, ConstantTupleFilter.FALSE);
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
- info.colRef(0), filter);
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
List<GTScanRange> r = planner.planScanRanges();
assertEquals(1, r.size());
assertEquals("[null, null]-[null, null]", r.get(0).toString());
@@ -201,8 +191,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
{
//TRUE or other filter
LogicalTupleFilter filter = or(ageComp1, ConstantTupleFilter.TRUE);
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
- info.colRef(0), filter);
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
List<GTScanRange> r = planner.planScanRanges();
assertEquals(1, r.size());
assertEquals("[null, null]-[null, null]", r.get(0).toString());
@@ -211,12 +200,9 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
@Test
public void verifySegmentSkipping2() {
- ByteArray segmentEnd = enc(info, 0, "2015-01-15");
-
{
LogicalTupleFilter filter = and(timeComp0, ageComp1);
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(new ByteArray(), segmentEnd),
- info.colRef(0), filter);
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
List<GTScanRange> r = planner.planScanRanges();
assertEquals(1, r.size());//scan range are [close,close]
assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString());
@@ -226,10 +212,9 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
{
LogicalTupleFilter filter = and(timeComp5, ageComp1);
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(new ByteArray(), segmentEnd),
- info.colRef(0), filter);
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
List<GTScanRange> r = planner.planScanRanges();
- assertEquals(0, r.size());//scan range are [close,close]
+ assertEquals(1, r.size());//scan range are [close,close]
}
}
@@ -239,7 +224,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
// flatten or-and & hbase fuzzy value
{
LogicalTupleFilter filter = and(timeComp1, or(ageComp1, ageComp2));
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter);
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
List<GTScanRange> r = planner.planScanRanges();
assertEquals(1, r.size());
assertEquals("[1421193600000, 10]-[null, 20]", r.get(0).toString());
@@ -249,7 +234,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
// pre-evaluate ever false
{
LogicalTupleFilter filter = and(timeComp1, timeComp2);
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter);
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
List<GTScanRange> r = planner.planScanRanges();
assertEquals(0, r.size());
}
@@ -257,7 +242,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
// pre-evaluate ever true
{
LogicalTupleFilter filter = or(timeComp1, ageComp4);
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter);
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
List<GTScanRange> r = planner.planScanRanges();
assertEquals("[[null, null]-[null, null]]", r.toString());
}
@@ -265,7 +250,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
// merge overlap range
{
LogicalTupleFilter filter = or(timeComp1, timeComp3);
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter);
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
List<GTScanRange> r = planner.planScanRanges();
assertEquals("[[null, null]-[null, null]]", r.toString());
}
@@ -274,7 +259,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
{
LogicalTupleFilter filter = or(and(timeComp4, ageComp1), and(timeComp4, ageComp2),
and(timeComp4, ageComp3));
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter);
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
List<GTScanRange> r = planner.planScanRanges();
assertEquals(3, r.size());
assertEquals("[1421280000000, 10]-[1421280000000, 10]", r.get(0).toString());
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java
index f3bdabd..4305a25 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java
@@ -104,7 +104,6 @@ public class CalculateStatsFromBaseCuboidJob extends AbstractHadoopJob {
private void setupReducer(Path output, CubeSegment cubeSeg) throws IOException {
int hllShardBase = MapReduceUtil.getCuboidHLLCounterReducerNum(cubeSeg.getCubeInstance());
- job.getConfiguration().setInt(BatchConstants.CFG_HLL_REDUCER_NUM, hllShardBase);
job.setReducerClass(CalculateStatsFromBaseCuboidReducer.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
index 9bede82..4ea04d5 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
@@ -55,8 +55,7 @@ public class FactDistinctColumnPartitioner extends Partitioner<SelfDefineSortabl
String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
- reducerMapping = new FactDistinctColumnsReducerMapping(cube,
- conf.getInt(BatchConstants.CFG_HLL_REDUCER_NUM, 1));
+ reducerMapping = new FactDistinctColumnsReducerMapping(cube);
}
@Override
@@ -65,8 +64,6 @@ public class FactDistinctColumnPartitioner extends Partitioner<SelfDefineSortabl
if (key.getBytes()[0] == FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER) {
Long cuboidId = Bytes.toLong(key.getBytes(), 1, Bytes.SIZEOF_LONG);
return reducerMapping.getReducerIdForCuboidRowCount(cuboidId);
- } else if (key.getBytes()[0] == FactDistinctColumnsReducerMapping.MARK_FOR_PARTITION_COL) {
- return reducerMapping.getReducerIdForDatePartitionColumn();
} else {
return BytesUtil.readUnsigned(key.getBytes(), 0, 1);
}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
index f96944a..8f5d176 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
@@ -131,6 +131,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
throws IOException {
FactDistinctColumnsReducerMapping reducerMapping = new FactDistinctColumnsReducerMapping(cubeSeg.getCubeInstance());
int numberOfReducers = reducerMapping.getTotalReducerNum();
+ logger.info("{} has reducers {}.", this.getClass().getName(), numberOfReducers);
if (numberOfReducers > 250) {
throw new IllegalArgumentException(
"The max reducer number for FactDistinctColumnsJob is 250, but now it is "
@@ -141,7 +142,6 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
job.setReducerClass(FactDistinctColumnsReducer.class);
job.setPartitionerClass(FactDistinctColumnPartitioner.class);
job.setNumReduceTasks(numberOfReducers);
- job.getConfiguration().setInt(BatchConstants.CFG_HLL_REDUCER_NUM, reducerMapping.getCuboidRowCounterReducerNum());
// make each reducer output to respective dir
MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_COLUMN, SequenceFileOutputFormat.class, NullWritable.class, Text.class);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
index 569b810..fc9dc65 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
@@ -37,7 +37,6 @@ import org.apache.kylin.measure.BufferedMeasureCodec;
import org.apache.kylin.measure.hllc.HLLCounter;
import org.apache.kylin.measure.hllc.RegisterType;
import org.apache.kylin.metadata.datatype.DataType;
-import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,9 +67,6 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
private static final Text EMPTY_TEXT = new Text();
- private int partitionColumnIndex = -1;
- private boolean needFetchPartitionCol = true;
-
private SelfDefineSortableKey sortableKey = new SelfDefineSortableKey();
@Override
@@ -96,18 +92,6 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
allCuboidsHLL[i] = new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision(), RegisterType.DENSE);
}
- TblColRef partitionColRef = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
- if (partitionColRef != null) {
- partitionColumnIndex = intermediateTableDesc.getColumnIndex(partitionColRef);
- }
-
- // check whether need fetch the partition col values
- if (partitionColumnIndex < 0) {
- // if partition col not on cube, no need
- needFetchPartitionCol = false;
- } else {
- needFetchPartitionCol = true;
- }
//for KYLIN-2518 backward compatibility
boolean isUsePutRowKeyToHllNewAlgorithm;
if (KylinVersion.isBefore200(cubeDesc.getVersion())) {
@@ -172,12 +156,12 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
for (String[] row : rowCollection) {
context.getCounter(RawDataCounter.BYTES).increment(countSizeInBytes(row));
- for (int i = 0; i < dictCols.size(); i++) {
- String fieldValue = row[dictionaryColumnIndex[i]];
+ for (int i = 0; i < allDimDictCols.size(); i++) {
+ String fieldValue = row[columnIndex[i]];
if (fieldValue == null)
continue;
- int reducerIndex = reducerMapping.getReducerIdForDictCol(i, fieldValue);
+ int reducerIndex = reducerMapping.getReducerIdForCol(i, fieldValue);
tmpbuf.clear();
byte[] valueBytes = Bytes.toBytes(fieldValue);
@@ -188,15 +172,14 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
tmpbuf.put(Bytes.toBytes(reducerIndex)[3]);
tmpbuf.put(valueBytes);
outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
- DataType type = dictCols.get(i).getType();
+ DataType type = allDimDictCols.get(i).getType();
sortableKey.init(outputKey, type);
- //judge type
context.write(sortableKey, EMPTY_TEXT);
// log a few rows for troubleshooting
if (rowCount < 10) {
logger.info(
- "Sample output: " + dictCols.get(i) + " '" + fieldValue + "' => reducer " + reducerIndex);
+ "Sample output: " + allDimDictCols.get(i) + " '" + fieldValue + "' => reducer " + reducerIndex);
}
}
@@ -204,22 +187,6 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
putRowKeyToHLL(row);
}
- if (needFetchPartitionCol == true) {
- String fieldValue = row[partitionColumnIndex];
- if (fieldValue != null) {
- tmpbuf.clear();
- byte[] valueBytes = Bytes.toBytes(fieldValue);
- int size = valueBytes.length + 1;
- if (size >= tmpbuf.capacity()) {
- tmpbuf = ByteBuffer.allocate(countNewSize(tmpbuf.capacity(), size));
- }
- tmpbuf.put((byte) FactDistinctColumnsReducerMapping.MARK_FOR_PARTITION_COL);
- tmpbuf.put(valueBytes);
- outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
- sortableKey.init(outputKey, (byte) 0);
- context.write(sortableKey, EMPTY_TEXT);
- }
- }
rowCount++;
}
}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
index c66042b..ceddeb5 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
@@ -39,8 +39,6 @@ import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.metadata.model.TblColRef;
-import com.google.common.collect.Lists;
-
/**
*/
abstract public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, SelfDefineSortableKey, Text> {
@@ -50,8 +48,8 @@ abstract public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends Kyli
protected CubeSegment cubeSeg;
protected CubeDesc cubeDesc;
protected long baseCuboidId;
- protected List<TblColRef> dictCols;
protected IMRTableInputFormat flatTableInputFormat;
+ protected List<TblColRef> allDimDictCols;
protected Text outputKey = new Text();
//protected SelfDefineSortableKey sortableKey = new SelfDefineSortableKey();
@@ -59,7 +57,7 @@ abstract public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends Kyli
protected int errorRecordCounter = 0;
protected CubeJoinedFlatTableEnrich intermediateTableDesc;
- protected int[] dictionaryColumnIndex;
+ protected int[] columnIndex;
protected FactDistinctColumnsReducerMapping reducerMapping;
@@ -74,20 +72,18 @@ abstract public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends Kyli
cubeSeg = cube.getSegmentById(conf.get(BatchConstants.CFG_CUBE_SEGMENT_ID));
cubeDesc = cube.getDescriptor();
baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
- dictCols = Lists.newArrayList(cubeDesc.getAllColumnsNeedDictionaryBuilt());
+ reducerMapping = new FactDistinctColumnsReducerMapping(cube);
+ allDimDictCols = reducerMapping.getAllDimDictCols();
flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSeg), cubeDesc);
- dictionaryColumnIndex = new int[dictCols.size()];
- for (int i = 0; i < dictCols.size(); i++) {
- TblColRef colRef = dictCols.get(i);
+ columnIndex = new int[allDimDictCols.size()];
+ for (int i = 0; i < allDimDictCols.size(); i++) {
+ TblColRef colRef = allDimDictCols.get(i);
int columnIndexOnFlatTbl = intermediateTableDesc.getColumnIndex(colRef);
- dictionaryColumnIndex[i] = columnIndexOnFlatTbl;
+ columnIndex[i] = columnIndexOnFlatTbl;
}
-
- reducerMapping = new FactDistinctColumnsReducerMapping(cube,
- conf.getInt(BatchConstants.CFG_HLL_REDUCER_NUM, 1));
}
protected void handleErrorRecord(String[] record, Exception ex) throws IOException {
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
index 801771a..61ba247 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
@@ -71,17 +70,16 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
private boolean isStatistics = false;
private KylinConfig cubeConfig;
private int taskId;
- private boolean isPartitionCol = false;
private int rowCount = 0;
private FactDistinctColumnsReducerMapping reducerMapping;
//local build dict
private boolean buildDictInReducer;
private IDictionaryBuilder builder;
- private long timeMaxValue = Long.MIN_VALUE;
- private long timeMinValue = Long.MAX_VALUE;
+ private String maxValue = null;
+ private String minValue = null;
public static final String DICT_FILE_POSTFIX = ".rldict";
- public static final String PARTITION_COL_INFO_FILE_POSTFIX = ".pci";
+ public static final String DIMENSION_COL_INFO_FILE_POSTFIX = ".dci";
private MultipleOutputs mos;
@@ -97,11 +95,9 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
cubeConfig = cube.getConfig();
cubeDesc = cube.getDescriptor();
- int numberOfTasks = context.getNumReduceTasks();
taskId = context.getTaskAttemptID().getTaskID().getId();
- reducerMapping = new FactDistinctColumnsReducerMapping(cube,
- conf.getInt(BatchConstants.CFG_HLL_REDUCER_NUM, 1));
+ reducerMapping = new FactDistinctColumnsReducerMapping(cube);
logger.info("reducer no " + taskId + ", role play " + reducerMapping.getRolePlayOfReducer(taskId));
@@ -114,18 +110,9 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
samplingPercentage = Integer
.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));
logger.info("Reducer " + taskId + " handling stats");
- } else if (reducerMapping.isPartitionColReducer(taskId)) {
- // partition col
- isPartitionCol = true;
- col = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
- if (col == null) {
- logger.info("No partition col. This reducer will do nothing");
- } else {
- logger.info("Reducer " + taskId + " handling partition col " + col.getIdentity());
- }
} else {
// normal col
- col = reducerMapping.getDictColForReducer(taskId);
+ col = reducerMapping.getColForReducer(taskId);
Preconditions.checkNotNull(col);
// local build dict
@@ -133,7 +120,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
if (cubeDesc.getDictionaryBuilderClass(col) != null) { // only works with default dictionary builder
buildDictInReducer = false;
}
- if (reducerMapping.getReducerNumForDictCol(col) > 1) {
+ if (reducerMapping.getReducerNumForDimCol(col) > 1) {
buildDictInReducer = false; // only works if this is the only reducer of a dictionary column
}
if (buildDictInReducer) {
@@ -167,24 +154,29 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
cuboidHLLMap.put(cuboidId, hll);
}
}
- } else if (isPartitionCol) {
- // partition col
+ } else {
String value = Bytes.toString(key.getBytes(), 1, key.getLength() - 1);
logAFewRows(value);
- long time = DateFormat.stringToMillis(value);
- timeMinValue = Math.min(timeMinValue, time);
- timeMaxValue = Math.max(timeMaxValue, time);
- } else {
- // normal col
- if (buildDictInReducer) {
- String value = Bytes.toString(key.getBytes(), 1, key.getLength() - 1);
- logAFewRows(value);
- builder.addValue(value);
- } else {
- byte[] keyBytes = Bytes.copy(key.getBytes(), 1, key.getLength() - 1);
- // output written to baseDir/colName/-r-00000 (etc)
- String fileName = col.getIdentity() + "/";
- mos.write(BatchConstants.CFG_OUTPUT_COLUMN, NullWritable.get(), new Text(keyBytes), fileName);
+ // if dimension col, compute max/min value
+ if (cubeDesc.listDimensionColumnsExcludingDerived(true).contains(col)) {
+ if (minValue == null || col.getType().compare(minValue, value) > 0) {
+ minValue = value;
+ }
+ if (maxValue == null || col.getType().compare(maxValue, value) < 0) {
+ maxValue = value;
+ }
+ }
+
+ //if dict column
+ if (cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col)) {
+ if (buildDictInReducer) {
+ builder.addValue(value);
+ } else {
+ byte[] keyBytes = Bytes.copy(key.getBytes(), 1, key.getLength() - 1);
+ // output written to baseDir/colName/-r-00000 (etc)
+ String fileName = col.getIdentity() + "/";
+ mos.write(BatchConstants.CFG_OUTPUT_COLUMN, NullWritable.get(), new Text(keyBytes), fileName);
+ }
}
}
@@ -207,11 +199,12 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
logMapperAndCuboidStatistics(allCuboids); // for human check
outputStatistics(allCuboids);
- } else if (isPartitionCol) {
- // partition col
- outputPartitionInfo();
} else {
- // normal col
+ //dimension col
+ if (cubeDesc.listDimensionColumnsExcludingDerived(true).contains(col)) {
+ outputDimRangeInfo();
+ }
+ // dict col
if (buildDictInReducer) {
Dictionary<String> dict = builder.build();
outputDict(col, dict);
@@ -221,14 +214,17 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
mos.close();
}
- private void outputPartitionInfo() throws IOException, InterruptedException {
- if (col != null) {
- // output written to baseDir/colName/colName.pci-r-00000 (etc)
- String partitionFileName = col.getIdentity() + "/" + col.getName() + PARTITION_COL_INFO_FILE_POSTFIX;
-
- mos.write(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(), new LongWritable(timeMinValue), partitionFileName);
- mos.write(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(), new LongWritable(timeMaxValue), partitionFileName);
- logger.info("write partition info for col : " + col.getName() + " minValue:" + timeMinValue + " maxValue:" + timeMaxValue);
+ private void outputDimRangeInfo() throws IOException, InterruptedException {
+ if (col != null && minValue != null) {
+ // output written to baseDir/colName/colName.dci-r-00000 (etc)
+ String dimRangeFileName = col.getIdentity() + "/" + col.getName() + DIMENSION_COL_INFO_FILE_POSTFIX;
+
+ mos.write(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(), new Text(minValue.getBytes()),
+ dimRangeFileName);
+ mos.write(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(), new Text(maxValue.getBytes()),
+ dimRangeFileName);
+ logger.info("write dimension range info for col : " + col.getName() + " minValue:" + minValue + " maxValue:"
+ + maxValue);
}
}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMapping.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMapping.java
index 51594c3..b60e4ce 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMapping.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMapping.java
@@ -18,73 +18,74 @@
package org.apache.kylin.engine.mr.steps;
-import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.mr.common.MapReduceUtil;
import org.apache.kylin.metadata.model.TblColRef;
+import com.google.common.collect.Lists;
+
/**
* Reducers play different roles based on reducer-id:
- * - (start from 0) one reducer for each dictionary column, UHC may have more than one reducer
- * - one reducer to get min/max of date partition column
+ * - (start from 0) one reducer for each dimension or dictionary column; a UHC column may have more than one reducer
* - (at the end) one or more reducers to collect row counts for cuboids using HLL
*/
public class FactDistinctColumnsReducerMapping {
- public static final int MARK_FOR_PARTITION_COL = -2;
public static final int MARK_FOR_HLL_COUNTER = -1;
- final private int nDictValueCollectors;
- final private int datePartitionReducerId;
final private int nCuboidRowCounters;
+ final private int nDimReducers;
final private int nTotalReducers;
- final private List<TblColRef> allDictCols;
- final private int[] dictColIdToReducerBeginId;
+ final private List<TblColRef> allDimDictCols = Lists.newArrayList();
+ final private int[] colIdToReducerBeginId;
final private int[] reducerRolePlay; // >=0 for dict col id, <0 for partition col and hll counter (using markers)
public FactDistinctColumnsReducerMapping(CubeInstance cube) {
this(cube, 0);
}
- public FactDistinctColumnsReducerMapping(CubeInstance cube, int cuboidRowCounterReducerNum) {
+ private FactDistinctColumnsReducerMapping(CubeInstance cube, int cuboidRowCounterReducerNum) {
CubeDesc desc = cube.getDescriptor();
+ Set<TblColRef> allCols = cube.getAllColumns();
+ Set<TblColRef> dictCols = desc.getAllColumnsNeedDictionaryBuilt();
+ List<TblColRef> dimCols = desc.listDimensionColumnsExcludingDerived(true);
+ for (TblColRef colRef : allCols) {
+ if (dictCols.contains(colRef)) {
+ allDimDictCols.add(colRef);
+ } else if (dimCols.indexOf(colRef) >= 0){
+ allDimDictCols.add(colRef);
+ }
+ }
- allDictCols = new ArrayList(desc.getAllColumnsNeedDictionaryBuilt());
-
- dictColIdToReducerBeginId = new int[allDictCols.size() + 1];
+ colIdToReducerBeginId = new int[allDimDictCols.size() + 1];
int uhcReducerCount = cube.getConfig().getUHCReducerCount();
List<TblColRef> uhcList = desc.getAllUHCColumns();
int counter = 0;
- for (int i = 0; i < allDictCols.size(); i++) {
- dictColIdToReducerBeginId[i] = counter;
- boolean isUHC = uhcList.contains(allDictCols.get(i));
+ for (int i = 0; i < allDimDictCols.size(); i++) {
+ colIdToReducerBeginId[i] = counter;
+ boolean isUHC = uhcList.contains(allDimDictCols.get(i));
counter += (isUHC) ? uhcReducerCount : 1;
}
-
- dictColIdToReducerBeginId[allDictCols.size()] = counter;
- nDictValueCollectors = counter;
- datePartitionReducerId = counter;
+ colIdToReducerBeginId[allDimDictCols.size()] = counter;
+ nDimReducers = counter;
nCuboidRowCounters = cuboidRowCounterReducerNum == 0 ? //
MapReduceUtil.getCuboidHLLCounterReducerNum(cube) : cuboidRowCounterReducerNum;
- nTotalReducers = nDictValueCollectors + 1 + nCuboidRowCounters;
+ nTotalReducers = nDimReducers + nCuboidRowCounters;
reducerRolePlay = new int[nTotalReducers];
for (int i = 0, dictId = 0; i < nTotalReducers; i++) {
- if (i > datePartitionReducerId) {
+ if (i >= nDimReducers) {
// cuboid HLL counter reducer
reducerRolePlay[i] = MARK_FOR_HLL_COUNTER;
- } else if (i == datePartitionReducerId) {
- // date partition min/max reducer
- reducerRolePlay[i] = MARK_FOR_PARTITION_COL;
} else {
- // dict value collector reducer
- if (i == dictColIdToReducerBeginId[dictId + 1])
+ if (i == colIdToReducerBeginId[dictId + 1])
dictId++;
reducerRolePlay[i] = dictId;
@@ -92,8 +93,8 @@ public class FactDistinctColumnsReducerMapping {
}
}
- public List<TblColRef> getAllDictCols() {
- return allDictCols;
+ /**
+ * Returns the columns that this mapping assigns reducers to: every cube column that either
+ * needs a dictionary built or is listed by listDimensionColumnsExcludingDerived(true).
+ * The returned list is the internal list itself, not a copy.
+ */
+ public List<TblColRef> getAllDimDictCols() {
+ return allDimDictCols;
}
public int getTotalReducerNum() {
@@ -104,9 +105,9 @@ public class FactDistinctColumnsReducerMapping {
return nCuboidRowCounters;
}
- public int getReducerIdForDictCol(int dictColId, Object fieldValue) {
- int begin = dictColIdToReducerBeginId[dictColId];
- int span = dictColIdToReducerBeginId[dictColId + 1] - begin;
+ public int getReducerIdForCol(int colId, Object fieldValue) {
+ int begin = colIdToReducerBeginId[colId];
+ int span = colIdToReducerBeginId[colId + 1] - begin;
if (span == 1)
return begin;
@@ -120,37 +121,29 @@ public class FactDistinctColumnsReducerMapping {
}
public int getRolePlayOfReducer(int reducerId) {
- return reducerRolePlay[reducerId];
+ return reducerRolePlay[reducerId % nTotalReducers];
}
public boolean isCuboidRowCounterReducer(int reducerId) {
return getRolePlayOfReducer(reducerId) == MARK_FOR_HLL_COUNTER;
}
-
- public boolean isPartitionColReducer(int reducerId) {
- return getRolePlayOfReducer(reducerId) == MARK_FOR_PARTITION_COL;
- }
- public TblColRef getDictColForReducer(int reducerId) {
- int role = getRolePlayOfReducer(reducerId);
+ public TblColRef getColForReducer(int reducerId) {
+ int role = getRolePlayOfReducer(reducerId % nTotalReducers);
if (role < 0)
throw new IllegalStateException();
- return allDictCols.get(role);
- }
-
- public int getReducerNumForDictCol(TblColRef col) {
- int dictColId = allDictCols.indexOf(col);
- return dictColIdToReducerBeginId[dictColId + 1] - dictColIdToReducerBeginId[dictColId];
+ return allDimDictCols.get(role);
}
- public int getReducerIdForDatePartitionColumn() {
- return datePartitionReducerId;
+ /**
+ * Returns how many reducers are assigned to the given column: the configured UHC reducer
+ * count for a UHC column, otherwise one.
+ *
+ * @param col a column contained in {@link #getAllDimDictCols()}
+ * @return the number of consecutive reducer ids assigned to the column
+ * @throws IllegalArgumentException if the column has no reducer assigned by this mapping
+ */
+ public int getReducerNumForDimCol(TblColRef col) {
+ int colId = allDimDictCols.indexOf(col);
+ // indexOf yields -1 for an unknown column; fail clearly instead of indexing the array with -1
+ if (colId < 0)
+ throw new IllegalArgumentException("No reducer is assigned to column " + col);
+ return colIdToReducerBeginId[colId + 1] - colIdToReducerBeginId[colId];
}
public int getReducerIdForCuboidRowCount(long cuboidId) {
int rowCounterId = (int) (Math.abs(cuboidId) % nCuboidRowCounters);
- return datePartitionReducerId + 1 + rowCounterId;
+ return nDimReducers + rowCounterId;
}
}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
index f749c80..fdb19db 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
@@ -23,15 +23,18 @@ import java.io.IOException;
import java.io.InputStreamReader;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.DimensionRangeInfo;
import org.apache.kylin.cube.model.SnapshotTableDesc;
import org.apache.kylin.engine.mr.CubingJob;
import org.apache.kylin.engine.mr.LookupMaterializeContext;
@@ -40,11 +43,14 @@ import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.datatype.DataTypeOrder;
import org.apache.kylin.metadata.model.SegmentRange.TSRange;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Sets;
+
/**
*/
public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
@@ -76,9 +82,7 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
try {
saveExtSnapshotIfNeeded(cubeManager, cube, segment);
- if (segment.isOffsetCube()) {
- updateTimeRange(segment);
- }
+ updateSegment(segment);
cubeManager.promoteNewlyBuiltSegments(cube, segment);
return new ExecuteResult();
@@ -115,40 +119,51 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
}
}
- private void updateTimeRange(CubeSegment segment) throws IOException {
+ private void updateSegment(CubeSegment segment) throws IOException {
final TblColRef partitionCol = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef();
- if (partitionCol == null) {
- return;
- }
- final String factColumnsInputPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH);
- Path colDir = new Path(factColumnsInputPath, partitionCol.getIdentity());
- FileSystem fs = HadoopUtil.getWorkingFileSystem();
- Path outputFile = HadoopUtil.getFilterOnlyPath(fs, colDir,
- partitionCol.getName() + FactDistinctColumnsReducer.PARTITION_COL_INFO_FILE_POSTFIX);
- if (outputFile == null) {
- throw new IOException("fail to find the partition file in base dir: " + colDir);
- }
-
- FSDataInputStream is = null;
- BufferedReader bufferedReader = null;
- InputStreamReader isr = null;
- long minValue, maxValue;
- try {
- is = fs.open(outputFile);
- isr = new InputStreamReader(is);
- bufferedReader = new BufferedReader(isr);
- minValue = Long.parseLong(bufferedReader.readLine());
- maxValue = Long.parseLong(bufferedReader.readLine());
- } finally {
- IOUtils.closeQuietly(is);
- IOUtils.closeQuietly(isr);
- IOUtils.closeQuietly(bufferedReader);
- }
+ for (TblColRef dimColRef : segment.getCubeDesc().listDimensionColumnsExcludingDerived(true)) {
+ final String factColumnsInputPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH);
+ Path colDir = new Path(factColumnsInputPath, dimColRef.getIdentity());
+ FileSystem fs = HadoopUtil.getWorkingFileSystem();
+
+ //handle multiple reducers
+ Path[] outputFiles = HadoopUtil.getFilteredPath(fs, colDir,
+ dimColRef.getName() + FactDistinctColumnsReducer.DIMENSION_COL_INFO_FILE_POSTFIX);
+ if (outputFiles == null || outputFiles.length == 0) {
+ segment.getDimensionRangeInfoMap().put(dimColRef.getIdentity(), new DimensionRangeInfo(null, null));
+ continue;
+ }
- logger.info("updateTimeRange step. minValue:" + minValue + " maxValue:" + maxValue);
- if (minValue != timeMinValue && maxValue != timeMaxValue) {
- segment.setTSRange(new TSRange(minValue, maxValue + 1));
+ FSDataInputStream is = null;
+ BufferedReader bufferedReader = null;
+ InputStreamReader isr = null;
+ Set<String> minValues = Sets.newHashSet(), maxValues = Sets.newHashSet();
+ for (Path outputFile : outputFiles) {
+ try {
+ is = fs.open(outputFile);
+ isr = new InputStreamReader(is);
+ bufferedReader = new BufferedReader(isr);
+ minValues.add(bufferedReader.readLine());
+ maxValues.add(bufferedReader.readLine());
+ } finally {
+ IOUtils.closeQuietly(is);
+ IOUtils.closeQuietly(isr);
+ IOUtils.closeQuietly(bufferedReader);
+ }
+ }
+ DataTypeOrder order = dimColRef.getType().getOrder();
+ String minValue = order.min(minValues);
+ String maxValue = order.max(maxValues);
+ logger.info("updateSegment step. {} minValue:" + minValue + " maxValue:" + maxValue, dimColRef.getName());
+
+ if (segment.isOffsetCube() && partitionCol != null && partitionCol.getIdentity().equals(dimColRef.getIdentity())) {
+ logger.info("update partition. {} timeMinValue:" + minValue + " timeMaxValue:" + maxValue, dimColRef.getName());
+ if (DateFormat.stringToMillis(minValue) != timeMinValue && DateFormat.stringToMillis(maxValue) != timeMaxValue) {
+ segment.setTSRange(new TSRange(DateFormat.stringToMillis(minValue), DateFormat.stringToMillis(maxValue) + 1));
+ }
+ }
+ segment.getDimensionRangeInfoMap().put(dimColRef.getIdentity(), new DimensionRangeInfo(minValue, maxValue));
}
}
}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
index e7aec8c..56f5077 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
@@ -20,10 +20,12 @@ package org.apache.kylin.engine.mr.steps;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.DimensionRangeInfo;
import org.apache.kylin.engine.mr.CubingJob;
import org.apache.kylin.engine.mr.exception.SegmentNotFoundException;
import org.apache.kylin.job.exception.ExecuteException;
@@ -66,8 +68,19 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
boolean isOffsetCube = mergedSegment.isOffsetCube();
Long tsStartMin = Long.MAX_VALUE, tsEndMax = 0L;
+ Map<String, DimensionRangeInfo> mergedSegDimRangeMap = null;
for (String id : mergingSegmentIds) {
CubeSegment segment = cube.getSegmentById(id);
+ Map<String, DimensionRangeInfo> segDimRangeMap = segment.getDimensionRangeInfoMap();
+ if (segDimRangeMap.isEmpty()) {
+ continue;
+ }
+ if (mergedSegDimRangeMap == null) {
+ mergedSegDimRangeMap = segDimRangeMap;
+ } else {
+ mergedSegDimRangeMap = DimensionRangeInfo.mergeRangeMap(cube.getModel(), segDimRangeMap,
+ mergedSegDimRangeMap);
+ }
sourceCount += segment.getInputRecords();
sourceSize += segment.getInputRecordsSize();
tsStartMin = Math.min(tsStartMin, segment.getTSRange().start.v);
@@ -80,8 +93,9 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
mergedSegment.setInputRecordsSize(sourceSize);
mergedSegment.setLastBuildJobID(CubingExecutableUtil.getCubingJobId(this.getParams()));
mergedSegment.setLastBuildTime(System.currentTimeMillis());
+ mergedSegment.setDimensionRangeInfoMap(mergedSegDimRangeMap);
- if (isOffsetCube == true) {
+ if (isOffsetCube) {
SegmentRange.TSRange tsRange = new SegmentRange.TSRange(tsStartMin, tsEndMax);
mergedSegment.setTSRange(tsRange);
}
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMappingTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMappingTest.java
index a6bc019..3c58b26 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMappingTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMappingTest.java
@@ -60,24 +60,22 @@ public class FactDistinctColumnsReducerMappingTest extends LocalFileMetadataTest
int totalReducerNum = mapping.getTotalReducerNum();
Assert.assertEquals(2, mapping.getCuboidRowCounterReducerNum());
- // check partition column reducer & cuboid row count reducers
+ // check cuboid row count reducers
Assert.assertEquals(FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER,
mapping.getRolePlayOfReducer(totalReducerNum - 1));
Assert.assertEquals(FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER,
mapping.getRolePlayOfReducer(totalReducerNum - 2));
- Assert.assertEquals(FactDistinctColumnsReducerMapping.MARK_FOR_PARTITION_COL,
- mapping.getRolePlayOfReducer(totalReducerNum - 3));
// check all dict column reducers
- int dictEnd = totalReducerNum - 3;
+ int dictEnd = totalReducerNum - 2;
for (int i = 0; i < dictEnd; i++)
Assert.assertTrue(mapping.getRolePlayOfReducer(i) >= 0);
// check a UHC dict column
- Assert.assertEquals(2, mapping.getReducerNumForDictCol(aUHC));
+ Assert.assertEquals(2, mapping.getReducerNumForDimCol(aUHC));
int uhcReducerBegin = -1;
for (int i = 0; i < dictEnd; i++) {
- if (mapping.getDictColForReducer(i).equals(aUHC)) {
+ if (mapping.getColForReducer(i).equals(aUHC)) {
uhcReducerBegin = i;
break;
}
@@ -86,7 +84,7 @@ public class FactDistinctColumnsReducerMappingTest extends LocalFileMetadataTest
int[] allRolePlay = mapping.getAllRolePlaysForReducers();
Assert.assertEquals(allRolePlay[uhcReducerBegin], allRolePlay[uhcReducerBegin + 1]);
for (int i = 0; i < 5; i++) {
- int reducerId = mapping.getReducerIdForDictCol(uhcReducerBegin, i);
+ int reducerId = mapping.getReducerIdForCol(uhcReducerBegin, i);
Assert.assertTrue(uhcReducerBegin <= reducerId && reducerId <= uhcReducerBegin + 1);
}
}
diff --git a/examples/test_case_data/localmeta/cube/ssb_cube_with_dimention_range.json b/examples/test_case_data/localmeta/cube/ssb_cube_with_dimention_range.json
new file mode 100644
index 0000000..124081c
--- /dev/null
+++ b/examples/test_case_data/localmeta/cube/ssb_cube_with_dimention_range.json
@@ -0,0 +1,110 @@
+{
+ "uuid" : "70a9f288-3c01-4745-a04b-5641e82d6c72",
+ "name" : "ssb_cube_with_dimention_range",
+ "owner" : "ADMIN",
+ "cost" : 50,
+ "status" : "DISABLED",
+ "segments" : [{
+ "uuid": "f34e2a88-50e4-4c8c-8dd1-64e922ce8c37",
+ "name": "19920523163722_20180523173026",
+ "storage_location_identifier": "KYLIN_KFV177RJOS",
+ "date_range_start": 706639042000,
+ "date_range_end": 1527096626000,
+ "source_offset_start": 0,
+ "source_offset_end": 0,
+ "status": "READY",
+ "size_kb": 100,
+ "input_records": 1000,
+ "input_records_size": 102400,
+ "last_build_time": 1527068515334,
+ "last_build_job_id": "a5010166-b64a-4289-ac56-064640f7f2fa",
+ "create_time_utc": 1527067828660,
+ "total_shards": 0,
+ "blackout_cuboids": [],
+ "binary_signature": null,
+ "dimension_range_info_map": {
+ "PART.P_BRAND": {
+ "min": "MFGR#1101",
+ "max": "MFGR#5540"
+ },
+ "V_LINEORDER.LO_QUANTITY": {
+ "min": "1",
+ "max": "50"
+ },
+ "PART.P_MFGR": {
+ "min": "MFGR#1",
+ "max": "MFGR#5"
+ },
+ "DATES.D_YEARMONTHNUM": {
+ "min": "199205",
+ "max": "199808"
+ },
+ "CUSTOMER.C_NATION": {
+ "min": "ALGERIA",
+ "max": "VIETNAM"
+ },
+ "DATES.D_YEARMONTH": {
+ "min": "Apr1993",
+ "max": "Sep1997"
+ },
+ "CUSTOMER.C_CUSTKEY": {
+ "min": "1",
+ "max": "2999"
+ },
+ "DATES.D_DATEKEY": {
+ "min": "19920523",
+ "max": "19980802"
+ },
+ "SUPPLIER.S_SUPPKEY": {
+ "min": "1",
+ "max": "200"
+ },
+ "SUPPLIER.S_REGION": {
+ "min": "AFRICA",
+ "max": "MIDDLE EAST"
+ },
+ "PART.P_PARTKEY": {
+ "min": "1",
+ "max": "20000"
+ },
+ "PART.P_CATEGORY": {
+ "min": "MFGR#11",
+ "max": "MFGR#55"
+ },
+ "DATES.D_WEEKNUMINYEAR": {
+ "min": "1",
+ "max": "53"
+ },
+ "CUSTOMER.C_CITY": {
+ "min": "ALGERIA 0",
+ "max": "VIETNAM 9"
+ },
+ "SUPPLIER.S_NATION": {
+ "min": "ALGERIA",
+ "max": "VIETNAM"
+ },
+ "SUPPLIER.S_CITY": {
+ "min": "ALGERIA 1",
+ "max": "VIETNAM 9"
+ },
+ "CUSTOMER.C_REGION": {
+ "min": "AFRICA",
+ "max": "MIDDLE EAST"
+ },
+ "DATES.D_YEAR": {
+ "min": "1992",
+ "max": "1998"
+ },
+ "V_LINEORDER.LO_DISCOUNT": {
+ "min": "0",
+ "max": "10"
+ }
+ }
+ } ],
+ "last_modified" : 1457534216410,
+ "descriptor" : "ssb_cube_with_dimention_range",
+ "create_time_utc" : 1457444500888,
+ "size_kb" : 100,
+ "input_records_count" : 1000,
+ "input_records_size" : 102400
+}
\ No newline at end of file
diff --git a/examples/test_case_data/localmeta/cube_desc/ssb_cube_with_dimention_range.json b/examples/test_case_data/localmeta/cube_desc/ssb_cube_with_dimention_range.json
new file mode 100644
index 0000000..d91b420
--- /dev/null
+++ b/examples/test_case_data/localmeta/cube_desc/ssb_cube_with_dimention_range.json
@@ -0,0 +1,269 @@
+{
+ "uuid" : "5c44df30-daec-486e-af90-927bf7851060",
+ "name" : "ssb_cube_with_dimention_range",
+ "description" : "",
+ "dimensions" : [ {
+ "name" : "PART.P_MFGR",
+ "table" : "PART",
+ "column" : "P_MFGR",
+ "derived" : null
+ }, {
+ "name" : "DATES.D_YEARMONTH",
+ "table" : "DATES",
+ "column" : "D_YEARMONTH",
+ "derived" : null
+ }, {
+ "name" : "SUPPLIER.S_NATION",
+ "table" : "SUPPLIER",
+ "column" : "S_NATION",
+ "derived" : null
+ }, {
+ "name" : "V_LINEORDER.LO_DISCOUNT",
+ "table" : "V_LINEORDER",
+ "column" : "LO_DISCOUNT",
+ "derived" : null
+ }, {
+ "name" : "CUSTOMER.C_CUSTKEY",
+ "table" : "CUSTOMER",
+ "column" : "C_CUSTKEY",
+ "derived" : null
+ }, {
+ "name" : "CUSTOMER.C_NATION",
+ "table" : "CUSTOMER",
+ "column" : "C_NATION",
+ "derived" : null
+ }, {
+ "name" : "SUPPLIER.S_REGION",
+ "table" : "SUPPLIER",
+ "column" : "S_REGION",
+ "derived" : null
+ }, {
+ "name" : "CUSTOMER.C_CITY",
+ "table" : "CUSTOMER",
+ "column" : "C_CITY",
+ "derived" : null
+ }, {
+ "name" : "SUPPLIER.S_SUPPKEY",
+ "table" : "SUPPLIER",
+ "column" : "S_SUPPKEY",
+ "derived" : null
+ }, {
+ "name" : "V_LINEORDER.LO_QUANTITY",
+ "table" : "V_LINEORDER",
+ "column" : "LO_QUANTITY",
+ "derived" : null
+ }, {
+ "name" : "PART.P_BRAND",
+ "table" : "PART",
+ "column" : "P_BRAND",
+ "derived" : null
+ }, {
+ "name" : "SUPPLIER.S_CITY",
+ "table" : "SUPPLIER",
+ "column" : "S_CITY",
+ "derived" : null
+ }, {
+ "name" : "PART.P_PARTKEY",
+ "table" : "PART",
+ "column" : "P_PARTKEY",
+ "derived" : null
+ }, {
+ "name" : "DATES.D_YEARMONTHNUM",
+ "table" : "DATES",
+ "column" : "D_YEARMONTHNUM",
+ "derived" : null
+ }, {
+ "name" : "DATES.D_WEEKNUMINYEAR",
+ "table" : "DATES",
+ "column" : "D_WEEKNUMINYEAR",
+ "derived" : null
+ }, {
+ "name" : "CUSTOMER.C_REGION",
+ "table" : "CUSTOMER",
+ "column" : "C_REGION",
+ "derived" : null
+ }, {
+ "name" : "PART.P_CATEGORY",
+ "table" : "PART",
+ "column" : "P_CATEGORY",
+ "derived" : null
+ }, {
+ "name" : "DATES.D_YEAR",
+ "table" : "DATES",
+ "column" : "D_YEAR",
+ "derived" : null
+ }, {
+ "name" : "DATES.D_DATEKEY",
+ "table" : "DATES",
+ "column" : "D_DATEKEY",
+ "derived" : null
+ } ],
+ "measures" : [ {
+ "name" : "_COUNT_",
+ "function" : {
+ "expression" : "COUNT",
+ "parameter" : {
+ "type" : "constant",
+ "value" : "1",
+ "next_parameter" : null
+ },
+ "returntype" : "bigint"
+ },
+ "dependent_measure_ref" : null
+ }, {
+ "name" : "TOTAL_REVENUE",
+ "function" : {
+ "expression" : "SUM",
+ "parameter" : {
+ "type" : "column",
+ "value" : "LO_REVENUE",
+ "next_parameter" : null
+ },
+ "returntype" : "bigint"
+ },
+ "dependent_measure_ref" : null
+ }, {
+ "name" : "TOTAL_SUPPLYCOST",
+ "function" : {
+ "expression" : "SUM",
+ "parameter" : {
+ "type" : "column",
+ "value" : "LO_SUPPLYCOST",
+ "next_parameter" : null
+ },
+ "returntype" : "bigint"
+ },
+ "dependent_measure_ref" : null
+ } ],
+ "rowkey" : {
+ "rowkey_columns" : [ {
+ "column" : "PART.P_MFGR",
+ "encoding" : "dict",
+ "isShardBy" : false,
+ "index" : "eq"
+ }, {
+ "column" : "DATES.D_YEARMONTHNUM",
+ "encoding" : "dict",
+ "isShardBy" : false,
+ "index" : "eq"
+ }, {
+ "column" : "CUSTOMER.C_REGION",
+ "encoding" : "dict",
+ "isShardBy" : false,
+ "index" : "eq"
+ }, {
+ "column" : "V_LINEORDER.LO_QUANTITY",
+ "encoding" : "dict",
+ "isShardBy" : false,
+ "index" : "eq"
+ }, {
+ "column" : "DATES.D_YEARMONTH",
+ "encoding" : "dict",
+ "isShardBy" : false,
+ "index" : "eq"
+ }, {
+ "column" : "DATES.D_WEEKNUMINYEAR",
+ "encoding" : "dict",
+ "isShardBy" : false,
+ "index" : "eq"
+ }, {
+ "column" : "SUPPLIER.S_REGION",
+ "encoding" : "dict",
+ "isShardBy" : false,
+ "index" : "eq"
+ }, {
+ "column" : "V_LINEORDER.LO_DISCOUNT",
+ "encoding" : "dict",
+ "isShardBy" : false,
+ "index" : "eq"
+ }, {
+ "column" : "CUSTOMER.C_CITY",
+ "encoding" : "dict",
+ "isShardBy" : false,
+ "index" : "eq"
+ }, {
+ "column" : "PART.P_CATEGORY",
+ "encoding" : "dict",
+ "isShardBy" : false,
+ "index" : "eq"
+ }, {
+ "column" : "DATES.D_YEAR",
+ "encoding" : "dict",
+ "isShardBy" : false,
+ "index" : "eq"
+ }, {
+ "column" : "PART.P_BRAND",
+ "encoding" : "dict",
+ "isShardBy" : false,
+ "index" : "eq"
+ }, {
+ "column" : "SUPPLIER.S_CITY",
+ "encoding" : "dict",
+ "isShardBy" : false,
+ "index" : "eq"
+ }, {
+ "column" : "SUPPLIER.S_NATION",
+ "encoding" : "dict",
+ "isShardBy" : false,
+ "index" : "eq"
+ }, {
+ "column" : "CUSTOMER.C_NATION",
+ "encoding" : "dict",
+ "isShardBy" : false,
+ "index" : "eq"
+ }, {
+ "column" : "DATES.D_DATEKEY",
+ "encoding" : "dict",
+ "isShardBy" : false,
+ "index" : "eq"
+ }, {
+ "column" : "SUPPLIER.S_SUPPKEY",
+ "encoding" : "dict",
+ "isShardBy" : false,
+ "index" : "eq"
+ }, {
+ "column" : "CUSTOMER.C_CUSTKEY",
+ "encoding" : "dict",
+ "isShardBy" : true,
+ "index" : "eq"
+ }, {
+ "column" : "PART.P_PARTKEY",
+ "encoding" : "dict",
+ "isShardBy" : false,
+ "index" : "eq"
+ } ]
+ },
+ "signature" : "",
+ "last_modified" : 1457503036686,
+ "model_name" : "ssb",
+ "null_string" : null,
+ "hbase_mapping" : {
+ "column_family" : [ {
+ "name" : "F1",
+ "columns" : [ {
+ "qualifier" : "M",
+ "measure_refs" : [ "_COUNT_", "TOTAL_REVENUE", "TOTAL_SUPPLYCOST"]
+ } ]
+ } ]
+ },
+ "aggregation_groups" : [ {
+ "includes" : [ "SUPPLIER.S_REGION", "CUSTOMER.C_NATION", "SUPPLIER.S_NATION", "CUSTOMER.C_REGION", "DATES.D_YEAR", "DATES.D_WEEKNUMINYEAR", "DATES.D_YEARMONTH", "DATES.D_YEARMONTHNUM", "SUPPLIER.S_CITY" ],
+ "select_rule" : {
+ "hierarchy_dims" : [ [ "S_REGION", "S_NATION", "S_CITY" ] ],
+ "mandatory_dims" : [ "DATES.D_YEAR" ],
+ "joint_dims" : [ ]
+ }
+ } ],
+ "notify_list" : [ ],
+ "status_need_notify" : [ ],
+ "partition_date_start" : 3153000000000,
+ "partition_date_end" : 3153600000000,
+ "auto_merge_time_ranges" : [ 604800000, 2419200000 ],
+ "retention_range" : 0,
+ "engine_type" : 2,
+ "storage_type" : 2,
+ "override_kylin_properties" : {
+ "kylin.storage.hbase.compression-codec" : "lz4",
+ "kylin.cube.aggrgroup.is-mandatory-only-valid" : "true"
+ }
+}
\ No newline at end of file
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
index afd9788..2f22bd4 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
@@ -18,15 +18,31 @@
package org.apache.kylin.provision;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.common.util.HBaseMetadataTestCase;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.Pair;
@@ -34,6 +50,7 @@ import org.apache.kylin.cube.CubeDescManager;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.DimensionRangeInfo;
import org.apache.kylin.cube.cuboid.CuboidScheduler;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.EngineFactory;
@@ -48,6 +65,7 @@ import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
import org.apache.kylin.metadata.model.SegmentRange.TSRange;
+import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.rest.job.StorageCleanupJob;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.apache.kylin.storage.hbase.util.HBaseRegionSizeCalculator;
@@ -55,21 +73,9 @@ import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.TimeZone;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
public class BuildCubeWithEngine {
@@ -304,21 +310,29 @@ public class BuildCubeWithEngine {
if (!buildSegment(cubeName, date1, date2))
return false;
+ checkNormalSegRangeInfo(cubeManager.getCube(cubeName));
if (!buildSegment(cubeName, date2, date3))
return false;
if (!optimizeCube(cubeName))
return false;
+ checkNormalSegRangeInfo(cubeManager.getCube(cubeName));
if (!buildSegment(cubeName, date3, date4))
return false;
+ checkNormalSegRangeInfo(cubeManager.getCube(cubeName));
if (!buildSegment(cubeName, date4, date5)) // one empty segment
return false;
+ checkEmptySegRangeInfo(cubeManager.getCube(cubeName));
if (!buildSegment(cubeName, date5, date6)) // another empty segment
return false;
+ checkEmptySegRangeInfo(cubeManager.getCube(cubeName));
+
if (!mergeSegment(cubeName, date2, date4)) // merge 2 normal segments
return false;
+ checkNormalSegRangeInfo(cubeManager.getCube(cubeName));
if (!mergeSegment(cubeName, date2, date5)) // merge normal and empty
return false;
+ checkNormalSegRangeInfo(cubeManager.getCube(cubeName));
// now have 2 normal segments [date1, date2) [date2, date5) and 1 empty segment [date5, date6)
return true;
@@ -460,4 +474,40 @@ public class BuildCubeWithEngine {
}
}
+ /**
+ * Verifies that every dimension range recorded on the most recently built segment carries
+ * neither a min nor a max value.
+ *
+ * @throws RuntimeException if any recorded range has a non-null min or max
+ */
+ private void checkEmptySegRangeInfo(CubeInstance cube) {
+ Map<String, DimensionRangeInfo> rangeInfoMap = getLastModifiedSegment(cube).getDimensionRangeInfoMap();
+ for (DimensionRangeInfo rangeInfo : rangeInfoMap.values()) {
+ boolean hasBound = rangeInfo.getMax() != null || rangeInfo.getMin() != null;
+ if (hasBound) {
+ throw new RuntimeException("Empty segment must have null info.");
+ }
+ }
+ }
+
+ /**
+ * Verifies, for a partitioned model, that the min/max recorded for the partition date column
+ * on the most recently built segment lie inside the segment's TSRange, i.e.
+ * TsRange.start <= min and max <= TsRange.end - 1. Does nothing when the model is not partitioned.
+ *
+ * @throws RuntimeException if the range info is missing or incomplete, or lies outside the TSRange
+ */
+ private void checkNormalSegRangeInfo(CubeInstance cube) {
+ CubeSegment segment = getLastModifiedSegment(cube);
+ if (!segment.getModel().getPartitionDesc().isPartitioned()) {
+ return;
+ }
+ TblColRef colRef = segment.getModel().getPartitionDesc().getPartitionDateColumnRef();
+ DimensionRangeInfo dmRangeInfo = segment.getDimensionRangeInfoMap().get(colRef.getIdentity());
+ // a missing entry would otherwise be dereferenced below and fail with a bare NullPointerException;
+ // null min/max are rejected too because there is no range to validate without them
+ if (dmRangeInfo == null || dmRangeInfo.getMin() == null || dmRangeInfo.getMax() == null) {
+ throw new RuntimeException(String.format(
+ "Build cube failed, missing partition column min/max value. Segment: %s, column: %s",
+ segment, colRef.getIdentity()));
+ }
+ long minMillis = DateFormat.stringToMillis(dmRangeInfo.getMin());
+ long maxMillis = DateFormat.stringToMillis(dmRangeInfo.getMax());
+ long tsRangeStart = segment.getTSRange().start.v;
+ long tsRangeEnd = segment.getTSRange().end.v;
+ // TSRange end is exclusive, so the largest admissible value is end - 1
+ if (!(tsRangeStart <= minMillis && maxMillis <= tsRangeEnd - 1)) {
+ throw new RuntimeException(String.format(
+ "Build cube failed, wrong partition column min/max value."
+ + " Segment: %s, min value: %s, TsRange.start: %s, max value: %s, TsRange.end: %s",
+ segment, minMillis, tsRangeStart, maxMillis, tsRangeEnd));
+ }
+ }
+
+ /**
+ * Picks the segment of the cube whose last-build time is the greatest.
+ */
+ private CubeSegment getLastModifiedSegment(CubeInstance cube) {
+ Comparator<CubeSegment> byLastBuildTime = new Comparator<CubeSegment>() {
+ @Override
+ public int compare(CubeSegment left, CubeSegment right) {
+ long leftTime = left.getLastBuildTime();
+ long rightTime = right.getLastBuildTime();
+ return Long.compare(leftTime, rightTime);
+ }
+ };
+ return Collections.max(cube.getSegments(), byLastBuildTime);
+ }
}