You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/11/09 04:54:41 UTC

[1/3] incubator-kylin git commit: KYLIN-1127 Move cache related stuff from BasicService into CacheService

Repository: incubator-kylin
Updated Branches:
  refs/heads/2.x-staging f9f96404f -> a3397d044


KYLIN-1127 Move cache related stuff from BasicService into CacheService


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/f1dedab7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/f1dedab7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/f1dedab7

Branch: refs/heads/2.x-staging
Commit: f1dedab768f52b7dcb22e16b1897167703897fc4
Parents: f9f9640
Author: Yang Li <li...@apache.org>
Authored: Mon Nov 9 10:41:59 2015 +0800
Committer: Yang Li <li...@apache.org>
Committed: Mon Nov 9 10:41:59 2015 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/cube/CubeManager.java |   8 +-
 .../engine/streaming/cli/StreamingCLI.java      |   2 -
 .../kylin/rest/controller/JobController.java    |   2 +-
 .../rest/controller/StreamingController.java    |   2 +-
 .../apache/kylin/rest/service/BasicService.java | 129 +---------
 .../apache/kylin/rest/service/CacheService.java | 257 +++++++++++++------
 .../apache/kylin/rest/service/QueryService.java |  32 ++-
 .../kylin/rest/service/StreamingService.java    |  12 +-
 .../kylin/rest/service/CacheServiceTest.java    |   2 +-
 .../kylin/rest/service/CubeServiceTest.java     |   7 +-
 .../kylin/rest/service/JobServiceTest.java      |   7 +-
 .../kylin/rest/service/QueryServiceTest.java    |   7 +-
 .../kylin/rest/service/ServiceTestBase.java     |   3 +-
 13 files changed, 236 insertions(+), 234 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f1dedab7/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 2232f01..9b7a024 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -525,11 +525,7 @@ public class CubeManager implements IRealizationProvider {
      * @param cubeName
      */
     public CubeInstance reloadCubeLocal(String cubeName) {
-        try {
-            return reloadCubeLocalAt(CubeInstance.concatResourcePath(cubeName));
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
+        return reloadCubeLocalAt(CubeInstance.concatResourcePath(cubeName));
     }
 
     public void removeCubeLocal(String cubeName) {
@@ -792,7 +788,7 @@ public class CubeManager implements IRealizationProvider {
         logger.debug("Loaded " + paths.size() + " Cube(s)");
     }
 
-    private synchronized CubeInstance reloadCubeLocalAt(String path) throws IOException {
+    private synchronized CubeInstance reloadCubeLocalAt(String path) {
         ResourceStore store = getStore();
 
         CubeInstance cubeInstance;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f1dedab7/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
index 3b1693a..e3a7133 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
@@ -56,8 +56,6 @@ public class StreamingCLI {
 
     public static void main(String[] args) {
         try {
-            AbstractRestCache.setCacheUpdater(new RemoteCacheUpdater());
-
             Preconditions.checkArgument(args[0].equals("streaming"));
             Preconditions.checkArgument(args[1].equals("start"));
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f1dedab7/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/JobController.java b/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
index 6fb2813..f6323ed 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
@@ -71,7 +71,7 @@ public class JobController extends BasicController implements InitializingBean {
     @Override
     public void afterPropertiesSet() throws Exception {
 
-        String timeZone = jobService.getKylinConfig().getTimeZone();
+        String timeZone = jobService.getConfig().getTimeZone();
         TimeZone tzone = TimeZone.getTimeZone(timeZone);
         TimeZone.setDefault(tzone);
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f1dedab7/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
index 78b63c0..e22bd30 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
@@ -158,7 +158,7 @@ public class StreamingController extends BasicController {
     @RequestMapping(value = "/{configName}", method = { RequestMethod.DELETE })
     @ResponseBody
     public void deleteConfig(@PathVariable String configName) throws IOException {
-        StreamingConfig config = streamingService.getSreamingManager().getStreamingConfig(configName);
+        StreamingConfig config = streamingService.getStreamingManager().getStreamingConfig(configName);
         KafkaConfig kafkaConfig = kafkaConfigService.getKafkaConfig(configName);
         if (null == config) {
             throw new NotFoundException("StreamingConfig with name " + configName + " not found..");

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f1dedab7/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/BasicService.java b/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
index 5ac12ea..9135dfa 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
@@ -18,27 +18,12 @@
 
 package org.apache.kylin.rest.service;
 
-import java.io.File;
 import java.io.IOException;
-import java.nio.charset.Charset;
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
-import javax.sql.DataSource;
-
-import net.sf.ehcache.CacheManager;
-
-import org.apache.calcite.jdbc.Driver;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeDescManager;
 import org.apache.kylin.cube.CubeManager;
@@ -54,104 +39,17 @@ import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.ProjectManager;
 import org.apache.kylin.metadata.realization.RealizationType;
-import org.apache.kylin.query.enumerator.OLAPQuery;
-import org.apache.kylin.query.relnode.OLAPContext;
-import org.apache.kylin.query.schema.OLAPSchemaFactory;
 import org.apache.kylin.source.kafka.KafkaConfigManager;
 import org.apache.kylin.storage.hybrid.HybridManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.jdbc.datasource.DriverManagerDataSource;
 
 import com.google.common.base.Function;
 import com.google.common.base.Predicate;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.Lists;
-import com.google.common.io.Files;
 
 public abstract class BasicService {
 
-    private static final Logger logger = LoggerFactory.getLogger(BasicService.class);
-
-    private static ConcurrentMap<String, DataSource> olapDataSources = new ConcurrentHashMap<String, DataSource>();
-
-    @Autowired
-    private CacheManager cacheManager;
-
     public KylinConfig getConfig() {
-        return KylinConfig.getInstanceFromEnv();
-    }
-
-    protected void cleanDataCache(String storageUUID) {
-        if (cacheManager != null && cacheManager.getCache(storageUUID) != null) {
-            logger.info("cleaning cache for " + storageUUID);
-            cacheManager.getCache(storageUUID).removeAll();
-        } else {
-            logger.warn("skip cleaning cache for " + storageUUID);
-        }
-    }
-
-    protected void cleanAllDataCache() {
-        if (cacheManager != null) {
-            logger.warn("cleaning all storage cache");
-            cacheManager.clearAll();
-        } else {
-            logger.warn("skip cleaning all storage cache");
-        }
-    }
-
-    public void removeOLAPDataSource(String project) {
-        logger.info("removeOLAPDataSource is called for project " + project);
-        if (StringUtils.isEmpty(project))
-            throw new IllegalArgumentException("removeOLAPDataSource: project name not given");
-
-        project = ProjectInstance.getNormalizedProjectName(project);
-        olapDataSources.remove(project);
-    }
-
-    public static void removeAllOLAPDataSources() {
-        // brutal, yet simplest way
-        logger.info("removeAllOLAPDataSources is called.");
-        olapDataSources.clear();
-    }
-
-    public DataSource getOLAPDataSource(String project) {
-
-        project = ProjectInstance.getNormalizedProjectName(project);
-
-        DataSource ret = olapDataSources.get(project);
-        if (ret == null) {
-            logger.debug("Creating a new data source");
-            logger.debug("OLAP data source pointing to " + getConfig());
-
-            File modelJson = OLAPSchemaFactory.createTempOLAPJson(project, getConfig());
-
-            try {
-                List<String> text = Files.readLines(modelJson, Charset.defaultCharset());
-                logger.debug("The new temp olap json is :");
-                for (String line : text)
-                    logger.debug(line);
-            } catch (IOException e) {
-                e.printStackTrace(); // logging failure is not critical
-            }
-
-            DriverManagerDataSource ds = new DriverManagerDataSource();
-            Properties props = new Properties();
-            props.setProperty(OLAPQuery.PROP_SCAN_THRESHOLD, String.valueOf(KylinConfig.getInstanceFromEnv().getScanThreshold()));
-            ds.setConnectionProperties(props);
-            ds.setDriverClassName(Driver.class.getName());
-            ds.setUrl("jdbc:calcite:model=" + modelJson.getAbsolutePath());
-
-            ret = olapDataSources.putIfAbsent(project, ds);
-            if (ret == null) {
-                ret = ds;
-            }
-        }
-        return ret;
-    }
-
-    public final KylinConfig getKylinConfig() {
         KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
 
         if (kylinConfig == null) {
@@ -160,7 +58,7 @@ public abstract class BasicService {
 
         return kylinConfig;
     }
-
+    
     public final MetadataManager getMetadataManager() {
         return MetadataManager.getInstance(getConfig());
     }
@@ -169,7 +67,7 @@ public abstract class BasicService {
         return CubeManager.getInstance(getConfig());
     }
 
-    public final StreamingManager getSreamingManager() {
+    public final StreamingManager getStreamingManager() {
         return StreamingManager.getInstance(getConfig());
     }
 
@@ -246,27 +144,4 @@ public abstract class BasicService {
         return listAllCubingJobs(cubeName, projectName, EnumSet.allOf(ExecutableState.class), getExecutableManager().getAllOutputs());
     }
 
-    protected static void close(ResultSet resultSet, Statement stat, Connection conn) {
-        OLAPContext.clearParameter();
-
-        if (resultSet != null)
-            try {
-                resultSet.close();
-            } catch (SQLException e) {
-                logger.error("failed to close", e);
-            }
-        if (stat != null)
-            try {
-                stat.close();
-            } catch (SQLException e) {
-                logger.error("failed to close", e);
-            }
-        if (conn != null)
-            try {
-                conn.close();
-            } catch (SQLException e) {
-                logger.error("failed to close", e);
-            }
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f1dedab7/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
index 63fe8c4..f9c3ec1 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
@@ -18,8 +18,21 @@
 
 package org.apache.kylin.rest.service;
 
+import java.io.File;
 import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
+import javax.sql.DataSource;
+
+import net.sf.ehcache.CacheManager;
+
+import org.apache.calcite.jdbc.Driver;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.restclient.Broadcaster;
 import org.apache.kylin.cube.CubeDescManager;
 import org.apache.kylin.cube.CubeInstance;
@@ -32,88 +45,172 @@ import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.ProjectManager;
 import org.apache.kylin.metadata.realization.RealizationRegistry;
 import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.query.enumerator.OLAPQuery;
+import org.apache.kylin.query.schema.OLAPSchemaFactory;
 import org.apache.kylin.source.kafka.KafkaConfigManager;
 import org.apache.kylin.storage.hybrid.HybridManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.jdbc.datasource.DriverManagerDataSource;
 import org.springframework.stereotype.Component;
 
+import com.google.common.io.Files;
+
 /**
  */
 @Component("cacheService")
 public class CacheService extends BasicService {
 
+    private static final Logger logger = LoggerFactory.getLogger(CacheService.class);
+
+    private static ConcurrentMap<String, DataSource> olapDataSources = new ConcurrentHashMap<String, DataSource>();
+
     @Autowired
     private CubeService cubeService;
 
+    @Autowired
+    private CacheManager cacheManager;
+
+    // for test
     public void setCubeService(CubeService cubeService) {
         this.cubeService = cubeService;
     }
 
-    private static final Logger logger = LoggerFactory.getLogger(CacheService.class);
+    protected void cleanDataCache(String storageUUID) {
+        if (cacheManager != null && cacheManager.getCache(storageUUID) != null) {
+            logger.info("cleaning cache for " + storageUUID);
+            cacheManager.getCache(storageUUID).removeAll();
+        } else {
+            logger.warn("skip cleaning cache for " + storageUUID);
+        }
+    }
+
+    protected void cleanAllDataCache() {
+        if (cacheManager != null) {
+            logger.warn("cleaning all storage cache");
+            cacheManager.clearAll();
+        } else {
+            logger.warn("skip cleaning all storage cache");
+        }
+    }
+
+    protected void removeOLAPDataSource(String project) {
+        logger.info("removeOLAPDataSource is called for project " + project);
+        if (StringUtils.isEmpty(project))
+            throw new IllegalArgumentException("removeOLAPDataSource: project name not given");
+
+        project = ProjectInstance.getNormalizedProjectName(project);
+        olapDataSources.remove(project);
+    }
+
+    public static void removeAllOLAPDataSources() {
+        // brutal, yet simplest way
+        logger.info("removeAllOLAPDataSources is called.");
+        olapDataSources.clear();
+    }
+
+    public DataSource getOLAPDataSource(String project) {
+
+        project = ProjectInstance.getNormalizedProjectName(project);
+
+        DataSource ret = olapDataSources.get(project);
+        if (ret == null) {
+            logger.debug("Creating a new data source");
+            logger.debug("OLAP data source pointing to " + getConfig());
+
+            File modelJson = OLAPSchemaFactory.createTempOLAPJson(project, getConfig());
+
+            try {
+                List<String> text = Files.readLines(modelJson, Charset.defaultCharset());
+                logger.debug("The new temp olap json is :");
+                for (String line : text)
+                    logger.debug(line);
+            } catch (IOException e) {
+                e.printStackTrace(); // logging failure is not critical
+            }
+
+            DriverManagerDataSource ds = new DriverManagerDataSource();
+            Properties props = new Properties();
+            props.setProperty(OLAPQuery.PROP_SCAN_THRESHOLD, String.valueOf(KylinConfig.getInstanceFromEnv().getScanThreshold()));
+            ds.setConnectionProperties(props);
+            ds.setDriverClassName(Driver.class.getName());
+            ds.setUrl("jdbc:calcite:model=" + modelJson.getAbsolutePath());
+
+            ret = olapDataSources.putIfAbsent(project, ds);
+            if (ret == null) {
+                ret = ds;
+            }
+        }
+        return ret;
+    }
 
     public void rebuildCache(Broadcaster.TYPE cacheType, String cacheKey) {
         final String log = "rebuild cache type: " + cacheType + " name:" + cacheKey;
         logger.info(log);
         try {
             switch (cacheType) {
-                case CUBE:
-                    CubeInstance newCube = getCubeManager().reloadCubeLocal(cacheKey);
-                    getHybridManager().reloadHybridInstanceByChild(RealizationType.CUBE, cacheKey);
-                    getProjectManager().clearL2Cache();
-                    //clean query related cache first
-                    super.cleanDataCache(newCube.getUuid());
-                    cubeService.updateOnNewSegmentReady(cacheKey);
-                    break;
-                case STREAMING:
-                    getSreamingManager().reloadStreamingConfigLocal(cacheKey);
-                    break;
-                case KAFKA:
-                    getKafkaManager().reloadKafkaConfigLocal(cacheKey);
-                    break;
-                case CUBE_DESC:
-                    getCubeDescManager().reloadCubeDescLocal(cacheKey);
-                    break;
-                case PROJECT:
-                    ProjectInstance projectInstance = getProjectManager().reloadProjectLocal(cacheKey);
+            case CUBE:
+                CubeInstance newCube = getCubeManager().reloadCubeLocal(cacheKey);
+                getHybridManager().reloadHybridInstanceByChild(RealizationType.CUBE, cacheKey);
+                getProjectManager().clearL2Cache();
+                //clean query related cache first
+                if (newCube != null) {
+                    cleanDataCache(newCube.getUuid());
+                }
+                cubeService.updateOnNewSegmentReady(cacheKey);
+                break;
+            case STREAMING:
+                getStreamingManager().reloadStreamingConfigLocal(cacheKey);
+                break;
+            case KAFKA:
+                getKafkaManager().reloadKafkaConfigLocal(cacheKey);
+                break;
+            case CUBE_DESC:
+                getCubeDescManager().reloadCubeDescLocal(cacheKey);
+                break;
+            case PROJECT:
+                ProjectInstance projectInstance = getProjectManager().reloadProjectLocal(cacheKey);
+                if (projectInstance != null) {
                     removeOLAPDataSource(projectInstance.getName());
-                    break;
-                case INVERTED_INDEX:
-                    //II update does not need to update storage cache because it is dynamic already
-                    getIIManager().reloadIILocal(cacheKey);
-                    getHybridManager().reloadHybridInstanceByChild(RealizationType.INVERTED_INDEX, cacheKey);
-                    getProjectManager().clearL2Cache();
-                    break;
-                case INVERTED_INDEX_DESC:
-                    getIIDescManager().reloadIIDescLocal(cacheKey);
-                    break;
-                case TABLE:
-                    getMetadataManager().reloadTableCache(cacheKey);
-                    IIDescManager.clearCache();
-                    CubeDescManager.clearCache();
-                    break;
-                case DATA_MODEL:
-                    getMetadataManager().reloadDataModelDesc(cacheKey);
-                    IIDescManager.clearCache();
-                    CubeDescManager.clearCache();
-                    break;
-                case ALL:
-                    MetadataManager.clearCache();
-                    CubeDescManager.clearCache();
-                    CubeManager.clearCache();
-                    IIDescManager.clearCache();
-                    IIManager.clearCache();
-                    HybridManager.clearCache();
-                    RealizationRegistry.clearCache();
-                    ProjectManager.clearCache();
-                    KafkaConfigManager.clearCache();
-                    StreamingManager.clearCache();
-                    super.cleanAllDataCache();
-                    BasicService.removeAllOLAPDataSources();
-                    break;
-                default:
-                    throw new RuntimeException("invalid cacheType:" + cacheType);
+                }
+                break;
+            case INVERTED_INDEX:
+                //II update does not need to update storage cache because it is dynamic already
+                getIIManager().reloadIILocal(cacheKey);
+                getHybridManager().reloadHybridInstanceByChild(RealizationType.INVERTED_INDEX, cacheKey);
+                getProjectManager().clearL2Cache();
+                break;
+            case INVERTED_INDEX_DESC:
+                getIIDescManager().reloadIIDescLocal(cacheKey);
+                break;
+            case TABLE:
+                getMetadataManager().reloadTableCache(cacheKey);
+                IIDescManager.clearCache();
+                CubeDescManager.clearCache();
+                break;
+            case DATA_MODEL:
+                getMetadataManager().reloadDataModelDesc(cacheKey);
+                IIDescManager.clearCache();
+                CubeDescManager.clearCache();
+                break;
+            case ALL:
+                MetadataManager.clearCache();
+                CubeDescManager.clearCache();
+                CubeManager.clearCache();
+                IIDescManager.clearCache();
+                IIManager.clearCache();
+                HybridManager.clearCache();
+                RealizationRegistry.clearCache();
+                ProjectManager.clearCache();
+                KafkaConfigManager.clearCache();
+                StreamingManager.clearCache();
+
+                cleanAllDataCache();
+                removeAllOLAPDataSources();
+                break;
+            default:
+                throw new RuntimeException("invalid cacheType:" + cacheType);
             }
         } catch (IOException e) {
             throw new RuntimeException("error " + log, e);
@@ -124,31 +221,31 @@ public class CacheService extends BasicService {
         final String log = "remove cache type: " + cacheType + " name:" + cacheKey;
         try {
             switch (cacheType) {
-                case CUBE:
-                if (getCubeManager().getCube(cacheKey) != null) {
-                    String storageUUID = getCubeManager().getCube(cacheKey).getUuid();
-                    getCubeManager().removeCubeLocal(cacheKey);
-                    super.cleanDataCache(storageUUID);
+            case CUBE:
+                CubeInstance cube = getCubeManager().getCube(cacheKey);
+                getCubeManager().removeCubeLocal(cacheKey);
+                if (cube != null) {
+                    cleanDataCache(cube.getUuid());
                 }
-                    break;
-                case CUBE_DESC:
-                    getCubeDescManager().removeLocalCubeDesc(cacheKey);
-                    break;
-                case PROJECT:
-                    ProjectManager.clearCache();
-                    break;
-                case INVERTED_INDEX:
-                    getIIManager().removeIILocal(cacheKey);
-                    break;
-                case INVERTED_INDEX_DESC:
-                    getIIDescManager().removeIIDescLocal(cacheKey);
-                    break;
-                case TABLE:
-                    throw new UnsupportedOperationException(log);
-                case DATA_MODEL:
-                    throw new UnsupportedOperationException(log);
-                default:
-                    throw new RuntimeException("invalid cacheType:" + cacheType);
+                break;
+            case CUBE_DESC:
+                getCubeDescManager().removeLocalCubeDesc(cacheKey);
+                break;
+            case PROJECT:
+                ProjectManager.clearCache();
+                break;
+            case INVERTED_INDEX:
+                getIIManager().removeIILocal(cacheKey);
+                break;
+            case INVERTED_INDEX_DESC:
+                getIIDescManager().removeIIDescLocal(cacheKey);
+                break;
+            case TABLE:
+                throw new UnsupportedOperationException(log);
+            case DATA_MODEL:
+                throw new UnsupportedOperationException(log);
+            default:
+                throw new RuntimeException("invalid cacheType:" + cacheType);
             }
         } catch (IOException e) {
             throw new RuntimeException("error " + log, e);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f1dedab7/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 4e96360..a1e8a29 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -68,6 +68,7 @@ import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.h2.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.security.access.AccessDeniedException;
 import org.springframework.security.access.prepost.PreAuthorize;
 import org.springframework.security.core.context.SecurityContextHolder;
@@ -81,6 +82,9 @@ public class QueryService extends BasicService {
 
     private static final Logger logger = LoggerFactory.getLogger(QueryService.class);
 
+    @Autowired
+    private CacheService cacheService;
+    
     public static final String USER_QUERY_FAMILY = "q";
     private static final String DEFAULT_TABLE_PREFIX = "kylin_metadata";
     private static final String USER_TABLE_NAME = "_user";
@@ -282,7 +286,7 @@ public class QueryService extends BasicService {
             return Collections.emptyList();
         }
         try {
-            DataSource dataSource = getOLAPDataSource(project);
+            DataSource dataSource = cacheService.getOLAPDataSource(project);
             conn = dataSource.getConnection();
             DatabaseMetaData metaData = conn.getMetaData();
 
@@ -342,7 +346,7 @@ public class QueryService extends BasicService {
         List<SelectedColumnMeta> columnMetas = new LinkedList<SelectedColumnMeta>();
 
         try {
-            conn = getOLAPDataSource(sqlRequest.getProject()).getConnection();
+            conn = cacheService.getOLAPDataSource(sqlRequest.getProject()).getConnection();
 
             if (sqlRequest instanceof PrepareSqlRequest) {
                 PreparedStatement preparedState = conn.prepareStatement(sql);
@@ -481,4 +485,28 @@ public class QueryService extends BasicService {
             return -1;
         }
     }
+
+    private static void close(ResultSet resultSet, Statement stat, Connection conn) {
+        OLAPContext.clearParameter();
+
+        if (resultSet != null)
+            try {
+                resultSet.close();
+            } catch (SQLException e) {
+                logger.error("failed to close", e);
+            }
+        if (stat != null)
+            try {
+                stat.close();
+            } catch (SQLException e) {
+                logger.error("failed to close", e);
+            }
+        if (conn != null)
+            try {
+                conn.close();
+            } catch (SQLException e) {
+                logger.error("failed to close", e);
+            }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f1dedab7/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java b/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
index 0f67ac8..e40426b 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
@@ -41,9 +41,9 @@ public class StreamingService extends BasicService {
         List<StreamingConfig> streamingConfigs = new ArrayList();
         CubeInstance cubeInstance = (null != cubeName) ? getCubeManager().getCube(cubeName) : null;
         if (null == cubeInstance) {
-            streamingConfigs = getSreamingManager().listAllStreaming();
+            streamingConfigs = getStreamingManager().listAllStreaming();
         } else {
-            for(StreamingConfig config : getSreamingManager().listAllStreaming()){
+            for(StreamingConfig config : getStreamingManager().listAllStreaming()){
                 if(cubeInstance.getName().equals(config.getCubeName())){
                     streamingConfigs.add(config);
                 }
@@ -70,21 +70,21 @@ public class StreamingService extends BasicService {
     }
 
     public StreamingConfig createStreamingConfig(StreamingConfig config) throws IOException {
-        if (getSreamingManager().getStreamingConfig(config.getName()) != null) {
+        if (getStreamingManager().getStreamingConfig(config.getName()) != null) {
             throw new InternalErrorException("The streamingConfig named " + config.getName() + " already exists");
         }
-        StreamingConfig streamingConfig =  getSreamingManager().saveStreamingConfig(config);
+        StreamingConfig streamingConfig =  getStreamingManager().saveStreamingConfig(config);
         return streamingConfig;
     }
 
 //    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#desc, 'ADMINISTRATION') or hasPermission(#desc, 'MANAGEMENT')")
     public StreamingConfig updateStreamingConfig(StreamingConfig config) throws IOException {
-        return getSreamingManager().updateStreamingConfig(config);
+        return getStreamingManager().updateStreamingConfig(config);
     }
 
 //    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#desc, 'ADMINISTRATION') or hasPermission(#desc, 'MANAGEMENT')")
     public void dropStreamingConfig(StreamingConfig config) throws IOException {
-        getSreamingManager().removeStreamingConfig(config);
+        getStreamingManager().removeStreamingConfig(config);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f1dedab7/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
index 88d6273..af1bbc0 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
@@ -248,7 +248,7 @@ public class CacheServiceTest extends LocalFileMetadataTestCase {
         CubeUpdate cubeBuilder = new CubeUpdate(cube);
         cubeBuilder.setToAddSegs(segment);
         cube = cubeManager.updateCube(cubeBuilder);
-        //one for cube update, one for project update
+        //one for cube update
         assertEquals(1, broadcaster.getCounterAndClear());
         waitForCounterAndClear(1);
         assertEquals(1, cubeManagerB.getCube(cubeName).getSegments().size());

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f1dedab7/server/src/test/java/org/apache/kylin/rest/service/CubeServiceTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/CubeServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/CubeServiceTest.java
index 7ab08f8..226380c 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/CubeServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/CubeServiceTest.java
@@ -38,12 +38,15 @@ public class CubeServiceTest extends ServiceTestBase {
     @Autowired
     CubeService cubeService;
 
+    @Autowired
+    private CacheService cacheService;
+    
     @Test
     public void testBasics() throws JsonProcessingException, JobException, UnknownHostException {
         Assert.assertNotNull(cubeService.getConfig());
-        Assert.assertNotNull(cubeService.getKylinConfig());
+        Assert.assertNotNull(cubeService.getConfig());
         Assert.assertNotNull(cubeService.getMetadataManager());
-        Assert.assertNotNull(cubeService.getOLAPDataSource(ProjectInstance.DEFAULT_PROJECT_NAME));
+        Assert.assertNotNull(cacheService.getOLAPDataSource(ProjectInstance.DEFAULT_PROJECT_NAME));
 
         Assert.assertTrue(CubeService.getCubeDescNameFromCube("testCube").equals("testCube_desc"));
         Assert.assertTrue(CubeService.getCubeNameFromDesc("testCube_desc").equals("testCube"));

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f1dedab7/server/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
index a53dd3c..41057e5 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
@@ -34,12 +34,15 @@ public class JobServiceTest extends ServiceTestBase {
     @Autowired
     JobService jobService;
 
+    @Autowired
+    private CacheService cacheService;
+    
     @Test
     public void testBasics() throws JobException, IOException {
         Assert.assertNotNull(jobService.getConfig());
-        Assert.assertNotNull(jobService.getKylinConfig());
+        Assert.assertNotNull(jobService.getConfig());
         Assert.assertNotNull(jobService.getMetadataManager());
-        Assert.assertNotNull(jobService.getOLAPDataSource(ProjectInstance.DEFAULT_PROJECT_NAME));
+        Assert.assertNotNull(cacheService.getOLAPDataSource(ProjectInstance.DEFAULT_PROJECT_NAME));
         Assert.assertNull(jobService.getJobInstance("job_not_exist"));
         Assert.assertNotNull(jobService.listAllJobs(null, null, null));
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f1dedab7/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
index 44e5ef6..cc86e1e 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
@@ -37,12 +37,15 @@ public class QueryServiceTest extends ServiceTestBase {
     @Autowired
     QueryService queryService;
 
+    @Autowired
+    private CacheService cacheService;
+    
     @Test
     public void testBasics() throws JobException, IOException, SQLException {
         Assert.assertNotNull(queryService.getConfig());
-        Assert.assertNotNull(queryService.getKylinConfig());
+        Assert.assertNotNull(queryService.getConfig());
         Assert.assertNotNull(queryService.getMetadataManager());
-        Assert.assertNotNull(queryService.getOLAPDataSource(ProjectInstance.DEFAULT_PROJECT_NAME));
+        Assert.assertNotNull(cacheService.getOLAPDataSource(ProjectInstance.DEFAULT_PROJECT_NAME));
 
         //        Assert.assertTrue(queryService.getQueries("ADMIN").size() == 0);
         //

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f1dedab7/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java b/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java
index fd613f4..f8dc945 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java
@@ -69,8 +69,7 @@ public class ServiceTestBase extends LocalFileMetadataTestCase {
         IIManager.clearCache();
         RealizationRegistry.clearCache();
         ProjectManager.clearCache();
-        BasicService.removeAllOLAPDataSources();
-
+        CacheService.removeAllOLAPDataSources();
     }
 
     @After



[2/3] incubator-kylin git commit: KYLIN-1127 Broadcaster be configured by KylinConfig (instead of spring profile)

Posted by li...@apache.org.
KYLIN-1127 Broadcaster be configured by KylinConfig (instead of spring profile)


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/2f4595ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/2f4595ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/2f4595ae

Branch: refs/heads/2.x-staging
Commit: 2f4595ae9db582a5b1c498534916f90db22b55b2
Parents: f1dedab
Author: Yang Li <li...@apache.org>
Authored: Mon Nov 9 11:15:12 2015 +0800
Committer: Yang Li <li...@apache.org>
Committed: Mon Nov 9 11:15:12 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/cache/CacheUpdater.java | 11 ----
 .../kylin/common/cache/LocalCacheUpdater.java   | 18 ------
 .../kylin/common/cache/RemoteCacheUpdater.java  | 14 -----
 .../common/restclient/AbstractRestCache.java    | 17 +++---
 .../kylin/common/restclient/Broadcaster.java    | 58 +++++++++++++++-----
 .../restclient/CaseInsensitiveStringCache.java  |  6 +-
 .../common/restclient/SingleValueCache.java     | 16 +++---
 .../org/apache/kylin/cube/CubeDescManager.java  |  3 +-
 .../java/org/apache/kylin/cube/CubeManager.java |  4 +-
 .../apache/kylin/metadata/MetadataManager.java  | 10 +++-
 .../kylin/metadata/project/ProjectManager.java  |  3 +-
 .../kylin/storage/hybrid/HybridManager.java     |  4 +-
 .../engine/streaming/StreamingManager.java      |  3 +-
 .../engine/streaming/cli/StreamingCLI.java      |  7 +--
 .../test_case_data/localmeta/kylin.properties   |  2 +-
 .../kylin/invertedindex/IIDescManager.java      |  3 +-
 .../apache/kylin/invertedindex/IIManager.java   |  4 +-
 .../kylin/rest/service/CacheServiceTest.java    |  6 +-
 .../kylin/source/kafka/KafkaConfigManager.java  | 23 ++++----
 19 files changed, 105 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-common/src/main/java/org/apache/kylin/common/cache/CacheUpdater.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/cache/CacheUpdater.java b/core-common/src/main/java/org/apache/kylin/common/cache/CacheUpdater.java
deleted file mode 100644
index 615ee14..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/cache/CacheUpdater.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package org.apache.kylin.common.cache;
-
-import org.apache.kylin.common.restclient.AbstractRestCache;
-import org.apache.kylin.common.restclient.Broadcaster;
-
-/**
- */
-@SuppressWarnings("rawtypes")
-public interface CacheUpdater {
-    void updateCache(Object key, Object value, Broadcaster.EVENT syncAction, Broadcaster.TYPE type, AbstractRestCache cache);
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-common/src/main/java/org/apache/kylin/common/cache/LocalCacheUpdater.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/cache/LocalCacheUpdater.java b/core-common/src/main/java/org/apache/kylin/common/cache/LocalCacheUpdater.java
deleted file mode 100644
index 8d3b648..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/cache/LocalCacheUpdater.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package org.apache.kylin.common.cache;
-
-import org.apache.kylin.common.restclient.AbstractRestCache;
-import org.apache.kylin.common.restclient.Broadcaster;
-
-/**
- */
-@SuppressWarnings({ "rawtypes", "unchecked" })
-public class LocalCacheUpdater implements CacheUpdater {
-    @Override
-    public void updateCache(Object key, Object value, Broadcaster.EVENT syncAction, Broadcaster.TYPE type, AbstractRestCache cache) {
-        if (syncAction == Broadcaster.EVENT.CREATE || syncAction == Broadcaster.EVENT.UPDATE) {
-            cache.putLocal(key, value);
-        } else if (syncAction == Broadcaster.EVENT.DROP) {
-            cache.removeLocal(key);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-common/src/main/java/org/apache/kylin/common/cache/RemoteCacheUpdater.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/cache/RemoteCacheUpdater.java b/core-common/src/main/java/org/apache/kylin/common/cache/RemoteCacheUpdater.java
deleted file mode 100644
index 2927d2d..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/cache/RemoteCacheUpdater.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package org.apache.kylin.common.cache;
-
-import org.apache.kylin.common.restclient.AbstractRestCache;
-import org.apache.kylin.common.restclient.Broadcaster;
-
-/**
- */
-@SuppressWarnings("rawtypes")
-public class RemoteCacheUpdater implements CacheUpdater {
-    @Override
-    public void updateCache(Object key, Object value, Broadcaster.EVENT syncAction, Broadcaster.TYPE type, AbstractRestCache cache) {
-        Broadcaster.getInstance().queue(type.getType(), syncAction.getType(), key.toString());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-common/src/main/java/org/apache/kylin/common/restclient/AbstractRestCache.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/AbstractRestCache.java b/core-common/src/main/java/org/apache/kylin/common/restclient/AbstractRestCache.java
index 68d9be5..fc030b4 100644
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/AbstractRestCache.java
+++ b/core-common/src/main/java/org/apache/kylin/common/restclient/AbstractRestCache.java
@@ -18,8 +18,7 @@
 
 package org.apache.kylin.common.restclient;
 
-import org.apache.kylin.common.cache.CacheUpdater;
-import org.apache.kylin.common.cache.RemoteCacheUpdater;
+import org.apache.kylin.common.KylinConfig;
 
 /**
  * @author xjiang
@@ -27,17 +26,17 @@ import org.apache.kylin.common.cache.RemoteCacheUpdater;
  */
 public abstract class AbstractRestCache<K, V> {
 
-    protected static CacheUpdater cacheUpdater = new RemoteCacheUpdater();
-
-    public static void setCacheUpdater(CacheUpdater cu) {
-        cacheUpdater = cu;
-    }
-
+    protected final KylinConfig config;
     protected final Broadcaster.TYPE syncType;
 
-    protected AbstractRestCache(Broadcaster.TYPE syncType) {
+    protected AbstractRestCache(KylinConfig config, Broadcaster.TYPE syncType) {
+        this.config = config;
         this.syncType = syncType;
     }
+    
+    public Broadcaster getBroadcaster() {
+        return Broadcaster.getInstance(config);
+    }
 
     public abstract void put(K key, V value);
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java b/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
index 80ec33c..871d77c 100644
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
+++ b/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingDeque;
@@ -43,24 +44,52 @@ public class Broadcaster {
 
     private static final Logger logger = LoggerFactory.getLogger(Broadcaster.class);
 
+    // static cached instances
+    private static final ConcurrentHashMap<KylinConfig, Broadcaster> CACHE = new ConcurrentHashMap<KylinConfig, Broadcaster>();
+
+    public static Broadcaster getInstance(KylinConfig config) {
+        Broadcaster r = CACHE.get(config);
+        if (r != null) {
+            return r;
+        }
+
+        synchronized (Broadcaster.class) {
+            r = CACHE.get(config);
+            if (r != null) {
+                return r;
+            }
+
+            r = new Broadcaster(config);
+            CACHE.put(config, r);
+            if (CACHE.size() > 1) {
+                logger.warn("More than one cubemanager singleton exist");
+            }
+            return r;
+        }
+    }
+
+    public static void clearCache() {
+        CACHE.clear();
+    }
+
+    // ============================================================================
+
     private BlockingDeque<BroadcastEvent> broadcastEvents = new LinkedBlockingDeque<>();
 
     private AtomicLong counter = new AtomicLong();
 
-    static class BroadcasterHolder {
-        static final Broadcaster INSTANCE = new Broadcaster();
-    }
+    private Broadcaster(final KylinConfig config) {
+        final String[] nodes = config.getRestServers();
+        if (nodes == null || nodes.length < 1) {
+            logger.warn("There is no available rest server; check the 'kylin.rest.servers' config");
+            broadcastEvents = null; // disable the broadcaster
+            return;
+        }
+        logger.debug(nodes.length + " nodes in the cluster: " + Arrays.toString(nodes));
 
-    private Broadcaster() {
         Executors.newSingleThreadExecutor(new DaemonThreadFactory()).execute(new Runnable() {
             @Override
             public void run() {
-                final String[] nodes = KylinConfig.getInstanceFromEnv().getRestServers();
-                if (nodes == null || nodes.length < 1) {//TODO if the node count is greater than 1, it means it is a cluster
-                    logger.warn("There is no available rest server; check the 'kylin.rest.servers' config");
-                    return;
-                }
-                logger.debug(nodes.length + " nodes in the cluster: " + Arrays.toString(nodes));
                 final List<RestClient> restClients = Lists.newArrayList();
                 for (String node : nodes) {
                     restClients.add(new RestClient(node));
@@ -90,10 +119,6 @@ public class Broadcaster {
         });
     }
 
-    public static Broadcaster getInstance() {
-        return BroadcasterHolder.INSTANCE;
-    }
-
     /**
      * Broadcast the cubedesc event out
      * 
@@ -101,6 +126,9 @@ public class Broadcaster {
      *            event action
      */
     public void queue(String type, String action, String key) {
+        if (broadcastEvents == null)
+            return;
+
         try {
             counter.incrementAndGet();
             broadcastEvents.putFirst(new BroadcastEvent(type, action, key));
@@ -138,7 +166,7 @@ public class Broadcaster {
     }
 
     public enum TYPE {
-        ALL("all"), CUBE("cube"),STREAMING("streaming"),KAFKA("kafka"),CUBE_DESC("cube_desc"), PROJECT("project"), INVERTED_INDEX("inverted_index"), INVERTED_INDEX_DESC("ii_desc"), TABLE("table"), DATA_MODEL("data_model"), HYBRID("hybrid");
+        ALL("all"), CUBE("cube"), STREAMING("streaming"), KAFKA("kafka"), CUBE_DESC("cube_desc"), PROJECT("project"), INVERTED_INDEX("inverted_index"), INVERTED_INDEX_DESC("ii_desc"), TABLE("table"), DATA_MODEL("data_model"), HYBRID("hybrid");
         private String text;
 
         TYPE(String text) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-common/src/main/java/org/apache/kylin/common/restclient/CaseInsensitiveStringCache.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/CaseInsensitiveStringCache.java b/core-common/src/main/java/org/apache/kylin/common/restclient/CaseInsensitiveStringCache.java
index 68e3c04..2bcddbf 100644
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/CaseInsensitiveStringCache.java
+++ b/core-common/src/main/java/org/apache/kylin/common/restclient/CaseInsensitiveStringCache.java
@@ -20,12 +20,14 @@ package org.apache.kylin.common.restclient;
 
 import java.util.concurrent.ConcurrentSkipListMap;
 
+import org.apache.kylin.common.KylinConfig;
+
 /**
  */
 public class CaseInsensitiveStringCache<V> extends SingleValueCache<String, V> {
 
-    public CaseInsensitiveStringCache(Broadcaster.TYPE syncType) {
-        super(syncType, new ConcurrentSkipListMap<String, V>(String.CASE_INSENSITIVE_ORDER));
+    public CaseInsensitiveStringCache(KylinConfig config, Broadcaster.TYPE syncType) {
+        super(config, syncType, new ConcurrentSkipListMap<String, V>(String.CASE_INSENSITIVE_ORDER));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-common/src/main/java/org/apache/kylin/common/restclient/SingleValueCache.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/SingleValueCache.java b/core-common/src/main/java/org/apache/kylin/common/restclient/SingleValueCache.java
index cb6c286..9acfeca 100644
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/SingleValueCache.java
+++ b/core-common/src/main/java/org/apache/kylin/common/restclient/SingleValueCache.java
@@ -25,6 +25,8 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.kylin.common.KylinConfig;
+
 /**
  * @author xjiang
  * 
@@ -33,12 +35,12 @@ public abstract class SingleValueCache<K, V> extends AbstractRestCache<K, V> {
 
     private final ConcurrentMap<K, V> innerCache;
 
-    public SingleValueCache(Broadcaster.TYPE syncType) {
-        this(syncType, new ConcurrentHashMap<K, V>());
+    public SingleValueCache(KylinConfig config, Broadcaster.TYPE syncType) {
+        this(config, syncType, new ConcurrentHashMap<K, V>());
     }
 
-    public SingleValueCache(Broadcaster.TYPE syncType, ConcurrentMap<K, V> innerCache) {
-        super(syncType);
+    public SingleValueCache(KylinConfig config, Broadcaster.TYPE syncType, ConcurrentMap<K, V> innerCache) {
+        super(config, syncType);
         this.innerCache = innerCache;
     }
 
@@ -48,9 +50,9 @@ public abstract class SingleValueCache<K, V> extends AbstractRestCache<K, V> {
         innerCache.put(key, value);
 
         if (!exists) {
-            cacheUpdater.updateCache(key, value, Broadcaster.EVENT.CREATE, syncType, this);
+            getBroadcaster().queue(syncType.getType(), Broadcaster.EVENT.CREATE.getType(), key.toString());
         } else {
-            cacheUpdater.updateCache(key, value, Broadcaster.EVENT.UPDATE, syncType, this);
+            getBroadcaster().queue(syncType.getType(), Broadcaster.EVENT.UPDATE.getType(), key.toString());
         }
     }
 
@@ -64,7 +66,7 @@ public abstract class SingleValueCache<K, V> extends AbstractRestCache<K, V> {
         innerCache.remove(key);
         
         if (exists) {
-            cacheUpdater.updateCache(key, null, Broadcaster.EVENT.DROP, syncType, this);
+            getBroadcaster().queue(syncType.getType(), Broadcaster.EVENT.DROP.getType(), key.toString());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
index 8e75d29..c50836c 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
@@ -57,7 +57,7 @@ public class CubeDescManager {
 
     private KylinConfig config;
     // name ==> CubeDesc
-    private CaseInsensitiveStringCache<CubeDesc> cubeDescMap = new CaseInsensitiveStringCache<CubeDesc>(Broadcaster.TYPE.CUBE_DESC);
+    private CaseInsensitiveStringCache<CubeDesc> cubeDescMap;
 
     public static CubeDescManager getInstance(KylinConfig config) {
         CubeDescManager r = CACHE.get(config);
@@ -90,6 +90,7 @@ public class CubeDescManager {
     private CubeDescManager(KylinConfig config) throws IOException {
         logger.info("Initializing CubeDescManager with config " + config);
         this.config = config;
+        this.cubeDescMap = new CaseInsensitiveStringCache<CubeDesc>(config, Broadcaster.TYPE.CUBE_DESC);
         reloadAllCubeDesc();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 9b7a024..a249cd7 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -116,7 +116,7 @@ public class CubeManager implements IRealizationProvider {
 
     private KylinConfig config;
     // cube name ==> CubeInstance
-    private CaseInsensitiveStringCache<CubeInstance> cubeMap = new CaseInsensitiveStringCache<CubeInstance>(Broadcaster.TYPE.CUBE);
+    private CaseInsensitiveStringCache<CubeInstance> cubeMap;
     // "table/column" ==> lookup table
     //    private SingleValueCache<String, LookupStringTable> lookupTables = new SingleValueCache<String, LookupStringTable>(Broadcaster.TYPE.METADATA);
 
@@ -126,7 +126,7 @@ public class CubeManager implements IRealizationProvider {
     private CubeManager(KylinConfig config) throws IOException {
         logger.info("Initializing CubeManager with config " + config);
         this.config = config;
-
+        this.cubeMap = new CaseInsensitiveStringCache<CubeInstance>(config, Broadcaster.TYPE.CUBE);
         loadAllCubeInstance();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
index b7e7dc5..c907afd 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
@@ -103,11 +103,11 @@ public class MetadataManager {
 
     private KylinConfig config;
     // table name ==> SourceTable
-    private CaseInsensitiveStringCache<TableDesc> srcTableMap = new CaseInsensitiveStringCache<TableDesc>(Broadcaster.TYPE.TABLE);
+    private CaseInsensitiveStringCache<TableDesc> srcTableMap;
     // name => value
-    private CaseInsensitiveStringCache<Map<String, String>> srcTableExdMap = new CaseInsensitiveStringCache<Map<String, String>>(Broadcaster.TYPE.TABLE);
+    private CaseInsensitiveStringCache<Map<String, String>> srcTableExdMap;
     // name => DataModelDesc
-    private CaseInsensitiveStringCache<DataModelDesc> dataModelDescMap = new CaseInsensitiveStringCache<DataModelDesc>(Broadcaster.TYPE.DATA_MODEL);
+    private CaseInsensitiveStringCache<DataModelDesc> dataModelDescMap;
 
     private MetadataManager(KylinConfig config) throws IOException {
         init(config);
@@ -198,6 +198,10 @@ public class MetadataManager {
 
     private void init(KylinConfig config) throws IOException {
         this.config = config;
+        this.srcTableMap = new CaseInsensitiveStringCache<TableDesc>(config, Broadcaster.TYPE.TABLE);
+        this.srcTableExdMap = new CaseInsensitiveStringCache<Map<String, String>>(config, Broadcaster.TYPE.TABLE);
+        this.dataModelDescMap = new CaseInsensitiveStringCache<DataModelDesc>(config, Broadcaster.TYPE.DATA_MODEL);
+        
         reloadAllSourceTable();
         reloadAllSourceTableExd();
         reloadAllDataModel();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
index fd41f59..8304128 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
@@ -81,11 +81,12 @@ public class ProjectManager {
     private KylinConfig config;
     private ProjectL2Cache l2Cache;
     // project name => ProjrectInstance
-    private CaseInsensitiveStringCache<ProjectInstance> projectMap = new CaseInsensitiveStringCache<ProjectInstance>(Broadcaster.TYPE.PROJECT);
+    private CaseInsensitiveStringCache<ProjectInstance> projectMap;
 
     private ProjectManager(KylinConfig config) throws IOException {
         logger.info("Initializing ProjectManager with metadata url " + config);
         this.config = config;
+        this.projectMap = new CaseInsensitiveStringCache<ProjectInstance>(config, Broadcaster.TYPE.PROJECT);
         this.l2Cache = new ProjectL2Cache(this);
 
         reloadAllProjects();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
index 0f00f1a..9392ef5 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
@@ -76,12 +76,12 @@ public class HybridManager implements IRealizationProvider {
 
     private KylinConfig config;
 
-    private CaseInsensitiveStringCache<HybridInstance> hybridMap = new CaseInsensitiveStringCache<HybridInstance>(Broadcaster.TYPE.HYBRID);
+    private CaseInsensitiveStringCache<HybridInstance> hybridMap;
 
     private HybridManager(KylinConfig config) throws IOException {
         logger.info("Initializing HybridManager with config " + config);
         this.config = config;
-
+        this.hybridMap = new CaseInsensitiveStringCache<HybridInstance>(config, Broadcaster.TYPE.HYBRID);
         loadAllHybridInstance();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
index fa7d0f8..8cabe1b 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
@@ -79,7 +79,7 @@ public class StreamingManager {
     private KylinConfig config;
 
     // name ==> StreamingConfig
-    private CaseInsensitiveStringCache<StreamingConfig> streamingMap = new CaseInsensitiveStringCache<StreamingConfig>(Broadcaster.TYPE.STREAMING);
+    private CaseInsensitiveStringCache<StreamingConfig> streamingMap;
 
     public static void clearCache() {
         CACHE.clear();
@@ -87,6 +87,7 @@ public class StreamingManager {
 
     private StreamingManager(KylinConfig config) throws IOException {
         this.config = config;
+        this.streamingMap = new CaseInsensitiveStringCache<StreamingConfig>(config, Broadcaster.TYPE.STREAMING);
         reloadAllStreaming();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
index e3a7133..a73a6ac 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
@@ -34,11 +34,10 @@
 
 package org.apache.kylin.engine.streaming.cli;
 
-import com.google.common.base.Preconditions;
+import java.util.List;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.cache.RemoteCacheUpdater;
-import org.apache.kylin.common.restclient.AbstractRestCache;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.engine.streaming.BootstrapConfig;
 import org.apache.kylin.engine.streaming.OneOffStreamingBuilder;
@@ -48,7 +47,7 @@ import org.apache.kylin.engine.streaming.monitor.StreamingMonitor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
+import com.google.common.base.Preconditions;
 
 public class StreamingCLI {
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/examples/test_case_data/localmeta/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/kylin.properties b/examples/test_case_data/localmeta/kylin.properties
index 92773a1..48f01f5 100644
--- a/examples/test_case_data/localmeta/kylin.properties
+++ b/examples/test_case_data/localmeta/kylin.properties
@@ -1,7 +1,7 @@
 ## Config for Kylin Engine ##
 
 # List of web servers in use, this enables one web server instance to sync up with other servers.
-kylin.rest.servers=localhost:7070
+#kylin.rest.servers=localhost:7070
 
 # The metadata store in hbase
 kylin.metadata.url=

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIDescManager.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIDescManager.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIDescManager.java
index a166ae7..917fe46 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIDescManager.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIDescManager.java
@@ -54,7 +54,7 @@ public class IIDescManager {
 
     private KylinConfig config;
     // name ==> IIDesc
-    private CaseInsensitiveStringCache<IIDesc> iiDescMap = new CaseInsensitiveStringCache<IIDesc>(Broadcaster.TYPE.INVERTED_INDEX_DESC);
+    private CaseInsensitiveStringCache<IIDesc> iiDescMap;
 
     public static IIDescManager getInstance(KylinConfig config) {
         IIDescManager r = CACHE.get(config);
@@ -87,6 +87,7 @@ public class IIDescManager {
     private IIDescManager(KylinConfig config) throws IOException {
         logger.info("Initializing IIDescManager with config " + config);
         this.config = config;
+        this.iiDescMap = new CaseInsensitiveStringCache<IIDesc>(config, Broadcaster.TYPE.INVERTED_INDEX_DESC);
         reloadAllIIDesc();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
index 5633004..b6dfdf1 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
@@ -92,7 +92,7 @@ public class IIManager implements IRealizationProvider {
 
     private KylinConfig config;
     // ii name ==> IIInstance
-    private CaseInsensitiveStringCache<IIInstance> iiMap = new CaseInsensitiveStringCache<IIInstance>(Broadcaster.TYPE.INVERTED_INDEX);
+    private CaseInsensitiveStringCache<IIInstance> iiMap;
 
     // for generation hbase table name of a new segment
     private Multimap<String, String> usedStorageLocation = HashMultimap.create();
@@ -100,7 +100,7 @@ public class IIManager implements IRealizationProvider {
     private IIManager(KylinConfig config) throws IOException {
         logger.info("Initializing IIManager with config " + config);
         this.config = config;
-
+        this.iiMap = new CaseInsensitiveStringCache<IIInstance>(config, Broadcaster.TYPE.INVERTED_INDEX);
         loadAllIIInstance();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
index af1bbc0..b90c10a 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
@@ -71,7 +71,9 @@ public class CacheServiceTest extends LocalFileMetadataTestCase {
     public static void beforeClass() throws Exception {
         staticCreateTestMetadata();
         configA = KylinConfig.getInstanceFromEnv();
+        configA.setProperty("kylin.rest.servers", "localhost:7070");
         configB = KylinConfig.getKylinConfigFromInputStream(KylinConfig.getKylinPropertiesAsInputSteam());
+        configB.setProperty("kylin.rest.servers", "localhost:7070");
         configB.setMetadataUrl("../examples/test_metadata");
 
         server = new Server(7070);
@@ -209,7 +211,7 @@ public class CacheServiceTest extends LocalFileMetadataTestCase {
 
     @Test
     public void testCubeCRUD() throws Exception {
-        final Broadcaster broadcaster = Broadcaster.getInstance();
+        final Broadcaster broadcaster = Broadcaster.getInstance(configA);
         broadcaster.getCounterAndClear();
 
         getStore().deleteResource("/cube/a_whole_new_cube.json");
@@ -306,7 +308,7 @@ public class CacheServiceTest extends LocalFileMetadataTestCase {
     public void testMetaCRUD() throws Exception {
         final MetadataManager metadataManager = MetadataManager.getInstance(configA);
         final MetadataManager metadataManagerB = MetadataManager.getInstance(configB);
-        final Broadcaster broadcaster = Broadcaster.getInstance();
+        final Broadcaster broadcaster = Broadcaster.getInstance(configA);
         broadcaster.getCounterAndClear();
 
         TableDesc tableDesc = createTestTableDesc();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
index 3032d13..8cf51b6 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
@@ -34,10 +34,12 @@
 
 package org.apache.kylin.source.kafka;
 
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.type.MapType;
-import com.fasterxml.jackson.databind.type.SimpleType;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.JsonSerializer;
@@ -45,17 +47,15 @@ import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
 import org.apache.kylin.common.restclient.Broadcaster;
 import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
-import org.apache.kylin.engine.streaming.StreamingConfig;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.MapType;
+import com.fasterxml.jackson.databind.type.SimpleType;
 
 /**
  */
@@ -69,7 +69,7 @@ public class KafkaConfigManager {
     private KylinConfig config;
 
     // name ==> StreamingConfig
-    private CaseInsensitiveStringCache<KafkaConfig> kafkaMap = new CaseInsensitiveStringCache<KafkaConfig>(Broadcaster.TYPE.KAFKA);
+    private CaseInsensitiveStringCache<KafkaConfig> kafkaMap;
 
     public static final Serializer<KafkaConfig> KAFKA_SERIALIZER = new JsonSerializer<KafkaConfig>(KafkaConfig.class);
 
@@ -79,6 +79,7 @@ public class KafkaConfigManager {
 
     private KafkaConfigManager(KylinConfig config) throws IOException {
         this.config = config;
+        this.kafkaMap = new CaseInsensitiveStringCache<KafkaConfig>(config, Broadcaster.TYPE.KAFKA);
         reloadAllKafkaConfig();
     }
 


[3/3] incubator-kylin git commit: KYLIN-1127 Adopt listener pattern to wipe query cache on cube update

Posted by li...@apache.org.
KYLIN-1127 Adopt listener pattern to wipe query cache on cube update


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/a3397d04
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/a3397d04
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/a3397d04

Branch: refs/heads/2.x-staging
Commit: a3397d044d6cd333d8f9d94bfd75aa603180d67e
Parents: 2f4595a
Author: Yang Li <li...@apache.org>
Authored: Mon Nov 9 11:53:43 2015 +0800
Committer: Yang Li <li...@apache.org>
Committed: Mon Nov 9 11:53:43 2015 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/cube/CubeManager.java | 34 ++++++++--
 .../kylin/storage/hybrid/HybridManager.java     |  4 +-
 .../apache/kylin/rest/service/CacheService.java | 68 +++++++++++++++-----
 .../kylin/rest/service/CacheServiceTest.java    |  2 +
 4 files changed, 85 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a3397d04/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index a249cd7..89873d2 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -236,6 +236,9 @@ public class CubeManager implements IRealizationProvider {
 
         // delete cube from project
         ProjectManager.getInstance(config).removeRealizationsFromProjects(RealizationType.CUBE, cubeName);
+        
+        if (listener != null)
+            listener.afterCubeDelete(cube);
 
         return cube;
     }
@@ -248,14 +251,22 @@ public class CubeManager implements IRealizationProvider {
         CubeInstance cube = CubeInstance.create(cubeName, projectName, desc);
         cube.setOwner(owner);
 
-        updateCube(new CubeUpdate(cube));
+        updateCubeWithRetry(new CubeUpdate(cube), 0);
         ProjectManager.getInstance(config).moveRealizationToProject(RealizationType.CUBE, cubeName, projectName, owner);
 
+        if (listener != null)
+            listener.afterCubeCreate(cube);
+        
         return cube;
     }
 
     public CubeInstance updateCube(CubeUpdate update) throws IOException {
-        return updateCube(update, 0);
+        CubeInstance cube = updateCubeWithRetry(update, 0);
+
+        if (listener != null)
+            listener.afterCubeUpdate(cube);
+        
+        return cube;
     }
 
     private boolean validateReadySegments(CubeInstance cube) {
@@ -283,7 +294,7 @@ public class CubeManager implements IRealizationProvider {
         return true;
     }
 
-    private CubeInstance updateCube(CubeUpdate update, int retry) throws IOException {
+    private CubeInstance updateCubeWithRetry(CubeUpdate update, int retry) throws IOException {
         if (update == null || update.getCubeInstance() == null)
             throw new IllegalStateException();
 
@@ -348,7 +359,7 @@ public class CubeManager implements IRealizationProvider {
             cube = reloadCubeLocal(cube.getName());
             update.setCubeInstance(cube);
             retry++;
-            cube = updateCube(update, retry);
+            cube = updateCubeWithRetry(update, retry);
         }
 
         if (toRemoveResources.size() > 0) {
@@ -850,4 +861,19 @@ public class CubeManager implements IRealizationProvider {
         return getCube(name);
     }
 
+    // ============================================================================
+    
+    public interface CubeChangeListener {
+        void afterCubeCreate(CubeInstance cube);
+
+        void afterCubeUpdate(CubeInstance cube);
+
+        void afterCubeDelete(CubeInstance cube);
+    }
+    
+    private CubeChangeListener listener;
+    
+    public void setCubeChangeListener(CubeChangeListener listener) {
+        this.listener = listener;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a3397d04/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
index 9392ef5..5f16b6b 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
@@ -98,7 +98,7 @@ public class HybridManager implements IRealizationProvider {
         logger.debug("Loaded " + paths.size() + " Hybrid(s)");
     }
 
-    public void reloadHybridInstanceByChild(RealizationType type, String realizationName) throws IOException {
+    public void reloadHybridInstanceByChild(RealizationType type, String realizationName) {
         for (HybridInstance hybridInstance : hybridMap.values()) {
             boolean includes = false;
             for (IRealization realization : hybridInstance.getRealizations()) {
@@ -113,7 +113,7 @@ public class HybridManager implements IRealizationProvider {
         }
     }
 
-    private synchronized HybridInstance loadHybridInstance(String path) throws IOException {
+    private synchronized HybridInstance loadHybridInstance(String path) {
         ResourceStore store = getStore();
 
         HybridInstance hybridInstance = null;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a3397d04/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
index f9c3ec1..8371907 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
@@ -26,6 +26,7 @@ import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import javax.annotation.PostConstruct;
 import javax.sql.DataSource;
 
 import net.sf.ehcache.CacheManager;
@@ -72,6 +73,28 @@ public class CacheService extends BasicService {
     @Autowired
     private CacheManager cacheManager;
 
+    @PostConstruct
+    public void initCubeChangeListener() throws IOException {
+        CubeManager cubeMgr = CubeManager.getInstance(getConfig());
+        cubeMgr.setCubeChangeListener(new CubeManager.CubeChangeListener() {
+
+            @Override
+            public void afterCubeCreate(CubeInstance cube) {
+                // no cache need change
+            }
+
+            @Override
+            public void afterCubeUpdate(CubeInstance cube) {
+                rebuildCubeCache(cube.getName());
+            }
+
+            @Override
+            public void afterCubeDelete(CubeInstance cube) {
+                removeCubeCache(cube.getName(), cube);
+            }
+        });
+    }
+
     // for test
     public void setCubeService(CubeService cubeService) {
         this.cubeService = cubeService;
@@ -151,14 +174,7 @@ public class CacheService extends BasicService {
         try {
             switch (cacheType) {
             case CUBE:
-                CubeInstance newCube = getCubeManager().reloadCubeLocal(cacheKey);
-                getHybridManager().reloadHybridInstanceByChild(RealizationType.CUBE, cacheKey);
-                getProjectManager().clearL2Cache();
-                //clean query related cache first
-                if (newCube != null) {
-                    cleanDataCache(newCube.getUuid());
-                }
-                cubeService.updateOnNewSegmentReady(cacheKey);
+                rebuildCubeCache(cacheKey);
                 break;
             case STREAMING:
                 getStreamingManager().reloadStreamingConfigLocal(cacheKey);
@@ -170,10 +186,8 @@ public class CacheService extends BasicService {
                 getCubeDescManager().reloadCubeDescLocal(cacheKey);
                 break;
             case PROJECT:
-                ProjectInstance projectInstance = getProjectManager().reloadProjectLocal(cacheKey);
-                if (projectInstance != null) {
-                    removeOLAPDataSource(projectInstance.getName());
-                }
+                getProjectManager().reloadProjectLocal(cacheKey);
+                removeOLAPDataSource(cacheKey);
                 break;
             case INVERTED_INDEX:
                 //II update does not need to update storage cache because it is dynamic already
@@ -217,16 +231,23 @@ public class CacheService extends BasicService {
         }
     }
 
+    private void rebuildCubeCache(String cubeName) {
+        CubeInstance cube = getCubeManager().reloadCubeLocal(cubeName);
+        getHybridManager().reloadHybridInstanceByChild(RealizationType.CUBE, cubeName);
+        getProjectManager().clearL2Cache();
+        //clean query related cache first
+        if (cube != null) {
+            cleanDataCache(cube.getUuid());
+        }
+        cubeService.updateOnNewSegmentReady(cubeName);
+    }
+
     public void removeCache(Broadcaster.TYPE cacheType, String cacheKey) {
         final String log = "remove cache type: " + cacheType + " name:" + cacheKey;
         try {
             switch (cacheType) {
             case CUBE:
-                CubeInstance cube = getCubeManager().getCube(cacheKey);
-                getCubeManager().removeCubeLocal(cacheKey);
-                if (cube != null) {
-                    cleanDataCache(cube.getUuid());
-                }
+                removeCubeCache(cacheKey, null);
                 break;
             case CUBE_DESC:
                 getCubeDescManager().removeLocalCubeDesc(cacheKey);
@@ -251,4 +272,17 @@ public class CacheService extends BasicService {
             throw new RuntimeException("error " + log, e);
         }
     }
+
+    private void removeCubeCache(String cubeName, CubeInstance cube) {
+        // you may not get the cube instance if it's already removed from metadata
+        if (cube == null) {
+            cube = getCubeManager().getCube(cubeName);
+        }
+        
+        getCubeManager().removeCubeLocal(cubeName);
+        
+        if (cube != null) {
+            cleanDataCache(cube.getUuid());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a3397d04/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
index b90c10a..25b131a 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
@@ -108,7 +108,9 @@ public class CacheServiceTest extends LocalFileMetadataTestCase {
         };
 
         serviceA.setCubeService(cubeServiceA);
+        serviceA.initCubeChangeListener();
         serviceB.setCubeService(cubeServiceB);
+        serviceB.initCubeChangeListener();
 
         context.addServlet(new ServletHolder(new BroadcasterReceiveServlet(new BroadcasterReceiveServlet.BroadcasterHandler() {
             @Override