You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/05/15 05:06:57 UTC

[45/50] [abbrv] incubator-kylin git commit: KYLIN-759 make storage cache realization independent

KYLIN-759 make storage cache realization independent


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/91792351
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/91792351
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/91792351

Branch: refs/heads/streaming-localdict
Commit: 91792351312dee2c54c6c3b0f861e35722caa9df
Parents: ac515a7
Author: honma <ho...@ebay.com>
Authored: Thu May 14 17:31:19 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Thu May 14 20:22:04 2015 +0800

----------------------------------------------------------------------
 .../AbstractCacheFledgedStorageEngine.java      | 23 +++++----
 .../cache/CacheFledgedDynamicStorageEngine.java |  6 +--
 .../cache/CacheFledgedStaticStorageEngine.java  |  4 +-
 .../apache/kylin/storage/cache/EhcacheTest.java | 51 +++++++++++---------
 4 files changed, 47 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/91792351/storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedStorageEngine.java
index 09f0026..8e1b50a 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedStorageEngine.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedStorageEngine.java
@@ -3,10 +3,9 @@ package org.apache.kylin.storage.cache;
 import net.sf.ehcache.Cache;
 import net.sf.ehcache.CacheManager;
 import net.sf.ehcache.config.CacheConfiguration;
-import net.sf.ehcache.config.MemoryUnit;
+import net.sf.ehcache.config.Configuration;
 import net.sf.ehcache.config.PersistenceConfiguration;
 import net.sf.ehcache.store.MemoryStoreEvictionPolicy;
-
 import org.apache.kylin.metadata.realization.StreamSQLDigest;
 import org.apache.kylin.metadata.tuple.TeeTupleItrListener;
 import org.apache.kylin.storage.ICachableStorageEngine;
