You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/11/09 04:54:41 UTC
[1/3] incubator-kylin git commit: KYLIN-1127 Move cache related stuff
from BasicService into CacheService
Repository: incubator-kylin
Updated Branches:
refs/heads/2.x-staging f9f96404f -> a3397d044
KYLIN-1127 Move cache related stuff from BasicService into CacheService
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/f1dedab7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/f1dedab7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/f1dedab7
Branch: refs/heads/2.x-staging
Commit: f1dedab768f52b7dcb22e16b1897167703897fc4
Parents: f9f9640
Author: Yang Li <li...@apache.org>
Authored: Mon Nov 9 10:41:59 2015 +0800
Committer: Yang Li <li...@apache.org>
Committed: Mon Nov 9 10:41:59 2015 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/cube/CubeManager.java | 8 +-
.../engine/streaming/cli/StreamingCLI.java | 2 -
.../kylin/rest/controller/JobController.java | 2 +-
.../rest/controller/StreamingController.java | 2 +-
.../apache/kylin/rest/service/BasicService.java | 129 +---------
.../apache/kylin/rest/service/CacheService.java | 257 +++++++++++++------
.../apache/kylin/rest/service/QueryService.java | 32 ++-
.../kylin/rest/service/StreamingService.java | 12 +-
.../kylin/rest/service/CacheServiceTest.java | 2 +-
.../kylin/rest/service/CubeServiceTest.java | 7 +-
.../kylin/rest/service/JobServiceTest.java | 7 +-
.../kylin/rest/service/QueryServiceTest.java | 7 +-
.../kylin/rest/service/ServiceTestBase.java | 3 +-
13 files changed, 236 insertions(+), 234 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f1dedab7/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 2232f01..9b7a024 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -525,11 +525,7 @@ public class CubeManager implements IRealizationProvider {
* @param cubeName
*/
public CubeInstance reloadCubeLocal(String cubeName) {
- try {
- return reloadCubeLocalAt(CubeInstance.concatResourcePath(cubeName));
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
+ return reloadCubeLocalAt(CubeInstance.concatResourcePath(cubeName));
}
public void removeCubeLocal(String cubeName) {
@@ -792,7 +788,7 @@ public class CubeManager implements IRealizationProvider {
logger.debug("Loaded " + paths.size() + " Cube(s)");
}
- private synchronized CubeInstance reloadCubeLocalAt(String path) throws IOException {
+ private synchronized CubeInstance reloadCubeLocalAt(String path) {
ResourceStore store = getStore();
CubeInstance cubeInstance;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f1dedab7/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
index 3b1693a..e3a7133 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
@@ -56,8 +56,6 @@ public class StreamingCLI {
public static void main(String[] args) {
try {
- AbstractRestCache.setCacheUpdater(new RemoteCacheUpdater());
-
Preconditions.checkArgument(args[0].equals("streaming"));
Preconditions.checkArgument(args[1].equals("start"));
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f1dedab7/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/JobController.java b/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
index 6fb2813..f6323ed 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
@@ -71,7 +71,7 @@ public class JobController extends BasicController implements InitializingBean {
@Override
public void afterPropertiesSet() throws Exception {
- String timeZone = jobService.getKylinConfig().getTimeZone();
+ String timeZone = jobService.getConfig().getTimeZone();
TimeZone tzone = TimeZone.getTimeZone(timeZone);
TimeZone.setDefault(tzone);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f1dedab7/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
index 78b63c0..e22bd30 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
@@ -158,7 +158,7 @@ public class StreamingController extends BasicController {
@RequestMapping(value = "/{configName}", method = { RequestMethod.DELETE })
@ResponseBody
public void deleteConfig(@PathVariable String configName) throws IOException {
- StreamingConfig config = streamingService.getSreamingManager().getStreamingConfig(configName);
+ StreamingConfig config = streamingService.getStreamingManager().getStreamingConfig(configName);
KafkaConfig kafkaConfig = kafkaConfigService.getKafkaConfig(configName);
if (null == config) {
throw new NotFoundException("StreamingConfig with name " + configName + " not found..");
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f1dedab7/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/BasicService.java b/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
index 5ac12ea..9135dfa 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
@@ -18,27 +18,12 @@
package org.apache.kylin.rest.service;
-import java.io.File;
import java.io.IOException;
-import java.nio.charset.Charset;
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import javax.sql.DataSource;
-
-import net.sf.ehcache.CacheManager;
-
-import org.apache.calcite.jdbc.Driver;
-import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeDescManager;
import org.apache.kylin.cube.CubeManager;
@@ -54,104 +39,17 @@ import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.metadata.project.ProjectManager;
import org.apache.kylin.metadata.realization.RealizationType;
-import org.apache.kylin.query.enumerator.OLAPQuery;
-import org.apache.kylin.query.relnode.OLAPContext;
-import org.apache.kylin.query.schema.OLAPSchemaFactory;
import org.apache.kylin.source.kafka.KafkaConfigManager;
import org.apache.kylin.storage.hybrid.HybridManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.jdbc.datasource.DriverManagerDataSource;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Lists;
-import com.google.common.io.Files;
public abstract class BasicService {
- private static final Logger logger = LoggerFactory.getLogger(BasicService.class);
-
- private static ConcurrentMap<String, DataSource> olapDataSources = new ConcurrentHashMap<String, DataSource>();
-
- @Autowired
- private CacheManager cacheManager;
-
public KylinConfig getConfig() {
- return KylinConfig.getInstanceFromEnv();
- }
-
- protected void cleanDataCache(String storageUUID) {
- if (cacheManager != null && cacheManager.getCache(storageUUID) != null) {
- logger.info("cleaning cache for " + storageUUID);
- cacheManager.getCache(storageUUID).removeAll();
- } else {
- logger.warn("skip cleaning cache for " + storageUUID);
- }
- }
-
- protected void cleanAllDataCache() {
- if (cacheManager != null) {
- logger.warn("cleaning all storage cache");
- cacheManager.clearAll();
- } else {
- logger.warn("skip cleaning all storage cache");
- }
- }
-
- public void removeOLAPDataSource(String project) {
- logger.info("removeOLAPDataSource is called for project " + project);
- if (StringUtils.isEmpty(project))
- throw new IllegalArgumentException("removeOLAPDataSource: project name not given");
-
- project = ProjectInstance.getNormalizedProjectName(project);
- olapDataSources.remove(project);
- }
-
- public static void removeAllOLAPDataSources() {
- // brutal, yet simplest way
- logger.info("removeAllOLAPDataSources is called.");
- olapDataSources.clear();
- }
-
- public DataSource getOLAPDataSource(String project) {
-
- project = ProjectInstance.getNormalizedProjectName(project);
-
- DataSource ret = olapDataSources.get(project);
- if (ret == null) {
- logger.debug("Creating a new data source");
- logger.debug("OLAP data source pointing to " + getConfig());
-
- File modelJson = OLAPSchemaFactory.createTempOLAPJson(project, getConfig());
-
- try {
- List<String> text = Files.readLines(modelJson, Charset.defaultCharset());
- logger.debug("The new temp olap json is :");
- for (String line : text)
- logger.debug(line);
- } catch (IOException e) {
- e.printStackTrace(); // logging failure is not critical
- }
-
- DriverManagerDataSource ds = new DriverManagerDataSource();
- Properties props = new Properties();
- props.setProperty(OLAPQuery.PROP_SCAN_THRESHOLD, String.valueOf(KylinConfig.getInstanceFromEnv().getScanThreshold()));
- ds.setConnectionProperties(props);
- ds.setDriverClassName(Driver.class.getName());
- ds.setUrl("jdbc:calcite:model=" + modelJson.getAbsolutePath());
-
- ret = olapDataSources.putIfAbsent(project, ds);
- if (ret == null) {
- ret = ds;
- }
- }
- return ret;
- }
-
- public final KylinConfig getKylinConfig() {
KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
if (kylinConfig == null) {
@@ -160,7 +58,7 @@ public abstract class BasicService {
return kylinConfig;
}
-
+
public final MetadataManager getMetadataManager() {
return MetadataManager.getInstance(getConfig());
}
@@ -169,7 +67,7 @@ public abstract class BasicService {
return CubeManager.getInstance(getConfig());
}
- public final StreamingManager getSreamingManager() {
+ public final StreamingManager getStreamingManager() {
return StreamingManager.getInstance(getConfig());
}
@@ -246,27 +144,4 @@ public abstract class BasicService {
return listAllCubingJobs(cubeName, projectName, EnumSet.allOf(ExecutableState.class), getExecutableManager().getAllOutputs());
}
- protected static void close(ResultSet resultSet, Statement stat, Connection conn) {
- OLAPContext.clearParameter();
-
- if (resultSet != null)
- try {
- resultSet.close();
- } catch (SQLException e) {
- logger.error("failed to close", e);
- }
- if (stat != null)
- try {
- stat.close();
- } catch (SQLException e) {
- logger.error("failed to close", e);
- }
- if (conn != null)
- try {
- conn.close();
- } catch (SQLException e) {
- logger.error("failed to close", e);
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f1dedab7/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
index 63fe8c4..f9c3ec1 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
@@ -18,8 +18,21 @@
package org.apache.kylin.rest.service;
+import java.io.File;
import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import javax.sql.DataSource;
+
+import net.sf.ehcache.CacheManager;
+
+import org.apache.calcite.jdbc.Driver;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.restclient.Broadcaster;
import org.apache.kylin.cube.CubeDescManager;
import org.apache.kylin.cube.CubeInstance;
@@ -32,88 +45,172 @@ import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.metadata.project.ProjectManager;
import org.apache.kylin.metadata.realization.RealizationRegistry;
import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.query.enumerator.OLAPQuery;
+import org.apache.kylin.query.schema.OLAPSchemaFactory;
import org.apache.kylin.source.kafka.KafkaConfigManager;
import org.apache.kylin.storage.hybrid.HybridManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.jdbc.datasource.DriverManagerDataSource;
import org.springframework.stereotype.Component;
+import com.google.common.io.Files;
+
/**
*/
@Component("cacheService")
public class CacheService extends BasicService {
+ private static final Logger logger = LoggerFactory.getLogger(CacheService.class);
+
+ private static ConcurrentMap<String, DataSource> olapDataSources = new ConcurrentHashMap<String, DataSource>();
+
@Autowired
private CubeService cubeService;
+ @Autowired
+ private CacheManager cacheManager;
+
+ // for test
public void setCubeService(CubeService cubeService) {
this.cubeService = cubeService;
}
- private static final Logger logger = LoggerFactory.getLogger(CacheService.class);
+ protected void cleanDataCache(String storageUUID) {
+ if (cacheManager != null && cacheManager.getCache(storageUUID) != null) {
+ logger.info("cleaning cache for " + storageUUID);
+ cacheManager.getCache(storageUUID).removeAll();
+ } else {
+ logger.warn("skip cleaning cache for " + storageUUID);
+ }
+ }
+
+ protected void cleanAllDataCache() {
+ if (cacheManager != null) {
+ logger.warn("cleaning all storage cache");
+ cacheManager.clearAll();
+ } else {
+ logger.warn("skip cleaning all storage cache");
+ }
+ }
+
+ protected void removeOLAPDataSource(String project) {
+ logger.info("removeOLAPDataSource is called for project " + project);
+ if (StringUtils.isEmpty(project))
+ throw new IllegalArgumentException("removeOLAPDataSource: project name not given");
+
+ project = ProjectInstance.getNormalizedProjectName(project);
+ olapDataSources.remove(project);
+ }
+
+ public static void removeAllOLAPDataSources() {
+ // brutal, yet simplest way
+ logger.info("removeAllOLAPDataSources is called.");
+ olapDataSources.clear();
+ }
+
+ public DataSource getOLAPDataSource(String project) {
+
+ project = ProjectInstance.getNormalizedProjectName(project);
+
+ DataSource ret = olapDataSources.get(project);
+ if (ret == null) {
+ logger.debug("Creating a new data source");
+ logger.debug("OLAP data source pointing to " + getConfig());
+
+ File modelJson = OLAPSchemaFactory.createTempOLAPJson(project, getConfig());
+
+ try {
+ List<String> text = Files.readLines(modelJson, Charset.defaultCharset());
+ logger.debug("The new temp olap json is :");
+ for (String line : text)
+ logger.debug(line);
+ } catch (IOException e) {
+ e.printStackTrace(); // logging failure is not critical
+ }
+
+ DriverManagerDataSource ds = new DriverManagerDataSource();
+ Properties props = new Properties();
+ props.setProperty(OLAPQuery.PROP_SCAN_THRESHOLD, String.valueOf(KylinConfig.getInstanceFromEnv().getScanThreshold()));
+ ds.setConnectionProperties(props);
+ ds.setDriverClassName(Driver.class.getName());
+ ds.setUrl("jdbc:calcite:model=" + modelJson.getAbsolutePath());
+
+ ret = olapDataSources.putIfAbsent(project, ds);
+ if (ret == null) {
+ ret = ds;
+ }
+ }
+ return ret;
+ }
public void rebuildCache(Broadcaster.TYPE cacheType, String cacheKey) {
final String log = "rebuild cache type: " + cacheType + " name:" + cacheKey;
logger.info(log);
try {
switch (cacheType) {
- case CUBE:
- CubeInstance newCube = getCubeManager().reloadCubeLocal(cacheKey);
- getHybridManager().reloadHybridInstanceByChild(RealizationType.CUBE, cacheKey);
- getProjectManager().clearL2Cache();
- //clean query related cache first
- super.cleanDataCache(newCube.getUuid());
- cubeService.updateOnNewSegmentReady(cacheKey);
- break;
- case STREAMING:
- getSreamingManager().reloadStreamingConfigLocal(cacheKey);
- break;
- case KAFKA:
- getKafkaManager().reloadKafkaConfigLocal(cacheKey);
- break;
- case CUBE_DESC:
- getCubeDescManager().reloadCubeDescLocal(cacheKey);
- break;
- case PROJECT:
- ProjectInstance projectInstance = getProjectManager().reloadProjectLocal(cacheKey);
+ case CUBE:
+ CubeInstance newCube = getCubeManager().reloadCubeLocal(cacheKey);
+ getHybridManager().reloadHybridInstanceByChild(RealizationType.CUBE, cacheKey);
+ getProjectManager().clearL2Cache();
+ //clean query related cache first
+ if (newCube != null) {
+ cleanDataCache(newCube.getUuid());
+ }
+ cubeService.updateOnNewSegmentReady(cacheKey);
+ break;
+ case STREAMING:
+ getStreamingManager().reloadStreamingConfigLocal(cacheKey);
+ break;
+ case KAFKA:
+ getKafkaManager().reloadKafkaConfigLocal(cacheKey);
+ break;
+ case CUBE_DESC:
+ getCubeDescManager().reloadCubeDescLocal(cacheKey);
+ break;
+ case PROJECT:
+ ProjectInstance projectInstance = getProjectManager().reloadProjectLocal(cacheKey);
+ if (projectInstance != null) {
removeOLAPDataSource(projectInstance.getName());
- break;
- case INVERTED_INDEX:
- //II update does not need to update storage cache because it is dynamic already
- getIIManager().reloadIILocal(cacheKey);
- getHybridManager().reloadHybridInstanceByChild(RealizationType.INVERTED_INDEX, cacheKey);
- getProjectManager().clearL2Cache();
- break;
- case INVERTED_INDEX_DESC:
- getIIDescManager().reloadIIDescLocal(cacheKey);
- break;
- case TABLE:
- getMetadataManager().reloadTableCache(cacheKey);
- IIDescManager.clearCache();
- CubeDescManager.clearCache();
- break;
- case DATA_MODEL:
- getMetadataManager().reloadDataModelDesc(cacheKey);
- IIDescManager.clearCache();
- CubeDescManager.clearCache();
- break;
- case ALL:
- MetadataManager.clearCache();
- CubeDescManager.clearCache();
- CubeManager.clearCache();
- IIDescManager.clearCache();
- IIManager.clearCache();
- HybridManager.clearCache();
- RealizationRegistry.clearCache();
- ProjectManager.clearCache();
- KafkaConfigManager.clearCache();
- StreamingManager.clearCache();
- super.cleanAllDataCache();
- BasicService.removeAllOLAPDataSources();
- break;
- default:
- throw new RuntimeException("invalid cacheType:" + cacheType);
+ }
+ break;
+ case INVERTED_INDEX:
+ //II update does not need to update storage cache because it is dynamic already
+ getIIManager().reloadIILocal(cacheKey);
+ getHybridManager().reloadHybridInstanceByChild(RealizationType.INVERTED_INDEX, cacheKey);
+ getProjectManager().clearL2Cache();
+ break;
+ case INVERTED_INDEX_DESC:
+ getIIDescManager().reloadIIDescLocal(cacheKey);
+ break;
+ case TABLE:
+ getMetadataManager().reloadTableCache(cacheKey);
+ IIDescManager.clearCache();
+ CubeDescManager.clearCache();
+ break;
+ case DATA_MODEL:
+ getMetadataManager().reloadDataModelDesc(cacheKey);
+ IIDescManager.clearCache();
+ CubeDescManager.clearCache();
+ break;
+ case ALL:
+ MetadataManager.clearCache();
+ CubeDescManager.clearCache();
+ CubeManager.clearCache();
+ IIDescManager.clearCache();
+ IIManager.clearCache();
+ HybridManager.clearCache();
+ RealizationRegistry.clearCache();
+ ProjectManager.clearCache();
+ KafkaConfigManager.clearCache();
+ StreamingManager.clearCache();
+
+ cleanAllDataCache();
+ removeAllOLAPDataSources();
+ break;
+ default:
+ throw new RuntimeException("invalid cacheType:" + cacheType);
}
} catch (IOException e) {
throw new RuntimeException("error " + log, e);
@@ -124,31 +221,31 @@ public class CacheService extends BasicService {
final String log = "remove cache type: " + cacheType + " name:" + cacheKey;
try {
switch (cacheType) {
- case CUBE:
- if (getCubeManager().getCube(cacheKey) != null) {
- String storageUUID = getCubeManager().getCube(cacheKey).getUuid();
- getCubeManager().removeCubeLocal(cacheKey);
- super.cleanDataCache(storageUUID);
+ case CUBE:
+ CubeInstance cube = getCubeManager().getCube(cacheKey);
+ getCubeManager().removeCubeLocal(cacheKey);
+ if (cube != null) {
+ cleanDataCache(cube.getUuid());
}
- break;
- case CUBE_DESC:
- getCubeDescManager().removeLocalCubeDesc(cacheKey);
- break;
- case PROJECT:
- ProjectManager.clearCache();
- break;
- case INVERTED_INDEX:
- getIIManager().removeIILocal(cacheKey);
- break;
- case INVERTED_INDEX_DESC:
- getIIDescManager().removeIIDescLocal(cacheKey);
- break;
- case TABLE:
- throw new UnsupportedOperationException(log);
- case DATA_MODEL:
- throw new UnsupportedOperationException(log);
- default:
- throw new RuntimeException("invalid cacheType:" + cacheType);
+ break;
+ case CUBE_DESC:
+ getCubeDescManager().removeLocalCubeDesc(cacheKey);
+ break;
+ case PROJECT:
+ ProjectManager.clearCache();
+ break;
+ case INVERTED_INDEX:
+ getIIManager().removeIILocal(cacheKey);
+ break;
+ case INVERTED_INDEX_DESC:
+ getIIDescManager().removeIIDescLocal(cacheKey);
+ break;
+ case TABLE:
+ throw new UnsupportedOperationException(log);
+ case DATA_MODEL:
+ throw new UnsupportedOperationException(log);
+ default:
+ throw new RuntimeException("invalid cacheType:" + cacheType);
}
} catch (IOException e) {
throw new RuntimeException("error " + log, e);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f1dedab7/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 4e96360..a1e8a29 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -68,6 +68,7 @@ import org.apache.kylin.storage.hbase.HBaseConnection;
import org.h2.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.access.AccessDeniedException;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.security.core.context.SecurityContextHolder;
@@ -81,6 +82,9 @@ public class QueryService extends BasicService {
private static final Logger logger = LoggerFactory.getLogger(QueryService.class);
+ @Autowired
+ private CacheService cacheService;
+
public static final String USER_QUERY_FAMILY = "q";
private static final String DEFAULT_TABLE_PREFIX = "kylin_metadata";
private static final String USER_TABLE_NAME = "_user";
@@ -282,7 +286,7 @@ public class QueryService extends BasicService {
return Collections.emptyList();
}
try {
- DataSource dataSource = getOLAPDataSource(project);
+ DataSource dataSource = cacheService.getOLAPDataSource(project);
conn = dataSource.getConnection();
DatabaseMetaData metaData = conn.getMetaData();
@@ -342,7 +346,7 @@ public class QueryService extends BasicService {
List<SelectedColumnMeta> columnMetas = new LinkedList<SelectedColumnMeta>();
try {
- conn = getOLAPDataSource(sqlRequest.getProject()).getConnection();
+ conn = cacheService.getOLAPDataSource(sqlRequest.getProject()).getConnection();
if (sqlRequest instanceof PrepareSqlRequest) {
PreparedStatement preparedState = conn.prepareStatement(sql);
@@ -481,4 +485,28 @@ public class QueryService extends BasicService {
return -1;
}
}
+
+ private static void close(ResultSet resultSet, Statement stat, Connection conn) {
+ OLAPContext.clearParameter();
+
+ if (resultSet != null)
+ try {
+ resultSet.close();
+ } catch (SQLException e) {
+ logger.error("failed to close", e);
+ }
+ if (stat != null)
+ try {
+ stat.close();
+ } catch (SQLException e) {
+ logger.error("failed to close", e);
+ }
+ if (conn != null)
+ try {
+ conn.close();
+ } catch (SQLException e) {
+ logger.error("failed to close", e);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f1dedab7/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java b/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
index 0f67ac8..e40426b 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
@@ -41,9 +41,9 @@ public class StreamingService extends BasicService {
List<StreamingConfig> streamingConfigs = new ArrayList();
CubeInstance cubeInstance = (null != cubeName) ? getCubeManager().getCube(cubeName) : null;
if (null == cubeInstance) {
- streamingConfigs = getSreamingManager().listAllStreaming();
+ streamingConfigs = getStreamingManager().listAllStreaming();
} else {
- for(StreamingConfig config : getSreamingManager().listAllStreaming()){
+ for(StreamingConfig config : getStreamingManager().listAllStreaming()){
if(cubeInstance.getName().equals(config.getCubeName())){
streamingConfigs.add(config);
}
@@ -70,21 +70,21 @@ public class StreamingService extends BasicService {
}
public StreamingConfig createStreamingConfig(StreamingConfig config) throws IOException {
- if (getSreamingManager().getStreamingConfig(config.getName()) != null) {
+ if (getStreamingManager().getStreamingConfig(config.getName()) != null) {
throw new InternalErrorException("The streamingConfig named " + config.getName() + " already exists");
}
- StreamingConfig streamingConfig = getSreamingManager().saveStreamingConfig(config);
+ StreamingConfig streamingConfig = getStreamingManager().saveStreamingConfig(config);
return streamingConfig;
}
// @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#desc, 'ADMINISTRATION') or hasPermission(#desc, 'MANAGEMENT')")
public StreamingConfig updateStreamingConfig(StreamingConfig config) throws IOException {
- return getSreamingManager().updateStreamingConfig(config);
+ return getStreamingManager().updateStreamingConfig(config);
}
// @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#desc, 'ADMINISTRATION') or hasPermission(#desc, 'MANAGEMENT')")
public void dropStreamingConfig(StreamingConfig config) throws IOException {
- getSreamingManager().removeStreamingConfig(config);
+ getStreamingManager().removeStreamingConfig(config);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f1dedab7/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
index 88d6273..af1bbc0 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
@@ -248,7 +248,7 @@ public class CacheServiceTest extends LocalFileMetadataTestCase {
CubeUpdate cubeBuilder = new CubeUpdate(cube);
cubeBuilder.setToAddSegs(segment);
cube = cubeManager.updateCube(cubeBuilder);
- //one for cube update, one for project update
+ //one for cube update
assertEquals(1, broadcaster.getCounterAndClear());
waitForCounterAndClear(1);
assertEquals(1, cubeManagerB.getCube(cubeName).getSegments().size());
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f1dedab7/server/src/test/java/org/apache/kylin/rest/service/CubeServiceTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/CubeServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/CubeServiceTest.java
index 7ab08f8..226380c 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/CubeServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/CubeServiceTest.java
@@ -38,12 +38,15 @@ public class CubeServiceTest extends ServiceTestBase {
@Autowired
CubeService cubeService;
+ @Autowired
+ private CacheService cacheService;
+
@Test
public void testBasics() throws JsonProcessingException, JobException, UnknownHostException {
Assert.assertNotNull(cubeService.getConfig());
- Assert.assertNotNull(cubeService.getKylinConfig());
+ Assert.assertNotNull(cubeService.getConfig());
Assert.assertNotNull(cubeService.getMetadataManager());
- Assert.assertNotNull(cubeService.getOLAPDataSource(ProjectInstance.DEFAULT_PROJECT_NAME));
+ Assert.assertNotNull(cacheService.getOLAPDataSource(ProjectInstance.DEFAULT_PROJECT_NAME));
Assert.assertTrue(CubeService.getCubeDescNameFromCube("testCube").equals("testCube_desc"));
Assert.assertTrue(CubeService.getCubeNameFromDesc("testCube_desc").equals("testCube"));
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f1dedab7/server/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
index a53dd3c..41057e5 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
@@ -34,12 +34,15 @@ public class JobServiceTest extends ServiceTestBase {
@Autowired
JobService jobService;
+ @Autowired
+ private CacheService cacheService;
+
@Test
public void testBasics() throws JobException, IOException {
Assert.assertNotNull(jobService.getConfig());
- Assert.assertNotNull(jobService.getKylinConfig());
+ Assert.assertNotNull(jobService.getConfig());
Assert.assertNotNull(jobService.getMetadataManager());
- Assert.assertNotNull(jobService.getOLAPDataSource(ProjectInstance.DEFAULT_PROJECT_NAME));
+ Assert.assertNotNull(cacheService.getOLAPDataSource(ProjectInstance.DEFAULT_PROJECT_NAME));
Assert.assertNull(jobService.getJobInstance("job_not_exist"));
Assert.assertNotNull(jobService.listAllJobs(null, null, null));
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f1dedab7/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
index 44e5ef6..cc86e1e 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
@@ -37,12 +37,15 @@ public class QueryServiceTest extends ServiceTestBase {
@Autowired
QueryService queryService;
+ @Autowired
+ private CacheService cacheService;
+
@Test
public void testBasics() throws JobException, IOException, SQLException {
Assert.assertNotNull(queryService.getConfig());
- Assert.assertNotNull(queryService.getKylinConfig());
+ Assert.assertNotNull(queryService.getConfig());
Assert.assertNotNull(queryService.getMetadataManager());
- Assert.assertNotNull(queryService.getOLAPDataSource(ProjectInstance.DEFAULT_PROJECT_NAME));
+ Assert.assertNotNull(cacheService.getOLAPDataSource(ProjectInstance.DEFAULT_PROJECT_NAME));
// Assert.assertTrue(queryService.getQueries("ADMIN").size() == 0);
//
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f1dedab7/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java b/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java
index fd613f4..f8dc945 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java
@@ -69,8 +69,7 @@ public class ServiceTestBase extends LocalFileMetadataTestCase {
IIManager.clearCache();
RealizationRegistry.clearCache();
ProjectManager.clearCache();
- BasicService.removeAllOLAPDataSources();
-
+ CacheService.removeAllOLAPDataSources();
}
@After
[2/3] incubator-kylin git commit: KYLIN-1127 Broadcaster be
configured by KylinConfig (instead of spring profile)
Posted by li...@apache.org.
KYLIN-1127 Broadcaster be configured by KylinConfig (instead of spring profile)
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/2f4595ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/2f4595ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/2f4595ae
Branch: refs/heads/2.x-staging
Commit: 2f4595ae9db582a5b1c498534916f90db22b55b2
Parents: f1dedab
Author: Yang Li <li...@apache.org>
Authored: Mon Nov 9 11:15:12 2015 +0800
Committer: Yang Li <li...@apache.org>
Committed: Mon Nov 9 11:15:12 2015 +0800
----------------------------------------------------------------------
.../apache/kylin/common/cache/CacheUpdater.java | 11 ----
.../kylin/common/cache/LocalCacheUpdater.java | 18 ------
.../kylin/common/cache/RemoteCacheUpdater.java | 14 -----
.../common/restclient/AbstractRestCache.java | 17 +++---
.../kylin/common/restclient/Broadcaster.java | 58 +++++++++++++++-----
.../restclient/CaseInsensitiveStringCache.java | 6 +-
.../common/restclient/SingleValueCache.java | 16 +++---
.../org/apache/kylin/cube/CubeDescManager.java | 3 +-
.../java/org/apache/kylin/cube/CubeManager.java | 4 +-
.../apache/kylin/metadata/MetadataManager.java | 10 +++-
.../kylin/metadata/project/ProjectManager.java | 3 +-
.../kylin/storage/hybrid/HybridManager.java | 4 +-
.../engine/streaming/StreamingManager.java | 3 +-
.../engine/streaming/cli/StreamingCLI.java | 7 +--
.../test_case_data/localmeta/kylin.properties | 2 +-
.../kylin/invertedindex/IIDescManager.java | 3 +-
.../apache/kylin/invertedindex/IIManager.java | 4 +-
.../kylin/rest/service/CacheServiceTest.java | 6 +-
.../kylin/source/kafka/KafkaConfigManager.java | 23 ++++----
19 files changed, 105 insertions(+), 107 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-common/src/main/java/org/apache/kylin/common/cache/CacheUpdater.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/cache/CacheUpdater.java b/core-common/src/main/java/org/apache/kylin/common/cache/CacheUpdater.java
deleted file mode 100644
index 615ee14..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/cache/CacheUpdater.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package org.apache.kylin.common.cache;
-
-import org.apache.kylin.common.restclient.AbstractRestCache;
-import org.apache.kylin.common.restclient.Broadcaster;
-
-/**
- */
-@SuppressWarnings("rawtypes")
-public interface CacheUpdater {
- void updateCache(Object key, Object value, Broadcaster.EVENT syncAction, Broadcaster.TYPE type, AbstractRestCache cache);
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-common/src/main/java/org/apache/kylin/common/cache/LocalCacheUpdater.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/cache/LocalCacheUpdater.java b/core-common/src/main/java/org/apache/kylin/common/cache/LocalCacheUpdater.java
deleted file mode 100644
index 8d3b648..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/cache/LocalCacheUpdater.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package org.apache.kylin.common.cache;
-
-import org.apache.kylin.common.restclient.AbstractRestCache;
-import org.apache.kylin.common.restclient.Broadcaster;
-
-/**
- */
-@SuppressWarnings({ "rawtypes", "unchecked" })
-public class LocalCacheUpdater implements CacheUpdater {
- @Override
- public void updateCache(Object key, Object value, Broadcaster.EVENT syncAction, Broadcaster.TYPE type, AbstractRestCache cache) {
- if (syncAction == Broadcaster.EVENT.CREATE || syncAction == Broadcaster.EVENT.UPDATE) {
- cache.putLocal(key, value);
- } else if (syncAction == Broadcaster.EVENT.DROP) {
- cache.removeLocal(key);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-common/src/main/java/org/apache/kylin/common/cache/RemoteCacheUpdater.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/cache/RemoteCacheUpdater.java b/core-common/src/main/java/org/apache/kylin/common/cache/RemoteCacheUpdater.java
deleted file mode 100644
index 2927d2d..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/cache/RemoteCacheUpdater.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package org.apache.kylin.common.cache;
-
-import org.apache.kylin.common.restclient.AbstractRestCache;
-import org.apache.kylin.common.restclient.Broadcaster;
-
-/**
- */
-@SuppressWarnings("rawtypes")
-public class RemoteCacheUpdater implements CacheUpdater {
- @Override
- public void updateCache(Object key, Object value, Broadcaster.EVENT syncAction, Broadcaster.TYPE type, AbstractRestCache cache) {
- Broadcaster.getInstance().queue(type.getType(), syncAction.getType(), key.toString());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-common/src/main/java/org/apache/kylin/common/restclient/AbstractRestCache.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/AbstractRestCache.java b/core-common/src/main/java/org/apache/kylin/common/restclient/AbstractRestCache.java
index 68d9be5..fc030b4 100644
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/AbstractRestCache.java
+++ b/core-common/src/main/java/org/apache/kylin/common/restclient/AbstractRestCache.java
@@ -18,8 +18,7 @@
package org.apache.kylin.common.restclient;
-import org.apache.kylin.common.cache.CacheUpdater;
-import org.apache.kylin.common.cache.RemoteCacheUpdater;
+import org.apache.kylin.common.KylinConfig;
/**
* @author xjiang
@@ -27,17 +26,17 @@ import org.apache.kylin.common.cache.RemoteCacheUpdater;
*/
public abstract class AbstractRestCache<K, V> {
- protected static CacheUpdater cacheUpdater = new RemoteCacheUpdater();
-
- public static void setCacheUpdater(CacheUpdater cu) {
- cacheUpdater = cu;
- }
-
+ protected final KylinConfig config;
protected final Broadcaster.TYPE syncType;
- protected AbstractRestCache(Broadcaster.TYPE syncType) {
+ protected AbstractRestCache(KylinConfig config, Broadcaster.TYPE syncType) {
+ this.config = config;
this.syncType = syncType;
}
+
+ public Broadcaster getBroadcaster() {
+ return Broadcaster.getInstance(config);
+ }
public abstract void put(K key, V value);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java b/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
index 80ec33c..871d77c 100644
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
+++ b/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
@@ -43,24 +44,52 @@ public class Broadcaster {
private static final Logger logger = LoggerFactory.getLogger(Broadcaster.class);
+ // static cached instances
+ private static final ConcurrentHashMap<KylinConfig, Broadcaster> CACHE = new ConcurrentHashMap<KylinConfig, Broadcaster>();
+
+ public static Broadcaster getInstance(KylinConfig config) {
+ Broadcaster r = CACHE.get(config);
+ if (r != null) {
+ return r;
+ }
+
+ synchronized (Broadcaster.class) {
+ r = CACHE.get(config);
+ if (r != null) {
+ return r;
+ }
+
+ r = new Broadcaster(config);
+ CACHE.put(config, r);
+ if (CACHE.size() > 1) {
+ logger.warn("More than one cubemanager singleton exist");
+ }
+ return r;
+ }
+ }
+
+ public static void clearCache() {
+ CACHE.clear();
+ }
+
+ // ============================================================================
+
private BlockingDeque<BroadcastEvent> broadcastEvents = new LinkedBlockingDeque<>();
private AtomicLong counter = new AtomicLong();
- static class BroadcasterHolder {
- static final Broadcaster INSTANCE = new Broadcaster();
- }
+ private Broadcaster(final KylinConfig config) {
+ final String[] nodes = config.getRestServers();
+ if (nodes == null || nodes.length < 1) {
+ logger.warn("There is no available rest server; check the 'kylin.rest.servers' config");
+ broadcastEvents = null; // disable the broadcaster
+ return;
+ }
+ logger.debug(nodes.length + " nodes in the cluster: " + Arrays.toString(nodes));
- private Broadcaster() {
Executors.newSingleThreadExecutor(new DaemonThreadFactory()).execute(new Runnable() {
@Override
public void run() {
- final String[] nodes = KylinConfig.getInstanceFromEnv().getRestServers();
- if (nodes == null || nodes.length < 1) {//TODO if the node count is greater than 1, it means it is a cluster
- logger.warn("There is no available rest server; check the 'kylin.rest.servers' config");
- return;
- }
- logger.debug(nodes.length + " nodes in the cluster: " + Arrays.toString(nodes));
final List<RestClient> restClients = Lists.newArrayList();
for (String node : nodes) {
restClients.add(new RestClient(node));
@@ -90,10 +119,6 @@ public class Broadcaster {
});
}
- public static Broadcaster getInstance() {
- return BroadcasterHolder.INSTANCE;
- }
-
/**
* Broadcast the cubedesc event out
*
@@ -101,6 +126,9 @@ public class Broadcaster {
* event action
*/
public void queue(String type, String action, String key) {
+ if (broadcastEvents == null)
+ return;
+
try {
counter.incrementAndGet();
broadcastEvents.putFirst(new BroadcastEvent(type, action, key));
@@ -138,7 +166,7 @@ public class Broadcaster {
}
public enum TYPE {
- ALL("all"), CUBE("cube"),STREAMING("streaming"),KAFKA("kafka"),CUBE_DESC("cube_desc"), PROJECT("project"), INVERTED_INDEX("inverted_index"), INVERTED_INDEX_DESC("ii_desc"), TABLE("table"), DATA_MODEL("data_model"), HYBRID("hybrid");
+ ALL("all"), CUBE("cube"), STREAMING("streaming"), KAFKA("kafka"), CUBE_DESC("cube_desc"), PROJECT("project"), INVERTED_INDEX("inverted_index"), INVERTED_INDEX_DESC("ii_desc"), TABLE("table"), DATA_MODEL("data_model"), HYBRID("hybrid");
private String text;
TYPE(String text) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-common/src/main/java/org/apache/kylin/common/restclient/CaseInsensitiveStringCache.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/CaseInsensitiveStringCache.java b/core-common/src/main/java/org/apache/kylin/common/restclient/CaseInsensitiveStringCache.java
index 68e3c04..2bcddbf 100644
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/CaseInsensitiveStringCache.java
+++ b/core-common/src/main/java/org/apache/kylin/common/restclient/CaseInsensitiveStringCache.java
@@ -20,12 +20,14 @@ package org.apache.kylin.common.restclient;
import java.util.concurrent.ConcurrentSkipListMap;
+import org.apache.kylin.common.KylinConfig;
+
/**
*/
public class CaseInsensitiveStringCache<V> extends SingleValueCache<String, V> {
- public CaseInsensitiveStringCache(Broadcaster.TYPE syncType) {
- super(syncType, new ConcurrentSkipListMap<String, V>(String.CASE_INSENSITIVE_ORDER));
+ public CaseInsensitiveStringCache(KylinConfig config, Broadcaster.TYPE syncType) {
+ super(config, syncType, new ConcurrentSkipListMap<String, V>(String.CASE_INSENSITIVE_ORDER));
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-common/src/main/java/org/apache/kylin/common/restclient/SingleValueCache.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/SingleValueCache.java b/core-common/src/main/java/org/apache/kylin/common/restclient/SingleValueCache.java
index cb6c286..9acfeca 100644
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/SingleValueCache.java
+++ b/core-common/src/main/java/org/apache/kylin/common/restclient/SingleValueCache.java
@@ -25,6 +25,8 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import org.apache.kylin.common.KylinConfig;
+
/**
* @author xjiang
*
@@ -33,12 +35,12 @@ public abstract class SingleValueCache<K, V> extends AbstractRestCache<K, V> {
private final ConcurrentMap<K, V> innerCache;
- public SingleValueCache(Broadcaster.TYPE syncType) {
- this(syncType, new ConcurrentHashMap<K, V>());
+ public SingleValueCache(KylinConfig config, Broadcaster.TYPE syncType) {
+ this(config, syncType, new ConcurrentHashMap<K, V>());
}
- public SingleValueCache(Broadcaster.TYPE syncType, ConcurrentMap<K, V> innerCache) {
- super(syncType);
+ public SingleValueCache(KylinConfig config, Broadcaster.TYPE syncType, ConcurrentMap<K, V> innerCache) {
+ super(config, syncType);
this.innerCache = innerCache;
}
@@ -48,9 +50,9 @@ public abstract class SingleValueCache<K, V> extends AbstractRestCache<K, V> {
innerCache.put(key, value);
if (!exists) {
- cacheUpdater.updateCache(key, value, Broadcaster.EVENT.CREATE, syncType, this);
+ getBroadcaster().queue(syncType.getType(), Broadcaster.EVENT.CREATE.getType(), key.toString());
} else {
- cacheUpdater.updateCache(key, value, Broadcaster.EVENT.UPDATE, syncType, this);
+ getBroadcaster().queue(syncType.getType(), Broadcaster.EVENT.UPDATE.getType(), key.toString());
}
}
@@ -64,7 +66,7 @@ public abstract class SingleValueCache<K, V> extends AbstractRestCache<K, V> {
innerCache.remove(key);
if (exists) {
- cacheUpdater.updateCache(key, null, Broadcaster.EVENT.DROP, syncType, this);
+ getBroadcaster().queue(syncType.getType(), Broadcaster.EVENT.DROP.getType(), key.toString());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
index 8e75d29..c50836c 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
@@ -57,7 +57,7 @@ public class CubeDescManager {
private KylinConfig config;
// name ==> CubeDesc
- private CaseInsensitiveStringCache<CubeDesc> cubeDescMap = new CaseInsensitiveStringCache<CubeDesc>(Broadcaster.TYPE.CUBE_DESC);
+ private CaseInsensitiveStringCache<CubeDesc> cubeDescMap;
public static CubeDescManager getInstance(KylinConfig config) {
CubeDescManager r = CACHE.get(config);
@@ -90,6 +90,7 @@ public class CubeDescManager {
private CubeDescManager(KylinConfig config) throws IOException {
logger.info("Initializing CubeDescManager with config " + config);
this.config = config;
+ this.cubeDescMap = new CaseInsensitiveStringCache<CubeDesc>(config, Broadcaster.TYPE.CUBE_DESC);
reloadAllCubeDesc();
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 9b7a024..a249cd7 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -116,7 +116,7 @@ public class CubeManager implements IRealizationProvider {
private KylinConfig config;
// cube name ==> CubeInstance
- private CaseInsensitiveStringCache<CubeInstance> cubeMap = new CaseInsensitiveStringCache<CubeInstance>(Broadcaster.TYPE.CUBE);
+ private CaseInsensitiveStringCache<CubeInstance> cubeMap;
// "table/column" ==> lookup table
// private SingleValueCache<String, LookupStringTable> lookupTables = new SingleValueCache<String, LookupStringTable>(Broadcaster.TYPE.METADATA);
@@ -126,7 +126,7 @@ public class CubeManager implements IRealizationProvider {
private CubeManager(KylinConfig config) throws IOException {
logger.info("Initializing CubeManager with config " + config);
this.config = config;
-
+ this.cubeMap = new CaseInsensitiveStringCache<CubeInstance>(config, Broadcaster.TYPE.CUBE);
loadAllCubeInstance();
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
index b7e7dc5..c907afd 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
@@ -103,11 +103,11 @@ public class MetadataManager {
private KylinConfig config;
// table name ==> SourceTable
- private CaseInsensitiveStringCache<TableDesc> srcTableMap = new CaseInsensitiveStringCache<TableDesc>(Broadcaster.TYPE.TABLE);
+ private CaseInsensitiveStringCache<TableDesc> srcTableMap;
// name => value
- private CaseInsensitiveStringCache<Map<String, String>> srcTableExdMap = new CaseInsensitiveStringCache<Map<String, String>>(Broadcaster.TYPE.TABLE);
+ private CaseInsensitiveStringCache<Map<String, String>> srcTableExdMap;
// name => DataModelDesc
- private CaseInsensitiveStringCache<DataModelDesc> dataModelDescMap = new CaseInsensitiveStringCache<DataModelDesc>(Broadcaster.TYPE.DATA_MODEL);
+ private CaseInsensitiveStringCache<DataModelDesc> dataModelDescMap;
private MetadataManager(KylinConfig config) throws IOException {
init(config);
@@ -198,6 +198,10 @@ public class MetadataManager {
private void init(KylinConfig config) throws IOException {
this.config = config;
+ this.srcTableMap = new CaseInsensitiveStringCache<TableDesc>(config, Broadcaster.TYPE.TABLE);
+ this.srcTableExdMap = new CaseInsensitiveStringCache<Map<String, String>>(config, Broadcaster.TYPE.TABLE);
+ this.dataModelDescMap = new CaseInsensitiveStringCache<DataModelDesc>(config, Broadcaster.TYPE.DATA_MODEL);
+
reloadAllSourceTable();
reloadAllSourceTableExd();
reloadAllDataModel();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
index fd41f59..8304128 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
@@ -81,11 +81,12 @@ public class ProjectManager {
private KylinConfig config;
private ProjectL2Cache l2Cache;
// project name => ProjrectInstance
- private CaseInsensitiveStringCache<ProjectInstance> projectMap = new CaseInsensitiveStringCache<ProjectInstance>(Broadcaster.TYPE.PROJECT);
+ private CaseInsensitiveStringCache<ProjectInstance> projectMap;
private ProjectManager(KylinConfig config) throws IOException {
logger.info("Initializing ProjectManager with metadata url " + config);
this.config = config;
+ this.projectMap = new CaseInsensitiveStringCache<ProjectInstance>(config, Broadcaster.TYPE.PROJECT);
this.l2Cache = new ProjectL2Cache(this);
reloadAllProjects();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
index 0f00f1a..9392ef5 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
@@ -76,12 +76,12 @@ public class HybridManager implements IRealizationProvider {
private KylinConfig config;
- private CaseInsensitiveStringCache<HybridInstance> hybridMap = new CaseInsensitiveStringCache<HybridInstance>(Broadcaster.TYPE.HYBRID);
+ private CaseInsensitiveStringCache<HybridInstance> hybridMap;
private HybridManager(KylinConfig config) throws IOException {
logger.info("Initializing HybridManager with config " + config);
this.config = config;
-
+ this.hybridMap = new CaseInsensitiveStringCache<HybridInstance>(config, Broadcaster.TYPE.HYBRID);
loadAllHybridInstance();
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
index fa7d0f8..8cabe1b 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
@@ -79,7 +79,7 @@ public class StreamingManager {
private KylinConfig config;
// name ==> StreamingConfig
- private CaseInsensitiveStringCache<StreamingConfig> streamingMap = new CaseInsensitiveStringCache<StreamingConfig>(Broadcaster.TYPE.STREAMING);
+ private CaseInsensitiveStringCache<StreamingConfig> streamingMap;
public static void clearCache() {
CACHE.clear();
@@ -87,6 +87,7 @@ public class StreamingManager {
private StreamingManager(KylinConfig config) throws IOException {
this.config = config;
+ this.streamingMap = new CaseInsensitiveStringCache<StreamingConfig>(config, Broadcaster.TYPE.STREAMING);
reloadAllStreaming();
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
index e3a7133..a73a6ac 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
@@ -34,11 +34,10 @@
package org.apache.kylin.engine.streaming.cli;
-import com.google.common.base.Preconditions;
+import java.util.List;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.cache.RemoteCacheUpdater;
-import org.apache.kylin.common.restclient.AbstractRestCache;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.engine.streaming.BootstrapConfig;
import org.apache.kylin.engine.streaming.OneOffStreamingBuilder;
@@ -48,7 +47,7 @@ import org.apache.kylin.engine.streaming.monitor.StreamingMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.List;
+import com.google.common.base.Preconditions;
public class StreamingCLI {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/examples/test_case_data/localmeta/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/kylin.properties b/examples/test_case_data/localmeta/kylin.properties
index 92773a1..48f01f5 100644
--- a/examples/test_case_data/localmeta/kylin.properties
+++ b/examples/test_case_data/localmeta/kylin.properties
@@ -1,7 +1,7 @@
## Config for Kylin Engine ##
# List of web servers in use, this enables one web server instance to sync up with other servers.
-kylin.rest.servers=localhost:7070
+#kylin.rest.servers=localhost:7070
# The metadata store in hbase
kylin.metadata.url=
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIDescManager.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIDescManager.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIDescManager.java
index a166ae7..917fe46 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIDescManager.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIDescManager.java
@@ -54,7 +54,7 @@ public class IIDescManager {
private KylinConfig config;
// name ==> IIDesc
- private CaseInsensitiveStringCache<IIDesc> iiDescMap = new CaseInsensitiveStringCache<IIDesc>(Broadcaster.TYPE.INVERTED_INDEX_DESC);
+ private CaseInsensitiveStringCache<IIDesc> iiDescMap;
public static IIDescManager getInstance(KylinConfig config) {
IIDescManager r = CACHE.get(config);
@@ -87,6 +87,7 @@ public class IIDescManager {
private IIDescManager(KylinConfig config) throws IOException {
logger.info("Initializing IIDescManager with config " + config);
this.config = config;
+ this.iiDescMap = new CaseInsensitiveStringCache<IIDesc>(config, Broadcaster.TYPE.INVERTED_INDEX_DESC);
reloadAllIIDesc();
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
index 5633004..b6dfdf1 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
@@ -92,7 +92,7 @@ public class IIManager implements IRealizationProvider {
private KylinConfig config;
// ii name ==> IIInstance
- private CaseInsensitiveStringCache<IIInstance> iiMap = new CaseInsensitiveStringCache<IIInstance>(Broadcaster.TYPE.INVERTED_INDEX);
+ private CaseInsensitiveStringCache<IIInstance> iiMap;
// for generation hbase table name of a new segment
private Multimap<String, String> usedStorageLocation = HashMultimap.create();
@@ -100,7 +100,7 @@ public class IIManager implements IRealizationProvider {
private IIManager(KylinConfig config) throws IOException {
logger.info("Initializing IIManager with config " + config);
this.config = config;
-
+ this.iiMap = new CaseInsensitiveStringCache<IIInstance>(config, Broadcaster.TYPE.INVERTED_INDEX);
loadAllIIInstance();
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
index af1bbc0..b90c10a 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
@@ -71,7 +71,9 @@ public class CacheServiceTest extends LocalFileMetadataTestCase {
public static void beforeClass() throws Exception {
staticCreateTestMetadata();
configA = KylinConfig.getInstanceFromEnv();
+ configA.setProperty("kylin.rest.servers", "localhost:7070");
configB = KylinConfig.getKylinConfigFromInputStream(KylinConfig.getKylinPropertiesAsInputSteam());
+ configB.setProperty("kylin.rest.servers", "localhost:7070");
configB.setMetadataUrl("../examples/test_metadata");
server = new Server(7070);
@@ -209,7 +211,7 @@ public class CacheServiceTest extends LocalFileMetadataTestCase {
@Test
public void testCubeCRUD() throws Exception {
- final Broadcaster broadcaster = Broadcaster.getInstance();
+ final Broadcaster broadcaster = Broadcaster.getInstance(configA);
broadcaster.getCounterAndClear();
getStore().deleteResource("/cube/a_whole_new_cube.json");
@@ -306,7 +308,7 @@ public class CacheServiceTest extends LocalFileMetadataTestCase {
public void testMetaCRUD() throws Exception {
final MetadataManager metadataManager = MetadataManager.getInstance(configA);
final MetadataManager metadataManagerB = MetadataManager.getInstance(configB);
- final Broadcaster broadcaster = Broadcaster.getInstance();
+ final Broadcaster broadcaster = Broadcaster.getInstance(configA);
broadcaster.getCounterAndClear();
TableDesc tableDesc = createTestTableDesc();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2f4595ae/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
index 3032d13..8cf51b6 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
@@ -34,10 +34,12 @@
package org.apache.kylin.source.kafka;
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.type.MapType;
-import com.fasterxml.jackson.databind.type.SimpleType;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.JsonSerializer;
@@ -45,17 +47,15 @@ import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.Serializer;
import org.apache.kylin.common.restclient.Broadcaster;
import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
-import org.apache.kylin.engine.streaming.StreamingConfig;
import org.apache.kylin.metadata.MetadataConstants;
import org.apache.kylin.source.kafka.config.KafkaConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.MapType;
+import com.fasterxml.jackson.databind.type.SimpleType;
/**
*/
@@ -69,7 +69,7 @@ public class KafkaConfigManager {
private KylinConfig config;
// name ==> StreamingConfig
- private CaseInsensitiveStringCache<KafkaConfig> kafkaMap = new CaseInsensitiveStringCache<KafkaConfig>(Broadcaster.TYPE.KAFKA);
+ private CaseInsensitiveStringCache<KafkaConfig> kafkaMap;
public static final Serializer<KafkaConfig> KAFKA_SERIALIZER = new JsonSerializer<KafkaConfig>(KafkaConfig.class);
@@ -79,6 +79,7 @@ public class KafkaConfigManager {
private KafkaConfigManager(KylinConfig config) throws IOException {
this.config = config;
+ this.kafkaMap = new CaseInsensitiveStringCache<KafkaConfig>(config, Broadcaster.TYPE.KAFKA);
reloadAllKafkaConfig();
}
[3/3] incubator-kylin git commit: KYLIN-1127 Adopt listener pattern
to wipe query cache on cube update
Posted by li...@apache.org.
KYLIN-1127 Adopt listener pattern to wipe query cache on cube update
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/a3397d04
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/a3397d04
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/a3397d04
Branch: refs/heads/2.x-staging
Commit: a3397d044d6cd333d8f9d94bfd75aa603180d67e
Parents: 2f4595a
Author: Yang Li <li...@apache.org>
Authored: Mon Nov 9 11:53:43 2015 +0800
Committer: Yang Li <li...@apache.org>
Committed: Mon Nov 9 11:53:43 2015 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/cube/CubeManager.java | 34 ++++++++--
.../kylin/storage/hybrid/HybridManager.java | 4 +-
.../apache/kylin/rest/service/CacheService.java | 68 +++++++++++++++-----
.../kylin/rest/service/CacheServiceTest.java | 2 +
4 files changed, 85 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a3397d04/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index a249cd7..89873d2 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -236,6 +236,9 @@ public class CubeManager implements IRealizationProvider {
// delete cube from project
ProjectManager.getInstance(config).removeRealizationsFromProjects(RealizationType.CUBE, cubeName);
+
+ if (listener != null)
+ listener.afterCubeDelete(cube);
return cube;
}
@@ -248,14 +251,22 @@ public class CubeManager implements IRealizationProvider {
CubeInstance cube = CubeInstance.create(cubeName, projectName, desc);
cube.setOwner(owner);
- updateCube(new CubeUpdate(cube));
+ updateCubeWithRetry(new CubeUpdate(cube), 0);
ProjectManager.getInstance(config).moveRealizationToProject(RealizationType.CUBE, cubeName, projectName, owner);
+ if (listener != null)
+ listener.afterCubeCreate(cube);
+
return cube;
}
public CubeInstance updateCube(CubeUpdate update) throws IOException {
- return updateCube(update, 0);
+ CubeInstance cube = updateCubeWithRetry(update, 0);
+
+ if (listener != null)
+ listener.afterCubeUpdate(cube);
+
+ return cube;
}
private boolean validateReadySegments(CubeInstance cube) {
@@ -283,7 +294,7 @@ public class CubeManager implements IRealizationProvider {
return true;
}
- private CubeInstance updateCube(CubeUpdate update, int retry) throws IOException {
+ private CubeInstance updateCubeWithRetry(CubeUpdate update, int retry) throws IOException {
if (update == null || update.getCubeInstance() == null)
throw new IllegalStateException();
@@ -348,7 +359,7 @@ public class CubeManager implements IRealizationProvider {
cube = reloadCubeLocal(cube.getName());
update.setCubeInstance(cube);
retry++;
- cube = updateCube(update, retry);
+ cube = updateCubeWithRetry(update, retry);
}
if (toRemoveResources.size() > 0) {
@@ -850,4 +861,19 @@ public class CubeManager implements IRealizationProvider {
return getCube(name);
}
+ // ============================================================================
+
+ public interface CubeChangeListener {
+ void afterCubeCreate(CubeInstance cube);
+
+ void afterCubeUpdate(CubeInstance cube);
+
+ void afterCubeDelete(CubeInstance cube);
+ }
+
+ private CubeChangeListener listener;
+
+ public void setCubeChangeListener(CubeChangeListener listener) {
+ this.listener = listener;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a3397d04/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
index 9392ef5..5f16b6b 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
@@ -98,7 +98,7 @@ public class HybridManager implements IRealizationProvider {
logger.debug("Loaded " + paths.size() + " Hybrid(s)");
}
- public void reloadHybridInstanceByChild(RealizationType type, String realizationName) throws IOException {
+ public void reloadHybridInstanceByChild(RealizationType type, String realizationName) {
for (HybridInstance hybridInstance : hybridMap.values()) {
boolean includes = false;
for (IRealization realization : hybridInstance.getRealizations()) {
@@ -113,7 +113,7 @@ public class HybridManager implements IRealizationProvider {
}
}
- private synchronized HybridInstance loadHybridInstance(String path) throws IOException {
+ private synchronized HybridInstance loadHybridInstance(String path) {
ResourceStore store = getStore();
HybridInstance hybridInstance = null;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a3397d04/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
index f9c3ec1..8371907 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
@@ -26,6 +26,7 @@ import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import javax.annotation.PostConstruct;
import javax.sql.DataSource;
import net.sf.ehcache.CacheManager;
@@ -72,6 +73,28 @@ public class CacheService extends BasicService {
@Autowired
private CacheManager cacheManager;
+ @PostConstruct
+ public void initCubeChangeListener() throws IOException {
+ CubeManager cubeMgr = CubeManager.getInstance(getConfig());
+ cubeMgr.setCubeChangeListener(new CubeManager.CubeChangeListener() {
+
+ @Override
+ public void afterCubeCreate(CubeInstance cube) {
+ // no cache need change
+ }
+
+ @Override
+ public void afterCubeUpdate(CubeInstance cube) {
+ rebuildCubeCache(cube.getName());
+ }
+
+ @Override
+ public void afterCubeDelete(CubeInstance cube) {
+ removeCubeCache(cube.getName(), cube);
+ }
+ });
+ }
+
// for test
public void setCubeService(CubeService cubeService) {
this.cubeService = cubeService;
@@ -151,14 +174,7 @@ public class CacheService extends BasicService {
try {
switch (cacheType) {
case CUBE:
- CubeInstance newCube = getCubeManager().reloadCubeLocal(cacheKey);
- getHybridManager().reloadHybridInstanceByChild(RealizationType.CUBE, cacheKey);
- getProjectManager().clearL2Cache();
- //clean query related cache first
- if (newCube != null) {
- cleanDataCache(newCube.getUuid());
- }
- cubeService.updateOnNewSegmentReady(cacheKey);
+ rebuildCubeCache(cacheKey);
break;
case STREAMING:
getStreamingManager().reloadStreamingConfigLocal(cacheKey);
@@ -170,10 +186,8 @@ public class CacheService extends BasicService {
getCubeDescManager().reloadCubeDescLocal(cacheKey);
break;
case PROJECT:
- ProjectInstance projectInstance = getProjectManager().reloadProjectLocal(cacheKey);
- if (projectInstance != null) {
- removeOLAPDataSource(projectInstance.getName());
- }
+ getProjectManager().reloadProjectLocal(cacheKey);
+ removeOLAPDataSource(cacheKey);
break;
case INVERTED_INDEX:
//II update does not need to update storage cache because it is dynamic already
@@ -217,16 +231,23 @@ public class CacheService extends BasicService {
}
}
+ private void rebuildCubeCache(String cubeName) {
+ CubeInstance cube = getCubeManager().reloadCubeLocal(cubeName);
+ getHybridManager().reloadHybridInstanceByChild(RealizationType.CUBE, cubeName);
+ getProjectManager().clearL2Cache();
+ //clean query related cache first
+ if (cube != null) {
+ cleanDataCache(cube.getUuid());
+ }
+ cubeService.updateOnNewSegmentReady(cubeName);
+ }
+
public void removeCache(Broadcaster.TYPE cacheType, String cacheKey) {
final String log = "remove cache type: " + cacheType + " name:" + cacheKey;
try {
switch (cacheType) {
case CUBE:
- CubeInstance cube = getCubeManager().getCube(cacheKey);
- getCubeManager().removeCubeLocal(cacheKey);
- if (cube != null) {
- cleanDataCache(cube.getUuid());
- }
+ removeCubeCache(cacheKey, null);
break;
case CUBE_DESC:
getCubeDescManager().removeLocalCubeDesc(cacheKey);
@@ -251,4 +272,17 @@ public class CacheService extends BasicService {
throw new RuntimeException("error " + log, e);
}
}
+
+ private void removeCubeCache(String cubeName, CubeInstance cube) {
+ // you may not get the cube instance if it's already removed from metadata
+ if (cube == null) {
+ cube = getCubeManager().getCube(cubeName);
+ }
+
+ getCubeManager().removeCubeLocal(cubeName);
+
+ if (cube != null) {
+ cleanDataCache(cube.getUuid());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a3397d04/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
index b90c10a..25b131a 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
@@ -108,7 +108,9 @@ public class CacheServiceTest extends LocalFileMetadataTestCase {
};
serviceA.setCubeService(cubeServiceA);
+ serviceA.initCubeChangeListener();
serviceB.setCubeService(cubeServiceB);
+ serviceB.initCubeChangeListener();
context.addServlet(new ServletHolder(new BroadcasterReceiveServlet(new BroadcasterReceiveServlet.BroadcasterHandler() {
@Override