You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/22 06:13:10 UTC
[20/47] incubator-kylin git commit: KYLIN-875 rename modules:
core-common, core-cube, core-dictionary, core-cube
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ExtractTupleFilter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ExtractTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ExtractTupleFilter.java
new file mode 100644
index 0000000..a7705d3
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ExtractTupleFilter.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.filter;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.kylin.metadata.tuple.IEvaluatableTuple;
+
+
+/**
+ * Tuple filter for the {@code EXTRACT} operator. Evaluating it computes the
+ * year, month or day-of-month of an integer day count taken from its children
+ * and remembers that number; {@link #getValues()} then exposes it as a string.
+ *
+ * @author xjiang
+ */
+public class ExtractTupleFilter extends TupleFilter {
+
+    private int date;
+    private List<Object> values;
+
+    public ExtractTupleFilter(FilterOperatorEnum op) {
+        super(new ArrayList<TupleFilter>(3), op);
+        assert (op == FilterOperatorEnum.EXTRACT);
+        // One-element holder; slot 0 is rewritten on every getValues() call.
+        List<Object> holder = new ArrayList<Object>(1);
+        holder.add(null);
+        this.values = holder;
+        this.date = 0;
+    }
+
+    @Override
+    public String toString() {
+        return "ExtractTupleFilter=[children=" + children + "]";
+    }
+
+    /** This filter always reports itself as not evaluable. */
+    @Override
+    public boolean isEvaluable() {
+        return false;
+    }
+
+    /**
+     * Evaluates every child, then derives the requested date field.
+     *
+     * The first value of a {@code ConstantTupleFilter} child is parsed as the
+     * day count, the first value of a {@code CompareTupleFilter} child names
+     * the field to extract. When several children of one kind exist, the last
+     * one wins.
+     *
+     * @return always {@code true}; the result is read through {@link #getValues()}
+     * @throws NumberFormatException if no constant child supplied a parsable integer
+     */
+    @Override
+    public boolean evaluate(IEvaluatableTuple tuple, IFilterCodeSystem<?> cs) {
+        String dayCount = null;
+        String field = null;
+        for (TupleFilter child : this.children) {
+            child.evaluate(tuple, cs);
+            if (child instanceof ConstantTupleFilter) {
+                dayCount = firstValueOf(child);
+            } else if (child instanceof CompareTupleFilter) {
+                field = firstValueOf(child);
+            }
+        }
+
+        this.date = extractDate(field, Integer.parseInt(dayCount));
+        return true;
+    }
+
+    /** Returns the string form of the first element of {@code filter.getValues()}. */
+    private static String firstValueOf(TupleFilter filter) {
+        return filter.getValues().iterator().next().toString();
+    }
+
+    /**
+     * Converts a day count into a proleptic Gregorian calendar date and picks one field.
+     *
+     * NOTE(review): the +32044 offset suggests {@code inDate} is a Julian day
+     * number -- confirm against what callers actually pass.
+     *
+     * @param type   "YEAR", "MONTH" or "DAY" (case-insensitive)
+     * @param inDate the day count to convert
+     * @return the requested field, or -1 when {@code type} is none of the three (including null)
+     */
+    private int extractDate(String type, int inDate) {
+        // Shift the epoch back to astronomical year -4800 so that the
+        // 400-year / 100-year / 4-year cycles all start at the origin.
+        int shifted = inDate + 32044;
+        int cycles400 = shifted / 146097;
+        int dayIn400 = shifted % 146097;
+        int centuries = (dayIn400 / 36524 + 1) * 3 / 4;
+        int dayInCentury = dayIn400 - centuries * 36524;
+        int cycles4 = dayInCentury / 1461;
+        int dayIn4 = dayInCentury % 1461;
+        int years = (dayIn4 / 365 + 1) * 3 / 4;
+        int dayOfYear = dayIn4 - years * 365;
+
+        // full years elapsed since March 1, 4801 BC
+        int yearsSinceEpoch = cycles400 * 400 + centuries * 100 + cycles4 * 4 + years;
+        // full months elapsed since the last March 1
+        int monthsSinceMarch = (dayOfYear * 5 + 308) / 153 - 2;
+        // days elapsed since day 1 of the month
+        int daysIntoMonth = dayOfYear - (monthsSinceMarch + 4) * 153 / 5 + 122;
+
+        final int result;
+        if ("YEAR".equalsIgnoreCase(type)) {
+            result = yearsSinceEpoch - 4800 + (monthsSinceMarch + 2) / 12;
+        } else if ("MONTH".equalsIgnoreCase(type)) {
+            result = (monthsSinceMarch + 2) % 12 + 1;
+        } else if ("DAY".equalsIgnoreCase(type)) {
+            result = daysIntoMonth + 1;
+        } else {
+            result = -1;
+        }
+        return result;
+    }
+
+    /** Returns the internal one-element list holding the last extracted number as a string. */
+    @Override
+    public Collection<?> getValues() {
+        String rendered = Integer.toString(this.date);
+        this.values.set(0, rendered);
+        return this.values;
+    }
+
+    /** This filter carries no payload of its own. */
+    @Override
+    public byte[] serialize(IFilterCodeSystem<?> cs) {
+        return new byte[0];
+    }
+
+    /** Nothing to restore; see {@link #serialize(IFilterCodeSystem)}. */
+    @Override
+    public void deserialize(byte[] bytes, IFilterCodeSystem<?> cs) {
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/filter/IFilterCodeSystem.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/IFilterCodeSystem.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/IFilterCodeSystem.java
new file mode 100644
index 0000000..1f94bb8
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/IFilterCodeSystem.java
@@ -0,0 +1,31 @@
+package org.apache.kylin.metadata.filter;
+
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+
+/**
+ * Decides how constant values are coded and compared.
+ *
+ * Tuple filters are used in both the query engine and the coprocessor. In the query engine the
+ * values are strings; in the coprocessor the values are dictionary IDs.
+ *
+ * The type parameter is the Java type of a code, which should be bytes. However, some legacy
+ * implementations store the code as a String.
+ *
+ * @author yangli9
+ */
+public interface IFilterCodeSystem<T> extends Comparator<T> {
+
+ /** Tells whether the given code represents the NULL value. */
+ boolean isNull(T code);
+
+ // Comparing two values by their codes is done through compare(T, T),
+ // which this interface inherits from Comparator<T>:
+ // int compare(T code1, T code2);
+
+ /** Writes the code to the buffer. */
+ void serialize(T code, ByteBuffer buf);
+
+ /** Reads a code from the buffer. */
+ T deserialize(ByteBuffer buf);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/filter/LogicalTupleFilter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/LogicalTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/LogicalTupleFilter.java
new file mode 100644
index 0000000..4d38565
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/LogicalTupleFilter.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.filter;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.kylin.metadata.tuple.IEvaluatableTuple;
+
+
+/**
+ * Filter node that combines its children with a boolean operator. The public
+ * constructor accepts {@code AND}, {@code OR} and {@code NOT} only.
+ */
+public class LogicalTupleFilter extends TupleFilter {
+
+    /**
+     * @param op the boolean operator of this node
+     * @throws IllegalArgumentException if {@code op} is not AND, OR or NOT
+     */
+    public LogicalTupleFilter(FilterOperatorEnum op) {
+        super(new ArrayList<TupleFilter>(2), op);
+        if (op != FilterOperatorEnum.AND && op != FilterOperatorEnum.OR && op != FilterOperatorEnum.NOT) {
+            throw new IllegalArgumentException("Unsupported operator " + op);
+        }
+    }
+
+    // Used by copy() only; adopts the given child list as-is.
+    private LogicalTupleFilter(List<TupleFilter> filters, FilterOperatorEnum op) {
+        super(filters, op);
+    }
+
+    /** Shallow copy: a new node with its own child list that holds the same child instances. */
+    @Override
+    public TupleFilter copy() {
+        return new LogicalTupleFilter(new LinkedList<TupleFilter>(children), operator);
+    }
+
+    /**
+     * Returns the logical negation of this node. For NOT that is the single
+     * child itself; for AND / OR it is a new node with the operator taken from
+     * {@code REVERSE_OP_MAP} and every child reversed.
+     */
+    @Override
+    public TupleFilter reverse() {
+        switch (operator) {
+        case AND:
+        case OR: {
+            LogicalTupleFilter negated = new LogicalTupleFilter(REVERSE_OP_MAP.get(operator));
+            for (TupleFilter each : children) {
+                negated.addChild(each.reverse());
+            }
+            return negated;
+        }
+        case NOT:
+            assert (children.size() == 1);
+            return children.get(0);
+        default:
+            throw new IllegalStateException();
+        }
+    }
+
+    @Override
+    public String toString() {
+        return operator + " " + children;
+    }
+
+    /** Applies this node's operator to the results of its children; any other operator yields false. */
+    @Override
+    public boolean evaluate(IEvaluatableTuple tuple, IFilterCodeSystem<?> cs) {
+        switch (this.operator) {
+        case NOT:
+            // only the first child is consulted
+            return !this.children.get(0).evaluate(tuple, cs);
+        case AND:
+            return matchesAll(tuple, cs);
+        case OR:
+            return matchesAny(tuple, cs);
+        default:
+            return false;
+        }
+    }
+
+    // True unless some child evaluates to false; stops at the first such child.
+    private boolean matchesAll(IEvaluatableTuple tuple, IFilterCodeSystem<?> cs) {
+        for (TupleFilter each : this.children) {
+            boolean matched = each.evaluate(tuple, cs);
+            if (!matched) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    // True as soon as one child evaluates to true; false when none does.
+    private boolean matchesAny(IEvaluatableTuple tuple, IFilterCodeSystem<?> cs) {
+        for (TupleFilter each : this.children) {
+            boolean matched = each.evaluate(tuple, cs);
+            if (matched) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /** A logical node has no values of its own. */
+    @Override
+    public Collection<?> getValues() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public boolean isEvaluable() {
+        return true;
+    }
+
+    /** The operator is the only state and is not part of this payload, so the payload is empty. */
+    @Override
+    public byte[] serialize(IFilterCodeSystem<?> cs) {
+        return new byte[0];
+    }
+
+    /** Nothing to restore; see {@link #serialize(IFilterCodeSystem)}. */
+    @Override
+    public void deserialize(byte[] bytes, IFilterCodeSystem<?> cs) {
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/filter/StringCodeSystem.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/StringCodeSystem.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/StringCodeSystem.java
new file mode 100644
index 0000000..5b0040d
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/StringCodeSystem.java
@@ -0,0 +1,40 @@
+package org.apache.kylin.metadata.filter;
+
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.util.BytesUtil;
+
+/**
+ * A simple code system where all values are strings and conform to string comparison system.
+ *
+ * @author yangli9
+ */
+public class StringCodeSystem implements IFilterCodeSystem<String> {
+
+ public static final StringCodeSystem INSTANCE = new StringCodeSystem();
+
+ protected StringCodeSystem() {
+ // singleton
+ }
+
+ @Override
+ public boolean isNull(String value) {
+ return value == null;
+ }
+
+ @Override
+ public int compare(String tupleValue, String constValue) {
+ return tupleValue.compareTo(constValue);
+ }
+
+ @Override
+ public void serialize(String value, ByteBuffer buffer) {
+ BytesUtil.writeUTFString( value, buffer);
+ }
+
+ @Override
+ public String deserialize(ByteBuffer buffer) {
+ return BytesUtil.readUTFString(buffer);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TimeConditionLiteralsReplacer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TimeConditionLiteralsReplacer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TimeConditionLiteralsReplacer.java
new file mode 100644
index 0000000..391768a
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TimeConditionLiteralsReplacer.java
@@ -0,0 +1,70 @@
+package org.apache.kylin.metadata.filter;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.metadata.model.DataType;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import java.util.Collection;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Set;
+
+
+public class TimeConditionLiteralsReplacer implements TupleFilterSerializer.Decorator {
+
+ private IdentityHashMap<TupleFilter, DataType> dateCompareTupleChildren;
+
+ public TimeConditionLiteralsReplacer(TupleFilter root) {
+ this.dateCompareTupleChildren = Maps.newIdentityHashMap();
+ }
+
+ @Override
+ public TupleFilter onSerialize(TupleFilter filter) {
+
+ if (filter instanceof CompareTupleFilter) {
+ CompareTupleFilter cfilter = (CompareTupleFilter) filter;
+ List<? extends TupleFilter> children = cfilter.getChildren();
+
+ if (children == null || children.size() < 1) {
+ throw new IllegalArgumentException("Illegal compare filter: " + cfilter);
+ }
+
+ TblColRef col = cfilter.getColumn();
+ if (col == null || !col.getType().isDateTimeFamily()) {
+ return cfilter;
+ }
+
+ for (TupleFilter child : filter.getChildren()) {
+ dateCompareTupleChildren.put(child, col.getType());
+ }
+ }
+
+ if (filter instanceof ConstantTupleFilter && dateCompareTupleChildren.containsKey(filter)) {
+ ConstantTupleFilter constantTupleFilter = (ConstantTupleFilter) filter;
+ Set<String> newValues = Sets.newHashSet();
+ DataType columnType = dateCompareTupleChildren.get(filter);
+
+ for (String value : (Collection<String>) constantTupleFilter.getValues()) {
+ newValues.add(formatTime(Long.valueOf(value), columnType));
+ }
+ return new ConstantTupleFilter(newValues);
+ }
+ return filter;
+ }
+
+ private String formatTime(long millis, DataType dataType) {
+ if (dataType.isDatetime() || dataType.isTime()) {
+ throw new RuntimeException("Datetime and time type are not supported yet");
+ }
+
+ if (dataType.isTimestamp()) {
+ return DateFormat.formatToTimeStr(millis);
+ } else if (dataType.isDate()) {
+ return DateFormat.formatToDateStr(millis);
+ } else {
+ throw new RuntimeException("Unknown type " + dataType + " to formatTime");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TsConditionEraser.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TsConditionEraser.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TsConditionEraser.java
new file mode 100644
index 0000000..96ff81c
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TsConditionEraser.java
@@ -0,0 +1,63 @@
+package org.apache.kylin.metadata.filter;
+
+import com.google.common.collect.Maps;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import java.util.IdentityHashMap;
+
+/**
+ */
+public class TsConditionEraser implements TupleFilterSerializer.Decorator {
+
+ private final TblColRef tsColumn;
+ private final TupleFilter root;
+
+ private IdentityHashMap<TupleFilter, Boolean> isInTopLevelANDs;
+
+ public TsConditionEraser(TblColRef tsColumn, TupleFilter root) {
+ this.tsColumn = tsColumn;
+ this.root = root;
+ this.isInTopLevelANDs = Maps.newIdentityHashMap();
+ }
+
+ /**
+ * replace filter on timestamp column to null, so that two tuple filter trees can
+ * be compared regardless of the filter condition on timestamp column (In top level where conditions concatenated by ANDs)
+ * @param filter
+ * @return
+ */
+ @Override
+ public TupleFilter onSerialize(TupleFilter filter) {
+
+ if (filter == null)
+ return null;
+
+ //we just need reference equal
+ if (root == filter) {
+ isInTopLevelANDs.put(filter, true);
+ }
+
+ if (isInTopLevelANDs.containsKey(filter)) {
+ classifyChildrenByMarking(filter);
+
+ if (filter instanceof CompareTupleFilter) {
+ TblColRef c = ((CompareTupleFilter) filter).getColumn();
+ if (c != null && c.equals(tsColumn)) {
+ return null;
+ }
+ }
+ }
+
+ return filter;
+ }
+
+ private void classifyChildrenByMarking(TupleFilter filter) {
+ if (filter instanceof LogicalTupleFilter) {
+ if (filter.getOperator() == TupleFilter.FilterOperatorEnum.AND) {
+ for (TupleFilter child : filter.getChildren()) {
+ isInTopLevelANDs.put(child, true);
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
new file mode 100644
index 0000000..c5bd3e0
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.filter;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.tuple.IEvaluatableTuple;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Base class of a filter tree node: an operator plus an ordered list of child filters.
+ *
+ * Besides the abstract evaluation and (de)serialization hooks, it provides
+ * {@link #flatFilter()}, which rewrites a tree of logical nodes into either a single
+ * AND node or an OR node whose children are AND nodes.
+ *
+ * @author xjiang
+ *
+ */
+public abstract class TupleFilter {
+
+ /** Operators a filter node can carry; each has a fixed numeric id returned by getValue(). */
+ public enum FilterOperatorEnum {
+ EQ(1), NEQ(2), GT(3), LT(4), GTE(5), LTE(6), ISNULL(7), ISNOTNULL(8), IN(9), NOTIN(10), AND(20), OR(21), NOT(22), COLUMN(30), CONSTANT(31), DYNAMIC(32), EXTRACT(33), CASE(34);
+
+ private final int value;
+
+ private FilterOperatorEnum(int v) {
+ this.value = v;
+ }
+
+ /** Returns the numeric id given in the constant's declaration. */
+ public int getValue() {
+ return this.value;
+ }
+ }
+
+ // Not referenced inside this class.
+ public static final int BUFFER_SIZE = 10240;
+
+ // Operator -> its logical negation (e.g. GT -> LTE, AND -> OR). NOT has no entry.
+ protected static final Map<FilterOperatorEnum, FilterOperatorEnum> REVERSE_OP_MAP = Maps.newHashMap();
+ // Comparison operator -> its mirror image (GT <-> LT, GTE <-> LTE; EQ and NEQ map to themselves).
+ // Presumably used when the two operands of a comparison are exchanged -- confirm with the users of this map.
+ protected static final Map<FilterOperatorEnum, FilterOperatorEnum> SWAP_OP_MAP = Maps.newHashMap();
+
+ static {
+ REVERSE_OP_MAP.put(FilterOperatorEnum.EQ, FilterOperatorEnum.NEQ);
+ REVERSE_OP_MAP.put(FilterOperatorEnum.NEQ, FilterOperatorEnum.EQ);
+ REVERSE_OP_MAP.put(FilterOperatorEnum.GT, FilterOperatorEnum.LTE);
+ REVERSE_OP_MAP.put(FilterOperatorEnum.LTE, FilterOperatorEnum.GT);
+ REVERSE_OP_MAP.put(FilterOperatorEnum.LT, FilterOperatorEnum.GTE);
+ REVERSE_OP_MAP.put(FilterOperatorEnum.GTE, FilterOperatorEnum.LT);
+ REVERSE_OP_MAP.put(FilterOperatorEnum.IN, FilterOperatorEnum.NOTIN);
+ REVERSE_OP_MAP.put(FilterOperatorEnum.NOTIN, FilterOperatorEnum.IN);
+ REVERSE_OP_MAP.put(FilterOperatorEnum.ISNULL, FilterOperatorEnum.ISNOTNULL);
+ REVERSE_OP_MAP.put(FilterOperatorEnum.ISNOTNULL, FilterOperatorEnum.ISNULL);
+ REVERSE_OP_MAP.put(FilterOperatorEnum.AND, FilterOperatorEnum.OR);
+ REVERSE_OP_MAP.put(FilterOperatorEnum.OR, FilterOperatorEnum.AND);
+
+ SWAP_OP_MAP.put(FilterOperatorEnum.EQ, FilterOperatorEnum.EQ);
+ SWAP_OP_MAP.put(FilterOperatorEnum.NEQ, FilterOperatorEnum.NEQ);
+ SWAP_OP_MAP.put(FilterOperatorEnum.GT, FilterOperatorEnum.LT);
+ SWAP_OP_MAP.put(FilterOperatorEnum.LTE, FilterOperatorEnum.GTE);
+ SWAP_OP_MAP.put(FilterOperatorEnum.LT, FilterOperatorEnum.GT);
+ SWAP_OP_MAP.put(FilterOperatorEnum.GTE, FilterOperatorEnum.LTE);
+ }
+
+ protected final List<TupleFilter> children;
+ protected FilterOperatorEnum operator;
+ // Neither read nor written in this class; hasChildren() looks at the children list instead.
+ protected boolean hasChildren;
+
+ /**
+ * @param filters the list that will hold this node's children; it is stored, not copied
+ * @param op the operator of this node
+ */
+ protected TupleFilter(List<TupleFilter> filters, FilterOperatorEnum op) {
+ this.children = filters;
+ this.operator = op;
+ }
+
+ /** Appends one child to this node. */
+ public void addChild(TupleFilter child) {
+ children.add(child);
+ }
+
+ /** Appends all given children, one by one, through addChild(). */
+ final public void addChildren(List<? extends TupleFilter> children) {
+ for (TupleFilter c : children)
+ addChild(c); // goes through addChild() so that a subclass override is honoured
+ }
+
+ /** Returns the internal child list itself, not a copy. */
+ public List<? extends TupleFilter> getChildren() {
+ return children;
+ }
+
+ /** True when the child list is non-null and non-empty. */
+ public boolean hasChildren() {
+ return children != null && !children.isEmpty();
+ }
+
+ public FilterOperatorEnum getOperator() {
+ return operator;
+ }
+
+ /** Not supported unless a subclass overrides it. */
+ public TupleFilter copy() {
+ throw new UnsupportedOperationException();
+ }
+
+ /** Not supported unless a subclass overrides it. */
+ public TupleFilter reverse() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Returns a flattened form of this filter tree: either one AND node, or one OR node
+ * whose children are AND nodes. NOT nodes are removed by reversing their child.
+ */
+ public TupleFilter flatFilter() {
+ return flattenInternal(this);
+ }
+
+ /**
+ * Recursive worker of flatFilter().
+ *
+ * @throws IllegalStateException if a flattened child is neither AND nor OR, or if a
+ * logical filter carries an operator other than AND, OR or NOT
+ */
+ private TupleFilter flattenInternal(TupleFilter filter) {
+ TupleFilter flatFilter = null;
+ // Base case: anything that is not a logical node is wrapped into a one-child AND.
+ if (!(filter instanceof LogicalTupleFilter)) {
+ flatFilter = new LogicalTupleFilter(FilterOperatorEnum.AND);
+ flatFilter.addChild(filter);
+ return flatFilter;
+ }
+
+ // Post-order traversal: flatten every child first and sort the results by operator.
+ FilterOperatorEnum op = filter.getOperator();
+ List<TupleFilter> andChildren = new LinkedList<TupleFilter>();
+ List<TupleFilter> orChildren = new LinkedList<TupleFilter>();
+ for (TupleFilter child : filter.getChildren()) {
+ TupleFilter flatChild = flattenInternal(child);
+ FilterOperatorEnum childOp = flatChild.getOperator();
+ if (childOp == FilterOperatorEnum.AND) {
+ andChildren.add(flatChild);
+ } else if (childOp == FilterOperatorEnum.OR) {
+ orChildren.add(flatChild);
+ } else {
+ throw new IllegalStateException("Filter is " + filter + " and child is " + flatChild);
+ }
+ }
+
+ // Combine the flattened children according to this node's operator.
+ if (op == FilterOperatorEnum.AND) {
+ // AND of ANDs: merge all their children into one AND.
+ flatFilter = new LogicalTupleFilter(FilterOperatorEnum.AND);
+ for (TupleFilter andChild : andChildren) {
+ flatFilter.addChildren(andChild.getChildren());
+ }
+ // AND over ORs: distribute, which yields an OR of ANDs.
+ if (!orChildren.isEmpty()) {
+ List<TupleFilter> fullAndFilters = cartesianProduct(orChildren, flatFilter);
+ flatFilter = new LogicalTupleFilter(FilterOperatorEnum.OR);
+ flatFilter.addChildren(fullAndFilters);
+ }
+ } else if (op == FilterOperatorEnum.OR) {
+ // OR of ORs: lift the grandchildren; AND children are added unchanged.
+ flatFilter = new LogicalTupleFilter(FilterOperatorEnum.OR);
+ for (TupleFilter orChild : orChildren) {
+ flatFilter.addChildren(orChild.getChildren());
+ }
+ flatFilter.addChildren(andChildren);
+ } else if (op == FilterOperatorEnum.NOT) {
+ // NOT: reverse the original (unflattened) single child, then flatten that.
+ assert (filter.children.size() == 1);
+ TupleFilter reverse = filter.children.get(0).reverse();
+ flatFilter = flattenInternal(reverse);
+ } else {
+ throw new IllegalStateException("Filter is " + filter);
+ }
+ return flatFilter;
+ }
+
+ /**
+ * Builds every AND combination obtained by picking one child from each OR filter and
+ * appending that child's children to a copy of partialAndFilter.
+ *
+ * @param leftOrFilters the OR filters to distribute over
+ * @param partialAndFilter the AND filter every combination starts from; it is copied, not modified
+ * @return the resulting AND filters
+ */
+ private List<TupleFilter> cartesianProduct(List<TupleFilter> leftOrFilters, TupleFilter partialAndFilter) {
+ List<TupleFilter> oldProductFilters = new LinkedList<TupleFilter>();
+ oldProductFilters.add(partialAndFilter);
+ for (TupleFilter orFilter : leftOrFilters) {
+ // Each round multiplies the combinations so far by the children of one OR filter.
+ List<TupleFilter> newProductFilters = new LinkedList<TupleFilter>();
+ for (TupleFilter orChildFilter : orFilter.getChildren()) {
+ for (TupleFilter productFilter : oldProductFilters) {
+ TupleFilter fullAndFilter = productFilter.copy();
+ fullAndFilter.addChildren(orChildFilter.getChildren());
+ newProductFilters.add(fullAndFilter);
+ }
+ }
+ oldProductFilters = newProductFilters;
+ }
+ return oldProductFilters;
+ }
+
+ /** Tells whether this node, taken on its own, can be evaluated. */
+ public abstract boolean isEvaluable();
+
+ /** Evaluates this filter against the tuple, using the given code system. */
+ public abstract boolean evaluate(IEvaluatableTuple tuple, IFilterCodeSystem<?> cs);
+
+ /** Returns the values held by this node. */
+ public abstract Collection<?> getValues();
+
+ // Package-private: writes this node's own state, without its children.
+ abstract byte[] serialize(IFilterCodeSystem<?> cs);
+
+ // Package-private: restores this node's own state from bytes produced by serialize().
+ abstract void deserialize(byte[] bytes, IFilterCodeSystem<?> cs);
+
+ /**
+ * True when the filter and all of its descendants report isEvaluable().
+ * A null filter counts as evaluable.
+ */
+ public static boolean isEvaluableRecursively(TupleFilter filter) {
+ if (filter == null)
+ return true;
+
+ if (!filter.isEvaluable())
+ return false;
+
+ for (TupleFilter child : filter.getChildren()) {
+ if (!isEvaluableRecursively(child))
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Adds the column of every ColumnTupleFilter found in the tree to the collector.
+ * Does nothing when either argument is null.
+ */
+ public static void collectColumns(TupleFilter filter, Set<TblColRef> collector) {
+ if (filter == null || collector == null)
+ return;
+
+ if (filter instanceof ColumnTupleFilter) {
+ ColumnTupleFilter columnTupleFilter = (ColumnTupleFilter) filter;
+ collector.add(columnTupleFilter.getColumn());
+ }
+
+ for (TupleFilter child : filter.getChildren()) {
+ collectColumns(child, collector);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilterSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilterSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilterSerializer.java
new file mode 100644
index 0000000..a9d785b
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilterSerializer.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.filter;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Stack;
+
+import org.apache.kylin.common.util.BytesUtil;
+
+/**
+ * Serializes a {@link TupleFilter} tree to bytes and back, following
+ * http://eli.thegreenplace.net/2011/09/29/an-interesting-tree-serialization-algorithm-from-dwarf
+ *
+ * Nodes are written in pre-order. A node is written as its operator id
+ * (vint), its own payload (byte array) and a flag (vint): 1 when children
+ * follow, 0 otherwise. The children of a node are closed by a single vint -1.
+ *
+ * @author xjiang
+ *
+ */
+public class TupleFilterSerializer {
+
+    /**
+     * Hook called on every node right before it is written. It may return a
+     * replacement node, or null to leave the node and its subtree out.
+     */
+    public static interface Decorator {
+        TupleFilter onSerialize(TupleFilter filter);
+    }
+
+    // Fixed capacity of the scratch buffer used by serialize().
+    private static final int BUFFER_SIZE = 65536;
+    // Numeric operator id -> operator, for deserialization.
+    private static final Map<Integer, TupleFilter.FilterOperatorEnum> ID_OP_MAP = new HashMap<Integer, TupleFilter.FilterOperatorEnum>();
+
+    static {
+        for (TupleFilter.FilterOperatorEnum op : TupleFilter.FilterOperatorEnum.values()) {
+            ID_OP_MAP.put(op.getValue(), op);
+        }
+    }
+
+    /** Serializes the tree without a decorator. */
+    public static byte[] serialize(TupleFilter rootFilter, IFilterCodeSystem<?> cs) {
+        return serialize(rootFilter, null, cs);
+    }
+
+    /**
+     * Serializes the tree rooted at {@code rootFilter}.
+     *
+     * @param decorator optional hook applied to every node, may be null
+     * @return the bytes written; empty when the root is null or the decorator removed it
+     */
+    public static byte[] serialize(TupleFilter rootFilter, Decorator decorator, IFilterCodeSystem<?> cs) {
+        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
+        internalSerialize(rootFilter, decorator, buffer, cs);
+        // Copy out only the part of the scratch buffer that was written.
+        byte[] result = new byte[buffer.position()];
+        System.arraycopy(buffer.array(), 0, result, 0, buffer.position());
+        return result;
+    }
+
+    // Writes one node and, recursively, its children.
+    private static void internalSerialize(TupleFilter filter, Decorator decorator, ByteBuffer buffer, IFilterCodeSystem<?> cs) {
+        if (decorator != null) { // give decorator a chance to manipulate the output filter
+            filter = decorator.onSerialize(filter);
+        }
+
+        if (filter == null) {
+            return;
+        }
+
+        if (filter.hasChildren()) {
+            // node with flag 1, then its children, then the -1 terminator
+            serializeFilter(1, filter, decorator, buffer, cs);
+            for (TupleFilter child : filter.getChildren()) {
+                internalSerialize(child, decorator, buffer, cs);
+            }
+            serializeFilter(-1, filter, decorator, buffer, cs);
+        } else {
+            // leaf: node with flag 0
+            serializeFilter(0, filter, decorator, buffer, cs);
+        }
+    }
+
+    // flag < 0 writes only the terminator; otherwise operator id, payload and flag.
+    private static void serializeFilter(int flag, TupleFilter filter, Decorator decorator, ByteBuffer buffer, IFilterCodeSystem<?> cs) {
+        if (flag < 0) {
+            BytesUtil.writeVInt(-1, buffer);
+        } else {
+            byte[] bytes = filter.serialize(cs);
+            int opVal = filter.getOperator().getValue();
+            BytesUtil.writeVInt(opVal, buffer);
+            BytesUtil.writeByteArray(bytes, buffer);
+            BytesUtil.writeVInt(flag, buffer);
+        }
+    }
+
+    /**
+     * Rebuilds a filter tree from bytes produced by {@code serialize}.
+     *
+     * @return the root filter, or null when {@code bytes} is empty
+     * @throws IllegalStateException if an operator id is unknown or cannot be instantiated
+     */
+    public static TupleFilter deserialize(byte[] bytes, IFilterCodeSystem<?> cs) {
+        ByteBuffer buffer = ByteBuffer.wrap(bytes);
+        TupleFilter rootFilter = null;
+        // Chain of nodes whose children are still being read; the top is the current parent.
+        Stack<TupleFilter> parentStack = new Stack<TupleFilter>();
+        while (buffer.hasRemaining()) {
+            int opVal = BytesUtil.readVInt(buffer);
+            if (opVal < 0) {
+                // terminator: the current parent has no more children
+                parentStack.pop();
+                continue;
+            }
+
+            // deserialize filter
+            TupleFilter filter = createTupleFilter(opVal);
+            byte[] filterBytes = BytesUtil.readByteArray(buffer);
+            filter.deserialize(filterBytes, cs);
+
+            if (rootFilter == null) {
+                // The first node is the root; it is always pushed and its
+                // has-children flag is read and ignored.
+                rootFilter = filter;
+                parentStack.push(filter);
+                BytesUtil.readVInt(buffer);
+                continue;
+            }
+
+            // Attach to the current parent. Only non-null filters are pushed,
+            // so peek() never yields null; on an empty stack it throws.
+            parentStack.peek().addChild(filter);
+
+            // push filter to stack or not based on having children or not
+            int hasChild = BytesUtil.readVInt(buffer);
+            if (hasChild == 1) {
+                parentStack.push(filter);
+            }
+        }
+        return rootFilter;
+    }
+
+    // Creates an empty filter of the class that matches the operator id.
+    private static TupleFilter createTupleFilter(int opVal) {
+        TupleFilter.FilterOperatorEnum op = ID_OP_MAP.get(opVal);
+        if (op == null) {
+            throw new IllegalStateException("operator value is " + opVal);
+        }
+        TupleFilter filter = null;
+        switch (op) {
+        case AND:
+        case OR:
+        case NOT:
+            filter = new LogicalTupleFilter(op);
+            break;
+        case EQ:
+        case NEQ:
+        case LT:
+        case LTE:
+        case GT:
+        case GTE:
+        case IN:
+        case NOTIN:
+            // NOTIN is what TupleFilter.REVERSE_OP_MAP yields for IN, so it has
+            // to be readable back. Assumes CompareTupleFilter accepts NOTIN like
+            // IN -- confirm against that class.
+        case ISNULL:
+        case ISNOTNULL:
+            filter = new CompareTupleFilter(op);
+            break;
+        case EXTRACT:
+            filter = new ExtractTupleFilter(op);
+            break;
+        case CASE:
+            filter = new CaseTupleFilter();
+            break;
+        case COLUMN:
+            filter = new ColumnTupleFilter(null);
+            break;
+        case CONSTANT:
+            filter = new ConstantTupleFilter();
+            break;
+        case DYNAMIC:
+            filter = new DynamicTupleFilter(null);
+            break;
+        default:
+            throw new IllegalStateException("Error FilterOperatorEnum: " + op.getValue());
+        }
+
+        return filter;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalMaxAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalMaxAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalMaxAggregator.java
new file mode 100644
index 0000000..562fade
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalMaxAggregator.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.measure;
+
+import java.math.BigDecimal;
+
+/**
+ * Keeps the largest {@link BigDecimal} passed to {@link #aggregate(BigDecimal)}
+ * since the last {@link #reset()}.
+ *
+ * @author yangli9
+ *
+ */
+public class BigDecimalMaxAggregator extends MeasureAggregator<BigDecimal> {
+
+    // null until the first value arrives
+    BigDecimal max = null;
+
+    @Override
+    public void reset() {
+        max = null;
+    }
+
+    @Override
+    public void aggregate(BigDecimal value) {
+        // The first value is taken as-is; later only a strictly greater one replaces it.
+        if (max == null || max.compareTo(value) < 0) {
+            max = value;
+        }
+    }
+
+    /** Returns the current maximum, or null when nothing has been aggregated. */
+    @Override
+    public BigDecimal getState() {
+        return max;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        return guessBigDecimalMemBytes();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalMinAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalMinAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalMinAggregator.java
new file mode 100644
index 0000000..8e11fd9
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalMinAggregator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.measure;
+
+import java.math.BigDecimal;
+
+/**
+ * Keeps the smallest {@link BigDecimal} handed to {@link #aggregate} since the last reset.
+ * @author yangli9
+ */
+public class BigDecimalMinAggregator extends MeasureAggregator<BigDecimal> {
+
+    // running minimum; null until the first value arrives after a reset
+    BigDecimal min = null;
+
+    @Override
+    public void reset() {
+        min = null;
+    }
+
+    @Override
+    public void aggregate(BigDecimal value) {
+        // nothing held yet, or the held value compares greater than the newcomer: adopt it
+        if (min == null || min.compareTo(value) > 0) {
+            min = value;
+        }
+    }
+
+    @Override
+    public BigDecimal getState() {
+        return min;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        return guessBigDecimalMemBytes();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalSumAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalSumAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalSumAggregator.java
new file mode 100644
index 0000000..1b2aae1
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalSumAggregator.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.measure;
+
+import java.math.BigDecimal;
+
+/**
+ * Accumulates the running total of every {@link BigDecimal} handed to {@link #aggregate}.
+ * @author yangli9
+ */
+public class BigDecimalSumAggregator extends MeasureAggregator<BigDecimal> {
+
+    BigDecimal sum = BigDecimal.ZERO;
+
+    @Override
+    public void reset() {
+        sum = BigDecimal.ZERO;
+    }
+
+    @Override
+    public void aggregate(BigDecimal value) {
+        sum = sum.add(value);
+    }
+
+    @Override
+    public BigDecimal getState() {
+        return sum;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        return guessBigDecimalMemBytes();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleMaxAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleMaxAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleMaxAggregator.java
new file mode 100644
index 0000000..1e5bf82
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleMaxAggregator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.measure;
+
+import org.apache.hadoop.io.DoubleWritable;
+
+/**
+ * Keeps the largest double handed to {@link #aggregate} since the last reset, in its own writable.
+ * @author yangli9
+ */
+public class DoubleMaxAggregator extends MeasureAggregator<DoubleWritable> {
+
+    DoubleWritable max = null;
+
+    @Override
+    public void reset() {
+        max = null;
+    }
+
+    @Override
+    public void aggregate(DoubleWritable value) {
+        if (max == null) {
+            max = new DoubleWritable(value.get());
+        } else if (value.get() > max.get()) {
+            max.set(value.get());
+        }
+    }
+
+    @Override
+    public DoubleWritable getState() {
+        return max;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        return guessDoubleMemBytes();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleMinAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleMinAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleMinAggregator.java
new file mode 100644
index 0000000..d9112f7
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleMinAggregator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.measure;
+
+import org.apache.hadoop.io.DoubleWritable;
+
+/**
+ * Keeps the smallest double handed to {@link #aggregate} since the last reset, in its own writable.
+ * @author yangli9
+ */
+public class DoubleMinAggregator extends MeasureAggregator<DoubleWritable> {
+
+    DoubleWritable min = null;
+
+    @Override
+    public void reset() {
+        min = null;
+    }
+
+    @Override
+    public void aggregate(DoubleWritable value) {
+        if (min == null) {
+            min = new DoubleWritable(value.get());
+        } else if (value.get() < min.get()) {
+            min.set(value.get());
+        }
+    }
+
+    @Override
+    public DoubleWritable getState() {
+        return min;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        return guessDoubleMemBytes();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleSumAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleSumAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleSumAggregator.java
new file mode 100644
index 0000000..923bd0b
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleSumAggregator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.measure;
+
+import org.apache.hadoop.io.DoubleWritable;
+
+/**
+ * Adds up every double handed to {@link #aggregate} in a single reused {@code DoubleWritable}.
+ * @author yangli9
+ */
+public class DoubleSumAggregator extends MeasureAggregator<DoubleWritable> {
+
+    DoubleWritable sum = new DoubleWritable();
+
+    @Override
+    public void reset() {
+        sum.set(0.0);
+    }
+
+    @Override
+    public void aggregate(DoubleWritable value) {
+        double total = sum.get() + value.get();
+        sum.set(total);
+    }
+
+    @Override
+    public DoubleWritable getState() {
+        return sum;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        return guessDoubleMemBytes();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/measure/HLLCAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/HLLCAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/HLLCAggregator.java
new file mode 100644
index 0000000..a168743
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/HLLCAggregator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.measure;
+
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+
+/**
+ * Merges the {@link HyperLogLogPlusCounter}s handed to {@link #aggregate} into one counter.
+ * @author yangli9
+ */
+public class HLLCAggregator extends MeasureAggregator<HyperLogLogPlusCounter> {
+
+    final int precision;
+    HyperLogLogPlusCounter sum = null;
+
+    public HLLCAggregator(int precision) {
+        this.precision = precision;
+    }
+
+    @Override
+    public void reset() {
+        sum = null;
+    }
+
+    @Override
+    public void aggregate(HyperLogLogPlusCounter value) {
+        // first value: start from a counter built from it; afterwards merge into that counter
+        if (sum == null) {
+            sum = new HyperLogLogPlusCounter(value);
+        } else {
+            sum.merge(value);
+        }
+    }
+
+    @Override
+    public HyperLogLogPlusCounter getState() {
+        return sum;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        // 1024 + 60 returned by AggregationCacheMemSizeTest
+        int ownBytes = 8 + 4 + 8; // aggregator obj shell + precision + ref to HLLC
+        int counterBytes = 8 + 32 + (1 << precision); // HLLC obj shell + HLLC internal
+        return ownBytes + counterBytes;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LDCAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LDCAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LDCAggregator.java
new file mode 100644
index 0000000..767a6d8
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LDCAggregator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.measure;
+
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * Long Distinct Count: reports the count estimate of a dependent
+ * {@link HLLCAggregator} instead of aggregating the values it is given.
+ *
+ * @author xjiang
+ */
+public class LDCAggregator extends MeasureAggregator<LongWritable> {
+
+    // set through setDependentAggregator(); null until then
+    private HLLCAggregator hllAgg = null;
+    // reused holder for the count handed back by getState()
+    private LongWritable state = new LongWritable(0);
+
+    @SuppressWarnings("rawtypes")
+    public void setDependentAggregator(MeasureAggregator agg) {
+        this.hllAgg = (HLLCAggregator) agg;
+    }
+
+    @Override
+    public void reset() {
+        // nothing to clear: the count is read from the dependent aggregator on demand
+    }
+
+    @Override
+    public void aggregate(LongWritable value) {
+        // values are ignored: the dependent aggregator does the counting
+    }
+
+    @Override
+    public LongWritable getState() {
+        // zero when there is no dependent aggregator, or it holds no counter yet (null state)
+        boolean empty = hllAgg == null || hllAgg.getState() == null;
+        state.set(empty ? 0L : hllAgg.getState().getCountEstimate());
+        return state;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        return guessLongMemBytes();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LongMaxAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LongMaxAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LongMaxAggregator.java
new file mode 100644
index 0000000..0fac3c7
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LongMaxAggregator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.measure;
+
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * Keeps the largest long handed to {@link #aggregate} since the last reset, in its own writable.
+ * @author yangli9
+ */
+public class LongMaxAggregator extends MeasureAggregator<LongWritable> {
+
+    LongWritable max = null;
+
+    @Override
+    public void reset() {
+        max = null;
+    }
+
+    @Override
+    public void aggregate(LongWritable value) {
+        if (max == null) {
+            max = new LongWritable(value.get());
+        } else if (value.get() > max.get()) {
+            max.set(value.get());
+        }
+    }
+
+    @Override
+    public LongWritable getState() {
+        return max;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        return guessLongMemBytes();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LongMinAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LongMinAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LongMinAggregator.java
new file mode 100644
index 0000000..4c058d8
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LongMinAggregator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.measure;
+
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * Keeps the smallest long handed to {@link #aggregate} since the last reset, in its own writable.
+ * @author yangli9
+ */
+public class LongMinAggregator extends MeasureAggregator<LongWritable> {
+
+    LongWritable min = null;
+
+    @Override
+    public void reset() {
+        min = null;
+    }
+
+    @Override
+    public void aggregate(LongWritable value) {
+        if (min == null) {
+            min = new LongWritable(value.get());
+        } else if (value.get() < min.get()) {
+            min.set(value.get());
+        }
+    }
+
+    @Override
+    public LongWritable getState() {
+        return min;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        return guessLongMemBytes();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LongSumAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LongSumAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LongSumAggregator.java
new file mode 100644
index 0000000..e40870d
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LongSumAggregator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.measure;
+
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * Adds up every long handed to {@link #aggregate} in a single reused {@code LongWritable}.
+ * @author yangli9
+ */
+public class LongSumAggregator extends MeasureAggregator<LongWritable> {
+
+    LongWritable sum = new LongWritable();
+
+    @Override
+    public void reset() {
+        sum.set(0);
+    }
+
+    @Override
+    public void aggregate(LongWritable value) {
+        long total = sum.get() + value.get();
+        sum.set(total);
+    }
+
+    @Override
+    public LongWritable getState() {
+        return sum;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        return guessLongMemBytes();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregator.java
new file mode 100644
index 0000000..4153cbd
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregator.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.measure;
+
+import org.apache.kylin.metadata.model.DataType;
+import org.apache.kylin.metadata.model.FunctionDesc;
+
+/**
+ * Base class of the measure aggregators, with the factory and the memory guesses they share.
+ * @author yangli9
+ */
+abstract public class MeasureAggregator<V> {
+
+    /**
+     * Picks the aggregator for a function name (matched ignoring case) and its return type.
+     *
+     * @throws IllegalArgumentException if the function or its return type has no aggregator
+     */
+    public static MeasureAggregator<?> create(String funcName, String returnType) {
+        if (FunctionDesc.FUNC_SUM.equalsIgnoreCase(funcName) || FunctionDesc.FUNC_COUNT.equalsIgnoreCase(funcName)) {
+            if (isInteger(returnType))
+                return new LongSumAggregator();
+            else if (isBigDecimal(returnType))
+                return new BigDecimalSumAggregator();
+            else if (isDouble(returnType))
+                return new DoubleSumAggregator();
+        } else if (FunctionDesc.FUNC_COUNT_DISTINCT.equalsIgnoreCase(funcName)) {
+            DataType hllcType = DataType.getInstance(returnType);
+            if (hllcType.isHLLC())
+                return new HLLCAggregator(hllcType.getPrecision());
+            else
+                return new LDCAggregator();
+        } else if (FunctionDesc.FUNC_MAX.equalsIgnoreCase(funcName)) {
+            if (isInteger(returnType))
+                return new LongMaxAggregator();
+            else if (isBigDecimal(returnType))
+                return new BigDecimalMaxAggregator();
+            else if (isDouble(returnType))
+                return new DoubleMaxAggregator();
+        } else if (FunctionDesc.FUNC_MIN.equalsIgnoreCase(funcName)) {
+            if (isInteger(returnType))
+                return new LongMinAggregator();
+            else if (isBigDecimal(returnType))
+                return new BigDecimalMinAggregator();
+            else if (isDouble(returnType))
+                return new DoubleMinAggregator();
+        }
+        throw new IllegalArgumentException("No aggregator for func '" + funcName + "' and return type '" + returnType + "'");
+    }
+
+    /** True when the type name starts with "decimal" in any letter case, like isDouble/isInteger. */
+    public static boolean isBigDecimal(String type) {
+        return type.regionMatches(true, 0, "decimal", 0, "decimal".length());
+    }
+
+    /** True for the type names "double", "float" and "real", ignoring case. */
+    public static boolean isDouble(String type) {
+        return "double".equalsIgnoreCase(type) || "float".equalsIgnoreCase(type) || "real".equalsIgnoreCase(type);
+    }
+
+    /** True for the type names "long", "bigint", "int" and "integer", ignoring case. */
+    public static boolean isInteger(String type) {
+        return "long".equalsIgnoreCase(type) || "bigint".equalsIgnoreCase(type) || "int".equalsIgnoreCase(type) || "integer".equalsIgnoreCase(type);
+    }
+
+    public static int guessBigDecimalMemBytes() {
+        // 116 returned by AggregationCacheMemSizeTest
+        return 8 // aggregator obj shell
+        + 8 // ref to BigDecimal
+        + 8 // BigDecimal obj shell
+        + 100; // guess of BigDecimal internal
+    }
+
+    public static int guessDoubleMemBytes() {
+        // 29 to 44 returned by AggregationCacheMemSizeTest; return the top of that range rather
+        // than the naive 8 (aggregator shell) + 8 (ref) + 8 (DoubleWritable shell) + 8 (double)
+        return 44;
+    }
+
+    public static int guessLongMemBytes() {
+        // 29 to 44 returned by AggregationCacheMemSizeTest; return the top of that range rather
+        // than the naive 8 (aggregator shell) + 8 (ref) + 8 (LongWritable shell) + 8 (long)
+        return 44;
+    }
+
+    /** Does nothing here; LDCAggregator overrides it to remember the aggregator it reads from. */
+    @SuppressWarnings("rawtypes")
+    public void setDependentAggregator(MeasureAggregator agg) {
+    }
+
+    /** Discards whatever has been aggregated so far. */
+    abstract public void reset();
+
+    /** Folds one more value into the aggregated state. */
+    abstract public void aggregate(V value);
+
+    /** Returns the current aggregated state. */
+    abstract public V getState();
+
+    // get an estimate of memory consumption UPPER BOUND
+    abstract public int getMemBytesEstimate();
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregators.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregators.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregators.java
new file mode 100644
index 0000000..aa58664
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregators.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.measure;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.FunctionDesc;
+
+/**
+ * Drives one {@link MeasureAggregator} per measure, in the order the measures were given.
+ * @author yangli9
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class MeasureAggregators {
+
+    private MeasureDesc[] descs;
+    private MeasureAggregator[] aggs;
+
+    public MeasureAggregators(Collection<MeasureDesc> measureDescs) {
+        this((MeasureDesc[]) measureDescs.toArray(new MeasureDesc[measureDescs.size()]));
+    }
+
+    /** @throws IllegalArgumentException if a measure depends on a measure name that was not passed in */
+    public MeasureAggregators(MeasureDesc... measureDescs) {
+        descs = measureDescs;
+        aggs = new MeasureAggregator[descs.length];
+        Map<String, Integer> measureIndexMap = new HashMap<String, Integer>();
+        for (int i = 0; i < descs.length; i++) {
+            FunctionDesc func = descs[i].getFunction();
+            aggs[i] = MeasureAggregator.create(func.getExpression(), func.getReturnType());
+            measureIndexMap.put(descs[i].getName(), i);
+        }
+        // fill back dependent aggregator
+        for (int i = 0; i < descs.length; i++) {
+            String depMsrRef = descs[i].getDependentMeasureRef();
+            if (depMsrRef != null) {
+                Integer index = measureIndexMap.get(depMsrRef); // null when no measure has that name
+                if (index == null)
+                    throw new IllegalArgumentException("Measure '" + descs[i].getName() + "' depends on unknown measure '" + depMsrRef + "'");
+                aggs[i].setDependentAggregator(aggs[index]);
+            }
+        }
+    }
+
+    public void reset() {
+        for (int i = 0; i < aggs.length; i++) {
+            aggs[i].reset();
+        }
+    }
+
+    public void aggregate(Object[] values) {
+        assert values.length == descs.length; // values[i] feeds the aggregator of measure i
+        for (int i = 0; i < descs.length; i++) {
+            aggs[i].aggregate(values[i]);
+        }
+    }
+
+    public void collectStates(Object[] states) {
+        for (int i = 0; i < descs.length; i++) {
+            states[i] = aggs[i].getState(); // states needs one slot per measure
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureCodec.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureCodec.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureCodec.java
new file mode 100644
index 0000000..f620ace
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureCodec.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.measure;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.serializer.DataTypeSerializer;
+
+/**
+ * Encodes and decodes a row of measure values with one {@link DataTypeSerializer} per measure.
+ * @author yangli9
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class MeasureCodec {
+
+    int nMeasures;
+    DataTypeSerializer[] serializers;
+
+    public MeasureCodec(Collection<MeasureDesc> measureDescs) {
+        this((MeasureDesc[]) measureDescs.toArray(new MeasureDesc[measureDescs.size()]));
+    }
+
+    public MeasureCodec(MeasureDesc... measureDescs) {
+        // each measure is serialized according to the return type of its function
+        String[] returnTypes = new String[measureDescs.length];
+        for (int m = 0; m < returnTypes.length; m++) {
+            returnTypes[m] = measureDescs[m].getFunction().getReturnType();
+        }
+        init(returnTypes);
+    }
+
+    public MeasureCodec(String... dataTypes) {
+        init(dataTypes);
+    }
+
+    private void init(String[] dataTypes) {
+        nMeasures = dataTypes.length;
+        serializers = new DataTypeSerializer[nMeasures];
+        for (int m = 0; m < nMeasures; m++) {
+            serializers[m] = DataTypeSerializer.create(dataTypes[m]);
+        }
+    }
+
+    public DataTypeSerializer getSerializer(int idx) {
+        return serializers[idx];
+    }
+
+    public void decode(ByteBuffer buf, Object[] result) {
+        assert result.length == nMeasures;
+        for (int m = 0; m < nMeasures; m++) {
+            result[m] = serializers[m].deserialize(buf);
+        }
+    }
+
+    public void encode(Object[] values, ByteBuffer out) {
+        assert values.length == nMeasures;
+        for (int m = 0; m < nMeasures; m++) {
+            serializers[m].serialize(values[m], out);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java
new file mode 100644
index 0000000..85dfe14
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.metadata.measure.fixedlen;
+
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.metadata.model.DataType;
+
+/**
+ * Fixed-length codec for measures backed by a {@link HyperLogLogPlusCounter}.
+ * <p>
+ * One counter instance is created in the constructor and reused by {@link #valueOf(String)},
+ * {@link #read(byte[], int)} and {@link #getValue()}; each call to {@code valueOf} or
+ * {@code read} overwrites the state returned by the previous call.
+ */
+public class FixedHLLCodec extends FixedLenMeasureCodec<HyperLogLogPlusCounter> {
+
+    private DataType type;
+    private int precision;
+    // single reusable counter, see class comment
+    private HyperLogLogPlusCounter current;
+
+    /**
+     * @param type data type of the measure; its precision sizes the counter
+     */
+    public FixedHLLCodec(DataType type) {
+        this.type = type;
+        this.precision = type.getPrecision();
+        this.current = new HyperLogLogPlusCounter(this.precision);
+    }
+
+    /**
+     * @return {@code 1 << precision}, the fixed length reported for an encoded counter
+     */
+    @Override
+    public int getLength() {
+        return 1 << precision;
+    }
+
+    @Override
+    public DataType getDataType() {
+        return type;
+    }
+
+    /**
+     * Resets the shared counter and adds the given value to it.
+     *
+     * @param value the value to count; {@code null} is counted as the marker string "__nUlL__"
+     * @return the shared counter, holding only this value
+     */
+    @Override
+    public HyperLogLogPlusCounter valueOf(String value) {
+        current.clear();
+        if (value == null) {
+            current.add("__nUlL__");
+        } else {
+            // Encode with a fixed charset: the no-arg getBytes() uses the platform default,
+            // which would make the same string count differently on differently configured JVMs.
+            current.add(value.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+        }
+        return current;
+    }
+
+    /**
+     * @return the shared counter in its current state
+     */
+    @Override
+    public Object getValue() {
+        return current;
+    }
+
+    /**
+     * Loads the shared counter's registers from {@code buf}, starting at {@code offset}.
+     *
+     * @return the shared counter after loading
+     */
+    @Override
+    public HyperLogLogPlusCounter read(byte[] buf, int offset) {
+        current.readRegistersArray(ByteBuffer.wrap(buf, offset, buf.length - offset));
+        return current;
+    }
+
+    /**
+     * Writes the registers of {@code v} into {@code buf}, starting at {@code offset}.
+     */
+    @Override
+    public void write(HyperLogLogPlusCounter v, byte[] buf, int offset) {
+        v.writeRegistersArray(ByteBuffer.wrap(buf, offset, buf.length - offset));
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java
new file mode 100644
index 0000000..ad8c483
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.measure.fixedlen;
+
+import org.apache.kylin.metadata.model.DataType;
+
+/**
+ * Base class of codecs that store a measure value of type {@code T} in a fixed number of
+ * positions of a byte array.
+ */
+public abstract class FixedLenMeasureCodec<T> {
+
+    /**
+     * Picks the codec for a data type: {@link FixedHLLCodec} when {@code type.isHLLC()} is true,
+     * {@link FixedPointLongCodec} otherwise.
+     */
+    public static FixedLenMeasureCodec<?> get(DataType type) {
+        if (!type.isHLLC()) {
+            return new FixedPointLongCodec(type);
+        }
+        return new FixedHLLCodec(type);
+    }
+
+    /** @return the fixed length of one encoded value, as defined by the concrete codec */
+    public abstract int getLength();
+
+    /** @return the data type this codec was created for */
+    public abstract DataType getDataType();
+
+    /** Converts the string form of a value into the codec's value type. */
+    public abstract T valueOf(String value);
+
+    /** @return the codec's current value, in the form defined by the concrete codec */
+    public abstract Object getValue();
+
+    /** Decodes a value from {@code buf}, starting at {@code offset}. */
+    public abstract T read(byte[] buf, int offset);
+
+    /** Encodes {@code v} into {@code buf}, starting at {@code offset}. */
+    public abstract void write(T v, byte[] buf, int offset);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedPointLongCodec.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedPointLongCodec.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedPointLongCodec.java
new file mode 100644
index 0000000..5218101
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedPointLongCodec.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.measure.fixedlen;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.metadata.model.DataType;
+
+public class FixedPointLongCodec extends FixedLenMeasureCodec<LongWritable> {
+
+ private static final int SIZE = 8;
+ // number of digits after decimal point
+ int scale;
+ DataType type;
+ // avoid massive object creation
+ LongWritable current = new LongWritable();
+
+ public FixedPointLongCodec(DataType type) {
+ this.type = type;
+ this.scale = Math.max(0, type.getScale());
+ }
+
+ @Override
+ public int getLength() {
+ return SIZE;
+ }
+
+ @Override
+ public DataType getDataType() {
+ return type;
+ }
+
+ long getValueIgnoringDecimalPoint(String value) {
+ int index = value.indexOf('.');
+
+ if (index == 0 || index == value.length() - 1) {
+ throw new RuntimeException("Bad decimal format: " + value);
+ } else if (index < 0) {
+ return Long.valueOf(value) * (int) Math.pow(10, scale);
+ } else {
+ StringBuilder sb = new StringBuilder();
+ sb.append(value.substring(0, index));
+
+ //if there are more than scale digits after the decimal point, the tail will be discarded
+ int end = Math.min(value.length(), index + scale + 1);
+ sb.append(value.substring(index + 1, end));
+ int diff = index + scale + 1 - value.length();
+ //if there are less than scale digits after the decimal point, the tail will be compensated
+ for (int i = 0; i < diff; i++) {
+ sb.append('0');
+ }
+ return Long.valueOf(sb.toString());
+ }
+ }
+
+ String restoreDecimalPoint(long value) {
+ if (scale < 0) {
+ throw new RuntimeException("Bad scale: " + scale + " with value: " + value);
+ } else if (scale == 0) {
+ return Long.toString(value);
+ } else {
+ return String.format("%." + scale + "f", value / (Math.pow(10, scale)));
+ }
+ }
+
+ @Override
+ public LongWritable valueOf(String value) {
+ if (value == null)
+ current.set(0L);
+ else
+ current.set(getValueIgnoringDecimalPoint(value));
+ return current;
+ }
+
+ @Override
+ public String getValue() {
+ if (scale == 0)
+ return current.toString();
+ else
+ return restoreDecimalPoint(current.get());
+ }
+
+ @Override
+ public LongWritable read(byte[] buf, int offset) {
+ current.set(BytesUtil.readLong(buf, offset, SIZE));
+ return current;
+ }
+
+ @Override
+ public void write(LongWritable v, byte[] buf, int offset) {
+ BytesUtil.writeLong(v == null ? 0 : v.get(), buf, offset, SIZE);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
new file mode 100644
index 0000000..6392eca
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.model;
+
+import org.apache.commons.lang.StringUtils;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Column Metadata from Source. All names should be uppercase.
+ * <p/>
+ * Only the fields annotated with {@code @JsonProperty} ({@code id}, {@code name},
+ * {@code datatype}) are mapped to JSON; the remaining fields are derived in
+ * {@link #init(TableDesc)} or set through setters.
+ */
+@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
+public class ColumnDesc {
+
+    @JsonProperty("id")
+    private String id;
+    @JsonProperty("name")
+    private String name;
+    @JsonProperty("datatype")
+    private String datatype;
+
+    // parsed from data type
+    private DataType type;
+
+    private TableDesc table;
+    // derived from id in init(); stays -1 while id is not set
+    private int zeroBasedIndex = -1;
+    private boolean isNullable = true;
+
+    public ColumnDesc() { // default constructor for Jackson
+    }
+
+    /**
+     * Creates an initialized column for the given table without going through JSON.
+     *
+     * @param oneBasedColumnIndex used as the column id; the zero-based index is derived from it
+     */
+    public static ColumnDesc mockup(TableDesc table, int oneBasedColumnIndex, String name, String datatype) {
+        ColumnDesc desc = new ColumnDesc();
+        String id = "" + oneBasedColumnIndex;
+        desc.setId(id);
+        desc.setName(name);
+        desc.setDatatype(datatype);
+        desc.init(table);
+        return desc;
+    }
+
+    public int getZeroBasedIndex() {
+        return zeroBasedIndex;
+    }
+
+    public String getDatatype() {
+        return datatype;
+    }
+
+    /**
+     * Sets the data type string and re-parses {@link #getType()} from it.
+     */
+    public void setDatatype(String datatype) {
+        this.datatype = datatype;
+        type = DataType.getInstance(datatype);
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public TableDesc getTable() {
+        return table;
+    }
+
+    public void setTable(TableDesc table) {
+        this.table = table;
+    }
+
+    public DataType getType() {
+        return type;
+    }
+
+    // The three accessors below dereference the parsed type and therefore
+    // require a data type to have been set.
+
+    public String getTypeName() {
+        return type.getName();
+    }
+
+    public int getTypePrecision() {
+        return type.getPrecision();
+    }
+
+    public int getTypeScale() {
+        return type.getScale();
+    }
+
+    public boolean isNullable() {
+        return this.isNullable;
+    }
+
+    public void setNullable(boolean nullable) {
+        this.isNullable = nullable;
+    }
+
+    /**
+     * Binds this column to its table and normalizes its state: the name is upper-cased,
+     * the zero-based index is derived from the one-based id, and the data type string is
+     * replaced by the string form of the parsed {@link DataType}.
+     *
+     * @throws NumberFormatException if the id is set but is not an integer
+     */
+    public void init(TableDesc table) {
+        this.table = table;
+
+        if (name != null) {
+            // Locale.ROOT keeps the result independent of the JVM default locale
+            // (e.g. Turkish would turn 'i' into a dotted capital I)
+            name = name.toUpperCase(java.util.Locale.ROOT);
+        }
+
+        if (id != null)
+            zeroBasedIndex = Integer.parseInt(id) - 1;
+
+        DataType normalized = DataType.getInstance(datatype);
+        if (normalized == null) {
+            this.setDatatype(null);
+        } else {
+            this.setDatatype(normalized.toString());
+        }
+    }
+
+    /**
+     * Case-insensitive match on both the table identity and the column name.
+     * Requires the table to have been set.
+     */
+    public boolean isSameAs(String tableName, String columnName) {
+        return StringUtils.equalsIgnoreCase(table.getIdentity(), tableName) && //
+                StringUtils.equalsIgnoreCase(name, columnName);
+    }
+
+    @Override
+    public String toString() {
+        // tolerate a column that has not been bound to a table yet
+        String tableIdentity = (table == null) ? null : table.getIdentity();
+        return "ColumnDesc [name=" + name + ",table=" + tableIdentity + "]";
+    }
+}