You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2016/02/26 10:55:18 UTC

[1/4] kylin git commit: KYLIN-1453 cuboid sharding based on specific column

Repository: kylin
Updated Branches:
  refs/heads/2.x-staging 2e1d2f6b6 -> 294fc7078


KYLIN-1453 cuboid sharding based on specific column


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/294fc707
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/294fc707
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/294fc707

Branch: refs/heads/2.x-staging
Commit: 294fc70785ab009e6b4d8a12cbeb609d46f89a93
Parents: 3f5074e
Author: honma <ho...@ebay.com>
Authored: Thu Feb 25 19:02:28 2016 +0800
Committer: honma <ho...@ebay.com>
Committed: Fri Feb 26 17:54:37 2016 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/util/BytesUtil.java |  18 +-
 .../dict/TupleFilterFunctionTransformer.java    | 170 +++++++++++++++++++
 .../dict/TupleFilterFunctionTranslator.java     | 166 ------------------
 .../kylin/metadata/filter/CaseTupleFilter.java  |   7 +-
 .../metadata/filter/ColumnTupleFilter.java      |  11 +-
 .../metadata/filter/CompareTupleFilter.java     |  10 +-
 .../metadata/filter/ConstantTupleFilter.java    |  16 +-
 .../metadata/filter/DynamicTupleFilter.java     |   9 +-
 .../metadata/filter/ExtractTupleFilter.java     |   7 +-
 .../metadata/filter/FunctionTupleFilter.java    |  16 +-
 .../filter/ITupleFilterTransformer.java         |  23 +++
 .../metadata/filter/ITupleFilterTranslator.java |  26 ---
 .../metadata/filter/LogicalTupleFilter.java     |  10 +-
 .../kylin/metadata/filter/TupleFilter.java      |   5 +-
 .../metadata/filter/TupleFilterSerializer.java  |  25 ++-
 .../common/coprocessor/FilterDecorator.java     |   8 +-
 .../hbase/cube/v2/CubeSegmentScanner.java       |  10 +-
 .../common/coprocessor/FilterBaseTest.java      |  46 ++++-
 .../common/coprocessor/FilterEvaluateTest.java  |   4 +-
 .../common/coprocessor/FilterSerializeTest.java |  26 ++-
 20 files changed, 325 insertions(+), 288 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc707/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
index e01ce4f..4e0701c 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
@@ -57,7 +57,6 @@ public class BytesUtil {
         return integer;
     }
 
