You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/11/27 10:01:04 UTC
[09/13] incubator-kylin git commit: KYLIN-976 Add ingester;
Build part done, in-mem cube test pass
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregator.java
new file mode 100644
index 0000000..16563fa
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.hllc;
+
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.measure.MeasureAggregator;
+
+/**
+ * Folds HyperLogLogPlusCounter values into one running counter.
+ */
+@SuppressWarnings("serial")
+public class HLLCAggregator extends MeasureAggregator<HyperLogLogPlusCounter> {
+
+    final int precision;
+    HyperLogLogPlusCounter sum = null;
+
+    public HLLCAggregator(int precision) {
+        this.precision = precision;
+    }
+
+    @Override
+    public void reset() {
+        sum = null;
+    }
+
+    /** Builds the running counter from the first value seen since the last reset; merges every later one into it. */
+    @Override
+    public void aggregate(HyperLogLogPlusCounter value) {
+        if (sum != null) {
+            sum.merge(value);
+        } else {
+            sum = new HyperLogLogPlusCounter(value);
+        }
+    }
+
+    /** Returns the running counter, which is null until a value has been aggregated. */
+    @Override
+    public HyperLogLogPlusCounter getState() {
+        return sum;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        // 1024 + 60 returned by AggregationCacheMemSizeTest
+        // 8 aggregator obj shell, 4 precision, 8 ref to HLLC, 8 HLLC obj shell, 32 + 2^precision HLLC internal
+        return 8 + 4 + 8 + 8 + 32 + (1 << precision);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
new file mode 100644
index 0000000..ee90818
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.hllc;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.measure.MeasureAggregator;
+import org.apache.kylin.measure.MeasureIngester;
+import org.apache.kylin.measure.MeasureType;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.datatype.DataTypeSerializer;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+/** MeasureType for data types named "hllc" with a precision between 10 and 16. */
+public class HLLCMeasureType extends MeasureType {
+
+    private final DataType dataType;
+
+    public HLLCMeasureType(DataType dataType) {
+        if (!"hllc".equals(dataType.getName()))
+            throw new IllegalArgumentException();
+
+        final int precision = dataType.getPrecision();
+        if (precision < 10 || precision > 16)
+            throw new IllegalArgumentException("HLLC precision must be between 10 and 16");
+
+        this.dataType = dataType;
+    }
+
+    @Override
+    public DataType getAggregationDataType() {
+        return dataType;
+    }
+
+    @Override
+    public Class<? extends DataTypeSerializer<?>> getAggregationDataSeralizer() {
+        return HLLCSerializer.class;
+    }
+
+    @Override
+    public void validate(MeasureDesc measureDesc) throws IllegalArgumentException {
+        // TODO not implemented yet: every measure description is accepted
+    }
+
+    /** Creates an ingester that reuses a single counter, cleared at the start of every valueOf() call. */
+    @Override
+    public MeasureIngester<?> newIngester() {
+        return new MeasureIngester<HyperLogLogPlusCounter>() {
+            HyperLogLogPlusCounter current = new HyperLogLogPlusCounter(dataType.getPrecision());
+
+            @Override
+            public HyperLogLogPlusCounter valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
+                current.clear();
+                for (String raw : values)
+                    current.add(raw != null ? raw : "__nUlL__");
+                return current;
+            }
+        };
+    }
+
+    /** Returns an HLLCAggregator when the data type reports isHLLC(), otherwise an LDCAggregator. */
+    @Override
+    public MeasureAggregator<?> newAggregator() {
+        if (!dataType.isHLLC())
+            return new LDCAggregator();
+        return new HLLCAggregator(dataType.getPrecision());
+    }
+
+    @Override
+    public List<TblColRef> getColumnsNeedDictionary(MeasureDesc measureDesc) {
+        // TODO not implemented yet
+        return null;
+    }
+
+    @Override
+    public Object reEncodeDictionary(Object value, List<Dictionary<?>> oldDicts, List<Dictionary<?>> newDicts) {
+        // TODO not implemented yet
+        return null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java
new file mode 100644
index 0000000..7131201
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.hllc;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.datatype.DataTypeSerializer;
+
+/**
+ * Serializer that writes and reads the registers of a HyperLogLogPlusCounter.
+ * @author yangli9
+ */
+public class HLLCSerializer extends DataTypeSerializer<HyperLogLogPlusCounter> {
+
+    // one counter per thread, created lazily in current(): be thread-safe and avoid repeated obj creation
+    private ThreadLocal<HyperLogLogPlusCounter> current = new ThreadLocal<HyperLogLogPlusCounter>();
+
+    private int precision;
+
+    public HLLCSerializer(DataType type) {
+        this.precision = type.getPrecision();
+    }
+
+    @Override
+    public void serialize(HyperLogLogPlusCounter value, ByteBuffer out) {
+        try {
+            value.writeRegisters(out);
+        } catch (IOException ioe) {
+            throw new RuntimeException(ioe);
+        }
+    }
+
+    private HyperLogLogPlusCounter current() {
+        HyperLogLogPlusCounter cached = current.get();
+        if (cached != null)
+            return cached;
+        cached = new HyperLogLogPlusCounter(precision);
+        current.set(cached);
+        return cached;
+    }
+
+    /** Reads registers into the calling thread's counter and returns that same, reused instance. */
+    @Override
+    public HyperLogLogPlusCounter deserialize(ByteBuffer in) {
+        HyperLogLogPlusCounter target = current();
+        try {
+            target.readRegisters(in);
+        } catch (IOException ioe) {
+            throw new RuntimeException(ioe);
+        }
+        return target;
+    }
+
+    @Override
+    public int peekLength(ByteBuffer in) {
+        return current().peekLength(in);
+    }
+
+    @Override
+    public int maxLength() {
+        return current().maxLength();
+    }
+
+    @Override
+    public int getStorageBytesEstimate() {
+        return current().maxLength();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/LDCAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/LDCAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/LDCAggregator.java
new file mode 100644
index 0000000..5d96450
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/LDCAggregator.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.hllc;
+
+import org.apache.kylin.measure.MeasureAggregator;
+import org.apache.kylin.metadata.datatype.LongMutable;
+
+/**
+ * Long Distinct Count: reports the count estimate held by a dependent HLLCAggregator.
+ */
+@SuppressWarnings("serial")
+public class LDCAggregator extends MeasureAggregator<LongMutable> {
+
+    private static final LongMutable ZERO = new LongMutable(0);
+    private HLLCAggregator hllAgg = null;
+    private LongMutable state = new LongMutable(0);
+
+    /** Sets the aggregator whose counter is reported by getState(); it is cast to HLLCAggregator. */
+    @SuppressWarnings("rawtypes")
+    public void setDependentAggregator(MeasureAggregator agg) {
+        this.hllAgg = (HLLCAggregator) agg;
+    }
+
+    @Override
+    public void reset() {
+        // nothing to clear: the count is read from the dependent aggregator on demand
+    }
+
+    @Override
+    public void aggregate(LongMutable value) {
+        // incoming values are ignored: the count is read from the dependent aggregator on demand
+    }
+
+    /** Returns the dependent counter's estimate, or zero when no aggregator is set or it holds no counter yet. */
+    @Override
+    public LongMutable getState() {
+        if (hllAgg == null || hllAgg.getState() == null)
+            return ZERO; // HLLCAggregator.getState() is null before its first aggregate() and after reset()
+        state.set(hllAgg.getState().getCountEstimate());
+        return state;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        return guessLongMemBytes();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNAggregator.java
new file mode 100644
index 0000000..9b4c893
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNAggregator.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.topn;
+
+import java.util.Map;
+
+import org.apache.kylin.common.topn.TopNCounter;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.measure.MeasureAggregator;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Merges TopNCounter values into one running counter sized after the first value received.
+ */
+@SuppressWarnings("serial")
+public class TopNAggregator extends MeasureAggregator<TopNCounter<ByteArray>> {
+
+    int capacity = 0;
+    TopNCounter<ByteArray> sum = null;
+    Map<ByteArray, Double> sanityCheckMap;
+
+    @Override
+    public void reset() {
+        sum = null;
+    }
+
+    /** Lazily creates the running counter with the first value's capacity, then merges the value in. */
+    @Override
+    public void aggregate(TopNCounter<ByteArray> value) {
+        final boolean firstSinceReset = (sum == null);
+        if (firstSinceReset) {
+            capacity = value.getCapacity();
+            sum = new TopNCounter<ByteArray>(capacity);
+            sanityCheckMap = Maps.newHashMap();
+        }
+        sum.merge(value);
+    }
+
+    /** Returns the running counter as is (the retain(capacity) trim is disabled); null if nothing was aggregated. */
+    @Override
+    public TopNCounter<ByteArray> getState() {
+        return sum;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        return (8 * capacity) / 4;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java
new file mode 100644
index 0000000..b422316
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.topn;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.kylin.common.topn.Counter;
+import org.apache.kylin.common.topn.DoubleDeltaSerializer;
+import org.apache.kylin.common.topn.TopNCounter;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.datatype.DataTypeSerializer;
+
+/**
+ * Serializer for TopNCounter values whose items are ByteArray keys.
+ */
+public class TopNCounterSerializer extends DataTypeSerializer<TopNCounter<ByteArray>> {
+
+    private DoubleDeltaSerializer dds = new DoubleDeltaSerializer(3);
+
+    private int precision;
+
+    public TopNCounterSerializer(DataType dataType) {
+        this.precision = dataType.getPrecision();
+    }
+
+    /** Computes the byte length of the next serialized value; the buffer position is restored before returning. */
+    @Override
+    public int peekLength(ByteBuffer in) {
+        final int start = in.position();
+        in.getInt(); // capacity, not needed to compute the length
+        final int size = in.getInt();
+        final int keyLength = in.getInt();
+        dds.deserialize(in); // only called to advance past the counters
+        final int headerAndCounters = in.position() - start;
+        in.position(start);
+        return headerAndCounters + keyLength * size;
+    }
+
+    @Override
+    public int maxLength() {
+        return precision * TopNCounter.EXTRA_SPACE_RATE * (4 + 8);
+    }
+
+    @Override
+    public int getStorageBytesEstimate() {
+        return precision * TopNCounter.EXTRA_SPACE_RATE * 8;
+    }
+
+    /** Writes three ints (capacity, size, key length), the counters through dds, then every item's byte array. */
+    @Override
+    public void serialize(TopNCounter<ByteArray> value, ByteBuffer out) {
+        final double[] counts = value.getCounters();
+        final List<ByteArray> head = value.peek(1);
+        // the key length is taken from the first peeked item, 0 when there is none
+        final int keyLength = head.isEmpty() ? 0 : head.get(0).length();
+        out.putInt(value.getCapacity());
+        out.putInt(value.size());
+        out.putInt(keyLength);
+        dds.serialize(counts, out);
+        for (Iterator<Counter<ByteArray>> it = value.iterator(); it.hasNext();) {
+            out.put(it.next().getItem().array());
+        }
+    }
+
+    /** Rebuilds a counter: reads size keys of keyLength bytes each and passes every key with its count to offerToHead(). */
+    @Override
+    public TopNCounter<ByteArray> deserialize(ByteBuffer in) {
+        final int capacity = in.getInt();
+        final int size = in.getInt();
+        final int keyLength = in.getInt();
+        final double[] counts = dds.deserialize(in);
+
+        final TopNCounter<ByteArray> result = new TopNCounter<ByteArray>(capacity);
+        for (int i = 0; i < size; i++) {
+            final ByteArray key = new ByteArray(keyLength);
+            in.get(key.array());
+            result.offerToHead(key, counts[i]);
+        }
+
+        return result;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureFactory.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureFactory.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureFactory.java
new file mode 100644
index 0000000..1ceb607
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.topn;
+
+import org.apache.kylin.measure.IMeasureFactory;
+import org.apache.kylin.measure.MeasureType;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.FunctionDesc;
+
+/** Factory of TopNMeasureType; accepts only the FunctionDesc.FUNC_TOP_N function name, in any letter case. */
+public class TopNMeasureFactory implements IMeasureFactory {
+
+    @Override
+    public MeasureType createMeasureType(String funcName, String dataType) {
+        final boolean isTopN = FunctionDesc.FUNC_TOP_N.equalsIgnoreCase(funcName);
+        if (!isTopN)
+            throw new IllegalArgumentException();
+        return new TopNMeasureType(DataType.getType(dataType));
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
new file mode 100644
index 0000000..1d2c87b
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.topn;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kylin.common.topn.TopNCounter;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.measure.MeasureAggregator;
+import org.apache.kylin.measure.MeasureIngester;
+import org.apache.kylin.measure.MeasureType;
+import org.apache.kylin.measure.hllc.HLLCSerializer;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.datatype.DataTypeSerializer;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+/** MeasureType for data types named "topn" with a precision between 1 and 1000. */
+public class TopNMeasureType extends MeasureType {
+
+    private final DataType dataType;
+
+    public TopNMeasureType(DataType dataType) {
+        if ("topn".equals(dataType.getName()) == false)
+            throw new IllegalArgumentException();
+
+        this.dataType = dataType;
+
+        if (this.dataType.getPrecision() < 1 || this.dataType.getPrecision() > 1000)
+            throw new IllegalArgumentException("TopN precision must be between 1 and 1000");
+    }
+
+    @Override
+    public DataType getAggregationDataType() {
+        return dataType;
+    }
+
+    /** Returns the serializer for the TopNCounter values this measure aggregates (not the HLLC one). */
+    @Override
+    public Class<? extends DataTypeSerializer<?>> getAggregationDataSeralizer() {
+        return TopNCounterSerializer.class;
+    }
+
+    @Override
+    public void validate(MeasureDesc measureDesc) throws IllegalArgumentException {
+        // TODO not implemented yet: every measure description is accepted
+    }
+
+    /** Creates an ingester turning a {counter, literal} pair into a TopNCounter that holds that single entry. */
+    @SuppressWarnings("rawtypes")
+    @Override
+    public MeasureIngester<?> newIngester() {
+        return new MeasureIngester<TopNCounter>() {
+            @Override
+            public TopNCounter valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
+                if (values.length != 2)
+                    throw new IllegalArgumentException();
+                // values[0] is the numeric counter (null counts as 0), values[1] the literal
+                double counter = values[0] == null ? 0 : Double.parseDouble(values[0]);
+                String literal = values[1];
+
+                // encode literal using dictionary
+                TblColRef literalCol = measureDesc.getFunction().getTopNLiteralColumn();
+                Dictionary<String> dictionary = dictionaryMap.get(literalCol);
+                int keyEncodedValue = dictionary.getIdFromValue(literal);
+                // the key is the dictionary id written as an unsigned value of getSizeOfId() bytes
+                ByteArray key = new ByteArray(dictionary.getSizeOfId());
+                BytesUtil.writeUnsigned(keyEncodedValue, key.array(), 0, dictionary.getSizeOfId());
+                TopNCounter<ByteArray> topNCounter = new TopNCounter<ByteArray>(dataType.getPrecision() * TopNCounter.EXTRA_SPACE_RATE);
+                topNCounter.offer(key, counter);
+                return topNCounter;
+            }
+        };
+    }
+
+    @Override
+    public MeasureAggregator<?> newAggregator() {
+        return new TopNAggregator();
+    }
+
+    @Override
+    public List<TblColRef> getColumnsNeedDictionary(MeasureDesc measureDesc) {
+        // TODO not implemented yet
+        return null;
+    }
+
+    @Override
+    public Object reEncodeDictionary(Object value, List<Dictionary<?>> oldDicts, List<Dictionary<?>> newDicts) {
+        // TODO not implemented yet
+        return null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/BigDecimalSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/BigDecimalSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/BigDecimalSerializer.java
new file mode 100644
index 0000000..134f305
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/BigDecimalSerializer.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.datatype;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.util.BytesUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Serializes a BigDecimal as a VInt scale, a VInt byte count and the unscaled value's bytes.
+ * @author yangli9
+ */
+public class BigDecimalSerializer extends DataTypeSerializer<BigDecimal> {
+
+    private static final Logger logger = LoggerFactory.getLogger(BigDecimalSerializer.class);
+
+    final DataType type;
+    final int maxLength;
+
+    // number of values whose scale had to be cut off so far; used to throttle the warning in serialize()
+    int avoidVerbose = 0;
+
+    public BigDecimalSerializer(DataType type) {
+        this.type = type;
+        // see serialize(): 1 byte scale, 1 byte length, assume every 2 digits takes 1 byte
+        this.maxLength = 1 + 1 + (type.getPrecision() + 1) / 2;
+    }
+
+    /** Writes value, first rounding it (HALF_EVEN) to the type's scale if larger; throws IllegalArgumentException if it exceeds maxLength. */
+    @Override
+    public void serialize(BigDecimal value, ByteBuffer out) {
+        if (value.scale() > type.getScale()) {
+            // bump the counter on every cut-off (not only when logging) so the warning recurs every 10000 occurrences
+            if (avoidVerbose++ % 10000 == 0) {
+                logger.warn("value's scale has exceeded the " + type.getScale() + ", cut it off, to ensure encoded value do not exceed maxLength " + maxLength + " times:" + avoidVerbose);
+            }
+            value = value.setScale(type.getScale(), BigDecimal.ROUND_HALF_EVEN);
+        }
+        byte[] bytes = value.unscaledValue().toByteArray();
+        if (bytes.length + 2 > maxLength) {
+            throw new IllegalArgumentException("'" + value + "' exceeds the expected length for type " + type);
+        }
+
+        BytesUtil.writeVInt(value.scale(), out);
+        BytesUtil.writeVInt(bytes.length, out);
+        out.put(bytes);
+    }
+
+    @Override
+    public BigDecimal deserialize(ByteBuffer in) {
+        int scale = BytesUtil.readVInt(in);
+        int n = BytesUtil.readVInt(in);
+
+        byte[] bytes = new byte[n];
+        in.get(bytes);
+
+        return new BigDecimal(new BigInteger(bytes), scale);
+    }
+
+    @Override
+    public int peekLength(ByteBuffer in) {
+        int mark = in.position();
+        @SuppressWarnings("unused")
+        int scale = BytesUtil.readVInt(in);
+        int n = BytesUtil.readVInt(in);
+        int len = in.position() - mark + n;
+        in.position(mark);
+        return len;
+    }
+
+    @Override
+    public int maxLength() {
+        return maxLength;
+    }
+
+    @Override
+    public int getStorageBytesEstimate() {
+        return 8;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java
new file mode 100644
index 0000000..235c99f
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.datatype;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * A SQL-style data type: a name plus optional precision and scale, for example
+ * {@code varchar(256)} or {@code decimal(19,4)}.
+ * <p>
+ * Instances are created only through {@link #getType(String)}, which caches them so that
+ * equal type strings resolve to one shared object.
+ */
+@SuppressWarnings("serial")
+public class DataType implements Serializable {
+
+    // standard sql types, ref: http://www.w3schools.com/sql/sql_datatypes_general.asp
+    public static final String VALID_TYPES_STRING = "any|char|varchar|boolean|binary" //
+            + "|integer|tinyint|smallint|bigint|decimal|numeric|float|real|double" //
+            + "|date|time|datetime|timestamp|byte|int|short|long|string|hllc|topn";
+
+    // optional "(precision[,scale])" suffix; the group captures digits, whitespace and commas
+    private static final String TYPE_PATTERN_TAIL = "\\s*" //
+            + "(?:" + "[(]" + "([\\d\\s,]+)" + "[)]" + ")?";
+
+    private static final Pattern TYPE_PATTERN = Pattern.compile( //
+            "(" + VALID_TYPES_STRING + ")" + TYPE_PATTERN_TAIL, Pattern.CASE_INSENSITIVE);
+
+    public static final Set<String> INTEGER_FAMILY = new HashSet<String>();
+    public static final Set<String> NUMBER_FAMILY = new HashSet<String>();
+    public static final Set<String> DATETIME_FAMILY = new HashSet<String>();
+    public static final Set<String> STRING_FAMILY = new HashSet<String>();
+    // maps legacy type spellings onto the canonical names used everywhere else
+    private static final Map<String, String> LEGACY_TYPE_MAP = new HashMap<String, String>();
+    static {
+        INTEGER_FAMILY.add("tinyint");
+        INTEGER_FAMILY.add("smallint");
+        INTEGER_FAMILY.add("integer");
+        INTEGER_FAMILY.add("bigint");
+
+        NUMBER_FAMILY.addAll(INTEGER_FAMILY);
+        NUMBER_FAMILY.add("float");
+        NUMBER_FAMILY.add("double");
+        NUMBER_FAMILY.add("decimal");
+        NUMBER_FAMILY.add("real");
+        NUMBER_FAMILY.add("numeric");
+
+        DATETIME_FAMILY.add("date");
+        DATETIME_FAMILY.add("time");
+        DATETIME_FAMILY.add("datetime");
+        DATETIME_FAMILY.add("timestamp");
+
+        STRING_FAMILY.add("varchar");
+        STRING_FAMILY.add("char");
+
+        LEGACY_TYPE_MAP.put("byte", "tinyint");
+        LEGACY_TYPE_MAP.put("int", "integer");
+        LEGACY_TYPE_MAP.put("short", "smallint");
+        LEGACY_TYPE_MAP.put("long", "bigint");
+        LEGACY_TYPE_MAP.put("string", "varchar");
+        LEGACY_TYPE_MAP.put("hllc10", "hllc(10)");
+        LEGACY_TYPE_MAP.put("hllc12", "hllc(12)");
+        LEGACY_TYPE_MAP.put("hllc14", "hllc(14)");
+        LEGACY_TYPE_MAP.put("hllc15", "hllc(15)");
+        LEGACY_TYPE_MAP.put("hllc16", "hllc(16)");
+    }
+
+    // must be declared before ANY, whose initializer goes through getType()
+    private static final ConcurrentMap<DataType, DataType> CACHE = new ConcurrentHashMap<DataType, DataType>();
+
+    public static final DataType ANY = DataType.getType("any");
+
+    /**
+     * Parses a type string and returns the cached instance for it.
+     *
+     * @param type a type string such as {@code "bigint"} or {@code "decimal(19,4)"}; may be null
+     * @return the shared instance equal to the parsed type, or null when {@code type} is null
+     * @throws IllegalArgumentException if the string is not a recognized type
+     */
+    public static DataType getType(String type) {
+        if (type == null)
+            return null;
+
+        DataType dataType = new DataType(type);
+        DataType cached = CACHE.get(dataType);
+        if (cached == null) {
+            // putIfAbsent keeps the first instance when two threads race on the same
+            // type, so every caller ends up with the same object
+            DataType raced = CACHE.putIfAbsent(dataType, dataType);
+            cached = (raced == null) ? dataType : raced;
+        }
+        return cached;
+    }
+
+    // ============================================================================
+
+    private String name;
+    private int precision;
+    private int scale;
+
+    /**
+     * Parses {@code datatype}; precision and scale stay at -1 when not given, except for
+     * the defaults applied to string and decimal types at the end.
+     */
+    private DataType(String datatype) {
+        // Locale.ROOT: type names are ASCII keywords, the default locale must not
+        // change how they are lower-cased
+        datatype = datatype.trim().toLowerCase(java.util.Locale.ROOT);
+        datatype = replaceLegacy(datatype);
+
+        Pattern pattern = TYPE_PATTERN;
+        Matcher m = pattern.matcher(datatype);
+        if (m.matches() == false)
+            throw new IllegalArgumentException("bad data type -- " + datatype + ", does not match " + pattern);
+
+        name = replaceLegacy(m.group(1));
+        precision = -1;
+        scale = -1;
+
+        String leftover = m.group(2);
+        if (leftover != null) {
+            // the pattern allows whitespace inside the parentheses; trim so that
+            // "( 10 )" parses the same as "(10)"
+            String[] parts = leftover.trim().split("\\s*,\\s*");
+            for (int i = 0; i < parts.length; i++) {
+                int n;
+                try {
+                    n = Integer.parseInt(parts[i]);
+                } catch (NumberFormatException e) {
+                    throw new IllegalArgumentException("bad data type -- " + datatype + ", precision/scale not numeric");
+                }
+                if (i == 0)
+                    precision = n;
+                else if (i == 1)
+                    scale = n;
+                else
+                    throw new IllegalArgumentException("bad data type -- " + datatype + ", too many precision/scale parts");
+            }
+        }
+
+        // FIXME 256 for unknown string precision
+        if ((name.equals("char") || name.equals("varchar")) && precision == -1) {
+            precision = 256; // to save memory at frontend, e.g. tableau will
+                             // allocate memory according to this
+        }
+
+        // FIXME (19,4) for unknown decimal precision
+        if ((name.equals("decimal") || name.equals("numeric")) && precision == -1) {
+            precision = 19;
+            scale = 4;
+        }
+    }
+
+    /** Returns the canonical spelling for a legacy type string, or the input when it is not legacy. */
+    private String replaceLegacy(String str) {
+        String replace = LEGACY_TYPE_MAP.get(str);
+        return replace == null ? str : replace;
+    }
+
+    /** Returns the storage estimate reported by this type's {@link DataTypeSerializer}. */
+    public int getStorageBytesEstimate() {
+        return DataTypeSerializer.create(this).getStorageBytesEstimate();
+    }
+
+    public boolean isStringFamily() {
+        return STRING_FAMILY.contains(name);
+    }
+
+    public boolean isIntegerFamily() {
+        return INTEGER_FAMILY.contains(name);
+    }
+
+    public boolean isNumberFamily() {
+        return NUMBER_FAMILY.contains(name);
+    }
+
+    public boolean isDateTimeFamily() {
+        return DATETIME_FAMILY.contains(name);
+    }
+
+    public boolean isDate() {
+        return name.equals("date");
+    }
+
+    public boolean isTime() {
+        return name.equals("time");
+    }
+
+    public boolean isTimestamp() {
+        return name.equals("timestamp");
+    }
+
+    public boolean isDatetime() {
+        return name.equals("datetime");
+    }
+
+    public boolean isTinyInt() {
+        return name.equals("tinyint");
+    }
+
+    public boolean isSmallInt() {
+        return name.equals("smallint");
+    }
+
+    public boolean isInt() {
+        return name.equals("integer");
+    }
+
+    public boolean isBigInt() {
+        return name.equals("bigint");
+    }
+
+    public boolean isFloat() {
+        return name.equals("float");
+    }
+
+    public boolean isDouble() {
+        return name.equals("double");
+    }
+
+    /** True only for the name {@code decimal}; {@code numeric} is not matched here. */
+    public boolean isDecimal() {
+        return name.equals("decimal");
+    }
+
+    public boolean isHLLC() {
+        return name.equals("hllc");
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    /** Returns the precision, or -1 when none was given and no default applies. */
+    public int getPrecision() {
+        return precision;
+    }
+
+    /** Returns the scale, or -1 when none was given and no default applies. */
+    public int getScale() {
+        return scale;
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((name == null) ? 0 : name.hashCode());
+        result = prime * result + precision;
+        result = prime * result + scale;
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        DataType other = (DataType) obj;
+        if (name == null) {
+            if (other.name != null)
+                return false;
+        } else if (!name.equals(other.name))
+            return false;
+        if (precision != other.precision)
+            return false;
+        if (scale != other.scale)
+            return false;
+        return true;
+    }
+
+    /** Renders as {@code name}, {@code name(precision)} or {@code name(precision,scale)}. */
+    @Override
+    public String toString() {
+        if (precision < 0 && scale < 0)
+            return name;
+        else if (scale < 0)
+            return name + "(" + precision + ")";
+        else
+            return name + "(" + precision + "," + scale + ")";
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java
new file mode 100644
index 0000000..fd3121f
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.datatype;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kylin.common.util.BytesSerializer;
+import org.apache.kylin.measure.hllc.HLLCSerializer;
+import org.apache.kylin.measure.topn.TopNCounterSerializer;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Base class of the per-data-type binary serializers, plus the registry that maps a data
+ * type name to its serializer class.
+ *
+ * @author yangli9
+ *
+ * Note: the implementations MUST be thread-safe.
+ */
+abstract public class DataTypeSerializer<T> implements BytesSerializer<T> {
+
+    // data type name -> serializer class; each class is instantiated reflectively in
+    // create(DataType) through a constructor taking a single DataType
+    final static Map<String, Class<?>> implementations;
+    static {
+        HashMap<String, Class<?>> impl = Maps.newHashMap();
+        impl.put("varchar", StringSerializer.class);
+        impl.put("decimal", BigDecimalSerializer.class);
+        impl.put("double", DoubleSerializer.class);
+        impl.put("float", DoubleSerializer.class);
+        impl.put("bigint", LongSerializer.class);
+        impl.put("long", LongSerializer.class);
+        impl.put("integer", LongSerializer.class);
+        impl.put("int", LongSerializer.class);
+        impl.put("smallint", LongSerializer.class);
+        impl.put("date", DateTimeSerializer.class);
+        impl.put("datetime", DateTimeSerializer.class);
+        impl.put("timestamp", DateTimeSerializer.class);
+        impl.put("topn", TopNCounterSerializer.class);
+        impl.put("hllc", HLLCSerializer.class);
+        // The registry has to stay writable because register() adds to it; an
+        // unmodifiable view would make every register() call throw. The synchronized
+        // wrapper keeps concurrent register()/create() calls safe.
+        implementations = Collections.synchronizedMap(impl);
+    }
+
+    /** Returns true when a serializer class is registered under {@code dataTypeName}. */
+    public static boolean hasRegistered(String dataTypeName) {
+        return implementations.containsKey(dataTypeName);
+    }
+
+    /**
+     * Registers (or replaces) the serializer class used for a data type name.
+     *
+     * @param dataTypeName the data type name, as returned by {@link DataType#getName()}
+     * @param impl         the serializer class; create() requires it to expose a public
+     *                     constructor taking a single {@link DataType}
+     */
+    public static void register(String dataTypeName, Class<? extends DataTypeSerializer<?>> impl) {
+        implementations.put(dataTypeName, impl);
+    }
+
+    /** Parses {@code dataType} and creates the serializer for it. */
+    public static DataTypeSerializer<?> create(String dataType) {
+        return create(DataType.getType(dataType));
+    }
+
+    /**
+     * Creates a new serializer instance for {@code type}.
+     *
+     * @throws RuntimeException if no serializer is registered for the type's name, or the
+     *                          registered class cannot be instantiated
+     */
+    public static DataTypeSerializer<?> create(DataType type) {
+        Class<?> clz = implementations.get(type.getName());
+        if (clz == null)
+            throw new RuntimeException("No DataTypeSerializer for type " + type);
+
+        try {
+            return (DataTypeSerializer<?>) clz.getConstructor(DataType.class).newInstance(type);
+        } catch (Exception e) {
+            throw new RuntimeException(e); // never happen
+        }
+    }
+
+    /** peek into buffer and return the length of serialization */
+    abstract public int peekLength(ByteBuffer in);
+
+    /** return the max number of bytes to the longest serialization */
+    abstract public int maxLength();
+
+    /** get an estimate of size in bytes of the serialized data */
+    abstract public int getStorageBytesEstimate();
+
+    /** convert from obj to string; a null value is rendered as the text "NULL" */
+    public String toString(T value) {
+        if (value == null)
+            return "NULL";
+        else
+            return value.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DateTimeSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DateTimeSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DateTimeSerializer.java
new file mode 100644
index 0000000..4f7935c
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DateTimeSerializer.java
@@ -0,0 +1,49 @@
+package org.apache.kylin.metadata.datatype;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Serializer for date/time values held in a {@link LongMutable}; each value is written
+ * as one fixed-width 8-byte long.
+ */
+public class DateTimeSerializer extends DataTypeSerializer<LongMutable> {
+
+    // one reusable holder per thread: thread-safe without allocating on every call
+    private ThreadLocal<LongMutable> current = new ThreadLocal<LongMutable>();
+
+    /** The type argument is accepted for reflective construction but not used. */
+    public DateTimeSerializer(DataType type) {
+    }
+
+    @Override
+    public void serialize(LongMutable value, ByteBuffer out) {
+        out.putLong(value.get());
+    }
+
+    /** Returns the calling thread's holder, creating it on first use. */
+    private LongMutable current() {
+        LongMutable holder = current.get();
+        if (holder != null) {
+            return holder;
+        }
+        holder = new LongMutable();
+        current.set(holder);
+        return holder;
+    }
+
+    /**
+     * Reads 8 bytes into the calling thread's shared holder and returns that holder; the
+     * next call on the same thread overwrites it.
+     */
+    @Override
+    public LongMutable deserialize(ByteBuffer in) {
+        final LongMutable holder = current();
+        holder.set(in.getLong());
+        return holder;
+    }
+
+    @Override
+    public int peekLength(ByteBuffer in) {
+        return 8;
+    }
+
+    @Override
+    public int maxLength() {
+        return 8;
+    }
+
+    @Override
+    public int getStorageBytesEstimate() {
+        return 8;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DoubleMutable.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DoubleMutable.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DoubleMutable.java
new file mode 100644
index 0000000..5bd75e8
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DoubleMutable.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.datatype;
+
+import java.io.Serializable;
+
+@SuppressWarnings("serial")
+public class DoubleMutable implements Comparable<DoubleMutable>, Serializable {
+
+ private double v;
+
+ public DoubleMutable() {
+ this(0);
+ }
+
+ public DoubleMutable(double v) {
+ set(v);
+ }
+
+ public double get() {
+ return v;
+ }
+
+ public void set(double v) {
+ this.v = v;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof DoubleMutable)) {
+ return false;
+ }
+ DoubleMutable other = (DoubleMutable) o;
+ return this.v == other.v;
+ }
+
+ @Override
+ public int hashCode() {
+ return (int) Double.doubleToLongBits(v);
+ }
+
+ @Override
+ public int compareTo(DoubleMutable o) {
+ return (v < o.v ? -1 : (v == o.v ? 0 : 1));
+ }
+
+ @Override
+ public String toString() {
+ return Double.toString(v);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DoubleSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DoubleSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DoubleSerializer.java
new file mode 100644
index 0000000..5accf1d
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DoubleSerializer.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.datatype;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Serializer for values held in a {@link DoubleMutable}; each value is written as one
+ * fixed-width 8-byte double.
+ */
+public class DoubleSerializer extends DataTypeSerializer<DoubleMutable> {
+
+    // one reusable holder per thread: thread-safe without allocating on every call
+    private ThreadLocal<DoubleMutable> current = new ThreadLocal<DoubleMutable>();
+
+    /** The type argument is accepted for reflective construction but not used. */
+    public DoubleSerializer(DataType type) {
+    }
+
+    @Override
+    public void serialize(DoubleMutable value, ByteBuffer out) {
+        out.putDouble(value.get());
+    }
+
+    /** Returns the calling thread's holder, creating it on first use. */
+    private DoubleMutable current() {
+        DoubleMutable holder = current.get();
+        if (holder != null) {
+            return holder;
+        }
+        holder = new DoubleMutable();
+        current.set(holder);
+        return holder;
+    }
+
+    /**
+     * Reads 8 bytes into the calling thread's shared holder and returns that holder; the
+     * next call on the same thread overwrites it.
+     */
+    @Override
+    public DoubleMutable deserialize(ByteBuffer in) {
+        final DoubleMutable holder = current();
+        holder.set(in.getDouble());
+        return holder;
+    }
+
+    @Override
+    public int peekLength(ByteBuffer in) {
+        return 8;
+    }
+
+    @Override
+    public int maxLength() {
+        return 8;
+    }
+
+    @Override
+    public int getStorageBytesEstimate() {
+        return 8;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/LongMutable.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/LongMutable.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/LongMutable.java
new file mode 100644
index 0000000..b978049
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/LongMutable.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.datatype;
+
+import java.io.Serializable;
+
+/**
+ * A mutable holder for a primitive {@code long}, comparable by numeric value.
+ */
+@SuppressWarnings("serial")
+public class LongMutable implements Comparable<LongMutable>, Serializable {
+
+    private long v;
+
+    /** Creates a holder containing 0. */
+    public LongMutable() {
+        this(0);
+    }
+
+    public LongMutable(long v) {
+        set(v);
+    }
+
+    public long get() {
+        return v;
+    }
+
+    public void set(long v) {
+        this.v = v;
+    }
+
+    /** Two holders are equal when they contain the same value. */
+    @Override
+    public boolean equals(Object o) {
+        return o instanceof LongMutable && ((LongMutable) o).v == this.v;
+    }
+
+    /** The hash is the value narrowed to its low 32 bits. */
+    @Override
+    public int hashCode() {
+        return (int) v;
+    }
+
+    @Override
+    public int compareTo(LongMutable o) {
+        final long mine = this.v;
+        final long theirs = o.v;
+        if (mine < theirs) {
+            return -1;
+        }
+        if (mine == theirs) {
+            return 0;
+        }
+        return 1;
+    }
+
+    @Override
+    public String toString() {
+        return Long.toString(v);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/LongSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/LongSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/LongSerializer.java
new file mode 100644
index 0000000..777f494
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/LongSerializer.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.datatype;
+
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.util.BytesUtil;
+
+/**
+ * Serializer for values held in a {@link LongMutable}, using the variable-length long
+ * encoding of {@code BytesUtil.writeVLong} / {@code BytesUtil.readVLong}.
+ */
+public class LongSerializer extends DataTypeSerializer<LongMutable> {
+
+    // one reusable holder per thread: thread-safe without allocating on every call
+    private ThreadLocal<LongMutable> current = new ThreadLocal<LongMutable>();
+
+    /** The type argument is accepted for reflective construction but not used. */
+    public LongSerializer(DataType type) {
+    }
+
+    @Override
+    public void serialize(LongMutable value, ByteBuffer out) {
+        BytesUtil.writeVLong(value.get(), out);
+    }
+
+    /** Returns the calling thread's holder, creating it on first use. */
+    private LongMutable current() {
+        LongMutable holder = current.get();
+        if (holder != null) {
+            return holder;
+        }
+        holder = new LongMutable();
+        current.set(holder);
+        return holder;
+    }
+
+    /**
+     * Decodes one vlong into the calling thread's shared holder and returns that holder;
+     * the next call on the same thread overwrites it.
+     */
+    @Override
+    public LongMutable deserialize(ByteBuffer in) {
+        final LongMutable holder = current();
+        holder.set(BytesUtil.readVLong(in));
+        return holder;
+    }
+
+    /**
+     * Measures the next encoded value by decoding it and then putting the buffer
+     * position back where it started.
+     */
+    @Override
+    public int peekLength(ByteBuffer in) {
+        final int start = in.position();
+
+        BytesUtil.readVLong(in);
+        final int consumed = in.position() - start;
+
+        in.position(start);
+        return consumed;
+    }
+
+    @Override
+    public int maxLength() {
+        return 9; // vlong: 1 + 8
+    }
+
+    @Override
+    public int getStorageBytesEstimate() {
+        return 5;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/StringSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/StringSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/StringSerializer.java
new file mode 100644
index 0000000..14c8909
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/StringSerializer.java
@@ -0,0 +1,48 @@
+package org.apache.kylin.metadata.datatype;
+
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.util.BytesUtil;
+
+/**
+ * Serializer for strings, delegating the encoding to {@code BytesUtil.writeUTFString} /
+ * {@code BytesUtil.readUTFString} and bounding the encoded size by the type's precision.
+ */
+public class StringSerializer extends DataTypeSerializer<String> {
+
+    final DataType type;
+    final int maxLength;
+
+    public StringSerializer(DataType type) {
+        this.type = type;
+        // see serialize(): 2 byte length, rest is String.toBytes()
+        this.maxLength = 2 + type.getPrecision();
+    }
+
+    /**
+     * Writes {@code value} to {@code out}. The size check runs after the write, so on
+     * failure the buffer has already been advanced.
+     *
+     * @throws IllegalArgumentException if more than {@link #maxLength()} bytes were written
+     */
+    @Override
+    public void serialize(String value, ByteBuffer out) {
+        final int before = out.position();
+
+        BytesUtil.writeUTFString(value, out);
+
+        final int written = out.position() - before;
+        if (written > maxLength) {
+            throw new IllegalArgumentException("'" + value + "' exceeds the expected length for type " + type);
+        }
+    }
+
+    @Override
+    public String deserialize(ByteBuffer in) {
+        return BytesUtil.readUTFString(in);
+    }
+
+    @Override
+    public int peekLength(ByteBuffer in) {
+        return BytesUtil.peekByteArrayLength(in);
+    }
+
+    @Override
+    public int maxLength() {
+        return maxLength;
+    }
+
+    /** Uses the maximum length as the storage estimate. */
+    @Override
+    public int getStorageBytesEstimate() {
+        return maxLength;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TimeConditionLiteralsReplacer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TimeConditionLiteralsReplacer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TimeConditionLiteralsReplacer.java
index 0e687f7..b894fd3 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TimeConditionLiteralsReplacer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TimeConditionLiteralsReplacer.java
@@ -5,8 +5,8 @@ import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;
-import org.apache.kylin.common.datatype.DataType;
import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.metadata.model.TblColRef;
import com.google.common.collect.Maps;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
index f03e736..2ddc75a 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
@@ -21,7 +21,7 @@ package org.apache.kylin.metadata.model;
import java.io.Serializable;
import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.datatype.DataType;
+import org.apache.kylin.metadata.datatype.DataType;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
index d22d0a3..0c36873 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
@@ -21,7 +21,7 @@ package org.apache.kylin.metadata.model;
import java.util.ArrayList;
import java.util.Collection;
-import org.apache.kylin.common.datatype.DataType;
+import org.apache.kylin.metadata.datatype.DataType;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
@@ -171,6 +171,14 @@ public class FunctionDesc {
this.parameter = parameter;
}
+ public int getParameterCount() {
+ int count = 0;
+ for (ParameterDesc p = parameter; p != null; p = p.getNextParameter()) {
+ count++;
+ }
+ return count;
+ }
+
public DataType getSQLType() {
if (isCountDistinct() || isTopN())
return DataType.ANY;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java
index 618d25a..1561b1f 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java
@@ -18,9 +18,6 @@
package org.apache.kylin.metadata.model;
-import java.util.Collections;
-import java.util.List;
-
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -40,15 +37,6 @@ public class MeasureDesc {
@JsonProperty("dependent_measure_ref")
private String dependentMeasureRef;
- public List<TblColRef> getColumnsNeedDictionary() {
- // measure could store literal values using dictionary encoding to save space, like TopN
- if (function.isTopN()) {
- return Collections.singletonList(function.getTopNLiteralColumn());
- } else {
- return Collections.emptyList();
- }
- }
-
public int getId() {
return id;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
index de27145..61ba73b 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
@@ -21,7 +21,7 @@ package org.apache.kylin.metadata.model;
import java.io.Serializable;
import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.datatype.DataType;
+import org.apache.kylin.metadata.datatype.DataType;
/**
*/
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigestUtil.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigestUtil.java b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigestUtil.java
index 4a8c5d1..e52bf3b 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigestUtil.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigestUtil.java
@@ -1,7 +1,7 @@
package org.apache.kylin.metadata.realization;
-import org.apache.kylin.common.datatype.DataType;
import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.metadata.filter.ColumnTupleFilter;
import org.apache.kylin.metadata.filter.CompareTupleFilter;
import org.apache.kylin.metadata.filter.ConstantTupleFilter;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java b/core-metadata/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java
new file mode 100644
index 0000000..6d1a7b8
--- /dev/null
+++ b/core-metadata/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java
@@ -0,0 +1,46 @@
+package org.apache.kylin.aggregation.topn;
+
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.topn.TopNCounter;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.measure.topn.TopNCounterSerializer;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Round-trip test for {@link TopNCounterSerializer}.
+ */
+public class TopNCounterSerializerTest {
+
+    private static TopNCounterSerializer serializer = new TopNCounterSerializer(DataType.getType("topn(10)"));
+
+    /** Serializes a populated counter, reads it back, and compares the string forms. */
+    @Test
+    public void testSerialization() {
+        Integer[] stream = { 1, 1, 2, 9, 1, 2, 3, 7, 7, 1, 3, 1, 1 };
+        TopNCounter<ByteArray> original = new TopNCounter<ByteArray>(50);
+        for (int idx = 0; idx < stream.length; idx++) {
+            original.offer(new ByteArray(Bytes.toBytes(stream[idx])));
+        }
+
+        ByteBuffer out = ByteBuffer.allocate(1024);
+        serializer.serialize(original, out);
+
+        // hand the deserializer an array holding exactly the bytes that were written
+        int written = out.position();
+        byte[] copyBytes = new byte[written];
+        System.arraycopy(out.array(), 0, copyBytes, 0, written);
+
+        TopNCounter<ByteArray> restored = serializer.deserialize(ByteBuffer.wrap(copyBytes));
+
+        Assert.assertEquals(original.toString(), restored.toString());
+    }
+
+    @Test
+    public void testValueOf() {
+        // FIXME need a good unit test for valueOf()
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/test/java/org/apache/kylin/metadata/datatype/BigDecimalSerializerTest.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/test/java/org/apache/kylin/metadata/datatype/BigDecimalSerializerTest.java b/core-metadata/src/test/java/org/apache/kylin/metadata/datatype/BigDecimalSerializerTest.java
new file mode 100644
index 0000000..f920ee7
--- /dev/null
+++ b/core-metadata/src/test/java/org/apache/kylin/metadata/datatype/BigDecimalSerializerTest.java
@@ -0,0 +1,53 @@
+package org.apache.kylin.metadata.datatype;
+
+import static org.junit.Assert.*;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.metadata.datatype.BigDecimalSerializer;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests BigDecimalSerializer: exact round trip, half-even rounding to the type's scale, and rejection of excess precision. */
+public class BigDecimalSerializerTest {
+
+ private static BigDecimalSerializer bigDecimalSerializer;
+
+ @BeforeClass
+ public static void beforeClass() {
+ bigDecimalSerializer = new BigDecimalSerializer(DataType.getType("decimal"));
+ }
+
+ @Test
+ public void testNormal() {
+ BigDecimal input = new BigDecimal("1234.1234");
+ ByteBuffer buffer = ByteBuffer.allocate(256);
+ buffer.mark(); // remember the start position so reset() below rewinds to it
+ bigDecimalSerializer.serialize(input, buffer);
+ buffer.reset(); // back to the marked position before reading the value back
+ BigDecimal output = bigDecimalSerializer.deserialize(buffer);
+ assertEquals(input, output);
+ }
+
+ @Test
+ public void testScaleOutOfRange() {
+ BigDecimal input = new BigDecimal("1234.1234567890");
+ ByteBuffer buffer = ByteBuffer.allocate(256);
+ buffer.mark(); // remember the start position so reset() below rewinds to it
+ bigDecimalSerializer.serialize(input, buffer);
+ buffer.reset(); // back to the marked position before reading the value back
+ BigDecimal output = bigDecimalSerializer.deserialize(buffer);
+ assertEquals(input.setScale(bigDecimalSerializer.type.getScale(), BigDecimal.ROUND_HALF_EVEN), output); // expected value is the input rounded half-even to the type's scale
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testOutOfPrecision() {
+ BigDecimal input = new BigDecimal("66855344214907231736.4924");
+ ByteBuffer buffer = ByteBuffer.allocate(256);
+ bigDecimalSerializer.serialize(input, buffer); // the test passes only if this call throws IllegalArgumentException
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-storage/src/main/java/org/apache/kylin/storage/translate/ColumnValueRange.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/translate/ColumnValueRange.java b/core-storage/src/main/java/org/apache/kylin/storage/translate/ColumnValueRange.java
index fbd6f97..0dc1afa 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/translate/ColumnValueRange.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/translate/ColumnValueRange.java
@@ -23,8 +23,8 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.cube.kv.RowKeyColumnOrder;
-import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
import org.apache.kylin.metadata.model.TblColRef;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java b/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
index 4f011cf..3bca687 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
@@ -24,11 +24,11 @@ import java.util.List;
import net.sf.ehcache.pool.sizeof.annotations.IgnoreSizeOf;
-import org.apache.kylin.common.datatype.DoubleMutable;
-import org.apache.kylin.common.datatype.LongMutable;
import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
import org.apache.kylin.common.topn.TopNCounter;
import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.metadata.datatype.DoubleMutable;
+import org.apache.kylin.metadata.datatype.LongMutable;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.tuple.ITuple;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-storage/src/test/java/org/apache/kylin/storage/translate/ColumnValueRangeTest.java
----------------------------------------------------------------------
diff --git a/core-storage/src/test/java/org/apache/kylin/storage/translate/ColumnValueRangeTest.java b/core-storage/src/test/java/org/apache/kylin/storage/translate/ColumnValueRangeTest.java
index aaf07fe..d32f171 100644
--- a/core-storage/src/test/java/org/apache/kylin/storage/translate/ColumnValueRangeTest.java
+++ b/core-storage/src/test/java/org/apache/kylin/storage/translate/ColumnValueRangeTest.java
@@ -6,7 +6,7 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
-import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.dict.StringBytesConverter;
import org.apache.kylin.dict.TrieDictionaryBuilder;
import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
index 3fa7d5c..45cc88e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
@@ -6,13 +6,11 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
-import com.google.common.collect.Maps;
-
import org.apache.hadoop.io.Text;
-import org.apache.kylin.aggregation.MeasureCodec;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.BytesSplitter;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.SplittedBytes;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
@@ -22,11 +20,16 @@ import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.engine.mr.KylinMapper;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.metadata.model.*;
+import org.apache.kylin.measure.MeasureCodec;
+import org.apache.kylin.measure.MeasureIngester;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.ParameterDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,6 +52,8 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL
protected String intermediateTableRowDelimiter;
protected byte byteRowDelimiter;
protected int counter;
+ protected MeasureIngester<?>[] aggrIngesters;
+ protected Map<TblColRef, Dictionary<String>> dictionaryMap;
protected Object[] measures;
protected byte[][] keyBytesBuf;
protected BytesSplitter bytesSplitter;
@@ -58,7 +63,6 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL
private Text outputKey = new Text();
private Text outputValue = new Text();
private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
- private Map<Integer, Dictionary<String>> topNLiteralColDictMap;
@Override
protected void setup(Context context) throws IOException {
@@ -93,25 +97,12 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL
int colCount = cubeDesc.getRowkey().getRowKeyColumns().length;
keyBytesBuf = new byte[colCount][];
- initTopNLiteralColDictionaryMap();
+ aggrIngesters = MeasureIngester.create(cubeDesc.getMeasures());
+ dictionaryMap = cubeSegment.buildDictionaryMap();
+
initNullBytes();
}
-
- private void initTopNLiteralColDictionaryMap() {
- topNLiteralColDictMap = Maps.newHashMap();
- for (int measureIdx = 0; measureIdx < measures.length; measureIdx++) {
- MeasureDesc measureDesc = cubeDesc.getMeasures().get(measureIdx);
- FunctionDesc func = measureDesc.getFunction();
- if (func.isTopN()) {
- int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[measureIdx];
- int literalColIdx = flatTableIdx[flatTableIdx.length - 1];
- TblColRef literalCol = func.getTopNLiteralColumn();
- Dictionary<String> dictionary = (Dictionary<String>) cubeSegment.getDictionary(literalCol);
- topNLiteralColDictMap.put(literalColIdx, dictionary);
- }
- }
- }
-
+
private void initNullBytes() {
nullBytes = Lists.newArrayList();
nullBytes.add(HIVE_NULL);
@@ -146,72 +137,46 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL
private void buildValue(SplittedBytes[] splitBuffers) {
for (int i = 0; i < measures.length; i++) {
- byte[] valueBytes = getValueBytes(splitBuffers, i);
- measures[i] = measureCodec.getSerializer(i).valueOf(valueBytes);
+ measures[i] = buildValueOf(i, splitBuffers);
}
valueBuf.clear();
measureCodec.encode(measures, valueBuf);
}
- private byte[] getValueBytes(SplittedBytes[] splitBuffers, int measureIdx) {
- MeasureDesc desc = cubeDesc.getMeasures().get(measureIdx);
- FunctionDesc func = desc.getFunction();
- ParameterDesc paramDesc = func.getParameter();
- int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[measureIdx];
-
- byte[] result = null;
-
- // constant
- if (flatTableIdx == null) {
- result = Bytes.toBytes(paramDesc.getValue());
- }
- // count and count distinct
- else if (func.isCount() || func.isHolisticCountDistinct()) {
- // note for holistic count distinct, this value will be ignored
- result = ONE;
- }
- // topN, need encode the key column
- else if (func.isTopN()) {
- // encode the key column with dict, and get the counter column;
- int keyColIndex = flatTableIdx[flatTableIdx.length - 1];
- Dictionary<String> literalColDict = topNLiteralColDictMap.get(keyColIndex);
- int keyColEncoded = literalColDict.getIdFromValue(Bytes.toString(splitBuffers[keyColIndex].value));
- valueBuf.clear();
- valueBuf.putInt(literalColDict.getSizeOfId());
- valueBuf.putInt(keyColEncoded);
- if (flatTableIdx.length == 1) {
- // only literalCol, use 1.0 as counter
- valueBuf.putDouble(1.0);
+ private Object buildValueOf(int idxOfMeasure, SplittedBytes[] splitBuffers) {
+ MeasureDesc measure = cubeDesc.getMeasures().get(idxOfMeasure);
+ FunctionDesc function = measure.getFunction();
+ int[] colIdxOnFlatTable = intermediateTableDesc.getMeasureColumnIndexes()[idxOfMeasure];
+
+ int paramCount = function.getParameterCount();
+ String[] inputToMeasure = new String[paramCount];
+
+ // pick up parameter values
+ ParameterDesc param = function.getParameter();
+ int colParamIdx = 0; // index among parameters of column type
+ for (int i = 0; i < paramCount; i++, param = param.getNextParameter()) {
+ String value;
+ if (function.isCount() || function.isHolisticCountDistinct()) {
+ // note for holistic count distinct, this value will be ignored
+ value = "1";
+ } else if (param.isColumnType()) {
+ value = getCell(colIdxOnFlatTable[colParamIdx++], splitBuffers);
} else {
- // get the counter column value
- valueBuf.putDouble(Double.valueOf(Bytes.toString(splitBuffers[flatTableIdx[0]].value)));
- }
-
- result = valueBuf.array();
-
- }
- // normal case, concat column values
- else {
- // for multiple columns, their values are joined
- for (int i = 0; i < flatTableIdx.length; i++) {
- SplittedBytes split = splitBuffers[flatTableIdx[i]];
- if (result == null) {
- result = Arrays.copyOf(split.value, split.length);
- } else {
- byte[] newResult = new byte[result.length + split.length];
- System.arraycopy(result, 0, newResult, 0, result.length);
- System.arraycopy(split.value, 0, newResult, result.length, split.length);
- result = newResult;
- }
+ value = param.getValue();
}
+ inputToMeasure[i] = value;
}
+
+ return aggrIngesters[idxOfMeasure].valueOf(inputToMeasure, measure, dictionaryMap);
+ }
- if (isNull(result)) {
- result = null;
- }
-
- return result;
+ private String getCell(int i, SplittedBytes[] splitBuffers) {
+ byte[] bytes = Arrays.copyOf(splitBuffers[i].value, splitBuffers[i].length);
+ if (isNull(bytes))
+ return null;
+ else
+ return Bytes.toString(bytes);
}
protected void outputKV(Context context) throws IOException, InterruptedException {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
index 0f94dca..381b07c 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
@@ -23,8 +23,6 @@ import java.nio.ByteBuffer;
import java.util.List;
import org.apache.hadoop.io.Text;
-import org.apache.kylin.aggregation.MeasureAggregators;
-import org.apache.kylin.aggregation.MeasureCodec;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.kv.RowConstants;
@@ -32,6 +30,8 @@ import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.mr.KylinReducer;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.measure.MeasureAggregators;
+import org.apache.kylin.measure.MeasureCodec;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
index d724c76..752c01d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
@@ -16,20 +16,18 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder;
import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.DimensionDesc;
-import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.engine.mr.ByteArrayWritable;
import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
import org.apache.kylin.engine.mr.KylinMapper;
import org.apache.kylin.engine.mr.MRUtil;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
@@ -63,7 +61,7 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr
cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSegment).getFlatTableInputFormat();
- Map<TblColRef, Dictionary<?>> dictionaryMap = Maps.newHashMap();
+ Map<TblColRef, Dictionary<String>> dictionaryMap = Maps.newHashMap();
// dictionary
for (TblColRef col : cubeDesc.getAllColumnsNeedDictionary()) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
index ab87b21..c35e77f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
@@ -3,8 +3,6 @@ package org.apache.kylin.engine.mr.steps;
import java.io.IOException;
import java.util.List;
-import org.apache.kylin.aggregation.MeasureAggregators;
-import org.apache.kylin.aggregation.MeasureCodec;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
@@ -16,6 +14,8 @@ import org.apache.kylin.engine.mr.KylinReducer;
import org.apache.kylin.engine.mr.MRUtil;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.measure.MeasureAggregators;
+import org.apache.kylin.measure.MeasureCodec;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.slf4j.Logger;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
index 8d00084..bc1c883 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
@@ -23,12 +23,12 @@ import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
-import org.apache.kylin.aggregation.MeasureCodec;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.topn.Counter;
import org.apache.kylin.common.topn.TopNCounter;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.SplittedBytes;
import org.apache.kylin.cube.CubeInstance;
@@ -40,7 +40,6 @@ import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.cube.kv.RowKeyEncoder;
import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.dict.DictionaryManager;
import org.apache.kylin.engine.mr.ByteArrayWritable;
import org.apache.kylin.engine.mr.IMROutput2.IMRStorageInputFormat;
@@ -48,6 +47,7 @@ import org.apache.kylin.engine.mr.KylinMapper;
import org.apache.kylin.engine.mr.MRUtil;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.measure.MeasureCodec;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
index d08d2a4..c0277b5 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
@@ -27,12 +27,12 @@ import java.util.regex.Pattern;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.kylin.aggregation.MeasureCodec;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.topn.Counter;
import org.apache.kylin.common.topn.TopNCounter;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.SplittedBytes;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
@@ -43,11 +43,11 @@ import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.cube.kv.RowKeyEncoder;
import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.dict.DictionaryManager;
import org.apache.kylin.engine.mr.KylinMapper;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.measure.MeasureCodec;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
index b73fda4..84fd46b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
@@ -32,7 +32,6 @@ import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.CubeUpdate;
import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.DimensionDesc;
import org.apache.kylin.dict.DictionaryInfo;
import org.apache.kylin.dict.DictionaryManager;
import org.apache.kylin.job.exception.ExecuteException;