You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/11/13 11:02:12 UTC

[1/2] incubator-kylin git commit: KYLIN-1144 allow disable/enable storage cache on both server level and query level

Repository: incubator-kylin
Updated Branches:
  refs/heads/2.x-staging b918d3e23 -> ae0f1a72e


KYLIN-1144 allow disable/enable storage cache on both server level and query level


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/692438f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/692438f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/692438f6

Branch: refs/heads/2.x-staging
Commit: 692438f67979866e4395188064e3e1d752b9e83d
Parents: b918d3e
Author: honma <ho...@ebay.com>
Authored: Fri Nov 13 11:40:22 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Fri Nov 13 17:18:05 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/KylinConfig.java    |  12 +-
 .../kylin/common/debug/BackdoorToggles.java     |  16 +++
 .../org/apache/kylin/common/util/RangeUtil.java |   4 +-
 .../metadata/realization/StreamSQLDigest.java   |   2 +-
 .../cache/AbstractCacheFledgedQuery.java        |  26 +++-
 .../storage/cache/CacheFledgedDynamicQuery.java | 118 ++++++++++---------
 .../storage/cache/CacheFledgedStaticQuery.java  |  45 ++-----
 .../kylin/rest/controller/QueryController.java  |  78 ++++++------
 .../apache/kylin/rest/service/QueryService.java |  11 +-
 .../kylin/storage/hbase/HBaseStorage.java       |  10 +-
 10 files changed, 176 insertions(+), 146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/692438f6/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index 709e5aa..e76aa94 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -23,14 +23,18 @@ import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.PrintWriter;
+import java.io.Serializable;
+import java.io.StringWriter;
+import java.util.Map;
+import java.util.Properties;
 import java.util.SortedSet;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import com.google.common.collect.Sets;
-
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.restclient.RestClient;
@@ -38,9 +42,7 @@ import org.apache.kylin.common.util.CliCommandExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.*;
-import java.util.Map;
-import java.util.Properties;
+import com.google.common.collect.Sets;
 
 /**
  */

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/692438f6/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java b/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
index 1e26557..7304b8b 100644
--- a/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
+++ b/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
@@ -47,6 +47,10 @@ public class BackdoorToggles {
         return getString(DEBUG_TOGGLE_HBASE_CUBE_QUERY_PROTOCOL);
     }
 
+    public static boolean getDisableCache() {
+        return getBoolean(DEBUG_TOGGLE_DISABLE_QUERY_CACHE);
+    }
+
     public static boolean getDisableFuzzyKey() {
         return getBoolean(DEBUG_TOGGLE_DISABLE_FUZZY_KEY);
     }
