You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/11/27 10:01:02 UTC
[07/13] incubator-kylin git commit: KYLIN-976 AggregationType
interface, serializer and aggregator
KYLIN-976 AggregationType interface, serializer and aggregator
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/8f2a56cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/8f2a56cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/8f2a56cf
Branch: refs/heads/KYLIN-976
Commit: 8f2a56cfb5f340cd19c67bf22c8dfbdf2f9458a1
Parents: 1218bbd
Author: Li, Yang <ya...@ebay.com>
Authored: Wed Nov 25 11:13:18 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Fri Nov 27 14:47:09 2015 +0800
----------------------------------------------------------------------
.../kylin/job/dataGen/FactTableGenerator.java | 2 +-
.../streaming/StreamingTableDataGenerator.java | 2 +-
.../common/datatype/BigDecimalSerializer.java | 109 +++++++
.../apache/kylin/common/datatype/DataType.java | 289 ++++++++++++++++++
.../common/datatype/DataTypeSerializer.java | 108 +++++++
.../common/datatype/DateTimeSerializer.java | 62 ++++
.../kylin/common/datatype/DoubleMutable.java | 68 +++++
.../kylin/common/datatype/DoubleSerializer.java | 81 +++++
.../kylin/common/datatype/LongMutable.java | 70 +++++
.../kylin/common/datatype/LongSerializer.java | 88 ++++++
.../kylin/common/datatype/StringSerializer.java | 54 ++++
.../kylin/common/debug/BackdoorToggles.java | 5 -
.../kylin/common/topn/DoublyLinkedList.java | 2 -
.../apache/kylin/common/util/DoubleMutable.java | 68 -----
.../apache/kylin/common/util/LongMutable.java | 70 -----
.../datatype/BigDecimalSerializerTest.java | 51 ++++
.../apache/kylin/aggregation/Aggregation.java | 42 ---
.../kylin/aggregation/AggregationType.java | 97 ++++++
.../kylin/aggregation/DataTypeSerializer.java | 118 --------
.../kylin/aggregation/IAggregationFactory.java | 6 +
.../kylin/aggregation/MeasureAggregator.java | 61 +---
.../apache/kylin/aggregation/MeasureCodec.java | 1 +
.../aggregation/basic/BasicAggregation.java | 128 ++++++++
.../basic/BasicAggregationFactory.java | 31 ++
.../aggregation/basic/BigDecimalSerializer.java | 111 -------
.../aggregation/basic/DateTimeSerializer.java | 65 ----
.../aggregation/basic/DoubleMaxAggregator.java | 2 +-
.../aggregation/basic/DoubleMinAggregator.java | 2 +-
.../aggregation/basic/DoubleSerializer.java | 84 ------
.../aggregation/basic/DoubleSumAggregator.java | 2 +-
.../aggregation/basic/LongMaxAggregator.java | 2 +-
.../aggregation/basic/LongMinAggregator.java | 2 +-
.../kylin/aggregation/basic/LongSerializer.java | 91 ------
.../aggregation/basic/LongSumAggregator.java | 2 +-
.../aggregation/basic/StringSerializer.java | 56 ----
.../kylin/aggregation/hllc/HLLCAggregation.java | 78 +++++
.../hllc/HLLCAggregationFactory.java | 35 +++
.../kylin/aggregation/hllc/HLLCSerializer.java | 4 +-
.../kylin/aggregation/hllc/LDCAggregator.java | 2 +-
.../kylin/aggregation/topn/TopNAggregation.java | 76 +++++
.../topn/TopNAggregationFactory.java | 35 +++
.../aggregation/topn/TopNCounterSerializer.java | 12 +-
.../kylin/cube/gridtable/CubeCodeSystem.java | 4 +-
.../kylin/cube/gridtable/CubeGridTable.java | 1 -
.../gridtable/CuboidToGridTableMapping.java | 2 +-
.../cube/gridtable/TrimmedCubeCodeSystem.java | 3 +-
.../cube/inmemcubing/InMemCubeBuilder.java | 2 +-
.../InMemCubeBuilderInputConverter.java | 2 +-
.../apache/kylin/cube/kv/RowKeyColumnOrder.java | 2 +-
.../model/validation/rule/FunctionRule.java | 2 +-
.../java/org/apache/kylin/gridtable/GTInfo.java | 2 +-
.../kylin/gridtable/GTSampleCodeSystem.java | 2 +-
.../apache/kylin/gridtable/UnitTestSupport.java | 14 +-
.../basic/BigDecimalSerializerTest.java | 53 ----
.../topn/TopNCounterSerializerTest.java | 9 +-
.../gridtable/AggregationCacheMemSizeTest.java | 4 +-
.../kylin/gridtable/DictGridTableTest.java | 16 +-
.../kylin/gridtable/SimpleGridTableTest.java | 2 +-
.../gridtable/SimpleInvertedIndexTest.java | 13 +-
.../metadata/measure/MeasureCodecTest.java | 4 +-
.../apache/kylin/dict/DictionaryGenerator.java | 9 +-
.../apache/kylin/dict/DictionaryManager.java | 4 +-
.../apache/kylin/dict/NumberDictionaryTest.java | 9 +-
.../filter/TimeConditionLiteralsReplacer.java | 2 +-
.../apache/kylin/metadata/model/ColumnDesc.java | 9 +-
.../apache/kylin/metadata/model/DataType.java | 295 -------------------
.../kylin/metadata/model/FunctionDesc.java | 11 +-
.../apache/kylin/metadata/model/TblColRef.java | 5 +-
.../metadata/realization/SQLDigestUtil.java | 6 +-
.../org/apache/kylin/storage/tuple/Tuple.java | 4 +-
.../kylin/engine/mr/steps/CubeReducerTest.java | 2 +-
.../engine/mr/steps/MergeCuboidMapperTest.java | 15 +-
.../invertedindex/index/RawTableRecord.java | 2 +-
.../kylin/invertedindex/index/TableRecord.java | 2 +-
.../invertedindex/index/TableRecordInfo.java | 6 +-
.../index/TableRecordInfoDigest.java | 6 +-
.../invertedindex/measure/FixedHLLCodec.java | 2 +-
.../measure/FixedLenMeasureCodec.java | 4 +-
.../measure/FixedPointLongCodec.java | 4 +-
.../invertedindex/model/IIKeyValueCodec.java | 6 +-
.../measure/FixedPointLongCodecTest.java | 13 +-
.../hbase/cube/v1/CubeTupleConverter.java | 4 -
.../endpoint/EndpointAggregators.java | 6 +-
.../storage/hbase/steps/CreateHTableJob.java | 2 +-
.../storage/hbase/steps/RowValueDecoder.java | 4 +-
.../observer/AggregateRegionObserverTest.java | 2 +-
.../endpoint/EndpointAggregationTest.java | 2 +-
.../hbase/steps/RowValueDecoderTest.java | 2 +-
88 files changed, 1601 insertions(+), 1248 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java b/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
index 5a0fee7..705c175 100644
--- a/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
+++ b/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
@@ -37,6 +37,7 @@ import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.datatype.DataType;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.util.Array;
import org.apache.kylin.cube.CubeInstance;
@@ -45,7 +46,6 @@ import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.DimensionDesc;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.DataType;
import org.apache.kylin.metadata.model.JoinDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/assembly/src/test/java/org/apache/kylin/job/streaming/StreamingTableDataGenerator.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/StreamingTableDataGenerator.java b/assembly/src/test/java/org/apache/kylin/job/streaming/StreamingTableDataGenerator.java
index 075a048..04a5b5b 100644
--- a/assembly/src/test/java/org/apache/kylin/job/streaming/StreamingTableDataGenerator.java
+++ b/assembly/src/test/java/org/apache/kylin/job/streaming/StreamingTableDataGenerator.java
@@ -5,9 +5,9 @@ import java.util.List;
import java.util.Random;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.datatype.DataType;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.DataType;
import org.apache.kylin.metadata.model.TableDesc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-common/src/main/java/org/apache/kylin/common/datatype/BigDecimalSerializer.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/datatype/BigDecimalSerializer.java b/core-common/src/main/java/org/apache/kylin/common/datatype/BigDecimalSerializer.java
new file mode 100644
index 0000000..f19ce14
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/datatype/BigDecimalSerializer.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.datatype;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Serializer for {@link BigDecimal} values.
+ * <p>
+ * The encoded form is the scale, then the byte count of the unscaled value (both written
+ * with {@link BytesUtil#writeVInt}), followed by the two's-complement bytes of the unscaled
+ * value as produced by {@link BigInteger#toByteArray()}.
+ *
+ * @author yangli9
+ */
+public class BigDecimalSerializer extends DataTypeSerializer<BigDecimal> {
+
+    private static final Logger logger = LoggerFactory.getLogger(BigDecimalSerializer.class);
+
+    final DataType type;
+    final int maxLength;
+
+    // Number of values whose scale had to be cut off; used only to throttle the warning below.
+    // It is a plain int, so concurrent increments may be lost -- that only affects log frequency.
+    int avoidVerbose = 0;
+
+    /**
+     * @param type the decimal data type; its precision bounds the encoded length and its
+     *             scale bounds the scale of serialized values
+     */
+    public BigDecimalSerializer(DataType type) {
+        this.type = type;
+        // see serialize(): 1 byte scale, 1 byte length, assume every 2 digits takes 1 byte
+        this.maxLength = 1 + 1 + (type.getPrecision() + 1) / 2;
+    }
+
+    /**
+     * Writes {@code value} to {@code out}. A value whose scale exceeds the type's scale is
+     * first rounded (half-even) down to the type's scale.
+     *
+     * @throws IllegalArgumentException if the unscaled value needs more bytes than
+     *                                  {@link #maxLength()} allows
+     */
+    @Override
+    public void serialize(BigDecimal value, ByteBuffer out) {
+        if (value.scale() > type.getScale()) {
+            // Count every cut-off, but only log the 1st, 10001st, 20001st ... occurrence.
+            // (The increment used to live inside this branch, so the counter stuck at 1
+            // and the warning was never repeated.)
+            if (avoidVerbose++ % 10000 == 0) {
+                logger.warn("value's scale has exceeded the " + type.getScale() + ", cut it off, to ensure encoded value do not exceed maxLength " + maxLength + " times:" + avoidVerbose);
+            }
+            value = value.setScale(type.getScale(), BigDecimal.ROUND_HALF_EVEN);
+        }
+        byte[] bytes = value.unscaledValue().toByteArray();
+        if (bytes.length + 2 > maxLength) {
+            throw new IllegalArgumentException("'" + value + "' exceeds the expected length for type " + type);
+        }
+
+        BytesUtil.writeVInt(value.scale(), out);
+        BytesUtil.writeVInt(bytes.length, out);
+        out.put(bytes);
+    }
+
+    /** Reads back a value written by {@link #serialize(BigDecimal, ByteBuffer)}. */
+    @Override
+    public BigDecimal deserialize(ByteBuffer in) {
+        int scale = BytesUtil.readVInt(in);
+        int n = BytesUtil.readVInt(in);
+
+        byte[] bytes = new byte[n];
+        in.get(bytes);
+
+        return new BigDecimal(new BigInteger(bytes), scale);
+    }
+
+    /**
+     * Returns the total encoded length of the value at the buffer's current position.
+     * The buffer position is restored before returning.
+     */
+    @Override
+    public int peekLength(ByteBuffer in) {
+        int mark = in.position();
+
+        // the scale itself is not needed, but it must be read to reach the length field
+        @SuppressWarnings("unused")
+        int scale = BytesUtil.readVInt(in);
+        int n = BytesUtil.readVInt(in);
+        // header bytes consumed so far + payload bytes
+        int len = in.position() - mark + n;
+
+        in.position(mark);
+        return len;
+    }
+
+    @Override
+    public int maxLength() {
+        return maxLength;
+    }
+
+    @Override
+    public int getStorageBytesEstimate() {
+        return 8;
+    }
+
+    /** Parses the string form held in {@code value}; a null input yields zero. */
+    @Override
+    public BigDecimal valueOf(byte[] value) {
+        if (value == null)
+            return new BigDecimal(0);
+        else
+            return new BigDecimal(Bytes.toString(value));
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-common/src/main/java/org/apache/kylin/common/datatype/DataType.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/datatype/DataType.java b/core-common/src/main/java/org/apache/kylin/common/datatype/DataType.java
new file mode 100644
index 0000000..36fad00
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/datatype/DataType.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.datatype;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Describes a data type: a type name plus optional precision and scale, parsed from
+ * strings such as "varchar(256)" or "decimal(19,4)".
+ * <p>
+ * Instances are obtained from {@link #getType(String)} or {@link #getCustomType(String)},
+ * which hand out one shared instance per distinct (name, precision, scale).
+ */
+@SuppressWarnings("serial")
+public class DataType implements Serializable {
+
+    // standard sql types, ref: http://www.w3schools.com/sql/sql_datatypes_general.asp
+    public static final String VALID_TYPES_STRING = "any|char|varchar|boolean|binary" //
+            + "|integer|tinyint|smallint|bigint|decimal|numeric|float|real|double" //
+            + "|date|time|datetime|timestamp|byte|int|short|long|string";
+
+    // optional "(precision[,scale])" suffix; digits, whitespace and commas are captured as group 2
+    private static final String TYPE_PATTERN_TAIL = "\\s*" //
+            + "(?:" + "[(]" + "([\\d\\s,]+)" + "[)]" + ")?";
+
+    private static final Pattern TYPE_PATTERN = Pattern.compile( //
+            "(" + VALID_TYPES_STRING + ")" + TYPE_PATTERN_TAIL, Pattern.CASE_INSENSITIVE);
+
+    // accepts any type name, followed by the same optional precision/scale suffix
+    private static final Pattern CUSTOM_TYPE_PATTERN = Pattern.compile( //
+            "(" + ".*?" + ")" + TYPE_PATTERN_TAIL, Pattern.CASE_INSENSITIVE);
+
+    public static final Set<String> INTEGER_FAMILY = new HashSet<String>();
+    public static final Set<String> NUMBER_FAMILY = new HashSet<String>();
+    public static final Set<String> DATETIME_FAMILY = new HashSet<String>();
+    public static final Set<String> STRING_FAMILY = new HashSet<String>();
+    private static final Map<String, String> LEGACY_TYPE_MAP = new HashMap<String, String>();
+    static {
+        INTEGER_FAMILY.add("tinyint");
+        INTEGER_FAMILY.add("smallint");
+        INTEGER_FAMILY.add("integer");
+        INTEGER_FAMILY.add("bigint");
+
+        NUMBER_FAMILY.addAll(INTEGER_FAMILY);
+        NUMBER_FAMILY.add("float");
+        NUMBER_FAMILY.add("double");
+        NUMBER_FAMILY.add("decimal");
+        NUMBER_FAMILY.add("real");
+        NUMBER_FAMILY.add("numeric");
+
+        DATETIME_FAMILY.add("date");
+        DATETIME_FAMILY.add("time");
+        DATETIME_FAMILY.add("datetime");
+        DATETIME_FAMILY.add("timestamp");
+
+        STRING_FAMILY.add("varchar");
+        STRING_FAMILY.add("char");
+
+        LEGACY_TYPE_MAP.put("byte", "tinyint");
+        LEGACY_TYPE_MAP.put("int", "integer");
+        LEGACY_TYPE_MAP.put("short", "smallint");
+        LEGACY_TYPE_MAP.put("long", "bigint");
+        LEGACY_TYPE_MAP.put("string", "varchar");
+        LEGACY_TYPE_MAP.put("hllc10", "hllc(10)");
+        LEGACY_TYPE_MAP.put("hllc12", "hllc(12)");
+        LEGACY_TYPE_MAP.put("hllc14", "hllc(14)");
+        LEGACY_TYPE_MAP.put("hllc15", "hllc(15)");
+        LEGACY_TYPE_MAP.put("hllc16", "hllc(16)");
+    }
+
+    // NOTE: must be declared before ANY, whose initializer goes through this cache
+    private static final ConcurrentMap<DataType, DataType> CACHE = new ConcurrentHashMap<DataType, DataType>();
+
+    public static final DataType ANY = DataType.getType("any");
+
+    /**
+     * Returns the shared instance for a standard type string (see {@link #VALID_TYPES_STRING}).
+     *
+     * @param type the type string, e.g. "bigint" or "decimal(19,4)"; may be null
+     * @return the data type, or null if {@code type} is null
+     * @throws IllegalArgumentException if the string is not a valid standard type
+     */
+    public static DataType getType(String type) {
+        return getTypeInner(type, false);
+    }
+
+    /**
+     * Same as {@link #getType(String)} but accepts any type name, not only the standard ones.
+     */
+    public static DataType getCustomType(String type) {
+        return getTypeInner(type, true);
+    }
+
+    private static DataType getTypeInner(String type, boolean isCustom) {
+        if (type == null)
+            return null;
+
+        DataType dataType = new DataType(type, isCustom);
+        // putIfAbsent is atomic: when two threads race on the same type, both end up with
+        // the instance that made it into the cache (a separate get + put could hand out two).
+        DataType cached = CACHE.putIfAbsent(dataType, dataType);
+        return cached == null ? dataType : cached;
+    }
+
+    // ============================================================================
+
+    private String name;
+    private int precision;
+    private int scale;
+
+    /**
+     * Parses a type string. Precision and scale are -1 when not given, except for the
+     * defaults applied at the end of this constructor.
+     *
+     * @throws IllegalArgumentException if the string does not match the expected pattern
+     *                                  or its precision/scale part is malformed
+     */
+    DataType(String datatype, boolean isCustom) {
+        // Locale.ROOT keeps lower-casing independent of the JVM default locale
+        // (e.g. "INTEGER" lower-cases to a dotless i under a Turkish locale).
+        datatype = datatype.trim().toLowerCase(java.util.Locale.ROOT);
+        datatype = replaceLegacy(datatype);
+
+        Pattern pattern = isCustom ? CUSTOM_TYPE_PATTERN : TYPE_PATTERN;
+        Matcher m = pattern.matcher(datatype);
+        if (m.matches() == false)
+            throw new IllegalArgumentException("bad data type -- " + datatype + ", does not match " + pattern);
+
+        name = replaceLegacy(m.group(1));
+        precision = -1;
+        scale = -1;
+
+        String leftover = m.group(2);
+        if (leftover != null) {
+            String[] parts = leftover.split("\\s*,\\s*");
+            for (int i = 0; i < parts.length; i++) {
+                int n;
+                try {
+                    // the pattern admits whitespace inside the parentheses, parseInt does not
+                    n = Integer.parseInt(parts[i].trim());
+                } catch (NumberFormatException e) {
+                    throw new IllegalArgumentException("bad data type -- " + datatype + ", precision/scale not numeric");
+                }
+                if (i == 0)
+                    precision = n;
+                else if (i == 1)
+                    scale = n;
+                else
+                    throw new IllegalArgumentException("bad data type -- " + datatype + ", too many precision/scale parts");
+            }
+        }
+
+        // FIXME 256 for unknown string precision
+        if ((name.equals("char") || name.equals("varchar")) && precision == -1) {
+            precision = 256; // to save memory at frontend, e.g. tableau will
+                             // allocate memory according to this
+        }
+
+        // FIXME (19,4) for unknown decimal precision
+        if ((name.equals("decimal") || name.equals("numeric")) && precision == -1) {
+            precision = 19;
+            scale = 4;
+        }
+    }
+
+    /** Maps a legacy type name to its current equivalent; other strings are returned as is. */
+    private String replaceLegacy(String str) {
+        String replace = LEGACY_TYPE_MAP.get(str);
+        return replace == null ? str : replace;
+    }
+
+    /** Delegates to the serializer that {@link DataTypeSerializer#create(DataType)} returns for this type. */
+    public int getStorageBytesEstimate() {
+        return DataTypeSerializer.create(this).getStorageBytesEstimate();
+    }
+
+    public boolean isStringFamily() {
+        return STRING_FAMILY.contains(name);
+    }
+
+    public boolean isIntegerFamily() {
+        return INTEGER_FAMILY.contains(name);
+    }
+
+    public boolean isNumberFamily() {
+        return NUMBER_FAMILY.contains(name);
+    }
+
+    public boolean isDateTimeFamily() {
+        return DATETIME_FAMILY.contains(name);
+    }
+
+    public boolean isDate() {
+        return name.equals("date");
+    }
+
+    public boolean isTime() {
+        return name.equals("time");
+    }
+
+    public boolean isTimestamp() {
+        return name.equals("timestamp");
+    }
+
+    public boolean isDatetime() {
+        return name.equals("datetime");
+    }
+
+    public boolean isTinyInt() {
+        return name.equals("tinyint");
+    }
+
+    public boolean isSmallInt() {
+        return name.equals("smallint");
+    }
+
+    public boolean isInt() {
+        return name.equals("integer");
+    }
+
+    public boolean isBigInt() {
+        return name.equals("bigint");
+    }
+
+    public boolean isFloat() {
+        return name.equals("float");
+    }
+
+    public boolean isDouble() {
+        return name.equals("double");
+    }
+
+    /** True only for the name "decimal"; "numeric" is not treated as decimal here. */
+    public boolean isDecimal() {
+        return name.equals("decimal");
+    }
+
+    /** "hllc" is not a standard type name, so such a type can only come from {@link #getCustomType(String)}. */
+    public boolean isHLLC() {
+        return name.equals("hllc");
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    /** @return the precision, or -1 if none was given and no default applies */
+    public int getPrecision() {
+        return precision;
+    }
+
+    /** @return the scale, or -1 if none was given and no default applies */
+    public int getScale() {
+        return scale;
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((name == null) ? 0 : name.hashCode());
+        result = prime * result + precision;
+        result = prime * result + scale;
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        DataType other = (DataType) obj;
+        if (name == null) {
+            if (other.name != null)
+                return false;
+        } else if (!name.equals(other.name))
+            return false;
+        if (precision != other.precision)
+            return false;
+        if (scale != other.scale)
+            return false;
+        return true;
+    }
+
+    /** Renders as "name", "name(precision)" or "name(precision,scale)". */
+    @Override
+    public String toString() {
+        if (precision < 0 && scale < 0)
+            return name;
+        else if (scale < 0)
+            return name + "(" + precision + ")";
+        else
+            return name + "(" + precision + "," + scale + ")";
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-common/src/main/java/org/apache/kylin/common/datatype/DataTypeSerializer.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/datatype/DataTypeSerializer.java b/core-common/src/main/java/org/apache/kylin/common/datatype/DataTypeSerializer.java
new file mode 100644
index 0000000..99fa3fd
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/datatype/DataTypeSerializer.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.datatype;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kylin.common.util.BytesSerializer;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Base class of serializers that convert values of one data type to and from bytes,
+ * together with a registry that maps data type names to serializer implementations.
+ * <p>
+ * Note: the implementations MUST be thread-safe.
+ *
+ * @author yangli9
+ */
+abstract public class DataTypeSerializer<T> implements BytesSerializer<T> {
+
+    // Data type name -> serializer class. The map has to stay mutable because register()
+    // adds entries after class initialization (an unmodifiable view made register() always
+    // throw UnsupportedOperationException); the synchronized wrapper keeps individual
+    // lookups and registrations safe across threads.
+    final static Map<String, Class<?>> implementations;
+    static {
+        HashMap<String, Class<?>> impl = Maps.newHashMap();
+        impl.put("varchar", StringSerializer.class);
+        impl.put("decimal", BigDecimalSerializer.class);
+        impl.put("double", DoubleSerializer.class);
+        impl.put("float", DoubleSerializer.class);
+        impl.put("bigint", LongSerializer.class);
+        impl.put("long", LongSerializer.class);
+        impl.put("integer", LongSerializer.class);
+        impl.put("int", LongSerializer.class);
+        impl.put("smallint", LongSerializer.class);
+        impl.put("date", DateTimeSerializer.class);
+        impl.put("datetime", DateTimeSerializer.class);
+        impl.put("timestamp", DateTimeSerializer.class);
+        implementations = Collections.synchronizedMap(impl);
+    }
+
+    /** Tells whether a serializer is registered under the given data type name. */
+    public static boolean hasRegistered(String dataTypeName) {
+        return implementations.containsKey(dataTypeName);
+    }
+
+    /**
+     * Registers (or replaces) the serializer class for a data type name. {@link #create(DataType)}
+     * instantiates the class through a public constructor taking a single {@link DataType}.
+     */
+    public static void register(String dataTypeName, Class<? extends DataTypeSerializer<?>> impl) {
+        implementations.put(dataTypeName, impl);
+    }
+
+    /** Parses {@code dataType} with {@link DataType#getType(String)} and creates its serializer. */
+    public static DataTypeSerializer<?> create(String dataType) {
+        return create(DataType.getType(dataType));
+    }
+
+    /**
+     * Creates a new serializer for the given type, looked up by the type's name.
+     *
+     * @throws RuntimeException if no serializer is registered for the type name, or if the
+     *                          registered class cannot be instantiated
+     */
+    public static DataTypeSerializer<?> create(DataType type) {
+        Class<?> clz = implementations.get(type.getName());
+        if (clz == null)
+            throw new RuntimeException("No DataTypeSerializer for type " + type);
+
+        try {
+            return (DataTypeSerializer<?>) clz.getConstructor(DataType.class).newInstance(type);
+        } catch (Exception e) {
+            // e.g. the registered class has no public (DataType) constructor, or it threw
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** peek into buffer and return the length of serialization */
+    abstract public int peekLength(ByteBuffer in);
+
+    /** return the max number of bytes to the longest serialization */
+    abstract public int maxLength();
+
+    /** get an estimate of size in bytes of the serialized data */
+    abstract public int getStorageBytesEstimate();
+
+    /** convert from String to obj (string often come as byte[] in mapred) */
+    abstract public T valueOf(byte[] value);
+
+    /** convert from String to obj; the string is handed to {@link #valueOf(byte[])} as UTF-8 bytes */
+    public T valueOf(String value) {
+        try {
+            return valueOf(value.getBytes("UTF-8"));
+        } catch (UnsupportedEncodingException e) {
+            throw new RuntimeException(e); // never happen
+        }
+    }
+
+    /** convert from obj to string; a null value is rendered as "NULL" */
+    public String toString(T value) {
+        if (value == null)
+            return "NULL";
+        else
+            return value.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-common/src/main/java/org/apache/kylin/common/datatype/DateTimeSerializer.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/datatype/DateTimeSerializer.java b/core-common/src/main/java/org/apache/kylin/common/datatype/DateTimeSerializer.java
new file mode 100644
index 0000000..d3ef7cd
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/datatype/DateTimeSerializer.java
@@ -0,0 +1,62 @@
+package org.apache.kylin.common.datatype;
+
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.DateFormat;
+
+/**
+ * Serializer for date/time values held as a long inside a {@link LongMutable}.
+ * <p>
+ * {@link #deserialize(ByteBuffer)} and {@link #valueOf(byte[])} return a per-thread holder
+ * that is overwritten by the next call on the same thread.
+ */
+public class DateTimeSerializer extends DataTypeSerializer<LongMutable> {
+
+    // be thread-safe and avoid repeated obj creation
+    private ThreadLocal<LongMutable> current = new ThreadLocal<LongMutable>();
+
+    public DateTimeSerializer(DataType type) {
+    }
+
+    /** Writes the held long with {@link ByteBuffer#putLong(long)}. */
+    @Override
+    public void serialize(LongMutable value, ByteBuffer out) {
+        long raw = value.get();
+        out.putLong(raw);
+    }
+
+    /** Returns this thread's reusable holder, creating it on first use. */
+    private LongMutable current() {
+        LongMutable holder = current.get();
+        if (holder != null) {
+            return holder;
+        }
+        holder = new LongMutable();
+        current.set(holder);
+        return holder;
+    }
+
+    /** Reads one long from the buffer into this thread's holder. */
+    @Override
+    public LongMutable deserialize(ByteBuffer in) {
+        LongMutable holder = current();
+        long raw = in.getLong();
+        holder.set(raw);
+        return holder;
+    }
+
+    @Override
+    public int peekLength(ByteBuffer in) {
+        return 8;
+    }
+
+    @Override
+    public int maxLength() {
+        return 8;
+    }
+
+    @Override
+    public int getStorageBytesEstimate() {
+        return 8;
+    }
+
+    /**
+     * Converts the string form held in {@code value} via {@link DateFormat#stringToMillis};
+     * a null input yields 0.
+     */
+    @Override
+    public LongMutable valueOf(byte[] value) {
+        LongMutable holder = current();
+        long raw = (value == null) ? 0L : DateFormat.stringToMillis(Bytes.toString(value));
+        holder.set(raw);
+        return holder;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-common/src/main/java/org/apache/kylin/common/datatype/DoubleMutable.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/datatype/DoubleMutable.java b/core-common/src/main/java/org/apache/kylin/common/datatype/DoubleMutable.java
new file mode 100644
index 0000000..a32fbd8
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/datatype/DoubleMutable.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.datatype;
+
+import java.io.Serializable;
+
+/**
+ * A mutable holder of a primitive double, comparable by numeric value.
+ * <p>
+ * {@link #equals(Object)} and {@link #compareTo(DoubleMutable)} use the primitive
+ * {@code ==} and {@code <} operators, so 0.0 and -0.0 are equal and a NaN value is not
+ * equal to anything, itself included.
+ */
+@SuppressWarnings("serial")
+public class DoubleMutable implements Comparable<DoubleMutable>, Serializable {
+
+    private double v;
+
+    /** Creates a holder with value 0. */
+    public DoubleMutable() {
+        this(0);
+    }
+
+    public DoubleMutable(double v) {
+        set(v);
+    }
+
+    public double get() {
+        return v;
+    }
+
+    public void set(double v) {
+        this.v = v;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof DoubleMutable)) {
+            return false;
+        }
+        DoubleMutable other = (DoubleMutable) o;
+        return this.v == other.v;
+    }
+
+    /**
+     * Folds both 32-bit halves of the IEEE 754 bit pattern into the hash. Keeping only the
+     * low half (a plain int cast) made every value with a short mantissa -- 1.0, 2.0, 0.5,
+     * most whole numbers -- hash to 0.
+     */
+    @Override
+    public int hashCode() {
+        // equals() treats -0.0 and 0.0 as equal, so both must produce the same hash
+        long bits = Double.doubleToLongBits(v == 0.0d ? 0.0d : v);
+        return (int) (bits ^ (bits >>> 32));
+    }
+
+    @Override
+    public int compareTo(DoubleMutable o) {
+        return (v < o.v ? -1 : (v == o.v ? 0 : 1));
+    }
+
+    @Override
+    public String toString() {
+        return Double.toString(v);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-common/src/main/java/org/apache/kylin/common/datatype/DoubleSerializer.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/datatype/DoubleSerializer.java b/core-common/src/main/java/org/apache/kylin/common/datatype/DoubleSerializer.java
new file mode 100644
index 0000000..f128576
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/datatype/DoubleSerializer.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.datatype;
+
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.util.Bytes;
+
+/**
+ * Serializer for double values held inside a {@link DoubleMutable}.
+ * <p>
+ * {@link #deserialize(ByteBuffer)} and {@link #valueOf(byte[])} return a per-thread holder
+ * that is overwritten by the next call on the same thread.
+ */
+public class DoubleSerializer extends DataTypeSerializer<DoubleMutable> {
+
+    // be thread-safe and avoid repeated obj creation
+    private ThreadLocal<DoubleMutable> current = new ThreadLocal<DoubleMutable>();
+
+    public DoubleSerializer(DataType type) {
+    }
+
+    /** Writes the held double with {@link ByteBuffer#putDouble(double)}. */
+    @Override
+    public void serialize(DoubleMutable value, ByteBuffer out) {
+        double raw = value.get();
+        out.putDouble(raw);
+    }
+
+    /** Returns this thread's reusable holder, creating it on first use. */
+    private DoubleMutable current() {
+        DoubleMutable holder = current.get();
+        if (holder != null) {
+            return holder;
+        }
+        holder = new DoubleMutable();
+        current.set(holder);
+        return holder;
+    }
+
+    /** Reads one double from the buffer into this thread's holder. */
+    @Override
+    public DoubleMutable deserialize(ByteBuffer in) {
+        DoubleMutable holder = current();
+        double raw = in.getDouble();
+        holder.set(raw);
+        return holder;
+    }
+
+    @Override
+    public int peekLength(ByteBuffer in) {
+        return 8;
+    }
+
+    @Override
+    public int maxLength() {
+        return 8;
+    }
+
+    @Override
+    public int getStorageBytesEstimate() {
+        return 8;
+    }
+
+    /** Parses the string form held in {@code value} as a double; a null input yields 0. */
+    @Override
+    public DoubleMutable valueOf(byte[] value) {
+        DoubleMutable holder = current();
+        double raw = (value == null) ? 0d : Double.parseDouble(Bytes.toString(value));
+        holder.set(raw);
+        return holder;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-common/src/main/java/org/apache/kylin/common/datatype/LongMutable.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/datatype/LongMutable.java b/core-common/src/main/java/org/apache/kylin/common/datatype/LongMutable.java
new file mode 100644
index 0000000..72b540c
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/datatype/LongMutable.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.datatype;
+
+import java.io.Serializable;
+
+@SuppressWarnings("serial")
+public class LongMutable implements Comparable<LongMutable>, Serializable { // mutable holder for a primitive long
+
+ private long v;
+
+ public LongMutable() { // starts at 0
+ this(0);
+ }
+
+ public LongMutable(long v) {
+ set(v);
+ }
+
+ public long get() {
+ return v;
+ }
+
+ public void set(long v) {
+ this.v = v;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof LongMutable)) { // also rejects null
+ return false;
+ }
+ LongMutable other = (LongMutable) o;
+ return this.v == other.v; // equality is by held value
+ }
+
+ @Override
+ public int hashCode() {
+ return (int) v; // narrowing cast: only the low 32 bits of v contribute to the hash
+ }
+
+ @Override
+ public int compareTo(LongMutable o) {
+ long thisValue = this.v;
+ long thatValue = o.v;
+ return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1)); // explicit comparison instead of subtraction, which could overflow
+ }
+
+ @Override
+ public String toString() {
+ return Long.toString(v); // decimal text of the held value
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-common/src/main/java/org/apache/kylin/common/datatype/LongSerializer.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/datatype/LongSerializer.java b/core-common/src/main/java/org/apache/kylin/common/datatype/LongSerializer.java
new file mode 100644
index 0000000..5c5e10f
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/datatype/LongSerializer.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.datatype;
+
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+
+/**
+ * Serializer for {@link LongMutable} values, encoded in the variable-length
+ * form written by {@link BytesUtil#writeVLong} and read by
+ * {@link BytesUtil#readVLong}.
+ *
+ * NOTE: deserialize() and valueOf() fill and return a per-thread LongMutable
+ * that is reused on every call, so a caller that needs to keep a value must
+ * copy it before invoking this serializer again on the same thread.
+ */
+public class LongSerializer extends DataTypeSerializer<LongMutable> {
+
+ // be thread-safe and avoid repeated obj creation: one reusable holder per thread
+ private ThreadLocal<LongMutable> current = new ThreadLocal<LongMutable>();
+
+ public LongSerializer(DataType type) { // type is not used by this serializer
+ }
+
+ @Override
+ public void serialize(LongMutable value, ByteBuffer out) {
+ BytesUtil.writeVLong(value.get(), out);
+ }
+
+ private LongMutable current() { // returns this thread's holder, creating it on first use
+ LongMutable l = current.get();
+ if (l == null) {
+ l = new LongMutable();
+ current.set(l);
+ }
+ return l;
+ }
+
+ @Override
+ public LongMutable deserialize(ByteBuffer in) {
+ LongMutable l = current(); // shared per-thread instance, overwritten below
+ l.set(BytesUtil.readVLong(in));
+ return l;
+ }
+
+ @Override
+ public int peekLength(ByteBuffer in) {
+ int mark = in.position(); // remember where the value starts
+
+ BytesUtil.readVLong(in); // decoded value is discarded; only the bytes consumed matter
+ int len = in.position() - mark;
+
+ in.position(mark); // rewind so the caller sees an unchanged position
+ return len;
+ }
+
+ @Override
+ public int maxLength() {
+ return 9; // vlong: 1 + 8
+ }
+
+ @Override
+ public int getStorageBytesEstimate() {
+ return 5; // estimate only; the actual encoded length is what peekLength() reports, at most maxLength()
+ }
+
+ @Override
+ public LongMutable valueOf(byte[] value) {
+ LongMutable l = current(); // same reused per-thread instance as deserialize()
+ if (value == null)
+ l.set(0L); // a null input is mapped to 0
+ else
+ l.set(Long.parseLong(Bytes.toString(value))); // bytes are turned into text, then parsed as a long
+ return l;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-common/src/main/java/org/apache/kylin/common/datatype/StringSerializer.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/datatype/StringSerializer.java b/core-common/src/main/java/org/apache/kylin/common/datatype/StringSerializer.java
new file mode 100644
index 0000000..b3562eb
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/datatype/StringSerializer.java
@@ -0,0 +1,54 @@
+package org.apache.kylin.common.datatype;
+
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+
+public class StringSerializer extends DataTypeSerializer<String> { // String serializer whose size limit is derived from the data type's precision
+
+ final DataType type; // used in the error message raised by serialize()
+ final int maxLength; // upper bound on the encoded size, enforced by serialize()
+
+ public StringSerializer(DataType type) {
+ this.type = type;
+ // see serialize(): 2 byte length, rest is String.toBytes()
+ this.maxLength = 2 + type.getPrecision();
+ }
+
+ @Override
+ public void serialize(String value, ByteBuffer out) {
+ int start = out.position(); // needed to measure how many bytes the write produced
+
+ BytesUtil.writeUTFString(value, out);
+
+ if (out.position() - start > maxLength) // checked after the write: on failure the bytes are already in out
+ throw new IllegalArgumentException("'" + value + "' exceeds the expected length for type " + type);
+ }
+
+ @Override
+ public String deserialize(ByteBuffer in) {
+ return BytesUtil.readUTFString(in); // counterpart of writeUTFString() used in serialize()
+ }
+
+ @Override
+ public int peekLength(ByteBuffer in) {
+ return BytesUtil.peekByteArrayLength(in); // NOTE(review): presumably leaves the buffer position unchanged - confirm in BytesUtil
+ }
+
+ @Override
+ public int maxLength() {
+ return maxLength;
+ }
+
+ @Override
+ public int getStorageBytesEstimate() {
+ return maxLength; // the worst case doubles as the estimate
+ }
+
+ @Override
+ public String valueOf(byte[] value) {
+ return Bytes.toString(value); // NOTE(review): null handling is whatever Bytes.toString does - confirm
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java b/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
index 7304b8b..f3745d7 100644
--- a/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
+++ b/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
@@ -20,15 +20,10 @@ package org.apache.kylin.common.debug;
import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
/**
*/
public class BackdoorToggles {
- private static final Logger logger = LoggerFactory.getLogger(BackdoorToggles.class);
-
private static final ThreadLocal<Map<String, String>> _backdoorToggles = new ThreadLocal<Map<String, String>>();
public static void setToggles(Map<String, String> toggles) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java b/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java
index 1520ce1..4a3d6e8 100644
--- a/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java
@@ -18,8 +18,6 @@
package org.apache.kylin.common.topn;
-import java.util.ConcurrentModificationException;
-import java.util.Iterator;
/**
* Modified from DoublyLinkedList.java in https://github.com/addthis/stream-lib
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-common/src/main/java/org/apache/kylin/common/util/DoubleMutable.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/DoubleMutable.java b/core-common/src/main/java/org/apache/kylin/common/util/DoubleMutable.java
deleted file mode 100644
index 520cd74..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/util/DoubleMutable.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.common.util;
-
-import java.io.Serializable;
-
-@SuppressWarnings("serial")
-public class DoubleMutable implements Comparable<DoubleMutable>, Serializable {
-
- private double v;
-
- public DoubleMutable() {
- this(0);
- }
-
- public DoubleMutable(double v) {
- set(v);
- }
-
- public double get() {
- return v;
- }
-
- public void set(double v) {
- this.v = v;
- }
-
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof DoubleMutable)) {
- return false;
- }
- DoubleMutable other = (DoubleMutable) o;
- return this.v == other.v;
- }
-
- @Override
- public int hashCode() {
- return (int) Double.doubleToLongBits(v);
- }
-
- @Override
- public int compareTo(DoubleMutable o) {
- return (v < o.v ? -1 : (v == o.v ? 0 : 1));
- }
-
- @Override
- public String toString() {
- return Double.toString(v);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-common/src/main/java/org/apache/kylin/common/util/LongMutable.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/LongMutable.java b/core-common/src/main/java/org/apache/kylin/common/util/LongMutable.java
deleted file mode 100644
index 238bb86..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/util/LongMutable.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.common.util;
-
-import java.io.Serializable;
-
-@SuppressWarnings("serial")
-public class LongMutable implements Comparable<LongMutable>, Serializable {
-
- private long v;
-
- public LongMutable() {
- this(0);
- }
-
- public LongMutable(long v) {
- set(v);
- }
-
- public long get() {
- return v;
- }
-
- public void set(long v) {
- this.v = v;
- }
-
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof LongMutable)) {
- return false;
- }
- LongMutable other = (LongMutable) o;
- return this.v == other.v;
- }
-
- @Override
- public int hashCode() {
- return (int) v;
- }
-
- @Override
- public int compareTo(LongMutable o) {
- long thisValue = this.v;
- long thatValue = o.v;
- return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
- }
-
- @Override
- public String toString() {
- return Long.toString(v);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-common/src/test/java/org/apache/kylin/common/datatype/BigDecimalSerializerTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/datatype/BigDecimalSerializerTest.java b/core-common/src/test/java/org/apache/kylin/common/datatype/BigDecimalSerializerTest.java
new file mode 100644
index 0000000..cba6795
--- /dev/null
+++ b/core-common/src/test/java/org/apache/kylin/common/datatype/BigDecimalSerializerTest.java
@@ -0,0 +1,51 @@
+package org.apache.kylin.common.datatype;
+
+import static org.junit.Assert.*;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Round-trip and range tests for {@link BigDecimalSerializer}, using a
+ * serializer built for the plain "decimal" type.
+ */
+public class BigDecimalSerializerTest {
+
+ private static BigDecimalSerializer bigDecimalSerializer; // shared by all tests; created once in beforeClass()
+
+ @BeforeClass
+ public static void beforeClass() {
+ bigDecimalSerializer = new BigDecimalSerializer(DataType.getType("decimal"));
+ }
+
+ @Test
+ public void testNormal() { // a value written and read back must compare equal to the input
+ BigDecimal input = new BigDecimal("1234.1234");
+ ByteBuffer buffer = ByteBuffer.allocate(256);
+ buffer.mark(); // remember position 0 so reset() can rewind for reading
+ bigDecimalSerializer.serialize(input, buffer);
+ buffer.reset();
+ BigDecimal output = bigDecimalSerializer.deserialize(buffer);
+ assertEquals(input, output);
+ }
+
+ @Test
+ public void testScaleOutOfRange() { // input with ten fraction digits is expected to come back rescaled
+ BigDecimal input = new BigDecimal("1234.1234567890");
+ ByteBuffer buffer = ByteBuffer.allocate(256);
+ buffer.mark(); // remember position 0 so reset() can rewind for reading
+ bigDecimalSerializer.serialize(input, buffer);
+ buffer.reset();
+ BigDecimal output = bigDecimalSerializer.deserialize(buffer);
+ assertEquals(input.setScale(bigDecimalSerializer.type.getScale(), BigDecimal.ROUND_HALF_EVEN), output); // expected: input rounded half-even to the type's scale
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testOutOfPrecision() { // passes only if serialize() throws IllegalArgumentException
+ BigDecimal input = new BigDecimal("66855344214907231736.4924");
+ ByteBuffer buffer = ByteBuffer.allocate(256);
+ bigDecimalSerializer.serialize(input, buffer);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/aggregation/Aggregation.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/Aggregation.java b/core-cube/src/main/java/org/apache/kylin/aggregation/Aggregation.java
deleted file mode 100644
index 193c5de..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/Aggregation.java
+++ /dev/null
@@ -1,42 +0,0 @@
-package org.apache.kylin.aggregation;
-
-import java.util.List;
-
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.metadata.model.DataType;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-
-abstract public class Aggregation {
-
- /* ============================================================================
- * Define
- * ---------------------------------------------------------------------------- */
-
- abstract public DataType getAggregationDataType();
-
- abstract public DataType getResultDataType();
-
- abstract public void validate(MeasureDesc measureDesc) throws IllegalArgumentException;
-
- /* ============================================================================
- * Build
- * ---------------------------------------------------------------------------- */
-
- abstract public DataTypeSerializer<?> getSeralizer();
-
- abstract public MeasureAggregator<?> getAggregator();
-
- abstract public List<TblColRef> getColumnsNeedDictionary(MeasureDesc measureDesc);
-
- abstract public Object reEncodeDictionary(Object value, List<Dictionary<?>> oldDicts, List<Dictionary<?>> newDicts);
-
- /* ============================================================================
- * Cube Selection
- * ---------------------------------------------------------------------------- */
-
- /* ============================================================================
- * Query
- * ---------------------------------------------------------------------------- */
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/aggregation/AggregationType.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/AggregationType.java b/core-cube/src/main/java/org/apache/kylin/aggregation/AggregationType.java
new file mode 100644
index 0000000..81345b7
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/AggregationType.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kylin.aggregation.basic.BasicAggregationFactory;
+import org.apache.kylin.aggregation.hllc.HLLCAggregationFactory;
+import org.apache.kylin.aggregation.topn.TopNAggregationFactory;
+import org.apache.kylin.common.datatype.DataType;
+import org.apache.kylin.common.datatype.DataTypeSerializer;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Describes one kind of measure aggregation and hosts the static factory,
+ * {@link #create(String, String)}, that selects the implementation for a
+ * function name.
+ */
+abstract public class AggregationType {
+
+    // factories for the functions registered in the static block below; any
+    // function name that is not found here is served by defaultFactory
+    private static final Map<String, IAggregationFactory> factoryRegistry = Maps.newConcurrentMap();
+    private static final IAggregationFactory defaultFactory = new BasicAggregationFactory();
+
+    static {
+        factoryRegistry.put(FunctionDesc.FUNC_COUNT_DISTINCT, new HLLCAggregationFactory());
+        factoryRegistry.put(FunctionDesc.FUNC_TOP_N, new TopNAggregationFactory());
+    }
+
+    /**
+     * Creates the aggregation type for a function / data type pair and makes
+     * sure a serializer is registered for its aggregation data type.
+     *
+     * @param funcName aggregation function name; upper-cased before the registry lookup
+     * @param dataType data type name; lower-cased before it is handed to the factory
+     * @return the aggregation type built by the registered factory, or by the
+     *         default factory when no factory is registered for funcName
+     */
+    public static AggregationType create(String funcName, String dataType) {
+        // Normalize with Locale.ROOT: the no-argument case conversions follow the
+        // JVM default locale, and under e.g. a Turkish locale a name containing
+        // 'i' upper-cases to a dotted capital I and would miss an ASCII registry key.
+        funcName = funcName.toUpperCase(java.util.Locale.ROOT);
+        dataType = dataType.toLowerCase(java.util.Locale.ROOT);
+
+        IAggregationFactory factory = factoryRegistry.get(funcName);
+        if (factory == null)
+            factory = defaultFactory;
+
+        AggregationType result = factory.createAggregationType(funcName, dataType);
+
+        // register serializer for aggr data type
+        // NOTE(review): this check-then-register pair is not atomic; presumably a
+        // duplicate registration is harmless - confirm in DataTypeSerializer
+        DataType aggregationDataType = result.getAggregationDataType();
+        if (!DataTypeSerializer.hasRegistered(aggregationDataType.getName())) {
+            DataTypeSerializer.register(aggregationDataType.getName(), result.getAggregationDataSeralizer());
+        }
+
+        return result;
+    }
+
+    /* ============================================================================
+     * Define
+     * ---------------------------------------------------------------------------- */
+
+    /** Returns the data type of the aggregated value; create() registers a serializer under its name. */
+    abstract public DataType getAggregationDataType();
+
+    /** Returns the serializer class that create() registers for the aggregation data type. */
+    abstract public Class<? extends DataTypeSerializer<?>> getAggregationDataSeralizer();
+
+    /**
+     * Checks the given measure description.
+     *
+     * @throws IllegalArgumentException presumably when the measure is not valid for this aggregation - confirm in implementations
+     */
+    abstract public void validate(MeasureDesc measureDesc) throws IllegalArgumentException;
+
+    /* ============================================================================
+     * Build
+     * ---------------------------------------------------------------------------- */
+
+    /** Returns an aggregator for this aggregation type (presumably a new instance per call - confirm in implementations). */
+    abstract public MeasureAggregator<?> newAggregator();
+
+    /** Returns the columns of the measure that need a dictionary, judging by the name - confirm in implementations. */
+    abstract public List<TblColRef> getColumnsNeedDictionary(MeasureDesc measureDesc);
+
+    /** Re-encodes a value from the old dictionaries to the new ones, judging by the name - confirm in implementations. */
+    abstract public Object reEncodeDictionary(Object value, List<Dictionary<?>> oldDicts, List<Dictionary<?>> newDicts);
+
+    /* ============================================================================
+     * Cube Selection
+     * ---------------------------------------------------------------------------- */
+
+    /* ============================================================================
+     * Query
+     * ---------------------------------------------------------------------------- */
+
+    /* ============================================================================
+     * Storage
+     * ---------------------------------------------------------------------------- */
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/aggregation/DataTypeSerializer.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/DataTypeSerializer.java b/core-cube/src/main/java/org/apache/kylin/aggregation/DataTypeSerializer.java
deleted file mode 100644
index df6833c..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/DataTypeSerializer.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.aggregation;
-
-import java.io.UnsupportedEncodingException;
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.kylin.aggregation.basic.BigDecimalSerializer;
-import org.apache.kylin.aggregation.basic.DateTimeSerializer;
-import org.apache.kylin.aggregation.basic.DoubleSerializer;
-import org.apache.kylin.aggregation.basic.LongSerializer;
-import org.apache.kylin.aggregation.basic.StringSerializer;
-import org.apache.kylin.aggregation.hllc.HLLCSerializer;
-import org.apache.kylin.aggregation.topn.TopNCounterSerializer;
-import org.apache.kylin.common.util.BytesSerializer;
-import org.apache.kylin.metadata.model.DataType;
-
-import com.google.common.collect.Maps;
-
-/**
- * @author yangli9
- *
- * Note: the implementations MUST be thread-safe.
- *
- */
-abstract public class DataTypeSerializer<T> implements BytesSerializer<T> {
-
- final static Map<String, Class<?>> implementations;
- static {
- HashMap<String, Class<?>> impl = Maps.newHashMap();
- impl.put("varchar", StringSerializer.class);
- impl.put("decimal", BigDecimalSerializer.class);
- impl.put("double", DoubleSerializer.class);
- impl.put("float", DoubleSerializer.class);
- impl.put("bigint", LongSerializer.class);
- impl.put("long", LongSerializer.class);
- impl.put("integer", LongSerializer.class);
- impl.put("int", LongSerializer.class);
- impl.put("smallint", LongSerializer.class);
- impl.put("date", DateTimeSerializer.class);
- impl.put("datetime", DateTimeSerializer.class);
- impl.put("timestamp", DateTimeSerializer.class);
- implementations = Collections.unmodifiableMap(impl);
-
- }
-
- public static DataTypeSerializer<?> create(String dataType) {
- return create(DataType.getInstance(dataType));
- }
-
- public static DataTypeSerializer<?> create(DataType type) {
- if (type.isHLLC()) {
- return new HLLCSerializer(type);
- }
-
- if (type.isTopN()) {
- return new TopNCounterSerializer(type);
- }
-
- Class<?> clz = implementations.get(type.getName());
- if (clz == null)
- throw new RuntimeException("No MeasureSerializer for type " + type);
-
- try {
- return (DataTypeSerializer<?>) clz.getConstructor(DataType.class).newInstance(type);
- } catch (Exception e) {
- throw new RuntimeException(e); // never happen
- }
- }
-
- /** peek into buffer and return the length of serialization */
- abstract public int peekLength(ByteBuffer in);
-
- /** return the max number of bytes to the longest serialization */
- abstract public int maxLength();
-
- /** get an estimate of size in bytes of the serialized data */
- abstract public int getStorageBytesEstimate();
-
- /** convert from String to obj (string often come as byte[] in mapred) */
- abstract public T valueOf(byte[] value);
-
- /** convert from String to obj */
- public T valueOf(String value) {
- try {
- return valueOf(value.getBytes("UTF-8"));
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException(e); // never happen
- }
- }
-
- /** convert from obj to string */
- public String toString(T value) {
- if (value == null)
- return "NULL";
- else
- return value.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/aggregation/IAggregationFactory.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/IAggregationFactory.java b/core-cube/src/main/java/org/apache/kylin/aggregation/IAggregationFactory.java
new file mode 100644
index 0000000..cbdd9da
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/IAggregationFactory.java
@@ -0,0 +1,6 @@
+package org.apache.kylin.aggregation;
+
+public interface IAggregationFactory { // factory that builds an AggregationType for a function name / data type pair
+
+ public AggregationType createAggregationType(String funcName, String dataType); // AggregationType.create() passes funcName upper-cased and dataType lower-cased
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureAggregator.java
index 9c8945d..6968a9d 100644
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureAggregator.java
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureAggregator.java
@@ -18,21 +18,6 @@
package org.apache.kylin.aggregation;
-import org.apache.kylin.aggregation.basic.BigDecimalMaxAggregator;
-import org.apache.kylin.aggregation.basic.BigDecimalMinAggregator;
-import org.apache.kylin.aggregation.basic.BigDecimalSumAggregator;
-import org.apache.kylin.aggregation.basic.DoubleMaxAggregator;
-import org.apache.kylin.aggregation.basic.DoubleMinAggregator;
-import org.apache.kylin.aggregation.basic.DoubleSumAggregator;
-import org.apache.kylin.aggregation.basic.LongMaxAggregator;
-import org.apache.kylin.aggregation.basic.LongMinAggregator;
-import org.apache.kylin.aggregation.basic.LongSumAggregator;
-import org.apache.kylin.aggregation.hllc.HLLCAggregator;
-import org.apache.kylin.aggregation.hllc.LDCAggregator;
-import org.apache.kylin.aggregation.topn.TopNAggregator;
-import org.apache.kylin.metadata.model.DataType;
-import org.apache.kylin.metadata.model.FunctionDesc;
-
import java.io.Serializable;
/**
@@ -40,50 +25,8 @@ import java.io.Serializable;
@SuppressWarnings("serial")
abstract public class MeasureAggregator<V> implements Serializable {
- public static MeasureAggregator<?> create(String funcName, String returnType) {
- if (FunctionDesc.FUNC_SUM.equalsIgnoreCase(funcName) || FunctionDesc.FUNC_COUNT.equalsIgnoreCase(funcName)) {
- if (isInteger(returnType))
- return new LongSumAggregator();
- else if (isBigDecimal(returnType))
- return new BigDecimalSumAggregator();
- else if (isDouble(returnType))
- return new DoubleSumAggregator();
- } else if (FunctionDesc.FUNC_COUNT_DISTINCT.equalsIgnoreCase(funcName)) {
- DataType hllcType = DataType.getInstance(returnType);
- if (hllcType.isHLLC())
- return new HLLCAggregator(hllcType.getPrecision());
- else
- return new LDCAggregator();
- } else if (FunctionDesc.FUNC_MAX.equalsIgnoreCase(funcName)) {
- if (isInteger(returnType))
- return new LongMaxAggregator();
- else if (isBigDecimal(returnType))
- return new BigDecimalMaxAggregator();
- else if (isDouble(returnType))
- return new DoubleMaxAggregator();
- } else if (FunctionDesc.FUNC_MIN.equalsIgnoreCase(funcName)) {
- if (isInteger(returnType))
- return new LongMinAggregator();
- else if (isBigDecimal(returnType))
- return new BigDecimalMinAggregator();
- else if (isDouble(returnType))
- return new DoubleMinAggregator();
- } else if (FunctionDesc.FUNC_TOP_N.equalsIgnoreCase(funcName)) {
- return new TopNAggregator();
- }
- throw new IllegalArgumentException("No aggregator for func '" + funcName + "' and return type '" + returnType + "'");
- }
-
- public static boolean isBigDecimal(String type) {
- return type.startsWith("decimal");
- }
-
- public static boolean isDouble(String type) {
- return "double".equalsIgnoreCase(type) || "float".equalsIgnoreCase(type) || "real".equalsIgnoreCase(type);
- }
-
- public static boolean isInteger(String type) {
- return "long".equalsIgnoreCase(type) || "bigint".equalsIgnoreCase(type) || "int".equalsIgnoreCase(type) || "integer".equalsIgnoreCase(type);
+ public static MeasureAggregator<?> create(String funcName, String dataType) {
+ return AggregationType.create(funcName, dataType).newAggregator(); // the AggregationType chosen for funcName supplies the aggregator
}
public static int guessBigDecimalMemBytes() {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureCodec.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureCodec.java b/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureCodec.java
index cbcb3a8..8721954 100644
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureCodec.java
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureCodec.java
@@ -21,6 +21,7 @@ package org.apache.kylin.aggregation;
import java.nio.ByteBuffer;
import java.util.Collection;
+import org.apache.kylin.common.datatype.DataTypeSerializer;
import org.apache.kylin.metadata.model.MeasureDesc;
/**
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BasicAggregation.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BasicAggregation.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BasicAggregation.java
new file mode 100644
index 0000000..7ea88bd
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BasicAggregation.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation.basic;
+
+import java.util.List;
+
+import org.apache.kylin.aggregation.AggregationType;
+import org.apache.kylin.aggregation.MeasureAggregator;
+import org.apache.kylin.common.datatype.BigDecimalSerializer;
+import org.apache.kylin.common.datatype.DataType;
+import org.apache.kylin.common.datatype.DataTypeSerializer;
+import org.apache.kylin.common.datatype.DateTimeSerializer;
+import org.apache.kylin.common.datatype.DoubleSerializer;
+import org.apache.kylin.common.datatype.LongSerializer;
+import org.apache.kylin.common.datatype.StringSerializer;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+public class BasicAggregation extends AggregationType {
+ // Aggregation type for the SUM, COUNT, MAX and MIN functions; aggregator and serializer are picked by the result type's family.
+ private final String funcName; // function name as given to the constructor; matched case-insensitively against the FunctionDesc constants
+ private final DataType dataType; // result type resolved from the constructor's type string
+ /** Stores the function name unchanged and resolves the type string through DataType.getType(). */
+ public BasicAggregation(String funcName, String dataType) {
+ this.funcName = funcName;
+ this.dataType = DataType.getType(dataType);
+ }
+ /** Returns the DataType resolved in the constructor. */
+ @Override
+ public DataType getAggregationDataType() {
+ return dataType;
+ }
+ /** Chooses the serializer class by type family; throws IllegalArgumentException when no family matches. NOTE(review): the name is spelled "Seralizer" and carries no @Override, so whether it overrides a base-class method cannot be told from this file. */
+ public Class<? extends DataTypeSerializer<?>> getAggregationDataSeralizer() {
+ if (dataType.isStringFamily())
+ return StringSerializer.class;
+ else if (dataType.isIntegerFamily())
+ return LongSerializer.class;
+ else if (dataType.isDecimal())
+ return BigDecimalSerializer.class;
+ else if (dataType.isNumberFamily()) // tested after integer and decimal; presumably isNumberFamily() would also match those (TODO confirm)
+ return DoubleSerializer.class;
+ else if (dataType.isDateTimeFamily())
+ return DateTimeSerializer.class;
+ else
+ throw new IllegalArgumentException("No default serializer for type " + dataType);
+ }
+ /** Currently a no-op: the measure is accepted without any check. */
+ @Override
+ public void validate(MeasureDesc measureDesc) throws IllegalArgumentException {
+ // TODO not implemented yet; nothing is validated
+
+ }
+ /** Creates a new aggregator for this function and type; each function comes in BigDecimal, Long and Double variants. Throws IllegalArgumentException when no combination matches. */
+ @Override
+ public MeasureAggregator<?> newAggregator() {
+ if (isSum() || isCount()) { // COUNT reuses the sum aggregators
+ if (dataType.isDecimal())
+ return new BigDecimalSumAggregator();
+ else if (dataType.isIntegerFamily())
+ return new LongSumAggregator();
+ else if (dataType.isNumberFamily())
+ return new DoubleSumAggregator();
+ } else if (isMax()) {
+ if (dataType.isDecimal())
+ return new BigDecimalMaxAggregator();
+ else if (dataType.isIntegerFamily())
+ return new LongMaxAggregator();
+ else if (dataType.isNumberFamily())
+ return new DoubleMaxAggregator();
+ } else if (isMin()) {
+ if (dataType.isDecimal())
+ return new BigDecimalMinAggregator();
+ else if (dataType.isIntegerFamily())
+ return new LongMinAggregator();
+ else if (dataType.isNumberFamily())
+ return new DoubleMinAggregator();
+ }
+ throw new IllegalArgumentException("No aggregator for func '" + funcName + "' and return type '" + dataType + "'"); // reached for an unknown function, or a known function whose type is outside the decimal/integer/number families
+ }
+ /** True when funcName equals FunctionDesc.FUNC_SUM, ignoring case. */
+ private boolean isSum() {
+ return FunctionDesc.FUNC_SUM.equalsIgnoreCase(funcName);
+ }
+ /** True when funcName equals FunctionDesc.FUNC_COUNT, ignoring case. */
+ private boolean isCount() {
+ return FunctionDesc.FUNC_COUNT.equalsIgnoreCase(funcName);
+ }
+ /** True when funcName equals FunctionDesc.FUNC_MAX, ignoring case. */
+ private boolean isMax() {
+ return FunctionDesc.FUNC_MAX.equalsIgnoreCase(funcName);
+ }
+ /** True when funcName equals FunctionDesc.FUNC_MIN, ignoring case. */
+ private boolean isMin() {
+ return FunctionDesc.FUNC_MIN.equalsIgnoreCase(funcName);
+ }
+ /** Stub: always returns null. NOTE(review): confirm whether callers expect an empty list instead. */
+ @Override
+ public List<TblColRef> getColumnsNeedDictionary(MeasureDesc measureDesc) {
+ // TODO not implemented yet; the measure is ignored
+ return null;
+ }
+ /** Stub: ignores all three arguments and always returns null. */
+ @Override
+ public Object reEncodeDictionary(Object value, List<Dictionary<?>> oldDicts, List<Dictionary<?>> newDicts) {
+ // TODO not implemented yet; no re-encoding happens
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BasicAggregationFactory.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BasicAggregationFactory.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BasicAggregationFactory.java
new file mode 100644
index 0000000..547d45f
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BasicAggregationFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation.basic;
+
+import org.apache.kylin.aggregation.AggregationType;
+import org.apache.kylin.aggregation.IAggregationFactory;
+
+public class BasicAggregationFactory implements IAggregationFactory { // factory that produces BasicAggregation instances
+ /** Returns a new BasicAggregation built from the given function name and data type string; both are passed through unchanged. */
+ @Override
+ public AggregationType createAggregationType(String funcName, String dataType) {
+ return new BasicAggregation(funcName, dataType);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalSerializer.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalSerializer.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalSerializer.java
deleted file mode 100644
index 9f7c3cf..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalSerializer.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.aggregation.basic;
-
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.nio.ByteBuffer;
-
-import org.apache.kylin.aggregation.DataTypeSerializer;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.metadata.model.DataType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author yangli9
- *
- */
-public class BigDecimalSerializer extends DataTypeSerializer<BigDecimal> {
- // Encodes a BigDecimal as: vint scale, vint byte count, then the bytes of the unscaled value (see serialize()).
- private static final Logger logger = LoggerFactory.getLogger(BigDecimalSerializer.class);
-
- final DataType type; // declared type; supplies the scale and precision limits
- final int maxLength; // upper bound on the encoded size, computed in the constructor
-
- int avoidVerbose = 0; // counter used to throttle the scale-overflow warning in serialize()
- /** Keeps the type and derives maxLength from its precision. */
- public BigDecimalSerializer(DataType type) {
- this.type = type;
- // see serialize(): 1 byte scale, 1 byte length, assume every 2 digits takes 1 byte
- this.maxLength = 1 + 1 + (type.getPrecision() + 1) / 2;
- }
- /** Writes value to out. A value whose scale exceeds the type's scale is first rounded (HALF_EVEN) to the type's scale; throws IllegalArgumentException if the unscaled bytes plus the two prefixes exceed maxLength. */
- @Override
- public void serialize(BigDecimal value, ByteBuffer out) {
- if (value.scale() > type.getScale()) {
- if (avoidVerbose % 10000 == 0) { // NOTE(review): avoidVerbose is only incremented inside this branch, so unless it is changed elsewhere it stays at 1 after the first warning and the message is never logged again
- logger.warn("value's scale has exceeded the " + type.getScale() + ", cut it off, to ensure encoded value do not exceed maxLength " + maxLength + " times:" + (avoidVerbose++));
- }
- value = value.setScale(type.getScale(), BigDecimal.ROUND_HALF_EVEN);
- }
- byte[] bytes = value.unscaledValue().toByteArray();
- if (bytes.length + 2 > maxLength) { // the 2 stands for the scale and length prefixes, assumed to take one byte each
- throw new IllegalArgumentException("'" + value + "' exceeds the expected length for type " + type);
- }
-
- BytesUtil.writeVInt(value.scale(), out);
- BytesUtil.writeVInt(bytes.length, out);
- out.put(bytes);
- }
- /** Reads back a value in the layout written by serialize(): scale, byte count, unscaled bytes. */
- @Override
- public BigDecimal deserialize(ByteBuffer in) {
- int scale = BytesUtil.readVInt(in);
- int n = BytesUtil.readVInt(in);
-
- byte[] bytes = new byte[n];
- in.get(bytes);
-
- return new BigDecimal(new BigInteger(bytes), scale);
- }
- /** Returns the total encoded length of the next value (both prefixes plus payload); the buffer position is restored before returning. */
- @Override
- public int peekLength(ByteBuffer in) {
- int mark = in.position();
-
- @SuppressWarnings("unused")
- int scale = BytesUtil.readVInt(in); // read only to advance past the scale field
- int n = BytesUtil.readVInt(in);
- int len = in.position() - mark + n;
-
- in.position(mark);
- return len;
- }
- /** Returns the size bound computed in the constructor. */
- @Override
- public int maxLength() {
- return maxLength;
- }
- /** Returns a constant estimate of 8 bytes, independent of the type's precision. */
- @Override
- public int getStorageBytesEstimate() {
- return 8;
- }
- /** Parses a BigDecimal from the text obtained via Bytes.toString(value); a null array yields zero. */
- @Override
- public BigDecimal valueOf(byte[] value) {
- if (value == null)
- return new BigDecimal(0);
- else
- return new BigDecimal(Bytes.toString(value));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DateTimeSerializer.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DateTimeSerializer.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DateTimeSerializer.java
deleted file mode 100644
index 0bf4aba..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DateTimeSerializer.java
+++ /dev/null
@@ -1,65 +0,0 @@
-package org.apache.kylin.aggregation.basic;
-
-import java.nio.ByteBuffer;
-
-import org.apache.kylin.aggregation.DataTypeSerializer;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.DateFormat;
-import org.apache.kylin.common.util.LongMutable;
-import org.apache.kylin.metadata.model.DataType;
-
-public class DateTimeSerializer extends DataTypeSerializer<LongMutable> {
- // Encodes a LongMutable as a fixed 8-byte long; valueOf() turns text into milliseconds via DateFormat.stringToMillis.
- // be thread-safe and avoid repeated obj creation
- private ThreadLocal<LongMutable> current = new ThreadLocal<LongMutable>();
- /** The type argument is accepted but not used. */
- public DateTimeSerializer(DataType type) {
- }
- /** Writes the wrapped long with ByteBuffer.putLong. */
- @Override
- public void serialize(LongMutable value, ByteBuffer out) {
- out.putLong(value.get());
- }
- /** Returns the calling thread's reusable LongMutable, creating it on first use. */
- private LongMutable current() {
- LongMutable l = current.get();
- if (l == null) {
- l = new LongMutable();
- current.set(l);
- }
- return l;
- }
- /** Reads a long into the per-thread holder and returns that holder; later calls on the same thread overwrite the same object. */
- @Override
- public LongMutable deserialize(ByteBuffer in) {
- LongMutable l = current();
- l.set(in.getLong());
- return l;
- }
- /** Always 8; the buffer is not inspected. */
- @Override
- public int peekLength(ByteBuffer in) {
- return 8;
- }
- /** Always 8. */
- @Override
- public int maxLength() {
- return 8;
- }
- /** Always 8. */
- @Override
- public int getStorageBytesEstimate() {
- return 8;
- }
- /** Converts the text form of value to milliseconds with DateFormat.stringToMillis; a null array yields 0. The result is stored in, and returned as, the per-thread holder. */
- @Override
- public LongMutable valueOf(byte[] value) {
- LongMutable l = current();
- if (value == null)
- l.set(0L);
- else
- l.set(DateFormat.stringToMillis(Bytes.toString(value)));
- return l;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMaxAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMaxAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMaxAggregator.java
index f09614d..99896a6 100644
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMaxAggregator.java
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMaxAggregator.java
@@ -19,7 +19,7 @@
package org.apache.kylin.aggregation.basic;
import org.apache.kylin.aggregation.MeasureAggregator;
-import org.apache.kylin.common.util.DoubleMutable;
+import org.apache.kylin.common.datatype.DoubleMutable;
/**
*/
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMinAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMinAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMinAggregator.java
index b93c15c..7430c4e 100644
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMinAggregator.java
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMinAggregator.java
@@ -19,7 +19,7 @@
package org.apache.kylin.aggregation.basic;
import org.apache.kylin.aggregation.MeasureAggregator;
-import org.apache.kylin.common.util.DoubleMutable;
+import org.apache.kylin.common.datatype.DoubleMutable;
/**
*/
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSerializer.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSerializer.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSerializer.java
deleted file mode 100644
index f207054..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSerializer.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.aggregation.basic;
-
-import java.nio.ByteBuffer;
-
-import org.apache.kylin.aggregation.DataTypeSerializer;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.DoubleMutable;
-import org.apache.kylin.metadata.model.DataType;
-
-/**
- */
-public class DoubleSerializer extends DataTypeSerializer<DoubleMutable> {
- // Encodes a DoubleMutable as a fixed 8-byte double (ByteBuffer.putDouble / getDouble).
- // be thread-safe and avoid repeated obj creation
- private ThreadLocal<DoubleMutable> current = new ThreadLocal<DoubleMutable>();
- /** The type argument is accepted but not used. */
- public DoubleSerializer(DataType type) {
- }
- /** Writes the wrapped double with ByteBuffer.putDouble. */
- @Override
- public void serialize(DoubleMutable value, ByteBuffer out) {
- out.putDouble(value.get());
- }
- /** Returns the calling thread's reusable DoubleMutable, creating it on first use. */
- private DoubleMutable current() {
- DoubleMutable d = current.get();
- if (d == null) {
- d = new DoubleMutable();
- current.set(d);
- }
- return d;
- }
- /** Reads a double into the per-thread holder and returns that holder; later calls on the same thread overwrite the same object. */
- @Override
- public DoubleMutable deserialize(ByteBuffer in) {
- DoubleMutable d = current();
- d.set(in.getDouble());
- return d;
- }
- /** Always 8; the buffer is not inspected. */
- @Override
- public int peekLength(ByteBuffer in) {
- return 8;
- }
- /** Always 8. */
- @Override
- public int maxLength() {
- return 8;
- }
- /** Always 8. */
- @Override
- public int getStorageBytesEstimate() {
- return 8;
- }
- /** Parses the text form of value with Double.parseDouble; a null array yields 0. The result is stored in, and returned as, the per-thread holder. */
- @Override
- public DoubleMutable valueOf(byte[] value) {
- DoubleMutable d = current();
- if (value == null)
- d.set(0d);
- else
- d.set(Double.parseDouble(Bytes.toString(value)));
- return d;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSumAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSumAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSumAggregator.java
index 298cec6..6e66c1b 100644
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSumAggregator.java
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSumAggregator.java
@@ -19,7 +19,7 @@
package org.apache.kylin.aggregation.basic;
import org.apache.kylin.aggregation.MeasureAggregator;
-import org.apache.kylin.common.util.DoubleMutable;
+import org.apache.kylin.common.datatype.DoubleMutable;
/**
*/
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMaxAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMaxAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMaxAggregator.java
index 71d95f2..7fdf3d8 100644
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMaxAggregator.java
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMaxAggregator.java
@@ -19,7 +19,7 @@
package org.apache.kylin.aggregation.basic;
import org.apache.kylin.aggregation.MeasureAggregator;
-import org.apache.kylin.common.util.LongMutable;
+import org.apache.kylin.common.datatype.LongMutable;
/**
*/
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMinAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMinAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMinAggregator.java
index d1e93f2..22ae865 100644
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMinAggregator.java
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMinAggregator.java
@@ -19,7 +19,7 @@
package org.apache.kylin.aggregation.basic;
import org.apache.kylin.aggregation.MeasureAggregator;
-import org.apache.kylin.common.util.LongMutable;
+import org.apache.kylin.common.datatype.LongMutable;
/**
*/