You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/11/13 11:02:12 UTC
[1/2] incubator-kylin git commit: KYLIN-1144 allow disable/enable
storage cache on both server level and query level
Repository: incubator-kylin
Updated Branches:
refs/heads/2.x-staging b918d3e23 -> ae0f1a72e
KYLIN-1144 allow disable/enable storage cache on both server level and query level
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/692438f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/692438f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/692438f6
Branch: refs/heads/2.x-staging
Commit: 692438f67979866e4395188064e3e1d752b9e83d
Parents: b918d3e
Author: honma <ho...@ebay.com>
Authored: Fri Nov 13 11:40:22 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Fri Nov 13 17:18:05 2015 +0800
----------------------------------------------------------------------
.../org/apache/kylin/common/KylinConfig.java | 12 +-
.../kylin/common/debug/BackdoorToggles.java | 16 +++
.../org/apache/kylin/common/util/RangeUtil.java | 4 +-
.../metadata/realization/StreamSQLDigest.java | 2 +-
.../cache/AbstractCacheFledgedQuery.java | 26 +++-
.../storage/cache/CacheFledgedDynamicQuery.java | 118 ++++++++++---------
.../storage/cache/CacheFledgedStaticQuery.java | 45 ++-----
.../kylin/rest/controller/QueryController.java | 78 ++++++------
.../apache/kylin/rest/service/QueryService.java | 11 +-
.../kylin/storage/hbase/HBaseStorage.java | 10 +-
10 files changed, 176 insertions(+), 146 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/692438f6/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index 709e5aa..e76aa94 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -23,14 +23,18 @@ import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.PrintWriter;
+import java.io.Serializable;
+import java.io.StringWriter;
+import java.util.Map;
+import java.util.Properties;
import java.util.SortedSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import com.google.common.collect.Sets;
-
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.restclient.RestClient;
@@ -38,9 +42,7 @@ import org.apache.kylin.common.util.CliCommandExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.*;
-import java.util.Map;
-import java.util.Properties;
+import com.google.common.collect.Sets;
/**
*/
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/692438f6/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java b/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
index 1e26557..7304b8b 100644
--- a/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
+++ b/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
@@ -47,6 +47,10 @@ public class BackdoorToggles {
return getString(DEBUG_TOGGLE_HBASE_CUBE_QUERY_PROTOCOL);
}
+ public static boolean getDisableCache() {
+ return getBoolean(DEBUG_TOGGLE_DISABLE_QUERY_CACHE);
+ }
+
public static boolean getDisableFuzzyKey() {
return getBoolean(DEBUG_TOGGLE_DISABLE_FUZZY_KEY);
}
@@ -85,6 +89,18 @@ public class BackdoorToggles {
public final static String DEBUG_TOGGLE_DISABLE_FUZZY_KEY = "DEBUG_TOGGLE_DISABLE_FUZZY_KEY";
/**
+ * set DEBUG_TOGGLE_DISABLE_QUERY_CACHE=true to prevent using cache for current query
+ *
+ *
+ *
+ example:(put it into request body)
+ "backdoorToggles": {
+ "DEBUG_TOGGLE_DISABLE_QUERY_CACHE": "true"
+ }
+ */
+ public final static String DEBUG_TOGGLE_DISABLE_QUERY_CACHE = "DEBUG_TOGGLE_DISABLE_QUERY_CACHE";
+
+ /**
* set DEBUG_TOGGLE_HBASE_CUBE_QUERY_VERSION=v1/v2 to control which version CubeStorageQuery to use
*
example:(put it into request body)
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/692438f6/core-common/src/main/java/org/apache/kylin/common/util/RangeUtil.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/RangeUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/RangeUtil.java
index 8fa5535..33dfb83 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/RangeUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/RangeUtil.java
@@ -63,15 +63,17 @@ public class RangeUtil {
* @return
*/
public static <C extends Comparable<?>> List<Range<C>> remove(Range<C> self, Range<C> other) {
+
// mimic the following logic in guava 18:
// RangeSet<C> rangeSet = TreeRangeSet.create();
// rangeSet.add(self);
// rangeSet.remove(other);
// return Lists.newArrayList(rangeSet.asRanges());
- if (!self.isConnected(other)) {
+ if (other == null || !self.isConnected(other)) {
return Collections.singletonList(self);
}
+
Range<C> share = self.intersection(other);
if (share.isEmpty()) {
return Collections.singletonList(self);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/692438f6/core-metadata/src/main/java/org/apache/kylin/metadata/realization/StreamSQLDigest.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/StreamSQLDigest.java b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/StreamSQLDigest.java
index edbe897..1420470 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/StreamSQLDigest.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/StreamSQLDigest.java
@@ -28,7 +28,7 @@ public class StreamSQLDigest {
filterSerialized = TupleFilterSerializer.serialize(sqlDigest.filter, decorator, StringCodeSystem.INSTANCE);
int nonFilterHashCode = calculateNonFilterHashCode();
- this.hashCode = 31 * nonFilterHashCode + (filterSerialized != null ? Arrays.hashCode(filterSerialized) : 0);
+ this.hashCode = 31 * nonFilterHashCode + Arrays.hashCode(filterSerialized);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/692438f6/core-storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedQuery.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedQuery.java b/core-storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedQuery.java
index 5ffdf91..a7bdae0 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedQuery.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedQuery.java
@@ -2,12 +2,14 @@ package org.apache.kylin.storage.cache;
import net.sf.ehcache.Cache;
import net.sf.ehcache.CacheManager;
+import net.sf.ehcache.Element;
import net.sf.ehcache.config.CacheConfiguration;
import net.sf.ehcache.config.Configuration;
import net.sf.ehcache.config.MemoryUnit;
import net.sf.ehcache.config.PersistenceConfiguration;
import net.sf.ehcache.store.MemoryStoreEvictionPolicy;
+import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.metadata.realization.StreamSQLDigest;
import org.apache.kylin.metadata.tuple.TeeTupleItrListener;
import org.apache.kylin.storage.ICachableStorageQuery;
@@ -23,7 +25,6 @@ public abstract class AbstractCacheFledgedQuery implements IStorageQuery, TeeTup
protected static CacheManager CACHE_MANAGER;
- protected boolean queryCacheExists;
protected ICachableStorageQuery underlyingStorage;
protected StreamSQLDigest streamSQLDigest;
@@ -53,6 +54,29 @@ public abstract class AbstractCacheFledgedQuery implements IStorageQuery, TeeTup
CACHE_MANAGER.addCache(storageCache);
}
+ protected StreamSQLResult getStreamSQLResult(StreamSQLDigest streamSQLDigest) {
+
+ Cache cache = CACHE_MANAGER.getCache(this.underlyingStorage.getStorageUUID());
+ Element element = cache.get(streamSQLDigest.hashCode());//TODO: hash code cannot guarantee uniqueness
+ if (element != null) {
+ return (StreamSQLResult) element.getObjectValue();
+ }
+ return null;
+ }
+
+ protected boolean needSaveCache(long createTime) {
+ long storageQueryTime = System.currentTimeMillis() - createTime;
+ long durationThreshold = KylinConfig.getInstanceFromEnv().getQueryDurationCacheThreshold();
+ //TODO: check scan count necessary?
+
+ if (storageQueryTime < durationThreshold) {
+ logger.info("Skip saving storage caching for storage cache because storage query time {} less than {}", storageQueryTime, durationThreshold);
+ return false;
+ }
+
+ return true;
+ }
+
private void makeCacheIfNecessary(String storageUUID) {
if (CACHE_MANAGER == null) {
logger.warn("CACHE_MANAGER is not provided");
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/692438f6/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicQuery.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicQuery.java b/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicQuery.java
index febe1a9..b9ae0e9 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicQuery.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicQuery.java
@@ -2,7 +2,6 @@ package org.apache.kylin.storage.cache;
import java.util.List;
-import net.sf.ehcache.Cache;
import net.sf.ehcache.Element;
import org.apache.kylin.common.util.RangeUtil;
@@ -33,6 +32,8 @@ public class CacheFledgedDynamicQuery extends AbstractCacheFledgedQuery {
private final TblColRef partitionColRef;
+ private boolean noCacheUsed = true;
+
private Range<Long> ts;
public CacheFledgedDynamicQuery(ICachableStorageQuery underlyingStorage, TblColRef partitionColRef) {
@@ -52,70 +53,31 @@ public class CacheFledgedDynamicQuery extends AbstractCacheFledgedQuery {
return ITupleIterator.EMPTY_TUPLE_ITERATOR;
}
+ ITupleIterator ret = null;
+
//enable dynamic cache iff group by columns contains partition col
//because cache extraction requires partition col value as selection key
- boolean needUpdateCache = sqlDigest.groupbyColumns.contains(partitionColRef);
-
- streamSQLDigest = new StreamSQLDigest(sqlDigest, partitionColRef);
- StreamSQLResult cachedResult = null;
- Cache cache = CACHE_MANAGER.getCache(this.underlyingStorage.getStorageUUID());
- Element element = cache.get(streamSQLDigest.hashCode());
- if (element != null) {
- this.queryCacheExists = true;
- cachedResult = (StreamSQLResult) element.getObjectValue();
- }
-
- ITupleIterator ret = null;
- if (cachedResult != null) {
- Range<Long> reusePeriod = cachedResult.getReusableResults(ts);
-
- logger.info("existing cache : " + cachedResult);
- logger.info("ts Range in query: " + RangeUtil.formatTsRange(ts));
- logger.info("potential reusable range : " + RangeUtil.formatTsRange(reusePeriod));
-
- if (reusePeriod != null) {
- List<Range<Long>> remainings = RangeUtil.remove(ts, reusePeriod);
- if (remainings.size() == 1) {//if using cache causes two underlyingStorage searches, we'd rather not use the cache
-
- SimpleTupleIterator reusedTuples = new SimpleTupleIterator(cachedResult.reuse(reusePeriod));
- List<ITupleIterator> iTupleIteratorList = Lists.newArrayList();
- iTupleIteratorList.add(reusedTuples);
-
- for (Range<Long> remaining : remainings) {//actually there will be only one loop
- logger.info("Appending ts " + RangeUtil.formatTsRange(remaining) + " as additional filter");
-
- ITupleIterator freshTuples = SQLDigestUtil.appendTsFilterToExecute(sqlDigest, partitionColRef, remaining, new Function<Void, ITupleIterator>() {
- @Override
- public ITupleIterator apply(Void input) {
- return underlyingStorage.search(context, sqlDigest, returnTupleInfo);
- }
- });
- iTupleIteratorList.add(freshTuples);
- }
+ boolean enableDynamicCache = sqlDigest.groupbyColumns.contains(partitionColRef);
- ret = new CompoundTupleIterator(iTupleIteratorList);
- } else if (remainings.size() == 0) {
- logger.info("The ts range in new query was fully cached");
- needUpdateCache = false;
- ret = new SimpleTupleIterator(cachedResult.reuse(reusePeriod));
- } else {
- //if using cache causes more than one underlyingStorage searches
- //the incurred overhead might be more expensive than the cache benefit
- logger.info("Give up using cache to avoid complexity");
- }
+ if (enableDynamicCache) {
+ StreamSQLResult cachedResult = getStreamSQLResult(new StreamSQLDigest(sqlDigest, partitionColRef));
+ if (cachedResult != null) {
+ ret = tryReuseCache(context, sqlDigest, returnTupleInfo, ret, cachedResult);
+ } else {
+ logger.info("no cache entry for this query");
}
- } else {
- logger.info("no cache entry for this query");
}
if (ret == null) {
ret = underlyingStorage.search(context, sqlDigest, returnTupleInfo);
+ noCacheUsed = true;
logger.info("No Cache being used");
} else {
+ noCacheUsed = false;
logger.info("Cache being used");
}
- if (needUpdateCache) {
+ if (enableDynamicCache) {
//use another nested ITupleIterator to deal with cache
final TeeTupleIterator tee = new TeeTupleIterator(ret);
tee.addCloseListener(this);
@@ -125,15 +87,65 @@ public class CacheFledgedDynamicQuery extends AbstractCacheFledgedQuery {
}
}
+ /**
+ * if cache is not enough it will try to combine existing cache as well as fresh records
+ */
+ private ITupleIterator tryReuseCache(final StorageContext context, final SQLDigest sqlDigest, final TupleInfo returnTupleInfo, ITupleIterator ret, StreamSQLResult cachedResult) {
+ Range<Long> reusePeriod = cachedResult.getReusableResults(ts);
+
+ logger.info("existing cache: " + cachedResult);
+ logger.info("ts Range in query: " + RangeUtil.formatTsRange(ts));
+ logger.info("potential reusable range: " + RangeUtil.formatTsRange(reusePeriod));
+
+ if (reusePeriod != null) {
+ List<Range<Long>> remainings = RangeUtil.remove(ts, reusePeriod);
+ if (remainings.size() == 1) {//if using cache causes two underlyingStorage searches, we'd rather not use the cache
+
+ SimpleTupleIterator reusedTuples = new SimpleTupleIterator(cachedResult.reuse(reusePeriod));
+ List<ITupleIterator> iTupleIteratorList = Lists.newArrayList();
+ iTupleIteratorList.add(reusedTuples);
+
+ Range<Long> remaining = remainings.get(0);
+ logger.info("Appending ts " + RangeUtil.formatTsRange(remaining) + " as additional filter");
+
+ ITupleIterator freshTuples = SQLDigestUtil.appendTsFilterToExecute(sqlDigest, partitionColRef, remaining, new Function<Void, ITupleIterator>() {
+ @Override
+ public ITupleIterator apply(Void input) {
+ return underlyingStorage.search(context, sqlDigest, returnTupleInfo);
+ }
+ });
+ iTupleIteratorList.add(freshTuples);
+
+ return new CompoundTupleIterator(iTupleIteratorList);
+ } else if (remainings.size() == 0) {
+ logger.info("The ts range in new query was fully cached");
+ return new SimpleTupleIterator(cachedResult.reuse(reusePeriod));
+ } else {
+ //if using cache causes more than one underlyingStorage searches
+ //the incurred overhead might be more expensive than the cache benefit
+ logger.info("Give up using cache to avoid complexity");
+ return null;
+ }
+ } else {
+ logger.info("cached results not reusable by current query");
+ return null;
+ }
+ }
+
@Override
public void notify(List<ITuple> duplicated, long createTime) {
+ //for streaming sql only check if needSaveCache at first entry of cache
+ if (noCacheUsed && !needSaveCache(createTime)) {
+ return;
+ }
+
Range<Long> cacheExclude = this.underlyingStorage.getVolatilePeriod();
if (cacheExclude != null) {
List<Range<Long>> cachablePeriods = RangeUtil.remove(ts, cacheExclude);
if (cachablePeriods.size() == 1) {
if (!ts.equals(cachablePeriods.get(0))) {
- logger.info("With respect to growing storage, the cacheable tsRange shrinks from " + RangeUtil.formatTsRange(ts) + " to " + RangeUtil.formatTsRange(cachablePeriods.get(0)));
+ logger.info("tsRange shrinks from " + RangeUtil.formatTsRange(ts) + " to " + RangeUtil.formatTsRange(cachablePeriods.get(0)));
}
ts = cachablePeriods.get(0);
} else {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/692438f6/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStaticQuery.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStaticQuery.java b/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStaticQuery.java
index 2272aac..93e5e09 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStaticQuery.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStaticQuery.java
@@ -2,7 +2,6 @@ package org.apache.kylin.storage.cache;
import java.util.List;
-import net.sf.ehcache.Cache;
import net.sf.ehcache.Element;
import org.apache.kylin.metadata.realization.SQLDigest;
@@ -29,60 +28,30 @@ public class CacheFledgedStaticQuery extends AbstractCacheFledgedQuery {
@Override
public ITupleIterator search(final StorageContext context, final SQLDigest sqlDigest, final TupleInfo returnTupleInfo) {
- streamSQLDigest = new StreamSQLDigest(sqlDigest, null);
- StreamSQLResult cachedResult = null;
- Cache cache = CACHE_MANAGER.getCache(this.underlyingStorage.getStorageUUID());
- Element element = cache.get(streamSQLDigest.hashCode());
- if (element != null) {
- this.queryCacheExists = true;
- cachedResult = (StreamSQLResult) element.getObjectValue();
- }
-
+ StreamSQLResult cachedResult = getStreamSQLResult(new StreamSQLDigest(sqlDigest, null));
ITupleIterator ret = null;
+
if (cachedResult != null) {
- ret = new SimpleTupleIterator(cachedResult.reuse(Ranges.<Long> all()));
+ logger.info("using existing cache");
+ return new SimpleTupleIterator(cachedResult.reuse(Ranges.<Long> all()));
} else {
- logger.info("no cache entry for this query");
- }
-
- if (ret == null) {
- logger.info("decision: not using cache");
+ logger.info("no existing cache to use");
ret = underlyingStorage.search(context, sqlDigest, returnTupleInfo);
- } else {
- logger.info("decision: use cache");
- }
- if (!queryCacheExists) {
//use another nested ITupleIterator to deal with cache
final TeeTupleIterator tee = new TeeTupleIterator(ret);
tee.addCloseListener(this);
return tee;
- } else {
- return ret;
}
}
@Override
public void notify(List<ITuple> duplicated, long createTime) {
- boolean cacheIt = true;
- // long storageQueryTime = System.currentTimeMillis() - createTime;
- // long durationThreshold = KylinConfig.getInstanceFromEnv().getQueryDurationCacheThreshold();
- // long scancountThreshold = KylinConfig.getInstanceFromEnv().getQueryScanCountCacheThreshold();
- //
- // if (storageQueryTime < durationThreshold) {
- // logger.info("Skip storage caching for storage cache because storage query time {} less than {}", storageQueryTime, durationThreshold);
- // cacheIt = false;
- // }
- //
- // if (duplicated.size() < scancountThreshold) {
- // logger.info("Skip storage caching for storage cache because scan count {} less than {}", duplicated.size(), scancountThreshold);
- // cacheIt = false;
- // }
-
- if (cacheIt) {
+ if (needSaveCache(createTime)) {
StreamSQLResult newCacheEntry = new StreamSQLResult(duplicated, Ranges.<Long> all(), null);
CACHE_MANAGER.getCache(this.underlyingStorage.getStorageUUID()).put(new Element(streamSQLDigest.hashCode(), newCacheEntry));
logger.info("cache after the query: " + newCacheEntry);
}
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/692438f6/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java b/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
index 63e973a..dfd55f1 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
@@ -32,6 +32,7 @@ import net.sf.ehcache.Element;
import org.apache.commons.io.IOUtils;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.rest.constant.Constant;
import org.apache.kylin.rest.exception.InternalErrorException;
@@ -166,55 +167,64 @@ public class QueryController extends BasicController {
}
private SQLResponse doQueryWithCache(SQLRequest sqlRequest) {
- String sql = sqlRequest.getSql();
- String project = sqlRequest.getProject();
- logger.info("Using project: " + project);
- logger.info("The original query: " + sql);
-
- String serverMode = KylinConfig.getInstanceFromEnv().getServerMode();
- if (!(Constant.SERVER_MODE_QUERY.equals(serverMode.toLowerCase()) || Constant.SERVER_MODE_ALL.equals(serverMode.toLowerCase()))) {
- throw new InternalErrorException("Query is not allowed in " + serverMode + " mode.");
- }
+ try {
+ BackdoorToggles.setToggles(sqlRequest.getBackdoorToggles());
- if (sql.toLowerCase().contains("select") == false) {
- logger.debug("Directly return expection as not supported");
- throw new InternalErrorException("Not Supported SQL.");
- }
+ String sql = sqlRequest.getSql();
+ String project = sqlRequest.getProject();
+ logger.info("Using project: " + project);
+ logger.info("The original query: " + sql);
- SQLResponse sqlResponse = searchQueryInCache(sqlRequest);
- try {
- if (null == sqlResponse) {
- sqlResponse = queryService.query(sqlRequest);
+ String serverMode = KylinConfig.getInstanceFromEnv().getServerMode();
+ if (!(Constant.SERVER_MODE_QUERY.equals(serverMode.toLowerCase()) || Constant.SERVER_MODE_ALL.equals(serverMode.toLowerCase()))) {
+ throw new InternalErrorException("Query is not allowed in " + serverMode + " mode.");
}
- checkQueryAuth(sqlResponse);
+ if (!sql.toLowerCase().contains("select")) {
+ logger.debug("Directly return exception as not supported");
+ throw new InternalErrorException("Not Supported SQL.");
+ }
+
+ SQLResponse sqlResponse = searchQueryInCache(sqlRequest);
+ try {
+ if (null == sqlResponse) {
+ sqlResponse = queryService.query(sqlRequest);
+ }
+
+ checkQueryAuth(sqlResponse);
- } catch (Throwable e) { // calcite may throw AssertError
- logger.error("Exception when execute sql", e);
- String errMsg = QueryUtil.makeErrorMsgUserFriendly(e);
+ } catch (Throwable e) { // calcite may throw AssertError
+ logger.error("Exception when execute sql", e);
+ String errMsg = QueryUtil.makeErrorMsgUserFriendly(e);
- sqlResponse = new SQLResponse(null, null, 0, true, errMsg);
+ sqlResponse = new SQLResponse(null, null, 0, true, errMsg);
- // for exception queries, only cache ScanOutOfLimitException
- if (e instanceof ScanOutOfLimitException) {
- Cache exceptionCache = cacheManager.getCache(EXCEPTION_QUERY_CACHE);
- exceptionCache.put(new Element(sqlRequest, sqlResponse));
+ // for exception queries, only cache ScanOutOfLimitException
+ if (e instanceof ScanOutOfLimitException) {
+ Cache exceptionCache = cacheManager.getCache(EXCEPTION_QUERY_CACHE);
+ exceptionCache.put(new Element(sqlRequest, sqlResponse));
+ }
}
- }
- queryService.logQuery(sqlRequest, sqlResponse);
-
- if (sqlResponse.getIsException())
- throw new InternalErrorException(sqlResponse.getExceptionMessage());
-
- return sqlResponse;
+ queryService.logQuery(sqlRequest, sqlResponse);
+
+ if (sqlResponse.getIsException())
+ throw new InternalErrorException(sqlResponse.getExceptionMessage());
+
+ return sqlResponse;
+
+ } finally {
+ BackdoorToggles.cleanToggles();
+ }
}
private SQLResponse searchQueryInCache(SQLRequest sqlRequest) {
SQLResponse response = null;
Cache exceptionCache = cacheManager.getCache(EXCEPTION_QUERY_CACHE);
- if (KylinConfig.getInstanceFromEnv().isQueryCacheEnabled() && null != exceptionCache.get(sqlRequest)) {
+ if (KylinConfig.getInstanceFromEnv().isQueryCacheEnabled() && //
+ !BackdoorToggles.getDisableCache() && //
+ exceptionCache.get(sqlRequest) != null) {
Element element = exceptionCache.get(sqlRequest);
response = (SQLResponse) element.getObjectValue();
response.setHitCache(true);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/692438f6/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
index a1e8a29..13eb09c 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -48,7 +48,6 @@ import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
@@ -84,7 +83,7 @@ public class QueryService extends BasicService {
@Autowired
private CacheService cacheService;
-
+
public static final String USER_QUERY_FAMILY = "q";
private static final String DEFAULT_TABLE_PREFIX = "kylin_metadata";
private static final String USER_TABLE_NAME = "_user";
@@ -267,14 +266,8 @@ public class QueryService extends BasicService {
parameters.put(OLAPContext.PRM_ACCEPT_PARTIAL_RESULT, String.valueOf(sqlRequest.isAcceptPartial()));
OLAPContext.setParameters(parameters);
- try {
- BackdoorToggles.setToggles(sqlRequest.getBackdoorToggles());
-
- return execute(correctedSql, sqlRequest);
+ return execute(correctedSql, sqlRequest);
- } finally {
- BackdoorToggles.cleanToggles();
- }
}
protected List<TableMeta> getMetadata(CubeManager cubeMgr, String project, boolean cubedOnly) throws SQLException {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/692438f6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
index 2e2000d..e7c8116 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
@@ -44,8 +44,6 @@ import com.google.common.base.Preconditions;
//used by reflection
public class HBaseStorage implements IStorage {
- private final static boolean allowStorageLayerCache = true;
-
private final static String v2CubeStorageQuery = "org.apache.kylin.storage.hbase.cube.v2.CubeStorageQuery";
private final static String v1CubeStorageQuery = "org.apache.kylin.storage.hbase.cube.v1.CubeStorageQuery";
@@ -53,6 +51,10 @@ public class HBaseStorage implements IStorage {
@Override
public IStorageQuery createQuery(IRealization realization) {
+
+ boolean queryCacheGloballyEnabled = KylinConfig.getInstanceFromEnv().isQueryCacheEnabled();
+ boolean queryCacheQueryLevelEnabled = !BackdoorToggles.getDisableCache();
+
if (realization.getType() == RealizationType.INVERTED_INDEX) {
ICachableStorageQuery ret;
try {
@@ -61,7 +63,7 @@ public class HBaseStorage implements IStorage {
throw new RuntimeException("Failed to initialize storage query for " + defaultIIStorageQuery, e);
}
- if (allowStorageLayerCache) {
+ if (queryCacheGloballyEnabled && queryCacheQueryLevelEnabled) {
return wrapWithCache(ret, realization);
} else {
return ret;
@@ -82,7 +84,7 @@ public class HBaseStorage implements IStorage {
throw new RuntimeException("Failed to initialize storage query for " + cubeStorageQuery, e);
}
- if (allowStorageLayerCache) {
+ if (queryCacheGloballyEnabled && queryCacheQueryLevelEnabled) {
return wrapWithCache(ret, realization);
} else {
return ret;
[2/2] incubator-kylin git commit: KYLIN-1142 more detailed hitCache
message in query's log
Posted by ma...@apache.org.
KYLIN-1142 more detailed hitCache message in query's log
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/ae0f1a72
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/ae0f1a72
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/ae0f1a72
Branch: refs/heads/2.x-staging
Commit: ae0f1a72e76f436eab4e0eb4e5d786bde83a719b
Parents: 692438f
Author: honma <ho...@ebay.com>
Authored: Fri Nov 13 17:49:30 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Fri Nov 13 17:50:30 2015 +0800
----------------------------------------------------------------------
.../org/apache/kylin/storage/StorageContext.java | 16 +++++++++++-----
.../storage/cache/CacheFledgedDynamicQuery.java | 8 ++++----
.../storage/cache/CacheFledgedStaticQuery.java | 3 ++-
.../kylin/query/enumerator/OLAPEnumerator.java | 1 +
.../kylin/rest/controller/QueryController.java | 2 +-
.../apache/kylin/rest/response/SQLResponse.java | 19 ++++++++++++++-----
.../apache/kylin/rest/service/QueryService.java | 11 +++++++++--
.../kylin/rest/service/QueryServiceTest.java | 2 +-
8 files changed, 43 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ae0f1a72/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
index 1643aa4..90f950c 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
@@ -18,13 +18,11 @@
package org.apache.kylin.storage;
-import java.util.ArrayList;
-import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.realization.SQLDigest;
+
+import com.google.common.collect.Range;
/**
* @author xjiang
@@ -33,7 +31,6 @@ public class StorageContext {
public static final int DEFAULT_THRESHOLD = 1000000;
-
private String connUrl;
private int threshold;
private int limit;
@@ -49,6 +46,8 @@ public class StorageContext {
private Cuboid cuboid;
private boolean partialResultReturned;
+ private Range<Long> reusedPeriod;
+
public StorageContext() {
this.threshold = DEFAULT_THRESHOLD;
this.limit = DEFAULT_THRESHOLD;
@@ -160,4 +159,11 @@ public class StorageContext {
return this.enableCoprocessor;
}
+ public Range<Long> getReusedPeriod() {
+ return reusedPeriod;
+ }
+
+ public void setReusedPeriod(Range<Long> reusedPeriod) {
+ this.reusedPeriod = reusedPeriod;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ae0f1a72/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicQuery.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicQuery.java b/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicQuery.java
index b9ae0e9..b9808ea 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicQuery.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicQuery.java
@@ -62,7 +62,7 @@ public class CacheFledgedDynamicQuery extends AbstractCacheFledgedQuery {
if (enableDynamicCache) {
StreamSQLResult cachedResult = getStreamSQLResult(new StreamSQLDigest(sqlDigest, partitionColRef));
if (cachedResult != null) {
- ret = tryReuseCache(context, sqlDigest, returnTupleInfo, ret, cachedResult);
+ ret = tryReuseCache(context, sqlDigest, returnTupleInfo, cachedResult);
} else {
logger.info("no cache entry for this query");
}
@@ -70,10 +70,8 @@ public class CacheFledgedDynamicQuery extends AbstractCacheFledgedQuery {
if (ret == null) {
ret = underlyingStorage.search(context, sqlDigest, returnTupleInfo);
- noCacheUsed = true;
logger.info("No Cache being used");
} else {
- noCacheUsed = false;
logger.info("Cache being used");
}
@@ -90,7 +88,7 @@ public class CacheFledgedDynamicQuery extends AbstractCacheFledgedQuery {
/**
* if cache is not enough it will try to combine existing cache as well as fresh records
*/
- private ITupleIterator tryReuseCache(final StorageContext context, final SQLDigest sqlDigest, final TupleInfo returnTupleInfo, ITupleIterator ret, StreamSQLResult cachedResult) {
+ private ITupleIterator tryReuseCache(final StorageContext context, final SQLDigest sqlDigest, final TupleInfo returnTupleInfo, StreamSQLResult cachedResult) {
Range<Long> reusePeriod = cachedResult.getReusableResults(ts);
logger.info("existing cache: " + cachedResult);
@@ -116,9 +114,11 @@ public class CacheFledgedDynamicQuery extends AbstractCacheFledgedQuery {
});
iTupleIteratorList.add(freshTuples);
+ context.setReusedPeriod(reusePeriod);
return new CompoundTupleIterator(iTupleIteratorList);
} else if (remainings.size() == 0) {
logger.info("The ts range in new query was fully cached");
+ context.setReusedPeriod(reusePeriod);
return new SimpleTupleIterator(cachedResult.reuse(reusePeriod));
} else {
//if using cache causes more than one underlyingStorage searches
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ae0f1a72/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStaticQuery.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStaticQuery.java b/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStaticQuery.java
index 93e5e09..d024cf7 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStaticQuery.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStaticQuery.java
@@ -29,10 +29,11 @@ public class CacheFledgedStaticQuery extends AbstractCacheFledgedQuery {
public ITupleIterator search(final StorageContext context, final SQLDigest sqlDigest, final TupleInfo returnTupleInfo) {
StreamSQLResult cachedResult = getStreamSQLResult(new StreamSQLDigest(sqlDigest, null));
- ITupleIterator ret = null;
+ ITupleIterator ret;
if (cachedResult != null) {
logger.info("using existing cache");
+ context.setReusedPeriod(Ranges.<Long> all());
return new SimpleTupleIterator(cachedResult.reuse(Ranges.<Long> all()));
} else {
logger.info("no existing cache to use");
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ae0f1a72/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java
index 07f72b1..aa8b2a7 100644
--- a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java
+++ b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java
@@ -125,6 +125,7 @@ public class OLAPEnumerator implements Enumerator<Object[]> {
ITupleIterator iterator = storageEngine.search(olapContext.storageContext, sqlDigest, olapContext.returnTupleInfo);
if (logger.isDebugEnabled()) {
logger.debug("return TupleIterator...");
+ logger.debug("Storage cache used for this storage query:" + olapContext.storageContext.getReusedPeriod());
}
return iterator;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ae0f1a72/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java b/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
index dfd55f1..f60894e 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
@@ -227,7 +227,7 @@ public class QueryController extends BasicController {
exceptionCache.get(sqlRequest) != null) {
Element element = exceptionCache.get(sqlRequest);
response = (SQLResponse) element.getObjectValue();
- response.setHitCache(true);
+ response.setHitExceptionCache(true);
}
return response;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ae0f1a72/server/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/response/SQLResponse.java b/server/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
index 6541ef9..f5ef8c5 100644
--- a/server/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
+++ b/server/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
@@ -52,7 +52,9 @@ public class SQLResponse implements Serializable {
private long totalScanCount;
- private boolean hitCache = false;
+ private boolean hitExceptionCache = false;
+
+ private boolean storageCacheUsed = false;
public SQLResponse() {
}
@@ -133,12 +135,19 @@ public class SQLResponse implements Serializable {
this.totalScanCount = totalScanCount;
}
- public boolean isHitCache() {
- return hitCache;
+ public boolean isHitExceptionCache() {
+ return hitExceptionCache;
}
- public void setHitCache(boolean hitCache) {
- this.hitCache = hitCache;
+ public void setHitExceptionCache(boolean hitExceptionCache) {
+ this.hitExceptionCache = hitExceptionCache;
}
+ public boolean isStorageCacheUsed() {
+ return storageCacheUsed;
+ }
+
+ public void setStorageCacheUsed(boolean storageCacheUsed) {
+ this.storageCacheUsed = storageCacheUsed;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ae0f1a72/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 13eb09c..669616b 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -203,8 +203,9 @@ public class QueryService extends BasicService {
final Set<String> realizationNames = new HashSet<String>();
final Set<Long> cuboidIds = new HashSet<Long>();
float duration = response.getDuration() / (float) 1000;
+ boolean storageCacheUsed = false;
- if (!response.isHitCache() && null != OLAPContext.getThreadLocalContexts()) {
+ if (!response.isHitExceptionCache() && null != OLAPContext.getThreadLocalContexts()) {
for (OLAPContext ctx : OLAPContext.getThreadLocalContexts()) {
Cuboid cuboid = ctx.storageContext.getCuboid();
if (cuboid != null) {
@@ -216,6 +217,11 @@ public class QueryService extends BasicService {
String realizationName = ctx.realization.getName();
realizationNames.add(realizationName);
}
+
+ if (ctx.storageContext.getReusedPeriod() != null) {
+ response.setStorageCacheUsed(true);
+ storageCacheUsed = true;
+ }
}
}
@@ -239,7 +245,8 @@ public class QueryService extends BasicService {
stringBuilder.append("Result row count: ").append(resultRowCount).append(newLine);
stringBuilder.append("Accept Partial: ").append(request.isAcceptPartial()).append(newLine);
stringBuilder.append("Is Partial Result: ").append(response.isPartial()).append(newLine);
- stringBuilder.append("Hit Cache: ").append(response.isHitCache()).append(newLine);
+ stringBuilder.append("Hit Exception Cache: ").append(response.isHitExceptionCache()).append(newLine);
+ stringBuilder.append("Storage cache used: ").append(storageCacheUsed).append(newLine);
stringBuilder.append("Message: ").append(response.getExceptionMessage()).append(newLine);
stringBuilder.append("==========================[QUERY]===============================").append(newLine);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ae0f1a72/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
index cc86e1e..119d88b 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
@@ -59,7 +59,7 @@ public class QueryServiceTest extends ServiceTestBase {
request.setSql("select * from test_table");
request.setAcceptPartial(true);
SQLResponse response = new SQLResponse();
- response.setHitCache(true);
+ response.setHitExceptionCache(true);
queryService.logQuery(request, response);
}
}