You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/10/29 11:27:59 UTC

[kylin] 07/12: KYLIN-2898 Introduce memcached as a distributed cache for queries

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit cd952ffd23e27b8468db03c38758d552a00c7486
Author: Wang Ken <mi...@ebay.com>
AuthorDate: Thu Oct 18 20:44:30 2018 +0800

    KYLIN-2898 Introduce memcached as a distributed cache for queries
---
 cache/pom.xml                                      |  94 ++++++
 .../spy/memcached/RefinedKetamaNodeLocator.java    | 279 ++++++++++++++++
 .../kylin/cache/cachemanager/CacheConstants.java   |  23 ++
 .../InstrumentedEhCacheCacheManager.java           | 101 ++++++
 .../cache/cachemanager/MemcachedCacheManager.java  | 181 ++++++++++
 .../RemoteLocalFailOverCacheManager.java           |  71 ++++
 .../cache/ehcache/InstrumentedEhCacheCache.java    | 205 ++++++++++++
 .../apache/kylin/cache/memcached/CacheStats.java   |  97 ++++++
 .../kylin/cache/memcached/KeyHookLookup.java       | 139 ++++++++
 .../kylin/cache/memcached/MemcachedCache.java      | 371 +++++++++++++++++++++
 .../cache/memcached/MemcachedCacheConfig.java      |  97 ++++++
 .../cache/memcached/MemcachedChunkingCache.java    | 279 ++++++++++++++++
 .../memcached/MemcachedConnectionFactory.java      | 193 +++++++++++
 .../MemcachedConnectionFactoryBuilder.java         | 173 ++++++++++
 .../kylin/cache/memcached/MemcachedMetrics.java    | 139 ++++++++
 .../org/apache/kylin/common/KylinConfigBase.java   |   4 +
 pom.xml                                            |  17 +
 17 files changed, 2463 insertions(+)

diff --git a/cache/pom.xml b/cache/pom.xml
new file mode 100644
index 0000000..8e31435
--- /dev/null
+++ b/cache/pom.xml
@@ -0,0 +1,94 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+     http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>kylin-cache</artifactId>
+    <packaging>jar</packaging>
+    <name>Apache Kylin - Cache</name>
+    <description>Apache Kylin - Cache</description>
+
+    <parent>
+        <groupId>org.apache.kylin</groupId>
+        <artifactId>kylin</artifactId>
+        <version>2.6.0-SNAPSHOT</version>
+    </parent>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-common</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-metrics</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.dropwizard.metrics</groupId>
+            <artifactId>metrics-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.dropwizard.metrics</groupId>
+            <artifactId>metrics-jvm</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-context-support</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-test</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>net.sf.ehcache</groupId>
+            <artifactId>ehcache</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>net.spy</groupId>
+            <artifactId>spymemcached</artifactId>
+        </dependency>
+
+        <!-- Test & Env -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-common</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <scope>test</scope>
+            <!--MRUnit relies on older version of mockito, so cannot manage it globally-->
+            <version>${mockito.version}</version>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/cache/src/main/java/net/spy/memcached/RefinedKetamaNodeLocator.java b/cache/src/main/java/net/spy/memcached/RefinedKetamaNodeLocator.java
new file mode 100644
index 0000000..95298d7
--- /dev/null
+++ b/cache/src/main/java/net/spy/memcached/RefinedKetamaNodeLocator.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package net.spy.memcached;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import net.spy.memcached.compat.SpyObject;
+import net.spy.memcached.util.DefaultKetamaNodeLocatorConfiguration;
+import net.spy.memcached.util.KetamaNodeLocatorConfiguration;
+
+/**
+ * Copyright (C) 2006-2009 Dustin Sallings
+ * Copyright (C) 2009-2011 Couchbase, Inc.
+ *
+ * This is a modified version of the Ketama consistent hash strategy from
+ * last.fm. This implementation may not be compatible with libketama as hashing
+ * is considered separate from node location.
+ *
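+ * The only modified method is getSequence().
+ * The previous limit of 7 tries may be too small to reduce the chance of getting only down nodes.
+ *
+ * Note: the original documentation stated that weighted nodes were not supported;
+ * this class does accept per-node weights through the constructors that take a weights map.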
+ *
+ * @see <a href="http://www.last.fm/user/RJ/journal/2007/04/10/392555/">RJ's
+ *      blog post</a>
+ */
+public final class RefinedKetamaNodeLocator extends SpyObject implements NodeLocator {
+
+    private final HashAlgorithm hashAlg;
+    private final Map<InetSocketAddress, Integer> weights;
+    private final boolean isWeightedKetama;
+    private final KetamaNodeLocatorConfiguration config;
+    private volatile TreeMap<Long, MemcachedNode> ketamaNodes;
+    private volatile Collection<MemcachedNode> allNodes;
+
+    /**
+     * Create a new KetamaNodeLocator using the specified nodes and the specified
+     * hash algorithm. Delegates to the four-argument constructor with the
+     * SPYMEMCACHED node key format and an empty weight map (unweighted).
+     *
+     * @param nodes The List of nodes to use in the Ketama consistent hash
+     *          continuum
+     * @param alg The hash algorithm to use when choosing a node in the Ketama
+     *          consistent hash continuum
+     */
+    public RefinedKetamaNodeLocator(List<MemcachedNode> nodes, HashAlgorithm alg) {
+        this(nodes, alg, KetamaNodeKeyFormatter.Format.SPYMEMCACHED, new HashMap<InetSocketAddress, Integer>());
+    }
+
+    /**
+     * Create a new KetamaNodeLocator with specific nodes, hash, node key format,
+     * and weights. The node key format is wrapped in a
+     * DefaultKetamaNodeLocatorConfiguration before delegating.
+     *
+     * @param nodes The List of nodes to use in the Ketama consistent hash
+     *          continuum
+     * @param alg The hash algorithm to use when choosing a node in the Ketama
+     *          consistent hash continuum
+     * @param nodeKeyFormat the format used to name the nodes in Ketama, either
+     *          SPYMEMCACHED or LIBMEMCACHED
+     * @param weights node weights for ketama, a map from InetSocketAddress to
+     *          weight as Integer; an empty map means unweighted
+     */
+    public RefinedKetamaNodeLocator(List<MemcachedNode> nodes, HashAlgorithm alg,
+            KetamaNodeKeyFormatter.Format nodeKeyFormat, Map<InetSocketAddress, Integer> weights) {
+        this(nodes, alg, weights, new DefaultKetamaNodeLocatorConfiguration(new KetamaNodeKeyFormatter(nodeKeyFormat)));
+    }
+
+    /**
+     * Create a new KetamaNodeLocator using the specified nodes, the specified hash
+     * algorithm and configuration. Delegates with an empty weight map (unweighted).
+     *
+     * @param nodes The List of nodes to use in the Ketama consistent hash
+     *          continuum
+     * @param alg The hash algorithm to use when choosing a node in the Ketama
+     *          consistent hash continuum
+     * @param conf the node locator configuration; it supplies the number of
+     *          repetitions per node and the key used to place each node
+     */
+    public RefinedKetamaNodeLocator(List<MemcachedNode> nodes, HashAlgorithm alg, KetamaNodeLocatorConfiguration conf) {
+        this(nodes, alg, new HashMap<InetSocketAddress, Integer>(), conf);
+    }
+
+    /**
+     * Create a new KetamaNodeLocator with specific nodes, hash, weights and
+     * configuration. This is the constructor all public constructors delegate to;
+     * it stores its arguments and builds the continuum via setKetamaNodes.
+     *
+     * @param nodes The List of nodes to use in the Ketama consistent hash
+     *          continuum
+     * @param alg The hash algorithm to use when choosing a node in the Ketama
+     *          consistent hash continuum
+     * @param nodeWeights node weights for ketama, a map from InetSocketAddress to
+     *          weight as Integer; weighted placement is used only when this map
+     *          is non-empty
+     * @param configuration node locator configuration
+     */
+    public RefinedKetamaNodeLocator(List<MemcachedNode> nodes, HashAlgorithm alg,
+            Map<InetSocketAddress, Integer> nodeWeights, KetamaNodeLocatorConfiguration configuration) {
+        super();
+        allNodes = nodes;
+        hashAlg = alg;
+        config = configuration;
+        weights = nodeWeights;
+        // Must be set before setKetamaNodes(), which branches on it.
+        isWeightedKetama = !weights.isEmpty();
+        setKetamaNodes(nodes);
+    }
+
+    /**
+     * Internal constructor used by getReadonlyCopy(): takes an already-built
+     * continuum and node collection as-is instead of computing them.
+     *
+     * @param smn the pre-built continuum (position to node)
+     * @param an the collection of all nodes
+     * @param alg the hash algorithm
+     * @param nodeWeights node weights; weighted mode is on when non-empty
+     * @param conf node locator configuration
+     */
+    private RefinedKetamaNodeLocator(TreeMap<Long, MemcachedNode> smn, Collection<MemcachedNode> an, HashAlgorithm alg,
+            Map<InetSocketAddress, Integer> nodeWeights, KetamaNodeLocatorConfiguration conf) {
+        super();
+        ketamaNodes = smn;
+        allNodes = an;
+        hashAlg = alg;
+        config = conf;
+        weights = nodeWeights;
+        isWeightedKetama = !weights.isEmpty();
+    }
+
+    /**
+     * @return the collection of all nodes held by this locator (the internal
+     *         reference itself, not a copy)
+     */
+    public Collection<MemcachedNode> getAll() {
+        return allNodes;
+    }
+
+    /**
+     * Hash the key with the configured algorithm and return the node that owns
+     * that position on the continuum.
+     *
+     * @param k the cache key
+     * @return the node responsible for the key
+     */
+    public MemcachedNode getPrimary(final String k) {
+        final long keyHash = hashAlg.hash(k);
+        final MemcachedNode primary = getNodeForKey(keyHash);
+        assert primary != null : "Found no node for key " + k;
+        return primary;
+    }
+
+    /**
+     * @return the largest position currently present on the continuum
+     *         (TreeMap.lastKey(), which throws NoSuchElementException when the
+     *         continuum is empty)
+     */
+    long getMaxKey() {
+        return getKetamaNodes().lastKey();
+    }
+
+    /**
+     * Find the node owning the given hash: the node at the smallest continuum
+     * position that is greater than or equal to the hash, wrapping around to the
+     * first position when the hash is beyond the last one.
+     *
+     * @param hash the position to look up
+     * @return the node stored at the resolved position
+     * @throws java.util.NoSuchElementException if the continuum is empty
+     */
+    MemcachedNode getNodeForKey(long hash) {
+        // Read the volatile continuum exactly once so the whole lookup works on a
+        // single consistent map even if updateLocator() swaps it concurrently.
+        final TreeMap<Long, MemcachedNode> continuum = getKetamaNodes();
+        Long position = continuum.ceilingKey(hash);
+        if (position == null) {
+            // Past the last position: wrap around to the start of the ring.
+            position = continuum.firstKey();
+        }
+        return continuum.get(position);
+    }
+
+    /**
+     * Build the iterator of candidate nodes for a key. The number of tries is
+     * the configured node repetitions plus one, but never fewer than 20; the
+     * previous value of 7 may be too small to avoid hitting only down nodes.
+     *
+     * @param k the cache key
+     * @return an iterator over candidate nodes for the key
+     */
+    public Iterator<MemcachedNode> getSequence(String k) {
+        // A larger number of tries lowers the chance of landing on the same dead
+        // node every time.
+        final int maxTry = Math.max(config.getNodeRepetitions() + 1, 20);
+        return new KetamaIterator(k, maxTry, getKetamaNodes(), hashAlg);
+    }
+
+    /**
+     * Create a copy of this locator in which every node, both in the continuum
+     * and in the node collection, is wrapped in a MemcachedNodeROImpl.
+     *
+     * @return a new locator sharing this one's hash algorithm, weights and
+     *         configuration
+     */
+    public NodeLocator getReadonlyCopy() {
+        // Wrap every node of the full node collection.
+        Collection<MemcachedNode> readOnlyNodes = new ArrayList<MemcachedNode>(allNodes.size());
+        for (MemcachedNode node : allNodes) {
+            readOnlyNodes.add(new MemcachedNodeROImpl(node));
+        }
+
+        // Copy the continuum, then replace each value in the copy with its wrapper.
+        TreeMap<Long, MemcachedNode> readOnlyContinuum = new TreeMap<Long, MemcachedNode>(getKetamaNodes());
+        for (Map.Entry<Long, MemcachedNode> entry : readOnlyContinuum.entrySet()) {
+            entry.setValue(new MemcachedNodeROImpl(entry.getValue()));
+        }
+
+        return new RefinedKetamaNodeLocator(readOnlyContinuum, readOnlyNodes, hashAlg, weights, config);
+    }
+
+    /**
+     * Replace the node list and rebuild the continuum from it.
+     *
+     * NOTE(review): allNodes and ketamaNodes are published by two separate
+     * volatile writes, so a concurrent reader may briefly see the new node list
+     * together with the old continuum.
+     *
+     * @param nodes the new list of nodes
+     */
+    @Override
+    public void updateLocator(List<MemcachedNode> nodes) {
+        allNodes = nodes;
+        setKetamaNodes(nodes);
+    }
+
+    /**
+     * @return the current continuum, mapping position to node (the internal
+     *         map itself, not a copy)
+     */
+    protected TreeMap<Long, MemcachedNode> getKetamaNodes() {
+        return ketamaNodes;
+    }
+
+    /**
+     * Setup the KetamaNodeLocator with the list of nodes it should use.
+     *
+     * Builds a fresh continuum and publishes it by assigning the ketamaNodes
+     * field at the end. Three placement strategies are used:
+     * weighted (when a non-empty weight map was supplied), MD5-chunk based
+     * (unweighted with KETAMA_HASH), and one hash per repetition (any other
+     * hash algorithm).
+     *
+     * @param nodes a List of MemcachedNodes for this KetamaNodeLocator to use in
+     *          its continuum
+     */
+    protected void setKetamaNodes(List<MemcachedNode> nodes) {
+        TreeMap<Long, MemcachedNode> newNodeMap = new TreeMap<Long, MemcachedNode>();
+        int numReps = config.getNodeRepetitions();
+        int nodeCount = nodes.size();
+        int totalWeight = 0;
+
+        if (isWeightedKetama) {
+            // NOTE(review): weights.get() is unboxed here and below; a node whose
+            // address has no entry in the weight map would cause a NullPointerException.
+            for (MemcachedNode node : nodes) {
+                totalWeight += weights.get(node.getSocketAddress());
+            }
+        }
+
+        for (MemcachedNode node : nodes) {
+            if (isWeightedKetama) {
+
+                int thisWeight = weights.get(node.getSocketAddress());
+                float percent = (float) thisWeight / (float) totalWeight;
+                // Number of points for this node, proportional to its share of the
+                // total weight and rounded down to a multiple of 4 (each MD5 digest
+                // yields 4 positions).
+                int pointerPerServer = (int) ((Math.floor(
+                        (float) (percent * (float) config.getNodeRepetitions() / 4 * (float) nodeCount + 0.0000000001)))
+                        * 4);
+                for (int i = 0; i < pointerPerServer / 4; i++) {
+                    for (long position : ketamaNodePositionsAtIteration(node, i)) {
+                        newNodeMap.put(position, node);
+                        getLogger().debug("Adding node %s with weight %s in position %d", node, thisWeight, position);
+                    }
+                }
+            } else {
+                // Ketama does some special work with md5 where it reuses chunks.
+                // Check to be backwards compatible, the hash algorithm does not
+                // matter for Ketama, just the placement should always be done using
+                // MD5
+                if (hashAlg == DefaultHashAlgorithm.KETAMA_HASH) {
+                    for (int i = 0; i < numReps / 4; i++) {
+                        for (long position : ketamaNodePositionsAtIteration(node, i)) {
+                            newNodeMap.put(position, node);
+                            getLogger().debug("Adding node %s in position %d", node, position);
+                        }
+                    }
+                } else {
+                    for (int i = 0; i < numReps; i++) {
+                        newNodeMap.put(hashAlg.hash(config.getKeyForNode(node, i)), node);
+                    }
+                }
+            }
+        }
+        // NOTE(review): only checked when assertions are enabled. It looks like this
+        // may not hold in weighted mode (points are rounded down per node) or when two
+        // positions collide in the map - confirm before relying on it.
+        assert newNodeMap.size() == numReps * nodes.size();
+        ketamaNodes = newNodeMap;
+    }
+
+    private List<Long> ketamaNodePositionsAtIteration(MemcachedNode node, int iteration) {
+        List<Long> positions = new ArrayList<Long>();
+        byte[] digest = DefaultHashAlgorithm.computeMd5(config.getKeyForNode(node, iteration));
+        for (int h = 0; h < 4; h++) {
+            Long k = ((long) (digest[3 + h * 4] & 0xFF) << 24) | ((long) (digest[2 + h * 4] & 0xFF) << 16);
+            k |= ((long) (digest[1 + h * 4] & 0xFF) << 8) | (digest[h * 4] & 0xFF);
+            positions.add(k);
+        }
+        return positions;
+    }
+}
\ No newline at end of file
diff --git a/cache/src/main/java/org/apache/kylin/cache/cachemanager/CacheConstants.java b/cache/src/main/java/org/apache/kylin/cache/cachemanager/CacheConstants.java
new file mode 100644
index 0000000..07b15a5
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/cachemanager/CacheConstants.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.cachemanager;
+
+/**
+ * Holder for cache-related constants shared by the cache managers.
+ */
+public class CacheConstants {
+    /** Name under which the query cache is registered and looked up. */
+    public static final String QUERY_CACHE = "StorageCache";
+}
\ No newline at end of file
diff --git a/cache/src/main/java/org/apache/kylin/cache/cachemanager/InstrumentedEhCacheCacheManager.java b/cache/src/main/java/org/apache/kylin/cache/cachemanager/InstrumentedEhCacheCacheManager.java
new file mode 100644
index 0000000..4f0911f
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/cachemanager/InstrumentedEhCacheCacheManager.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.cachemanager;
+
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.kylin.cache.ehcache.InstrumentedEhCacheCache;
+import org.apache.kylin.common.KylinConfig;
+import org.springframework.cache.Cache;
+import org.springframework.cache.ehcache.EhCacheCache;
+import org.springframework.cache.support.AbstractCacheManager;
+import org.springframework.util.Assert;
+
+import com.google.common.collect.Sets;
+
+import net.sf.ehcache.Ehcache;
+import net.sf.ehcache.Status;
+
+/**
+ * CacheManager backed by an EhCache {@link net.sf.ehcache.CacheManager}.
+ *
+ * When the Kylin metrics configuration contains "ehcache.enabled" set to "true"
+ * (case-insensitive) at the time the backing manager is set, caches are wrapped
+ * in {@link InstrumentedEhCacheCache}; otherwise in a plain {@link EhCacheCache}.
+ */
+public class InstrumentedEhCacheCacheManager extends AbstractCacheManager {
+
+    private net.sf.ehcache.CacheManager cacheManager;
+    private Map<String, String> metricsConfig = KylinConfig.getInstanceFromEnv().getKylinMetricsConf();
+    private boolean enableMetrics = false;
+
+    /**
+     * Return the backing EhCache {@link net.sf.ehcache.CacheManager}.
+     */
+    public net.sf.ehcache.CacheManager getCacheManager() {
+        return this.cacheManager;
+    }
+
+    /**
+     * Set the backing EhCache {@link net.sf.ehcache.CacheManager} and switch
+     * metrics on if the metrics configuration asks for it. Metrics are never
+     * switched back off by this method.
+     */
+    public void setCacheManager(net.sf.ehcache.CacheManager cacheManager) {
+        this.cacheManager = cacheManager;
+        boolean metricsRequested = "true".equalsIgnoreCase(metricsConfig.get("ehcache.enabled"));
+        if (metricsRequested) {
+            enableMetrics = true;
+        }
+    }
+
+    /**
+     * Wrap every cache currently known to the backing EhCache manager.
+     * Requires the backing manager to be set and in the alive state.
+     */
+    @Override
+    protected Collection<Cache> loadCaches() {
+        Assert.notNull(this.cacheManager, "A backing EhCache CacheManager is required");
+        Status status = this.cacheManager.getStatus();
+        Assert.isTrue(Status.STATUS_ALIVE.equals(status),
+                "An 'alive' EhCache CacheManager is required - current cache is " + status.toString());
+
+        String[] cacheNames = this.cacheManager.getCacheNames();
+        Collection<Cache> wrapped = Sets.newLinkedHashSetWithExpectedSize(cacheNames.length);
+        for (String cacheName : cacheNames) {
+            wrapped.add(wrapEhcache(this.cacheManager.getEhcache(cacheName)));
+        }
+        return wrapped;
+    }
+
+    /**
+     * Look up a cache by name; if it is not registered yet, check the backing
+     * EhCache manager again (the cache may have been added at runtime) and
+     * register the wrapper when found.
+     */
+    @Override
+    public Cache getCache(String name) {
+        Cache registered = super.getCache(name);
+        if (registered != null) {
+            return registered;
+        }
+
+        Ehcache ehcache = this.cacheManager.getEhcache(name);
+        if (ehcache == null) {
+            return null;
+        }
+
+        Cache created = wrapEhcache(ehcache);
+        addCache(created);
+        return created;
+    }
+
+    /**
+     * Wrap an Ehcache in the instrumented or the plain Spring adapter,
+     * depending on whether metrics are enabled.
+     */
+    private Cache wrapEhcache(Ehcache ehcache) {
+        if (enableMetrics) {
+            return new InstrumentedEhCacheCache(ehcache);
+        }
+        return new EhCacheCache(ehcache);
+    }
+
+}
\ No newline at end of file
diff --git a/cache/src/main/java/org/apache/kylin/cache/cachemanager/MemcachedCacheManager.java b/cache/src/main/java/org/apache/kylin/cache/cachemanager/MemcachedCacheManager.java
new file mode 100644
index 0000000..a4e1ffe
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/cachemanager/MemcachedCacheManager.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.cachemanager;
+
+import java.net.SocketAddress;
+import java.util.Collection;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.kylin.cache.memcached.MemcachedCache;
+import org.apache.kylin.cache.memcached.MemcachedCacheConfig;
+import org.apache.kylin.cache.memcached.MemcachedChunkingCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.cache.Cache;
+import org.springframework.cache.support.AbstractCacheManager;
+import org.springframework.cache.support.SimpleValueWrapper;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import net.spy.memcached.MemcachedClientIF;
+
+public class MemcachedCacheManager extends AbstractCacheManager {
+
+    private static final Logger logger = LoggerFactory.getLogger(MemcachedCacheManager.class);
+    private static final Long ONE_MINUTE = 60 * 1000L;
+
+    @Autowired
+    private MemcachedCacheConfig memcachedCacheConfig;
+
+    private ScheduledExecutorService timer = Executors.newScheduledThreadPool(1,
+            new ThreadFactoryBuilder().setNameFormat("Memcached-HealthChecker").build());
+    private AtomicBoolean clusterHealth = new AtomicBoolean(true);
+
+    /**
+     * Create the memcached-backed query cache (a MemcachedChunkingCache wrapped
+     * in a MemCachedCacheAdaptor), register it, collect all registered caches,
+     * and schedule the cluster health checker.
+     *
+     * NOTE(review): each invocation schedules another health-checker task on
+     * the timer; this presumably runs only once at start-up - confirm.
+     *
+     * @return the caches registered with this manager
+     */
+    @Override
+    protected Collection<? extends Cache> loadCaches() {
+        Cache successCache = new MemCachedCacheAdaptor(
+                new MemcachedChunkingCache(MemcachedCache.create(memcachedCacheConfig, CacheConstants.QUERY_CACHE)));
+
+        addCache(successCache);
+
+        Collection<String> names = getCacheNames();
+        Collection<Cache> caches = Lists.newArrayList();
+        for (String name : names) {
+            caches.add(getCache(name));
+        }
+
+        // First check after one minute, then one minute after each check completes.
+        timer.scheduleWithFixedDelay(new MemcachedClusterHealthChecker(), ONE_MINUTE, ONE_MINUTE,
+                TimeUnit.MILLISECONDS);
+        return caches;
+    }
+
+    /**
+     * @return true when the cluster health flag is currently false; the flag
+     *         starts as healthy and is updated by the health checker
+     */
+    public boolean isClusterDown() {
+        return !clusterHealth.get();
+    }
+
+    /**
+     * Force the cluster health flag; intended for tests.
+     *
+     * @param ifHealth true to mark the cluster healthy, false to mark it down
+     */
+    @VisibleForTesting
+    void setClusterHealth(boolean ifHealth) {
+        clusterHealth.set(ifHealth);
+    }
+
+    /**
+     * Adapts a {@link MemcachedCache}, whose reads return raw byte arrays, to
+     * the Spring {@link Cache} interface. Values read from the cache are turned
+     * back into objects with {@link SerializationUtils#deserialize(byte[])}.
+     *
+     * NOTE(review): this is Java-native deserialization of bytes fetched from
+     * the memcached cluster; it assumes the cluster content is trusted.
+     */
+    public static class MemCachedCacheAdaptor implements Cache {
+        private MemcachedCache memcachedCache;
+
+        public MemCachedCacheAdaptor(MemcachedCache memcachedCache) {
+            this.memcachedCache = memcachedCache;
+        }
+
+        @Override
+        public String getName() {
+            return memcachedCache.getName();
+        }
+
+        @Override
+        public Object getNativeCache() {
+            return memcachedCache.getNativeCache();
+        }
+
+        /**
+         * @return a wrapper around the deserialized value, or null when the
+         *         underlying cache has no bytes for the key
+         */
+        @Override
+        public ValueWrapper get(Object key) {
+            byte[] value = memcachedCache.get(key);
+            if (value == null) {
+                return null;
+            }
+            return new SimpleValueWrapper(SerializationUtils.deserialize(value));
+        }
+
+        @Override
+        public void put(Object key, Object value) {
+            memcachedCache.put(key, value);
+        }
+
+        @Override
+        public void evict(Object key) {
+            memcachedCache.evict(key);
+        }
+
+        @Override
+        public void clear() {
+            memcachedCache.clear();
+        }
+
+        /**
+         * Return the cached value cast to the requested type.
+         *
+         * @param key the cache key
+         * @param type the required type, or null to skip the type check
+         * @return the deserialized value, or null when nothing is cached
+         * @throws IllegalStateException if the deserialized value is not an
+         *         instance of the required type
+         */
+        @Override
+        @SuppressWarnings("unchecked")
+        public <T> T get(Object key, Class<T> type) {
+            byte[] value = memcachedCache.get(key);
+            if (value == null) {
+                return null;
+            }
+            Object obj = SerializationUtils.deserialize(value);
+            // The type check must look at the deserialized object, not at the raw
+            // byte[] read from memcached (which is never an instance of the
+            // caller's requested type unless that type is byte[]).
+            if (obj != null && type != null && !type.isInstance(obj)) {
+                throw new IllegalStateException(
+                        "Cached value is not of required type [" + type.getName() + "]: " + obj);
+            }
+            return (T) obj;
+        }
+
+        @Override
+        //TODO
+        public <T> T get(Object key, Callable<T> valueLoader) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        //TODO implementation here doesn't guarantee the atomicity.
+        //Without atomicity, this method should not be invoked
+        public ValueWrapper putIfAbsent(Object key, Object value) {
+            byte[] existing = memcachedCache.get(key);
+            if (existing == null) {
+                memcachedCache.put(key, value);
+                return null;
+            } else {
+                return new SimpleValueWrapper(SerializationUtils.deserialize(existing));
+            }
+        }
+
+    }
+
+    /**
+     * Periodic task that asks the memcached client of the query cache which
+     * servers are available and updates the cluster health flag: unhealthy when
+     * no server is available, healthy otherwise.
+     */
+    private class MemcachedClusterHealthChecker implements Runnable {
+        @Override
+        public void run() {
+            // A task scheduled with scheduleWithFixedDelay is cancelled for good as soon
+            // as one run throws, so no failure may escape from here; otherwise the health
+            // flag would silently stop being updated.
+            try {
+                Cache cache = getCache(CacheConstants.QUERY_CACHE);
+                if (cache == null) {
+                    logger.warn("Cache " + CacheConstants.QUERY_CACHE + " is not available, skip this health check");
+                    return;
+                }
+                MemcachedClientIF cacheClient = (MemcachedClientIF) cache.getNativeCache();
+                Collection<SocketAddress> liveServers = cacheClient.getAvailableServers();
+                Collection<SocketAddress> deadServers = cacheClient.getUnavailableServers();
+                if (liveServers.size() == 0) {
+                    clusterHealth.set(false);
+                    logger.error("All the servers in MemcachedCluster is down, UnavailableServers: " + deadServers);
+                } else {
+                    clusterHealth.set(true);
+                    if (deadServers.size() > liveServers.size()) {
+                        logger.warn("Half of the servers in MemcachedCluster is down, LiveServers: " + liveServers
+                                + ", UnavailableServers: " + deadServers);
+                    }
+                }
+            } catch (RuntimeException e) {
+                // Keep the previous health state and try again on the next run.
+                logger.error("Failed to check the health of MemcachedCluster", e);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/cache/src/main/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManager.java b/cache/src/main/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManager.java
new file mode 100644
index 0000000..f9b7ef6
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManager.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.cachemanager;
+
+import java.util.Collection;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.cache.Cache;
+import org.springframework.cache.CacheManager;
+import org.springframework.cache.support.AbstractCacheManager;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+/**
+ * Cache manager that serves caches from the remote (memcached) cache manager
+ * while it is configured and its cluster is up, and falls back to the local
+ * cache manager otherwise. It registers no caches of its own.
+ */
+public class RemoteLocalFailOverCacheManager extends AbstractCacheManager {
+    private static final Logger logger = LoggerFactory.getLogger(RemoteLocalFailOverCacheManager.class);
+
+    @Autowired
+    private MemcachedCacheManager remoteCacheManager;
+
+    @Autowired
+    private CacheManager localCacheManager;
+
+    /**
+     * Only verifies that the local cache manager has been injected; the
+     * inherited implementation is intentionally not invoked.
+     */
+    @Override
+    public void afterPropertiesSet() {
+        Preconditions.checkNotNull(localCacheManager, "localCacheManager is not injected yet");
+    }
+
+    /**
+     * This manager delegates every lookup and owns no caches, so there is
+     * nothing to load.
+     *
+     * @return an empty collection (never null, so a caller iterating the
+     *         result cannot fail with a NullPointerException)
+     */
+    @Override
+    protected Collection<? extends Cache> loadCaches() {
+        return java.util.Collections.<Cache> emptyList();
+    }
+
+    /**
+     * Resolve a cache by name from the remote manager, or from the local
+     * manager when the remote one is absent or reports its cluster as down.
+     */
+    @Override
+    public Cache getCache(String name) {
+        if (remoteCacheManager == null || remoteCacheManager.isClusterDown()) {
+            logger.info("use local cache, because remote cache is not configured or down");
+            return localCacheManager.getCache(name);
+        } else {
+            return remoteCacheManager.getCache(name);
+        }
+    }
+
+    /** Test hook: mark the remote cluster as down so lookups go to the local manager. */
+    @VisibleForTesting
+    void disableRemoteCacheManager() {
+        remoteCacheManager.setClusterHealth(false);
+    }
+
+    /** Test hook: mark the remote cluster as healthy again. */
+    @VisibleForTesting
+    void enableRemoteCacheManager() {
+        remoteCacheManager.setClusterHealth(true);
+    }
+}
\ No newline at end of file
diff --git a/cache/src/main/java/org/apache/kylin/cache/ehcache/InstrumentedEhCacheCache.java b/cache/src/main/java/org/apache/kylin/cache/ehcache/InstrumentedEhCacheCache.java
new file mode 100644
index 0000000..7a9b585
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/ehcache/InstrumentedEhCacheCache.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.ehcache;
+
+import static org.apache.kylin.metrics.lib.impl.MetricsSystem.Metrics;
+import static org.apache.kylin.metrics.lib.impl.MetricsSystem.name;
+
+import java.util.concurrent.Callable;
+
+import org.springframework.cache.Cache;
+import org.springframework.cache.ehcache.EhCacheCache;
+import org.springframework.cache.support.SimpleValueWrapper;
+import org.springframework.util.Assert;
+
+import com.codahale.metrics.Gauge;
+
+import net.sf.ehcache.Ehcache;
+import net.sf.ehcache.Element;
+import net.sf.ehcache.Status;
+
+/**
+ * {@link Cache} implementation on top of an {@link Ehcache} instance.
+ *
+ */
+public class InstrumentedEhCacheCache implements Cache {
+
+    private final Ehcache cache;
+
+    /**
+     * Create an {@link InstrumentedEhCacheCache} instance and register one gauge per exposed Ehcache statistic.
+     * Gauge names are built from the backing cache's class and name.
+     *
+     * @param ehcache backing Ehcache instance; must be non-null and in the 'alive' status
+     */
+    public InstrumentedEhCacheCache(Ehcache ehcache) {
+        Assert.notNull(ehcache, "Ehcache must not be null");
+        Status status = ehcache.getStatus();
+        Assert.isTrue(Status.STATUS_ALIVE.equals(status),
+                "An 'alive' Ehcache is required - current cache is " + status.toString());
+        this.cache = ehcache;
+
+        final String prefix = name(cache.getClass(), cache.getName());
+        Metrics.register(name(prefix, "hits"), new Gauge<Long>() {
+            @Override
+            public Long getValue() {
+                return cache.getStatistics().cacheHitCount();
+            }
+        });
+
+        Metrics.register(name(prefix, "in-memory-hits"), new Gauge<Long>() {
+            @Override
+            public Long getValue() {
+                return cache.getStatistics().localHeapHitCount();
+            }
+        });
+
+        Metrics.register(name(prefix, "misses"), new Gauge<Long>() {
+            @Override
+            public Long getValue() {
+                return cache.getStatistics().cacheMissCount();
+            }
+        });
+
+        Metrics.register(name(prefix, "in-memory-misses"), new Gauge<Long>() {
+            @Override
+            public Long getValue() {
+                return cache.getStatistics().localHeapMissCount();
+            }
+        });
+
+        Metrics.register(name(prefix, "objects"), new Gauge<Long>() {
+            @Override
+            public Long getValue() {
+                return cache.getStatistics().getSize();
+            }
+        });
+
+        Metrics.register(name(prefix, "in-memory-objects"), new Gauge<Long>() {
+            @Override
+            public Long getValue() {
+                return cache.getStatistics().getLocalHeapSize();
+            }
+        });
+
+        Metrics.register(name(prefix, "mean-get-time"), new Gauge<Double>() {
+            @Override
+            public Double getValue() {
+                return cache.getStatistics().cacheGetOperation().latency().average().value();
+            }
+        });
+
+        Metrics.register(name(prefix, "mean-search-time"), new Gauge<Double>() {
+            @Override
+            public Double getValue() {
+                return cache.getStatistics().cacheSearchOperation().latency().average().value();
+            }
+        });
+
+        Metrics.register(name(prefix, "eviction-count"), new Gauge<Long>() {
+            @Override
+            public Long getValue() {
+                return cache.getStatistics().cacheEvictionOperation().count().value();
+            }
+        });
+
+        Metrics.register(name(prefix, "writer-queue-size"), new Gauge<Long>() {
+            @Override
+            public Long getValue() {
+                return cache.getStatistics().getWriterQueueLength();
+            }
+        });
+    }
+
+    /** @return the name of the backing Ehcache */
+    @Override
+    public String getName() {
+        return this.cache.getName();
+    }
+
+    /** @return the backing Ehcache instance */
+    @Override
+    public Ehcache getNativeCache() {
+        return this.cache;
+    }
+
+    /**
+     * @return a wrapper around the cached value, or {@code null} when the backing cache holds no element for
+     *         {@code key}
+     */
+    @Override
+    public ValueWrapper get(Object key) {
+        Element element = lookup(key);
+        return (element != null ? new SimpleValueWrapper(element.getObjectValue()) : null);
+    }
+
+    /** Stores {@code value} under {@code key} in the backing cache. */
+    @Override
+    public void put(Object key, Object value) {
+        this.cache.put(new Element(key, value));
+    }
+
+    /** Removes the element stored under {@code key}, if any. */
+    @Override
+    public void evict(Object key) {
+        this.cache.remove(key);
+    }
+
+    /** Removes all elements from the backing cache. */
+    @Override
+    public void clear() {
+        this.cache.removeAll();
+    }
+
+    /**
+     * @return the cached value cast to {@code type}, or {@code null} when there is no element for {@code key}
+     * @throws IllegalStateException if a value is present but is not an instance of {@code type}
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T> T get(Object key, Class<T> type) {
+        Element element = lookup(key);
+        Object value = (element != null ? element.getObjectValue() : null);
+        if (value != null && type != null && !type.isInstance(value)) {
+            throw new IllegalStateException("Cached value is not of required type [" + type.getName() + "]: " + value);
+        }
+        return (T) value;
+    }
+
+    /**
+     * Returns the cached value for {@code key}; when absent, takes the per-key write lock, re-checks, and only then
+     * invokes {@code valueLoader} and stores its result.
+     *
+     * @throws ValueRetrievalException if {@code valueLoader} throws
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T> T get(Object key, Callable<T> valueLoader) {
+        Element element = lookup(key);
+        if (element != null) {
+            return (T) element.getObjectValue();
+        } else {
+            this.cache.acquireWriteLockOnKey(key);
+            try {
+                element = lookup(key); // One more attempt with the write lock
+                if (element != null) {
+                    return (T) element.getObjectValue();
+                } else {
+                    return loadValue(key, valueLoader);
+                }
+            } finally {
+                this.cache.releaseWriteLockOnKey(key);
+            }
+        }
+    }
+
+    /**
+     * @return a wrapper around the value that was already present, or {@code null} when the backing cache reports
+     *         no existing element (i.e. the new value was stored)
+     */
+    @Override
+    public ValueWrapper putIfAbsent(Object key, Object value) {
+        Element existingElement = this.cache.putIfAbsent(new Element(key, value));
+        return (existingElement != null ? new SimpleValueWrapper(existingElement.getObjectValue()) : null);
+    }
+
+    /** Raw element lookup in the backing cache. */
+    private Element lookup(Object key) {
+        return this.cache.get(key);
+    }
+
+    /** Invokes the loader, wraps any failure in {@link ValueRetrievalException}, and caches the loaded value. */
+    private <T> T loadValue(Object key, Callable<T> valueLoader) {
+        T value;
+        try {
+            value = valueLoader.call();
+        } catch (Throwable ex) {
+            throw new ValueRetrievalException(key, valueLoader, ex);
+        }
+        put(key, value);
+        return value;
+    }
+}
\ No newline at end of file
diff --git a/cache/src/main/java/org/apache/kylin/cache/memcached/CacheStats.java b/cache/src/main/java/org/apache/kylin/cache/memcached/CacheStats.java
new file mode 100644
index 0000000..c91ba45
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/memcached/CacheStats.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.memcached;
+
+/**
+ * Immutable snapshot of cache counters (hits, misses, bytes read/written, timeouts, errors) with a few derived
+ * averages. All derived values return 0 when no lookups have been recorded.
+ */
+public class CacheStats {
+    private final long numHits;
+    private final long numMisses;
+    private final long getBytes;
+    private final long getTime;
+    private final long numPut;
+    private final long putBytes;
+    private final long numEvictions;
+    private final long numTimeouts;
+    private final long numErrors;
+
+    /**
+     * @param getBytes     total bytes returned by get operations
+     * @param getTime      total time spent in get operations
+     * @param numPut       number of put operations
+     * @param putBytes     total bytes written by put operations
+     * @param numHits      number of lookups that found a value
+     * @param numMisses    number of lookups that found nothing
+     * @param numEvictions number of evictions
+     * @param numTimeouts  number of operations that timed out
+     * @param numErrors    number of operations that failed
+     */
+    public CacheStats(long getBytes, long getTime, long numPut, long putBytes, long numHits, long numMisses,
+            long numEvictions, long numTimeouts, long numErrors) {
+        this.getBytes = getBytes;
+        this.getTime = getTime;
+        this.numPut = numPut;
+        this.putBytes = putBytes;
+        this.numHits = numHits;
+        this.numMisses = numMisses;
+        this.numEvictions = numEvictions;
+        this.numTimeouts = numTimeouts;
+        this.numErrors = numErrors;
+    }
+
+    public long getNumHits() {
+        return numHits;
+    }
+
+    public long getNumMisses() {
+        return numMisses;
+    }
+
+    /** @return hits plus misses; same value as {@link #numLookups()} */
+    public long getNumGet() {
+        return numLookups();
+    }
+
+    public long getNumGetBytes() {
+        return getBytes;
+    }
+
+    public long getNumPutBytes() {
+        return putBytes;
+    }
+
+    public long getNumPut() {
+        return numPut;
+    }
+
+    public long getNumEvictions() {
+        return numEvictions;
+    }
+
+    public long getNumTimeouts() {
+        return numTimeouts;
+    }
+
+    public long getNumErrors() {
+        return numErrors;
+    }
+
+    /** @return hits plus misses */
+    public long numLookups() {
+        return numHits + numMisses;
+    }
+
+    /** @return hits divided by lookups, or 0 when there were no lookups */
+    public double hitRate() {
+        long lookups = numLookups();
+        return lookups == 0 ? 0 : numHits / (double) lookups;
+    }
+
+    /** @return bytes read per lookup (integer division), or 0 when there were no lookups */
+    public long avgGetBytes() {
+        // guard on the divisor itself: a non-zero byte count with zero lookups must not divide by zero
+        long lookups = numLookups();
+        return lookups == 0 ? 0 : getBytes / lookups;
+    }
+
+    /** @return get time per lookup (integer division), or 0 when there were no lookups */
+    public long getAvgGetTime() {
+        // long division by zero throws ArithmeticException, which is what happens before the first lookup
+        long lookups = numLookups();
+        return lookups == 0 ? 0 : getTime / lookups;
+    }
+}
\ No newline at end of file
diff --git a/cache/src/main/java/org/apache/kylin/cache/memcached/KeyHookLookup.java b/cache/src/main/java/org/apache/kylin/cache/memcached/KeyHookLookup.java
new file mode 100644
index 0000000..b9bdf5c
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/memcached/KeyHookLookup.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.memcached;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+/**
+ * A class implementing this interface indicates that the key information needs to be obtained through a first lookup
+ * in the cache itself, which returns a hook.
+ *
+ */
+public interface KeyHookLookup {
+    /**
+     * Looks up the hook stored for the given key.
+     *
+     * @param key the cache key
+     * @return the hook found for {@code key}
+     */
+    KeyHook lookupKeyHook(String key);
+
+    /**
+     * Serializable holder of either a list of chunk keys or a raw value byte array. Equality, hashing and the
+     * string form are all based on the contents of the two arrays.
+     */
+    public static class KeyHook implements Serializable {
+        private static final long serialVersionUID = 2400159460862757991L;
+
+        private String[] chunkskey;
+        private byte[] values;
+
+        /**
+         * No-arg constructor, kept for de-serialization.
+         */
+        public KeyHook() {
+        }
+
+        public KeyHook(String[] chunkskey, byte[] values) {
+            this.chunkskey = chunkskey;
+            this.values = values;
+        }
+
+        public String[] getChunkskey() {
+            return chunkskey;
+        }
+
+        public void setChunkskey(String[] chunkskey) {
+            this.chunkskey = chunkskey;
+        }
+
+        public byte[] getValues() {
+            return values;
+        }
+
+        public void setValues(byte[] values) {
+            this.values = values;
+        }
+
+        /** Content-based hash over both arrays (a {@code null} array contributes 0). */
+        @Override
+        public int hashCode() {
+            int hash = 31 + Arrays.hashCode(chunkskey);
+            return 31 * hash + Arrays.hashCode(values);
+        }
+
+        /** Two hooks are equal when they have the same class and element-wise equal arrays. */
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null || getClass() != obj.getClass()) {
+                return false;
+            }
+            KeyHook that = (KeyHook) obj;
+            return Arrays.equals(chunkskey, that.chunkskey) && Arrays.equals(values, that.values);
+        }
+
+        /** Reports only the array lengths (or their absence), never the contents. */
+        @Override
+        public String toString() {
+            String keysPart = (chunkskey == null) ? "chunkskey_is_null" : "chunkskey_length:" + chunkskey.length;
+            String valuesPart = (values == null) ? "value_is_null" : "value_length:" + values.length;
+            return keysPart + "|" + valuesPart;
+        }
+    }
+}
\ No newline at end of file
diff --git a/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedCache.java b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedCache.java
new file mode 100644
index 0000000..a2e69a7
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedCache.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.memcached;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.zip.DataFormatException;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.CompressionUtils;
+import org.apache.kylin.common.util.JsonUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.base.Throwables;
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Shorts;
+
+import net.spy.memcached.AddrUtil;
+import net.spy.memcached.ConnectionFactory;
+import net.spy.memcached.ConnectionFactoryBuilder;
+import net.spy.memcached.DefaultHashAlgorithm;
+import net.spy.memcached.FailureMode;
+import net.spy.memcached.MemcachedClient;
+import net.spy.memcached.MemcachedClientIF;
+import net.spy.memcached.ops.ArrayOperationQueueFactory;
+import net.spy.memcached.ops.LinkedOperationQueueFactory;
+import net.spy.memcached.ops.OperationQueueFactory;
+import net.spy.memcached.transcoders.SerializingTranscoder;
+
+/**
+ * Cache backend by Memcached. The implementation leverages spymemcached client to talk to the servers.
+ * Memcached itself has a limitation to the size of the key. So the real key for cache lookup is hashed from the original key.
+ * The implementation provides a way for hash collision detection. It can also compress/decompress the value bytes based on the preconfigured compression threshold to save network bandwidth and storage space.
+ *
+ * @author mingmwang
+ *
+ */
+public class MemcachedCache {
+    public static final int MAX_PREFIX_LENGTH = MemcachedClientIF.MAX_KEY_LENGTH - 40 // length of namespace hash
+            - 40 // length of key hash
+            - 2; // length of separators
+    private static final Logger logger = LoggerFactory.getLogger(MemcachedCache.class);
+    private static final int DEFAULT_TTL = 7 * 24 * 3600;
+    protected final MemcachedCacheConfig config;
+    protected final MemcachedClientIF client;
+    protected final String memcachedPrefix;
+    // values whose encoded size exceeds this many bytes are compressed (when compression is enabled)
+    protected final int compressThreshold;
+    protected final AtomicLong hitCount = new AtomicLong(0);
+    protected final AtomicLong missCount = new AtomicLong(0);
+    protected final AtomicLong readBytes = new AtomicLong(0);
+    protected final AtomicLong timeoutCount = new AtomicLong(0);
+    protected final AtomicLong errorCount = new AtomicLong(0);
+    protected final AtomicLong putCount = new AtomicLong(0);
+    protected final AtomicLong putBytes = new AtomicLong(0);
+    private final int timeToLiveSeconds;
+    protected AtomicLong cacheGetTime = new AtomicLong(0);
+
+    /**
+     * @param client            the memcached client used for all operations
+     * @param config            the cache configuration (timeout, object size, compression, ...)
+     * @param memcachedPrefix   prefix that becomes part of every memcached key; also the cache name
+     * @param timeToLiveSeconds expiration passed to memcached on every put
+     * @throws IllegalArgumentException if the prefix is longer than {@link #MAX_PREFIX_LENGTH}
+     */
+    public MemcachedCache(final MemcachedClientIF client, final MemcachedCacheConfig config,
+            final String memcachedPrefix, int timeToLiveSeconds) {
+        // Guava's Preconditions only substitutes %s placeholders; %d would be left verbatim in the message
+        Preconditions.checkArgument(memcachedPrefix.length() <= MAX_PREFIX_LENGTH,
+                "memcachedPrefix length [%s] exceeds maximum length [%s]", memcachedPrefix.length(), MAX_PREFIX_LENGTH);
+        this.memcachedPrefix = memcachedPrefix;
+        this.client = client;
+        this.config = config;
+        this.compressThreshold = config.getMaxObjectSize() / 2;
+        this.timeToLiveSeconds = timeToLiveSeconds;
+    }
+
+    /**
+     * Creates a new cache that shares the client, configuration, prefix and TTL of {@code cache} but has its own
+     * counters.
+     */
+    public MemcachedCache(MemcachedCache cache) {
+        this(cache.client, cache.config, cache.memcachedPrefix, cache.timeToLiveSeconds);
+    }
+
+    /**
+     * Create and return the MemcachedCache with the default time-to-live. Each call creates a new instance.
+     *
+     * @param config          The MemcachedCache configuration to control the cache behavior.
+     * @param memcachedPrefix prefix that becomes part of every memcached key
+     * @return a new cache backed by a new memcached client
+     */
+    public static MemcachedCache create(final MemcachedCacheConfig config, String memcachedPrefix) {
+        return create(config, memcachedPrefix, DEFAULT_TTL);
+    }
+
+    /**
+     * Create and return the MemcachedCache. Each call creates a new instance with its own memcached client.
+     *
+     * @param config          The MemcachedCache configuration to control the cache behavior.
+     * @param memcachedPrefix prefix that becomes part of every memcached key
+     * @param timeToLive      expiration, in seconds, applied to every put
+     * @return a new cache backed by a new memcached client
+     */
+    public static MemcachedCache create(final MemcachedCacheConfig config, String memcachedPrefix, int timeToLive) {
+        try {
+            SerializingTranscoder transcoder = new SerializingTranscoder(config.getMaxObjectSize());
+            // always no compression inside, we compress/decompress outside
+            transcoder.setCompressionThreshold(Integer.MAX_VALUE);
+
+            OperationQueueFactory opQueueFactory;
+            int maxQueueSize = config.getMaxOperationQueueSize();
+            if (maxQueueSize > 0) {
+                opQueueFactory = new ArrayOperationQueueFactory(maxQueueSize);
+            } else {
+                opQueueFactory = new LinkedOperationQueueFactory();
+            }
+            String hostsStr = config.getHosts();
+            ConnectionFactory connectionFactory = new MemcachedConnectionFactoryBuilder()
+                    .setProtocol(ConnectionFactoryBuilder.Protocol.BINARY)
+                    .setHashAlg(DefaultHashAlgorithm.FNV1A_64_HASH)
+                    .setLocatorType(ConnectionFactoryBuilder.Locator.CONSISTENT).setDaemon(true)
+                    .setFailureMode(FailureMode.Redistribute).setTranscoder(transcoder).setShouldOptimize(true)
+                    .setOpQueueMaxBlockTime(config.getTimeout()).setOpTimeout(config.getTimeout())
+                    .setReadBufferSize(config.getReadBufferSize()).setOpQueueFactory(opQueueFactory).build();
+            return new MemcachedCache(new MemcachedClient(new MemcachedConnectionFactory(connectionFactory),
+                    AddrUtil.getAddresses(hostsStr)), config, memcachedPrefix, timeToLive);
+        } catch (IOException e) {
+            logger.error("Unable to create MemcachedCache instance.", e);
+            throw Throwables.propagate(e);
+        }
+    }
+
+    /** @return the memcached prefix, which doubles as the cache name */
+    public String getName() {
+        return memcachedPrefix;
+    }
+
+    /** @return the underlying memcached client */
+    public Object getNativeCache() {
+        return client;
+    }
+
+    /**
+     * Converts the key to its JSON string form.
+     *
+     * @return the JSON string, or {@code null} when the key cannot be converted (a warning is logged)
+     */
+    protected String serializeKey(Object key) {
+        try {
+            return JsonUtil.writeValueAsString(key);
+        } catch (JsonProcessingException e) {
+            logger.warn("Can not convert key to String.", e);
+        }
+        return null;
+    }
+
+    /** Serializes the value with Java serialization; {@code value} must implement {@link Serializable}. */
+    protected byte[] serializeValue(Object value) {
+        return SerializationUtils.serialize((Serializable) value);
+    }
+
+    @VisibleForTesting
+    byte[] encodeValue(String keyS, Object value) {
+        if (keyS == null) {
+            return null;
+        }
+        return encodeValue(keyS.getBytes(Charsets.UTF_8), serializeValue(value));
+    }
+
+    /**
+     * This method is used to get the serialized value bytes based on key from the Cache. It converts key to json string first.
+     * And then it calls getBinary() method to compute hashed key from the original key string, and use this as the real key to do lookup from internal Cache.
+     * Then it decodes the real value bytes from the cache lookup result. The bytes are returned as stored; turning them back into an object is left to the caller.
+     *
+     * @return the serialized value, or {@code null} when nothing usable is found
+     */
+    public byte[] get(Object key) {
+        return get(serializeKey(key));
+    }
+
+    /**
+     * @param keyS should be the serialized string
+     * @return the serialized value, or {@code null} when nothing usable is found
+     */
+    public byte[] get(String keyS) {
+        return getBinary(keyS);
+    }
+
+    /**
+     * This method is used to put key/value objects to the Cache. It converts key to json string and leverages object serializer to convert value object to bytes.
+     * And then it calls putBinary() method to compute hashed key from the original key string and encode the original key bytes into value bytes for hash conflicts detection.
+     */
+    public void put(Object key, Object value) {
+        put(serializeKey(key), value);
+    }
+
+    /**
+     * @param keyS should be the serialized string; a {@code null} key is ignored
+     */
+    public void put(String keyS, Object value) {
+        if (keyS != null) {
+            putBinary(keyS, serializeValue(value), timeToLiveSeconds);
+        }
+    }
+
+    /** Deletes the entry for {@code key}; a {@code null} key is ignored. */
+    public void evict(Object key) {
+        if (key == null)
+            return;
+        evict(serializeKey(key));
+    }
+
+    /**
+     * @param keyS should be the serialized string; a {@code null} key is ignored
+     */
+    public void evict(String keyS) {
+        if (keyS == null)
+            return;
+        client.delete(computeKeyHash(keyS));
+    }
+
+    /**
+     * Flushes the memcached servers through the client and waits for the result. Failures are logged, not thrown.
+     */
+    public void clear() {
+        logger.warn("Clear Remote Cache!");
+        Future<Boolean> resultFuture = client.flush();
+        try {
+            boolean result = resultFuture.get();
+            logger.warn("Clear Remote Cache returned with result: " + result);
+        } catch (InterruptedException e) {
+            // keep the interrupt visible to the caller instead of silently consuming it
+            Thread.currentThread().interrupt();
+            logger.warn("Can't clear Remote Cache.", e);
+        } catch (Exception e) {
+            logger.warn("Can't clear Remote Cache.", e);
+        }
+    }
+
+    /** @return a snapshot of the current counters; the eviction count is always reported as 0 */
+    public CacheStats getStats() {
+        return new CacheStats(readBytes.get(), cacheGetTime.get(), putCount.get(), putBytes.get(), hitCount.get(),
+                missCount.get(), 0, timeoutCount.get(), errorCount.get());
+    }
+
+    /**
+     * @param keyS should be the serialized string
+     * @return the serialized value
+     */
+    protected byte[] getBinary(String keyS) {
+        if (Strings.isNullOrEmpty(keyS)) {
+            return null;
+        }
+        byte[] bytes = internalGet(computeKeyHash(keyS));
+        return decodeValue(keyS.getBytes(Charsets.UTF_8), bytes);
+    }
+
+    /**
+     * @param keyS should be the serialized string
+     * @param valueB should be the serialized value
+     */
+    protected void putBinary(String keyS, byte[] valueB, int expiration) {
+        if (Strings.isNullOrEmpty(keyS)) {
+            return;
+        }
+        internalPut(computeKeyHash(keyS), encodeValue(keyS.getBytes(Charsets.UTF_8), valueB), expiration);
+    }
+
+    /**
+     * Fetches the raw encoded bytes stored under the hashed key, waiting at most the configured timeout, and
+     * updates the hit/miss/timeout/error counters.
+     *
+     * @return the encoded bytes, or {@code null} on miss, timeout or error
+     */
+    protected byte[] internalGet(String hashedKey) {
+        Future<Object> future;
+        long start = System.currentTimeMillis();
+        try {
+            future = client.asyncGet(hashedKey);
+        } catch (Throwable t) {
+            // includes IllegalStateException: operation did not get queued in time (queue is full)
+            errorCount.incrementAndGet();
+            logger.error("Unable to queue cache operation.", t);
+            return null;
+        }
+
+        try {
+            byte[] result = (byte[]) future.get(config.getTimeout(), TimeUnit.MILLISECONDS);
+            cacheGetTime.addAndGet(System.currentTimeMillis() - start);
+            if (result != null) {
+                hitCount.incrementAndGet();
+                readBytes.addAndGet(result.length);
+            } else {
+                missCount.incrementAndGet();
+            }
+            return result;
+        } catch (TimeoutException e) {
+            timeoutCount.incrementAndGet();
+            future.cancel(false);
+            return null;
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw Throwables.propagate(e);
+        } catch (ExecutionException e) {
+            errorCount.incrementAndGet();
+            logger.error("ExecutionException when pulling key meta from cache.", e);
+            return null;
+        }
+    }
+
+    /** Submits the set operation and updates the put counters; failures are counted and logged, not thrown. */
+    private void internalPut(String hashedKey, byte[] encodedValue, int expiration) {
+        try {
+            client.set(hashedKey, expiration, encodedValue);
+            putCount.incrementAndGet();
+            putBytes.addAndGet(encodedValue.length);
+        } catch (Throwable t) {
+            // includes IllegalStateException: operation did not get queued in time (queue is full)
+            errorCount.incrementAndGet();
+            logger.error("Unable to queue cache operation.", t);
+        }
+    }
+
+    /**
+     * Builds the stored payload: a 2-byte flag (1 = compressed, 0 = plain) followed by
+     * {@code [4-byte key length][key bytes][value bytes]}, the latter compressed as a whole when the flag is 1.
+     * The original key is embedded so that {@link #decodeValue(byte[], byte[])} can detect hash collisions.
+     */
+    protected byte[] encodeValue(byte[] key, byte[] valueB) {
+        byte[] compressed = null;
+        if (config.isEnableCompression() && (valueB.length + Ints.BYTES + key.length > compressThreshold)) {
+            try {
+                compressed = CompressionUtils.compress(ByteBuffer.allocate(Ints.BYTES + key.length + valueB.length)
+                        .putInt(key.length).put(key).put(valueB).array());
+            } catch (IOException e) {
+                // fall back to the uncompressed layout
+                compressed = null;
+                logger.warn("Compressing value bytes error.", e);
+            }
+        }
+        if (compressed != null) {
+            return ByteBuffer.allocate(Shorts.BYTES + compressed.length).putShort((short) 1).put(compressed).array();
+        } else {
+            return ByteBuffer.allocate(Shorts.BYTES + Ints.BYTES + key.length + valueB.length).putShort((short) 0)
+                    .putInt(key.length).put(key).put(valueB).array();
+        }
+    }
+
+    /**
+     * Reverses {@link #encodeValue(byte[], byte[])} and checks that the embedded key equals {@code key}.
+     *
+     * @return the value bytes, or {@code null} when {@code valueE} is {@code null}, malformed, cannot be
+     *         decompressed, or belongs to a different key (hash collision)
+     */
+    protected byte[] decodeValue(byte[] key, byte[] valueE) {
+        if (valueE == null)
+            return null;
+        if (valueE.length < Shorts.BYTES) {
+            logger.error("Cached value is too short to hold the compression flag.");
+            return null;
+        }
+        ByteBuffer buf = ByteBuffer.wrap(valueE);
+        short enableCompression = buf.getShort();
+        byte[] uncompressed = null;
+        if (enableCompression == 1) {
+            byte[] value = new byte[buf.remaining()];
+            buf.get(value);
+            try {
+                uncompressed = CompressionUtils.decompress(value);
+            } catch (IOException | DataFormatException e) {
+                logger.error("Decompressing value bytes error.", e);
+                return null;
+            }
+        }
+        if (uncompressed != null) {
+            buf = ByteBuffer.wrap(uncompressed);
+        }
+        // a truncated or corrupt payload is treated like any other unusable entry: report and miss
+        if (buf.remaining() < Ints.BYTES) {
+            logger.error("Cached value is too short to hold the key length.");
+            return null;
+        }
+        final int keyLength = buf.getInt();
+        if (keyLength < 0 || keyLength > buf.remaining()) {
+            logger.error("Cached value holds an invalid key length: " + keyLength);
+            return null;
+        }
+        byte[] keyBytes = new byte[keyLength];
+        buf.get(keyBytes);
+        if (!Arrays.equals(keyBytes, key)) {
+            logger.error("Keys do not match, possible hash collision!");
+            return null;
+        }
+        byte[] value = new byte[buf.remaining()];
+        buf.get(value);
+        return value;
+    }
+
+    /**
+     * Builds the real memcached key: deploy environment, prefix and the SHA hex digest of the key, joined by ':'
+     * (null parts are skipped).
+     */
+    protected String computeKeyHash(String key) {
+        // hash keys to keep things under 250 characters for memcached
+        return Joiner.on(":").skipNulls().join(KylinConfig.getInstanceFromEnv().getDeployEnv(), this.memcachedPrefix,
+                DigestUtils.shaHex(key));
+
+    }
+
+}
\ No newline at end of file
diff --git a/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedCacheConfig.java b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedCacheConfig.java
new file mode 100644
index 0000000..d71c279
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedCacheConfig.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.memcached;
+
+import net.spy.memcached.DefaultConnectionFactory;
+
+public class MemcachedCacheConfig {
+    private long timeout = 500L;
+
+    // comma delimited list of memcached servers, given as host:port combination
+    private String hosts;
+
+    private int maxChunkSize = 1024;
+
+    private int maxObjectSize = 1024 * 1024;
+
+    // memcached client read buffer size, -1 uses the spymemcached library default
+    private int readBufferSize = DefaultConnectionFactory.DEFAULT_READ_BUFFER_SIZE;
+
+    // maximum operation queue size. 0 means unbounded
+    private int maxOperationQueueSize = 0;
+
+    // whether to compress the value data or not
+    private boolean enableCompression = true;
+
+    public long getTimeout() {
+        return timeout;
+    }
+
+    public void setTimeout(long timeout) {
+        this.timeout = timeout;
+    }
+
+    public String getHosts() {
+        return hosts;
+    }
+
+    public void setHosts(String hosts) {
+        this.hosts = hosts;
+    }
+
+    public int getMaxChunkSize() {
+        return maxChunkSize;
+    }
+
+    public void setMaxChunkSize(int maxChunkSize) {
+        this.maxChunkSize = maxChunkSize;
+    }
+
+    public int getMaxObjectSize() {
+        return maxObjectSize;
+    }
+
+    public void setMaxObjectSize(int maxObjectSize) {
+        this.maxObjectSize = maxObjectSize;
+    }
+
+    public int getMaxOperationQueueSize() {
+        return maxOperationQueueSize;
+    }
+
+    public void setMaxOperationQueueSize(int maxOperationQueueSize) {
+        this.maxOperationQueueSize = maxOperationQueueSize;
+    }
+
+    public int getReadBufferSize() {
+        return readBufferSize;
+    }
+
+    public void setReadBufferSize(int readBufferSize) {
+        this.readBufferSize = readBufferSize;
+    }
+
+    public boolean isEnableCompression() {
+        return enableCompression;
+    }
+
+    public void setEnableCompression(boolean enableCompression) {
+        this.enableCompression = enableCompression;
+    }
+}
\ No newline at end of file
diff --git a/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedChunkingCache.java b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedChunkingCache.java
new file mode 100644
index 0000000..e79e717
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedChunkingCache.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.memcached;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.kylin.common.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Shorts;
+
+import net.spy.memcached.internal.BulkFuture;
+
+/**
+ * Subclass of MemcachedCache that supports storing large objects. Memcached itself limits the value size, with a default limit of 1M.
+ * This implementation extends the limit to 1G by splitting huge values into multiple chunks. It also takes care of data integrity if some of the chunks are lost (due to server restart or other reasons).
+ *
+ * @author mingmwang
+ */
+public class MemcachedChunkingCache extends MemcachedCache implements KeyHookLookup {
+    private static final Logger logger = LoggerFactory.getLogger(MemcachedChunkingCache.class);
+
+    public MemcachedChunkingCache(MemcachedCache cache) { // validates the chunking limits of the inherited config
+        super(cache);
+        Preconditions.checkArgument(config.getMaxChunkSize() > 1, "maxChunkSize [%s] must be greater than 1",
+                config.getMaxChunkSize()); // Guava message templates expand only %s; %d would be printed literally
+        Preconditions.checkArgument(config.getMaxObjectSize() > 261, "maxObjectSize [%s] must be greater than 261",
+                config.getMaxObjectSize());
+    }
+
+    protected static byte[][] splitBytes(final byte[] data, final int nSplit) { // nSplit ordered chunks of data
+        byte[][] dest = new byte[nSplit][];
+        // Clamp both bounds to data.length: copyOfRange zero-pads past the end and rejects from > to.
+        final int splitSize = (data.length - 1) / nSplit + 1;
+        for (int i = 0; i < nSplit; i++) {
+            final int from = Math.min(i * splitSize, data.length);
+            dest[i] = Arrays.copyOfRange(data, from, Math.min(from + splitSize, data.length));
+        }
+
+        return dest;
+    }
+
+    protected static int getValueSplit(MemcachedCacheConfig config, String keyS, int valueBLen) {
+        // usable payload per chunk; the 6 reserves room for the chunk number, which never exceeds 6 bytes
+        final int valueSize = config.getMaxObjectSize() - Shorts.BYTES - Ints.BYTES
+                - keyS.getBytes(Charsets.UTF_8).length - 6;
+        final long maxValueSize = (long) config.getMaxChunkSize() * valueSize; // long: the product can overflow int
+        Preconditions.checkArgument(valueBLen <= maxValueSize,
+                "the value bytes length [%s] exceeds maximum value size [%s]", valueBLen, maxValueSize);
+        return (valueBLen - 1) / valueSize + 1;
+    }
+
+    protected static Pair<KeyHook, byte[][]> getKeyValuePair(int nSplit, String keyS, byte[] valueB) {
+        KeyHook keyHook;
+        byte[][] splitValueB = null;
+        if (nSplit > 1) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Enable chunking for putting large cached object values, chunk size = " + nSplit
+                        + ", original value bytes size = " + valueB.length);
+            }
+            String[] chunkKeySs = new String[nSplit];
+            for (int i = 0; i < nSplit; i++) {
+                chunkKeySs[i] = keyS + i;
+            }
+            keyHook = new KeyHook(chunkKeySs, null);
+            splitValueB = splitBytes(valueB, nSplit);
+        } else {
+            if (logger.isDebugEnabled()) {
+                logger.debug(
+                        "Chunking not enabled, put the original value bytes to keyhook directly, original value bytes size = "
+                                + valueB.length);
+            }
+            keyHook = new KeyHook(null, valueB);
+        }
+
+        return new Pair<>(keyHook, splitValueB);
+    }
+
+    /**
+     * This method overrides the parent getBinary(). It gets the KeyHook from the cache first and checks the KeyHook to see whether chunking is enabled or not.
+     */
+    @Override
+    public byte[] getBinary(String keyS) {
+        if (Strings.isNullOrEmpty(keyS)) {
+            return null;
+        }
+        KeyHook keyHook = lookupKeyHook(keyS);
+        if (keyHook == null) {
+            return null;
+        }
+
+        if (keyHook.getChunkskey() == null || keyHook.getChunkskey().length == 0) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Chunking not enabled, return the value bytes in the keyhook directly, value bytes size = "
+                        + keyHook.getValues().length);
+            }
+            return keyHook.getValues();
+        }
+
+        BulkFuture<Map<String, Object>> bulkFuture;
+        long start = System.currentTimeMillis();
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Chunking enabled, chunk size = " + keyHook.getChunkskey().length);
+        }
+
+        Map<String, String> keyLookup = computeKeyHash(Arrays.asList(keyHook.getChunkskey()));
+        try {
+            bulkFuture = client.asyncGetBulk(keyLookup.keySet());
+        } catch (IllegalStateException e) {
+            // operation did not get queued in time (queue is full)
+            errorCount.incrementAndGet();
+            logger.error("Unable to queue cache operation.", e);
+            return null;
+        } catch (Throwable t) {
+            errorCount.incrementAndGet();
+            logger.error("Unable to queue cache operation.", t);
+            return null;
+        }
+
+        try {
+            Map<String, Object> bulkResult = bulkFuture.get(config.getTimeout(), TimeUnit.MILLISECONDS);
+            cacheGetTime.addAndGet(System.currentTimeMillis() - start);
+            if (bulkResult.size() != keyHook.getChunkskey().length) {
+                missCount.incrementAndGet();
+                logger.warn("Some paritial chunks missing for query key:" + keyS);
+                //remove all the partital chunks here.
+                for (String partitalKey : bulkResult.keySet()) {
+                    client.delete(partitalKey);
+                }
+                deleteKeyHook(keyS);
+                return null;
+            }
+            hitCount.getAndAdd(keyHook.getChunkskey().length);
+            byte[][] bytesArray = new byte[keyHook.getChunkskey().length][];
+            for (Map.Entry<String, Object> entry : bulkResult.entrySet()) {
+                byte[] bytes = (byte[]) entry.getValue();
+                readBytes.addAndGet(bytes.length);
+                String originalKeyS = keyLookup.get(entry.getKey());
+                int idx = Integer.parseInt(originalKeyS.substring(keyS.length()));
+                bytesArray[idx] = decodeValue(originalKeyS.getBytes(Charsets.UTF_8), bytes);
+            }
+            return concatBytes(bytesArray);
+        } catch (TimeoutException e) {
+            timeoutCount.incrementAndGet();
+            bulkFuture.cancel(false);
+            return null;
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw Throwables.propagate(e);
+        } catch (ExecutionException e) {
+            errorCount.incrementAndGet();
+            logger.error("ExecutionException when pulling item from cache.", e);
+            return null;
+        }
+    }
+
+    /**
+     * This method overrides the parent putBinary() method. It will split the large value bytes into multiple chunks to fit into the internal Cache.
+     * It generates a KeyHook to store the keys of the split chunks.
+     */
+    @Override
+    public void putBinary(String keyS, byte[] valueB, int expiration) {
+        if (Strings.isNullOrEmpty(keyS)) {
+            return;
+        }
+        int nSplit = getValueSplit(config, keyS, valueB.length);
+        Pair<KeyHook, byte[][]> keyValuePair = getKeyValuePair(nSplit, keyS, valueB);
+        KeyHook keyHook = keyValuePair.getFirst();
+        byte[][] splitValueB = keyValuePair.getSecond();
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("put key hook:{} to cache for hash key", keyHook);
+        }
+        super.putBinary(keyS, serializeValue(keyHook), expiration);
+        if (nSplit > 1) {
+            for (int i = 0; i < nSplit; i++) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Chunk[" + i + "] bytes size before encoding  = " + splitValueB[i].length);
+                }
+                super.putBinary(keyHook.getChunkskey()[i], splitValueB[i], expiration);
+            }
+        }
+    }
+
+    public void evict(String keyS) {
+        if (Strings.isNullOrEmpty(keyS)) {
+            return;
+        }
+        KeyHook keyHook = lookupKeyHook(keyS);
+        if (keyHook == null) {
+            return;
+        }
+
+        if (keyHook.getChunkskey() != null && keyHook.getChunkskey().length > 0) {
+            String[] chunkKeys = keyHook.getChunkskey();
+            for (String chunkKey : chunkKeys) {
+                super.evict(chunkKey);
+            }
+        }
+        super.evict(keyS);
+    }
+
+    protected Map<String, String> computeKeyHash(List<String> keySList) {
+        return Maps.uniqueIndex(keySList, new Function<String, String>() {
+            @Override
+            public String apply(String keyS) {
+                return computeKeyHash(keyS);
+            }
+        });
+    }
+
+    private void deleteKeyHook(String keyS) {
+        try {
+            super.evict(keyS);
+        } catch (IllegalStateException e) {
+            // operation did not get queued in time (queue is full)
+            errorCount.incrementAndGet();
+            logger.error("Unable to queue cache operation: ", e);
+        }
+    }
+
+    private byte[] concatBytes(byte[]... bytesArray) { // joins the chunks in order; null if any chunk is missing
+        int length = 0;
+        for (byte[] bytes : bytesArray) {
+            if (bytes == null) {
+                return null; // decodeValue() rejected this chunk: report a miss instead of throwing an NPE
+            }
+            length += bytes.length;
+        }
+        byte[] result = new byte[length];
+        int destPos = 0;
+        for (byte[] bytes : bytesArray) {
+            System.arraycopy(bytes, 0, result, destPos, bytes.length);
+            destPos += bytes.length;
+        }
+        logger.debug("Original value bytes size for all chunks  = {}", result.length);
+        return result;
+    }
+
+    @Override
+    public KeyHook lookupKeyHook(String keyS) {
+        byte[] bytes = super.getBinary(keyS);
+        if (bytes == null) {
+            return null;
+        }
+        return (KeyHook) SerializationUtils.deserialize(bytes);
+    }
+}
\ No newline at end of file
diff --git a/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedConnectionFactory.java b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedConnectionFactory.java
new file mode 100644
index 0000000..fe48d3e
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedConnectionFactory.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.memcached;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.SocketChannel;
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.kylin.common.KylinConfig;
+
+import net.spy.memcached.ConnectionFactory;
+import net.spy.memcached.ConnectionObserver;
+import net.spy.memcached.DefaultConnectionFactory;
+import net.spy.memcached.FailureMode;
+import net.spy.memcached.HashAlgorithm;
+import net.spy.memcached.MemcachedConnection;
+import net.spy.memcached.MemcachedNode;
+import net.spy.memcached.NodeLocator;
+import net.spy.memcached.OperationFactory;
+import net.spy.memcached.auth.AuthDescriptor;
+import net.spy.memcached.compat.SpyObject;
+import net.spy.memcached.metrics.MetricCollector;
+import net.spy.memcached.metrics.MetricType;
+import net.spy.memcached.metrics.NoopMetricCollector;
+import net.spy.memcached.ops.Operation;
+import net.spy.memcached.transcoders.Transcoder;
+
+public class MemcachedConnectionFactory extends SpyObject implements ConnectionFactory {
+    private ConnectionFactory underlying;
+    private Map<String, String> metricsConfig = KylinConfig.getInstanceFromEnv().getKylinMetricsConf();
+
+    public MemcachedConnectionFactory(ConnectionFactory underlying) {
+        this.underlying = underlying;
+    }
+
+    @Override
+    public MetricType enableMetrics() {
+        String metricType = metricsConfig.get("memcached.metricstype");
+        return metricType == null ? DefaultConnectionFactory.DEFAULT_METRIC_TYPE
+                : MetricType.valueOf(metricType.toUpperCase(Locale.ROOT));
+    }
+
+    @Override
+    public MetricCollector getMetricCollector() {
+        String enableMetrics = metricsConfig.get("memcached.enabled");
+        if (enableMetrics().equals(MetricType.OFF) || enableMetrics == null
+                || "false".equalsIgnoreCase(enableMetrics)) {
+            getLogger().debug("Memcached metrics collection disabled.");
+            return new NoopMetricCollector();
+        } else {
+            getLogger().info("Memcached metrics collection enabled (Profile " + enableMetrics() + ").");
+            return new MemcachedMetrics();
+        }
+    }
+
+    @Override
+    public MemcachedConnection createConnection(List<InetSocketAddress> addrs) throws IOException {
+        return underlying.createConnection(addrs);
+    }
+
+    @Override
+    public MemcachedNode createMemcachedNode(SocketAddress sa, SocketChannel c, int bufSize) {
+        return underlying.createMemcachedNode(sa, c, bufSize);
+    }
+
+    @Override
+    public BlockingQueue<Operation> createOperationQueue() {
+        return underlying.createOperationQueue();
+    }
+
+    @Override
+    public BlockingQueue<Operation> createReadOperationQueue() {
+        return underlying.createReadOperationQueue();
+    }
+
+    @Override
+    public BlockingQueue<Operation> createWriteOperationQueue() {
+        return underlying.createWriteOperationQueue();
+    }
+
+    @Override
+    public long getOpQueueMaxBlockTime() {
+        return underlying.getOpQueueMaxBlockTime();
+    }
+
+    @Override
+    public ExecutorService getListenerExecutorService() {
+        return underlying.getListenerExecutorService();
+    }
+
+    @Override
+    public boolean isDefaultExecutorService() {
+        return underlying.isDefaultExecutorService();
+    }
+
+    @Override
+    public NodeLocator createLocator(List<MemcachedNode> nodes) {
+        return underlying.createLocator(nodes);
+    }
+
+    @Override
+    public OperationFactory getOperationFactory() {
+        return underlying.getOperationFactory();
+    }
+
+    @Override
+    public long getOperationTimeout() {
+        return underlying.getOperationTimeout();
+    }
+
+    @Override
+    public boolean isDaemon() {
+        return underlying.isDaemon();
+    }
+
+    @Override
+    public boolean useNagleAlgorithm() {
+        return underlying.useNagleAlgorithm();
+    }
+
+    @Override
+    public Collection<ConnectionObserver> getInitialObservers() {
+        return underlying.getInitialObservers();
+    }
+
+    @Override
+    public FailureMode getFailureMode() {
+        return underlying.getFailureMode();
+    }
+
+    @Override
+    public Transcoder<Object> getDefaultTranscoder() {
+        return underlying.getDefaultTranscoder();
+    }
+
+    @Override
+    public boolean shouldOptimize() {
+        return underlying.shouldOptimize();
+    }
+
+    @Override
+    public int getReadBufSize() {
+        return underlying.getReadBufSize();
+    }
+
+    @Override
+    public HashAlgorithm getHashAlg() {
+        return underlying.getHashAlg();
+    }
+
+    @Override
+    public long getMaxReconnectDelay() {
+        return underlying.getMaxReconnectDelay();
+    }
+
+    @Override
+    public AuthDescriptor getAuthDescriptor() {
+        return underlying.getAuthDescriptor();
+    }
+
+    @Override
+    public int getTimeoutExceptionThreshold() {
+        return underlying.getTimeoutExceptionThreshold();
+    }
+
+    @Override
+    public long getAuthWaitTime() {
+        return underlying.getAuthWaitTime();
+    }
+}
\ No newline at end of file
diff --git a/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedConnectionFactoryBuilder.java b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedConnectionFactoryBuilder.java
new file mode 100644
index 0000000..97af4a6
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedConnectionFactoryBuilder.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.memcached;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+
+import net.spy.memcached.ArrayModNodeLocator;
+import net.spy.memcached.ConnectionFactory;
+import net.spy.memcached.ConnectionFactoryBuilder;
+import net.spy.memcached.ConnectionObserver;
+import net.spy.memcached.DefaultConnectionFactory;
+import net.spy.memcached.FailureMode;
+import net.spy.memcached.HashAlgorithm;
+import net.spy.memcached.MemcachedNode;
+import net.spy.memcached.NodeLocator;
+import net.spy.memcached.OperationFactory;
+import net.spy.memcached.RefinedKetamaNodeLocator;
+import net.spy.memcached.auth.AuthDescriptor;
+import net.spy.memcached.metrics.MetricCollector;
+import net.spy.memcached.metrics.MetricType;
+import net.spy.memcached.ops.Operation;
+import net.spy.memcached.transcoders.Transcoder;
+
+public class MemcachedConnectionFactoryBuilder extends ConnectionFactoryBuilder {
+    /**
+     * Get the ConnectionFactory set up with the provided parameters.
+     */
+    public ConnectionFactory build() {
+        return new DefaultConnectionFactory() {
+
+            @Override
+            public BlockingQueue<Operation> createOperationQueue() {
+                return opQueueFactory == null ? super.createOperationQueue() : opQueueFactory.create();
+            }
+
+            @Override
+            public BlockingQueue<Operation> createReadOperationQueue() {
+                return readQueueFactory == null ? super.createReadOperationQueue() : readQueueFactory.create();
+            }
+
+            @Override // the default must be the write queue, not the read queue
+            public BlockingQueue<Operation> createWriteOperationQueue() {
+                return writeQueueFactory == null ? super.createWriteOperationQueue() : writeQueueFactory.create();
+            }
+
+            @Override
+            public NodeLocator createLocator(List<MemcachedNode> nodes) {
+                switch (locator) {
+                case ARRAY_MOD:
+                    return new ArrayModNodeLocator(nodes, getHashAlg());
+                case CONSISTENT:
+                    return new RefinedKetamaNodeLocator(nodes, getHashAlg());
+                default:
+                    throw new IllegalStateException("Unhandled locator type: " + locator);
+                }
+            }
+
+            @Override
+            public Transcoder<Object> getDefaultTranscoder() {
+                return transcoder == null ? super.getDefaultTranscoder() : transcoder;
+            }
+
+            @Override
+            public FailureMode getFailureMode() {
+                return failureMode == null ? super.getFailureMode() : failureMode;
+            }
+
+            @Override
+            public HashAlgorithm getHashAlg() {
+                return hashAlg == null ? super.getHashAlg() : hashAlg;
+            }
+
+            public Collection<ConnectionObserver> getInitialObservers() {
+                return initialObservers;
+            }
+
+            @Override
+            public OperationFactory getOperationFactory() {
+                return opFact == null ? super.getOperationFactory() : opFact;
+            }
+
+            @Override
+            public long getOperationTimeout() {
+                return opTimeout == -1 ? super.getOperationTimeout() : opTimeout;
+            }
+
+            @Override
+            public int getReadBufSize() {
+                return readBufSize == -1 ? super.getReadBufSize() : readBufSize;
+            }
+
+            @Override
+            public boolean isDaemon() {
+                return isDaemon;
+            }
+
+            @Override
+            public boolean shouldOptimize() {
+                return shouldOptimize;
+            }
+
+            @Override
+            public boolean useNagleAlgorithm() {
+                return useNagle;
+            }
+
+            @Override
+            public long getMaxReconnectDelay() {
+                return maxReconnectDelay;
+            }
+
+            @Override
+            public AuthDescriptor getAuthDescriptor() {
+                return authDescriptor;
+            }
+
+            @Override
+            public long getOpQueueMaxBlockTime() {
+                return opQueueMaxBlockTime > -1 ? opQueueMaxBlockTime : super.getOpQueueMaxBlockTime();
+            }
+
+            @Override
+            public int getTimeoutExceptionThreshold() {
+                return timeoutExceptionThreshold;
+            }
+
+            @Override
+            public MetricType enableMetrics() {
+                return metricType == null ? super.enableMetrics() : metricType;
+            }
+
+            @Override
+            public MetricCollector getMetricCollector() {
+                return collector == null ? super.getMetricCollector() : collector;
+            }
+
+            @Override
+            public ExecutorService getListenerExecutorService() {
+                return executorService == null ? super.getListenerExecutorService() : executorService;
+            }
+
+            @Override
+            public boolean isDefaultExecutorService() {
+                return executorService == null;
+            }
+
+            @Override
+            public long getAuthWaitTime() {
+                return authWaitTime;
+            }
+        };
+
+    }
+}
\ No newline at end of file
diff --git a/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedMetrics.java b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedMetrics.java
new file mode 100644
index 0000000..ada9144
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedMetrics.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.memcached;
+
+import static org.apache.kylin.metrics.lib.impl.MetricsSystem.Metrics;
+
+import java.util.Map;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.google.common.collect.Maps;
+
+import net.spy.memcached.metrics.AbstractMetricCollector;
+import net.spy.memcached.metrics.DefaultMetricCollector;
+import net.spy.memcached.metrics.MetricCollector;
+
+/**
+ * A {@link MetricCollector} that uses the Codahale Metrics library.
+ *
+ * The following system properties can be used to customize the behavior
+ * of the collector during runtime:
+ */
+public final class MemcachedMetrics extends AbstractMetricCollector {
+
+    /**
+     * Contains all registered {@link Counter}s.
+     */
+    private Map<String, Counter> counters;
+
+    /**
+     * Contains all registered {@link Meter}s.
+     */
+    private Map<String, Meter> meters;
+
+    /**
+     * Contains all registered {@link Histogram}s.
+     */
+    private Map<String, Histogram> histograms;
+
+    /**
+     * Create a new {@link MemcachedMetrics}.
+     *
+     * Note that when this constructor is called, the reporter is also
+     * automatically established.
+     */
+    public MemcachedMetrics() {
+        counters = Maps.newConcurrentMap();
+        meters = Maps.newConcurrentMap();
+        histograms = Maps.newConcurrentMap();
+    }
+
+    @Override
+    public void addCounter(String name) {
+        if (!counters.containsKey(name)) {
+            counters.put(name, Metrics.counter(name));
+        }
+    }
+
+    @Override
+    public void removeCounter(String name) {
+        if (counters.containsKey(name)) { // only unregister counters this collector actually tracks
+            Metrics.remove(name);
+            counters.remove(name);
+        }
+    }
+
+    @Override
+    public void incrementCounter(String name, int amount) {
+        if (counters.containsKey(name)) {
+            counters.get(name).inc(amount);
+        }
+    }
+
+    @Override
+    public void decrementCounter(String name, int amount) {
+        if (counters.containsKey(name)) {
+            counters.get(name).dec(amount);
+        }
+    }
+
+    @Override
+    public void addMeter(String name) {
+        if (!meters.containsKey(name)) {
+            meters.put(name, Metrics.meter(name));
+        }
+    }
+
+    @Override
+    public void removeMeter(String name) {
+        if (meters.remove(name) != null) {
+            Metrics.remove(name); // also drop it from the Metrics registry, as removeCounter does
+        }
+    }
+
+    @Override
+    public void markMeter(String name) {
+        if (meters.containsKey(name)) {
+            meters.get(name).mark();
+        }
+    }
+
+    @Override
+    public void addHistogram(String name) {
+        if (!histograms.containsKey(name)) {
+            histograms.put(name, Metrics.histogram(name));
+        }
+    }
+
+    @Override
+    public void removeHistogram(String name) {
+        if (histograms.remove(name) != null) {
+            Metrics.remove(name); // also drop it from the Metrics registry, as removeCounter does
+        }
+    }
+
+    @Override
+    public void updateHistogram(String name, int amount) {
+        if (histograms.containsKey(name)) {
+            histograms.get(name).update(amount);
+        }
+    }
+}
\ No newline at end of file
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 778b5bf..159137b 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1770,6 +1770,10 @@ abstract public class KylinConfigBase implements Serializable {
                 + getKylinMetricsSubjectSuffix();
     }
 
+    /**
+     * Collects the metrics-related configuration by delegating to
+     * {@code getPropertiesByPrefix} with the prefix {@code "kylin.metrics."}.
+     *
+     * @return the map produced by {@code getPropertiesByPrefix} for that prefix
+     */
+    public Map<String, String> getKylinMetricsConf() {
+        final String metricsPrefix = "kylin.metrics.";
+        return getPropertiesByPrefix(metricsPrefix);
+    }
+
     // ============================================================================
     // tool
     // ============================================================================
diff --git a/pom.xml b/pom.xml
index 264583d..6cb6716 100644
--- a/pom.xml
+++ b/pom.xml
@@ -115,6 +115,7 @@
     <xerces.version>2.11.0</xerces.version>
     <xalan.version>2.7.2</xalan.version>
     <ehcache.version>2.10.2.2.21</ehcache.version>
+    <memcached.verion>2.12.3</memcached.verion>
     <apache-httpclient.version>4.2.5</apache-httpclient.version>
     <roaring.version>0.6.18</roaring.version>
     <cglib.version>3.2.4</cglib.version>
@@ -261,6 +262,11 @@
       </dependency>
       <dependency>
         <groupId>org.apache.kylin</groupId>
+        <artifactId>kylin-cache</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.kylin</groupId>
         <artifactId>kylin-engine-mr</artifactId>
         <version>${project.version}</version>
       </dependency>
@@ -731,6 +737,11 @@
         <artifactId>metrics-core</artifactId>
         <version>${dropwizard.version}</version>
       </dependency>
+      <dependency>
+        <groupId>io.dropwizard.metrics</groupId>
+        <artifactId>metrics-jvm</artifactId>
+        <version>${dropwizard.version}</version>
+      </dependency>
 
       <!-- Test -->
       <dependency>
@@ -817,6 +828,11 @@
         <version>${ehcache.version}</version>
       </dependency>
       <dependency>
+        <groupId>net.spy</groupId>
+        <artifactId>spymemcached</artifactId>
+        <version>${memcached.verion}</version>
+      </dependency>
+      <dependency>
         <groupId>org.opensaml</groupId>
         <artifactId>opensaml</artifactId>
         <version>${opensaml.version}</version>
@@ -1246,6 +1262,7 @@
     <module>core-metrics</module>
     <module>metrics-reporter-hive</module>
     <module>metrics-reporter-kafka</module>
+    <module>cache</module>
   </modules>
 
   <reporting>