You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/04/26 06:27:19 UTC

[37/50] incubator-kylin git commit: KYLIN-671 implement tee itupleiterator

KYLIN-671 implement tee itupleiterator


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/d6218b44
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/d6218b44
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/d6218b44

Branch: refs/heads/streaming-localdict
Commit: d6218b44577da8c5b0b9bbbc40ff2b1d27f355eb
Parents: 9288902
Author: honma <ho...@ebay.com>
Authored: Wed Apr 15 17:41:57 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Wed Apr 22 16:46:02 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/util/DateFormat.java    | 84 ++++++++++++++++++++
 .../org/apache/kylin/common/util/RangeUtil.java | 36 ++++++++-
 .../apache/kylin/dict/DateStrDictionary.java    |  2 +-
 .../kylin/invertedindex/index/TableRecord.java  |  2 +-
 .../metadata/realization/SQLDigestUtil.java     |  2 +-
 .../metadata/serializer/DateTimeSerializer.java |  2 +-
 .../kylin/metadata/tuple/ITupleIterator.java    |  9 +++
 .../kylin/metadata/tuple/TeeTupleIterator.java  | 54 +++++++++++++
 .../apache/kylin/metadata/util/DateFormat.java  | 80 -------------------
 .../cache/CacheFledgedStorageEngine.java        | 72 +++++++++++------
 .../kylin/storage/cache/StreamSQLResult.java    | 11 +++
 .../kylin/storage/hbase/HBaseKeyRange.java      |  2 +-
 .../hbase/coprocessor/endpoint/IIEndpoint.java  | 12 +--
 .../endpoint/TsConditionExtractor.java          |  2 +-
 .../org/apache/kylin/storage/tuple/Tuple.java   |  2 +-
 .../apache/kylin/storage/cache/EhcacheTest.java | 46 +++++++----
 16 files changed, 283 insertions(+), 135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d6218b44/common/src/main/java/org/apache/kylin/common/util/DateFormat.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/DateFormat.java b/common/src/main/java/org/apache/kylin/common/util/DateFormat.java
