You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/10/29 11:27:52 UTC

[kylin] branch master updated (3bcbaa8 -> de86ed6)

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git.


    from 3bcbaa8  KYLIN-3562, optimization the logic that updating the user when user logged in.
     new f120886  KYLIN-2894 Query cache expiration strategy switches from manual invalidation to signature checking
     new 1fcf967  KYLIN-2894 add trigger kylin.query.cache-signature-enabled for enabling query signature
     new 7e1cded  KYLIN-2894 add a new signature calculator FactTableRealizationSetCalculator
     new 95afdb4  KYLIN-2894 add unit test
     new 6b5f961  KYLIN-2896 remove query exception cache
     new b9aab01  KYLIN-2897 improve the query execution for a set of duplicate queries in a short period
     new cd952ff  KYLIN-2898 Introduce memcached as a distributed cache for queries
     new 73d0fd4  KYLIN-2898 config memcached
     new 2f04803  KYLIN-2898 add unit test
     new 7b58b16  KYLIN-2899 Introduce segment level query cache
     new 78b17f5  KYLIN-2723 fix potential concurrent issue when add rpc statistics
     new de86ed6  KYLIN-2898 If a distributed cache is adopted, small query results are also better to be put into the cache.

The 12 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 {query => cache}/pom.xml                           |  64 ++--
 .../spy/memcached/RefinedKetamaNodeLocator.java    | 279 ++++++++++++++++
 .../kylin/cache/cachemanager/CacheConstants.java   |   8 +-
 .../InstrumentedEhCacheCacheManager.java           | 101 ++++++
 .../cache/cachemanager/MemcachedCacheManager.java  | 181 ++++++++++
 .../RemoteLocalFailOverCacheManager.java           |  71 ++++
 .../cache/ehcache/InstrumentedEhCacheCache.java    | 205 ++++++++++++
 .../apache/kylin/cache/memcached/CacheStats.java   |  97 ++++++
 .../kylin/cache/memcached/KeyHookLookup.java       | 139 ++++++++
 .../kylin/cache/memcached/MemcachedCache.java      | 371 +++++++++++++++++++++
 .../cache/memcached/MemcachedCacheConfig.java      |  97 ++++++
 .../cache/memcached/MemcachedChunkingCache.java    | 279 ++++++++++++++++
 .../memcached/MemcachedConnectionFactory.java      | 193 +++++++++++
 .../MemcachedConnectionFactoryBuilder.java         | 173 ++++++++++
 .../kylin/cache/memcached/MemcachedMetrics.java    | 139 ++++++++
 .../RemoteLocalFailOverCacheManagerTest.java       |  62 ++++
 .../kylin/cache/memcached/MemcachedCacheTest.java  |  84 +++++
 .../memcached/MemcachedChunkingCacheTest.java      | 159 +++++++++
 cache/src/test/resources/cacheContext.xml          |  47 +++
 .../src/test}/resources/ehcache-test.xml           |  13 +-
 .../org/apache/kylin/common/KylinConfigBase.java   |  38 +++
 .../java/org/apache/kylin/common/QueryContext.java |  86 +++--
 .../apache/kylin/common/debug/BackdoorToggles.java |  16 +
 dev-support/checkstyle.xml                         |   4 +-
 pom.xml                                            |  17 +
 server-base/pom.xml                                |   4 +
 .../apache/kylin/rest/response/SQLResponse.java    |  32 +-
 .../apache/kylin/rest/service/CacheService.java    |  11 +-
 .../apache/kylin/rest/service/QueryService.java    | 130 ++++++--
 .../kylin/rest/signature/ComponentSignature.java   |  15 +-
 .../FactTableRealizationSetCalculator.java         | 112 +++++++
 .../rest/signature/RealizationSetCalculator.java   | 101 ++++++
 .../kylin/rest/signature/RealizationSignature.java | 164 +++++++++
 .../kylin/rest/signature/SegmentSignature.java     |  52 +--
 .../kylin/rest/signature/SignatureCalculator.java  |  16 +-
 .../kylin/rest/util/SQLResponseSignatureUtil.java  |  68 ++++
 .../java/org/apache/kylin/rest/bean/BeanTest.java  |   2 +-
 .../kylin/rest/response/SQLResponseTest.java       |  49 +++
 .../RealizationSignatureTest.java}                 |  33 +-
 .../kylin/rest/signature/SegmentSignatureTest.java |  26 +-
 .../rest/signature/SignatureCalculatorTest.java    | 184 ++++++++++
 server/src/main/resources/applicationContext.xml   |  28 +-
 .../kylin/rest/controller/QueryControllerTest.java |   2 +-
 storage-hbase/pom.xml                              |   4 +
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java        | 155 ++++++++-
 .../storage/hbase/cube/v2/SegmentQueryCache.java   |  80 +++++
 .../storage/hbase/cube/v2/SegmentQueryResult.java  | 101 ++++++
 .../storage/hbase/cube/SegmentQueryResultTest.java | 112 +++++++
 48 files changed, 4213 insertions(+), 191 deletions(-)
 copy {query => cache}/pom.xml (61%)
 create mode 100644 cache/src/main/java/net/spy/memcached/RefinedKetamaNodeLocator.java
 copy core-cube/src/main/java/org/apache/kylin/gridtable/IGTBypassChecker.java => cache/src/main/java/org/apache/kylin/cache/cachemanager/CacheConstants.java (85%)
 create mode 100644 cache/src/main/java/org/apache/kylin/cache/cachemanager/InstrumentedEhCacheCacheManager.java
 create mode 100644 cache/src/main/java/org/apache/kylin/cache/cachemanager/MemcachedCacheManager.java
 create mode 100644 cache/src/main/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManager.java
 create mode 100644 cache/src/main/java/org/apache/kylin/cache/ehcache/InstrumentedEhCacheCache.java
 create mode 100644 cache/src/main/java/org/apache/kylin/cache/memcached/CacheStats.java
 create mode 100644 cache/src/main/java/org/apache/kylin/cache/memcached/KeyHookLookup.java
 create mode 100644 cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedCache.java
 create mode 100644 cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedCacheConfig.java
 create mode 100644 cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedChunkingCache.java
 create mode 100644 cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedConnectionFactory.java
 create mode 100644 cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedConnectionFactoryBuilder.java
 create mode 100644 cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedMetrics.java
 create mode 100644 cache/src/test/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManagerTest.java
 create mode 100644 cache/src/test/java/org/apache/kylin/cache/memcached/MemcachedCacheTest.java
 create mode 100644 cache/src/test/java/org/apache/kylin/cache/memcached/MemcachedChunkingCacheTest.java
 create mode 100644 cache/src/test/resources/cacheContext.xml
 copy {server/src/main => cache/src/test}/resources/ehcache-test.xml (77%)
 copy core-metrics/src/main/java/org/apache/kylin/metrics/lib/SinkTool.java => server-base/src/main/java/org/apache/kylin/rest/signature/ComponentSignature.java (74%)
 create mode 100644 server-base/src/main/java/org/apache/kylin/rest/signature/FactTableRealizationSetCalculator.java
 create mode 100644 server-base/src/main/java/org/apache/kylin/rest/signature/RealizationSetCalculator.java
 create mode 100644 server-base/src/main/java/org/apache/kylin/rest/signature/RealizationSignature.java
 copy core-common/src/main/java/org/apache/kylin/common/util/CaseInsensitiveString.java => server-base/src/main/java/org/apache/kylin/rest/signature/SegmentSignature.java (53%)
 copy core-job/src/main/java/org/apache/kylin/job/execution/ExecutableContext.java => server-base/src/main/java/org/apache/kylin/rest/signature/SignatureCalculator.java (74%)
 create mode 100644 server-base/src/main/java/org/apache/kylin/rest/util/SQLResponseSignatureUtil.java
 create mode 100644 server-base/src/test/java/org/apache/kylin/rest/response/SQLResponseTest.java
 copy server-base/src/test/java/org/apache/kylin/rest/{service/TableServiceTest.java => signature/RealizationSignatureTest.java} (54%)
 copy core-common/src/test/java/org/apache/kylin/common/persistence/AutoDeleteDirectoryTest.java => server-base/src/test/java/org/apache/kylin/rest/signature/SegmentSignatureTest.java (57%)
 create mode 100644 server-base/src/test/java/org/apache/kylin/rest/signature/SignatureCalculatorTest.java
 create mode 100755 storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SegmentQueryCache.java
 create mode 100755 storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SegmentQueryResult.java
 create mode 100644 storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/SegmentQueryResultTest.java


[kylin] 05/12: KYLIN-2896 remove query exception cache

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 6b5f961724e4b7964dbc1b268a200095d0fbe30d
Author: Zhong <nj...@apache.org>
AuthorDate: Thu Oct 18 17:22:22 2018 +0800

    KYLIN-2896 remove query exception cache
---
 .../apache/kylin/rest/service/CacheService.java    |  6 +--
 .../apache/kylin/rest/service/QueryService.java    | 58 +++++++++-------------
 2 files changed, 26 insertions(+), 38 deletions(-)

diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
index 930852b..9904cef 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
@@ -118,12 +118,10 @@ public class CacheService extends BasicService implements InitializingBean {
     public void cleanDataCache(String project) {
         if (cacheManager != null) {
             if (getConfig().isQueryCacheSignatureEnabled()) {
-                logger.info("cleaning cache for project " + project + " (currently remove exception entries)");
-                cacheManager.getCache(QueryService.EXCEPTION_QUERY_CACHE).removeAll();
+                logger.info("cleaning cache for project " + project + " (currently remove nothing)");
             } else {
                 logger.info("cleaning cache for project " + project + " (currently remove all entries)");
-                cacheManager.getCache(QueryService.SUCCESS_QUERY_CACHE).removeAll();
-                cacheManager.getCache(QueryService.EXCEPTION_QUERY_CACHE).removeAll();
+                cacheManager.getCache(QueryService.QUERY_CACHE).removeAll();
             }
         } else {
             logger.warn("skip cleaning cache for project " + project);
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index d0ba4da..abcab7f 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -135,8 +135,7 @@ import com.google.common.collect.Lists;
 @Component("queryService")
 public class QueryService extends BasicService {
 
-    public static final String SUCCESS_QUERY_CACHE = "StorageCache";
-    public static final String EXCEPTION_QUERY_CACHE = "ExceptionQueryCache";
+    public static final String QUERY_CACHE = "StorageCache";
     public static final String QUERY_STORE_PATH_PREFIX = "/query/";
     private static final Logger logger = LoggerFactory.getLogger(QueryService.class);
     final BadQueryDetector badQueryDetector = new BadQueryDetector();
@@ -481,7 +480,7 @@ public class QueryService extends BasicService {
                     && checkCondition(sqlResponse.getResults().size() < kylinConfig.getLargeQueryThreshold(),
                             "query response is too large: {} ({})", sqlResponse.getResults().size(),
                             kylinConfig.getLargeQueryThreshold())) {
-                cacheManager.getCache(SUCCESS_QUERY_CACHE).put(sqlRequest.getCacheKey(), sqlResponse);
+                cacheManager.getCache(QUERY_CACHE).put(sqlRequest.getCacheKey(), sqlResponse);
             }
 
         } catch (Throwable e) { // calcite may throw AssertError
@@ -495,7 +494,7 @@ public class QueryService extends BasicService {
 
             if (queryCacheEnabled && e.getCause() != null
                     && ExceptionUtils.getRootCause(e) instanceof ResourceLimitExceededException) {
-                Cache exceptionCache = cacheManager.getCache(EXCEPTION_QUERY_CACHE);
+                Cache exceptionCache = cacheManager.getCache(QUERY_CACHE);
                 exceptionCache.put(sqlRequest.getCacheKey(), sqlResponse);
             }
         }
@@ -521,37 +520,28 @@ public class QueryService extends BasicService {
     }
 
     public SQLResponse searchQueryInCache(SQLRequest sqlRequest) {
-        String[] cacheTypes = new String[] { EXCEPTION_QUERY_CACHE, SUCCESS_QUERY_CACHE };
-        for (String cacheType : cacheTypes) {
-            Cache cache = cacheManager.getCache(cacheType);
-            Cache.ValueWrapper wrapper = cache.get(sqlRequest.getCacheKey());
-            if (wrapper == null) {
-                continue;
-            }
-            SQLResponse response = (SQLResponse) wrapper.get();
-            if (response == null) {
-                return null;
-            }
-            logger.info("The sqlResponse is found in " + cacheType);
-            if (getConfig().isQueryCacheSignatureEnabled()
-                    && !SQLResponseSignatureUtil.checkSignature(getConfig(), response, sqlRequest.getProject())) {
-                logger.info("The sql response signature is changed. Remove it from QUERY_CACHE.");
-                cache.evict(sqlRequest.getCacheKey());
-                return null;
-            } else {
-                switch (cacheType) {
-                case EXCEPTION_QUERY_CACHE:
-                    response.setHitExceptionCache(true);
-                    break;
-                case SUCCESS_QUERY_CACHE:
-                    response.setStorageCacheUsed(true);
-                    break;
-                default:
-                }
-            }
-            return response;
+        Cache cache = cacheManager.getCache(QUERY_CACHE);
+        Cache.ValueWrapper wrapper = cache.get(sqlRequest.getCacheKey());
+        if (wrapper == null) {
+            return null;
         }
-        return null;
+        SQLResponse response = (SQLResponse) wrapper.get();
+        if (response == null) {
+            return null;
+        }
+        logger.info("The sqlResponse is found in QUERY_CACHE");
+        if (getConfig().isQueryCacheSignatureEnabled()
+                && !SQLResponseSignatureUtil.checkSignature(getConfig(), response, sqlRequest.getProject())) {
+            logger.info("The sql response signature is changed. Remove it from QUERY_CACHE.");
+            cache.evict(sqlRequest.getCacheKey());
+            return null;
+        }
+        if (response.getIsException()) {
+            response.setHitExceptionCache(true);
+        } else {
+            response.setStorageCacheUsed(true);
+        }
+        return response;
     }
 
     private SQLResponse queryWithSqlMassage(SQLRequest sqlRequest) throws Exception {


[kylin] 01/12: KYLIN-2894 Query cache expiration strategy switches from manual invalidation to signature checking

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit f12088637347385ba1d81eb345c39d2276c2bbe3
Author: Wang Ken <mi...@ebay.com>
AuthorDate: Mon Sep 25 13:59:46 2017 +0800

    KYLIN-2894 Query cache expiration strategy switches from manual invalidation to signature checking
---
 .../org/apache/kylin/common/KylinConfigBase.java   |   5 +
 .../apache/kylin/rest/response/SQLResponse.java    |  14 +-
 .../apache/kylin/rest/service/CacheService.java    |   4 +-
 .../apache/kylin/rest/service/QueryService.java    | 100 ++++++++-----
 .../kylin/rest/signature/ComponentSignature.java   |  31 ++++
 .../rest/signature/RealizationSetCalculator.java   | 101 +++++++++++++
 .../kylin/rest/signature/RealizationSignature.java | 164 +++++++++++++++++++++
 .../kylin/rest/signature/SegmentSignature.java     |  65 ++++++++
 .../kylin/rest/signature/SignatureCalculator.java  |  28 ++++
 .../kylin/rest/util/SQLResponseSignatureUtil.java  |  68 +++++++++
 .../java/org/apache/kylin/rest/bean/BeanTest.java  |   2 +-
 .../kylin/rest/controller/QueryControllerTest.java |   2 +-
 12 files changed, 545 insertions(+), 39 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 5577307..135d6e6 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1553,6 +1553,11 @@ abstract public class KylinConfigBase implements Serializable {
         return getOptional("kylin.query.realization-filter", null);
     }
 
+    public String getSQLResponseSignatureClass() {
+        return this.getOptional("kylin.query.signature-class",
+                "org.apache.kylin.rest.signature.RealizationSetCalculator");
+    }
+
     // ============================================================================
     // SERVER
     // ============================================================================
diff --git a/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
index 0bdf037..0502798 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
@@ -76,6 +76,9 @@ public class SQLResponse implements Serializable {
     
     protected String traceUrl = null;
 
+    // it's sql response signature for cache checking, no need to return and should be JsonIgnore
+    protected String signature;
+
     public SQLResponse() {
     }
 
@@ -205,7 +208,16 @@ public class SQLResponse implements Serializable {
     public void setTraceUrl(String traceUrl) {
         this.traceUrl = traceUrl;
     }
-    
+
+    @JsonIgnore
+    public String getSignature() {
+        return signature;
+    }
+
+    public void setSignature(String signature) {
+        this.signature = signature;
+    }
+
     @JsonIgnore
     public List<QueryContext.CubeSegmentStatisticsResult> getCubeSegmentStatisticsList() {
         try {
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
index 10ab90b..67d49d9 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
@@ -117,8 +117,8 @@ public class CacheService extends BasicService implements InitializingBean {
 
     public void cleanDataCache(String project) {
         if (cacheManager != null) {
-            logger.info("cleaning cache for project " + project + " (currently remove all entries)");
-            cacheManager.getCache(QueryService.SUCCESS_QUERY_CACHE).removeAll();
+            logger.info("cleaning cache for project " + project + " (currently remove exception entries)");
+            //            cacheManager.getCache(QueryService.SUCCESS_QUERY_CACHE).removeAll();
             cacheManager.getCache(QueryService.EXCEPTION_QUERY_CACHE).removeAll();
         } else {
             logger.warn("skip cleaning cache for project " + project);
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 8262472..fb13ff5 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -110,6 +110,7 @@ import org.apache.kylin.rest.response.SQLResponse;
 import org.apache.kylin.rest.util.AclEvaluate;
 import org.apache.kylin.rest.util.AclPermissionUtil;
 import org.apache.kylin.rest.util.QueryRequestLimits;
+import org.apache.kylin.rest.util.SQLResponseSignatureUtil;
 import org.apache.kylin.rest.util.TableauInterceptor;
 import org.apache.kylin.storage.hybrid.HybridInstance;
 import org.apache.kylin.storage.hybrid.HybridManager;
@@ -117,18 +118,17 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.cache.Cache;
+import org.springframework.cache.CacheManager;
 import org.springframework.security.core.GrantedAuthority;
 import org.springframework.security.core.context.SecurityContextHolder;
 import org.springframework.stereotype.Component;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 
-import net.sf.ehcache.Cache;
-import net.sf.ehcache.CacheManager;
-import net.sf.ehcache.Element;
-
 /**
  * @author xduo
  */
@@ -226,7 +226,7 @@ public class QueryService extends BasicService {
             columnMetas.add(new SelectedColumnMeta(false, false, false, false, 1, false, Integer.MAX_VALUE, "c0", "c0",
                     null, null, null, Integer.MAX_VALUE, 128, 1, "char", false, false, false));
 
-            return buildSqlResponse(true, r.getFirst(), columnMetas);
+            return buildSqlResponse(sqlRequest.getProject(), true, r.getFirst(), columnMetas);
 
         } catch (Exception e) {
             logger.info("pushdown engine failed to finish current non-select query");
@@ -313,6 +313,12 @@ public class QueryService extends BasicService {
             }
         }
 
+        if (realizationNames.isEmpty()) {
+            if (!Strings.isNullOrEmpty(response.getCube())) {
+                realizationNames.addAll(Lists.newArrayList(response.getCube().split(",")));
+            }
+        }
+
         int resultRowCount = 0;
         if (!response.getIsException() && response.getResults() != null) {
             resultRowCount = response.getResults().size();
@@ -458,6 +464,8 @@ public class QueryService extends BasicService {
                     String.valueOf(sqlResponse.getIsException()), String.valueOf(sqlResponse.getDuration()),
                     String.valueOf(sqlResponse.getTotalScanCount()));
             if (checkCondition(queryCacheEnabled, "query cache is disabled") //
+                    && checkCondition(!Strings.isNullOrEmpty(sqlResponse.getCube()),
+                            "query does not hit cube nor hybrid") //
                     && checkCondition(!sqlResponse.getIsException(), "query has exception") //
                     && checkCondition(
                             !(sqlResponse.isPushDown()
@@ -473,7 +481,7 @@ public class QueryService extends BasicService {
                     && checkCondition(sqlResponse.getResults().size() < kylinConfig.getLargeQueryThreshold(),
                             "query response is too large: {} ({})", sqlResponse.getResults().size(),
                             kylinConfig.getLargeQueryThreshold())) {
-                cacheManager.getCache(SUCCESS_QUERY_CACHE).put(new Element(sqlRequest.getCacheKey(), sqlResponse));
+                cacheManager.getCache(SUCCESS_QUERY_CACHE).put(sqlRequest.getCacheKey(), sqlResponse);
             }
 
         } catch (Throwable e) { // calcite may throw AssertError
@@ -482,15 +490,13 @@ public class QueryService extends BasicService {
             logger.error("Exception while executing query", e);
             String errMsg = makeErrorMsgUserFriendly(e);
 
-            sqlResponse = new SQLResponse(null, null, null, 0, true, errMsg, false, false);
-            sqlResponse.setTotalScanCount(queryContext.getScannedRows());
-            sqlResponse.setTotalScanBytes(queryContext.getScannedBytes());
-            sqlResponse.setCubeSegmentStatisticsList(queryContext.getCubeSegmentStatisticsResultList());
+            sqlResponse = buildSqlResponse(sqlRequest.getProject(), false, null, null, true, errMsg);
+            sqlResponse.setThrowable(e.getCause() == null ? e : ExceptionUtils.getRootCause(e));
 
             if (queryCacheEnabled && e.getCause() != null
                     && ExceptionUtils.getRootCause(e) instanceof ResourceLimitExceededException) {
                 Cache exceptionCache = cacheManager.getCache(EXCEPTION_QUERY_CACHE);
-                exceptionCache.put(new Element(sqlRequest.getCacheKey(), sqlResponse));
+                exceptionCache.put(sqlRequest.getCacheKey(), sqlResponse);
             }
         }
         return sqlResponse;
@@ -515,22 +521,36 @@ public class QueryService extends BasicService {
     }
 
     public SQLResponse searchQueryInCache(SQLRequest sqlRequest) {
-        SQLResponse response = null;
-        Cache exceptionCache = cacheManager.getCache(EXCEPTION_QUERY_CACHE);
-        Cache successCache = cacheManager.getCache(SUCCESS_QUERY_CACHE);
-
-        Element element = null;
-        if ((element = exceptionCache.get(sqlRequest.getCacheKey())) != null) {
-            logger.info("The sqlResponse is found in EXCEPTION_QUERY_CACHE");
-            response = (SQLResponse) element.getObjectValue();
-            response.setHitExceptionCache(true);
-        } else if ((element = successCache.get(sqlRequest.getCacheKey())) != null) {
-            logger.info("The sqlResponse is found in SUCCESS_QUERY_CACHE");
-            response = (SQLResponse) element.getObjectValue();
-            response.setStorageCacheUsed(true);
+        String[] cacheTypes = new String[] { EXCEPTION_QUERY_CACHE, SUCCESS_QUERY_CACHE };
+        for (String cacheType : cacheTypes) {
+            Cache cache = cacheManager.getCache(cacheType);
+            Cache.ValueWrapper wrapper = cache.get(sqlRequest.getCacheKey());
+            if (wrapper == null) {
+                continue;
+            }
+            SQLResponse response = (SQLResponse) wrapper.get();
+            if (response == null) {
+                return null;
+            }
+            logger.info("The sqlResponse is found in " + cacheType);
+            if (!SQLResponseSignatureUtil.checkSignature(getConfig(), response, sqlRequest.getProject())) {
+                logger.info("The sql response signature is changed. Remove it from QUERY_CACHE.");
+                cache.evict(sqlRequest.getCacheKey());
+                return null;
+            } else {
+                switch (cacheType) {
+                case EXCEPTION_QUERY_CACHE:
+                    response.setHitExceptionCache(true);
+                    break;
+                case SUCCESS_QUERY_CACHE:
+                    response.setStorageCacheUsed(true);
+                    break;
+                default:
+                }
+            }
+            return response;
         }
-
-        return response;
+        return null;
     }
 
     private SQLResponse queryWithSqlMassage(SQLRequest sqlRequest) throws Exception {
@@ -579,7 +599,8 @@ public class QueryService extends BasicService {
             List<List<String>> results = Lists.newArrayList();
             List<SelectedColumnMeta> columnMetas = Lists.newArrayList();
             if (BackdoorToggles.getPrepareOnly()) {
-                return getPrepareOnlySqlResponse(correctedSql, conn, false, results, columnMetas);
+                return getPrepareOnlySqlResponse(sqlRequest.getProject(), correctedSql, conn, false, results,
+                        columnMetas);
             }
             if (!isPrepareRequest) {
                 return executeRequest(correctedSql, sqlRequest, conn);
@@ -893,7 +914,7 @@ public class QueryService extends BasicService {
             close(resultSet, stat, null); //conn is passed in, not my duty to close
         }
 
-        return buildSqlResponse(isPushDown, r.getFirst(), r.getSecond());
+        return buildSqlResponse(sqlRequest.getProject(), isPushDown, r.getFirst(), r.getSecond());
     }
 
     private SQLResponse executePrepareRequest(String correctedSql, PrepareSqlRequest sqlRequest,
@@ -921,7 +942,7 @@ public class QueryService extends BasicService {
             DBUtils.closeQuietly(resultSet);
         }
 
-        return buildSqlResponse(isPushDown, r.getFirst(), r.getSecond());
+        return buildSqlResponse(sqlRequest.getProject(), isPushDown, r.getFirst(), r.getSecond());
     }
 
     private Pair<List<List<String>>, List<SelectedColumnMeta>> pushDownQuery(SQLRequest sqlRequest, String correctedSql,
@@ -972,7 +993,8 @@ public class QueryService extends BasicService {
         return QueryUtil.makeErrorMsgUserFriendly(e);
     }
 
-    private SQLResponse getPrepareOnlySqlResponse(String correctedSql, Connection conn, Boolean isPushDown,
+    private SQLResponse getPrepareOnlySqlResponse(String projectName, String correctedSql, Connection conn,
+            Boolean isPushDown,
             List<List<String>> results, List<SelectedColumnMeta> columnMetas) throws SQLException {
 
         CalcitePrepareImpl.KYLIN_ONLY_PREPARE.set(true);
@@ -1018,7 +1040,7 @@ public class QueryService extends BasicService {
             DBUtils.closeQuietly(preparedStatement);
         }
 
-        return buildSqlResponse(isPushDown, results, columnMetas);
+        return buildSqlResponse(projectName, isPushDown, results, columnMetas);
     }
 
     private boolean isPrepareStatementWithParams(SQLRequest sqlRequest) {
@@ -1028,10 +1050,17 @@ public class QueryService extends BasicService {
         return false;
     }
 
-    private SQLResponse buildSqlResponse(Boolean isPushDown, List<List<String>> results,
+    private SQLResponse buildSqlResponse(String projectName, Boolean isPushDown, List<List<String>> results,
             List<SelectedColumnMeta> columnMetas) {
+        return buildSqlResponse(projectName, isPushDown, results, columnMetas, false, null);
+    }
+
+    private SQLResponse buildSqlResponse(String projectName, Boolean isPushDown, List<List<String>> results,
+            List<SelectedColumnMeta> columnMetas, boolean isException, String exceptionMessage) {
 
         boolean isPartialResult = false;
+
+        List<String> realizations = Lists.newLinkedList();
         StringBuilder cubeSb = new StringBuilder();
         StringBuilder logSb = new StringBuilder("Processed rows for each storageContext: ");
         QueryContext queryContext = QueryContextFacade.current();
@@ -1049,17 +1078,20 @@ public class QueryService extends BasicService {
 
                     realizationName = ctx.realization.getName();
                     realizationType = ctx.realization.getStorageType();
+
+                    realizations.add(realizationName);
                 }
                 queryContext.setContextRealization(ctx.id, realizationName, realizationType);
             }
         }
         logger.info(logSb.toString());
 
-        SQLResponse response = new SQLResponse(columnMetas, results, cubeSb.toString(), 0, false, null, isPartialResult,
-                isPushDown);
+        SQLResponse response = new SQLResponse(columnMetas, results, cubeSb.toString(), 0, isException,
+                exceptionMessage, isPartialResult, isPushDown);
         response.setTotalScanCount(queryContext.getScannedRows());
         response.setTotalScanBytes(queryContext.getScannedBytes());
         response.setCubeSegmentStatisticsList(queryContext.getCubeSegmentStatisticsResultList());
+        response.setSignature(SQLResponseSignatureUtil.createSignature(getConfig(), response, projectName));
         return response;
     }
 
diff --git a/server-base/src/main/java/org/apache/kylin/rest/signature/ComponentSignature.java b/server-base/src/main/java/org/apache/kylin/rest/signature/ComponentSignature.java
new file mode 100644
index 0000000..7b3a2e4
--- /dev/null
+++ b/server-base/src/main/java/org/apache/kylin/rest/signature/ComponentSignature.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.signature;
+
+import java.io.Serializable;
+
+/**
+ * Base class for the pieces that make up a query-cache signature.
+ * Serializable so signatures can live inside cached SQLResponse objects,
+ * and Comparable so signature sets can be ordered deterministically
+ * (e.g. in a TreeSet) before being rendered to a string and hashed.
+ *
+ * NOTE(review): the bound uses the raw type; declaring it as
+ * {@code ComponentSignature<T extends ComponentSignature<T>>} would avoid
+ * the raw-type warning without affecting subclasses — TODO confirm.
+ */
+abstract class ComponentSignature<T extends ComponentSignature> implements Serializable, Comparable<T> {
+
+    /** Key that identifies this component and defines its sort order. */
+    public abstract String getKey();
+
+    // Order components by key so serialized signature sets are stable.
+    @Override
+    public int compareTo(T o) {
+        return getKey().compareTo(o.getKey());
+    }
+}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/signature/RealizationSetCalculator.java b/server-base/src/main/java/org/apache/kylin/rest/signature/RealizationSetCalculator.java
new file mode 100644
index 0000000..63139f3
--- /dev/null
+++ b/server-base/src/main/java/org/apache/kylin/rest/signature/RealizationSetCalculator.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.signature;
+
+import java.security.MessageDigest;
+import java.util.Set;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.rest.response.SQLResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.CharMatcher;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+
+/**
+ * Default {@link SignatureCalculator}: derives a signature from the set of
+ * realizations (hybrids/cubes) recorded in the SQLResponse. When any of those
+ * realizations changes (segments rebuilt, cube disabled, ...), the recomputed
+ * signature differs from the cached one and the cache entry is considered stale.
+ */
+public class RealizationSetCalculator implements SignatureCalculator {
+
+    public static final Logger logger = LoggerFactory.getLogger(RealizationSetCalculator.class);
+
+    /**
+     * Builds an MD5-over-Base64 signature of all resolvable realization
+     * signatures for this response.
+     *
+     * @return the signature string, or null when no realization info is
+     *         available or hashing fails (callers treat null as "no signature")
+     */
+    @Override
+    public String calculateSignature(KylinConfig config, SQLResponse sqlResponse, ProjectInstance project) {
+        Set<String> realizations = getRealizations(config, sqlResponse.getCube(), project);
+        if (realizations == null) {
+            return null;
+        }
+        // TreeSet: deterministic iteration order so toString() (and thus the
+        // MD5 digest) is independent of insertion order.
+        Set<RealizationSignature> signatureSet = Sets.newTreeSet();
+        for (String realization : realizations) {
+            RealizationSignature realizationSignature = getRealizationSignature(config, realization);
+            if (realizationSignature != null) {
+                signatureSet.add(realizationSignature);
+            }
+        }
+        if (signatureSet.isEmpty()) {
+            return null;
+        }
+        try {
+            MessageDigest md = MessageDigest.getInstance("MD5");
+            byte[] signature = md.digest(signatureSet.toString().getBytes("UTF-8"));
+            return new String(Base64.encodeBase64(signature), "UTF-8");
+        } catch (Exception e) {
+            logger.warn("Failed to calculate signature due to " + e);
+            return null;
+        }
+    }
+
+    /**
+     * @param cubes comma-separated canonical realization names as recorded in
+     *              SQLResponse.getCube()
+     * @return the distinct realization names, or null when the response carries none
+     */
+    protected Set<String> getRealizations(KylinConfig config, String cubes, ProjectInstance project) {
+        if (Strings.isNullOrEmpty(cubes)) {
+            return null;
+        }
+        String[] realizations = parseNamesFromCanonicalNames(cubes.split(","));
+        return Sets.newHashSet(realizations);
+    }
+
+    // A name may denote either a hybrid or a cube; try hybrid first since
+    // hybrids aggregate cubes.
+    protected static RealizationSignature getRealizationSignature(KylinConfig config, String realizationName) {
+        RealizationSignature result = RealizationSignature.HybridSignature.getHybridSignature(config, realizationName);
+        if (result == null) {
+            result = RealizationSignature.CubeSignature.getCubeSignature(config, realizationName);
+        }
+        return result;
+    }
+
+    private static String[] parseNamesFromCanonicalNames(String[] canonicalNames) {
+        String[] result = new String[canonicalNames.length];
+        for (int i = 0; i < canonicalNames.length; i++) {
+            result[i] = parseCanonicalName(canonicalNames[i]).getSecond();
+        }
+        return result;
+    }
+
+    /**
+     * Splits a canonical name of the apparent form {@code TYPE[name=NAME]}
+     * (part 0 = type, part 2 = name after splitting on "[]=,").
+     *
+     * @param canonicalName
+     * @return type and name pair for realization
+     */
+    private static Pair<String, String> parseCanonicalName(String canonicalName) {
+        Iterable<String> parts = Splitter.on(CharMatcher.anyOf("[]=,")).split(canonicalName);
+        String[] partsStr = Iterables.toArray(parts, String.class);
+        return new Pair<>(partsStr[0], partsStr[2]);
+    }
+}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/signature/RealizationSignature.java b/server-base/src/main/java/org/apache/kylin/rest/signature/RealizationSignature.java
new file mode 100644
index 0000000..9e54085
--- /dev/null
+++ b/server-base/src/main/java/org/apache/kylin/rest/signature/RealizationSignature.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.signature;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.metadata.realization.RealizationStatusEnum;
+import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.storage.hybrid.HybridInstance;
+import org.apache.kylin.storage.hybrid.HybridManager;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Signature of a single realization used for query-cache staleness checks.
+ * Two concrete forms: {@link CubeSignature} (cube status + READY-segment
+ * build times) and {@link HybridSignature} (the signatures of a hybrid's
+ * member realizations, recursively).
+ */
+public abstract class RealizationSignature extends ComponentSignature<RealizationSignature> {
+
+    /** Captures a cube's status and, when READY, each READY segment's last build time. */
+    static class CubeSignature extends RealizationSignature {
+        public final String name;
+        public final RealizationStatusEnum status;
+        // Null when the cube is not READY (see getCubeSignature).
+        public final Set<SegmentSignature> segmentSignatureSet;
+
+        private CubeSignature(String name, RealizationStatusEnum status, Set<SegmentSignature> segmentSignatureSet) {
+            this.name = name;
+            this.status = status;
+            this.segmentSignatureSet = segmentSignatureSet;
+        }
+
+        public String getKey() {
+            return name;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            CubeSignature that = (CubeSignature) o;
+
+            if (name != null ? !name.equals(that.name) : that.name != null)
+                return false;
+            if (status != that.status)
+                return false;
+            return segmentSignatureSet != null ? segmentSignatureSet.equals(that.segmentSignatureSet)
+                    : that.segmentSignatureSet == null;
+        }
+
+        @Override
+        public int hashCode() {
+            int result = name != null ? name.hashCode() : 0;
+            result = 31 * result + (status != null ? status.hashCode() : 0);
+            result = 31 * result + (segmentSignatureSet != null ? segmentSignatureSet.hashCode() : 0);
+            return result;
+        }
+
+        // toString feeds the MD5 digest upstream, so segments are rendered via a
+        // TreeSet for a deterministic, order-independent representation.
+        @Override
+        public String toString() {
+            return name + "-" + status + ":"
+                    + (segmentSignatureSet != null ? Sets.newTreeSet(segmentSignatureSet) : null);
+        }
+
+        /**
+         * @return the cube's signature, or null if no such cube exists; a
+         *         non-READY cube yields a DISABLED signature with no segments
+         */
+        static CubeSignature getCubeSignature(KylinConfig config, String realizationName) {
+            CubeInstance cubeInstance = CubeManager.getInstance(config).getCube(realizationName);
+            if (cubeInstance == null) {
+                return null;
+            }
+            if (!cubeInstance.isReady()) {
+                return new CubeSignature(realizationName, RealizationStatusEnum.DISABLED, null);
+            }
+            List<CubeSegment> readySegments = cubeInstance.getSegments(SegmentStatusEnum.READY);
+            Set<SegmentSignature> segmentSignatureSet = Sets.newHashSetWithExpectedSize(readySegments.size());
+            for (CubeSegment cubeSeg : readySegments) {
+                segmentSignatureSet.add(new SegmentSignature(cubeSeg.getName(), cubeSeg.getLastBuildTime()));
+            }
+            return new CubeSignature(realizationName, RealizationStatusEnum.READY, segmentSignatureSet);
+        }
+    }
+
+    /** Aggregates the signatures of all realizations that make up a hybrid. */
+    static class HybridSignature extends RealizationSignature {
+        public final String name;
+        public final Set<RealizationSignature> realizationSignatureSet;
+
+        private HybridSignature(String name, Set<RealizationSignature> realizationSignatureSet) {
+            this.name = name;
+            this.realizationSignatureSet = realizationSignatureSet;
+        }
+
+        public String getKey() {
+            return name;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            HybridSignature that = (HybridSignature) o;
+
+            if (name != null ? !name.equals(that.name) : that.name != null)
+                return false;
+            return realizationSignatureSet != null ? realizationSignatureSet.equals(that.realizationSignatureSet)
+                    : that.realizationSignatureSet == null;
+
+        }
+
+        @Override
+        public int hashCode() {
+            int result = name != null ? name.hashCode() : 0;
+            result = 31 * result + (realizationSignatureSet != null ? realizationSignatureSet.hashCode() : 0);
+            return result;
+        }
+
+        // TreeSet rendering keeps the digested string order-independent.
+        @Override
+        public String toString() {
+            return name + ":" + (realizationSignatureSet != null ? Sets.newTreeSet(realizationSignatureSet) : null);
+        }
+
+        /**
+         * Recursively builds the hybrid's signature; members that are neither
+         * cubes nor hybrids (or cannot be resolved) are silently skipped.
+         *
+         * @return the signature, or null if no such hybrid exists
+         */
+        static HybridSignature getHybridSignature(KylinConfig config, String realizationName) {
+            HybridInstance hybridInstance = HybridManager.getInstance(config).getHybridInstance(realizationName);
+            if (hybridInstance == null) {
+                return null;
+            }
+            IRealization[] realizations = hybridInstance.getRealizations();
+            Set<RealizationSignature> realizationSignatureSet = Sets.newHashSetWithExpectedSize(realizations.length);
+            for (IRealization realization : realizations) {
+                RealizationSignature realizationSignature = null;
+                if (realization.getType() == RealizationType.CUBE) {
+                    realizationSignature = CubeSignature.getCubeSignature(config, realization.getName());
+                } else if (realization.getType() == RealizationType.HYBRID) {
+                    realizationSignature = getHybridSignature(config, realization.getName());
+                }
+                if (realizationSignature != null) {
+                    realizationSignatureSet.add(realizationSignature);
+                }
+            }
+            return new HybridSignature(realizationName, realizationSignatureSet);
+        }
+    }
+}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/signature/SegmentSignature.java b/server-base/src/main/java/org/apache/kylin/rest/signature/SegmentSignature.java
new file mode 100644
index 0000000..800dd99
--- /dev/null
+++ b/server-base/src/main/java/org/apache/kylin/rest/signature/SegmentSignature.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.signature;
+
+/**
+ * Signature of one cube segment: its name plus last build time. A rebuilt
+ * segment changes lastBuildTime, which changes the parent cube signature and
+ * invalidates cached query results.
+ */
+class SegmentSignature extends ComponentSignature<SegmentSignature> {
+    public final String name;
+    public final long lastBuildTime;
+
+    public SegmentSignature(String name, long lastBuildTime) {
+        this.name = name;
+        this.lastBuildTime = lastBuildTime;
+    }
+
+    public String getKey() {
+        return name;
+    }
+
+    // NOTE(review): identical to the inherited compareTo (which compares
+    // getKey(), i.e. name); this override is redundant and could be removed.
+    @Override
+    public int compareTo(SegmentSignature o) {
+        return name.compareTo(o.name);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        SegmentSignature that = (SegmentSignature) o;
+
+        if (lastBuildTime != that.lastBuildTime)
+            return false;
+        return name != null ? name.equals(that.name) : that.name == null;
+
+    }
+
+    @Override
+    public int hashCode() {
+        int result = name != null ? name.hashCode() : 0;
+        result = 31 * result + (int) (lastBuildTime ^ (lastBuildTime >>> 32));
+        return result;
+    }
+
+    // Feeds the digested signature string; must stay stable across versions.
+    @Override
+    public String toString() {
+        return name + ":" + lastBuildTime;
+    }
+}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/signature/SignatureCalculator.java b/server-base/src/main/java/org/apache/kylin/rest/signature/SignatureCalculator.java
new file mode 100644
index 0000000..1f94beb
--- /dev/null
+++ b/server-base/src/main/java/org/apache/kylin/rest/signature/SignatureCalculator.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.signature;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.rest.response.SQLResponse;
+
+/**
+ * Strategy for computing a staleness-check signature for a cached SQLResponse.
+ * Implementations are instantiated reflectively via a per-project config, so
+ * they must provide a public no-arg constructor.
+ */
+public interface SignatureCalculator {
+
+    /**
+     * @return the signature string, or null when no signature can be computed
+     *         (a null signature means the cached entry cannot be validated)
+     */
+    String calculateSignature(KylinConfig config, SQLResponse sqlResponse, ProjectInstance project);
+}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/util/SQLResponseSignatureUtil.java b/server-base/src/main/java/org/apache/kylin/rest/util/SQLResponseSignatureUtil.java
new file mode 100644
index 0000000..c6d3507
--- /dev/null
+++ b/server-base/src/main/java/org/apache/kylin/rest/util/SQLResponseSignatureUtil.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.util;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.project.ProjectManager;
+import org.apache.kylin.rest.response.SQLResponse;
+import org.apache.kylin.rest.signature.RealizationSetCalculator;
+import org.apache.kylin.rest.signature.SignatureCalculator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class SQLResponseSignatureUtil {
+
+    public static final Logger logger = LoggerFactory.getLogger(SQLResponseSignatureUtil.class);
+
+    public static boolean checkSignature(KylinConfig config, SQLResponse sqlResponse, String projectName) {
+        String old = sqlResponse.getSignature();
+        if (old == null) {
+            return false;
+        }
+        String latest = createSignature(config, sqlResponse, projectName);
+        return old.equals(latest);
+    }
+
+    public static String createSignature(KylinConfig config, SQLResponse sqlResponse, String projectName) {
+        ProjectInstance project = ProjectManager.getInstance(config).getProject(projectName);
+        Preconditions.checkNotNull(project);
+
+        SignatureCalculator signatureCalculator;
+        try {
+            Class signatureClass = getSignatureClass(project.getConfig());
+            signatureCalculator = (SignatureCalculator) signatureClass.getConstructor().newInstance();
+        } catch (Exception e) {
+            logger.warn("Will use default signature since fail to construct signature due to " + e);
+            signatureCalculator = new RealizationSetCalculator();
+        }
+        return signatureCalculator.calculateSignature(config, sqlResponse, project);
+    }
+
+    private static Class getSignatureClass(KylinConfig config) {
+        try {
+            return Class.forName(config.getSQLResponseSignatureClass());
+        } catch (ClassNotFoundException e) {
+            logger.warn("Will use default signature since cannot find class " + config.getSQLResponseSignatureClass());
+            return RealizationSetCalculator.class;
+        }
+    }
+}
diff --git a/server-base/src/test/java/org/apache/kylin/rest/bean/BeanTest.java b/server-base/src/test/java/org/apache/kylin/rest/bean/BeanTest.java
index e1a1228..09191e0 100644
--- a/server-base/src/test/java/org/apache/kylin/rest/bean/BeanTest.java
+++ b/server-base/src/test/java/org/apache/kylin/rest/bean/BeanTest.java
@@ -54,7 +54,7 @@ public class BeanTest {
         } catch (IntrospectionException e) {
         }
 
-        new SQLResponse(null, null, null, 0, true, null, false, false);
+        new SQLResponse(null, null, 0, true, null);
 
         SelectedColumnMeta coulmnMeta = new SelectedColumnMeta(false, false, false, false, 0, false, 0, null, null,
                 null, null, null, 0, 0, 0, null, false, false, false);
diff --git a/server/src/test/java/org/apache/kylin/rest/controller/QueryControllerTest.java b/server/src/test/java/org/apache/kylin/rest/controller/QueryControllerTest.java
index 7c5f253..2225096 100644
--- a/server/src/test/java/org/apache/kylin/rest/controller/QueryControllerTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/controller/QueryControllerTest.java
@@ -31,7 +31,7 @@ import org.junit.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
 
-import net.sf.ehcache.CacheManager;
+import org.springframework.cache.CacheManager;
 
 /**
  * @author xduo


[kylin] 06/12: KYLIN-2897 improve the query execution for a set of duplicate queries in a short period

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit b9aab0105cbf00a3581409a357be3816d0cd89a6
Author: Ma,Gang <ga...@ebay.com>
AuthorDate: Thu Oct 18 18:33:16 2018 +0800

    KYLIN-2897 improve the query execution for a set of duplicate queries in a short period
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  8 ++++++
 .../apache/kylin/rest/response/SQLResponse.java    | 18 ++++++++++++
 .../apache/kylin/rest/service/QueryService.java    | 33 ++++++++++++++++++++++
 3 files changed, 59 insertions(+)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index b19f2e9..778b5bf 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1363,6 +1363,14 @@ abstract public class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.query.scan-threshold", "10000000"));
     }
 
+    /** Whether a duplicate in-flight query should wait for the first one's result (KYLIN-2897). */
+    public boolean isLazyQueryEnabled() {
+        return Boolean.parseBoolean(getOptional("kylin.query.lazy-query-enabled", "false"));
+    }
+
+    /** Max milliseconds a duplicate query waits for the in-flight one before giving up. */
+    public long getLazyQueryWaitingTimeoutMilliSeconds() {
+        // Default must be a plain decimal string: Long.parseLong("60000L") throws
+        // NumberFormatException (the "L" suffix is Java literal syntax, not parseable),
+        // which would make the unconfigured default crash every lazy-query wait.
+        return Long.parseLong(getOptional("kylin.query.lazy-query-waiting-timeout-milliseconds", "60000"));
+    }
+
     public int getQueryConcurrentRunningThresholdForProject() {
         // by default there's no limitation
         return Integer.parseInt(getOptional("kylin.query.project-concurrent-running-threshold", "0"));
diff --git a/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
index 0502798..1721efe 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
@@ -79,6 +79,10 @@ public class SQLResponse implements Serializable {
     // it's sql response signature for cache checking, no need to return and should be JsonIgnore
     protected String signature;
 
+    // it's a temporary flag, no need to return and should be JsonIgnore
+    // indicating the lazy query start time, -1 indicating not enabled
+    protected long lazyQueryStartTime = -1L;
+
     public SQLResponse() {
     }
 
@@ -219,6 +223,20 @@ public class SQLResponse implements Serializable {
     }
 
+    @JsonIgnore
+    // True while this is a placeholder ("dummy") entry for an in-flight query;
+    // lazyQueryStartTime is -1 for completed/normal responses.
+    public boolean isRunning() {
+        return this.lazyQueryStartTime >= 0;
+    }
+
+    @JsonIgnore
+    // When the in-flight query started, used by waiters to enforce a timeout.
+    public long getLazyQueryStartTime() {
+        return lazyQueryStartTime;
+    }
+
+    public void setLazyQueryStartTime(long lazyQueryStartTime) {
+        this.lazyQueryStartTime = lazyQueryStartTime;
+    }
+
+    @JsonIgnore
     public List<QueryContext.CubeSegmentStatisticsResult> getCubeSegmentStatisticsList() {
         try {
             return queryStatistics == null ? Lists.<QueryContext.CubeSegmentStatisticsResult> newArrayList()
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index abcab7f..78068eb 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -443,8 +443,16 @@ public class QueryService extends BasicService {
         Message msg = MsgPicker.getMsg();
         final QueryContext queryContext = QueryContextFacade.current();
 
+        boolean isDummpyResponseEnabled = queryCacheEnabled && kylinConfig.isLazyQueryEnabled();
         SQLResponse sqlResponse = null;
         try {
+            // Add dummy response which will be updated or evicted when query finishes
+            if (isDummpyResponseEnabled) {
+                SQLResponse dummyResponse = new SQLResponse();
+                dummyResponse.setLazyQueryStartTime(System.currentTimeMillis());
+                cacheManager.getCache(QUERY_CACHE).put(sqlRequest.getCacheKey(), dummyResponse);
+            }
+
             final boolean isSelect = QueryUtil.isSelectStatement(sqlRequest.getSql());
             if (isSelect) {
                 sqlResponse = query(sqlRequest, queryContext.getQueryId());
@@ -481,6 +489,8 @@ public class QueryService extends BasicService {
                             "query response is too large: {} ({})", sqlResponse.getResults().size(),
                             kylinConfig.getLargeQueryThreshold())) {
                 cacheManager.getCache(QUERY_CACHE).put(sqlRequest.getCacheKey(), sqlResponse);
+            } else if (isDummpyResponseEnabled) {
+                cacheManager.getCache(QUERY_CACHE).evict(sqlRequest.getCacheKey());
             }
 
         } catch (Throwable e) { // calcite may throw AssertError
@@ -529,6 +539,29 @@ public class QueryService extends BasicService {
         if (response == null) {
             return null;
         }
+
+        // Check whether duplicate query exists
+        while (response.isRunning()) {
+            // Wait at most one minute
+            if (System.currentTimeMillis() - response.getLazyQueryStartTime() >= getConfig()
+                    .getLazyQueryWaitingTimeoutMilliSeconds()) {
+                cache.evict(sqlRequest.getCacheKey());
+                return null;
+            }
+            logger.info("Duplicated SQL request is running, waiting...");
+            try {
+                Thread.sleep(100L);
+            } catch (InterruptedException e) {
+            }
+            wrapper = cache.get(sqlRequest.getCacheKey());
+            if (wrapper == null) {
+                return null;
+            }
+            response = (SQLResponse) wrapper.get();
+            if (response == null) {
+                return null;
+            }
+        }
         logger.info("The sqlResponse is found in QUERY_CACHE");
         if (getConfig().isQueryCacheSignatureEnabled()
                 && !SQLResponseSignatureUtil.checkSignature(getConfig(), response, sqlRequest.getProject())) {


[kylin] 12/12: KYLIN-2898 If a distributed cache is adopted, small query results are also better to be put into the cache.

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit de86ed61271e1cddb98ddfb29639f757823f0b2a
Author: Zhong <nj...@apache.org>
AuthorDate: Tue Oct 23 19:47:57 2018 +0800

    KYLIN-2898 If a distributed cache is adopted, small query results are also better to be put into the cache.
---
 server-base/pom.xml                                                   | 4 ++++
 .../src/main/java/org/apache/kylin/rest/service/QueryService.java     | 4 +++-
 2 files changed, 7 insertions(+), 1 deletion(-)

diff --git a/server-base/pom.xml b/server-base/pom.xml
index 0a09b43..4454881 100644
--- a/server-base/pom.xml
+++ b/server-base/pom.xml
@@ -72,6 +72,10 @@
             <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-source-kafka</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-cache</artifactId>
+        </dependency>
 
         <dependency>
             <groupId>net.sf.ehcache</groupId>
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 78068eb..d8e397e 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -62,6 +62,7 @@ import org.apache.commons.pool2.PooledObject;
 import org.apache.commons.pool2.impl.DefaultPooledObject;
 import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
 import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
+import org.apache.kylin.cache.cachemanager.MemcachedCacheManager;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.common.QueryContextFacade;
@@ -479,7 +480,8 @@ public class QueryService extends BasicService {
                                     && (isSelect == false || kylinConfig.isPushdownQueryCacheEnabled() == false)),
                             "query is executed with pushdown, but it is non-select, or the cache for pushdown is disabled") //
                     && checkCondition(
-                            sqlResponse.getDuration() > durationThreshold
+                            cacheManager.getCache(QUERY_CACHE) instanceof MemcachedCacheManager.MemCachedCacheAdaptor
+                                    || sqlResponse.getDuration() > durationThreshold
                                     || sqlResponse.getTotalScanCount() > scanCountThreshold
                                     || sqlResponse.getTotalScanBytes() > scanBytesThreshold, //
                             "query is too lightweight with duration: {} (threshold {}), scan count: {} (threshold {}), scan bytes: {} (threshold {})",


[kylin] 09/12: KYLIN-2898 add unit test

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 2f04803e2c860922a5f33469403f667bce004cc3
Author: Zhong <nj...@apache.org>
AuthorDate: Thu Oct 18 21:48:39 2018 +0800

    KYLIN-2898 add unit test
---
 .../RemoteLocalFailOverCacheManagerTest.java       |  62 ++++++++
 .../kylin/cache/memcached/MemcachedCacheTest.java  |  84 +++++++++++
 .../memcached/MemcachedChunkingCacheTest.java      | 159 +++++++++++++++++++++
 cache/src/test/resources/cacheContext.xml          |  47 ++++++
 cache/src/test/resources/ehcache-test.xml          |  21 +++
 5 files changed, 373 insertions(+)

diff --git a/cache/src/test/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManagerTest.java b/cache/src/test/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManagerTest.java
new file mode 100644
index 0000000..243e386
--- /dev/null
+++ b/cache/src/test/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManagerTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.cachemanager;
+
+import static org.apache.kylin.cache.cachemanager.CacheConstants.QUERY_CACHE;
+
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.cache.ehcache.EhCacheCache;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+@RunWith(SpringJUnit4ClassRunner.class)
+@ContextConfiguration(locations = { "classpath:cacheContext.xml" })
+@ActiveProfiles("testing-memcached")
+public class RemoteLocalFailOverCacheManagerTest {
+
+    @Autowired
+    @Qualifier("cacheManager")
+    RemoteLocalFailOverCacheManager cacheManager;
+
+    @BeforeClass
+    public static void setupResource() throws Exception {
+        LocalFileMetadataTestCase.staticCreateTestMetadata();
+    }
+
+    @AfterClass
+    public static void tearDownResource() {
+    }
+
+    @Test
+    public void testCacheManager() {
+        cacheManager.disableRemoteCacheManager();
+        Assert.assertTrue("Memcached failover to ehcache", cacheManager.getCache(QUERY_CACHE) instanceof EhCacheCache);
+        cacheManager.enableRemoteCacheManager();
+        Assert.assertTrue("Memcached enabled",
+                cacheManager.getCache(QUERY_CACHE) instanceof MemcachedCacheManager.MemCachedCacheAdaptor);
+    }
+}
\ No newline at end of file
diff --git a/cache/src/test/java/org/apache/kylin/cache/memcached/MemcachedCacheTest.java b/cache/src/test/java/org/apache/kylin/cache/memcached/MemcachedCacheTest.java
new file mode 100644
index 0000000..40571a7
--- /dev/null
+++ b/cache/src/test/java/org/apache/kylin/cache/memcached/MemcachedCacheTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.memcached;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kylin.cache.cachemanager.CacheConstants;
+import org.apache.kylin.cache.cachemanager.MemcachedCacheManager.MemCachedCacheAdaptor;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+import net.spy.memcached.MemcachedClient;
+import net.spy.memcached.internal.GetFuture;
+
+public class MemcachedCacheTest extends LocalFileMetadataTestCase {
+
+    private Map<String, String> keyValueMap;
+    private MemCachedCacheAdaptor memCachedAdaptor;
+
+    @Before
+    public void setUp() throws Exception {
+        this.createTestMetadata();
+
+        keyValueMap = Maps.newHashMap();
+        keyValueMap.put("sql1", "value1");
+        keyValueMap.put("sql11", "value11");
+
+        MemcachedCacheConfig cacheConfig = new MemcachedCacheConfig();
+        MemcachedClient memcachedClient = mock(MemcachedClient.class);
+        MemcachedCache memcachedCache = new MemcachedCache(memcachedClient, cacheConfig, CacheConstants.QUERY_CACHE,
+                7 * 24 * 3600);
+        memCachedAdaptor = new MemCachedCacheAdaptor(memcachedCache);
+
+        //Mock put to cache
+        for (String key : keyValueMap.keySet()) {
+            String keyS = memcachedCache.serializeKey(key);
+            String hashedKey = memcachedCache.computeKeyHash(keyS);
+
+            String value = keyValueMap.get(key);
+            byte[] valueE = memcachedCache.encodeValue(keyS, value);
+
+            GetFuture<Object> future = mock(GetFuture.class);
+            when(future.get(cacheConfig.getTimeout(), TimeUnit.MILLISECONDS)).thenReturn(valueE);
+            when(memcachedClient.asyncGet(hashedKey)).thenReturn(future);
+        }
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testGet() {
+        for (String key : keyValueMap.keySet()) {
+            Assert.assertEquals("The value should not change", keyValueMap.get(key), memCachedAdaptor.get(key).get());
+        }
+    }
+}
\ No newline at end of file
diff --git a/cache/src/test/java/org/apache/kylin/cache/memcached/MemcachedChunkingCacheTest.java b/cache/src/test/java/org/apache/kylin/cache/memcached/MemcachedChunkingCacheTest.java
new file mode 100644
index 0000000..295b20c
--- /dev/null
+++ b/cache/src/test/java/org/apache/kylin/cache/memcached/MemcachedChunkingCacheTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.memcached;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kylin.cache.cachemanager.CacheConstants;
+import org.apache.kylin.cache.cachemanager.MemcachedCacheManager.MemCachedCacheAdaptor;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.common.util.Pair;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Strings;
+import com.google.common.collect.Maps;
+
+import net.spy.memcached.MemcachedClient;
+import net.spy.memcached.internal.BulkFuture;
+import net.spy.memcached.internal.GetFuture;
+
+public class MemcachedChunkingCacheTest extends LocalFileMetadataTestCase {
+
+    private Map<String, String> smallValueMap;
+    private Map<String, String> largeValueMap;
+    private MemCachedCacheAdaptor memCachedAdaptor;
+
+    @Before
+    public void setUp() throws Exception {
+        this.createTestMetadata();
+        final int maxObjectSize = 300;
+
+        smallValueMap = Maps.newHashMap();
+        smallValueMap.put("sql1", "value1");
+
+        largeValueMap = Maps.newHashMap();
+        largeValueMap.put("sql2", Strings.repeat("value2", maxObjectSize));
+
+        MemcachedCacheConfig cacheConfig = new MemcachedCacheConfig();
+        cacheConfig.setMaxObjectSize(maxObjectSize);
+        MemcachedClient memcachedClient = mock(MemcachedClient.class);
+        MemcachedCache memcachedCache = new MemcachedCache(memcachedClient, cacheConfig, CacheConstants.QUERY_CACHE,
+                7 * 24 * 3600);
+        MemcachedChunkingCache memcachedChunkingCache = new MemcachedChunkingCache(memcachedCache);
+        memCachedAdaptor = new MemCachedCacheAdaptor(memcachedChunkingCache);
+
+        //Mock put to cache
+        for (String key : smallValueMap.keySet()) {
+            String keyS = memcachedCache.serializeKey(key);
+            String hashedKey = memcachedCache.computeKeyHash(keyS);
+
+            String value = smallValueMap.get(key);
+            byte[] valueB = memcachedCache.serializeValue(value);
+            KeyHookLookup.KeyHook keyHook = new KeyHookLookup.KeyHook(null, valueB);
+            byte[] valueE = memcachedCache.encodeValue(keyS, keyHook);
+
+            GetFuture<Object> future = mock(GetFuture.class);
+            when(memcachedClient.asyncGet(hashedKey)).thenReturn(future);
+
+            when(future.get(cacheConfig.getTimeout(), TimeUnit.MILLISECONDS)).thenReturn(valueE);
+        }
+
+        //Mock put large value to cache
+        for (String key : largeValueMap.keySet()) {
+            String keyS = memcachedCache.serializeKey(key);
+            String hashedKey = memcachedCache.computeKeyHash(keyS);
+
+            String value = largeValueMap.get(key);
+            byte[] valueB = memcachedCache.serializeValue(value);
+            int nSplit = MemcachedChunkingCache.getValueSplit(cacheConfig, keyS, valueB.length);
+            Pair<KeyHookLookup.KeyHook, byte[][]> keyValuePair = MemcachedChunkingCache.getKeyValuePair(nSplit, keyS,
+                    valueB);
+            KeyHookLookup.KeyHook keyHook = keyValuePair.getFirst();
+            byte[][] splitValueB = keyValuePair.getSecond();
+
+            //For key
+            byte[] valueE = memcachedCache.encodeValue(keyS, keyHook);
+            GetFuture<Object> future = mock(GetFuture.class);
+            when(memcachedClient.asyncGet(hashedKey)).thenReturn(future);
+            when(future.get(cacheConfig.getTimeout(), TimeUnit.MILLISECONDS)).thenReturn(valueE);
+
+            //For splits
+            Map<String, String> keyLookup = memcachedChunkingCache
+                    .computeKeyHash(Arrays.asList(keyHook.getChunkskey()));
+            Map<String, Object> bulkResult = Maps.newHashMap();
+            for (int i = 0; i < nSplit; i++) {
+                String splitKeyS = keyHook.getChunkskey()[i];
+                bulkResult.put(memcachedCache.computeKeyHash(splitKeyS),
+                        memcachedCache.encodeValue(splitKeyS.getBytes(Charsets.UTF_8), splitValueB[i]));
+            }
+
+            BulkFuture<Map<String, Object>> bulkFuture = mock(BulkFuture.class);
+            when(memcachedClient.asyncGetBulk(keyLookup.keySet())).thenReturn(bulkFuture);
+            when(bulkFuture.get(cacheConfig.getTimeout(), TimeUnit.MILLISECONDS)).thenReturn(bulkResult);
+        }
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testGet() {
+        for (String key : smallValueMap.keySet()) {
+            Assert.assertEquals("The value should not change", smallValueMap.get(key), memCachedAdaptor.get(key).get());
+        }
+        for (String key : largeValueMap.keySet()) {
+            Assert.assertEquals("The value should not change", largeValueMap.get(key), memCachedAdaptor.get(key).get());
+        }
+    }
+
+    @Test
+    public void testSplitBytes() {
+        byte[] data = new byte[8];
+        for (int i = 0; i < data.length; i++) {
+            data[i] = (byte) i;
+        }
+
+        int nSplit;
+        byte[][] dataSplits;
+
+        nSplit = 2;
+        dataSplits = MemcachedChunkingCache.splitBytes(data, nSplit);
+        Assert.assertEquals(nSplit, dataSplits.length);
+        Assert.assertArrayEquals(dataSplits[0], new byte[] { 0, 1, 2, 3 });
+        Assert.assertArrayEquals(dataSplits[1], new byte[] { 4, 5, 6, 7 });
+
+        nSplit = 3;
+        dataSplits = MemcachedChunkingCache.splitBytes(data, nSplit);
+        Assert.assertEquals(nSplit, dataSplits.length);
+        Assert.assertArrayEquals(dataSplits[0], new byte[] { 0, 1, 2 });
+        Assert.assertArrayEquals(dataSplits[1], new byte[] { 3, 4, 5 });
+        Assert.assertArrayEquals(dataSplits[2], new byte[] { 6, 7 });
+    }
+}
\ No newline at end of file
diff --git a/cache/src/test/resources/cacheContext.xml b/cache/src/test/resources/cacheContext.xml
new file mode 100644
index 0000000..a2fb9e9
--- /dev/null
+++ b/cache/src/test/resources/cacheContext.xml
@@ -0,0 +1,47 @@
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:context="http://www.springframework.org/schema/context"
+       xmlns:cache="http://www.springframework.org/schema/cache"
+       xmlns:p="http://www.springframework.org/schema/p"
+       xmlns="http://www.springframework.org/schema/beans"
+       xsi:schemaLocation="http://www.springframework.org/schema/beans
+    http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
+    http://www.springframework.org/schema/context
+    http://www.springframework.org/schema/context/spring-context-4.3.xsd
+    http://www.springframework.org/schema/cache
+    http://www.springframework.org/schema/cache/spring-cache.xsd">
+
+    <description>Kylin Cache Test Context</description>
+    <context:annotation-config/>
+
+    <!-- Cache Config -->
+    <cache:annotation-driven/>
+
+    <beans profile="testing-memcached">
+        <bean id="ehcache"
+              class="org.springframework.cache.ehcache.EhCacheManagerFactoryBean"
+              p:configLocation="classpath:ehcache-test.xml" p:shared="true"/>
+        <bean id="localCacheManager" class="org.apache.kylin.cache.cachemanager.InstrumentedEhCacheCacheManager"
+              p:cacheManager-ref="ehcache"/>
+
+        <bean id="remoteCacheManager" class="org.apache.kylin.cache.cachemanager.MemcachedCacheManager"/>
+        <bean id="memcachedCacheConfig" class="org.apache.kylin.cache.memcached.MemcachedCacheConfig">
+            <property name="timeout" value="500"/>
+            <property name="hosts" value="localhost:11211"/>
+        </bean>
+
+        <bean id="cacheManager" class="org.apache.kylin.cache.cachemanager.RemoteLocalFailOverCacheManager"/>
+    </beans>
+
+</beans>
\ No newline at end of file
diff --git a/cache/src/test/resources/ehcache-test.xml b/cache/src/test/resources/ehcache-test.xml
new file mode 100644
index 0000000..90299ec
--- /dev/null
+++ b/cache/src/test/resources/ehcache-test.xml
@@ -0,0 +1,21 @@
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+<ehcache maxBytesLocalHeap="256M">
+    <cache name="StorageCache"
+           eternal="false"
+           timeToIdleSeconds="86400"
+           memoryStoreEvictionPolicy="LRU"
+    >
+        <persistence strategy="none"/>
+    </cache>
+</ehcache>
\ No newline at end of file


[kylin] 08/12: KYLIN-2898 config memcached

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 73d0fd43af2ad4fa3ae7830341a0e7d96a349ee5
Author: Zhong <nj...@apache.org>
AuthorDate: Thu Oct 18 21:40:27 2018 +0800

    KYLIN-2898 config memcached
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  4 ++++
 server/src/main/resources/applicationContext.xml   | 28 ++++++++++++++++++----
 2 files changed, 27 insertions(+), 5 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 159137b..9066b0d 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1457,6 +1457,10 @@ abstract public class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(this.getOptional("kylin.query.ignore-unknown-function", FALSE));
     }
 
+    public String getMemCachedHosts() {
+        return getRequired("kylin.cache.memcached.hosts");
+    }
+
     public String getQueryAccessController() {
         return getOptional("kylin.query.access-controller", null);
     }
diff --git a/server/src/main/resources/applicationContext.xml b/server/src/main/resources/applicationContext.xml
index c39ec5b..523fdc2 100644
--- a/server/src/main/resources/applicationContext.xml
+++ b/server/src/main/resources/applicationContext.xml
@@ -16,7 +16,6 @@
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:context="http://www.springframework.org/schema/context"
        xmlns:mvc="http://www.springframework.org/schema/mvc"
-       xmlns:task="http://www.springframework.org/schema/task"
        xmlns:aop="http://www.springframework.org/schema/aop"
        xmlns:cache="http://www.springframework.org/schema/cache"
        xmlns:p="http://www.springframework.org/schema/p"
@@ -24,8 +23,8 @@
     http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
     http://www.springframework.org/schema/context
     http://www.springframework.org/schema/context/spring-context-4.3.xsd
-    http://www.springframework.org/schema/task
-    http://www.springframework.org/schema/task/spring-task-4.3.xsd
+
+
     http://www.springframework.org/schema/mvc
     http://www.springframework.org/schema/mvc/spring-mvc-4.3.xsd
     http://www.springframework.org/schema/aop
@@ -88,17 +87,36 @@
     <!-- Cache Config -->
     <cache:annotation-driven/>
 
-    <bean id="cacheManager" class="org.springframework.cache.ehcache.EhCacheCacheManager"
-          p:cacheManager-ref="ehcache"/>
     <beans profile="ldap,saml">
         <bean id="ehcache"
               class="org.springframework.cache.ehcache.EhCacheManagerFactoryBean"
               p:configLocation="classpath:ehcache.xml" p:shared="true"/>
+
+        <bean id="cacheManager" class="org.springframework.cache.ehcache.EhCacheCacheManager"
+              p:cacheManager-ref="ehcache"/>
     </beans>
     <beans profile="testing">
         <bean id="ehcache"
               class="org.springframework.cache.ehcache.EhCacheManagerFactoryBean"
               p:configLocation="classpath:ehcache-test.xml" p:shared="true"/>
+
+        <bean id="cacheManager" class="org.springframework.cache.ehcache.EhCacheCacheManager"
+              p:cacheManager-ref="ehcache"/>
+
+        <!--
+                <bean id="ehcache" class="org.springframework.cache.ehcache.EhCacheManagerFactoryBean"
+                      p:configLocation="classpath:ehcache-test.xml" p:shared="true"/>
+
+                <bean id="remoteCacheManager" class="org.apache.kylin.cache.cachemanager.MemcachedCacheManager" />
+                <bean id="localCacheManager" class="org.apache.kylin.cache.cachemanager.InstrumentedEhCacheCacheManager"
+                      p:cacheManager-ref="ehcache"/>
+                <bean id="cacheManager" class="org.apache.kylin.cache.cachemanager.RemoteLocalFailOverCacheManager" />
+
+                <bean id="memcachedCacheConfig" class="org.apache.kylin.cache.memcached.MemcachedCacheConfig">
+                    <property name="timeout" value="500" />
+                    <property name="hosts" value="${kylin.cache.memcached.hosts}" />
+                </bean>
+        -->
     </beans>
 
 </beans>
\ No newline at end of file


[kylin] 07/12: KYLIN-2898 Introduce memcached as a distributed cache for queries

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit cd952ffd23e27b8468db03c38758d552a00c7486
Author: Wang Ken <mi...@ebay.com>
AuthorDate: Thu Oct 18 20:44:30 2018 +0800

    KYLIN-2898 Introduce memcached as a distributed cache for queries
---
 cache/pom.xml                                      |  94 ++++++
 .../spy/memcached/RefinedKetamaNodeLocator.java    | 279 ++++++++++++++++
 .../kylin/cache/cachemanager/CacheConstants.java   |  23 ++
 .../InstrumentedEhCacheCacheManager.java           | 101 ++++++
 .../cache/cachemanager/MemcachedCacheManager.java  | 181 ++++++++++
 .../RemoteLocalFailOverCacheManager.java           |  71 ++++
 .../cache/ehcache/InstrumentedEhCacheCache.java    | 205 ++++++++++++
 .../apache/kylin/cache/memcached/CacheStats.java   |  97 ++++++
 .../kylin/cache/memcached/KeyHookLookup.java       | 139 ++++++++
 .../kylin/cache/memcached/MemcachedCache.java      | 371 +++++++++++++++++++++
 .../cache/memcached/MemcachedCacheConfig.java      |  97 ++++++
 .../cache/memcached/MemcachedChunkingCache.java    | 279 ++++++++++++++++
 .../memcached/MemcachedConnectionFactory.java      | 193 +++++++++++
 .../MemcachedConnectionFactoryBuilder.java         | 173 ++++++++++
 .../kylin/cache/memcached/MemcachedMetrics.java    | 139 ++++++++
 .../org/apache/kylin/common/KylinConfigBase.java   |   4 +
 pom.xml                                            |  17 +
 17 files changed, 2463 insertions(+)

diff --git a/cache/pom.xml b/cache/pom.xml
new file mode 100644
index 0000000..8e31435
--- /dev/null
+++ b/cache/pom.xml
@@ -0,0 +1,94 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+     http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>kylin-cache</artifactId>
+    <packaging>jar</packaging>
+    <name>Apache Kylin - Cache</name>
+    <description>Apache Kylin - Cache</description>
+
+    <parent>
+        <groupId>org.apache.kylin</groupId>
+        <artifactId>kylin</artifactId>
+        <version>2.6.0-SNAPSHOT</version>
+    </parent>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-common</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-metrics</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.dropwizard.metrics</groupId>
+            <artifactId>metrics-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.dropwizard.metrics</groupId>
+            <artifactId>metrics-jvm</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-context-support</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-test</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>net.sf.ehcache</groupId>
+            <artifactId>ehcache</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>net.spy</groupId>
+            <artifactId>spymemcached</artifactId>
+        </dependency>
+
+        <!-- Test & Env -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-common</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <scope>test</scope>
+            <!--MRUnit relies on older version of mockito, so cannot manage it globally-->
+            <version>${mockito.version}</version>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/cache/src/main/java/net/spy/memcached/RefinedKetamaNodeLocator.java b/cache/src/main/java/net/spy/memcached/RefinedKetamaNodeLocator.java
new file mode 100644
index 0000000..95298d7
--- /dev/null
+++ b/cache/src/main/java/net/spy/memcached/RefinedKetamaNodeLocator.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package net.spy.memcached;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import net.spy.memcached.compat.SpyObject;
+import net.spy.memcached.util.DefaultKetamaNodeLocatorConfiguration;
+import net.spy.memcached.util.KetamaNodeLocatorConfiguration;
+
+/**
+ * Copyright (C) 2006-2009 Dustin Sallings
+ * Copyright (C) 2009-2011 Couchbase, Inc.
+ *
+ * This is a modified version of the Ketama consistent hash strategy from
+ * last.fm. This implementation may not be compatible with libketama as hashing
+ * is considered separate from node location.
+ *
+ * The only modified method is getSequence().
+ * The previous value of 7 may be too small to reduce the chance of landing on all-down nodes.
+ *
+ * Note that this implementation does not currently support weighted nodes.
+ *
+ * @see <a href="http://www.last.fm/user/RJ/journal/2007/04/10/392555/">RJ's
+ *      blog post</a>
+ */
+public final class RefinedKetamaNodeLocator extends SpyObject implements NodeLocator {
+
+    // Hash algorithm used to map cache keys onto the continuum.
+    private final HashAlgorithm hashAlg;
+    // Optional per-node weights; an empty map selects the unweighted placement path.
+    private final Map<InetSocketAddress, Integer> weights;
+    // Cached flag: true when 'weights' is non-empty.
+    private final boolean isWeightedKetama;
+    // Supplies the node repetition count and the per-node key naming scheme.
+    private final KetamaNodeLocatorConfiguration config;
+    // The continuum: hash position -> owning node. Rebuilt and swapped wholesale;
+    // volatile so readers always observe a fully constructed map.
+    private volatile TreeMap<Long, MemcachedNode> ketamaNodes;
+    // All known nodes, live or not; swapped on updateLocator().
+    private volatile Collection<MemcachedNode> allNodes;
+
+    /**
+     * Create a new KetamaNodeLocator using specified nodes and the specified hash
+     * algorithm.
+     *
+     * @param nodes The List of nodes to use in the Ketama consistent hash
+     *          continuum
+     * @param alg The hash algorithm to use when choosing a node in the Ketama
+     *          consistent hash continuum
+     */
+    public RefinedKetamaNodeLocator(List<MemcachedNode> nodes, HashAlgorithm alg) {
+        this(nodes, alg, KetamaNodeKeyFormatter.Format.SPYMEMCACHED, new HashMap<InetSocketAddress, Integer>());
+    }
+
+    /**
+     * Create a new KetamaNodeLocator with specific nodes, hash, node key format,
+     * and weight
+     *
+     * @param nodes The List of nodes to use in the Ketama consistent hash
+     *          continuum
+     * @param alg The hash algorithm to use when choosing a node in the Ketama
+     *          consistent hash continuum
+     * @param nodeKeyFormat the format used to name the nodes in Ketama, either
+     *          SPYMEMCACHED or LIBMEMCACHED
+     * @param weights node weights for ketama, a map from InetSocketAddress to
+     *          weight as Integer
+     */
+    public RefinedKetamaNodeLocator(List<MemcachedNode> nodes, HashAlgorithm alg,
+            KetamaNodeKeyFormatter.Format nodeKeyFormat, Map<InetSocketAddress, Integer> weights) {
+        this(nodes, alg, weights, new DefaultKetamaNodeLocatorConfiguration(new KetamaNodeKeyFormatter(nodeKeyFormat)));
+    }
+
+    /**
+     * Create a new KetamaNodeLocator using specified nodes and the specified hash
+     * algorithm and configuration.
+     *
+     * @param nodes The List of nodes to use in the Ketama consistent hash
+     *          continuum
+     * @param alg The hash algorithm to use when choosing a node in the Ketama
+     *          consistent hash continuum
+     * @param conf the node locator configuration (repetitions and key naming)
+     */
+    public RefinedKetamaNodeLocator(List<MemcachedNode> nodes, HashAlgorithm alg, KetamaNodeLocatorConfiguration conf) {
+        this(nodes, alg, new HashMap<InetSocketAddress, Integer>(), conf);
+    }
+
+    /**
+     * Create a new KetamaNodeLocator with specific nodes, hash, node key format,
+     * and weight
+     *
+     * @param nodes The List of nodes to use in the Ketama consistent hash
+     *          continuum
+     * @param alg The hash algorithm to use when choosing a node in the Ketama
+     *          consistent hash continuum
+     * @param nodeWeights node weights for ketama, a map from InetSocketAddress to
+     *          weight as Integer
+     * @param configuration node locator configuration
+     */
+    public RefinedKetamaNodeLocator(List<MemcachedNode> nodes, HashAlgorithm alg,
+            Map<InetSocketAddress, Integer> nodeWeights, KetamaNodeLocatorConfiguration configuration) {
+        super();
+        allNodes = nodes;
+        hashAlg = alg;
+        config = configuration;
+        weights = nodeWeights;
+        isWeightedKetama = !weights.isEmpty();
+        setKetamaNodes(nodes);
+    }
+
+    // Used only by getReadonlyCopy(): adopts pre-built structures without rehashing.
+    private RefinedKetamaNodeLocator(TreeMap<Long, MemcachedNode> smn, Collection<MemcachedNode> an, HashAlgorithm alg,
+            Map<InetSocketAddress, Integer> nodeWeights, KetamaNodeLocatorConfiguration conf) {
+        super();
+        ketamaNodes = smn;
+        allNodes = an;
+        hashAlg = alg;
+        config = conf;
+        weights = nodeWeights;
+        isWeightedKetama = !weights.isEmpty();
+    }
+
+    /** @return every known node, whether or not it is currently reachable. */
+    public Collection<MemcachedNode> getAll() {
+        return allNodes;
+    }
+
+    /** @return the node owning the continuum segment that key {@code k} hashes into. */
+    public MemcachedNode getPrimary(final String k) {
+        MemcachedNode rv = getNodeForKey(hashAlg.hash(k));
+        assert rv != null : "Found no node for key " + k;
+        return rv;
+    }
+
+    /** @return the highest hash position currently present on the continuum. */
+    long getMaxKey() {
+        return getKetamaNodes().lastKey();
+    }
+
+    MemcachedNode getNodeForKey(long hash) {
+        final MemcachedNode rv;
+        if (!ketamaNodes.containsKey(hash)) {
+            // Java 1.6 adds a ceilingKey method, but I'm still stuck in 1.5
+            // in a lot of places, so I'm doing this myself.
+            // Wrap around the ring: take the first position at or after 'hash',
+            // or the ring's very first position when none exists.
+            SortedMap<Long, MemcachedNode> tailMap = getKetamaNodes().tailMap(hash);
+            if (tailMap.isEmpty()) {
+                hash = getKetamaNodes().firstKey();
+            } else {
+                hash = tailMap.firstKey();
+            }
+        }
+        rv = getKetamaNodes().get(hash);
+        return rv;
+    }
+
+    /**
+     * the previous 7 may be too small to reduce the chance to get all down nodes
+     * @param k the cache key whose candidate nodes are iterated
+     * @return an iterator yielding up to maxTry (at least 20) candidate nodes for the key
+     */
+    public Iterator<MemcachedNode> getSequence(String k) {
+        // Seven searches gives us a 1 in 2^maxTry chance of hitting the
+        // same dead node all of the time.
+        int maxTry = config.getNodeRepetitions() + 1;
+        if (maxTry < 20) {
+            maxTry = 20;
+        }
+        return new KetamaIterator(k, maxTry, getKetamaNodes(), hashAlg);
+    }
+
+    /** @return a snapshot of this locator with every node wrapped in a read-only impl. */
+    public NodeLocator getReadonlyCopy() {
+        TreeMap<Long, MemcachedNode> smn = new TreeMap<Long, MemcachedNode>(getKetamaNodes());
+        Collection<MemcachedNode> an = new ArrayList<MemcachedNode>(allNodes.size());
+
+        // Rewrite the values of the copied map with read-only wrappers.
+        for (Map.Entry<Long, MemcachedNode> me : smn.entrySet()) {
+            smn.put(me.getKey(), new MemcachedNodeROImpl(me.getValue()));
+        }
+
+        // Copy the allNodes collection.
+        for (MemcachedNode n : allNodes) {
+            an.add(new MemcachedNodeROImpl(n));
+        }
+
+        return new RefinedKetamaNodeLocator(smn, an, hashAlg, weights, config);
+    }
+
+    @Override
+    public void updateLocator(List<MemcachedNode> nodes) {
+        allNodes = nodes;
+        setKetamaNodes(nodes);
+    }
+
+    /**
+     * @return the ketamaNodes
+     */
+    protected TreeMap<Long, MemcachedNode> getKetamaNodes() {
+        return ketamaNodes;
+    }
+
+    /**
+     * Setup the KetamaNodeLocator with the list of nodes it should use.
+     *
+     * @param nodes a List of MemcachedNodes for this KetamaNodeLocator to use in
+     *          its continuum
+     */
+    protected void setKetamaNodes(List<MemcachedNode> nodes) {
+        TreeMap<Long, MemcachedNode> newNodeMap = new TreeMap<Long, MemcachedNode>();
+        int numReps = config.getNodeRepetitions();
+        int nodeCount = nodes.size();
+        int totalWeight = 0;
+
+        if (isWeightedKetama) {
+            for (MemcachedNode node : nodes) {
+                totalWeight += weights.get(node.getSocketAddress());
+            }
+        }
+
+        for (MemcachedNode node : nodes) {
+            if (isWeightedKetama) {
+
+                // Scale this node's share of continuum points by its weight fraction,
+                // rounded down to a multiple of 4 (each MD5 digest yields 4 positions).
+                int thisWeight = weights.get(node.getSocketAddress());
+                float percent = (float) thisWeight / (float) totalWeight;
+                int pointerPerServer = (int) ((Math.floor(
+                        (float) (percent * (float) config.getNodeRepetitions() / 4 * (float) nodeCount + 0.0000000001)))
+                        * 4);
+                for (int i = 0; i < pointerPerServer / 4; i++) {
+                    for (long position : ketamaNodePositionsAtIteration(node, i)) {
+                        newNodeMap.put(position, node);
+                        getLogger().debug("Adding node %s with weight %s in position %d", node, thisWeight, position);
+                    }
+                }
+            } else {
+                // Ketama does some special work with md5 where it reuses chunks.
+                // Check to be backwards compatible, the hash algorithm does not
+                // matter for Ketama, just the placement should always be done using
+                // MD5
+                if (hashAlg == DefaultHashAlgorithm.KETAMA_HASH) {
+                    for (int i = 0; i < numReps / 4; i++) {
+                        for (long position : ketamaNodePositionsAtIteration(node, i)) {
+                            newNodeMap.put(position, node);
+                            getLogger().debug("Adding node %s in position %d", node, position);
+                        }
+                    }
+                } else {
+                    for (int i = 0; i < numReps; i++) {
+                        newNodeMap.put(hashAlg.hash(config.getKeyForNode(node, i)), node);
+                    }
+                }
+            }
+        }
+        // NOTE(review): exact only when every branch contributes numReps points per
+        // node; the weighted path (and hash collisions) can make the map smaller --
+        // confirm before running with assertions (-ea) enabled.
+        assert newNodeMap.size() == numReps * nodes.size();
+        ketamaNodes = newNodeMap;
+    }
+
+    // Derives four continuum positions from the 16-byte MD5 digest of the node's
+    // key at the given iteration (consumed as 4-byte little-endian chunks).
+    private List<Long> ketamaNodePositionsAtIteration(MemcachedNode node, int iteration) {
+        List<Long> positions = new ArrayList<Long>();
+        byte[] digest = DefaultHashAlgorithm.computeMd5(config.getKeyForNode(node, iteration));
+        for (int h = 0; h < 4; h++) {
+            Long k = ((long) (digest[3 + h * 4] & 0xFF) << 24) | ((long) (digest[2 + h * 4] & 0xFF) << 16);
+            k |= ((long) (digest[1 + h * 4] & 0xFF) << 8) | (digest[h * 4] & 0xFF);
+            positions.add(k);
+        }
+        return positions;
+    }
+}
\ No newline at end of file
diff --git a/cache/src/main/java/org/apache/kylin/cache/cachemanager/CacheConstants.java b/cache/src/main/java/org/apache/kylin/cache/cachemanager/CacheConstants.java
new file mode 100644
index 0000000..07b15a5
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/cachemanager/CacheConstants.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.cachemanager;
+
+public class CacheConstants {
+    public static final String QUERY_CACHE = "StorageCache";
+}
\ No newline at end of file
diff --git a/cache/src/main/java/org/apache/kylin/cache/cachemanager/InstrumentedEhCacheCacheManager.java b/cache/src/main/java/org/apache/kylin/cache/cachemanager/InstrumentedEhCacheCacheManager.java
new file mode 100644
index 0000000..4f0911f
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/cachemanager/InstrumentedEhCacheCacheManager.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.cachemanager;
+
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.kylin.cache.ehcache.InstrumentedEhCacheCache;
+import org.apache.kylin.common.KylinConfig;
+import org.springframework.cache.Cache;
+import org.springframework.cache.ehcache.EhCacheCache;
+import org.springframework.cache.support.AbstractCacheManager;
+import org.springframework.util.Assert;
+
+import com.google.common.collect.Sets;
+
+import net.sf.ehcache.Ehcache;
+import net.sf.ehcache.Status;
+
+/**
+ * CacheManager backed by an EhCache {@link net.sf.ehcache.CacheManager}.
+ *
+ */
+public class InstrumentedEhCacheCacheManager extends AbstractCacheManager {
+
+    // Backing EhCache manager; must be injected before loadCaches() runs.
+    private net.sf.ehcache.CacheManager cacheManager;
+    // Kylin metrics settings; the "ehcache.enabled" entry toggles instrumentation.
+    private Map<String, String> metricsConfig = KylinConfig.getInstanceFromEnv().getKylinMetricsConf();
+    // When true, caches are wrapped with InstrumentedEhCacheCache to expose metrics.
+    private boolean enableMetrics = false;
+
+    /**
+     * Return the backing EhCache {@link net.sf.ehcache.CacheManager}.
+     */
+    public net.sf.ehcache.CacheManager getCacheManager() {
+        return this.cacheManager;
+    }
+
+    /**
+     * Set the backing EhCache {@link net.sf.ehcache.CacheManager}.
+     * Also evaluates the metrics opt-in flag; changing the config after this
+     * call has no effect on already-created caches.
+     */
+    public void setCacheManager(net.sf.ehcache.CacheManager cacheManager) {
+        this.cacheManager = cacheManager;
+        // Metrics are enabled only via explicit opt-in ("ehcache.enabled" = true).
+        if ("true".equalsIgnoreCase(metricsConfig.get("ehcache.enabled"))) {
+            enableMetrics = true;
+        }
+    }
+
+    @Override
+    protected Collection<Cache> loadCaches() {
+        Assert.notNull(this.cacheManager, "A backing EhCache CacheManager is required");
+        Status status = this.cacheManager.getStatus();
+        Assert.isTrue(Status.STATUS_ALIVE.equals(status),
+                "An 'alive' EhCache CacheManager is required - current cache is " + status.toString());
+
+        String[] names = this.cacheManager.getCacheNames();
+        Collection<Cache> caches = Sets.newLinkedHashSetWithExpectedSize(names.length);
+        for (String name : names) {
+            // Wrap each Ehcache with either the instrumented or the plain Spring adapter.
+            if (enableMetrics) {
+                caches.add(new InstrumentedEhCacheCache(this.cacheManager.getEhcache(name)));
+            } else {
+                caches.add(new EhCacheCache(this.cacheManager.getEhcache(name)));
+            }
+        }
+        return caches;
+    }
+
+    @Override
+    public Cache getCache(String name) {
+        Cache cache = super.getCache(name);
+        if (cache == null) {
+            // check the EhCache cache again
+            // (in case the cache was added at runtime)
+            Ehcache ehcache = this.cacheManager.getEhcache(name);
+            if (ehcache != null) {
+                if (enableMetrics) {
+                    cache = new InstrumentedEhCacheCache(ehcache);
+                } else {
+                    cache = new EhCacheCache(ehcache);
+                }
+                addCache(cache);
+            }
+        }
+        return cache;
+    }
+
+}
\ No newline at end of file
diff --git a/cache/src/main/java/org/apache/kylin/cache/cachemanager/MemcachedCacheManager.java b/cache/src/main/java/org/apache/kylin/cache/cachemanager/MemcachedCacheManager.java
new file mode 100644
index 0000000..a4e1ffe
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/cachemanager/MemcachedCacheManager.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.cachemanager;
+
+import java.net.SocketAddress;
+import java.util.Collection;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.kylin.cache.memcached.MemcachedCache;
+import org.apache.kylin.cache.memcached.MemcachedCacheConfig;
+import org.apache.kylin.cache.memcached.MemcachedChunkingCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.cache.Cache;
+import org.springframework.cache.support.AbstractCacheManager;
+import org.springframework.cache.support.SimpleValueWrapper;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import net.spy.memcached.MemcachedClientIF;
+
+/**
+ * Spring CacheManager backed by memcached, with a periodic cluster health probe.
+ */
+public class MemcachedCacheManager extends AbstractCacheManager {
+
+    private static final Logger logger = LoggerFactory.getLogger(MemcachedCacheManager.class);
+    private static final Long ONE_MINUTE = 60 * 1000L;
+
+    @Autowired
+    private MemcachedCacheConfig memcachedCacheConfig;
+
+    // Single-threaded timer that periodically probes memcached cluster health.
+    private ScheduledExecutorService timer = Executors.newScheduledThreadPool(1,
+            new ThreadFactoryBuilder().setNameFormat("Memcached-HealthChecker").build());
+    // Latest health verdict; flipped by MemcachedClusterHealthChecker.
+    private AtomicBoolean clusterHealth = new AtomicBoolean(true);
+
+    @Override
+    protected Collection<? extends Cache> loadCaches() {
+        // Chunking cache splits values that exceed memcached's single-entry limit.
+        Cache successCache = new MemCachedCacheAdaptor(
+                new MemcachedChunkingCache(MemcachedCache.create(memcachedCacheConfig, CacheConstants.QUERY_CACHE)));
+
+        addCache(successCache);
+
+        Collection<String> names = getCacheNames();
+        Collection<Cache> caches = Lists.newArrayList();
+        for (String name : names) {
+            caches.add(getCache(name));
+        }
+
+        // Kick off the recurring health probe (first run after one minute).
+        timer.scheduleWithFixedDelay(new MemcachedClusterHealthChecker(), ONE_MINUTE, ONE_MINUTE,
+                TimeUnit.MILLISECONDS);
+        return caches;
+    }
+
+    /** @return true when the last health check found no live memcached servers. */
+    public boolean isClusterDown() {
+        return !clusterHealth.get();
+    }
+
+    @VisibleForTesting
+    void setClusterHealth(boolean ifHealth) {
+        clusterHealth.set(ifHealth);
+    }
+
+    /** Adapts a byte[]-valued MemcachedCache to Spring's {@link Cache} interface. */
+    public static class MemCachedCacheAdaptor implements Cache {
+        private MemcachedCache memcachedCache;
+
+        public MemCachedCacheAdaptor(MemcachedCache memcachedCache) {
+            this.memcachedCache = memcachedCache;
+        }
+
+        @Override
+        public String getName() {
+            return memcachedCache.getName();
+        }
+
+        @Override
+        public Object getNativeCache() {
+            return memcachedCache.getNativeCache();
+        }
+
+        @Override
+        public ValueWrapper get(Object key) {
+            byte[] value = memcachedCache.get(key);
+            if (value == null) {
+                return null;
+            }
+            return new SimpleValueWrapper(SerializationUtils.deserialize(value));
+        }
+
+        @Override
+        public void put(Object key, Object value) {
+            memcachedCache.put(key, value);
+        }
+
+        @Override
+        public void evict(Object key) {
+            memcachedCache.evict(key);
+        }
+
+        @Override
+        public void clear() {
+            memcachedCache.clear();
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public <T> T get(Object key, Class<T> type) {
+            byte[] value = memcachedCache.get(key);
+            if (value == null) {
+                return null;
+            }
+            Object obj = SerializationUtils.deserialize(value);
+            // Fix: type-check the deserialized object, not the raw byte[] payload;
+            // the previous check made every typed lookup of a non-byte[] value throw.
+            if (obj != null && type != null && !type.isInstance(obj)) {
+                throw new IllegalStateException(
+                        "Cached value is not of required type [" + type.getName() + "]: " + obj);
+            }
+            return (T) obj;
+        }
+
+        @Override
+        //TODO
+        public <T> T get(Object key, Callable<T> valueLoader) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        //TODO implementation here doesn't guarantee the atomicity.
+        //Without atomicity, this method should not be invoked
+        public ValueWrapper putIfAbsent(Object key, Object value) {
+            byte[] existing = memcachedCache.get(key);
+            if (existing == null) {
+                memcachedCache.put(key, value);
+                return null;
+            } else {
+                return new SimpleValueWrapper(SerializationUtils.deserialize(existing));
+            }
+        }
+
+    }
+
+    /** Marks the cluster unhealthy when no memcached server is reachable. */
+    private class MemcachedClusterHealthChecker implements Runnable {
+        @Override
+        public void run() {
+            Cache cache = getCache(CacheConstants.QUERY_CACHE);
+            MemcachedClientIF cacheClient = (MemcachedClientIF) cache.getNativeCache();
+            Collection<SocketAddress> liveServers = cacheClient.getAvailableServers();
+            Collection<SocketAddress> deadServers = cacheClient.getUnavailableServers();
+            if (liveServers.isEmpty()) {
+                clusterHealth.set(false);
+                logger.error("All the servers in MemcachedCluster is down, UnavailableServers: {}", deadServers);
+            } else {
+                clusterHealth.set(true);
+                // Warn when more than half of the cluster is unreachable.
+                if (deadServers.size() > liveServers.size()) {
+                    logger.warn("Half of the servers in MemcachedCluster is down, LiveServers: {}, UnavailableServers: {}",
+                            liveServers, deadServers);
+                }
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/cache/src/main/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManager.java b/cache/src/main/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManager.java
new file mode 100644
index 0000000..f9b7ef6
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManager.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.cachemanager;
+
+import java.util.Collection;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.cache.Cache;
+import org.springframework.cache.CacheManager;
+import org.springframework.cache.support.AbstractCacheManager;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+public class RemoteLocalFailOverCacheManager extends AbstractCacheManager {
+    private static final Logger logger = LoggerFactory.getLogger(RemoteLocalFailOverCacheManager.class);
+
+    // Remote (memcached) tier; may be absent or unhealthy, in which case we fail over.
+    @Autowired
+    private MemcachedCacheManager remoteCacheManager;
+
+    // Local in-process fallback tier; must always be wired.
+    @Autowired
+    private CacheManager localCacheManager;
+
+    @Override
+    public void afterPropertiesSet() {
+        // Deliberately does NOT call super.afterPropertiesSet(): this manager owns
+        // no caches of its own (loadCaches() returns null) and only validates wiring.
+        Preconditions.checkNotNull(localCacheManager, "localCacheManager is not injected yet");
+    }
+
+    @Override
+    protected Collection<? extends Cache> loadCaches() {
+        // No owned caches; every lookup is delegated in getCache(String).
+        return null;
+    }
+
+    @Override
+    public Cache getCache(String name) {
+        // Route to the remote tier when it is configured and healthy; otherwise local.
+        if (remoteCacheManager == null || remoteCacheManager.isClusterDown()) {
+            logger.info("use local cache, because remote cache is not configured or down");
+            return localCacheManager.getCache(name);
+        } else {
+            return remoteCacheManager.getCache(name);
+        }
+    }
+
+    @VisibleForTesting
+    void disableRemoteCacheManager() {
+        remoteCacheManager.setClusterHealth(false);
+    }
+
+    @VisibleForTesting
+    void enableRemoteCacheManager() {
+        remoteCacheManager.setClusterHealth(true);
+    }
+}
\ No newline at end of file
diff --git a/cache/src/main/java/org/apache/kylin/cache/ehcache/InstrumentedEhCacheCache.java b/cache/src/main/java/org/apache/kylin/cache/ehcache/InstrumentedEhCacheCache.java
new file mode 100644
index 0000000..7a9b585
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/ehcache/InstrumentedEhCacheCache.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.ehcache;
+
+import static org.apache.kylin.metrics.lib.impl.MetricsSystem.Metrics;
+import static org.apache.kylin.metrics.lib.impl.MetricsSystem.name;
+
+import java.util.concurrent.Callable;
+
+import org.springframework.cache.Cache;
+import org.springframework.cache.ehcache.EhCacheCache;
+import org.springframework.cache.support.SimpleValueWrapper;
+import org.springframework.util.Assert;
+
+import com.codahale.metrics.Gauge;
+
+import net.sf.ehcache.Ehcache;
+import net.sf.ehcache.Element;
+import net.sf.ehcache.Status;
+
+/**
+ * {@link Cache} implementation on top of an {@link Ehcache} instance.
+ *
+ */
+public class InstrumentedEhCacheCache implements Cache {
+
+    private final Ehcache cache;
+
+    /**
+     * Create an {@link EhCacheCache} instance.
+     * Registers hit/miss/size/latency gauges for the wrapped cache in the Kylin
+     * metrics system, keyed by the cache class and cache name.
+     * @param ehcache backing Ehcache instance
+     */
+    public InstrumentedEhCacheCache(Ehcache ehcache) {
+        Assert.notNull(ehcache, "Ehcache must not be null");
+        Status status = ehcache.getStatus();
+        Assert.isTrue(Status.STATUS_ALIVE.equals(status),
+                "An 'alive' Ehcache is required - current cache is " + status.toString());
+        this.cache = ehcache;
+
+        final String prefix = name(cache.getClass(), cache.getName());
+        Metrics.register(name(prefix, "hits"), new Gauge<Long>() {
+            @Override
+            public Long getValue() {
+                return cache.getStatistics().cacheHitCount();
+            }
+        });
+
+        Metrics.register(name(prefix, "in-memory-hits"), new Gauge<Long>() {
+            @Override
+            public Long getValue() {
+                return cache.getStatistics().localHeapHitCount();
+            }
+        });
+
+        Metrics.register(name(prefix, "misses"), new Gauge<Long>() {
+            @Override
+            public Long getValue() {
+                return cache.getStatistics().cacheMissCount();
+            }
+        });
+
+        Metrics.register(name(prefix, "in-memory-misses"), new Gauge<Long>() {
+            @Override
+            public Long getValue() {
+                return cache.getStatistics().localHeapMissCount();
+            }
+        });
+
+        Metrics.register(name(prefix, "objects"), new Gauge<Long>() {
+            @Override
+            public Long getValue() {
+                return cache.getStatistics().getSize();
+            }
+        });
+
+        Metrics.register(name(prefix, "in-memory-objects"), new Gauge<Long>() {
+            @Override
+            public Long getValue() {
+                return cache.getStatistics().getLocalHeapSize();
+            }
+        });
+
+        Metrics.register(name(prefix, "mean-get-time"), new Gauge<Double>() {
+            @Override
+            public Double getValue() {
+                return cache.getStatistics().cacheGetOperation().latency().average().value();
+            }
+        });
+
+        Metrics.register(name(prefix, "mean-search-time"), new Gauge<Double>() {
+            @Override
+            public Double getValue() {
+                return cache.getStatistics().cacheSearchOperation().latency().average().value();
+            }
+        });
+
+        Metrics.register(name(prefix, "eviction-count"), new Gauge<Long>() {
+            @Override
+            public Long getValue() {
+                return cache.getStatistics().cacheEvictionOperation().count().value();
+            }
+        });
+
+        Metrics.register(name(prefix, "writer-queue-size"), new Gauge<Long>() {
+            @Override
+            public Long getValue() {
+                return cache.getStatistics().getWriterQueueLength();
+            }
+        });
+    }
+
+    @Override
+    public String getName() {
+        return this.cache.getName();
+    }
+
+    @Override
+    public Ehcache getNativeCache() {
+        return this.cache;
+    }
+
+    @Override
+    public ValueWrapper get(Object key) {
+        // Use lookup() for consistency with the typed and loader get variants.
+        Element element = lookup(key);
+        return (element != null ? new SimpleValueWrapper(element.getObjectValue()) : null);
+    }
+
+    @Override
+    public void put(Object key, Object value) {
+        this.cache.put(new Element(key, value));
+    }
+
+    @Override
+    public void evict(Object key) {
+        this.cache.remove(key);
+    }
+
+    @Override
+    public void clear() {
+        this.cache.removeAll();
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T> T get(Object key, Class<T> type) {
+        Element element = lookup(key);
+        Object value = (element != null ? element.getObjectValue() : null);
+        if (value != null && type != null && !type.isInstance(value)) {
+            throw new IllegalStateException("Cached value is not of required type [" + type.getName() + "]: " + value);
+        }
+        return (T) value;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T> T get(Object key, Callable<T> valueLoader) {
+        Element element = lookup(key);
+        if (element != null) {
+            return (T) element.getObjectValue();
+        } else {
+            // Per-key write lock prevents concurrent loaders computing the same value.
+            this.cache.acquireWriteLockOnKey(key);
+            try {
+                element = lookup(key); // One more attempt with the write lock
+                if (element != null) {
+                    return (T) element.getObjectValue();
+                } else {
+                    return loadValue(key, valueLoader);
+                }
+            } finally {
+                this.cache.releaseWriteLockOnKey(key);
+            }
+        }
+    }
+
+    @Override
+    public ValueWrapper putIfAbsent(Object key, Object value) {
+        Element existingElement = this.cache.putIfAbsent(new Element(key, value));
+        return (existingElement != null ? new SimpleValueWrapper(existingElement.getObjectValue()) : null);
+    }
+
+    // Raw cache read shared by all get variants.
+    private Element lookup(Object key) {
+        return this.cache.get(key);
+    }
+
+    // Invoke the loader and cache its result; wrap loader failures per the Cache contract.
+    private <T> T loadValue(Object key, Callable<T> valueLoader) {
+        T value;
+        try {
+            value = valueLoader.call();
+        } catch (Throwable ex) {
+            throw new ValueRetrievalException(key, valueLoader, ex);
+        }
+        put(key, value);
+        return value;
+    }
+}
\ No newline at end of file
diff --git a/cache/src/main/java/org/apache/kylin/cache/memcached/CacheStats.java b/cache/src/main/java/org/apache/kylin/cache/memcached/CacheStats.java
new file mode 100644
index 0000000..c91ba45
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/memcached/CacheStats.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.memcached;
+
/**
 * Immutable snapshot of cache access statistics: hit/miss counts, bytes moved,
 * cumulative lookup time and error counters. All derived metrics guard against
 * division by zero, so an all-zero (freshly created) snapshot is safe to query.
 */
public class CacheStats {
    private final long numHits; // lookups that returned a value
    private final long numMisses; // lookups that returned nothing
    private final long getBytes; // total bytes read by lookups
    private final long getTime; // total time spent in lookups
    private final long numPut; // number of put operations
    private final long putBytes; // total bytes written by puts
    private final long numEvictions;
    private final long numTimeouts;
    private final long numErrors;

    public CacheStats(long getBytes, long getTime, long numPut, long putBytes, long numHits, long numMisses,
            long numEvictions, long numTimeouts, long numErrors) {
        this.getBytes = getBytes;
        this.getTime = getTime;
        this.numPut = numPut;
        this.putBytes = putBytes;
        this.numHits = numHits;
        this.numMisses = numMisses;
        this.numEvictions = numEvictions;
        this.numTimeouts = numTimeouts;
        this.numErrors = numErrors;
    }

    public long getNumHits() {
        return numHits;
    }

    public long getNumMisses() {
        return numMisses;
    }

    /** Total number of lookups (hits plus misses). */
    public long getNumGet() {
        return numHits + numMisses;
    }

    public long getNumGetBytes() {
        return getBytes;
    }

    public long getNumPutBytes() {
        return putBytes;
    }

    public long getNumPut() {
        return numPut;
    }

    public long getNumEvictions() {
        return numEvictions;
    }

    public long getNumTimeouts() {
        return numTimeouts;
    }

    public long getNumErrors() {
        return numErrors;
    }

    /** Same as {@link #getNumGet()}; kept for API compatibility. */
    public long numLookups() {
        return numHits + numMisses;
    }

    /** Fraction of lookups that hit, in [0, 1]; 0 when no lookups were recorded. */
    public double hitRate() {
        long lookups = numLookups();
        return lookups == 0 ? 0 : numHits / (double) lookups;
    }

    /** Average bytes read per lookup; 0 when no lookups were recorded (avoids division by zero). */
    public long avgGetBytes() {
        long lookups = numLookups();
        return lookups == 0 ? 0 : getBytes / lookups;
    }

    /** Average time per lookup; 0 when no lookups were recorded (avoids division by zero). */
    public long getAvgGetTime() {
        long lookups = numLookups();
        return lookups == 0 ? 0 : getTime / lookups;
    }
}
\ No newline at end of file
diff --git a/cache/src/main/java/org/apache/kylin/cache/memcached/KeyHookLookup.java b/cache/src/main/java/org/apache/kylin/cache/memcached/KeyHookLookup.java
new file mode 100644
index 0000000..b9bdf5c
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/memcached/KeyHookLookup.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.memcached;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
/**
 * A class implementing this interface indicates that the key information must be
 * obtained via a first lookup in the cache itself, which yields a {@link KeyHook}
 * describing how the actual value is stored.
 */
public interface KeyHookLookup {

    /**
     * Looks up the {@link KeyHook} stored under the given key.
     *
     * @param key the serialized cache key
     * @return the hook for that key, or {@code null} if none is stored
     */
    KeyHook lookupKeyHook(String key);

    public static class KeyHook implements Serializable {
        private static final long serialVersionUID = 2400159460862757991L;

        // Keys of the chunks the value was split into; null when the value is stored inline.
        private String[] chunkskey;
        // Inline value bytes; null when the value is chunked.
        private byte[] values;

        /**
         * No-arg constructor kept for de-serialization.
         */
        public KeyHook() {
        }

        public KeyHook(String[] chunkskey, byte[] values) {
            super();
            this.chunkskey = chunkskey;
            this.values = values;
        }

        public String[] getChunkskey() {
            return chunkskey;
        }

        public void setChunkskey(String[] chunkskey) {
            this.chunkskey = chunkskey;
        }

        public byte[] getValues() {
            return values;
        }

        public void setValues(byte[] values) {
            this.values = values;
        }

        @Override
        public int hashCode() {
            final int prime = 31;
            int result = 1;
            result = prime * result + Arrays.hashCode(chunkskey);
            result = prime * result + Arrays.hashCode(values);
            return result;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj)
                return true;
            if (obj == null || getClass() != obj.getClass())
                return false;
            KeyHook other = (KeyHook) obj;
            return Arrays.equals(chunkskey, other.chunkskey) && Arrays.equals(values, other.values);
        }

        /** Summarizes lengths only; chunk keys and value bytes may be large. */
        @Override
        public String toString() {
            StringBuilder builder = new StringBuilder();
            if (chunkskey != null) {
                builder.append("chunkskey_length:").append(chunkskey.length);
            } else {
                builder.append("chunkskey_is_null");
            }
            builder.append("|");
            if (values != null) {
                builder.append("value_length:").append(values.length);
            } else {
                builder.append("value_is_null");
            }
            return builder.toString();
        }
    }
}
\ No newline at end of file
diff --git a/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedCache.java b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedCache.java
new file mode 100644
index 0000000..a2e69a7
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedCache.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.memcached;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.zip.DataFormatException;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.CompressionUtils;
+import org.apache.kylin.common.util.JsonUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.base.Throwables;
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Shorts;
+
+import net.spy.memcached.AddrUtil;
+import net.spy.memcached.ConnectionFactory;
+import net.spy.memcached.ConnectionFactoryBuilder;
+import net.spy.memcached.DefaultHashAlgorithm;
+import net.spy.memcached.FailureMode;
+import net.spy.memcached.MemcachedClient;
+import net.spy.memcached.MemcachedClientIF;
+import net.spy.memcached.ops.ArrayOperationQueueFactory;
+import net.spy.memcached.ops.LinkedOperationQueueFactory;
+import net.spy.memcached.ops.OperationQueueFactory;
+import net.spy.memcached.transcoders.SerializingTranscoder;
+
+/**
+ * Cache backed by Memcached. The implementation leverages the spymemcached client to talk to the servers.
+ * Memcached itself has a limitation on the size of the key, so the real key for cache lookup is hashed from the original key.
+ * The implementation provides a way for hash collision detection. It can also compress/decompress the value bytes based on the preconfigured compression threshold to save network bandwidth and storage space.
+ *
+ * @author mingmwang
+ *
+ */
+public class MemcachedCache {
+    public static final int MAX_PREFIX_LENGTH = MemcachedClientIF.MAX_KEY_LENGTH - 40 // length of namespace hash
+            - 40 // length of key hash
+            - 2; // length of separators
+    private static final Logger logger = LoggerFactory.getLogger(MemcachedCache.class);
+    private static final int DEFAULT_TTL = 7 * 24 * 3600;
+    protected final MemcachedCacheConfig config;
+    protected final MemcachedClientIF client;
+    protected final String memcachedPrefix;
+    protected final int compressThreshold;
+    protected final AtomicLong hitCount = new AtomicLong(0);
+    protected final AtomicLong missCount = new AtomicLong(0);
+    protected final AtomicLong readBytes = new AtomicLong(0);
+    protected final AtomicLong timeoutCount = new AtomicLong(0);
+    protected final AtomicLong errorCount = new AtomicLong(0);
+    protected final AtomicLong putCount = new AtomicLong(0);
+    protected final AtomicLong putBytes = new AtomicLong(0);
+    private final int timeToLiveSeconds;
+    protected AtomicLong cacheGetTime = new AtomicLong(0);
+
+    public MemcachedCache(final MemcachedClientIF client, final MemcachedCacheConfig config,
+            final String memcachedPrefix, int timeToLiveSeconds) {
+        Preconditions.checkArgument(memcachedPrefix.length() <= MAX_PREFIX_LENGTH,
+                "memcachedPrefix length [%d] exceeds maximum length [%d]", memcachedPrefix.length(), MAX_PREFIX_LENGTH);
+        this.memcachedPrefix = memcachedPrefix;
+        this.client = client;
+        this.config = config;
+        this.compressThreshold = config.getMaxObjectSize() / 2;
+        this.timeToLiveSeconds = timeToLiveSeconds;
+    }
+
+    public MemcachedCache(MemcachedCache cache) {
+        this(cache.client, cache.config, cache.memcachedPrefix, cache.timeToLiveSeconds);
+    }
+
+    /**
+     * Create and return the MemcachedCache. Each time call this method will create a new instance.
+     * @param config            The MemcachedCache configuration to control the cache behavior.
+     * @return
+     */
+    public static MemcachedCache create(final MemcachedCacheConfig config, String memcachedPrefix) {
+        return create(config, memcachedPrefix, DEFAULT_TTL);
+    }
+
+    public static MemcachedCache create(final MemcachedCacheConfig config, String memcachedPrefix, int timeToLive) {
+        try {
+            SerializingTranscoder transcoder = new SerializingTranscoder(config.getMaxObjectSize());
+            // always no compression inside, we compress/decompress outside
+            transcoder.setCompressionThreshold(Integer.MAX_VALUE);
+
+            OperationQueueFactory opQueueFactory;
+            int maxQueueSize = config.getMaxOperationQueueSize();
+            if (maxQueueSize > 0) {
+                opQueueFactory = new ArrayOperationQueueFactory(maxQueueSize);
+            } else {
+                opQueueFactory = new LinkedOperationQueueFactory();
+            }
+            String hostsStr = config.getHosts();
+            ConnectionFactory connectionFactory = new MemcachedConnectionFactoryBuilder()
+                    .setProtocol(ConnectionFactoryBuilder.Protocol.BINARY)
+                    .setHashAlg(DefaultHashAlgorithm.FNV1A_64_HASH)
+                    .setLocatorType(ConnectionFactoryBuilder.Locator.CONSISTENT).setDaemon(true)
+                    .setFailureMode(FailureMode.Redistribute).setTranscoder(transcoder).setShouldOptimize(true)
+                    .setOpQueueMaxBlockTime(config.getTimeout()).setOpTimeout(config.getTimeout())
+                    .setReadBufferSize(config.getReadBufferSize()).setOpQueueFactory(opQueueFactory).build();
+            return new MemcachedCache(new MemcachedClient(new MemcachedConnectionFactory(connectionFactory),
+                    AddrUtil.getAddresses(hostsStr)), config, memcachedPrefix, timeToLive);
+        } catch (IOException e) {
+            logger.error("Unable to create MemcachedCache instance.", e);
+            throw Throwables.propagate(e);
+        }
+    }
+
+    public String getName() {
+        return memcachedPrefix;
+    }
+
+    public Object getNativeCache() {
+        return client;
+    }
+
+    protected String serializeKey(Object key) {
+        try {
+            return JsonUtil.writeValueAsString(key);
+        } catch (JsonProcessingException e) {
+            logger.warn("Can not convert key to String.", e);
+        }
+        return null;
+    }
+
+    protected byte[] serializeValue(Object value) {
+        return SerializationUtils.serialize((Serializable) value);
+    }
+
+    @VisibleForTesting
+    byte[] encodeValue(String keyS, Object value) {
+        if (keyS == null) {
+            return null;
+        }
+        return encodeValue(keyS.getBytes(Charsets.UTF_8), serializeValue(value));
+    }
+
+    /**
+     * This method is used to get value object based on key from the Cache. It converts key to json string first.
+     * And then it calls getBinary() method to compute hashed key from the original key string, and use this as the real key to do lookup from internal Cache.
+     * Then decodes the real values bytes from the cache lookup result, and leverages object serializer to convert value bytes to object.
+     */
+    public byte[] get(Object key) {
+        return get(serializeKey(key));
+    }
+
+    /**
+     * @param keyS should be the serialized string
+     */
+    public byte[] get(String keyS) {
+        return getBinary(keyS);
+    }
+
+    /**
+     * This method is used to put key/value objects to the Cache. It converts key to json string and leverages object serializer to convert value object to bytes.
+     * And then it calls putBinary() method to compute hashed key from the original key string and encode the original key bytes into value bytes for hash conflicts detection.
+     */
+    public void put(Object key, Object value) {
+        put(serializeKey(key), value);
+    }
+
+    /**
+     * @param keyS should be the serialized string
+     */
+    public void put(String keyS, Object value) {
+        if (keyS != null) {
+            putBinary(keyS, serializeValue(value), timeToLiveSeconds);
+        }
+    }
+
+    public void evict(Object key) {
+        if (key == null)
+            return;
+        evict(serializeKey(key));
+    }
+
+    /**
+     * @param keyS should be the serialized string
+     */
+    public void evict(String keyS) {
+        if (keyS == null)
+            return;
+        client.delete(computeKeyHash(keyS));
+    }
+
+    public void clear() {
+        logger.warn("Clear Remote Cache!");
+        Future<Boolean> resultFuture = client.flush();
+        try {
+            boolean result = resultFuture.get();
+            logger.warn("Clear Remote Cache returned with result: " + result);
+        } catch (Exception e) {
+            logger.warn("Can't clear Remote Cache.", e);
+        }
+    }
+
+    public CacheStats getStats() {
+        return new CacheStats(readBytes.get(), cacheGetTime.get(), putCount.get(), putBytes.get(), hitCount.get(),
+                missCount.get(), 0, timeoutCount.get(), errorCount.get());
+    }
+
+    /**
+     * @param keyS should be the serialized string
+     * @return the serialized value
+     */
+    protected byte[] getBinary(String keyS) {
+        if (Strings.isNullOrEmpty(keyS)) {
+            return null;
+        }
+        byte[] bytes = internalGet(computeKeyHash(keyS));
+        return decodeValue(keyS.getBytes(Charsets.UTF_8), bytes);
+    }
+
+    /**
+     * @param keyS should be the serialized string
+     * @param valueB should be the serialized value
+     */
+    protected void putBinary(String keyS, byte[] valueB, int expiration) {
+        if (Strings.isNullOrEmpty(keyS)) {
+            return;
+        }
+        internalPut(computeKeyHash(keyS), encodeValue(keyS.getBytes(Charsets.UTF_8), valueB), expiration);
+    }
+
+    protected byte[] internalGet(String hashedKey) {
+        Future<Object> future;
+        long start = System.currentTimeMillis();
+        try {
+            future = client.asyncGet(hashedKey);
+        } catch (IllegalStateException e) {
+            // operation did not get queued in time (queue is full)
+            errorCount.incrementAndGet();
+            logger.error("Unable to queue cache operation.", e);
+            return null;
+        } catch (Throwable t) {
+            errorCount.incrementAndGet();
+            logger.error("Unable to queue cache operation.", t);
+            return null;
+        }
+
+        try {
+            byte[] result = (byte[]) future.get(config.getTimeout(), TimeUnit.MILLISECONDS);
+            cacheGetTime.addAndGet(System.currentTimeMillis() - start);
+            if (result != null) {
+                hitCount.incrementAndGet();
+                readBytes.addAndGet(result.length);
+            } else {
+                missCount.incrementAndGet();
+            }
+            return result;
+        } catch (TimeoutException e) {
+            timeoutCount.incrementAndGet();
+            future.cancel(false);
+            return null;
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw Throwables.propagate(e);
+        } catch (ExecutionException e) {
+            errorCount.incrementAndGet();
+            logger.error("ExecutionException when pulling key meta from cache.", e);
+            return null;
+        }
+    }
+
+    private void internalPut(String hashedKey, byte[] encodedValue, int expiration) {
+        try {
+            client.set(hashedKey, expiration, encodedValue);
+            putCount.incrementAndGet();
+            putBytes.addAndGet(encodedValue.length);
+        } catch (IllegalStateException e) {
+            // operation did not get queued in time (queue is full)
+            errorCount.incrementAndGet();
+            logger.error("Unable to queue cache operation.", e);
+        } catch (Throwable t) {
+            errorCount.incrementAndGet();
+            logger.error("Unable to queue cache operation.", t);
+        }
+    }
+
+    protected byte[] encodeValue(byte[] key, byte[] valueB) {
+        byte[] compressed = null;
+        if (config.isEnableCompression() && (valueB.length + Ints.BYTES + key.length > compressThreshold)) {
+            try {
+                compressed = CompressionUtils.compress(ByteBuffer.allocate(Ints.BYTES + key.length + valueB.length)
+                        .putInt(key.length).put(key).put(valueB).array());
+            } catch (IOException e) {
+                compressed = null;
+                logger.warn("Compressing value bytes error.", e);
+            }
+        }
+        if (compressed != null) {
+            return ByteBuffer.allocate(Shorts.BYTES + compressed.length).putShort((short) 1).put(compressed).array();
+        } else {
+            return ByteBuffer.allocate(Shorts.BYTES + Ints.BYTES + key.length + valueB.length).putShort((short) 0)
+                    .putInt(key.length).put(key).put(valueB).array();
+        }
+    }
+
+    protected byte[] decodeValue(byte[] key, byte[] valueE) {
+        if (valueE == null)
+            return null;
+        ByteBuffer buf = ByteBuffer.wrap(valueE);
+        short enableCompression = buf.getShort();
+        byte[] uncompressed = null;
+        if (enableCompression == 1) {
+            byte[] value = new byte[buf.remaining()];
+            buf.get(value);
+            try {
+                uncompressed = CompressionUtils.decompress(value);
+            } catch (IOException | DataFormatException e) {
+                logger.error("Decompressing value bytes error.", e);
+                return null;
+            }
+        }
+        if (uncompressed != null) {
+            buf = ByteBuffer.wrap(uncompressed);
+        }
+        final int keyLength = buf.getInt();
+        byte[] keyBytes = new byte[keyLength];
+        buf.get(keyBytes);
+        if (!Arrays.equals(keyBytes, key)) {
+            logger.error("Keys do not match, possible hash collision!");
+            return null;
+        }
+        byte[] value = new byte[buf.remaining()];
+        buf.get(value);
+        return value;
+    }
+
+    protected String computeKeyHash(String key) {
+        // hash keys to keep things under 250 characters for memcached
+        return Joiner.on(":").skipNulls().join(KylinConfig.getInstanceFromEnv().getDeployEnv(), this.memcachedPrefix,
+                DigestUtils.shaHex(key));
+
+    }
+
+}
\ No newline at end of file
diff --git a/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedCacheConfig.java b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedCacheConfig.java
new file mode 100644
index 0000000..d71c279
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedCacheConfig.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.memcached;
+
+import net.spy.memcached.DefaultConnectionFactory;
+
+public class MemcachedCacheConfig {
+    private long timeout = 500L;
+
+    // comma delimited list of memcached servers, given as host:port combination
+    private String hosts;
+
+    private int maxChunkSize = 1024;
+
+    private int maxObjectSize = 1024 * 1024;
+
+    // memcached client read buffer size, -1 uses the spymemcached library default
+    private int readBufferSize = DefaultConnectionFactory.DEFAULT_READ_BUFFER_SIZE;
+
+    // maximum operation queue size. 0 means unbounded
+    private int maxOperationQueueSize = 0;
+
+    // whether enable compress the value data or not
+    private boolean enableCompression = true;
+
+    public long getTimeout() {
+        return timeout;
+    }
+
+    public void setTimeout(long timeout) {
+        this.timeout = timeout;
+    }
+
+    public String getHosts() {
+        return hosts;
+    }
+
+    public void setHosts(String hosts) {
+        this.hosts = hosts;
+    }
+
+    public int getMaxChunkSize() {
+        return maxChunkSize;
+    }
+
+    public void setMaxChunkSize(int maxChunkSize) {
+        this.maxChunkSize = maxChunkSize;
+    }
+
+    public int getMaxObjectSize() {
+        return maxObjectSize;
+    }
+
+    public void setMaxObjectSize(int maxObjectSize) {
+        this.maxObjectSize = maxObjectSize;
+    }
+
+    public int getMaxOperationQueueSize() {
+        return maxOperationQueueSize;
+    }
+
+    public void setMaxOperationQueueSize(int maxOperationQueueSize) {
+        this.maxOperationQueueSize = maxOperationQueueSize;
+    }
+
+    public int getReadBufferSize() {
+        return readBufferSize;
+    }
+
+    public void setReadBufferSize(int readBufferSize) {
+        this.readBufferSize = readBufferSize;
+    }
+
+    public boolean isEnableCompression() {
+        return enableCompression;
+    }
+
+    public void setEnableCompression(boolean enableCompression) {
+        this.enableCompression = enableCompression;
+    }
+}
\ No newline at end of file
diff --git a/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedChunkingCache.java b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedChunkingCache.java
new file mode 100644
index 0000000..e79e717
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedChunkingCache.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.memcached;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.kylin.common.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Shorts;
+
+import net.spy.memcached.internal.BulkFuture;
+
+/**
+ * Subclass of MemcachedCache. It supports storing large objects.  Memcached itself has a limitation to the value size with default value of 1M.
+ * This implementation extends the limit to 1G and can split a huge byte array into multiple chunks. It will take care of data integrity if part of the chunks are lost (due to server restart or other reasons).
+ *
+ * @author mingmwang
+ */
+public class MemcachedChunkingCache extends MemcachedCache implements KeyHookLookup {
+    private static final Logger logger = LoggerFactory.getLogger(MemcachedChunkingCache.class);
+
+    public MemcachedChunkingCache(MemcachedCache cache) {
+        super(cache);
+        // Guava's Preconditions only substitutes %s placeholders; the former %d was printed literally.
+        Preconditions.checkArgument(config.getMaxChunkSize() > 1, "maxChunkSize [%s] must be greater than 1",
+                config.getMaxChunkSize());
+        Preconditions.checkArgument(config.getMaxObjectSize() > 261, "maxObjectSize [%s] must be greater than 261",
+                config.getMaxObjectSize());
+    }
+
+    /**
+     * Splits {@code data} into {@code nSplit} chunks of (nearly) equal size; the last chunk
+     * carries the remainder.
+     */
+    protected static byte[][] splitBytes(final byte[] data, final int nSplit) {
+        byte[][] dest = new byte[nSplit][];
+
+        final int splitSize = (data.length - 1) / nSplit + 1;
+        for (int i = 0; i < nSplit - 1; i++) {
+            dest[i] = Arrays.copyOfRange(data, i * splitSize, (i + 1) * splitSize);
+        }
+        dest[nSplit - 1] = Arrays.copyOfRange(data, (nSplit - 1) * splitSize, data.length);
+
+        return dest;
+    }
+
+    /**
+     * Computes how many chunks a value of {@code valueBLen} bytes needs. Each chunk must fit into
+     * maxObjectSize together with its UTF-8 key and per-entry metadata overhead; throws
+     * IllegalArgumentException when even maxChunkSize chunks are not enough.
+     */
+    protected static int getValueSplit(MemcachedCacheConfig config, String keyS, int valueBLen) {
+        // the number 6 means the chunk number size never exceeds 6 bytes
+        final int valueSize = config.getMaxObjectSize() - Shorts.BYTES - Ints.BYTES
+                - keyS.getBytes(Charsets.UTF_8).length - 6;
+        final int maxValueSize = config.getMaxChunkSize() * valueSize;
+        // %s placeholders: Guava Preconditions does not support %d
+        Preconditions.checkArgument(valueBLen <= maxValueSize,
+                "the value bytes length [%s] exceeds maximum value size [%s]", valueBLen, maxValueSize);
+        return (valueBLen - 1) / valueSize + 1;
+    }
+
+    /**
+     * Builds the KeyHook (and the split value chunks when nSplit > 1) for a put.
+     * For a single chunk the value bytes are inlined into the KeyHook itself and no chunks are produced.
+     */
+    protected static Pair<KeyHook, byte[][]> getKeyValuePair(int nSplit, String keyS, byte[] valueB) {
+        KeyHook keyHook;
+        byte[][] splitValueB = null;
+        if (nSplit > 1) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Enable chunking for putting large cached object values, chunk size = " + nSplit
+                        + ", original value bytes size = " + valueB.length);
+            }
+            // chunk keys are the original key suffixed with the chunk index (keyS + "0", keyS + "1", ...)
+            String[] chunkKeySs = new String[nSplit];
+            for (int i = 0; i < nSplit; i++) {
+                chunkKeySs[i] = keyS + i;
+            }
+            keyHook = new KeyHook(chunkKeySs, null);
+            splitValueB = splitBytes(valueB, nSplit);
+        } else {
+            if (logger.isDebugEnabled()) {
+                logger.debug(
+                        "Chunking not enabled, put the original value bytes to keyhook directly, original value bytes size = "
+                                + valueB.length);
+            }
+            keyHook = new KeyHook(null, valueB);
+        }
+
+        return new Pair<>(keyHook, splitValueB);
+    }
+
+    /**
+     * This method overrides the parent getBinary(): it gets the KeyHook from the cache first and
+     * checks the KeyHook to see whether chunking is enabled or not.
+     */
+    @Override
+    public byte[] getBinary(String keyS) {
+        if (Strings.isNullOrEmpty(keyS)) {
+            return null;
+        }
+        KeyHook keyHook = lookupKeyHook(keyS);
+        if (keyHook == null) {
+            return null;
+        }
+
+        if (keyHook.getChunkskey() == null || keyHook.getChunkskey().length == 0) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Chunking not enabled, return the value bytes in the keyhook directly, value bytes size = "
+                        + keyHook.getValues().length);
+            }
+            return keyHook.getValues();
+        }
+
+        BulkFuture<Map<String, Object>> bulkFuture;
+        long start = System.currentTimeMillis();
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Chunking enabled, chunk size = " + keyHook.getChunkskey().length);
+        }
+
+        // hashed chunk key -> original chunk key (memcached is addressed by the hashed form)
+        Map<String, String> keyLookup = computeKeyHash(Arrays.asList(keyHook.getChunkskey()));
+        try {
+            bulkFuture = client.asyncGetBulk(keyLookup.keySet());
+        } catch (IllegalStateException e) {
+            // operation did not get queued in time (queue is full)
+            errorCount.incrementAndGet();
+            logger.error("Unable to queue cache operation.", e);
+            return null;
+        } catch (Throwable t) {
+            errorCount.incrementAndGet();
+            logger.error("Unable to queue cache operation.", t);
+            return null;
+        }
+
+        try {
+            Map<String, Object> bulkResult = bulkFuture.get(config.getTimeout(), TimeUnit.MILLISECONDS);
+            cacheGetTime.addAndGet(System.currentTimeMillis() - start);
+            // integrity check: if any chunk is gone (eviction, server restart), the whole entry is
+            // unusable -- treat it as a miss and clean up the leftovers
+            if (bulkResult.size() != keyHook.getChunkskey().length) {
+                missCount.incrementAndGet();
+                logger.warn("Some partial chunks missing for query key:" + keyS);
+                // remove all the partial chunks here.
+                for (String partialKey : bulkResult.keySet()) {
+                    client.delete(partialKey);
+                }
+                deleteKeyHook(keyS);
+                return null;
+            }
+            hitCount.getAndAdd(keyHook.getChunkskey().length);
+            byte[][] bytesArray = new byte[keyHook.getChunkskey().length][];
+            for (Map.Entry<String, Object> entry : bulkResult.entrySet()) {
+                byte[] bytes = (byte[]) entry.getValue();
+                readBytes.addAndGet(bytes.length);
+                String originalKeyS = keyLookup.get(entry.getKey());
+                // the chunk index is the numeric suffix appended to the original key
+                int idx = Integer.parseInt(originalKeyS.substring(keyS.length()));
+                bytesArray[idx] = decodeValue(originalKeyS.getBytes(Charsets.UTF_8), bytes);
+            }
+            return concatBytes(bytesArray);
+        } catch (TimeoutException e) {
+            timeoutCount.incrementAndGet();
+            bulkFuture.cancel(false);
+            return null;
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw Throwables.propagate(e);
+        } catch (ExecutionException e) {
+            errorCount.incrementAndGet();
+            logger.error("ExecutionException when pulling item from cache.", e);
+            return null;
+        }
+    }
+
+    /**
+     * This method overrides the parent putBinary() method. It will split the large value bytes into multiple chunks to fit into the internal Cache.
+     * It generates a KeyHook to store the split chunk keys.
+     * NOTE(review): assumes valueB is non-null (valueB.length is read unconditionally).
+     */
+    @Override
+    public void putBinary(String keyS, byte[] valueB, int expiration) {
+        if (Strings.isNullOrEmpty(keyS)) {
+            return;
+        }
+        int nSplit = getValueSplit(config, keyS, valueB.length);
+        Pair<KeyHook, byte[][]> keyValuePair = getKeyValuePair(nSplit, keyS, valueB);
+        KeyHook keyHook = keyValuePair.getFirst();
+        byte[][] splitValueB = keyValuePair.getSecond();
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("put key hook:{} to cache for hash key", keyHook);
+        }
+        // the hook is stored under the original key; chunks (if any) under key+index
+        super.putBinary(keyS, serializeValue(keyHook), expiration);
+        if (nSplit > 1) {
+            for (int i = 0; i < nSplit; i++) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Chunk[" + i + "] bytes size before encoding  = " + splitValueB[i].length);
+                }
+                super.putBinary(keyHook.getChunkskey()[i], splitValueB[i], expiration);
+            }
+        }
+    }
+
+    /**
+     * Evicts the KeyHook and, when chunking was used, all of its chunk entries.
+     */
+    @Override
+    public void evict(String keyS) {
+        if (Strings.isNullOrEmpty(keyS)) {
+            return;
+        }
+        KeyHook keyHook = lookupKeyHook(keyS);
+        if (keyHook == null) {
+            return;
+        }
+
+        if (keyHook.getChunkskey() != null && keyHook.getChunkskey().length > 0) {
+            String[] chunkKeys = keyHook.getChunkskey();
+            for (String chunkKey : chunkKeys) {
+                super.evict(chunkKey);
+            }
+        }
+        super.evict(keyS);
+    }
+
+    /**
+     * Indexes each original chunk key by its hashed form: hashedKey -> originalKey.
+     */
+    protected Map<String, String> computeKeyHash(List<String> keySList) {
+        return Maps.uniqueIndex(keySList, new Function<String, String>() {
+            @Override
+            public String apply(String keyS) {
+                return computeKeyHash(keyS);
+            }
+        });
+    }
+
+    // best-effort removal of the KeyHook entry; errors are counted but never rethrown
+    private void deleteKeyHook(String keyS) {
+        try {
+            super.evict(keyS);
+        } catch (IllegalStateException e) {
+            // operation did not get queued in time (queue is full)
+            errorCount.incrementAndGet();
+            logger.error("Unable to queue cache operation: ", e);
+        }
+    }
+
+    // joins the decoded chunks back into the original value byte array
+    private byte[] concatBytes(byte[]... bytesArray) {
+        int length = 0;
+        for (byte[] bytes : bytesArray) {
+            length += bytes.length;
+        }
+        byte[] result = new byte[length];
+        int destPos = 0;
+        for (byte[] bytes : bytesArray) {
+            System.arraycopy(bytes, 0, result, destPos, bytes.length);
+            destPos += bytes.length;
+        }
+        if (logger.isDebugEnabled()) {
+            logger.debug("Original value bytes size for all chunks  = " + result.length);
+        }
+
+        return result;
+    }
+
+    @Override
+    public KeyHook lookupKeyHook(String keyS) {
+        // the hook itself is stored through the parent cache under the original key
+        byte[] bytes = super.getBinary(keyS);
+        if (bytes == null) {
+            return null;
+        }
+        return (KeyHook) SerializationUtils.deserialize(bytes);
+    }
+}
\ No newline at end of file
diff --git a/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedConnectionFactory.java b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedConnectionFactory.java
new file mode 100644
index 0000000..fe48d3e
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedConnectionFactory.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.memcached;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.SocketChannel;
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.kylin.common.KylinConfig;
+
+import net.spy.memcached.ConnectionFactory;
+import net.spy.memcached.ConnectionObserver;
+import net.spy.memcached.DefaultConnectionFactory;
+import net.spy.memcached.FailureMode;
+import net.spy.memcached.HashAlgorithm;
+import net.spy.memcached.MemcachedConnection;
+import net.spy.memcached.MemcachedNode;
+import net.spy.memcached.NodeLocator;
+import net.spy.memcached.OperationFactory;
+import net.spy.memcached.auth.AuthDescriptor;
+import net.spy.memcached.compat.SpyObject;
+import net.spy.memcached.metrics.MetricCollector;
+import net.spy.memcached.metrics.MetricType;
+import net.spy.memcached.metrics.NoopMetricCollector;
+import net.spy.memcached.ops.Operation;
+import net.spy.memcached.transcoders.Transcoder;
+
+/**
+ * ConnectionFactory decorator: every call is delegated to the wrapped factory, except the
+ * metrics configuration, which is read from Kylin's "kylin.metrics." properties and routed
+ * to {@link MemcachedMetrics}.
+ */
+public class MemcachedConnectionFactory extends SpyObject implements ConnectionFactory {
+    private ConnectionFactory underlying;
+    // snapshot of all "kylin.metrics." properties taken from KylinConfig at construction time
+    private Map<String, String> metricsConfig = KylinConfig.getInstanceFromEnv().getKylinMetricsConf();
+
+    public MemcachedConnectionFactory(ConnectionFactory underlying) {
+        this.underlying = underlying;
+    }
+
+    @Override
+    public MetricType enableMetrics() {
+        // "memcached.metricstype" selects the spymemcached metric profile; falls back to the library default
+        String metricType = metricsConfig.get("memcached.metricstype");
+        return metricType == null ? DefaultConnectionFactory.DEFAULT_METRIC_TYPE
+                : MetricType.valueOf(metricType.toUpperCase(Locale.ROOT));
+    }
+
+    @Override
+    public MetricCollector getMetricCollector() {
+        // metrics are collected only when "memcached.enabled" is present and not "false",
+        // and the configured metric profile is not OFF
+        String enableMetrics = metricsConfig.get("memcached.enabled");
+        if (enableMetrics().equals(MetricType.OFF) || enableMetrics == null
+                || "false".equalsIgnoreCase(enableMetrics)) {
+            getLogger().debug("Memcached metrics collection disabled.");
+            return new NoopMetricCollector();
+        } else {
+            getLogger().info("Memcached metrics collection enabled (Profile " + enableMetrics() + ").");
+            return new MemcachedMetrics();
+        }
+    }
+
+    // ---- everything below is pure delegation to the underlying factory ----
+
+    @Override
+    public MemcachedConnection createConnection(List<InetSocketAddress> addrs) throws IOException {
+        return underlying.createConnection(addrs);
+    }
+
+    @Override
+    public MemcachedNode createMemcachedNode(SocketAddress sa, SocketChannel c, int bufSize) {
+        return underlying.createMemcachedNode(sa, c, bufSize);
+    }
+
+    @Override
+    public BlockingQueue<Operation> createOperationQueue() {
+        return underlying.createOperationQueue();
+    }
+
+    @Override
+    public BlockingQueue<Operation> createReadOperationQueue() {
+        return underlying.createReadOperationQueue();
+    }
+
+    @Override
+    public BlockingQueue<Operation> createWriteOperationQueue() {
+        return underlying.createWriteOperationQueue();
+    }
+
+    @Override
+    public long getOpQueueMaxBlockTime() {
+        return underlying.getOpQueueMaxBlockTime();
+    }
+
+    @Override
+    public ExecutorService getListenerExecutorService() {
+        return underlying.getListenerExecutorService();
+    }
+
+    @Override
+    public boolean isDefaultExecutorService() {
+        return underlying.isDefaultExecutorService();
+    }
+
+    @Override
+    public NodeLocator createLocator(List<MemcachedNode> nodes) {
+        return underlying.createLocator(nodes);
+    }
+
+    @Override
+    public OperationFactory getOperationFactory() {
+        return underlying.getOperationFactory();
+    }
+
+    @Override
+    public long getOperationTimeout() {
+        return underlying.getOperationTimeout();
+    }
+
+    @Override
+    public boolean isDaemon() {
+        return underlying.isDaemon();
+    }
+
+    @Override
+    public boolean useNagleAlgorithm() {
+        return underlying.useNagleAlgorithm();
+    }
+
+    @Override
+    public Collection<ConnectionObserver> getInitialObservers() {
+        return underlying.getInitialObservers();
+    }
+
+    @Override
+    public FailureMode getFailureMode() {
+        return underlying.getFailureMode();
+    }
+
+    @Override
+    public Transcoder<Object> getDefaultTranscoder() {
+        return underlying.getDefaultTranscoder();
+    }
+
+    @Override
+    public boolean shouldOptimize() {
+        return underlying.shouldOptimize();
+    }
+
+    @Override
+    public int getReadBufSize() {
+        return underlying.getReadBufSize();
+    }
+
+    @Override
+    public HashAlgorithm getHashAlg() {
+        return underlying.getHashAlg();
+    }
+
+    @Override
+    public long getMaxReconnectDelay() {
+        return underlying.getMaxReconnectDelay();
+    }
+
+    @Override
+    public AuthDescriptor getAuthDescriptor() {
+        return underlying.getAuthDescriptor();
+    }
+
+    @Override
+    public int getTimeoutExceptionThreshold() {
+        return underlying.getTimeoutExceptionThreshold();
+    }
+
+    @Override
+    public long getAuthWaitTime() {
+        return underlying.getAuthWaitTime();
+    }
+}
\ No newline at end of file
diff --git a/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedConnectionFactoryBuilder.java b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedConnectionFactoryBuilder.java
new file mode 100644
index 0000000..97af4a6
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedConnectionFactoryBuilder.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.memcached;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+
+import net.spy.memcached.ArrayModNodeLocator;
+import net.spy.memcached.ConnectionFactory;
+import net.spy.memcached.ConnectionFactoryBuilder;
+import net.spy.memcached.ConnectionObserver;
+import net.spy.memcached.DefaultConnectionFactory;
+import net.spy.memcached.FailureMode;
+import net.spy.memcached.HashAlgorithm;
+import net.spy.memcached.MemcachedNode;
+import net.spy.memcached.NodeLocator;
+import net.spy.memcached.OperationFactory;
+import net.spy.memcached.RefinedKetamaNodeLocator;
+import net.spy.memcached.auth.AuthDescriptor;
+import net.spy.memcached.metrics.MetricCollector;
+import net.spy.memcached.metrics.MetricType;
+import net.spy.memcached.ops.Operation;
+import net.spy.memcached.transcoders.Transcoder;
+
+/**
+ * A near-verbatim copy of spymemcached's {@code ConnectionFactoryBuilder#build()}; the only
+ * intended functional difference is that the CONSISTENT locator resolves to
+ * {@link RefinedKetamaNodeLocator} instead of the stock Ketama locator.
+ */
+public class MemcachedConnectionFactoryBuilder extends ConnectionFactoryBuilder {
+    /**
+     * Get the ConnectionFactory set up with the provided parameters.
+     */
+    public ConnectionFactory build() {
+        return new DefaultConnectionFactory() {
+
+            @Override
+            public BlockingQueue<Operation> createOperationQueue() {
+                return opQueueFactory == null ? super.createOperationQueue() : opQueueFactory.create();
+            }
+
+            @Override
+            public BlockingQueue<Operation> createReadOperationQueue() {
+                return readQueueFactory == null ? super.createReadOperationQueue() : readQueueFactory.create();
+            }
+
+            @Override
+            public BlockingQueue<Operation> createWriteOperationQueue() {
+                // fixed copy-paste bug: the null fallback used to call super.createReadOperationQueue()
+                return writeQueueFactory == null ? super.createWriteOperationQueue() : writeQueueFactory.create();
+            }
+
+            @Override
+            public NodeLocator createLocator(List<MemcachedNode> nodes) {
+                // CONSISTENT deliberately maps to Kylin's RefinedKetamaNodeLocator
+                switch (locator) {
+                case ARRAY_MOD:
+                    return new ArrayModNodeLocator(nodes, getHashAlg());
+                case CONSISTENT:
+                    return new RefinedKetamaNodeLocator(nodes, getHashAlg());
+                default:
+                    throw new IllegalStateException("Unhandled locator type: " + locator);
+                }
+            }
+
+            @Override
+            public Transcoder<Object> getDefaultTranscoder() {
+                return transcoder == null ? super.getDefaultTranscoder() : transcoder;
+            }
+
+            @Override
+            public FailureMode getFailureMode() {
+                return failureMode == null ? super.getFailureMode() : failureMode;
+            }
+
+            @Override
+            public HashAlgorithm getHashAlg() {
+                return hashAlg == null ? super.getHashAlg() : hashAlg;
+            }
+
+            @Override
+            public Collection<ConnectionObserver> getInitialObservers() {
+                return initialObservers;
+            }
+
+            @Override
+            public OperationFactory getOperationFactory() {
+                return opFact == null ? super.getOperationFactory() : opFact;
+            }
+
+            @Override
+            public long getOperationTimeout() {
+                return opTimeout == -1 ? super.getOperationTimeout() : opTimeout;
+            }
+
+            @Override
+            public int getReadBufSize() {
+                return readBufSize == -1 ? super.getReadBufSize() : readBufSize;
+            }
+
+            @Override
+            public boolean isDaemon() {
+                return isDaemon;
+            }
+
+            @Override
+            public boolean shouldOptimize() {
+                return shouldOptimize;
+            }
+
+            @Override
+            public boolean useNagleAlgorithm() {
+                return useNagle;
+            }
+
+            @Override
+            public long getMaxReconnectDelay() {
+                return maxReconnectDelay;
+            }
+
+            @Override
+            public AuthDescriptor getAuthDescriptor() {
+                return authDescriptor;
+            }
+
+            @Override
+            public long getOpQueueMaxBlockTime() {
+                return opQueueMaxBlockTime > -1 ? opQueueMaxBlockTime : super.getOpQueueMaxBlockTime();
+            }
+
+            @Override
+            public int getTimeoutExceptionThreshold() {
+                return timeoutExceptionThreshold;
+            }
+
+            @Override
+            public MetricType enableMetrics() {
+                return metricType == null ? super.enableMetrics() : metricType;
+            }
+
+            @Override
+            public MetricCollector getMetricCollector() {
+                return collector == null ? super.getMetricCollector() : collector;
+            }
+
+            @Override
+            public ExecutorService getListenerExecutorService() {
+                return executorService == null ? super.getListenerExecutorService() : executorService;
+            }
+
+            @Override
+            public boolean isDefaultExecutorService() {
+                return executorService == null;
+            }
+
+            @Override
+            public long getAuthWaitTime() {
+                return authWaitTime;
+            }
+        };
+
+    }
+}
\ No newline at end of file
diff --git a/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedMetrics.java b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedMetrics.java
new file mode 100644
index 0000000..ada9144
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedMetrics.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.memcached;
+
+import static org.apache.kylin.metrics.lib.impl.MetricsSystem.Metrics;
+
+import java.util.Map;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.google.common.collect.Maps;
+
+import net.spy.memcached.metrics.AbstractMetricCollector;
+import net.spy.memcached.metrics.DefaultMetricCollector;
+import net.spy.memcached.metrics.MetricCollector;
+
+/**
+ * A {@link MetricCollector} that uses the Codahale Metrics library.
+ *
+ * The following system properties can be used to customize the behavior
+ * of the collector during runtime:
+ */
+public final class MemcachedMetrics extends AbstractMetricCollector {
+
+    /**
+     * Contains all registered {@link Counter}s.
+     */
+    private Map<String, Counter> counters;
+
+    /**
+     * Contains all registered {@link Meter}s.
+     */
+    private Map<String, Meter> meters;
+
+    /**
+     * Contains all registered {@link Histogram}s.
+     */
+    private Map<String, Histogram> histograms;
+
+    /**
+     * Create a new {@link DefaultMetricCollector}.
+     *
+     * Note that when this constructor is called, the reporter is also
+     * automatically established.
+     */
+    public MemcachedMetrics() {
+        // concurrent maps: spymemcached may report from multiple connection threads
+        counters = Maps.newConcurrentMap();
+        meters = Maps.newConcurrentMap();
+        histograms = Maps.newConcurrentMap();
+    }
+
+    @Override
+    public void addCounter(String name) {
+        if (!counters.containsKey(name)) {
+            counters.put(name, Metrics.counter(name));
+        }
+    }
+
+    @Override
+    public void removeCounter(String name) {
+        // fixed: the condition was inverted (!containsKey), so a registered counter was never removed
+        if (counters.containsKey(name)) {
+            Metrics.remove(name);
+            counters.remove(name);
+        }
+    }
+
+    @Override
+    public void incrementCounter(String name, int amount) {
+        if (counters.containsKey(name)) {
+            counters.get(name).inc(amount);
+        }
+    }
+
+    @Override
+    public void decrementCounter(String name, int amount) {
+        if (counters.containsKey(name)) {
+            counters.get(name).dec(amount);
+        }
+    }
+
+    @Override
+    public void addMeter(String name) {
+        if (!meters.containsKey(name)) {
+            meters.put(name, Metrics.meter(name));
+        }
+    }
+
+    @Override
+    public void removeMeter(String name) {
+        if (meters.containsKey(name)) {
+            // also drop it from the shared registry, consistent with removeCounter (was leaking before)
+            Metrics.remove(name);
+            meters.remove(name);
+        }
+    }
+
+    @Override
+    public void markMeter(String name) {
+        if (meters.containsKey(name)) {
+            meters.get(name).mark();
+        }
+    }
+
+    @Override
+    public void addHistogram(String name) {
+        if (!histograms.containsKey(name)) {
+            histograms.put(name, Metrics.histogram(name));
+        }
+    }
+
+    @Override
+    public void removeHistogram(String name) {
+        if (histograms.containsKey(name)) {
+            // also drop it from the shared registry, consistent with removeCounter (was leaking before)
+            Metrics.remove(name);
+            histograms.remove(name);
+        }
+    }
+
+    @Override
+    public void updateHistogram(String name, int amount) {
+        if (histograms.containsKey(name)) {
+            histograms.get(name).update(amount);
+        }
+    }
+}
\ No newline at end of file
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 778b5bf..159137b 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1770,6 +1770,10 @@ abstract public class KylinConfigBase implements Serializable {
                 + getKylinMetricsSubjectSuffix();
     }
 
+    // returns all configuration entries under the "kylin.metrics." prefix
+    // (whether the prefix is stripped from the keys depends on getPropertiesByPrefix — TODO confirm)
+    public Map<String, String> getKylinMetricsConf() {
+        return getPropertiesByPrefix("kylin.metrics.");
+    }
+
     // ============================================================================
     // tool
     // ============================================================================
diff --git a/pom.xml b/pom.xml
index 264583d..6cb6716 100644
--- a/pom.xml
+++ b/pom.xml
@@ -115,6 +115,7 @@
     <xerces.version>2.11.0</xerces.version>
     <xalan.version>2.7.2</xalan.version>
     <ehcache.version>2.10.2.2.21</ehcache.version>
+    <memcached.verion>2.12.3</memcached.verion>
     <apache-httpclient.version>4.2.5</apache-httpclient.version>
     <roaring.version>0.6.18</roaring.version>
     <cglib.version>3.2.4</cglib.version>
@@ -261,6 +262,11 @@
       </dependency>
       <dependency>
         <groupId>org.apache.kylin</groupId>
+        <artifactId>kylin-cache</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.kylin</groupId>
         <artifactId>kylin-engine-mr</artifactId>
         <version>${project.version}</version>
       </dependency>
@@ -731,6 +737,11 @@
         <artifactId>metrics-core</artifactId>
         <version>${dropwizard.version}</version>
       </dependency>
+      <dependency>
+        <groupId>io.dropwizard.metrics</groupId>
+        <artifactId>metrics-jvm</artifactId>
+        <version>${dropwizard.version}</version>
+      </dependency>
 
       <!-- Test -->
       <dependency>
@@ -817,6 +828,11 @@
         <version>${ehcache.version}</version>
       </dependency>
       <dependency>
+        <groupId>net.spy</groupId>
+        <artifactId>spymemcached</artifactId>
+        <version>${memcached.verion}</version>
+      </dependency>
+      <dependency>
         <groupId>org.opensaml</groupId>
         <artifactId>opensaml</artifactId>
         <version>${opensaml.version}</version>
@@ -1246,6 +1262,7 @@
     <module>core-metrics</module>
     <module>metrics-reporter-hive</module>
     <module>metrics-reporter-kafka</module>
+    <module>cache</module>
   </modules>
 
   <reporting>


[kylin] 02/12: KYLIN-2894 add trigger kylin.query.cache-signature-enabled for enabling query signature

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 1fcf9679e14978b9d0fa36a500797991be34e63e
Author: Zhong <nj...@apache.org>
AuthorDate: Thu Oct 18 17:04:25 2018 +0800

    KYLIN-2894 add trigger kylin.query.cache-signature-enabled for enabling query signature
---
 .../main/java/org/apache/kylin/common/KylinConfigBase.java  |  4 ++++
 .../java/org/apache/kylin/rest/service/CacheService.java    | 13 +++++++++----
 .../java/org/apache/kylin/rest/service/QueryService.java    |  7 +++++--
 3 files changed, 18 insertions(+), 6 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 135d6e6..43c0831 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1558,6 +1558,10 @@ abstract public class KylinConfigBase implements Serializable {
                 "org.apache.kylin.rest.signature.RealizationSetCalculator");
     }
 
+    public boolean isQueryCacheSignatureEnabled() {
+        return Boolean.parseBoolean(this.getOptional("kylin.query.cache-signature-enabled", "false"));
+    }
+
     // ============================================================================
     // SERVER
     // ============================================================================
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
index 67d49d9..930852b 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
@@ -19,8 +19,8 @@
 package org.apache.kylin.rest.service;
 
 import java.io.IOException;
-
 import java.util.Map;
+
 import org.apache.kylin.metadata.cachesync.Broadcaster;
 import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
 import org.apache.kylin.storage.hbase.HBaseConnection;
@@ -117,9 +117,14 @@ public class CacheService extends BasicService implements InitializingBean {
 
     public void cleanDataCache(String project) {
         if (cacheManager != null) {
-            logger.info("cleaning cache for project " + project + " (currently remove exception entries)");
-            //            cacheManager.getCache(QueryService.SUCCESS_QUERY_CACHE).removeAll();
-            cacheManager.getCache(QueryService.EXCEPTION_QUERY_CACHE).removeAll();
+            if (getConfig().isQueryCacheSignatureEnabled()) {
+                logger.info("cleaning cache for project " + project + " (currently remove exception entries)");
+                cacheManager.getCache(QueryService.EXCEPTION_QUERY_CACHE).removeAll();
+            } else {
+                logger.info("cleaning cache for project " + project + " (currently remove all entries)");
+                cacheManager.getCache(QueryService.SUCCESS_QUERY_CACHE).removeAll();
+                cacheManager.getCache(QueryService.EXCEPTION_QUERY_CACHE).removeAll();
+            }
         } else {
             logger.warn("skip cleaning cache for project " + project);
         }
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index fb13ff5..d0ba4da 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -533,7 +533,8 @@ public class QueryService extends BasicService {
                 return null;
             }
             logger.info("The sqlResponse is found in " + cacheType);
-            if (!SQLResponseSignatureUtil.checkSignature(getConfig(), response, sqlRequest.getProject())) {
+            if (getConfig().isQueryCacheSignatureEnabled()
+                    && !SQLResponseSignatureUtil.checkSignature(getConfig(), response, sqlRequest.getProject())) {
                 logger.info("The sql response signature is changed. Remove it from QUERY_CACHE.");
                 cache.evict(sqlRequest.getCacheKey());
                 return null;
@@ -1091,7 +1092,9 @@ public class QueryService extends BasicService {
         response.setTotalScanCount(queryContext.getScannedRows());
         response.setTotalScanBytes(queryContext.getScannedBytes());
         response.setCubeSegmentStatisticsList(queryContext.getCubeSegmentStatisticsResultList());
-        response.setSignature(SQLResponseSignatureUtil.createSignature(getConfig(), response, projectName));
+        if (getConfig().isQueryCacheSignatureEnabled()) {
+            response.setSignature(SQLResponseSignatureUtil.createSignature(getConfig(), response, projectName));
+        }
         return response;
     }
 


[kylin] 11/12: KYLIN-2723 fix potential concurrent issue when add rpc statistics

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 78b17f5a70fb2e5d66e9cce4def79e5adb377878
Author: Zhong <nj...@apache.org>
AuthorDate: Wed Jan 31 19:08:58 2018 +0800

    KYLIN-2723 fix potential concurrent issue when add rpc statistics
---
 .../java/org/apache/kylin/common/QueryContext.java | 54 ++++++++++------------
 1 file changed, 25 insertions(+), 29 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
index 1a961ec..a065a13 100644
--- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -179,7 +180,7 @@ public class QueryContext {
     }
 
     public void addContext(int ctxId, String type, boolean ifCube) {
-        Map<String, Map<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap = null;
+        ConcurrentMap<String, ConcurrentMap<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap = null;
         if (ifCube) {
             cubeSegmentStatisticsMap = Maps.newConcurrentMap();
         }
@@ -210,13 +211,13 @@ public class QueryContext {
             logger.warn("CubeSegmentStatisticsResult should be initialized for context {}", ctxId);
             return null;
         }
-        Map<String, Map<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap = cubeSegmentStatisticsResult.cubeSegmentStatisticsMap;
+        ConcurrentMap<String, ConcurrentMap<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap = cubeSegmentStatisticsResult.cubeSegmentStatisticsMap;
         if (cubeSegmentStatisticsMap == null) {
             logger.warn(
                     "cubeSegmentStatisticsMap should be initialized for CubeSegmentStatisticsResult with query type {}", cubeSegmentStatisticsResult.queryType);
             return null;
         }
-        Map<String, CubeSegmentStatistics> segmentStatisticsMap = cubeSegmentStatisticsMap.get(cubeName);
+        ConcurrentMap<String, CubeSegmentStatistics> segmentStatisticsMap = cubeSegmentStatisticsMap.get(cubeName);
         if (segmentStatisticsMap == null) {
             logger.warn(
                     "cubeSegmentStatistic should be initialized for cube {}", cubeName);
@@ -237,18 +238,16 @@ public class QueryContext {
             logger.warn("CubeSegmentStatisticsResult should be initialized for context {}", ctxId);
             return;
         }
-        Map<String, Map<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap = cubeSegmentStatisticsResult.cubeSegmentStatisticsMap;
+        ConcurrentMap<String, ConcurrentMap<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap = cubeSegmentStatisticsResult.cubeSegmentStatisticsMap;
         if (cubeSegmentStatisticsMap == null) {
             logger.warn(
                     "cubeSegmentStatisticsMap should be initialized for CubeSegmentStatisticsResult with query type {}", cubeSegmentStatisticsResult.queryType);
             return;
         }
         String cubeName = cubeSegmentStatistics.cubeName;
-        Map<String, CubeSegmentStatistics> segmentStatisticsMap = cubeSegmentStatisticsMap.get(cubeName);
-        if (segmentStatisticsMap == null) {
-            segmentStatisticsMap = Maps.newConcurrentMap();
-            cubeSegmentStatisticsMap.put(cubeName, segmentStatisticsMap);
-        }
+        cubeSegmentStatisticsMap.putIfAbsent(cubeName, Maps.<String, CubeSegmentStatistics> newConcurrentMap());
+        ConcurrentMap<String, CubeSegmentStatistics> segmentStatisticsMap = cubeSegmentStatisticsMap.get(cubeName);
+
         segmentStatisticsMap.put(cubeSegmentStatistics.getSegmentName(), cubeSegmentStatistics);
     }
 
@@ -263,28 +262,25 @@ public class QueryContext {
 
         CubeSegmentStatisticsResult cubeSegmentStatisticsResult = cubeSegmentStatisticsResultMap.get(ctxId);
         if (cubeSegmentStatisticsResult == null) {
-            logger.warn("CubeSegmentStatisticsResult should be initialized for context " + ctxId);
+            logger.warn("CubeSegmentStatisticsResult should be initialized for context {}", ctxId);
             return;
         }
-        Map<String, Map<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap = cubeSegmentStatisticsResult.cubeSegmentStatisticsMap;
+        ConcurrentMap<String, ConcurrentMap<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap = cubeSegmentStatisticsResult.cubeSegmentStatisticsMap;
         if (cubeSegmentStatisticsMap == null) {
             logger.warn(
-                    "cubeSegmentStatisticsMap should be initialized for CubeSegmentStatisticsResult with query type "
-                            + cubeSegmentStatisticsResult.queryType);
+                    "cubeSegmentStatisticsMap should be initialized for CubeSegmentStatisticsResult with query type {}",
+                    cubeSegmentStatisticsResult.queryType);
             return;
         }
-        Map<String, CubeSegmentStatistics> segmentStatisticsMap = cubeSegmentStatisticsMap.get(cubeName);
-        if (segmentStatisticsMap == null) {
-            segmentStatisticsMap = Maps.newConcurrentMap();
-            cubeSegmentStatisticsMap.put(cubeName, segmentStatisticsMap);
-        }
+        cubeSegmentStatisticsMap.putIfAbsent(cubeName, Maps.<String, CubeSegmentStatistics> newConcurrentMap());
+        ConcurrentMap<String, CubeSegmentStatistics> segmentStatisticsMap = cubeSegmentStatisticsMap.get(cubeName);
+
+        CubeSegmentStatistics old = segmentStatisticsMap.putIfAbsent(segmentName, new CubeSegmentStatistics());
         CubeSegmentStatistics segmentStatistics = segmentStatisticsMap.get(segmentName);
-        if (segmentStatistics == null) {
-            segmentStatistics = new CubeSegmentStatistics();
-            segmentStatisticsMap.put(segmentName, segmentStatistics);
+        if (old == null) {
             segmentStatistics.setWrapper(cubeName, segmentName, sourceCuboidId, targetCuboidId, filterMask);
-        }
-        if (segmentStatistics.sourceCuboidId != sourceCuboidId || segmentStatistics.targetCuboidId != targetCuboidId
+        } else if (segmentStatistics.sourceCuboidId != sourceCuboidId
+                || segmentStatistics.targetCuboidId != targetCuboidId
                 || segmentStatistics.filterMask != filterMask) {
             StringBuilder inconsistency = new StringBuilder();
             if (segmentStatistics.sourceCuboidId != sourceCuboidId) {
@@ -445,8 +441,8 @@ public class QueryContext {
             this.filterMask = filterMask;
         }
 
-        public void addRPCStats(long callTimeMs, long skipCount, long scanCount, long returnCount, long aggrCount,
-                long scanBytes, boolean ifSuccess) {
+        public synchronized void addRPCStats(long callTimeMs, long skipCount, long scanCount, long returnCount,
+                long aggrCount, long scanBytes, boolean ifSuccess) {
             this.callCount++;
             this.callTimeSum += callTimeMs;
             if (this.callTimeMax < callTimeMs) {
@@ -584,7 +580,7 @@ public class QueryContext {
         protected static final long serialVersionUID = 1L;
 
         private String queryType;
-        private Map<String, Map<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap;
+        private ConcurrentMap<String, ConcurrentMap<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap;
         private String realization;
         private int realizationType;
 
@@ -592,7 +588,7 @@ public class QueryContext {
         }
 
         public CubeSegmentStatisticsResult(String queryType,
-                Map<String, Map<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap) {
+                ConcurrentMap<String, ConcurrentMap<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap) {
             this.queryType = queryType;
             this.cubeSegmentStatisticsMap = cubeSegmentStatisticsMap;
         }
@@ -618,7 +614,7 @@ public class QueryContext {
         }
 
         public void setCubeSegmentStatisticsMap(
-                Map<String, Map<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap) {
+                ConcurrentMap<String, ConcurrentMap<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap) {
             this.cubeSegmentStatisticsMap = cubeSegmentStatisticsMap;
         }
 
@@ -627,7 +623,7 @@ public class QueryContext {
 
         }
 
-        public Map<String, Map<String, CubeSegmentStatistics>> getCubeSegmentStatisticsMap() {
+        public ConcurrentMap<String, ConcurrentMap<String, CubeSegmentStatistics>> getCubeSegmentStatisticsMap() {
             return cubeSegmentStatisticsMap;
         }
 


[kylin] 04/12: KYLIN-2894 add unit test

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 95afdb44e52be0d1e24c2487745906dbd35da8fc
Author: Zhong <nj...@apache.org>
AuthorDate: Thu Oct 18 17:49:36 2018 +0800

    KYLIN-2894 add unit test
---
 .../kylin/rest/response/SQLResponseTest.java       |  49 ++++++
 .../rest/signature/RealizationSignatureTest.java   |  46 ++++++
 .../kylin/rest/signature/SegmentSignatureTest.java |  39 +++++
 .../rest/signature/SignatureCalculatorTest.java    | 184 +++++++++++++++++++++
 4 files changed, 318 insertions(+)

diff --git a/server-base/src/test/java/org/apache/kylin/rest/response/SQLResponseTest.java b/server-base/src/test/java/org/apache/kylin/rest/response/SQLResponseTest.java
new file mode 100644
index 0000000..4055a27
--- /dev/null
+++ b/server-base/src/test/java/org/apache/kylin/rest/response/SQLResponseTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.response;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+
+import org.apache.kylin.common.util.JsonUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+public class SQLResponseTest {
+
+    @Test
+    public void testInterfaceConsistency() throws IOException {
+        String[] attrArray = new String[] { "columnMetas", "results", "cube", "affectedRowCount", "isException",
+                "exceptionMessage", "duration", "partial", "totalScanCount", "hitExceptionCache", "storageCacheUsed",
+                "pushDown", "traceUrl", "totalScanBytes" };
+
+        SQLResponse sqlResponse = new SQLResponse(null, null, "learn_cube", 100, false, null, false, false);
+        String jsonStr = JsonUtil.writeValueAsString(sqlResponse);
+        System.out.println(jsonStr);
+
+        JsonNode jnode = JsonUtil.readValueAsTree(jsonStr);
+        assertEquals(jnode.size(), attrArray.length);
+        for (String attr : attrArray) {
+            Assert.assertTrue(attr + " doesn't exist", jnode.has(attr));
+        }
+    }
+}
diff --git a/server-base/src/test/java/org/apache/kylin/rest/signature/RealizationSignatureTest.java b/server-base/src/test/java/org/apache/kylin/rest/signature/RealizationSignatureTest.java
new file mode 100644
index 0000000..1b857c5
--- /dev/null
+++ b/server-base/src/test/java/org/apache/kylin/rest/signature/RealizationSignatureTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.signature;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class RealizationSignatureTest extends LocalFileMetadataTestCase {
+
+    private KylinConfig kylinConfig;
+
+    @Before
+    public void setUp() {
+        this.createTestMetadata();
+        kylinConfig = getTestConfig();
+    }
+    @Test
+    public void testEquals() {
+        RealizationSignature rs = RealizationSignature.CubeSignature.getCubeSignature(kylinConfig, "ssb_cube1");
+        RealizationSignature rs1 = RealizationSignature.CubeSignature.getCubeSignature(kylinConfig, "ssb_cube1");
+        RealizationSignature rs2 = RealizationSignature.CubeSignature.getCubeSignature(kylinConfig, "ssb_cube2");
+        Assert.assertTrue(rs.equals(rs));
+        Assert.assertTrue(rs.equals(rs1));
+        Assert.assertFalse(rs.equals(null));
+        Assert.assertFalse(rs.equals(rs2));
+    }
+}
diff --git a/server-base/src/test/java/org/apache/kylin/rest/signature/SegmentSignatureTest.java b/server-base/src/test/java/org/apache/kylin/rest/signature/SegmentSignatureTest.java
new file mode 100644
index 0000000..36f74cb
--- /dev/null
+++ b/server-base/src/test/java/org/apache/kylin/rest/signature/SegmentSignatureTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.signature;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SegmentSignatureTest {
+
+    @Test
+    public void testEquals() {
+        SegmentSignature ss = new SegmentSignature("segmentName", 999L);
+        Assert.assertTrue(ss.equals(ss));
+        Assert.assertFalse(ss.equals(null));
+        SegmentSignature ss1 = new SegmentSignature("segmentName", 0L);
+        Assert.assertFalse(ss.equals(ss1));
+        SegmentSignature ss2 = new SegmentSignature(null, 999L);
+        Assert.assertFalse(ss.equals(ss2));
+        Assert.assertFalse(ss2.equals(ss));
+        SegmentSignature ss3 = new SegmentSignature("segmentName2", 999L);
+        Assert.assertFalse(ss.equals(ss3));
+    }
+}
diff --git a/server-base/src/test/java/org/apache/kylin/rest/signature/SignatureCalculatorTest.java b/server-base/src/test/java/org/apache/kylin/rest/signature/SignatureCalculatorTest.java
new file mode 100644
index 0000000..55ac5b6
--- /dev/null
+++ b/server-base/src/test/java/org/apache/kylin/rest/signature/SignatureCalculatorTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.signature;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.KylinConfigExt;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.project.ProjectManager;
+import org.apache.kylin.metadata.realization.RealizationStatusEnum;
+import org.apache.kylin.rest.response.SQLResponse;
+import org.apache.kylin.rest.signature.RealizationSignature.CubeSignature;
+import org.apache.kylin.rest.util.SQLResponseSignatureUtil;
+import org.apache.kylin.storage.hybrid.HybridInstance;
+import org.apache.kylin.storage.hybrid.HybridManager;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+public class SignatureCalculatorTest extends LocalFileMetadataTestCase {
+
+    private final String projectName = "default";
+    private KylinConfig config;
+
+    @Before
+    public void setup() throws Exception {
+        this.createTestMetadata();
+        this.config = getTestConfig();
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testGetRealizationSignature() {
+        RealizationSignature signature1 = RealizationSetCalculator.getRealizationSignature(this.config,
+                "Test" + System.currentTimeMillis());
+        Assert.assertNull(signature1);
+
+        CubeSignature signature2 = (CubeSignature) RealizationSetCalculator.getRealizationSignature(this.config, "ssb");
+        Assert.assertEquals(RealizationStatusEnum.DISABLED, signature2.status);
+        Assert.assertNull(signature2.segmentSignatureSet);
+
+        CubeSignature signature3 = (CubeSignature) RealizationSetCalculator.getRealizationSignature(this.config,
+                "test_kylin_cube_with_slr_left_join_ready");
+        Assert.assertNotNull(signature3.segmentSignatureSet);
+    }
+
+    @Test
+    public void testRealizationSetCalculator() throws IOException {
+        KylinConfig config = KylinConfig.createKylinConfig(getTestConfig());
+        Map<String, String> overrides = Maps.newHashMap();
+        overrides.put("kylin.query.signature-class", "org.apache.kylin.rest.signature.RealizationSetCalculator");
+
+        ProjectInstance projectInstance = ProjectManager.getInstance(config).getProject(projectName);
+        projectInstance.setConfig(KylinConfigExt.createInstance(config, overrides));
+
+        HybridManager hybridManager = HybridManager.getInstance(config);
+        HybridInstance hybrid1 = hybridManager.getHybridInstance("test_kylin_hybrid_ready");
+
+        CubeManager cubeManager = CubeManager.getInstance(config);
+        CubeInstance cube1 = cubeManager.getCube("test_kylin_cube_with_slr_ready_2_segments");
+        CubeInstance cube2 = cubeManager.getCube("test_kylin_cube_without_slr_ready");
+        CubeInstance cube2Clone = cloneCubeInstance(cubeManager, cube2, cube2.getName() + "_clone");
+
+        //Related cubes:
+        // - test_kylin_cube_with_slr_ready
+        // - test_kylin_cube_with_slr_ready_2_segments
+        // - test_kylin_cube_without_slr_ready
+        String cubes = hybrid1.getCanonicalName() + "," + cube2Clone.getCanonicalName();
+
+        SQLResponse sqlResponse = new SQLResponse();
+        sqlResponse.setCube(cubes);
+
+        String signature = SQLResponseSignatureUtil.createSignature(config, sqlResponse, projectName);
+        sqlResponse.setSignature(signature);
+
+        Assert.assertTrue(SQLResponseSignatureUtil.checkSignature(config, sqlResponse, projectName));
+
+        {//Test the influence of related cubes status change
+            cube1 = cubeManager.updateCubeStatus(cube1, RealizationStatusEnum.DISABLED);
+            Assert.assertFalse(SQLResponseSignatureUtil.checkSignature(config, sqlResponse, projectName));
+
+            cube1 = cubeManager.updateCubeStatus(cube1, RealizationStatusEnum.READY);
+            Assert.assertTrue(SQLResponseSignatureUtil.checkSignature(config, sqlResponse, projectName));
+        }
+
+        {//Test the influence of segment changes
+            cube2Clone = cubeManager.updateCubeDropSegments(cube2Clone, cube2Clone.getSegments().get(0));
+            Assert.assertFalse(SQLResponseSignatureUtil.checkSignature(config, sqlResponse, projectName));
+        }
+    }
+
+    @Test
+    public void testFactTableRealizationSetCalculator() throws IOException {
+        KylinConfig config = KylinConfig.createKylinConfig(getTestConfig());
+        Map<String, String> overrides = Maps.newHashMap();
+        overrides.put("kylin.query.signature-class",
+                "org.apache.kylin.rest.signature.FactTableRealizationSetCalculator");
+
+        ProjectInstance projectInstance = ProjectManager.getInstance(config).getProject(projectName);
+        projectInstance.setConfig(KylinConfigExt.createInstance(config, overrides));
+
+        HybridManager hybridManager = HybridManager.getInstance(config);
+        HybridInstance hybrid1 = hybridManager.getHybridInstance("test_kylin_hybrid_ready");
+
+        CubeManager cubeManager = CubeManager.getInstance(config);
+        CubeInstance cube1 = cubeManager.getCube("test_kylin_cube_with_slr_ready_2_segments");
+        CubeInstance cube2 = cubeManager.getCube("test_kylin_cube_without_slr_ready");
+        CubeInstance cube2Clone = cloneCubeInstance(cubeManager, cube2, cube2.getName() + "_clone");
+        CubeInstance cube3 = cloneCubeInstance(cubeManager, cube2, cube2.getDescName());
+
+        //Related cubes:
+        // - test_kylin_cube_with_slr_ready
+        // - test_kylin_cube_with_slr_ready_2_segments
+        // - test_kylin_cube_without_slr_ready
+        String cubes = hybrid1.getCanonicalName() + "," + cube2Clone.getCanonicalName();
+
+        SQLResponse sqlResponse = new SQLResponse();
+        sqlResponse.setCube(cubes);
+
+        String signature = SQLResponseSignatureUtil.createSignature(config, sqlResponse, projectName);
+        sqlResponse.setSignature(signature);
+
+        Assert.assertTrue(SQLResponseSignatureUtil.checkSignature(config, sqlResponse, projectName));
+
+        {//Test the influence of related cubes status change
+            cube1 = cubeManager.updateCubeStatus(cube1, RealizationStatusEnum.DISABLED);
+            Assert.assertFalse(SQLResponseSignatureUtil.checkSignature(config, sqlResponse, projectName));
+
+            cube1 = cubeManager.updateCubeStatus(cube1, RealizationStatusEnum.READY);
+            Assert.assertTrue(SQLResponseSignatureUtil.checkSignature(config, sqlResponse, projectName));
+        }
+
+        {//Test the influence of cubes not in ${cubes} while share the same fact tables
+            cube3 = cubeManager.updateCubeStatus(cube3, RealizationStatusEnum.DISABLED);
+            Assert.assertFalse(SQLResponseSignatureUtil.checkSignature(config, sqlResponse, projectName));
+
+            cube3 = cubeManager.updateCubeStatus(cube3, RealizationStatusEnum.READY);
+            Assert.assertTrue(SQLResponseSignatureUtil.checkSignature(config, sqlResponse, projectName));
+        }
+
+        {//Test the influence of segment changes
+            cube2Clone = cubeManager.updateCubeDropSegments(cube2Clone, cube2Clone.getSegments().get(0));
+            Assert.assertFalse(SQLResponseSignatureUtil.checkSignature(config, sqlResponse, projectName));
+        }
+    }
+
+    private CubeInstance cloneCubeInstance(CubeManager cubeManager, CubeInstance cube, String name) throws IOException {
+        CubeInstance cubeClone = cubeManager.createCube(name, projectName, cube.getDescriptor(), cube.getOwner());
+        CubeUpdate cubeUpdate = new CubeUpdate(cubeClone.latestCopyForWrite());
+        cubeUpdate.setToAddSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()]));
+        cubeUpdate.setStatus(RealizationStatusEnum.READY);
+        return cubeManager.updateCube(cubeUpdate);
+    }
+}


[kylin] 10/12: KYLIN-2899 Introduce segment level query cache

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 7b58b161a1d3264e744e3e78b0cffbde5e830e67
Author: Ma,Gang <ga...@ebay.com>
AuthorDate: Wed Jan 31 15:39:12 2018 +0800

    KYLIN-2899 Introduce segment level query cache
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  13 ++
 .../java/org/apache/kylin/common/QueryContext.java |  48 +++++++
 .../apache/kylin/common/debug/BackdoorToggles.java |  16 +++
 dev-support/checkstyle.xml                         |   4 +-
 storage-hbase/pom.xml                              |   4 +
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java        | 155 ++++++++++++++++++++-
 .../storage/hbase/cube/v2/SegmentQueryCache.java   |  80 +++++++++++
 .../storage/hbase/cube/v2/SegmentQueryResult.java  | 101 ++++++++++++++
 .../storage/hbase/cube/SegmentQueryResultTest.java | 112 +++++++++++++++
 9 files changed, 526 insertions(+), 7 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 9066b0d..6e1ff9e 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1461,6 +1461,19 @@ abstract public class KylinConfigBase implements Serializable {
         return getRequired("kylin.cache.memcached.hosts");
     }
 
+    public boolean isQuerySegmentCacheEnabled() {
+        return Boolean.parseBoolean(getOptional("kylin.query.segment-cache-enabled", "false"));
+    }
+
+    public int getQuerySegmentCacheTimeout() {
+        return Integer.parseInt(getOptional("kylin.query.segment-cache-timeout", "2000"));
+    }
+
+    // define the maximum size for each segment in one query that can be cached, in megabytes
+    public int getQuerySegmentCacheMaxSize() {
+        return Integer.parseInt(getOptional("kylin.query.segment-cache-max-size", "200"));
+    }
+
     public String getQueryAccessController() {
         return getOptional("kylin.query.access-controller", null);
     }
diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
index ef288c7..1a961ec 100644
--- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
@@ -204,6 +204,54 @@ public class QueryContext {
         return Lists.newArrayList(cubeSegmentStatisticsResultMap.values());
     }
 
+    public CubeSegmentStatistics getCubeSegmentStatistics(int ctxId, String cubeName, String segmentName) {
+        CubeSegmentStatisticsResult cubeSegmentStatisticsResult = cubeSegmentStatisticsResultMap.get(ctxId);
+        if (cubeSegmentStatisticsResult == null) {
+            logger.warn("CubeSegmentStatisticsResult should be initialized for context {}", ctxId);
+            return null;
+        }
+        Map<String, Map<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap = cubeSegmentStatisticsResult.cubeSegmentStatisticsMap;
+        if (cubeSegmentStatisticsMap == null) {
+            logger.warn(
+                    "cubeSegmentStatisticsMap should be initialized for CubeSegmentStatisticsResult with query type {}", cubeSegmentStatisticsResult.queryType);
+            return null;
+        }
+        Map<String, CubeSegmentStatistics> segmentStatisticsMap = cubeSegmentStatisticsMap.get(cubeName);
+        if (segmentStatisticsMap == null) {
+            logger.warn(
+                    "cubeSegmentStatistic should be initialized for cube {}", cubeName);
+            return null;
+        }
+        CubeSegmentStatistics segmentStatistics = segmentStatisticsMap.get(segmentName);
+        if (segmentStatistics == null) {
+            logger.warn(
+                    "segmentStatistics should be initialized for cube {} with segment{}", cubeName, segmentName);
+            return null;
+        }
+        return segmentStatistics;
+    }
+
+    public void addCubeSegmentStatistics(int ctxId, CubeSegmentStatistics cubeSegmentStatistics) {
+        CubeSegmentStatisticsResult cubeSegmentStatisticsResult = cubeSegmentStatisticsResultMap.get(ctxId);
+        if (cubeSegmentStatisticsResult == null) {
+            logger.warn("CubeSegmentStatisticsResult should be initialized for context {}", ctxId);
+            return;
+        }
+        Map<String, Map<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap = cubeSegmentStatisticsResult.cubeSegmentStatisticsMap;
+        if (cubeSegmentStatisticsMap == null) {
+            logger.warn(
+                    "cubeSegmentStatisticsMap should be initialized for CubeSegmentStatisticsResult with query type {}", cubeSegmentStatisticsResult.queryType);
+            return;
+        }
+        String cubeName = cubeSegmentStatistics.cubeName;
+        Map<String, CubeSegmentStatistics> segmentStatisticsMap = cubeSegmentStatisticsMap.get(cubeName);
+        if (segmentStatisticsMap == null) {
+            segmentStatisticsMap = Maps.newConcurrentMap();
+            cubeSegmentStatisticsMap.put(cubeName, segmentStatisticsMap);
+        }
+        segmentStatisticsMap.put(cubeSegmentStatistics.getSegmentName(), cubeSegmentStatistics);
+    }
+
     public void addRPCStatistics(int ctxId, String rpcServer, String cubeName, String segmentName, long sourceCuboidId,
             long targetCuboidId, long filterMask, Exception e, long rpcCallTimeMs, long skippedRows, long scannedRows,
             long returnedRows, long aggregatedRows, long scannedBytes) {
diff --git a/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java b/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
index 47fbbcd..be0f7a6 100644
--- a/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
+++ b/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
@@ -86,6 +86,10 @@ public class BackdoorToggles {
         return getBoolean(DEBUG_TOGGLE_DISABLE_QUERY_CACHE);
     }
 
+    public static boolean getDisableSegmentCache() {
+        return getBoolean(DEBUG_TOGGLE_DISABLE_QUERY_SEGMENT_CACHE);
+    }
+
     public static boolean getDisableFuzzyKey() {
         return getBoolean(DEBUG_TOGGLE_DISABLE_FUZZY_KEY);
     }
@@ -215,6 +219,18 @@ public class BackdoorToggles {
     public final static String DEBUG_TOGGLE_DISABLE_QUERY_CACHE = "DEBUG_TOGGLE_DISABLE_QUERY_CACHE";
 
     /**
+     * set DEBUG_TOGGLE_DISABLE_QUERY_SEGMENT_CACHE=true to prevent using segment cache for current query
+     *
+     *
+     *
+     example:(put it into request body)
+     "backdoorToggles": {
+     "DEBUG_TOGGLE_DISABLE_QUERY_SEGMENT_CACHE": "true"
+     }
+     */
+    public final static String DEBUG_TOGGLE_DISABLE_QUERY_SEGMENT_CACHE = "DEBUG_TOGGLE_DISABLE_QUERY_SEGMENT_CACHE";
+
+    /**
      * set DEBUG_TOGGLE_HBASE_CUBE_QUERY_VERSION=v1/v2 to control which version CubeStorageQuery to use
      *
      example:(put it into request body)
diff --git a/dev-support/checkstyle.xml b/dev-support/checkstyle.xml
index d8eb73f..802f058 100644
--- a/dev-support/checkstyle.xml
+++ b/dev-support/checkstyle.xml
@@ -58,7 +58,9 @@
             <property name="ignorePattern"
                       value="^package.*|^import.*|a href|href|http://|https://|ftp://"/>
         </module>
-        <module name="MethodLength"/>
+        <module name="MethodLength">
+            <property name="max" value="300"/>
+        </module>
         <module name="MethodParamPad"/>
         <module name="ParameterNumber">
             <!-- default is 8 -->
diff --git a/storage-hbase/pom.xml b/storage-hbase/pom.xml
index 220fc14..190dd8c 100644
--- a/storage-hbase/pom.xml
+++ b/storage-hbase/pom.xml
@@ -41,6 +41,10 @@
             <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-engine-mr</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-cache</artifactId>
+        </dependency>
 
         <dependency>
             <groupId>org.apache.kylin</groupId>
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 911c8d5..c9be666 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -27,6 +27,7 @@ import java.util.Locale;
 import java.util.concurrent.ExecutorService;
 import java.util.zip.DataFormatException;
 
+import org.apache.commons.lang3.SerializationUtils;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
@@ -37,8 +38,11 @@ import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
+import org.apache.kylin.common.QueryContext.CubeSegmentStatistics;
+import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.common.exceptions.KylinTimeoutException;
 import org.apache.kylin.common.exceptions.ResourceLimitExceededException;
+import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesSerializer;
 import org.apache.kylin.common.util.BytesUtil;
@@ -48,7 +52,10 @@ import org.apache.kylin.common.util.LoggableCachedThreadPool;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRange;
 import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTUtil;
 import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.metadata.model.ISegment;
 import org.apache.kylin.storage.StorageContext;
@@ -171,6 +178,48 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
         // for different cubes until redeployment of coprocessor jar.
         final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         final boolean compressionResult = kylinConfig.getCompressionResult();
+
+        final boolean querySegmentCacheEnabled = isSegmentLevelCacheEnabled();
+        final SegmentQueryResult.Builder segmentQueryResultBuilder = new SegmentQueryResult.Builder(shardNum,
+                cubeSeg.getConfig().getQuerySegmentCacheMaxSize() * 1024 * 1024);
+        String calculatedSegmentQueryCacheKey = null;
+        if (querySegmentCacheEnabled) {
+            try {
+                logger.info("Query-{}: try to get segment result from cache for segment:{}", queryContext.getQueryId(),
+                        cubeSeg);
+                calculatedSegmentQueryCacheKey = getSegmentQueryCacheKey(scanRequest);
+                long startTime = System.currentTimeMillis();
+                SegmentQueryResult segmentResult = SegmentQueryCache.getInstance().get(calculatedSegmentQueryCacheKey);
+                long spendTime = System.currentTimeMillis() - startTime;
+                if (segmentResult == null) {
+                    logger.info("Query-{}: no segment result is cached for segment:{}, take time:{}ms",
+                            queryContext.getQueryId(), cubeSeg, spendTime);
+                } else {
+                    logger.info("Query-{}: get segment result from cache for segment:{}, take time:{}ms",
+                            queryContext.getQueryId(), cubeSeg, spendTime);
+                    if (segmentResult.getCubeSegmentStatisticsBytes() != null) {
+                        queryContext.addCubeSegmentStatistics(storageContext.ctxId,
+                                (CubeSegmentStatistics) SerializationUtils
+                                        .deserialize(segmentResult.getCubeSegmentStatisticsBytes()));
+                    }
+                    for (byte[] regionResult : segmentResult.getRegionResults()) {
+                        if (compressionResult) {
+                            epResultItr.append(CompressionUtils.decompress(regionResult));
+                        } else {
+                            epResultItr.append(regionResult);
+                        }
+                    }
+                    return new StorageResponseGTScatter(scanRequest, new DummyPartitionStreamer(epResultItr),
+                            storageContext);
+                }
+            } catch (Exception e) {
+                logger.error("Fail to handle cached segment result from cache", e);
+            }
+        }
+        final String segmentQueryCacheKey = calculatedSegmentQueryCacheKey;
+        logger.debug("Submitting rpc to {} shards starting from shard {}, scan range count {}", shardNum,
+                cuboidBaseShard, rawScans.size());
+
         final CubeVisitProtos.CubeVisitRequest.Builder builder = CubeVisitProtos.CubeVisitRequest.newBuilder();
         builder.setGtScanRequest(scanRequestByteString).setHbaseRawScan(rawScanByteString);
         for (IntList intList : hbaseColumnsToGTIntList) {
@@ -193,7 +242,8 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
                 @Override
                 public void run() {
                     runEPRange(queryContext, logHeader, compressionResult, builder.build(), conn, epRange.getFirst(),
-                            epRange.getSecond(), epResultItr);
+                            epRange.getSecond(), epResultItr, querySegmentCacheEnabled, segmentQueryResultBuilder,
+                            segmentQueryCacheKey);
                 }
             });
         }
@@ -203,7 +253,8 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
 
     private void runEPRange(final QueryContext queryContext, final String logHeader, final boolean compressionResult,
             final CubeVisitProtos.CubeVisitRequest request, final Connection conn, byte[] startKey, byte[] endKey,
-            final ExpectedSizeIterator epResultItr) {
+            final ExpectedSizeIterator epResultItr, final boolean querySegmentCacheEnabled,
+            final SegmentQueryResult.Builder segmentQueryResultBuilder, final String segmentQueryCacheKey) {
 
         final String queryId = queryContext.getQueryId();
 
@@ -328,12 +379,38 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
                             }
 
                             try {
+                                byte[] rawData = HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows());
                                 if (compressionResult) {
-                                    epResultItr.append(CompressionUtils.decompress(
-                                            HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows())));
+                                    epResultItr.append(CompressionUtils.decompress(rawData));
                                 } else {
-                                    epResultItr.append(
-                                            HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows()));
+                                    epResultItr.append(rawData);
+                                }
+                                // put segment query result to cache if cache is enabled
+                                if (querySegmentCacheEnabled) {
+                                    try {
+                                        segmentQueryResultBuilder.putRegionResult(rawData);
+                                        if (segmentQueryResultBuilder.isComplete()) {
+                                            CubeSegmentStatistics cubeSegmentStatistics = queryContext
+                                                    .getCubeSegmentStatistics(storageContext.ctxId,
+                                                            cubeSeg.getCubeInstance().getName(), cubeSeg.getName());
+                                            if (cubeSegmentStatistics != null) {
+                                                segmentQueryResultBuilder
+                                                        .setCubeSegmentStatistics(cubeSegmentStatistics);
+                                                logger.info(
+                                                        "Query-{}: try to put segment query result to cache for segment:{}",
+                                                        queryContext.getQueryId(), cubeSeg);
+                                                SegmentQueryResult segmentQueryResult = segmentQueryResultBuilder
+                                                        .build();
+                                                SegmentQueryCache.getInstance().put(segmentQueryCacheKey,
+                                                        segmentQueryResult);
+                                                logger.info(
+                                                        "Query-{}: successfully put segment query result to cache for segment:{}",
+                                                        queryContext.getQueryId(), cubeSeg);
+                                            }
+                                        }
+                                    } catch (Throwable t) {
+                                        logger.error("Fail to put query segment result to cache", t);
+                                    }
                                 }
                             } catch (IOException | DataFormatException e) {
                                 throw new RuntimeException(logHeader + "Error when decompressing", e);
@@ -432,4 +509,70 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
             throw new AssertionError("Unknown error type: " + errorInfo.getType());
         }
     }
+
+    private boolean isSegmentLevelCacheEnabled() {
+        if (BackdoorToggles.getDisableSegmentCache()) {
+            return false;
+        }
+        if (!cubeSeg.getConfig().isQuerySegmentCacheEnabled()) {
+            return false;
+        }
+        try {
+            if (KylinConfig.getInstanceFromEnv().getMemCachedHosts() == null) {
+                return false;
+            }
+        } catch (Exception e) {
+            logger.warn("Fail to get memcached hosts and segment level cache will not be enabled");
+            return false;
+        }
+        return true;
+    }
+
+    private String getSegmentQueryCacheKey(GTScanRequest scanRequest) {
+        String scanReqStr = getScanRequestString(scanRequest);
+        return cubeSeg.getCubeInstance().getName() + "_" + cubeSeg.getUuid() + "_" + scanReqStr;
+    }
+
+    private String getScanRequestString(GTScanRequest scanRequest) {
+        int scanRequestBufferSize = BytesSerializer.SERIALIZE_BUFFER_SIZE;
+        while (true) {
+            try {
+                ByteBuffer out = ByteBuffer.allocate(scanRequestBufferSize);
+                GTInfo.serializer.serialize(scanRequest.getInfo(), out);
+                BytesUtil.writeVInt(scanRequest.getGTScanRanges().size(), out);
+                for (GTScanRange range : scanRequest.getGTScanRanges()) {
+                    serializeGTRecord(range.pkStart, out);
+                    serializeGTRecord(range.pkEnd, out);
+                    BytesUtil.writeVInt(range.fuzzyKeys.size(), out);
+                    for (GTRecord f : range.fuzzyKeys) {
+                        serializeGTRecord(f, out);
+                    }
+                }
+                ImmutableBitSet.serializer.serialize(scanRequest.getColumns(), out);
+                BytesUtil.writeByteArray(
+                        GTUtil.serializeGTFilter(scanRequest.getFilterPushDown(), scanRequest.getInfo()), out);
+                ImmutableBitSet.serializer.serialize(scanRequest.getAggrGroupBy(), out);
+                ImmutableBitSet.serializer.serialize(scanRequest.getAggrMetrics(), out);
+                BytesUtil.writeAsciiStringArray(scanRequest.getAggrMetricsFuncs(), out);
+                BytesUtil.writeVInt(scanRequest.isAllowStorageAggregation() ? 1 : 0, out);
+                BytesUtil.writeUTFString(scanRequest.getStorageLimitLevel().name(), out);
+                BytesUtil.writeVInt(scanRequest.getStorageScanRowNumThreshold(), out);
+                BytesUtil.writeVInt(scanRequest.getStoragePushDownLimit(), out);
+                BytesUtil.writeUTFString(scanRequest.getStorageBehavior(), out);
+                out.flip();
+                return Bytes.toStringBinary(out.array(), out.position(), out.limit());
+            } catch (BufferOverflowException boe) {
+                logger.info("Buffer size {} cannot hold the scan request, resizing to 4 times", scanRequestBufferSize);
+                scanRequestBufferSize *= 4;
+            }
+        }
+    }
+
+    private void serializeGTRecord(GTRecord gtRecord, ByteBuffer out) {
+        ByteArray[] cols = gtRecord.getInternal();
+        BytesUtil.writeVInt(cols.length, out);
+        for (ByteArray col : cols) {
+            col.exportData(out);
+        }
+    }
 }
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SegmentQueryCache.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SegmentQueryCache.java
new file mode 100755
index 0000000..2b66a22
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SegmentQueryCache.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase.cube.v2;
+
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.kylin.cache.memcached.CacheStats;
+import org.apache.kylin.cache.memcached.MemcachedCache;
+import org.apache.kylin.cache.memcached.MemcachedCacheConfig;
+import org.apache.kylin.cache.memcached.MemcachedChunkingCache;
+import org.apache.kylin.common.KylinConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SegmentQueryCache {
+    public static final Logger logger = LoggerFactory.getLogger(SegmentQueryCache.class);
+    private static final String SEG_QUERY_CACHE_NAME = "query_segment_cache";
+    private static SegmentQueryCache segmentQueryCacheInstance = new SegmentQueryCache();
+
+    private MemcachedChunkingCache memcachedCache;
+
+    public static SegmentQueryCache getInstance() {
+        return segmentQueryCacheInstance;
+    }
+
+    private SegmentQueryCache() {
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        MemcachedCacheConfig memcachedCacheConfig = new MemcachedCacheConfig();
+        String configHosts = kylinConfig.getMemCachedHosts();
+        memcachedCacheConfig.setTimeout(kylinConfig.getQuerySegmentCacheTimeout());
+        // Set the max object size a little less than 1024 * 1024, because the key of the segment result cache is long;
+        // setting it to exactly 1024 * 1024 would cause the memcached client to raise an exceed-max-size error.
+        memcachedCacheConfig.setMaxObjectSize(1040000);
+        memcachedCacheConfig.setHosts(configHosts);
+        // Reverse the compression setting between the HBase coprocessor and memcached: if the HBase result is already compressed, memcached will not compress it again.
+        memcachedCacheConfig.setEnableCompression(!kylinConfig.getCompressionResult());
+        String cacheName = SEG_QUERY_CACHE_NAME;
+        memcachedCache = new MemcachedChunkingCache(MemcachedCache.create(memcachedCacheConfig, cacheName));
+    }
+
+    public void put(String key, SegmentQueryResult segmentQueryResult) {
+        memcachedCache.put(key, segmentQueryResult);
+    }
+
+    public SegmentQueryResult get(String key) {
+        byte[] value = memcachedCache.get(key);
+        if (value == null) {
+            return null;
+        }
+        return (SegmentQueryResult) (SerializationUtils.deserialize(value));
+    }
+
+    public CacheStats getCacheStats() {
+        return memcachedCache.getStats();
+    }
+
+    /**
+     * evict the segment cache by query key
+     *
+     * @param segmentQueryKey
+     */
+    public void evict(String segmentQueryKey) {
+        memcachedCache.evict(segmentQueryKey);
+    }
+}
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SegmentQueryResult.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SegmentQueryResult.java
new file mode 100755
index 0000000..e208c02
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SegmentQueryResult.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase.cube.v2;
+
+import com.google.common.collect.Queues;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.kylin.common.QueryContext.CubeSegmentStatistics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * query result for each segment
+ */
+public class SegmentQueryResult implements Serializable {
+    private static final long serialVersionUID = 9047493994209284453L;
+
+    private Collection<byte[]> regionResults;
+
+    // store segment query statistics for cube planner usage
+    private byte[] cubeSegmentStatisticsBytes;
+
+    public void setRegionResults(Collection<byte[]> regionResults) {
+        this.regionResults = regionResults;
+    }
+
+    public Collection<byte[]> getRegionResults() {
+        return regionResults;
+    }
+
+    public byte[] getCubeSegmentStatisticsBytes() {
+        return cubeSegmentStatisticsBytes;
+    }
+
+    public void setCubeSegmentStatisticsBytes(byte[] cubeSegmentStatisticsBytes) {
+        this.cubeSegmentStatisticsBytes = cubeSegmentStatisticsBytes;
+    }
+
+    public static class Builder {
+        private static final Logger logger = LoggerFactory.getLogger(Builder.class);
+
+        private volatile int regionsNum;
+        private ConcurrentLinkedQueue<byte[]> queue;
+        private AtomicInteger totalResultSize;
+        private volatile int maxSegmentCacheSize;
+        private byte[] cubeSegmentStatisticsBytes;
+
+        public Builder(int regionsNum, int maxCacheResultSize) {
+            this.regionsNum = regionsNum;
+            this.queue = Queues.newConcurrentLinkedQueue();
+            this.totalResultSize = new AtomicInteger();
+            this.maxSegmentCacheSize = maxCacheResultSize;
+        }
+
+        public void putRegionResult(byte[] result) {
+            totalResultSize.addAndGet(result.length);
+            if (totalResultSize.get() > maxSegmentCacheSize) {
+                logger.info("stop put result to cache, since the result size:{} is larger than configured size:{}",
+                        totalResultSize.get(), maxSegmentCacheSize);
+                return;
+            }
+            queue.offer(result);
+        }
+
+        public void setCubeSegmentStatistics(CubeSegmentStatistics cubeSegmentStatistics) {
+            this.cubeSegmentStatisticsBytes = (cubeSegmentStatistics == null ? null : SerializationUtils
+                    .serialize(cubeSegmentStatistics));
+        }
+
+        public boolean isComplete() {
+            return queue.size() == regionsNum;
+        }
+
+        public SegmentQueryResult build() {
+            SegmentQueryResult result = new SegmentQueryResult();
+            result.setCubeSegmentStatisticsBytes(cubeSegmentStatisticsBytes);
+            result.setRegionResults(queue);
+            return result;
+        }
+    }
+}
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/SegmentQueryResultTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/SegmentQueryResultTest.java
new file mode 100644
index 0000000..a944c8b
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/SegmentQueryResultTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase.cube;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.kylin.common.QueryContext.CubeSegmentStatistics;
+import org.apache.kylin.storage.hbase.cube.v2.SegmentQueryResult;
+import org.apache.kylin.storage.hbase.cube.v2.SegmentQueryResult.Builder;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
+import static junit.framework.TestCase.assertFalse;
+
+public class SegmentQueryResultTest {
+    private static final Logger logger = LoggerFactory.getLogger(SegmentQueryResultTest.class);
+
+    @Test
+    public void buildTest() {
+        int maxCacheResultSize = 10 * 1024;
+        ExecutorService rpcExecutor = Executors.newFixedThreadPool(4);
+        SegmentQueryResult.Builder builder = new Builder(8, maxCacheResultSize);
+        mockSendRPCTasks(rpcExecutor, 4, builder, 1024);
+        assertFalse(builder.isComplete());
+        mockSendRPCTasks(rpcExecutor, 4, builder, 1024);
+        assertTrue(builder.isComplete());
+
+        builder = new Builder(8, maxCacheResultSize);
+        mockSendRPCTasks(rpcExecutor, 8, builder, 1500);
+        assertFalse(builder.isComplete());
+    }
+
+    @Test
+    public void resultValidateTest() {
+        long segmentBuildTime = System.currentTimeMillis() - 1000;
+        int maxCacheResultSize = 10 * 1024;
+        ExecutorService rpcExecutor = Executors.newFixedThreadPool(4);
+        SegmentQueryResult.Builder builder = new Builder(8, maxCacheResultSize);
+        mockSendRPCTasks(rpcExecutor, 8, builder, 1024);
+        CubeSegmentStatistics statistics = new CubeSegmentStatistics();
+        statistics.setWrapper("cube1", "20171001000000-20171010000000", 3, 7, 1);
+        builder.setCubeSegmentStatistics(statistics);
+        SegmentQueryResult segmentQueryResult = builder.build();
+
+        CubeSegmentStatistics desStatistics = SerializationUtils.deserialize(segmentQueryResult
+                .getCubeSegmentStatisticsBytes());
+        assertEquals("cube1", desStatistics.getCubeName());
+    }
+
+    private void mockSendRPCTasks(ExecutorService rpcExecutor, int rpcNum, SegmentQueryResult.Builder builder,
+                                  int resultSize) {
+        List<Future> futures = Lists.newArrayList();
+        for (int i = 0; i < rpcNum; i++) {
+            Future future = rpcExecutor.submit(new MockRPCTask(resultSize, 10, builder));
+            futures.add(future);
+        }
+        for (Future future : futures) {
+            try {
+                future.get();
+            } catch (Exception e) {
+                logger.error("exception", e);
+            }
+        }
+    }
+
+    private static class MockRPCTask implements Runnable {
+        private int resultSize;
+        private long takeTime;
+        private SegmentQueryResult.Builder builder;
+
+        MockRPCTask(int resultSize, long takeTime, SegmentQueryResult.Builder builder) {
+            this.resultSize = resultSize;
+            this.takeTime = takeTime;
+            this.builder = builder;
+        }
+
+        @Override
+        public void run() {
+            try {
+                Thread.sleep(takeTime);
+            } catch (InterruptedException e) {
+                logger.error("interrupt", e);
+            }
+            builder.putRegionResult(new byte[resultSize]);
+        }
+    }
+
+}


[kylin] 03/12: KYLIN-2894 add a new signature calculator FactTableRealizationSetCalculator

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 7e1cdeda85dc293efe8bd408444d7bbef8401be6
Author: Ma,Gang <ga...@ebay.com>
AuthorDate: Thu Oct 18 17:41:37 2018 +0800

    KYLIN-2894 add a new signature calculator FactTableRealizationSetCalculator
---
 .../org/apache/kylin/common/KylinConfigBase.java   |   2 +-
 .../FactTableRealizationSetCalculator.java         | 112 +++++++++++++++++++++
 .../kylin/rest/util/SQLResponseSignatureUtil.java  |   6 +-
 3 files changed, 116 insertions(+), 4 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 43c0831..b19f2e9 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1555,7 +1555,7 @@ abstract public class KylinConfigBase implements Serializable {
 
     public String getSQLResponseSignatureClass() {
         return this.getOptional("kylin.query.signature-class",
-                "org.apache.kylin.rest.signature.RealizationSetCalculator");
+                "org.apache.kylin.rest.signature.FactTableRealizationSetCalculator");
     }
 
     public boolean isQueryCacheSignatureEnabled() {
diff --git a/server-base/src/main/java/org/apache/kylin/rest/signature/FactTableRealizationSetCalculator.java b/server-base/src/main/java/org/apache/kylin/rest/signature/FactTableRealizationSetCalculator.java
new file mode 100644
index 0000000..944cb1b
--- /dev/null
+++ b/server-base/src/main/java/org/apache/kylin/rest/signature/FactTableRealizationSetCalculator.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.signature;
+
+import java.util.Set;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.TableRef;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.project.RealizationEntry;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.storage.hybrid.HybridInstance;
+import org.apache.kylin.storage.hybrid.HybridManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Sets;
+
+public class FactTableRealizationSetCalculator extends RealizationSetCalculator {
+
+    public static final Logger logger = LoggerFactory.getLogger(FactTableRealizationSetCalculator.class);
+
+    /**
+     * In case the cube selection result changes after a new cube's data is ready,
+     * the cached result should be invalidated, which requires the related signature to change.
+     * To achieve this, the signature covers all of the realizations in the project
+     * that share a root fact table with the realizations actually hit by the query.
+     */
+    @Override
+    protected Set<String> getRealizations(KylinConfig config, String cubes, ProjectInstance project) {
+        Set<String> realizations = super.getRealizations(config, cubes, project);
+        if (realizations == null) {
+            return null;
+        }
+        // Collect the root fact tables of the realizations hit by the query
+        Set<String> factTables = Sets.newHashSet();
+        for (String realName : realizations) {
+            IRealization realInstance = getRealization(config, realName);
+            String factTable = getRootFactTableForRealization(realInstance);
+            if (factTable != null) {
+                factTables.add(factTable);
+            }
+        }
+
+        // Extend the set with every realization in the project whose root fact
+        // table matches one of the collected fact tables
+        Set<String> ret = Sets.newHashSet(realizations);
+        for (RealizationEntry entry : project.getRealizationEntries()) {
+            String realName = entry.getRealization();
+            IRealization realInstance = getRealization(config, realName, entry.getType());
+            String factTableForEntry = getRootFactTableForRealization(realInstance);
+            if (factTableForEntry != null && factTables.contains(factTableForEntry)) {
+                ret.add(realName);
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * Returns the identity of the root fact table of the realization's model,
+     * or null if the realization, its model, or the root table cannot be resolved.
+     */
+    private String getRootFactTableForRealization(IRealization realization) {
+        if (realization == null) {
+            logger.warn("Cannot get root fact table since the realization is null");
+            return null;
+        }
+        DataModelDesc model = realization.getModel();
+        if (model == null) {
+            logger.warn("The model for realization {} is null", realization.getName());
+            return null;
+        }
+        TableRef rootFactTable = model.getRootFactTable();
+        if (rootFactTable == null) {
+            logger.warn("The root table for model {} is null", model.getName());
+            return null;
+        }
+        return rootFactTable.getTableIdentity();
+    }
+
+    // Resolve a realization by its declared type; other types fall back to the
+    // name-only lookup below
+    private IRealization getRealization(KylinConfig config, String name, RealizationType type) {
+        switch (type) {
+        case CUBE:
+            return CubeManager.getInstance(config).getCube(name);
+        case HYBRID:
+            return HybridManager.getInstance(config).getHybridInstance(name);
+        default:
+            return getRealization(config, name);
+        }
+    }
+
+    // Resolve a realization by name only, preferring a hybrid instance over a cube
+    private IRealization getRealization(KylinConfig config, String name) {
+        HybridInstance hybridInstance = HybridManager.getInstance(config).getHybridInstance(name);
+        if (hybridInstance != null) {
+            return hybridInstance;
+        }
+        return CubeManager.getInstance(config).getCube(name);
+    }
+}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/util/SQLResponseSignatureUtil.java b/server-base/src/main/java/org/apache/kylin/rest/util/SQLResponseSignatureUtil.java
index c6d3507..2a57554 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/util/SQLResponseSignatureUtil.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/util/SQLResponseSignatureUtil.java
@@ -22,7 +22,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.ProjectManager;
 import org.apache.kylin.rest.response.SQLResponse;
-import org.apache.kylin.rest.signature.RealizationSetCalculator;
+import org.apache.kylin.rest.signature.FactTableRealizationSetCalculator;
 import org.apache.kylin.rest.signature.SignatureCalculator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,7 +52,7 @@ public class SQLResponseSignatureUtil {
             signatureCalculator = (SignatureCalculator) signatureClass.getConstructor().newInstance();
         } catch (Exception e) {
             logger.warn("Will use default signature since fail to construct signature due to " + e);
-            signatureCalculator = new RealizationSetCalculator();
+            signatureCalculator = new FactTableRealizationSetCalculator();
         }
         return signatureCalculator.calculateSignature(config, sqlResponse, project);
     }
@@ -62,7 +62,7 @@ public class SQLResponseSignatureUtil {
             return Class.forName(config.getSQLResponseSignatureClass());
         } catch (ClassNotFoundException e) {
             logger.warn("Will use default signature since cannot find class " + config.getSQLResponseSignatureClass());
-            return RealizationSetCalculator.class;
+            return FactTableRealizationSetCalculator.class;
         }
     }
 }