You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/11/27 10:01:05 UTC
[10/13] incubator-kylin git commit: KYLIN-976 Add ingester;
Build part done, in-mem cube test pass
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderTest.java
index ed0a166..832584c 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderTest.java
@@ -33,10 +33,10 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.metadata.model.TblColRef;
import org.junit.AfterClass;
@@ -58,7 +58,7 @@ public class DoggedCubeBuilderTest extends LocalFileMetadataTestCase {
private static CubeInstance cube;
private static String flatTable;
- private static Map<TblColRef, Dictionary<?>> dictionaryMap;
+ private static Map<TblColRef, Dictionary<String>> dictionaryMap;
@BeforeClass
public static void before() throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java
index f853b08..ab87c2b 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java
@@ -33,13 +33,13 @@ import java.util.concurrent.Future;
import org.apache.commons.io.FileUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.dict.DictionaryGenerator;
import org.apache.kylin.dict.IterableDictionaryValueEnumerator;
import org.apache.kylin.gridtable.GTRecord;
@@ -66,7 +66,7 @@ public class InMemCubeBuilderTest extends LocalFileMetadataTestCase {
private static CubeInstance cube;
private static String flatTable;
- private static Map<TblColRef, Dictionary<?>> dictionaryMap;
+ private static Map<TblColRef, Dictionary<String>> dictionaryMap;
@BeforeClass
public static void before() throws IOException {
@@ -166,8 +166,8 @@ public class InMemCubeBuilderTest extends LocalFileMetadataTestCase {
queue.put(new ArrayList<String>(0));
}
- static Map<TblColRef, Dictionary<?>> getDictionaryMap(CubeInstance cube, String flatTable) throws IOException {
- Map<TblColRef, Dictionary<?>> result = Maps.newHashMap();
+ static Map<TblColRef, Dictionary<String>> getDictionaryMap(CubeInstance cube, String flatTable) throws IOException {
+ Map<TblColRef, Dictionary<String>> result = Maps.newHashMap();
CubeDesc desc = cube.getDescriptor();
CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(desc, null);
int nColumns = flatTableDesc.getColumnList().size();
@@ -178,7 +178,7 @@ public class InMemCubeBuilderTest extends LocalFileMetadataTestCase {
if (desc.getRowkey().isUseDictionary(col)) {
logger.info("Building dictionary for " + col);
List<byte[]> valueList = readValueList(flatTable, nColumns, flatTableDesc.getRowKeyColumnIndexes()[c]);
- Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(col.getType(), new IterableDictionaryValueEnumerator(valueList));
+ Dictionary<String> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(col.getType(), new IterableDictionaryValueEnumerator(valueList));
result.put(col, dict);
}
}
@@ -192,7 +192,7 @@ public class InMemCubeBuilderTest extends LocalFileMetadataTestCase {
TblColRef literalCol = func.getTopNLiteralColumn();
logger.info("Building dictionary for " + literalCol);
List<byte[]> valueList = readValueList(flatTable, nColumns, literalColIdx);
- Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(literalCol.getType(), new IterableDictionaryValueEnumerator(valueList));
+ Dictionary<String> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(literalCol.getType(), new IterableDictionaryValueEnumerator(valueList));
result.put(literalCol, dict);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java
index 93f0419..6ae0d0e 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java
@@ -23,15 +23,15 @@ import java.util.Random;
import java.util.SortedMap;
import java.util.TreeMap;
-import org.apache.kylin.aggregation.MeasureAggregator;
-import org.apache.kylin.aggregation.basic.BigDecimalSumAggregator;
-import org.apache.kylin.aggregation.basic.DoubleSumAggregator;
-import org.apache.kylin.aggregation.basic.LongSumAggregator;
-import org.apache.kylin.aggregation.hllc.HLLCAggregator;
-import org.apache.kylin.common.datatype.DoubleMutable;
-import org.apache.kylin.common.datatype.LongMutable;
import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.measure.MeasureAggregator;
+import org.apache.kylin.measure.basic.BigDecimalSumAggregator;
+import org.apache.kylin.measure.basic.DoubleSumAggregator;
+import org.apache.kylin.measure.basic.LongSumAggregator;
+import org.apache.kylin.measure.hllc.HLLCAggregator;
+import org.apache.kylin.metadata.datatype.DoubleMutable;
+import org.apache.kylin.metadata.datatype.LongMutable;
import org.junit.Test;
public class AggregationCacheMemSizeTest {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
index b3981e8..836bafd 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
@@ -27,18 +27,18 @@ import java.util.BitSet;
import java.util.List;
import java.util.Map;
-import org.apache.kylin.common.datatype.DataType;
-import org.apache.kylin.common.datatype.LongMutable;
import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.gridtable.CubeCodeSystem;
-import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.dict.NumberDictionaryBuilder;
import org.apache.kylin.dict.StringBytesConverter;
import org.apache.kylin.dict.TrieDictionaryBuilder;
import org.apache.kylin.gridtable.GTInfo.Builder;
import org.apache.kylin.gridtable.memstore.GTSimpleMemStore;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.datatype.LongMutable;
import org.apache.kylin.metadata.filter.ColumnTupleFilter;
import org.apache.kylin.metadata.filter.CompareTupleFilter;
import org.apache.kylin.metadata.filter.ConstantTupleFilter;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
index 4313f4b..5454490 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
@@ -26,9 +26,9 @@ import java.math.BigDecimal;
import java.util.BitSet;
import java.util.List;
-import org.apache.kylin.common.datatype.LongMutable;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.gridtable.memstore.GTSimpleMemStore;
+import org.apache.kylin.metadata.datatype.LongMutable;
import org.junit.Test;
public class SimpleGridTableTest {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleInvertedIndexTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleInvertedIndexTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleInvertedIndexTest.java
index 7f4da61..02fbecc 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleInvertedIndexTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleInvertedIndexTest.java
@@ -24,10 +24,10 @@ import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import org.apache.kylin.common.datatype.DataType;
-import org.apache.kylin.common.datatype.LongMutable;
-import org.apache.kylin.common.datatype.StringSerializer;
import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.datatype.LongMutable;
+import org.apache.kylin.metadata.datatype.StringSerializer;
import org.apache.kylin.metadata.filter.ColumnTupleFilter;
import org.apache.kylin.metadata.filter.CompareTupleFilter;
import org.apache.kylin.metadata.filter.ConstantTupleFilter;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java b/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java
index 4da5471..e9f74a4 100644
--- a/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java
@@ -23,11 +23,11 @@ import static org.junit.Assert.assertEquals;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
-import org.apache.kylin.aggregation.MeasureCodec;
-import org.apache.kylin.common.datatype.DoubleMutable;
-import org.apache.kylin.common.datatype.LongMutable;
import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.measure.MeasureCodec;
+import org.apache.kylin.metadata.datatype.DoubleMutable;
+import org.apache.kylin.metadata.datatype.LongMutable;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-dictionary/src/main/java/org/apache/kylin/dict/DateStrDictionary.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DateStrDictionary.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DateStrDictionary.java
index b882e58..62b06aa 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DateStrDictionary.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DateStrDictionary.java
@@ -30,6 +30,7 @@ import java.io.UnsupportedEncodingException;
import java.util.Date;
import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.util.Dictionary;
/**
* A dictionary for date string (date only, no time).
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-dictionary/src/main/java/org/apache/kylin/dict/DictCodeSystem.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictCodeSystem.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictCodeSystem.java
index f19a2a8..b0326c1 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictCodeSystem.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictCodeSystem.java
@@ -3,6 +3,7 @@ package org.apache.kylin.dict;
import java.nio.ByteBuffer;
import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.metadata.filter.IFilterCodeSystem;
/**
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-dictionary/src/main/java/org/apache/kylin/dict/Dictionary.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/Dictionary.java b/core-dictionary/src/main/java/org/apache/kylin/dict/Dictionary.java
deleted file mode 100644
index d1fc6f9..0000000
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/Dictionary.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.dict;
-
-import java.io.PrintStream;
-import java.io.Serializable;
-import java.io.UnsupportedEncodingException;
-
-import org.apache.kylin.common.persistence.Writable;
-import org.apache.kylin.common.util.BytesUtil;
-
-/**
- * A bi-way dictionary that maps from dimension/column values to IDs and vice
- * versa. By storing IDs instead of real values, the size of cube is
- * significantly reduced.
- *
- * - IDs are smallest integers possible for the cardinality of a column, for the
- * purpose of minimal storage space - IDs preserve ordering of values, such that
- * range query can be applied to IDs directly
- *
- * A dictionary once built, is immutable. This allows optimal memory footprint
- * by e.g. flatten the Trie structure into a byte array, replacing node pointers
- * with array offsets.
- *
- * @author yangli9
- */
-@SuppressWarnings("serial")
-abstract public class Dictionary<T> implements Writable, Serializable {
-
- public static final byte NULL = (byte) 0xff;
-
- // ID with all bit-1 (0xff e.g.) reserved for NULL value
- public static final int NULL_ID[] = new int[] { 0, 0xff, 0xffff, 0xffffff, 0xffffffff };
-
- abstract public int getMinId();
-
- abstract public int getMaxId();
-
- public int getSize() {
- return getMaxId() - getMinId() + 1;
- }
-
- /**
- * @return the size of an ID in bytes, determined by the cardinality of column
- */
- abstract public int getSizeOfId();
-
- /**
- * @return the (maximum) size of value in bytes, determined by the longest value
- */
- abstract public int getSizeOfValue();
-
- /**
- * @return true if each entry of this dict is contained by the dict in param
- */
- abstract public boolean contains(Dictionary<?> another);
-
- /**
- * Convenient form of <code>getIdFromValue(value, 0)</code>
- */
- final public int getIdFromValue(T value) throws IllegalArgumentException {
- return getIdFromValue(value, 0);
- }
-
- /**
- * Returns the ID integer of given value. In case of not found
- * <p>
- * - if roundingFlag=0, throw IllegalArgumentException; <br>
- * - if roundingFlag<0, the closest smaller ID integer if exist; <br>
- * - if roundingFlag>0, the closest bigger ID integer if exist. <br>
- * <p>
- * The implementation often has cache, thus faster than the byte[] version getIdFromValueBytes()
- *
- * @throws IllegalArgumentException
- * if value is not found in dictionary and rounding is off;
- * or if rounding cannot find a smaller or bigger ID
- */
- final public int getIdFromValue(T value, int roundingFlag) throws IllegalArgumentException {
- if (isNullObjectForm(value))
- return nullId();
- else
- return getIdFromValueImpl(value, roundingFlag);
- }
-
- final public boolean containsValue(T value) throws IllegalArgumentException {
- if (isNullObjectForm(value)) {
- return true;
- } else {
- try {
- //if no key found, it will throw exception
- getIdFromValueImpl(value, 0);
- } catch (IllegalArgumentException e) {
- return false;
- }
- return true;
- }
- }
-
- protected boolean isNullObjectForm(T value) {
- return value == null;
- }
-
- abstract protected int getIdFromValueImpl(T value, int roundingFlag);
-
- /**
- * @return the value corresponds to the given ID
- * @throws IllegalArgumentException
- * if ID is not found in dictionary
- */
- final public T getValueFromId(int id) throws IllegalArgumentException {
- if (isNullId(id))
- return null;
- else
- return getValueFromIdImpl(id);
- }
-
- abstract protected T getValueFromIdImpl(int id);
-
- /**
- * Convenient form of
- * <code>getIdFromValueBytes(value, offset, len, 0)</code>
- */
- final public int getIdFromValueBytes(byte[] value, int offset, int len) throws IllegalArgumentException {
- return getIdFromValueBytes(value, offset, len, 0);
- }
-
- /**
- * A lower level API, return ID integer from raw value bytes. In case of not found
- * <p>
- * - if roundingFlag=0, throw IllegalArgumentException; <br>
- * - if roundingFlag<0, the closest smaller ID integer if exist; <br>
- * - if roundingFlag>0, the closest bigger ID integer if exist. <br>
- * <p>
- * Bypassing the cache layer, this could be significantly slower than getIdFromValue(T value).
- *
- * @throws IllegalArgumentException
- * if value is not found in dictionary and rounding is off;
- * or if rounding cannot find a smaller or bigger ID
- */
- final public int getIdFromValueBytes(byte[] value, int offset, int len, int roundingFlag) throws IllegalArgumentException {
- if (isNullByteForm(value, offset, len))
- return nullId();
- else {
- int id = getIdFromValueBytesImpl(value, offset, len, roundingFlag);
- if (id < 0)
- throw new IllegalArgumentException("Value not exists!");
- return id;
- }
- }
-
- protected boolean isNullByteForm(byte[] value, int offset, int len) {
- return value == null;
- }
-
- abstract protected int getIdFromValueBytesImpl(byte[] value, int offset, int len, int roundingFlag);
-
- final public byte[] getValueBytesFromId(int id) {
- if (isNullId(id))
- return BytesUtil.EMPTY_BYTE_ARRAY;
- else
- return getValueBytesFromIdImpl(id);
- }
-
- abstract protected byte[] getValueBytesFromIdImpl(int id);
-
- /**
- * A lower level API, get byte values from ID, return the number of bytes
- * written. Bypassing the cache layer, this could be significantly slower
- * than getIdFromValue(T value).
- *
- * @return size of value bytes, 0 if empty string, -1 if null
- *
- * @throws IllegalArgumentException
- * if ID is not found in dictionary
- */
- final public int getValueBytesFromId(int id, byte[] returnValue, int offset) throws IllegalArgumentException {
- if (isNullId(id))
- return -1;
- else
- return getValueBytesFromIdImpl(id, returnValue, offset);
- }
-
- abstract protected int getValueBytesFromIdImpl(int id, byte[] returnValue, int offset);
-
- abstract public void dump(PrintStream out);
-
- public int nullId() {
- return NULL_ID[getSizeOfId()];
- }
-
- public boolean isNullId(int id) {
- int nullId = NULL_ID[getSizeOfId()];
- return (nullId & id) == nullId;
- }
-
- /** utility that converts a dictionary ID to string, preserving order */
- public static String dictIdToString(byte[] idBytes, int offset, int length) {
- try {
- return new String(idBytes, offset, length, "ISO-8859-1");
- } catch (UnsupportedEncodingException e) {
- // never happen
- return null;
- }
- }
-
- /** the reverse of dictIdToString(), returns integer ID */
- public static int stringToDictId(String str) {
- try {
- byte[] bytes = str.getBytes("ISO-8859-1");
- return BytesUtil.readUnsigned(bytes, 0, bytes.length);
- } catch (UnsupportedEncodingException e) {
- // never happen
- return 0;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
index 300c240..4b01e60 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
@@ -26,9 +26,10 @@ import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.datatype.DataType;
import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.source.ReadableTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,7 +56,7 @@ public class DictionaryGenerator {
}
}
- public static Dictionary<?> buildDictionaryFromValueEnumerator(DataType dataType, IDictionaryValueEnumerator valueEnumerator) throws IOException {
+ public static Dictionary<String> buildDictionaryFromValueEnumerator(DataType dataType, IDictionaryValueEnumerator valueEnumerator) throws IOException {
Preconditions.checkNotNull(dataType, "dataType cannot be null");
Dictionary dict;
int baseId = 0; // always 0 for now
@@ -94,7 +95,7 @@ public class DictionaryGenerator {
return buildDictionaryFromValueEnumerator(dataType, new MultipleDictionaryValueEnumerator(sourceDicts));
}
- public static Dictionary<?> buildDictionary(DictionaryInfo info, ReadableTable inpTable) throws IOException {
+ public static Dictionary<String> buildDictionary(DictionaryInfo info, ReadableTable inpTable) throws IOException {
// currently all data types are casted to string to build dictionary
// String dataType = info.getDataType();
@@ -111,7 +112,7 @@ public class DictionaryGenerator {
}
}
- private static Dictionary buildDateDict(IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, ArrayList samples) throws IOException {
+ private static Dictionary<String> buildDateDict(IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, ArrayList samples) throws IOException {
final int BAD_THRESHOLD = 0;
String matchPattern = null;
byte[] value;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
index f482002..4fba59a 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
@@ -20,6 +20,7 @@ package org.apache.kylin.dict;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.source.ReadableTable.TableSignature;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfoSerializer.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfoSerializer.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfoSerializer.java
index 5b7a318..69b29fe 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfoSerializer.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfoSerializer.java
@@ -24,6 +24,7 @@ import java.io.IOException;
import org.apache.kylin.common.persistence.Serializer;
import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.JsonUtil;
/**
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
index 2f4b761..f538142 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
@@ -30,9 +30,10 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.datatype.DataType;
import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.metadata.model.DataModelDesc;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TblColRef;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionarySerializer.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionarySerializer.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionarySerializer.java
index b80f838..6b47868 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionarySerializer.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionarySerializer.java
@@ -10,6 +10,7 @@ import java.io.OutputStream;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.Dictionary;
/**
*/
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryAware.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryAware.java b/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryAware.java
index 1c7a009..4586163 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryAware.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryAware.java
@@ -18,6 +18,7 @@
package org.apache.kylin.dict;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.metadata.model.TblColRef;
/**
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-dictionary/src/main/java/org/apache/kylin/dict/MultipleDictionaryValueEnumerator.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/MultipleDictionaryValueEnumerator.java b/core-dictionary/src/main/java/org/apache/kylin/dict/MultipleDictionaryValueEnumerator.java
index 13f7394..df7b1c6 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/MultipleDictionaryValueEnumerator.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/MultipleDictionaryValueEnumerator.java
@@ -19,7 +19,9 @@
package org.apache.kylin.dict;
import com.google.common.collect.Lists;
+
import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.Dictionary;
import java.io.IOException;
import java.util.List;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-dictionary/src/main/java/org/apache/kylin/dict/TimeStrDictionary.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/TimeStrDictionary.java b/core-dictionary/src/main/java/org/apache/kylin/dict/TimeStrDictionary.java
index 6bae129..65c6c05 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/TimeStrDictionary.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/TimeStrDictionary.java
@@ -7,6 +7,7 @@ import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.common.util.Dictionary;
/**
*/
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java b/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java
index 3a05d0a..552aa92 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java
@@ -19,9 +19,11 @@
package org.apache.kylin.dict;
import com.google.common.base.Preconditions;
+
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.Dictionary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
index e9a99b9..6297906 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
@@ -30,7 +30,7 @@ import java.util.List;
import org.apache.commons.lang.ArrayUtils;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.RootPersistentEntity;
-import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.dict.StringBytesConverter;
import org.apache.kylin.dict.TrieDictionary;
import org.apache.kylin.dict.TrieDictionaryBuilder;
@@ -44,6 +44,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
/**
* @author yangli9
*/
+@SuppressWarnings("serial")
@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
public class SnapshotTable extends RootPersistentEntity implements ReadableTable {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-dictionary/src/test/java/org/apache/kylin/dict/NumberDictionaryTest.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/NumberDictionaryTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/NumberDictionaryTest.java
index 8c8dcfc..8020729 100644
--- a/core-dictionary/src/test/java/org/apache/kylin/dict/NumberDictionaryTest.java
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/NumberDictionaryTest.java
@@ -28,8 +28,9 @@ import java.util.List;
import java.util.Random;
import java.util.Set;
-import org.apache.kylin.common.datatype.DataType;
import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.metadata.datatype.DataType;
import org.junit.Test;
import com.google.common.collect.Lists;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/IMeasureFactory.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/IMeasureFactory.java b/core-metadata/src/main/java/org/apache/kylin/measure/IMeasureFactory.java
new file mode 100644
index 0000000..0ab547a
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/IMeasureFactory.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure;
+
+/**
+ * Factory that produces the {@link MeasureType} for a function name / data type pair.
+ */
+public interface IMeasureFactory {
+
+    /**
+     * Creates the measure type for the given function and data type.
+     *
+     * @param funcName name of the aggregation function
+     * @param dataType name of the data type
+     * @return the measure type produced by this factory
+     */
+    MeasureType createMeasureType(String funcName, String dataType);
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregator.java
new file mode 100644
index 0000000..32e5128
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregator.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure;
+
+import java.io.Serializable;
+
+/**
+ * Base class of all measure aggregators. An aggregator accumulates values of type
+ * {@code V} and exposes the accumulated state.
+ */
+@SuppressWarnings("serial")
+public abstract class MeasureAggregator<V> implements Serializable {
+
+    /**
+     * Returns the aggregator that the {@link MeasureType} resolved for the given
+     * function name and data type provides.
+     */
+    public static MeasureAggregator<?> create(String funcName, String dataType) {
+        MeasureType measureType = MeasureType.create(funcName, dataType);
+        return measureType.newAggregator();
+    }
+
+    /** Guessed size in bytes of an aggregator that holds a BigDecimal. */
+    public static int guessBigDecimalMemBytes() {
+        // 116 returned by AggregationCacheMemSizeTest
+        final int aggregatorShell = 8; // aggregator obj shell
+        final int bigDecimalRef = 8; // ref to BigDecimal
+        final int bigDecimalShell = 8; // BigDecimal obj shell
+        final int bigDecimalInternal = 100; // guess of BigDecimal internal
+        return aggregatorShell + bigDecimalRef + bigDecimalShell + bigDecimalInternal;
+    }
+
+    /** Guessed size in bytes of an aggregator that holds a double. */
+    public static int guessDoubleMemBytes() {
+        // 29 to 44 returned by AggregationCacheMemSizeTest, so the measured maximum is used.
+        // A term-by-term count (aggregator obj shell 8 + ref to DoubleWritable 8
+        // + DoubleWritable obj shell 8 + size of double 8) is deliberately not used.
+        return 44;
+    }
+
+    /** Guessed size in bytes of an aggregator that holds a long. */
+    public static int guessLongMemBytes() {
+        // 29 to 44 returned by AggregationCacheMemSizeTest, so the measured maximum is used.
+        // A term-by-term count (aggregator obj shell 8 + ref to LongWritable 8
+        // + LongWritable obj shell 8 + size of long 8) is deliberately not used.
+        return 44;
+    }
+
+    // ============================================================================
+
+    /**
+     * Hands over another aggregator this one depends on. Does nothing here;
+     * subclasses that need the dependency override it.
+     */
+    @SuppressWarnings("rawtypes")
+    public void setDependentAggregator(MeasureAggregator agg) {
+        // intentionally empty
+    }
+
+    /** Resets the accumulated state. */
+    public abstract void reset();
+
+    /** Folds one more value into the accumulated state. */
+    public abstract void aggregate(V value);
+
+    /** Returns the accumulated state. */
+    public abstract V getState();
+
+    /** Returns an estimate of memory consumption; the estimate is an UPPER BOUND. */
+    public abstract int getMemBytesEstimate();
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java
new file mode 100644
index 0000000..c6b456e
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+
+/**
+ * Holds one {@link MeasureAggregator} per measure, in the order the measures were given,
+ * and applies reset / aggregate / collect to all of them at once.
+ */
+@SuppressWarnings({ "rawtypes", "unchecked", "serial" })
+public class MeasureAggregators implements Serializable {
+
+    private final MeasureAggregator[] aggs;
+    private final int descLength;
+
+    /**
+     * @param measureDescs measures to aggregate; the iteration order defines the value positions
+     */
+    public MeasureAggregators(Collection<MeasureDesc> measureDescs) {
+        this(measureDescs.toArray(new MeasureDesc[measureDescs.size()]));
+    }
+
+    /**
+     * Creates one aggregator per measure and wires up dependent aggregators.
+     *
+     * @param measureDescs measures to aggregate; the array order defines the value positions
+     * @throws IllegalArgumentException if a measure refers to a dependent measure whose name
+     *         is not among {@code measureDescs}
+     */
+    public MeasureAggregators(MeasureDesc... measureDescs) {
+        descLength = measureDescs.length;
+        aggs = new MeasureAggregator[descLength];
+
+        Map<String, Integer> measureIndexMap = new HashMap<String, Integer>();
+        for (int i = 0; i < descLength; i++) {
+            FunctionDesc func = measureDescs[i].getFunction();
+            aggs[i] = MeasureAggregator.create(func.getExpression(), func.getReturnType());
+            measureIndexMap.put(measureDescs[i].getName(), i);
+        }
+        // fill back dependent aggregator
+        for (int i = 0; i < descLength; i++) {
+            String depMsrRef = measureDescs[i].getDependentMeasureRef();
+            if (depMsrRef == null) {
+                continue;
+            }
+            // Look up as Integer: unboxing a missing entry straight into an int would
+            // fail with a NullPointerException that names neither measure.
+            Integer index = measureIndexMap.get(depMsrRef);
+            if (index == null) {
+                throw new IllegalArgumentException("Measure '" + measureDescs[i].getName() + "' depends on unknown measure '" + depMsrRef + "'");
+            }
+            aggs[i].setDependentAggregator(aggs[index]);
+        }
+    }
+
+    /** Resets every aggregator. */
+    public void reset() {
+        for (MeasureAggregator agg : aggs) {
+            agg.reset();
+        }
+    }
+
+    /**
+     * Feeds {@code values[i]} into the i-th aggregator.
+     *
+     * @param values one value per measure, in measure order
+     */
+    public void aggregate(Object[] values) {
+        assert values.length == descLength;
+
+        for (int i = 0; i < descLength; i++) {
+            aggs[i].aggregate(values[i]);
+        }
+    }
+
+    /**
+     * Copies the state of the i-th aggregator into {@code states[i]}.
+     *
+     * @param states output array, written at indexes 0 to (number of measures - 1)
+     */
+    public void collectStates(Object[] states) {
+        for (int i = 0; i < descLength; i++) {
+            states[i] = aggs[i].getState();
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java
new file mode 100644
index 0000000..b02addd
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+
+import org.apache.kylin.metadata.datatype.DataTypeSerializer;
+import org.apache.kylin.metadata.model.MeasureDesc;
+
+/**
+ * Encodes a row of measure values into a {@link ByteBuffer} and decodes it back,
+ * using one {@link DataTypeSerializer} per measure.
+ *
+ * @author yangli9
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class MeasureCodec {
+
+    int nMeasures;
+    DataTypeSerializer[] serializers;
+
+    /** Builds a codec from the return types of the given measures, in iteration order. */
+    public MeasureCodec(Collection<MeasureDesc> measureDescs) {
+        this(measureDescs.toArray(new MeasureDesc[measureDescs.size()]));
+    }
+
+    /** Builds a codec from the return types of the given measures, in array order. */
+    public MeasureCodec(MeasureDesc... measureDescs) {
+        init(returnTypesOf(measureDescs));
+    }
+
+    /** Builds a codec directly from data type names. */
+    public MeasureCodec(String... dataTypes) {
+        init(dataTypes);
+    }
+
+    // Collects the return type of each measure's function, keeping the order.
+    private static String[] returnTypesOf(MeasureDesc[] measureDescs) {
+        String[] typeNames = new String[measureDescs.length];
+        for (int k = 0; k < typeNames.length; k++) {
+            typeNames[k] = measureDescs[k].getFunction().getReturnType();
+        }
+        return typeNames;
+    }
+
+    // Creates one serializer per data type name.
+    private void init(String[] dataTypes) {
+        final int count = dataTypes.length;
+        final DataTypeSerializer[] created = new DataTypeSerializer[count];
+        int pos = 0;
+        for (String typeName : dataTypes) {
+            created[pos] = DataTypeSerializer.create(typeName);
+            pos++;
+        }
+        nMeasures = count;
+        serializers = created;
+    }
+
+    /** Returns the serializer of the measure at position {@code idx}. */
+    public DataTypeSerializer getSerializer(int idx) {
+        return serializers[idx];
+    }
+
+    /** Reads one value per measure from {@code buf} into {@code result}, in measure order. */
+    public void decode(ByteBuffer buf, Object[] result) {
+        assert result.length == nMeasures;
+        int m = 0;
+        while (m < nMeasures) {
+            result[m] = serializers[m].deserialize(buf);
+            m++;
+        }
+    }
+
+    /** Writes one value per measure from {@code values} to {@code out}, in measure order. */
+    public void encode(Object[] values, ByteBuffer out) {
+        assert values.length == nMeasures;
+        int m = 0;
+        while (m < nMeasures) {
+            serializers[m].serialize(values[m], out);
+            m++;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java
new file mode 100644
index 0000000..8d6e601
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure;
+
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+/**
+ * Turns the raw string values of a measure into a value of type {@code V}.
+ */
+public abstract class MeasureIngester<V> {
+
+    /** Returns the ingester provided by the {@link MeasureType} of the measure's function. */
+    public static MeasureIngester<?> create(MeasureDesc measure) {
+        MeasureType measureType = MeasureType.create(measure.getFunction());
+        return measureType.newIngester();
+    }
+
+    /** Returns one ingester per measure, in the iteration order of {@code measures}. */
+    public static MeasureIngester<?>[] create(Collection<MeasureDesc> measures) {
+        MeasureIngester<?>[] ingesters = new MeasureIngester<?>[measures.size()];
+        int slot = 0;
+        for (MeasureDesc desc : measures) {
+            ingesters[slot] = create(desc);
+            slot++;
+        }
+        return ingesters;
+    }
+
+    /**
+     * Converts the raw values of one measure into the value to aggregate.
+     *
+     * @param values raw string values of the measure
+     * @param measureDesc description of the measure being ingested
+     * @param dictionaryMap dictionaries keyed by column
+     * @return the converted value
+     */
+    public abstract V valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap);
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
new file mode 100644
index 0000000..604a7b6
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.measure.basic.BasicMeasureFactory;
+import org.apache.kylin.measure.hllc.HLLCAggregationFactory;
+import org.apache.kylin.measure.topn.TopNMeasureFactory;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.datatype.DataTypeSerializer;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Describes a kind of measure: its aggregation data type and serializer, and the
+ * ingester and aggregator used at build time. Instances are obtained through
+ * {@link #create(String, String)}, which picks an {@link IMeasureFactory} by function name.
+ */
+public abstract class MeasureType {
+
+    private static final Map<String, IMeasureFactory> factoryRegistry = Maps.newConcurrentMap();
+    private static final IMeasureFactory defaultFactory = new BasicMeasureFactory();
+
+    static {
+        factoryRegistry.put(FunctionDesc.FUNC_COUNT_DISTINCT, new HLLCAggregationFactory());
+        factoryRegistry.put(FunctionDesc.FUNC_TOP_N, new TopNMeasureFactory());
+    }
+
+    /** Creates the measure type for the expression and return type of {@code function}. */
+    public static MeasureType create(FunctionDesc function) {
+        return create(function.getExpression(), function.getReturnType());
+    }
+
+    /**
+     * Creates the measure type for a function name and data type, and registers the
+     * serializer of its aggregation data type if none is registered yet.
+     *
+     * @param funcName function name; matched against the registry case-insensitively
+     * @param dataType data type name; lower-cased before it is passed to the factory
+     * @return the measure type created by the registered factory, or by the default
+     *         factory when no factory is registered for {@code funcName}
+     */
+    public static MeasureType create(String funcName, String dataType) {
+        // Normalize with Locale.ROOT so the result does not depend on the JVM default
+        // locale (under a Turkish locale, for example, "i" upper-cases to a dotted
+        // capital I and the registry lookup below would miss).
+        funcName = funcName.toUpperCase(java.util.Locale.ROOT);
+        dataType = dataType.toLowerCase(java.util.Locale.ROOT);
+
+        IMeasureFactory factory = factoryRegistry.get(funcName);
+        if (factory == null)
+            factory = defaultFactory;
+
+        MeasureType result = factory.createMeasureType(funcName, dataType);
+
+        // register serializer for aggr data type
+        // NOTE(review): hasRegistered followed by register is a check-then-act sequence;
+        // whether concurrent callers can race here depends on DataTypeSerializer - confirm.
+        DataType aggregationDataType = result.getAggregationDataType();
+        if (!DataTypeSerializer.hasRegistered(aggregationDataType.getName())) {
+            DataTypeSerializer.register(aggregationDataType.getName(), result.getAggregationDataSeralizer());
+        }
+
+        return result;
+    }
+
+    /* ============================================================================
+     * Define
+     * ---------------------------------------------------------------------------- */
+
+    /** Returns the data type of the aggregated value. */
+    public abstract DataType getAggregationDataType();
+
+    /** Returns the serializer class registered for the aggregation data type. */
+    public abstract Class<? extends DataTypeSerializer<?>> getAggregationDataSeralizer();
+
+    /**
+     * Validates a measure definition against this measure type.
+     *
+     * @throws IllegalArgumentException if the definition is not valid
+     */
+    public abstract void validate(MeasureDesc measureDesc) throws IllegalArgumentException;
+
+    /* ============================================================================
+     * Build
+     * ---------------------------------------------------------------------------- */
+
+    /** Returns a new ingester for this measure type. */
+    public abstract MeasureIngester<?> newIngester();
+
+    /** Returns a new aggregator for this measure type. */
+    public abstract MeasureAggregator<?> newAggregator();
+
+    /** Returns the columns of the measure that need a dictionary. */
+    public abstract List<TblColRef> getColumnsNeedDictionary(MeasureDesc measureDesc);
+
+    /** Re-encodes a value from the old dictionaries to the new ones. */
+    public abstract Object reEncodeDictionary(Object value, List<Dictionary<?>> oldDicts, List<Dictionary<?>> newDicts);
+
+    /* ============================================================================
+     * Cube Selection
+     * ---------------------------------------------------------------------------- */
+
+    /* ============================================================================
+     * Query
+     * ---------------------------------------------------------------------------- */
+
+    /* ============================================================================
+     * Storage
+     * ---------------------------------------------------------------------------- */
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureFactory.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureFactory.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureFactory.java
new file mode 100644
index 0000000..7bfee49
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.basic;
+
+import org.apache.kylin.measure.IMeasureFactory;
+import org.apache.kylin.measure.MeasureType;
+import org.apache.kylin.metadata.datatype.DataType;
+
+/**
+ * {@link IMeasureFactory} that creates {@link BasicMeasureType} instances.
+ */
+public class BasicMeasureFactory implements IMeasureFactory {
+
+    /**
+     * Resolves {@code dataType} through {@link DataType#getType(String)} and wraps it,
+     * together with the function name, in a {@link BasicMeasureType}.
+     */
+    @Override
+    public MeasureType createMeasureType(String funcName, String dataType) {
+        DataType resolvedType = DataType.getType(dataType);
+        return new BasicMeasureType(funcName, resolvedType);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
new file mode 100644
index 0000000..f6bf090
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.basic;
+
+import java.util.List;
+
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.measure.MeasureAggregator;
+import org.apache.kylin.measure.MeasureIngester;
+import org.apache.kylin.measure.MeasureType;
+import org.apache.kylin.metadata.datatype.BigDecimalSerializer;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.datatype.DataTypeSerializer;
+import org.apache.kylin.metadata.datatype.DoubleSerializer;
+import org.apache.kylin.metadata.datatype.LongSerializer;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+/**
+ * Measure type for sum, count, max and min over integer, decimal and other numeric
+ * data types. The serializer, ingester and aggregator are chosen from the data type.
+ */
+public class BasicMeasureType extends MeasureType {
+
+    private final String funcName;
+    private final DataType dataType;
+
+    /**
+     * @param funcName function name; compared case-insensitively against the FunctionDesc constants
+     * @param dataType data type of the aggregated value
+     */
+    public BasicMeasureType(String funcName, DataType dataType) {
+        this.funcName = funcName;
+        this.dataType = dataType;
+    }
+
+    @Override
+    public DataType getAggregationDataType() {
+        return dataType;
+    }
+
+    /**
+     * @throws IllegalArgumentException if the data type is neither integer, decimal nor another number type
+     */
+    @Override
+    public Class<? extends DataTypeSerializer<?>> getAggregationDataSeralizer() {
+        if (dataType.isIntegerFamily())
+            return LongSerializer.class;
+        else if (dataType.isDecimal())
+            return BigDecimalSerializer.class;
+        else if (dataType.isNumberFamily())
+            return DoubleSerializer.class;
+        else
+            throw new IllegalArgumentException("No serializer for aggregation type " + dataType);
+    }
+
+    @Override
+    public void validate(MeasureDesc measureDesc) throws IllegalArgumentException {
+        // Stub: no validation is performed yet.
+    }
+
+    /**
+     * @throws IllegalArgumentException if the data type is neither integer, decimal nor another number type
+     */
+    @Override
+    public MeasureIngester<?> newIngester() {
+        if (dataType.isIntegerFamily())
+            return new LongIngester();
+        else if (dataType.isDecimal())
+            return new BigDecimalIngester();
+        else if (dataType.isNumberFamily())
+            return new DoubleIngester();
+        else
+            throw new IllegalArgumentException("No ingester for aggregation type " + dataType);
+    }
+
+    /**
+     * Picks the aggregator by function first (sum and count share the sum aggregators),
+     * then by data type.
+     *
+     * @throws IllegalArgumentException if no aggregator matches the function and data type
+     */
+    @Override
+    public MeasureAggregator<?> newAggregator() {
+        if (isSum() || isCount()) {
+            if (dataType.isDecimal())
+                return new BigDecimalSumAggregator();
+            else if (dataType.isIntegerFamily())
+                return new LongSumAggregator();
+            else if (dataType.isNumberFamily())
+                return new DoubleSumAggregator();
+        } else if (isMax()) {
+            if (dataType.isDecimal())
+                return new BigDecimalMaxAggregator();
+            else if (dataType.isIntegerFamily())
+                return new LongMaxAggregator();
+            else if (dataType.isNumberFamily())
+                return new DoubleMaxAggregator();
+        } else if (isMin()) {
+            if (dataType.isDecimal())
+                return new BigDecimalMinAggregator();
+            else if (dataType.isIntegerFamily())
+                return new LongMinAggregator();
+            else if (dataType.isNumberFamily())
+                return new DoubleMinAggregator();
+        }
+        // Reached for an unknown function, or a known function with a non-numeric type.
+        throw new IllegalArgumentException("No aggregator for func '" + funcName + "' and return type '" + dataType + "'");
+    }
+
+    private boolean isSum() {
+        return FunctionDesc.FUNC_SUM.equalsIgnoreCase(funcName);
+    }
+
+    private boolean isCount() {
+        return FunctionDesc.FUNC_COUNT.equalsIgnoreCase(funcName);
+    }
+
+    private boolean isMax() {
+        return FunctionDesc.FUNC_MAX.equalsIgnoreCase(funcName);
+    }
+
+    private boolean isMin() {
+        return FunctionDesc.FUNC_MIN.equalsIgnoreCase(funcName);
+    }
+
+    /**
+     * Returns an empty list rather than {@code null}, so callers can iterate the
+     * result without a null check.
+     */
+    @Override
+    public List<TblColRef> getColumnsNeedDictionary(MeasureDesc measureDesc) {
+        return java.util.Collections.<TblColRef> emptyList();
+    }
+
+    @Override
+    public Object reEncodeDictionary(Object value, List<Dictionary<?>> oldDicts, List<Dictionary<?>> newDicts) {
+        // Stub: not implemented; always returns null.
+        // NOTE(review): presumably never called, since no column needs a dictionary - confirm.
+        return null;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java
new file mode 100644
index 0000000..bb743d6
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.basic;
+
+import java.math.BigDecimal;
+import java.util.Map;
+
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.measure.MeasureIngester;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+/**
+ * Ingester that parses a single string value into a {@link BigDecimal}.
+ */
+public class BigDecimalIngester extends MeasureIngester<BigDecimal> {
+
+    /**
+     * Parses the single raw value of a measure.
+     *
+     * @param values exactly one raw value; the element itself may be null
+     * @param measureDesc not used by this ingester
+     * @param dictionaryMap not used by this ingester
+     * @return zero when the value is null, otherwise the parsed decimal
+     * @throws IllegalArgumentException if {@code values} is null or does not hold exactly one element
+     * @throws NumberFormatException if the value is not a valid decimal representation
+     */
+    @Override
+    public BigDecimal valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
+        // Reject null and empty arrays too: they would otherwise surface as a
+        // NullPointerException / ArrayIndexOutOfBoundsException on values[0].
+        if (values == null || values.length != 1)
+            throw new IllegalArgumentException("Expected exactly one value but got " + (values == null ? "null" : String.valueOf(values.length)));
+
+        if (values[0] == null)
+            return BigDecimal.ZERO;
+        else
+            return new BigDecimal(values[0]);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMaxAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMaxAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMaxAggregator.java
new file mode 100644
index 0000000..aa42476
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMaxAggregator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.basic;
+
+import java.math.BigDecimal;
+
+import org.apache.kylin.measure.MeasureAggregator;
+
+/**
+ * Aggregator that keeps the largest {@link BigDecimal} it has been given.
+ */
+@SuppressWarnings("serial")
+public class BigDecimalMaxAggregator extends MeasureAggregator<BigDecimal> {
+
+    // Largest value so far; null until the first value arrives.
+    BigDecimal max;
+
+    @Override
+    public void reset() {
+        max = null;
+    }
+
+    /** Replaces the current maximum when there is none yet or {@code value} is larger. */
+    @Override
+    public void aggregate(BigDecimal value) {
+        if (max == null || max.compareTo(value) < 0) {
+            max = value;
+        }
+    }
+
+    /** Returns the current maximum, or null when nothing has been aggregated. */
+    @Override
+    public BigDecimal getState() {
+        return max;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        return guessBigDecimalMemBytes();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMinAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMinAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMinAggregator.java
new file mode 100644
index 0000000..81193ad
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMinAggregator.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.basic;
+
+import java.math.BigDecimal;
+
+import org.apache.kylin.measure.MeasureAggregator;
+
+/**
+ * Aggregator that keeps the smallest {@link BigDecimal} it has been given.
+ */
+@SuppressWarnings("serial")
+public class BigDecimalMinAggregator extends MeasureAggregator<BigDecimal> {
+
+    // Smallest value so far; null until the first value arrives.
+    BigDecimal min = null;
+
+    @Override
+    public void reset() {
+        min = null;
+    }
+
+    /** Replaces the current minimum when there is none yet or {@code value} is smaller. */
+    @Override
+    public void aggregate(BigDecimal value) {
+        if (min == null)
+            min = value;
+        else if (min.compareTo(value) > 0)
+            min = value;
+    }
+
+    /** Returns the current minimum, or null when nothing has been aggregated. */
+    @Override
+    public BigDecimal getState() {
+        return min;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        return guessBigDecimalMemBytes();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalSumAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalSumAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalSumAggregator.java
new file mode 100644
index 0000000..5e00c63
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalSumAggregator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.basic;
+
+import java.math.BigDecimal;
+
+import org.apache.kylin.measure.MeasureAggregator;
+
+/** Aggregator that adds up the BigDecimal values it is fed. */
+@SuppressWarnings("serial")
+public class BigDecimalSumAggregator extends MeasureAggregator<BigDecimal> {
+
+    BigDecimal sum = BigDecimal.ZERO; // running total; shared constant instead of a fresh zero object
+
+    @Override
+    public void reset() {
+        sum = BigDecimal.ZERO;
+    }
+
+    @Override
+    public void aggregate(BigDecimal value) {
+        // BigDecimal is immutable, so add() yields a new object that replaces the running total
+        sum = sum.add(value);
+    }
+
+    @Override
+    public BigDecimal getState() {
+        return sum;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        return guessBigDecimalMemBytes();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java
new file mode 100644
index 0000000..506ed19
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.basic;
+
+import java.util.Map;
+
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.measure.MeasureIngester;
+import org.apache.kylin.metadata.datatype.DoubleMutable;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+/** Turns one raw string value into a DoubleMutable. */
+public class DoubleIngester extends MeasureIngester<DoubleMutable> {
+
+    // single holder handed back by every call, to avoid repeated object creation
+    private DoubleMutable current = new DoubleMutable();
+
+    /** Parses values[0] as a double (null counts as 0); the result lives in a holder shared by all calls. */
+    public DoubleMutable valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
+        // exactly one value is expected; without this check an empty array would fail on values[0] below
+        if (values.length != 1)
+            throw new IllegalArgumentException("expect exactly 1 value, got " + values.length);
+
+        DoubleMutable l = current;
+        l.set(values[0] == null ? 0.0 : Double.parseDouble(values[0]));
+        return l;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMaxAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMaxAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMaxAggregator.java
new file mode 100644
index 0000000..25911e8
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMaxAggregator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.basic;
+
+import org.apache.kylin.measure.MeasureAggregator;
+import org.apache.kylin.metadata.datatype.DoubleMutable;
+
+/** Aggregator that keeps the largest double value it has been fed. */
+@SuppressWarnings("serial")
+public class DoubleMaxAggregator extends MeasureAggregator<DoubleMutable> {
+
+    DoubleMutable max = null; // running maximum; null until the first value is aggregated
+
+    @Override
+    public void reset() {
+        max = null;
+    }
+
+    @Override
+    public void aggregate(DoubleMutable value) {
+        if (max == null) {
+            // first value: store a private copy instead of the caller's holder
+            max = new DoubleMutable(value.get());
+        } else if (value.get() > max.get()) {
+            max.set(value.get());
+        }
+    }
+
+    @Override
+    public DoubleMutable getState() {
+        return max; // null if no value has been aggregated since the last reset
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        return guessDoubleMemBytes();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMinAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMinAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMinAggregator.java
new file mode 100644
index 0000000..be97deb
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMinAggregator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.basic;
+
+import org.apache.kylin.measure.MeasureAggregator;
+import org.apache.kylin.metadata.datatype.DoubleMutable;
+
+/** Aggregator that keeps the smallest double value it has been fed. */
+@SuppressWarnings("serial")
+public class DoubleMinAggregator extends MeasureAggregator<DoubleMutable> {
+
+    DoubleMutable min = null; // running minimum; null until the first value is aggregated
+
+    @Override
+    public void reset() {
+        min = null;
+    }
+
+    @Override
+    public void aggregate(DoubleMutable value) {
+        if (min == null) {
+            // first value: store a private copy instead of the caller's holder
+            min = new DoubleMutable(value.get());
+        } else if (value.get() < min.get()) {
+            min.set(value.get());
+        }
+    }
+
+    @Override
+    public DoubleMutable getState() {
+        return min; // null if no value has been aggregated since the last reset
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        return guessDoubleMemBytes();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleSumAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleSumAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleSumAggregator.java
new file mode 100644
index 0000000..f276817
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleSumAggregator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.basic;
+
+import org.apache.kylin.measure.MeasureAggregator;
+import org.apache.kylin.metadata.datatype.DoubleMutable;
+
+/** Aggregator that adds up the double values it is fed. */
+@SuppressWarnings("serial")
+public class DoubleSumAggregator extends MeasureAggregator<DoubleMutable> {
+
+    // running total, updated in place; getState() returns this same object
+    DoubleMutable sum = new DoubleMutable();
+
+    @Override
+    public void reset() {
+        sum.set(0.0);
+    }
+
+    @Override
+    public void aggregate(DoubleMutable value) {
+        double total = sum.get() + value.get();
+        sum.set(total);
+    }
+
+    @Override
+    public DoubleMutable getState() {
+        return sum;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        return guessDoubleMemBytes();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java
new file mode 100644
index 0000000..5bf1257
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.basic;
+
+import java.util.Map;
+
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.measure.MeasureIngester;
+import org.apache.kylin.metadata.datatype.LongMutable;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+/** Turns one raw string value into a LongMutable. */
+public class LongIngester extends MeasureIngester<LongMutable> {
+
+    // single holder handed back by every call, to avoid repeated object creation
+    private LongMutable current = new LongMutable();
+
+    /** Parses values[0] as a long (null counts as 0); the result lives in a holder shared by all calls. */
+    public LongMutable valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
+        // exactly one value is expected; without this check an empty array would fail on values[0] below
+        if (values.length != 1)
+            throw new IllegalArgumentException("expect exactly 1 value, got " + values.length);
+
+        LongMutable l = current;
+        l.set(values[0] == null ? 0L : Long.parseLong(values[0]));
+        return l;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongMaxAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongMaxAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongMaxAggregator.java
new file mode 100644
index 0000000..ca44f15
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongMaxAggregator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.basic;
+
+import org.apache.kylin.measure.MeasureAggregator;
+import org.apache.kylin.metadata.datatype.LongMutable;
+
+/** Aggregator that keeps the largest long value it has been fed. */
+@SuppressWarnings("serial")
+public class LongMaxAggregator extends MeasureAggregator<LongMutable> {
+
+    LongMutable max = null; // running maximum; null until the first value is aggregated
+
+    @Override
+    public void reset() {
+        max = null;
+    }
+
+    @Override
+    public void aggregate(LongMutable value) {
+        if (max == null) {
+            // first value: store a private copy instead of the caller's holder
+            max = new LongMutable(value.get());
+        } else if (value.get() > max.get()) {
+            max.set(value.get());
+        }
+    }
+
+    @Override
+    public LongMutable getState() {
+        return max; // null if no value has been aggregated since the last reset
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        return guessLongMemBytes();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongMinAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongMinAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongMinAggregator.java
new file mode 100644
index 0000000..dadc64e
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongMinAggregator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.basic;
+
+import org.apache.kylin.measure.MeasureAggregator;
+import org.apache.kylin.metadata.datatype.LongMutable;
+
+/** Aggregator that keeps the smallest long value it has been fed. */
+@SuppressWarnings("serial")
+public class LongMinAggregator extends MeasureAggregator<LongMutable> {
+
+    LongMutable min = null; // running minimum; null until the first value is aggregated
+
+    @Override
+    public void reset() {
+        min = null;
+    }
+
+    @Override
+    public void aggregate(LongMutable value) {
+        if (min == null) {
+            // first value: store a private copy instead of the caller's holder
+            min = new LongMutable(value.get());
+        } else if (value.get() < min.get()) {
+            min.set(value.get());
+        }
+    }
+
+    @Override
+    public LongMutable getState() {
+        return min; // null if no value has been aggregated since the last reset
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        return guessLongMemBytes();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongSumAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongSumAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongSumAggregator.java
new file mode 100644
index 0000000..e7fdc9d
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongSumAggregator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.basic;
+
+import org.apache.kylin.measure.MeasureAggregator;
+import org.apache.kylin.metadata.datatype.LongMutable;
+
+/** Aggregator that adds up the long values it is fed. */
+@SuppressWarnings("serial")
+public class LongSumAggregator extends MeasureAggregator<LongMutable> {
+
+    // running total, updated in place; getState() returns this same object
+    LongMutable sum = new LongMutable();
+
+    @Override
+    public void reset() {
+        sum.set(0);
+    }
+
+    @Override
+    public void aggregate(LongMutable value) {
+        long total = sum.get() + value.get();
+        sum.set(total);
+    }
+
+    @Override
+    public LongMutable getState() {
+        return sum;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        return guessLongMemBytes();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregationFactory.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregationFactory.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregationFactory.java
new file mode 100644
index 0000000..13e5520
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregationFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.hllc;
+
+import org.apache.kylin.measure.IMeasureFactory;
+import org.apache.kylin.measure.MeasureType;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.FunctionDesc;
+
+/** Measure factory that creates an HLLCMeasureType for the FunctionDesc.FUNC_COUNT_DISTINCT function. */
+public class HLLCAggregationFactory implements IMeasureFactory {
+
+    @Override
+    public MeasureType createMeasureType(String funcName, String dataType) {
+        // any function name other than FUNC_COUNT_DISTINCT (compared ignoring case) is rejected
+        if (!FunctionDesc.FUNC_COUNT_DISTINCT.equalsIgnoreCase(funcName))
+            throw new IllegalArgumentException();
+        return new HLLCMeasureType(DataType.getType(dataType));
+    }
+}