You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/11/27 10:00:59 UTC
[04/13] incubator-kylin git commit: KYLIN-976 very initial
KYLIN-976 very initial
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/1218bbde
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/1218bbde
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/1218bbde
Branch: refs/heads/KYLIN-976
Commit: 1218bbde487e973de0391162204d73c76f1a9e81
Parents: 6515b0a
Author: Li, Yang <ya...@ebay.com>
Authored: Tue Nov 24 13:15:54 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Fri Nov 27 14:47:05 2015 +0800
----------------------------------------------------------------------
.../apache/kylin/common/util/DoubleMutable.java | 68 +++++
.../apache/kylin/common/util/LongMutable.java | 70 +++++
.../apache/kylin/aggregation/Aggregation.java | 42 +++
.../kylin/aggregation/DataTypeSerializer.java | 118 +++++++++
.../kylin/aggregation/MeasureAggregator.java | 133 ++++++++++
.../kylin/aggregation/MeasureAggregators.java | 81 ++++++
.../apache/kylin/aggregation/MeasureCodec.java | 78 ++++++
.../basic/BigDecimalMaxAggregator.java | 54 ++++
.../basic/BigDecimalMinAggregator.java | 55 ++++
.../aggregation/basic/BigDecimalSerializer.java | 111 ++++++++
.../basic/BigDecimalSumAggregator.java | 51 ++++
.../aggregation/basic/DateTimeSerializer.java | 65 +++++
.../aggregation/basic/DoubleMaxAggregator.java | 54 ++++
.../aggregation/basic/DoubleMinAggregator.java | 54 ++++
.../aggregation/basic/DoubleSerializer.java | 84 ++++++
.../aggregation/basic/DoubleSumAggregator.java | 51 ++++
.../aggregation/basic/LongMaxAggregator.java | 54 ++++
.../aggregation/basic/LongMinAggregator.java | 54 ++++
.../kylin/aggregation/basic/LongSerializer.java | 91 +++++++
.../aggregation/basic/LongSumAggregator.java | 51 ++++
.../aggregation/basic/StringSerializer.java | 56 ++++
.../kylin/aggregation/hllc/HLLCAggregator.java | 64 +++++
.../kylin/aggregation/hllc/HLLCSerializer.java | 98 +++++++
.../kylin/aggregation/hllc/LDCAggregator.java | 63 +++++
.../kylin/aggregation/topn/TopNAggregator.java | 66 +++++
.../aggregation/topn/TopNCounterSerializer.java | 117 +++++++++
.../kylin/cube/gridtable/CubeCodeSystem.java | 6 +-
.../cube/gridtable/TrimmedCubeCodeSystem.java | 6 +-
.../cube/inmemcubing/DoggedCubeBuilder.java | 2 +-
.../cube/inmemcubing/InMemCubeBuilder.java | 2 +-
.../InMemCubeBuilderInputConverter.java | 5 +-
.../kylin/gridtable/GTAggregateScanner.java | 2 +-
.../kylin/gridtable/GTSampleCodeSystem.java | 4 +-
.../apache/kylin/gridtable/IGTCodeSystem.java | 2 +-
.../apache/kylin/gridtable/UnitTestSupport.java | 2 +-
.../basic/BigDecimalSerializerTest.java | 53 ++++
.../topn/TopNCounterSerializerTest.java | 61 +++++
.../gridtable/AggregationCacheMemSizeTest.java | 14 +-
.../kylin/gridtable/DictGridTableTest.java | 2 +-
.../kylin/gridtable/SimpleGridTableTest.java | 2 +-
.../gridtable/SimpleInvertedIndexTest.java | 5 +-
.../metadata/measure/MeasureCodecTest.java | 3 +
.../measure/BigDecimalMaxAggregator.java | 52 ----
.../measure/BigDecimalMinAggregator.java | 53 ----
.../measure/BigDecimalSumAggregator.java | 49 ----
.../metadata/measure/DoubleMaxAggregator.java | 51 ----
.../metadata/measure/DoubleMinAggregator.java | 51 ----
.../kylin/metadata/measure/DoubleMutable.java | 68 -----
.../metadata/measure/DoubleSumAggregator.java | 48 ----
.../kylin/metadata/measure/HLLCAggregator.java | 63 -----
.../kylin/metadata/measure/LDCAggregator.java | 60 -----
.../metadata/measure/LongMaxAggregator.java | 51 ----
.../metadata/measure/LongMinAggregator.java | 51 ----
.../kylin/metadata/measure/LongMutable.java | 70 -----
.../metadata/measure/LongSumAggregator.java | 48 ----
.../metadata/measure/MeasureAggregator.java | 121 ---------
.../metadata/measure/MeasureAggregators.java | 81 ------
.../kylin/metadata/measure/MeasureCodec.java | 79 ------
.../kylin/metadata/measure/TopNAggregator.java | 66 -----
.../measure/fixedlen/FixedHLLCodec.java | 80 ------
.../measure/fixedlen/FixedLenMeasureCodec.java | 49 ----
.../measure/fixedlen/FixedPointLongCodec.java | 117 ---------
.../serializer/BigDecimalSerializer.java | 110 --------
.../measure/serializer/DataTypeSerializer.java | 111 --------
.../measure/serializer/DateTimeSerializer.java | 64 -----
.../measure/serializer/DoubleSerializer.java | 83 ------
.../measure/serializer/HLLCSerializer.java | 97 -------
.../measure/serializer/LongSerializer.java | 90 -------
.../measure/serializer/StringSerializer.java | 55 ----
.../serializer/TopNCounterSerializer.java | 116 --------
.../apache/kylin/metadata/model/DataType.java | 2 +-
.../fixedlen/FixedPointLongCodecTest.java | 44 ----
.../serializer/BigDecimalSerializerTest.java | 52 ----
.../serializer/TopNCounterSerializerTest.java | 61 -----
.../org/apache/kylin/storage/tuple/Tuple.java | 4 +-
.../engine/mr/steps/BaseCuboidMapperBase.java | 3 +-
.../kylin/engine/mr/steps/CuboidReducer.java | 4 +-
.../engine/mr/steps/InMemCuboidReducer.java | 4 +-
.../mr/steps/MergeCuboidFromStorageMapper.java | 2 +-
.../engine/mr/steps/MergeCuboidMapper.java | 2 +-
.../kylin/engine/mr/steps/CubeReducerTest.java | 4 +-
.../apache/kylin/engine/spark/SparkCubing.java | 8 +-
.../invertedindex/index/RawTableRecord.java | 4 +-
.../kylin/invertedindex/index/TableRecord.java | 2 +-
.../invertedindex/index/TableRecordInfo.java | 2 +-
.../index/TableRecordInfoDigest.java | 4 +-
.../invertedindex/measure/FixedHLLCodec.java | 80 ++++++
.../measure/FixedLenMeasureCodec.java | 49 ++++
.../measure/FixedPointLongCodec.java | 117 +++++++++
.../invertedindex/model/IIKeyValueCodec.java | 2 +-
.../kylin/invertedindex/IIDescManagerTest.java | 104 ++++++++
.../apache/kylin/invertedindex/IIDescTest.java | 67 +++++
.../kylin/invertedindex/IIInstanceTest.java | 74 ++++++
.../invertedindex/InvertedIndexLocalTest.java | 262 +++++++++++++++++++
.../org/apache/kylin/invertedindex/LZFTest.java | 49 ++++
.../invertedindex/IIDescManagerTest.java | 104 --------
.../invertedindex/invertedindex/IIDescTest.java | 67 -----
.../invertedindex/IIInstanceTest.java | 74 ------
.../invertedindex/InvertedIndexLocalTest.java | 262 -------------------
.../invertedindex/invertedindex/LZFTest.java | 49 ----
.../measure/FixedPointLongCodecTest.java | 45 ++++
.../common/coprocessor/AggregationCache.java | 2 +-
.../observer/AggregationScanner.java | 2 +-
.../observer/ObserverAggregationCache.java | 2 +-
.../observer/ObserverAggregators.java | 4 +-
.../endpoint/EndpointAggregationCache.java | 2 +-
.../endpoint/EndpointAggregators.java | 6 +-
.../ii/coprocessor/endpoint/IIEndpoint.java | 3 +-
.../storage/hbase/steps/CubeHFileMapper.java | 2 +-
.../hbase/steps/HBaseMROutput2Transition.java | 2 +-
.../storage/hbase/steps/KeyValueCreator.java | 2 +-
.../storage/hbase/steps/RowValueDecoder.java | 6 +-
.../observer/AggregateRegionObserverTest.java | 2 +-
.../endpoint/EndpointAggregationTest.java | 7 +-
.../hbase/steps/CubeHFileMapper2Test.java | 2 +-
.../hbase/steps/RowValueDecoderTest.java | 4 +-
116 files changed, 2924 insertions(+), 2819 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-common/src/main/java/org/apache/kylin/common/util/DoubleMutable.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/DoubleMutable.java b/core-common/src/main/java/org/apache/kylin/common/util/DoubleMutable.java
new file mode 100644
index 0000000..520cd74
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/DoubleMutable.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.util;
+
+import java.io.Serializable;
+
+@SuppressWarnings("serial")
+public class DoubleMutable implements Comparable<DoubleMutable>, Serializable {
+
+ private double v;
+
+ public DoubleMutable() {
+ this(0);
+ }
+
+ public DoubleMutable(double v) {
+ set(v);
+ }
+
+ public double get() {
+ return v;
+ }
+
+ public void set(double v) {
+ this.v = v;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof DoubleMutable)) {
+ return false;
+ }
+ DoubleMutable other = (DoubleMutable) o;
+ return this.v == other.v;
+ }
+
+ @Override
+ public int hashCode() {
+ return (int) Double.doubleToLongBits(v);
+ }
+
+ @Override
+ public int compareTo(DoubleMutable o) {
+ return (v < o.v ? -1 : (v == o.v ? 0 : 1));
+ }
+
+ @Override
+ public String toString() {
+ return Double.toString(v);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-common/src/main/java/org/apache/kylin/common/util/LongMutable.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/LongMutable.java b/core-common/src/main/java/org/apache/kylin/common/util/LongMutable.java
new file mode 100644
index 0000000..238bb86
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/LongMutable.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.util;
+
+import java.io.Serializable;
+
+/**
+ * A mutable, serializable holder for a primitive {@code long}.
+ *
+ * Two holders are equal when they hold the same value; ordering is the
+ * natural numeric ordering of the held values.
+ */
+@SuppressWarnings("serial")
+public class LongMutable implements Comparable<LongMutable>, Serializable {
+
+    private long v;
+
+    /** Creates a holder initialised to 0. */
+    public LongMutable() {
+        this(0);
+    }
+
+    /**
+     * Creates a holder initialised to the given value.
+     *
+     * @param v the initial value
+     */
+    public LongMutable(long v) {
+        set(v);
+    }
+
+    /** @return the current value */
+    public long get() {
+        return v;
+    }
+
+    /**
+     * Replaces the current value.
+     *
+     * @param v the new value
+     */
+    public void set(long v) {
+        this.v = v;
+    }
+
+    /** @return true if {@code o} is a {@code LongMutable} holding the same value */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof LongMutable)) {
+            return false;
+        }
+        return this.v == ((LongMutable) o).v;
+    }
+
+    @Override
+    public int hashCode() {
+        // Fold the upper 32 bits in (same formula as java.lang.Long#hashCode);
+        // a plain (int) cast would make values that differ only above bit 31 collide.
+        return (int) (v ^ (v >>> 32));
+    }
+
+    /**
+     * Orders by numeric value.
+     *
+     * @param o the holder to compare against, must not be null
+     */
+    @Override
+    public int compareTo(LongMutable o) {
+        // Explicit comparison instead of subtraction, which could overflow.
+        if (v < o.v) {
+            return -1;
+        }
+        return v == o.v ? 0 : 1;
+    }
+
+    @Override
+    public String toString() {
+        return Long.toString(v);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/Aggregation.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/Aggregation.java b/core-cube/src/main/java/org/apache/kylin/aggregation/Aggregation.java
new file mode 100644
index 0000000..193c5de
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/Aggregation.java
@@ -0,0 +1,42 @@
+package org.apache.kylin.aggregation;
+
+import java.util.List;
+
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.metadata.model.DataType;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+/**
+ * Abstract description of one aggregation kind. The members are grouped into
+ * definition, build, cube-selection and query sections; the last two sections
+ * are still empty placeholders.
+ */
+public abstract class Aggregation {
+
+    /* ============================================================================
+     * Define
+     * ---------------------------------------------------------------------------- */
+
+    /** @return the data type of the aggregation itself */
+    public abstract DataType getAggregationDataType();
+
+    /** @return the data type of the aggregation result */
+    public abstract DataType getResultDataType();
+
+    /**
+     * Checks a measure definition against this aggregation.
+     *
+     * @param measureDesc the measure definition to check
+     * @throws IllegalArgumentException if the definition is not acceptable
+     */
+    public abstract void validate(MeasureDesc measureDesc) throws IllegalArgumentException;
+
+    /* ============================================================================
+     * Build
+     * ---------------------------------------------------------------------------- */
+
+    /**
+     * @return the serializer for this aggregation's values (the method name
+     *         keeps its original spelling so existing callers still compile)
+     */
+    public abstract DataTypeSerializer<?> getSeralizer();
+
+    /** @return the aggregator for this aggregation */
+    public abstract MeasureAggregator<?> getAggregator();
+
+    /**
+     * @param measureDesc the measure definition
+     * @return the columns of the measure that need a dictionary
+     */
+    public abstract List<TblColRef> getColumnsNeedDictionary(MeasureDesc measureDesc);
+
+    /**
+     * Re-encodes a value from one set of dictionaries to another.
+     * NOTE(review): presumably {@code oldDicts} and {@code newDicts} are
+     * positionally aligned with {@link #getColumnsNeedDictionary(MeasureDesc)} - confirm.
+     *
+     * @param value the value encoded with {@code oldDicts}
+     * @param oldDicts the dictionaries the value is currently encoded with
+     * @param newDicts the dictionaries to encode the value with
+     * @return the re-encoded value
+     */
+    public abstract Object reEncodeDictionary(Object value, List<Dictionary<?>> oldDicts, List<Dictionary<?>> newDicts);
+
+    /* ============================================================================
+     * Cube Selection
+     * ---------------------------------------------------------------------------- */
+
+    /* ============================================================================
+     * Query
+     * ---------------------------------------------------------------------------- */
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/DataTypeSerializer.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/DataTypeSerializer.java b/core-cube/src/main/java/org/apache/kylin/aggregation/DataTypeSerializer.java
new file mode 100644
index 0000000..df6833c
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/DataTypeSerializer.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kylin.aggregation.basic.BigDecimalSerializer;
+import org.apache.kylin.aggregation.basic.DateTimeSerializer;
+import org.apache.kylin.aggregation.basic.DoubleSerializer;
+import org.apache.kylin.aggregation.basic.LongSerializer;
+import org.apache.kylin.aggregation.basic.StringSerializer;
+import org.apache.kylin.aggregation.hllc.HLLCSerializer;
+import org.apache.kylin.aggregation.topn.TopNCounterSerializer;
+import org.apache.kylin.common.util.BytesSerializer;
+import org.apache.kylin.metadata.model.DataType;
+
+import com.google.common.collect.Maps;
+
+/**
+ * @author yangli9
+ *
+ * Note: the implementations MUST be thread-safe.
+ *
+ */
+abstract public class DataTypeSerializer<T> implements BytesSerializer<T> {
+
+ final static Map<String, Class<?>> implementations;
+ static {
+ HashMap<String, Class<?>> impl = Maps.newHashMap();
+ impl.put("varchar", StringSerializer.class);
+ impl.put("decimal", BigDecimalSerializer.class);
+ impl.put("double", DoubleSerializer.class);
+ impl.put("float", DoubleSerializer.class);
+ impl.put("bigint", LongSerializer.class);
+ impl.put("long", LongSerializer.class);
+ impl.put("integer", LongSerializer.class);
+ impl.put("int", LongSerializer.class);
+ impl.put("smallint", LongSerializer.class);
+ impl.put("date", DateTimeSerializer.class);
+ impl.put("datetime", DateTimeSerializer.class);
+ impl.put("timestamp", DateTimeSerializer.class);
+ implementations = Collections.unmodifiableMap(impl);
+
+ }
+
+ public static DataTypeSerializer<?> create(String dataType) {
+ return create(DataType.getInstance(dataType));
+ }
+
+ public static DataTypeSerializer<?> create(DataType type) {
+ if (type.isHLLC()) {
+ return new HLLCSerializer(type);
+ }
+
+ if (type.isTopN()) {
+ return new TopNCounterSerializer(type);
+ }
+
+ Class<?> clz = implementations.get(type.getName());
+ if (clz == null)
+ throw new RuntimeException("No MeasureSerializer for type " + type);
+
+ try {
+ return (DataTypeSerializer<?>) clz.getConstructor(DataType.class).newInstance(type);
+ } catch (Exception e) {
+ throw new RuntimeException(e); // never happen
+ }
+ }
+
+ /** peek into buffer and return the length of serialization */
+ abstract public int peekLength(ByteBuffer in);
+
+ /** return the max number of bytes to the longest serialization */
+ abstract public int maxLength();
+
+ /** get an estimate of size in bytes of the serialized data */
+ abstract public int getStorageBytesEstimate();
+
+ /** convert from String to obj (string often come as byte[] in mapred) */
+ abstract public T valueOf(byte[] value);
+
+ /** convert from String to obj */
+ public T valueOf(String value) {
+ try {
+ return valueOf(value.getBytes("UTF-8"));
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException(e); // never happen
+ }
+ }
+
+ /** convert from obj to string */
+ public String toString(T value) {
+ if (value == null)
+ return "NULL";
+ else
+ return value.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureAggregator.java
new file mode 100644
index 0000000..9c8945d
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureAggregator.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation;
+
+import org.apache.kylin.aggregation.basic.BigDecimalMaxAggregator;
+import org.apache.kylin.aggregation.basic.BigDecimalMinAggregator;
+import org.apache.kylin.aggregation.basic.BigDecimalSumAggregator;
+import org.apache.kylin.aggregation.basic.DoubleMaxAggregator;
+import org.apache.kylin.aggregation.basic.DoubleMinAggregator;
+import org.apache.kylin.aggregation.basic.DoubleSumAggregator;
+import org.apache.kylin.aggregation.basic.LongMaxAggregator;
+import org.apache.kylin.aggregation.basic.LongMinAggregator;
+import org.apache.kylin.aggregation.basic.LongSumAggregator;
+import org.apache.kylin.aggregation.hllc.HLLCAggregator;
+import org.apache.kylin.aggregation.hllc.LDCAggregator;
+import org.apache.kylin.aggregation.topn.TopNAggregator;
+import org.apache.kylin.metadata.model.DataType;
+import org.apache.kylin.metadata.model.FunctionDesc;
+
+import java.io.Serializable;
+
+/**
+ * Base class of all measure aggregators, plus a factory that picks the
+ * aggregator for a function name / return type pair and a few helpers for
+ * classifying return type names and estimating memory use.
+ *
+ * @param <V> the type of the values being aggregated and of the aggregated state
+ */
+@SuppressWarnings("serial")
+abstract public class MeasureAggregator<V> implements Serializable {
+
+    /**
+     * Creates the aggregator for the given function and return type.
+     * Function names are matched case-insensitively.
+     *
+     * @param funcName the aggregation function name (compared against the FunctionDesc.FUNC_* constants)
+     * @param returnType the return type name of the function
+     * @return a new aggregator instance
+     * @throws IllegalArgumentException if no aggregator matches the combination
+     */
+    public static MeasureAggregator<?> create(String funcName, String returnType) {
+        if (FunctionDesc.FUNC_SUM.equalsIgnoreCase(funcName) || FunctionDesc.FUNC_COUNT.equalsIgnoreCase(funcName)) {
+            if (isInteger(returnType))
+                return new LongSumAggregator();
+            else if (isBigDecimal(returnType))
+                return new BigDecimalSumAggregator();
+            else if (isDouble(returnType))
+                return new DoubleSumAggregator();
+        } else if (FunctionDesc.FUNC_COUNT_DISTINCT.equalsIgnoreCase(funcName)) {
+            DataType hllcType = DataType.getInstance(returnType);
+            if (hllcType.isHLLC())
+                return new HLLCAggregator(hllcType.getPrecision());
+            else
+                return new LDCAggregator();
+        } else if (FunctionDesc.FUNC_MAX.equalsIgnoreCase(funcName)) {
+            if (isInteger(returnType))
+                return new LongMaxAggregator();
+            else if (isBigDecimal(returnType))
+                return new BigDecimalMaxAggregator();
+            else if (isDouble(returnType))
+                return new DoubleMaxAggregator();
+        } else if (FunctionDesc.FUNC_MIN.equalsIgnoreCase(funcName)) {
+            if (isInteger(returnType))
+                return new LongMinAggregator();
+            else if (isBigDecimal(returnType))
+                return new BigDecimalMinAggregator();
+            else if (isDouble(returnType))
+                return new DoubleMinAggregator();
+        } else if (FunctionDesc.FUNC_TOP_N.equalsIgnoreCase(funcName)) {
+            return new TopNAggregator();
+        }
+        // Reached when the function is unknown, or known but the return type is unsupported.
+        throw new IllegalArgumentException("No aggregator for func '" + funcName + "' and return type '" + returnType + "'");
+    }
+
+    /**
+     * @param type a type name, may be null
+     * @return true if the name starts with "decimal", ignoring case (so
+     *         parameterised names such as "decimal(19,4)" match); false for null
+     */
+    public static boolean isBigDecimal(String type) {
+        // Case-insensitive and null-safe, in line with isDouble() and isInteger().
+        return type != null && type.regionMatches(true, 0, "decimal", 0, "decimal".length());
+    }
+
+    /**
+     * @param type a type name, may be null
+     * @return true if the name is "double", "float" or "real", ignoring case
+     */
+    public static boolean isDouble(String type) {
+        return "double".equalsIgnoreCase(type) || "float".equalsIgnoreCase(type) || "real".equalsIgnoreCase(type);
+    }
+
+    /**
+     * @param type a type name, may be null
+     * @return true if the name is "long", "bigint", "int" or "integer", ignoring case
+     */
+    public static boolean isInteger(String type) {
+        return "long".equalsIgnoreCase(type) || "bigint".equalsIgnoreCase(type) || "int".equalsIgnoreCase(type) || "integer".equalsIgnoreCase(type);
+    }
+
+    /** @return a guessed memory footprint in bytes of a BigDecimal aggregator */
+    public static int guessBigDecimalMemBytes() {
+        // 116 returned by AggregationCacheMemSizeTest
+        return 8 // aggregator obj shell
+                + 8 // ref to BigDecimal
+                + 8 // BigDecimal obj shell
+                + 100; // guess of BigDecimal internal
+    }
+
+    /** @return a guessed memory footprint in bytes of a double aggregator */
+    public static int guessDoubleMemBytes() {
+        // 29 to 44 returned by AggregationCacheMemSizeTest
+        return 44;
+        /*
+        return 8 // aggregator obj shell
+        + 8 // ref to DoubleWritable
+        + 8 // DoubleWritable obj shell
+        + 8; // size of double
+        */
+    }
+
+    /** @return a guessed memory footprint in bytes of a long aggregator */
+    public static int guessLongMemBytes() {
+        // 29 to 44 returned by AggregationCacheMemSizeTest
+        return 44;
+        /*
+        return 8 // aggregator obj shell
+        + 8 // ref to LongWritable
+        + 8 // LongWritable obj shell
+        + 8; // size of long
+        */
+    }
+
+    // ============================================================================
+
+    /**
+     * Hook for aggregators that depend on another aggregator. The default
+     * implementation does nothing.
+     *
+     * @param agg the aggregator this one depends on
+     */
+    @SuppressWarnings("rawtypes")
+    public void setDependentAggregator(MeasureAggregator agg) {
+    }
+
+    /** Clears the aggregated state so the instance can be reused. */
+    abstract public void reset();
+
+    /**
+     * Folds one value into the aggregated state.
+     *
+     * @param value the value to aggregate
+     */
+    abstract public void aggregate(V value);
+
+    /** @return the current aggregated state */
+    abstract public V getState();
+
+    // get an estimate of memory consumption UPPER BOUND
+    abstract public int getMemBytesEstimate();
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureAggregators.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureAggregators.java b/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureAggregators.java
new file mode 100644
index 0000000..3aa575b
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureAggregators.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+
+/**
+ * A fixed, ordered group of {@link MeasureAggregator}s, one per measure
+ * description, that can be reset, fed and read as a unit. Values and states
+ * are exchanged as arrays indexed in the same order as the measure
+ * descriptions passed to the constructor.
+ */
+@SuppressWarnings({ "rawtypes", "unchecked", "serial" })
+public class MeasureAggregators implements Serializable {
+
+    private final MeasureAggregator[] aggs;
+    private final int descLength;
+
+    /**
+     * @param measureDescs the measure descriptions, in iteration order
+     * @throws IllegalArgumentException see {@link #MeasureAggregators(MeasureDesc...)}
+     */
+    public MeasureAggregators(Collection<MeasureDesc> measureDescs) {
+        this(measureDescs.toArray(new MeasureDesc[measureDescs.size()]));
+    }
+
+    /**
+     * Creates one aggregator per measure description and then wires up
+     * dependent aggregators by measure name.
+     *
+     * @param measureDescs the measure descriptions
+     * @throws IllegalArgumentException if no aggregator exists for a measure's
+     *         function, or if a measure refers to a dependent measure that is
+     *         not among {@code measureDescs}
+     */
+    public MeasureAggregators(MeasureDesc... measureDescs) {
+        descLength = measureDescs.length;
+        aggs = new MeasureAggregator[descLength];
+
+        Map<String, Integer> measureIndexMap = new HashMap<String, Integer>();
+        for (int i = 0; i < descLength; i++) {
+            FunctionDesc func = measureDescs[i].getFunction();
+            aggs[i] = MeasureAggregator.create(func.getExpression(), func.getReturnType());
+            measureIndexMap.put(measureDescs[i].getName(), i);
+        }
+        // fill back dependent aggregator; done in a second pass so a measure
+        // may depend on one that appears later in the array
+        for (int i = 0; i < descLength; i++) {
+            String depMsrRef = measureDescs[i].getDependentMeasureRef();
+            if (depMsrRef == null) {
+                continue;
+            }
+            Integer depIndex = measureIndexMap.get(depMsrRef);
+            if (depIndex == null) {
+                // Fail with a readable message instead of an NPE from auto-unboxing.
+                throw new IllegalArgumentException("Measure '" + measureDescs[i].getName() + "' depends on unknown measure '" + depMsrRef + "'");
+            }
+            aggs[i].setDependentAggregator(aggs[depIndex]);
+        }
+    }
+
+    /** Resets every aggregator in the group. */
+    public void reset() {
+        for (int i = 0; i < aggs.length; i++) {
+            aggs[i].reset();
+        }
+    }
+
+    /**
+     * Feeds {@code values[i]} to the i-th aggregator.
+     *
+     * @param values one value per measure, in constructor order
+     */
+    public void aggregate(Object[] values) {
+        assert values.length == descLength;
+
+        for (int i = 0; i < descLength; i++) {
+            aggs[i].aggregate(values[i]);
+        }
+    }
+
+    /**
+     * Writes the state of the i-th aggregator into {@code states[i]}.
+     *
+     * @param states output array with room for one state per measure
+     */
+    public void collectStates(Object[] states) {
+        for (int i = 0; i < descLength; i++) {
+            states[i] = aggs[i].getState();
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureCodec.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureCodec.java b/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureCodec.java
new file mode 100644
index 0000000..cbcb3a8
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureCodec.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+
+import org.apache.kylin.metadata.model.MeasureDesc;
+
+/**
+ * Encodes and decodes a fixed sequence of measure values to and from a
+ * {@link ByteBuffer}, using one {@link DataTypeSerializer} per measure in
+ * declaration order.
+ *
+ * @author yangli9
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class MeasureCodec {
+
+    int nMeasures;
+    DataTypeSerializer[] serializers;
+
+    /** @param measureDescs the measures, in iteration order */
+    public MeasureCodec(Collection<MeasureDesc> measureDescs) {
+        this((MeasureDesc[]) measureDescs.toArray(new MeasureDesc[measureDescs.size()]));
+    }
+
+    /** @param measureDescs the measures; each contributes the return type of its function */
+    public MeasureCodec(MeasureDesc... measureDescs) {
+        final int count = measureDescs.length;
+        String[] returnTypes = new String[count];
+        int pos = 0;
+        while (pos < count) {
+            returnTypes[pos] = measureDescs[pos].getFunction().getReturnType();
+            pos++;
+        }
+        init(returnTypes);
+    }
+
+    /** @param dataTypes the data type strings of the measures, in order */
+    public MeasureCodec(String... dataTypes) {
+        init(dataTypes);
+    }
+
+    /** Creates one serializer per data type string. */
+    private void init(String[] dataTypes) {
+        nMeasures = dataTypes.length;
+        serializers = new DataTypeSerializer[nMeasures];
+
+        int slot = 0;
+        for (String dataType : dataTypes) {
+            serializers[slot++] = DataTypeSerializer.create(dataType);
+        }
+    }
+
+    /**
+     * @param idx the measure position
+     * @return the serializer used for the measure at that position
+     */
+    public DataTypeSerializer getSerializer(int idx) {
+        return serializers[idx];
+    }
+
+    /**
+     * Reads all measures from the buffer, in order.
+     *
+     * @param buf the buffer to read from
+     * @param result output array receiving one decoded value per measure
+     */
+    public void decode(ByteBuffer buf, Object[] result) {
+        assert result.length == nMeasures;
+        int m = 0;
+        while (m < nMeasures) {
+            result[m] = serializers[m].deserialize(buf);
+            m++;
+        }
+    }
+
+    /**
+     * Writes all measures to the buffer, in order.
+     *
+     * @param values one value per measure
+     * @param out the buffer to write to
+     */
+    public void encode(Object[] values, ByteBuffer out) {
+        assert values.length == nMeasures;
+        int m = 0;
+        while (m < nMeasures) {
+            serializers[m].serialize(values[m], out);
+            m++;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalMaxAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalMaxAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalMaxAggregator.java
new file mode 100644
index 0000000..ca044d0
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalMaxAggregator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation.basic;
+
+import java.math.BigDecimal;
+
+import org.apache.kylin.aggregation.MeasureAggregator;
+
+/**
+ * Aggregator that keeps the largest {@link BigDecimal} it has been given.
+ * The state is null until the first value is aggregated.
+ */
+@SuppressWarnings("serial")
+public class BigDecimalMaxAggregator extends MeasureAggregator<BigDecimal> {
+
+    BigDecimal max = null;
+
+    /** Forgets the current maximum. */
+    @Override
+    public void reset() {
+        max = null;
+    }
+
+    /** Takes {@code value} as the new maximum if there is none yet or if it is strictly greater. */
+    @Override
+    public void aggregate(BigDecimal value) {
+        if (max == null || max.compareTo(value) < 0) {
+            max = value;
+        }
+    }
+
+    /** @return the largest value seen since the last reset, or null if none */
+    @Override
+    public BigDecimal getState() {
+        return max;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        return guessBigDecimalMemBytes();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalMinAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalMinAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalMinAggregator.java
new file mode 100644
index 0000000..3c3c85e
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalMinAggregator.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation.basic;
+
+import java.math.BigDecimal;
+
+import org.apache.kylin.aggregation.MeasureAggregator;
+
+/** Aggregator whose state is the smallest {@link BigDecimal} aggregated since the last reset. */
+@SuppressWarnings("serial")
+public class BigDecimalMinAggregator extends MeasureAggregator<BigDecimal> {
+
+    // smallest value so far; null until the first value is aggregated
+    BigDecimal min = null;
+
+    @Override
+    public void reset() {
+        min = null;
+    }
+
+    @Override
+    public void aggregate(BigDecimal value) {
+        // adopt the value if it is the first one or the held one compares strictly greater
+        if (min == null || min.compareTo(value) > 0) {
+            min = value;
+        }
+    }
+
+    @Override
+    public BigDecimal getState() {
+        return min;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        return guessBigDecimalMemBytes();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalSerializer.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalSerializer.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalSerializer.java
new file mode 100644
index 0000000..9f7c3cf
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalSerializer.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation.basic;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.aggregation.DataTypeSerializer;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.metadata.model.DataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Serializes a {@link BigDecimal} as its scale, the byte count of its unscaled value, then those bytes.
+ * @author yangli9
+ */
+public class BigDecimalSerializer extends DataTypeSerializer<BigDecimal> {
+
+    private static final Logger logger = LoggerFactory.getLogger(BigDecimalSerializer.class);
+
+    final DataType type;
+    final int maxLength;
+
+    // how many values had their scale cut off so far; used to throttle the warning in serialize()
+    int avoidVerbose = 0;
+
+    public BigDecimalSerializer(DataType type) {
+        this.type = type;
+        // see serialize(): 1 byte scale, 1 byte length, assume every 2 digits takes 1 byte
+        this.maxLength = 1 + 1 + (type.getPrecision() + 1) / 2;
+    }
+
+    /** Writes value to out; a scale above the type's is first rounded (half-even) down to it. */
+    @Override
+    public void serialize(BigDecimal value, ByteBuffer out) {
+        if (value.scale() > type.getScale()) {
+            // count every cut-off but warn only on the 1st, 10001st, 20001st, ... occurrence
+            if (avoidVerbose++ % 10000 == 0) {
+                logger.warn("value's scale has exceeded the " + type.getScale() + ", cut it off, to ensure encoded value do not exceed maxLength " + maxLength + " times:" + avoidVerbose);
+            }
+            value = value.setScale(type.getScale(), BigDecimal.ROUND_HALF_EVEN);
+        }
+        byte[] bytes = value.unscaledValue().toByteArray();
+        if (bytes.length + 2 > maxLength) {
+            throw new IllegalArgumentException("'" + value + "' exceeds the expected length for type " + type);
+        }
+
+        BytesUtil.writeVInt(value.scale(), out);
+        BytesUtil.writeVInt(bytes.length, out);
+        out.put(bytes);
+    }
+
+    @Override
+    public BigDecimal deserialize(ByteBuffer in) {
+        int scale = BytesUtil.readVInt(in);
+        int n = BytesUtil.readVInt(in);
+        byte[] bytes = new byte[n];
+        in.get(bytes);
+
+        return new BigDecimal(new BigInteger(bytes), scale);
+    }
+
+    /** Returns the serialized length of the value at in's position; the position is restored. */
+    @Override
+    public int peekLength(ByteBuffer in) {
+        int mark = in.position();
+
+        BytesUtil.readVInt(in); // the scale itself is not needed, only the bytes it occupies
+        int n = BytesUtil.readVInt(in);
+        int len = in.position() - mark + n;
+
+        in.position(mark);
+        return len;
+    }
+
+    @Override
+    public int maxLength() {
+        return maxLength;
+    }
+
+    @Override
+    public int getStorageBytesEstimate() {
+        return 8;
+    }
+
+    /** Parses the text of value as a decimal; a null array yields zero. */
+    @Override
+    public BigDecimal valueOf(byte[] value) {
+        return value == null ? BigDecimal.ZERO : new BigDecimal(Bytes.toString(value));
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalSumAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalSumAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalSumAggregator.java
new file mode 100644
index 0000000..19aef3c
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalSumAggregator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation.basic;
+
+import java.math.BigDecimal;
+
+import org.apache.kylin.aggregation.MeasureAggregator;
+
+/** Aggregator whose state is the sum of every {@link BigDecimal} aggregated since the last reset. */
+@SuppressWarnings("serial")
+public class BigDecimalSumAggregator extends MeasureAggregator<BigDecimal> {
+
+    // running total; zero when nothing has been aggregated
+    BigDecimal sum = BigDecimal.ZERO;
+
+    @Override
+    public void reset() {
+        sum = BigDecimal.ZERO;
+    }
+
+    @Override
+    public void aggregate(BigDecimal value) {
+        sum = sum.add(value);
+    }
+
+    @Override
+    public BigDecimal getState() {
+        return sum;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        return guessBigDecimalMemBytes();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DateTimeSerializer.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DateTimeSerializer.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DateTimeSerializer.java
new file mode 100644
index 0000000..0bf4aba
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DateTimeSerializer.java
@@ -0,0 +1,65 @@
+package org.apache.kylin.aggregation.basic;
+
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.aggregation.DataTypeSerializer;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.common.util.LongMutable;
+import org.apache.kylin.metadata.model.DataType;
+
+public class DateTimeSerializer extends DataTypeSerializer<LongMutable> {
+
+    // one reusable holder per thread, created lazily by current(), so reads do not allocate
+    private ThreadLocal<LongMutable> current = new ThreadLocal<LongMutable>();
+
+    public DateTimeSerializer(DataType type) {
+    }
+
+    /** Writes the value as a fixed 8-byte long. */
+    @Override
+    public void serialize(LongMutable value, ByteBuffer out) {
+        out.putLong(value.get());
+    }
+
+    private LongMutable current() {
+        LongMutable holder = current.get();
+        if (holder != null)
+            return holder;
+        holder = new LongMutable();
+        current.set(holder);
+        return holder;
+    }
+
+    /** Reads a long into this thread's holder; the returned object is overwritten by the next call. */
+    @Override
+    public LongMutable deserialize(ByteBuffer in) {
+        LongMutable holder = current();
+        holder.set(in.getLong());
+        return holder;
+    }
+
+    @Override
+    public int peekLength(ByteBuffer in) {
+        return 8;
+    }
+
+    @Override
+    public int maxLength() {
+        return 8;
+    }
+
+    @Override
+    public int getStorageBytesEstimate() {
+        return 8;
+    }
+
+    /** Converts the text of value with DateFormat.stringToMillis (null gives 0) into this thread's holder. */
+    @Override
+    public LongMutable valueOf(byte[] value) {
+        LongMutable holder = current();
+        holder.set(value == null ? 0L : DateFormat.stringToMillis(Bytes.toString(value)));
+        return holder;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMaxAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMaxAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMaxAggregator.java
new file mode 100644
index 0000000..f09614d
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMaxAggregator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation.basic;
+
+import org.apache.kylin.aggregation.MeasureAggregator;
+import org.apache.kylin.common.util.DoubleMutable;
+
+/**
+ */
+@SuppressWarnings("serial")
+public class DoubleMaxAggregator extends MeasureAggregator<DoubleMutable> {
+
+ DoubleMutable max = null;
+
+ @Override
+ public void reset() {
+ max = null;
+ }
+
+ @Override
+ public void aggregate(DoubleMutable value) {
+ if (max == null)
+ max = new DoubleMutable(value.get());
+ else if (max.get() < value.get())
+ max.set(value.get());
+ }
+
+ @Override
+ public DoubleMutable getState() {
+ return max;
+ }
+
+ @Override
+ public int getMemBytesEstimate() {
+ return guessDoubleMemBytes();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMinAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMinAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMinAggregator.java
new file mode 100644
index 0000000..b93c15c
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMinAggregator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation.basic;
+
+import org.apache.kylin.aggregation.MeasureAggregator;
+import org.apache.kylin.common.util.DoubleMutable;
+
+/**
+ */
+@SuppressWarnings("serial")
+public class DoubleMinAggregator extends MeasureAggregator<DoubleMutable> {
+
+ DoubleMutable min = null;
+
+ @Override
+ public void reset() {
+ min = null;
+ }
+
+ @Override
+ public void aggregate(DoubleMutable value) {
+ if (min == null)
+ min = new DoubleMutable(value.get());
+ else if (min.get() > value.get())
+ min.set(value.get());
+ }
+
+ @Override
+ public DoubleMutable getState() {
+ return min;
+ }
+
+ @Override
+ public int getMemBytesEstimate() {
+ return guessDoubleMemBytes();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSerializer.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSerializer.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSerializer.java
new file mode 100644
index 0000000..f207054
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSerializer.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation.basic;
+
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.aggregation.DataTypeSerializer;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.DoubleMutable;
+import org.apache.kylin.metadata.model.DataType;
+
+/** Serializer between {@link DoubleMutable} and a fixed 8-byte double. */
+public class DoubleSerializer extends DataTypeSerializer<DoubleMutable> {
+
+    // one reusable holder per thread, so reading a value does not allocate
+    private ThreadLocal<DoubleMutable> current = new ThreadLocal<DoubleMutable>();
+
+    public DoubleSerializer(DataType type) {
+    }
+
+    /** Writes the value with ByteBuffer.putDouble. */
+    @Override
+    public void serialize(DoubleMutable value, ByteBuffer out) {
+        out.putDouble(value.get());
+    }
+
+    // returns the calling thread's holder, creating it on first use
+    private DoubleMutable current() {
+        DoubleMutable holder = current.get();
+        if (holder != null)
+            return holder;
+        holder = new DoubleMutable();
+        current.set(holder);
+        return holder;
+    }
+
+    /** Reads a double into this thread's holder; the returned object is overwritten by the next call. */
+    @Override
+    public DoubleMutable deserialize(ByteBuffer in) {
+        DoubleMutable holder = current();
+        holder.set(in.getDouble());
+        return holder;
+    }
+
+    @Override
+    public int peekLength(ByteBuffer in) {
+        return 8;
+    }
+
+    @Override
+    public int maxLength() {
+        return 8;
+    }
+
+    @Override
+    public int getStorageBytesEstimate() {
+        return 8;
+    }
+
+    /** Parses the text of value with Double.parseDouble (null gives 0) into this thread's holder. */
+    @Override
+    public DoubleMutable valueOf(byte[] value) {
+        DoubleMutable holder = current();
+        holder.set(value == null ? 0d : Double.parseDouble(Bytes.toString(value)));
+        return holder;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSumAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSumAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSumAggregator.java
new file mode 100644
index 0000000..298cec6
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSumAggregator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation.basic;
+
+import org.apache.kylin.aggregation.MeasureAggregator;
+import org.apache.kylin.common.util.DoubleMutable;
+
+/**
+ */
+@SuppressWarnings("serial")
+public class DoubleSumAggregator extends MeasureAggregator<DoubleMutable> {
+
+ DoubleMutable sum = new DoubleMutable();
+
+ @Override
+ public void reset() {
+ sum.set(0.0);
+ }
+
+ @Override
+ public void aggregate(DoubleMutable value) {
+ sum.set(sum.get() + value.get());
+ }
+
+ @Override
+ public DoubleMutable getState() {
+ return sum;
+ }
+
+ @Override
+ public int getMemBytesEstimate() {
+ return guessDoubleMemBytes();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMaxAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMaxAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMaxAggregator.java
new file mode 100644
index 0000000..71d95f2
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMaxAggregator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation.basic;
+
+import org.apache.kylin.aggregation.MeasureAggregator;
+import org.apache.kylin.common.util.LongMutable;
+
+/**
+ */
+@SuppressWarnings("serial")
+public class LongMaxAggregator extends MeasureAggregator<LongMutable> {
+
+ LongMutable max = null;
+
+ @Override
+ public void reset() {
+ max = null;
+ }
+
+ @Override
+ public void aggregate(LongMutable value) {
+ if (max == null)
+ max = new LongMutable(value.get());
+ else if (max.get() < value.get())
+ max.set(value.get());
+ }
+
+ @Override
+ public LongMutable getState() {
+ return max;
+ }
+
+ @Override
+ public int getMemBytesEstimate() {
+ return guessLongMemBytes();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMinAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMinAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMinAggregator.java
new file mode 100644
index 0000000..d1e93f2
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMinAggregator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation.basic;
+
+import org.apache.kylin.aggregation.MeasureAggregator;
+import org.apache.kylin.common.util.LongMutable;
+
+/**
+ */
+@SuppressWarnings("serial")
+public class LongMinAggregator extends MeasureAggregator<LongMutable> {
+
+ LongMutable min = null;
+
+ @Override
+ public void reset() {
+ min = null;
+ }
+
+ @Override
+ public void aggregate(LongMutable value) {
+ if (min == null)
+ min = new LongMutable(value.get());
+ else if (min.get() > value.get())
+ min.set(value.get());
+ }
+
+ @Override
+ public LongMutable getState() {
+ return min;
+ }
+
+ @Override
+ public int getMemBytesEstimate() {
+ return guessLongMemBytes();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongSerializer.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongSerializer.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongSerializer.java
new file mode 100644
index 0000000..202596d
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongSerializer.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation.basic;
+
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.aggregation.DataTypeSerializer;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.LongMutable;
+import org.apache.kylin.metadata.model.DataType;
+
+/** Serializer between {@link LongMutable} and the vlong form written by BytesUtil.writeVLong. */
+public class LongSerializer extends DataTypeSerializer<LongMutable> {
+
+    // one reusable holder per thread, so reading a value does not allocate
+    private ThreadLocal<LongMutable> current = new ThreadLocal<LongMutable>();
+
+    public LongSerializer(DataType type) {
+    }
+
+    @Override
+    public void serialize(LongMutable value, ByteBuffer out) {
+        BytesUtil.writeVLong(value.get(), out);
+    }
+
+    // returns the calling thread's holder, creating it on first use
+    private LongMutable current() {
+        LongMutable holder = current.get();
+        if (holder != null)
+            return holder;
+        holder = new LongMutable();
+        current.set(holder);
+        return holder;
+    }
+
+    /** Reads a vlong into this thread's holder; the returned object is overwritten by the next call. */
+    @Override
+    public LongMutable deserialize(ByteBuffer in) {
+        LongMutable holder = current();
+        holder.set(BytesUtil.readVLong(in));
+        return holder;
+    }
+
+    /** Returns how many bytes the value at in's position occupies; the position is restored. */
+    @Override
+    public int peekLength(ByteBuffer in) {
+        int start = in.position();
+
+        BytesUtil.readVLong(in); // decoded only to find where the value ends
+        int len = in.position() - start;
+
+        in.position(start);
+        return len;
+    }
+
+    @Override
+    public int maxLength() {
+        return 9; // vlong: 1 + 8
+    }
+
+    @Override
+    public int getStorageBytesEstimate() {
+        return 5;
+    }
+
+    /** Parses the text of value with Long.parseLong (null gives 0) into this thread's holder. */
+    @Override
+    public LongMutable valueOf(byte[] value) {
+        LongMutable holder = current();
+        holder.set(value == null ? 0L : Long.parseLong(Bytes.toString(value)));
+        return holder;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongSumAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongSumAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongSumAggregator.java
new file mode 100644
index 0000000..c85c83c
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongSumAggregator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation.basic;
+
+import org.apache.kylin.aggregation.MeasureAggregator;
+import org.apache.kylin.common.util.LongMutable;
+
+/**
+ */
+@SuppressWarnings("serial")
+public class LongSumAggregator extends MeasureAggregator<LongMutable> {
+
+ LongMutable sum = new LongMutable();
+
+ @Override
+ public void reset() {
+ sum.set(0);
+ }
+
+ @Override
+ public void aggregate(LongMutable value) {
+ sum.set(sum.get() + value.get());
+ }
+
+ @Override
+ public LongMutable getState() {
+ return sum;
+ }
+
+ @Override
+ public int getMemBytesEstimate() {
+ return guessLongMemBytes();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/basic/StringSerializer.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/StringSerializer.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/StringSerializer.java
new file mode 100644
index 0000000..e84278d
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/StringSerializer.java
@@ -0,0 +1,56 @@
+package org.apache.kylin.aggregation.basic;
+
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.aggregation.DataTypeSerializer;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.metadata.model.DataType;
+
+public class StringSerializer extends DataTypeSerializer<String> {
+
+    final DataType type;
+    final int maxLength;
+
+    public StringSerializer(DataType type) {
+        this.type = type;
+        // see serialize(): 2 byte length, rest is String.toBytes()
+        this.maxLength = 2 + type.getPrecision();
+    }
+
+    /** Writes value to out, then rejects it if more than maxLength bytes were written. */
+    @Override
+    public void serialize(String value, ByteBuffer out) {
+        int before = out.position();
+        BytesUtil.writeUTFString(value, out);
+        int written = out.position() - before;
+        if (written > maxLength)
+            throw new IllegalArgumentException("'" + value + "' exceeds the expected length for type " + type);
+    }
+
+    @Override
+    public String deserialize(ByteBuffer in) {
+        return BytesUtil.readUTFString(in);
+    }
+
+    @Override
+    public int peekLength(ByteBuffer in) {
+        return BytesUtil.peekByteArrayLength(in);
+    }
+
+    @Override
+    public int maxLength() {
+        return maxLength;
+    }
+
+    @Override
+    public int getStorageBytesEstimate() {
+        return maxLength;
+    }
+
+    @Override
+    public String valueOf(byte[] value) {
+        return Bytes.toString(value);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregator.java
new file mode 100644
index 0000000..8f85fe8
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation.hllc;
+
+import org.apache.kylin.aggregation.MeasureAggregator;
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+
+/** Aggregator whose state is the merge of every {@link HyperLogLogPlusCounter} aggregated since the last reset. */
+@SuppressWarnings("serial")
+public class HLLCAggregator extends MeasureAggregator<HyperLogLogPlusCounter> {
+
+    final int precision;
+    // merged counter; null until the first value is aggregated
+    HyperLogLogPlusCounter sum = null;
+
+    public HLLCAggregator(int precision) {
+        this.precision = precision;
+    }
+
+    @Override
+    public void reset() {
+        sum = null;
+    }
+
+    @Override
+    public void aggregate(HyperLogLogPlusCounter value) {
+        if (sum != null)
+            sum.merge(value);
+        else
+            sum = new HyperLogLogPlusCounter(value); // a new counter constructed from the first value
+    }
+
+    @Override
+    public HyperLogLogPlusCounter getState() {
+        return sum;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        // 1024 + 60 returned by AggregationCacheMemSizeTest
+        return 8 // aggregator obj shell
+                + 4 // precision
+                + 8 // ref to HLLC
+                + 8 // HLLC obj shell
+                + 32 + (1 << precision); // HLLC internal
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCSerializer.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCSerializer.java b/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCSerializer.java
new file mode 100644
index 0000000..f7804f4
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCSerializer.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation.hllc;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.aggregation.DataTypeSerializer;
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.metadata.model.DataType;
+
+/**
+ * Serializer for {@link HyperLogLogPlusCounter} measures.
+ *
+ * A single counter instance is kept per thread and reused by
+ * {@link #deserialize(ByteBuffer)} and {@link #valueOf(byte[])}. The object
+ * those methods return is therefore overwritten by the next such call on the
+ * same thread; callers that need to keep a result must copy it first.
+ *
+ * @author yangli9
+ *
+ */
+public class HLLCSerializer extends DataTypeSerializer<HyperLogLogPlusCounter> {
+
+    // be thread-safe and avoid repeated obj creation
+    private ThreadLocal<HyperLogLogPlusCounter> current = new ThreadLocal<HyperLogLogPlusCounter>();
+
+    // precision used when a thread's counter is first created
+    private int precision;
+
+    /**
+     * @param type data type whose {@code getPrecision()} supplies the counter precision
+     */
+    public HLLCSerializer(DataType type) {
+        this.precision = type.getPrecision();
+    }
+
+    /**
+     * Writes the counter's registers into {@code out}.
+     *
+     * @throws RuntimeException wrapping any {@link IOException} raised while writing
+     */
+    @Override
+    public void serialize(HyperLogLogPlusCounter value, ByteBuffer out) {
+        try {
+            value.writeRegisters(out);
+        } catch (IOException ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    /** Returns the calling thread's reusable counter, creating it on first use. */
+    private HyperLogLogPlusCounter current() {
+        HyperLogLogPlusCounter cached = current.get();
+        if (cached != null) {
+            return cached;
+        }
+        HyperLogLogPlusCounter fresh = new HyperLogLogPlusCounter(precision);
+        current.set(fresh);
+        return fresh;
+    }
+
+    /**
+     * Reads registers from {@code in} into the calling thread's reusable counter.
+     *
+     * @return the thread's shared counter instance, not a fresh object
+     * @throws RuntimeException wrapping any {@link IOException} raised while reading
+     */
+    @Override
+    public HyperLogLogPlusCounter deserialize(ByteBuffer in) {
+        HyperLogLogPlusCounter target = current();
+        try {
+            target.readRegisters(in);
+        } catch (IOException ex) {
+            throw new RuntimeException(ex);
+        }
+        return target;
+    }
+
+    /** Delegates to the thread's counter to peek the length of the next value in {@code in}. */
+    @Override
+    public int peekLength(ByteBuffer in) {
+        return current().peekLength(in);
+    }
+
+    /** Delegates to the thread's counter for the maximum serialized length. */
+    @Override
+    public int maxLength() {
+        return current().maxLength();
+    }
+
+    /** Uses the counter's maximum serialized length as the storage estimate. */
+    @Override
+    public int getStorageBytesEstimate() {
+        return current().maxLength();
+    }
+
+    /**
+     * Builds a counter holding exactly one element.
+     *
+     * @param value raw bytes to add; {@code null} is represented by adding the
+     *              placeholder string {@code "__nUlL__"} instead
+     * @return the thread's shared counter, cleared and then loaded with the element
+     */
+    @Override
+    public HyperLogLogPlusCounter valueOf(byte[] value) {
+        HyperLogLogPlusCounter counter = current();
+        counter.clear();
+        if (value != null) {
+            counter.add(value);
+        } else {
+            counter.add("__nUlL__");
+        }
+        return counter;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/LDCAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/LDCAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/LDCAggregator.java
new file mode 100644
index 0000000..643bcae
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/LDCAggregator.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation.hllc;
+
+import org.apache.kylin.aggregation.MeasureAggregator;
+import org.apache.kylin.common.util.LongMutable;
+
+/**
+ * Long Distinct Count.
+ *
+ * This aggregator holds no data of its own: {@link #reset()} and
+ * {@link #aggregate(LongMutable)} do nothing. Its state is the count estimate
+ * of a dependent {@link HLLCAggregator}, read on demand in {@link #getState()}.
+ */
+@SuppressWarnings("serial")
+public class LDCAggregator extends MeasureAggregator<LongMutable> {
+
+    // Shared result for aggregators that have no dependent HLLC aggregator.
+    // final so the reference cannot be swapped; the object itself is still mutable.
+    private static final LongMutable ZERO = new LongMutable(0);
+
+    private HLLCAggregator hllAgg = null;
+    // Reused holder for the count returned by getState().
+    private LongMutable state = new LongMutable(0);
+
+    /**
+     * Sets the aggregator whose count estimate this one reports.
+     *
+     * @param agg must be an {@link HLLCAggregator}; any other type fails the cast
+     *            with a {@link ClassCastException}
+     */
+    @SuppressWarnings("rawtypes")
+    public void setDependentAggregator(MeasureAggregator agg) {
+        this.hllAgg = (HLLCAggregator) agg;
+    }
+
+    /** No-op: the value is derived from the dependent aggregator. */
+    @Override
+    public void reset() {
+    }
+
+    /** No-op: incoming values are ignored, only the dependent aggregator counts. */
+    @Override
+    public void aggregate(LongMutable value) {
+    }
+
+    /**
+     * @return the shared zero when no dependent aggregator is set; otherwise this
+     *         aggregator's reusable holder, set to the dependent counter's estimate,
+     *         or to 0 when the dependent aggregator has no counter yet
+     */
+    @Override
+    public LongMutable getState() {
+        if (hllAgg == null) {
+            return ZERO;
+        }
+        // HLLCAggregator.getState() is null before its first aggregate() and after
+        // reset(); report an empty count instead of a NullPointerException.
+        if (hllAgg.getState() == null) {
+            state.set(0L);
+            return state;
+        }
+        state.set(hllAgg.getState().getCountEstimate());
+        return state;
+    }
+
+    /** Memory estimate as provided by {@code guessLongMemBytes()}. */
+    @Override
+    public int getMemBytesEstimate() {
+        return guessLongMemBytes();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregator.java
new file mode 100644
index 0000000..4f6c7ee
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregator.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation.topn;
+
+import java.util.Map;
+
+import org.apache.kylin.aggregation.MeasureAggregator;
+import org.apache.kylin.common.topn.TopNCounter;
+import org.apache.kylin.common.util.ByteArray;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Measure aggregator that merges {@link TopNCounter} values into one running
+ * counter.
+ *
+ * The running counter is created lazily on the first aggregated value, using
+ * that value's capacity, and every value (including the first) is merged into
+ * it. {@link #reset()} only drops the running counter; {@code capacity} and
+ * {@code sanityCheckMap} keep their last values until the next first value.
+ */
+@SuppressWarnings("serial")
+public class TopNAggregator extends MeasureAggregator<TopNCounter<ByteArray>> {
+
+    /** Capacity copied from the first aggregated counter; 0 before that. */
+    int capacity = 0;
+
+    /** Running result, or {@code null} while no value has been aggregated. */
+    TopNCounter<ByteArray> sum;
+
+    /** Re-created with each new running counter; never read inside this class. */
+    Map<ByteArray, Double> sanityCheckMap;
+
+    /** Forgets the running counter; the next aggregated value starts a new one. */
+    @Override
+    public void reset() {
+        sum = null;
+    }
+
+    /**
+     * Merges one counter into the running result.
+     *
+     * @param value counter to merge; when it is the first one, its capacity sizes
+     *              the new running counter
+     */
+    @Override
+    public void aggregate(TopNCounter<ByteArray> value) {
+        final boolean firstValue = (sum == null);
+        if (firstValue) {
+            capacity = value.getCapacity();
+            sum = new TopNCounter<>(capacity);
+            sanityCheckMap = Maps.newHashMap();
+        }
+        sum.merge(value);
+    }
+
+    /**
+     * @return the running counter itself, or {@code null} if nothing has been
+     *         aggregated
+     */
+    @Override
+    public TopNCounter<ByteArray> getState() {
+        // The original carries a commented-out call here and it stays disabled:
+        //sum.retain(capacity);
+        return sum;
+    }
+
+    /** Memory estimate in bytes, computed as {@code 8 * capacity / 4}. */
+    @Override
+    public int getMemBytesEstimate() {
+        final int scaled = 8 * capacity;
+        return scaled / 4;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNCounterSerializer.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNCounterSerializer.java b/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNCounterSerializer.java
new file mode 100644
index 0000000..8c44f8f
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNCounterSerializer.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation.topn;
+
+import org.apache.kylin.aggregation.DataTypeSerializer;
+import org.apache.kylin.common.topn.Counter;
+import org.apache.kylin.common.topn.DoubleDeltaSerializer;
+import org.apache.kylin.common.topn.TopNCounter;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.metadata.model.DataType;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Serializer for {@code TopNCounter<ByteArray>} measures.
+ *
+ * Serialized layout, in order: capacity (int), number of entries (int), key
+ * length in bytes (int), the counters as written by
+ * {@link DoubleDeltaSerializer}, then the backing array of each key. One key
+ * length is stored for the whole value, so the readers treat every key as
+ * having that same length.
+ */
+public class TopNCounterSerializer extends DataTypeSerializer<TopNCounter<ByteArray>> {
+
+    private DoubleDeltaSerializer dds = new DoubleDeltaSerializer(3);
+
+    // taken from the data type; scaled by TopNCounter.EXTRA_SPACE_RATE wherever a size is needed
+    private int precision;
+
+    /**
+     * @param dataType data type whose {@code getPrecision()} supplies the precision
+     */
+    public TopNCounterSerializer(DataType dataType) {
+        this.precision = dataType.getPrecision();
+    }
+
+    /**
+     * Computes the serialized length of the value at the buffer's current
+     * position without consuming it.
+     *
+     * @param in buffer positioned at the start of a serialized counter; its
+     *           position is restored before returning
+     * @return header and counter bytes actually read, plus key length times entry count
+     */
+    @Override
+    public int peekLength(ByteBuffer in) {
+        final int start = in.position();
+        in.getInt(); // capacity, not needed for the length
+        final int entryCount = in.getInt();
+        final int keyBytes = in.getInt();
+        dds.deserialize(in); // result discarded; only the bytes consumed matter
+        final int headerAndCounters = in.position() - start;
+        in.position(start);
+        return headerAndCounters + keyBytes * entryCount;
+    }
+
+    /** Upper bound computed as {@code precision * EXTRA_SPACE_RATE * (4 + 8)}. */
+    @Override
+    public int maxLength() {
+        final int slots = precision * TopNCounter.EXTRA_SPACE_RATE;
+        return slots * (4 + 8);
+    }
+
+    /** Storage estimate computed as {@code precision * EXTRA_SPACE_RATE * 8}. */
+    @Override
+    public int getStorageBytesEstimate() {
+        final int slots = precision * TopNCounter.EXTRA_SPACE_RATE;
+        return slots * 8;
+    }
+
+    /**
+     * Builds a counter holding a single entry decoded from {@code value}.
+     *
+     * @param value bytes read as: key width (int), encoded key (int), count (double)
+     * @return a new counter of capacity {@code precision * EXTRA_SPACE_RATE}
+     *         that has been offered the one decoded key with its count
+     */
+    @Override
+    public TopNCounter<ByteArray> valueOf(byte[] value) {
+        final ByteBuffer raw = ByteBuffer.wrap(value);
+        final int idWidth = raw.getInt();
+        final int encodedKey = raw.getInt();
+        final double count = raw.getDouble();
+
+        // re-encode the key as idWidth unsigned bytes
+        final ByteArray key = new ByteArray(idWidth);
+        BytesUtil.writeUnsigned(encodedKey, key.array(), 0, idWidth);
+
+        final TopNCounter<ByteArray> result = new TopNCounter<ByteArray>(precision * TopNCounter.EXTRA_SPACE_RATE);
+        result.offer(key, count);
+        return result;
+    }
+
+    /**
+     * Writes {@code value} to {@code out} in the layout described on the class.
+     * The stored key length is that of the first item returned by
+     * {@code peek(1)}, or 0 when the counter is empty.
+     */
+    @Override
+    public void serialize(TopNCounter<ByteArray> value, ByteBuffer out) {
+        final double[] counts = value.getCounters();
+        final List<ByteArray> first = value.peek(1);
+        int keyBytes = 0;
+        if (!first.isEmpty()) {
+            keyBytes = first.get(0).length();
+        }
+
+        out.putInt(value.getCapacity()).putInt(value.size()).putInt(keyBytes);
+        dds.serialize(counts, out);
+
+        for (Iterator<Counter<ByteArray>> it = value.iterator(); it.hasNext();) {
+            out.put(it.next().getItem().array());
+        }
+    }
+
+    /**
+     * Reads one serialized counter from {@code in}, advancing its position.
+     *
+     * @return a new counter with the stored capacity, filled by calling
+     *         {@code offerToHead} for each entry in stored order
+     */
+    @Override
+    public TopNCounter<ByteArray> deserialize(ByteBuffer in) {
+        final int storedCapacity = in.getInt();
+        final int entryCount = in.getInt();
+        final int keyBytes = in.getInt();
+        final double[] counts = dds.deserialize(in);
+
+        final TopNCounter<ByteArray> result = new TopNCounter<ByteArray>(storedCapacity);
+        for (int idx = 0; idx < entryCount; idx++) {
+            final ByteArray key = new ByteArray(keyBytes);
+            in.get(key.array());
+            result.offerToHead(key, counts[idx]);
+        }
+
+        return result;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
index 65d639f..b88f9df 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
@@ -6,6 +6,9 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
+import org.apache.kylin.aggregation.DataTypeSerializer;
+import org.apache.kylin.aggregation.MeasureAggregator;
+import org.apache.kylin.aggregation.basic.StringSerializer;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.ImmutableBitSet;
@@ -15,9 +18,6 @@ import org.apache.kylin.gridtable.DefaultGTComparator;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.IGTCodeSystem;
import org.apache.kylin.gridtable.IGTComparator;
-import org.apache.kylin.metadata.measure.MeasureAggregator;
-import org.apache.kylin.metadata.measure.serializer.DataTypeSerializer;
-import org.apache.kylin.metadata.measure.serializer.StringSerializer;
/**
* defines how column values will be encoded to/ decoded from GTRecord
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java
index e4f32fb..26f1636 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java
@@ -23,13 +23,13 @@ package org.apache.kylin.cube.gridtable;
import java.nio.ByteBuffer;
import java.util.Map;
+import org.apache.kylin.aggregation.DataTypeSerializer;
+import org.apache.kylin.aggregation.MeasureAggregator;
+import org.apache.kylin.aggregation.basic.StringSerializer;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.IGTCodeSystem;
import org.apache.kylin.gridtable.IGTComparator;
-import org.apache.kylin.metadata.measure.MeasureAggregator;
-import org.apache.kylin.metadata.measure.serializer.DataTypeSerializer;
-import org.apache.kylin.metadata.measure.serializer.StringSerializer;
@SuppressWarnings({ "rawtypes", "unchecked" })
public class TrimmedCubeCodeSystem implements IGTCodeSystem {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
index 7fe2122..ce912a3 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
@@ -29,6 +29,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.TimeUnit;
+import org.apache.kylin.aggregation.MeasureAggregators;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.MemoryBudgetController;
@@ -37,7 +38,6 @@ import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.IGTScanner;
-import org.apache.kylin.metadata.measure.MeasureAggregators;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
index e9d940a..8c6146b 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kylin.common.topn.Counter;
import org.apache.kylin.common.topn.TopNCounter;
+import org.apache.kylin.common.util.DoubleMutable;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.MemoryBudgetController;
import org.apache.kylin.common.util.Pair;
@@ -46,7 +47,6 @@ import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.GridTable;
import org.apache.kylin.gridtable.IGTScanner;
-import org.apache.kylin.metadata.measure.DoubleMutable;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
index 69a9fc9..951c054 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
@@ -18,15 +18,16 @@
package org.apache.kylin.cube.inmemcubing;
import com.google.common.base.Preconditions;
+
+import org.apache.kylin.aggregation.MeasureCodec;
import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.LongMutable;
import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTRecord;
-import org.apache.kylin.metadata.measure.LongMutable;
-import org.apache.kylin.metadata.measure.MeasureCodec;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
index cd2881e..01696e8 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
@@ -7,10 +7,10 @@ import java.util.Iterator;
import java.util.Map.Entry;
import java.util.SortedMap;
+import org.apache.kylin.aggregation.MeasureAggregator;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.MemoryBudgetController;
-import org.apache.kylin.metadata.measure.MeasureAggregator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;