You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/12/23 03:26:44 UTC
[40/50] [abbrv] kylin git commit: KYLIN-976 Support Custom Aggregation Types
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
index e399a70..0ba6566 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
@@ -21,15 +21,14 @@ package org.apache.kylin.dict;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
-import java.util.*;
+import java.util.*;
-import com.google.common.collect.Lists;
+import com.google.common.collect.Lists;
import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.common.util.*;
import org.apache.kylin.dict.lookup.ReadableTable;
-import org.apache.kylin.metadata.model.DataType;
+import org.apache.kylin.metadata.datatype.DataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,7 +42,7 @@ public class DictionaryGenerator {
private static final Logger logger = LoggerFactory.getLogger(DictionaryGenerator.class);
- private static final String[] DATE_PATTERNS = new String[] { "yyyy-MM-dd", "yyyyMMdd" };
+ private static final String[] DATE_PATTERNS = new String[] { "yyyy-MM-dd", "yyyyMMdd" };
private static int getDictionaryMaxCardinality() {
try {
@@ -53,20 +52,20 @@ public class DictionaryGenerator {
}
}
- public static Dictionary<?> buildDictionaryFromValueEnumerator(DictionaryInfo info, IDictionaryValueEnumerator valueEnumerator) throws IOException{
- Dictionary dict = null;
+ public static org.apache.kylin.common.util.Dictionary<?> buildDictionaryFromValueEnumerator(DictionaryInfo info, IDictionaryValueEnumerator valueEnumerator) throws IOException{
+ org.apache.kylin.common.util.Dictionary dict = null;
int baseId = 0; // always 0 for now
- final int nSamples = 5;
- ArrayList samples = Lists.newArrayListWithCapacity(nSamples);
+ final int nSamples = 5;
+ ArrayList samples = Lists.newArrayListWithCapacity(nSamples);
// build dict, case by data type
DataType dataType = DataType.getInstance(info.getDataType());
if (dataType.isDateTimeFamily())
- dict = buildDateStrDict(valueEnumerator, baseId, nSamples, samples);
+ dict = buildDateStrDict(valueEnumerator, baseId, nSamples, samples);
else if (dataType.isNumberFamily())
- dict = buildNumberDict(valueEnumerator, baseId, nSamples, samples);
+ dict = buildNumberDict(valueEnumerator, baseId, nSamples, samples);
else
- dict = buildStringDict(valueEnumerator, baseId, nSamples, samples);
+ dict = buildStringDict(valueEnumerator, baseId, nSamples, samples);
// log a few samples
StringBuilder buf = new StringBuilder();
@@ -76,54 +75,54 @@ public class DictionaryGenerator {
buf.append(s.toString()).append("=>").append(dict.getIdFromValue(s));
}
logger.info("Dictionary value samples: " + buf.toString());
- logger.info("Dictionary cardinality: " + dict.getSize());
+ logger.info("Dictionary cardinality: " + dict.getSize());
- if (dict instanceof TrieDictionary && dict.getSize() > DICT_MAX_CARDINALITY)
- throw new IllegalArgumentException("Too high cardinality is not suitable for dictionary -- " + info.getSourceTable() + "." + info.getSourceColumn() + " cardinality: " + dict.getSize());
+ if (dict instanceof TrieDictionary && dict.getSize() > DICT_MAX_CARDINALITY)
+ throw new IllegalArgumentException("Too high cardinality is not suitable for dictionary -- " + info.getSourceTable() + "." + info.getSourceColumn() + " cardinality: " + dict.getSize());
return dict;
}
- public static Dictionary mergeDictionaries(DictionaryInfo targetInfo, List<DictionaryInfo> sourceDicts) throws IOException {
- return buildDictionaryFromValueEnumerator(targetInfo, new MultipleDictionaryValueEnumerator(sourceDicts));
+ public static org.apache.kylin.common.util.Dictionary mergeDictionaries(DictionaryInfo targetInfo, List<DictionaryInfo> sourceDicts) throws IOException {
+ return buildDictionaryFromValueEnumerator(targetInfo, new MultipleDictionaryValueEnumerator(sourceDicts));
}
- public static Dictionary<?> buildDictionary(DictionaryInfo info, ReadableTable inpTable) throws IOException {
+ public static org.apache.kylin.common.util.Dictionary<?> buildDictionary(DictionaryInfo info, ReadableTable inpTable) throws IOException {
// currently all data types are casted to string to build dictionary
// String dataType = info.getDataType();
- IDictionaryValueEnumerator columnValueEnumerator = null;
- try {
- logger.info("Building dictionary " + JsonUtil.writeValueAsString(info));
+ IDictionaryValueEnumerator columnValueEnumerator = null;
+ try {
+ logger.info("Building dictionary " + JsonUtil.writeValueAsString(info));
- columnValueEnumerator = new TableColumnValueEnumerator(inpTable.getReader(), info.getSourceColumnIndex());
- return buildDictionaryFromValueEnumerator(info, columnValueEnumerator);
- } finally {
- if (columnValueEnumerator != null)
- columnValueEnumerator.close();
- }
+ columnValueEnumerator = new TableColumnValueEnumerator(inpTable.getReader(), info.getSourceColumnIndex());
+ return buildDictionaryFromValueEnumerator(info, columnValueEnumerator);
+ } finally {
+ if (columnValueEnumerator != null)
+ columnValueEnumerator.close();
+ }
}
- private static Dictionary buildDateStrDict(IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, ArrayList samples) throws IOException {
+ private static org.apache.kylin.common.util.Dictionary buildDateStrDict(IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, ArrayList samples) throws IOException {
final int BAD_THRESHOLD = 2;
String matchPattern = null;
- byte[] value;
+ byte[] value;
for (String ptn : DATE_PATTERNS) {
matchPattern = ptn; // be optimistic
int badCount = 0;
SimpleDateFormat sdf = new SimpleDateFormat(ptn);
-
- while (valueEnumerator.moveNext()) {
- value = valueEnumerator.current();
+
+ while (valueEnumerator.moveNext()) {
+ value = valueEnumerator.current();
if (value.length == 0)
continue;
String str = Bytes.toString(value);
try {
sdf.parse(str);
- if (samples.size() < nSamples && !samples.contains(str))
+ if (samples.size() < nSamples && !samples.contains(str))
samples.add(str);
} catch (ParseException e) {
logger.info("Unrecognized datetime value: " + str);
@@ -134,33 +133,33 @@ public class DictionaryGenerator {
}
}
}
- if (matchPattern != null) {
+ if (matchPattern != null) {
return new DateStrDictionary(matchPattern, baseId);
- }
+ }
}
throw new IllegalStateException("Unrecognized datetime value");
}
- private static Dictionary buildStringDict(IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, ArrayList samples) throws IOException {
+ private static org.apache.kylin.common.util.Dictionary buildStringDict(IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, ArrayList samples) throws IOException {
TrieDictionaryBuilder builder = new TrieDictionaryBuilder(new StringBytesConverter());
- byte[] value;
- while (valueEnumerator.moveNext()) {
- value = valueEnumerator.current();
+ byte[] value;
+ while (valueEnumerator.moveNext()) {
+ value = valueEnumerator.current();
if (value == null)
continue;
String v = Bytes.toString(value);
builder.addValue(v);
- if (samples.size() < nSamples && !samples.contains(v))
+ if (samples.size() < nSamples && !samples.contains(v))
samples.add(v);
}
return builder.build(baseId);
}
- private static Dictionary buildNumberDict(IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, ArrayList samples) throws IOException {
+ private static org.apache.kylin.common.util.Dictionary buildNumberDict(IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, ArrayList samples) throws IOException {
NumberDictionaryBuilder builder = new NumberDictionaryBuilder(new StringBytesConverter());
- byte[] value;
- while (valueEnumerator.moveNext()) {
- value = valueEnumerator.current();
+ byte[] value;
+ while (valueEnumerator.moveNext()) {
+ value = valueEnumerator.current();
if (value == null)
continue;
String v = Bytes.toString(value);
@@ -168,7 +167,7 @@ public class DictionaryGenerator {
continue;
builder.addValue(v);
- if (samples.size() < nSamples && !samples.contains(v))
+ if (samples.size() < nSamples && !samples.contains(v))
samples.add(v);
}
return builder.build(baseId);
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
index 0d6c9b1..645722c 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
@@ -20,6 +20,7 @@ package org.apache.kylin.dict;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.dict.lookup.ReadableTable.TableSignature;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfoSerializer.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfoSerializer.java b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfoSerializer.java
index 9ac8206..6381643 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfoSerializer.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfoSerializer.java
@@ -24,6 +24,7 @@ import java.io.IOException;
import org.apache.kylin.common.persistence.Serializer;
import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.JsonUtil;
/**
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
index 1c492c4..aa0bc5d 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.dict.lookup.FileTable;
import org.apache.kylin.dict.lookup.HiveTable;
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/dictionary/src/main/java/org/apache/kylin/dict/ISegment.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/ISegment.java b/dictionary/src/main/java/org/apache/kylin/dict/ISegment.java
index b877eef..68368b5 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/ISegment.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/ISegment.java
@@ -18,6 +18,7 @@
package org.apache.kylin.dict;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.metadata.model.TblColRef;
/**
@@ -27,7 +28,7 @@ public interface ISegment {
public abstract int getColumnLength(TblColRef col);
- public abstract Dictionary<?> getDictionary(TblColRef col);
+ public abstract Dictionary<String> getDictionary(TblColRef col);
public String getName();
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/dictionary/src/main/java/org/apache/kylin/dict/MultipleDictionaryValueEnumerator.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/MultipleDictionaryValueEnumerator.java b/dictionary/src/main/java/org/apache/kylin/dict/MultipleDictionaryValueEnumerator.java
index 13f7394..43d62a3 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/MultipleDictionaryValueEnumerator.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/MultipleDictionaryValueEnumerator.java
@@ -20,6 +20,7 @@ package org.apache.kylin.dict;
import com.google.common.collect.Lists;
import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.Dictionary;
import java.io.IOException;
import java.util.List;
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java b/dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java
index 3479dbd..2b6d77d 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java
@@ -30,6 +30,7 @@ import java.util.HashMap;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.Dictionary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
index 96a7cd6..cc3c637 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
@@ -30,7 +30,7 @@ import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.RootPersistentEntity;
-import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.dict.StringBytesConverter;
import org.apache.kylin.dict.TrieDictionary;
import org.apache.kylin.dict.TrieDictionaryBuilder;
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json
index 485170b..fedb2c1 100644
--- a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json
+++ b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json
@@ -169,7 +169,12 @@
"expression": "COUNT_DISTINCT",
"parameter": {
"type": "column",
- "value": "LSTG_FORMAT_NAME,SELLER_ID"
+ "value": "LSTG_FORMAT_NAME",
+ "next_parameter": {
+ "type": "column",
+ "value": "SELLER_ID",
+ "next_parameter": null
+ }
},
"returntype": "hllc(10)"
},
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json
index c6bd12f..7e6934b 100644
--- a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json
+++ b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json
@@ -169,7 +169,12 @@
"expression": "COUNT_DISTINCT",
"parameter": {
"type": "column",
- "value": "LSTG_FORMAT_NAME,SELLER_ID"
+ "value": "LSTG_FORMAT_NAME",
+ "next_parameter": {
+ "type": "column",
+ "value": "SELLER_ID",
+ "next_parameter": null
+ }
},
"returntype": "hllc(10)"
},
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/invertedindex/src/main/java/org/apache/kylin/invertedindex/IICapabilityChecker.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IICapabilityChecker.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IICapabilityChecker.java
new file mode 100644
index 0000000..4ee8f50
--- /dev/null
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IICapabilityChecker.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.invertedindex;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.JoinDesc;
+import org.apache.kylin.metadata.model.LookupDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.CapabilityResult;
+import org.apache.kylin.metadata.realization.SQLDigest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Decides whether an inverted-index (II) realization can answer the query described by a
+ * {@link SQLDigest}.
+ * <p>
+ * The fact table, the joins, the dimension columns (group-by plus filter columns) and the
+ * aggregations of the query are matched against the II descriptor. An aggregation the II does not
+ * store is still accepted when it is a COUNT (left to Calcite) or when all of its parameter columns
+ * are II dimensions, so that Calcite can aggregate them on the fly; the latter case is recorded as
+ * a {@link CapabilityResult.DimensionAsMeasure} influence.
+ */
+public class IICapabilityChecker {
+    private static final Logger logger = LoggerFactory.getLogger(IICapabilityChecker.class);
+
+    /**
+     * Checks whether the given II can serve the given query.
+     *
+     * @param ii     the inverted-index instance to check
+     * @param digest the digest of the query
+     * @return a result whose {@code capable} flag is {@code true} only when the fact table, joins,
+     *         dimensions and aggregations all match; the cost is not set here and is left to the caller
+     */
+    public static CapabilityResult check(IIInstance ii, SQLDigest digest) {
+        CapabilityResult result = new CapabilityResult();
+        result.capable = false;
+
+        // match fact table
+        if (!digest.factTable.equalsIgnoreCase(ii.getFactTable())) {
+            logger.info("Exclude II " + ii.getName() + " because fact table unmatch");
+            return result;
+        }
+
+        // match joins (may swap PK/FK of the digest's joins in place, see isMatchedWithJoins)
+        boolean matchJoin = isMatchedWithJoins(digest.joinDescs, ii);
+        if (!matchJoin) {
+            logger.info("Exclude II " + ii.getName() + " because unmatched joins");
+            return result;
+        }
+
+        // dimensions & measures
+        Collection<TblColRef> dimensionColumns = getDimensionColumns(digest);
+        Collection<FunctionDesc> aggrFunctions = digest.aggregations;
+        Collection<TblColRef> unmatchedDimensions = unmatchedDimensions(dimensionColumns, ii);
+        Collection<FunctionDesc> unmatchedAggregations = unmatchedAggregations(aggrFunctions, ii);
+
+        // try dimension-as-measure; this removes every aggregation it can resolve
+        if (!unmatchedAggregations.isEmpty()) {
+            tryDimensionAsMeasures(unmatchedAggregations, digest, ii, result);
+        }
+
+        if (!unmatchedDimensions.isEmpty()) {
+            logger.info("Exclude ii " + ii.getName() + " because unmatched dimensions");
+            return result;
+        }
+
+        if (!unmatchedAggregations.isEmpty()) {
+            logger.info("Exclude ii " + ii.getName() + " because unmatched aggregations");
+            return result;
+        }
+
+        // the cost is filled in by the caller
+        result.capable = true;
+        return result;
+    }
+
+    /**
+     * Checks that every join of the query is one of the lookup joins declared in the II's data
+     * model, with the model's fact table on the foreign-key side.
+     * <p>
+     * Side effect: a query join whose primary-key table is the fact table is flipped in place via
+     * {@link JoinDesc#swapPKFK()}, because the query engine cannot tell which side is the PK.
+     *
+     * @param joins      the joins of the query
+     * @param iiInstance the II whose model joins are matched against
+     * @return {@code true} if all query joins match, {@code false} otherwise
+     */
+    private static boolean isMatchedWithJoins(Collection<JoinDesc> joins, IIInstance iiInstance) {
+        IIDesc iiDesc = iiInstance.getDescriptor();
+        List<TableDesc> tables = iiDesc.listTables();
+        // loop-invariant: the model's fact table
+        String factTable = iiDesc.getModel().getFactTable();
+
+        // collect the model's lookup join for each table of the II
+        List<JoinDesc> modelJoins = new ArrayList<JoinDesc>(tables.size());
+        for (TableDesc tableDesc : tables) {
+            for (LookupDesc lookup : iiDesc.getModel().getLookups()) {
+                if (lookup.getTable().equalsIgnoreCase(tableDesc.getIdentity())) {
+                    modelJoins.add(lookup.getJoin());
+                    break;
+                }
+            }
+        }
+
+        for (JoinDesc j : joins) {
+            // optiq engine can't decide which one is fk or pk
+            String pTable = j.getPrimaryKeyColumns()[0].getTable();
+            if (factTable.equals(pTable)) {
+                j.swapPKFK();
+            }
+
+            // check foreign key: all FK columns should refer to the same table, the fact table of the II.
+            // Using first column's table name to check.
+            String fTable = j.getForeignKeyColumns()[0].getTable();
+            if (!factTable.equals(fTable)) {
+                logger.info("Fact Table " + factTable + " not matched in join: " + j + " on ii " + iiInstance.getName());
+                return false;
+            }
+
+            // relies on JoinDesc overriding equals()/hashCode(), which take pk, fk and join type into account
+            if (!modelJoins.contains(j)) {
+                logger.info("Query joins don't match on ii " + iiInstance.getName());
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Returns the union of the query's group-by columns and filter columns.
+     */
+    private static Collection<TblColRef> getDimensionColumns(SQLDigest sqlDigest) {
+        Collection<TblColRef> dimensionColumns = new HashSet<TblColRef>();
+        dimensionColumns.addAll(sqlDigest.groupbyColumns);
+        dimensionColumns.addAll(sqlDigest.filterColumns);
+        return dimensionColumns;
+    }
+
+    /**
+     * Returns the given dimension columns that are not dimensions of the II.
+     */
+    private static Set<TblColRef> unmatchedDimensions(Collection<TblColRef> dimensionColumns, IIInstance ii) {
+        HashSet<TblColRef> result = Sets.newHashSet(dimensionColumns);
+        result.removeAll(ii.getDescriptor().listAllDimensions());
+        return result;
+    }
+
+    /**
+     * Returns the given aggregations that are not among the II's functions, as a new mutable set.
+     */
+    private static Set<FunctionDesc> unmatchedAggregations(Collection<FunctionDesc> aggregations, IIInstance ii) {
+        HashSet<FunctionDesc> result = Sets.newHashSet(aggregations);
+        result.removeAll(ii.getDescriptor().listAllFunctions());
+        return result;
+    }
+
+    /**
+     * Removes from {@code unmatchedAggregations} every aggregation the query can still be answered
+     * with: functions the II stores, COUNTs (left to Calcite), and functions whose parameter columns
+     * are all II dimensions. The last kind is recorded in {@code result.influences} as a
+     * {@link CapabilityResult.DimensionAsMeasure}.
+     *
+     * @param unmatchedAggregations mutable collection, shrunk in place
+     * @param digest                the query digest (currently unused)
+     * @param ii                    the II being checked
+     * @param result                receives dimension-as-measure influences
+     */
+    private static void tryDimensionAsMeasures(Collection<FunctionDesc> unmatchedAggregations, SQLDigest digest, IIInstance ii, CapabilityResult result) {
+        IIDesc iiDesc = ii.getDescriptor();
+        Collection<FunctionDesc> iiFuncs = iiDesc.listAllFunctions();
+        // loop-invariant: the II's dimension columns
+        Collection<TblColRef> iiDimensions = iiDesc.listAllDimensions();
+
+        Iterator<FunctionDesc> it = unmatchedAggregations.iterator();
+        while (it.hasNext()) {
+            FunctionDesc functionDesc = it.next();
+
+            if (iiFuncs.contains(functionDesc)) {
+                it.remove();
+                continue;
+            }
+
+            // let calcite handle count
+            if (functionDesc.isCount()) {
+                it.remove();
+                continue;
+            }
+
+            // calcite can do aggregation from columns on-the-fly; a function without parameter
+            // columns cannot be rebuilt from dimensions and stays unmatched
+            if (functionDesc.getParameter() == null) {
+                continue;
+            }
+            List<TblColRef> neededCols = functionDesc.getParameter().getColRefs();
+            if (neededCols != null && !neededCols.isEmpty() && iiDimensions.containsAll(neededCols)) {
+                result.influences.add(new CapabilityResult.DimensionAsMeasure(functionDesc));
+                it.remove();
+            }
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
index b372c70..86b0543 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
@@ -29,6 +29,7 @@ import org.apache.kylin.invertedindex.model.IIDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.CapabilityResult;
import org.apache.kylin.metadata.realization.IRealization;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.metadata.realization.RealizationType;
@@ -265,15 +266,16 @@ public class IIInstance extends RootPersistentEntity implements IRealization {
}
@Override
- public boolean isCapable(SQLDigest digest) {
- //TODO: currently II is nearly omnipotent
- if (!digest.factTable.equalsIgnoreCase(this.getFactTable()))
- return false;
-
- return true;
+ public CapabilityResult isCapable(SQLDigest digest) {
+ CapabilityResult result = IICapabilityChecker.check(this, digest);
+ if (result.capable) {
+ result.cost = getCost(digest);
+ } else {
+ result.cost = -1;
+ }
+ return result;
}
- @Override
public int getCost(SQLDigest digest) {
return 0;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
index cb5a1cb..f7e70f4 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
@@ -33,7 +33,7 @@ import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.Serializer;
import org.apache.kylin.common.restclient.Broadcaster;
import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
-import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.dict.DictionaryInfo;
import org.apache.kylin.dict.DictionaryManager;
import org.apache.kylin.invertedindex.model.IIDesc;
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/invertedindex/src/main/java/org/apache/kylin/invertedindex/IISegment.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IISegment.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IISegment.java
index 33ff1b0..77a876c 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IISegment.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IISegment.java
@@ -25,7 +25,7 @@ import java.util.Map;
import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.dict.ISegment;
import org.apache.kylin.invertedindex.index.TableRecordInfo;
import org.apache.kylin.invertedindex.model.IIDesc;
@@ -271,7 +271,7 @@ public class IISegment implements Comparable<IISegment>, ISegment {
}
@Override
- public Dictionary<?> getDictionary(TblColRef col) {
+ public Dictionary<String> getDictionary(TblColRef col) {
int index = getTableRecordInfo().findColumn(col);
return getTableRecordInfo().dict(index);
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/BitMapContainer.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/BitMapContainer.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/BitMapContainer.java
index 2594936..164e2b9 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/BitMapContainer.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/BitMapContainer.java
@@ -25,7 +25,7 @@ import java.util.List;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.common.util.Dictionary;
import org.roaringbitmap.RoaringBitmap;
/**
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/CompressedValueContainer.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/CompressedValueContainer.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/CompressedValueContainer.java
index c58261d..334457c 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/CompressedValueContainer.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/CompressedValueContainer.java
@@ -24,7 +24,7 @@ import java.util.Arrays;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.common.util.Dictionary;
import com.ning.compress.lzf.LZFDecoder;
import com.ning.compress.lzf.LZFEncoder;
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java
index baae663..b9f963e 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.metadata.measure.fixedlen.FixedLenMeasureCodec;
/**
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
index 11ecdf8..c41a70c 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
@@ -21,7 +21,7 @@ package org.apache.kylin.invertedindex.index;
import java.io.IOException;
import java.util.List;
-import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.dict.DictionaryManager;
import org.apache.kylin.invertedindex.IISegment;
import org.apache.kylin.invertedindex.model.IIDesc;
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfoDigest.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfoDigest.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfoDigest.java
index da78627..0ed58b0 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfoDigest.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfoDigest.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.io.LongWritable;
import org.apache.kylin.common.util.BytesSerializer;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.metadata.measure.fixedlen.FixedLenMeasureCodec;
-import org.apache.kylin.metadata.model.DataType;
+import org.apache.kylin.metadata.datatype.DataType;
/**
* Created by honma on 11/10/14.
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
index c8597bc..f10712a 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
@@ -207,6 +207,10 @@ public class IIDesc extends RootPersistentEntity {
p1.setColRefs(ImmutableList.of(new TblColRef(columnDesc)));
f1.setParameter(p1);
f1.setReturnType(returnType);
+ if (f1.isSum() && f1.getReturnDataType().isIntegerFamily()) {
+ f1.setReturnType("bigint");
+ }
+
measureDesc.setFunction(f1);
return measureDesc;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java
index 4ad38b5..50af8a4 100644
--- a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java
+++ b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java
@@ -22,7 +22,7 @@ import java.io.IOException;
import java.util.List;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.invertedindex.IIDescManager;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java
index 5f75b04..cfa4ba6 100644
--- a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java
+++ b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java
@@ -33,7 +33,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
import org.apache.kylin.invertedindex.index.BitMapContainer;
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java
index 470225e..929408a 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java
@@ -22,12 +22,14 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import org.apache.hadoop.io.Text;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.mr.KylinMapper;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.BytesSplitter;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.SplittedBytes;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
@@ -39,11 +41,13 @@ import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.job.constant.BatchConstants;
import org.apache.kylin.job.hadoop.AbstractHadoopJob;
import org.apache.kylin.job.hadoop.hive.CubeJoinedFlatTableDesc;
-import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.measure.MeasureCodec;
+import org.apache.kylin.measure.MeasureIngester;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.ParameterDesc;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,6 +79,8 @@ public class BaseCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Text, Text, Text
private int errorRecordCounter;
private Text outputKey = new Text();
private Text outputValue = new Text();
+ protected MeasureIngester<?>[] aggrIngesters;
+ protected Map<TblColRef, Dictionary<String>> dictionaryMap;
private Object[] measures;
private byte[][] keyBytesBuf;
private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
@@ -116,6 +122,9 @@ public class BaseCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Text, Text, Text
int colCount = cubeDesc.getRowkey().getRowKeyColumns().length;
keyBytesBuf = new byte[colCount][];
+ aggrIngesters = MeasureIngester.create(cubeDesc.getMeasures());
+ dictionaryMap = cubeSegment.buildDictionaryMap();
+
initNullBytes();
}
@@ -153,52 +162,45 @@ public class BaseCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Text, Text, Text
private void buildValue(SplittedBytes[] splitBuffers) {
for (int i = 0; i < measures.length; i++) {
- byte[] valueBytes = getValueBytes(splitBuffers, i);
- measures[i] = measureCodec.getSerializer(i).valueOf(valueBytes);
+ measures[i] = buildValueOf(i, splitBuffers);
}
valueBuf.clear();
measureCodec.encode(measures, valueBuf);
}
- private byte[] getValueBytes(SplittedBytes[] splitBuffers, int measureIdx) {
- MeasureDesc desc = cubeDesc.getMeasures().get(measureIdx);
- FunctionDesc func = desc.getFunction();
- ParameterDesc paramDesc = func.getParameter();
- int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[measureIdx];
-
- byte[] result = null;
-
- // constant
- if (flatTableIdx == null) {
- result = Bytes.toBytes(paramDesc.getValue());
- }
- // column values
- else {
- // for multiple columns, their values are joined
- for (int i = 0; i < flatTableIdx.length; i++) {
- SplittedBytes split = splitBuffers[flatTableIdx[i]];
- if (result == null) {
- result = Arrays.copyOf(split.value, split.length);
- } else {
- byte[] newResult = new byte[result.length + split.length];
- System.arraycopy(result, 0, newResult, 0, result.length);
- System.arraycopy(split.value, 0, newResult, result.length, split.length);
- result = newResult;
- }
+ private Object buildValueOf(int idxOfMeasure, SplittedBytes[] splitBuffers) {
+ MeasureDesc measure = cubeDesc.getMeasures().get(idxOfMeasure);
+ FunctionDesc function = measure.getFunction();
+ int[] colIdxOnFlatTable = intermediateTableDesc.getMeasureColumnIndexes()[idxOfMeasure];
+
+ int paramCount = function.getParameterCount();
+ String[] inputToMeasure = new String[paramCount];
+
+ // pick up parameter values
+ ParameterDesc param = function.getParameter();
+ int colParamIdx = 0; // index among parameters of column type
+ for (int i = 0; i < paramCount; i++, param = param.getNextParameter()) {
+ String value;
+ if (function.isCount()) {
+ value = "1";
+ } else if (param.isColumnType()) {
+ value = getCell(colIdxOnFlatTable[colParamIdx++], splitBuffers);
+ } else {
+ value = param.getValue();
}
+ inputToMeasure[i] = value;
}
- if (func.isCount() || func.isHolisticCountDistinct()) {
- // note for holistic count distinct, this value will be ignored
- result = ONE;
- }
-
- if (isNull(result)) {
- result = null;
- }
+ return aggrIngesters[idxOfMeasure].valueOf(inputToMeasure, measure, dictionaryMap);
+ }
- return result;
+ private String getCell(int i, SplittedBytes[] splitBuffers) {
+ byte[] bytes = Arrays.copyOf(splitBuffers[i].value, splitBuffers[i].length);
+ if (isNull(bytes))
+ return null;
+ else
+ return Bytes.toString(bytes);
}
@Override
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper.java
index 36bf80c..c19e54b 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper.java
@@ -37,7 +37,7 @@ import org.apache.kylin.cube.model.HBaseColumnDesc;
import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
import org.apache.kylin.job.constant.BatchConstants;
import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.measure.MeasureCodec;
import org.apache.kylin.metadata.model.MeasureDesc;
import com.google.common.collect.Lists;
@@ -93,7 +93,7 @@ public class CubeHFileMapper extends KylinMapper<Text, Text, ImmutableBytesWrita
} else { // normal (complex) case that distributes measures to multiple
// HBase columns
- inputCodec.decode(value, inputMeasures);
+ inputCodec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), inputMeasures);
for (int i = 0; i < n; i++) {
outputValue = keyValueCreators.get(i).create(key, inputMeasures);
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java
index a74f2a1..bff9e3a 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java
@@ -161,9 +161,8 @@ public class CuboidJob extends AbstractHadoopJob {
// number of reduce tasks
int numReduceTasks = (int) Math.round(totalReduceInputMB / perReduceInputMB * reduceCountRatio);
- // adjust reducer number for cube which has DISTINCT_COUNT measures for
- // better performance
- if (cubeDesc.hasHolisticCountDistinctMeasures()) {
+ // adjust reducer number for cube which has memory hungry measures (e.g. DISTINCT_COUNT) for better performance
+ if (cubeDesc.hasMemoryHungryMeasures()) {
numReduceTasks = numReduceTasks * 4;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidReducer.java
index bcb4b52..4527f30 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidReducer.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidReducer.java
@@ -30,8 +30,8 @@ import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.job.constant.BatchConstants;
import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.metadata.measure.MeasureAggregators;
-import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.measure.MeasureAggregators;
+import org.apache.kylin.measure.MeasureCodec;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,7 +81,7 @@ public class CuboidReducer extends KylinReducer<Text, Text, Text, Text> {
aggs.reset();
for (Text value : values) {
- codec.decode(value, input);
+ codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), input);
aggs.aggregate(input);
}
aggs.collectStates(result);
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapper.java
index 2516745..2528e07 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapper.java
@@ -19,17 +19,22 @@
package org.apache.kylin.job.hadoop.cube;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import com.google.common.collect.Lists;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.mr.KylinMapper;
import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.SplittedBytes;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
@@ -38,10 +43,14 @@ import org.apache.kylin.cube.common.RowKeySplitter;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.dict.DictionaryManager;
import org.apache.kylin.job.constant.BatchConstants;
import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+import org.apache.kylin.measure.MeasureCodec;
+import org.apache.kylin.measure.MeasureIngester;
+import org.apache.kylin.measure.MeasureType;
+import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
@@ -60,6 +69,17 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
private CubeSegment sourceCubeSegment;// Must be unique during a mapper's
// life cycle
+
+ // for re-encode measures that use dictionary
+ private List<Pair<Integer, MeasureIngester>> dictMeasures;
+ private Map<TblColRef, Dictionary<String>> oldDicts;
+ private Map<TblColRef, Dictionary<String>> newDicts;
+ private List<MeasureDesc> measureDescs;
+ private MeasureCodec codec;
+ private Object[] measureObjs;
+ private ByteBuffer valueBuf;
+ private Text outputValue;
+
private Text outputKey = new Text();
private byte[] newKeyBuf;
@@ -133,6 +153,26 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
System.out.println(sourceCubeSegment);
this.rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
+
+
+ measureDescs = cubeDesc.getMeasures();
+ codec = new MeasureCodec(measureDescs);
+ measureObjs = new Object[measureDescs.size()];
+ valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+ outputValue = new Text();
+
+ dictMeasures = Lists.newArrayList();
+ for (int i = 0; i < measureDescs.size(); i++) {
+ MeasureDesc measureDesc = measureDescs.get(i);
+ MeasureType measureType = measureDesc.getFunction().getMeasureType();
+ if (measureType.getColumnsNeedDictionary(measureDesc.getFunction()).isEmpty() == false) {
+ dictMeasures.add(Pair.newPair(i, measureType.newIngester()));
+ }
+ }
+ if (dictMeasures.size() > 0) {
+ oldDicts = sourceCubeSegment.buildDictionaryMap();
+ newDicts = mergedCubeSegment.buildDictionaryMap();
+ }
}
@Override
@@ -187,6 +227,21 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
byte[] newKey = Arrays.copyOf(newKeyBuf, bufOffset);
outputKey.set(newKey, 0, newKey.length);
+
+ // re-encode measures if dictionary is used
+ if (dictMeasures.size() > 0) {
+ codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), measureObjs);
+ for (Pair<Integer, MeasureIngester> pair : dictMeasures) {
+ int i = pair.getFirst();
+ MeasureIngester ingester = pair.getSecond();
+ measureObjs[i] = ingester.reEncodeDictionary(measureObjs[i], measureDescs.get(i), oldDicts, newDicts);
+ }
+ valueBuf.clear();
+ codec.encode(measureObjs, valueBuf);
+ outputValue.set(valueBuf.array(), 0, valueBuf.position());
+ value = outputValue;
+ }
+
context.write(outputKey, value);
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java
deleted file mode 100644
index 35b4662..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java
+++ /dev/null
@@ -1,346 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.hadoop.io.Text;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.mr.KylinMapper;
-import org.apache.kylin.common.util.Array;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.BytesSplitter;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.common.util.SplittedBytes;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.DimensionDesc;
-import org.apache.kylin.dict.lookup.HiveTable;
-import org.apache.kylin.dict.lookup.LookupBytesTable;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.measure.MeasureCodec;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.JoinDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.ParameterDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author George Song (ysong1),honma
- */
-public class NewBaseCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Text, Text, Text> {
-
- private static final Logger logger = LoggerFactory.getLogger(NewBaseCuboidMapper.class);
-
- private String cubeName;
- private String segmentName;
- private Cuboid baseCuboid;
- private CubeInstance cube;
- private CubeSegment cubeSegment;
-
- private CubeDesc cubeDesc;
- private MetadataManager metadataManager;
- private TableDesc factTableDesc;
-
- private boolean byteRowDelimiterInferred = false;
- private byte byteRowDelimiter;
-
- private int counter;
- private Text outputKey = new Text();
- private Text outputValue = new Text();
- private Object[] measures;
- private byte[][] keyBytesBuf;
- private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-
- private BytesSplitter bytesSplitter;
- private AbstractRowKeyEncoder rowKeyEncoder;
- private MeasureCodec measureCodec;
-
- // deal with table join
- private HashMap<String, LookupBytesTable> lookupTables;// name -> table
- private LinkedList<TableJoin> tableJoins;
- private LinkedList<Pair<Integer, Integer>> factTblColAsRowKey;// similar as
- // TableJoin.dimTblColAsRowKey
- private int[][] measureColumnIndice;
- private byte[] nullValue;
-
- private class TableJoin {
- public LinkedList<Integer> fkIndice;// zero-based join columns on fact
- // table
- public String lookupTableName;
- public String joinType;
-
- // Pair.first -> zero-based column index in lookup table
- // Pair.second -> zero based row key index
- public LinkedList<Pair<Integer, Integer>> dimTblColAsRowKey;
-
- private TableJoin(String joinType, LinkedList<Integer> fkIndice, String lookupTableName, LinkedList<Pair<Integer, Integer>> dimTblColAsRowKey) {
- this.joinType = joinType;
- this.fkIndice = fkIndice;
- this.lookupTableName = lookupTableName;
- this.dimTblColAsRowKey = dimTblColAsRowKey;
- }
- }
-
- @Override
- protected void setup(Context context) throws IOException {
- super.publishConfiguration(context.getConfiguration());
-
- cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
- segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME);
-
- KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(context.getConfiguration());
-
- metadataManager = MetadataManager.getInstance(config);
- cube = CubeManager.getInstance(config).getCube(cubeName);
- cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
- cubeDesc = cube.getDescriptor();
- factTableDesc = metadataManager.getTableDesc(cubeDesc.getFactTable());
-
- long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
- baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
-
- // intermediateTableDesc = new
- // JoinedFlatTableDesc(cube.getDescriptor());
-
- rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid);
-
- measureCodec = new MeasureCodec(cubeDesc.getMeasures());
- measures = new Object[cubeDesc.getMeasures().size()];
-
- int colCount = cubeDesc.getRowkey().getRowKeyColumns().length;
- keyBytesBuf = new byte[colCount][];
-
- bytesSplitter = new BytesSplitter(factTableDesc.getColumns().length, 4096);
-
- nullValue = new byte[] { (byte) '\\', (byte) 'N' };// As in Hive, null
- // value is
- // represented by \N
-
- prepareJoins();
- prepareMetrics();
- }
-
- private void prepareJoins() throws IOException {
- this.lookupTables = new HashMap<String, LookupBytesTable>();
- this.tableJoins = new LinkedList<TableJoin>();
- this.factTblColAsRowKey = new LinkedList<Pair<Integer, Integer>>();
-
- for (DimensionDesc dim : cubeDesc.getDimensions()) {
- JoinDesc join = dim.getJoin();
- if (join != null) {
- String joinType = join.getType().toUpperCase();
- String lookupTableName = dim.getTable();
-
- // load lookup tables
- if (!lookupTables.containsKey(lookupTableName)) {
- HiveTable htable = new HiveTable(metadataManager, lookupTableName);
- LookupBytesTable btable = new LookupBytesTable(metadataManager.getTableDesc(lookupTableName), join.getPrimaryKey(), htable);
- lookupTables.put(lookupTableName, btable);
- }
-
- // create join infos
- LinkedList<Integer> fkIndice = new LinkedList<Integer>();
- for (TblColRef colRef : join.getForeignKeyColumns()) {
- fkIndice.add(colRef.getColumn().getZeroBasedIndex());
- }
- this.tableJoins.add(new TableJoin(joinType, fkIndice, lookupTableName, this.findColumnRowKeyRelationships(dim)));
-
- } else {
-
- this.factTblColAsRowKey.addAll(this.findColumnRowKeyRelationships(dim));
- }
- }
-
- // put composite keys joins ahead of single key joins
- Collections.sort(tableJoins, new Comparator<TableJoin>() {
- @Override
- public int compare(TableJoin o1, TableJoin o2) {
- return Integer.valueOf(o2.fkIndice.size()).compareTo(Integer.valueOf(o1.fkIndice.size()));
- }
- });
- }
-
- private LinkedList<Pair<Integer, Integer>> findColumnRowKeyRelationships(DimensionDesc dim) {
- LinkedList<Pair<Integer, Integer>> dimTblColAsRowKey = new LinkedList<Pair<Integer, Integer>>();
- for (TblColRef colRef : dim.getColumnRefs()) {
- int dimTableIndex = colRef.getColumn().getZeroBasedIndex();
- int rowKeyIndex = cubeDesc.getRowkey().getRowKeyIndexByColumnName(colRef.getName());
- dimTblColAsRowKey.add(new Pair<Integer, Integer>(dimTableIndex, rowKeyIndex));
- }
- return dimTblColAsRowKey;
- }
-
- private void prepareMetrics() {
- List<MeasureDesc> measures = cubeDesc.getMeasures();
- int measureSize = measures.size();
- measureColumnIndice = new int[measureSize][];
- for (int i = 0; i < measureSize; i++) {
- FunctionDesc func = measures.get(i).getFunction();
- List<TblColRef> colRefs = func.getParameter().getColRefs();
- if (colRefs == null) {
- measureColumnIndice[i] = null;
- } else {
- measureColumnIndice[i] = new int[colRefs.size()];
- for (int j = 0; j < colRefs.size(); j++) {
- TblColRef c = colRefs.get(j);
- int factTblIdx = factTableDesc.findColumnByName(c.getName()).getZeroBasedIndex();
- measureColumnIndice[i][j] = factTblIdx;
- }
- }
- }
- }
-
- private byte[] trimSplitBuffer(SplittedBytes splittedBytes) {
- return Arrays.copyOf(splittedBytes.value, splittedBytes.length);
- }
-
- private byte[] buildKey(SplittedBytes[] splitBuffers) {
-
- int filledDimension = 0;// debug
-
- // join lookup tables, and fill into RowKey the columns in lookup table
- for (TableJoin tableJoin : this.tableJoins) {
- String dimTblName = tableJoin.lookupTableName;
- LookupBytesTable dimTbl = this.lookupTables.get(dimTblName);
- ByteArray[] rawKey = new ByteArray[tableJoin.fkIndice.size()];
- for (int i = 0; i < tableJoin.fkIndice.size(); ++i) {
- rawKey[i] = new ByteArray(trimSplitBuffer(splitBuffers[tableJoin.fkIndice.get(i)]));
- }
- Array<ByteArray> key = new Array<ByteArray>(rawKey);
- ByteArray[] dimRow = dimTbl.getRow(key);
- if (dimRow == null) {
- if (tableJoin.joinType.equalsIgnoreCase("INNER")) {
- return null;
- } else if (tableJoin.joinType.equalsIgnoreCase("LEFT")) {
- for (Pair<Integer, Integer> relation : tableJoin.dimTblColAsRowKey) {
- keyBytesBuf[relation.getSecond()] = nullValue;
- filledDimension++;
- }
- }
- } else {
- for (Pair<Integer, Integer> relation : tableJoin.dimTblColAsRowKey) {
- keyBytesBuf[relation.getSecond()] = dimRow[relation.getFirst()].data;
- filledDimension++;
- }
- }
- }
-
- // fill into RowKey the columns in fact table
- for (Pair<Integer, Integer> relation : this.factTblColAsRowKey) {
- keyBytesBuf[relation.getSecond()] = trimSplitBuffer(splitBuffers[relation.getFirst()]);
- filledDimension++;
- }
-
- assert filledDimension == keyBytesBuf.length;
-
- // all the row key slots(keyBytesBuf) should be complete now
- return rowKeyEncoder.encode(keyBytesBuf);
- }
-
- private void buildValue(SplittedBytes[] splitBuffers) {
-
- for (int i = 0; i < measures.length; i++) {
- byte[] valueBytes = getValueBytes(splitBuffers, i);
- measures[i] = measureCodec.getSerializer(i).valueOf(valueBytes);
- }
-
- valueBuf.clear();
- measureCodec.encode(measures, valueBuf);
- }
-
- private byte[] getValueBytes(SplittedBytes[] splitBuffers, int measureIdx) {
- MeasureDesc desc = cubeDesc.getMeasures().get(measureIdx);
- ParameterDesc paramDesc = desc.getFunction().getParameter();
- int[] flatTableIdx = this.measureColumnIndice[measureIdx];
-
- byte[] result = null;
-
- // constant
- if (flatTableIdx == null) {
- result = Bytes.toBytes(paramDesc.getValue());
- }
- // column values
- else {
- for (int i = 0; i < flatTableIdx.length; i++) {
- SplittedBytes split = splitBuffers[flatTableIdx[i]];
- result = Arrays.copyOf(split.value, split.length);
- }
- }
-
- if (desc.getFunction().isCount()) {
- result = Bytes.toBytes("1");
- }
-
- return result;
- }
-
- @Override
- public void map(KEYIN key, Text value, Context context) throws IOException, InterruptedException {
- // combining the hive table flattening logic into base cuboid building.
- // the input of this mapper is the fact table rows
-
- counter++;
- if (counter % BatchConstants.COUNTER_MAX == 0) {
- logger.info("Handled " + counter + " records!");
- }
-
- if (!byteRowDelimiterInferred)
- byteRowDelimiter = bytesSplitter.inferByteRowDelimiter(value.getBytes(), value.getLength(), factTableDesc.getColumns().length);
-
- bytesSplitter.split(value.getBytes(), value.getLength(), byteRowDelimiter);
-
- try {
- byte[] rowKey = buildKey(bytesSplitter.getSplitBuffers());
- if (rowKey == null)
- return;// skip this fact table row
-
- outputKey.set(rowKey, 0, rowKey.length);
-
- buildValue(bytesSplitter.getSplitBuffers());
- outputValue.set(valueBuf.array(), 0, valueBuf.position());
-
- context.write(outputKey, outputValue);
-
- } catch (Throwable t) {
- logger.error("", t);
- context.getCounter(BatchConstants.MAPREDUCE_COUNTER_GROUP_NAME, "Error records").increment(1L);
- return;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/job/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java b/job/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
index c9988fc..fa7e51b 100644
--- a/job/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
+++ b/job/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
@@ -45,7 +45,7 @@ import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.DimensionDesc;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.DataType;
+import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.metadata.model.JoinDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/job/src/test/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapperTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapperTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapperTest.java
index f100490..58d093a 100644
--- a/job/src/test/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapperTest.java
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapperTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.math.BigDecimal;
+import java.nio.ByteBuffer;
import java.util.List;
import org.apache.commons.io.FileUtils;
@@ -36,7 +37,7 @@ import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.kv.RowKeyDecoder;
import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.measure.MeasureCodec;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.junit.After;
import org.junit.Before;
@@ -100,7 +101,7 @@ public class BaseCuboidMapperTest extends LocalFileMetadataTestCase {
private void verifyMeasures(List<MeasureDesc> measures, Text valueBytes, String... valueStr) {
MeasureCodec codec = new MeasureCodec(measures);
Object[] values = new Object[measures.size()];
- codec.decode(valueBytes, values);
+ codec.decode(ByteBuffer.wrap(valueBytes.getBytes()), values);
assertTrue(new BigDecimal(valueStr[0]).equals(values[0]));
assertTrue(new BigDecimal(valueStr[1]).equals(values[1]));
assertTrue(new BigDecimal(valueStr[2]).equals(values[2]));
@@ -138,6 +139,6 @@ public class BaseCuboidMapperTest extends LocalFileMetadataTestCase {
assertEquals(511, Bytes.toLong(cuboidId));
assertEquals(22, restKey.length);
- verifyMeasures(cube.getDescriptor().getMeasures(), result.get(0).getSecond(), "0", "0", "0", "1","22");
+ verifyMeasures(cube.getDescriptor().getMeasures(), result.get(0).getSecond(), "0", "0", "0", "1", "22");
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/job/src/test/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper2Test.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper2Test.java b/job/src/test/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper2Test.java
index 867faa6..c9b7eba 100644
--- a/job/src/test/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper2Test.java
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper2Test.java
@@ -34,7 +34,7 @@ import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.measure.MeasureCodec;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/job/src/test/java/org/apache/kylin/job/hadoop/cube/CubeReducerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/cube/CubeReducerTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/cube/CubeReducerTest.java
index 837f759..16cfa89 100644
--- a/job/src/test/java/org/apache/kylin/job/hadoop/cube/CubeReducerTest.java
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/cube/CubeReducerTest.java
@@ -37,7 +37,7 @@ import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.measure.MeasureCodec;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java
index 9a1fdfb..ea5c163 100644
--- a/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java
@@ -33,7 +33,7 @@ import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.dict.DictionaryGenerator;
import org.apache.kylin.dict.DictionaryInfo;
import org.apache.kylin.dict.DictionaryManager;
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/job/src/test/java/org/apache/kylin/job/hadoop/cube/NDCuboidMapperTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/cube/NDCuboidMapperTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/cube/NDCuboidMapperTest.java
index 7ce6ee9..d60ec67 100644
--- a/job/src/test/java/org/apache/kylin/job/hadoop/cube/NDCuboidMapperTest.java
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/cube/NDCuboidMapperTest.java
@@ -24,8 +24,11 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
+import java.util.Collection;
import java.util.List;
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
@@ -37,6 +40,8 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import javax.annotation.Nullable;
+
public class NDCuboidMapperTest extends LocalFileMetadataTestCase {
MapReduceDriver<Text, Text, Text, Text, Text, Text> mapReduceDriver;
@@ -81,7 +86,17 @@ public class NDCuboidMapperTest extends LocalFileMetadataTestCase {
byte[] resultValue = { 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23, -16, 56, 92, 114, -80, 118, 1, 1 };
Pair<Text, Text> output1 = new Pair<Text, Text>(new Text(resultKey), new Text(resultValue));
- assertTrue(result.contains(output1));
+
+ //As we will truncate decimal(KYLIN-766), value will no longer equals to resultValue
+ Collection<Text> keys = Collections2.transform(result, new Function<Pair<Text, Text>, Text>() {
+ @Nullable
+ @Override
+ public Text apply(Pair<Text, Text> input) {
+ return input.getFirst();
+ }
+ });
+ assertTrue(keys.contains(output1.getFirst()));
+ assertTrue(!result.contains(output1));
long[] keySet = new long[result.size()];
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/MeasureAggregator.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/measure/MeasureAggregator.java b/metadata/src/main/java/org/apache/kylin/measure/MeasureAggregator.java
new file mode 100644
index 0000000..7b74225
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/measure/MeasureAggregator.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure;
+
+import java.io.Serializable;
+
+import org.apache.kylin.metadata.datatype.DataType;
+
+/**
+ * Base class for measure aggregators. An aggregator folds successive values of
+ * type {@code V}, passed to {@link #aggregate(Object)}, into a state exposed by
+ * {@link #getState()}; {@link #reset()} resets that state.
+ *
+ * @param <V> type of the aggregated values and of the resulting state
+ */
+@SuppressWarnings("serial")
+public abstract class MeasureAggregator<V> implements Serializable {
+
+    /**
+     * Creates an aggregator for an aggregation function and data type by
+     * delegating to {@link MeasureTypeFactory}.
+     *
+     * @param funcName the aggregation function name
+     * @param dataType the data type of the measure
+     * @return a new aggregator
+     */
+    public static MeasureAggregator<?> create(String funcName, DataType dataType) {
+        return MeasureTypeFactory.create(funcName, dataType).newAggregator();
+    }
+
+    /**
+     * Guessed memory footprint, in bytes, of a BigDecimal-based aggregator
+     * (AggregationCacheMemSizeTest measured 116).
+     */
+    public static int guessBigDecimalMemBytes() {
+        return 8 // aggregator obj shell
+                + 8 // ref to BigDecimal
+                + 8 // BigDecimal obj shell
+                + 100; // guess of BigDecimal internal
+    }
+
+    /**
+     * Guessed memory footprint, in bytes, of a double-based aggregator: the
+     * upper end of the 29 to 44 bytes measured by AggregationCacheMemSizeTest.
+     */
+    public static int guessDoubleMemBytes() {
+        return 44;
+    }
+
+    /**
+     * Guessed memory footprint, in bytes, of a long-based aggregator: the
+     * upper end of the 29 to 44 bytes measured by AggregationCacheMemSizeTest.
+     */
+    public static int guessLongMemBytes() {
+        return 44;
+    }
+
+    // ============================================================================
+
+    /**
+     * Lets an aggregator receive the aggregator of the measure it depends on.
+     * The default implementation ignores the argument.
+     *
+     * @param agg the aggregator of the dependent measure
+     */
+    @SuppressWarnings("rawtypes")
+    public void setDependentAggregator(MeasureAggregator agg) {
+    }
+
+    /** Resets the accumulated state. */
+    public abstract void reset();
+
+    /** Folds one value into the accumulated state. */
+    public abstract void aggregate(V value);
+
+    /** Returns the accumulated state. */
+    public abstract V getState();
+
+    /** Returns an estimate of the memory consumption UPPER BOUND, in bytes. */
+    public abstract int getMemBytesEstimate();
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java b/metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java
new file mode 100644
index 0000000..12832ff
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+
+/**
+ * Holds one {@link MeasureAggregator} per measure and applies them position by
+ * position to rows of measure values.
+ */
+@SuppressWarnings({ "rawtypes", "unchecked", "serial" })
+public class MeasureAggregators implements Serializable {
+
+    private final MeasureAggregator[] aggs;
+    private final int descLength;
+
+    public MeasureAggregators(Collection<MeasureDesc> measureDescs) {
+        this(measureDescs.toArray(new MeasureDesc[measureDescs.size()]));
+    }
+
+    /**
+     * Creates one aggregator per measure, then hands each measure that declares a
+     * dependent measure reference the aggregator of the referenced measure.
+     *
+     * @param measureDescs the measures, in the order their values are supplied
+     * @throws IllegalArgumentException if a dependent measure reference does not
+     *         name one of {@code measureDescs}
+     */
+    public MeasureAggregators(MeasureDesc... measureDescs) {
+        descLength = measureDescs.length;
+        aggs = new MeasureAggregator[descLength];
+
+        Map<String, Integer> measureIndexMap = new HashMap<String, Integer>();
+        for (int i = 0; i < descLength; i++) {
+            FunctionDesc func = measureDescs[i].getFunction();
+            aggs[i] = func.getMeasureType().newAggregator();
+            measureIndexMap.put(measureDescs[i].getName(), i);
+        }
+        // fill back dependent aggregator
+        for (int i = 0; i < descLength; i++) {
+            String depMsrRef = measureDescs[i].getDependentMeasureRef();
+            if (depMsrRef != null) {
+                // look up as Integer: unboxing a missing entry would throw a bare NPE
+                Integer index = measureIndexMap.get(depMsrRef);
+                if (index == null) {
+                    throw new IllegalArgumentException("Measure '" + measureDescs[i].getName()
+                            + "' depends on unknown measure '" + depMsrRef + "'");
+                }
+                aggs[i].setDependentAggregator(aggs[index]);
+            }
+        }
+    }
+
+    /** Resets every aggregator. */
+    public void reset() {
+        for (MeasureAggregator agg : aggs) {
+            agg.reset();
+        }
+    }
+
+    /**
+     * Feeds {@code values[i]} to the i-th aggregator.
+     *
+     * @param values one value per measure, in constructor order
+     */
+    public void aggregate(Object[] values) {
+        assert values.length == descLength;
+
+        for (int i = 0; i < descLength; i++) {
+            aggs[i].aggregate(values[i]);
+        }
+    }
+
+    /**
+     * Stores the current state of the i-th aggregator into {@code states[i]}.
+     *
+     * @param states destination array with at least one slot per measure
+     */
+    public void collectStates(Object[] states) {
+        for (int i = 0; i < descLength; i++) {
+            states[i] = aggs[i].getState();
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java b/metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java
new file mode 100644
index 0000000..6209079
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+
+import org.apache.kylin.metadata.datatype.DataTypeSerializer;
+import org.apache.kylin.metadata.model.MeasureDesc;
+
+/**
+ * Encodes a row of measure values into a {@link ByteBuffer} and decodes it back,
+ * using one {@link DataTypeSerializer} per measure, applied in measure order.
+ *
+ * @author yangli9
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class MeasureCodec {
+
+    int nMeasures;
+    DataTypeSerializer[] serializers;
+
+    /** Builds a codec for the given measures, in iteration order. */
+    public MeasureCodec(Collection<MeasureDesc> measureDescs) {
+        this(measureDescs.toArray(new MeasureDesc[measureDescs.size()]));
+    }
+
+    /** Builds a codec from the return type of each measure's function. */
+    public MeasureCodec(MeasureDesc... measureDescs) {
+        init(returnTypesOf(measureDescs));
+    }
+
+    /** Builds a codec directly from data type names. */
+    public MeasureCodec(String... dataTypes) {
+        init(dataTypes);
+    }
+
+    /** Collects each measure's function return type, preserving order. */
+    private static String[] returnTypesOf(MeasureDesc[] measureDescs) {
+        String[] types = new String[measureDescs.length];
+        int pos = 0;
+        for (MeasureDesc desc : measureDescs) {
+            types[pos++] = desc.getFunction().getReturnType();
+        }
+        return types;
+    }
+
+    /** Creates one serializer per data type. */
+    private void init(String[] dataTypes) {
+        nMeasures = dataTypes.length;
+        serializers = new DataTypeSerializer[nMeasures];
+        int pos = 0;
+        for (String type : dataTypes) {
+            serializers[pos++] = DataTypeSerializer.create(type);
+        }
+    }
+
+    /** Returns the serializer of the measure at position {@code idx}. */
+    public DataTypeSerializer getSerializer(int idx) {
+        return serializers[idx];
+    }
+
+    /** Reads one value per measure from {@code buf} into {@code result}, in order. */
+    public void decode(ByteBuffer buf, Object[] result) {
+        assert result.length == nMeasures;
+        for (int m = 0; m < nMeasures; m++) {
+            result[m] = serializers[m].deserialize(buf);
+        }
+    }
+
+    /** Writes {@code values}, one per measure and in order, into {@code out}. */
+    public void encode(Object[] values, ByteBuffer out) {
+        assert values.length == nMeasures;
+        for (int m = 0; m < nMeasures; m++) {
+            serializers[m].serialize(values[m], out);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java b/metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java
new file mode 100644
index 0000000..0076252
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure;
+
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+/**
+ * Turns raw string input into a measure value of type {@code V}.
+ *
+ * @param <V> the measure value type produced
+ */
+public abstract class MeasureIngester<V> {
+
+    /** Returns the ingester supplied by the measure's function type. */
+    public static MeasureIngester<?> create(MeasureDesc measure) {
+        return measure.getFunction().getMeasureType().newIngester();
+    }
+
+    /** Creates one ingester per measure, in the collection's iteration order. */
+    public static MeasureIngester<?>[] create(Collection<MeasureDesc> measures) {
+        MeasureIngester<?>[] ingesters = new MeasureIngester<?>[measures.size()];
+        int slot = 0;
+        for (MeasureDesc m : measures) {
+            ingesters[slot] = create(m);
+            slot++;
+        }
+        return ingesters;
+    }
+
+    /**
+     * Builds a measure value from raw string input.
+     *
+     * @param values raw input values (presumably the measure's parameter columns; verify against callers)
+     * @param measureDesc the measure being built
+     * @param dictionaryMap dictionaries keyed by column
+     * @return the measure value
+     */
+    public abstract V valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap);
+
+    /**
+     * Hook for re-encoding {@code value} when column dictionaries change from
+     * {@code oldDicts} to {@code newDicts}. Not supported unless overridden.
+     *
+     * @throws UnsupportedOperationException always, in this base implementation
+     */
+    public V reEncodeDictionary(V value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDicts, Map<TblColRef, Dictionary<String>> newDicts) {
+        throw new UnsupportedOperationException();
+    }
+}