new file mode 100644
index 0000000..5b540a4
--- /dev/null
+++ b/common/src/main/java/org/apache/kylin/common/util/DateFormat.java
@@ -0,0 +1,84 @@
+package org.apache.kylin.common.util;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class DateFormat {
+
+    public static final String DEFAULT_DATE_PATTERN = "yyyy-MM-dd";
+    public static final String DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS = "yyyy-MM-dd HH:mm:ss";
+    public static final String DEFAULT_DATETIME_PATTERN_WITH_MILLISECONDS = "yyyy-MM-dd HH:mm:ss.SSS";
+
+    static final private Map<String, ThreadLocal<SimpleDateFormat>> threadLocalMap = new ConcurrentHashMap<String, ThreadLocal<SimpleDateFormat>>();
+
+    static SimpleDateFormat getDateFormat(String datePattern) {
+        ThreadLocal<SimpleDateFormat> formatThreadLocal = threadLocalMap.get(datePattern);
+        if (formatThreadLocal == null) {
+            threadLocalMap.put(datePattern, formatThreadLocal = new ThreadLocal<SimpleDateFormat>());
+        }
+        SimpleDateFormat format = formatThreadLocal.get();
+        if (format == null) {
+            format = new SimpleDateFormat(datePattern);
+            format.setTimeZone(TimeZone.getTimeZone("GMT")); // NOTE: this must be GMT to calculate epoch date correctly
+            formatThreadLocal.set(format);
+        }
+        return format;
+    }
+
+    public static String formatToDateStr(long millis) {
+        return getDateFormat(DEFAULT_DATE_PATTERN).format(new Date(millis));
+    }
+
+    public static String formatToTimeStr(long millis) {
+        return getDateFormat(DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS).format(new Date(millis));
+    }
+
+    public static String dateToString(Date date) {
+        return dateToString(date, DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS);
+    }
+
+    public static String dateToString(Date date, String pattern) {
+        return getDateFormat(pattern).format(date);
+    }
+
+    public static Date stringToDate(String str) {
+        return stringToDate(str, DEFAULT_DATE_PATTERN);
+    }
+
+    public static Date stringToDate(String str, String pattern) {
+        Date date = null;
+        try {
+            date = getDateFormat(pattern).parse(str);
+        } catch (ParseException e) {
+            throw new IllegalArgumentException("'" + str + "' is not a valid date of pattern '" + pattern + "'", e);
+        }
+        return date;
+    }
+
+    public static long stringToMillis(String str) {
+        if (isAllDigits(str)) {
+            return Long.parseLong(str);
+        } else if (str.length() == 10) {
+            return stringToDate(str, DEFAULT_DATE_PATTERN).getTime();
+        } else if (str.length() == 19) {
+            return stringToDate(str, DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS).getTime();
+        } else if (str.length() == 23) {
+            return stringToDate(str, DEFAULT_DATETIME_PATTERN_WITH_MILLISECONDS).getTime();
+        } else {
+            throw new IllegalArgumentException("there is no valid date pattern for:" + str);
+        }
+    }
+
+    private static boolean isAllDigits(String str) {
+        for (int i = 0, n = str.length(); i < n; i++) {
+            if (Character.isDigit(str.charAt(i)) == false)
+                return false;
+        }
+        return true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d6218b44/common/src/main/java/org/apache/kylin/common/util/RangeUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/RangeUtil.java b/common/src/main/java/org/apache/kylin/common/util/RangeUtil.java
index fae521c..2dd2032 100644
--- a/common/src/main/java/org/apache/kylin/common/util/RangeUtil.java
+++ b/common/src/main/java/org/apache/kylin/common/util/RangeUtil.java
@@ -2,10 +2,7 @@ package org.apache.kylin.common.util;
 
 import java.util.List;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Range;
-import com.google.common.collect.RangeSet;
-import com.google.common.collect.TreeRangeSet;
+import com.google.common.collect.*;
 
 /**
  * Created by Hongbin Ma(Binmahone) on 4/14/15.
@@ -21,4 +18,35 @@ public class RangeUtil {
         rangeSet.remove(other);
         return Lists.newArrayList(rangeSet.asRanges());
     }
+
+    public static String formatTsRange(Range<Long> tsRange) {
+        if(tsRange == null)
+            return null;
+
+        StringBuilder sb = new StringBuilder();
+        if (tsRange.hasLowerBound()) {
+            if (tsRange.lowerBoundType() == BoundType.CLOSED) {
+                sb.append("[");
+            } else {
+                sb.append("(");
+            }
+            DateFormat.formatToTimeStr(tsRange.lowerEndpoint());
+        } else {
+            sb.append("(null");
+        }
+
+        sb.append("~");
+
+        if (tsRange.hasUpperBound()) {
+            DateFormat.formatToTimeStr(tsRange.upperEndpoint());
+            if (tsRange.upperBoundType() == BoundType.CLOSED) {
+                sb.append("]");
+            } else {
+                sb.append(")");
+            }
+        } else {
+            sb.append("null)");
+        }
+        return sb.toString();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d6218b44/dictionary/src/main/java/org/apache/kylin/dict/DateStrDictionary.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/DateStrDictionary.java b/dictionary/src/main/java/org/apache/kylin/dict/DateStrDictionary.java
index e0231c5..acd7404 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/DateStrDictionary.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/DateStrDictionary.java
@@ -18,7 +18,7 @@
 
 package org.apache.kylin.dict;
 
-import static org.apache.kylin.metadata.util.DateFormat.*;
+import static org.apache.kylin.common.util.DateFormat.*;
 
 import java.io.DataInput;
 import java.io.DataOutput;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d6218b44/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java
index d2a81d4..287ca51 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java
@@ -22,7 +22,7 @@ import org.apache.commons.lang.ObjectUtils;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.metadata.util.DateFormat;
+import org.apache.kylin.common.util.DateFormat;
 
 import java.util.Arrays;
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d6218b44/metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigestUtil.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigestUtil.java b/metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigestUtil.java
index f1861b0..d15b3e3 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigestUtil.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigestUtil.java
@@ -6,7 +6,7 @@ import com.google.common.collect.Range;
 import org.apache.kylin.metadata.filter.*;
 import org.apache.kylin.metadata.model.DataType;
 import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.util.DateFormat;
+import org.apache.kylin.common.util.DateFormat;
 
 /**
  * Created by Hongbin Ma(Binmahone) on 4/14/15.

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d6218b44/metadata/src/main/java/org/apache/kylin/metadata/serializer/DateTimeSerializer.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/serializer/DateTimeSerializer.java b/metadata/src/main/java/org/apache/kylin/metadata/serializer/DateTimeSerializer.java
index 45dd6b1..8223b69 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/serializer/DateTimeSerializer.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/serializer/DateTimeSerializer.java
@@ -5,7 +5,7 @@ import java.nio.ByteBuffer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.kylin.metadata.model.DataType;
-import org.apache.kylin.metadata.util.DateFormat;
+import org.apache.kylin.common.util.DateFormat;
 
 public class DateTimeSerializer extends DataTypeSerializer<LongWritable> {
     

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d6218b44/metadata/src/main/java/org/apache/kylin/metadata/tuple/ITupleIterator.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/tuple/ITupleIterator.java b/metadata/src/main/java/org/apache/kylin/metadata/tuple/ITupleIterator.java
index 4a00780..0b83818 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/tuple/ITupleIterator.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/tuple/ITupleIterator.java
@@ -44,7 +44,16 @@ public interface ITupleIterator extends Iterator<ITuple> {
         @Override
         public void close() {
         }
+
     };
 
     public void close();
+
+    /**
+     * if hasNext() returns false because there's no more data, return true
+     * if hasNext() returns false because limits or threshold, return false
+     * if hasNext() returns true, throw IllegalStateException
+     * @return
+     */
+    //public boolean isDrained();
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d6218b44/metadata/src/main/java/org/apache/kylin/metadata/tuple/TeeTupleIterator.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/tuple/TeeTupleIterator.java b/metadata/src/main/java/org/apache/kylin/metadata/tuple/TeeTupleIterator.java
new file mode 100644
index 0000000..0314762
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/metadata/tuple/TeeTupleIterator.java
@@ -0,0 +1,54 @@
+package org.apache.kylin.metadata.tuple;
+
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Created by Hongbin Ma(Binmahone) on 4/15/15.
+ *
+ * Like "tee" command in linux,it effectively duplicates the underlying
+ * ITupleIterator's results
+ */
+public class TeeTupleIterator implements ITupleIterator {
+
+    private static final Logger logger = LoggerFactory.getLogger(TeeTupleIterator.class);
+
+    private Function<List<ITuple>, Void> actionOnSeeingWholeData;
+    private ITupleIterator underlying;
+    private List<ITuple> duplicatedData;
+
+    public TeeTupleIterator(ITupleIterator underlying, Function<List<ITuple>, Void> actionOnSeeingWholeData) {
+        this.actionOnSeeingWholeData = actionOnSeeingWholeData;
+        this.underlying = underlying;
+        this.duplicatedData = Lists.newArrayList();
+    }
+
+    @Override
+    public void close() {
+        this.underlying.close();
+        //if(this.underlying.isDrained)
+        actionOnSeeingWholeData.apply(duplicatedData);
+    }
+
+    @Override
+    public boolean hasNext() {
+        return this.underlying.hasNext();
+    }
+
+    @Override
+    public ITuple next() {
+        ITuple ret = this.underlying.next();
+        duplicatedData.add(ret);
+        return ret;
+    }
+
+    @Override
+    public void remove() {
+        this.underlying.remove();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d6218b44/metadata/src/main/java/org/apache/kylin/metadata/util/DateFormat.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/util/DateFormat.java b/metadata/src/main/java/org/apache/kylin/metadata/util/DateFormat.java
deleted file mode 100644
index 91d0720..0000000
--- a/metadata/src/main/java/org/apache/kylin/metadata/util/DateFormat.java
+++ /dev/null
@@ -1,80 +0,0 @@
-package org.apache.kylin.metadata.util;
-
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.Map;
-import java.util.TimeZone;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class DateFormat {
-
-    public static final String DEFAULT_DATE_PATTERN = "yyyy-MM-dd";
-    public static final String DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS = "yyyy-MM-dd HH:mm:ss";
-    public static final String DEFAULT_DATETIME_PATTERN_WITH_MILLISECONDS = "yyyy-MM-dd HH:mm:ss.SSS";
-
-    static final private Map<String, ThreadLocal<SimpleDateFormat>> threadLocalMap = new ConcurrentHashMap<String, ThreadLocal<SimpleDateFormat>>();
-
-    static SimpleDateFormat getDateFormat(String datePattern) {
-        ThreadLocal<SimpleDateFormat> formatThreadLocal = threadLocalMap.get(datePattern);
-        if (formatThreadLocal == null) {
-            threadLocalMap.put(datePattern, formatThreadLocal = new ThreadLocal<SimpleDateFormat>());
-        }
-        SimpleDateFormat format = formatThreadLocal.get();
-        if (format == null) {
-            format = new SimpleDateFormat(datePattern);
-            format.setTimeZone(TimeZone.getTimeZone("GMT")); // NOTE: this must be GMT to calculate epoch date correctly
-            formatThreadLocal.set(format);
-        }
-        return format;
-    }
-
-    public static String formatToDateStr(long millis) {
-        return getDateFormat(DEFAULT_DATE_PATTERN).format(new Date(millis));
-    }
-
-    public static String dateToString(Date date) {
-        return dateToString(date, DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS);
-    }
-
-    public static String dateToString(Date date, String pattern) {
-        return getDateFormat(pattern).format(date);
-    }
-
-    public static Date stringToDate(String str) {
-        return stringToDate(str, DEFAULT_DATE_PATTERN);
-    }
-
-    public static Date stringToDate(String str, String pattern) {
-        Date date = null;
-        try {
-            date = getDateFormat(pattern).parse(str);
-        } catch (ParseException e) {
-            throw new IllegalArgumentException("'" + str + "' is not a valid date of pattern '" + pattern + "'", e);
-        }
-        return date;
-    }
-
-    public static long stringToMillis(String str) {
-        if (isAllDigits(str)) {
-            return Long.parseLong(str);
-        } else if (str.length() == 10) {
-            return stringToDate(str, DEFAULT_DATE_PATTERN).getTime();
-        } else if (str.length() == 19) {
-            return stringToDate(str, DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS).getTime();
-        } else if (str.length() == 23) {
-            return stringToDate(str, DEFAULT_DATETIME_PATTERN_WITH_MILLISECONDS).getTime();
-        } else {
-            throw new IllegalArgumentException("there is no valid date pattern for:" + str);
-        }
-    }
-
-    private static boolean isAllDigits(String str) {
-        for (int i = 0, n = str.length(); i < n; i++) {
-            if (Character.isDigit(str.charAt(i)) == false)
-                return false;
-        }
-        return true;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d6218b44/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStorageEngine.java
index b1fe17f..c6cb4ba 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStorageEngine.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStorageEngine.java
@@ -1,9 +1,10 @@
 package org.apache.kylin.storage.cache;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
+import java.util.List;
+
 import net.sf.ehcache.Cache;
 import net.sf.ehcache.CacheManager;
+import net.sf.ehcache.Element;
 import net.sf.ehcache.config.CacheConfiguration;
 import net.sf.ehcache.config.MemoryUnit;
 import net.sf.ehcache.config.PersistenceConfiguration;
@@ -16,27 +17,30 @@ import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.PartitionDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.realization.*;
-import org.apache.kylin.metadata.tuple.CompoundTupleIterator;
-import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.metadata.tuple.SimpleTupleIterator;
+import org.apache.kylin.metadata.tuple.*;
 import org.apache.kylin.storage.IStorageEngine;
 import org.apache.kylin.storage.StorageContext;
 import org.apache.kylin.storage.StorageEngineFactory;
 import org.apache.kylin.storage.hbase.coprocessor.endpoint.TsConditionExtractor;
 
+import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Range;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
-import java.util.List;
 
 /**
  * Created by Hongbin Ma(Binmahone) on 4/13/15.
  */
 public class CacheFledgedStorageEngine implements IStorageEngine {
 
-    public static final String SUCCESS_QUERY_CACHE = "SuccessQueryCache";
-    public static final String EXCEPTION_QUERY_CACHE = "ExceptionQueryCache";//TODO
+    private static final Logger logger = LoggerFactory.getLogger(CacheFledgedStorageEngine.class);
+
+    public static final String STORAGE_LAYER_TUPLE_CACHE = "STORAGE_LAYER_TUPLE_CACHE";
+    //TODO: deal with failed queries
 
     static CacheManager cacheManager;
 
@@ -44,13 +48,13 @@ public class CacheFledgedStorageEngine implements IStorageEngine {
         cacheManager = CacheManager.create();
 
         //Create a Cache specifying its configuration.
-        Cache successCache = new Cache(new CacheConfiguration(SUCCESS_QUERY_CACHE, 0).//
-                memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.LFU).//
+        Cache successCache = new Cache(new CacheConfiguration(STORAGE_LAYER_TUPLE_CACHE, 0).//
+                memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.LRU).//
                 eternal(false).//
                 timeToIdleSeconds(86400).//
                 diskExpiryThreadIntervalSeconds(0).//
-                maxBytesLocalHeap(500, MemoryUnit.MEGABYTES).//
-                persistence(new PersistenceConfiguration().strategy(PersistenceConfiguration.Strategy.LOCALTEMPSWAP)));
+                maxBytesLocalHeap(1, MemoryUnit.GIGABYTES).//
+                persistence(new PersistenceConfiguration().strategy(PersistenceConfiguration.Strategy.NONE)));
 
         cacheManager.addCache(successCache);
     }
@@ -73,37 +77,59 @@ public class CacheFledgedStorageEngine implements IStorageEngine {
 
     @Override
     public ITupleIterator search(final StorageContext context, final SQLDigest sqlDigest) {
-        StreamSQLDigest streamSQLDigest = new StreamSQLDigest(sqlDigest, partitionColRef);
-        StreamSQLResult cachedResult = (StreamSQLResult) cacheManager.getCache(SUCCESS_QUERY_CACHE).get(streamSQLDigest).getObjectValue();
+        final StreamSQLDigest streamSQLDigest = new StreamSQLDigest(sqlDigest, partitionColRef);
+        StreamSQLResult cachedResult = (StreamSQLResult) cacheManager.getCache(STORAGE_LAYER_TUPLE_CACHE).get(streamSQLDigest).getObjectValue();
 
-        ITupleIterator ret;
+        final Range<Long> tsRange = TsConditionExtractor.extractTsCondition(partitionColRef, sqlDigest.filter);
 
+        ITupleIterator ret = null;
         if (cachedResult != null) {
-            Range<Long> tsRange = TsConditionExtractor.extractTsCondition(partitionColRef, sqlDigest.filter);
+            logger.debug("current cache    : " + cachedResult);
             Range<Long> reusePeriod = cachedResult.getReusableResults(tsRange);
+
+            logger.info("ts Range in query: " + RangeUtil.formatTsRange(tsRange));
+            logger.info("reusable range   : " + RangeUtil.formatTsRange(reusePeriod));
+
             if (reusePeriod != null) {
+
                 List<Range<Long>> remainings = RangeUtil.remove(tsRange, reusePeriod);
                 if (remainings.size() == 1) {
+
                     SimpleTupleIterator reusedTuples = new SimpleTupleIterator(cachedResult.reuse(reusePeriod));
                     Range<Long> remaining = remainings.get(0);
                     ITupleIterator freshTuples = SQLDigestUtil.appendTsFilterToExecute(sqlDigest, partitionColRef, remaining, new Function<Void, ITupleIterator>() {
-                        @Nullable
                         @Override
                         public ITupleIterator apply(Void input) {
                             return StorageEngineFactory.getStorageEngine(realization, false).search(context, sqlDigest);
                         }
                     });
+
                     ret = new CompoundTupleIterator(Lists.newArrayList(reusedTuples, freshTuples));
-                    //TODO:update cache
-                    return ret;//cache successfully reused
                 }
             }
         }
 
-        //cache cannot reuse case:
-        ret = StorageEngineFactory.getStorageEngine(realization, false).search(context, sqlDigest);
-        //TODO:update cache
-        return ret;
+        if (ret == null) {
+            logger.info("no cache is being leveraged");
+            //cache cannot reuse case:
+            ret = StorageEngineFactory.getStorageEngine(realization, false).search(context, sqlDigest);
+        } else {
+            logger.info("cache is being leveraged");
+        }
+
+        //use another nested ITupleIterator to deal with cache
+        ITupleIterator tee = new TeeTupleIterator(ret, new Function<List<ITuple>, Void>() {
+            @Nullable
+            @Override
+            public Void apply(List<ITuple> input) {
+                //TODO: tsRange needs updated
+                StreamSQLResult newCache = new StreamSQLResult(input, tsRange);
+                cacheManager.getCache(STORAGE_LAYER_TUPLE_CACHE).put(new Element(streamSQLDigest, newCache));
+                logger.debug("current cache: " + newCache);
+                return null;
+            }
+        });
 
+        return tee;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d6218b44/storage/src/main/java/org/apache/kylin/storage/cache/StreamSQLResult.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cache/StreamSQLResult.java b/storage/src/main/java/org/apache/kylin/storage/cache/StreamSQLResult.java
index f537fc7..ce7667a 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cache/StreamSQLResult.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cache/StreamSQLResult.java
@@ -3,6 +3,7 @@ package org.apache.kylin.storage.cache;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.kylin.common.util.RangeUtil;
 import org.apache.kylin.metadata.tuple.ITuple;
 
 import com.google.common.collect.Range;
@@ -14,6 +15,11 @@ public class StreamSQLResult {
     private List<ITuple> rows;
     private Range<Long> timeCovered;
 
+    public StreamSQLResult(List<ITuple> rows, Range<Long> timeCovered) {
+        this.rows = rows;
+        this.timeCovered = timeCovered;
+    }
+
     public Range<Long> getReusableResults(Range<Long> tsRange) {
         if (tsRange.equals(timeCovered))
             return timeCovered;
@@ -31,4 +37,9 @@ public class StreamSQLResult {
         //TODO: currently regardless of reusablePeriod, all rows are returned
         return rows.iterator();
     }
+
+    @Override
+    public String toString() {
+        return rows.size() + " tuples cached for period " + RangeUtil.formatTsRange(timeCovered);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d6218b44/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseKeyRange.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseKeyRange.java b/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseKeyRange.java
index e766317..c7be838 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseKeyRange.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseKeyRange.java
@@ -35,7 +35,7 @@ import org.apache.kylin.cube.kv.FuzzyMaskEncoder;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.util.DateFormat;
+import org.apache.kylin.common.util.DateFormat;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d6218b44/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
index 4753214..abe6157 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
@@ -23,7 +23,6 @@ import java.util.Arrays;
 import java.util.Iterator;
 import java.util.Map;
 
-import com.google.common.collect.Range;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.SerializationUtils;
@@ -55,10 +54,13 @@ import org.apache.kylin.storage.hbase.coprocessor.endpoint.generated.IIProtos;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.Map;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Range;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+import it.uniroma3.mat.extendedset.intset.ConciseSet;
 
 /**
  * Created by honma on 11/7/14.

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d6218b44/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/TsConditionExtractor.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/TsConditionExtractor.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/TsConditionExtractor.java
index 53f2945..0b36d4e 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/TsConditionExtractor.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/TsConditionExtractor.java
@@ -4,7 +4,7 @@ import org.apache.kylin.metadata.filter.CompareTupleFilter;
 import org.apache.kylin.metadata.filter.LogicalTupleFilter;
 import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.util.DateFormat;
+import org.apache.kylin.common.util.DateFormat;
 
 import com.google.common.collect.Range;
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d6218b44/storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java b/storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
index 20aa386..992ea61 100644
--- a/storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
+++ b/storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
@@ -29,7 +29,7 @@ import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
 import org.apache.kylin.dict.lookup.LookupStringTable;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.tuple.ITuple;
-import org.apache.kylin.metadata.util.DateFormat;
+import org.apache.kylin.common.util.DateFormat;
 
 /**
  * @author xjiang

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d6218b44/storage/src/test/java/org/apache/kylin/storage/cache/EhcacheTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/cache/EhcacheTest.java b/storage/src/test/java/org/apache/kylin/storage/cache/EhcacheTest.java
index 14ebe3e..fd34224 100644
--- a/storage/src/test/java/org/apache/kylin/storage/cache/EhcacheTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/cache/EhcacheTest.java
@@ -1,6 +1,6 @@
 package org.apache.kylin.storage.cache;
 
-import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import net.sf.ehcache.Cache;
 import net.sf.ehcache.CacheManager;
 import net.sf.ehcache.Element;
@@ -8,38 +8,52 @@ import net.sf.ehcache.config.CacheConfiguration;
 import net.sf.ehcache.config.MemoryUnit;
 import net.sf.ehcache.config.PersistenceConfiguration;
 import net.sf.ehcache.store.MemoryStoreEvictionPolicy;
-import org.junit.Assert;
+
 import org.junit.Test;
 
-import java.util.List;
+import java.util.HashMap;
+import java.util.HashSet;
 
 /**
  * Created by Hongbin Ma(Binmahone) on 4/13/15.
  */
 public class EhcacheTest {
     @Test
-    public void basicTest() {
+    public void basicTest() throws InterruptedException {
         CacheManager cacheManager = CacheManager.create();
 
         //Create a Cache specifying its configuration.
-        Cache testCache = new Cache(new CacheConfiguration("test", 0).//
-                memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.LFU).//
+        Cache testCache = //Create a Cache specifying its configuration.
+        new Cache(new CacheConfiguration("test", 0).//
+                memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.LRU).//
                 eternal(false).//
                 timeToIdleSeconds(86400).//
                 diskExpiryThreadIntervalSeconds(0).//
-                maxBytesLocalHeap(500, MemoryUnit.MEGABYTES).//
-                persistence(new PersistenceConfiguration().strategy(PersistenceConfiguration.Strategy.LOCALTEMPSWAP)));
+                maxBytesLocalHeap(100, MemoryUnit.MEGABYTES).//
+                persistence(new PersistenceConfiguration().strategy(PersistenceConfiguration.Strategy.NONE)));
 
         cacheManager.addCache(testCache);
-        testCache.put(new Element("x", Lists.<String> newArrayList()));
-
-        List<String> v = (List<String>) testCache.get("x").getObjectValue();
-        Assert.assertTrue(v.size() == 0);
-        v.add("hi");
-
-        List<String> v2 = (List<String>) testCache.get("x").getObjectValue();
-        Assert.assertTrue(v2.size() == 1);
 
+        //
+        //        byte[] blob2 = new byte[(1024 * 400 * 1024)];//400M
+        //
+        //        testCache.put(new Element("1", blob));
+        //        System.out.println(testCache.get("1") == null);
+        //        System.out.println(testCache.getSize());
+        //        System.out.println(testCache.getStatistics().getLocalHeapSizeInBytes());
+        //        System.out.println("runtime used memory: " + (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / 1024 / 1024 + "M");
+        //        testCache.put(new Element("2", blob));
+        //        System.out.println(testCache.get("1") == null);
+        //        System.out.println(testCache.getSize());
+        //        System.out.println(testCache.getStatistics().getLocalHeapSizeInBytes());
+        //        System.out.println("runtime used memory: " + (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / 1024 / 1024 + "M");
+        //        testCache.put(new Element("3", blob));
+        //        System.out.println(testCache.get("1") == null);
+        //        System.out.println(testCache.get("2") == null);
+        //        System.out.println(testCache.get("3") == null);
+        //        System.out.println(testCache.getSize());
+        //        System.out.println(testCache.getStatistics().getLocalHeapSizeInBytes());
+        //        System.out.println("runtime used memory: " + (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / 1024 / 1024 + "M");
 
         cacheManager.shutdown();
     }