You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/12/31 13:19:23 UTC
[06/50] [abbrv] kylin git commit: KYLIN-3092 Synchronize w/r operations on entity-caching managers
KYLIN-3092 Synchronize w/r operations on entity-caching managers
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/f2f487fe
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/f2f487fe
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/f2f487fe
Branch: refs/heads/master
Commit: f2f487fe2dc868119562303cb8b9a0b630f704cf
Parents: b8d7987
Author: Li Yang <li...@apache.org>
Authored: Sat Dec 9 00:23:51 2017 +0800
Committer: Dong Li <li...@apache.org>
Committed: Mon Dec 11 10:16:35 2017 +0800
----------------------------------------------------------------------
.../org/apache/kylin/cube/CubeDescManager.java | 311 +++--
.../org/apache/kylin/cube/CubeInstance.java | 31 +-
.../java/org/apache/kylin/cube/CubeManager.java | 1079 +++++++++---------
.../kylin/cube/cli/CubeSignatureRefresher.java | 2 +-
.../org/apache/kylin/cube/model/CubeDesc.java | 29 +-
.../kylin/metadata/TableMetadataManager.java | 583 +++++-----
.../kylin/metadata/TempStatementManager.java | 157 ++-
.../org/apache/kylin/metadata/acl/TableACL.java | 11 +
.../kylin/metadata/acl/TableACLManager.java | 111 +-
.../metadata/cachesync/CachedCrudAssist.java | 72 +-
.../kylin/metadata/model/DataModelDesc.java | 7 +-
.../kylin/metadata/model/DataModelManager.java | 269 +++--
.../metadata/model/ExternalFilterDesc.java | 5 +
.../apache/kylin/metadata/model/TableDesc.java | 79 +-
.../kylin/metadata/model/TableExtDesc.java | 38 +-
.../kylin/metadata/project/ProjectInstance.java | 2 +-
.../kylin/metadata/project/ProjectManager.java | 12 +-
.../metadata/streaming/StreamingConfig.java | 6 +
.../metadata/streaming/StreamingManager.java | 183 +--
.../metadata/TempStatementManagerTest.java | 6 +-
.../streaming/StreamingManagerTest.java | 68 ++
.../kylin/storage/hybrid/HybridInstance.java | 37 +-
.../kylin/storage/hybrid/HybridManager.java | 133 ++-
.../apache/kylin/engine/spark/SparkCubing.java | 4 +-
.../DEFAULT_SESSION/temp_table1.json | 1 +
.../DEFAULT_SESSION/temp_table2.json | 1 +
.../kylin/provision/BuildCubeWithStream.java | 2 +-
.../kylin/rest/controller/ModelController.java | 2 +-
.../kylin/rest/service/KafkaConfigService.java | 2 +-
.../kylin/rest/service/StreamingService.java | 4 +-
.../kylin/source/kafka/KafkaConfigManager.java | 166 +--
.../kylin/source/kafka/config/KafkaConfig.java | 8 +-
32 files changed, 1746 insertions(+), 1675 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/f2f487fe/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
index f724549..a58ba40 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
@@ -25,9 +25,9 @@ import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.JsonSerializer;
import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.persistence.Serializer;
+import org.apache.kylin.common.util.AutoReadWriteLock;
+import org.apache.kylin.common.util.AutoReadWriteLock.AutoLock;
import org.apache.kylin.cube.cuboid.CuboidManager;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.validation.CubeMetadataValidator;
@@ -36,9 +36,9 @@ import org.apache.kylin.dimension.DictionaryDimEnc;
import org.apache.kylin.dimension.DimensionEncoding;
import org.apache.kylin.dimension.DimensionEncodingFactory;
import org.apache.kylin.measure.topn.TopNMeasureType;
-import org.apache.kylin.metadata.MetadataConstants;
import org.apache.kylin.metadata.cachesync.Broadcaster;
import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
+import org.apache.kylin.metadata.cachesync.CachedCrudAssist;
import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache;
import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.metadata.model.MeasureDesc;
@@ -59,8 +59,6 @@ public class CubeDescManager {
private static final Logger logger = LoggerFactory.getLogger(CubeDescManager.class);
- public static final Serializer<CubeDesc> CUBE_DESC_SERIALIZER = new JsonSerializer<CubeDesc>(CubeDesc.class);
-
public static CubeDescManager getInstance(KylinConfig config) {
return config.getManager(CubeDescManager.class);
}
@@ -69,20 +67,42 @@ public class CubeDescManager {
static CubeDescManager newInstance(KylinConfig config) throws IOException {
return new CubeDescManager(config);
}
-
+
// ============================================================================
private KylinConfig config;
+
// name ==> CubeDesc
private CaseInsensitiveStringCache<CubeDesc> cubeDescMap;
+ private CachedCrudAssist<CubeDesc> crud;
+
+ // protects concurrent operations around the cached map, to avoid for example
+ // writing an entity in the middle of reloading it (dirty read)
+ private AutoReadWriteLock descMapLock = new AutoReadWriteLock();
- private CubeDescManager(KylinConfig config) throws IOException {
- logger.info("Initializing CubeDescManager with config " + config);
- this.config = config;
+ private CubeDescManager(KylinConfig cfg) throws IOException {
+ logger.info("Initializing CubeDescManager with config " + cfg);
+ this.config = cfg;
this.cubeDescMap = new CaseInsensitiveStringCache<CubeDesc>(config, "cube_desc");
+ this.crud = new CachedCrudAssist<CubeDesc>(getStore(), ResourceStore.CUBE_DESC_RESOURCE_ROOT, CubeDesc.class,
+ cubeDescMap) {
+ @Override
+ protected CubeDesc initEntityAfterReload(CubeDesc cubeDesc, String resourceName) {
+ if (cubeDesc.isDraft())
+ throw new IllegalArgumentException("CubeDesc '" + cubeDesc.getName() + "' must not be a draft");
+
+ try {
+ cubeDesc.init(config);
+ } catch (Exception e) {
+ logger.warn("Broken cube desc " + cubeDesc.resourceName(), e);
+ cubeDesc.addError(e.getMessage());
+ }
+ return cubeDesc;
+ }
+ };
// touch lower level metadata before registering my listener
- reloadAllCubeDesc();
+ crud.reloadAll();
Broadcaster.getInstance(config).registerListener(new CubeDescSyncListener(), "cube_desc");
}
@@ -93,7 +113,7 @@ public class CubeDescManager {
for (IRealization real : ProjectManager.getInstance(config).listAllRealizations(project)) {
if (real instanceof CubeInstance) {
String descName = ((CubeInstance) real).getDescName();
- reloadCubeDescLocal(descName);
+ reloadCubeDescQuietly(descName);
}
}
}
@@ -108,7 +128,7 @@ public class CubeDescManager {
if (event == Event.DROP)
removeLocalCubeDesc(cubeDescName);
else
- reloadCubeDescLocal(cubeDescName);
+ reloadCubeDescQuietly(cubeDescName);
for (ProjectInstance prj : ProjectManager.getInstance(config).findProjectsByModel(modelName)) {
broadcaster.notifyProjectSchemaUpdate(prj.getName());
@@ -117,58 +137,45 @@ public class CubeDescManager {
}
public CubeDesc getCubeDesc(String name) {
- return cubeDescMap.get(name);
+ try (AutoLock lock = descMapLock.lockForRead()) {
+ return cubeDescMap.get(name);
+ }
}
public List<CubeDesc> listAllDesc() {
- return new ArrayList<CubeDesc>(cubeDescMap.values());
- }
-
- /**
- * Reload CubeDesc from resource store It will be triggered by an desc
- * update event.
- *
- * @param name
- * @throws IOException
- */
- public CubeDesc reloadCubeDescLocal(String name) throws IOException {
- // Broken CubeDesc is not allowed to be saved and broadcast.
- CubeDesc ndesc = loadCubeDesc(CubeDesc.concatResourcePath(name), false);
-
- cubeDescMap.putLocal(ndesc.getName(), ndesc);
- clearCuboidCache(ndesc.getName());
-
- // if related cube is in DESCBROKEN state before, change it back to DISABLED
- CubeManager cubeManager = CubeManager.getInstance(config);
- for (CubeInstance cube : cubeManager.getCubesByDesc(name)) {
- if (cube.getStatus() == RealizationStatusEnum.DESCBROKEN) {
- cubeManager.reloadCubeLocal(cube.getName());
- }
+ try (AutoLock lock = descMapLock.lockForRead()) {
+ return new ArrayList<CubeDesc>(cubeDescMap.values());
}
-
- return ndesc;
}
-
- private CubeDesc loadCubeDesc(String path, boolean allowBroken) throws IOException {
- ResourceStore store = getStore();
- CubeDesc ndesc = store.getResource(path, CubeDesc.class, CUBE_DESC_SERIALIZER);
- if (ndesc == null)
- throw new IllegalArgumentException("No cube desc found at " + path);
- if (ndesc.isDraft())
- throw new IllegalArgumentException("CubeDesc '" + ndesc.getName() + "' must not be a draft");
-
- try {
- ndesc.init(config);
+
+ public CubeDesc reloadCubeDescQuietly(String name) {
+ try (AutoLock lock = descMapLock.lockForWrite()) {
+ return reloadCubeDescLocal(name);
} catch (Exception e) {
- logger.warn("Broken cube desc " + path, e);
- ndesc.addError(e.getMessage());
+ logger.error("Failed to reload CubeDesc " + name, e);
+ return null;
}
+ }
- if (!allowBroken && !ndesc.getError().isEmpty()) {
- throw new IllegalStateException("Cube desc at " + path + " has issues: " + ndesc.getError());
+ public CubeDesc reloadCubeDescLocal(String name) throws IOException {
+ try (AutoLock lock = descMapLock.lockForWrite()) {
+ CubeDesc ndesc = crud.reload(name);
+ clearCuboidCache(name);
+
+ // Broken CubeDesc is not allowed to be saved and broadcast.
+ if (ndesc.isBroken())
+ throw new IllegalStateException("CubeDesc " + name + " is broken");
+
+ // if related cube is in DESCBROKEN state before, change it back to DISABLED
+ CubeManager cubeManager = CubeManager.getInstance(config);
+ for (CubeInstance cube : cubeManager.getCubesByDesc(name)) {
+ if (cube.getStatus() == RealizationStatusEnum.DESCBROKEN) {
+ cube.init(config);
+ }
+ }
+
+ return ndesc;
}
-
- return ndesc;
}
/**
@@ -179,38 +186,83 @@ public class CubeDescManager {
* @throws IOException
*/
public CubeDesc createCubeDesc(CubeDesc cubeDesc) throws IOException {
- if (cubeDesc.getUuid() == null || cubeDesc.getName() == null)
- throw new IllegalArgumentException();
- if (cubeDescMap.containsKey(cubeDesc.getName()))
- throw new IllegalArgumentException("CubeDesc '" + cubeDesc.getName() + "' already exists");
- if (cubeDesc.isDraft())
- throw new IllegalArgumentException("CubeDesc '" + cubeDesc.getName() + "' must not be a draft");
-
- try {
- cubeDesc.init(config);
- } catch (Exception e) {
- logger.warn("Broken cube desc " + cubeDesc, e);
- cubeDesc.addError(e.getMessage());
- }
- postProcessCubeDesc(cubeDesc);
- // Check base validation
- if (!cubeDesc.getError().isEmpty()) {
- return cubeDesc;
- }
- // Semantic validation
- CubeMetadataValidator validator = new CubeMetadataValidator();
- ValidateContext context = validator.validate(cubeDesc);
- if (!context.ifPass()) {
+ try (AutoLock lock = descMapLock.lockForWrite()) {
+ if (cubeDesc.getUuid() == null || cubeDesc.getName() == null)
+ throw new IllegalArgumentException();
+ if (cubeDescMap.containsKey(cubeDesc.getName()))
+ throw new IllegalArgumentException("CubeDesc '" + cubeDesc.getName() + "' already exists");
+ if (cubeDesc.isDraft())
+ throw new IllegalArgumentException("CubeDesc '" + cubeDesc.getName() + "' must not be a draft");
+
+ try {
+ cubeDesc.init(config);
+ } catch (Exception e) {
+ logger.warn("Broken cube desc " + cubeDesc, e);
+ cubeDesc.addError(e.getMessage());
+ }
+
+ postProcessCubeDesc(cubeDesc);
+ // Check base validation
+ if (!cubeDesc.getError().isEmpty()) {
+ return cubeDesc;
+ }
+ // Semantic validation
+ CubeMetadataValidator validator = new CubeMetadataValidator();
+ ValidateContext context = validator.validate(cubeDesc);
+ if (!context.ifPass()) {
+ return cubeDesc;
+ }
+
+ cubeDesc.setSignature(cubeDesc.calculateSignature());
+
+ // save resource
+ crud.save(cubeDesc);
+
return cubeDesc;
}
+ }
+
+ /**
+ * Update CubeDesc with the input. Broadcast the event into cluster
+ *
+ * @param desc
+ * @return
+ * @throws IOException
+ */
+ public CubeDesc updateCubeDesc(CubeDesc desc) throws IOException {
+ try (AutoLock lock = descMapLock.lockForWrite()) {
+ // Validate CubeDesc
+ if (desc.getUuid() == null || desc.getName() == null)
+ throw new IllegalArgumentException();
+ String name = desc.getName();
+ if (!cubeDescMap.containsKey(name))
+ throw new IllegalArgumentException("CubeDesc '" + name + "' does not exist.");
+ if (desc.isDraft())
+ throw new IllegalArgumentException("CubeDesc '" + desc.getName() + "' must not be a draft");
+
+ try {
+ desc.init(config);
+ } catch (Exception e) {
+ logger.warn("Broken cube desc " + desc, e);
+ desc.addError(e.getMessage());
+ return desc;
+ }
+
+ postProcessCubeDesc(desc);
+ // Semantic validation
+ CubeMetadataValidator validator = new CubeMetadataValidator();
+ ValidateContext context = validator.validate(desc);
+ if (!context.ifPass()) {
+ return desc;
+ }
- cubeDesc.setSignature(cubeDesc.calculateSignature());
+ desc.setSignature(desc.calculateSignature());
- String path = cubeDesc.getResourcePath();
- getStore().putResource(path, cubeDesc, CUBE_DESC_SERIALIZER);
- cubeDescMap.put(cubeDesc.getName(), cubeDesc);
+ // save resource
+ crud.save(desc);
- return cubeDesc;
+ return desc;
+ }
}
/**
@@ -259,16 +311,18 @@ public class CubeDescManager {
// remove cubeDesc
public void removeCubeDesc(CubeDesc cubeDesc) throws IOException {
- String path = cubeDesc.getResourcePath();
- getStore().deleteResource(path);
- cubeDescMap.remove(cubeDesc.getName());
- clearCuboidCache(cubeDesc.getName());
+ try (AutoLock lock = descMapLock.lockForWrite()) {
+ crud.delete(cubeDesc);
+ clearCuboidCache(cubeDesc.getName());
+ }
}
// remove cubeDesc
public void removeLocalCubeDesc(String name) throws IOException {
- cubeDescMap.removeLocal(name);
- clearCuboidCache(name);
+ try (AutoLock lock = descMapLock.lockForWrite()) {
+ cubeDescMap.removeLocal(name);
+ clearCuboidCache(name);
+ }
}
private void clearCuboidCache(String descName) {
@@ -276,87 +330,6 @@ public class CubeDescManager {
CuboidManager.getInstance(config).clearCache(descName);
}
- private void reloadAllCubeDesc() throws IOException {
- ResourceStore store = getStore();
- logger.info("Reloading Cube Metadata from folder "
- + store.getReadableResourcePath(ResourceStore.CUBE_DESC_RESOURCE_ROOT));
-
- cubeDescMap.clear();
-
- List<String> paths = store.collectResourceRecursively(ResourceStore.CUBE_DESC_RESOURCE_ROOT,
- MetadataConstants.FILE_SURFIX);
- for (String path : paths) {
- CubeDesc desc = null;
- try {
- desc = loadCubeDesc(path, true);
- } catch (Exception e) {
- logger.error("Error during load cube desc, skipping " + path, e);
- continue;
- }
-
- if (!path.equals(desc.getResourcePath())) {
- logger.error(
- "Skip suspicious desc at " + path + ", " + desc + " should be at " + desc.getResourcePath());
- continue;
- }
- if (cubeDescMap.containsKey(desc.getName())) {
- logger.error("Dup CubeDesc name '" + desc.getName() + "' on path " + path);
- continue;
- }
-
- cubeDescMap.putLocal(desc.getName(), desc);
- }
-
- logger.info("Loaded " + cubeDescMap.size() + " Cube Desc(s)");
- }
-
- /**
- * Update CubeDesc with the input. Broadcast the event into cluster
- *
- * @param desc
- * @return
- * @throws IOException
- */
- public CubeDesc updateCubeDesc(CubeDesc desc) throws IOException {
- // Validate CubeDesc
- if (desc.getUuid() == null || desc.getName() == null)
- throw new IllegalArgumentException();
- String name = desc.getName();
- if (!cubeDescMap.containsKey(name))
- throw new IllegalArgumentException("CubeDesc '" + name + "' does not exist.");
- if (desc.isDraft())
- throw new IllegalArgumentException("CubeDesc '" + desc.getName() + "' must not be a draft");
-
- try {
- desc.init(config);
- } catch (Exception e) {
- logger.warn("Broken cube desc " + desc, e);
- desc.addError(e.getMessage());
- return desc;
- }
-
- postProcessCubeDesc(desc);
- // Semantic validation
- CubeMetadataValidator validator = new CubeMetadataValidator();
- ValidateContext context = validator.validate(desc);
- if (!context.ifPass()) {
- return desc;
- }
-
- desc.setSignature(desc.calculateSignature());
-
- // Save Source
- String path = desc.getResourcePath();
- getStore().putResource(path, desc, CUBE_DESC_SERIALIZER);
-
- // Reload the CubeDesc
- CubeDesc ndesc = loadCubeDesc(path, false);
- // Here replace the old one
- cubeDescMap.put(ndesc.getName(), desc);
-
- return ndesc;
- }
-
private ResourceStore getStore() {
return ResourceStore.getStore(this.config);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/f2f487fe/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
index d1c5166..1be7923 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
@@ -18,6 +18,8 @@
package org.apache.kylin.cube;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -50,6 +52,8 @@ import org.apache.kylin.metadata.realization.IRealization;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.metadata.realization.RealizationType;
import org.apache.kylin.metadata.realization.SQLDigest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
@@ -63,6 +67,8 @@ import com.google.common.collect.Lists;
@SuppressWarnings("serial")
@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
public class CubeInstance extends RootPersistentEntity implements IRealization, IBuildable {
+ private static final Logger logger = LoggerFactory.getLogger(CubeInstance.class);
+
public static final int COST_WEIGHT_MEASURE = 1;
public static final int COST_WEIGHT_DIMENSION = 10;
public static final int COST_WEIGHT_INNER_JOIN = 100;
@@ -121,6 +127,24 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
// default constructor for jackson
public CubeInstance() {
}
+
+ void init(KylinConfig config) {
+ CubeDesc cubeDesc = CubeDescManager.getInstance(config).getCubeDesc(descName);
+ checkNotNull(cubeDesc, "cube descriptor '%s' (for cube '%s') not found", descName, name);
+
+ if (cubeDesc.isBroken()) {
+ setStatus(RealizationStatusEnum.DESCBROKEN);
+ logger.error("cube descriptor {} (for cube '{}') is broken", cubeDesc.getResourcePath(), name);
+ for (String error : cubeDesc.getError()) {
+ logger.error("Error: {}", error);
+ }
+ } else if (getStatus() == RealizationStatusEnum.DESCBROKEN) {
+ setStatus(RealizationStatusEnum.DISABLED);
+ logger.info("cube {} changed from DESCBROKEN to DISABLED", name);
+ }
+
+ setConfig((KylinConfigExt) cubeDesc.getConfig());
+ }
public CuboidScheduler getCuboidScheduler() {
if (cuboidScheduler != null)
@@ -174,9 +198,14 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
return (getStatus() == RealizationStatusEnum.DISABLED || getStatus() == RealizationStatusEnum.DESCBROKEN)
&& segments.isEmpty();
}
+
+ @Override
+ public String resourceName() {
+ return name;
+ }
public String getResourcePath() {
- return concatResourcePath(name);
+ return concatResourcePath(resourceName());
}
public static String concatResourcePath(String cubeName) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/f2f487fe/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index e00735c..3220a0f 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -18,13 +18,9 @@
package org.apache.kylin.cube;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -37,10 +33,11 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.KylinConfigExt;
import org.apache.kylin.common.persistence.JsonSerializer;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.Serializer;
+import org.apache.kylin.common.util.AutoReadWriteLock;
+import org.apache.kylin.common.util.AutoReadWriteLock.AutoLock;
import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.cuboid.Cuboid;
@@ -54,6 +51,7 @@ import org.apache.kylin.dict.lookup.SnapshotTable;
import org.apache.kylin.metadata.TableMetadataManager;
import org.apache.kylin.metadata.cachesync.Broadcaster;
import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
+import org.apache.kylin.metadata.cachesync.CachedCrudAssist;
import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache;
import org.apache.kylin.metadata.model.DataModelDesc;
import org.apache.kylin.metadata.model.JoinDesc;
@@ -87,7 +85,7 @@ public class CubeManager implements IRealizationProvider {
private static String ALPHA_NUM = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";
private static int HBASE_TABLE_LENGTH = 10;
- public static final Serializer<CubeInstance> CUBE_SERIALIZER = new JsonSerializer<CubeInstance>(CubeInstance.class);
+ public static final Serializer<CubeInstance> CUBE_SERIALIZER = new JsonSerializer<>(CubeInstance.class);
private static final Logger logger = LoggerFactory.getLogger(CubeManager.class);
@@ -103,21 +101,41 @@ public class CubeManager implements IRealizationProvider {
// ============================================================================
private KylinConfig config;
+
// cube name ==> CubeInstance
private CaseInsensitiveStringCache<CubeInstance> cubeMap;
- // "table/column" ==> lookup table
- // private SingleValueCache<String, LookupStringTable> lookupTables = new SingleValueCache<String, LookupStringTable>(Broadcaster.TYPE.METADATA);
+ private CachedCrudAssist<CubeInstance> crud;
+
+ // protects concurrent operations around the cached map, to avoid for example
+ // writing an entity in the middle of reloading it (dirty read)
+ private AutoReadWriteLock cubeMapLock = new AutoReadWriteLock();
// for generation hbase table name of a new segment
private ConcurrentMap<String, String> usedStorageLocation = new ConcurrentHashMap<>();
- private CubeManager(KylinConfig config) throws IOException {
+ // a few inner classes to group related methods
+ private SegmentAssist segAssist = new SegmentAssist();
+ private DictionaryAssist dictAssist = new DictionaryAssist();
+
+ private CubeManager(KylinConfig cfg) throws IOException {
logger.info("Initializing CubeManager with config " + config);
- this.config = config;
+ this.config = cfg;
this.cubeMap = new CaseInsensitiveStringCache<CubeInstance>(config, "cube");
+ this.crud = new CachedCrudAssist<CubeInstance>(getStore(), ResourceStore.CUBE_RESOURCE_ROOT, CubeInstance.class,
+ cubeMap) {
+ @Override
+ protected CubeInstance initEntityAfterReload(CubeInstance cube, String resourceName) {
+ cube.init(config);
+
+ for (CubeSegment segment : cube.getSegments()) {
+ usedStorageLocation.put(segment.getUuid(), segment.getStorageLocationIdentifier());
+ }
+ return cube;
+ }
+ };
// touch lower level metadata before registering my listener
- loadAllCubeInstance();
+ crud.reloadAll();
Broadcaster.getInstance(config).registerListener(new CubeSyncListener(), "cube");
}
@@ -127,7 +145,7 @@ public class CubeManager implements IRealizationProvider {
public void onProjectSchemaChange(Broadcaster broadcaster, String project) throws IOException {
for (IRealization real : ProjectManager.getInstance(config).listAllRealizations(project)) {
if (real instanceof CubeInstance) {
- reloadCubeLocal(real.getName());
+ reloadCubeQuietly(real.getName());
}
}
}
@@ -140,7 +158,7 @@ public class CubeManager implements IRealizationProvider {
if (event == Event.DROP)
removeCubeLocal(cubeName);
else
- reloadCubeLocal(cubeName);
+ reloadCubeQuietly(cubeName);
for (ProjectInstance prj : ProjectManager.getInstance(config).findProjects(RealizationType.CUBE,
cubeName)) {
@@ -150,20 +168,25 @@ public class CubeManager implements IRealizationProvider {
}
public List<CubeInstance> listAllCubes() {
- return new ArrayList<CubeInstance>(cubeMap.values());
+ try (AutoLock lock = cubeMapLock.lockForRead()) {
+ return new ArrayList<CubeInstance>(cubeMap.values());
+ }
}
public CubeInstance getCube(String cubeName) {
- return cubeMap.get(cubeName);
+ try (AutoLock lock = cubeMapLock.lockForRead()) {
+ return cubeMap.get(cubeName);
+ }
}
public CubeInstance getCubeByUuid(String uuid) {
- Collection<CubeInstance> copy = new ArrayList<CubeInstance>(cubeMap.values());
- for (CubeInstance cube : copy) {
- if (uuid.equals(cube.getUuid()))
- return cube;
+ try (AutoLock lock = cubeMapLock.lockForRead()) {
+ for (CubeInstance cube : cubeMap.values()) {
+ if (uuid.equals(cube.getUuid()))
+ return cube;
+ }
+ return null;
}
- return null;
}
/**
@@ -174,148 +197,57 @@ public class CubeManager implements IRealizationProvider {
* @return
*/
public List<CubeInstance> getCubesByDesc(String descName) {
-
- List<CubeInstance> list = listAllCubes();
- List<CubeInstance> result = new ArrayList<CubeInstance>();
- Iterator<CubeInstance> it = list.iterator();
- while (it.hasNext()) {
- CubeInstance ci = it.next();
- if (descName.equalsIgnoreCase(ci.getDescName())) {
- result.add(ci);
+ try (AutoLock lock = cubeMapLock.lockForRead()) {
+ List<CubeInstance> list = listAllCubes();
+ List<CubeInstance> result = new ArrayList<CubeInstance>();
+ Iterator<CubeInstance> it = list.iterator();
+ while (it.hasNext()) {
+ CubeInstance ci = it.next();
+ if (descName.equalsIgnoreCase(ci.getDescName())) {
+ result.add(ci);
+ }
}
+ return result;
}
- return result;
- }
-
- public DictionaryInfo buildDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable)
- throws IOException {
- CubeDesc cubeDesc = cubeSeg.getCubeDesc();
- if (!cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col))
- return null;
-
- String builderClass = cubeDesc.getDictionaryBuilderClass(col);
- DictionaryInfo dictInfo = getDictionaryManager().buildDictionary(col, inpTable, builderClass);
-
- saveDictionaryInfo(cubeSeg, col, dictInfo);
- return dictInfo;
}
- public DictionaryInfo saveDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable,
- Dictionary<String> dict) throws IOException {
- CubeDesc cubeDesc = cubeSeg.getCubeDesc();
- if (!cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col))
- return null;
-
- DictionaryInfo dictInfo = getDictionaryManager().saveDictionary(col, inpTable, dict);
-
- saveDictionaryInfo(cubeSeg, col, dictInfo);
- return dictInfo;
- }
-
- private void saveDictionaryInfo(CubeSegment cubeSeg, TblColRef col, DictionaryInfo dictInfo) throws IOException {
- if (dictInfo != null) {
- Dictionary<?> dict = dictInfo.getDictionaryObject();
- cubeSeg.putDictResPath(col, dictInfo.getResourcePath());
- cubeSeg.getRowkeyStats().add(new Object[] { col.getIdentity(), dict.getSize(), dict.getSizeOfId() });
-
- CubeUpdate update = new CubeUpdate(cubeSeg.getCubeInstance());
- update.setToUpdateSegs(cubeSeg);
- updateCube(update);
- }
- }
-
- /**
- * return null if no dictionary for given column
- */
- @SuppressWarnings("unchecked")
- public Dictionary<String> getDictionary(CubeSegment cubeSeg, TblColRef col) {
- DictionaryInfo info = null;
- try {
- DictionaryManager dictMgr = getDictionaryManager();
- String dictResPath = cubeSeg.getDictResPath(col);
- if (dictResPath == null)
- return null;
-
- info = dictMgr.getDictionaryInfo(dictResPath);
- if (info == null)
- throw new IllegalStateException("No dictionary found by " + dictResPath
- + ", invalid cube state; cube segment" + cubeSeg + ", col " + col);
- } catch (IOException e) {
- throw new IllegalStateException("Failed to get dictionary for cube segment" + cubeSeg + ", col" + col, e);
- }
- return (Dictionary<String>) info.getDictionaryObject();
- }
-
- public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable) throws IOException {
- TableMetadataManager metaMgr = getTableManager();
- SnapshotManager snapshotMgr = getSnapshotManager();
-
- TableDesc tableDesc = new TableDesc(metaMgr.getTableDesc(lookupTable, cubeSeg.getProject()));
- IReadableTable hiveTable = SourceFactory.createReadableTable(tableDesc);
- SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc);
-
- cubeSeg.putSnapshotResPath(lookupTable, snapshot.getResourcePath());
- CubeUpdate cubeBuilder = new CubeUpdate(cubeSeg.getCubeInstance());
- cubeBuilder.setToUpdateSegs(cubeSeg);
- updateCube(cubeBuilder);
-
- return snapshot;
- }
-
- // sync on update
- public CubeInstance dropCube(String cubeName, boolean deleteDesc) throws IOException {
- logger.info("Dropping cube '" + cubeName + "'");
- // load projects before remove cube from project
-
- // delete cube instance and cube desc
- CubeInstance cube = getCube(cubeName);
-
- // remove cube and update cache
- getStore().deleteResource(cube.getResourcePath());
- cubeMap.remove(cube.getName());
- Cuboid.clearCache(cube);
-
- if (deleteDesc && cube.getDescriptor() != null) {
- CubeDescManager.getInstance(config).removeCubeDesc(cube.getDescriptor());
- }
-
- // delete cube from project
- ProjectManager.getInstance(config).removeRealizationsFromProjects(RealizationType.CUBE, cubeName);
-
- return cube;
- }
-
- // sync on update
public CubeInstance createCube(String cubeName, String projectName, CubeDesc desc, String owner)
throws IOException {
- logger.info("Creating cube '" + projectName + "-->" + cubeName + "' from desc '" + desc.getName() + "'");
+ try (AutoLock lock = cubeMapLock.lockForWrite()) {
+ logger.info("Creating cube '" + projectName + "-->" + cubeName + "' from desc '" + desc.getName() + "'");
- // save cube resource
- CubeInstance cube = CubeInstance.create(cubeName, desc);
- cube.setOwner(owner);
- updateCubeWithRetry(new CubeUpdate(cube), 0);
+ // save cube resource
+ CubeInstance cube = CubeInstance.create(cubeName, desc);
+ cube.setOwner(owner);
+ updateCubeWithRetry(new CubeUpdate(cube), 0);
- ProjectManager.getInstance(config).moveRealizationToProject(RealizationType.CUBE, cubeName, projectName, owner);
+ ProjectManager.getInstance(config).moveRealizationToProject(RealizationType.CUBE, cubeName, projectName,
+ owner);
- return cube;
+ return cube;
+ }
}
public CubeInstance createCube(CubeInstance cube, String projectName, String owner) throws IOException {
- logger.info("Creating cube '" + projectName + "-->" + cube.getName() + "' from instance object. '");
+ try (AutoLock lock = cubeMapLock.lockForWrite()) {
+ logger.info("Creating cube '" + projectName + "-->" + cube.getName() + "' from instance object. '");
- // save cube resource
- cube.setOwner(owner);
- updateCubeWithRetry(new CubeUpdate(cube), 0);
+ // save cube resource
+ cube.setOwner(owner);
+ updateCubeWithRetry(new CubeUpdate(cube), 0);
- ProjectManager.getInstance(config).moveRealizationToProject(RealizationType.CUBE, cube.getName(), projectName,
- owner);
+ ProjectManager.getInstance(config).moveRealizationToProject(RealizationType.CUBE, cube.getName(),
+ projectName, owner);
- return cube;
+ return cube;
+ }
}
public CubeInstance updateCube(CubeUpdate update) throws IOException {
- CubeInstance cube = updateCubeWithRetry(update, 0);
- return cube;
+ try (AutoLock lock = cubeMapLock.lockForWrite()) {
+ CubeInstance cube = updateCubeWithRetry(update, 0);
+ return cube;
+ }
}
private CubeInstance updateCubeWithRetry(CubeUpdate update, int retry) throws IOException {
@@ -378,7 +310,7 @@ public class CubeManager implements IRealizationProvider {
}
try {
- getStore().putResource(cube.getResourcePath(), cube, CUBE_SERIALIZER);
+ crud.save(cube);
} catch (IllegalStateException ise) {
logger.warn("Write conflict to update cube " + cube.getName() + " at try " + retry + ", will retry...");
if (retry >= 7) {
@@ -386,7 +318,7 @@ public class CubeManager implements IRealizationProvider {
throw ise;
}
- cube = reloadCubeLocal(cube.getName());
+ cube = crud.reload(cube.getName());
update.setCubeInstance(cube);
retry++;
cube = updateCubeWithRetry(update, retry);
@@ -402,486 +334,607 @@ public class CubeManager implements IRealizationProvider {
}
}
- cubeMap.put(cube.getName(), cube);
-
//this is a duplicate call to take care of scenarios where REST cache service unavailable
ProjectManager.getInstance(cube.getConfig()).clearL2Cache();
return cube;
}
- // append a full build segment
- public CubeSegment appendSegment(CubeInstance cube) throws IOException {
- return appendSegment(cube, null, null, null, null);
+ public CubeInstance reloadCubeQuietly(String cubeName) {
+ try (AutoLock lock = cubeMapLock.lockForWrite()) {
+ CubeInstance cube = crud.reloadQuietly(cubeName);
+ if (cube != null)
+ Cuboid.clearCache(cube);
+ return cube;
+ }
}
- public CubeSegment appendSegment(CubeInstance cube, TSRange tsRange) throws IOException {
- return appendSegment(cube, tsRange, null, null, null);
+ public void removeCubeLocal(String cubeName) {
+ try (AutoLock lock = cubeMapLock.lockForWrite()) {
+ CubeInstance cube = cubeMap.get(cubeName);
+ if (cube != null) {
+ cubeMap.removeLocal(cubeName);
+ for (CubeSegment segment : cube.getSegments()) {
+ usedStorageLocation.remove(segment.getUuid());
+ }
+ Cuboid.clearCache(cube);
+ }
+ }
}
- public CubeSegment appendSegment(CubeInstance cube, SourcePartition src) throws IOException {
- return appendSegment(cube, src.getTSRange(), src.getSegRange(), src.getSourcePartitionOffsetStart(),
- src.getSourcePartitionOffsetEnd());
- }
+ public CubeInstance dropCube(String cubeName, boolean deleteDesc) throws IOException {
+ try (AutoLock lock = cubeMapLock.lockForWrite()) {
+ logger.info("Dropping cube '" + cubeName + "'");
+ // load projects before remove cube from project
- CubeSegment appendSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange,
- Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd)
- throws IOException {
- checkInputRanges(tsRange, segRange);
- checkBuildingSegment(cube);
+ // delete cube instance and cube desc
+ CubeInstance cube = getCube(cubeName);
+
+ // remove cube and update cache
+ crud.delete(cube);
+ Cuboid.clearCache(cube);
- // fix start/end a bit
- if (cube.getModel().getPartitionDesc().isPartitioned()) {
- // if missing start, set it to where last time ends
- CubeSegment last = cube.getLastSegment();
- if (last != null && !last.isOffsetCube() && tsRange.start.v == 0) {
- tsRange = new TSRange(last.getTSRange().end.v, tsRange.end.v);
+ if (deleteDesc && cube.getDescriptor() != null) {
+ CubeDescManager.getInstance(config).removeCubeDesc(cube.getDescriptor());
}
- } else {
- // full build
- tsRange = null;
- segRange = null;
- }
- CubeSegment newSegment = newSegment(cube, tsRange, segRange);
- newSegment.setSourcePartitionOffsetStart(sourcePartitionOffsetStart);
- newSegment.setSourcePartitionOffsetEnd(sourcePartitionOffsetEnd);
- validateNewSegments(cube, newSegment);
+ // delete cube from project
+ ProjectManager.getInstance(config).removeRealizationsFromProjects(RealizationType.CUBE, cubeName);
- CubeUpdate cubeBuilder = new CubeUpdate(cube);
- cubeBuilder.setToAddSegs(newSegment);
- updateCube(cubeBuilder);
- return newSegment;
+ return cube;
+ }
}
- public CubeSegment refreshSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange) throws IOException {
- checkInputRanges(tsRange, segRange);
- checkBuildingSegment(cube);
-
- if (cube.getModel().getPartitionDesc().isPartitioned() == false) {
- // full build
- tsRange = null;
- segRange = null;
- }
+ @VisibleForTesting
+ /*private*/ String generateStorageLocation() {
+ String namePrefix = config.getHBaseTableNamePrefix();
+ String tableName = "";
+ Random ran = new Random();
+ do {
+ StringBuffer sb = new StringBuffer();
+ sb.append(namePrefix);
+ for (int i = 0; i < HBASE_TABLE_LENGTH; i++) {
+ sb.append(ALPHA_NUM.charAt(ran.nextInt(ALPHA_NUM.length())));
+ }
+ tableName = sb.toString();
+ } while (this.usedStorageLocation.containsValue(tableName));
+ return tableName;
+ }
- CubeSegment newSegment = newSegment(cube, tsRange, segRange);
+ private boolean isReady(CubeSegment seg) {
+ return seg.getStatus() == SegmentStatusEnum.READY;
+ }
- Pair<Boolean, Boolean> pair = cube.getSegments().fitInSegments(newSegment);
- if (pair.getFirst() == false || pair.getSecond() == false)
- throw new IllegalArgumentException("The new refreshing segment " + newSegment
- + " does not match any existing segment in cube " + cube);
+ private TableMetadataManager getTableManager() {
+ return TableMetadataManager.getInstance(config);
+ }
- if (segRange != null) {
- CubeSegment toRefreshSeg = null;
- for (CubeSegment cubeSegment : cube.getSegments()) {
- if (cubeSegment.getSegRange().equals(segRange)) {
- toRefreshSeg = cubeSegment;
- break;
- }
- }
+ private DictionaryManager getDictionaryManager() {
+ return DictionaryManager.getInstance(config);
+ }
- if (toRefreshSeg == null) {
- throw new IllegalArgumentException("For streaming cube, only one segment can be refreshed at one time");
- }
+ private SnapshotManager getSnapshotManager() {
+ return SnapshotManager.getInstance(config);
+ }
- newSegment.setSourcePartitionOffsetStart(toRefreshSeg.getSourcePartitionOffsetStart());
- newSegment.setSourcePartitionOffsetEnd(toRefreshSeg.getSourcePartitionOffsetEnd());
- }
+ private ResourceStore getStore() {
+ return ResourceStore.getStore(this.config);
+ }
- CubeUpdate cubeBuilder = new CubeUpdate(cube);
- cubeBuilder.setToAddSegs(newSegment);
- updateCube(cubeBuilder);
+ @Override
+ public RealizationType getRealizationType() {
+ return RealizationType.CUBE;
+ }
- return newSegment;
+ @Override
+ public IRealization getRealization(String name) {
+ return getCube(name);
}
- public CubeSegment mergeSegments(CubeInstance cube, TSRange tsRange, SegmentRange segRange, boolean force)
- throws IOException {
- if (cube.getSegments().isEmpty())
- throw new IllegalArgumentException("Cube " + cube + " has no segments");
-
- checkInputRanges(tsRange, segRange);
- checkBuildingSegment(cube);
- checkCubeIsPartitioned(cube);
-
- if (cube.getSegments().getFirstSegment().isOffsetCube()) {
- // offset cube, merge by date range?
- if (segRange == null && tsRange != null) {
- Pair<CubeSegment, CubeSegment> pair = cube.getSegments(SegmentStatusEnum.READY)
- .findMergeOffsetsByDateRange(tsRange, Long.MAX_VALUE);
- if (pair == null)
- throw new IllegalArgumentException("Find no segments to merge by " + tsRange + " for cube " + cube);
- segRange = new SegmentRange(pair.getFirst().getSegRange().start, pair.getSecond().getSegRange().end);
- }
- tsRange = null;
- Preconditions.checkArgument(segRange != null);
- } else {
- segRange = null;
- Preconditions.checkArgument(tsRange != null);
- }
-
- CubeSegment newSegment = newSegment(cube, tsRange, segRange);
-
- Segments<CubeSegment> mergingSegments = cube.getMergingSegments(newSegment);
- if (mergingSegments.size() <= 1)
- throw new IllegalArgumentException("Range " + newSegment.getSegRange()
- + " must contain at least 2 segments, but there is " + mergingSegments.size());
-
- CubeSegment first = mergingSegments.get(0);
- CubeSegment last = mergingSegments.get(mergingSegments.size() - 1);
- if (force == false) {
- for (int i = 0; i < mergingSegments.size() - 1; i++) {
- if (!mergingSegments.get(i).getSegRange().connects(mergingSegments.get(i + 1).getSegRange()))
- throw new IllegalStateException("Merging segments must not have gaps between "
- + mergingSegments.get(i) + " and " + mergingSegments.get(i + 1));
- }
- }
- if (first.isOffsetCube()) {
- newSegment.setSegRange(new SegmentRange(first.getSegRange().start, last.getSegRange().end));
- newSegment.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetStart());
- newSegment.setSourcePartitionOffsetEnd(last.getSourcePartitionOffsetEnd());
- newSegment.setTSRange(null);
- } else {
- newSegment.setTSRange(new TSRange(mergingSegments.getTSStart(), mergingSegments.getTSEnd()));
- newSegment.setSegRange(null);
- }
-
- if (force == false) {
- List<String> emptySegment = Lists.newArrayList();
- for (CubeSegment seg : mergingSegments) {
- if (seg.getSizeKB() == 0) {
- emptySegment.add(seg.getName());
- }
- }
+ // ============================================================================
+ // Segment related methods
+ // ============================================================================
- if (emptySegment.size() > 0) {
- throw new IllegalArgumentException(
- "Empty cube segment found, couldn't merge unless 'forceMergeEmptySegment' set to true: "
- + emptySegment);
- }
- }
+ // append a full build segment
+ public CubeSegment appendSegment(CubeInstance cube) throws IOException {
+ return appendSegment(cube, null, null, null, null);
+ }
- validateNewSegments(cube, newSegment);
+ public CubeSegment appendSegment(CubeInstance cube, TSRange tsRange) throws IOException {
+ return appendSegment(cube, tsRange, null, null, null);
+ }
- CubeUpdate cubeBuilder = new CubeUpdate(cube);
- cubeBuilder.setToAddSegs(newSegment);
- updateCube(cubeBuilder);
+ public CubeSegment appendSegment(CubeInstance cube, SourcePartition src) throws IOException {
+ return appendSegment(cube, src.getTSRange(), src.getSegRange(), src.getSourcePartitionOffsetStart(),
+ src.getSourcePartitionOffsetEnd());
+ }
- return newSegment;
+ CubeSegment appendSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange,
+ Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd)
+ throws IOException {
+ try (AutoLock lock = cubeMapLock.lockForWrite()) {
+ return segAssist.appendSegment(cube, tsRange, segRange, sourcePartitionOffsetStart,
+ sourcePartitionOffsetEnd);
+ }
}
- private void checkInputRanges(TSRange tsRange, SegmentRange segRange) {
- if (tsRange != null && segRange != null) {
- throw new IllegalArgumentException(
- "Build or refresh cube segment either by TSRange or by SegmentRange, not both.");
+ public CubeSegment refreshSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange) throws IOException {
+ try (AutoLock lock = cubeMapLock.lockForWrite()) {
+ return segAssist.refreshSegment(cube, tsRange, segRange);
}
}
- private void checkBuildingSegment(CubeInstance cube) {
- int maxBuldingSeg = cube.getConfig().getMaxBuildingSegments();
- if (cube.getBuildingSegments().size() >= maxBuldingSeg) {
- throw new IllegalStateException(
- "There is already " + cube.getBuildingSegments().size() + " building segment; ");
+ public CubeSegment mergeSegments(CubeInstance cube, TSRange tsRange, SegmentRange segRange, boolean force)
+ throws IOException {
+ try (AutoLock lock = cubeMapLock.lockForWrite()) {
+ return segAssist.mergeSegments(cube, tsRange, segRange, force);
}
}
- private void checkCubeIsPartitioned(CubeInstance cube) {
- if (cube.getDescriptor().getModel().getPartitionDesc().isPartitioned() == false) {
- throw new IllegalStateException(
- "there is no partition date column specified, only full build is supported");
+ public void promoteNewlyBuiltSegments(CubeInstance cube, CubeSegment newSegment) throws IOException {
+ try (AutoLock lock = cubeMapLock.lockForWrite()) {
+ segAssist.promoteNewlyBuiltSegments(cube, newSegment);
}
}
- /**
- * After cube update, reload cube related cache
- *
- * @param cubeName
- */
- public CubeInstance reloadCubeLocal(String cubeName) {
- CubeInstance cubeInstance = reloadCubeLocalAt(CubeInstance.concatResourcePath(cubeName));
- if (cubeInstance != null)
- Cuboid.clearCache(cubeInstance);
- return cubeInstance;
+ public void validateNewSegments(CubeInstance cube, CubeSegment newSegments) {
+ segAssist.validateNewSegments(cube, newSegments);
}
- public void removeCubeLocal(String cubeName) {
- CubeInstance cube = cubeMap.get(cubeName);
- if (cube != null) {
- cubeMap.removeLocal(cubeName);
- for (CubeSegment segment : cube.getSegments()) {
- usedStorageLocation.remove(segment.getUuid());
- }
- Cuboid.clearCache(cube);
- }
+ public List<CubeSegment> calculateHoles(String cubeName) {
+ return segAssist.calculateHoles(cubeName);
}
- public LookupStringTable getLookupTable(CubeSegment cubeSegment, JoinDesc join) {
+ private class SegmentAssist {
+
+ CubeSegment appendSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange,
+ Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd)
+ throws IOException {
+ checkInputRanges(tsRange, segRange);
+ checkBuildingSegment(cube);
+
+ // fix start/end a bit
+ if (cube.getModel().getPartitionDesc().isPartitioned()) {
+ // if missing start, set it to where last time ends
+ CubeSegment last = cube.getLastSegment();
+ if (last != null && !last.isOffsetCube() && tsRange.start.v == 0) {
+ tsRange = new TSRange(last.getTSRange().end.v, tsRange.end.v);
+ }
+ } else {
+ // full build
+ tsRange = null;
+ segRange = null;
+ }
- String tableName = join.getPKSide().getTableIdentity();
- String[] pkCols = join.getPrimaryKey();
- String snapshotResPath = cubeSegment.getSnapshotResPath(tableName);
- if (snapshotResPath == null)
- throw new IllegalStateException("No snapshot for table '" + tableName + "' found on cube segment"
- + cubeSegment.getCubeInstance().getName() + "/" + cubeSegment);
+ CubeSegment newSegment = newSegment(cube, tsRange, segRange);
+ newSegment.setSourcePartitionOffsetStart(sourcePartitionOffsetStart);
+ newSegment.setSourcePartitionOffsetEnd(sourcePartitionOffsetEnd);
+ validateNewSegments(cube, newSegment);
- try {
- SnapshotTable snapshot = getSnapshotManager().getSnapshotTable(snapshotResPath);
- TableDesc tableDesc = getTableManager().getTableDesc(tableName, cubeSegment.getProject());
- return new LookupStringTable(tableDesc, pkCols, snapshot);
- } catch (IOException e) {
- throw new IllegalStateException(
- "Failed to load lookup table " + tableName + " from snapshot " + snapshotResPath, e);
+ CubeUpdate cubeBuilder = new CubeUpdate(cube);
+ cubeBuilder.setToAddSegs(newSegment);
+ updateCube(cubeBuilder);
+ return newSegment;
}
- }
- private CubeSegment newSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange) {
- DataModelDesc modelDesc = cube.getModel();
+ public CubeSegment refreshSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange)
+ throws IOException {
+ checkInputRanges(tsRange, segRange);
+ checkBuildingSegment(cube);
- CubeSegment segment = new CubeSegment();
- segment.setUuid(UUID.randomUUID().toString());
- segment.setName(CubeSegment.makeSegmentName(tsRange, segRange, modelDesc));
- segment.setCreateTimeUTC(System.currentTimeMillis());
- segment.setCubeInstance(cube);
+ if (cube.getModel().getPartitionDesc().isPartitioned() == false) {
+ // full build
+ tsRange = null;
+ segRange = null;
+ }
- // let full build range be backward compatible
- if (tsRange == null && segRange == null)
- tsRange = new TSRange(0L, Long.MAX_VALUE);
+ CubeSegment newSegment = newSegment(cube, tsRange, segRange);
- segment.setTSRange(tsRange);
- segment.setSegRange(segRange);
- segment.setStatus(SegmentStatusEnum.NEW);
- segment.setStorageLocationIdentifier(generateStorageLocation());
+ Pair<Boolean, Boolean> pair = cube.getSegments().fitInSegments(newSegment);
+ if (pair.getFirst() == false || pair.getSecond() == false)
+ throw new IllegalArgumentException("The new refreshing segment " + newSegment
+ + " does not match any existing segment in cube " + cube);
- segment.setCubeInstance(cube);
+ if (segRange != null) {
+ CubeSegment toRefreshSeg = null;
+ for (CubeSegment cubeSegment : cube.getSegments()) {
+ if (cubeSegment.getSegRange().equals(segRange)) {
+ toRefreshSeg = cubeSegment;
+ break;
+ }
+ }
- segment.validate();
- return segment;
- }
+ if (toRefreshSeg == null) {
+ throw new IllegalArgumentException(
+ "For streaming cube, only one segment can be refreshed at one time");
+ }
- @VisibleForTesting
- /*private*/ String generateStorageLocation() {
- String namePrefix = config.getHBaseTableNamePrefix();
- String tableName = "";
- Random ran = new Random();
- do {
- StringBuffer sb = new StringBuffer();
- sb.append(namePrefix);
- for (int i = 0; i < HBASE_TABLE_LENGTH; i++) {
- sb.append(ALPHA_NUM.charAt(ran.nextInt(ALPHA_NUM.length())));
+ newSegment.setSourcePartitionOffsetStart(toRefreshSeg.getSourcePartitionOffsetStart());
+ newSegment.setSourcePartitionOffsetEnd(toRefreshSeg.getSourcePartitionOffsetEnd());
}
- tableName = sb.toString();
- } while (this.usedStorageLocation.containsValue(tableName));
- return tableName;
- }
- public void promoteNewlyBuiltSegments(CubeInstance cube, CubeSegment newSegment) throws IOException {
- if (StringUtils.isBlank(newSegment.getStorageLocationIdentifier()))
- throw new IllegalStateException(
- "For cube " + cube + ", segment " + newSegment + " missing StorageLocationIdentifier");
+ CubeUpdate cubeBuilder = new CubeUpdate(cube);
+ cubeBuilder.setToAddSegs(newSegment);
+ updateCube(cubeBuilder);
- if (StringUtils.isBlank(newSegment.getLastBuildJobID()))
- throw new IllegalStateException("For cube " + cube + ", segment " + newSegment + " missing LastBuildJobID");
-
- if (isReady(newSegment) == true) {
- logger.warn("For cube " + cube + ", segment " + newSegment + " state should be NEW but is READY");
+ return newSegment;
}
- List<CubeSegment> tobe = cube.calculateToBeSegments(newSegment);
+ public CubeSegment mergeSegments(CubeInstance cube, TSRange tsRange, SegmentRange segRange, boolean force)
+ throws IOException {
+ if (cube.getSegments().isEmpty())
+ throw new IllegalArgumentException("Cube " + cube + " has no segments");
+
+ checkInputRanges(tsRange, segRange);
+ checkBuildingSegment(cube);
+ checkCubeIsPartitioned(cube);
+
+ if (cube.getSegments().getFirstSegment().isOffsetCube()) {
+ // offset cube, merge by date range?
+ if (segRange == null && tsRange != null) {
+ Pair<CubeSegment, CubeSegment> pair = cube.getSegments(SegmentStatusEnum.READY)
+ .findMergeOffsetsByDateRange(tsRange, Long.MAX_VALUE);
+ if (pair == null)
+ throw new IllegalArgumentException(
+ "Find no segments to merge by " + tsRange + " for cube " + cube);
+ segRange = new SegmentRange(pair.getFirst().getSegRange().start,
+ pair.getSecond().getSegRange().end);
+ }
+ tsRange = null;
+ Preconditions.checkArgument(segRange != null);
+ } else {
+ segRange = null;
+ Preconditions.checkArgument(tsRange != null);
+ }
+
+ CubeSegment newSegment = newSegment(cube, tsRange, segRange);
+
+ Segments<CubeSegment> mergingSegments = cube.getMergingSegments(newSegment);
+ if (mergingSegments.size() <= 1)
+ throw new IllegalArgumentException("Range " + newSegment.getSegRange()
+ + " must contain at least 2 segments, but there is " + mergingSegments.size());
+
+ CubeSegment first = mergingSegments.get(0);
+ CubeSegment last = mergingSegments.get(mergingSegments.size() - 1);
+ if (force == false) {
+ for (int i = 0; i < mergingSegments.size() - 1; i++) {
+ if (!mergingSegments.get(i).getSegRange().connects(mergingSegments.get(i + 1).getSegRange()))
+ throw new IllegalStateException("Merging segments must not have gaps between "
+ + mergingSegments.get(i) + " and " + mergingSegments.get(i + 1));
+ }
+ }
+ if (first.isOffsetCube()) {
+ newSegment.setSegRange(new SegmentRange(first.getSegRange().start, last.getSegRange().end));
+ newSegment.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetStart());
+ newSegment.setSourcePartitionOffsetEnd(last.getSourcePartitionOffsetEnd());
+ newSegment.setTSRange(null);
+ } else {
+ newSegment.setTSRange(new TSRange(mergingSegments.getTSStart(), mergingSegments.getTSEnd()));
+ newSegment.setSegRange(null);
+ }
+
+ if (force == false) {
+ List<String> emptySegment = Lists.newArrayList();
+ for (CubeSegment seg : mergingSegments) {
+ if (seg.getSizeKB() == 0) {
+ emptySegment.add(seg.getName());
+ }
+ }
+
+ if (emptySegment.size() > 0) {
+ throw new IllegalArgumentException(
+ "Empty cube segment found, couldn't merge unless 'forceMergeEmptySegment' set to true: "
+ + emptySegment);
+ }
+ }
- if (tobe.contains(newSegment) == false)
- throw new IllegalStateException(
- "For cube " + cube + ", segment " + newSegment + " is expected but not in the tobe " + tobe);
+ validateNewSegments(cube, newSegment);
- newSegment.setStatus(SegmentStatusEnum.READY);
+ CubeUpdate cubeBuilder = new CubeUpdate(cube);
+ cubeBuilder.setToAddSegs(newSegment);
+ updateCube(cubeBuilder);
- List<CubeSegment> toRemoveSegs = Lists.newArrayList();
- for (CubeSegment segment : cube.getSegments()) {
- if (!tobe.contains(segment))
- toRemoveSegs.add(segment);
+ return newSegment;
}
- logger.info("Promoting cube " + cube + ", new segment " + newSegment + ", to remove segments " + toRemoveSegs);
+ private void checkInputRanges(TSRange tsRange, SegmentRange segRange) {
+ if (tsRange != null && segRange != null) {
+ throw new IllegalArgumentException(
+ "Build or refresh cube segment either by TSRange or by SegmentRange, not both.");
+ }
+ }
- CubeUpdate cubeBuilder = new CubeUpdate(cube);
- cubeBuilder.setToRemoveSegs(toRemoveSegs.toArray(new CubeSegment[toRemoveSegs.size()]))
- .setToUpdateSegs(newSegment).setStatus(RealizationStatusEnum.READY);
- updateCube(cubeBuilder);
- }
+ private void checkBuildingSegment(CubeInstance cube) {
+ int maxBuldingSeg = cube.getConfig().getMaxBuildingSegments();
+ if (cube.getBuildingSegments().size() >= maxBuldingSeg) {
+ throw new IllegalStateException(
+ "There is already " + cube.getBuildingSegments().size() + " building segment; ");
+ }
+ }
- public void validateNewSegments(CubeInstance cube, CubeSegment newSegments) {
- List<CubeSegment> tobe = cube.calculateToBeSegments(newSegments);
- List<CubeSegment> newList = Arrays.asList(newSegments);
- if (tobe.containsAll(newList) == false) {
- throw new IllegalStateException("For cube " + cube + ", the new segments " + newList
- + " do not fit in its current " + cube.getSegments() + "; the resulted tobe is " + tobe);
+ private void checkCubeIsPartitioned(CubeInstance cube) {
+ if (cube.getDescriptor().getModel().getPartitionDesc().isPartitioned() == false) {
+ throw new IllegalStateException(
+ "there is no partition date column specified, only full build is supported");
+ }
}
- }
- private boolean isReady(CubeSegment seg) {
- return seg.getStatus() == SegmentStatusEnum.READY;
- }
+ private CubeSegment newSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange) {
+ DataModelDesc modelDesc = cube.getModel();
- private void loadAllCubeInstance() throws IOException {
- ResourceStore store = getStore();
- List<String> paths = store.collectResourceRecursively(ResourceStore.CUBE_RESOURCE_ROOT, ".json");
+ CubeSegment segment = new CubeSegment();
+ segment.setUuid(UUID.randomUUID().toString());
+ segment.setName(CubeSegment.makeSegmentName(tsRange, segRange, modelDesc));
+ segment.setCreateTimeUTC(System.currentTimeMillis());
+ segment.setCubeInstance(cube);
- logger.info("Loading Cube from folder " + store.getReadableResourcePath(ResourceStore.CUBE_RESOURCE_ROOT));
+ // let full build range be backward compatible
+ if (tsRange == null && segRange == null)
+ tsRange = new TSRange(0L, Long.MAX_VALUE);
- int succeed = 0;
- int fail = 0;
- for (String path : paths) {
- CubeInstance cube = reloadCubeLocalAt(path);
- if (cube == null) {
- fail++;
- } else {
- succeed++;
- }
+ segment.setTSRange(tsRange);
+ segment.setSegRange(segRange);
+ segment.setStatus(SegmentStatusEnum.NEW);
+ segment.setStorageLocationIdentifier(generateStorageLocation());
+
+ segment.setCubeInstance(cube);
+
+ segment.validate();
+ return segment;
}
- logger.info("Loaded " + succeed + " cubes, fail on " + fail + " cubes");
- }
+ public void promoteNewlyBuiltSegments(CubeInstance cube, CubeSegment newSegment) throws IOException {
+ if (StringUtils.isBlank(newSegment.getStorageLocationIdentifier()))
+ throw new IllegalStateException(
+ "For cube " + cube + ", segment " + newSegment + " missing StorageLocationIdentifier");
- private CubeInstance reloadCubeLocalAt(String path) {
- ResourceStore store = getStore();
- CubeInstance cube;
+ if (StringUtils.isBlank(newSegment.getLastBuildJobID()))
+ throw new IllegalStateException(
+ "For cube " + cube + ", segment " + newSegment + " missing LastBuildJobID");
- try {
- cube = store.getResource(path, CubeInstance.class, CUBE_SERIALIZER);
- if (cube == null) {
- return cube;
+ if (isReady(newSegment) == true) {
+ logger.warn("For cube " + cube + ", segment " + newSegment + " state should be NEW but is READY");
}
- String cubeName = cube.getName();
- checkState(StringUtils.isNotBlank(cubeName), "cube (at %s) name must not be blank", path);
+ List<CubeSegment> tobe = cube.calculateToBeSegments(newSegment);
- CubeDesc cubeDesc = CubeDescManager.getInstance(config).getCubeDesc(cube.getDescName());
- checkNotNull(cubeDesc, "cube descriptor '%s' (for cube '%s') not found", cube.getDescName(), cubeName);
+ if (tobe.contains(newSegment) == false)
+ throw new IllegalStateException(
+ "For cube " + cube + ", segment " + newSegment + " is expected but not in the tobe " + tobe);
- if (!cubeDesc.getError().isEmpty()) {
- cube.setStatus(RealizationStatusEnum.DESCBROKEN);
- logger.error("cube descriptor {} (for cube '{}') is broken", cubeDesc.getResourcePath(), cubeName);
- for (String error : cubeDesc.getError()) {
- logger.error("Error: {}", error);
- }
- } else if (cube.getStatus() == RealizationStatusEnum.DESCBROKEN) {
- cube.setStatus(RealizationStatusEnum.DISABLED);
- logger.info("cube {} changed from DESCBROKEN to DISABLED", cubeName);
+ newSegment.setStatus(SegmentStatusEnum.READY);
+
+ List<CubeSegment> toRemoveSegs = Lists.newArrayList();
+ for (CubeSegment segment : cube.getSegments()) {
+ if (!tobe.contains(segment))
+ toRemoveSegs.add(segment);
}
- cube.setConfig((KylinConfigExt) cubeDesc.getConfig());
- cubeMap.putLocal(cubeName, cube);
+ logger.info(
+ "Promoting cube " + cube + ", new segment " + newSegment + ", to remove segments " + toRemoveSegs);
- for (CubeSegment segment : cube.getSegments()) {
- usedStorageLocation.put(segment.getUuid(), segment.getStorageLocationIdentifier());
+ CubeUpdate cubeBuilder = new CubeUpdate(cube);
+ cubeBuilder.setToRemoveSegs(toRemoveSegs.toArray(new CubeSegment[toRemoveSegs.size()]))
+ .setToUpdateSegs(newSegment).setStatus(RealizationStatusEnum.READY);
+ updateCube(cubeBuilder);
+ }
+
+ public void validateNewSegments(CubeInstance cube, CubeSegment newSegments) {
+ List<CubeSegment> tobe = cube.calculateToBeSegments(newSegments);
+ List<CubeSegment> newList = Arrays.asList(newSegments);
+ if (tobe.containsAll(newList) == false) {
+ throw new IllegalStateException("For cube " + cube + ", the new segments " + newList
+ + " do not fit in its current " + cube.getSegments() + "; the resulted tobe is " + tobe);
}
+ }
- logger.info("Reloaded cube {} being {} having {} segments", cubeName, cube, cube.getSegments().size());
- return cube;
+ /**
+ * Calculate the holes (gaps) in segments.
+ * @param cubeName
+ * @return
+ */
+ public List<CubeSegment> calculateHoles(String cubeName) {
+ List<CubeSegment> holes = Lists.newArrayList();
+ final CubeInstance cube = getCube(cubeName);
+ DataModelDesc modelDesc = cube.getModel();
+ Preconditions.checkNotNull(cube);
+ final List<CubeSegment> segments = cube.getSegments();
+ logger.info("totally " + segments.size() + " cubeSegments");
+ if (segments.size() == 0) {
+ return holes;
+ }
- } catch (Exception e) {
- logger.error("Error during load cube instance, skipping : " + path, e);
- return null;
+ Collections.sort(segments);
+ for (int i = 0; i < segments.size() - 1; ++i) {
+ CubeSegment first = segments.get(i);
+ CubeSegment second = segments.get(i + 1);
+ if (first.getSegRange().connects(second.getSegRange()))
+ continue;
+
+ if (first.getSegRange().apartBefore(second.getSegRange())) {
+ CubeSegment hole = new CubeSegment();
+ hole.setCubeInstance(cube);
+ if (first.isOffsetCube()) {
+ hole.setSegRange(new SegmentRange(first.getSegRange().end, second.getSegRange().start));
+ hole.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetEnd());
+ hole.setSourcePartitionOffsetEnd(second.getSourcePartitionOffsetStart());
+ hole.setName(CubeSegment.makeSegmentName(null, hole.getSegRange(), modelDesc));
+ } else {
+ hole.setTSRange(new TSRange(first.getTSRange().end.v, second.getTSRange().start.v));
+ hole.setName(CubeSegment.makeSegmentName(hole.getTSRange(), null, modelDesc));
+ }
+ holes.add(hole);
+ }
+ }
+ return holes;
}
+
}
- private TableMetadataManager getTableManager() {
- return TableMetadataManager.getInstance(config);
+ // ============================================================================
+ // Dictionary/Snapshot related methods
+ // ============================================================================
+
+ public DictionaryInfo buildDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable)
+ throws IOException {
+ return dictAssist.buildDictionary(cubeSeg, col, inpTable);
}
- private DictionaryManager getDictionaryManager() {
- return DictionaryManager.getInstance(config);
+ public DictionaryInfo saveDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable,
+ Dictionary<String> dict) throws IOException {
+ return dictAssist.saveDictionary(cubeSeg, col, inpTable, dict);
}
- private SnapshotManager getSnapshotManager() {
- return SnapshotManager.getInstance(config);
+ /**
+ * return null if no dictionary for given column
+ */
+ public Dictionary<String> getDictionary(CubeSegment cubeSeg, TblColRef col) {
+ return dictAssist.getDictionary(cubeSeg, col);
}
- private ResourceStore getStore() {
- return ResourceStore.getStore(this.config);
+ public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable) throws IOException {
+ return dictAssist.buildSnapshotTable(cubeSeg, lookupTable);
}
- @Override
- public RealizationType getRealizationType() {
- return RealizationType.CUBE;
+ public LookupStringTable getLookupTable(CubeSegment cubeSegment, JoinDesc join) {
+ return dictAssist.getLookupTable(cubeSegment, join);
}
- @Override
- public IRealization getRealization(String name) {
- return getCube(name);
+ //UHC (ultra high cardinality column): contain the ShardByColumns and the GlobalDictionaryColumns
+ public int[] getUHCIndex(CubeDesc cubeDesc) throws IOException {
+ return dictAssist.getUHCIndex(cubeDesc);
}
- /**
- * Calculate the holes (gaps) in segments.
- * @param cubeName
- * @return
- */
- public List<CubeSegment> calculateHoles(String cubeName) {
- List<CubeSegment> holes = Lists.newArrayList();
- final CubeInstance cube = getCube(cubeName);
- DataModelDesc modelDesc = cube.getModel();
- Preconditions.checkNotNull(cube);
- final List<CubeSegment> segments = cube.getSegments();
- logger.info("totally " + segments.size() + " cubeSegments");
- if (segments.size() == 0) {
- return holes;
+ private class DictionaryAssist {
+ public DictionaryInfo buildDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable)
+ throws IOException {
+ CubeDesc cubeDesc = cubeSeg.getCubeDesc();
+ if (!cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col))
+ return null;
+
+ String builderClass = cubeDesc.getDictionaryBuilderClass(col);
+ DictionaryInfo dictInfo = getDictionaryManager().buildDictionary(col, inpTable, builderClass);
+
+ saveDictionaryInfo(cubeSeg, col, dictInfo);
+ return dictInfo;
}
- Collections.sort(segments);
- for (int i = 0; i < segments.size() - 1; ++i) {
- CubeSegment first = segments.get(i);
- CubeSegment second = segments.get(i + 1);
- if (first.getSegRange().connects(second.getSegRange()))
- continue;
-
- if (first.getSegRange().apartBefore(second.getSegRange())) {
- CubeSegment hole = new CubeSegment();
- hole.setCubeInstance(cube);
- if (first.isOffsetCube()) {
- hole.setSegRange(new SegmentRange(first.getSegRange().end, second.getSegRange().start));
- hole.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetEnd());
- hole.setSourcePartitionOffsetEnd(second.getSourcePartitionOffsetStart());
- hole.setName(CubeSegment.makeSegmentName(null, hole.getSegRange(), modelDesc));
- } else {
- hole.setTSRange(new TSRange(first.getTSRange().end.v, second.getTSRange().start.v));
- hole.setName(CubeSegment.makeSegmentName(hole.getTSRange(), null, modelDesc));
- }
- holes.add(hole);
+ public DictionaryInfo saveDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable,
+ Dictionary<String> dict) throws IOException {
+ CubeDesc cubeDesc = cubeSeg.getCubeDesc();
+ if (!cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col))
+ return null;
+
+ DictionaryInfo dictInfo = getDictionaryManager().saveDictionary(col, inpTable, dict);
+
+ saveDictionaryInfo(cubeSeg, col, dictInfo);
+ return dictInfo;
+ }
+
+ private void saveDictionaryInfo(CubeSegment cubeSeg, TblColRef col, DictionaryInfo dictInfo)
+ throws IOException {
+ if (dictInfo != null) {
+ Dictionary<?> dict = dictInfo.getDictionaryObject();
+ cubeSeg.putDictResPath(col, dictInfo.getResourcePath());
+ cubeSeg.getRowkeyStats().add(new Object[] { col.getIdentity(), dict.getSize(), dict.getSizeOfId() });
+
+ CubeUpdate update = new CubeUpdate(cubeSeg.getCubeInstance());
+ update.setToUpdateSegs(cubeSeg);
+ updateCube(update);
}
}
- return holes;
- }
- private final String GLOBAL_DICTIONNARY_CLASS = "org.apache.kylin.dict.GlobalDictionaryBuilder";
+ /**
+ * return null if no dictionary for given column
+ */
+ @SuppressWarnings("unchecked")
+ public Dictionary<String> getDictionary(CubeSegment cubeSeg, TblColRef col) {
+ DictionaryInfo info = null;
+ try {
+ DictionaryManager dictMgr = getDictionaryManager();
+ String dictResPath = cubeSeg.getDictResPath(col);
+ if (dictResPath == null)
+ return null;
+
+ info = dictMgr.getDictionaryInfo(dictResPath);
+ if (info == null)
+ throw new IllegalStateException("No dictionary found by " + dictResPath
+ + ", invalid cube state; cube segment" + cubeSeg + ", col " + col);
+ } catch (IOException e) {
+ throw new IllegalStateException("Failed to get dictionary for cube segment" + cubeSeg + ", col" + col,
+ e);
+ }
+ return (Dictionary<String>) info.getDictionaryObject();
+ }
- //UHC (ultra high cardinality column): contain the ShardByColumns and the GlobalDictionaryColumns
- public int[] getUHCIndex(CubeDesc cubeDesc) throws IOException {
- List<TblColRef> dictCols = Lists.newArrayList(cubeDesc.getAllColumnsNeedDictionaryBuilt());
- int[] uhcIndex = new int[dictCols.size()];
-
- //add GlobalDictionaryColumns
- List<DictionaryDesc> dictionaryDescList = cubeDesc.getDictionaries();
- if (dictionaryDescList != null) {
- for (DictionaryDesc dictionaryDesc : dictionaryDescList) {
- if (dictionaryDesc.getBuilderClass() != null
- && dictionaryDesc.getBuilderClass().equalsIgnoreCase(GLOBAL_DICTIONNARY_CLASS)) {
- for (int i = 0; i < dictCols.size(); i++) {
- if (dictCols.get(i).equals(dictionaryDesc.getColumnRef())) {
- uhcIndex[i] = 1;
- break;
+ public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable) throws IOException {
+ TableMetadataManager metaMgr = getTableManager();
+ SnapshotManager snapshotMgr = getSnapshotManager();
+
+ TableDesc tableDesc = new TableDesc(metaMgr.getTableDesc(lookupTable, cubeSeg.getProject()));
+ IReadableTable hiveTable = SourceFactory.createReadableTable(tableDesc);
+ SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc);
+
+ cubeSeg.putSnapshotResPath(lookupTable, snapshot.getResourcePath());
+ CubeUpdate cubeBuilder = new CubeUpdate(cubeSeg.getCubeInstance());
+ cubeBuilder.setToUpdateSegs(cubeSeg);
+ updateCube(cubeBuilder);
+
+ return snapshot;
+ }
+
+ public LookupStringTable getLookupTable(CubeSegment cubeSegment, JoinDesc join) {
+
+ String tableName = join.getPKSide().getTableIdentity();
+ String[] pkCols = join.getPrimaryKey();
+ String snapshotResPath = cubeSegment.getSnapshotResPath(tableName);
+ if (snapshotResPath == null)
+ throw new IllegalStateException("No snapshot for table '" + tableName + "' found on cube segment"
+ + cubeSegment.getCubeInstance().getName() + "/" + cubeSegment);
+
+ try {
+ SnapshotTable snapshot = getSnapshotManager().getSnapshotTable(snapshotResPath);
+ TableDesc tableDesc = getTableManager().getTableDesc(tableName, cubeSegment.getProject());
+ return new LookupStringTable(tableDesc, pkCols, snapshot);
+ } catch (IOException e) {
+ throw new IllegalStateException(
+ "Failed to load lookup table " + tableName + " from snapshot " + snapshotResPath, e);
+ }
+ }
+
+ private final String GLOBAL_DICTIONNARY_CLASS = "org.apache.kylin.dict.GlobalDictionaryBuilder";
+
+ //UHC (ultra high cardinality column): contain the ShardByColumns and the GlobalDictionaryColumns
+ public int[] getUHCIndex(CubeDesc cubeDesc) throws IOException {
+ List<TblColRef> dictCols = Lists.newArrayList(cubeDesc.getAllColumnsNeedDictionaryBuilt());
+ int[] uhcIndex = new int[dictCols.size()];
+
+ //add GlobalDictionaryColumns
+ List<DictionaryDesc> dictionaryDescList = cubeDesc.getDictionaries();
+ if (dictionaryDescList != null) {
+ for (DictionaryDesc dictionaryDesc : dictionaryDescList) {
+ if (dictionaryDesc.getBuilderClass() != null
+ && dictionaryDesc.getBuilderClass().equalsIgnoreCase(GLOBAL_DICTIONNARY_CLASS)) {
+ for (int i = 0; i < dictCols.size(); i++) {
+ if (dictCols.get(i).equals(dictionaryDesc.getColumnRef())) {
+ uhcIndex[i] = 1;
+ break;
+ }
}
}
}
}
- }
- //add ShardByColumns
- Set<TblColRef> shardByColumns = cubeDesc.getShardByColumns();
- for (int i = 0; i < dictCols.size(); i++) {
- if (shardByColumns.contains(dictCols.get(i))) {
- uhcIndex[i] = 1;
+ //add ShardByColumns
+ Set<TblColRef> shardByColumns = cubeDesc.getShardByColumns();
+ for (int i = 0; i < dictCols.size(); i++) {
+ if (shardByColumns.contains(dictCols.get(i))) {
+ uhcIndex[i] = 1;
+ }
}
- }
- return uhcIndex;
+ return uhcIndex;
+ }
}
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/f2f487fe/core-cube/src/main/java/org/apache/kylin/cube/cli/CubeSignatureRefresher.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cli/CubeSignatureRefresher.java b/core-cube/src/main/java/org/apache/kylin/cube/cli/CubeSignatureRefresher.java
index d07c93b..2eaebb1 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cli/CubeSignatureRefresher.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cli/CubeSignatureRefresher.java
@@ -97,7 +97,7 @@ public class CubeSignatureRefresher {
String calculatedSign = cubeDesc.calculateSignature();
if (cubeDesc.getSignature() == null || (!cubeDesc.getSignature().equals(calculatedSign))) {
cubeDesc.setSignature(calculatedSign);
- store.putResource(cubeDesc.getResourcePath(), cubeDesc, CubeDescManager.CUBE_DESC_SERIALIZER);
+ store.putResource(cubeDesc.getResourcePath(), cubeDesc, CubeDesc.newSerializerForLowLevelAccess());
updatedResources.add(cubeDesc.getResourcePath());
}
} catch (Exception e) {