@@ -19,7 +18,13 @@ import org.slf4j.LoggerFactory;
  */
 public abstract class AbstractCacheFledgedStorageEngine implements IStorageEngine, TeeTupleItrListener {
     private static final Logger logger = LoggerFactory.getLogger(AbstractCacheFledgedStorageEngine.class);
-    protected static CacheManager cacheManager = CacheManager.create();
+    protected static CacheManager cacheManager;
+
+    static {
+        Configuration conf = new Configuration();
+        conf.setMaxBytesLocalHeap("1024M");
+        cacheManager = CacheManager.create(conf);
+    }
 
     protected final ICachableStorageEngine underlyingStorage;
     protected StreamSQLDigest streamSQLDigest;
@@ -28,20 +33,20 @@ public abstract class AbstractCacheFledgedStorageEngine implements IStorageEngin
     public AbstractCacheFledgedStorageEngine(ICachableStorageEngine underlyingStorage) {
         this.underlyingStorage = underlyingStorage;
         this.queryCacheExists = false;
-        this.makeCacheIfNecessary(underlyingStorage.getClass().getName());
+        this.makeCacheIfNecessary(underlyingStorage.getStorageUUID());
     }
 
-    private void makeCacheIfNecessary(String storageClassName) {
-        if (cacheManager.getCache(storageClassName) == null) {
-            logger.info("Cache for {} initting:", storageClassName);
+    private void makeCacheIfNecessary(String realizationUUID) {
+        if (cacheManager.getCache(realizationUUID) == null) {
+            logger.info("Cache for {} initting...", realizationUUID);
             // TODO: L4J [2015-04-20 10:44:03,817][WARN][net.sf.ehcache.pool.sizeof.ObjectGraphWalker] - The configured limit of 1,000 object references was reached while attempting to calculate the size of the object graph. Severe performance degradation could occur if the sizing operation continues. This can be avoided by setting the CacheManger or Cache <sizeOfPolicy> elements maxDepthExceededBehavior to "abort" or adding stop points with @IgnoreSizeOf annotations. If performance degradation is NOT an issue at the configured limit, raise the limit value using the CacheManager or Cache <sizeOfPolicy
             //Create a Cache specifying its configuration.
-            Cache storageCache = new Cache(new CacheConfiguration(storageClassName, 0).//
+            Cache storageCache = new Cache(new CacheConfiguration(realizationUUID, 0).//
                     memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.LRU).//
                     eternal(false).//
                     timeToIdleSeconds(86400).//
                     diskExpiryThreadIntervalSeconds(0).//
-                    maxBytesLocalHeap(256, MemoryUnit.MEGABYTES).//
+                    //maxBytesLocalHeap(256, MemoryUnit.MEGABYTES).//already defined at manager scope
                     persistence(new PersistenceConfiguration().strategy(PersistenceConfiguration.Strategy.NONE)));
             //TODO: deal with failed queries, and only cache too long query
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/91792351/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicStorageEngine.java
index 0a4fd2a..f62551b 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicStorageEngine.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicStorageEngine.java
@@ -36,7 +36,7 @@ public class CacheFledgedDynamicStorageEngine extends AbstractCacheFledgedStorag
         this.partitionColRef = partitionColRef;
 
         Preconditions.checkArgument(this.partitionColRef != null, "For dynamic columns like " + //
-                this.underlyingStorage.getClass().getName() + ", partition column must be provided");
+                this.underlyingStorage.getStorageUUID()+ ", partition column must be provided");
     }
 
     @Override
@@ -47,7 +47,7 @@ public class CacheFledgedDynamicStorageEngine extends AbstractCacheFledgedStorag
 
         streamSQLDigest = new StreamSQLDigest(sqlDigest, partitionColRef);
         StreamSQLResult cachedResult = null;
-        Cache cache = cacheManager.getCache(this.underlyingStorage.getClass().getName());
+        Cache cache = cacheManager.getCache(this.underlyingStorage.getStorageUUID());
         Element element = cache.get(streamSQLDigest);
         if (element != null) {
             this.queryCacheExists = true;
@@ -133,7 +133,7 @@ public class CacheFledgedDynamicStorageEngine extends AbstractCacheFledgedStorag
         }
 
         StreamSQLResult newCacheEntry = new StreamSQLResult(duplicated, ts, partitionColRef);
-        cacheManager.getCache(this.underlyingStorage.getClass().getName()).put(new Element(streamSQLDigest, newCacheEntry));
+        cacheManager.getCache(this.underlyingStorage.getStorageUUID()).put(new Element(streamSQLDigest, newCacheEntry));
         logger.info("cache after the query: " + newCacheEntry);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/91792351/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStaticStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStaticStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStaticStorageEngine.java
index a309c24..72372c6 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStaticStorageEngine.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStaticStorageEngine.java
@@ -32,7 +32,7 @@ public class CacheFledgedStaticStorageEngine extends AbstractCacheFledgedStorage
 
         streamSQLDigest = new StreamSQLDigest(sqlDigest, null);
         StreamSQLResult cachedResult = null;
-        Cache cache = cacheManager.getCache(this.underlyingStorage.getClass().getName());
+        Cache cache = cacheManager.getCache(this.underlyingStorage.getStorageUUID());
         Element element = cache.get(streamSQLDigest);
         if (element != null) {
             this.queryCacheExists = true;
@@ -66,7 +66,7 @@ public class CacheFledgedStaticStorageEngine extends AbstractCacheFledgedStorage
     @Override
     public void notify(List<ITuple> duplicated) {
         StreamSQLResult newCacheEntry = new StreamSQLResult(duplicated, Ranges.<Long> all(), null);
-        cacheManager.getCache(this.underlyingStorage.getClass().getName()).put(new Element(streamSQLDigest, newCacheEntry));
+        cacheManager.getCache(this.underlyingStorage.getStorageUUID()).put(new Element(streamSQLDigest, newCacheEntry));
         logger.info("cache after the query: " + newCacheEntry);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/91792351/storage/src/test/java/org/apache/kylin/storage/cache/EhcacheTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/cache/EhcacheTest.java b/storage/src/test/java/org/apache/kylin/storage/cache/EhcacheTest.java
index 1dec9e6..fe390de 100644
--- a/storage/src/test/java/org/apache/kylin/storage/cache/EhcacheTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/cache/EhcacheTest.java
@@ -2,8 +2,9 @@ package org.apache.kylin.storage.cache;
 
 import net.sf.ehcache.Cache;
 import net.sf.ehcache.CacheManager;
+import net.sf.ehcache.Element;
 import net.sf.ehcache.config.CacheConfiguration;
-import net.sf.ehcache.config.MemoryUnit;
+import net.sf.ehcache.config.Configuration;
 import net.sf.ehcache.config.PersistenceConfiguration;
 import net.sf.ehcache.store.MemoryStoreEvictionPolicy;
 
@@ -14,7 +15,11 @@ import org.junit.Test;
 public class EhcacheTest {
     @Test
     public void basicTest() throws InterruptedException {
-        CacheManager cacheManager = CacheManager.create();
+        System.out.println("runtime used memory: " + (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / 1024 / 1024 + "M");
+
+        Configuration conf = new Configuration();
+        conf.setMaxBytesLocalHeap("100M");
+        CacheManager cacheManager = CacheManager.create(conf);
 
         //Create a Cache specifying its configuration.
         Cache testCache = //Create a Cache specifying its configuration.
@@ -23,31 +28,31 @@ public class EhcacheTest {
                 eternal(false).//
                 timeToIdleSeconds(86400).//
                 diskExpiryThreadIntervalSeconds(0).//
-                maxBytesLocalHeap(100, MemoryUnit.MEGABYTES).//
+                //maxBytesLocalHeap(1000, MemoryUnit.MEGABYTES).//
                 persistence(new PersistenceConfiguration().strategy(PersistenceConfiguration.Strategy.NONE)));
 
         cacheManager.addCache(testCache);
 
-        //
-        //        byte[] blob2 = new byte[(1024 * 400 * 1024)];//400M
-        //
-        //        testCache.put(new Element("1", blob));
-        //        System.out.println(testCache.get("1") == null);
-        //        System.out.println(testCache.getSize());
-        //        System.out.println(testCache.getStatistics().getLocalHeapSizeInBytes());
-        //        System.out.println("runtime used memory: " + (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / 1024 / 1024 + "M");
-        //        testCache.put(new Element("2", blob));
-        //        System.out.println(testCache.get("1") == null);
-        //        System.out.println(testCache.getSize());
-        //        System.out.println(testCache.getStatistics().getLocalHeapSizeInBytes());
-        //        System.out.println("runtime used memory: " + (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / 1024 / 1024 + "M");
-        //        testCache.put(new Element("3", blob));
-        //        System.out.println(testCache.get("1") == null);
-        //        System.out.println(testCache.get("2") == null);
-        //        System.out.println(testCache.get("3") == null);
-        //        System.out.println(testCache.getSize());
-        //        System.out.println(testCache.getStatistics().getLocalHeapSizeInBytes());
-        //        System.out.println("runtime used memory: " + (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / 1024 / 1024 + "M");
+        System.out.println("runtime used memory: " + (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / 1024 / 1024 + "M");
+        byte[] blob = new byte[(1024 * 40 * 1024)];//40M
+
+        testCache.put(new Element("1", blob));
+        System.out.println(testCache.get("1") == null);
+        System.out.println(testCache.getSize());
+        System.out.println(testCache.getStatistics().getLocalHeapSizeInBytes());
+        System.out.println("runtime used memory: " + (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / 1024 / 1024 + "M");
+        testCache.put(new Element("2", blob));
+        System.out.println(testCache.get("1") == null);
+        System.out.println(testCache.getSize());
+        System.out.println(testCache.getStatistics().getLocalHeapSizeInBytes());
+        System.out.println("runtime used memory: " + (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / 1024 / 1024 + "M");
+        testCache.put(new Element("3", blob));
+        System.out.println(testCache.get("1") == null);
+        System.out.println(testCache.get("2") == null);
+        System.out.println(testCache.get("3") == null);
+        System.out.println(testCache.getSize());
+        System.out.println(testCache.getStatistics().getLocalHeapSizeInBytes());
+        System.out.println("runtime used memory: " + (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / 1024 / 1024 + "M");
 
         cacheManager.shutdown();
     }