You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/04/26 06:27:19 UTC
[37/50] incubator-kylin git commit: KYLIN-671 implement tee
itupleiterator
KYLIN-671 implement tee itupleiterator
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/d6218b44
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/d6218b44
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/d6218b44
Branch: refs/heads/streaming-localdict
Commit: d6218b44577da8c5b0b9bbbc40ff2b1d27f355eb
Parents: 9288902
Author: honma <ho...@ebay.com>
Authored: Wed Apr 15 17:41:57 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Wed Apr 22 16:46:02 2015 +0800
----------------------------------------------------------------------
.../apache/kylin/common/util/DateFormat.java | 84 ++++++++++++++++++++
.../org/apache/kylin/common/util/RangeUtil.java | 36 ++++++++-
.../apache/kylin/dict/DateStrDictionary.java | 2 +-
.../kylin/invertedindex/index/TableRecord.java | 2 +-
.../metadata/realization/SQLDigestUtil.java | 2 +-
.../metadata/serializer/DateTimeSerializer.java | 2 +-
.../kylin/metadata/tuple/ITupleIterator.java | 9 +++
.../kylin/metadata/tuple/TeeTupleIterator.java | 54 +++++++++++++
.../apache/kylin/metadata/util/DateFormat.java | 80 -------------------
.../cache/CacheFledgedStorageEngine.java | 72 +++++++++++------
.../kylin/storage/cache/StreamSQLResult.java | 11 +++
.../kylin/storage/hbase/HBaseKeyRange.java | 2 +-
.../hbase/coprocessor/endpoint/IIEndpoint.java | 12 +--
.../endpoint/TsConditionExtractor.java | 2 +-
.../org/apache/kylin/storage/tuple/Tuple.java | 2 +-
.../apache/kylin/storage/cache/EhcacheTest.java | 46 +++++++----
16 files changed, 283 insertions(+), 135 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d6218b44/common/src/main/java/org/apache/kylin/common/util/DateFormat.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/DateFormat.java b/common/src/main/java/org/apache/kylin/common/util/DateFormat.java
new file mode 100644
index 0000000..5b540a4
--- /dev/null
+++ b/common/src/main/java/org/apache/kylin/common/util/DateFormat.java
@@ -0,0 +1,84 @@
+package org.apache.kylin.common.util;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Converts between epoch milliseconds, {@link Date} objects and date/time strings.
+ * Every formatter is pinned to GMT and kept per thread, so the
+ * {@link SimpleDateFormat} instances handed out here are never shared between threads.
+ */
+public class DateFormat {
+
+    public static final String DEFAULT_DATE_PATTERN = "yyyy-MM-dd";
+    public static final String DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS = "yyyy-MM-dd HH:mm:ss";
+    public static final String DEFAULT_DATETIME_PATTERN_WITH_MILLISECONDS = "yyyy-MM-dd HH:mm:ss.SSS";
+
+    // pattern -> holder of the per-thread formatter for that pattern
+    static final private ConcurrentHashMap<String, ThreadLocal<SimpleDateFormat>> threadLocalMap = new ConcurrentHashMap<String, ThreadLocal<SimpleDateFormat>>();
+
+    /**
+     * Returns the calling thread's formatter for the given pattern, creating it on first use.
+     *
+     * @param datePattern a {@link SimpleDateFormat} pattern
+     * @return a GMT formatter private to the current thread
+     */
+    static SimpleDateFormat getDateFormat(String datePattern) {
+        ThreadLocal<SimpleDateFormat> formatThreadLocal = threadLocalMap.get(datePattern);
+        if (formatThreadLocal == null) {
+            // putIfAbsent keeps exactly one holder per pattern even when two threads race here;
+            // a plain get-then-put would let the later put discard the earlier holder
+            ThreadLocal<SimpleDateFormat> fresh = new ThreadLocal<SimpleDateFormat>();
+            ThreadLocal<SimpleDateFormat> existing = threadLocalMap.putIfAbsent(datePattern, fresh);
+            formatThreadLocal = existing != null ? existing : fresh;
+        }
+        SimpleDateFormat format = formatThreadLocal.get();
+        if (format == null) {
+            format = new SimpleDateFormat(datePattern);
+            format.setTimeZone(TimeZone.getTimeZone("GMT")); // NOTE: this must be GMT to calculate epoch date correctly
+            formatThreadLocal.set(format);
+        }
+        return format;
+    }
+
+    /** Formats epoch milliseconds as a GMT date using {@link #DEFAULT_DATE_PATTERN}. */
+    public static String formatToDateStr(long millis) {
+        return getDateFormat(DEFAULT_DATE_PATTERN).format(new Date(millis));
+    }
+
+    /** Formats epoch milliseconds as a GMT date-time using {@link #DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS}. */
+    public static String formatToTimeStr(long millis) {
+        return getDateFormat(DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS).format(new Date(millis));
+    }
+
+    /** Formats a date using {@link #DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS}. */
+    public static String dateToString(Date date) {
+        return dateToString(date, DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS);
+    }
+
+    /** Formats a date with the given pattern, in GMT. */
+    public static String dateToString(Date date, String pattern) {
+        return getDateFormat(pattern).format(date);
+    }
+
+    /** Parses a string using {@link #DEFAULT_DATE_PATTERN}. */
+    public static Date stringToDate(String str) {
+        return stringToDate(str, DEFAULT_DATE_PATTERN);
+    }
+
+    /**
+     * Parses a string with the given pattern, in GMT.
+     *
+     * @throws IllegalArgumentException if the string does not match the pattern
+     */
+    public static Date stringToDate(String str, String pattern) {
+        try {
+            return getDateFormat(pattern).parse(str);
+        } catch (ParseException e) {
+            throw new IllegalArgumentException("'" + str + "' is not a valid date of pattern '" + pattern + "'", e);
+        }
+    }
+
+    /**
+     * Converts a string to epoch milliseconds. An all-digit string is taken as
+     * milliseconds already; otherwise the pattern is chosen by length
+     * (10: date, 19: date-time, 23: date-time with milliseconds).
+     *
+     * @throws IllegalArgumentException if no pattern fits or parsing fails
+     */
+    public static long stringToMillis(String str) {
+        if (isAllDigits(str)) {
+            return Long.parseLong(str);
+        } else if (str.length() == 10) {
+            return stringToDate(str, DEFAULT_DATE_PATTERN).getTime();
+        } else if (str.length() == 19) {
+            return stringToDate(str, DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS).getTime();
+        } else if (str.length() == 23) {
+            return stringToDate(str, DEFAULT_DATETIME_PATTERN_WITH_MILLISECONDS).getTime();
+        } else {
+            throw new IllegalArgumentException("there is no valid date pattern for:" + str);
+        }
+    }
+
+    // true when every character is a digit (also true for the empty string)
+    private static boolean isAllDigits(String str) {
+        for (int i = 0, n = str.length(); i < n; i++) {
+            if (!Character.isDigit(str.charAt(i)))
+                return false;
+        }
+        return true;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d6218b44/common/src/main/java/org/apache/kylin/common/util/RangeUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/RangeUtil.java b/common/src/main/java/org/apache/kylin/common/util/RangeUtil.java
index fae521c..2dd2032 100644
--- a/common/src/main/java/org/apache/kylin/common/util/RangeUtil.java
+++ b/common/src/main/java/org/apache/kylin/common/util/RangeUtil.java
@@ -2,10 +2,7 @@ package org.apache.kylin.common.util;
import java.util.List;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Range;
-import com.google.common.collect.RangeSet;
-import com.google.common.collect.TreeRangeSet;
+import com.google.common.collect.*;
/**
* Created by Hongbin Ma(Binmahone) on 4/14/15.
@@ -21,4 +18,35 @@ public class RangeUtil {
rangeSet.remove(other);
return Lists.newArrayList(rangeSet.asRanges());
}
+
+ /**
+  * Renders a range of epoch-millisecond timestamps as text: an opening bracket
+  * ("[" closed, "(" open), the lower endpoint, "~", the upper endpoint and a
+  * closing bracket. An unbounded side is rendered as "null".
+  *
+  * @param tsRange the range to render, may be null
+  * @return the rendered range, or null when tsRange is null
+  */
+ public static String formatTsRange(Range<Long> tsRange) {
+     if (tsRange == null)
+         return null;
+
+     StringBuilder sb = new StringBuilder();
+     if (tsRange.hasLowerBound()) {
+         sb.append(tsRange.lowerBoundType() == BoundType.CLOSED ? "[" : "(");
+         // the formatted endpoint must be appended; previously the result was discarded
+         sb.append(DateFormat.formatToTimeStr(tsRange.lowerEndpoint()));
+     } else {
+         sb.append("(null");
+     }
+
+     sb.append("~");
+
+     if (tsRange.hasUpperBound()) {
+         sb.append(DateFormat.formatToTimeStr(tsRange.upperEndpoint()));
+         sb.append(tsRange.upperBoundType() == BoundType.CLOSED ? "]" : ")");
+     } else {
+         sb.append("null)");
+     }
+     return sb.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d6218b44/dictionary/src/main/java/org/apache/kylin/dict/DateStrDictionary.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/DateStrDictionary.java b/dictionary/src/main/java/org/apache/kylin/dict/DateStrDictionary.java
index e0231c5..acd7404 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/DateStrDictionary.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/DateStrDictionary.java
@@ -18,7 +18,7 @@
package org.apache.kylin.dict;
-import static org.apache.kylin.metadata.util.DateFormat.*;
+import static org.apache.kylin.common.util.DateFormat.*;
import java.io.DataInput;
import java.io.DataOutput;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d6218b44/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java
index d2a81d4..287ca51 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java
@@ -22,7 +22,7 @@ import org.apache.commons.lang.ObjectUtils;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.metadata.util.DateFormat;
+import org.apache.kylin.common.util.DateFormat;
import java.util.Arrays;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d6218b44/metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigestUtil.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigestUtil.java b/metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigestUtil.java
index f1861b0..d15b3e3 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigestUtil.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigestUtil.java
@@ -6,7 +6,7 @@ import com.google.common.collect.Range;
import org.apache.kylin.metadata.filter.*;
import org.apache.kylin.metadata.model.DataType;
import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.util.DateFormat;
+import org.apache.kylin.common.util.DateFormat;
/**
* Created by Hongbin Ma(Binmahone) on 4/14/15.
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d6218b44/metadata/src/main/java/org/apache/kylin/metadata/serializer/DateTimeSerializer.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/serializer/DateTimeSerializer.java b/metadata/src/main/java/org/apache/kylin/metadata/serializer/DateTimeSerializer.java
index 45dd6b1..8223b69 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/serializer/DateTimeSerializer.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/serializer/DateTimeSerializer.java
@@ -5,7 +5,7 @@ import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.kylin.metadata.model.DataType;
-import org.apache.kylin.metadata.util.DateFormat;
+import org.apache.kylin.common.util.DateFormat;
public class DateTimeSerializer extends DataTypeSerializer<LongWritable> {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d6218b44/metadata/src/main/java/org/apache/kylin/metadata/tuple/ITupleIterator.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/tuple/ITupleIterator.java b/metadata/src/main/java/org/apache/kylin/metadata/tuple/ITupleIterator.java
index 4a00780..0b83818 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/tuple/ITupleIterator.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/tuple/ITupleIterator.java
@@ -44,7 +44,16 @@ public interface ITupleIterator extends Iterator<ITuple> {
@Override
public void close() {
}
+
};
public void close();
+
+ /**
+ * Returns true if hasNext() returned false because there is no more data.
+ * Returns false if hasNext() returned false because a limit or threshold was hit.
+ * Throws IllegalStateException if hasNext() still returns true.
+ * @return
+ */
+ //public boolean isDrained();
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d6218b44/metadata/src/main/java/org/apache/kylin/metadata/tuple/TeeTupleIterator.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/tuple/TeeTupleIterator.java b/metadata/src/main/java/org/apache/kylin/metadata/tuple/TeeTupleIterator.java
new file mode 100644
index 0000000..0314762
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/metadata/tuple/TeeTupleIterator.java
@@ -0,0 +1,54 @@
+package org.apache.kylin.metadata.tuple;
+
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Created by Hongbin Ma(Binmahone) on 4/15/15.
+ *
+ * Like the "tee" command in Linux: every tuple read from the wrapped
+ * ITupleIterator is passed through to the caller and also kept in a list,
+ * which is handed to a callback when this iterator is closed.
+ */
+public class TeeTupleIterator implements ITupleIterator {
+
+    private static final Logger logger = LoggerFactory.getLogger(TeeTupleIterator.class);
+
+    // the iterator all calls are delegated to
+    private final ITupleIterator underlying;
+    // copies of every tuple returned by next() so far
+    private final List<ITuple> duplicatedData = Lists.newArrayList();
+    // invoked with duplicatedData on close()
+    private final Function<List<ITuple>, Void> actionOnSeeingWholeData;
+
+    /**
+     * @param underlying the iterator to wrap
+     * @param actionOnSeeingWholeData callback that receives the collected tuples on close()
+     */
+    public TeeTupleIterator(ITupleIterator underlying, Function<List<ITuple>, Void> actionOnSeeingWholeData) {
+        this.underlying = underlying;
+        this.actionOnSeeingWholeData = actionOnSeeingWholeData;
+    }
+
+    /** Closes the wrapped iterator, then hands the collected tuples to the callback. */
+    @Override
+    public void close() {
+        underlying.close();
+        // NOTE(review): the callback runs on every close(), whether or not the wrapped
+        // iterator was read to the end; the isDrained check below is not wired in yet
+        //if(this.underlying.isDrained)
+        actionOnSeeingWholeData.apply(duplicatedData);
+    }
+
+    @Override
+    public boolean hasNext() {
+        return underlying.hasNext();
+    }
+
+    /** Returns the next tuple of the wrapped iterator after recording it. */
+    @Override
+    public ITuple next() {
+        final ITuple tuple = underlying.next();
+        duplicatedData.add(tuple);
+        return tuple;
+    }
+
+    @Override
+    public void remove() {
+        underlying.remove();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d6218b44/metadata/src/main/java/org/apache/kylin/metadata/util/DateFormat.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/util/DateFormat.java b/metadata/src/main/java/org/apache/kylin/metadata/util/DateFormat.java
deleted file mode 100644
index 91d0720..0000000
--- a/metadata/src/main/java/org/apache/kylin/metadata/util/DateFormat.java
+++ /dev/null
@@ -1,80 +0,0 @@
-package org.apache.kylin.metadata.util;
-
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.Map;
-import java.util.TimeZone;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class DateFormat {
-
- public static final String DEFAULT_DATE_PATTERN = "yyyy-MM-dd";
- public static final String DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS = "yyyy-MM-dd HH:mm:ss";
- public static final String DEFAULT_DATETIME_PATTERN_WITH_MILLISECONDS = "yyyy-MM-dd HH:mm:ss.SSS";
-
- static final private Map<String, ThreadLocal<SimpleDateFormat>> threadLocalMap = new ConcurrentHashMap<String, ThreadLocal<SimpleDateFormat>>();
-
- static SimpleDateFormat getDateFormat(String datePattern) {
- ThreadLocal<SimpleDateFormat> formatThreadLocal = threadLocalMap.get(datePattern);
- if (formatThreadLocal == null) {
- threadLocalMap.put(datePattern, formatThreadLocal = new ThreadLocal<SimpleDateFormat>());
- }
- SimpleDateFormat format = formatThreadLocal.get();
- if (format == null) {
- format = new SimpleDateFormat(datePattern);
- format.setTimeZone(TimeZone.getTimeZone("GMT")); // NOTE: this must be GMT to calculate epoch date correctly
- formatThreadLocal.set(format);
- }
- return format;
- }
-
- public static String formatToDateStr(long millis) {
- return getDateFormat(DEFAULT_DATE_PATTERN).format(new Date(millis));
- }
-
- public static String dateToString(Date date) {
- return dateToString(date, DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS);
- }
-
- public static String dateToString(Date date, String pattern) {
- return getDateFormat(pattern).format(date);
- }
-
- public static Date stringToDate(String str) {
- return stringToDate(str, DEFAULT_DATE_PATTERN);
- }
-
- public static Date stringToDate(String str, String pattern) {
- Date date = null;
- try {
- date = getDateFormat(pattern).parse(str);
- } catch (ParseException e) {
- throw new IllegalArgumentException("'" + str + "' is not a valid date of pattern '" + pattern + "'", e);
- }
- return date;
- }
-
- public static long stringToMillis(String str) {
- if (isAllDigits(str)) {
- return Long.parseLong(str);
- } else if (str.length() == 10) {
- return stringToDate(str, DEFAULT_DATE_PATTERN).getTime();
- } else if (str.length() == 19) {
- return stringToDate(str, DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS).getTime();
- } else if (str.length() == 23) {
- return stringToDate(str, DEFAULT_DATETIME_PATTERN_WITH_MILLISECONDS).getTime();
- } else {
- throw new IllegalArgumentException("there is no valid date pattern for:" + str);
- }
- }
-
- private static boolean isAllDigits(String str) {
- for (int i = 0, n = str.length(); i < n; i++) {
- if (Character.isDigit(str.charAt(i)) == false)
- return false;
- }
- return true;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d6218b44/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStorageEngine.java
index b1fe17f..c6cb4ba 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStorageEngine.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStorageEngine.java
@@ -1,9 +1,10 @@
package org.apache.kylin.storage.cache;
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
+import java.util.List;
+
import net.sf.ehcache.Cache;
import net.sf.ehcache.CacheManager;
+import net.sf.ehcache.Element;
import net.sf.ehcache.config.CacheConfiguration;
import net.sf.ehcache.config.MemoryUnit;
import net.sf.ehcache.config.PersistenceConfiguration;
@@ -16,27 +17,30 @@ import org.apache.kylin.metadata.model.DataModelDesc;
import org.apache.kylin.metadata.model.PartitionDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.realization.*;
-import org.apache.kylin.metadata.tuple.CompoundTupleIterator;
-import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.metadata.tuple.SimpleTupleIterator;
+import org.apache.kylin.metadata.tuple.*;
import org.apache.kylin.storage.IStorageEngine;
import org.apache.kylin.storage.StorageContext;
import org.apache.kylin.storage.StorageEngineFactory;
import org.apache.kylin.storage.hbase.coprocessor.endpoint.TsConditionExtractor;
+import com.google.common.base.Function;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
import com.google.common.collect.Range;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
-import java.util.List;
/**
* Created by Hongbin Ma(Binmahone) on 4/13/15.
*/
public class CacheFledgedStorageEngine implements IStorageEngine {
- public static final String SUCCESS_QUERY_CACHE = "SuccessQueryCache";
- public static final String EXCEPTION_QUERY_CACHE = "ExceptionQueryCache";//TODO
+ private static final Logger logger = LoggerFactory.getLogger(CacheFledgedStorageEngine.class);
+
+ public static final String STORAGE_LAYER_TUPLE_CACHE = "STORAGE_LAYER_TUPLE_CACHE";
+ //TODO: deal with failed queries
static CacheManager cacheManager;
@@ -44,13 +48,13 @@ public class CacheFledgedStorageEngine implements IStorageEngine {
cacheManager = CacheManager.create();
//Create a Cache specifying its configuration.
- Cache successCache = new Cache(new CacheConfiguration(SUCCESS_QUERY_CACHE, 0).//
- memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.LFU).//
+ Cache successCache = new Cache(new CacheConfiguration(STORAGE_LAYER_TUPLE_CACHE, 0).//
+ memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.LRU).//
eternal(false).//
timeToIdleSeconds(86400).//
diskExpiryThreadIntervalSeconds(0).//
- maxBytesLocalHeap(500, MemoryUnit.MEGABYTES).//
- persistence(new PersistenceConfiguration().strategy(PersistenceConfiguration.Strategy.LOCALTEMPSWAP)));
+ maxBytesLocalHeap(1, MemoryUnit.GIGABYTES).//
+ persistence(new PersistenceConfiguration().strategy(PersistenceConfiguration.Strategy.NONE)));
cacheManager.addCache(successCache);
}
@@ -73,37 +77,59 @@ public class CacheFledgedStorageEngine implements IStorageEngine {
+ /**
+ * Runs the query against the underlying storage engine. When a cached result
+ * covers part of the queried time range, the cached tuples are combined with
+ * freshly fetched tuples for the remaining range. The returned iterator is
+ * wrapped so that the tuples it yields are written back to the cache when it
+ * is closed.
+ */
@Override
public ITupleIterator search(final StorageContext context, final SQLDigest sqlDigest) {
- StreamSQLDigest streamSQLDigest = new StreamSQLDigest(sqlDigest, partitionColRef);
- StreamSQLResult cachedResult = (StreamSQLResult) cacheManager.getCache(SUCCESS_QUERY_CACHE).get(streamSQLDigest).getObjectValue();
+ final StreamSQLDigest streamSQLDigest = new StreamSQLDigest(sqlDigest, partitionColRef);
+ // Cache.get() returns null on a cache miss, so guard before unwrapping the value
+ Element cachedElement = cacheManager.getCache(STORAGE_LAYER_TUPLE_CACHE).get(streamSQLDigest);
+ StreamSQLResult cachedResult = cachedElement == null ? null : (StreamSQLResult) cachedElement.getObjectValue();
- ITupleIterator ret;
+ final Range<Long> tsRange = TsConditionExtractor.extractTsCondition(partitionColRef, sqlDigest.filter);
+ ITupleIterator ret = null;
if (cachedResult != null) {
- Range<Long> tsRange = TsConditionExtractor.extractTsCondition(partitionColRef, sqlDigest.filter);
+ logger.debug("current cache : " + cachedResult);
Range<Long> reusePeriod = cachedResult.getReusableResults(tsRange);
+
+ logger.info("ts Range in query: " + RangeUtil.formatTsRange(tsRange));
+ logger.info("reusable range : " + RangeUtil.formatTsRange(reusePeriod));
+
if (reusePeriod != null) {
+
List<Range<Long>> remainings = RangeUtil.remove(tsRange, reusePeriod);
if (remainings.size() == 1) {
+
SimpleTupleIterator reusedTuples = new SimpleTupleIterator(cachedResult.reuse(reusePeriod));
Range<Long> remaining = remainings.get(0);
ITupleIterator freshTuples = SQLDigestUtil.appendTsFilterToExecute(sqlDigest, partitionColRef, remaining, new Function<Void, ITupleIterator>() {
- @Nullable
@Override
public ITupleIterator apply(Void input) {
return StorageEngineFactory.getStorageEngine(realization, false).search(context, sqlDigest);
}
});
+
ret = new CompoundTupleIterator(Lists.newArrayList(reusedTuples, freshTuples));
- //TODO:update cache
- return ret;//cache successfully reused
}
}
}
- //cache cannot reuse case:
- ret = StorageEngineFactory.getStorageEngine(realization, false).search(context, sqlDigest);
- //TODO:update cache
- return ret;
+ if (ret == null) {
+ logger.info("no cache is being leveraged");
+ //cache cannot reuse case:
+ ret = StorageEngineFactory.getStorageEngine(realization, false).search(context, sqlDigest);
+ } else {
+ logger.info("cache is being leveraged");
+ }
+
+ //use another nested ITupleIterator to deal with cache
+ ITupleIterator tee = new TeeTupleIterator(ret, new Function<List<ITuple>, Void>() {
+ @Nullable
+ @Override
+ public Void apply(List<ITuple> input) {
+ //TODO: tsRange needs updated
+ StreamSQLResult newCache = new StreamSQLResult(input, tsRange);
+ cacheManager.getCache(STORAGE_LAYER_TUPLE_CACHE).put(new Element(streamSQLDigest, newCache));
+ logger.debug("current cache: " + newCache);
+ return null;
+ }
+ });
+ return tee;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d6218b44/storage/src/main/java/org/apache/kylin/storage/cache/StreamSQLResult.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cache/StreamSQLResult.java b/storage/src/main/java/org/apache/kylin/storage/cache/StreamSQLResult.java
index f537fc7..ce7667a 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cache/StreamSQLResult.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cache/StreamSQLResult.java
@@ -3,6 +3,7 @@ package org.apache.kylin.storage.cache;
import java.util.Iterator;
import java.util.List;
+import org.apache.kylin.common.util.RangeUtil;
import org.apache.kylin.metadata.tuple.ITuple;
import com.google.common.collect.Range;
@@ -14,6 +15,11 @@ public class StreamSQLResult {
private List<ITuple> rows;
private Range<Long> timeCovered;
+ /**
+  * Pairs a list of tuples with the timestamp range they were fetched for.
+  *
+  * @param rows the tuples to hold
+  * @param timeCovered the timestamp range associated with the rows
+  */
+ public StreamSQLResult(List<ITuple> rows, Range<Long> timeCovered) {
+     this.timeCovered = timeCovered;
+     this.rows = rows;
+ }
+
public Range<Long> getReusableResults(Range<Long> tsRange) {
if (tsRange.equals(timeCovered))
return timeCovered;
@@ -31,4 +37,9 @@ public class StreamSQLResult {
//TODO: currently regardless of reusablePeriod, all rows are returned
return rows.iterator();
}
+
+ /** Describes how many tuples are held and for which period. */
+ @Override
+ public String toString() {
+     int tupleCount = rows.size();
+     String coveredPeriod = RangeUtil.formatTsRange(timeCovered);
+     return tupleCount + " tuples cached for period " + coveredPeriod;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d6218b44/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseKeyRange.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseKeyRange.java b/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseKeyRange.java
index e766317..c7be838 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseKeyRange.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseKeyRange.java
@@ -35,7 +35,7 @@ import org.apache.kylin.cube.kv.FuzzyMaskEncoder;
import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.util.DateFormat;
+import org.apache.kylin.common.util.DateFormat;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d6218b44/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
index 4753214..abe6157 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
@@ -23,7 +23,6 @@ import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
-import com.google.common.collect.Range;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.SerializationUtils;
@@ -55,10 +54,13 @@ import org.apache.kylin.storage.hbase.coprocessor.endpoint.generated.IIProtos;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.Map;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Range;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+import it.uniroma3.mat.extendedset.intset.ConciseSet;
/**
* Created by honma on 11/7/14.
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d6218b44/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/TsConditionExtractor.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/TsConditionExtractor.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/TsConditionExtractor.java
index 53f2945..0b36d4e 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/TsConditionExtractor.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/TsConditionExtractor.java
@@ -4,7 +4,7 @@ import org.apache.kylin.metadata.filter.CompareTupleFilter;
import org.apache.kylin.metadata.filter.LogicalTupleFilter;
import org.apache.kylin.metadata.filter.TupleFilter;
import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.util.DateFormat;
+import org.apache.kylin.common.util.DateFormat;
import com.google.common.collect.Range;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d6218b44/storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java b/storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
index 20aa386..992ea61 100644
--- a/storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
+++ b/storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
@@ -29,7 +29,7 @@ import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
import org.apache.kylin.dict.lookup.LookupStringTable;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.tuple.ITuple;
-import org.apache.kylin.metadata.util.DateFormat;
+import org.apache.kylin.common.util.DateFormat;
/**
* @author xjiang
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d6218b44/storage/src/test/java/org/apache/kylin/storage/cache/EhcacheTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/cache/EhcacheTest.java b/storage/src/test/java/org/apache/kylin/storage/cache/EhcacheTest.java
index 14ebe3e..fd34224 100644
--- a/storage/src/test/java/org/apache/kylin/storage/cache/EhcacheTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/cache/EhcacheTest.java
@@ -1,6 +1,6 @@
package org.apache.kylin.storage.cache;
-import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import net.sf.ehcache.Cache;
import net.sf.ehcache.CacheManager;
import net.sf.ehcache.Element;
@@ -8,38 +8,52 @@ import net.sf.ehcache.config.CacheConfiguration;
import net.sf.ehcache.config.MemoryUnit;
import net.sf.ehcache.config.PersistenceConfiguration;
import net.sf.ehcache.store.MemoryStoreEvictionPolicy;
-import org.junit.Assert;
+
import org.junit.Test;
-import java.util.List;
+import java.util.HashMap;
+import java.util.HashSet;
/**
* Created by Hongbin Ma(Binmahone) on 4/13/15.
*/
public class EhcacheTest {
+ // NOTE(review): with the assertions below removed, this test only creates,
+ // registers and shuts down a cache; it no longer checks any cached value.
@Test
- public void basicTest() {
+ public void basicTest() throws InterruptedException {
CacheManager cacheManager = CacheManager.create();
//Create a Cache specifying its configuration.
- Cache testCache = new Cache(new CacheConfiguration("test", 0).//
- memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.LFU).//
+ Cache testCache = //Create a Cache specifying its configuration.
+ new Cache(new CacheConfiguration("test", 0).//
+ memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.LRU).//
eternal(false).//
timeToIdleSeconds(86400).//
diskExpiryThreadIntervalSeconds(0).//
- maxBytesLocalHeap(500, MemoryUnit.MEGABYTES).//
- persistence(new PersistenceConfiguration().strategy(PersistenceConfiguration.Strategy.LOCALTEMPSWAP)));
+ maxBytesLocalHeap(100, MemoryUnit.MEGABYTES).//
+ persistence(new PersistenceConfiguration().strategy(PersistenceConfiguration.Strategy.NONE)));
cacheManager.addCache(testCache);
- testCache.put(new Element("x", Lists.<String> newArrayList()));
-
- List<String> v = (List<String>) testCache.get("x").getObjectValue();
- Assert.assertTrue(v.size() == 0);
- v.add("hi");
-
- List<String> v2 = (List<String>) testCache.get("x").getObjectValue();
- Assert.assertTrue(v2.size() == 1);
+ //
+ // byte[] blob2 = new byte[(1024 * 400 * 1024)];//400M
+ //
+ // testCache.put(new Element("1", blob));
+ // System.out.println(testCache.get("1") == null);
+ // System.out.println(testCache.getSize());
+ // System.out.println(testCache.getStatistics().getLocalHeapSizeInBytes());
+ // System.out.println("runtime used memory: " + (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / 1024 / 1024 + "M");
+ // testCache.put(new Element("2", blob));
+ // System.out.println(testCache.get("1") == null);
+ // System.out.println(testCache.getSize());
+ // System.out.println(testCache.getStatistics().getLocalHeapSizeInBytes());
+ // System.out.println("runtime used memory: " + (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / 1024 / 1024 + "M");
+ // testCache.put(new Element("3", blob));
+ // System.out.println(testCache.get("1") == null);
+ // System.out.println(testCache.get("2") == null);
+ // System.out.println(testCache.get("3") == null);
+ // System.out.println(testCache.getSize());
+ // System.out.println(testCache.getStatistics().getLocalHeapSizeInBytes());
+ // System.out.println("runtime used memory: " + (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / 1024 / 1024 + "M");
cacheManager.shutdown();
}