You are viewing a plain-text version of this content. The canonical link to it (originally hyperlinked on "here") was not preserved in this copy.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/07/08 09:03:10 UTC
[kylin] 04/06: KYLIN-3370 enhance segment pruning
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 05baf2123fd0b0fcfee613e1fe18c93722c8ed9b
Author: Li Yang <li...@apache.org>
AuthorDate: Tue May 22 12:00:19 2018 +0800
KYLIN-3370 enhance segment pruning
Signed-off-by: shaofengshi <sh...@apache.org>
---
.../java/org/apache/kylin/cube/CubeSegment.java | 12 +
.../org/apache/kylin/cube/DimensionRangeInfo.java | 104 ++++++++
.../apache/kylin/cube/common/SegmentPruner.java | 179 ++++++++++++++
.../kylin/cube/gridtable/ScanRangePlannerBase.java | 4 +-
.../inmemcubing/InputConverterUnitForRawData.java | 2 -
.../apache/kylin/cube/kv/RowKeyColumnOrder.java | 108 --------
.../kylin/cube/model/CubeJoinedFlatTableDesc.java | 13 +-
.../apache/kylin/cube/DimensionRangeInfoTest.java | 87 +++++++
.../kylin/cube/common/SegmentPrunerTest.java | 195 +++++++++++++++
.../apache/kylin/metadata/datatype/DataType.java | 26 +-
.../kylin/metadata/datatype/DataTypeOrder.java | 155 ++++++++++++
.../kylin/metadata/filter/CompareTupleFilter.java | 2 +-
.../apache/kylin/metadata/filter/TupleFilter.java | 116 +++++++--
.../kylin/metadata/datatype/DataTypeOrderTest.java | 57 +++++
.../kylin/metadata/filter/TupleFilterTest.java | 77 ++++++
.../storage/gtrecord/CubeScanRangePlanner.java | 30 +--
.../storage/gtrecord/GTCubeStorageQueryBase.java | 9 +-
.../kylin/storage/translate/ColumnValueRange.java | 214 ----------------
.../storage/translate/DerivedFilterTranslator.java | 6 +-
.../kylin/storage/translate/HBaseKeyRange.java | 273 ---------------------
.../kylin/storage/gtrecord/DictGridTableTest.java | 69 ++----
.../storage/translate/ColumnValueRangeTest.java | 126 ----------
.../kylin/engine/mr/common/BaseCuboidBuilder.java | 3 +-
.../engine/mr/steps/BaseCuboidMapperBase.java | 9 +-
.../mr/steps/CalculateStatsFromBaseCuboidJob.java | 1 -
.../mr/steps/FactDistinctColumnPartitioner.java | 5 +-
.../engine/mr/steps/FactDistinctColumnsJob.java | 2 +-
.../engine/mr/steps/FactDistinctColumnsMapper.java | 43 +---
.../mr/steps/FactDistinctColumnsMapperBase.java | 20 +-
.../mr/steps/FactDistinctColumnsReducer.java | 90 ++++---
.../steps/FactDistinctColumnsReducerMapping.java | 87 +++----
.../mr/steps/UpdateCubeInfoAfterBuildStep.java | 83 ++++---
.../mr/steps/UpdateCubeInfoAfterMergeStep.java | 20 +-
.../mr/steps/UpdateCubeInfoAfterOptimizeStep.java | 1 +
.../FactDistinctColumnsReducerMappingTest.java | 12 +-
.../cube/ssb_cube_with_dimention_range.json | 110 +++++++++
.../cube_desc/ssb_cube_with_dimention_range.json | 269 ++++++++++++++++++++
.../kylin/provision/BuildCubeWithEngine.java | 87 +++++--
.../query/optrule/AggregateMultipleExpandRule.java | 16 +-
.../apache/kylin/query/relnode/OLAPFilterRel.java | 26 +-
.../org/apache/kylin/source/hive/HiveMRInput.java | 1 -
.../apache/kylin/source/hive/HiveTableReader.java | 2 +-
42 files changed, 1669 insertions(+), 1082 deletions(-)
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index d49c273..75c90a4 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -119,6 +119,10 @@ public class CubeSegment implements IBuildable, ISegment, Serializable {
@JsonInclude(JsonInclude.Include.NON_EMPTY)
private Map<String, String> additionalInfo = new LinkedHashMap<String, String>();
+ @JsonProperty("dimension_range_info_map")
+ @JsonInclude(JsonInclude.Include.NON_EMPTY)
+ private Map<String, DimensionRangeInfo> dimensionRangeInfoMap = Maps.newHashMap();
+
private Map<Long, Short> cuboidBaseShards = Maps.newConcurrentMap(); // cuboid id ==> base(starting) shard for this cuboid
// lazy init
@@ -575,4 +579,12 @@ public class CubeSegment implements IBuildable, ISegment, Serializable {
public void setSourcePartitionOffsetStart(Map<Integer, Long> sourcePartitionOffsetStart) {
this.sourcePartitionOffsetStart = sourcePartitionOffsetStart;
}
+ // Per-column min/max ranges of this segment, keyed by column identity; serialized as "dimension_range_info_map" and omitted from JSON when empty.
+ public Map<String, DimensionRangeInfo> getDimensionRangeInfoMap() {
+ return dimensionRangeInfoMap;
+ }
+ // The setter stores the given map by reference; no defensive copy is made.
+ public void setDimensionRangeInfoMap(Map<String, DimensionRangeInfo> dimensionRangeInfoMap) {
+ this.dimensionRangeInfoMap = dimensionRangeInfoMap;
+ }
}
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/DimensionRangeInfo.java b/core-cube/src/main/java/org/apache/kylin/cube/DimensionRangeInfo.java
new file mode 100644
index 0000000..e36ca96
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/DimensionRangeInfo.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cube;
+
+import java.util.Map;
+
+import org.apache.kylin.metadata.datatype.DataTypeOrder;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Maps;
+
+@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
+public class DimensionRangeInfo {
+ // Min/max value of one column, kept as strings; both are null for an all-null column or an empty segment.
+ private static final Logger logger = LoggerFactory.getLogger(DimensionRangeInfo.class);
+ // Merges two colId -> range maps: only columns present in both maps are kept, each spanning the smaller min to the larger max.
+ public static Map<String, DimensionRangeInfo> mergeRangeMap(DataModelDesc model, Map<String, DimensionRangeInfo> m1,
+ Map<String, DimensionRangeInfo> m2) {
+ // Mismatched key sets are only logged; columns missing from either side are dropped below.
+ if (!m1.keySet().equals(m2.keySet())) {
+ logger.warn("Merging incompatible maps of DimensionRangeInfo, keys in m1 " + m1.keySet() + ", keys in m2 "
+ + m2.keySet());
+ }
+
+ Map<String, DimensionRangeInfo> result = Maps.newHashMap();
+
+ for (String colId : m1.keySet()) {
+ if (!m2.containsKey(colId))
+ continue;
+
+ DimensionRangeInfo r1 = m1.get(colId);
+ DimensionRangeInfo r2 = m2.get(colId);
+ // A side whose min and max are both null contributes nothing, so the other side is taken as-is.
+ DimensionRangeInfo newR;
+ if (r1.getMin() == null && r1.getMax() == null) {
+ newR = r2; // when r1 is all null or has 0 records
+ } else if (r2.getMin() == null && r2.getMax() == null) {
+ newR = r1; // when r2 is all null or has 0 records
+ } else {
+ DataTypeOrder order = model.findColumn(colId).getType().getOrder(); // ordering depends on the column's data type
+ String newMin = order.min(r1.getMin(), r2.getMin());
+ String newMax = order.max(r1.getMax(), r2.getMax());
+ newR = new DimensionRangeInfo(newMin, newMax);
+ }
+
+ result.put(colId, newR);
+ }
+
+ return result;
+ }
+
+ // ============================================================================
+ // JSON form is {"min": ..., "max": ...}; @JsonAutoDetect turns off auto-detection, so only these annotated fields are mapped.
+ @JsonProperty("min")
+ private String min;
+
+ @JsonProperty("max")
+ private String max;
+
+ public DimensionRangeInfo() {} // presumably for Jackson deserialization
+ // min and max must be both null or both non-null; otherwise IllegalStateException is thrown.
+ public DimensionRangeInfo(String min, String max) {
+ if (min == null && max != null || min != null && max == null)
+ throw new IllegalStateException();
+
+ this.min = min;
+ this.max = max;
+ }
+
+ public String getMin() {
+ return min;
+ }
+
+ public String getMax() {
+ return max;
+ }
+
+ @Override
+ public String toString() {
+ return "[" + min + ", " + max + "]";
+ }
+
+}
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/common/SegmentPruner.java b/core-cube/src/main/java/org/apache/kylin/cube/common/SegmentPruner.java
new file mode 100644
index 0000000..de77511
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/common/SegmentPruner.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.common;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.DimensionRangeInfo;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.datatype.DataTypeOrder;
+import org.apache.kylin.metadata.filter.CompareTupleFilter;
+import org.apache.kylin.metadata.filter.ConstantTupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.PartitionDesc;
+import org.apache.kylin.metadata.model.SegmentRange.TSRange;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SegmentPruner {
+ private static final Logger logger = LoggerFactory.getLogger(SegmentPruner.class);
+ // Compare filters extracted via findMustTrueCompareFilters(); presumably each must hold for every row the query keeps.
+ final private Set<CompareTupleFilter> mustTrueCompares;
+ // A null filter yields no compares, so only the empty-segment check in check() can prune.
+ public SegmentPruner(TupleFilter filter) {
+ this.mustTrueCompares = filter == null ? Collections.<CompareTupleFilter> emptySet()
+ : filter.findMustTrueCompareFilters();
+ }
+ // Returns the READY segments of the cube that pass check(), in the order getSegments returns them.
+ public List<CubeSegment> listSegmentsForQuery(CubeInstance cube) {
+ List<CubeSegment> r = new ArrayList<>();
+ for (CubeSegment seg : cube.getSegments(SegmentStatusEnum.READY)) {
+ if (check(seg))
+ r.add(seg);
+ }
+ return r;
+ }
+ // Returns false when the segment can be skipped: it has 0 input records and skipping is enabled, or some compare cannot be satisfied within the column's range.
+ public boolean check(CubeSegment seg) {
+
+ if (seg.getInputRecords() == 0) {
+ if (seg.getConfig().isSkippingEmptySegments()) {
+ logger.debug("Prune segment {} due to 0 input record", seg);
+ return false;
+ } else {
+ logger.debug("Insist scan of segment {} having 0 input record", seg);
+ }
+ }
+ // Use the range recorded for the column (keyed by column identity); fall back to the partition column's TSRange.
+ Map<String, DimensionRangeInfo> segDimRangInfoMap = seg.getDimensionRangeInfoMap();
+ for (CompareTupleFilter comp : mustTrueCompares) {
+ TblColRef col = comp.getColumn();
+
+ DimensionRangeInfo dimRangeInfo = segDimRangInfoMap.get(col.getIdentity());
+ if (dimRangeInfo == null)
+ dimRangeInfo = tryDeduceRangeFromPartitionCol(seg, col);
+ if (dimRangeInfo == null)
+ continue; // no known range for this column, so it cannot prune
+
+ String minVal = dimRangeInfo.getMin();
+ String maxVal = dimRangeInfo.getMax();
+
+ if (!satisfy(comp, minVal, maxVal)) {
+ logger.debug("Prune segment {} due to given filter", seg);
+ return false;
+ }
+ }
+
+ logger.debug("Pruner passed on segment {}", seg);
+ return true;
+ }
+ // Builds a range for the model's partition column from the segment's TSRange; null if not partitioned, col is not that column, or an end is unbounded.
+ private DimensionRangeInfo tryDeduceRangeFromPartitionCol(CubeSegment seg, TblColRef col) {
+ DataModelDesc model = seg.getModel();
+ PartitionDesc part = model.getPartitionDesc();
+
+ if (!part.isPartitioned())
+ return null;
+ if (!col.equals(part.getPartitionDateColumnRef()))
+ return null;
+
+ // deduce the dim range from TSRange
+ TSRange tsRange = seg.getTSRange();
+ if (tsRange.start.isMin || tsRange.end.isMax)
+ return null; // DimensionRangeInfo cannot express infinite
+
+ String min = tsRangeToStr(tsRange.start.v, part);
+ String max = tsRangeToStr(tsRange.end.v - 1, part); // note the -1, end side is exclusive
+ return new DimensionRangeInfo(min, max);
+ }
+ // Formats a TSRange timestamp as a partition column value: a date/time string by column type, else via partitionDateFormat (the bare number if no format is set).
+ private String tsRangeToStr(long ts, PartitionDesc part) {
+ String value;
+ DataType partitionColType = part.getPartitionDateColumnRef().getType();
+ if (partitionColType.isDate()) {
+ value = DateFormat.formatToDateStr(ts);
+ } else if (partitionColType.isTimeFamily()) {
+ value = DateFormat.formatToTimeWithoutMilliStr(ts);
+ } else if (partitionColType.isStringFamily() || partitionColType.isIntegerFamily()) {//integer like 20160101
+ String partitionDateFormat = part.getPartitionDateFormat();
+ if (StringUtils.isEmpty(partitionDateFormat)) {
+ value = "" + ts;
+ } else {
+ value = DateFormat.formatToDateStr(ts, partitionDateFormat);
+ }
+ } else {
+ throw new RuntimeException("Type " + partitionColType + " is not valid partition column type");
+ }
+ return value;
+ }
+ // Returns false only when no value in [minVal, maxVal] can satisfy comp; any case it cannot decide returns true (keep the segment).
+ private boolean satisfy(CompareTupleFilter comp, String minVal, String maxVal) {
+
+ // When both min and max are null, it means all cells of the column are null.
+ // In such case, return true to let query engine scan the segment, since the
+ // result of null comparison is query engine specific.
+ if (minVal == null && maxVal == null)
+ return true;
+
+ // pass on non-constant filter
+ if (comp.getChildren().size() > 1 && !(comp.getChildren().get(1) instanceof ConstantTupleFilter))
+ return true;
+ // Ordering comes from the column's data type; EQ/IN use the whole value set, the range operators use the first value.
+ TblColRef col = comp.getColumn();
+ DataTypeOrder order = col.getType().getOrder();
+ String filterVal = toString(comp.getFirstValue());
+
+ switch (comp.getOperator()) {
+ case EQ:
+ case IN:
+ String filterMin = order.min((Set<String>) comp.getValues()); // NOTE(review): unchecked cast; assumes every condition value is a String — confirm
+ String filterMax = order.max((Set<String>) comp.getValues());
+ return order.compare(filterMin, maxVal) <= 0 && order.compare(minVal, filterMax) <= 0; // keep if [filterMin, filterMax] overlaps [minVal, maxVal]
+ case LT:
+ return order.compare(minVal, filterVal) < 0;
+ case LTE:
+ return order.compare(minVal, filterVal) <= 0;
+ case GT:
+ return order.compare(maxVal, filterVal) > 0;
+ case GTE:
+ return order.compare(maxVal, filterVal) >= 0;
+ case NEQ:
+ case NOTIN:
+ case ISNULL:
+ case ISNOTNULL:
+ default: // min/max alone cannot rule these out
+ return true;
+ }
+ }
+ // Null-safe toString: returns null (not "null") for a null value.
+ private String toString(Object v) {
+ return v == null ? null : v.toString();
+ }
+}
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/ScanRangePlannerBase.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/ScanRangePlannerBase.java
index 811d512..97e4dc3 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/ScanRangePlannerBase.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/ScanRangePlannerBase.java
@@ -29,7 +29,6 @@ import java.util.Set;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.common.util.Pair;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.metadata.expression.TupleExpression;
@@ -49,14 +48,13 @@ public abstract class ScanRangePlannerBase {
//GT
protected GTInfo gtInfo;
protected TupleFilter gtFilter;
- protected Pair<ByteArray, ByteArray> gtStartAndEnd;
- protected TblColRef gtPartitionCol;
protected ImmutableBitSet gtDimensions;
protected ImmutableBitSet gtAggrGroups;
protected ImmutableBitSet gtAggrMetrics;
protected String[] gtAggrFuncs;
protected TupleFilter havingFilter;
protected boolean isPartitionColUsingDatetimeEncoding = true;
+ protected int onlyShardId = -1;
protected RecordComparator rangeStartComparator;
protected RecordComparator rangeEndComparator;
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForRawData.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForRawData.java
index fc34f37..2ff7ee0 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForRawData.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForRawData.java
@@ -43,7 +43,6 @@ public class InputConverterUnitForRawData implements InputConverterUnit<String[]
@SuppressWarnings("unused")
private static final Logger logger = LoggerFactory.getLogger(InputConverterUnitForRawData.class);
- public static final byte[] HIVE_NULL = Bytes.toBytes("\\N");
public static final String[] END_ROW = new String[0];
public static final String[] CUT_ROW = { "" };
@@ -149,7 +148,6 @@ public class InputConverterUnitForRawData implements InputConverterUnit<String[]
private void initNullBytes(CubeDesc cubeDesc) {
nullBytes = Lists.newArrayList();
- nullBytes.add(HIVE_NULL);
String[] nullStrings = cubeDesc.getNullStrings();
if (nullStrings != null) {
for (String s : nullStrings) {
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnOrder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnOrder.java
deleted file mode 100644
index 3f4f6f4..0000000
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnOrder.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.cube.kv;
-
-import java.util.Collection;
-import java.util.Comparator;
-
-import org.apache.kylin.metadata.datatype.DataType;
-
-/**
- * @author yangli9
- */
-abstract public class RowKeyColumnOrder implements Comparator<String> {
-
- public static final NumberOrder NUMBER_ORDER = new NumberOrder();
- public static final StringOrder STRING_ORDER = new StringOrder();
-
- public static RowKeyColumnOrder getInstance(DataType type) {
- if (type.isNumberFamily() || type.isDateTimeFamily())
- return NUMBER_ORDER;
- else
- return STRING_ORDER;
- }
-
- public String max(Collection<String> values) {
- String max = null;
- for (String v : values) {
- if (max == null || compare(max, v) < 0)
- max = v;
- }
- return max;
- }
-
- public String min(Collection<String> values) {
- String min = null;
- for (String v : values) {
- if (min == null || compare(min, v) > 0)
- min = v;
- }
- return min;
- }
-
- public String min(String v1, String v2) {
- if (v1 == null)
- return v2;
- else if (v2 == null)
- return v1;
- else
- return compare(v1, v2) <= 0 ? v1 : v2;
- }
-
- public String max(String v1, String v2) {
- if (v1 == null)
- return v2;
- else if (v2 == null)
- return v1;
- else
- return compare(v1, v2) >= 0 ? v1 : v2;
- }
-
- @Override
- public int compare(String o1, String o2) {
- // consider null
- if (o1 == o2)
- return 0;
- if (o1 == null)
- return -1;
- if (o2 == null)
- return 1;
-
- return compareNonNull(o1, o2);
- }
-
- abstract int compareNonNull(String o1, String o2);
-
- private static class StringOrder extends RowKeyColumnOrder {
- @Override
- public int compareNonNull(String o1, String o2) {
- return o1.compareTo(o2);
- }
- }
-
- private static class NumberOrder extends RowKeyColumnOrder {
- @Override
- public int compareNonNull(String o1, String o2) {
- double d1 = Double.parseDouble(o1);
- double d2 = Double.parseDouble(o2);
- return Double.compare(d1, d2);
- }
- }
-
-}
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
index 2ab7aac..467a294 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
@@ -22,7 +22,6 @@ import java.io.Serializable;
import java.util.List;
import java.util.Map;
-import org.apache.kylin.common.util.BytesSplitter;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.metadata.MetadataConstants;
import org.apache.kylin.metadata.model.DataModelDesc;
@@ -147,16 +146,10 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc, Serializab
return factColumns;
}
- // sanity check the input record (in bytes) matches what's expected
- public void sanityCheck(BytesSplitter bytesSplitter) {
- if (columnCount != bytesSplitter.getBufferSize()) {
- throw new IllegalArgumentException("Expect " + columnCount + " columns, but see "
- + bytesSplitter.getBufferSize() + " -- " + bytesSplitter);
- }
-
- // TODO: check data types here
+ public int getColumnCount() {
+ return columnCount;
}
-
+
@Override
public String getTableName() {
return tableName;
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/DimensionRangeInfoTest.java b/core-cube/src/test/java/org/apache/kylin/cube/DimensionRangeInfoTest.java
new file mode 100644
index 0000000..43175c9
--- /dev/null
+++ b/core-cube/src/test/java/org/apache/kylin/cube/DimensionRangeInfoTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cube;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.DataModelManager;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class DimensionRangeInfoTest extends LocalFileMetadataTestCase {
+ // Exercises DimensionRangeInfo.mergeRangeMap on TEST_KYLIN_FACT.CAL_DT of the ci_inner_join_model test model.
+ @Before
+ public void setUp() throws Exception {
+ this.createTestMetadata();
+ }
+
+ @After
+ public void after() throws Exception {
+ this.cleanupTestMetadata();
+ }
+ // Covers a normal merge, a column missing from one map, and a side with null min/max.
+ @Test
+ public void testMergeRangeMap() {
+ DataModelDesc model = DataModelManager.getInstance(getTestConfig()).getDataModelDesc("ci_inner_join_model");
+ String colId = "TEST_KYLIN_FACT.CAL_DT";
+
+ // normal merge
+ {
+ Map<String, DimensionRangeInfo> m1 = new HashMap<>();
+ m1.put(colId, new DimensionRangeInfo("2012-01-01", "2012-05-31"));
+
+ Map<String, DimensionRangeInfo> m2 = new HashMap<>();
+ m2.put(colId, new DimensionRangeInfo("2012-06-01", "2013-06-30"));
+
+ DimensionRangeInfo r1 = DimensionRangeInfo.mergeRangeMap(model, m1, m2).get(colId);
+ Assert.assertEquals("2012-01-01", r1.getMin());
+ Assert.assertEquals("2013-06-30", r1.getMax());
+ }
+
+ // missing column on one side
+ {
+ Map<String, DimensionRangeInfo> m1 = new HashMap<>();
+ m1.put(colId, new DimensionRangeInfo("2012-01-01", "2012-05-31"));
+
+ Map<String, DimensionRangeInfo> m2 = new HashMap<>();
+
+ Assert.assertTrue(DimensionRangeInfo.mergeRangeMap(model, m1, m2).isEmpty());
+ Assert.assertTrue(DimensionRangeInfo.mergeRangeMap(model, m2, m1).isEmpty());
+ }
+
+ // null min/max value (happens on empty segment, or all-null columns)
+ {
+ Map<String, DimensionRangeInfo> m1 = new HashMap<>();
+ m1.put(colId, new DimensionRangeInfo(null, null));
+
+ Map<String, DimensionRangeInfo> m2 = new HashMap<>();
+ m2.put(colId, new DimensionRangeInfo("2012-06-01", "2013-06-30"));
+
+ DimensionRangeInfo r1 = DimensionRangeInfo.mergeRangeMap(model, m1, m2).get(colId);
+ Assert.assertEquals("2012-06-01", r1.getMin());
+ Assert.assertEquals("2013-06-30", r1.getMax());
+ }
+
+ }
+}
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/common/SegmentPrunerTest.java b/core-cube/src/test/java/org/apache/kylin/cube/common/SegmentPrunerTest.java
new file mode 100644
index 0000000..603db97
--- /dev/null
+++ b/core-cube/src/test/java/org/apache/kylin/cube/common/SegmentPrunerTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.common;
+
+import static org.apache.kylin.metadata.filter.TupleFilter.compare;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.common.util.SetAndUnsetSystemProp;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.metadata.filter.ConstantTupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
+import org.apache.kylin.metadata.model.SegmentRange.TSRange;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+public class SegmentPrunerTest extends LocalFileMetadataTestCase {
+ private CubeInstance cube;
+ // Loaded from the local test metadata; the cases below assume its first segment records LO_QUANTITY in [1, 50].
+ @Before
+ public void setUp() {
+ this.createTestMetadata();
+ cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube("ssb_cube_with_dimention_range");
+ }
+
+ @After
+ public void after() {
+ this.cleanupTestMetadata();
+ }
+ // A segment kept for a matching EQ filter is pruned once its input record count is set to 0.
+ @Test
+ public void testEmptySegment() {
+ CubeSegment seg = cube.getFirstSegment();
+ TblColRef col = cube.getModel().findColumn("CUSTOMER.C_NATION");
+
+ // a normal hit
+ TupleFilter f = compare(col, FilterOperatorEnum.EQ, "CHINA");
+ SegmentPruner segmentPruner = new SegmentPruner(f);
+ Assert.assertTrue(segmentPruner.check(seg));
+
+ // make the segment empty, it should be pruned
+ seg.setInputRecords(0);
+ Assert.assertFalse(segmentPruner.check(seg));
+ }
+ // Each case builds one compare filter on LO_QUANTITY and checks whether the first segment is kept.
+ @Test
+ public void testDimensionRangeCheck() {
+ CubeSegment cubeSegment = cube.getSegments().getFirstSegment();
+
+ //integer
+ TblColRef qtyCol = cube.getModel().findColumn("V_LINEORDER.LO_QUANTITY");
+ TblColRef revCol = cube.getModel().findColumn("V_LINEORDER.V_REVENUE");
+
+ TupleFilter constFilter_LO_QUANTITY0 = new ConstantTupleFilter(Sets.newHashSet("8", "18", "28"));//between min and max value
+ TupleFilter constFilter_LO_QUANTITY1 = new ConstantTupleFilter("1");//min value
+ TupleFilter constFilter_LO_QUANTITY2 = new ConstantTupleFilter("50");//max value
+ TupleFilter constFilter_LO_QUANTITY3 = new ConstantTupleFilter("0");//lt min value
+ TupleFilter constFilter_LO_QUANTITY4 = new ConstantTupleFilter("200");//gt max value
+ TupleFilter constFilter_LO_QUANTITY5 = new ConstantTupleFilter(Sets.newHashSet("51", "52", "53"));//gt max values
+
+ // non-constant filter
+ {
+ TupleFilter f = compare(qtyCol, FilterOperatorEnum.EQ, revCol);
+ SegmentPruner segmentPruner = new SegmentPruner(f);
+ Assert.assertTrue(segmentPruner.check(cubeSegment));
+ }
+
+ // is null
+ {
+ TupleFilter f = compare(qtyCol, FilterOperatorEnum.ISNULL);
+ SegmentPruner segmentPruner = new SegmentPruner(f);
+ Assert.assertTrue(segmentPruner.check(cubeSegment));
+ }
+
+ //lt min value
+ {
+ TupleFilter f = compare(qtyCol, FilterOperatorEnum.LT, constFilter_LO_QUANTITY1);
+ SegmentPruner segmentPruner = new SegmentPruner(f);
+ Assert.assertFalse(segmentPruner.check(cubeSegment));
+ }
+
+ //lte min value
+ {
+ TupleFilter f = compare(qtyCol, FilterOperatorEnum.LTE, constFilter_LO_QUANTITY1);
+ SegmentPruner segmentPruner = new SegmentPruner(f);
+ Assert.assertTrue(segmentPruner.check(cubeSegment));
+ }
+
+ //lt max value
+ {
+ TupleFilter f = compare(qtyCol, FilterOperatorEnum.LT, constFilter_LO_QUANTITY2);
+ SegmentPruner segmentPruner = new SegmentPruner(f);
+ Assert.assertTrue(segmentPruner.check(cubeSegment));
+ }
+
+ //gt max value
+ {
+ TupleFilter f = compare(qtyCol, FilterOperatorEnum.GT, constFilter_LO_QUANTITY2);
+ SegmentPruner segmentPruner = new SegmentPruner(f);
+ Assert.assertFalse(segmentPruner.check(cubeSegment));
+ }
+
+ //gte max value
+ {
+ TupleFilter f = compare(qtyCol, FilterOperatorEnum.GTE, constFilter_LO_QUANTITY2);
+ SegmentPruner segmentPruner = new SegmentPruner(f);
+ Assert.assertTrue(segmentPruner.check(cubeSegment));
+ }
+
+ //gt min value
+ {
+ TupleFilter f = compare(qtyCol, FilterOperatorEnum.GT, constFilter_LO_QUANTITY1);
+ SegmentPruner segmentPruner = new SegmentPruner(f);
+ Assert.assertTrue(segmentPruner.check(cubeSegment));
+ }
+
+ //in over-max values
+ {
+ TupleFilter f = compare(qtyCol, FilterOperatorEnum.IN, constFilter_LO_QUANTITY5);
+ SegmentPruner segmentPruner = new SegmentPruner(f);
+ Assert.assertFalse(segmentPruner.check(cubeSegment));
+ }
+
+ //in normal values
+ {
+ TupleFilter f = compare(qtyCol, FilterOperatorEnum.IN, constFilter_LO_QUANTITY0);
+ SegmentPruner segmentPruner = new SegmentPruner(f);
+ Assert.assertTrue(segmentPruner.check(cubeSegment));
+ }
+
+ //lte under-min value
+ {
+ TupleFilter f = compare(qtyCol, FilterOperatorEnum.LTE, constFilter_LO_QUANTITY3);
+ SegmentPruner segmentPruner = new SegmentPruner(f);
+ Assert.assertFalse(segmentPruner.check(cubeSegment));
+ }
+
+ //gte over-max value
+ {
+ TupleFilter f = compare(qtyCol, FilterOperatorEnum.GTE, constFilter_LO_QUANTITY4);
+ SegmentPruner segmentPruner = new SegmentPruner(f);
+ Assert.assertFalse(segmentPruner.check(cubeSegment));
+ }
+ }
+ // Empty-segment skipping is disabled below, so any pruning comes from the CAL_DT range derived from the TSRange.
+ @Test
+ public void testLegacyCubeSeg() {
+ // legacy cube segments do not have DimensionRangeInfo, but their TSRange still allows some pruning
+ CubeInstance cube = CubeManager.getInstance(getTestConfig())
+ .getCube("test_kylin_cube_without_slr_left_join_ready_2_segments");
+
+ TblColRef col = cube.getModel().findColumn("TEST_KYLIN_FACT.CAL_DT");
+ CubeSegment seg = cube.getSegments(SegmentStatusEnum.READY).get(0);
+ TSRange tsRange = seg.getTSRange();
+ long start = tsRange.start.v;
+
+ try (SetAndUnsetSystemProp sns = new SetAndUnsetSystemProp("kylin.query.skip-empty-segments", "false")) {
+ {
+ TupleFilter f = compare(col, FilterOperatorEnum.LTE, start);
+ SegmentPruner segmentPruner = new SegmentPruner(f);
+ Assert.assertTrue(segmentPruner.check(seg));
+ }
+ {
+ TupleFilter f = compare(col, FilterOperatorEnum.LT, start);
+ SegmentPruner segmentPruner = new SegmentPruner(f);
+ Assert.assertFalse(segmentPruner.check(seg));
+ }
+ }
+ }
+}
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java
index 5ccc1f3..d9eefb5 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java
@@ -19,7 +19,6 @@
package org.apache.kylin.metadata.datatype;
import java.io.Serializable;
-import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
@@ -35,7 +34,6 @@ import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.BytesSerializer;
import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.measure.MeasureTypeFactory;
import org.apache.kylin.metadata.model.TblColRef.InnerDataTypeEnum;
@@ -162,6 +160,7 @@ public class DataType implements Serializable {
private String name;
private int precision;
private int scale;
+ private transient DataTypeOrder order;
public DataType(String name, int precision, int scale) {
this.name = name;
@@ -224,24 +223,17 @@ public class DataType implements Serializable {
scale = KylinConfig.getInstanceFromEnv().getDefaultDecimalScale();
}
}
-
}
+ public DataTypeOrder getOrder() {
+ if (order == null)
+ order = DataTypeOrder.getInstance(this);
+
+ return order;
+ }
+
public int compare(String value1, String value2) {
- if (isDateTimeFamily()) {
- Long millis1 = DateFormat.stringToMillis(value1);
- Long millis2 = DateFormat.stringToMillis(value2);
- return millis1.compareTo(millis2);
- } else if (isIntegerFamily()) {
- Long l1 = new Long(value1);
- Long l2 = new Long(value2);
- return l1.compareTo(l2);
- } else if (isNumberFamily()) {
- BigDecimal bigDecimal1 = new BigDecimal(value1);
- BigDecimal bigDecimal2 = new BigDecimal(value2);
- return bigDecimal1.compareTo(bigDecimal2);
- }
- return value1.compareTo(value2);
+ // Delegates to the type-specific DataTypeOrder returned by getOrder().
+ // NOTE(review): DataTypeOrder.getInstance() throws IllegalArgumentException for types outside the
+ // string / datetime / integer / float / double / decimal families, whereas the removed code fell
+ // back to String.compareTo for every other type -- confirm no caller compares such types.
+ return getOrder().compare(value1, value2);
}
private String replaceLegacy(String str) {
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeOrder.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeOrder.java
new file mode 100644
index 0000000..091e2ae
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeOrder.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.datatype;
+
+import java.math.BigDecimal;
+import java.util.Collection;
+import java.util.Comparator;
+
+import org.apache.kylin.common.util.DateFormat;
+
+/**
+ * Define order for string literals based on its underlying data type.
+ *
+ * Null is the smallest.
+ */
+abstract public class DataTypeOrder implements Comparator<String> {
+
+ public static final DataTypeOrder INTEGER_ORDER = new IntegerOrder();
+ public static final DataTypeOrder DOUBLE_ORDER = new DoubleOrder();
+ public static final DataTypeOrder DECIMAL_ORDER = new DecimalOrder();
+ public static final DataTypeOrder DATETIME_ORDER = new DateTimeOrder();
+ public static final DataTypeOrder STRING_ORDER = new StringOrder();
+
+ // package private, access via DataType.getOrder()
+ static DataTypeOrder getInstance(DataType type) throws IllegalArgumentException {
+ if (type.isStringFamily())
+ return STRING_ORDER;
+ else if (type.isDateTimeFamily())
+ return DATETIME_ORDER;
+ else if (type.isIntegerFamily())
+ return INTEGER_ORDER;
+ else if (type.isFloat() || type.isDouble())
+ return DOUBLE_ORDER;
+ else if (type.isDecimal())
+ return DECIMAL_ORDER;
+ else
+ throw new IllegalArgumentException("Unsupported data type " + type);
+ }
+
+ public String max(Collection<String> values) {
+ String max = null;
+ for (String v : values) {
+ max = max(max, v);
+ }
+ return max;
+ }
+
+ public String min(Collection<String> values) {
+ String min = null;
+ for (String v : values) {
+ min = min(min, v);
+ }
+ return min;
+ }
+
+ public String min(String v1, String v2) {
+ if (v1 == null)
+ return v2;
+ else if (v2 == null)
+ return v1;
+ else
+ return compare(v1, v2) <= 0 ? v1 : v2;
+ }
+
+ public String max(String v1, String v2) {
+ if (v1 == null)
+ return v2;
+ else if (v2 == null)
+ return v1;
+ else
+ return compare(v1, v2) >= 0 ? v1 : v2;
+ }
+
+ @Override
+ public int compare(String s1, String s2) {
+ Comparable o1 = toComparable(s1);
+ Comparable o2 = toComparable(s2);
+
+ // consider null
+ if (o1 == o2)
+ return 0;
+ if (o1 == null)
+ return -1;
+ if (o2 == null)
+ return 1;
+
+ return o1.compareTo(o2);
+ }
+
+ abstract Comparable toComparable(String s);
+
+ private static class StringOrder extends DataTypeOrder {
+ @Override
+ public String toComparable(String s) {
+ return s;
+ }
+ }
+
+ private static class IntegerOrder extends DataTypeOrder {
+ @Override
+ public Long toComparable(String s) {
+ if (s == null || s.isEmpty())
+ return null;
+ else
+ return Long.parseLong(s);
+ }
+ }
+
+ private static class DoubleOrder extends DataTypeOrder {
+ @Override
+ public Double toComparable(String s) {
+ if (s == null || s.isEmpty())
+ return null;
+ else
+ return Double.parseDouble(s);
+ }
+ }
+
+ private static class DecimalOrder extends DataTypeOrder {
+ @Override
+ public BigDecimal toComparable(String s) {
+ if (s == null || s.isEmpty())
+ return null;
+ else
+ return new BigDecimal(s);
+ }
+ }
+
+ private static class DateTimeOrder extends DataTypeOrder {
+ @Override
+ public Long toComparable(String s) {
+ if (s == null || s.isEmpty())
+ return null;
+ else
+ return DateFormat.stringToMillis(s);
+ }
+ }
+
+}
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CompareTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CompareTupleFilter.java
index 16f165a..2c75ec1 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CompareTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CompareTupleFilter.java
@@ -33,7 +33,7 @@ import org.apache.kylin.metadata.tuple.IEvaluatableTuple;
* @author xjiang
*/
public class CompareTupleFilter extends TupleFilter implements IOptimizeableTupleFilter {
-
+
public enum CompareResultType {
AlwaysTrue, AlwaysFalse, Unknown
}
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
index 09b41f5..672aba0 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
@@ -20,6 +20,7 @@ package org.apache.kylin.metadata.filter;
import java.nio.ByteBuffer;
import java.util.Collection;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -31,6 +32,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
/**
*
@@ -81,6 +83,44 @@ public abstract class TupleFilter {
SWAP_OP_MAP.put(FilterOperatorEnum.LT, FilterOperatorEnum.GT);
SWAP_OP_MAP.put(FilterOperatorEnum.GTE, FilterOperatorEnum.LTE);
}
+
+ /** Builds a comparison filter whose only operand is {@code col}, e.g. for ISNULL. */
+ public static CompareTupleFilter compare(TblColRef col, FilterOperatorEnum op) {
+ CompareTupleFilter filter = new CompareTupleFilter(op);
+ ColumnTupleFilter columnOperand = new ColumnTupleFilter(col);
+ filter.addChild(columnOperand);
+ return filter;
+ }
+
+ /**
+ * Builds the comparison filter {@code col <op> val}.
+ *
+ * @param val the right-hand operand: a {@link TupleFilter} is added as-is, a {@link TblColRef} is
+ * wrapped in a {@link ColumnTupleFilter}, anything else becomes a {@link ConstantTupleFilter}
+ */
+ public static CompareTupleFilter compare(TblColRef col, FilterOperatorEnum op, Object val) {
+ CompareTupleFilter r = compare(col, op);
+ if (val instanceof TupleFilter)
+ r.addChild((TupleFilter) val);
+ else if (val instanceof TblColRef)
+ r.addChild(new ColumnTupleFilter((TblColRef) val)); // the right-hand column, not col again
+ else
+ r.addChild(new ConstantTupleFilter(val));
+ return r;
+ }
+
+ /** Combines the given filters with AND. */
+ public static LogicalTupleFilter and(TupleFilter... children) {
+ return logical(FilterOperatorEnum.AND, children);
+ }
+
+ /** Combines the given filters with OR. */
+ public static LogicalTupleFilter or(TupleFilter... children) {
+ return logical(FilterOperatorEnum.OR, children);
+ }
+
+ /** Wraps the given filter in a NOT. */
+ public static LogicalTupleFilter not(TupleFilter child) {
+ LogicalTupleFilter negation = new LogicalTupleFilter(FilterOperatorEnum.NOT);
+ negation.addChild(child);
+ return negation;
+ }
+
+ // shared construction for the varargs AND / OR builders
+ private static LogicalTupleFilter logical(FilterOperatorEnum op, TupleFilter[] children) {
+ LogicalTupleFilter logic = new LogicalTupleFilter(op);
+ logic.addChildren(children);
+ return logic;
+ }
+
+ // ============================================================================
protected final List<TupleFilter> children;
protected FilterOperatorEnum operator;
@@ -245,7 +285,61 @@ public abstract class TupleFilter {
}
return oldProductFilters;
}
+
+ /**
+ * For each column in {@code lookingForCols}, finds the value this filter forces it to equal.
+ * Only EQ (maps to its first value) and ISNULL (maps to null) comparisons reachable through AND
+ * nodes are considered; OR / NOT subtrees and other operators contribute nothing.
+ */
+ public HashMap<TblColRef, Object> findMustEqualColsAndValues(Collection<TblColRef> lookingForCols) {
+ HashMap<TblColRef, Object> found = new HashMap<>();
+ findMustEqualColsAndValues(this, lookingForCols, found);
+ return found;
+ }
+ private void findMustEqualColsAndValues(TupleFilter filter, Collection<TblColRef> lookingForCols, HashMap<TblColRef, Object> result) {
+ if (filter instanceof LogicalTupleFilter) {
+ // only AND passes its "must" semantics down to its children
+ if (filter.getOperator() == FilterOperatorEnum.AND) {
+ for (TupleFilter child : filter.getChildren()) {
+ findMustEqualColsAndValues(child, lookingForCols, result);
+ }
+ }
+ return;
+ }
+ if (!(filter instanceof CompareTupleFilter))
+ return;
+
+ CompareTupleFilter comp = (CompareTupleFilter) filter;
+ TblColRef col = comp.getColumn();
+ if (!lookingForCols.contains(col))
+ return;
+
+ FilterOperatorEnum op = comp.getOperator();
+ if (op == FilterOperatorEnum.EQ) {
+ result.put(col, comp.getFirstValue());
+ } else if (op == FilterOperatorEnum.ISNULL) {
+ result.put(col, null);
+ }
+ }
+
+ /**
+ * Returns the column comparisons that must hold whenever this filter is true, i.e. every
+ * {@link CompareTupleFilter} with a column operand reachable from this node through AND nodes only.
+ */
+ public Set<CompareTupleFilter> findMustTrueCompareFilters() {
+ Set<CompareTupleFilter> mustTrue = Sets.newHashSet();
+ findMustTrueCompareFilters(this, mustTrue);
+ return mustTrue;
+ }
+
+ private void findMustTrueCompareFilters(TupleFilter filter, Set<CompareTupleFilter> result) {
+ if (filter instanceof CompareTupleFilter) {
+ CompareTupleFilter comp = (CompareTupleFilter) filter;
+ if (comp.getColumn() != null)
+ result.add(comp);
+ } else if (filter instanceof LogicalTupleFilter && filter.getOperator() == FilterOperatorEnum.AND) {
+ for (TupleFilter child : filter.getChildren())
+ findMustTrueCompareFilters(child, result);
+ }
+ }
+
public abstract boolean isEvaluable();
public abstract boolean evaluate(IEvaluatableTuple tuple, IFilterCodeSystem<?> cs);
@@ -284,26 +378,4 @@ public abstract class TupleFilter {
}
}
- public static TupleFilter and(TupleFilter f1, TupleFilter f2) {
- if (f1 == null)
- return f2;
- if (f2 == null)
- return f1;
-
- if (f1.getOperator() == FilterOperatorEnum.AND) {
- f1.addChild(f2);
- return f1;
- }
-
- if (f2.getOperator() == FilterOperatorEnum.AND) {
- f2.addChild(f1);
- return f2;
- }
-
- LogicalTupleFilter and = new LogicalTupleFilter(FilterOperatorEnum.AND);
- and.addChild(f1);
- and.addChild(f2);
- return and;
- }
-
}
diff --git a/core-metadata/src/test/java/org/apache/kylin/metadata/datatype/DataTypeOrderTest.java b/core-metadata/src/test/java/org/apache/kylin/metadata/datatype/DataTypeOrderTest.java
new file mode 100644
index 0000000..7666ffe
--- /dev/null
+++ b/core-metadata/src/test/java/org/apache/kylin/metadata/datatype/DataTypeOrderTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.datatype;
+
+import com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Set;
+
+public class DataTypeOrderTest {
+ @Test
+ public void testDataTypeOrder() {
+ // integer: numeric rather than lexicographic order
+ DataTypeOrder integerOrder = DataType.getType("integer").getOrder();
+ Set<String> integerLiterals = Sets.newHashSet("100000", "2", "1000", "100", "77", "10", "9", "2000000", "-10000", "0");
+ Assert.assertEquals("2000000", integerOrder.max(integerLiterals));
+ Assert.assertEquals("-10000", integerOrder.min(integerLiterals));
+
+ // double: fractional and negative literals
+ DataTypeOrder doubleOrder = DataType.getType("double").getOrder();
+ Set<String> doubleLiterals = Sets.newHashSet("1.1", "-299.5", "100000", "1.000", "4.000000001", "0.00", "-1000000.231231", "8000000",
+ "10", "10.00");
+ Assert.assertEquals("8000000", doubleOrder.max(doubleLiterals));
+ Assert.assertEquals("-1000000.231231", doubleOrder.min(doubleLiterals));
+
+ // date: a mix of date, timestamp and epoch-millis literals
+ DataTypeOrder dateOrder = DataType.getType("date").getOrder();
+ Set<String> dateLiterals = Sets.newHashSet("2010-01-02", "2888-08-09", "2018-05-26", "1527512082000", "2010-02-03 23:59:59",
+ "2000-12-12 12:00:00", "1970-01-19 00:18:32", "1998-12-02", "2018-05-28 10:00:00.255", "1995-09-20 20:00:00.220");
+ Assert.assertEquals("2888-08-09", dateOrder.max(dateLiterals));
+ Assert.assertEquals("1970-01-19 00:18:32", dateOrder.min(dateLiterals));
+
+ // varchar: lexicographic; min/max skip the null element
+ DataTypeOrder varcharOrder = new DataType("varchar", 256, 10).getOrder();
+ Set<String> varcharLiterals = Sets.newHashSet(null, "", "中国", "China No.1", "神兽麒麟", "Rocket", "Apache Kylin", "google", "NULL",
+ "empty");
+ Assert.assertEquals("神兽麒麟", varcharOrder.max(varcharLiterals));
+ Assert.assertEquals("", varcharOrder.min(varcharLiterals));
+ }
+}
diff --git a/core-metadata/src/test/java/org/apache/kylin/metadata/filter/TupleFilterTest.java b/core-metadata/src/test/java/org/apache/kylin/metadata/filter/TupleFilterTest.java
index e17f4d1..95609eb 100644
--- a/core-metadata/src/test/java/org/apache/kylin/metadata/filter/TupleFilterTest.java
+++ b/core-metadata/src/test/java/org/apache/kylin/metadata/filter/TupleFilterTest.java
@@ -18,9 +18,21 @@
package org.apache.kylin.metadata.filter;
+import static org.apache.kylin.metadata.filter.TupleFilter.and;
+import static org.apache.kylin.metadata.filter.TupleFilter.compare;
+import static org.apache.kylin.metadata.filter.TupleFilter.not;
+
+import java.util.HashMap;
+import java.util.Set;
+
+import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
import org.junit.Assert;
import org.junit.Test;
+import com.google.common.collect.Sets;
+
public class TupleFilterTest {
@Test
// true ==> true
@@ -62,4 +74,69 @@ public class TupleFilterTest {
andFilter2.addChildren(ConstantTupleFilter.FALSE, ConstantTupleFilter.FALSE);
Assert.assertEquals(andFilter2, andFilter.removeNot());
}
+
+ /** Checks which column values findMustEqualColsAndValues() pins for EQ, ISNULL, AND, NOT and range filters. */
+ @Test
+ public void testFindMustEqualColsAndValues() {
+ TableDesc tbl = TableDesc.mockup("mockup_table");
+ TblColRef colA = TblColRef.mockup(tbl, 0, "A", "bigint");
+ TblColRef colB = TblColRef.mockup(tbl, 1, "B", "char(256)");
+ Set<TblColRef> cols = Sets.newHashSet(colA, colB);
+
+ // EQ pins the column to its constant
+ {
+ TupleFilter f = compare(colA, FilterOperatorEnum.EQ, "1234");
+ Assert.assertEquals(map(colA, "1234"), f.findMustEqualColsAndValues(cols));
+ }
+
+ // ISNULL pins the column to null
+ {
+ TupleFilter f = compare(colA, FilterOperatorEnum.ISNULL);
+ Assert.assertEquals(map(colA, null), f.findMustEqualColsAndValues(cols));
+ }
+
+ // AND propagates both pins; wrapping the same filter in NOT pins nothing
+ {
+ TupleFilter f = and(compare(colA, FilterOperatorEnum.ISNULL), compare(colB, FilterOperatorEnum.EQ, "1234"));
+ Assert.assertEquals(map(colA, null, colB, "1234"), f.findMustEqualColsAndValues(cols));
+ Assert.assertTrue(not(f).findMustEqualColsAndValues(cols).isEmpty());
+ }
+
+ // range comparisons pin nothing
+ {
+ TupleFilter f = compare(colA, FilterOperatorEnum.LT, "1234");
+ Assert.assertTrue(f.findMustEqualColsAndValues(cols).isEmpty());
+ }
+ }
+
+ // expected result with a single column -> value entry (value may be null)
+ private HashMap<TblColRef, Object> map(TblColRef col, String v) {
+ HashMap<TblColRef, Object> expected = new HashMap<>();
+ expected.put(col, v);
+ return expected;
+ }
+
+ // expected result with two entries, built on top of the single-entry helper
+ private HashMap<TblColRef, Object> map(TblColRef col, String v, TblColRef col2, String v2) {
+ HashMap<TblColRef, Object> expected = map(col, v);
+ expected.put(col2, v2);
+ return expected;
+ }
+
+ /** Checks that only comparisons reachable through AND nodes are reported as must-true. */
+ @Test
+ public void testMustTrueTupleFilter() {
+ // tree: AND(AND(), OR()) with no comparisons yet
+ TupleFilter andFilter = new LogicalTupleFilter(TupleFilter.FilterOperatorEnum.AND);
+ TupleFilter andFilter2 = new LogicalTupleFilter(TupleFilter.FilterOperatorEnum.AND);
+ TupleFilter orFilter = new LogicalTupleFilter(TupleFilter.FilterOperatorEnum.OR);
+ andFilter.addChild(andFilter2);
+ andFilter.addChild(orFilter);
+
+ Set<CompareTupleFilter> trueTupleFilters = andFilter.findMustTrueCompareFilters();
+ Assert.assertTrue(trueTupleFilters.isEmpty());
+
+ // compFilter goes under AND -> AND (must hold); compFilter2 goes under OR (need not hold)
+ TupleFilter compFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.GT);
+ compFilter.addChild(new ColumnTupleFilter(TblColRef.newInnerColumn("test1", TblColRef.InnerDataTypeEnum.LITERAL)));
+ TupleFilter compFilter2 = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.GT);
+ compFilter2.addChild(new ColumnTupleFilter(TblColRef.newInnerColumn("test2", TblColRef.InnerDataTypeEnum.LITERAL)));
+ andFilter2.addChild(compFilter);
+ orFilter.addChild(compFilter2);
+ Assert.assertEquals(Sets.newHashSet(compFilter), andFilter.findMustTrueCompareFilters());
+
+ // a column comparison at the root is must-true by itself
+ Assert.assertEquals(Sets.newHashSet(compFilter2), compFilter2.findMustTrueCompareFilters());
+ }
+
}
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
index 5f86a45..229ef01 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
@@ -31,7 +31,6 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.common.FuzzyValueCombination;
import org.apache.kylin.cube.cuboid.Cuboid;
@@ -39,7 +38,6 @@ import org.apache.kylin.cube.gridtable.CubeGridTable;
import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
import org.apache.kylin.cube.gridtable.RecordComparators;
import org.apache.kylin.cube.gridtable.ScanRangePlannerBase;
-import org.apache.kylin.cube.gridtable.SegmentGTStartAndEnd;
import org.apache.kylin.cube.kv.CubeDimEncMap;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.gridtable.GTInfo;
@@ -137,15 +135,9 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
this.gtDynColumns = new ImmutableBitSet(tmpGtDynCols);
this.gtRtAggrMetrics = mapping.makeGridTableColumns(tmpRtAggrMetrics);
- if (cubeSegment.getModel().getPartitionDesc().isPartitioned()) {
- int index = mapping.getIndexOf(cubeSegment.getModel().getPartitionDesc().getPartitionDateColumnRef());
- if (index >= 0) {
- SegmentGTStartAndEnd segmentGTStartAndEnd = new SegmentGTStartAndEnd(cubeSegment, gtInfo);
- this.gtStartAndEnd = segmentGTStartAndEnd.getSegmentStartAndEnd(index);
- this.isPartitionColUsingDatetimeEncoding = segmentGTStartAndEnd.isUsingDatetimeEncoding(index);
- this.gtPartitionCol = gtInfo.colRef(index);
- }
- }
+ this.gtAggrGroups = mapping.makeGridTableColumns(replaceDerivedColumns(groupByPushDown, cubeSegment.getCubeDesc()));
+ this.gtAggrMetrics = mapping.makeGridTableColumns(metrics);
+ this.gtAggrFuncs = mapping.makeAggrFuncs(metrics);
}
protected StorageContext context;
@@ -153,7 +145,7 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
/**
* Construct GTScanRangePlanner with incomplete information. For UT only.
*/
- public CubeScanRangePlanner(GTInfo info, Pair<ByteArray, ByteArray> gtStartAndEnd, TblColRef gtPartitionCol, TupleFilter gtFilter) {
+ public CubeScanRangePlanner(GTInfo info, TblColRef gtPartitionCol, TupleFilter gtFilter) {
this.maxScanRanges = KylinConfig.getInstanceFromEnv().getQueryStorageVisitScanRangeMax();
this.maxFuzzyKeysPerSplit = KylinConfig.getInstanceFromEnv().getQueryScanFuzzyKeyMax();
@@ -170,8 +162,6 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
this.rangeStartEndComparator = RecordComparators.getRangeStartEndComparator(comp);
this.gtFilter = gtFilter;
- this.gtStartAndEnd = gtStartAndEnd;
- this.gtPartitionCol = gtPartitionCol;
}
public GTScanRequest planScanRequest() {
@@ -237,18 +227,6 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
Map<Integer, Set<ByteArray>> fuzzyValues = Maps.newHashMap();
for (ColumnRange range : andDimRanges) {
- if (gtPartitionCol != null && range.column.equals(gtPartitionCol)) {
- int beginCompare = rangeStartEndComparator.comparator.compare(range.begin, gtStartAndEnd.getSecond());
- int endCompare = rangeStartEndComparator.comparator.compare(gtStartAndEnd.getFirst(), range.end);
-
- if ((isPartitionColUsingDatetimeEncoding && endCompare <= 0 && beginCompare < 0) || (!isPartitionColUsingDatetimeEncoding && endCompare <= 0 && beginCompare <= 0)) {
- //segment range is [Closed,Open), but segmentStartAndEnd.getSecond() might be rounded when using dict encoding, so use <= when has equals in condition.
- } else {
- logger.debug("Pre-check partition col filter failed, partitionColRef {}, segment start {}, segment end {}, range begin {}, range end {}", //
- gtPartitionCol, makeReadable(gtStartAndEnd.getFirst()), makeReadable(gtStartAndEnd.getSecond()), makeReadable(range.begin), makeReadable(range.end));
- return null;
- }
- }
int col = range.column.getColumnDesc().getZeroBasedIndex();
if (!gtInfo.getPrimaryKey().get(col))
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index 352a868..269833f 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -33,6 +33,7 @@ import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.RawQueryLastHacker;
+import org.apache.kylin.cube.common.SegmentPruner;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
import org.apache.kylin.cube.gridtable.CuboidToGridTableMappingExt;
@@ -87,14 +88,10 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
GTCubeStorageQueryRequest request = getStorageQueryRequest(context, sqlDigest, returnTupleInfo);
List<CubeSegmentScanner> scanners = Lists.newArrayList();
- for (CubeSegment cubeSeg : cubeInstance.getSegments(SegmentStatusEnum.READY)) {
+ SegmentPruner segPruner = new SegmentPruner(sqlDigest.filter);
+ for (CubeSegment cubeSeg : segPruner.listSegmentsForQuery(cubeInstance)) {
CubeSegmentScanner scanner;
- if (cubeDesc.getConfig().isSkippingEmptySegments() && cubeSeg.getInputRecords() == 0) {
- logger.info("Skip cube segment {} because its input record is 0", cubeSeg);
- continue;
- }
-
scanner = new CubeSegmentScanner(cubeSeg, request.getCuboid(), request.getDimensions(), //
request.getGroups(), request.getDynGroups(), request.getDynGroupExprs(), //
request.getMetrics(), request.getDynFuncs(), //
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/translate/ColumnValueRange.java b/core-storage/src/main/java/org/apache/kylin/storage/translate/ColumnValueRange.java
deleted file mode 100644
index 56b1106..0000000
--- a/core-storage/src/main/java/org/apache/kylin/storage/translate/ColumnValueRange.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.translate;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
-
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.cube.kv.RowKeyColumnOrder;
-import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
-import org.apache.kylin.metadata.model.TblColRef;
-
-import com.google.common.collect.Sets;
-
-/**
- *
- * @author xjiang
- *
- */
-public class ColumnValueRange {
- private TblColRef column;
- private RowKeyColumnOrder order;
- private String beginValue;
- private String endValue;
- private Set<String> equalValues;
-
- public ColumnValueRange(TblColRef column, Collection<String> values, FilterOperatorEnum op) {
- this.column = column;
- this.order = RowKeyColumnOrder.getInstance(column.getType());
-
- switch (op) {
- case EQ:
- case IN:
- equalValues = new HashSet<String>(values);
- refreshBeginEndFromEquals();
- break;
- case LT:
- case LTE:
- endValue = order.max(values);
- break;
- case GT:
- case GTE:
- beginValue = order.min(values);
- break;
- case NEQ:
- case NOTIN:
- case ISNULL: // TODO ISNULL worth pass down as a special equal value
- case ISNOTNULL:
- // let Optiq filter it!
- break;
- default:
- throw new UnsupportedOperationException(op.name());
- }
- }
-
- public ColumnValueRange(TblColRef column, String beginValue, String endValue, Set<String> equalValues) {
- copy(column, beginValue, endValue, equalValues);
- }
-
- void copy(TblColRef column, String beginValue, String endValue, Set<String> equalValues) {
- this.column = column;
- this.order = RowKeyColumnOrder.getInstance(column.getType());
- this.beginValue = beginValue;
- this.endValue = endValue;
- this.equalValues = equalValues;
- }
-
- public TblColRef getColumn() {
- return column;
- }
-
- public String getBeginValue() {
- return beginValue;
- }
-
- public String getEndValue() {
- return endValue;
- }
-
- public Set<String> getEqualValues() {
- return equalValues;
- }
-
- private void refreshBeginEndFromEquals() {
- this.beginValue = order.min(this.equalValues);
- this.endValue = order.max(this.equalValues);
- }
-
- public boolean satisfyAll() {
- return beginValue == null && endValue == null && equalValues == null; // the NEQ case
- }
-
- public boolean satisfyNone() {
- if (equalValues != null) {
- return equalValues.isEmpty();
- } else if (beginValue != null && endValue != null) {
- return order.compare(beginValue, endValue) > 0;
- } else {
- return false;
- }
- }
-
- public void andMerge(ColumnValueRange another) {
- assert this.column.equals(another.column);
-
- if (another.satisfyAll()) {
- return;
- }
-
- if (this.satisfyAll()) {
- copy(another.column, another.beginValue, another.endValue, another.equalValues);
- return;
- }
-
- if (this.equalValues != null && another.equalValues != null) {
- this.equalValues.retainAll(another.equalValues);
- refreshBeginEndFromEquals();
- return;
- }
-
- if (this.equalValues != null) {
- this.equalValues = filter(this.equalValues, another.beginValue, another.endValue);
- refreshBeginEndFromEquals();
- return;
- }
-
- if (another.equalValues != null) {
- this.equalValues = filter(another.equalValues, this.beginValue, this.endValue);
- refreshBeginEndFromEquals();
- return;
- }
-
- this.beginValue = order.max(this.beginValue, another.beginValue);
- this.endValue = order.min(this.endValue, another.endValue);
- }
-
- private Set<String> filter(Set<String> equalValues, String beginValue, String endValue) {
- Set<String> result = Sets.newHashSetWithExpectedSize(equalValues.size());
- for (String v : equalValues) {
- if (between(v, beginValue, endValue)) {
- result.add(v);
- }
- }
- return equalValues;
- }
-
- private boolean between(String v, String beginValue, String endValue) {
- return (beginValue == null || order.compare(beginValue, v) <= 0) && (endValue == null || order.compare(v, endValue) <= 0);
- }
-
- // remove invalid EQ/IN values and round start/end according to dictionary
- public void preEvaluateWithDict(Dictionary<String> dict) {
- if (dict == null || dict.getSize() == 0)
- return;
-
- if (equalValues != null) {
- Iterator<String> it = equalValues.iterator();
- while (it.hasNext()) {
- String v = it.next();
- try {
- dict.getIdFromValue(v);
- } catch (IllegalArgumentException e) {
- // value not in dictionary
- it.remove();
- }
- }
- refreshBeginEndFromEquals();
- }
-
- if (beginValue != null) {
- try {
- beginValue = dict.getValueFromId(dict.getIdFromValue(beginValue, 1));
- } catch (IllegalArgumentException e) {
- // beginValue is greater than the biggest in dictionary, mark FALSE
- equalValues = Sets.newHashSet();
- }
- }
-
- if (endValue != null) {
- try {
- endValue = dict.getValueFromId(dict.getIdFromValue(endValue, -1));
- } catch (IllegalArgumentException e) {
- // endValue is lesser than the smallest in dictionary, mark FALSE
- equalValues = Sets.newHashSet();
- }
- }
- }
-
- public String toString() {
- if (equalValues == null) {
- return column.getName() + " between " + beginValue + " and " + endValue;
- } else {
- return column.getName() + " in " + equalValues;
- }
- }
-}
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java b/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java
index 7fa426f..4a80d29 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java
@@ -24,10 +24,10 @@ import java.util.Set;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Array;
import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.kv.RowKeyColumnOrder;
import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
import org.apache.kylin.cube.model.CubeDesc.DeriveType;
import org.apache.kylin.dict.lookup.ILookupTable;
+import org.apache.kylin.metadata.datatype.DataTypeOrder;
import org.apache.kylin.metadata.filter.ColumnTupleFilter;
import org.apache.kylin.metadata.filter.CompareTupleFilter;
import org.apache.kylin.metadata.filter.ConstantTupleFilter;
@@ -164,9 +164,9 @@ public class DerivedFilterTranslator {
private static void findMinMax(Set<Array<String>> satisfyingHostRecords, TblColRef[] hostCols, String[] min, String[] max) {
- RowKeyColumnOrder[] orders = new RowKeyColumnOrder[hostCols.length];
+ DataTypeOrder[] orders = new DataTypeOrder[hostCols.length];
for (int i = 0; i < hostCols.length; i++) {
- orders[i] = RowKeyColumnOrder.getInstance(hostCols[i].getType());
+ orders[i] = hostCols[i].getType().getOrder();
}
for (Array<String> rec : satisfyingHostRecords) {
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java b/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java
deleted file mode 100644
index 85678ac..0000000
--- a/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java
+++ /dev/null
@@ -1,273 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.translate;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.kylin.common.debug.BackdoorToggles;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.DateFormat;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
-import org.apache.kylin.cube.kv.FuzzyKeyEncoder;
-import org.apache.kylin.cube.kv.FuzzyMaskEncoder;
-import org.apache.kylin.cube.kv.LazyRowKeyEncoder;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-/**
- * @author xjiang
- */
-public class HBaseKeyRange implements Comparable<HBaseKeyRange> {
-
- private static final Logger logger = LoggerFactory.getLogger(HBaseKeyRange.class);
-
- private static final int FUZZY_VALUE_CAP = 100;
- private static final byte[] ZERO_TAIL_BYTES = new byte[] { 0 };
-
- private final CubeSegment cubeSeg;
- private final Cuboid cuboid;
- private final List<Collection<ColumnValueRange>> flatOrAndFilter; // OR-AND filter, (A AND B AND ..) OR (C AND D AND ..) OR ..
-
- private byte[] startKey;
- private byte[] stopKey;
- private List<Pair<byte[], byte[]>> fuzzyKeys;
-
- private String startKeyString;
- private String stopKeyString;
- private String fuzzyKeyString;
-
- private long partitionColumnStartDate = Long.MIN_VALUE;
- private long partitionColumnEndDate = Long.MAX_VALUE;
-
- public HBaseKeyRange(CubeSegment cubeSeg, Cuboid cuboid, byte[] startKey, byte[] stopKey, List<Pair<byte[], byte[]>> fuzzyKeys, List<Collection<ColumnValueRange>> flatColumnValueFilter, long partitionColumnStartDate, long partitionColumnEndDate) {
- this.cubeSeg = cubeSeg;
- this.cuboid = cuboid;
- this.startKey = startKey;
- this.stopKey = stopKey;
- this.fuzzyKeys = fuzzyKeys;
- this.flatOrAndFilter = flatColumnValueFilter;
- this.partitionColumnStartDate = partitionColumnStartDate;
- this.partitionColumnEndDate = partitionColumnEndDate;
- initDebugString();
- }
-
- public HBaseKeyRange(Collection<TblColRef> dimensionColumns, Collection<ColumnValueRange> andDimensionRanges, CubeSegment cubeSeg, CubeDesc cubeDesc) {
- this.cubeSeg = cubeSeg;
- long cuboidId = this.calculateCuboidID(cubeDesc, dimensionColumns);
- this.cuboid = Cuboid.findById(cubeSeg, cuboidId);
- this.flatOrAndFilter = Lists.newLinkedList();
- this.flatOrAndFilter.add(andDimensionRanges);
- init(andDimensionRanges);
- initDebugString();
- }
-
- private long calculateCuboidID(CubeDesc cube, Collection<TblColRef> dimensions) {
- long cuboidID = 0;
- for (TblColRef column : dimensions) {
- int index = cube.getRowkey().getColumnBitIndex(column);
- cuboidID |= 1L << index;
- }
- return cuboidID;
- }
-
- private void init(Collection<ColumnValueRange> andDimensionRanges) {
- int size = andDimensionRanges.size();
- Map<TblColRef, String> startValues = Maps.newHashMapWithExpectedSize(size);
- Map<TblColRef, String> stopValues = Maps.newHashMapWithExpectedSize(size);
- Map<TblColRef, Set<String>> fuzzyValues = Maps.newHashMapWithExpectedSize(size);
- for (ColumnValueRange dimRange : andDimensionRanges) {
- TblColRef column = dimRange.getColumn();
- startValues.put(column, dimRange.getBeginValue());
- stopValues.put(column, dimRange.getEndValue());
- fuzzyValues.put(column, dimRange.getEqualValues());
-
- TblColRef partitionDateColumnRef = cubeSeg.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef();
- if (column.equals(partitionDateColumnRef)) {
- initPartitionRange(dimRange);
- }
- }
-
- AbstractRowKeyEncoder encoder = new LazyRowKeyEncoder(cubeSeg, cuboid);
- encoder.setBlankByte(RowConstants.ROWKEY_LOWER_BYTE);
- this.startKey = encoder.encode(startValues);
- encoder.setBlankByte(RowConstants.ROWKEY_UPPER_BYTE);
- // In order to make stopRow inclusive add a trailing 0 byte. #See Scan.setStopRow(byte [] stopRow)
- this.stopKey = Bytes.add(encoder.encode(stopValues), ZERO_TAIL_BYTES);
-
- // always fuzzy match cuboid ID to lock on the selected cuboid
- this.fuzzyKeys = buildFuzzyKeys(fuzzyValues);
- }
-
- private void initPartitionRange(ColumnValueRange dimRange) {
- if (null != dimRange.getBeginValue()) {
- this.partitionColumnStartDate = DateFormat.stringToMillis(dimRange.getBeginValue());
- }
- if (null != dimRange.getEndValue()) {
- this.partitionColumnEndDate = DateFormat.stringToMillis(dimRange.getEndValue());
- }
- }
-
- private void initDebugString() {
- this.startKeyString = BytesUtil.toHex(this.startKey);
- this.stopKeyString = BytesUtil.toHex(this.stopKey);
- StringBuilder buf = new StringBuilder();
- for (Pair<byte[], byte[]> fuzzyKey : this.fuzzyKeys) {
- buf.append(BytesUtil.toHex(fuzzyKey.getFirst()));
- buf.append(" ");
- buf.append(BytesUtil.toHex(fuzzyKey.getSecond()));
- buf.append(";");
- }
- this.fuzzyKeyString = buf.toString();
- }
-
- private List<Pair<byte[], byte[]>> buildFuzzyKeys(Map<TblColRef, Set<String>> fuzzyValueSet) {
- ArrayList<Pair<byte[], byte[]>> result = new ArrayList<Pair<byte[], byte[]>>();
-
- // debug/profiling purpose
- if (BackdoorToggles.getDisableFuzzyKey()) {
- logger.info("The execution of this query will not use fuzzy key");
- return result;
- }
-
- FuzzyKeyEncoder fuzzyKeyEncoder = new FuzzyKeyEncoder(cubeSeg, cuboid);
- FuzzyMaskEncoder fuzzyMaskEncoder = new FuzzyMaskEncoder(cubeSeg, cuboid);
-
- List<Map<TblColRef, String>> fuzzyValues = FuzzyValueCombination.calculate(fuzzyValueSet, FUZZY_VALUE_CAP);
- for (Map<TblColRef, String> fuzzyValue : fuzzyValues) {
- result.add(Pair.newPair(fuzzyKeyEncoder.encode(fuzzyValue), fuzzyMaskEncoder.encode(fuzzyValue)));
- }
- return result;
- }
-
- public CubeSegment getCubeSegment() {
- return this.cubeSeg;
- }
-
- public Cuboid getCuboid() {
- return cuboid;
- }
-
- public byte[] getStartKey() {
- return startKey;
- }
-
- public byte[] getStopKey() {
- return stopKey;
- }
-
- public List<Pair<byte[], byte[]>> getFuzzyKeys() {
- return fuzzyKeys;
- }
-
- public String getStartKeyAsString() {
- return startKeyString;
- }
-
- public String getStopKeyAsString() {
- return stopKeyString;
- }
-
- public String getFuzzyKeyAsString() {
- return fuzzyKeyString;
- }
-
- public List<Collection<ColumnValueRange>> getFlatOrAndFilter() {
- return flatOrAndFilter;
- }
-
- public long getPartitionColumnStartDate() {
- return partitionColumnStartDate;
- }
-
- public long getPartitionColumnEndDate() {
- return partitionColumnEndDate;
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((cubeSeg == null) ? 0 : cubeSeg.hashCode());
- result = prime * result + ((cuboid == null) ? 0 : cuboid.hashCode());
- result = prime * result + ((fuzzyKeyString == null) ? 0 : fuzzyKeyString.hashCode());
- result = prime * result + ((startKeyString == null) ? 0 : startKeyString.hashCode());
- result = prime * result + ((stopKeyString == null) ? 0 : stopKeyString.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- HBaseKeyRange other = (HBaseKeyRange) obj;
- if (cubeSeg == null) {
- if (other.cubeSeg != null)
- return false;
- } else if (!cubeSeg.equals(other.cubeSeg))
- return false;
- if (cuboid == null) {
- if (other.cuboid != null)
- return false;
- } else if (!cuboid.equals(other.cuboid))
- return false;
- if (fuzzyKeyString == null) {
- if (other.fuzzyKeyString != null)
- return false;
- } else if (!fuzzyKeyString.equals(other.fuzzyKeyString))
- return false;
- if (startKeyString == null) {
- if (other.startKeyString != null)
- return false;
- } else if (!startKeyString.equals(other.startKeyString))
- return false;
- if (stopKeyString == null) {
- if (other.stopKeyString != null)
- return false;
- } else if (!stopKeyString.equals(other.stopKeyString))
- return false;
- return true;
- }
-
- @Override
- public int compareTo(HBaseKeyRange other) {
- return Bytes.compareTo(this.startKey, other.startKey);
- }
-
- public boolean hitSegment() {
- return cubeSeg.getTSRange().start.v <= getPartitionColumnEndDate() && cubeSeg.getTSRange().end.v >= getPartitionColumnStartDate();
- }
-}
diff --git a/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
index 073c12c..08bcb65 100644
--- a/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
+++ b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
@@ -32,7 +32,6 @@ import org.apache.kylin.common.util.BytesSerializer;
import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.gridtable.CubeCodeSystem;
import org.apache.kylin.dict.NumberDictionaryForestBuilder;
import org.apache.kylin.dict.StringBytesConverter;
@@ -121,13 +120,11 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
ByteArray segmentStart = enc(info, 0, "2015-01-14");
ByteArray segmentStartX = enc(info, 0, "2015-01-14 00:00:00");//when partition col is dict encoded, time format will be free
- ByteArray segmentEnd = enc(info, 0, "2015-01-15");
assertEquals(segmentStart, segmentStartX);
{
LogicalTupleFilter filter = and(timeComp0, ageComp1);
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
- info.colRef(0), filter);
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
List<GTScanRange> r = planner.planScanRanges();
assertEquals(1, r.size());//scan range are [close,close]
assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString());
@@ -136,64 +133,57 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
}
{
LogicalTupleFilter filter = and(timeComp2, ageComp1);
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
- info.colRef(0), filter);
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
List<GTScanRange> r = planner.planScanRanges();
- assertEquals(0, r.size());
+ assertEquals(1, r.size());
}
{
LogicalTupleFilter filter = and(timeComp4, ageComp1);
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
- info.colRef(0), filter);
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
List<GTScanRange> r = planner.planScanRanges();
- assertEquals(0, r.size());
+ assertEquals(1, r.size());
}
{
LogicalTupleFilter filter = and(timeComp5, ageComp1);
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
- info.colRef(0), filter);
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
List<GTScanRange> r = planner.planScanRanges();
- assertEquals(0, r.size());
+ assertEquals(1, r.size());
}
{
LogicalTupleFilter filter = or(and(timeComp2, ageComp1), and(timeComp1, ageComp1),
and(timeComp6, ageComp1));
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
- info.colRef(0), filter);
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
List<GTScanRange> r = planner.planScanRanges();
- assertEquals(1, r.size());
- assertEquals("[1421193600000, 10]-[null, 10]", r.get(0).toString());
+ assertEquals(2, r.size());
+ assertEquals("[1421193600000, 10]-[null, 10]", r.get(1).toString());
assertEquals("[[null, 10, null, null, null], [1421193600000, 10, null, null, null]]",
- r.get(0).fuzzyKeys.toString());
+ r.get(1).fuzzyKeys.toString());
}
{
LogicalTupleFilter filter = or(and(timeComp3, ageComp3), and(timeComp7, ageComp1));
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter);
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
List<GTScanRange> r = planner.planScanRanges();
assertEquals("[[0, 10]-[1421280000000, 30]]", r.toString());
}
{
LogicalTupleFilter filter = or(timeComp2, timeComp1, timeComp6);
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
- info.colRef(0), filter);
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
List<GTScanRange> r = planner.planScanRanges();
- assertEquals(1, r.size());
- assertEquals("[1421193600000, null]-[null, null]", r.get(0).toString());
- assertEquals(0, r.get(0).fuzzyKeys.size());
+ assertEquals(2, r.size());
+ assertEquals("[1421193600000, null]-[null, null]", r.get(1).toString());
+ assertEquals(0, r.get(1).fuzzyKeys.size());
}
{
//skip FALSE filter
LogicalTupleFilter filter = and(ageComp1, ConstantTupleFilter.FALSE);
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
- info.colRef(0), filter);
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
List<GTScanRange> r = planner.planScanRanges();
assertEquals(0, r.size());
}
{
//TRUE or FALSE filter
LogicalTupleFilter filter = or(ConstantTupleFilter.TRUE, ConstantTupleFilter.FALSE);
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
- info.colRef(0), filter);
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
List<GTScanRange> r = planner.planScanRanges();
assertEquals(1, r.size());
assertEquals("[null, null]-[null, null]", r.get(0).toString());
@@ -201,8 +191,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
{
//TRUE or other filter
LogicalTupleFilter filter = or(ageComp1, ConstantTupleFilter.TRUE);
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
- info.colRef(0), filter);
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
List<GTScanRange> r = planner.planScanRanges();
assertEquals(1, r.size());
assertEquals("[null, null]-[null, null]", r.get(0).toString());
@@ -211,12 +200,9 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
@Test
public void verifySegmentSkipping2() {
- ByteArray segmentEnd = enc(info, 0, "2015-01-15");
-
{
LogicalTupleFilter filter = and(timeComp0, ageComp1);
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(new ByteArray(), segmentEnd),
- info.colRef(0), filter);
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
List<GTScanRange> r = planner.planScanRanges();
assertEquals(1, r.size());//scan range are [close,close]
assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString());
@@ -226,10 +212,9 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
{
LogicalTupleFilter filter = and(timeComp5, ageComp1);
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(new ByteArray(), segmentEnd),
- info.colRef(0), filter);
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
List<GTScanRange> r = planner.planScanRanges();
- assertEquals(0, r.size());//scan range are [close,close]
+ assertEquals(1, r.size());//scan range are [close,close]
}
}
@@ -239,7 +224,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
// flatten or-and & hbase fuzzy value
{
LogicalTupleFilter filter = and(timeComp1, or(ageComp1, ageComp2));
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter);
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
List<GTScanRange> r = planner.planScanRanges();
assertEquals(1, r.size());
assertEquals("[1421193600000, 10]-[null, 20]", r.get(0).toString());
@@ -249,7 +234,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
// pre-evaluate ever false
{
LogicalTupleFilter filter = and(timeComp1, timeComp2);
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter);
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
List<GTScanRange> r = planner.planScanRanges();
assertEquals(0, r.size());
}
@@ -257,7 +242,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
// pre-evaluate ever true
{
LogicalTupleFilter filter = or(timeComp1, ageComp4);
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter);
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
List<GTScanRange> r = planner.planScanRanges();
assertEquals("[[null, null]-[null, null]]", r.toString());
}
@@ -265,7 +250,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
// merge overlap range
{
LogicalTupleFilter filter = or(timeComp1, timeComp3);
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter);
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
List<GTScanRange> r = planner.planScanRanges();
assertEquals("[[null, null]-[null, null]]", r.toString());
}
@@ -274,7 +259,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
{
LogicalTupleFilter filter = or(and(timeComp4, ageComp1), and(timeComp4, ageComp2),
and(timeComp4, ageComp3));
- CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter);
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
List<GTScanRange> r = planner.planScanRanges();
assertEquals(3, r.size());
assertEquals("[1421280000000, 10]-[1421280000000, 10]", r.get(0).toString());
diff --git a/core-storage/src/test/java/org/apache/kylin/storage/translate/ColumnValueRangeTest.java b/core-storage/src/test/java/org/apache/kylin/storage/translate/ColumnValueRangeTest.java
deleted file mode 100644
index 0e7e91f..0000000
--- a/core-storage/src/test/java/org/apache/kylin/storage/translate/ColumnValueRangeTest.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.translate;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.dict.StringBytesConverter;
-import org.apache.kylin.dict.TrieDictionaryBuilder;
-import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class ColumnValueRangeTest extends LocalFileMetadataTestCase {
-
- @BeforeClass
- public static void setUp() throws Exception {
- staticCreateTestMetadata();
- }
-
- @AfterClass
- public static void after() throws Exception {
- cleanAfterClass();
- }
-
- @Test
- public void testPreEvaluateWithDict() {
- TblColRef col = mockupTblColRef();
- Dictionary<String> dict = mockupDictionary(col, "CN", "US");
-
- ColumnValueRange r1 = new ColumnValueRange(col, set("CN", "US", "Other"), FilterOperatorEnum.EQ);
- r1.preEvaluateWithDict(dict);
- assertEquals(set("CN", "US"), r1.getEqualValues());
-
- // less than rounding
- {
- ColumnValueRange r2 = new ColumnValueRange(col, set("CN"), FilterOperatorEnum.LT);
- r2.preEvaluateWithDict(dict);
- assertEquals(null, r2.getBeginValue());
- assertEquals("CN", r2.getEndValue());
-
- ColumnValueRange r3 = new ColumnValueRange(col, set("Other"), FilterOperatorEnum.LT);
- r3.preEvaluateWithDict(dict);
- assertEquals(null, r3.getBeginValue());
- assertEquals("CN", r3.getEndValue());
-
- ColumnValueRange r4 = new ColumnValueRange(col, set("UT"), FilterOperatorEnum.LT);
- r4.preEvaluateWithDict(dict);
- assertEquals(null, r4.getBeginValue());
- assertEquals("US", r4.getEndValue());
- }
-
- // greater than rounding
- {
- ColumnValueRange r2 = new ColumnValueRange(col, set("CN"), FilterOperatorEnum.GTE);
- r2.preEvaluateWithDict(dict);
- assertEquals("CN", r2.getBeginValue());
- assertEquals(null, r2.getEndValue());
-
- ColumnValueRange r3 = new ColumnValueRange(col, set("Other"), FilterOperatorEnum.GTE);
- r3.preEvaluateWithDict(dict);
- assertEquals("US", r3.getBeginValue());
- assertEquals(null, r3.getEndValue());
-
- ColumnValueRange r4 = new ColumnValueRange(col, set("CI"), FilterOperatorEnum.GTE);
- r4.preEvaluateWithDict(dict);
- assertEquals("CN", r4.getBeginValue());
- assertEquals(null, r4.getEndValue());
- }
-
- // ever false check
- {
- ColumnValueRange r2 = new ColumnValueRange(col, set("UT"), FilterOperatorEnum.GTE);
- r2.preEvaluateWithDict(dict);
- assertTrue(r2.satisfyNone());
-
- ColumnValueRange r3 = new ColumnValueRange(col, set("CM"), FilterOperatorEnum.LT);
- r3.preEvaluateWithDict(dict);
- assertTrue(r3.satisfyNone());
- }
- }
-
- public static Dictionary<String> mockupDictionary(TblColRef col, String... values) {
- TrieDictionaryBuilder<String> builder = new TrieDictionaryBuilder<String>(new StringBytesConverter());
- for (String v : values) {
- builder.addValue(v);
- }
- return builder.build(0);
- }
-
- private static Set<String> set(String... values) {
- HashSet<String> list = new HashSet<String>();
- list.addAll(Arrays.asList(values));
- return list;
- }
-
- public static TblColRef mockupTblColRef() {
- TableDesc t = TableDesc.mockup("table_a");
- return TblColRef.mockup(t, 1, "col_1", "string");
- }
-}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
index 40f1ac5..5dd55b2 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
@@ -43,10 +43,10 @@ import com.google.common.collect.Sets;
/**
*/
+@SuppressWarnings("serial")
public class BaseCuboidBuilder implements java.io.Serializable {
protected static final Logger logger = LoggerFactory.getLogger(BaseCuboidBuilder.class);
- public static final String HIVE_NULL = "\\N";
protected String cubeName;
protected Cuboid baseCuboid;
protected CubeDesc cubeDesc;
@@ -95,7 +95,6 @@ public class BaseCuboidBuilder implements java.io.Serializable {
private void initNullBytes() {
nullStrs = Sets.newHashSet();
- nullStrs.add(HIVE_NULL);
String[] nullStrings = cubeDesc.getNullStrings();
if (nullStrings != null) {
for (String s : nullStrings) {
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
index 0ad4b9e..091f9a2 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
@@ -18,6 +18,10 @@
package org.apache.kylin.engine.mr.steps;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
import org.apache.hadoop.io.Text;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Bytes;
@@ -34,15 +38,10 @@ import org.apache.kylin.engine.mr.common.BatchConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-
/**
*/
abstract public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, Text, Text> {
protected static final Logger logger = LoggerFactory.getLogger(BaseCuboidMapperBase.class);
- public static final String HIVE_NULL = "\\N";
public static final byte[] ONE = Bytes.toBytes("1");
protected String cubeName;
protected String segmentID;
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java
index f3bdabd..4305a25 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java
@@ -104,7 +104,6 @@ public class CalculateStatsFromBaseCuboidJob extends AbstractHadoopJob {
private void setupReducer(Path output, CubeSegment cubeSeg) throws IOException {
int hllShardBase = MapReduceUtil.getCuboidHLLCounterReducerNum(cubeSeg.getCubeInstance());
- job.getConfiguration().setInt(BatchConstants.CFG_HLL_REDUCER_NUM, hllShardBase);
job.setReducerClass(CalculateStatsFromBaseCuboidReducer.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
index 9bede82..4ea04d5 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
@@ -55,8 +55,7 @@ public class FactDistinctColumnPartitioner extends Partitioner<SelfDefineSortabl
String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
- reducerMapping = new FactDistinctColumnsReducerMapping(cube,
- conf.getInt(BatchConstants.CFG_HLL_REDUCER_NUM, 1));
+ reducerMapping = new FactDistinctColumnsReducerMapping(cube);
}
@Override
@@ -65,8 +64,6 @@ public class FactDistinctColumnPartitioner extends Partitioner<SelfDefineSortabl
if (key.getBytes()[0] == FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER) {
Long cuboidId = Bytes.toLong(key.getBytes(), 1, Bytes.SIZEOF_LONG);
return reducerMapping.getReducerIdForCuboidRowCount(cuboidId);
- } else if (key.getBytes()[0] == FactDistinctColumnsReducerMapping.MARK_FOR_PARTITION_COL) {
- return reducerMapping.getReducerIdForDatePartitionColumn();
} else {
return BytesUtil.readUnsigned(key.getBytes(), 0, 1);
}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
index f96944a..8f5d176 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
@@ -131,6 +131,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
throws IOException {
FactDistinctColumnsReducerMapping reducerMapping = new FactDistinctColumnsReducerMapping(cubeSeg.getCubeInstance());
int numberOfReducers = reducerMapping.getTotalReducerNum();
+ logger.info("{} has reducers {}.", this.getClass().getName(), numberOfReducers);
if (numberOfReducers > 250) {
throw new IllegalArgumentException(
"The max reducer number for FactDistinctColumnsJob is 250, but now it is "
@@ -141,7 +142,6 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
job.setReducerClass(FactDistinctColumnsReducer.class);
job.setPartitionerClass(FactDistinctColumnPartitioner.class);
job.setNumReduceTasks(numberOfReducers);
- job.getConfiguration().setInt(BatchConstants.CFG_HLL_REDUCER_NUM, reducerMapping.getCuboidRowCounterReducerNum());
// make each reducer output to respective dir
MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_COLUMN, SequenceFileOutputFormat.class, NullWritable.class, Text.class);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
index 569b810..fc9dc65 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
@@ -37,7 +37,6 @@ import org.apache.kylin.measure.BufferedMeasureCodec;
import org.apache.kylin.measure.hllc.HLLCounter;
import org.apache.kylin.measure.hllc.RegisterType;
import org.apache.kylin.metadata.datatype.DataType;
-import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,9 +67,6 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
private static final Text EMPTY_TEXT = new Text();
- private int partitionColumnIndex = -1;
- private boolean needFetchPartitionCol = true;
-
private SelfDefineSortableKey sortableKey = new SelfDefineSortableKey();
@Override
@@ -96,18 +92,6 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
allCuboidsHLL[i] = new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision(), RegisterType.DENSE);
}
- TblColRef partitionColRef = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
- if (partitionColRef != null) {
- partitionColumnIndex = intermediateTableDesc.getColumnIndex(partitionColRef);
- }
-
- // check whether need fetch the partition col values
- if (partitionColumnIndex < 0) {
- // if partition col not on cube, no need
- needFetchPartitionCol = false;
- } else {
- needFetchPartitionCol = true;
- }
//for KYLIN-2518 backward compatibility
boolean isUsePutRowKeyToHllNewAlgorithm;
if (KylinVersion.isBefore200(cubeDesc.getVersion())) {
@@ -172,12 +156,12 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
for (String[] row : rowCollection) {
context.getCounter(RawDataCounter.BYTES).increment(countSizeInBytes(row));
- for (int i = 0; i < dictCols.size(); i++) {
- String fieldValue = row[dictionaryColumnIndex[i]];
+ for (int i = 0; i < allDimDictCols.size(); i++) {
+ String fieldValue = row[columnIndex[i]];
if (fieldValue == null)
continue;
- int reducerIndex = reducerMapping.getReducerIdForDictCol(i, fieldValue);
+ int reducerIndex = reducerMapping.getReducerIdForCol(i, fieldValue);
tmpbuf.clear();
byte[] valueBytes = Bytes.toBytes(fieldValue);
@@ -188,15 +172,14 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
tmpbuf.put(Bytes.toBytes(reducerIndex)[3]);
tmpbuf.put(valueBytes);
outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
- DataType type = dictCols.get(i).getType();
+ DataType type = allDimDictCols.get(i).getType();
sortableKey.init(outputKey, type);
- //judge type
context.write(sortableKey, EMPTY_TEXT);
// log a few rows for troubleshooting
if (rowCount < 10) {
logger.info(
- "Sample output: " + dictCols.get(i) + " '" + fieldValue + "' => reducer " + reducerIndex);
+ "Sample output: " + allDimDictCols.get(i) + " '" + fieldValue + "' => reducer " + reducerIndex);
}
}
@@ -204,22 +187,6 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
putRowKeyToHLL(row);
}
- if (needFetchPartitionCol == true) {
- String fieldValue = row[partitionColumnIndex];
- if (fieldValue != null) {
- tmpbuf.clear();
- byte[] valueBytes = Bytes.toBytes(fieldValue);
- int size = valueBytes.length + 1;
- if (size >= tmpbuf.capacity()) {
- tmpbuf = ByteBuffer.allocate(countNewSize(tmpbuf.capacity(), size));
- }
- tmpbuf.put((byte) FactDistinctColumnsReducerMapping.MARK_FOR_PARTITION_COL);
- tmpbuf.put(valueBytes);
- outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
- sortableKey.init(outputKey, (byte) 0);
- context.write(sortableKey, EMPTY_TEXT);
- }
- }
rowCount++;
}
}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
index c66042b..ceddeb5 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
@@ -39,8 +39,6 @@ import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.metadata.model.TblColRef;
-import com.google.common.collect.Lists;
-
/**
*/
abstract public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, SelfDefineSortableKey, Text> {
@@ -50,8 +48,8 @@ abstract public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends Kyli
protected CubeSegment cubeSeg;
protected CubeDesc cubeDesc;
protected long baseCuboidId;
- protected List<TblColRef> dictCols;
protected IMRTableInputFormat flatTableInputFormat;
+ protected List<TblColRef> allDimDictCols;
protected Text outputKey = new Text();
//protected SelfDefineSortableKey sortableKey = new SelfDefineSortableKey();
@@ -59,7 +57,7 @@ abstract public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends Kyli
protected int errorRecordCounter = 0;
protected CubeJoinedFlatTableEnrich intermediateTableDesc;
- protected int[] dictionaryColumnIndex;
+ protected int[] columnIndex;
protected FactDistinctColumnsReducerMapping reducerMapping;
@@ -74,20 +72,18 @@ abstract public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends Kyli
cubeSeg = cube.getSegmentById(conf.get(BatchConstants.CFG_CUBE_SEGMENT_ID));
cubeDesc = cube.getDescriptor();
baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
- dictCols = Lists.newArrayList(cubeDesc.getAllColumnsNeedDictionaryBuilt());
+ reducerMapping = new FactDistinctColumnsReducerMapping(cube);
+ allDimDictCols = reducerMapping.getAllDimDictCols();
flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSeg), cubeDesc);
- dictionaryColumnIndex = new int[dictCols.size()];
- for (int i = 0; i < dictCols.size(); i++) {
- TblColRef colRef = dictCols.get(i);
+ columnIndex = new int[allDimDictCols.size()];
+ for (int i = 0; i < allDimDictCols.size(); i++) {
+ TblColRef colRef = allDimDictCols.get(i);
int columnIndexOnFlatTbl = intermediateTableDesc.getColumnIndex(colRef);
- dictionaryColumnIndex[i] = columnIndexOnFlatTbl;
+ columnIndex[i] = columnIndexOnFlatTbl;
}
-
- reducerMapping = new FactDistinctColumnsReducerMapping(cube,
- conf.getInt(BatchConstants.CFG_HLL_REDUCER_NUM, 1));
}
protected void handleErrorRecord(String[] record, Exception ex) throws IOException {
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
index 801771a..61ba247 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
@@ -71,17 +70,16 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
private boolean isStatistics = false;
private KylinConfig cubeConfig;
private int taskId;
- private boolean isPartitionCol = false;
private int rowCount = 0;
private FactDistinctColumnsReducerMapping reducerMapping;
//local build dict
private boolean buildDictInReducer;
private IDictionaryBuilder builder;
- private long timeMaxValue = Long.MIN_VALUE;
- private long timeMinValue = Long.MAX_VALUE;
+ private String maxValue = null;
+ private String minValue = null;
public static final String DICT_FILE_POSTFIX = ".rldict";
- public static final String PARTITION_COL_INFO_FILE_POSTFIX = ".pci";
+ public static final String DIMENSION_COL_INFO_FILE_POSTFIX = ".dci";
private MultipleOutputs mos;
@@ -97,11 +95,9 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
cubeConfig = cube.getConfig();
cubeDesc = cube.getDescriptor();
- int numberOfTasks = context.getNumReduceTasks();
taskId = context.getTaskAttemptID().getTaskID().getId();
- reducerMapping = new FactDistinctColumnsReducerMapping(cube,
- conf.getInt(BatchConstants.CFG_HLL_REDUCER_NUM, 1));
+ reducerMapping = new FactDistinctColumnsReducerMapping(cube);
logger.info("reducer no " + taskId + ", role play " + reducerMapping.getRolePlayOfReducer(taskId));
@@ -114,18 +110,9 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
samplingPercentage = Integer
.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));
logger.info("Reducer " + taskId + " handling stats");
- } else if (reducerMapping.isPartitionColReducer(taskId)) {
- // partition col
- isPartitionCol = true;
- col = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
- if (col == null) {
- logger.info("No partition col. This reducer will do nothing");
- } else {
- logger.info("Reducer " + taskId + " handling partition col " + col.getIdentity());
- }
} else {
// normal col
- col = reducerMapping.getDictColForReducer(taskId);
+ col = reducerMapping.getColForReducer(taskId);
Preconditions.checkNotNull(col);
// local build dict
@@ -133,7 +120,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
if (cubeDesc.getDictionaryBuilderClass(col) != null) { // only works with default dictionary builder
buildDictInReducer = false;
}
- if (reducerMapping.getReducerNumForDictCol(col) > 1) {
+ if (reducerMapping.getReducerNumForDimCol(col) > 1) {
buildDictInReducer = false; // only works if this is the only reducer of a dictionary column
}
if (buildDictInReducer) {
@@ -167,24 +154,29 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
cuboidHLLMap.put(cuboidId, hll);
}
}
- } else if (isPartitionCol) {
- // partition col
+ } else {
+ // dimension and/or dictionary column; key = 1 reducer-id byte (written by the mapper) + value bytes
String value = Bytes.toString(key.getBytes(), 1, key.getLength() - 1);
logAFewRows(value);
- long time = DateFormat.stringToMillis(value);
- timeMinValue = Math.min(timeMinValue, time);
- timeMaxValue = Math.max(timeMaxValue, time);
- } else {
- // normal col
- if (buildDictInReducer) {
- String value = Bytes.toString(key.getBytes(), 1, key.getLength() - 1);
- logAFewRows(value);
- builder.addValue(value);
- } else {
- byte[] keyBytes = Bytes.copy(key.getBytes(), 1, key.getLength() - 1);
- // output written to baseDir/colName/-r-00000 (etc)
- String fileName = col.getIdentity() + "/";
- mos.write(BatchConstants.CFG_OUTPUT_COLUMN, NullWritable.get(), new Text(keyBytes), fileName);
+ // if dimension col, track min/max using the column type's comparison (col.getType().compare)
+ // NOTE(review): col is fixed for this reducer (set in setup()), so this membership test and the
+ // dictionary one below could be evaluated once in setup() instead of for every key
+ if (cubeDesc.listDimensionColumnsExcludingDerived(true).contains(col)) {
+ if (minValue == null || col.getType().compare(minValue, value) > 0) {
+ minValue = value;
+ }
+ if (maxValue == null || col.getType().compare(maxValue, value) < 0) {
+ maxValue = value;
+ }
+ }
+
+ // if dict column: feed the in-reducer dictionary builder, or write the raw value to this column's output dir
+ if (cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col)) {
+ if (buildDictInReducer) {
+ builder.addValue(value);
+ } else {
+ byte[] keyBytes = Bytes.copy(key.getBytes(), 1, key.getLength() - 1);
+ // output written to baseDir/colName/-r-00000 (etc)
+ String fileName = col.getIdentity() + "/";
+ mos.write(BatchConstants.CFG_OUTPUT_COLUMN, NullWritable.get(), new Text(keyBytes), fileName);
+ }
}
}
@@ -207,11 +199,12 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
logMapperAndCuboidStatistics(allCuboids); // for human check
outputStatistics(allCuboids);
- } else if (isPartitionCol) {
- // partition col
- outputPartitionInfo();
} else {
- // normal col
- if (buildDictInReducer) {
+ //dimension col
+ if (cubeDesc.listDimensionColumnsExcludingDerived(true).contains(col)) {
+ outputDimRangeInfo();
+ }
+ // dictionary col: the visible setup() code does not clear buildDictInReducer for a dimension-only
+ // column, and reduce() never feeds such a column to the builder, so don't build/output an empty dict
+ if (buildDictInReducer && cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col)) {
Dictionary<String> dict = builder.build();
outputDict(col, dict);
@@ -221,14 +214,17 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
mos.close();
}
- private void outputPartitionInfo() throws IOException, InterruptedException {
- if (col != null) {
- // output written to baseDir/colName/colName.pci-r-00000 (etc)
- String partitionFileName = col.getIdentity() + "/" + col.getName() + PARTITION_COL_INFO_FILE_POSTFIX;
-
- mos.write(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(), new LongWritable(timeMinValue), partitionFileName);
- mos.write(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(), new LongWritable(timeMaxValue), partitionFileName);
- logger.info("write partition info for col : " + col.getName() + " minValue:" + timeMinValue + " maxValue:" + timeMaxValue);
+ /** Writes this reducer's min/max of a dimension column to the .dci side output; no-op if no value was seen. */
+ private void outputDimRangeInfo() throws IOException, InterruptedException {
+ // minValue != null implies maxValue != null: reduce() assigns both when it sees the first value
+ if (col != null && minValue != null) {
+ // output written to baseDir/colName/colName.dci-r-00000 (etc): min first, then max
+ String dimRangeFileName = col.getIdentity() + "/" + col.getName() + DIMENSION_COL_INFO_FILE_POSTFIX;
+
+ // Text(String) encodes as UTF-8; String.getBytes() would use the JVM's default charset
+ mos.write(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(), new Text(minValue),
+ dimRangeFileName);
+ mos.write(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(), new Text(maxValue),
+ dimRangeFileName);
+ logger.info("write dimension range info for col : " + col.getName() + " minValue:" + minValue + " maxValue:"
+ + maxValue);
}
}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMapping.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMapping.java
index 51594c3..b60e4ce 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMapping.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMapping.java
@@ -18,73 +18,74 @@
package org.apache.kylin.engine.mr.steps;
-import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.mr.common.MapReduceUtil;
import org.apache.kylin.metadata.model.TblColRef;
+import com.google.common.collect.Lists;
+
/**
* Reducers play different roles based on reducer-id:
- * - (start from 0) one reducer for each dictionary column, UHC may have more than one reducer
- * - one reducer to get min/max of date partition column
+ * - (start from 0) one reducer for each dimension or dictionary column (a UHC column may have more than one);
+ *   it collects a dictionary column's distinct values and/or a dimension column's min/max range
* - (at the end) one or more reducers to collect row counts for cuboids using HLL
*/
public class FactDistinctColumnsReducerMapping {
- public static final int MARK_FOR_PARTITION_COL = -2;
public static final int MARK_FOR_HLL_COUNTER = -1;
- final private int nDictValueCollectors;
- final private int datePartitionReducerId;
final private int nCuboidRowCounters;
+ final private int nDimReducers;
final private int nTotalReducers;
- final private List<TblColRef> allDictCols;
- final private int[] dictColIdToReducerBeginId;
+ final private List<TblColRef> allDimDictCols = Lists.newArrayList();
+ final private int[] colIdToReducerBeginId; // reducers of column i: [colIdToReducerBeginId[i], colIdToReducerBeginId[i + 1])
- final private int[] reducerRolePlay; // >=0 for dict col id, <0 for partition col and hll counter (using markers)
+ final private int[] reducerRolePlay; // >=0 for dim/dict col id, MARK_FOR_HLL_COUNTER for cuboid row counter reducers
public FactDistinctColumnsReducerMapping(CubeInstance cube) {
this(cube, 0);
}
- public FactDistinctColumnsReducerMapping(CubeInstance cube, int cuboidRowCounterReducerNum) {
+ private FactDistinctColumnsReducerMapping(CubeInstance cube, int cuboidRowCounterReducerNum) {
CubeDesc desc = cube.getDescriptor();
+ Set<TblColRef> allCols = cube.getAllColumns();
+ Set<TblColRef> dictCols = desc.getAllColumnsNeedDictionaryBuilt();
+ List<TblColRef> dimCols = desc.listDimensionColumnsExcludingDerived(true);
+ // a column gets reducer(s) if it needs a dictionary, or if it is a dimension returned by
+ // listDimensionColumnsExcludingDerived(true) (the reducer then collects its min/max range).
+ // NOTE(review): column ids follow the iteration order of cube.getAllColumns(); the mapper and the
+ // reducer each build this mapping independently, so that order must be deterministic -- confirm.
+ for (TblColRef colRef : allCols) {
+ if (dictCols.contains(colRef) || dimCols.contains(colRef)) {
+ allDimDictCols.add(colRef);
+ }
+ }
- allDictCols = new ArrayList(desc.getAllColumnsNeedDictionaryBuilt());
-
- dictColIdToReducerBeginId = new int[allDictCols.size() + 1];
+ colIdToReducerBeginId = new int[allDimDictCols.size() + 1];
int uhcReducerCount = cube.getConfig().getUHCReducerCount();
List<TblColRef> uhcList = desc.getAllUHCColumns();
int counter = 0;
- for (int i = 0; i < allDictCols.size(); i++) {
- dictColIdToReducerBeginId[i] = counter;
- boolean isUHC = uhcList.contains(allDictCols.get(i));
+ for (int i = 0; i < allDimDictCols.size(); i++) {
+ colIdToReducerBeginId[i] = counter;
+ boolean isUHC = uhcList.contains(allDimDictCols.get(i));
counter += (isUHC) ? uhcReducerCount : 1;
}
-
- dictColIdToReducerBeginId[allDictCols.size()] = counter;
- nDictValueCollectors = counter;
- datePartitionReducerId = counter;
+ colIdToReducerBeginId[allDimDictCols.size()] = counter;
+ nDimReducers = counter;
nCuboidRowCounters = cuboidRowCounterReducerNum == 0 ? //
MapReduceUtil.getCuboidHLLCounterReducerNum(cube) : cuboidRowCounterReducerNum;
- nTotalReducers = nDictValueCollectors + 1 + nCuboidRowCounters;
+ nTotalReducers = nDimReducers + nCuboidRowCounters;
reducerRolePlay = new int[nTotalReducers];
for (int i = 0, dictId = 0; i < nTotalReducers; i++) {
- if (i > datePartitionReducerId) {
+ if (i >= nDimReducers) {
// cuboid HLL counter reducer
reducerRolePlay[i] = MARK_FOR_HLL_COUNTER;
- } else if (i == datePartitionReducerId) {
- // date partition min/max reducer
- reducerRolePlay[i] = MARK_FOR_PARTITION_COL;
} else {
- // dict value collector reducer
- if (i == dictColIdToReducerBeginId[dictId + 1])
+ if (i == colIdToReducerBeginId[dictId + 1])
dictId++;
reducerRolePlay[i] = dictId;
@@ -92,8 +93,8 @@ public class FactDistinctColumnsReducerMapping {
}
}
- public List<TblColRef> getAllDictCols() {
- return allDictCols;
+ public List<TblColRef> getAllDimDictCols() {
+ return allDimDictCols;
}
public int getTotalReducerNum() {
@@ -104,9 +105,9 @@ public class FactDistinctColumnsReducerMapping {
return nCuboidRowCounters;
}
- public int getReducerIdForDictCol(int dictColId, Object fieldValue) {
- int begin = dictColIdToReducerBeginId[dictColId];
- int span = dictColIdToReducerBeginId[dictColId + 1] - begin;
+ public int getReducerIdForCol(int colId, Object fieldValue) {
+ int begin = colIdToReducerBeginId[colId];
+ int span = colIdToReducerBeginId[colId + 1] - begin;
if (span == 1)
return begin;
@@ -120,37 +121,29 @@ public class FactDistinctColumnsReducerMapping {
}
public int getRolePlayOfReducer(int reducerId) {
- return reducerRolePlay[reducerId];
+ return reducerRolePlay[reducerId % nTotalReducers];
}
public boolean isCuboidRowCounterReducer(int reducerId) {
return getRolePlayOfReducer(reducerId) == MARK_FOR_HLL_COUNTER;
}
-
- public boolean isPartitionColReducer(int reducerId) {
- return getRolePlayOfReducer(reducerId) == MARK_FOR_PARTITION_COL;
- }
- public TblColRef getDictColForReducer(int reducerId) {
- int role = getRolePlayOfReducer(reducerId);
+ /** Returns the dimension/dictionary column handled by the given reducer; fails for HLL counter reducers. */
+ public TblColRef getColForReducer(int reducerId) {
+ int role = getRolePlayOfReducer(reducerId); // getRolePlayOfReducer already applies % nTotalReducers
if (role < 0)
throw new IllegalStateException();
- return allDictCols.get(role);
- }
-
- public int getReducerNumForDictCol(TblColRef col) {
- int dictColId = allDictCols.indexOf(col);
- return dictColIdToReducerBeginId[dictColId + 1] - dictColIdToReducerBeginId[dictColId];
+ return allDimDictCols.get(role);
}
- public int getReducerIdForDatePartitionColumn() {
- return datePartitionReducerId;
+ /** Returns how many reducers (1, or the UHC reducer count) are assigned to the given column. */
+ public int getReducerNumForDimCol(TblColRef col) {
+ int colId = allDimDictCols.indexOf(col);
+ if (colId < 0)
+ throw new IllegalArgumentException("Not a dimension or dictionary column of this cube: " + col);
+ return colIdToReducerBeginId[colId + 1] - colIdToReducerBeginId[colId];
}
public int getReducerIdForCuboidRowCount(long cuboidId) {
int rowCounterId = (int) (Math.abs(cuboidId) % nCuboidRowCounters);
- return datePartitionReducerId + 1 + rowCounterId;
+ return nDimReducers + rowCounterId;
}
}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
index f749c80..fdb19db 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
@@ -23,15 +23,18 @@ import java.io.IOException;
import java.io.InputStreamReader;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.DimensionRangeInfo;
import org.apache.kylin.cube.model.SnapshotTableDesc;
import org.apache.kylin.engine.mr.CubingJob;
import org.apache.kylin.engine.mr.LookupMaterializeContext;
@@ -40,11 +43,14 @@ import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.datatype.DataTypeOrder;
import org.apache.kylin.metadata.model.SegmentRange.TSRange;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Sets;
+
/**
*/
public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
@@ -76,9 +82,7 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
try {
saveExtSnapshotIfNeeded(cubeManager, cube, segment);
- if (segment.isOffsetCube()) {
- updateTimeRange(segment);
- }
+ updateSegment(segment);
cubeManager.promoteNewlyBuiltSegments(cube, segment);
return new ExecuteResult();
@@ -115,40 +119,51 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
}
}
- private void updateTimeRange(CubeSegment segment) throws IOException {
+ private void updateSegment(CubeSegment segment) throws IOException {
final TblColRef partitionCol = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef();
- if (partitionCol == null) {
- return;
- }
- final String factColumnsInputPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH);
- Path colDir = new Path(factColumnsInputPath, partitionCol.getIdentity());
- FileSystem fs = HadoopUtil.getWorkingFileSystem();
- Path outputFile = HadoopUtil.getFilterOnlyPath(fs, colDir,
- partitionCol.getName() + FactDistinctColumnsReducer.PARTITION_COL_INFO_FILE_POSTFIX);
- if (outputFile == null) {
- throw new IOException("fail to find the partition file in base dir: " + colDir);
- }
-
- FSDataInputStream is = null;
- BufferedReader bufferedReader = null;
- InputStreamReader isr = null;
- long minValue, maxValue;
- try {
- is = fs.open(outputFile);
- isr = new InputStreamReader(is);
- bufferedReader = new BufferedReader(isr);
- minValue = Long.parseLong(bufferedReader.readLine());
- maxValue = Long.parseLong(bufferedReader.readLine());
- } finally {
- IOUtils.closeQuietly(is);
- IOUtils.closeQuietly(isr);
- IOUtils.closeQuietly(bufferedReader);
- }
+ // loop-invariant: same output dir root and file system for every column
+ final String factColumnsInputPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH);
+ FileSystem fs = HadoopUtil.getWorkingFileSystem();
+ for (TblColRef dimColRef : segment.getCubeDesc().listDimensionColumnsExcludingDerived(true)) {
+ Path colDir = new Path(factColumnsInputPath, dimColRef.getIdentity());
+
+ // a column may have several reducers (e.g. UHC), each writing its own .dci file (min line, max line)
+ Path[] outputFiles = HadoopUtil.getFilteredPath(fs, colDir,
+ dimColRef.getName() + FactDistinctColumnsReducer.DIMENSION_COL_INFO_FILE_POSTFIX);
+ if (outputFiles == null || outputFiles.length == 0) {
+ segment.getDimensionRangeInfoMap().put(dimColRef.getIdentity(), new DimensionRangeInfo(null, null));
+ continue;
+ }
- logger.info("updateTimeRange step. minValue:" + minValue + " maxValue:" + maxValue);
- if (minValue != timeMinValue && maxValue != timeMaxValue) {
- segment.setTSRange(new TSRange(minValue, maxValue + 1));
+ FSDataInputStream is = null;
+ BufferedReader bufferedReader = null;
+ InputStreamReader isr = null;
+ Set<String> minValues = Sets.newHashSet(), maxValues = Sets.newHashSet();
+ for (Path outputFile : outputFiles) {
+ try {
+ is = fs.open(outputFile);
+ isr = new InputStreamReader(is, "UTF-8"); // explicit charset (Hadoop Text is UTF-8), not the JVM default
+ bufferedReader = new BufferedReader(isr);
+ minValues.add(bufferedReader.readLine());
+ maxValues.add(bufferedReader.readLine());
+ } finally {
+ IOUtils.closeQuietly(is);
+ IOUtils.closeQuietly(isr);
+ IOUtils.closeQuietly(bufferedReader);
+ }
+ }
+ // combine the per-reducer extremes using the column type's ordering
+ DataTypeOrder order = dimColRef.getType().getOrder();
+ String minValue = order.min(minValues);
+ String maxValue = order.max(maxValues);
+ logger.info("updateSegment step. {} minValue:{} maxValue:{}", dimColRef.getName(), minValue, maxValue);
+
+ if (segment.isOffsetCube() && partitionCol != null && partitionCol.getIdentity().equals(dimColRef.getIdentity())) {
+ logger.info("update partition. {} timeMinValue:{} timeMaxValue:{}", dimColRef.getName(), minValue, maxValue);
+ if (DateFormat.stringToMillis(minValue) != timeMinValue && DateFormat.stringToMillis(maxValue) != timeMaxValue) {
+ segment.setTSRange(new TSRange(DateFormat.stringToMillis(minValue), DateFormat.stringToMillis(maxValue) + 1));
+ }
+ }
+ segment.getDimensionRangeInfoMap().put(dimColRef.getIdentity(), new DimensionRangeInfo(minValue, maxValue));
}
}
}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
index e7aec8c..4cf6fc6 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
@@ -20,10 +20,12 @@ package org.apache.kylin.engine.mr.steps;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.DimensionRangeInfo;
import org.apache.kylin.engine.mr.CubingJob;
import org.apache.kylin.engine.mr.exception.SegmentNotFoundException;
import org.apache.kylin.job.exception.ExecuteException;
@@ -61,9 +63,9 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
if (mergingSegmentIds.isEmpty()) {
return ExecuteResult.createFailed(new SegmentNotFoundException("there are no merging segments"));
}
+
long sourceCount = 0L;
long sourceSize = 0L;
-
boolean isOffsetCube = mergedSegment.isOffsetCube();
Long tsStartMin = Long.MAX_VALUE, tsEndMax = 0L;
for (String id : mergingSegmentIds) {
@@ -74,14 +76,26 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
tsEndMax = Math.max(tsEndMax, segment.getTSRange().end.v);
}
+ Map<String, DimensionRangeInfo> mergedSegDimRangeMap = null;
+ for (String id : mergingSegmentIds) {
+ CubeSegment segment = cube.getSegmentById(id);
+ Map<String, DimensionRangeInfo> segDimRangeMap = segment.getDimensionRangeInfoMap();
+ if (mergedSegDimRangeMap == null) {
+ mergedSegDimRangeMap = segDimRangeMap;
+ } else {
+ mergedSegDimRangeMap = DimensionRangeInfo.mergeRangeMap(cube.getModel(), segDimRangeMap,
+ mergedSegDimRangeMap);
+ }
+ }
+
// update segment info
mergedSegment.setSizeKB(cubeSizeBytes / 1024);
mergedSegment.setInputRecords(sourceCount);
mergedSegment.setInputRecordsSize(sourceSize);
mergedSegment.setLastBuildJobID(CubingExecutableUtil.getCubingJobId(this.getParams()));
mergedSegment.setLastBuildTime(System.currentTimeMillis());
-
- if (isOffsetCube == true) {
+ mergedSegment.setDimensionRangeInfoMap(mergedSegDimRangeMap);
+ if (isOffsetCube) {
SegmentRange.TSRange tsRange = new SegmentRange.TSRange(tsStartMin, tsEndMax);
mergedSegment.setTSRange(tsRange);
}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java
index d013386..03aa616 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java
@@ -59,6 +59,7 @@ public class UpdateCubeInfoAfterOptimizeStep extends AbstractExecutable {
segment.setSizeKB(cubeSizeBytes / 1024);
segment.setInputRecords(sourceCount);
segment.setInputRecordsSize(sourceSizeBytes);
+ segment.setDimensionRangeInfoMap(originalSegment.getDimensionRangeInfoMap());
try {
cubeManager.promoteNewlyOptimizeSegments(cube, segment);
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMappingTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMappingTest.java
index a6bc019..3c58b26 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMappingTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMappingTest.java
@@ -60,24 +60,22 @@ public class FactDistinctColumnsReducerMappingTest extends LocalFileMetadataTest
int totalReducerNum = mapping.getTotalReducerNum();
Assert.assertEquals(2, mapping.getCuboidRowCounterReducerNum());
- // check partition column reducer & cuboid row count reducers
+ // check cuboid row count reducers
Assert.assertEquals(FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER,
mapping.getRolePlayOfReducer(totalReducerNum - 1));
Assert.assertEquals(FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER,
mapping.getRolePlayOfReducer(totalReducerNum - 2));
- Assert.assertEquals(FactDistinctColumnsReducerMapping.MARK_FOR_PARTITION_COL,
- mapping.getRolePlayOfReducer(totalReducerNum - 3));
// check all dict column reducers
- int dictEnd = totalReducerNum - 3;
+ int dictEnd = totalReducerNum - 2;
for (int i = 0; i < dictEnd; i++)
Assert.assertTrue(mapping.getRolePlayOfReducer(i) >= 0);
// check a UHC dict column
- Assert.assertEquals(2, mapping.getReducerNumForDictCol(aUHC));
+ Assert.assertEquals(2, mapping.getReducerNumForDimCol(aUHC));
int uhcReducerBegin = -1;
for (int i = 0; i < dictEnd; i++) {
- if (mapping.getDictColForReducer(i).equals(aUHC)) {
+ if (mapping.getColForReducer(i).equals(aUHC)) {
uhcReducerBegin = i;
break;
}
@@ -86,7 +84,7 @@ public class FactDistinctColumnsReducerMappingTest extends LocalFileMetadataTest
int[] allRolePlay = mapping.getAllRolePlaysForReducers();
Assert.assertEquals(allRolePlay[uhcReducerBegin], allRolePlay[uhcReducerBegin + 1]);
for (int i = 0; i < 5; i++) {
- int reducerId = mapping.getReducerIdForDictCol(uhcReducerBegin, i);
+ int reducerId = mapping.getReducerIdForCol(uhcReducerBegin, i);
Assert.assertTrue(uhcReducerBegin <= reducerId && reducerId <= uhcReducerBegin + 1);
}
}
diff --git a/examples/test_case_data/localmeta/cube/ssb_cube_with_dimention_range.json b/examples/test_case_data/localmeta/cube/ssb_cube_with_dimention_range.json
new file mode 100644
index 0000000..124081c
--- /dev/null
+++ b/examples/test_case_data/localmeta/cube/ssb_cube_with_dimention_range.json
@@ -0,0 +1,110 @@
+{
+ "uuid" : "70a9f288-3c01-4745-a04b-5641e82d6c72",
+ "name" : "ssb_cube_with_dimention_range",
+ "owner" : "ADMIN",
+ "cost" : 50,
+ "status" : "DISABLED",
+ "segments" : [{
+ "uuid": "f34e2a88-50e4-4c8c-8dd1-64e922ce8c37",
+ "name": "19920523163722_20180523173026",
+ "storage_location_identifier": "KYLIN_KFV177RJOS",
+ "date_range_start": 706639042000,
+ "date_range_end": 1527096626000,
+ "source_offset_start": 0,
+ "source_offset_end": 0,
+ "status": "READY",
+ "size_kb": 100,
+ "input_records": 1000,
+ "input_records_size": 102400,
+ "last_build_time": 1527068515334,
+ "last_build_job_id": "a5010166-b64a-4289-ac56-064640f7f2fa",
+ "create_time_utc": 1527067828660,
+ "total_shards": 0,
+ "blackout_cuboids": [],
+ "binary_signature": null,
+ "dimension_range_info_map": {
+ "PART.P_BRAND": {
+ "min": "MFGR#1101",
+ "max": "MFGR#5540"
+ },
+ "V_LINEORDER.LO_QUANTITY": {
+ "min": "1",
+ "max": "50"
+ },
+ "PART.P_MFGR": {
+ "min": "MFGR#1",
+ "max": "MFGR#5"
+ },
+ "DATES.D_YEARMONTHNUM": {
+ "min": "199205",
+ "max": "199808"
+ },
+ "CUSTOMER.C_NATION": {
+ "min": "ALGERIA",
+ "max": "VIETNAM"
+ },
+ "DATES.D_YEARMONTH": {
+ "min": "Apr1993",
+ "max": "Sep1997"
+ },
+ "CUSTOMER.C_CUSTKEY": {
+ "min": "1",
+ "max": "2999"
+ },
+ "DATES.D_DATEKEY": {
+ "min": "19920523",
+ "max": "19980802"
+ },
+ "SUPPLIER.S_SUPPKEY": {
+ "min": "1",
+ "max": "200"
+ },
+ "SUPPLIER.S_REGION": {
+ "min": "AFRICA",
+ "max": "MIDDLE EAST"
+ },
+ "PART.P_PARTKEY": {
+ "min": "1",
+ "max": "20000"
+ },
+ "PART.P_CATEGORY": {
+ "min": "MFGR#11",
+ "max": "MFGR#55"
+ },
+ "DATES.D_WEEKNUMINYEAR": {
+ "min": "1",
+ "max": "53"
+ },
+ "CUSTOMER.C_CITY": {
+ "min": "ALGERIA 0",
+ "max": "VIETNAM 9"
+ },
+ "SUPPLIER.S_NATION": {
+ "min": "ALGERIA",
+ "max": "VIETNAM"
+ },
+ "SUPPLIER.S_CITY": {
+ "min": "ALGERIA 1",
+ "max": "VIETNAM 9"
+ },
+ "CUSTOMER.C_REGION": {
+ "min": "AFRICA",
+ "max": "MIDDLE EAST"
+ },
+ "DATES.D_YEAR": {
+ "min": "1992",
+ "max": "1998"
+ },
+ "V_LINEORDER.LO_DISCOUNT": {
+ "min": "0",
+ "max": "10"
+ }
+ }
+ } ],
+ "last_modified" : 1457534216410,
+ "descriptor" : "ssb_cube_with_dimention_range",
+ "create_time_utc" : 1457444500888,
+ "size_kb" : 100,
+ "input_records_count" : 1000,
+ "input_records_size" : 102400
+}
\ No newline at end of file
diff --git a/examples/test_case_data/localmeta/cube_desc/ssb_cube_with_dimention_range.json b/examples/test_case_data/localmeta/cube_desc/ssb_cube_with_dimention_range.json
new file mode 100644
index 0000000..d91b420
--- /dev/null
+++ b/examples/test_case_data/localmeta/cube_desc/ssb_cube_with_dimention_range.json
@@ -0,0 +1,269 @@
+{
+ "uuid" : "5c44df30-daec-486e-af90-927bf7851060",
+ "name" : "ssb_cube_with_dimention_range",
+ "description" : "",
+ "dimensions" : [ {
+ "name" : "PART.P_MFGR",
+ "table" : "PART",
+ "column" : "P_MFGR",
+ "derived" : null
+ }, {
+ "name" : "DATES.D_YEARMONTH",
+ "table" : "DATES",
+ "column" : "D_YEARMONTH",
+ "derived" : null
+ }, {
+ "name" : "SUPPLIER.S_NATION",
+ "table" : "SUPPLIER",
+ "column" : "S_NATION",
+ "derived" : null
+ }, {
+ "name" : "V_LINEORDER.LO_DISCOUNT",
+ "table" : "V_LINEORDER",
+ "column" : "LO_DISCOUNT",
+ "derived" : null
+ }, {
+ "name" : "CUSTOMER.C_CUSTKEY",
+ "table" : "CUSTOMER",
+ "column" : "C_CUSTKEY",
+ "derived" : null
+ }, {
+ "name" : "CUSTOMER.C_NATION",
+ "table" : "CUSTOMER",
+ "column" : "C_NATION",
+ "derived" : null
+ }, {
+ "name" : "SUPPLIER.S_REGION",
+ "table" : "SUPPLIER",
+ "column" : "S_REGION",
+ "derived" : null
+ }, {
+ "name" : "CUSTOMER.C_CITY",
+ "table" : "CUSTOMER",
+ "column" : "C_CITY",
+ "derived" : null
+ }, {
+ "name" : "SUPPLIER.S_SUPPKEY",
+ "table" : "SUPPLIER",
+ "column" : "S_SUPPKEY",
+ "derived" : null
+ }, {
+ "name" : "V_LINEORDER.LO_QUANTITY",
+ "table" : "V_LINEORDER",
+ "column" : "LO_QUANTITY",
+ "derived" : null
+ }, {
+ "name" : "PART.P_BRAND",
+ "table" : "PART",
+ "column" : "P_BRAND",
+ "derived" : null
+ }, {
+ "name" : "SUPPLIER.S_CITY",
+ "table" : "SUPPLIER",
+ "column" : "S_CITY",
+ "derived" : null
+ }, {
+ "name" : "PART.P_PARTKEY",
+ "table" : "PART",
+ "column" : "P_PARTKEY",
+ "derived" : null
+ }, {
+ "name" : "DATES.D_YEARMONTHNUM",
+ "table" : "DATES",
+ "column" : "D_YEARMONTHNUM",
+ "derived" : null
+ }, {
+ "name" : "DATES.D_WEEKNUMINYEAR",
+ "table" : "DATES",
+ "column" : "D_WEEKNUMINYEAR",
+ "derived" : null
+ }, {
+ "name" : "CUSTOMER.C_REGION",
+ "table" : "CUSTOMER",
+ "column" : "C_REGION",
+ "derived" : null
+ }, {
+ "name" : "PART.P_CATEGORY",
+ "table" : "PART",
+ "column" : "P_CATEGORY",
+ "derived" : null
+ }, {
+ "name" : "DATES.D_YEAR",
+ "table" : "DATES",
+ "column" : "D_YEAR",
+ "derived" : null
+ }, {
+ "name" : "DATES.D_DATEKEY",
+ "table" : "DATES",
+ "column" : "D_DATEKEY",
+ "derived" : null
+ } ],
+ "measures" : [ {
+ "name" : "_COUNT_",
+ "function" : {
+ "expression" : "COUNT",
+ "parameter" : {
+ "type" : "constant",
+ "value" : "1",
+ "next_parameter" : null
+ },
+ "returntype" : "bigint"
+ },
+ "dependent_measure_ref" : null
+ }, {
+ "name" : "TOTAL_REVENUE",
+ "function" : {
+ "expression" : "SUM",
+ "parameter" : {
+ "type" : "column",
+ "value" : "LO_REVENUE",
+ "next_parameter" : null
+ },
+ "returntype" : "bigint"
+ },
+ "dependent_measure_ref" : null
+ }, {
+ "name" : "TOTAL_SUPPLYCOST",
+ "function" : {
+ "expression" : "SUM",
+ "parameter" : {
+ "type" : "column",
+ "value" : "LO_SUPPLYCOST",
+ "next_parameter" : null
+ },
+ "returntype" : "bigint"
+ },
+ "dependent_measure_ref" : null
+ } ],
+ "rowkey" : {
+ "rowkey_columns" : [ {
+ "column" : "PART.P_MFGR",
+ "encoding" : "dict",
+ "isShardBy" : false,
+ "index" : "eq"
+ }, {
+ "column" : "DATES.D_YEARMONTHNUM",
+ "encoding" : "dict",
+ "isShardBy" : false,
+ "index" : "eq"
+ }, {
+ "column" : "CUSTOMER.C_REGION",
+ "encoding" : "dict",
+ "isShardBy" : false,
+ "index" : "eq"
+ }, {
+ "column" : "V_LINEORDER.LO_QUANTITY",
+ "encoding" : "dict",
+ "isShardBy" : false,
+ "index" : "eq"
+ }, {
+ "column" : "DATES.D_YEARMONTH",
+ "encoding" : "dict",
+ "isShardBy" : false,
+ "index" : "eq"
+ }, {
+ "column" : "DATES.D_WEEKNUMINYEAR",
+ "encoding" : "dict",
+ "isShardBy" : false,
+ "index" : "eq"
+ }, {
+ "column" : "SUPPLIER.S_REGION",
+ "encoding" : "dict",
+ "isShardBy" : false,
+ "index" : "eq"
+ }, {
+ "column" : "V_LINEORDER.LO_DISCOUNT",
+ "encoding" : "dict",
+ "isShardBy" : false,
+ "index" : "eq"
+ }, {
+ "column" : "CUSTOMER.C_CITY",
+ "encoding" : "dict",
+ "isShardBy" : false,
+ "index" : "eq"
+ }, {
+ "column" : "PART.P_CATEGORY",
+ "encoding" : "dict",
+ "isShardBy" : false,
+ "index" : "eq"
+ }, {
+ "column" : "DATES.D_YEAR",
+ "encoding" : "dict",
+ "isShardBy" : false,
+ "index" : "eq"
+ }, {
+ "column" : "PART.P_BRAND",
+ "encoding" : "dict",
+ "isShardBy" : false,
+ "index" : "eq"
+ }, {
+ "column" : "SUPPLIER.S_CITY",
+ "encoding" : "dict",
+ "isShardBy" : false,
+ "index" : "eq"
+ }, {
+ "column" : "SUPPLIER.S_NATION",
+ "encoding" : "dict",
+ "isShardBy" : false,
+ "index" : "eq"
+ }, {
+ "column" : "CUSTOMER.C_NATION",
+ "encoding" : "dict",
+ "isShardBy" : false,
+ "index" : "eq"
+ }, {
+ "column" : "DATES.D_DATEKEY",
+ "encoding" : "dict",
+ "isShardBy" : false,
+ "index" : "eq"
+ }, {
+ "column" : "SUPPLIER.S_SUPPKEY",
+ "encoding" : "dict",
+ "isShardBy" : false,
+ "index" : "eq"
+ }, {
+ "column" : "CUSTOMER.C_CUSTKEY",
+ "encoding" : "dict",
+ "isShardBy" : true,
+ "index" : "eq"
+ }, {
+ "column" : "PART.P_PARTKEY",
+ "encoding" : "dict",
+ "isShardBy" : false,
+ "index" : "eq"
+ } ]
+ },
+ "signature" : "",
+ "last_modified" : 1457503036686,
+ "model_name" : "ssb",
+ "null_string" : null,
+ "hbase_mapping" : {
+ "column_family" : [ {
+ "name" : "F1",
+ "columns" : [ {
+ "qualifier" : "M",
+ "measure_refs" : [ "_COUNT_", "TOTAL_REVENUE", "TOTAL_SUPPLYCOST"]
+ } ]
+ } ]
+ },
+ "aggregation_groups" : [ {
+ "includes" : [ "SUPPLIER.S_REGION", "CUSTOMER.C_NATION", "SUPPLIER.S_NATION", "CUSTOMER.C_REGION", "DATES.D_YEAR", "DATES.D_WEEKNUMINYEAR", "DATES.D_YEARMONTH", "DATES.D_YEARMONTHNUM", "SUPPLIER.S_CITY" ],
+ "select_rule" : {
+ "hierarchy_dims" : [ [ "S_REGION", "S_NATION", "S_CITY" ] ],
+ "mandatory_dims" : [ "DATES.D_YEAR" ],
+ "joint_dims" : [ ]
+ }
+ } ],
+ "notify_list" : [ ],
+ "status_need_notify" : [ ],
+ "partition_date_start" : 3153000000000,
+ "partition_date_end" : 3153600000000,
+ "auto_merge_time_ranges" : [ 604800000, 2419200000 ],
+ "retention_range" : 0,
+ "engine_type" : 2,
+ "storage_type" : 2,
+ "override_kylin_properties" : {
+ "kylin.storage.hbase.compression-codec" : "lz4",
+ "kylin.cube.aggrgroup.is-mandatory-only-valid" : "true"
+ }
+}
\ No newline at end of file
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
index afd9788..16ceede 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
@@ -18,15 +18,31 @@
package org.apache.kylin.provision;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.common.util.HBaseMetadataTestCase;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.Pair;
@@ -34,6 +50,7 @@ import org.apache.kylin.cube.CubeDescManager;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.DimensionRangeInfo;
import org.apache.kylin.cube.cuboid.CuboidScheduler;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.EngineFactory;
@@ -48,6 +65,7 @@ import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
import org.apache.kylin.metadata.model.SegmentRange.TSRange;
+import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.rest.job.StorageCleanupJob;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.apache.kylin.storage.hbase.util.HBaseRegionSizeCalculator;
@@ -55,21 +73,9 @@ import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.TimeZone;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
public class BuildCubeWithEngine {
@@ -304,21 +310,30 @@ public class BuildCubeWithEngine {
if (!buildSegment(cubeName, date1, date2))
return false;
+ checkNormalSegRangeInfo(cubeManager.getCube(cubeName));
if (!buildSegment(cubeName, date2, date3))
return false;
+ checkNormalSegRangeInfo(cubeManager.getCube(cubeName));
if (!optimizeCube(cubeName))
return false;
+ checkNormalSegRangeInfo(cubeManager.getCube(cubeName));
if (!buildSegment(cubeName, date3, date4))
return false;
+ checkNormalSegRangeInfo(cubeManager.getCube(cubeName));
if (!buildSegment(cubeName, date4, date5)) // one empty segment
return false;
+ checkEmptySegRangeInfo(cubeManager.getCube(cubeName));
if (!buildSegment(cubeName, date5, date6)) // another empty segment
return false;
+ checkEmptySegRangeInfo(cubeManager.getCube(cubeName));
+
if (!mergeSegment(cubeName, date2, date4)) // merge 2 normal segments
return false;
+ checkNormalSegRangeInfo(cubeManager.getCube(cubeName));
if (!mergeSegment(cubeName, date2, date5)) // merge normal and empty
return false;
+ checkNormalSegRangeInfo(cubeManager.getCube(cubeName));
// now have 2 normal segments [date1, date2) [date2, date5) and 1 empty segment [date5, date6)
return true;
@@ -460,4 +475,40 @@ public class BuildCubeWithEngine {
}
}
+    /**
+     * Verifies that the most recently built segment of {@code cube} carries no dimension range
+     * data: every recorded range must have both a null max and a null min.
+     *
+     * @param cube the cube whose latest segment (by last build time) is inspected
+     * @throws RuntimeException if any recorded dimension range has a non-null bound
+     */
+    private void checkEmptySegRangeInfo(CubeInstance cube) {
+        CubeSegment latestSegment = getLastModifiedSegment(cube);
+        for (DimensionRangeInfo dimRange : latestSegment.getDimensionRangeInfoMap().values()) {
+            // An empty segment must not have produced any min/max value for any dimension.
+            boolean hasBound = dimRange.getMax() != null || dimRange.getMin() != null;
+            if (hasBound) {
+                throw new RuntimeException("Empty segment must have null info.");
+            }
+        }
+    }
+
+    /**
+     * Verifies that the partition-column min/max recorded on the most recently built segment
+     * fall inside that segment's time range, treating the range end as exclusive.
+     *
+     * <p>Does nothing when the model is not partitioned.
+     *
+     * @param cube the cube whose latest segment (by last build time) is inspected
+     * @throws RuntimeException if the partition column has no recorded min/max, or if the
+     *         recorded min/max lie outside the segment's TSRange
+     */
+    private void checkNormalSegRangeInfo(CubeInstance cube) {
+        CubeSegment segment = getLastModifiedSegment(cube);
+        if (!segment.getModel().getPartitionDesc().isPartitioned()) {
+            return;
+        }
+        TblColRef colRef = segment.getModel().getPartitionDesc().getPartitionDateColumnRef();
+        DimensionRangeInfo dmRangeInfo = segment.getDimensionRangeInfoMap().get(colRef.getIdentity());
+        // Without this guard a missing entry surfaces as a bare NPE, and null bounds are passed
+        // straight to DateFormat.stringToMillis; fail with a message that names the column.
+        if (dmRangeInfo == null || dmRangeInfo.getMin() == null || dmRangeInfo.getMax() == null) {
+            throw new RuntimeException(String.format(
+                    "Build cube failed, no min/max value recorded for partition column %s. Segment: %s",
+                    colRef.getIdentity(), segment));
+        }
+        long minValue = DateFormat.stringToMillis(dmRangeInfo.getMin());
+        long maxValue = DateFormat.stringToMillis(dmRangeInfo.getMax());
+        long tsRangeStart = segment.getTSRange().start.v;
+        long tsRangeEnd = segment.getTSRange().end.v;
+        // The TSRange end is exclusive, so the max value must be at most end - 1.
+        if (!(tsRangeStart <= minValue && maxValue <= tsRangeEnd - 1)) {
+            throw new RuntimeException(String.format(
+                    "Build cube failed, wrong partition column min/max value."
+                            + " Segment: %s, min value: %s, TsRange.start: %s, max value: %s, TsRange.end: %s",
+                    segment, minValue, tsRangeStart, maxValue, tsRangeEnd));
+        }
+    }
+
+    /**
+     * Returns the segment of {@code cube} with the greatest last build time. When several
+     * segments share that time, the first one encountered in iteration order is returned.
+     *
+     * @param cube the cube whose segments are scanned
+     * @return the most recently built segment
+     * @throws java.util.NoSuchElementException if the cube has no segments
+     */
+    private CubeSegment getLastModifiedSegment(CubeInstance cube) {
+        CubeSegment latest = null;
+        boolean seenAny = false;
+        for (CubeSegment candidate : cube.getSegments()) {
+            // Replace only on a strictly newer build time so the earliest maximum wins ties.
+            if (!seenAny || candidate.getLastBuildTime() > latest.getLastBuildTime()) {
+                latest = candidate;
+            }
+            seenAny = true;
+        }
+        if (!seenAny) {
+            throw new java.util.NoSuchElementException();
+        }
+        return latest;
+    }
}
diff --git a/query/src/main/java/org/apache/kylin/query/optrule/AggregateMultipleExpandRule.java b/query/src/main/java/org/apache/kylin/query/optrule/AggregateMultipleExpandRule.java
index cdd6004..39c90e8 100644
--- a/query/src/main/java/org/apache/kylin/query/optrule/AggregateMultipleExpandRule.java
+++ b/query/src/main/java/org/apache/kylin/query/optrule/AggregateMultipleExpandRule.java
@@ -18,8 +18,12 @@
package org.apache.kylin.query.optrule;
-import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import javax.annotation.Nullable;
+
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptRuleOperand;
@@ -33,13 +37,11 @@ import org.apache.calcite.rex.RexNode;
import org.apache.calcite.tools.RelBuilder;
import org.apache.calcite.util.ImmutableBitSet;
-import javax.annotation.Nullable;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
/**
- * Supoort grouping query. Expand the non-simple aggregate to more than one simple aggregates.
+ * Support grouping query. Expand the non-simple aggregate to more than one simple aggregates.
* Add project on expanded simple aggregate to add indicators of origin aggregate.
* All projects on aggregate added into one union, which replace the origin aggregate.
* The new aggregates will be transformed by {@link org.apache.kylin.query.optrule.AggregateProjectReduceRule}, to reduce rolled up dimensions.
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java
index 39a2669..b34b42e 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java
@@ -39,7 +39,9 @@ import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexProgram;
import org.apache.calcite.rex.RexProgramBuilder;
import org.apache.kylin.metadata.filter.FilterOptimizeTransformer;
+import org.apache.kylin.metadata.filter.LogicalTupleFilter;
import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.query.relnode.visitor.TupleFilterVisitor;
@@ -117,7 +119,29 @@ public class OLAPFilterRel extends Filter implements OLAPRel {
}
}
- context.filter = TupleFilter.and(context.filter, filter);
+ context.filter = and(context.filter, filter);
+ }
+
+    /**
+     * Combines two filters with a logical AND, tolerating null on either side.
+     *
+     * <p>If {@code f1} is already an AND filter, {@code f2} is appended to it as a child and
+     * {@code f1} itself is returned. Otherwise, if {@code f2} is an AND filter, {@code f1} is
+     * appended to it and {@code f2} is returned. If neither is an AND filter, a new
+     * {@link LogicalTupleFilter} wrapping both is created. An existing AND argument is
+     * mutated in place.
+     *
+     * @param f1 the first filter, may be null
+     * @param f2 the second filter, may be null
+     * @return the combined filter; the other argument if one of them is null (null if both are)
+     */
+    private TupleFilter and(TupleFilter f1, TupleFilter f2) {
+        if (f1 == null || f2 == null) {
+            return f1 == null ? f2 : f1;
+        }
+
+        TupleFilter host;
+        TupleFilter guest;
+        if (f1.getOperator() == FilterOperatorEnum.AND) {
+            host = f1;
+            guest = f2;
+        } else if (f2.getOperator() == FilterOperatorEnum.AND) {
+            host = f2;
+            guest = f1;
+        } else {
+            LogicalTupleFilter conjunction = new LogicalTupleFilter(FilterOperatorEnum.AND);
+            conjunction.addChild(f1);
+            conjunction.addChild(f2);
+            return conjunction;
+        }
+        host.addChild(guest);
+        return host;
     }
@Override
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 8aec8b9..bfea632 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -45,7 +45,6 @@ import com.google.common.collect.Lists;
public class HiveMRInput extends HiveInputBase implements IMRInput {
- @SuppressWarnings("unused")
private static final Logger logger = LoggerFactory.getLogger(HiveMRInput.class);
@Override
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java
index 75f322f..4ebdf3d 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java
@@ -126,7 +126,7 @@ public class HiveTableReader implements TableReader {
String[] arr = new String[record.size()];
for (int i = 0; i < arr.length; i++) {
Object o = record.get(i);
- arr[i] = (o == null) ? null : o.toString();
+ arr[i] = (o == null || "\\N".equals(o)) ? null : o.toString();
}
return arr;
}