@@ -85,6 +89,18 @@ public class BackdoorToggles {
     public final static String DEBUG_TOGGLE_DISABLE_FUZZY_KEY = "DEBUG_TOGGLE_DISABLE_FUZZY_KEY";
 
     /**
+     * set DEBUG_TOGGLE_DISABLE_QUERY_CACHE=true to prevent using cache for current query
+     *
+     *
+     *
+     example:(put it into request body)
+     "backdoorToggles": {
+     "DEBUG_TOGGLE_DISABLE_QUERY_CACHE": "true"
+     }
+     */
+    public final static String DEBUG_TOGGLE_DISABLE_QUERY_CACHE = "DEBUG_TOGGLE_DISABLE_QUERY_CACHE";
+
+    /**
      * set DEBUG_TOGGLE_HBASE_CUBE_QUERY_VERSION=v1/v2 to control which version CubeStorageQuery to use
      *
      example:(put it into request body)

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/692438f6/core-common/src/main/java/org/apache/kylin/common/util/RangeUtil.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/RangeUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/RangeUtil.java
index 8fa5535..33dfb83 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/RangeUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/RangeUtil.java
@@ -63,15 +63,17 @@ public class RangeUtil {
      * @return
      */
     public static <C extends Comparable<?>> List<Range<C>> remove(Range<C> self, Range<C> other) {
+
         // mimic the following logic in guava 18:
         //        RangeSet<C> rangeSet = TreeRangeSet.create();
         //        rangeSet.add(self);
         //        rangeSet.remove(other);
         //        return Lists.newArrayList(rangeSet.asRanges());
 
-        if (!self.isConnected(other)) {
+        if (other == null || !self.isConnected(other)) {
             return Collections.singletonList(self);
         }
+        
         Range<C> share = self.intersection(other);
         if (share.isEmpty()) {
             return Collections.singletonList(self);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/692438f6/core-metadata/src/main/java/org/apache/kylin/metadata/realization/StreamSQLDigest.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/StreamSQLDigest.java b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/StreamSQLDigest.java
index edbe897..1420470 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/StreamSQLDigest.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/StreamSQLDigest.java
@@ -28,7 +28,7 @@ public class StreamSQLDigest {
         filterSerialized = TupleFilterSerializer.serialize(sqlDigest.filter, decorator, StringCodeSystem.INSTANCE);
 
         int nonFilterHashCode = calculateNonFilterHashCode();
-        this.hashCode = 31 * nonFilterHashCode + (filterSerialized != null ? Arrays.hashCode(filterSerialized) : 0);
+        this.hashCode = 31 * nonFilterHashCode + Arrays.hashCode(filterSerialized);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/692438f6/core-storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedQuery.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedQuery.java b/core-storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedQuery.java
index 5ffdf91..a7bdae0 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedQuery.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedQuery.java
@@ -2,12 +2,14 @@ package org.apache.kylin.storage.cache;
 
 import net.sf.ehcache.Cache;
 import net.sf.ehcache.CacheManager;
+import net.sf.ehcache.Element;
 import net.sf.ehcache.config.CacheConfiguration;
 import net.sf.ehcache.config.Configuration;
 import net.sf.ehcache.config.MemoryUnit;
 import net.sf.ehcache.config.PersistenceConfiguration;
 import net.sf.ehcache.store.MemoryStoreEvictionPolicy;
 
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.metadata.realization.StreamSQLDigest;
 import org.apache.kylin.metadata.tuple.TeeTupleItrListener;
 import org.apache.kylin.storage.ICachableStorageQuery;
@@ -23,7 +25,6 @@ public abstract class AbstractCacheFledgedQuery implements IStorageQuery, TeeTup
 
     protected static CacheManager CACHE_MANAGER;
 
-    protected boolean queryCacheExists;
     protected ICachableStorageQuery underlyingStorage;
     protected StreamSQLDigest streamSQLDigest;
 
@@ -53,6 +54,29 @@ public abstract class AbstractCacheFledgedQuery implements IStorageQuery, TeeTup
         CACHE_MANAGER.addCache(storageCache);
     }
 
+    protected StreamSQLResult getStreamSQLResult(StreamSQLDigest streamSQLDigest) {
+
+        Cache cache = CACHE_MANAGER.getCache(this.underlyingStorage.getStorageUUID());
+        Element element = cache.get(streamSQLDigest.hashCode());//TODO: hash code cannot guarantee uniqueness
+        if (element != null) {
+            return (StreamSQLResult) element.getObjectValue();
+        }
+        return null;
+    }
+
+    protected boolean needSaveCache(long createTime) {
+        long storageQueryTime = System.currentTimeMillis() - createTime;
+        long durationThreshold = KylinConfig.getInstanceFromEnv().getQueryDurationCacheThreshold();
+        //TODO: check scan count necessary?
+
+        if (storageQueryTime < durationThreshold) {
+            logger.info("Skip saving storage caching for storage cache because storage query time {} less than {}", storageQueryTime, durationThreshold);
+            return false;
+        }
+
+        return true;
+    }
+
     private void makeCacheIfNecessary(String storageUUID) {
         if (CACHE_MANAGER == null) {
             logger.warn("CACHE_MANAGER is not provided");

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/692438f6/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicQuery.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicQuery.java b/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicQuery.java
index febe1a9..b9ae0e9 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicQuery.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicQuery.java
@@ -2,7 +2,6 @@ package org.apache.kylin.storage.cache;
 
 import java.util.List;
 
-import net.sf.ehcache.Cache;
 import net.sf.ehcache.Element;
 
 import org.apache.kylin.common.util.RangeUtil;
@@ -33,6 +32,8 @@ public class CacheFledgedDynamicQuery extends AbstractCacheFledgedQuery {
 
     private final TblColRef partitionColRef;
 
+    private boolean noCacheUsed = true;
+
     private Range<Long> ts;
 
     public CacheFledgedDynamicQuery(ICachableStorageQuery underlyingStorage, TblColRef partitionColRef) {
@@ -52,70 +53,31 @@ public class CacheFledgedDynamicQuery extends AbstractCacheFledgedQuery {
             return ITupleIterator.EMPTY_TUPLE_ITERATOR;
         }
 
+        ITupleIterator ret = null;
+
         //enable dynamic cache iff group by columns contains partition col
         //because cache extraction requires partition col value as selection key
-        boolean needUpdateCache = sqlDigest.groupbyColumns.contains(partitionColRef);
-
-        streamSQLDigest = new StreamSQLDigest(sqlDigest, partitionColRef);
-        StreamSQLResult cachedResult = null;
-        Cache cache = CACHE_MANAGER.getCache(this.underlyingStorage.getStorageUUID());
-        Element element = cache.get(streamSQLDigest.hashCode());
-        if (element != null) {
-            this.queryCacheExists = true;
-            cachedResult = (StreamSQLResult) element.getObjectValue();
-        }
-
-        ITupleIterator ret = null;
-        if (cachedResult != null) {
-            Range<Long> reusePeriod = cachedResult.getReusableResults(ts);
-
-            logger.info("existing cache    : " + cachedResult);
-            logger.info("ts Range in query: " + RangeUtil.formatTsRange(ts));
-            logger.info("potential reusable range   : " + RangeUtil.formatTsRange(reusePeriod));
-
-            if (reusePeriod != null) {
-                List<Range<Long>> remainings = RangeUtil.remove(ts, reusePeriod);
-                if (remainings.size() == 1) {//if using cache causes two underlyingStorage searches, we'd rather not use the cache
-
-                    SimpleTupleIterator reusedTuples = new SimpleTupleIterator(cachedResult.reuse(reusePeriod));
-                    List<ITupleIterator> iTupleIteratorList = Lists.newArrayList();
-                    iTupleIteratorList.add(reusedTuples);
-
-                    for (Range<Long> remaining : remainings) {//actually there will be only one loop
-                        logger.info("Appending ts " + RangeUtil.formatTsRange(remaining) + " as additional filter");
-
-                        ITupleIterator freshTuples = SQLDigestUtil.appendTsFilterToExecute(sqlDigest, partitionColRef, remaining, new Function<Void, ITupleIterator>() {
-                            @Override
-                            public ITupleIterator apply(Void input) {
-                                return underlyingStorage.search(context, sqlDigest, returnTupleInfo);
-                            }
-                        });
-                        iTupleIteratorList.add(freshTuples);
-                    }
+        boolean enableDynamicCache = sqlDigest.groupbyColumns.contains(partitionColRef);
 
-                    ret = new CompoundTupleIterator(iTupleIteratorList);
-                } else if (remainings.size() == 0) {
-                    logger.info("The ts range in new query was fully cached");
-                    needUpdateCache = false;
-                    ret = new SimpleTupleIterator(cachedResult.reuse(reusePeriod));
-                } else {
-                    //if using cache causes more than one underlyingStorage searches
-                    //the incurred overhead might be more expensive than the cache benefit
-                    logger.info("Give up using cache to avoid complexity");
-                }
+        if (enableDynamicCache) {
+            StreamSQLResult cachedResult = getStreamSQLResult(new StreamSQLDigest(sqlDigest, partitionColRef));
+            if (cachedResult != null) {
+                ret = tryReuseCache(context, sqlDigest, returnTupleInfo, ret, cachedResult);
+            } else {
+                logger.info("no cache entry for this query");
             }
-        } else {
-            logger.info("no cache entry for this query");
         }
 
         if (ret == null) {
             ret = underlyingStorage.search(context, sqlDigest, returnTupleInfo);
+            noCacheUsed = true;
             logger.info("No Cache being used");
         } else {
+            noCacheUsed = false;
             logger.info("Cache being used");
         }
 
-        if (needUpdateCache) {
+        if (enableDynamicCache) {
             //use another nested ITupleIterator to deal with cache
             final TeeTupleIterator tee = new TeeTupleIterator(ret);
             tee.addCloseListener(this);
@@ -125,15 +87,65 @@ public class CacheFledgedDynamicQuery extends AbstractCacheFledgedQuery {
         }
     }
 
+    /**
+     * if cache is not enough it will try to combine existing cache as well as fresh records
+     */
+    private ITupleIterator tryReuseCache(final StorageContext context, final SQLDigest sqlDigest, final TupleInfo returnTupleInfo, ITupleIterator ret, StreamSQLResult cachedResult) {
+        Range<Long> reusePeriod = cachedResult.getReusableResults(ts);
+
+        logger.info("existing cache: " + cachedResult);
+        logger.info("ts Range in query: " + RangeUtil.formatTsRange(ts));
+        logger.info("potential reusable range: " + RangeUtil.formatTsRange(reusePeriod));
+
+        if (reusePeriod != null) {
+            List<Range<Long>> remainings = RangeUtil.remove(ts, reusePeriod);
+            if (remainings.size() == 1) {//if using cache causes two underlyingStorage searches, we'd rather not use the cache
+
+                SimpleTupleIterator reusedTuples = new SimpleTupleIterator(cachedResult.reuse(reusePeriod));
+                List<ITupleIterator> iTupleIteratorList = Lists.newArrayList();
+                iTupleIteratorList.add(reusedTuples);
+
+                Range<Long> remaining = remainings.get(0);
+                logger.info("Appending ts " + RangeUtil.formatTsRange(remaining) + " as additional filter");
+
+                ITupleIterator freshTuples = SQLDigestUtil.appendTsFilterToExecute(sqlDigest, partitionColRef, remaining, new Function<Void, ITupleIterator>() {
+                    @Override
+                    public ITupleIterator apply(Void input) {
+                        return underlyingStorage.search(context, sqlDigest, returnTupleInfo);
+                    }
+                });
+                iTupleIteratorList.add(freshTuples);
+
+                return new CompoundTupleIterator(iTupleIteratorList);
+            } else if (remainings.size() == 0) {
+                logger.info("The ts range in new query was fully cached");
+                return new SimpleTupleIterator(cachedResult.reuse(reusePeriod));
+            } else {
+                //if using cache causes more than one underlyingStorage searches
+                //the incurred overhead might be more expensive than the cache benefit
+                logger.info("Give up using cache to avoid complexity");
+                return null;
+            }
+        } else {
+            logger.info("cached results not reusable by current query");
+            return null;
+        }
+    }
+
     @Override
     public void notify(List<ITuple> duplicated, long createTime) {
 
+        //for streaming sql only check if needSaveCache at first entry of cache
+        if (noCacheUsed && !needSaveCache(createTime)) {
+            return;
+        }
+
         Range<Long> cacheExclude = this.underlyingStorage.getVolatilePeriod();
         if (cacheExclude != null) {
             List<Range<Long>> cachablePeriods = RangeUtil.remove(ts, cacheExclude);
             if (cachablePeriods.size() == 1) {
                 if (!ts.equals(cachablePeriods.get(0))) {
-                    logger.info("With respect to growing storage, the cacheable tsRange shrinks from " + RangeUtil.formatTsRange(ts) + " to " + RangeUtil.formatTsRange(cachablePeriods.get(0)));
+                    logger.info("tsRange shrinks from " + RangeUtil.formatTsRange(ts) + " to " + RangeUtil.formatTsRange(cachablePeriods.get(0)));
                 }
                 ts = cachablePeriods.get(0);
             } else {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/692438f6/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStaticQuery.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStaticQuery.java b/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStaticQuery.java
index 2272aac..93e5e09 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStaticQuery.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStaticQuery.java
@@ -2,7 +2,6 @@ package org.apache.kylin.storage.cache;
 
 import java.util.List;
 
-import net.sf.ehcache.Cache;
 import net.sf.ehcache.Element;
 
 import org.apache.kylin.metadata.realization.SQLDigest;
@@ -29,60 +28,30 @@ public class CacheFledgedStaticQuery extends AbstractCacheFledgedQuery {
     @Override
     public ITupleIterator search(final StorageContext context, final SQLDigest sqlDigest, final TupleInfo returnTupleInfo) {
 
-        streamSQLDigest = new StreamSQLDigest(sqlDigest, null);
-        StreamSQLResult cachedResult = null;
-        Cache cache = CACHE_MANAGER.getCache(this.underlyingStorage.getStorageUUID());
-        Element element = cache.get(streamSQLDigest.hashCode());
-        if (element != null) {
-            this.queryCacheExists = true;
-            cachedResult = (StreamSQLResult) element.getObjectValue();
-        }
-
+        StreamSQLResult cachedResult = getStreamSQLResult(new StreamSQLDigest(sqlDigest, null));
         ITupleIterator ret = null;
+
         if (cachedResult != null) {
-            ret = new SimpleTupleIterator(cachedResult.reuse(Ranges.<Long> all()));
+            logger.info("using existing cache");
+            return new SimpleTupleIterator(cachedResult.reuse(Ranges.<Long> all()));
         } else {
-            logger.info("no cache entry for this query");
-        }
-
-        if (ret == null) {
-            logger.info("decision: not using cache");
+            logger.info("no existing cache to use");
             ret = underlyingStorage.search(context, sqlDigest, returnTupleInfo);
-        } else {
-            logger.info("decision: use cache");
-        }
 
-        if (!queryCacheExists) {
             //use another nested ITupleIterator to deal with cache
             final TeeTupleIterator tee = new TeeTupleIterator(ret);
             tee.addCloseListener(this);
             return tee;
-        } else {
-            return ret;
         }
     }
 
     @Override
     public void notify(List<ITuple> duplicated, long createTime) {
-        boolean cacheIt = true;
-        //        long storageQueryTime = System.currentTimeMillis() - createTime;
-        //        long durationThreshold = KylinConfig.getInstanceFromEnv().getQueryDurationCacheThreshold();
-        //        long scancountThreshold = KylinConfig.getInstanceFromEnv().getQueryScanCountCacheThreshold();
-        //
-        //        if (storageQueryTime < durationThreshold) {
-        //            logger.info("Skip storage caching for storage cache because storage query time {} less than {}", storageQueryTime, durationThreshold);
-        //            cacheIt = false;
-        //        }
-        //
-        //        if (duplicated.size() < scancountThreshold) {
-        //            logger.info("Skip storage caching for storage cache because scan count {} less than {}", duplicated.size(), scancountThreshold);
-        //            cacheIt = false;
-        //        }
-
-        if (cacheIt) {
+        if (needSaveCache(createTime)) {
             StreamSQLResult newCacheEntry = new StreamSQLResult(duplicated, Ranges.<Long> all(), null);
             CACHE_MANAGER.getCache(this.underlyingStorage.getStorageUUID()).put(new Element(streamSQLDigest.hashCode(), newCacheEntry));
             logger.info("cache after the query: " + newCacheEntry);
         }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/692438f6/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java b/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
index 63e973a..dfd55f1 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
@@ -32,6 +32,7 @@ import net.sf.ehcache.Element;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.rest.exception.InternalErrorException;
@@ -166,55 +167,64 @@ public class QueryController extends BasicController {
     }
 
     private SQLResponse doQueryWithCache(SQLRequest sqlRequest) {
-        String sql = sqlRequest.getSql();
-        String project = sqlRequest.getProject();
-        logger.info("Using project: " + project);
-        logger.info("The original query:  " + sql);
-
-        String serverMode = KylinConfig.getInstanceFromEnv().getServerMode();
-        if (!(Constant.SERVER_MODE_QUERY.equals(serverMode.toLowerCase()) || Constant.SERVER_MODE_ALL.equals(serverMode.toLowerCase()))) {
-            throw new InternalErrorException("Query is not allowed in " + serverMode + " mode.");
-        }
+        try {
+            BackdoorToggles.setToggles(sqlRequest.getBackdoorToggles());
 
-        if (sql.toLowerCase().contains("select") == false) {
-            logger.debug("Directly return expection as not supported");
-            throw new InternalErrorException("Not Supported SQL.");
-        }
+            String sql = sqlRequest.getSql();
+            String project = sqlRequest.getProject();
+            logger.info("Using project: " + project);
+            logger.info("The original query:  " + sql);
 
-        SQLResponse sqlResponse = searchQueryInCache(sqlRequest);
-        try {
-            if (null == sqlResponse) {
-                sqlResponse = queryService.query(sqlRequest);
+            String serverMode = KylinConfig.getInstanceFromEnv().getServerMode();
+            if (!(Constant.SERVER_MODE_QUERY.equals(serverMode.toLowerCase()) || Constant.SERVER_MODE_ALL.equals(serverMode.toLowerCase()))) {
+                throw new InternalErrorException("Query is not allowed in " + serverMode + " mode.");
             }
 
-            checkQueryAuth(sqlResponse);
+            if (!sql.toLowerCase().contains("select")) {
+                logger.debug("Directly return exception as not supported");
+                throw new InternalErrorException("Not Supported SQL.");
+            }
+
+            SQLResponse sqlResponse = searchQueryInCache(sqlRequest);
+            try {
+                if (null == sqlResponse) {
+                    sqlResponse = queryService.query(sqlRequest);
+                }
+
+                checkQueryAuth(sqlResponse);
 
-        } catch (Throwable e) { // calcite may throw AssertError
-            logger.error("Exception when execute sql", e);
-            String errMsg = QueryUtil.makeErrorMsgUserFriendly(e);
+            } catch (Throwable e) { // calcite may throw AssertError
+                logger.error("Exception when execute sql", e);
+                String errMsg = QueryUtil.makeErrorMsgUserFriendly(e);
 
-            sqlResponse = new SQLResponse(null, null, 0, true, errMsg);
+                sqlResponse = new SQLResponse(null, null, 0, true, errMsg);
 
-            // for exception queries, only cache ScanOutOfLimitException
-            if (e instanceof ScanOutOfLimitException) {
-                Cache exceptionCache = cacheManager.getCache(EXCEPTION_QUERY_CACHE);
-                exceptionCache.put(new Element(sqlRequest, sqlResponse));
+                // for exception queries, only cache ScanOutOfLimitException
+                if (e instanceof ScanOutOfLimitException) {
+                    Cache exceptionCache = cacheManager.getCache(EXCEPTION_QUERY_CACHE);
+                    exceptionCache.put(new Element(sqlRequest, sqlResponse));
+                }
             }
-        }
 
-        queryService.logQuery(sqlRequest, sqlResponse);
-        
-        if (sqlResponse.getIsException())
-            throw new InternalErrorException(sqlResponse.getExceptionMessage());
-        
-        return sqlResponse;
+            queryService.logQuery(sqlRequest, sqlResponse);
+
+            if (sqlResponse.getIsException())
+                throw new InternalErrorException(sqlResponse.getExceptionMessage());
+
+            return sqlResponse;
+
+        } finally {
+            BackdoorToggles.cleanToggles();
+        }
     }
 
     private SQLResponse searchQueryInCache(SQLRequest sqlRequest) {
         SQLResponse response = null;
         Cache exceptionCache = cacheManager.getCache(EXCEPTION_QUERY_CACHE);
 
-        if (KylinConfig.getInstanceFromEnv().isQueryCacheEnabled() && null != exceptionCache.get(sqlRequest)) {
+        if (KylinConfig.getInstanceFromEnv().isQueryCacheEnabled() && //
+                !BackdoorToggles.getDisableCache() && //
+                exceptionCache.get(sqlRequest) != null) {
             Element element = exceptionCache.get(sqlRequest);
             response = (SQLResponse) element.getObjectValue();
             response.setHitCache(true);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/692438f6/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
index a1e8a29..13eb09c 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -48,7 +48,6 @@ import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -84,7 +83,7 @@ public class QueryService extends BasicService {
 
     @Autowired
     private CacheService cacheService;
-    
+
     public static final String USER_QUERY_FAMILY = "q";
     private static final String DEFAULT_TABLE_PREFIX = "kylin_metadata";
     private static final String USER_TABLE_NAME = "_user";
@@ -267,14 +266,8 @@ public class QueryService extends BasicService {
         parameters.put(OLAPContext.PRM_ACCEPT_PARTIAL_RESULT, String.valueOf(sqlRequest.isAcceptPartial()));
         OLAPContext.setParameters(parameters);
 
-        try {
-            BackdoorToggles.setToggles(sqlRequest.getBackdoorToggles());
-
-            return execute(correctedSql, sqlRequest);
+        return execute(correctedSql, sqlRequest);
 
-        } finally {
-            BackdoorToggles.cleanToggles();
-        }
     }
 
     protected List<TableMeta> getMetadata(CubeManager cubeMgr, String project, boolean cubedOnly) throws SQLException {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/692438f6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
index 2e2000d..e7c8116 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
@@ -44,8 +44,6 @@ import com.google.common.base.Preconditions;
 //used by reflection
 public class HBaseStorage implements IStorage {
 
-    private final static boolean allowStorageLayerCache = true;
-
     private final static String v2CubeStorageQuery = "org.apache.kylin.storage.hbase.cube.v2.CubeStorageQuery";
     private final static String v1CubeStorageQuery = "org.apache.kylin.storage.hbase.cube.v1.CubeStorageQuery";
 
@@ -53,6 +51,10 @@ public class HBaseStorage implements IStorage {
 
     @Override
     public IStorageQuery createQuery(IRealization realization) {
+
+        boolean queryCacheGloballyEnabled = KylinConfig.getInstanceFromEnv().isQueryCacheEnabled();
+        boolean queryCacheQueryLevelEnabled = !BackdoorToggles.getDisableCache();
+
         if (realization.getType() == RealizationType.INVERTED_INDEX) {
             ICachableStorageQuery ret;
             try {
@@ -61,7 +63,7 @@ public class HBaseStorage implements IStorage {
                 throw new RuntimeException("Failed to initialize storage query for " + defaultIIStorageQuery, e);
             }
 
-            if (allowStorageLayerCache) {
+            if (queryCacheGloballyEnabled && queryCacheQueryLevelEnabled) {
                 return wrapWithCache(ret, realization);
             } else {
                 return ret;
@@ -82,7 +84,7 @@ public class HBaseStorage implements IStorage {
                 throw new RuntimeException("Failed to initialize storage query for " + cubeStorageQuery, e);
             }
 
-            if (allowStorageLayerCache) {
+            if (queryCacheGloballyEnabled && queryCacheQueryLevelEnabled) {
                 return wrapWithCache(ret, realization);
             } else {
                 return ret;


[2/2] incubator-kylin git commit: KYLIN-1142 more detailed hitCache message in query's log

Posted by ma...@apache.org.
KYLIN-1142 more detailed hitCache message in query's log


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/ae0f1a72
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/ae0f1a72
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/ae0f1a72

Branch: refs/heads/2.x-staging
Commit: ae0f1a72e76f436eab4e0eb4e5d786bde83a719b
Parents: 692438f
Author: honma <ho...@ebay.com>
Authored: Fri Nov 13 17:49:30 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Fri Nov 13 17:50:30 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/storage/StorageContext.java | 16 +++++++++++-----
 .../storage/cache/CacheFledgedDynamicQuery.java  |  8 ++++----
 .../storage/cache/CacheFledgedStaticQuery.java   |  3 ++-
 .../kylin/query/enumerator/OLAPEnumerator.java   |  1 +
 .../kylin/rest/controller/QueryController.java   |  2 +-
 .../apache/kylin/rest/response/SQLResponse.java  | 19 ++++++++++++++-----
 .../apache/kylin/rest/service/QueryService.java  | 11 +++++++++--
 .../kylin/rest/service/QueryServiceTest.java     |  2 +-
 8 files changed, 43 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ae0f1a72/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
index 1643aa4..90f950c 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
@@ -18,13 +18,11 @@
 
 package org.apache.kylin.storage;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.realization.SQLDigest;
+
+import com.google.common.collect.Range;
 
 /**
  * @author xjiang
@@ -33,7 +31,6 @@ public class StorageContext {
 
     public static final int DEFAULT_THRESHOLD = 1000000;
 
-
     private String connUrl;
     private int threshold;
     private int limit;
@@ -49,6 +46,8 @@ public class StorageContext {
     private Cuboid cuboid;
     private boolean partialResultReturned;
 
+    private Range<Long> reusedPeriod;
+
     public StorageContext() {
         this.threshold = DEFAULT_THRESHOLD;
         this.limit = DEFAULT_THRESHOLD;
@@ -160,4 +159,11 @@ public class StorageContext {
         return this.enableCoprocessor;
     }
 
+    public Range<Long> getReusedPeriod() {
+        return reusedPeriod;
+    }
+
+    public void setReusedPeriod(Range<Long> reusedPeriod) {
+        this.reusedPeriod = reusedPeriod;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ae0f1a72/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicQuery.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicQuery.java b/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicQuery.java
index b9ae0e9..b9808ea 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicQuery.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicQuery.java
@@ -62,7 +62,7 @@ public class CacheFledgedDynamicQuery extends AbstractCacheFledgedQuery {
         if (enableDynamicCache) {
             StreamSQLResult cachedResult = getStreamSQLResult(new StreamSQLDigest(sqlDigest, partitionColRef));
             if (cachedResult != null) {
-                ret = tryReuseCache(context, sqlDigest, returnTupleInfo, ret, cachedResult);
+                ret = tryReuseCache(context, sqlDigest, returnTupleInfo, cachedResult);
             } else {
                 logger.info("no cache entry for this query");
             }
@@ -70,10 +70,8 @@ public class CacheFledgedDynamicQuery extends AbstractCacheFledgedQuery {
 
         if (ret == null) {
             ret = underlyingStorage.search(context, sqlDigest, returnTupleInfo);
-            noCacheUsed = true;
             logger.info("No Cache being used");
         } else {
-            noCacheUsed = false;
             logger.info("Cache being used");
         }
 
@@ -90,7 +88,7 @@ public class CacheFledgedDynamicQuery extends AbstractCacheFledgedQuery {
     /**
      * if cache is not enough it will try to combine existing cache as well as fresh records
      */
-    private ITupleIterator tryReuseCache(final StorageContext context, final SQLDigest sqlDigest, final TupleInfo returnTupleInfo, ITupleIterator ret, StreamSQLResult cachedResult) {
+    private ITupleIterator tryReuseCache(final StorageContext context, final SQLDigest sqlDigest, final TupleInfo returnTupleInfo, StreamSQLResult cachedResult) {
         Range<Long> reusePeriod = cachedResult.getReusableResults(ts);
 
         logger.info("existing cache: " + cachedResult);
@@ -116,9 +114,11 @@ public class CacheFledgedDynamicQuery extends AbstractCacheFledgedQuery {
                 });
                 iTupleIteratorList.add(freshTuples);
 
+                context.setReusedPeriod(reusePeriod);
                 return new CompoundTupleIterator(iTupleIteratorList);
             } else if (remainings.size() == 0) {
                 logger.info("The ts range in new query was fully cached");
+                context.setReusedPeriod(reusePeriod);
                 return new SimpleTupleIterator(cachedResult.reuse(reusePeriod));
             } else {
                 //if using cache causes more than one underlyingStorage searches

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ae0f1a72/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStaticQuery.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStaticQuery.java b/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStaticQuery.java
index 93e5e09..d024cf7 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStaticQuery.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStaticQuery.java
@@ -29,10 +29,11 @@ public class CacheFledgedStaticQuery extends AbstractCacheFledgedQuery {
     public ITupleIterator search(final StorageContext context, final SQLDigest sqlDigest, final TupleInfo returnTupleInfo) {
 
         StreamSQLResult cachedResult = getStreamSQLResult(new StreamSQLDigest(sqlDigest, null));
-        ITupleIterator ret = null;
+        ITupleIterator ret;
 
         if (cachedResult != null) {
             logger.info("using existing cache");
+            context.setReusedPeriod(Ranges.<Long> all());
             return new SimpleTupleIterator(cachedResult.reuse(Ranges.<Long> all()));
         } else {
             logger.info("no existing cache to use");

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ae0f1a72/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java
index 07f72b1..aa8b2a7 100644
--- a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java
+++ b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java
@@ -125,6 +125,7 @@ public class OLAPEnumerator implements Enumerator<Object[]> {
         ITupleIterator iterator = storageEngine.search(olapContext.storageContext, sqlDigest, olapContext.returnTupleInfo);
         if (logger.isDebugEnabled()) {
             logger.debug("return TupleIterator...");
+            logger.debug("Storage cache used for this storage query:" + olapContext.storageContext.getReusedPeriod());
         }
 
         return iterator;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ae0f1a72/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java b/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
index dfd55f1..f60894e 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
@@ -227,7 +227,7 @@ public class QueryController extends BasicController {
                 exceptionCache.get(sqlRequest) != null) {
             Element element = exceptionCache.get(sqlRequest);
             response = (SQLResponse) element.getObjectValue();
-            response.setHitCache(true);
+            response.setHitExceptionCache(true);
         }
 
         return response;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ae0f1a72/server/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/response/SQLResponse.java b/server/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
index 6541ef9..f5ef8c5 100644
--- a/server/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
+++ b/server/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
@@ -52,7 +52,9 @@ public class SQLResponse implements Serializable {
 
     private long totalScanCount;
 
-    private boolean hitCache = false;
+    private boolean hitExceptionCache = false;
+    
+    private boolean storageCacheUsed = false;
 
     public SQLResponse() {
     }
@@ -133,12 +135,19 @@ public class SQLResponse implements Serializable {
         this.totalScanCount = totalScanCount;
     }
 
-    public boolean isHitCache() {
-        return hitCache;
+    public boolean isHitExceptionCache() {
+        return hitExceptionCache;
     }
 
-    public void setHitCache(boolean hitCache) {
-        this.hitCache = hitCache;
+    public void setHitExceptionCache(boolean hitExceptionCache) {
+        this.hitExceptionCache = hitExceptionCache;
     }
 
+    public boolean isStorageCacheUsed() {
+        return storageCacheUsed;
+    }
+
+    public void setStorageCacheUsed(boolean storageCacheUsed) {
+        this.storageCacheUsed = storageCacheUsed;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ae0f1a72/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 13eb09c..669616b 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -203,8 +203,9 @@ public class QueryService extends BasicService {
         final Set<String> realizationNames = new HashSet<String>();
         final Set<Long> cuboidIds = new HashSet<Long>();
         float duration = response.getDuration() / (float) 1000;
+        boolean storageCacheUsed = false;
 
-        if (!response.isHitCache() && null != OLAPContext.getThreadLocalContexts()) {
+        if (!response.isHitExceptionCache() && null != OLAPContext.getThreadLocalContexts()) {
             for (OLAPContext ctx : OLAPContext.getThreadLocalContexts()) {
                 Cuboid cuboid = ctx.storageContext.getCuboid();
                 if (cuboid != null) {
@@ -216,6 +217,11 @@ public class QueryService extends BasicService {
                     String realizationName = ctx.realization.getName();
                     realizationNames.add(realizationName);
                 }
+
+                if (ctx.storageContext.getReusedPeriod() != null) {
+                    response.setStorageCacheUsed(true);
+                    storageCacheUsed = true;
+                }
             }
         }
 
@@ -239,7 +245,8 @@ public class QueryService extends BasicService {
         stringBuilder.append("Result row count: ").append(resultRowCount).append(newLine);
         stringBuilder.append("Accept Partial: ").append(request.isAcceptPartial()).append(newLine);
         stringBuilder.append("Is Partial Result: ").append(response.isPartial()).append(newLine);
-        stringBuilder.append("Hit Cache: ").append(response.isHitCache()).append(newLine);
+        stringBuilder.append("Hit Exception Cache: ").append(response.isHitExceptionCache()).append(newLine);
+        stringBuilder.append("Storage cache used: ").append(storageCacheUsed).append(newLine);
         stringBuilder.append("Message: ").append(response.getExceptionMessage()).append(newLine);
         stringBuilder.append("==========================[QUERY]===============================").append(newLine);
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ae0f1a72/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
index cc86e1e..119d88b 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
@@ -59,7 +59,7 @@ public class QueryServiceTest extends ServiceTestBase {
         request.setSql("select * from test_table");
         request.setAcceptPartial(true);
         SQLResponse response = new SQLResponse();
-        response.setHitCache(true);
+        response.setHitExceptionCache(true);
         queryService.logQuery(request, response);
     }
 }