-
     public static long readLong(ByteBuffer buffer, int size) {
         long integer = 0;
         for (int i = 0; i < size; i++) {
@@ -133,11 +132,15 @@ public class BytesUtil {
     // from WritableUtils
     // ============================================================================
 
+
     public static void writeVInt(int i, ByteBuffer out) {
+
         writeVLong(i, out);
+
     }
 
     public static void writeVLong(long i, ByteBuffer out) {
+
         if (i >= -112 && i <= 127) {
             out.put((byte) i);
             return;
@@ -203,6 +206,8 @@ public class BytesUtil {
     }
 
     public static void writeUnsigned(int num, int size, ByteBuffer out) {
+
+
         int mask = 0xff << ((size - 1) * 8);
         for (int i = size; i > 0; i--) {
             int v = (num & mask) >> (i - 1) * 8;
@@ -222,6 +227,7 @@ public class BytesUtil {
     }
 
     public static void writeLong(long num, ByteBuffer out) {
+
         for (int i = 0; i < 8; i++) {
             out.put((byte) num);
             num >>>= 8;
@@ -257,6 +263,8 @@ public class BytesUtil {
         }
         int len = str.length();
         BytesUtil.writeVInt(len, out);
+
+
         for (int i = 0; i < len; i++) {
             out.put((byte) str.charAt(i));
         }
@@ -335,7 +343,7 @@ public class BytesUtil {
             writeVInt(-1, out);
             return;
         }
-        writeVInt(array.length, out);
+        writeVInt(length, out);
         out.put(array, offset, length);
     }
 
@@ -348,7 +356,7 @@ public class BytesUtil {
         in.get(array);
         return array;
     }
-
+    
     public static int peekByteArrayLength(ByteBuffer in) {
         int start = in.position();
         int arrayLen = readVInt(in);
@@ -369,6 +377,7 @@ public class BytesUtil {
         writeVInt(array.length, out);
         byte b_true = (byte) 1;
         byte b_false = (byte) 0;
+
         for (int i = 0; i < array.length; i++) {
             if (array[i])
                 out.put(b_true);
@@ -428,7 +437,4 @@ public class BytesUtil {
         return sb.toString();
     }
 
-    public static void main(String[] args) throws Exception {
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc707/core-dictionary/src/main/java/org/apache/kylin/dict/TupleFilterFunctionTransformer.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/TupleFilterFunctionTransformer.java b/core-dictionary/src/main/java/org/apache/kylin/dict/TupleFilterFunctionTransformer.java
new file mode 100644
index 0000000..096a28d
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/TupleFilterFunctionTransformer.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.dict;
+
+import java.util.Collection;
+import java.util.ListIterator;
+
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.metadata.filter.ColumnTupleFilter;
+import org.apache.kylin.metadata.filter.CompareTupleFilter;
+import org.apache.kylin.metadata.filter.ConstantTupleFilter;
+import org.apache.kylin.metadata.filter.FunctionTupleFilter;
+import org.apache.kylin.metadata.filter.ITupleFilterTransformer;
+import org.apache.kylin.metadata.filter.LogicalTupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Primitives;
+
+/**
+ * only take effect when the compare filter has function
+ */
+public class TupleFilterFunctionTransformer implements ITupleFilterTransformer {
+    public static final Logger logger = LoggerFactory.getLogger(TupleFilterFunctionTransformer.class);
+
+    private IDictionaryAware dictionaryAware;
+
+    public TupleFilterFunctionTransformer(IDictionaryAware dictionaryAware) {
+        this.dictionaryAware = dictionaryAware;
+    }
+
+    @Override
+    public TupleFilter transform(TupleFilter tupleFilter) {
+        TupleFilter translated = null;
+        if (tupleFilter instanceof CompareTupleFilter) {
+            translated = translateCompareTupleFilter((CompareTupleFilter) tupleFilter);
+            if (translated != null) {
+                logger.info("Translated {" + tupleFilter + "} to IN clause: {" + translated + "}");
+            }
+        } else if (tupleFilter instanceof FunctionTupleFilter) {
+            translated = translateFunctionTupleFilter((FunctionTupleFilter) tupleFilter);
+            if (translated != null) {
+                logger.info("Translated {" + tupleFilter + "} to IN clause: {" + translated + "}");
+            }
+        } else if (tupleFilter instanceof LogicalTupleFilter) {
+            ListIterator<TupleFilter> childIterator = (ListIterator<TupleFilter>) tupleFilter.getChildren().listIterator();
+            while (childIterator.hasNext()) {
+                TupleFilter transformed = transform(childIterator.next());
+                if (transformed != null)
+                    childIterator.set(transformed);
+            }
+        }
+        return translated == null ? tupleFilter : translated;
+    }
+
+    private TupleFilter translateFunctionTupleFilter(FunctionTupleFilter functionTupleFilter) {
+        if (!functionTupleFilter.isValid())
+            return null;
+
+        TblColRef columnRef = functionTupleFilter.getColumn();
+        Dictionary<?> dict = dictionaryAware.getDictionary(columnRef);
+        if (dict == null)
+            return null;
+
+        CompareTupleFilter translated = new CompareTupleFilter(FilterOperatorEnum.IN);
+        translated.addChild(new ColumnTupleFilter(columnRef));
+
+        try {
+            for (int i = dict.getMinId(); i <= dict.getMaxId(); i++) {
+                Object dictVal = dict.getValueFromId(i);
+                if ((Boolean) functionTupleFilter.invokeFunction(dictVal)) {
+                    translated.addChild(new ConstantTupleFilter(dictVal));
+                }
+            }
+        } catch (Exception e) {
+            logger.debug(e.getMessage());
+            return null;
+        }
+        return translated;
+    }
+
+    @SuppressWarnings("unchecked")
+    private TupleFilter translateCompareTupleFilter(CompareTupleFilter compTupleFilter) {
+        if (compTupleFilter.getFunction() == null)
+            return null;
+
+        FunctionTupleFilter functionTupleFilter = compTupleFilter.getFunction();
+        if (!functionTupleFilter.isValid())
+            return null;
+
+        TblColRef columnRef = functionTupleFilter.getColumn();
+        Dictionary<?> dict = dictionaryAware.getDictionary(columnRef);
+        if (dict == null)
+            return null;
+
+        CompareTupleFilter translated = new CompareTupleFilter(FilterOperatorEnum.IN);
+        translated.addChild(new ColumnTupleFilter(columnRef));
+
+        try {
+            Collection<Object> inValues = Lists.newArrayList();
+            for (int i = dict.getMinId(); i <= dict.getMaxId(); i++) {
+                Object dictVal = dict.getValueFromId(i);
+                Object computedVal = functionTupleFilter.invokeFunction(dictVal);
+                Class clazz = Primitives.wrap(computedVal.getClass());
+                Object targetVal = compTupleFilter.getFirstValue();
+                if (Primitives.isWrapperType(clazz))
+                    targetVal = clazz.cast(clazz.getDeclaredMethod("valueOf", String.class).invoke(null, compTupleFilter.getFirstValue()));
+
+                int comp = ((Comparable) computedVal).compareTo(targetVal);
+                boolean compResult = false;
+                switch (compTupleFilter.getOperator()) {
+                case EQ:
+                    compResult = comp == 0;
+                    break;
+                case NEQ:
+                    compResult = comp != 0;
+                    break;
+                case LT:
+                    compResult = comp < 0;
+                    break;
+                case LTE:
+                    compResult = comp <= 0;
+                    break;
+                case GT:
+                    compResult = comp > 0;
+                    break;
+                case GTE:
+                    compResult = comp >= 0;
+                    break;
+                case IN:
+                    compResult = compTupleFilter.getValues().contains(computedVal.toString());
+                    break;
+                case NOTIN:
+                    compResult = !compTupleFilter.getValues().contains(computedVal.toString());
+                    break;
+                default:
+                    break;
+                }
+                if (compResult) {
+                    inValues.add(dictVal);
+                }
+            }
+            translated.addChild(new ConstantTupleFilter(inValues));
+        } catch (Exception e) {
+            logger.debug(e.getMessage());
+            return null;
+        }
+        return translated;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc707/core-dictionary/src/main/java/org/apache/kylin/dict/TupleFilterFunctionTranslator.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/TupleFilterFunctionTranslator.java b/core-dictionary/src/main/java/org/apache/kylin/dict/TupleFilterFunctionTranslator.java
deleted file mode 100644
index 1c96dd4..0000000
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/TupleFilterFunctionTranslator.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.dict;
-
-import java.util.ListIterator;
-
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.metadata.filter.ColumnTupleFilter;
-import org.apache.kylin.metadata.filter.CompareTupleFilter;
-import org.apache.kylin.metadata.filter.ConstantTupleFilter;
-import org.apache.kylin.metadata.filter.FunctionTupleFilter;
-import org.apache.kylin.metadata.filter.ITupleFilterTranslator;
-import org.apache.kylin.metadata.filter.LogicalTupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.primitives.Primitives;
-
-/**
- * only take effect when the compare filter has function
- */
-public class TupleFilterFunctionTranslator implements ITupleFilterTranslator {
-    public static final Logger logger = LoggerFactory.getLogger(TupleFilterFunctionTranslator.class);
-
-    private IDictionaryAware dictionaryAware;
-
-    public TupleFilterFunctionTranslator(IDictionaryAware dictionaryAware) {
-        this.dictionaryAware = dictionaryAware;
-    }
-
-    @Override
-    public TupleFilter translate(TupleFilter tupleFilter) {
-        TupleFilter translated = null;
-        if (tupleFilter instanceof CompareTupleFilter) {
-            translated = translateCompareTupleFilter((CompareTupleFilter) tupleFilter);
-            if (translated != null) {
-                logger.info("Translated {" + tupleFilter + "} to IN clause: {" + translated + "}");
-            }
-        } else if (tupleFilter instanceof FunctionTupleFilter) {
-            translated = translateFunctionTupleFilter((FunctionTupleFilter) tupleFilter);
-            if (translated != null) {
-                logger.info("Translated {" + tupleFilter + "} to IN clause: {" + translated + "}");
-            }
-        } else if (tupleFilter instanceof LogicalTupleFilter) {
-            ListIterator<TupleFilter> childIterator = (ListIterator<TupleFilter>) tupleFilter.getChildren().listIterator();
-            while (childIterator.hasNext()) {
-                TupleFilter tempTranslated = translate(childIterator.next());
-                if (tempTranslated != null)
-                    childIterator.set(tempTranslated);
-            }
-        }
-        return translated == null ? tupleFilter : translated;
-    }
-
-    private TupleFilter translateFunctionTupleFilter(FunctionTupleFilter functionTupleFilter) {
-        if (!functionTupleFilter.isValid())
-            return null;
-
-        TblColRef columnRef = functionTupleFilter.getColumn();
-        Dictionary<?> dict = dictionaryAware.getDictionary(columnRef);
-        if (dict == null)
-            return null;
-
-        CompareTupleFilter translated = new CompareTupleFilter(FilterOperatorEnum.IN);
-        translated.addChild(new ColumnTupleFilter(columnRef));
-
-        try {
-            for (int i = dict.getMinId(); i <= dict.getMaxId(); i++) {
-                Object dictVal = dict.getValueFromId(i);
-                if ((Boolean) functionTupleFilter.invokeFunction(dictVal)) {
-                    translated.addChild(new ConstantTupleFilter(dictVal));
-                }
-            }
-        } catch (Exception e) {
-            logger.debug(e.getMessage());
-            return null;
-        }
-        return translated;
-    }
-
-    @SuppressWarnings("unchecked")
-    private TupleFilter translateCompareTupleFilter(CompareTupleFilter compTupleFilter) {
-        if (compTupleFilter.getFunction() == null)
-            return null;
-
-        FunctionTupleFilter functionTupleFilter = compTupleFilter.getFunction();
-        if (!functionTupleFilter.isValid())
-            return null;
-
-        TblColRef columnRef = functionTupleFilter.getColumn();
-        Dictionary<?> dict = dictionaryAware.getDictionary(columnRef);
-        if (dict == null)
-            return null;
-
-        CompareTupleFilter translated = new CompareTupleFilter(FilterOperatorEnum.IN);
-        translated.addChild(new ColumnTupleFilter(columnRef));
-
-        try {
-            for (int i = dict.getMinId(); i <= dict.getMaxId(); i++) {
-                Object dictVal = dict.getValueFromId(i);
-                Object computedVal = functionTupleFilter.invokeFunction(dictVal);
-                Class clazz = Primitives.wrap(computedVal.getClass());
-                Object targetVal = compTupleFilter.getFirstValue();
-                if (Primitives.isWrapperType(clazz))
-                    targetVal = clazz.cast(clazz.getDeclaredMethod("valueOf", String.class).invoke(null, compTupleFilter.getFirstValue()));
-
-                int comp = ((Comparable) computedVal).compareTo(targetVal);
-                boolean compResult = false;
-                switch (compTupleFilter.getOperator()) {
-                case EQ:
-                    compResult = comp == 0;
-                    break;
-                case NEQ:
-                    compResult = comp != 0;
-                    break;
-                case LT:
-                    compResult = comp < 0;
-                    break;
-                case LTE:
-                    compResult = comp <= 0;
-                    break;
-                case GT:
-                    compResult = comp > 0;
-                    break;
-                case GTE:
-                    compResult = comp >= 0;
-                    break;
-                case IN:
-                    compResult = compTupleFilter.getValues().contains(computedVal.toString());
-                    break;
-                case NOTIN:
-                    compResult = !compTupleFilter.getValues().contains(computedVal.toString());
-                    break;
-                default:
-                    break;
-                }
-                if (compResult) {
-                    translated.addChild(new ConstantTupleFilter(dictVal));
-                }
-            }
-        } catch (Exception e) {
-            logger.debug(e.getMessage());
-            return null;
-        }
-        return translated;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc707/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CaseTupleFilter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CaseTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CaseTupleFilter.java
index 6dbc614..2b00d69 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CaseTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CaseTupleFilter.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.metadata.filter;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -101,12 +102,12 @@ public class CaseTupleFilter extends TupleFilter {
     }
 
     @Override
-    public byte[] serialize(IFilterCodeSystem<?> cs) {
-        return new byte[0];
+    public void serialize(IFilterCodeSystem<?> cs, ByteBuffer buffer) {
+        //serialize nothing
     }
 
     @Override
-    public void deserialize(byte[] bytes, IFilterCodeSystem<?> cs) {
+    public void deserialize(IFilterCodeSystem<?> cs, ByteBuffer buffer) {
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc707/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ColumnTupleFilter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ColumnTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ColumnTupleFilter.java
index 0d2a73d..029233d 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ColumnTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ColumnTupleFilter.java
@@ -84,8 +84,7 @@ public class ColumnTupleFilter extends TupleFilter {
     }
 
     @Override
-    public byte[] serialize(IFilterCodeSystem<?> cs) {
-        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
+    public void serialize(IFilterCodeSystem<?> cs,ByteBuffer buffer) {
         String table = columnRef.getTable();
         BytesUtil.writeUTFString(table, buffer);
 
@@ -97,17 +96,13 @@ public class ColumnTupleFilter extends TupleFilter {
 
         String dataType = columnRef.getDatatype();
         BytesUtil.writeUTFString(dataType, buffer);
-
-        byte[] result = new byte[buffer.position()];
-        System.arraycopy(buffer.array(), 0, result, 0, buffer.position());
-        return result;
     }
 
     @Override
-    public void deserialize(byte[] bytes, IFilterCodeSystem<?> cs) {
+    public void deserialize(IFilterCodeSystem<?> cs, ByteBuffer buffer) {
+
         TableDesc table = null;
         ColumnDesc column = new ColumnDesc();
-        ByteBuffer buffer = ByteBuffer.wrap(bytes);
 
         String tableName = BytesUtil.readUTFString(buffer);
         if (tableName != null) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc707/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CompareTupleFilter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CompareTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CompareTupleFilter.java
index 248ab3b..fc0bab7 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CompareTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CompareTupleFilter.java
@@ -214,23 +214,19 @@ public class CompareTupleFilter extends TupleFilter {
 
     @SuppressWarnings({ "unchecked", "rawtypes" })
     @Override
-    public byte[] serialize(IFilterCodeSystem cs) {
-        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
+    public void serialize(IFilterCodeSystem cs, ByteBuffer buffer) {
         int size = this.dynamicVariables.size();
         BytesUtil.writeVInt(size, buffer);
         for (Map.Entry<String, Object> entry : this.dynamicVariables.entrySet()) {
             BytesUtil.writeUTFString(entry.getKey(), buffer);
             cs.serialize(entry.getValue(), buffer);
         }
-        byte[] result = new byte[buffer.position()];
-        System.arraycopy(buffer.array(), 0, result, 0, buffer.position());
-        return result;
     }
 
     @Override
-    public void deserialize(byte[] bytes, IFilterCodeSystem<?> cs) {
+    public void deserialize(IFilterCodeSystem<?> cs, ByteBuffer buffer) {
+
         this.dynamicVariables.clear();
-        ByteBuffer buffer = ByteBuffer.wrap(bytes);
         int size = BytesUtil.readVInt(buffer);
         for (int i = 0; i < size; i++) {
             String name = BytesUtil.readUTFString(buffer);

http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc707/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ConstantTupleFilter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ConstantTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ConstantTupleFilter.java
index 3056a9c..db3eb4f 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ConstantTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ConstantTupleFilter.java
@@ -21,8 +21,10 @@ package org.apache.kylin.metadata.filter;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
+import java.util.TreeSet;
 
+import com.google.common.collect.Lists;
+import org.apache.commons.collections.comparators.NullComparator;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.metadata.tuple.IEvaluatableTuple;
 
@@ -40,7 +42,7 @@ public class ConstantTupleFilter extends TupleFilter {
 
     public ConstantTupleFilter() {
         super(Collections.<TupleFilter> emptyList(), FilterOperatorEnum.CONSTANT);
-        this.constantValues = new HashSet<Object>();
+        this.constantValues = Lists.newArrayList();
     }
 
     public ConstantTupleFilter(Object value) {
@@ -89,22 +91,18 @@ public class ConstantTupleFilter extends TupleFilter {
 
     @SuppressWarnings({ "unchecked", "rawtypes" })
     @Override
-    public byte[] serialize(IFilterCodeSystem cs) {
-        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
+    public void serialize(IFilterCodeSystem cs, ByteBuffer buffer) {
         int size = this.constantValues.size();
         BytesUtil.writeVInt(size, buffer);
         for (Object val : this.constantValues) {
             cs.serialize(val, buffer);
         }
-        byte[] result = new byte[buffer.position()];
-        System.arraycopy(buffer.array(), 0, result, 0, buffer.position());
-        return result;
     }
 
     @Override
-    public void deserialize(byte[] bytes, IFilterCodeSystem<?> cs) {
+    public void deserialize(IFilterCodeSystem<?> cs, ByteBuffer buffer) {
+
         this.constantValues.clear();
-        ByteBuffer buffer = ByteBuffer.wrap(bytes);
         int size = BytesUtil.readVInt(buffer);
         for (int i = 0; i < size; i++) {
             this.constantValues.add(cs.deserialize(buffer));

http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc707/core-metadata/src/main/java/org/apache/kylin/metadata/filter/DynamicTupleFilter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/DynamicTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/DynamicTupleFilter.java
index a482519..d9dc52a 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/DynamicTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/DynamicTupleFilter.java
@@ -69,17 +69,12 @@ public class DynamicTupleFilter extends TupleFilter {
     }
 
     @Override
-    public byte[] serialize(IFilterCodeSystem<?> cs) {
-        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
+    public void serialize(IFilterCodeSystem<?> cs, ByteBuffer buffer) {
         BytesUtil.writeUTFString(variableName, buffer);
-        byte[] result = new byte[buffer.position()];
-        System.arraycopy(buffer.array(), 0, result, 0, buffer.position());
-        return result;
     }
 
     @Override
-    public void deserialize(byte[] bytes, IFilterCodeSystem<?> cs) {
-        ByteBuffer buffer = ByteBuffer.wrap(bytes);
+    public void deserialize(IFilterCodeSystem<?> cs, ByteBuffer buffer) {
         this.variableName = BytesUtil.readUTFString(buffer);
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc707/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ExtractTupleFilter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ExtractTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ExtractTupleFilter.java
index 6f7dfaf..591e64b 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ExtractTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ExtractTupleFilter.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.metadata.filter;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -113,12 +114,12 @@ public class ExtractTupleFilter extends TupleFilter {
     }
 
     @Override
-    public byte[] serialize(IFilterCodeSystem<?> cs) {
-        return new byte[0];
+    public void serialize(IFilterCodeSystem<?> cs, ByteBuffer buffer) {
+        //do nothing
     }
 
     @Override
-    public void deserialize(byte[] bytes, IFilterCodeSystem<?> cs) {
+    public void deserialize(IFilterCodeSystem<?> cs, ByteBuffer buffer) {
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc707/core-metadata/src/main/java/org/apache/kylin/metadata/filter/FunctionTupleFilter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/FunctionTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/FunctionTupleFilter.java
index 30bef97..2a08728 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/FunctionTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/FunctionTupleFilter.java
@@ -123,32 +123,20 @@ public class FunctionTupleFilter extends TupleFilter {
     }
 
     @Override
-    byte[] serialize(IFilterCodeSystem<?> cs) {
-        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
+    void serialize(IFilterCodeSystem<?> cs, ByteBuffer buffer) {
         BytesUtil.writeUTFString(name, buffer);
         BytesUtil.writeVInt(colPosition, buffer);
         BytesUtil.writeVInt(isValid ? 1 : 0, buffer);
-        BytesUtil.writeByteArray(TupleFilterSerializer.serialize(columnContainerFilter, cs), buffer);
-
-        byte[] result = new byte[buffer.position()];
-        System.arraycopy(buffer.array(), 0, result, 0, buffer.position());
-        return result;
     }
 
     @Override
-    void deserialize(byte[] bytes, IFilterCodeSystem<?> cs) {
-        ByteBuffer buffer = ByteBuffer.wrap(bytes);
+    public void deserialize(IFilterCodeSystem<?> cs, ByteBuffer buffer) {
 
         this.name = BytesUtil.readUTFString(buffer);
         this.initMethod();
 
         this.colPosition = BytesUtil.readVInt(buffer);
         this.isValid = BytesUtil.readVInt(buffer) == 1;
-
-        byte[] columnFilter = BytesUtil.readByteArray(buffer);
-        if (columnFilter != null) {
-            this.columnContainerFilter = TupleFilterSerializer.deserialize(columnFilter, cs);
-        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc707/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ITupleFilterTransformer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ITupleFilterTransformer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ITupleFilterTransformer.java
new file mode 100644
index 0000000..d3d5076
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ITupleFilterTransformer.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metadata.filter;
+
+public interface ITupleFilterTransformer {
+    TupleFilter transform(TupleFilter tupleFilter);
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc707/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ITupleFilterTranslator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ITupleFilterTranslator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ITupleFilterTranslator.java
deleted file mode 100644
index aed284c..0000000
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ITupleFilterTranslator.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.metadata.filter;
-
-/**
- * Created by dongli on 1/7/16.
- */
-public interface ITupleFilterTranslator {
-    TupleFilter translate(TupleFilter tupleFilter);
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc707/core-metadata/src/main/java/org/apache/kylin/metadata/filter/LogicalTupleFilter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/LogicalTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/LogicalTupleFilter.java
index 0929cf1..61657fb 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/LogicalTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/LogicalTupleFilter.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.metadata.filter;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -46,7 +47,6 @@ public class LogicalTupleFilter extends TupleFilter {
         return cloneTuple;
     }
 
-
     //    private TupleFilter reverseNestedNots(TupleFilter filter, int depth) {
     //        if ((filter instanceof LogicalTupleFilter) && (filter.operator == FilterOperatorEnum.NOT)) {
     //            assert (filter.children.size() == 1);
@@ -60,7 +60,6 @@ public class LogicalTupleFilter extends TupleFilter {
     //        }
     //    }
 
-
     @Override
     public TupleFilter reverse() {
         switch (operator) {
@@ -151,12 +150,13 @@ public class LogicalTupleFilter extends TupleFilter {
     }
 
     @Override
-    public byte[] serialize(IFilterCodeSystem<?> cs) {
-        return new byte[0];
+    public void serialize(IFilterCodeSystem<?> cs, ByteBuffer buffer) {
+        //do nothing
     }
 
     @Override
-    public void deserialize(byte[] bytes, IFilterCodeSystem<?> cs) {
+    public void deserialize(IFilterCodeSystem<?> cs, ByteBuffer buffer) {
+
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc707/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
index 944ddd0..1e23499 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.metadata.filter;
 
+import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
@@ -204,9 +205,9 @@ public abstract class TupleFilter {
 
     public abstract Collection<?> getValues();
 
-    abstract byte[] serialize(IFilterCodeSystem<?> cs);
+    abstract void serialize(IFilterCodeSystem<?> cs,ByteBuffer buffer);
 
-    abstract void deserialize(byte[] bytes, IFilterCodeSystem<?> cs);
+    abstract void deserialize(IFilterCodeSystem<?> cs,ByteBuffer buffer);
 
     public static boolean isEvaluableRecursively(TupleFilter filter) {
         if (filter == null)

http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc707/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilterSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilterSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilterSerializer.java
index a394a51..39ccb15 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilterSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilterSerializer.java
@@ -18,12 +18,15 @@
 
 package org.apache.kylin.metadata.filter;
 
+import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Stack;
 
 import org.apache.kylin.common.util.BytesUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * http://eli.thegreenplace.net/2011/09/29/an-interesting-tree-serialization-algorithm-from-dwarf
@@ -33,6 +36,8 @@ import org.apache.kylin.common.util.BytesUtil;
  */
 public class TupleFilterSerializer {
 
+    private static final Logger logger = LoggerFactory.getLogger(TupleFilterSerializer.class);
+
     public interface Decorator {
         TupleFilter onSerialize(TupleFilter filter);
     }
@@ -51,8 +56,18 @@ public class TupleFilterSerializer {
     }
 
     public static byte[] serialize(TupleFilter rootFilter, Decorator decorator, IFilterCodeSystem<?> cs) {
-        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
-        internalSerialize(rootFilter, decorator, buffer, cs);
+        ByteBuffer buffer;
+        int bufferSize = BUFFER_SIZE;
+        while (true) {
+            try {
+                buffer = ByteBuffer.allocate(bufferSize);
+                internalSerialize(rootFilter, decorator, buffer, cs);
+                break;
+            } catch (BufferOverflowException e) {
+                logger.info("Buffer size {} cannot hold the filter, resizing to 4 times", bufferSize);
+                bufferSize *= 4;
+            }
+        }
         byte[] result = new byte[buffer.position()];
         System.arraycopy(buffer.array(), 0, result, 0, buffer.position());
         return result;
@@ -86,10 +101,9 @@ public class TupleFilterSerializer {
         if (flag < 0) {
             BytesUtil.writeVInt(-1, buffer);
         } else {
-            byte[] bytes = filter.serialize(cs);
             int opVal = filter.getOperator().getValue();
             BytesUtil.writeVInt(opVal, buffer);
-            BytesUtil.writeByteArray(bytes, buffer);
+            filter.serialize(cs, buffer);
             BytesUtil.writeVInt(flag, buffer);
         }
     }
@@ -107,8 +121,7 @@ public class TupleFilterSerializer {
 
             // deserialize filter
             TupleFilter filter = createTupleFilter(opVal);
-            byte[] filterBytes = BytesUtil.readByteArray(buffer);
-            filter.deserialize(filterBytes, cs);
+            filter.deserialize(cs, buffer);
 
             if (rootFilter == null) {
                 // push root to stack

http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc707/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterDecorator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterDecorator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterDecorator.java
index 01d3041..5208ba7 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterDecorator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterDecorator.java
@@ -8,11 +8,11 @@ import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.cube.kv.RowKeyColumnIO;
 import org.apache.kylin.dict.DictCodeSystem;
 import org.apache.kylin.dict.IDictionaryAware;
-import org.apache.kylin.dict.TupleFilterFunctionTranslator;
+import org.apache.kylin.dict.TupleFilterFunctionTransformer;
 import org.apache.kylin.metadata.filter.ColumnTupleFilter;
 import org.apache.kylin.metadata.filter.CompareTupleFilter;
 import org.apache.kylin.metadata.filter.ConstantTupleFilter;
-import org.apache.kylin.metadata.filter.ITupleFilterTranslator;
+import org.apache.kylin.metadata.filter.ITupleFilterTransformer;
 import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.filter.TupleFilterSerializer;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -131,8 +131,8 @@ public class FilterDecorator implements TupleFilterSerializer.Decorator {
         if (filter == null)
             return null;
 
-        ITupleFilterTranslator translator = new TupleFilterFunctionTranslator(columnIO.getIDictionaryAware());
-        filter = translator.translate(filter);
+        ITupleFilterTransformer translator = new TupleFilterFunctionTransformer(columnIO.getIDictionaryAware());
+        filter = translator.transform(filter);
 
         // un-evaluatable filter is replaced with TRUE
         if (!filter.isEvaluable()) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc707/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
index e96c602..3f00566 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
@@ -23,7 +23,7 @@ import org.apache.kylin.cube.gridtable.CubeGridTable;
 import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
 import org.apache.kylin.cube.gridtable.NotEnoughGTInfoException;
 import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.dict.TupleFilterFunctionTranslator;
+import org.apache.kylin.dict.TupleFilterFunctionTransformer;
 import org.apache.kylin.gridtable.EmptyGTScanner;
 import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.GTRecord;
@@ -33,7 +33,7 @@ import org.apache.kylin.gridtable.GTScanRequest;
 import org.apache.kylin.gridtable.GTUtil;
 import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.metadata.datatype.DataType;
-import org.apache.kylin.metadata.filter.ITupleFilterTranslator;
+import org.apache.kylin.metadata.filter.ITupleFilterTransformer;
 import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -63,9 +63,9 @@ public class CubeSegmentScanner implements IGTScanner {
 
         CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping();
 
-        // translate FunctionTupleFilter to IN clause
-        ITupleFilterTranslator translator = new TupleFilterFunctionTranslator(this.cubeSeg);
-        filter = translator.translate(filter);
+        // transform FunctionTupleFilter to equivalent IN clause
+        ITupleFilterTransformer translator = new TupleFilterFunctionTransformer(this.cubeSeg);
+        filter = translator.transform(filter);
 
         //replace the constant values in filter to dictionary codes 
         TupleFilter gtFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, mapping.getCuboidDimensionsInGTOrder(), groups);

http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc707/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterBaseTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterBaseTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterBaseTest.java
index 92e9699..e7ed1a8 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterBaseTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterBaseTest.java
@@ -18,8 +18,12 @@
 
 package org.apache.kylin.storage.hbase.common.coprocessor;
 
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Calendar;
 import java.util.Collection;
+import java.util.Date;
 import java.util.List;
 import java.util.Random;
 
@@ -39,6 +43,8 @@ import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.tuple.Tuple;
 import org.apache.kylin.metadata.tuple.TupleInfo;
 
+import com.google.common.collect.Lists;
+
 /**
  * @author xjiang
  * 
@@ -64,7 +70,7 @@ public class FilterBaseTest {
         return groups;
     }
 
-    protected CompareTupleFilter buildCompareFilter(List<TblColRef> groups, int index) {
+    protected CompareTupleFilter buildEQCompareFilter(List<TblColRef> groups, int index) {
         TblColRef column = groups.get(index);
         CompareTupleFilter compareFilter = new CompareTupleFilter(FilterOperatorEnum.EQ);
         ColumnTupleFilter columnFilter = new ColumnTupleFilter(column);
@@ -79,9 +85,31 @@ public class FilterBaseTest {
         return compareFilter;
     }
 
+    protected CompareTupleFilter buildINCompareFilter(TblColRef dateColumn) throws ParseException {
+        CompareTupleFilter compareFilter = new CompareTupleFilter(FilterOperatorEnum.IN);
+        ColumnTupleFilter columnFilter = new ColumnTupleFilter(dateColumn);
+        compareFilter.addChild(columnFilter);
+
+        List<String> inValues = Lists.newArrayList();
+        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
+        Date startDate = simpleDateFormat.parse("1970-01-01");
+        Date endDate = simpleDateFormat.parse("2100-01-01");
+        Calendar start = Calendar.getInstance();
+        start.setTime(startDate);
+        Calendar end = Calendar.getInstance();
+        end.setTime(endDate);
+        for (Date date = start.getTime(); start.before(end); start.add(Calendar.DATE, 1), date = start.getTime()) {
+            inValues.add(simpleDateFormat.format(date));
+        }
+
+        ConstantTupleFilter constantFilter = new ConstantTupleFilter(inValues);
+        compareFilter.addChild(constantFilter);
+        return compareFilter;
+    }
+
     protected TupleFilter buildAndFilter(List<TblColRef> columns) {
-        CompareTupleFilter compareFilter1 = buildCompareFilter(columns, 0);
-        CompareTupleFilter compareFilter2 = buildCompareFilter(columns, 1);
+        CompareTupleFilter compareFilter1 = buildEQCompareFilter(columns, 0);
+        CompareTupleFilter compareFilter2 = buildEQCompareFilter(columns, 1);
         LogicalTupleFilter andFilter = new LogicalTupleFilter(FilterOperatorEnum.AND);
         andFilter.addChild(compareFilter1);
         andFilter.addChild(compareFilter2);
@@ -89,8 +117,8 @@ public class FilterBaseTest {
     }
 
     protected TupleFilter buildOrFilter(List<TblColRef> columns) {
-        CompareTupleFilter compareFilter1 = buildCompareFilter(columns, 0);
-        CompareTupleFilter compareFilter2 = buildCompareFilter(columns, 1);
+        CompareTupleFilter compareFilter1 = buildEQCompareFilter(columns, 0);
+        CompareTupleFilter compareFilter2 = buildEQCompareFilter(columns, 1);
         LogicalTupleFilter logicFilter = new LogicalTupleFilter(FilterOperatorEnum.OR);
         logicFilter.addChild(compareFilter1);
         logicFilter.addChild(compareFilter2);
@@ -105,12 +133,12 @@ public class FilterBaseTest {
         TupleFilter then0 = new ConstantTupleFilter("0");
         caseFilter.addChild(then0);
 
-        TupleFilter when1 = buildCompareFilter(groups, 0);
+        TupleFilter when1 = buildEQCompareFilter(groups, 0);
         caseFilter.addChild(when1);
         TupleFilter then1 = new ConstantTupleFilter("1");
         caseFilter.addChild(then1);
 
-        TupleFilter when2 = buildCompareFilter(groups, 1);
+        TupleFilter when2 = buildEQCompareFilter(groups, 1);
         caseFilter.addChild(when2);
         TupleFilter then2 = new ConstantTupleFilter("2");
         caseFilter.addChild(then2);
@@ -153,9 +181,9 @@ public class FilterBaseTest {
         }
 
         String str1 = f1.toString();
-        System.out.println("f1=" + str1);
+        //System.out.println("f1=" + str1);
         String str2 = f2.toString();
-        System.out.println("f2=" + str2);
+        //System.out.println("f2=" + str2);
         if (!str1.equals(str2)) {
             throw new IllegalStateException("f1=" + str1 + ", f2=" + str2);
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc707/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterEvaluateTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterEvaluateTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterEvaluateTest.java
index bde8dd2..aac09b7 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterEvaluateTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterEvaluateTest.java
@@ -38,7 +38,7 @@ public class FilterEvaluateTest extends FilterBaseTest {
     @Test
     public void testEvaluate00() {
         List<TblColRef> groups = buildGroups();
-        TupleFilter filter = buildCompareFilter(groups, 0);
+        TupleFilter filter = buildEQCompareFilter(groups, 0);
 
         byte[] bytes = TupleFilterSerializer.serialize(filter, CS);
         TupleFilter newFilter = TupleFilterSerializer.deserialize(bytes, CS);
@@ -54,7 +54,7 @@ public class FilterEvaluateTest extends FilterBaseTest {
     @Test
     public void testEvaluate01() {
         List<TblColRef> groups = buildGroups();
-        TupleFilter filter = buildCompareFilter(groups, 1);
+        TupleFilter filter = buildEQCompareFilter(groups, 1);
 
         byte[] bytes = TupleFilterSerializer.serialize(filter, CS);
         TupleFilter newFilter = TupleFilterSerializer.deserialize(bytes, CS);

http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc707/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterSerializeTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterSerializeTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterSerializeTest.java
index 8bf8ecb..0a21598 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterSerializeTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterSerializeTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.storage.hbase.common.coprocessor;
 
+import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -40,7 +41,7 @@ public class FilterSerializeTest extends FilterBaseTest {
     @Test
     public void testSerialize01() {
         List<TblColRef> groups = buildGroups();
-        TupleFilter filter = buildCompareFilter(groups, 0);
+        TupleFilter filter = buildEQCompareFilter(groups, 0);
 
         byte[] bytes = TupleFilterSerializer.serialize(filter, CS);
         TupleFilter newFilter = TupleFilterSerializer.deserialize(bytes, CS);
@@ -51,7 +52,7 @@ public class FilterSerializeTest extends FilterBaseTest {
     @Test
     public void testSerialize02() {
         List<TblColRef> groups = buildGroups();
-        TupleFilter filter = buildCompareFilter(groups, 1);
+        TupleFilter filter = buildEQCompareFilter(groups, 1);
 
         byte[] bytes = TupleFilterSerializer.serialize(filter, CS);
         TupleFilter newFilter = TupleFilterSerializer.deserialize(bytes, CS);
@@ -88,7 +89,7 @@ public class FilterSerializeTest extends FilterBaseTest {
         TblColRef colRef = new TblColRef(column);
         List<TblColRef> groups = new ArrayList<TblColRef>();
         groups.add(colRef);
-        TupleFilter filter = buildCompareFilter(groups, 0);
+        TupleFilter filter = buildEQCompareFilter(groups, 0);
 
         byte[] bytes = TupleFilterSerializer.serialize(filter, CS);
         TupleFilter newFilter = TupleFilterSerializer.deserialize(bytes, CS);
@@ -103,7 +104,7 @@ public class FilterSerializeTest extends FilterBaseTest {
         TblColRef colRef = new TblColRef(column);
         List<TblColRef> groups = new ArrayList<TblColRef>();
         groups.add(colRef);
-        TupleFilter filter = buildCompareFilter(groups, 0);
+        TupleFilter filter = buildEQCompareFilter(groups, 0);
 
         byte[] bytes = TupleFilterSerializer.serialize(filter, CS);
         TupleFilter newFilter = TupleFilterSerializer.deserialize(bytes, CS);
@@ -122,7 +123,7 @@ public class FilterSerializeTest extends FilterBaseTest {
         TblColRef colRef = new TblColRef(column);
         List<TblColRef> groups = new ArrayList<TblColRef>();
         groups.add(colRef);
-        TupleFilter filter = buildCompareFilter(groups, 0);
+        TupleFilter filter = buildEQCompareFilter(groups, 0);
 
         byte[] bytes = TupleFilterSerializer.serialize(filter, CS);
         TupleFilter newFilter = TupleFilterSerializer.deserialize(bytes, CS);
@@ -140,7 +141,7 @@ public class FilterSerializeTest extends FilterBaseTest {
         TblColRef colRef = new TblColRef(column);
         List<TblColRef> groups = new ArrayList<TblColRef>();
         groups.add(colRef);
-        TupleFilter filter = buildCompareFilter(groups, 0);
+        TupleFilter filter = buildEQCompareFilter(groups, 0);
 
         byte[] bytes = TupleFilterSerializer.serialize(filter, CS);
         TupleFilter newFilter = TupleFilterSerializer.deserialize(bytes, CS);
@@ -203,6 +204,19 @@ public class FilterSerializeTest extends FilterBaseTest {
     }
 
     @Test
+    public void testSerialize14() throws ParseException {
+        List<TblColRef> groups = buildGroups();
+        TupleFilter filter = buildINCompareFilter(groups.get(0));
+
+        long start = System.currentTimeMillis();
+        byte[] bytes = TupleFilterSerializer.serialize(filter, CS);
+        System.out.println("Size of serialized filter " + bytes.length + ", serialize time: " + (System.currentTimeMillis() - start));
+        TupleFilter newFilter = TupleFilterSerializer.deserialize(bytes, CS);
+
+        compareFilter(filter, newFilter);
+    }
+
+    @Test
     public void testDynamic() {
         final CompareTupleFilter compareDynamicFilter = buildCompareDynamicFilter(buildGroups());
 


[2/4] kylin git commit: KYLIN-1366 simply metadata version binding

Posted by ma...@apache.org.
KYLIN-1366 simply metadata version binding

kylin 1366


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ab9d5791
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ab9d5791
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ab9d5791

Branch: refs/heads/2.x-staging
Commit: ab9d57914a119b5b426d2bfae712ca55ff35c92a
Parents: 4c08ded
Author: honma <ho...@ebay.com>
Authored: Wed Feb 24 15:58:07 2016 +0800
Committer: honma <ho...@ebay.com>
Committed: Fri Feb 26 17:54:37 2016 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/KylinConfigBase.java   |  8 --------
 .../java/org/apache/kylin/common/KylinVersion.java |  2 +-
 .../common/persistence/RootPersistentEntity.java   | 17 +++++------------
 .../java/org/apache/kylin/cube/CubeInstance.java   |  5 ++---
 .../java/org/apache/kylin/cube/model/CubeDesc.java |  2 +-
 .../java/org/apache/kylin/dict/DictionaryInfo.java |  4 ++--
 .../apache/kylin/dict/lookup/SnapshotManager.java  |  2 +-
 .../apache/kylin/metadata/model/DataModelDesc.java |  2 +-
 .../kylin/metadata/project/ProjectInstance.java    |  2 +-
 .../kylin/metadata/project/ProjectManager.java     |  2 +-
 .../kylin/storage/hybrid/HybridInstance.java       |  2 +-
 .../org/apache/kylin/invertedindex/IIInstance.java |  2 +-
 .../kylin/rest/controller/CubeController.java      |  4 ++--
 .../storage/hbase/util/ExtendCubeToHybridCLI.java  |  4 ++--
 14 files changed, 21 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/ab9d5791/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 5f9983a..7707684 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -61,14 +61,6 @@ public class KylinConfigBase implements Serializable {
         return kylinHome;
     }
 
-    /**
-     * @see KylinVersion
-     * @return current kylin version
-     */
-    public static String getKylinVersion(){
-        return KylinVersion.getCurrentVersion();
-    }
-
     // ============================================================================
 
     private volatile Properties properties = new Properties();

http://git-wip-us.apache.org/repos/asf/kylin/blob/ab9d5791/core-common/src/main/java/org/apache/kylin/common/KylinVersion.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinVersion.java b/core-common/src/main/java/org/apache/kylin/common/KylinVersion.java
index d711b38..42cf237 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinVersion.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinVersion.java
@@ -21,7 +21,7 @@ package org.apache.kylin.common;
  *
  * @since 2.1
  */
-class KylinVersion {
+public class KylinVersion {
     /**
      * Require MANUAL updating kylin version per ANY upgrading.
      */

http://git-wip-us.apache.org/repos/asf/kylin/blob/ab9d5791/core-common/src/main/java/org/apache/kylin/common/persistence/RootPersistentEntity.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/RootPersistentEntity.java b/core-common/src/main/java/org/apache/kylin/common/persistence/RootPersistentEntity.java
index 327ddcc..c46abe7 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/RootPersistentEntity.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/RootPersistentEntity.java
@@ -26,11 +26,11 @@ import java.util.Date;
 import java.util.UUID;
 
 import org.apache.commons.lang.time.FastDateFormat;
+import org.apache.kylin.common.KylinVersion;
 
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kylin.common.KylinConfig;
 
 /**
  * Marks the root entity of JSON persistence. Unit of read, write, cache, and
@@ -80,14 +80,14 @@ abstract public class RootPersistentEntity implements AclEntity, Serializable {
      * For example: 2.1
      */
     @JsonProperty("version")
-    protected String version;
+    protected String version = KylinVersion.getCurrentVersion();
 
     public String getVersion() {
-      return version;
+        return version;
     }
 
     public void setVersion(String version) {
-      this.version = version;
+        this.version = version;
     }
 
     public String getUuid() {
@@ -110,14 +110,7 @@ abstract public class RootPersistentEntity implements AclEntity, Serializable {
         this.lastModified = lastModified;
     }
 
-    /**
-     * Update entity's "model_version" with current kylin version and "uuid" with random UUID
-     *
-     * @see KylinConfig#getKylinVersion()
-     * @see UUID#randomUUID()
-     */
-    public void updateVersionAndRandomUuid() {
-        setVersion(KylinConfig.getKylinVersion());
+    public void updateRandomUuid() {
         setUuid(UUID.randomUUID().toString());
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/ab9d5791/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
index c26e2d2..2862d4f 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
@@ -63,7 +63,7 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
         cubeInstance.setCreateTimeUTC(System.currentTimeMillis());
         cubeInstance.setSegments(new ArrayList<CubeSegment>());
         cubeInstance.setStatus(RealizationStatusEnum.DISABLED);
-        cubeInstance.updateVersionAndRandomUuid();
+        cubeInstance.updateRandomUuid();
 
         return cubeInstance;
     }
@@ -427,10 +427,9 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
         newCube.setConfig(cubeInstance.getConfig());
         newCube.setStatus(cubeInstance.getStatus());
         newCube.setOwner(cubeInstance.getOwner());
-        newCube.setVersion(cubeInstance.getVersion());
         newCube.setCost(cubeInstance.getCost());
         newCube.setCreateTimeUTC(System.currentTimeMillis());
-        newCube.updateVersionAndRandomUuid();
+        newCube.updateRandomUuid();
         return newCube;
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/ab9d5791/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index de73399..49f70f8 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -998,7 +998,7 @@ public class CubeDesc extends RootPersistentEntity {
         newCubeDesc.setStorageType(cubeDesc.getStorageType());
         newCubeDesc.setAggregationGroups(cubeDesc.getAggregationGroups());
         newCubeDesc.setConfig(cubeDesc.getConfig());
-        newCubeDesc.updateVersionAndRandomUuid();
+        newCubeDesc.updateRandomUuid();
         return newCubeDesc;
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/ab9d5791/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
index 8e41abf..4fba59a 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
@@ -53,7 +53,7 @@ public class DictionaryInfo extends RootPersistentEntity {
 
     public DictionaryInfo(String sourceTable, String sourceColumn, int sourceColumnIndex, String dataType, TableSignature input) {
 
-        this.updateVersionAndRandomUuid();
+        this.updateRandomUuid();
 
         this.sourceTable = sourceTable;
         this.sourceColumn = sourceColumn;
@@ -64,7 +64,7 @@ public class DictionaryInfo extends RootPersistentEntity {
 
     public DictionaryInfo(DictionaryInfo other) {
 
-        this.updateVersionAndRandomUuid();
+        this.updateRandomUuid();
 
         this.sourceTable = other.sourceTable;
         this.sourceColumn = other.sourceColumn;

http://git-wip-us.apache.org/repos/asf/kylin/blob/ab9d5791/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
index ccdc79d..53bf60d 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
@@ -87,7 +87,7 @@ public class SnapshotManager {
 
     public SnapshotTable buildSnapshot(ReadableTable table, TableDesc tableDesc) throws IOException {
         SnapshotTable snapshot = new SnapshotTable(table);
-        snapshot.updateVersionAndRandomUuid();
+        snapshot.updateRandomUuid();
 
         String dup = checkDupByInfo(snapshot);
         if (dup != null) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/ab9d5791/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
index c042138..1647707 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
@@ -348,7 +348,7 @@ public class DataModelDesc extends RootPersistentEntity {
         newDataModelDesc.setLookups(dataModelDesc.getLookups());
         newDataModelDesc.setMetrics(dataModelDesc.getMetrics());
         newDataModelDesc.setPartitionDesc(PartitionDesc.getCopyOf(dataModelDesc.getPartitionDesc()));
-        newDataModelDesc.updateVersionAndRandomUuid();
+        newDataModelDesc.updateRandomUuid();
         return newDataModelDesc;
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/ab9d5791/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
index 20741ee..e0ed3d9 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
@@ -92,7 +92,7 @@ public class ProjectInstance extends RootPersistentEntity {
     public static ProjectInstance create(String name, String owner, String description, List<RealizationEntry> realizationEntries, List<String> models) {
         ProjectInstance projectInstance = new ProjectInstance();
 
-        projectInstance.updateVersionAndRandomUuid();
+        projectInstance.updateRandomUuid();
         projectInstance.setName(name);
         projectInstance.setOwner(owner);
         projectInstance.setDescription(description);

http://git-wip-us.apache.org/repos/asf/kylin/blob/ab9d5791/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
index f73239c..b6e99b3 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
@@ -197,7 +197,7 @@ public class ProjectManager {
             project.setDescription(newDesc);
 
             if (project.getUuid() == null)
-                project.updateVersionAndRandomUuid();
+                project.updateRandomUuid();
 
             updateProject(project);
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/ab9d5791/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
index 090efce..251f7c9 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
@@ -85,7 +85,7 @@ public class HybridInstance extends RootPersistentEntity implements IRealization
         hybridInstance.setConfig(config);
         hybridInstance.setName(name);
         hybridInstance.setRealizationEntries(realizationEntries);
-        hybridInstance.updateVersionAndRandomUuid();
+        hybridInstance.updateRandomUuid();
 
         return hybridInstance;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/ab9d5791/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
index 117bc02..9b56c88 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
@@ -60,7 +60,7 @@ public class IIInstance extends RootPersistentEntity implements IRealization, IB
         iii.setDescName(iiDesc.getName());
         iii.setCreateTimeUTC(System.currentTimeMillis());
         iii.setStatus(RealizationStatusEnum.DISABLED);
-        iii.updateVersionAndRandomUuid();
+        iii.updateRandomUuid();
 
         return iii;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/ab9d5791/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index 4741cef..9afa750 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -308,7 +308,7 @@ public class CubeController extends BasicController {
                 isStreamingCube = true;
                 newStreamingConfig = streamingConfigs.get(0).clone();
                 newStreamingConfig.setName(newCubeName + "_STREAMING");
-                newStreamingConfig.updateVersionAndRandomUuid();
+                newStreamingConfig.updateRandomUuid();
                 newStreamingConfig.setLastModified(0);
                 newStreamingConfig.setCubeName(newCubeName);
                 try {
@@ -327,7 +327,7 @@ public class CubeController extends BasicController {
                         newKafkaConfig = kafkaConfig.clone();
                         newKafkaConfig.setName(newStreamingConfig.getName());
                         newKafkaConfig.setLastModified(0);
-                        newKafkaConfig.updateVersionAndRandomUuid();
+                        newKafkaConfig.updateRandomUuid();
                     }
                 } catch (IOException e) {
                     throw new InternalErrorException("Failed to get kafka config info. ", e);

http://git-wip-us.apache.org/repos/asf/kylin/blob/ab9d5791/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java
index 1f46369..c55bde4 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java
@@ -147,7 +147,7 @@ public class ExtendCubeToHybridCLI {
         CubeInstance newCubeInstance = CubeInstance.getCopyOf(cubeInstance);
         newCubeInstance.setName(newCubeInstanceName);
         newCubeInstance.setDescName(newCubeDescName);
-        newCubeInstance.updateVersionAndRandomUuid();
+        newCubeInstance.updateRandomUuid();
         Iterator<CubeSegment> segmentIterator = newCubeInstance.getSegments().iterator();
         CubeSegment currentSeg = null;
         while (segmentIterator.hasNext()) {
@@ -170,7 +170,7 @@ public class ExtendCubeToHybridCLI {
         // create new cube for old segments
         CubeDesc newCubeDesc = CubeDesc.getCopyOf(cubeDesc);
         newCubeDesc.setName(newCubeDescName);
-        newCubeDesc.updateVersionAndRandomUuid();
+        newCubeDesc.updateRandomUuid();
         newCubeDesc.init(kylinConfig, metadataManager.getAllTablesMap());
         newCubeDesc.setPartitionDateEnd(partitionDate);
         newCubeDesc.calculateSignature();


[4/4] kylin git commit: KYLIN-1340 CubeMetaExtractor support streaming case and skip segments

Posted by ma...@apache.org.
KYLIN-1340 CubeMetaExtractor support streaming case and skip segments


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/4c08ded6
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/4c08ded6
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/4c08ded6

Branch: refs/heads/2.x-staging
Commit: 4c08ded63f78aad93eefa9814d48af2486725967
Parents: 2e1d2f6
Author: honma <ho...@ebay.com>
Authored: Wed Feb 24 15:45:38 2016 +0800
Committer: honma <ho...@ebay.com>
Committed: Fri Feb 26 17:54:37 2016 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/job/CubeMetaExtractor.java | 327 +++++++++++++++++++
 .../kylin/common/persistence/ResourceTool.java  |   2 +-
 .../engine/streaming/StreamingManager.java      | 100 +-----
 .../kylin/source/kafka/KafkaConfigManager.java  |  47 +--
 .../kylin/source/kafka/config/KafkaConfig.java  |   4 +-
 .../storage/hbase/util/CubeMetaExtractor.java   | 284 ----------------
 6 files changed, 345 insertions(+), 419 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/4c08ded6/assembly/src/test/java/org/apache/kylin/job/CubeMetaExtractor.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/CubeMetaExtractor.java b/assembly/src/test/java/org/apache/kylin/job/CubeMetaExtractor.java
new file mode 100644
index 0000000..527ef0a
--- /dev/null
+++ b/assembly/src/test/java/org/apache/kylin/job/CubeMetaExtractor.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.ResourceTool;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.cube.CubeDescManager;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.engine.streaming.StreamingManager;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.job.dao.ExecutableDao;
+import org.apache.kylin.job.dao.ExecutablePO;
+import org.apache.kylin.job.exception.PersistentException;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.project.ProjectManager;
+import org.apache.kylin.metadata.project.RealizationEntry;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.metadata.realization.RealizationRegistry;
+import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.source.kafka.KafkaConfigManager;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.apache.kylin.storage.hybrid.HybridInstance;
+import org.apache.kylin.storage.hybrid.HybridManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ * extract cube related info for debugging/distributing purpose
+ * TODO: deal with II case
+ */
+public class CubeMetaExtractor extends AbstractApplication {
+
+    private static final Logger logger = LoggerFactory.getLogger(CubeMetaExtractor.class);
+
+    @SuppressWarnings("static-access")
+    private static final Option OPTION_CUBE = OptionBuilder.withArgName("cube").hasArg().isRequired(false).withDescription("Specify which cube to extract").create("cube");
+    @SuppressWarnings("static-access")
+    private static final Option OPTION_HYBRID = OptionBuilder.withArgName("hybrid").hasArg().isRequired(false).withDescription("Specify which hybrid to extract").create("hybrid");
+    @SuppressWarnings("static-access")
+    private static final Option OPTION_PROJECT = OptionBuilder.withArgName("project").hasArg().isRequired(false).withDescription("Specify realizations in which project to extract").create("project");
+
+    @SuppressWarnings("static-access")
+    private static final Option OPTION_INCLUDE_SEGMENTS = OptionBuilder.withArgName("includeSegments").hasArg().isRequired(false).withDescription("set this to true if want extract the segments info, related dicts, etc. Default true").create("includeSegments");
+    @SuppressWarnings("static-access")
+    private static final Option OPTION_INCLUDE_JOB = OptionBuilder.withArgName("includeJobs").hasArg().isRequired(false).withDescription("set this to true if want to extract job info/outputs too. Default true").create("includeJobs");
+
+    @SuppressWarnings("static-access")
+    private static final Option OPTION_DEST = OptionBuilder.withArgName("destDir").hasArg().isRequired(false).withDescription("specify the dest dir to save the related metadata").create("destDir");
+
+    private Options options = null;
+    private KylinConfig kylinConfig;
+    private MetadataManager metadataManager;
+    private ProjectManager projectManager;
+    private HybridManager hybridManager;
+    private CubeManager cubeManager;
+    private StreamingManager streamingManager;
+    private KafkaConfigManager kafkaConfigManager;
+    private CubeDescManager cubeDescManager;
+    private ExecutableDao executableDao;
+    private RealizationRegistry realizationRegistry;
+
+    boolean includeSegments;
+    boolean includeJobs;
+
+    List<String> requiredResources = Lists.newArrayList();
+    List<String> optionalResources = Lists.newArrayList();
+    List<CubeInstance> cubesToTrimAndSave = Lists.newArrayList();//these cubes needs to be saved skipping segments
+
+    public CubeMetaExtractor() {
+        options = new Options();
+
+        OptionGroup realizationOrProject = new OptionGroup();
+        realizationOrProject.addOption(OPTION_CUBE);
+        realizationOrProject.addOption(OPTION_PROJECT);
+        realizationOrProject.addOption(OPTION_HYBRID);
+        realizationOrProject.setRequired(true);
+
+        options.addOptionGroup(realizationOrProject);
+        options.addOption(OPTION_INCLUDE_SEGMENTS);
+        options.addOption(OPTION_INCLUDE_JOB);
+        options.addOption(OPTION_DEST);
+
+    }
+
+    @Override
+    protected Options getOptions() {
+        return options;
+    }
+
+    @Override
+    protected void execute(OptionsHelper optionsHelper) throws Exception {
+        includeSegments = optionsHelper.hasOption(OPTION_INCLUDE_SEGMENTS) ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_SEGMENTS)) : true;
+        includeJobs = optionsHelper.hasOption(OPTION_INCLUDE_JOB) ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_JOB)) : true;
+        String dest = null;
+        if (optionsHelper.hasOption(OPTION_DEST)) {
+            dest = optionsHelper.getOptionValue(OPTION_DEST);
+        }
+
+        kylinConfig = KylinConfig.getInstanceFromEnv();
+        metadataManager = MetadataManager.getInstance(kylinConfig);
+        projectManager = ProjectManager.getInstance(kylinConfig);
+        hybridManager = HybridManager.getInstance(kylinConfig);
+        cubeManager = CubeManager.getInstance(kylinConfig);
+        cubeDescManager = CubeDescManager.getInstance(kylinConfig);
+        streamingManager = StreamingManager.getInstance(kylinConfig);
+        kafkaConfigManager = KafkaConfigManager.getInstance(kylinConfig);
+        executableDao = ExecutableDao.getInstance(kylinConfig);
+        realizationRegistry = RealizationRegistry.getInstance(kylinConfig);
+
+        if (optionsHelper.hasOption(OPTION_PROJECT)) {
+            ProjectInstance projectInstance = projectManager.getProject(optionsHelper.getOptionValue(OPTION_PROJECT));
+            if (projectInstance == null) {
+                throw new IllegalArgumentException("Project " + optionsHelper.getOptionValue(OPTION_PROJECT) + " does not exist");
+            }
+            addRequired(requiredResources, ProjectInstance.concatResourcePath(projectInstance.getName()));
+            List<RealizationEntry> realizationEntries = projectInstance.getRealizationEntries();
+            for (RealizationEntry realizationEntry : realizationEntries) {
+                retrieveResourcePath(getRealization(realizationEntry));
+            }
+        } else if (optionsHelper.hasOption(OPTION_CUBE)) {
+            String cubeName = optionsHelper.getOptionValue(OPTION_CUBE);
+            IRealization realization;
+
+            if ((realization = cubeManager.getRealization(cubeName)) != null) {
+                retrieveResourcePath(realization);
+            } else {
+                throw new IllegalArgumentException("No cube found with name of " + cubeName);
+            }
+        } else if (optionsHelper.hasOption(OPTION_HYBRID)) {
+            String hybridName = optionsHelper.getOptionValue(OPTION_HYBRID);
+            IRealization realization;
+
+            if ((realization = hybridManager.getRealization(hybridName)) != null) {
+                retrieveResourcePath(realization);
+            } else {
+                throw new IllegalArgumentException("No hybrid found with name of" + hybridName);
+            }
+        }
+
+        executeExtraction(dest);
+    }
+
+    private void executeExtraction(String dest) {
+        logger.info("The resource paths going to be extracted:");
+        for (String s : requiredResources) {
+            logger.info(s + "(required)");
+        }
+        for (String s : optionalResources) {
+            logger.info(s + "(optional)");
+        }
+        for (CubeInstance cube : cubesToTrimAndSave) {
+            logger.info("Cube {} will be trimmed and extracted", cube);
+        }
+
+        if (dest == null) {
+            logger.info("Dest is not set, exit directly without extracting");
+        } else {
+            try {
+                ResourceStore src = ResourceStore.getStore(KylinConfig.getInstanceFromEnv());
+                ResourceStore dst = ResourceStore.getStore(KylinConfig.createInstanceFromUri(dest));
+
+                for (String path : requiredResources) {
+                    ResourceTool.copyR(src, dst, path);
+                }
+
+                for (String path : optionalResources) {
+                    try {
+                        ResourceTool.copyR(src, dst, path);
+                    } catch (Exception e) {
+                        logger.warn("Exception when copying optional resource {}. May be caused by resource missing. Ignore it.");
+                    }
+                }
+
+                for (CubeInstance cube : cubesToTrimAndSave) {
+                    CubeInstance trimmedCube = CubeInstance.getCopyOf(cube);
+                    trimmedCube.getSegments().clear();
+                    trimmedCube.setUuid(cube.getUuid());
+                    dst.putResource(trimmedCube.getResourcePath(), trimmedCube, CubeManager.CUBE_SERIALIZER);
+                }
+
+            } catch (IOException e) {
+                throw new RuntimeException("IOException", e);
+            }
+        }
+    }
+
+    private IRealization getRealization(RealizationEntry realizationEntry) {
+        return realizationRegistry.getRealization(realizationEntry.getType(), realizationEntry.getRealization());
+    }
+
+    private void dealWithStreaming(CubeInstance cube) {
+        for (StreamingConfig streamingConfig : streamingManager.listAllStreaming()) {
+            if (streamingConfig.getCubeName() != null && streamingConfig.getCubeName().equalsIgnoreCase(cube.getName())) {
+                requiredResources.add(StreamingConfig.concatResourcePath(streamingConfig.getName()));
+                requiredResources.add(KafkaConfig.concatResourcePath(streamingConfig.getName()));
+            }
+        }
+    }
+
+    private void retrieveResourcePath(IRealization realization) {
+
+        logger.info("Deal with realization {} of type {}", realization.getName(), realization.getType());
+
+        if (realization instanceof CubeInstance) {
+            CubeInstance cube = (CubeInstance) realization;
+            String descName = cube.getDescName();
+            CubeDesc cubeDesc = cubeDescManager.getCubeDesc(descName);
+            String modelName = cubeDesc.getModelName();
+            DataModelDesc modelDesc = metadataManager.getDataModelDesc(modelName);
+
+            dealWithStreaming(cube);
+
+            for (String tableName : modelDesc.getAllTables()) {
+                addRequired(requiredResources, TableDesc.concatResourcePath(tableName));
+                addOptional(optionalResources, TableDesc.concatExdResourcePath(tableName));
+            }
+
+            addRequired(requiredResources, DataModelDesc.concatResourcePath(modelDesc.getName()));
+            addRequired(requiredResources, CubeDesc.concatResourcePath(cubeDesc.getName()));
+
+            if (includeSegments) {
+                addRequired(requiredResources, CubeInstance.concatResourcePath(cube.getName()));
+                for (CubeSegment segment : cube.getSegments(SegmentStatusEnum.READY)) {
+                    for (String dictPat : segment.getDictionaryPaths()) {
+                        addRequired(requiredResources, dictPat);
+                    }
+                    for (String snapshotPath : segment.getSnapshotPaths()) {
+                        addRequired(requiredResources, snapshotPath);
+                    }
+                    addRequired(requiredResources, segment.getStatisticsResourcePath());
+
+                    if (includeJobs) {
+                        String lastJobId = segment.getLastBuildJobID();
+                        if (!StringUtils.isEmpty(lastJobId)) {
+                            throw new RuntimeException("No job exist for segment :" + segment);
+                        } else {
+                            try {
+                                ExecutablePO executablePO = executableDao.getJob(lastJobId);
+                                addRequired(requiredResources, ExecutableDao.pathOfJob(lastJobId));
+                                addRequired(requiredResources, ExecutableDao.pathOfJobOutput(lastJobId));
+                                for (ExecutablePO task : executablePO.getTasks()) {
+                                    addRequired(requiredResources, ExecutableDao.pathOfJob(task.getUuid()));
+                                    addRequired(requiredResources, ExecutableDao.pathOfJobOutput(task.getUuid()));
+                                }
+                            } catch (PersistentException e) {
+                                throw new RuntimeException("PersistentException", e);
+                            }
+                        }
+                    } else {
+                        logger.info("Job info will not be extracted");
+                    }
+                }
+            } else {
+                if (includeJobs) {
+                    logger.warn("It's useless to set includeJobs to true when includeSegments is set to false");
+                }
+
+                cubesToTrimAndSave.add(cube);
+            }
+        } else if (realization instanceof HybridInstance) {
+            HybridInstance hybridInstance = (HybridInstance) realization;
+            addRequired(requiredResources, HybridInstance.concatResourcePath(hybridInstance.getName()));
+            for (IRealization iRealization : hybridInstance.getRealizations()) {
+                if (iRealization.getType() != RealizationType.CUBE) {
+                    throw new RuntimeException("Hybrid " + iRealization.getName() + " contains non cube child " + iRealization.getName() + " with type " + iRealization.getType());
+                }
+                retrieveResourcePath(iRealization);
+            }
+        } else if (realization instanceof IIInstance) {
+            throw new IllegalStateException("Does not support extract II instance or hybrid that contains II");
+        } else {
+            throw new IllegalStateException("Unknown realization type: " + realization.getType());
+        }
+    }
+
+    private void addRequired(List<String> resourcePaths, String record) {
+        logger.info("adding required resource {}", record);
+        resourcePaths.add(record);
+    }
+
+    private void addOptional(List<String> optionalPaths, String record) {
+        logger.info("adding optional resource {}", record);
+        optionalPaths.add(record);
+    }
+
+    public static void main(String[] args) {
+        CubeMetaExtractor extractor = new CubeMetaExtractor();
+        extractor.execute(args);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/4c08ded6/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java
index 3b8e0c1..489e45a 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java
@@ -115,7 +115,7 @@ public class ResourceTool {
         copyR(src, dst, "/");
     }
 
-    private static void copyR(ResourceStore src, ResourceStore dst, String path) throws IOException {
+    public static void copyR(ResourceStore src, ResourceStore dst, String path) throws IOException {
         ArrayList<String> children = src.listResources(path);
 
         // case of resource (not a folder)

http://git-wip-us.apache.org/repos/asf/kylin/blob/4c08ded6/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
index af04a11..e0b086d 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
@@ -34,24 +34,14 @@
 
 package org.apache.kylin.engine.streaming;
 
-import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.JsonSerializer;
-import org.apache.kylin.common.persistence.RawResource;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
 import org.apache.kylin.common.restclient.Broadcaster;
@@ -60,13 +50,6 @@ import org.apache.kylin.metadata.MetadataConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.type.MapType;
-import com.fasterxml.jackson.databind.type.SimpleType;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
 /**
  */
 public class StreamingManager {
@@ -121,18 +104,6 @@ public class StreamingManager {
         }
     }
 
-    private String formatStreamingConfigPath(String name) {
-        return ResourceStore.STREAMING_RESOURCE_ROOT + "/" + name + ".json";
-    }
-
-    private String formatStreamingOutputPath(String streaming, int partition) {
-        return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + partition + ".json";
-    }
-
-    private String formatStreamingOutputPath(String streaming, List<Integer> partitions) {
-        return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + StringUtils.join(partitions, "_") + ".json";
-    }
-
     public StreamingConfig getStreamingConfig(String name) {
         return streamingMap.get(name);
     }
@@ -214,77 +185,12 @@ public class StreamingManager {
         if (streamingMap.containsKey(streamingConfig.getName()))
             throw new IllegalArgumentException("StreamingConfig '" + streamingConfig.getName() + "' already exists");
 
-        String path = formatStreamingConfigPath(streamingConfig.getName());
+        String path  = StreamingConfig.concatResourcePath(streamingConfig.getName());
         getStore().putResource(path, streamingConfig, StreamingConfig.SERIALIZER);
         streamingMap.put(streamingConfig.getName(), streamingConfig);
         return streamingConfig;
     }
 
-    public long getOffset(String streaming, int shard) {
-        final String resPath = formatStreamingOutputPath(streaming, shard);
-        InputStream inputStream = null; 
-        try {
-            final RawResource res = getStore().getResource(resPath);
-            if (res == null) {
-                return 0;
-            } else {
-            	inputStream = res.inputStream;
-                final BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
-                return Long.parseLong(br.readLine());
-            }
-        } catch (Exception e) {
-            logger.error("error get offset, path:" + resPath, e);
-            throw new RuntimeException("error get offset, path:" + resPath, e);
-        } finally {
-        	IOUtils.closeQuietly(inputStream);
-        }
-    }
-
-    public void updateOffset(String streaming, int shard, long offset) {
-        Preconditions.checkArgument(offset >= 0, "offset cannot be smaller than 0");
-        final String resPath = formatStreamingOutputPath(streaming, shard);
-        try {
-            getStore().putResource(resPath, new ByteArrayInputStream(Long.valueOf(offset).toString().getBytes()), getStore().getResourceTimestamp(resPath));
-        } catch (IOException e) {
-            logger.error("error update offset, path:" + resPath, e);
-            throw new RuntimeException("error update offset, path:" + resPath, e);
-        }
-    }
-
-    public Map<Integer, Long> getOffset(String streaming, List<Integer> partitions) {
-        Collections.sort(partitions);
-        final String resPath = formatStreamingOutputPath(streaming, partitions);
-        InputStream inputStream = null;
-        try {
-        	RawResource res = getStore().getResource(resPath);
-        	if (res == null)
-        		return Collections.emptyMap();
-        	
-        	inputStream = res.inputStream;
-            final HashMap<Integer, Long> result = mapper.readValue(inputStream, mapType);
-            return result;
-        } catch (IOException e) {
-            logger.error("error get offset, path:" + resPath, e);
-            throw new RuntimeException("error get offset, path:" + resPath, e);
-        } finally {
-        	IOUtils.closeQuietly(inputStream);
-        }
-    }
-
-    public void updateOffset(String streaming, HashMap<Integer, Long> offset) {
-        List<Integer> partitions = Lists.newLinkedList(offset.keySet());
-        Collections.sort(partitions);
-        final String resPath = formatStreamingOutputPath(streaming, partitions);
-        try {
-            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            mapper.writeValue(baos, offset);
-            getStore().putResource(resPath, new ByteArrayInputStream(baos.toByteArray()), getStore().getResourceTimestamp(resPath));
-        } catch (IOException e) {
-            logger.error("error update offset, path:" + resPath, e);
-            throw new RuntimeException("error update offset, path:" + resPath, e);
-        }
-    }
-
     private StreamingConfig loadStreamingConfigAt(String path) throws IOException {
         ResourceStore store = getStore();
         StreamingConfig streamingDesc = store.getResource(path, StreamingConfig.class, STREAMING_SERIALIZER);
@@ -324,8 +230,4 @@ public class StreamingManager {
 
         logger.debug("Loaded " + streamingMap.size() + " StreamingConfig(s)");
     }
-
-    private final ObjectMapper mapper = new ObjectMapper();
-    private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(Integer.class), SimpleType.construct(Long.class));
-
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4c08ded6/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
index ac20fc3..1d07f23 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
@@ -36,7 +36,6 @@ package org.apache.kylin.source.kafka;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -52,11 +51,6 @@ import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.type.MapType;
-import com.fasterxml.jackson.databind.type.SimpleType;
-
 /**
  */
 public class KafkaConfigManager {
@@ -87,7 +81,7 @@ public class KafkaConfigManager {
         return ResourceStore.getStore(this.config);
     }
 
-    public static KafkaConfigManager getInstance(KylinConfig config){
+    public static KafkaConfigManager getInstance(KylinConfig config) {
         KafkaConfigManager r = CACHE.get(config);
         if (r != null) {
             return r;
@@ -98,16 +92,16 @@ public class KafkaConfigManager {
             if (r != null) {
                 return r;
             }
-            try{
-            r = new KafkaConfigManager(config);
-            CACHE.put(config, r);
-            if (CACHE.size() > 1) {
-                logger.warn("More than one KafkaConfigManager singleton exist");
+            try {
+                r = new KafkaConfigManager(config);
+                CACHE.put(config, r);
+                if (CACHE.size() > 1) {
+                    logger.warn("More than one KafkaConfigManager singleton exist");
+                }
+                return r;
+            } catch (IOException e) {
+                throw new IllegalStateException("Failed to init KafkaConfigManager from " + config, e);
             }
-            return r;
-        } catch (IOException e) {
-            throw new IllegalStateException("Failed to init KafkaConfigManager from " + config, e);
-        }
         }
     }
 
@@ -125,7 +119,7 @@ public class KafkaConfigManager {
     public KafkaConfig reloadKafkaConfigLocal(String name) throws IOException {
 
         // Save Source
-        String path = KafkaConfig.getKafkaResourcePath(name);
+        String path = KafkaConfig.concatResourcePath(name);
 
         // Reload the KafkaConfig
         KafkaConfig ndesc = loadKafkaConfigAt(path);
@@ -135,14 +129,6 @@ public class KafkaConfigManager {
         return ndesc;
     }
 
-    private boolean checkExistence(String name) {
-        return true;
-    }
-
-    private String formatStreamingConfigPath(String name) {
-        return ResourceStore.KAFKA_RESOURCE_ROOT + "/" + name + ".json";
-    }
-
     public boolean createKafkaConfig(String name, KafkaConfig config) {
 
         if (config == null || StringUtils.isEmpty(config.getName())) {
@@ -152,7 +138,7 @@ public class KafkaConfigManager {
         if (kafkaMap.containsKey(config.getName()))
             throw new IllegalArgumentException("KafkaConfig '" + config.getName() + "' already exists");
         try {
-            getStore().putResource(formatStreamingConfigPath(name), config, KafkaConfig.SERIALIZER);
+            getStore().putResource(KafkaConfig.concatResourcePath(name), config, KafkaConfig.SERIALIZER);
             kafkaMap.put(config.getName(), config);
             return true;
         } catch (IOException e) {
@@ -185,7 +171,7 @@ public class KafkaConfigManager {
 
     private KafkaConfig loadKafkaConfigAt(String path) throws IOException {
         ResourceStore store = getStore();
-        KafkaConfig kafkaConfig = store.getResource(path, KafkaConfig.class,KAFKA_SERIALIZER );
+        KafkaConfig kafkaConfig = store.getResource(path, KafkaConfig.class, KAFKA_SERIALIZER);
 
         if (StringUtils.isBlank(kafkaConfig.getName())) {
             throw new IllegalStateException("KafkaConfig name must not be blank");
@@ -193,7 +179,6 @@ public class KafkaConfigManager {
         return kafkaConfig;
     }
 
-
     public KafkaConfig getKafkaConfig(String name) {
         return kafkaMap.get(name);
     }
@@ -203,7 +188,7 @@ public class KafkaConfigManager {
             throw new IllegalArgumentException();
         }
 
-        String path = formatStreamingConfigPath(kafkaConfig.getName());
+        String path = KafkaConfig.concatResourcePath(kafkaConfig.getName());
         getStore().putResource(path, kafkaConfig, KafkaConfig.SERIALIZER);
     }
 
@@ -214,7 +199,6 @@ public class KafkaConfigManager {
         kafkaMap.remove(kafkaConfig.getName());
     }
 
-
     private void reloadAllKafkaConfig() throws IOException {
         ResourceStore store = getStore();
         logger.info("Reloading Kafka Metadata from folder " + store.getReadableResourcePath(ResourceStore.KAFKA_RESOURCE_ROOT));
@@ -245,7 +229,4 @@ public class KafkaConfigManager {
         logger.debug("Loaded " + kafkaMap.size() + " KafkaConfig(s)");
     }
 
-    private final ObjectMapper mapper = new ObjectMapper();
-    private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(Integer.class), SimpleType.construct(Long.class));
-
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4c08ded6/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
index 100ca2d..1dce844 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
@@ -85,10 +85,10 @@ public class KafkaConfig extends RootPersistentEntity {
     private String parserProperties;
     
     public String getResourcePath() {
-        return getKafkaResourcePath(name);
+        return concatResourcePath(name);
     }
 
-    public static String getKafkaResourcePath(String streamingName) {
+    public static String concatResourcePath(String streamingName) {
         return ResourceStore.KAFKA_RESOURCE_ROOT + "/" + streamingName + MetadataConstants.FILE_SURFIX;
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/4c08ded6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMetaExtractor.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMetaExtractor.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMetaExtractor.java
deleted file mode 100644
index 680dff8..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMetaExtractor.java
+++ /dev/null
@@ -1,284 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.hbase.util;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.OptionGroup;
-import org.apache.commons.cli.Options;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.ResourceTool;
-import org.apache.kylin.common.util.AbstractApplication;
-import org.apache.kylin.common.util.OptionsHelper;
-import org.apache.kylin.cube.CubeDescManager;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.invertedindex.IIDescManager;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.job.dao.ExecutableDao;
-import org.apache.kylin.job.dao.ExecutablePO;
-import org.apache.kylin.job.exception.PersistentException;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.DataModelDesc;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.project.ProjectInstance;
-import org.apache.kylin.metadata.project.ProjectManager;
-import org.apache.kylin.metadata.project.RealizationEntry;
-import org.apache.kylin.metadata.realization.IRealization;
-import org.apache.kylin.metadata.realization.RealizationRegistry;
-import org.apache.kylin.storage.hybrid.HybridInstance;
-import org.apache.kylin.storage.hybrid.HybridManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/**
- * extract cube related info for debugging/distributing purpose
- * TODO: deal with II case, deal with Streaming case
- */
-public class CubeMetaExtractor extends AbstractApplication {
-
-    private static final Logger logger = LoggerFactory.getLogger(CubeMetaExtractor.class);
-
-    @SuppressWarnings("static-access")
-    private static final Option OPTION_CUBE = OptionBuilder.withArgName("cube").hasArg().isRequired(false).withDescription("Specify which cube to extract").create("cube");
-    @SuppressWarnings("static-access")
-    private static final Option OPTION_HYBRID = OptionBuilder.withArgName("hybrid").hasArg().isRequired(false).withDescription("Specify which hybrid to extract").create("hybrid");
-    @SuppressWarnings("static-access")
-    private static final Option OPTION_PROJECT = OptionBuilder.withArgName("project").hasArg().isRequired(false).withDescription("Specify realizations in which project to extract").create("project");
-
-    @SuppressWarnings("static-access")
-    private static final Option OPTION_INCLUDE_SEGMENTS = OptionBuilder.withArgName("includeSegments").hasArg().isRequired(false).withDescription("set this to true if want extract the segments info, related dicts, etc.").create("includeSegments");
-    @SuppressWarnings("static-access")
-    private static final Option OPTION_INCLUDE_JOB = OptionBuilder.withArgName("includeJobs").hasArg().isRequired(false).withDescription("set this to true if want to extract job info/outputs too").create("includeJobs");
-
-    @SuppressWarnings("static-access")
-    private static final Option OPTION_DEST = OptionBuilder.withArgName("destDir").hasArg().isRequired(false).withDescription("specify the dest dir to save the related metadata").create("destDir");
-
-    private Options options = null;
-    private KylinConfig kylinConfig;
-    private MetadataManager metadataManager;
-    private ProjectManager projectManager;
-    private HybridManager hybridManager;
-    private CubeManager cubeManager;
-    private CubeDescManager cubeDescManager;
-    private IIManager iiManager;
-    private IIDescManager iiDescManager;
-    private ExecutableDao executableDao;
-    RealizationRegistry realizationRegistry;
-
-    public CubeMetaExtractor() {
-        options = new Options();
-
-        OptionGroup realizationOrProject = new OptionGroup();
-        realizationOrProject.addOption(OPTION_CUBE);
-        realizationOrProject.addOption(OPTION_PROJECT);
-        realizationOrProject.addOption(OPTION_HYBRID);
-        realizationOrProject.setRequired(true);
-
-        options.addOptionGroup(realizationOrProject);
-        options.addOption(OPTION_INCLUDE_SEGMENTS);
-        options.addOption(OPTION_INCLUDE_JOB);
-        options.addOption(OPTION_DEST);
-
-    }
-
-    @Override
-    protected Options getOptions() {
-        return options;
-    }
-
-    @Override
-    protected void execute(OptionsHelper optionsHelper) throws Exception {
-        boolean includeSegments = optionsHelper.hasOption(OPTION_INCLUDE_SEGMENTS) ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_SEGMENTS)) : true;
-        boolean includeJobs = optionsHelper.hasOption(OPTION_INCLUDE_JOB) ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_JOB)) : true;
-        String dest = null;
-        if (optionsHelper.hasOption(OPTION_DEST)) {
-            dest = optionsHelper.getOptionValue(OPTION_DEST);
-        }
-
-        if (!includeSegments) {
-            throw new RuntimeException("Does not support skip segments for now");
-        }
-
-        kylinConfig = KylinConfig.getInstanceFromEnv();
-        metadataManager = MetadataManager.getInstance(kylinConfig);
-        projectManager = ProjectManager.getInstance(kylinConfig);
-        hybridManager = HybridManager.getInstance(kylinConfig);
-        cubeManager = CubeManager.getInstance(kylinConfig);
-        cubeDescManager = CubeDescManager.getInstance(kylinConfig);
-        iiManager = IIManager.getInstance(kylinConfig);
-        iiDescManager = IIDescManager.getInstance(kylinConfig);
-        executableDao = ExecutableDao.getInstance(kylinConfig);
-        realizationRegistry = RealizationRegistry.getInstance(kylinConfig);
-
-        List<String> requiredResources = Lists.newArrayList();
-        List<String> optionalResources = Lists.newArrayList();
-
-        if (optionsHelper.hasOption(OPTION_PROJECT)) {
-            ProjectInstance projectInstance = projectManager.getProject(optionsHelper.getOptionValue(OPTION_PROJECT));
-            if (projectInstance == null) {
-                throw new IllegalArgumentException("Project " + optionsHelper.getOptionValue(OPTION_PROJECT) + " does not exist");
-            }
-            addRequired(requiredResources, ProjectInstance.concatResourcePath(projectInstance.getName()));
-            List<RealizationEntry> realizationEntries = projectInstance.getRealizationEntries();
-            for (RealizationEntry realizationEntry : realizationEntries) {
-                retrieveResourcePath(getRealization(realizationEntry), includeSegments, includeJobs, requiredResources, optionalResources);
-            }
-        } else if (optionsHelper.hasOption(OPTION_CUBE)) {
-            String cubeName = optionsHelper.getOptionValue(OPTION_CUBE);
-            IRealization realization;
-
-            if ((realization = cubeManager.getRealization(cubeName)) != null) {
-                retrieveResourcePath(realization, includeSegments, includeJobs, requiredResources, optionalResources);
-            } else {
-                throw new IllegalArgumentException("No cube found with name of " + cubeName);
-            }
-        } else if (optionsHelper.hasOption(OPTION_HYBRID)) {
-            String hybridName = optionsHelper.getOptionValue(OPTION_HYBRID);
-            IRealization realization;
-
-            if ((realization = hybridManager.getRealization(hybridName)) != null) {
-                retrieveResourcePath(realization, includeSegments, includeJobs, requiredResources, optionalResources);
-            } else {
-                throw new IllegalArgumentException("No hybrid found with name of" + hybridName);
-            }
-        }
-
-        executeExtraction(requiredResources, optionalResources, dest);
-    }
-
-    private void executeExtraction(List<String> requiredPaths, List<String> optionalPaths, String dest) {
-        logger.info("The resource paths going to be extracted:");
-        for (String s : requiredPaths) {
-            logger.info(s + "(required)");
-        }
-        for (String s : optionalPaths) {
-            logger.info(s + "(optional)");
-        }
-
-        if (dest == null) {
-            logger.info("Dest is not set, exit directly without extracting");
-        } else {
-            try {
-                ResourceTool.copy(KylinConfig.getInstanceFromEnv(), KylinConfig.createInstanceFromUri(dest));
-            } catch (IOException e) {
-                throw new RuntimeException("IOException", e);
-            }
-        }
-    }
-
-    private IRealization getRealization(RealizationEntry realizationEntry) {
-        return realizationRegistry.getRealization(realizationEntry.getType(), realizationEntry.getRealization());
-    }
-
-    private void retrieveResourcePath(IRealization realization, boolean includeSegments, boolean includeJobs, List<String> requiredResources, List<String> optionalResources) {
-
-        logger.info("Deal with realization {} of type {}", realization.getName(), realization.getType());
-
-        if (realization instanceof CubeInstance) {
-            CubeInstance cube = (CubeInstance) realization;
-            String descName = cube.getDescName();
-            CubeDesc cubeDesc = cubeDescManager.getCubeDesc(descName);
-            String modelName = cubeDesc.getModelName();
-            DataModelDesc modelDesc = metadataManager.getDataModelDesc(modelName);
-
-            for (String tableName : modelDesc.getAllTables()) {
-                addRequired(requiredResources, TableDesc.concatResourcePath(tableName));
-                addOptional(optionalResources, TableDesc.concatExdResourcePath(tableName));
-            }
-
-            addRequired(requiredResources, DataModelDesc.concatResourcePath(modelDesc.getName()));
-            addRequired(requiredResources, CubeDesc.concatResourcePath(cubeDesc.getName()));
-
-            if (includeSegments) {
-                addRequired(requiredResources, CubeInstance.concatResourcePath(cube.getName()));
-                for (CubeSegment segment : cube.getSegments()) {
-                    for (String dictPat : segment.getDictionaryPaths()) {
-                        addRequired(requiredResources, dictPat);
-                    }
-                    for (String snapshotPath : segment.getSnapshotPaths()) {
-                        addRequired(requiredResources, snapshotPath);
-                    }
-                    addRequired(requiredResources, segment.getStatisticsResourcePath());
-
-                    if (includeJobs) {
-                        String lastJobId = segment.getLastBuildJobID();
-                        if (!StringUtils.isEmpty(lastJobId)) {
-                            logger.warn("No job exist for segment {}", segment);
-                        } else {
-                            try {
-                                ExecutablePO executablePO = executableDao.getJob(lastJobId);
-                                addRequired(requiredResources, ExecutableDao.pathOfJob(lastJobId));
-                                addRequired(requiredResources, ExecutableDao.pathOfJobOutput(lastJobId));
-                                for (ExecutablePO task : executablePO.getTasks()) {
-                                    addRequired(requiredResources, ExecutableDao.pathOfJob(task.getUuid()));
-                                    addRequired(requiredResources, ExecutableDao.pathOfJobOutput(task.getUuid()));
-                                }
-                            } catch (PersistentException e) {
-                                throw new RuntimeException("PersistentException", e);
-                            }
-                        }
-                    } else {
-                        logger.info("Job info will not be extracted");
-                    }
-                }
-            } else {
-                if (includeJobs) {
-                    logger.warn("It's useless to set includeJobs to true when includeSegments is set to false");
-                }
-
-                throw new IllegalStateException("Does not support skip segments now");
-            }
-        } else if (realization instanceof HybridInstance) {
-            HybridInstance hybridInstance = (HybridInstance) realization;
-            addRequired(requiredResources, HybridInstance.concatResourcePath(hybridInstance.getName()));
-            for (IRealization iRealization : hybridInstance.getRealizations()) {
-                retrieveResourcePath(iRealization, includeSegments, includeJobs, requiredResources, optionalResources);
-            }
-        } else if (realization instanceof IIInstance) {
-            throw new IllegalStateException("Does not support extract II instance or hybrid that contains II");
-        } else {
-            throw new IllegalStateException("Unknown realization type: " + realization.getType());
-        }
-    }
-
-    private void addRequired(List<String> resourcePaths, String record) {
-        logger.info("adding required resource {}", record);
-        resourcePaths.add(record);
-    }
-
-    private void addOptional(List<String> optionalPaths, String record) {
-        logger.info("adding optional resource {}", record);
-        optionalPaths.add(record);
-    }
-
-    public static void main(String[] args) {
-        CubeMetaExtractor extractor = new CubeMetaExtractor();
-        extractor.execute(args);
-    }
-}


[3/4] kylin git commit: minor, fix CI occasionally fail issue

Posted by ma...@apache.org.
minor, fix CI occasionally fail issue


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/3f5074ee
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/3f5074ee
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/3f5074ee

Branch: refs/heads/2.x-staging
Commit: 3f5074ee1568d5b0ba50d70d5c35319cd8223cc9
Parents: ab9d579
Author: honma <ho...@ebay.com>
Authored: Thu Feb 25 14:13:48 2016 +0800
Committer: honma <ho...@ebay.com>
Committed: Fri Feb 26 17:54:37 2016 +0800

----------------------------------------------------------------------
 .../dict/TupleFilterDictionaryTranslater.java   | 165 ------------------
 .../dict/TupleFilterFunctionTranslator.java     | 166 +++++++++++++++++++
 .../metadata/filter/FunctionTupleFilter.java    |   5 +-
 .../metadata/filter/function/BuiltInMethod.java |  33 ++--
 .../cache/AbstractCacheFledgedQuery.java        |  32 +---
 .../kylin/storage/cache/DynamicCacheTest.java   |  15 +-
 .../kylin/storage/cache/StaticCacheTest.java    |  19 ++-
 .../kylin/storage/hbase/ITStorageTest.java      |  11 +-
 .../common/coprocessor/FilterDecorator.java     |   4 +-
 .../hbase/cube/v2/CubeSegmentScanner.java       |   4 +-
 10 files changed, 226 insertions(+), 228 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/3f5074ee/core-dictionary/src/main/java/org/apache/kylin/dict/TupleFilterDictionaryTranslater.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/TupleFilterDictionaryTranslater.java b/core-dictionary/src/main/java/org/apache/kylin/dict/TupleFilterDictionaryTranslater.java
deleted file mode 100644
index 9ef360d..0000000
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/TupleFilterDictionaryTranslater.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.dict;
-
-import com.google.common.primitives.Primitives;
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.metadata.filter.ColumnTupleFilter;
-import org.apache.kylin.metadata.filter.CompareTupleFilter;
-import org.apache.kylin.metadata.filter.ConstantTupleFilter;
-import org.apache.kylin.metadata.filter.FunctionTupleFilter;
-import org.apache.kylin.metadata.filter.ITupleFilterTranslator;
-import org.apache.kylin.metadata.filter.LogicalTupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ListIterator;
-
-/**
- * Created by dongli on 1/7/16.
- */
-public class TupleFilterDictionaryTranslater implements ITupleFilterTranslator {
-    public static final Logger logger = LoggerFactory.getLogger(TupleFilterDictionaryTranslater.class);
-
-    private IDictionaryAware dictionaryAware;
-
-    public TupleFilterDictionaryTranslater(IDictionaryAware dictionaryAware) {
-        this.dictionaryAware = dictionaryAware;
-    }
-
-    @Override
-    public TupleFilter translate(TupleFilter tupleFilter) {
-        TupleFilter translated = null;
-        if (tupleFilter instanceof CompareTupleFilter) {
-            translated = translateCompareTupleFilter((CompareTupleFilter) tupleFilter);
-            if (translated != null) {
-                logger.info("Translated {" + tupleFilter + "} to IN clause: {" + translated + "}");
-            }
-        } else if (tupleFilter instanceof FunctionTupleFilter) {
-            translated = translateFunctionTupleFilter((FunctionTupleFilter) tupleFilter);
-            if (translated != null) {
-                logger.info("Translated {" + tupleFilter + "} to IN clause: {" + translated + "}");
-            }
-        } else if (tupleFilter instanceof LogicalTupleFilter) {
-            ListIterator<TupleFilter> childIterator = (ListIterator<TupleFilter>) tupleFilter.getChildren().listIterator();
-            while (childIterator.hasNext()) {
-                TupleFilter tempTranslated = translate(childIterator.next());
-                if (tempTranslated != null)
-                    childIterator.set(tempTranslated);
-            }
-        }
-        return translated == null ? tupleFilter : translated;
-    }
-
-    private TupleFilter translateFunctionTupleFilter(FunctionTupleFilter functionTupleFilter) {
-        if (!functionTupleFilter.isValid())
-            return null;
-
-        TblColRef columnRef = functionTupleFilter.getColumn();
-        Dictionary<?> dict = dictionaryAware.getDictionary(columnRef);
-        if (dict == null)
-            return null;
-
-        CompareTupleFilter translated = new CompareTupleFilter(FilterOperatorEnum.IN);
-        translated.addChild(new ColumnTupleFilter(columnRef));
-
-        try {
-            for (int i = dict.getMinId(); i <= dict.getMaxId(); i++) {
-                Object dictVal = dict.getValueFromId(i);
-                if ((Boolean) functionTupleFilter.invokeFunction(dictVal)) {
-                    translated.addChild(new ConstantTupleFilter(dictVal));
-                }
-            }
-        } catch (Exception e) {
-            logger.debug(e.getMessage());
-            return null;
-        }
-        return translated;
-    }
-
-    @SuppressWarnings("unchecked")
-    private TupleFilter translateCompareTupleFilter(CompareTupleFilter compTupleFilter) {
-        if (compTupleFilter.getFunction() == null)
-            return null;
-
-        FunctionTupleFilter functionTupleFilter = compTupleFilter.getFunction();
-        if (!functionTupleFilter.isValid())
-            return null;
-
-        TblColRef columnRef = functionTupleFilter.getColumn();
-        Dictionary<?> dict = dictionaryAware.getDictionary(columnRef);
-        if (dict == null)
-            return null;
-
-        CompareTupleFilter translated = new CompareTupleFilter(FilterOperatorEnum.IN);
-        translated.addChild(new ColumnTupleFilter(columnRef));
-
-        try {
-            for (int i = dict.getMinId(); i <= dict.getMaxId(); i++) {
-                Object dictVal = dict.getValueFromId(i);
-                Object computedVal = functionTupleFilter.invokeFunction(dictVal);
-                Class clazz = Primitives.wrap(computedVal.getClass());
-                Object targetVal = compTupleFilter.getFirstValue();
-                if (Primitives.isWrapperType(clazz))
-                    targetVal = clazz.cast(clazz.getDeclaredMethod("valueOf", String.class).invoke(null, compTupleFilter.getFirstValue()));
-
-                int comp = ((Comparable) computedVal).compareTo(targetVal);
-                boolean compResult = false;
-                switch (compTupleFilter.getOperator()) {
-                    case EQ:
-                        compResult = comp == 0;
-                        break;
-                    case NEQ:
-                        compResult = comp != 0;
-                        break;
-                    case LT:
-                        compResult = comp < 0;
-                        break;
-                    case LTE:
-                        compResult = comp <= 0;
-                        break;
-                    case GT:
-                        compResult = comp > 0;
-                        break;
-                    case GTE:
-                        compResult = comp >= 0;
-                        break;
-                    case IN:
-                        compResult = compTupleFilter.getValues().contains(computedVal.toString());
-                        break;
-                    case NOTIN:
-                        compResult = !compTupleFilter.getValues().contains(computedVal.toString());
-                        break;
-                    default:
-                        break;
-                }
-                if (compResult) {
-                    translated.addChild(new ConstantTupleFilter(dictVal));
-                }
-            }
-        } catch (Exception e) {
-            logger.debug(e.getMessage());
-            return null;
-        }
-        return translated;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/3f5074ee/core-dictionary/src/main/java/org/apache/kylin/dict/TupleFilterFunctionTranslator.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/TupleFilterFunctionTranslator.java b/core-dictionary/src/main/java/org/apache/kylin/dict/TupleFilterFunctionTranslator.java
new file mode 100644
index 0000000..1c96dd4
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/TupleFilterFunctionTranslator.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.dict;
+
+import java.util.ListIterator;
+
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.metadata.filter.ColumnTupleFilter;
+import org.apache.kylin.metadata.filter.CompareTupleFilter;
+import org.apache.kylin.metadata.filter.ConstantTupleFilter;
+import org.apache.kylin.metadata.filter.FunctionTupleFilter;
+import org.apache.kylin.metadata.filter.ITupleFilterTranslator;
+import org.apache.kylin.metadata.filter.LogicalTupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.primitives.Primitives;
+
+/**
+ * only take effect when the compare filter has function
+ */
+public class TupleFilterFunctionTranslator implements ITupleFilterTranslator {
+    public static final Logger logger = LoggerFactory.getLogger(TupleFilterFunctionTranslator.class);
+
+    private IDictionaryAware dictionaryAware;
+
+    public TupleFilterFunctionTranslator(IDictionaryAware dictionaryAware) {
+        this.dictionaryAware = dictionaryAware;
+    }
+
+    @Override
+    public TupleFilter translate(TupleFilter tupleFilter) {
+        TupleFilter translated = null;
+        if (tupleFilter instanceof CompareTupleFilter) {
+            translated = translateCompareTupleFilter((CompareTupleFilter) tupleFilter);
+            if (translated != null) {
+                logger.info("Translated {" + tupleFilter + "} to IN clause: {" + translated + "}");
+            }
+        } else if (tupleFilter instanceof FunctionTupleFilter) {
+            translated = translateFunctionTupleFilter((FunctionTupleFilter) tupleFilter);
+            if (translated != null) {
+                logger.info("Translated {" + tupleFilter + "} to IN clause: {" + translated + "}");
+            }
+        } else if (tupleFilter instanceof LogicalTupleFilter) {
+            ListIterator<TupleFilter> childIterator = (ListIterator<TupleFilter>) tupleFilter.getChildren().listIterator();
+            while (childIterator.hasNext()) {
+                TupleFilter tempTranslated = translate(childIterator.next());
+                if (tempTranslated != null)
+                    childIterator.set(tempTranslated);
+            }
+        }
+        return translated == null ? tupleFilter : translated;
+    }
+
+    private TupleFilter translateFunctionTupleFilter(FunctionTupleFilter functionTupleFilter) {
+        if (!functionTupleFilter.isValid())
+            return null;
+
+        TblColRef columnRef = functionTupleFilter.getColumn();
+        Dictionary<?> dict = dictionaryAware.getDictionary(columnRef);
+        if (dict == null)
+            return null;
+
+        CompareTupleFilter translated = new CompareTupleFilter(FilterOperatorEnum.IN);
+        translated.addChild(new ColumnTupleFilter(columnRef));
+
+        try {
+            for (int i = dict.getMinId(); i <= dict.getMaxId(); i++) {
+                Object dictVal = dict.getValueFromId(i);
+                if ((Boolean) functionTupleFilter.invokeFunction(dictVal)) {
+                    translated.addChild(new ConstantTupleFilter(dictVal));
+                }
+            }
+        } catch (Exception e) {
+            logger.debug(e.getMessage());
+            return null;
+        }
+        return translated;
+    }
+
+    @SuppressWarnings("unchecked")
+    private TupleFilter translateCompareTupleFilter(CompareTupleFilter compTupleFilter) {
+        if (compTupleFilter.getFunction() == null)
+            return null;
+
+        FunctionTupleFilter functionTupleFilter = compTupleFilter.getFunction();
+        if (!functionTupleFilter.isValid())
+            return null;
+
+        TblColRef columnRef = functionTupleFilter.getColumn();
+        Dictionary<?> dict = dictionaryAware.getDictionary(columnRef);
+        if (dict == null)
+            return null;
+
+        CompareTupleFilter translated = new CompareTupleFilter(FilterOperatorEnum.IN);
+        translated.addChild(new ColumnTupleFilter(columnRef));
+
+        try {
+            for (int i = dict.getMinId(); i <= dict.getMaxId(); i++) {
+                Object dictVal = dict.getValueFromId(i);
+                Object computedVal = functionTupleFilter.invokeFunction(dictVal);
+                Class clazz = Primitives.wrap(computedVal.getClass());
+                Object targetVal = compTupleFilter.getFirstValue();
+                if (Primitives.isWrapperType(clazz))
+                    targetVal = clazz.cast(clazz.getDeclaredMethod("valueOf", String.class).invoke(null, compTupleFilter.getFirstValue()));
+
+                int comp = ((Comparable) computedVal).compareTo(targetVal);
+                boolean compResult = false;
+                switch (compTupleFilter.getOperator()) {
+                case EQ:
+                    compResult = comp == 0;
+                    break;
+                case NEQ:
+                    compResult = comp != 0;
+                    break;
+                case LT:
+                    compResult = comp < 0;
+                    break;
+                case LTE:
+                    compResult = comp <= 0;
+                    break;
+                case GT:
+                    compResult = comp > 0;
+                    break;
+                case GTE:
+                    compResult = comp >= 0;
+                    break;
+                case IN:
+                    compResult = compTupleFilter.getValues().contains(computedVal.toString());
+                    break;
+                case NOTIN:
+                    compResult = !compTupleFilter.getValues().contains(computedVal.toString());
+                    break;
+                default:
+                    break;
+                }
+                if (compResult) {
+                    translated.addChild(new ConstantTupleFilter(dictVal));
+                }
+            }
+        } catch (Exception e) {
+            logger.debug(e.getMessage());
+            return null;
+        }
+        return translated;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/3f5074ee/core-metadata/src/main/java/org/apache/kylin/metadata/filter/FunctionTupleFilter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/FunctionTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/FunctionTupleFilter.java
index 15fcb72..30bef97 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/FunctionTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/FunctionTupleFilter.java
@@ -35,9 +35,6 @@ import org.slf4j.LoggerFactory;
 import com.google.common.collect.Lists;
 import com.google.common.primitives.Primitives;
 
-/**
- * Created by dongli on 11/11/15.
- */
 public class FunctionTupleFilter extends TupleFilter {
     public static final Logger logger = LoggerFactory.getLogger(FunctionTupleFilter.class);
 
@@ -79,7 +76,7 @@ public class FunctionTupleFilter extends TupleFilter {
         if (columnContainerFilter instanceof ColumnTupleFilter)
             methodParams.set(colPosition, (Serializable) input);
         else if (columnContainerFilter instanceof FunctionTupleFilter)
-            methodParams.set(colPosition, (Serializable) ((FunctionTupleFilter) columnContainerFilter).invokeFunction((Serializable) input));
+            methodParams.set(colPosition, (Serializable) ((FunctionTupleFilter) columnContainerFilter).invokeFunction(input));
         return method.invoke(null, (Object[]) (methodParams.toArray()));
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/3f5074ee/core-metadata/src/main/java/org/apache/kylin/metadata/filter/function/BuiltInMethod.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/function/BuiltInMethod.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/function/BuiltInMethod.java
index b927d8d..7b241cc 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/function/BuiltInMethod.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/function/BuiltInMethod.java
@@ -18,29 +18,21 @@
 
 package org.apache.kylin.metadata.filter.function;
 
-import com.google.common.collect.ImmutableMap;
-import org.apache.commons.lang3.reflect.MethodUtils;
-
 import java.lang.reflect.Method;
 import java.util.regex.Pattern;
 
-/**
- * Created by dongli on 11/13/15.
- */
+import org.apache.commons.lang3.reflect.MethodUtils;
+
+import com.google.common.collect.ImmutableMap;
+
 public enum BuiltInMethod {
-    UPPER(BuiltInMethod.class, "upper", String.class),
-    LOWER(BuiltInMethod.class, "lower", String.class),
-    SUBSTRING(BuiltInMethod.class, "substring", String.class, int.class, int.class),
-    CHAR_LENGTH(BuiltInMethod.class, "charLength", String.class),
-    LIKE(BuiltInMethod.class, "like", String.class, String.class),
-    INITCAP(BuiltInMethod.class, "initcap", String.class);
+    UPPER(BuiltInMethod.class, "upper", String.class), LOWER(BuiltInMethod.class, "lower", String.class), SUBSTRING(BuiltInMethod.class, "substring", String.class, int.class, int.class), CHAR_LENGTH(BuiltInMethod.class, "charLength", String.class), LIKE(BuiltInMethod.class, "like", String.class, String.class), INITCAP(BuiltInMethod.class, "initcap", String.class);
     public final Method method;
 
     public static final ImmutableMap<String, BuiltInMethod> MAP;
 
     static {
-        final ImmutableMap.Builder<String, BuiltInMethod> builder =
-                ImmutableMap.builder();
+        final ImmutableMap.Builder<String, BuiltInMethod> builder = ImmutableMap.builder();
         for (BuiltInMethod value : BuiltInMethod.values()) {
             if (value.method != null) {
                 builder.put(value.name(), value);
@@ -70,22 +62,22 @@ public enum BuiltInMethod {
         for (int i = 0; i < len; i++) {
             char curCh = s.charAt(i);
             final int c = (int) curCh;
-            if (start) {  // curCh is whitespace or first character of word.
+            if (start) { // curCh is whitespace or first character of word.
                 if (c > 47 && c < 58) { // 0-9
                     start = false;
-                } else if (c > 64 && c < 91) {  // A-Z
+                } else if (c > 64 && c < 91) { // A-Z
                     start = false;
-                } else if (c > 96 && c < 123) {  // a-z
+                } else if (c > 96 && c < 123) { // a-z
                     start = false;
                     curCh = (char) (c - 32); // Uppercase this character
                 }
                 // else {} whitespace
-            } else {  // Inside of a word or white space after end of word.
+            } else { // Inside of a word or white space after end of word.
                 if (c > 47 && c < 58) { // 0-9
                     // noop
-                } else if (c > 64 && c < 91) {  // A-Z
+                } else if (c > 64 && c < 91) { // A-Z
                     curCh = (char) (c + 32); // Lowercase this character
-                } else if (c > 96 && c < 123) {  // a-z
+                } else if (c > 96 && c < 123) { // a-z
                     // noop
                 } else { // whitespace
                     start = true;
@@ -116,5 +108,4 @@ public enum BuiltInMethod {
         return s.toLowerCase();
     }
 
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/3f5074ee/core-storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedQuery.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedQuery.java b/core-storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedQuery.java
index 6ba76c4..a5ca800 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedQuery.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedQuery.java
@@ -5,9 +5,6 @@ import net.sf.ehcache.CacheManager;
 import net.sf.ehcache.Element;
 import net.sf.ehcache.Status;
 import net.sf.ehcache.config.CacheConfiguration;
-import net.sf.ehcache.config.Configuration;
-import net.sf.ehcache.config.PersistenceConfiguration;
-import net.sf.ehcache.store.MemoryStoreEvictionPolicy;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.metadata.realization.StreamSQLDigest;
@@ -21,6 +18,7 @@ import org.slf4j.LoggerFactory;
  */
 public abstract class AbstractCacheFledgedQuery implements IStorageQuery, TeeTupleItrListener {
     private static final Logger logger = LoggerFactory.getLogger(AbstractCacheFledgedQuery.class);
+
     private static final String storageCacheTemplate = "StorageCache";
 
     protected static CacheManager CACHE_MANAGER;
@@ -37,31 +35,6 @@ public abstract class AbstractCacheFledgedQuery implements IStorageQuery, TeeTup
         CACHE_MANAGER = cacheManager;
     }
 
-    /**
-     * This method is only useful non-spring injected test cases.
-     * When Kylin is normally ran as a spring app CACHE_MANAGER will be injected.
-     * and the configuration for cache lies in server/src/main/resources/ehcache.xml
-     * 
-     * the cache named "StorageCache" acts like a template for each realization to
-     * create its own cache.
-     */
-    private static void initCacheManger() {
-        Configuration conf = new Configuration();
-        conf.setMaxBytesLocalHeap("128M");
-        CACHE_MANAGER = CacheManager.create(conf);
-
-        //a fake template for test cases
-        Cache storageCache = new Cache(new CacheConfiguration(storageCacheTemplate, 0).//
-                memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.LRU).//
-                eternal(false).//
-                timeToIdleSeconds(86400).//
-                diskExpiryThreadIntervalSeconds(0).//
-                //maxBytesLocalHeap(10, MemoryUnit.MEGABYTES).//
-                persistence(new PersistenceConfiguration().strategy(PersistenceConfiguration.Strategy.NONE)));
-
-        CACHE_MANAGER.addCacheIfAbsent(storageCache);
-    }
-
     protected StreamSQLResult getStreamSQLResult(StreamSQLDigest streamSQLDigest) {
 
         Cache cache = CACHE_MANAGER.getCache(this.underlyingStorage.getStorageUUID());
@@ -87,8 +60,7 @@ public abstract class AbstractCacheFledgedQuery implements IStorageQuery, TeeTup
 
     private void makeCacheIfNecessary(String storageUUID) {
         if (CACHE_MANAGER == null || (!(CACHE_MANAGER.getStatus().equals(Status.STATUS_ALIVE)))) {
-            logger.warn("CACHE_MANAGER is not provided or not alive");
-            initCacheManger();
+            throw new RuntimeException("CACHE_MANAGER is not provided or not alive");
         }
 
         if (CACHE_MANAGER.getCache(storageUUID) == null) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/3f5074ee/core-storage/src/test/java/org/apache/kylin/storage/cache/DynamicCacheTest.java
----------------------------------------------------------------------
diff --git a/core-storage/src/test/java/org/apache/kylin/storage/cache/DynamicCacheTest.java b/core-storage/src/test/java/org/apache/kylin/storage/cache/DynamicCacheTest.java
index 53e5f5b..3193bbb 100644
--- a/core-storage/src/test/java/org/apache/kylin/storage/cache/DynamicCacheTest.java
+++ b/core-storage/src/test/java/org/apache/kylin/storage/cache/DynamicCacheTest.java
@@ -5,6 +5,7 @@ import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import net.sf.ehcache.CacheManager;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.DateFormat;
@@ -20,6 +21,7 @@ import org.apache.kylin.metadata.tuple.Tuple;
 import org.apache.kylin.metadata.tuple.TupleInfo;
 import org.apache.kylin.storage.ICachableStorageQuery;
 import org.apache.kylin.storage.StorageContext;
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -32,10 +34,21 @@ import com.google.common.collect.Ranges;
  */
 public class DynamicCacheTest {
 
+    private static CacheManager cacheManager;
+    
     @BeforeClass
-    public static void setup() {
+    public static void setupResource() throws Exception {
         System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox");
         KylinConfig.getInstanceFromEnv().setProperty("kylin.query.cache.threshold.duration", "0");
+        
+        cacheManager = CacheManager.newInstance("../server/src/main/resources/ehcache-test.xml");
+        AbstractCacheFledgedQuery.setCacheManager(cacheManager);
+    }
+
+    @AfterClass
+    public static void tearDownResource() {
+        cacheManager.shutdown();
+        AbstractCacheFledgedQuery.setCacheManager(null);
     }
 
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/3f5074ee/core-storage/src/test/java/org/apache/kylin/storage/cache/StaticCacheTest.java
----------------------------------------------------------------------
diff --git a/core-storage/src/test/java/org/apache/kylin/storage/cache/StaticCacheTest.java b/core-storage/src/test/java/org/apache/kylin/storage/cache/StaticCacheTest.java
index 182091b..b1665df 100644
--- a/core-storage/src/test/java/org/apache/kylin/storage/cache/StaticCacheTest.java
+++ b/core-storage/src/test/java/org/apache/kylin/storage/cache/StaticCacheTest.java
@@ -6,6 +6,7 @@ import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import net.sf.ehcache.CacheManager;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.IdentityUtils;
 import org.apache.kylin.metadata.filter.TupleFilter;
@@ -20,6 +21,7 @@ import org.apache.kylin.metadata.tuple.Tuple;
 import org.apache.kylin.metadata.tuple.TupleInfo;
 import org.apache.kylin.storage.ICachableStorageQuery;
 import org.apache.kylin.storage.StorageContext;
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -30,12 +32,25 @@ import com.google.common.collect.Range;
 /**
  */
 public class StaticCacheTest {
+
+    private static CacheManager cacheManager;
+
     @BeforeClass
-    public static void setup() {
+    public static void setupResource() throws Exception {
+
         System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox");
-        
+
         KylinConfig config = KylinConfig.getInstanceFromEnv();
         config.setProperty("kylin.query.cache.threshold.duration", "0");
+        
+        cacheManager = CacheManager.newInstance("../server/src/main/resources/ehcache-test.xml");
+        AbstractCacheFledgedQuery.setCacheManager(cacheManager);
+    }
+
+    @AfterClass
+    public static void tearDownResource() {
+        cacheManager.shutdown();
+        AbstractCacheFledgedQuery.setCacheManager(null);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kylin/blob/3f5074ee/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java
index d6443e7..c253770 100644
--- a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java
@@ -18,12 +18,14 @@
 
 package org.apache.kylin.storage.hbase;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertTrue;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import net.sf.ehcache.CacheManager;
+
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.HBaseMetadataTestCase;
 import org.apache.kylin.cube.CubeInstance;
@@ -38,6 +40,7 @@ import org.apache.kylin.metadata.tuple.ITupleIterator;
 import org.apache.kylin.storage.IStorageQuery;
 import org.apache.kylin.storage.StorageContext;
 import org.apache.kylin.storage.StorageFactory;
+import org.apache.kylin.storage.cache.AbstractCacheFledgedQuery;
 import org.apache.kylin.storage.cache.StorageMockUtils;
 import org.apache.kylin.storage.exception.ScanOutOfLimitException;
 import org.junit.After;
@@ -54,12 +57,18 @@ public class ITStorageTest extends HBaseMetadataTestCase {
     private CubeInstance cube;
     private StorageContext context;
 
+    private static CacheManager cacheManager;
+
     @BeforeClass
     public static void setupResource() throws Exception {
+        cacheManager = CacheManager.newInstance("../server/src/main/resources/ehcache-test.xml");
+        AbstractCacheFledgedQuery.setCacheManager(cacheManager);
     }
 
     @AfterClass
     public static void tearDownResource() {
+        cacheManager.shutdown();
+        AbstractCacheFledgedQuery.setCacheManager(null);
     }
 
     @Before

http://git-wip-us.apache.org/repos/asf/kylin/blob/3f5074ee/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterDecorator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterDecorator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterDecorator.java
index 294f399..01d3041 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterDecorator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterDecorator.java
@@ -8,7 +8,7 @@ import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.cube.kv.RowKeyColumnIO;
 import org.apache.kylin.dict.DictCodeSystem;
 import org.apache.kylin.dict.IDictionaryAware;
-import org.apache.kylin.dict.TupleFilterDictionaryTranslater;
+import org.apache.kylin.dict.TupleFilterFunctionTranslator;
 import org.apache.kylin.metadata.filter.ColumnTupleFilter;
 import org.apache.kylin.metadata.filter.CompareTupleFilter;
 import org.apache.kylin.metadata.filter.ConstantTupleFilter;
@@ -131,7 +131,7 @@ public class FilterDecorator implements TupleFilterSerializer.Decorator {
         if (filter == null)
             return null;
 
-        ITupleFilterTranslator translator = new TupleFilterDictionaryTranslater(columnIO.getIDictionaryAware());
+        ITupleFilterTranslator translator = new TupleFilterFunctionTranslator(columnIO.getIDictionaryAware());
         filter = translator.translate(filter);
 
         // un-evaluatable filter is replaced with TRUE

http://git-wip-us.apache.org/repos/asf/kylin/blob/3f5074ee/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
index abfb74d..e96c602 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
@@ -23,7 +23,7 @@ import org.apache.kylin.cube.gridtable.CubeGridTable;
 import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
 import org.apache.kylin.cube.gridtable.NotEnoughGTInfoException;
 import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.dict.TupleFilterDictionaryTranslater;
+import org.apache.kylin.dict.TupleFilterFunctionTranslator;
 import org.apache.kylin.gridtable.EmptyGTScanner;
 import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.GTRecord;
@@ -64,7 +64,7 @@ public class CubeSegmentScanner implements IGTScanner {
         CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping();
 
         // translate FunctionTupleFilter to IN clause
-        ITupleFilterTranslator translator = new TupleFilterDictionaryTranslater(this.cubeSeg);
+        ITupleFilterTranslator translator = new TupleFilterFunctionTranslator(this.cubeSeg);
         filter = translator.translate(filter);
 
         //replace the constant values in filter to dictionary codes