Posted to commits@kylin.apache.org by xx...@apache.org on 2021/07/06 03:51:54 UTC
[kylin] 01/01: Revert "Remove unused property, class and maven module which is for Kylin 3 only"
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch revert-1679-kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit d92e40878612fe1d4297169365dee0642ec45bb4
Author: Xiaoxiang Yu <xx...@apache.org>
AuthorDate: Tue Jul 6 11:50:58 2021 +0800
Revert "Remove unused property, class and maven module which is for Kylin 3 only"
This reverts commit e9291e3b232cb628bbfa8dc9fc21fbab1d491a19.
---
.../kylin/engine/mr/SortedColumnDFSFile.java | 6 +-
.../engine/mr/common/MapReduceExecutable.java | 6 +
.../kylin/engine/mr/common/MapReduceUtil.java | 267 ++--
.../org/apache/kylin/common/KylinConfigBase.java | 813 ++++++++----
.../apache/kylin/common/annotation/ConfigTag.java | 3 -
.../org/apache/kylin/common/util/HadoopUtil.java | 4 +
.../org/apache/kylin/common/KylinConfigTest.java | 20 +-
core-cube/pom.xml | 8 +-
.../java/org/apache/kylin/cube/CubeManager.java | 341 ++---
.../java/org/apache/kylin/cube/CubeSegment.java | 34 +-
.../kylin/cube/cli/DictionaryGeneratorCLI.java | 187 +++
.../apache/kylin/cube/cli/DumpDictionaryCLI.java | 62 +
.../java/org/apache/kylin/cube/model/CubeDesc.java | 46 +-
.../apache/kylin/cube/model/SnapshotTableDesc.java | 10 +-
.../cube/model/validation/rule/DictionaryRule.java | 6 +
.../org/apache/kylin/cube/util/CubingUtils.java | 284 ++--
.../model/validation/rule/DictionaryRuleTest.java | 14 +
.../org/apache/kylin/dict/DictionaryGenerator.java | 382 +++---
.../apache/kylin/dict/lookup}/ILookupTable.java | 2 +-
.../apache/kylin/dict/lookup/SnapshotManager.java | 12 +-
.../java/org/apache/kylin/job/JoinedFlatTable.java | 22 +-
.../kylin/dimension/DimensionEncodingFactory.java | 1 -
.../org/apache/kylin/storage/StorageFactory.java | 13 +
.../storage/gtrecord/CubeScanRangePlanner.java | 15 +-
.../kylin/storage/gtrecord/CubeSegmentScanner.java | 6 +
.../kylin/storage/gtrecord/CubeTupleConverter.java | 4 +-
.../storage/gtrecord/GTCubeStorageQueryBase.java | 2 +-
.../storage/translate/DerivedFilterTranslator.java | 2 +-
.../kylin/storage/gtrecord/DictGridTableTest.java | 1380 ++++++++++----------
pom.xml | 122 +-
.../query/enumerator/LookupTableEnumerator.java | 147 +++
.../apache/kylin/query/enumerator/OLAPQuery.java | 4 +
.../apache/kylin/query/relnode/OLAPFilterRel.java | 3 +-
.../query/relnode/visitor/TupleFilterVisitor.java | 5 +-
.../kylin/rest/controller/CubeController.java | 30 +-
.../org/apache/kylin/rest/service/CubeService.java | 30 +-
.../apache/kylin/rest/service/TableService.java | 61 +-
.../apache/kylin/source/hive/HiveInputBase.java | 33 +-
38 files changed, 2673 insertions(+), 1714 deletions(-)
diff --git a/build-engine/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFile.java b/build-engine/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFile.java
index 507898d..bcf4b98 100644
--- a/build-engine/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFile.java
+++ b/build-engine/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFile.java
@@ -25,6 +25,8 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.dict.ByteComparator;
+import org.apache.kylin.dict.StringBytesConverter;
import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.source.IReadableTable;
import org.slf4j.Logger;
@@ -90,9 +92,9 @@ public class SortedColumnDFSFile implements IReadableTable {
}
private Comparator<String> getComparatorByType(DataType type) {
- Comparator<String> comparator = null;
+ Comparator<String> comparator;
if (!type.isNumberFamily()) {
-// comparator = new ByteComparator<>(new StringBytesConverter());
+ comparator = new ByteComparator<>(new StringBytesConverter());
} else if (type.isIntegerFamily()) {
comparator = new Comparator<String>() {
@Override
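Note on the hunk above: the restored branch uses the byte-wise comparator only for non-number types because numeric strings do not order correctly as text. A minimal sketch (plain JDK, not part of this patch) of why the integer-family branch parses values before comparing:

    import java.util.Arrays;
    import java.util.Comparator;

    public class NumericStringSortSketch {
        public static void main(String[] args) {
            String[] lexicographic = {"10", "9", "100"};
            Arrays.sort(lexicographic);
            // Plain text order puts "10" and "100" before "9" - wrong for integers.
            System.out.println(Arrays.toString(lexicographic)); // [10, 100, 9]

            String[] numeric = {"10", "9", "100"};
            // Parse before comparing, as the integer-family comparator does.
            Arrays.sort(numeric, Comparator.comparingLong(Long::parseLong));
            System.out.println(Arrays.toString(numeric)); // [9, 10, 100]
        }
    }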
diff --git a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
index c6c80f4..4978fa0 100755
--- a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
+++ b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
@@ -500,6 +500,12 @@ public class MapReduceExecutable extends AbstractExecutable {
for (Map.Entry<String, String> entry : configOverride.getMRConfigOverride().entrySet()) {
conf.set(entry.getKey(), entry.getValue());
}
+ if (conf.get("mapreduce.job.is-mem-hungry") != null
+ && Boolean.parseBoolean(conf.get("mapreduce.job.is-mem-hungry"))) {
+ for (Map.Entry<String, String> entry : configOverride.getMemHungryConfigOverride().entrySet()) {
+ conf.set(entry.getKey(), entry.getValue());
+ }
+ }
if (StringUtils.isNotBlank(cubeName)) {
remainingArgs.add("-" + BatchConstants.ARG_CUBE_NAME);
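Note on the hunk above: the restored block applies an extra set of MR overrides when a step flags itself as memory hungry. A minimal, self-contained sketch of that behaviour, assuming hypothetical override values (the real ones come from kylin.engine.mr.mem-hungry-config-override.* via getMemHungryConfigOverride(), restored later in this commit):

    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import com.google.common.collect.ImmutableMap;

    public class MemHungryOverrideSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration(false);
            // A build step marks itself as memory hungry, e.g. for memory-hungry measures.
            conf.setBoolean("mapreduce.job.is-mem-hungry", true);

            // Hypothetical values standing in for getMemHungryConfigOverride().
            Map<String, String> memHungryOverride = ImmutableMap.of(
                    "mapreduce.reduce.memory.mb", "8192",
                    "mapreduce.reduce.java.opts", "-Xmx6g");

            if (conf.getBoolean("mapreduce.job.is-mem-hungry", false)) {
                for (Map.Entry<String, String> entry : memHungryOverride.entrySet()) {
                    conf.set(entry.getKey(), entry.getValue());
                }
            }
            System.out.println(conf.get("mapreduce.reduce.memory.mb")); // 8192
        }
    }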
diff --git a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java
index 4fa025b..ecde4aa 100644
--- a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java
+++ b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java
@@ -18,130 +18,147 @@
package org.apache.kylin.engine.mr.common;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.job.exception.JobException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Sets;
+
public class MapReduceUtil {
-//
-// private static final Logger logger = LoggerFactory.getLogger(MapReduceUtil.class);
-//
-// /**
-// * @return reducer number for calculating hll
-// */
-// public static int getCuboidHLLCounterReducerNum(CubeInstance cube) {
-// int nCuboids = cube.getCuboidScheduler().getAllCuboidIds().size();
-// int shardBase = (nCuboids - 1) / cube.getConfig().getHadoopJobPerReducerHLLCuboidNumber() + 1;
-//
-// int hllMaxReducerNumber = cube.getConfig().getHadoopJobHLLMaxReducerNumber();
-// if (shardBase > hllMaxReducerNumber) {
-// shardBase = hllMaxReducerNumber;
-// }
-// return shardBase;
-// }
-//
-// /**
-// * @param cuboidScheduler specified can provide more flexibility
-// * */
-// public static int getLayeredCubingReduceTaskNum(CubeSegment cubeSegment, CuboidScheduler cuboidScheduler,
-// double totalMapInputMB, int level)
-// throws ClassNotFoundException, IOException, InterruptedException, JobException {
-// CubeDesc cubeDesc = cubeSegment.getCubeDesc();
-// KylinConfig kylinConfig = cubeDesc.getConfig();
-//
-// double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
-// double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();
-// logger.info("Having per reduce MB " + perReduceInputMB + ", reduce count ratio " + reduceCountRatio + ", level "
-// + level);
-//
-// CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, cuboidScheduler, kylinConfig);
-//
-// double parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst;
-//
-// if (level == -1) {
-// //merge case
-// double estimatedSize = cubeStatsReader.estimateCubeSize();
-// adjustedCurrentLayerSizeEst = estimatedSize > totalMapInputMB ? totalMapInputMB : estimatedSize;
-// logger.debug("estimated size {}, input size {}, adjustedCurrentLayerSizeEst: {}", estimatedSize,
-// totalMapInputMB, adjustedCurrentLayerSizeEst);
-// } else if (level == 0) {
-// //base cuboid case TODO: the estimation could be very WRONG because it has no correction
-// adjustedCurrentLayerSizeEst = cubeStatsReader.estimateLayerSize(0);
-// logger.debug("adjustedCurrentLayerSizeEst: {}", adjustedCurrentLayerSizeEst);
-// } else {
-// parentLayerSizeEst = cubeStatsReader.estimateLayerSize(level - 1);
-// currentLayerSizeEst = cubeStatsReader.estimateLayerSize(level);
-// adjustedCurrentLayerSizeEst = totalMapInputMB / parentLayerSizeEst * currentLayerSizeEst;
-// logger.debug(
-// "totalMapInputMB: {}, parentLayerSizeEst: {}, currentLayerSizeEst: {}, adjustedCurrentLayerSizeEst: {}",
-// totalMapInputMB, parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst);
-// }
-//
-// // number of reduce tasks
-// int numReduceTasks = (int) Math.round(adjustedCurrentLayerSizeEst / perReduceInputMB * reduceCountRatio + 0.99);
-//
-// // adjust reducer number for cube which has DISTINCT_COUNT measures for better performance
-// if (cubeDesc.hasMemoryHungryMeasures()) {
-// logger.debug("Multiply reducer num by 4 to boost performance for memory hungry measures");
-// numReduceTasks = numReduceTasks * 4;
-// }
-//
-// // at least 1 reducer by default
-// numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks);
-// // no more than 500 reducer by default
-// numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
-//
-// return numReduceTasks;
-// }
-//
-// public static int getInmemCubingReduceTaskNum(CubeSegment cubeSeg, CuboidScheduler cuboidScheduler)
-// throws IOException {
-// KylinConfig kylinConfig = cubeSeg.getConfig();
-//
-// Map<Long, Double> cubeSizeMap = new CubeStatsReader(cubeSeg, cuboidScheduler, kylinConfig).getCuboidSizeMap();
-// double totalSizeInM = 0;
-// for (Double cuboidSize : cubeSizeMap.values()) {
-// totalSizeInM += cuboidSize;
-// }
-// return getReduceTaskNum(totalSizeInM, kylinConfig);
-// }
-//
-// // @return the first indicates the total reducer number, the second indicates the reducer number for base cuboid
-// public static Pair<Integer, Integer> getConvergeCuboidDataReduceTaskNums(CubeSegment cubeSeg) throws IOException {
-// long baseCuboidId = cubeSeg.getCuboidScheduler().getBaseCuboidId();
-//
-// Set<Long> overlapCuboids = Sets.newHashSet(cubeSeg.getCuboidScheduler().getAllCuboidIds());
-// overlapCuboids.retainAll(cubeSeg.getCubeInstance().getCuboidsRecommend());
-// overlapCuboids.add(baseCuboidId);
-//
-// Pair<Map<Long, Long>, Long> cuboidStats = CuboidStatsReaderUtil
-// .readCuboidStatsWithSourceFromSegment(overlapCuboids, cubeSeg);
-// Map<Long, Double> cubeSizeMap = CubeStatsReader.getCuboidSizeMapFromRowCount(cubeSeg, cuboidStats.getFirst(),
-// cuboidStats.getSecond());
-// double totalSizeInM = 0;
-// for (Double cuboidSize : cubeSizeMap.values()) {
-// totalSizeInM += cuboidSize;
-// }
-//
-// double baseSizeInM = cubeSizeMap.get(baseCuboidId);
-//
-// KylinConfig kylinConfig = cubeSeg.getConfig();
-// int nBase = getReduceTaskNum(baseSizeInM, kylinConfig);
-// int nOther = getReduceTaskNum(totalSizeInM - baseSizeInM, kylinConfig);
-// return new Pair<>(nBase + nOther, nBase);
-// }
-//
-// private static int getReduceTaskNum(double totalSizeInM, KylinConfig kylinConfig) {
-// double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
-// double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();
-//
-// // number of reduce tasks
-// int numReduceTasks = (int) Math.round(totalSizeInM / perReduceInputMB * reduceCountRatio);
-//
-// // at least 1 reducer by default
-// numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks);
-// // no more than 500 reducer by default
-// numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
-//
-// logger.info("Having total map input MB " + Math.round(totalSizeInM));
-// logger.info("Having per reduce MB " + perReduceInputMB);
-// logger.info("Setting " + Reducer.Context.NUM_REDUCES + "=" + numReduceTasks);
-// return numReduceTasks;
-// }
+
+ private static final Logger logger = LoggerFactory.getLogger(MapReduceUtil.class);
+
+ /**
+ * @return reducer number for calculating hll
+ */
+ public static int getCuboidHLLCounterReducerNum(CubeInstance cube) {
+ int nCuboids = cube.getCuboidScheduler().getAllCuboidIds().size();
+ int shardBase = (nCuboids - 1) / cube.getConfig().getHadoopJobPerReducerHLLCuboidNumber() + 1;
+
+ int hllMaxReducerNumber = cube.getConfig().getHadoopJobHLLMaxReducerNumber();
+ if (shardBase > hllMaxReducerNumber) {
+ shardBase = hllMaxReducerNumber;
+ }
+ return shardBase;
+ }
+
+ /**
+ * @param cuboidScheduler specified can provide more flexibility
+ * */
+ public static int getLayeredCubingReduceTaskNum(CubeSegment cubeSegment, CuboidScheduler cuboidScheduler,
+ double totalMapInputMB, int level)
+ throws ClassNotFoundException, IOException, InterruptedException, JobException {
+ CubeDesc cubeDesc = cubeSegment.getCubeDesc();
+ KylinConfig kylinConfig = cubeDesc.getConfig();
+
+ double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
+ double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();
+ logger.info("Having per reduce MB " + perReduceInputMB + ", reduce count ratio " + reduceCountRatio + ", level "
+ + level);
+
+ CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, cuboidScheduler, kylinConfig);
+
+ double parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst;
+
+ if (level == -1) {
+ //merge case
+ double estimatedSize = cubeStatsReader.estimateCubeSize();
+ adjustedCurrentLayerSizeEst = estimatedSize > totalMapInputMB ? totalMapInputMB : estimatedSize;
+ logger.debug("estimated size {}, input size {}, adjustedCurrentLayerSizeEst: {}", estimatedSize,
+ totalMapInputMB, adjustedCurrentLayerSizeEst);
+ } else if (level == 0) {
+ //base cuboid case TODO: the estimation could be very WRONG because it has no correction
+ adjustedCurrentLayerSizeEst = cubeStatsReader.estimateLayerSize(0);
+ logger.debug("adjustedCurrentLayerSizeEst: {}", adjustedCurrentLayerSizeEst);
+ } else {
+ parentLayerSizeEst = cubeStatsReader.estimateLayerSize(level - 1);
+ currentLayerSizeEst = cubeStatsReader.estimateLayerSize(level);
+ adjustedCurrentLayerSizeEst = totalMapInputMB / parentLayerSizeEst * currentLayerSizeEst;
+ logger.debug(
+ "totalMapInputMB: {}, parentLayerSizeEst: {}, currentLayerSizeEst: {}, adjustedCurrentLayerSizeEst: {}",
+ totalMapInputMB, parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst);
+ }
+
+ // number of reduce tasks
+ int numReduceTasks = (int) Math.round(adjustedCurrentLayerSizeEst / perReduceInputMB * reduceCountRatio + 0.99);
+
+ // adjust reducer number for cube which has DISTINCT_COUNT measures for better performance
+ if (cubeDesc.hasMemoryHungryMeasures()) {
+ logger.debug("Multiply reducer num by 4 to boost performance for memory hungry measures");
+ numReduceTasks = numReduceTasks * 4;
+ }
+
+ // at least 1 reducer by default
+ numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks);
+ // no more than 500 reducer by default
+ numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
+
+ return numReduceTasks;
+ }
+
+ public static int getInmemCubingReduceTaskNum(CubeSegment cubeSeg, CuboidScheduler cuboidScheduler)
+ throws IOException {
+ KylinConfig kylinConfig = cubeSeg.getConfig();
+
+ Map<Long, Double> cubeSizeMap = new CubeStatsReader(cubeSeg, cuboidScheduler, kylinConfig).getCuboidSizeMap();
+ double totalSizeInM = 0;
+ for (Double cuboidSize : cubeSizeMap.values()) {
+ totalSizeInM += cuboidSize;
+ }
+ return getReduceTaskNum(totalSizeInM, kylinConfig);
+ }
+
+ // @return the first indicates the total reducer number, the second indicates the reducer number for base cuboid
+ public static Pair<Integer, Integer> getConvergeCuboidDataReduceTaskNums(CubeSegment cubeSeg) throws IOException {
+ long baseCuboidId = cubeSeg.getCuboidScheduler().getBaseCuboidId();
+
+ Set<Long> overlapCuboids = Sets.newHashSet(cubeSeg.getCuboidScheduler().getAllCuboidIds());
+ overlapCuboids.retainAll(cubeSeg.getCubeInstance().getCuboidsRecommend());
+ overlapCuboids.add(baseCuboidId);
+
+ Pair<Map<Long, Long>, Long> cuboidStats = CuboidStatsReaderUtil
+ .readCuboidStatsWithSourceFromSegment(overlapCuboids, cubeSeg);
+ Map<Long, Double> cubeSizeMap = CubeStatsReader.getCuboidSizeMapFromRowCount(cubeSeg, cuboidStats.getFirst(),
+ cuboidStats.getSecond());
+ double totalSizeInM = 0;
+ for (Double cuboidSize : cubeSizeMap.values()) {
+ totalSizeInM += cuboidSize;
+ }
+
+ double baseSizeInM = cubeSizeMap.get(baseCuboidId);
+
+ KylinConfig kylinConfig = cubeSeg.getConfig();
+ int nBase = getReduceTaskNum(baseSizeInM, kylinConfig);
+ int nOther = getReduceTaskNum(totalSizeInM - baseSizeInM, kylinConfig);
+ return new Pair<>(nBase + nOther, nBase);
+ }
+
+ private static int getReduceTaskNum(double totalSizeInM, KylinConfig kylinConfig) {
+ double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
+ double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();
+
+ // number of reduce tasks
+ int numReduceTasks = (int) Math.round(totalSizeInM / perReduceInputMB * reduceCountRatio);
+
+ // at least 1 reducer by default
+ numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks);
+ // no more than 500 reducer by default
+ numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
+
+ logger.info("Having total map input MB " + Math.round(totalSizeInM));
+ logger.info("Having per reduce MB " + perReduceInputMB);
+ logger.info("Setting " + Reducer.Context.NUM_REDUCES + "=" + numReduceTasks);
+ return numReduceTasks;
+ }
}
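Note on the file above: the reducer-count logic boils down to one formula in getReduceTaskNum - round(totalSizeInM / reduce-input-mb * reduce-count-ratio), clamped between the min and max reducer numbers. A small worked example using the defaults restored in this commit (reduce-input-mb=500, reduce-count-ratio=1.0, min=1, max=500); illustrative only, it does not touch CubeStatsReader:

    public class ReduceTaskNumExample {
        static int reduceTaskNum(double totalSizeInM, double perReduceInputMB,
                                 double reduceCountRatio, int min, int max) {
            int numReduceTasks = (int) Math.round(totalSizeInM / perReduceInputMB * reduceCountRatio);
            numReduceTasks = Math.max(min, numReduceTasks); // at least 1 reducer by default
            numReduceTasks = Math.min(max, numReduceTasks); // no more than 500 reducers by default
            return numReduceTasks;
        }

        public static void main(String[] args) {
            // 2048 MB of cuboid data -> round(2048 / 500 * 1.0) = 4 reducers
            System.out.println(reduceTaskNum(2048, 500, 1.0, 1, 500)); // 4
            // Tiny input is still clamped up to the 1-reducer minimum
            System.out.println(reduceTaskNum(10, 500, 1.0, 1, 500));   // 1
        }
    }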
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index fc76506..39f8eae 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -292,14 +292,6 @@ public abstract class KylinConfigBase implements Serializable {
|| "LOCAL".equals(getOptional("kylin.env", "DEV"));
}
- public boolean isUTEnv() {
- return "UT".equals(getDeployEnv());
- }
-
- public boolean isLocalEnv() {
- return "LOCAL".equals(getDeployEnv());
- }
-
public String getDeployEnv() {
return getOptional("kylin.env", "DEV");
}
@@ -351,7 +343,7 @@ public abstract class KylinConfigBase implements Serializable {
String root = getOptional("kylin.env.hdfs-metastore-bigcell-dir");
if (root == null) {
- return getHdfsWorkingDirectory();
+ return getJdbcHdfsWorkingDirectory();
}
Path path = new Path(root);
@@ -360,8 +352,12 @@ public abstract class KylinConfigBase implements Serializable {
"kylin.env.hdfs-metastore-bigcell-dir must be absolute, but got " + root);
// make sure path is qualified
- FileSystem fs = HadoopUtil.getWorkingFileSystem();
- path = fs.makeQualified(path);
+ try {
+ FileSystem fs = HadoopUtil.getReadFileSystem();
+ path = fs.makeQualified(path);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
root = new Path(path, StringUtils.replaceChars(getMetadataUrlPrefix(), ':', '-')).toString();
@@ -378,6 +374,28 @@ public abstract class KylinConfigBase implements Serializable {
return cachedBigCellDirectory;
}
+ public String getReadHdfsWorkingDirectory() {
+ if (StringUtils.isNotEmpty(getHBaseClusterFs())) {
+ Path workingDir = new Path(getHdfsWorkingDirectory());
+ return new Path(getHBaseClusterFs(), Path.getPathWithoutSchemeAndAuthority(workingDir)).toString() + "/";
+ }
+
+ return getHdfsWorkingDirectory();
+ }
+
+ private String getJdbcHdfsWorkingDirectory() {
+ if (StringUtils.isNotEmpty(getJdbcFileSystem())) {
+ Path workingDir = new Path(getReadHdfsWorkingDirectory());
+ return new Path(getJdbcFileSystem(), Path.getPathWithoutSchemeAndAuthority(workingDir)).toString() + "/";
+ }
+
+ return getReadHdfsWorkingDirectory();
+ }
+
+ private String getJdbcFileSystem() {
+ return getOptional("kylin.storage.columnar.jdbc.file-system", "");
+ }
+
public String getHdfsWorkingDirectory(String project) {
if (isProjectIsolationEnabled() && project != null) {
return new Path(getHdfsWorkingDirectory(), project).toString() + "/";
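Note on the hunk above: getReadHdfsWorkingDirectory and getJdbcHdfsWorkingDirectory compose the working directory on another file system by keeping the path but swapping the scheme and authority. A minimal sketch with hypothetical cluster addresses, using the same Hadoop Path calls as the methods above:

    import org.apache.hadoop.fs.Path;

    public class WorkingDirSketch {
        public static void main(String[] args) {
            // Hypothetical values: the default working dir and a separate HBase cluster FS.
            String hdfsWorkingDirectory = "hdfs://cluster-main/kylin/kylin_metadata/";
            String hbaseClusterFs = "hdfs://cluster-hbase/";

            Path workingDir = new Path(hdfsWorkingDirectory);
            String readDir = new Path(hbaseClusterFs,
                    Path.getPathWithoutSchemeAndAuthority(workingDir)).toString() + "/";

            System.out.println(readDir); // hdfs://cluster-hbase/kylin/kylin_metadata/
        }
    }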
@@ -461,7 +479,7 @@ public abstract class KylinConfigBase implements Serializable {
Map<String, String> r = Maps.newLinkedHashMap();
// ref constants in ISourceAware
r.put("", "org.apache.kylin.common.persistence.FileResourceStore");
-// r.put("hbase", "org.apache.kylin.storage.hbase.HBaseResourceStore");
+ r.put("hbase", "org.apache.kylin.storage.hbase.HBaseResourceStore");
r.put("hdfs", "org.apache.kylin.common.persistence.HDFSResourceStore");
r.put("ifile", "org.apache.kylin.common.persistence.IdentifierFileResourceStore");
r.put("jdbc", "org.apache.kylin.common.persistence.JDBCResourceStore");
@@ -516,7 +534,6 @@ public abstract class KylinConfigBase implements Serializable {
return (DistributedLockFactory) ClassUtil.newInstance(clsName);
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public String getHBaseMappingAdapter() {
return getOptional("kylin.metadata.hbasemapping-adapter");
}
@@ -525,17 +542,14 @@ public abstract class KylinConfigBase implements Serializable {
return Boolean.parseBoolean(getOptional("kylin.metadata.check-copy-on-write", FALSE));
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public String getHbaseClientScannerTimeoutPeriod() {
return getOptional("kylin.metadata.hbase-client-scanner-timeout-period", "10000");
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public String getHbaseRpcTimeout() {
return getOptional("kylin.metadata.hbase-rpc-timeout", "5000");
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public String getHbaseClientRetriesNumber() {
return getOptional("kylin.metadata.hbase-client-retries-number", "1");
}
@@ -548,31 +562,119 @@ public abstract class KylinConfigBase implements Serializable {
return Boolean.parseBoolean(getOptional("kylin.metadata.able-change-string-to-datetime", "false"));
}
- public String getMetadataDialect() {
- return getOptional("kylin.metadata.jdbc.dialect", "mysql");
+ // ============================================================================
+ // DICTIONARY & SNAPSHOT
+ // ============================================================================
+
+ public boolean isUseForestTrieDictionary() {
+ return Boolean.parseBoolean(getOptional("kylin.dictionary.use-forest-trie", TRUE));
}
- public boolean isJsonAlwaysSmallCell() {
- return Boolean.parseBoolean(getOptional("kylin.metadata.jdbc.json-always-small-cell", TRUE));
+ public long getTrieDictionaryForestMaxTrieSizeMB() {
+ return Integer.parseInt(getOptional("kylin.dictionary.forest-trie-max-mb", "500"));
}
- public int getSmallCellMetadataWarningThreshold() {
- return Integer.parseInt(
- getOptional("kylin.metadata.jdbc.small-cell-meta-size-warning-threshold", String.valueOf(100 << 20))); //100mb
+ public long getCachedDictMaxEntrySize() {
+ return Long.parseLong(getOptional("kylin.dictionary.max-cache-entry", "3000"));
}
- public int getSmallCellMetadataErrorThreshold() {
- return Integer.parseInt(
- getOptional("kylin.metadata.jdbc.small-cell-meta-size-error-threshold", String.valueOf(1 << 30))); // 1gb
+ public int getCachedDictMaxSize() {
+ return Integer.parseInt(getOptional("kylin.dictionary.max-cache-size", "-1"));
}
- public int getJdbcResourceStoreMaxCellSize() {
- return Integer.parseInt(getOptional("kylin.metadata.jdbc.max-cell-size", "1048576")); // 1mb
+ public boolean isGrowingDictEnabled() {
+ return Boolean.parseBoolean(this.getOptional("kylin.dictionary.growing-enabled", FALSE));
+ }
+
+ public boolean isDictResuable() {
+ return Boolean.parseBoolean(this.getOptional("kylin.dictionary.resuable", FALSE));
+ }
+
+ public long getCachedDictionaryMaxEntrySize() {
+ return Long.parseLong(getOptional("kylin.dictionary.cached-dict-max-cache-entry", "50000"));
+ }
+
+ public int getAppendDictEntrySize() {
+ return Integer.parseInt(getOptional("kylin.dictionary.append-entry-size", "10000000"));
+ }
+
+ public int getAppendDictMaxVersions() {
+ return Integer.parseInt(getOptional("kylin.dictionary.append-max-versions", "3"));
+ }
+
+ public int getAppendDictVersionTTL() {
+ return Integer.parseInt(getOptional("kylin.dictionary.append-version-ttl", "259200000"));
+ }
+
+ @ConfigTag(ConfigTag.Tag.DEPRECATED)
+ public int getCachedSnapshotMaxEntrySize() {
+ return Integer.parseInt(getOptional("kylin.snapshot.max-cache-entry", "500"));
+ }
+
+ @ConfigTag(ConfigTag.Tag.DEPRECATED)
+ public int getTableSnapshotMaxMB() {
+ return Integer.parseInt(getOptional("kylin.snapshot.max-mb", "300"));
+ }
+
+ @ConfigTag(ConfigTag.Tag.DEPRECATED)
+ public int getExtTableSnapshotShardingMB() {
+ return Integer.parseInt(getOptional("kylin.snapshot.ext.shard-mb", "500"));
+ }
+
+ @ConfigTag(ConfigTag.Tag.DEPRECATED)
+ public String getExtTableSnapshotLocalCachePath() {
+ return getOptional("kylin.snapshot.ext.local.cache.path", "lookup_cache");
+ }
+
+ @ConfigTag(ConfigTag.Tag.DEPRECATED)
+ public double getExtTableSnapshotLocalCacheMaxSizeGB() {
+ return Double.parseDouble(getOptional("kylin.snapshot.ext.local.cache.max-size-gb", "200"));
+ }
+
+ @ConfigTag(ConfigTag.Tag.DEPRECATED)
+ public long getExtTableSnapshotLocalCacheCheckVolatileRange() {
+ return Long.parseLong(getOptional("kylin.snapshot.ext.local.cache.check.volatile", "3600000"));
}
+ @ConfigTag({ConfigTag.Tag.DEPRECATED, ConfigTag.Tag.CUBE_LEVEL})
+ public boolean isShrunkenDictFromGlobalEnabled() {
+ return Boolean.parseBoolean(this.getOptional("kylin.dictionary.shrunken-from-global-enabled", TRUE));
+ }
+
+
// ============================================================================
- // DICTIONARY & SNAPSHOT
+ // Hive Global Dictionary
+ //
+ // ============================================================================
+
+ /**
+ * @return if mr-hive dict not enabled, return empty array
+ * else return array contains "{TABLE_NAME}_{COLUMN_NAME}"
+ */
+ @ConfigTag(ConfigTag.Tag.DEPRECATED)
+ public String[] getMrHiveDictColumns() {
+ String columnStr = getOptional("kylin.dictionary.mr-hive.columns", "");
+ if (!columnStr.equals("")) {
+ return columnStr.split(",");
+ }
+ return new String[0];
+ }
+
+ @ConfigTag(ConfigTag.Tag.DEPRECATED)
+ public String getMrHiveDictDB() {
+ return getOptional("kylin.dictionary.mr-hive.database", getHiveDatabaseForIntermediateTable());
+ }
+
+ @ConfigTag(ConfigTag.Tag.DEPRECATED)
+ public String getMrHiveDictTableSuffix() {
+ return getOptional("kylin.dictionary.mr-hive.table.suffix", "_global_dict");
+ }
+
// ============================================================================
+ // Distributed/Spark Global dictionary
+ // Add wiki link here
+ // ============================================================================
+
public int getGlobalDictV2MinHashPartitions() {
return Integer.parseInt(getOptional("kylin.dictionary.globalV2-min-hash-partitions", "10"));
@@ -606,42 +708,6 @@ public abstract class KylinConfigBase implements Serializable {
return Boolean.parseBoolean(getOptional("kylin.dictionary.globalV2-check", "true"));
}
- /*
- * Detect dataset skew in dictionary encode step.
- * */
- public boolean detectDataSkewInDictEncodingEnabled() {
- return Boolean.valueOf(getOptional("kylin.dictionary.detect.data.skew.in.encoding", "false"));
- }
-
- /*
- * In some data skew cases, the repartition step during dictionary encoding will be slow.
- * We can choose to sample from the dataset to detect skewed. This configuration is used to set the sample rate.
- * */
- public double sampleRateInEncodingSkewDetection() {
- return Double.valueOf(getOptional("kylin.dictionary.detect.data.skew.sample.rate", "0.1"));
- }
-
- /*
- * In KYLIN4, dictionaries are hashed into several buckets, column data are repartitioned by the same hash algorithm
- * during encoding step too. In data skew cases, the repartition step will be very slow. Kylin will automatically
- * sample from the source to detect skewed data and repartition these skewed data to random partitions.
- * This configuration is used to set the skew data threshhold, valued from 0 to 1.
- * e.g.
- * if you set this value to 0.05, for each value that takes up more than 5% percent of the total will be regarded
- * as skew data, as a result the skewed data will be no more than 20 records
- * */
- public double skewPercentageThreshHold() {
- return Double.valueOf(getOptional("kylin.dictionary.data.skew.percentage.threshhold", "0.05"));
- }
-
- public boolean isSnapshotParallelBuildEnabled() {
- return Boolean.parseBoolean(getOptional("kylin.snapshot.parallel-build-enabled", "true"));
- }
-
- public int snapshotParallelBuildTimeoutSeconds() {
- return Integer.parseInt(getOptional("kylin.snapshot.parallel-build-timeout-seconds", "3600"));
- }
-
// ============================================================================
// CUBE
// ============================================================================
@@ -973,6 +1039,14 @@ public abstract class KylinConfigBase implements Serializable {
return Integer.parseInt(getOptional("kylin.job.error-record-threshold", "0"));
}
+ public boolean isAdvancedFlatTableUsed() {
+ return Boolean.parseBoolean(getOptional("kylin.job.use-advanced-flat-table", FALSE));
+ }
+
+ public String getAdvancedFlatTableClass() {
+ return getOptional("kylin.job.advanced-flat-table.class");
+ }
+
public String getJobTrackingURLPattern() {
return getOptional("kylin.job.tracking-url-pattern", "");
}
@@ -1032,7 +1106,7 @@ public abstract class KylinConfigBase implements Serializable {
/**
* was for route to hive, not used any more
*/
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
+ @Deprecated
public String getHiveUrl() {
return getOptional("kylin.source.hive.connection-url", "");
}
@@ -1040,7 +1114,7 @@ public abstract class KylinConfigBase implements Serializable {
/**
* was for route to hive, not used any more
*/
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
+ @Deprecated
public String getHiveUser() {
return getOptional("kylin.source.hive.connection-user", "");
}
@@ -1048,87 +1122,71 @@ public abstract class KylinConfigBase implements Serializable {
/**
* was for route to hive, not used any more
*/
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
+ @Deprecated
public String getHivePassword() {
return getOptional("kylin.source.hive.connection-password", "");
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public Map<String, String> getHiveConfigOverride() {
return getPropertiesByPrefix("kylin.source.hive.config-override.");
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public String getOverrideHiveTableLocation(String table) {
return getOptional("kylin.source.hive.table-location." + table.toUpperCase(Locale.ROOT));
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public boolean isHiveKeepFlatTable() {
return Boolean.parseBoolean(this.getOptional("kylin.source.hive.keep-flat-table", FALSE));
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public String getHiveDatabaseForIntermediateTable() {
return CliCommandExecutor.checkHiveProperty(this.getOptional("kylin.source.hive.database-for-flat-table", DEFAULT));
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public String getFlatTableStorageFormat() {
return this.getOptional("kylin.source.hive.flat-table-storage-format", "SEQUENCEFILE");
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public String getFlatTableFieldDelimiter() {
return this.getOptional("kylin.source.hive.flat-table-field-delimiter", "\u001F");
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public boolean isHiveRedistributeEnabled() {
return Boolean.parseBoolean(this.getOptional("kylin.source.hive.redistribute-flat-table", TRUE));
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public String getHiveClientMode() {
return getOptional("kylin.source.hive.client", "cli");
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public String getHiveBeelineShell() {
return getOptional("kylin.source.hive.beeline-shell", "beeline");
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public String getHiveBeelineParams() {
return getOptional("kylin.source.hive.beeline-params", "");
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public boolean getEnableSparkSqlForTableOps() {
return Boolean.parseBoolean(getOptional("kylin.source.hive.enable-sparksql-for-table-ops", FALSE));
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public String getSparkSqlBeelineShell() {
return getOptional("kylin.source.hive.sparksql-beeline-shell", "");
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public String getSparkSqlBeelineParams() {
return getOptional("kylin.source.hive.sparksql-beeline-params", "");
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public boolean getHiveTableDirCreateFirst() {
return Boolean.parseBoolean(getOptional("kylin.source.hive.table-dir-create-first", FALSE));
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public String getFlatHiveTableClusterByDictColumn() {
return getOptional("kylin.source.hive.flat-table-cluster-by-dict-column");
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public int getHiveRedistributeColumnCount() {
return Integer.parseInt(getOptional("kylin.source.hive.redistribute-column-count", "3"));
}
@@ -1185,7 +1243,7 @@ public abstract class KylinConfigBase implements Serializable {
}
// ============================================================================
- // SOURCE.KAFKA(Removed)
+ // SOURCE.KAFKA
// ============================================================================
@ConfigTag(ConfigTag.Tag.NOT_IMPLEMENTED)
@@ -1194,7 +1252,7 @@ public abstract class KylinConfigBase implements Serializable {
}
// ============================================================================
- // SOURCE.JDBC(Removed)
+ // SOURCE.JDBC
// ============================================================================
@ConfigTag(ConfigTag.Tag.NOT_IMPLEMENTED)
@@ -1253,7 +1311,7 @@ public abstract class KylinConfigBase implements Serializable {
}
// ============================================================================
- // STORAGE.HBASE(Removed)
+ // STORAGE.HBASE
// ============================================================================
public Map<Integer, String> getStorageEngines() {
@@ -1463,7 +1521,7 @@ public abstract class KylinConfigBase implements Serializable {
}
// ============================================================================
- // ENGINE.MR(Removed)
+ // ENGINE.MR
// ============================================================================
public Map<Integer, String> getJobEngines() {
@@ -1482,6 +1540,8 @@ public abstract class KylinConfigBase implements Serializable {
return Integer.parseInt(getOptional("kylin.engine.default", "6"));
}
+
+
public String getKylinJobJarPath() {
final String jobJar = getOptional(KYLIN_ENGINE_MR_JOB_JAR);
if (StringUtils.isNotEmpty(jobJar)) {
@@ -1499,63 +1559,76 @@ public abstract class KylinConfigBase implements Serializable {
System.setProperty(KYLIN_ENGINE_MR_JOB_JAR, path);
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public String getKylinJobMRLibDir() {
return getOptional("kylin.engine.mr.lib-dir", "");
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public Map<String, String> getMRConfigOverride() {
return getPropertiesByPrefix("kylin.engine.mr.config-override.");
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
- public int getHadoopJobMinReducerNumber() {
- return Integer.parseInt(getOptional("kylin.engine.mr.min-reducer-number", "1"));
+ // used for some mem-hungry step
+ public Map<String, String> getMemHungryConfigOverride() {
+ return getPropertiesByPrefix("kylin.engine.mr.mem-hungry-config-override.");
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
- public int getHadoopJobMaxReducerNumber() {
- return Integer.parseInt(getOptional("kylin.engine.mr.max-reducer-number", "500"));
+ public Map<String, String> getUHCMRConfigOverride() {
+ return getPropertiesByPrefix("kylin.engine.mr.uhc-config-override.");
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
- public int getHadoopJobMapperInputRows() {
- return Integer.parseInt(getOptional("kylin.engine.mr.mapper-input-rows", "1000000"));
+ public Map<String, String> getBaseCuboidMRConfigOverride() {
+ return getPropertiesByPrefix("kylin.engine.mr.base-cuboid-config-override.");
+ }
+
+ public Map<String, String> getSparkConfigOverride() {
+ return getPropertiesByPrefix("kylin.engine.spark-conf.");
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public Map<String, String> getFlinkConfigOverride() {
return getPropertiesByPrefix("kylin.engine.flink-conf.");
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public Map<String, String> getSparkConfigOverrideWithSpecificName(String configName) {
return getPropertiesByPrefix("kylin.engine.spark-conf-" + configName + ".");
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public Map<String, String> getFlinkConfigOverrideWithSpecificName(String configName) {
return getPropertiesByPrefix("kylin.engine.flink-conf-" + configName + ".");
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
+ public double getDefaultHadoopJobReducerInputMB() {
+ return Double.parseDouble(getOptional("kylin.engine.mr.reduce-input-mb", "500"));
+ }
+
+ public double getDefaultHadoopJobReducerCountRatio() {
+ return Double.parseDouble(getOptional("kylin.engine.mr.reduce-count-ratio", "1.0"));
+ }
+
+ public int getHadoopJobMinReducerNumber() {
+ return Integer.parseInt(getOptional("kylin.engine.mr.min-reducer-number", "1"));
+ }
+
+ public int getHadoopJobMaxReducerNumber() {
+ return Integer.parseInt(getOptional("kylin.engine.mr.max-reducer-number", "500"));
+ }
+
+ public int getHadoopJobMapperInputRows() {
+ return Integer.parseInt(getOptional("kylin.engine.mr.mapper-input-rows", "1000000"));
+ }
+
public int getCuboidStatsCalculatorMaxNumber() {
// set 1 to disable multi-thread statistics calculation
return Integer.parseInt(getOptional("kylin.engine.mr.max-cuboid-stats-calculator-number", "1"));
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public int getCuboidNumberPerStatsCalculator() {
return Integer.parseInt(getOptional("kylin.engine.mr.cuboid-number-per-stats-calculator", "100"));
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public int getHadoopJobPerReducerHLLCuboidNumber() {
return Integer.parseInt(getOptional("kylin.engine.mr.per-reducer-hll-cuboid-number", "100"));
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public int getHadoopJobHLLMaxReducerNumber() {
// by default multi-reducer hll calculation is disabled
return Integer.parseInt(getOptional("kylin.engine.mr.hll-max-reducer-number", "1"));
@@ -1566,27 +1639,22 @@ public abstract class KylinConfigBase implements Serializable {
return Integer.parseInt(getOptional("kylin.engine.mr.uhc-reducer-count", "3"));
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public boolean isBuildUHCDictWithMREnabled() {
return Boolean.parseBoolean(getOptional("kylin.engine.mr.build-uhc-dict-in-additional-step", FALSE));
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public boolean isBuildDictInReducerEnabled() {
return Boolean.parseBoolean(getOptional("kylin.engine.mr.build-dict-in-reducer", TRUE));
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public String getYarnStatusCheckUrl() {
return getOptional("kylin.engine.mr.yarn-check-status-url", null);
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public int getYarnStatusCheckIntervalSeconds() {
return Integer.parseInt(getOptional("kylin.engine.mr.yarn-check-interval-seconds", "10"));
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public boolean isUseLocalClasspathEnabled() {
return Boolean.parseBoolean(getOptional("kylin.engine.mr.use-local-classpath", TRUE));
}
@@ -1595,114 +1663,122 @@ public abstract class KylinConfigBase implements Serializable {
* different version hive use different UNION style
* https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Union
*/
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public String getHiveUnionStyle() {
return getOptional("kylin.hive.union.style", "UNION");
}
// ============================================================================
- // ENGINE.SPARK (DEPRECATED)
+ // ENGINE.SPARK
// ============================================================================
public String getHadoopConfDir() {
return getOptional("kylin.env.hadoop-conf-dir", "");
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
+ /**
+ * Get the sparder app name, default value is: 'sparder_on_localhost-7070'
+ */
+ public String getSparderAppName() {
+ String customSparderAppName = getOptional("kylin.query.sparder-context.app-name", "");
+ if (StringUtils.isEmpty(customSparderAppName)) {
+ customSparderAppName =
+ "sparder_on_" + getServerRestAddress().replaceAll(":", "-");
+ }
+ return customSparderAppName;
+ }
+
public String getSparkAdditionalJars() {
return getOptional("kylin.engine.spark.additional-jars", "");
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public String getFlinkAdditionalJars() {
return getOptional("kylin.engine.flink.additional-jars", "");
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public float getSparkRDDPartitionCutMB() {
return Float.parseFloat(getOptional("kylin.engine.spark.rdd-partition-cut-mb", "10.0"));
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public float getFlinkPartitionCutMB() {
return Float.parseFloat(getOptional("kylin.engine.flink.partition-cut-mb", "10.0"));
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public int getSparkMinPartition() {
return Integer.parseInt(getOptional("kylin.engine.spark.min-partition", "1"));
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public int getFlinkMinPartition() {
return Integer.parseInt(getOptional("kylin.engine.flink.min-partition", "1"));
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public int getSparkMaxPartition() {
return Integer.parseInt(getOptional("kylin.engine.spark.max-partition", "5000"));
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public int getFlinkMaxPartition() {
return Integer.parseInt(getOptional("kylin.engine.flink.max-partition", "5000"));
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public String getSparkStorageLevel() {
return getOptional("kylin.engine.spark.storage-level", "MEMORY_AND_DISK_SER");
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public boolean isSparkSanityCheckEnabled() {
return Boolean.parseBoolean(getOptional("kylin.engine.spark.sanity-check-enabled", FALSE));
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
+ public boolean isSparkFactDistinctEnable() {
+ return Boolean.parseBoolean(getOptional("kylin.engine.spark-fact-distinct", "false"));
+ }
+
+ public boolean isSparkUHCDictionaryEnable() {
+ return Boolean.parseBoolean(getOptional("kylin.engine.spark-udc-dictionary", "false"));
+ }
+
+ public boolean isSparkCardinalityEnabled() {
+ return Boolean.parseBoolean(getOptional("kylin.engine.spark-cardinality", "false"));
+ }
+
public int getSparkOutputMaxSize() {
return Integer.valueOf(getOptional("kylin.engine.spark.output.max-size", "10485760"));
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public boolean isSparkDimensionDictionaryEnabled() {
return Boolean.parseBoolean(getOptional("kylin.engine.spark-dimension-dictionary", "false"));
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public boolean isFlinkSanityCheckEnabled() {
return Boolean.parseBoolean(getOptional("kylin.engine.flink.sanity-check-enabled", FALSE));
}
+ public boolean isSparCreateHiveTableViaSparkEnable() {
+ return Boolean.parseBoolean(getOptional("kylin.engine.spark-create-table-enabled", "false"));
+ }
+
// ============================================================================
// ENGINE.LIVY
// ============================================================================
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public boolean isLivyEnabled() {
return Boolean.parseBoolean(getOptional("kylin.engine.livy-conf.livy-enabled", FALSE));
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public String getLivyRestApiBacktick() {
return getOptional("kylin.engine.livy.backtick.quote", "");
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public String getLivyUrl() {
return getOptional("kylin.engine.livy-conf.livy-url");
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public Map<String, String> getLivyKey() {
return getPropertiesByPrefix("kylin.engine.livy-conf.livy-key.");
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public Map<String, String> getLivyArr() {
return getPropertiesByPrefix("kylin.engine.livy-conf.livy-arr.");
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public Map<String, String> getLivyMap() {
return getPropertiesByPrefix("kylin.engine.livy-conf.livy-map.");
}
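Note on the hunk above: getSparderAppName(), added earlier in this hunk, falls back to deriving the app name from the server REST address when kylin.query.sparder-context.app-name is not set. A short illustration with a hypothetical address (the default documented above is localhost:7070):

    public class SparderAppNameSketch {
        public static void main(String[] args) {
            // Hypothetical REST address of the Kylin server.
            String serverRestAddress = "localhost:7070";
            String sparderAppName = "sparder_on_" + serverRestAddress.replaceAll(":", "-");
            System.out.println(sparderAppName); // sparder_on_localhost-7070
        }
    }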
@@ -1763,7 +1839,6 @@ public abstract class KylinConfigBase implements Serializable {
// check KYLIN-3358, need deploy coprocessor if enabled
// finally should be deprecated
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public boolean isDynamicColumnEnabled() {
return Boolean.parseBoolean(getOptional("kylin.query.enable-dynamic-column", FALSE));
}
@@ -1818,13 +1893,11 @@ public abstract class KylinConfigBase implements Serializable {
return Integer.parseInt(getOptional("kylin.query.project-concurrent-running-threshold", "0"));
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public long getQueryMaxScanBytes() {
long value = Long.parseLong(getOptional("kylin.query.max-scan-bytes", "0"));
return value > 0 ? value : Long.MAX_VALUE;
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public long getQueryMaxReturnRows() {
return Integer.parseInt(this.getOptional("kylin.query.max-return-rows", "5000000"));
}
@@ -1942,7 +2015,6 @@ public abstract class KylinConfigBase implements Serializable {
return Integer.parseInt(getOptional("kylin.query.max-dimension-count-distinct", "5000000"));
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public Map<String, String> getUDFs() {
Map<String, String> udfMap = Maps.newLinkedHashMap();
udfMap.put("version", "org.apache.kylin.query.udf.VersionUDF");
@@ -1975,12 +2047,10 @@ public abstract class KylinConfigBase implements Serializable {
return this.getOptional("kylin.query.schema-factory", "org.apache.kylin.query.schema.OLAPSchemaFactory");
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public String getPushDownRunnerClassName() {
return getOptional("kylin.query.pushdown.runner-class-name", "");
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public List<String> getPushDownRunnerIds() {
List<String> ids = Lists.newArrayList();
String idsStr = getOptional("kylin.query.pushdown.runner.ids", "");
@@ -1992,7 +2062,6 @@ public abstract class KylinConfigBase implements Serializable {
return ids;
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public String[] getPushDownConverterClassNames() {
return getOptionalStringArray("kylin.query.pushdown.converter-class-names",
new String[]{"org.apache.kylin.source.adhocquery.HivePushDownConverter"});
@@ -2002,7 +2071,6 @@ public abstract class KylinConfigBase implements Serializable {
return Boolean.parseBoolean(this.getOptional("kylin.query.pushdown.cache-enabled", FALSE));
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public String getJdbcUrl(String id) {
if (null == id) {
return getOptional("kylin.query.pushdown.jdbc.url", "");
@@ -2011,7 +2079,6 @@ public abstract class KylinConfigBase implements Serializable {
}
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public String getJdbcDriverClass(String id) {
if (null == id) {
return getOptional("kylin.query.pushdown.jdbc.driver", "");
@@ -2020,7 +2087,6 @@ public abstract class KylinConfigBase implements Serializable {
}
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public String getJdbcUsername(String id) {
if (null == id) {
return getOptional("kylin.query.pushdown.jdbc.username", "");
@@ -2029,7 +2095,6 @@ public abstract class KylinConfigBase implements Serializable {
}
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public String getJdbcPassword(String id) {
if (null == id) {
return getOptional("kylin.query.pushdown.jdbc.password", "");
@@ -2038,7 +2103,6 @@ public abstract class KylinConfigBase implements Serializable {
}
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public int getPoolMaxTotal(String id) {
if (null == id) {
return Integer.parseInt(
@@ -2051,7 +2115,6 @@ public abstract class KylinConfigBase implements Serializable {
}
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public int getPoolMaxIdle(String id) {
if (null == id) {
return Integer.parseInt(
@@ -2064,7 +2127,6 @@ public abstract class KylinConfigBase implements Serializable {
}
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
public int getPoolMinIdle(String id) {
if (null == id) {
return Integer.parseInt(
@@ -2378,23 +2440,221 @@ public abstract class KylinConfigBase implements Serializable {
return getOptional("kylin.tool.auto-migrate-cube.dest-config", "");
}
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
- public String getJdbcSourceAdaptor() {
- return getOptional("kylin.source.jdbc.adaptor");
- }
-
- @ConfigTag(ConfigTag.Tag.DEPRECATED)
- public boolean isLimitPushDownEnabled() {
- return Boolean.parseBoolean(getOptional("kylin.storage.limit-push-down-enabled", TRUE));
- }
-
// ============================================================================
- // Realtime streaming (Removed)
+ // jdbc metadata resource store
// ============================================================================
+ public String getMetadataDialect() {
+ return getOptional("kylin.metadata.jdbc.dialect", "mysql");
+ }
+
+ public boolean isJsonAlwaysSmallCell() {
+ return Boolean.parseBoolean(getOptional("kylin.metadata.jdbc.json-always-small-cell", TRUE));
+ }
+
+ public int getSmallCellMetadataWarningThreshold() {
+ return Integer.parseInt(
+ getOptional("kylin.metadata.jdbc.small-cell-meta-size-warning-threshold", String.valueOf(100 << 20))); //100mb
+ }
+
+ public int getSmallCellMetadataErrorThreshold() {
+ return Integer.parseInt(
+ getOptional("kylin.metadata.jdbc.small-cell-meta-size-error-threshold", String.valueOf(1 << 30))); // 1gb
+ }
+
+ public int getJdbcResourceStoreMaxCellSize() {
+ return Integer.parseInt(getOptional("kylin.metadata.jdbc.max-cell-size", "1048576")); // 1mb
+ }
+
+ public String getJdbcSourceAdaptor() {
+ return getOptional("kylin.source.jdbc.adaptor");
+ }
+
+ public boolean isLimitPushDownEnabled() {
+ return Boolean.parseBoolean(getOptional("kylin.storage.limit-push-down-enabled", TRUE));
+ }
+
+ // ============================================================================
+ // Realtime streaming
+ // ============================================================================
+ public String getStreamingStoreClass() {
+ return getOptional("kylin.stream.store.class",
+ "org.apache.kylin.stream.core.storage.columnar.ColumnarSegmentStore");
+ }
+
+ public String getStreamingBasicCuboidJobDFSBlockSize() {
+ return getOptional("kylin.stream.job.dfs.block.size", String.valueOf(16 * 1024 * 1024));
+ }
+
+ public String getStreamingIndexPath() {
+ return getOptional("kylin.stream.index.path", "stream_index");
+ }
+
+ public int getStreamingCubeConsumerTasksNum() {
+ return Integer.parseInt(getOptional("kylin.stream.cube-num-of-consumer-tasks", "3"));
+ }
+
+ public int getStreamingCubeWindowInSecs() {
+ return Integer.parseInt(getOptional("kylin.stream.cube.window", "3600"));
+ }
+
+ public int getStreamingCubeDurationInSecs() {
+ return Integer.parseInt(getOptional("kylin.stream.cube.duration", "7200"));
+ }
+
+ public int getStreamingCubeMaxDurationInSecs() {
+ return Integer.parseInt(getOptional("kylin.stream.cube.duration.max", "43200"));
+ }
+
+ public int getStreamingCheckPointFileMaxNum() {
+ return Integer.parseInt(getOptional("kylin.stream.checkpoint.file.max.num", "5"));
+ }
+
+ public int getStreamingCheckPointIntervalsInSecs() {
+ return Integer.parseInt(getOptional("kylin.stream.index.checkpoint.intervals", "300"));
+ }
+
+ public int getStreamingIndexMaxRows() {
+ return Integer.parseInt(getOptional("kylin.stream.index.maxrows", "50000"));
+ }
+
+ public int getStreamingMaxImmutableSegments() {
+ return Integer.parseInt(getOptional("kylin.stream.immutable.segments.max.num", "100"));
+ }
+
+ public boolean isStreamingConsumeFromLatestOffsets() {
+ return Boolean.parseBoolean(getOptional("kylin.stream.consume.offsets.latest", "true"));
+ }
+
+ public String getStreamingNode() {
+ return getOptional("kylin.stream.node", null);
+ }
+
+ public Map<String, String> getStreamingNodeProperties() {
+ return getPropertiesByPrefix("kylin.stream.node");
+ }
+
+ public String getStreamingMetadataStoreType() {
+ return getOptional("kylin.stream.metadata.store.type", "zk");
+ }
+
+ public String getStreamingSegmentRetentionPolicy() {
+ return getOptional("kylin.stream.segment.retention.policy", "fullBuild");
+ }
+
+ public String getStreamingAssigner() {
+ return getOptional("kylin.stream.assigner", "DefaultAssigner");
+ }
+
+ public int getCoordinatorHttpClientTimeout() {
+ return Integer.parseInt(getOptional("kylin.stream.coordinator.client.timeout.millsecond", "5000"));
+ }
+
+ public int getReceiverHttpClientTimeout() {
+ return Integer.parseInt(getOptional("kylin.stream.receiver.client.timeout.millsecond", "5000"));
+ }
+
+ public int getStreamingReceiverHttpMaxThreads() {
+ return Integer.parseInt(getOptional("kylin.stream.receiver.http.max.threads", "200"));
+ }
+
+ public int getStreamingReceiverHttpMinThreads() {
+ return Integer.parseInt(getOptional("kylin.stream.receiver.http.min.threads", "10"));
+ }
+
+ public int getStreamingReceiverQueryCoreThreads() {
+ return Integer.parseInt(getOptional("kylin.stream.receiver.query-core-threads", "50"));
+ }
+
+ public int getStreamingReceiverQueryMaxThreads() {
+ return Integer.parseInt(getOptional("kylin.stream.receiver.query-max-threads", "200"));
+ }
+
+ public int getStreamingReceiverUseThreadsPerQuery() {
+ return Integer.parseInt(getOptional("kylin.stream.receiver.use-threads-per-query", "8"));
+ }
+
+ public int getStreamingRPCHttpConnTimeout() {
+ return Integer.parseInt(getOptional("kylin.stream.rpc.http.connect.timeout", "10000"));
+ }
+
+ public int getStreamingRPCHttpReadTimeout() {
+ return Integer.parseInt(getOptional("kylin.stream.rpc.http.read.timeout", "60000"));
+ }
+
+ public boolean isStreamingBuildAdditionalCuboids() {
+ return Boolean.parseBoolean(getOptional("kylin.stream.build.additional.cuboids", "false"));
+ }
+
+ public Map<String, String> getStreamingSegmentRetentionPolicyProperties(String policyName) {
+ return getPropertiesByPrefix("kylin.stream.segment.retention.policy." + policyName + ".");
+ }
+
+ public int getStreamingMaxFragmentsInSegment() {
+ return Integer.parseInt(getOptional("kylin.stream.segment-max-fragments", "50"));
+ }
+
+ public int getStreamingMinFragmentsInSegment() {
+ return Integer.parseInt(getOptional("kylin.stream.segment-min-fragments", "15"));
+ }
+
+ public int getStreamingMaxFragmentSizeInMb() {
+ return Integer.parseInt(getOptional("kylin.stream.max-fragment-size-mb", "300"));
+ }
+
+ public boolean isStreamingFragmentsAutoMergeEnabled() {
+ return Boolean.parseBoolean(getOptional("kylin.stream.fragments-auto-merge-enable", "true"));
+ }
+
+ public boolean isStreamingConcurrentScanEnabled() {
+ return Boolean.parseBoolean(getOptional("kylin.stream.segment.concurrent.scan", "false"));
+ }
+
+ public boolean isStreamingStandAloneMode() {
+ return Boolean.parseBoolean(getOptional("kylin.stream.stand-alone.mode", "false"));
+ }
+
+ public boolean isNewCoordinatorEnabled() {
+ return Boolean.parseBoolean(getOptional("kylin.stream.new.coordinator-enabled", "true"));
+ }
+
+ public String getLocalStorageImpl() {
+ return getOptional("kylin.stream.settled.storage", null);
+ }
+
+ public String getStreamMetrics() {
+ return getOptional("kylin.stream.metrics.option", "");
+ }
+
+ /**
+ * whether to print encode integer value for count distinct string value, only for debug/test purpose
+ */
+ public boolean isPrintRealtimeDictEnabled() {
+ return Boolean.parseBoolean(getOptional("kylin.stream.print-realtime-dict-enabled", "false"));
+ }
+
+ public long getStreamMetricsInterval() {
+ return Long.parseLong(getOptional("kylin.stream.metrics.interval", "5"));
+ }
+
+ /**
+ * whether realtime query should add timezone offset by kylin's web-timezone, please refer to KYLIN-4010 for detail
+ */
+ public String getStreamingDerivedTimeTimezone() {
+ return (getOptional("kylin.stream.event.timezone", ""));
+ }
+
+ public boolean isAutoResubmitDiscardJob() {
+ return Boolean.parseBoolean(getOptional("kylin.stream.auto-resubmit-after-discard-enabled", "true"));
+ }
+
+ public String getHiveDatabaseLambdaCube() {
+ return this.getOptional("kylin.stream.hive.database-for-lambda-cube", DEFAULT);
+ }
+
+ // ============================================================================
+ // Health Check CLI
// ============================================================================
- // Health Check CLI
- // ============================================================================
public int getWarningSegmentNum() {
return Integer.parseInt(getOptional("kylin.tool.health-check.warning-segment-num", "-1"));
@@ -2417,7 +2677,7 @@ public abstract class KylinConfigBase implements Serializable {
}
// ============================================================================
- // ENGINE.SPARK (Kylin 4)
+ // Kylin 4.x related
// ============================================================================
public String getKylinParquetJobJarPath() {
@@ -2516,106 +2776,22 @@ public abstract class KylinConfigBase implements Serializable {
return new Path(path);
}
- public Map<String, String> getSparkConfigOverride() {
- return getPropertiesByPrefix("kylin.engine.spark-conf.");
- }
-
- public boolean isAutoSetSparkConf() {
- return Boolean.parseBoolean(getOptional("kylin.spark-conf.auto.prior", "true"));
- }
-
- public String getBuildConf() {
- return getOptional("kylin.engine.submit-hadoop-conf-dir", "");
- }
-
- public boolean isJobLogPrintEnabled() {
- return Boolean.parseBoolean(getOptional("kylin.job.log-print-enabled", "true"));
- }
-
- @ConfigTag(ConfigTag.Tag.DEBUG_HACK)
- public String getClusterInfoFetcherClassName() {
- return getOptional("kylin.engine.spark.cluster-info-fetcher-class-name",
- "org.apache.kylin.cluster.YarnInfoFetcher");
- }
-
- @ConfigTag(ConfigTag.Tag.DEBUG_HACK)
- public String getSparkMergeClassName() {
- return getOptional("kylin.engine.spark.merge-class-name", "org.apache.kylin.engine.spark.job.CubeMergeJob");
- }
-
- public String getParentDatasetStorageLevel() {
- return getOptional("kylin.engine.spark.parent-dataset.storage.level", "NONE");
- }
-
- public int getMaxParentDatasetPersistCount() {
- return Integer.parseInt(getOptional("kylin.engine.spark.parent-dataset.max.persist.count", "1"));
- }
-
- public int getRepartitionNumAfterEncode() {
- return Integer.valueOf(getOptional("kylin.engine.spark.dataset.repartition.num.after.encoding", "0"));
- }
-
-
- public int getSparkEngineMaxRetryTime() {
- return Integer.parseInt(getOptional("kylin.engine.max-retry-time", "3"));
- }
-
- public double getSparkEngineRetryMemoryGradient() {
- return Double.parseDouble(getOptional("kylin.engine.retry-memory-gradient", "1.5"));
- }
-
- public double getSparkEngineRetryOverheadMemoryGradient() {
- return Double.parseDouble(getOptional("kylin.engine.retry-overheadMemory-gradient", "0.2"));
- }
-
- public Double getMaxAllocationResourceProportion() {
- return Double.parseDouble(getOptional("kylin.engine.max-allocation-proportion", "0.9"));
- }
-
- public int getSparkEngineBaseExecutorInstances() {
- return Integer.parseInt(getOptional("kylin.engine.base-executor-instance", "5"));
- }
-
- public String getSparkEngineRequiredTotalCores() {
- return getOptional("kylin.engine.spark.required-cores", "1");
- }
-
- public String getSparkEngineExecutorInstanceStrategy() {
- return getOptional("kylin.engine.executor-instance-strategy", "100,2,500,3,1000,4");
+ public boolean isSnapshotParallelBuildEnabled() {
+ return Boolean.parseBoolean(getOptional("kylin.snapshot.parallel-build-enabled", "true"));
}
- public int getSnapshotShardSizeMB() {
- return Integer.parseInt(getOptional("kylin.snapshot.shard-size-mb", "128"));
+ public boolean isUTEnv() {
+ return "UT".equals(getDeployEnv());
}
- /***
- * Global dictionary will be split into several buckets. To encode a column to int value more
- * efficiently, source dataset will be repartitioned by the to-be encoded column to the same
- * amount of partitions as the dictionary's bucket size.
- *
- * It sometimes bring side effect, because repartitioning by a single column is more likely to cause
- * serious data skew, causing one task takes the majority of time in first layer's cuboid building.
- *
- * When faced with this case, you can try repartitioning encoded dataset by all
- * RowKey columns to avoid data skew. The repartition size is default to max bucket
- * size of all dictionaries, but you can also set to other flexible value by this option:
- * 'kylin.engine.spark.dataset.repartition.num.after.encoding'
- ***/
- public boolean rePartitionEncodedDatasetWithRowKey() {
- return Boolean.valueOf(getOptional("kylin.engine.spark.repartition.encoded.dataset", "false"));
+ public boolean isLocalEnv() {
+ return "LOCAL".equals(getDeployEnv());
}
- /**
- * If we should calculate cuboid statistics for each segment, which is needed for cube planner phase two
- */
- public boolean isSegmentStatisticsEnabled() {
- return Boolean.parseBoolean(this.getOptional("kylin.engine.segment-statistics-enabled", "false"));
+ public int snapshotParallelBuildTimeoutSeconds() {
+ return Integer.parseInt(getOptional("kylin.snapshot.parallel-build-timeout-seconds", "3600"));
}
- // ============================================================================
- // STORAGE.PARQUET
- // ============================================================================
-
public String getStorageProvider() {
return getOptional("kylin.storage.provider", "org.apache.kylin.common.storage.DefaultStorageProvider");
}
@@ -2666,21 +2842,61 @@ public abstract class KylinConfigBase implements Serializable {
return Integer.valueOf(getOptional("kylin.storage.columnar.dfs-replication", "3"));
}
- // ============================================================================
- // Query Engine (Sparder)
- // ============================================================================
+ public boolean isAutoSetSparkConf() {
+ return Boolean.parseBoolean(getOptional("kylin.spark-conf.auto.prior", "true"));
+ }
- /**
- * Get the sparder app name, default value is: 'sparder_on_localhost-7070'
- */
- public String getSparderAppName() {
- String customSparderAppName = getOptional("kylin.query.sparder-context.app-name", "");
- if (StringUtils.isEmpty(customSparderAppName)) {
- customSparderAppName =
- "sparder_on_" + getServerRestAddress().replaceAll(":", "-");
- }
- return customSparderAppName;
+ public String getBuildConf() {
+ return getOptional("kylin.engine.submit-hadoop-conf-dir", "");
+ }
+
+ public boolean isJobLogPrintEnabled() {
+ return Boolean.parseBoolean(getOptional("kylin.job.log-print-enabled", "true"));
+ }
+
+ @ConfigTag(ConfigTag.Tag.DEBUG_HACK)
+ public String getClusterInfoFetcherClassName() {
+ return getOptional("kylin.engine.spark.cluster-info-fetcher-class-name",
+ "org.apache.kylin.cluster.YarnInfoFetcher");
+ }
+
+ @ConfigTag(ConfigTag.Tag.DEBUG_HACK)
+ public String getSparkMergeClassName() {
+ return getOptional("kylin.engine.spark.merge-class-name", "org.apache.kylin.engine.spark.job.CubeMergeJob");
+ }
+
+ public int getSparkEngineMaxRetryTime() {
+ return Integer.parseInt(getOptional("kylin.engine.max-retry-time", "3"));
}
+
+ public double getSparkEngineRetryMemoryGradient() {
+ return Double.parseDouble(getOptional("kylin.engine.retry-memory-gradient", "1.5"));
+ }
+
+ public double getSparkEngineRetryOverheadMemoryGradient() {
+ return Double.parseDouble(getOptional("kylin.engine.retry-overheadMemory-gradient", "0.2"));
+ }
+
+ public Double getMaxAllocationResourceProportion() {
+ return Double.parseDouble(getOptional("kylin.engine.max-allocation-proportion", "0.9"));
+ }
+
+ public int getSparkEngineBaseExecutorInstances() {
+ return Integer.parseInt(getOptional("kylin.engine.base-executor-instance", "5"));
+ }
+
+ public String getSparkEngineRequiredTotalCores() {
+ return getOptional("kylin.engine.spark.required-cores", "1");
+ }
+
+ public String getSparkEngineExecutorInstanceStrategy() {
+ return getOptional("kylin.engine.executor-instance-strategy", "100,2,500,3,1000,4");
+ }
+
+ public int getSnapshotShardSizeMB() {
+ return Integer.parseInt(getOptional("kylin.snapshot.shard-size-mb", "128"));
+ }
+
/**
* driver memory that can be used by join(mostly BHJ)
*/
@@ -2852,22 +3068,25 @@ public abstract class KylinConfigBase implements Serializable {
return Integer.parseInt(this.getOptional("kylin.canary.sparder-context-period-min", "3"));
}
+ /**
+ * Whether to calculate cuboid statistics for each segment; this is needed for cube planner phase two.
+ */
+ public boolean isSegmentStatisticsEnabled() {
+ return Boolean.parseBoolean(this.getOptional("kylin.engine.segment-statistics-enabled", "false"));
+ }
// ============================================================================
// Spark with Kerberos
// ============================================================================
- @ConfigTag(ConfigTag.Tag.NOT_CLEAR)
public Boolean isKerberosEnabled() {
return Boolean.valueOf(getOptional("kylin.kerberos.enabled", FALSE));
}
- @ConfigTag(ConfigTag.Tag.NOT_CLEAR)
public String getKerberosKeytab() {
return getOptional("kylin.kerberos.keytab", "");
}
- @ConfigTag(ConfigTag.Tag.NOT_CLEAR)
public String getKerberosKeytabPath() {
return KylinConfig.getKylinConfDir() + File.separator + getKerberosKeytab();
}
@@ -2917,8 +3136,64 @@ public abstract class KylinConfigBase implements Serializable {
return KylinConfig.getKylinConfDir() + File.separator + getKerberosJaasConf();
}
- @ConfigTag(ConfigTag.Tag.NOT_CLEAR)
public String getKerberosPrincipal() {
return getOptional("kylin.kerberos.principal");
}
+
+ public String getParentDatasetStorageLevel() {
+ return getOptional("kylin.engine.spark.parent-dataset.storage.level", "NONE");
+ }
+
+ public int getMaxParentDatasetPersistCount() {
+ return Integer.parseInt(getOptional("kylin.engine.spark.parent-dataset.max.persist.count", "1"));
+ }
+
+ public int getRepartitionNumAfterEncode() {
+ return Integer.valueOf(getOptional("kylin.engine.spark.dataset.repartition.num.after.encoding", "0"));
+ }
+
+ /***
+ * A global dictionary is split into several buckets. To encode a column to an int value more
+ * efficiently, the source dataset is repartitioned by the to-be-encoded column into the same
+ * number of partitions as the dictionary's bucket count.
+ *
+ * This sometimes has a side effect: repartitioning by a single column is more likely to cause
+ * serious data skew, so one task can take the majority of the time when building the first layer's cuboids.
+ *
+ * When faced with this case, you can try repartitioning the encoded dataset by all
+ * RowKey columns to avoid data skew. The repartition size defaults to the max bucket
+ * size of all dictionaries, but you can also set it to another value with this option:
+ * 'kylin.engine.spark.dataset.repartition.num.after.encoding'
+ ***/
+ public boolean rePartitionEncodedDatasetWithRowKey() {
+ return Boolean.valueOf(getOptional("kylin.engine.spark.repartition.encoded.dataset", "false"));
+ }
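
The javadoc above describes repartitioning the source dataset by the to-be-encoded column into as many partitions as the global dictionary has buckets, so each row lands in the partition that holds its bucket. The mapping is essentially a hash modulo the bucket count; a toy sketch of that idea (the bucket count and sample value are illustrative, not Kylin's actual hash function):

    class BucketRepartitionSketch {
        public static void main(String[] args) {
            int buckets = 10;                        // illustrative bucket count
            String value = "some-column-value";      // illustrative column value
            // non-negative bucket index derived from the value's hash
            int partition = (value.hashCode() & Integer.MAX_VALUE) % buckets;
            System.out.println(partition);
        }
    }
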
+
+ /*
+ * Detect dataset skew in the dictionary encoding step.
+ * */
+ public boolean detectDataSkewInDictEncodingEnabled() {
+ return Boolean.valueOf(getOptional("kylin.dictionary.detect.data.skew.in.encoding", "false"));
+ }
+
+ /*
+ * In some data-skew cases, the repartition step during dictionary encoding will be slow.
+ * We can sample from the dataset to detect skew. This configuration sets the sample rate.
+ * */
+ public double sampleRateInEncodingSkewDetection() {
+ return Double.valueOf(getOptional("kylin.dictionary.detect.data.skew.sample.rate", "0.1"));
+ }
+
+ /*
+ * In Kylin 4, dictionaries are hashed into several buckets, and column data is repartitioned by the same hash
+ * algorithm during the encoding step. In data-skew cases that repartition step becomes very slow. Kylin will
+ * automatically sample from the source to detect skewed data and repartition the skewed data to random partitions.
+ * This configuration sets the skew threshold, valued from 0 to 1.
+ * e.g.
+ * if you set this value to 0.05, each value that takes up more than 5% of the total is regarded as skewed,
+ * so there can be no more than 20 such skewed values.
+ * */
+ public double skewPercentageThreshHold() {
+ return Double.valueOf(getOptional("kylin.dictionary.data.skew.percentage.threshhold", "0.05"));
+ }
}
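
The skew-threshold comment above implies a simple bound: with a threshold t, at most about 1/t distinct values can each exceed a share t of the total rows. A small sketch of that arithmetic (the helper name and the 0.05 value are illustrative, not part of the commit):

    class SkewThresholdSketch {
        // Upper bound on how many distinct values can each exceed the given share of the total.
        static int maxSkewedValues(double threshold) {
            // a tiny epsilon guards against floating-point rounding, e.g. 1 / 0.05 -> 20
            return (int) Math.floor(1.0 / threshold + 1e-9);
        }

        public static void main(String[] args) {
            System.out.println(maxSkewedValues(0.05)); // prints 20
        }
    }
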
diff --git a/core-common/src/main/java/org/apache/kylin/common/annotation/ConfigTag.java b/core-common/src/main/java/org/apache/kylin/common/annotation/ConfigTag.java
index 965d252..53a6967 100644
--- a/core-common/src/main/java/org/apache/kylin/common/annotation/ConfigTag.java
+++ b/core-common/src/main/java/org/apache/kylin/common/annotation/ConfigTag.java
@@ -39,9 +39,6 @@ public @interface ConfigTag {
*/
DEPRECATED,
- /**
- * Not tested/verified
- */
NOT_CLEAR,
/**
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
index 9b782de..e5facf4 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
@@ -101,6 +101,10 @@ public class HadoopUtil {
return getFileSystem(workingPath, conf);
}
+ public static FileSystem getReadFileSystem() throws IOException {
+ return getFileSystem(KylinConfig.getInstanceFromEnv().getReadHdfsWorkingDirectory());
+ }
+
public static FileSystem getFileSystem(String path) {
return getFileSystem(new Path(makeURI(path)));
}
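
The new HadoopUtil.getReadFileSystem() resolves the FileSystem backing the read HDFS working directory. A hedged usage sketch (the checked path is a placeholder):

    import java.io.IOException;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.kylin.common.util.HadoopUtil;

    class ReadFileSystemSketch {
        public static void main(String[] args) throws IOException {
            FileSystem fs = HadoopUtil.getReadFileSystem();
            // placeholder path, for illustration only
            System.out.println(fs.exists(new Path("/kylin/kylin_metadata")));
        }
    }
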
diff --git a/core-common/src/test/java/org/apache/kylin/common/KylinConfigTest.java b/core-common/src/test/java/org/apache/kylin/common/KylinConfigTest.java
index 29f92a6..b4ac16b 100644
--- a/core-common/src/test/java/org/apache/kylin/common/KylinConfigTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/KylinConfigTest.java
@@ -76,16 +76,16 @@ public class KylinConfigTest extends HotLoadKylinPropertiesTestCase {
assertEquals("1234", configExt.getOptional("1234"));
}
-// @Test
-// public void testPropertiesHotLoad() {
-// KylinConfig config = KylinConfig.getInstanceFromEnv();
-// assertEquals("whoami@kylin.apache.org", config.getKylinOwner());
-//
-// updateProperty("kylin.storage.hbase.owner-tag", "kylin@kylin.apache.org");
-// KylinConfig.getInstanceFromEnv().reloadFromSiteProperties();
-//
-// assertEquals("kylin@kylin.apache.org", config.getKylinOwner());
-// }
+ @Test
+ public void testPropertiesHotLoad() {
+ KylinConfig config = KylinConfig.getInstanceFromEnv();
+ assertEquals("whoami@kylin.apache.org", config.getKylinOwner());
+
+ updateProperty("kylin.storage.hbase.owner-tag", "kylin@kylin.apache.org");
+ KylinConfig.getInstanceFromEnv().reloadFromSiteProperties();
+
+ assertEquals("kylin@kylin.apache.org", config.getKylinOwner());
+ }
@Test
public void testGetMetadataUrlPrefix() {
diff --git a/core-cube/pom.xml b/core-cube/pom.xml
index 91be01b..c024c06 100644
--- a/core-cube/pom.xml
+++ b/core-cube/pom.xml
@@ -38,10 +38,10 @@
<groupId>org.apache.kylin</groupId>
<artifactId>kylin-core-metadata</artifactId>
</dependency>
-<!-- <dependency>-->
-<!-- <groupId>org.apache.kylin</groupId>-->
-<!-- <artifactId>kylin-core-dictionary</artifactId>-->
-<!-- </dependency>-->
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-core-dictionary</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.kylin</groupId>
<artifactId>kylin-shaded-guava</artifactId>
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 3e99268..585a37a 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -27,6 +27,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -40,23 +41,34 @@ import org.apache.kylin.common.persistence.Serializer;
import org.apache.kylin.common.persistence.WriteConflictException;
import org.apache.kylin.common.util.AutoReadWriteLock;
import org.apache.kylin.common.util.AutoReadWriteLock.AutoLock;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeDescTiretreeGlobalDomainDictUtil;
import org.apache.kylin.cube.model.SnapshotTableDesc;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.dict.lookup.ILookupTable;
+import org.apache.kylin.dict.lookup.LookupProviderFactory;
+import org.apache.kylin.dict.lookup.SnapshotManager;
+import org.apache.kylin.dict.lookup.SnapshotTable;
import org.apache.kylin.metadata.TableMetadataManager;
import org.apache.kylin.metadata.cachesync.Broadcaster;
import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
import org.apache.kylin.metadata.cachesync.CachedCrudAssist;
import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache;
import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.IEngineAware;
import org.apache.kylin.metadata.model.JoinDesc;
import org.apache.kylin.metadata.model.PartitionDesc;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.SegmentRange.TSRange;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.Segments;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.metadata.project.ProjectManager;
import org.apache.kylin.metadata.project.RealizationEntry;
@@ -65,6 +77,8 @@ import org.apache.kylin.metadata.realization.IRealizationProvider;
import org.apache.kylin.metadata.realization.RealizationRegistry;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.source.IReadableTable;
+import org.apache.kylin.source.SourceManager;
import org.apache.kylin.source.SourcePartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -115,6 +129,7 @@ public class CubeManager implements IRealizationProvider {
// a few inner classes to group related methods
private SegmentAssist segAssist = new SegmentAssist();
+ private DictionaryAssist dictAssist = new DictionaryAssist();
private Random ran = new Random();
@@ -541,24 +556,27 @@ public class CubeManager implements IRealizationProvider {
}
public ILookupTable getLookupTable(CubeSegment cubeSegment, JoinDesc join) {
- return null;
+ String tableName = join.getPKSide().getTableIdentity();
+ CubeDesc cubeDesc = cubeSegment.getCubeDesc();
+ SnapshotTableDesc snapshotTableDesc = cubeDesc.getSnapshotTableDesc(tableName);
+ return getInMemLookupTable(cubeSegment, join, snapshotTableDesc);
}
-// private ILookupTable getInMemLookupTable(CubeSegment cubeSegment, JoinDesc join,
-// SnapshotTableDesc snapshotTableDesc) {
-// String tableName = join.getPKSide().getTableIdentity();
-// String snapshotResPath = getSnapshotResPath(cubeSegment, tableName, snapshotTableDesc);
-// String[] pkCols = join.getPrimaryKey();
-//
-// try {
-// SnapshotTable snapshot = getSnapshotManager().getSnapshotTable(snapshotResPath);
-// TableDesc tableDesc = getMetadataManager().getTableDesc(tableName, cubeSegment.getProject());
-// return LookupProviderFactory.getInMemLookupTable(tableDesc, pkCols, snapshot);
-// } catch (IOException e) {
-// throw new IllegalStateException(
-// "Failed to load lookup table " + tableName + " from snapshot " + snapshotResPath, e);
-// }
-// }
+ private ILookupTable getInMemLookupTable(CubeSegment cubeSegment, JoinDesc join,
+ SnapshotTableDesc snapshotTableDesc) {
+ String tableName = join.getPKSide().getTableIdentity();
+ String snapshotResPath = getSnapshotResPath(cubeSegment, tableName, snapshotTableDesc);
+ String[] pkCols = join.getPrimaryKey();
+
+ try {
+ SnapshotTable snapshot = getSnapshotManager().getSnapshotTable(snapshotResPath);
+ TableDesc tableDesc = getMetadataManager().getTableDesc(tableName, cubeSegment.getProject());
+ return LookupProviderFactory.getInMemLookupTable(tableDesc, pkCols, snapshot);
+ } catch (IOException e) {
+ throw new IllegalStateException(
+ "Failed to load lookup table " + tableName + " from snapshot " + snapshotResPath, e);
+ }
+ }
private String getSnapshotResPath(CubeSegment cubeSegment, String tableName, SnapshotTableDesc snapshotTableDesc) {
String snapshotResPath;
@@ -576,11 +594,20 @@ public class CubeManager implements IRealizationProvider {
@VisibleForTesting
/*private*/ String generateStorageLocation(int engineType) {
+ String namePrefix = config.getHBaseTableNamePrefix();
+ String namespace = config.getHBaseStorageNameSpace();
String tableName = "";
do {
StringBuffer sb = new StringBuffer();
int identifierLength = HBASE_TABLE_LENGTH;
+ if (engineType != IEngineAware.ID_SPARK_II) {
+ if (!(namespace.equals("default") || namespace.equals(""))) {
+ sb.append(namespace).append(":");
+ }
+ sb.append(namePrefix);
+ } else {
identifierLength = PARQUET_IDENTIFIER_LENGTH;
+ }
for (int i = 0; i < identifierLength; i++) {
sb.append(ALPHA_NUM.charAt(ran.nextInt(ALPHA_NUM.length())));
}
@@ -601,6 +628,14 @@ public class CubeManager implements IRealizationProvider {
return TableMetadataManager.getInstance(config);
}
+ private DictionaryManager getDictionaryManager() {
+ return DictionaryManager.getInstance(config);
+ }
+
+ private SnapshotManager getSnapshotManager() {
+ return SnapshotManager.getInstance(config);
+ }
+
private ResourceStore getStore() {
return ResourceStore.getStore(this.config);
}
@@ -1090,143 +1125,143 @@ public class CubeManager implements IRealizationProvider {
// Dictionary/Snapshot related methods
// ============================================================================
-// public DictionaryInfo buildDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable)
-// throws IOException {
-// return dictAssist.buildDictionary(cubeSeg, col, inpTable);
-// }
-//
-// public DictionaryInfo saveDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable,
-// Dictionary<String> dict) throws IOException {
-// return dictAssist.saveDictionary(cubeSeg, col, inpTable, dict);
-// }
-//
-// /**
-// * return null if no dictionary for given column
-// */
-// public Dictionary<String> getDictionary(CubeSegment cubeSeg, TblColRef col) {
-// return dictAssist.getDictionary(cubeSeg, col);
-// }
-//
-// public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable, String uuid) throws IOException {
-// return dictAssist.buildSnapshotTable(cubeSeg, lookupTable, uuid);
-// }
-//
-// private TableMetadataManager getMetadataManager() {
-// return TableMetadataManager.getInstance(config);
-// }
-//
-// private class DictionaryAssist {
-// public DictionaryInfo buildDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable)
-// throws IOException {
-// CubeDesc cubeDesc = cubeSeg.getCubeDesc();
-// if (!cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col))
-// return null;
-//
-// String builderClass = cubeDesc.getDictionaryBuilderClass(col);
-// DictionaryInfo dictInfo = getDictionaryManager().buildDictionary(col, inpTable, builderClass);
-//
-// saveDictionaryInfo(cubeSeg, col, dictInfo);
-// return dictInfo;
-// }
-//
-// public DictionaryInfo saveDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable,
-// Dictionary<String> dict) throws IOException {
-// CubeDesc cubeDesc = cubeSeg.getCubeDesc();
-// if (!cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col))
-// return null;
-//
-// DictionaryInfo dictInfo = getDictionaryManager().saveDictionary(col, inpTable, dict);
-//
-// saveDictionaryInfo(cubeSeg, col, dictInfo);
-// return dictInfo;
-// }
-//
-// private void saveDictionaryInfo(CubeSegment cubeSeg, TblColRef col, DictionaryInfo dictInfo)
-// throws IOException {
-// if (dictInfo == null)
-// return;
-//
-// // work on copy instead of cached objects
-// CubeInstance cubeCopy = cubeSeg.getCubeInstance().latestCopyForWrite(); // get a latest copy
-// CubeSegment segCopy = cubeCopy.getSegmentById(cubeSeg.getUuid());
-//
-// Dictionary<?> dict = dictInfo.getDictionaryObject();
-// segCopy.putDictResPath(col, dictInfo.getResourcePath());
-// segCopy.getRowkeyStats().add(new Object[] { col.getIdentity(), dict.getSize(), dict.getSizeOfId() });
-//
-// CubeUpdate update = new CubeUpdate(cubeCopy);
-// update.setToUpdateSegs(segCopy);
-// updateCube(update);
-// }
-//
-// /**
-// * return null if no dictionary for given column
-// */
-// @SuppressWarnings("unchecked")
-// public Dictionary<String> getDictionary(CubeSegment cubeSeg, TblColRef col) {
-// DictionaryInfo info = null;
-// String dictResPath = null;
-// try {
-// DictionaryManager dictMgr = getDictionaryManager();
-//
-// //tiretree global domain dic
-// List<CubeDescTiretreeGlobalDomainDictUtil.GlobalDict> globalDicts = cubeSeg.getCubeDesc().listDomainDict();
-// if (!globalDicts.isEmpty()) {
-// dictResPath = CubeDescTiretreeGlobalDomainDictUtil.globalReuseDictPath(cubeSeg.getConfig(), col, cubeSeg.getCubeDesc());
-// }
-//
-// if (Objects.isNull(dictResPath)){
-// dictResPath = cubeSeg.getDictResPath(col);
-// }
-//
-// if (dictResPath == null)
-// return null;
-//
-// info = dictMgr.getDictionaryInfo(dictResPath);
-// if (info == null)
-// throw new IllegalStateException("No dictionary found by " + dictResPath
-// + ", invalid cube state; cube segment" + cubeSeg + ", col " + col);
-// } catch (IOException e) {
-// throw new IllegalStateException("Failed to get dictionary for cube segment" + cubeSeg + ", col" + col,
-// e);
-// }
-// return (Dictionary<String>) info.getDictionaryObject();
-// }
-//
-// public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable, String uuid)
-// throws IOException {
-// // work on copy instead of cached objects
-// CubeInstance cubeCopy = cubeSeg.getCubeInstance().latestCopyForWrite(); // get a latest copy
-// CubeSegment segCopy = cubeCopy.getSegmentById(cubeSeg.getUuid());
-//
-// TableMetadataManager metaMgr = getTableManager();
-// SnapshotManager snapshotMgr = getSnapshotManager();
-//
-// TableDesc tableDesc = new TableDesc(metaMgr.getTableDesc(lookupTable, segCopy.getProject()));
-// IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc, uuid);
-// SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc, cubeSeg.getConfig());
-//
-// CubeDesc cubeDesc = cubeSeg.getCubeDesc();
-// if (!cubeDesc.isGlobalSnapshotTable(lookupTable)) {
-// segCopy.putSnapshotResPath(lookupTable, snapshot.getResourcePath());
-// CubeUpdate update = new CubeUpdate(cubeCopy);
-// update.setToUpdateSegs(segCopy);
-// updateCube(update);
-//
-// // Update the input cubeSeg after the resource store updated
-// cubeSeg.putSnapshotResPath(lookupTable, segCopy.getSnapshotResPath(lookupTable));
-// } else {
-// CubeUpdate cubeUpdate = new CubeUpdate(cubeCopy);
-// Map<String, String> map = Maps.newHashMap();
-// map.put(lookupTable, snapshot.getResourcePath());
-// cubeUpdate.setUpdateTableSnapshotPath(map);
-// updateCube(cubeUpdate);
-//
-// cubeSeg.getCubeInstance().putSnapshotResPath(lookupTable, snapshot.getResourcePath());
-// }
-// return snapshot;
-// }
-// }
+ public DictionaryInfo buildDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable)
+ throws IOException {
+ return dictAssist.buildDictionary(cubeSeg, col, inpTable);
+ }
+
+ public DictionaryInfo saveDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable,
+ Dictionary<String> dict) throws IOException {
+ return dictAssist.saveDictionary(cubeSeg, col, inpTable, dict);
+ }
+
+ /**
+ * return null if no dictionary for given column
+ */
+ public Dictionary<String> getDictionary(CubeSegment cubeSeg, TblColRef col) {
+ return dictAssist.getDictionary(cubeSeg, col);
+ }
+
+ public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable, String uuid) throws IOException {
+ return dictAssist.buildSnapshotTable(cubeSeg, lookupTable, uuid);
+ }
+
+ private TableMetadataManager getMetadataManager() {
+ return TableMetadataManager.getInstance(config);
+ }
+
+ private class DictionaryAssist {
+ public DictionaryInfo buildDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable)
+ throws IOException {
+ CubeDesc cubeDesc = cubeSeg.getCubeDesc();
+ if (!cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col))
+ return null;
+
+ String builderClass = cubeDesc.getDictionaryBuilderClass(col);
+ DictionaryInfo dictInfo = getDictionaryManager().buildDictionary(col, inpTable, builderClass);
+
+ saveDictionaryInfo(cubeSeg, col, dictInfo);
+ return dictInfo;
+ }
+
+ public DictionaryInfo saveDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable,
+ Dictionary<String> dict) throws IOException {
+ CubeDesc cubeDesc = cubeSeg.getCubeDesc();
+ if (!cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col))
+ return null;
+
+ DictionaryInfo dictInfo = getDictionaryManager().saveDictionary(col, inpTable, dict);
+
+ saveDictionaryInfo(cubeSeg, col, dictInfo);
+ return dictInfo;
+ }
+
+ private void saveDictionaryInfo(CubeSegment cubeSeg, TblColRef col, DictionaryInfo dictInfo)
+ throws IOException {
+ if (dictInfo == null)
+ return;
+
+ // work on copy instead of cached objects
+ CubeInstance cubeCopy = cubeSeg.getCubeInstance().latestCopyForWrite(); // get a latest copy
+ CubeSegment segCopy = cubeCopy.getSegmentById(cubeSeg.getUuid());
+
+ Dictionary<?> dict = dictInfo.getDictionaryObject();
+ segCopy.putDictResPath(col, dictInfo.getResourcePath());
+ segCopy.getRowkeyStats().add(new Object[] { col.getIdentity(), dict.getSize(), dict.getSizeOfId() });
+
+ CubeUpdate update = new CubeUpdate(cubeCopy);
+ update.setToUpdateSegs(segCopy);
+ updateCube(update);
+ }
+
+ /**
+ * return null if no dictionary for given column
+ */
+ @SuppressWarnings("unchecked")
+ public Dictionary<String> getDictionary(CubeSegment cubeSeg, TblColRef col) {
+ DictionaryInfo info = null;
+ String dictResPath = null;
+ try {
+ DictionaryManager dictMgr = getDictionaryManager();
+
+ //tiretree global domain dic
+ List<CubeDescTiretreeGlobalDomainDictUtil.GlobalDict> globalDicts = cubeSeg.getCubeDesc().listDomainDict();
+ if (!globalDicts.isEmpty()) {
+ dictResPath = CubeDescTiretreeGlobalDomainDictUtil.globalReuseDictPath(cubeSeg.getConfig(), col, cubeSeg.getCubeDesc());
+ }
+
+ if (Objects.isNull(dictResPath)){
+ dictResPath = cubeSeg.getDictResPath(col);
+ }
+
+ if (dictResPath == null)
+ return null;
+
+ info = dictMgr.getDictionaryInfo(dictResPath);
+ if (info == null)
+ throw new IllegalStateException("No dictionary found by " + dictResPath
+ + ", invalid cube state; cube segment" + cubeSeg + ", col " + col);
+ } catch (IOException e) {
+ throw new IllegalStateException("Failed to get dictionary for cube segment" + cubeSeg + ", col" + col,
+ e);
+ }
+ return (Dictionary<String>) info.getDictionaryObject();
+ }
+
+ public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable, String uuid)
+ throws IOException {
+ // work on copy instead of cached objects
+ CubeInstance cubeCopy = cubeSeg.getCubeInstance().latestCopyForWrite(); // get a latest copy
+ CubeSegment segCopy = cubeCopy.getSegmentById(cubeSeg.getUuid());
+
+ TableMetadataManager metaMgr = getTableManager();
+ SnapshotManager snapshotMgr = getSnapshotManager();
+
+ TableDesc tableDesc = new TableDesc(metaMgr.getTableDesc(lookupTable, segCopy.getProject()));
+ IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc, uuid);
+ SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc, cubeSeg.getConfig());
+
+ CubeDesc cubeDesc = cubeSeg.getCubeDesc();
+ if (!cubeDesc.isGlobalSnapshotTable(lookupTable)) {
+ segCopy.putSnapshotResPath(lookupTable, snapshot.getResourcePath());
+ CubeUpdate update = new CubeUpdate(cubeCopy);
+ update.setToUpdateSegs(segCopy);
+ updateCube(update);
+
+ // Update the input cubeSeg after the resource store updated
+ cubeSeg.putSnapshotResPath(lookupTable, segCopy.getSnapshotResPath(lookupTable));
+ } else {
+ CubeUpdate cubeUpdate = new CubeUpdate(cubeCopy);
+ Map<String, String> map = Maps.newHashMap();
+ map.put(lookupTable, snapshot.getResourcePath());
+ cubeUpdate.setUpdateTableSnapshotPath(map);
+ updateCube(cubeUpdate);
+
+ cubeSeg.getCubeInstance().putSnapshotResPath(lookupTable, snapshot.getResourcePath());
+ }
+ return snapshot;
+ }
+ }
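
The restored methods above are the entry points build engines use to persist and read back per-segment dictionaries and lookup snapshots. A rough usage fragment, assuming a loaded cube with at least one READY segment; the cube name is a placeholder and imports are omitted (the types are those already imported in this file and in DictionaryGeneratorCLI):

    // a minimal sketch, not taken from the commit
    KylinConfig config = KylinConfig.getInstanceFromEnv();
    CubeManager mgr = CubeManager.getInstance(config);
    CubeInstance cube = mgr.getCube("sample_cube");                      // placeholder cube name
    CubeSegment seg = cube.getSegments(SegmentStatusEnum.READY).get(0);

    // read back a column dictionary; returns null if the column has none
    TblColRef col = cube.getDescriptor().listDimensionColumnsExcludingDerived(true).get(0);
    Dictionary<String> dict = mgr.getDictionary(seg, col);

    // open (and close) the lookup snapshot behind the first lookup join of the model
    for (DimensionDesc dim : seg.getCubeDesc().getDimensions()) {
        if (seg.getModel().isLookupTable(dim.getTableRef())) {
            JoinDesc join = seg.getModel().getJoinsTree().getJoinByPKSide(dim.getTableRef());
            ILookupTable lookup = mgr.getLookupTable(seg, join);
            IOUtils.closeStream(lookup);
            break;
        }
    }
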
/**
* To keep "select * from LOOKUP_TABLE" has consistent and latest result, we manually choose
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index 5205cc5..c32da70 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -371,24 +371,26 @@ public class CubeSegment implements IBuildable, ISegment, Serializable {
this.storageLocationIdentifier = storageLocationIdentifier;
}
-// public Map<TblColRef, Dictionary<String>> buildDictionaryMap() {
-// Map<TblColRef, Dictionary<String>> result = Maps.newHashMap();
-// for (TblColRef col : getCubeDesc().getAllColumnsHaveDictionary()) {
-// result.put(col, (Dictionary<String>) getDictionary(col));
-// }
-// return result;
-// }
-//
-// public Map<TblColRef, Dictionary<String>> buildGlobalDictionaryMap(int globalColumnsSize) {
-// Map<TblColRef, Dictionary<String>> result = Maps.newHashMapWithExpectedSize(globalColumnsSize);
-// for (TblColRef col : getCubeDesc().getAllGlobalDictColumns()) {
-// result.put(col, getDictionary(col));
-// }
-// return result;
-// }
+ public Map<TblColRef, Dictionary<String>> buildDictionaryMap() {
+ Map<TblColRef, Dictionary<String>> result = Maps.newHashMap();
+ for (TblColRef col : getCubeDesc().getAllColumnsHaveDictionary()) {
+ result.put(col, (Dictionary<String>) getDictionary(col));
+ }
+ return result;
+ }
+
+ public Map<TblColRef, Dictionary<String>> buildGlobalDictionaryMap(int globalColumnsSize) {
+ Map<TblColRef, Dictionary<String>> result = Maps.newHashMapWithExpectedSize(globalColumnsSize);
+ for (TblColRef col : getCubeDesc().getAllGlobalDictColumns()) {
+ result.put(col, getDictionary(col));
+ }
+ return result;
+ }
public Dictionary<String> getDictionary(TblColRef col) {
- return null;
+ TblColRef reuseCol = getCubeDesc().getDictionaryReuseColumn(col);
+ CubeManager cubeMgr = CubeManager.getInstance(this.getCubeInstance().getConfig());
+ return cubeMgr.getDictionary(this, reuseCol);
}
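
With getDictionary wired back to CubeManager, buildDictionaryMap can again materialize every column dictionary of a segment in one call. A short fragment (imports omitted; `segment` is assumed to be a loaded CubeSegment, as in the sketch above):

    // a minimal sketch, not taken from the commit
    Map<TblColRef, Dictionary<String>> dicts = segment.buildDictionaryMap();
    for (Map.Entry<TblColRef, Dictionary<String>> e : dicts.entrySet()) {
        System.out.println(e.getKey().getIdentity() + " -> " + e.getValue().getSize() + " entries");
    }
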
public CubeDimEncMap getDimensionEncodingMap() {
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
new file mode 100644
index 0000000..0815942
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.cli;
+
+import java.io.IOException;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.io.IOUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.DimensionDesc;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryInfoSerializer;
+import org.apache.kylin.dict.DictionaryProvider;
+import org.apache.kylin.dict.DistinctColumnValuesProvider;
+import org.apache.kylin.dict.lookup.ILookupTable;
+import org.apache.kylin.dict.lookup.SnapshotTable;
+import org.apache.kylin.dict.lookup.SnapshotTableSerializer;
+import org.apache.kylin.metadata.model.JoinDesc;
+import org.apache.kylin.metadata.model.TableRef;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.source.IReadableTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kylin.shaded.com.google.common.collect.Sets;
+
+public class DictionaryGeneratorCLI {
+
+ private DictionaryGeneratorCLI() {
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(DictionaryGeneratorCLI.class);
+
+ public static void processSegment(KylinConfig config, String cubeName, String segmentID, String uuid,
+ DistinctColumnValuesProvider factTableValueProvider, DictionaryProvider dictProvider) throws IOException {
+ CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
+ CubeSegment segment = cube.getSegmentById(segmentID);
+
+ int retryTime = 0;
+ while (retryTime < 3) {
+ if (retryTime > 0) {
+ logger.info("Rebuild dictionary and snapshot for Cube: {}, Segment: {}, {} times.", cubeName, segmentID,
+ retryTime);
+ }
+
+ processSegment(config, segment, uuid, factTableValueProvider, dictProvider);
+
+ if (isAllDictsAndSnapshotsReady(config, cubeName, segmentID)) {
+ break;
+ }
+ retryTime++;
+ }
+
+ if (retryTime >= 3) {
+ logger.error("Not all dictionaries and snapshots ready for cube segment: {}", segmentID);
+ } else {
+ logger.info("Succeed to build all dictionaries and snapshots for cube segment: {}", segmentID);
+ }
+ }
+
+ private static void processSegment(KylinConfig config, CubeSegment cubeSeg, String uuid,
+ DistinctColumnValuesProvider factTableValueProvider, DictionaryProvider dictProvider) throws IOException {
+ CubeManager cubeMgr = CubeManager.getInstance(config);
+
+ // dictionary
+ for (TblColRef col : cubeSeg.getCubeDesc().getAllColumnsNeedDictionaryBuilt()) {
+ logger.info("Building dictionary for {}", col);
+ IReadableTable inpTable = factTableValueProvider.getDistinctValuesFor(col);
+
+ Dictionary<String> preBuiltDict = null;
+ if (dictProvider != null) {
+ preBuiltDict = dictProvider.getDictionary(col);
+ }
+
+ if (preBuiltDict != null) {
+ logger.debug("Dict for '{}' has already been built, save it", col.getName());
+ cubeMgr.saveDictionary(cubeSeg, col, inpTable, preBuiltDict);
+ } else {
+ logger.debug("Dict for '{}' not pre-built, build it from {}", col.getName(), inpTable);
+ cubeMgr.buildDictionary(cubeSeg, col, inpTable);
+ }
+ }
+
+ // snapshot
+ Set<String> toSnapshot = Sets.newHashSet();
+ Set<TableRef> toCheckLookup = Sets.newHashSet();
+ for (DimensionDesc dim : cubeSeg.getCubeDesc().getDimensions()) {
+ TableRef table = dim.getTableRef();
+ if (cubeSeg.getModel().isLookupTable(table)) {
+ // only tables whose snapshot desc is not the ext type need a snapshot taken
+ toSnapshot.add(table.getTableIdentity());
+ toCheckLookup.add(table);
+ }
+ }
+
+ for (String tableIdentity : toSnapshot) {
+ logger.info("Building snapshot of {}", tableIdentity);
+ cubeMgr.buildSnapshotTable(cubeSeg, tableIdentity, uuid);
+ }
+
+ CubeInstance updatedCube = cubeMgr.getCube(cubeSeg.getCubeInstance().getName());
+ cubeSeg = updatedCube.getSegmentById(cubeSeg.getUuid());
+ for (TableRef lookup : toCheckLookup) {
+ logger.info("Checking snapshot of {}", lookup);
+ try {
+ JoinDesc join = cubeSeg.getModel().getJoinsTree().getJoinByPKSide(lookup);
+ ILookupTable table = cubeMgr.getLookupTable(cubeSeg, join);
+ if (table != null) {
+ IOUtils.closeStream(table);
+ }
+ } catch (Throwable th) {
+ throw new RuntimeException(String.format(Locale.ROOT, "Checking snapshot of %s failed.", lookup), th);
+ }
+ }
+ }
+
+ private static boolean isAllDictsAndSnapshotsReady(KylinConfig config, String cubeName, String segmentID) {
+ CubeInstance cube = CubeManager.getInstance(config).reloadCube(cubeName);
+ CubeSegment segment = cube.getSegmentById(segmentID);
+ ResourceStore store = ResourceStore.getStore(config);
+
+ // check dicts
+ logger.info("Begin to check if all dictionaries exist of Segment: {}", segmentID);
+ Map<String, String> dictionaries = segment.getDictionaries();
+ for (Map.Entry<String, String> entry : dictionaries.entrySet()) {
+ String dictResPath = entry.getValue();
+ String dictKey = entry.getKey();
+ try {
+ DictionaryInfo dictInfo = store.getResource(dictResPath, DictionaryInfoSerializer.INFO_SERIALIZER);
+ if (dictInfo == null) {
+ logger.warn("Dictionary=[key: {}, resource path: {}] doesn't exist in resource store", dictKey,
+ dictResPath);
+ return false;
+ }
+ } catch (IOException e) {
+ logger.warn("Dictionary=[key: {}, path: {}] failed to check, details: {}", dictKey, dictResPath, e);
+ return false;
+ }
+ }
+
+ // check snapshots
+ logger.info("Begin to check if all snapshots exist of Segment: {}", segmentID);
+ Map<String, String> snapshots = segment.getSnapshots();
+ for (Map.Entry<String, String> entry : snapshots.entrySet()) {
+ String snapshotKey = entry.getKey();
+ String snapshotResPath = entry.getValue();
+ try {
+ SnapshotTable snapshot = store.getResource(snapshotResPath, SnapshotTableSerializer.INFO_SERIALIZER);
+ if (snapshot == null) {
+ logger.info("SnapshotTable=[key: {}, resource path: {}] doesn't exist in resource store",
+ snapshotKey, snapshotResPath);
+ return false;
+ }
+ } catch (IOException e) {
+ logger.warn("SnapshotTable=[key: {}, resource path: {}] failed to check, details: {}", snapshotKey,
+ snapshotResPath, e);
+ return false;
+ }
+ }
+
+ logger.info("All dictionaries and snapshots exist checking succeed for Cube Segment: {}", segmentID);
+ return true;
+ }
+}
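
processSegment is the entry point a build step would call, handing in a provider of distinct column values and, optionally, pre-built dictionaries. A hedged invocation fragment: the cube name, segment id and job uuid are placeholders, the lambda assumes DistinctColumnValuesProvider exposes only getDistinctValuesFor, and the caller must handle the declared IOException:

    // a minimal sketch, not taken from the commit
    KylinConfig config = KylinConfig.getInstanceFromEnv();
    DistinctColumnValuesProvider factTableValueProvider = col -> {
        // a real provider must return an IReadableTable of the column's distinct values
        throw new UnsupportedOperationException("placeholder provider");
    };
    DictionaryGeneratorCLI.processSegment(config, "sample_cube", "segment-uuid", "job-uuid",
            factTableValueProvider, null /* no pre-built dictionaries */);
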
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cli/DumpDictionaryCLI.java b/core-cube/src/main/java/org/apache/kylin/cube/cli/DumpDictionaryCLI.java
new file mode 100644
index 0000000..729a6da
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cli/DumpDictionaryCLI.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.cli;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Date;
+
+import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryInfoSerializer;
+
+public class DumpDictionaryCLI {
+
+ public static void main(String[] args) throws IOException {
+ for (String path : args) {
+ dump(new File(path));
+ }
+ }
+
+ public static void dump(File f) throws IOException {
+ if (f.isDirectory()) {
+ File[] files = f.listFiles();
+ if (files == null) {
+ return;
+ }
+ for (File c : files)
+ dump(c);
+ return;
+ }
+
+ if (f.getName().endsWith(".dict")) {
+ DictionaryInfoSerializer ser = new DictionaryInfoSerializer();
+ DictionaryInfo dictInfo = ser.deserialize(new DataInputStream(new FileInputStream(f)));
+
+ System.out.println("============================================================================");
+ System.out.println("File: " + f.getAbsolutePath());
+ System.out.println(new Date(dictInfo.getLastModified()));
+ System.out.println(JsonUtil.writeValueAsIndentString(dictInfo));
+ dictInfo.getDictionaryObject().dump(System.out);
+ System.out.println();
+ }
+ }
+}
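
The dump tool recurses into a directory and prints the metadata and contents of every .dict file it finds. A hedged usage sketch (the path is a placeholder):

    // equivalent to: java org.apache.kylin.cube.cli.DumpDictionaryCLI /path/to/metadata/dict
    DumpDictionaryCLI.main(new String[] { "/path/to/metadata/dict" });
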
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index 6bf67db..c0c6882 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -60,6 +60,8 @@ import org.apache.kylin.common.util.Array;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.dict.GlobalDictionaryBuilder;
+import org.apache.kylin.dict.global.SegmentAppendTrieDictBuilder;
import org.apache.kylin.measure.MeasureType;
import org.apache.kylin.measure.extendedcolumn.ExtendedColumnMeasureType;
import org.apache.kylin.metadata.MetadataConstants;
@@ -1557,30 +1559,30 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware {
return null;
}
-// public List<TblColRef> getAllGlobalDictColumns() {
-// List<TblColRef> globalDictCols = new ArrayList<TblColRef>();
-// List<DictionaryDesc> dictionaryDescList = getDictionaries();
-//
-// if (dictionaryDescList == null) {
-// return globalDictCols;
-// }
-//
-// for (DictionaryDesc dictionaryDesc : dictionaryDescList) {
-// String cls = dictionaryDesc.getBuilderClass();
-// if (GlobalDictionaryBuilder.class.getName().equals(cls)
-// || SegmentAppendTrieDictBuilder.class.getName().equals(cls))
-// globalDictCols.add(dictionaryDesc.getColumnRef());
-// }
-// return globalDictCols;
-// }
+ public List<TblColRef> getAllGlobalDictColumns() {
+ List<TblColRef> globalDictCols = new ArrayList<TblColRef>();
+ List<DictionaryDesc> dictionaryDescList = getDictionaries();
+
+ if (dictionaryDescList == null) {
+ return globalDictCols;
+ }
+
+ for (DictionaryDesc dictionaryDesc : dictionaryDescList) {
+ String cls = dictionaryDesc.getBuilderClass();
+ if (GlobalDictionaryBuilder.class.getName().equals(cls)
+ || SegmentAppendTrieDictBuilder.class.getName().equals(cls))
+ globalDictCols.add(dictionaryDesc.getColumnRef());
+ }
+ return globalDictCols;
+ }
// UHC (ultra high cardinality column): contain the ShardByColumns and the GlobalDictionaryColumns
-// public List<TblColRef> getAllUHCColumns() {
-// List<TblColRef> uhcColumns = new ArrayList<>();
-// uhcColumns.addAll(getAllGlobalDictColumns());
-// uhcColumns.addAll(getShardByColumns());
-// return uhcColumns;
-// }
+ public List<TblColRef> getAllUHCColumns() {
+ List<TblColRef> uhcColumns = new ArrayList<>();
+ uhcColumns.addAll(getAllGlobalDictColumns());
+ uhcColumns.addAll(getShardByColumns());
+ return uhcColumns;
+ }
public String getProject() {
DataModelDesc modelDesc = getModel();
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/SnapshotTableDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/SnapshotTableDesc.java
index 28861a1..30f533b 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/SnapshotTableDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/SnapshotTableDesc.java
@@ -18,6 +18,8 @@
package org.apache.kylin.cube.model;
+import org.apache.kylin.dict.lookup.SnapshotTable;
+
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -31,8 +33,8 @@ SnapshotTableDesc implements java.io.Serializable{
private String tableName;
@JsonProperty("storage_type")
- private String storageType = "Parquet";
-
+ private String storageType = SnapshotTable.STORAGE_TYPE_METASTORE;
+
@JsonProperty("local_cache_enable")
private boolean enableLocalCache = true;
@@ -63,6 +65,10 @@ SnapshotTableDesc implements java.io.Serializable{
this.global = global;
}
+ public boolean isExtSnapshotTable() {
+ return !SnapshotTable.STORAGE_TYPE_METASTORE.equals(storageType);
+ }
+
public boolean isEnableLocalCache() {
return enableLocalCache;
}
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java
index 04535fd..9023f28 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java
@@ -30,6 +30,7 @@ import org.apache.kylin.cube.model.RowKeyDesc;
import org.apache.kylin.cube.model.validation.IValidatorRule;
import org.apache.kylin.cube.model.validation.ResultLevel;
import org.apache.kylin.cube.model.validation.ValidateContext;
+import org.apache.kylin.dict.GlobalDictionaryBuilder;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -91,6 +92,11 @@ public class DictionaryRule implements IValidatorRule<CubeDesc> {
}
}
+ if (StringUtils.isNotEmpty(builderClass) && builderClass.equalsIgnoreCase(GlobalDictionaryBuilder.class.getName()) && dimensionColumns.contains(dictCol) && rowKeyDesc.isUseDictionary(dictCol)) {
+ context.addResult(ResultLevel.ERROR, ERROR_GLOBAL_DICTIONNARY_ONLY_MEASURE + dictCol);
+ return;
+ }
+
if (reuseCol != null) {
reuseDictionaries.add(dictDesc);
} else {
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java b/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java
index 7c3958c..c6d0c00 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java
@@ -18,150 +18,150 @@
package org.apache.kylin.cube.util;
-//import java.io.IOException;
-//import java.util.HashMap;
-//import java.util.List;
-//import java.util.Locale;
-//import java.util.Map;
-//import java.util.Set;
-//
-//import org.apache.kylin.common.util.Dictionary;
-//import org.apache.kylin.cube.CubeInstance;
-//import org.apache.kylin.cube.CubeSegment;
-//import org.apache.kylin.cube.cuboid.Cuboid;
-//import org.apache.kylin.cube.model.CubeDesc;
-//import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
-//import org.apache.kylin.dict.DictionaryGenerator;
-//import org.apache.kylin.dict.DictionaryInfo;
-//import org.apache.kylin.dict.DictionaryManager;
-//import org.apache.kylin.dict.IterableDictionaryValueEnumerator;
-//import org.apache.kylin.measure.hllc.HLLCounter;
-//import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-//import org.apache.kylin.metadata.model.TblColRef;
-//import org.apache.kylin.source.IReadableTable;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
-//
-//import org.apache.kylin.shaded.com.google.common.collect.HashMultimap;
-//import org.apache.kylin.shaded.com.google.common.collect.Maps;
-//import org.apache.kylin.shaded.com.google.common.hash.HashFunction;
-//import org.apache.kylin.shaded.com.google.common.hash.Hasher;
-//import org.apache.kylin.shaded.com.google.common.hash.Hashing;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
+import org.apache.kylin.dict.DictionaryGenerator;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.dict.IterableDictionaryValueEnumerator;
+import org.apache.kylin.measure.hllc.HLLCounter;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.source.IReadableTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kylin.shaded.com.google.common.collect.HashMultimap;
+import org.apache.kylin.shaded.com.google.common.collect.Maps;
+import org.apache.kylin.shaded.com.google.common.hash.HashFunction;
+import org.apache.kylin.shaded.com.google.common.hash.Hasher;
+import org.apache.kylin.shaded.com.google.common.hash.Hashing;
/**
*/
public class CubingUtils {
-//
-// private static Logger logger = LoggerFactory.getLogger(CubingUtils.class);
-//
-// public static Map<Long, HLLCounter> sampling(CubeDesc cubeDesc, IJoinedFlatTableDesc flatDescIn,
-// Iterable<List<String>> streams) {
-// final CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(flatDescIn, cubeDesc);
-// final int rowkeyLength = cubeDesc.getRowkey().getRowKeyColumns().length;
-// final Set<Long> allCuboidIds = cubeDesc.getInitialCuboidScheduler().getAllCuboidIds();
-// final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
-// final Map<Long, Integer[]> allCuboidsBitSet = Maps.newHashMap();
-//
-// final Map<Long, HLLCounter> result = Maps.newHashMapWithExpectedSize(allCuboidIds.size());
-// for (Long cuboidId : allCuboidIds) {
-// result.put(cuboidId, new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision()));
-// Integer[] cuboidBitSet = new Integer[Long.bitCount(cuboidId)];
-//
-// long mask = Long.highestOneBit(baseCuboidId);
-// int position = 0;
-// for (int i = 0; i < rowkeyLength; i++) {
-// if ((mask & cuboidId) > 0) {
-// cuboidBitSet[position] = i;
-// position++;
-// }
-// mask = mask >> 1;
-// }
-// allCuboidsBitSet.put(cuboidId, cuboidBitSet);
-// }
-//
-// HashFunction hf = Hashing.murmur3_32();
-// byte[][] row_hashcodes = new byte[rowkeyLength][];
-// for (List<String> row : streams) {
-// //generate hash for each row key column
-// for (int i = 0; i < rowkeyLength; i++) {
-// Hasher hc = hf.newHasher();
-// final String cell = row.get(flatDesc.getRowKeyColumnIndexes()[i]);
-// if (cell != null) {
-// row_hashcodes[i] = hc.putUnencodedChars(cell).hash().asBytes();
-// } else {
-// row_hashcodes[i] = hc.putInt(0).hash().asBytes();
-// }
-// }
-//
-// for (Map.Entry<Long, HLLCounter> longHyperLogLogPlusCounterNewEntry : result.entrySet()) {
-// Long cuboidId = longHyperLogLogPlusCounterNewEntry.getKey();
-// HLLCounter counter = longHyperLogLogPlusCounterNewEntry.getValue();
-// Hasher hc = hf.newHasher();
-// final Integer[] cuboidBitSet = allCuboidsBitSet.get(cuboidId);
-// for (int position = 0; position < cuboidBitSet.length; position++) {
-// hc.putBytes(row_hashcodes[cuboidBitSet[position]]);
-// }
-// counter.add(hc.hash().asBytes());
-// }
-// }
-// return result;
-// }
-//
-// public static Map<TblColRef, Dictionary<String>> buildDictionary(final CubeInstance cubeInstance,
-// Iterable<List<String>> recordList) throws IOException {
-// final List<TblColRef> columnsNeedToBuildDictionary = cubeInstance.getDescriptor()
-// .listDimensionColumnsExcludingDerived(true);
-// final HashMap<Integer, TblColRef> tblColRefMap = Maps.newHashMap();
-// int index = 0;
-// for (TblColRef column : columnsNeedToBuildDictionary) {
-// tblColRefMap.put(index++, column);
-// }
-//
-// HashMap<TblColRef, Dictionary<String>> result = Maps.newHashMap();
-//
-// HashMultimap<TblColRef, String> valueMap = HashMultimap.create();
-// for (List<String> row : recordList) {
-// for (int i = 0; i < row.size(); i++) {
-// String cell = row.get(i);
-// if (tblColRefMap.containsKey(i)) {
-// valueMap.put(tblColRefMap.get(i), cell);
-// }
-// }
-// }
-// for (TblColRef tblColRef : valueMap.keySet()) {
-// Set<String> values = valueMap.get(tblColRef);
-// Dictionary<String> dict = DictionaryGenerator.buildDictionary(tblColRef.getType(),
-// new IterableDictionaryValueEnumerator(values));
-// result.put(tblColRef, dict);
-// }
-// return result;
-// }
-//
-// @SuppressWarnings("unchecked")
-// public static Map<TblColRef, Dictionary<String>> writeDictionary(CubeSegment cubeSegment,
-// Map<TblColRef, Dictionary<String>> dictionaryMap, long startOffset, long endOffset) {
-// Map<TblColRef, Dictionary<String>> realDictMap = Maps.newHashMap();
-//
-// for (Map.Entry<TblColRef, Dictionary<String>> entry : dictionaryMap.entrySet()) {
-// final TblColRef tblColRef = entry.getKey();
-// final Dictionary<String> dictionary = entry.getValue();
-// IReadableTable.TableSignature signature = new IReadableTable.TableSignature();
-// signature.setLastModifiedTime(System.currentTimeMillis());
-// signature.setPath(String.format(Locale.ROOT, "streaming_%s_%s", startOffset, endOffset));
-// signature.setSize(endOffset - startOffset);
-// DictionaryInfo dictInfo = new DictionaryInfo(tblColRef.getColumnDesc(), tblColRef.getDatatype(), signature);
-// logger.info("writing dictionary for TblColRef:" + tblColRef.toString());
-// DictionaryManager dictionaryManager = DictionaryManager.getInstance(cubeSegment.getCubeDesc().getConfig());
-// try {
-// DictionaryInfo realDict = dictionaryManager.trySaveNewDict(dictionary, dictInfo);
-// cubeSegment.putDictResPath(tblColRef, realDict.getResourcePath());
-// realDictMap.put(tblColRef, (Dictionary<String>) realDict.getDictionaryObject());
-// } catch (IOException e) {
-// throw new RuntimeException("error save dictionary for column:" + tblColRef, e);
-// }
-// }
-//
-// return realDictMap;
-// }
-//
+
+ private static Logger logger = LoggerFactory.getLogger(CubingUtils.class);
+
+ public static Map<Long, HLLCounter> sampling(CubeDesc cubeDesc, IJoinedFlatTableDesc flatDescIn,
+ Iterable<List<String>> streams) {
+ final CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(flatDescIn, cubeDesc);
+ final int rowkeyLength = cubeDesc.getRowkey().getRowKeyColumns().length;
+ final Set<Long> allCuboidIds = cubeDesc.getInitialCuboidScheduler().getAllCuboidIds();
+ final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+ final Map<Long, Integer[]> allCuboidsBitSet = Maps.newHashMap();
+
+ final Map<Long, HLLCounter> result = Maps.newHashMapWithExpectedSize(allCuboidIds.size());
+ for (Long cuboidId : allCuboidIds) {
+ result.put(cuboidId, new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision()));
+ Integer[] cuboidBitSet = new Integer[Long.bitCount(cuboidId)];
+
+ long mask = Long.highestOneBit(baseCuboidId);
+ int position = 0;
+ for (int i = 0; i < rowkeyLength; i++) {
+ if ((mask & cuboidId) > 0) {
+ cuboidBitSet[position] = i;
+ position++;
+ }
+ mask = mask >> 1;
+ }
+ allCuboidsBitSet.put(cuboidId, cuboidBitSet);
+ }
+
+ HashFunction hf = Hashing.murmur3_32();
+ byte[][] row_hashcodes = new byte[rowkeyLength][];
+ for (List<String> row : streams) {
+ //generate hash for each row key column
+ for (int i = 0; i < rowkeyLength; i++) {
+ Hasher hc = hf.newHasher();
+ final String cell = row.get(flatDesc.getRowKeyColumnIndexes()[i]);
+ if (cell != null) {
+ row_hashcodes[i] = hc.putUnencodedChars(cell).hash().asBytes();
+ } else {
+ row_hashcodes[i] = hc.putInt(0).hash().asBytes();
+ }
+ }
+
+ for (Map.Entry<Long, HLLCounter> longHyperLogLogPlusCounterNewEntry : result.entrySet()) {
+ Long cuboidId = longHyperLogLogPlusCounterNewEntry.getKey();
+ HLLCounter counter = longHyperLogLogPlusCounterNewEntry.getValue();
+ Hasher hc = hf.newHasher();
+ final Integer[] cuboidBitSet = allCuboidsBitSet.get(cuboidId);
+ for (int position = 0; position < cuboidBitSet.length; position++) {
+ hc.putBytes(row_hashcodes[cuboidBitSet[position]]);
+ }
+ counter.add(hc.hash().asBytes());
+ }
+ }
+ return result;
+ }
+
+ public static Map<TblColRef, Dictionary<String>> buildDictionary(final CubeInstance cubeInstance,
+ Iterable<List<String>> recordList) throws IOException {
+ final List<TblColRef> columnsNeedToBuildDictionary = cubeInstance.getDescriptor()
+ .listDimensionColumnsExcludingDerived(true);
+ final HashMap<Integer, TblColRef> tblColRefMap = Maps.newHashMap();
+ int index = 0;
+ for (TblColRef column : columnsNeedToBuildDictionary) {
+ tblColRefMap.put(index++, column);
+ }
+
+ HashMap<TblColRef, Dictionary<String>> result = Maps.newHashMap();
+
+ HashMultimap<TblColRef, String> valueMap = HashMultimap.create();
+ for (List<String> row : recordList) {
+ for (int i = 0; i < row.size(); i++) {
+ String cell = row.get(i);
+ if (tblColRefMap.containsKey(i)) {
+ valueMap.put(tblColRefMap.get(i), cell);
+ }
+ }
+ }
+ for (TblColRef tblColRef : valueMap.keySet()) {
+ Set<String> values = valueMap.get(tblColRef);
+ Dictionary<String> dict = DictionaryGenerator.buildDictionary(tblColRef.getType(),
+ new IterableDictionaryValueEnumerator(values));
+ result.put(tblColRef, dict);
+ }
+ return result;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static Map<TblColRef, Dictionary<String>> writeDictionary(CubeSegment cubeSegment,
+ Map<TblColRef, Dictionary<String>> dictionaryMap, long startOffset, long endOffset) {
+ Map<TblColRef, Dictionary<String>> realDictMap = Maps.newHashMap();
+
+ for (Map.Entry<TblColRef, Dictionary<String>> entry : dictionaryMap.entrySet()) {
+ final TblColRef tblColRef = entry.getKey();
+ final Dictionary<String> dictionary = entry.getValue();
+ IReadableTable.TableSignature signature = new IReadableTable.TableSignature();
+ signature.setLastModifiedTime(System.currentTimeMillis());
+ signature.setPath(String.format(Locale.ROOT, "streaming_%s_%s", startOffset, endOffset));
+ signature.setSize(endOffset - startOffset);
+ DictionaryInfo dictInfo = new DictionaryInfo(tblColRef.getColumnDesc(), tblColRef.getDatatype(), signature);
+ logger.info("writing dictionary for TblColRef:" + tblColRef.toString());
+ DictionaryManager dictionaryManager = DictionaryManager.getInstance(cubeSegment.getCubeDesc().getConfig());
+ try {
+ DictionaryInfo realDict = dictionaryManager.trySaveNewDict(dictionary, dictInfo);
+ cubeSegment.putDictResPath(tblColRef, realDict.getResourcePath());
+ realDictMap.put(tblColRef, (Dictionary<String>) realDict.getDictionaryObject());
+ } catch (IOException e) {
+ throw new RuntimeException("error save dictionary for column:" + tblColRef, e);
+ }
+ }
+
+ return realDictMap;
+ }
+
}
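
For readers following the restored CubingUtils.sampling() logic above, the sketch below isolates the bit-mask walk that maps a cuboid id to the rowkey column positions it covers. It is a self-contained illustration, not Kylin code; the class and method names are hypothetical.

import java.util.ArrayList;
import java.util.List;

public class CuboidBitSetSketch {
    // Walk the base cuboid's bits from highest to lowest; every bit that is
    // also set in cuboidId selects the rowkey column at that position.
    static List<Integer> columnsOf(long cuboidId, long baseCuboidId, int rowkeyLength) {
        List<Integer> positions = new ArrayList<>();
        long mask = Long.highestOneBit(baseCuboidId);
        for (int i = 0; i < rowkeyLength; i++) {
            if ((mask & cuboidId) > 0) {
                positions.add(i);
            }
            mask = mask >> 1;
        }
        return positions;
    }

    public static void main(String[] args) {
        // 4 rowkey columns => base cuboid 0b1111; cuboid 0b1010 covers columns 0 and 2.
        System.out.println(columnsOf(0b1010L, 0b1111L, 4)); // prints [0, 2]
    }
}

sampling() uses the same positions to feed each cuboid's HLLCounter: for every input row it hashes only the selected rowkey columns, so the counter estimates that cuboid's distinct row-key count.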
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java b/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java
index 90cabfa..de436d0 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java
@@ -19,6 +19,7 @@
package org.apache.kylin.cube.model.validation.rule;
import static org.apache.kylin.cube.model.validation.rule.DictionaryRule.ERROR_DUPLICATE_DICTIONARY_COLUMN;
+import static org.apache.kylin.cube.model.validation.rule.DictionaryRule.ERROR_GLOBAL_DICTIONNARY_ONLY_MEASURE;
import static org.apache.kylin.cube.model.validation.rule.DictionaryRule.ERROR_REUSE_BUILDER_BOTH_EMPTY;
import static org.apache.kylin.cube.model.validation.rule.DictionaryRule.ERROR_REUSE_BUILDER_BOTH_SET;
import static org.apache.kylin.cube.model.validation.rule.DictionaryRule.ERROR_TRANSITIVE_REUSE;
@@ -36,6 +37,7 @@ import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.DictionaryDesc;
import org.apache.kylin.cube.model.validation.ValidateContext;
+import org.apache.kylin.dict.GlobalDictionaryBuilder;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -73,6 +75,7 @@ public class DictionaryRuleTest extends LocalFileMetadataTestCase {
@Test
public void testBadDesc() throws IOException {
testDictionaryDesc(ERROR_DUPLICATE_DICTIONARY_COLUMN, DictionaryDesc.create("ORDER_ID", null, "FakeBuilderClass"));
+ testDictionaryDesc(ERROR_DUPLICATE_DICTIONARY_COLUMN, DictionaryDesc.create("ORDER_ID", null, GlobalDictionaryBuilder.class.getName()));
}
@Test
@@ -92,6 +95,17 @@ public class DictionaryRuleTest extends LocalFileMetadataTestCase {
DictionaryDesc.create("price", "lstg_site_id", null));
}
+ @Test
+ public void testBadDesc5() throws IOException {
+ testDictionaryDesc(ERROR_GLOBAL_DICTIONNARY_ONLY_MEASURE,
+ DictionaryDesc.create("CATEG_LVL2_NAME", null, GlobalDictionaryBuilder.class.getName()));
+ }
+
+ @Test
+ public void testGoodDesc2() throws IOException {
+ testDictionaryDesc(null, DictionaryDesc.create("SELLER_ID", null, GlobalDictionaryBuilder.class.getName()));
+ }
+
private void testDictionaryDesc(String expectMessage, DictionaryDesc... descs) throws IOException {
DictionaryRule rule = new DictionaryRule();
File f = new File(LocalFileMetadataTestCase.LOCALMETA_TEST_DATA + "/cube_desc/test_kylin_cube_without_slr_left_join_desc.json");
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
index f132569..a0730ff 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
@@ -40,196 +40,196 @@ import com.google.common.collect.Lists;
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
public class DictionaryGenerator {
-//
-// private static final Logger logger = LoggerFactory.getLogger(DictionaryGenerator.class);
-//
-// public static IDictionaryBuilder newDictionaryBuilder(DataType dataType) {
-// Preconditions.checkNotNull(dataType, "dataType cannot be null");
-//
-// // build dict, case by data type
-// IDictionaryBuilder builder;
-// boolean useForest = KylinConfig.getInstanceFromEnv().isUseForestTrieDictionary();
-// if (dataType.isNumberFamily())
-// builder = useForest ? new NumberTrieDictForestBuilder() : new NumberTrieDictBuilder();
-// else
-// builder = useForest ? new StringTrieDictForestBuilder() : new StringTrieDictBuilder();
-// return builder;
-// }
-//
-// public static Dictionary<String> buildDictionary(DataType dataType, IDictionaryValueEnumerator valueEnumerator)
-// throws IOException {
-// return buildDictionary(newDictionaryBuilder(dataType), null, valueEnumerator);
-// }
-//
-// static Dictionary<String> buildDictionary(IDictionaryBuilder builder, DictionaryInfo dictInfo,
-// IDictionaryValueEnumerator valueEnumerator) throws IOException {
-// int baseId = 0; // always 0 for now
-// int nSamples = 5;
-// ArrayList<String> samples = new ArrayList<String>(nSamples);
-//
-// // init the builder
-// builder.init(dictInfo, baseId, null);
-//
-// // add values
-// try {
-// while (valueEnumerator.moveNext()) {
-// String value = valueEnumerator.current();
-//
-// boolean accept = builder.addValue(value);
-//
-// if (accept && samples.size() < nSamples && samples.contains(value) == false)
-// samples.add(value);
-// }
-// } catch (IOException e) {
-// logger.error("Error during adding dict value.", e);
-// builder.clear();
-// throw e;
-// }
-//
-// // build
-// Dictionary<String> dict = builder.build();
-// logger.debug("Dictionary cardinality: " + dict.getSize());
-// logger.debug("Dictionary builder class: " + builder.getClass().getName());
-// logger.debug("Dictionary class: " + dict.getClass().getName());
-// // log a few samples
-// StringBuilder buf = new StringBuilder();
-// for (String s : samples) {
-// if (buf.length() > 0) {
-// buf.append(", ");
-// }
-// buf.append(s.toString()).append("=>").append(dict.getIdFromValue(s));
-// }
-// logger.debug("Dictionary value samples: " + buf.toString());
-//
-// return dict;
-// }
-//
-// public static Dictionary mergeDictionaries(DataType dataType, List<DictionaryInfo> sourceDicts) throws IOException {
-// List<Dictionary<String>> dictList = Lists.transform(sourceDicts, new Function<DictionaryInfo, Dictionary<String>>() {
-// @Nullable
-// @Override
-// public Dictionary<String> apply(@Nullable DictionaryInfo input) {
-// return input.dictionaryObject;
-// }
-// });
-// return buildDictionary(dataType, new MultipleDictionaryValueEnumerator(dataType, dictList));
-// }
-//
-// private static class StringTrieDictBuilder implements IDictionaryBuilder {
-// int baseId;
-// TrieDictionaryBuilder builder;
-//
-// @Override
-// public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException {
-// this.baseId = baseId;
-// this.builder = new TrieDictionaryBuilder(new StringBytesConverter());
-// }
-//
-// @Override
-// public boolean addValue(String value) {
-// if (value == null)
-// return false;
-//
-// builder.addValue(value);
-// return true;
-// }
-//
-// @Override
-// public Dictionary<String> build() throws IOException {
-// return builder.build(baseId);
-// }
-//
-// @Override
-// public void clear() {
-//
-// }
-// }
-//
-// private static class StringTrieDictForestBuilder implements IDictionaryBuilder {
-// TrieDictionaryForestBuilder builder;
-//
-// @Override
-// public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException {
-// builder = new TrieDictionaryForestBuilder(new StringBytesConverter(), baseId);
-// }
-//
-// @Override
-// public boolean addValue(String value) {
-// if (value == null)
-// return false;
-//
-// builder.addValue(value);
-// return true;
-// }
-//
-// @Override
-// public Dictionary<String> build() throws IOException {
-// return builder.build();
-// }
-//
-// @Override
-// public void clear() {
-//
-// }
-// }
-//
-// @SuppressWarnings("deprecation")
-// private static class NumberTrieDictBuilder implements IDictionaryBuilder {
-// int baseId;
-// NumberDictionaryBuilder builder;
-//
-// @Override
-// public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException {
-// this.baseId = baseId;
-// this.builder = new NumberDictionaryBuilder();
-// }
-//
-// @Override
-// public boolean addValue(String value) {
-// if (StringUtils.isBlank(value)) // empty string is treated as null
-// return false;
-//
-// builder.addValue(value);
-// return true;
-// }
-//
-// @Override
-// public Dictionary<String> build() throws IOException {
-// return builder.build(baseId);
-// }
-//
-// @Override
-// public void clear() {
-//
-// }
-// }
-//
-// private static class NumberTrieDictForestBuilder implements IDictionaryBuilder {
-// NumberDictionaryForestBuilder builder;
-//
-// @Override
-// public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException {
-// builder = new NumberDictionaryForestBuilder(baseId);
-// }
-//
-// @Override
-// public boolean addValue(String value) {
-// if (StringUtils.isBlank(value)) // empty string is treated as null
-// return false;
-//
-// builder.addValue(value);
-// return true;
-// }
-//
-// @Override
-// public Dictionary<String> build() throws IOException {
-// return builder.build();
-// }
-//
-// @Override
-// public void clear() {
-//
-// }
-// }
+
+ private static final Logger logger = LoggerFactory.getLogger(DictionaryGenerator.class);
+
+ public static IDictionaryBuilder newDictionaryBuilder(DataType dataType) {
+ Preconditions.checkNotNull(dataType, "dataType cannot be null");
+
+ // build dict, case by data type
+ IDictionaryBuilder builder;
+ boolean useForest = KylinConfig.getInstanceFromEnv().isUseForestTrieDictionary();
+ if (dataType.isNumberFamily())
+ builder = useForest ? new NumberTrieDictForestBuilder() : new NumberTrieDictBuilder();
+ else
+ builder = useForest ? new StringTrieDictForestBuilder() : new StringTrieDictBuilder();
+ return builder;
+ }
+
+ public static Dictionary<String> buildDictionary(DataType dataType, IDictionaryValueEnumerator valueEnumerator)
+ throws IOException {
+ return buildDictionary(newDictionaryBuilder(dataType), null, valueEnumerator);
+ }
+
+ static Dictionary<String> buildDictionary(IDictionaryBuilder builder, DictionaryInfo dictInfo,
+ IDictionaryValueEnumerator valueEnumerator) throws IOException {
+ int baseId = 0; // always 0 for now
+ int nSamples = 5;
+ ArrayList<String> samples = new ArrayList<String>(nSamples);
+
+ // init the builder
+ builder.init(dictInfo, baseId, null);
+
+ // add values
+ try {
+ while (valueEnumerator.moveNext()) {
+ String value = valueEnumerator.current();
+
+ boolean accept = builder.addValue(value);
+
+ if (accept && samples.size() < nSamples && samples.contains(value) == false)
+ samples.add(value);
+ }
+ } catch (IOException e) {
+ logger.error("Error during adding dict value.", e);
+ builder.clear();
+ throw e;
+ }
+
+ // build
+ Dictionary<String> dict = builder.build();
+ logger.debug("Dictionary cardinality: " + dict.getSize());
+ logger.debug("Dictionary builder class: " + builder.getClass().getName());
+ logger.debug("Dictionary class: " + dict.getClass().getName());
+ // log a few samples
+ StringBuilder buf = new StringBuilder();
+ for (String s : samples) {
+ if (buf.length() > 0) {
+ buf.append(", ");
+ }
+ buf.append(s.toString()).append("=>").append(dict.getIdFromValue(s));
+ }
+ logger.debug("Dictionary value samples: " + buf.toString());
+
+ return dict;
+ }
+
+ public static Dictionary mergeDictionaries(DataType dataType, List<DictionaryInfo> sourceDicts) throws IOException {
+ List<Dictionary<String>> dictList = Lists.transform(sourceDicts, new Function<DictionaryInfo, Dictionary<String>>() {
+ @Nullable
+ @Override
+ public Dictionary<String> apply(@Nullable DictionaryInfo input) {
+ return input.dictionaryObject;
+ }
+ });
+ return buildDictionary(dataType, new MultipleDictionaryValueEnumerator(dataType, dictList));
+ }
+
+ private static class StringTrieDictBuilder implements IDictionaryBuilder {
+ int baseId;
+ TrieDictionaryBuilder builder;
+
+ @Override
+ public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException {
+ this.baseId = baseId;
+ this.builder = new TrieDictionaryBuilder(new StringBytesConverter());
+ }
+
+ @Override
+ public boolean addValue(String value) {
+ if (value == null)
+ return false;
+
+ builder.addValue(value);
+ return true;
+ }
+
+ @Override
+ public Dictionary<String> build() throws IOException {
+ return builder.build(baseId);
+ }
+
+ @Override
+ public void clear() {
+
+ }
+ }
+
+ private static class StringTrieDictForestBuilder implements IDictionaryBuilder {
+ TrieDictionaryForestBuilder builder;
+
+ @Override
+ public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException {
+ builder = new TrieDictionaryForestBuilder(new StringBytesConverter(), baseId);
+ }
+
+ @Override
+ public boolean addValue(String value) {
+ if (value == null)
+ return false;
+
+ builder.addValue(value);
+ return true;
+ }
+
+ @Override
+ public Dictionary<String> build() throws IOException {
+ return builder.build();
+ }
+
+ @Override
+ public void clear() {
+
+ }
+ }
+
+ @SuppressWarnings("deprecation")
+ private static class NumberTrieDictBuilder implements IDictionaryBuilder {
+ int baseId;
+ NumberDictionaryBuilder builder;
+
+ @Override
+ public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException {
+ this.baseId = baseId;
+ this.builder = new NumberDictionaryBuilder();
+ }
+
+ @Override
+ public boolean addValue(String value) {
+ if (StringUtils.isBlank(value)) // empty string is treated as null
+ return false;
+
+ builder.addValue(value);
+ return true;
+ }
+
+ @Override
+ public Dictionary<String> build() throws IOException {
+ return builder.build(baseId);
+ }
+
+ @Override
+ public void clear() {
+
+ }
+ }
+
+ private static class NumberTrieDictForestBuilder implements IDictionaryBuilder {
+ NumberDictionaryForestBuilder builder;
+
+ @Override
+ public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException {
+ builder = new NumberDictionaryForestBuilder(baseId);
+ }
+
+ @Override
+ public boolean addValue(String value) {
+ if (StringUtils.isBlank(value)) // empty string is treated as null
+ return false;
+
+ builder.addValue(value);
+ return true;
+ }
+
+ @Override
+ public Dictionary<String> build() throws IOException {
+ return builder.build();
+ }
+
+ @Override
+ public void clear() {
+
+ }
+ }
}
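
The restored DictionaryGenerator code follows a simple builder contract: init, addValue for every input value (nulls rejected), then build an immutable value-to-id mapping. Below is a plain-Java sketch of that contract, with a hypothetical SimpleDictBuilder standing in for the trie builders; it is an illustration only, not the Kylin classes above.

import java.util.TreeMap;

public class SimpleDictBuilder {
    private final TreeMap<String, Integer> valueToId = new TreeMap<>();

    // Mirrors IDictionaryBuilder.addValue(): null is not a dictionary value.
    public boolean addValue(String value) {
        if (value == null)
            return false;
        valueToId.put(value, -1); // real id assigned later, in build()
        return true;
    }

    // Mirrors IDictionaryBuilder.build(): ids are assigned in sorted value order.
    public TreeMap<String, Integer> build(int baseId) {
        int id = baseId;
        for (String v : valueToId.keySet())
            valueToId.put(v, id++);
        return valueToId;
    }

    public static void main(String[] args) {
        SimpleDictBuilder b = new SimpleDictBuilder();
        for (String v : new String[] { "Beijing", "Shanghai", null, "Beijing" })
            b.addValue(v);
        System.out.println(b.build(0)); // {Beijing=0, Shanghai=1}
    }
}

The real builders encode values into a trie (or trie forest) rather than a TreeMap, which keeps the id assignment order-preserving while using far less memory than holding every distinct string.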
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/ILookupTable.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/ILookupTable.java
similarity index 96%
rename from core-cube/src/main/java/org/apache/kylin/cube/ILookupTable.java
rename to core-dictionary/src/main/java/org/apache/kylin/dict/lookup/ILookupTable.java
index 794b1e5..dccb7c4 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/ILookupTable.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/ILookupTable.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.kylin.cube;
+package org.apache.kylin.dict.lookup;
import org.apache.kylin.common.util.Array;
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
index c72681b..9d591b5 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
@@ -63,7 +63,7 @@ public class SnapshotManager {
SnapshotManager.logger.info("Snapshot with resource path {} is removed due to {}",
notification.getKey(), notification.getCause());
}
- }).maximumSize(100L)//
+ }).maximumSize(config.getCachedSnapshotMaxEntrySize())//
.expireAfterWrite(1, TimeUnit.DAYS).build(new CacheLoader<String, SnapshotTable>() {
@Override
public SnapshotTable load(String key) throws Exception {
@@ -173,11 +173,11 @@ public class SnapshotManager {
KylinConfig cubeConfig) throws IOException {
String dup = checkDupByInfo(snapshot);
-// if ((float) snapshot.getSignature().getSize() / 1024 / 1024 > cubeConfig.getTableSnapshotMaxMB()) {
-// throw new IllegalStateException(
-// "Table snapshot should be no greater than " + cubeConfig.getTableSnapshotMaxMB() //
-// + " MB, but " + tableDesc + " size is " + snapshot.getSignature().getSize());
-// }
+ if ((float) snapshot.getSignature().getSize() / 1024 / 1024 > cubeConfig.getTableSnapshotMaxMB()) {
+ throw new IllegalStateException(
+ "Table snapshot should be no greater than " + cubeConfig.getTableSnapshotMaxMB() //
+ + " MB, but " + tableDesc + " size is " + snapshot.getSignature().getSize());
+ }
if (dup != null) {
logger.info("Identical input {}, reuse existing snapshot at {}", table.getSignature(), dup);
diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
index 31cc081..2c4bc6a 100644
--- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@ -18,6 +18,7 @@
package org.apache.kylin.job;
+import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
@@ -31,6 +32,7 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.RowKeyColDesc;
+import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.metadata.model.DataModelDesc;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.JoinDesc;
@@ -111,16 +113,16 @@ public class JoinedFlatTable {
kylinConfig = (flatDesc.getSegment()).getConfig();
}
-// if (kylinConfig.isAdvancedFlatTableUsed()) {
-// try {
-// Class advancedFlatTable = Class.forName(kylinConfig.getAdvancedFlatTableClass());
-// Method method = advancedFlatTable.getMethod("generateInsertDataStatement", IJoinedFlatTableDesc.class,
-// JobEngineConfig.class);
-// return (String) method.invoke(null, flatDesc);
-// } catch (Exception e) {
-// throw new RuntimeException(e);
-// }
-// }
+ if (kylinConfig.isAdvancedFlatTableUsed()) {
+ try {
+ Class advancedFlatTable = Class.forName(kylinConfig.getAdvancedFlatTableClass());
+ Method method = advancedFlatTable.getMethod("generateInsertDataStatement", IJoinedFlatTableDesc.class,
+ JobEngineConfig.class);
+ return (String) method.invoke(null, flatDesc);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
return "INSERT OVERWRITE TABLE " + quoteIdentifier(flatDesc.getTableName(), null) + " " + generateSelectDataStatement(flatDesc)
+ ";\n";
diff --git a/core-metadata/src/main/java/org/apache/kylin/dimension/DimensionEncodingFactory.java b/core-metadata/src/main/java/org/apache/kylin/dimension/DimensionEncodingFactory.java
index 937c1ad..422a802 100644
--- a/core-metadata/src/main/java/org/apache/kylin/dimension/DimensionEncodingFactory.java
+++ b/core-metadata/src/main/java/org/apache/kylin/dimension/DimensionEncodingFactory.java
@@ -32,7 +32,6 @@ import org.apache.kylin.shaded.com.google.common.base.Predicate;
import org.apache.kylin.shaded.com.google.common.collect.Iterables;
import org.apache.kylin.shaded.com.google.common.collect.Maps;
-@Deprecated
public abstract class DimensionEncodingFactory {
private static final Logger logger = LoggerFactory.getLogger(DimensionEncodingFactory.class);
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java
index 55bdb4b..acdc2e9 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java
@@ -20,6 +20,7 @@ package org.apache.kylin.storage;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.threadlocal.InternalThreadLocal;
+import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.ImplementationSwitch;
import org.apache.kylin.metadata.model.IStorageAware;
import org.apache.kylin.metadata.realization.IRealization;
@@ -31,7 +32,19 @@ public class StorageFactory {
// Use thread-local because KylinConfig can be thread-local and implementation might be different among multiple threads.
private static InternalThreadLocal<ImplementationSwitch<IStorage>> storages = new InternalThreadLocal<>();
+ private static IStorage configuredUseLocalStorage;
+
+ static {
+ String localStorageImpl = KylinConfig.getInstanceFromEnv().getLocalStorageImpl();
+ if (localStorageImpl != null){
+ configuredUseLocalStorage = (IStorage) ClassUtil.newInstance(localStorageImpl);
+ }
+ }
+
public static IStorage storage(IStorageAware aware) {
+ if (configuredUseLocalStorage != null) {
+ return configuredUseLocalStorage;
+ }
ImplementationSwitch<IStorage> current = storages.get();
if (storages.get() == null) {
current = new ImplementationSwitch<>(KylinConfig.getInstanceFromEnv().getStorageEngines(), IStorage.class);
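
The addition above gives StorageFactory a statically configured local-storage override that short-circuits the per-thread implementation switch. A compact sketch of that lookup order follows; the interface and names are hypothetical.

public class StorageLookupSketch {
    interface IStorage {
        String name();
    }

    static IStorage configuredLocalStorage; // set once from configuration, e.g. in a static block
    static final ThreadLocal<IStorage> perThreadStorage = new ThreadLocal<>();

    static IStorage storage() {
        if (configuredLocalStorage != null)
            return configuredLocalStorage;  // explicit override always wins
        IStorage s = perThreadStorage.get();
        if (s == null) {
            s = () -> "default-engine";     // stand-in for the ImplementationSwitch lookup
            perThreadStorage.set(s);
        }
        return s;
    }

    public static void main(String[] args) {
        System.out.println(storage().name()); // "default-engine" unless an override was configured
    }
}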
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
index 3fa104d..e6e0737 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
@@ -28,6 +28,7 @@ import java.util.Locale;
import java.util.Map;
import java.util.Set;
+import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.ImmutableBitSet;
@@ -79,9 +80,9 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
TupleFilter havingFilter, StorageContext context) {
this.context = context;
-// this.maxScanRanges = cubeSegment.getConfig().getQueryStorageVisitScanRangeMax();
-// this.maxFuzzyKeysPerSplit = cubeSegment.getConfig().getQueryScanFuzzyKeyMax();
-// this.maxFuzzyKeys = maxFuzzyKeysPerSplit * cubeSegment.getConfig().getQueryScanFuzzyKeySplitMax();
+ this.maxScanRanges = cubeSegment.getConfig().getQueryStorageVisitScanRangeMax();
+ this.maxFuzzyKeysPerSplit = cubeSegment.getConfig().getQueryScanFuzzyKeyMax();
+ this.maxFuzzyKeys = maxFuzzyKeysPerSplit * cubeSegment.getConfig().getQueryScanFuzzyKeySplitMax();
this.cubeSegment = cubeSegment;
this.cubeDesc = cubeSegment.getCubeDesc();
@@ -147,9 +148,9 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
*/
public CubeScanRangePlanner(GTInfo info, TblColRef gtPartitionCol, TupleFilter gtFilter) {
-// this.maxScanRanges = KylinConfig.getInstanceFromEnv().getQueryStorageVisitScanRangeMax();
-// this.maxFuzzyKeysPerSplit = KylinConfig.getInstanceFromEnv().getQueryScanFuzzyKeyMax();
-// this.maxFuzzyKeys = maxFuzzyKeysPerSplit * KylinConfig.getInstanceFromEnv().getQueryScanFuzzyKeySplitMax();
+ this.maxScanRanges = KylinConfig.getInstanceFromEnv().getQueryStorageVisitScanRangeMax();
+ this.maxFuzzyKeysPerSplit = KylinConfig.getInstanceFromEnv().getQueryScanFuzzyKeyMax();
+ this.maxFuzzyKeys = maxFuzzyKeysPerSplit * KylinConfig.getInstanceFromEnv().getQueryScanFuzzyKeySplitMax();
this.gtInfo = info;
@@ -174,7 +175,7 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
.setRtAggrMetrics(gtRtAggrMetrics).setDynamicColumns(gtDynColumns)
.setExprsPushDown(tupleExpressionMap)//
.setAllowStorageAggregation(context.isNeedStorageAggregation())
- .setAggCacheMemThreshold(0)//
+ .setAggCacheMemThreshold(cubeSegment.getConfig().getQueryCoprocessorMemGB())//
.setStoragePushDownLimit(context.getFinalPushDownLimit())
.setStorageLimitLevel(context.getStorageLimitLevel()).setHavingFilterPushDown(havingFilter)
.createGTScanRequest();
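
For reference, the three restored limits relate as follows: the total fuzzy-key budget is the per-split limit multiplied by the allowed number of splits. A toy illustration with made-up values (the real values come from the getQueryStorageVisitScanRangeMax, getQueryScanFuzzyKeyMax and getQueryScanFuzzyKeySplitMax getters shown above):

public class ScanLimitSketch {
    public static void main(String[] args) {
        int maxScanRanges = 200;        // example value only
        int maxFuzzyKeysPerSplit = 100; // example value only
        int fuzzyKeySplitMax = 10;      // example value only
        int maxFuzzyKeys = maxFuzzyKeysPerSplit * fuzzyKeySplitMax;
        System.out.println("scan ranges <= " + maxScanRanges + ", fuzzy keys <= " + maxFuzzyKeys); // 200 and 1000
    }
}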
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
index b76fe47..3adbb8e 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
@@ -26,10 +26,12 @@ import java.util.Set;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.dict.BuiltInFunctionTransformer;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.metadata.expression.TupleExpression;
+import org.apache.kylin.metadata.filter.ITupleFilterTransformer;
import org.apache.kylin.metadata.filter.StringCodeSystem;
import org.apache.kylin.metadata.filter.TupleFilter;
import org.apache.kylin.metadata.filter.TupleFilterSerializer;
@@ -66,6 +68,10 @@ public class CubeSegmentScanner implements Iterable<GTRecord> {
byte[] serialize = TupleFilterSerializer.serialize(originalfilter, StringCodeSystem.INSTANCE);
TupleFilter filter = TupleFilterSerializer.deserialize(serialize, StringCodeSystem.INSTANCE);
+ // translate FunctionTupleFilter to IN clause
+ ITupleFilterTransformer translator = new BuiltInFunctionTransformer(cubeSeg.getDimensionEncodingMap());
+ filter = translator.transform(filter);
+
CubeScanRangePlanner scanRangePlanner;
try {
scanRangePlanner = new CubeScanRangePlanner(cubeSeg, cuboid, filter, dimensions, groups, dynGroups,
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
index db08e62..4fb71bb 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
@@ -36,6 +36,7 @@ import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
import org.apache.kylin.cube.model.RowKeyColDesc;
import org.apache.kylin.cube.model.RowKeyDesc;
+import org.apache.kylin.dict.lookup.ILookupTable;
import org.apache.kylin.dimension.TimeDerivedColumnType;
import org.apache.kylin.measure.MeasureType;
import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller;
@@ -44,7 +45,6 @@ import org.apache.kylin.metadata.model.JoinDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.tuple.Tuple;
import org.apache.kylin.metadata.tuple.TupleInfo;
-import org.apache.kylin.cube.ILookupTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -97,7 +97,7 @@ public class CubeTupleConverter implements ITupleConverter {
advMeasureFillers = Lists.newArrayListWithCapacity(1);
advMeasureIndexInGTValues = Lists.newArrayListWithCapacity(1);
usedLookupTables = Lists.newArrayList();
-// eventTimezone = cubeSeg.getConfig().getStreamingDerivedTimeTimezone();
+ eventTimezone = cubeSeg.getConfig().getStreamingDerivedTimeTimezone();
autoJustByTimezone = eventTimezone.length() > 0
&& cubeSeg.getCubeDesc().getModel().getRootFactTable().getTableDesc().isStreamingTable();
if (autoJustByTimezone) {
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index 63ef36d..804ce3f 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -33,7 +33,6 @@ import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.ILookupTable;
import org.apache.kylin.cube.RawQueryLastHacker;
import org.apache.kylin.cube.common.SegmentPruner;
import org.apache.kylin.cube.cuboid.Cuboid;
@@ -42,6 +41,7 @@ import org.apache.kylin.cube.gridtable.CuboidToGridTableMappingExt;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
import org.apache.kylin.cube.model.RowKeyColDesc;
+import org.apache.kylin.dict.lookup.ILookupTable;
import org.apache.kylin.gridtable.StorageLimitLevel;
import org.apache.kylin.measure.MeasureType;
import org.apache.kylin.measure.bitmap.BitmapMeasureType;
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java b/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java
index fea4550..9bfdd76 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java
@@ -25,9 +25,9 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Array;
import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.ILookupTable;
import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
import org.apache.kylin.cube.model.CubeDesc.DeriveType;
+import org.apache.kylin.dict.lookup.ILookupTable;
import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.metadata.datatype.DataTypeOrder;
import org.apache.kylin.metadata.filter.ColumnTupleFilter;
diff --git a/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
index 47a3ba7..574bb9f 100644
--- a/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
+++ b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
@@ -18,673 +18,721 @@
package org.apache.kylin.storage.gtrecord;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.BytesSerializer;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.gridtable.CubeCodeSystem;
+import org.apache.kylin.dict.NumberDictionaryForestBuilder;
+import org.apache.kylin.dict.StringBytesConverter;
+import org.apache.kylin.dict.TrieDictionaryBuilder;
+import org.apache.kylin.dimension.DictionaryDimEnc;
+import org.apache.kylin.dimension.DimensionEncoding;
+import org.apache.kylin.gridtable.GTBuilder;
+import org.apache.kylin.gridtable.GTFilterScanner.FilterResultCache;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTInfo.Builder;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRange;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTScanRequestBuilder;
+import org.apache.kylin.gridtable.GTUtil;
+import org.apache.kylin.gridtable.GridTable;
+import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.gridtable.memstore.GTSimpleMemStore;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.filter.ColumnTupleFilter;
+import org.apache.kylin.metadata.filter.CompareTupleFilter;
+import org.apache.kylin.metadata.filter.ConstantTupleFilter;
+import org.apache.kylin.metadata.filter.ExtractTupleFilter;
+import org.apache.kylin.metadata.filter.LogicalTupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.model.TblColRef.InnerDataTypeEnum;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.kylin.shaded.com.google.common.collect.Lists;
public class DictGridTableTest extends LocalFileMetadataTestCase {
-//
-// private GridTable table;
-// private GTInfo info;
-// private CompareTupleFilter timeComp0;
-// private CompareTupleFilter timeComp1;
-// private CompareTupleFilter timeComp2;
-// private CompareTupleFilter timeComp3;
-// private CompareTupleFilter timeComp4;
-// private CompareTupleFilter timeComp5;
-// private CompareTupleFilter timeComp6;
-// private CompareTupleFilter timeComp7;
-// private CompareTupleFilter ageComp1;
-// private CompareTupleFilter ageComp2;
-// private CompareTupleFilter ageComp3;
-// private CompareTupleFilter ageComp4;
-//
-// @After
-// public void after() throws Exception {
-//
-// this.cleanupTestMetadata();
-// }
-//
-// @Before
-// public void setup() throws IOException {
-//
-// this.createTestMetadata();
-//
-// table = newTestTable();
-// info = table.getInfo();
-//
-// timeComp0 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-14"));
-// timeComp1 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
-// timeComp2 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-13"));
-// timeComp3 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-15"));
-// timeComp4 = compare(info.colRef(0), FilterOperatorEnum.EQ, enc(info, 0, "2015-01-15"));
-// timeComp5 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-15"));
-// timeComp6 = compare(info.colRef(0), FilterOperatorEnum.EQ, enc(info, 0, "2015-01-14"));
-// timeComp7 = compare(info.colRef(0), FilterOperatorEnum.EQ, enc(info, 0, "1970-01-01"));
-// ageComp1 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "10"));
-// ageComp2 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "20"));
-// ageComp3 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "30"));
-// ageComp4 = compare(info.colRef(1), FilterOperatorEnum.NEQ, enc(info, 1, "30"));
-//
-// }
-//
-// @Test
-// public void verifySegmentSkipping() {
-//
-// ByteArray segmentStart = enc(info, 0, "2015-01-14");
-// ByteArray segmentStartX = enc(info, 0, "2015-01-14 00:00:00");//when partition col is dict encoded, time format will be free
-// assertEquals(segmentStart, segmentStartX);
-//
-// {
-// LogicalTupleFilter filter = and(timeComp0, ageComp1);
-// CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
-// List<GTScanRange> r = planner.planScanRanges();
-// assertEquals(1, r.size());//scan range are [close,close]
-// assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString());
-// assertEquals(1, r.get(0).fuzzyKeys.size());
-// assertEquals("[[null, 10, null, null, null]]", r.get(0).fuzzyKeys.toString());
-// }
-// {
-// LogicalTupleFilter filter = and(timeComp2, ageComp1);
-// CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
-// List<GTScanRange> r = planner.planScanRanges();
-// assertEquals(1, r.size());
-// }
-// {
-// LogicalTupleFilter filter = and(timeComp4, ageComp1);
-// CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
-// List<GTScanRange> r = planner.planScanRanges();
-// assertEquals(1, r.size());
-// }
-// {
-// LogicalTupleFilter filter = and(timeComp5, ageComp1);
-// CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
-// List<GTScanRange> r = planner.planScanRanges();
-// assertEquals(1, r.size());
-// }
-// {
-// LogicalTupleFilter filter = or(and(timeComp2, ageComp1), and(timeComp1, ageComp1),
-// and(timeComp6, ageComp1));
-// CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
-// List<GTScanRange> r = planner.planScanRanges();
-// assertEquals(2, r.size());
-// assertEquals("[1421193600000, 10]-[null, 10]", r.get(1).toString());
-// assertEquals("[[null, 10, null, null, null], [1421193600000, 10, null, null, null]]",
-// r.get(1).fuzzyKeys.toString());
-// }
-// {
-// LogicalTupleFilter filter = or(and(timeComp3, ageComp3), and(timeComp7, ageComp1));
-// CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
-// List<GTScanRange> r = planner.planScanRanges();
-// assertEquals("[[0, 10]-[1421280000000, 30]]", r.toString());
-// }
-// {
-// LogicalTupleFilter filter = or(timeComp2, timeComp1, timeComp6);
-// CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
-// List<GTScanRange> r = planner.planScanRanges();
-// assertEquals(2, r.size());
-// assertEquals("[1421193600000, null]-[null, null]", r.get(1).toString());
-// assertEquals(0, r.get(1).fuzzyKeys.size());
-// }
-// {
-// //skip FALSE filter
-// LogicalTupleFilter filter = and(ageComp1, ConstantTupleFilter.FALSE);
-// CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
-// List<GTScanRange> r = planner.planScanRanges();
-// assertEquals(0, r.size());
-// }
-// {
-// //TRUE or FALSE filter
-// LogicalTupleFilter filter = or(ConstantTupleFilter.TRUE, ConstantTupleFilter.FALSE);
-// CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
-// List<GTScanRange> r = planner.planScanRanges();
-// assertEquals(1, r.size());
-// assertEquals("[null, null]-[null, null]", r.get(0).toString());
-// }
-// {
-// //TRUE or other filter
-// LogicalTupleFilter filter = or(ageComp1, ConstantTupleFilter.TRUE);
-// CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
-// List<GTScanRange> r = planner.planScanRanges();
-// assertEquals(1, r.size());
-// assertEquals("[null, null]-[null, null]", r.get(0).toString());
-// }
-// }
-//
-// @Test
-// public void verifySegmentSkipping2() {
-// {
-// LogicalTupleFilter filter = and(timeComp0, ageComp1);
-// CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
-// List<GTScanRange> r = planner.planScanRanges();
-// assertEquals(1, r.size());//scan range are [close,close]
-// assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString());
-// assertEquals(1, r.get(0).fuzzyKeys.size());
-// assertEquals("[[null, 10, null, null, null]]", r.get(0).fuzzyKeys.toString());
-// }
-//
-// {
-// LogicalTupleFilter filter = and(timeComp5, ageComp1);
-// CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
-// List<GTScanRange> r = planner.planScanRanges();
-// assertEquals(1, r.size());//scan range are [close,close]
-// }
-// }
-//
-// @Test
-// public void verifyScanRangePlanner() {
-//
-// // flatten or-and & hbase fuzzy value
-// {
-// LogicalTupleFilter filter = and(timeComp1, or(ageComp1, ageComp2));
-// CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
-// List<GTScanRange> r = planner.planScanRanges();
-// assertEquals(1, r.size());
-// assertEquals("[1421193600000, 10]-[null, 20]", r.get(0).toString());
-// assertEquals("[[null, 10, null, null, null], [null, 20, null, null, null]]", r.get(0).fuzzyKeys.toString());
-// }
-//
-// // pre-evaluate ever false
-// {
-// LogicalTupleFilter filter = and(timeComp1, timeComp2);
-// CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
-// List<GTScanRange> r = planner.planScanRanges();
-// assertEquals(0, r.size());
-// }
-//
-// // pre-evaluate ever true
-// {
-// LogicalTupleFilter filter = or(timeComp1, ageComp4);
-// CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
-// List<GTScanRange> r = planner.planScanRanges();
-// assertEquals("[[null, null]-[null, null]]", r.toString());
-// }
-//
-// // merge overlap range
-// {
-// LogicalTupleFilter filter = or(timeComp1, timeComp3);
-// CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
-// List<GTScanRange> r = planner.planScanRanges();
-// assertEquals("[[null, null]-[null, null]]", r.toString());
-// }
-//
-// // merge too many ranges
-// {
-// LogicalTupleFilter filter = or(and(timeComp4, ageComp1), and(timeComp4, ageComp2),
-// and(timeComp4, ageComp3));
-// CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
-// List<GTScanRange> r = planner.planScanRanges();
-// assertEquals(3, r.size());
-// assertEquals("[1421280000000, 10]-[1421280000000, 10]", r.get(0).toString());
-// assertEquals("[1421280000000, 20]-[1421280000000, 20]", r.get(1).toString());
-// assertEquals("[1421280000000, 30]-[1421280000000, 30]", r.get(2).toString());
-// planner.setMaxScanRanges(2);
-// List<GTScanRange> r2 = planner.planScanRanges();
-// assertEquals("[[1421280000000, 10]-[1421280000000, 30]]", r2.toString());
-// }
-// }
-//
-// @Test
-// public void verifyFirstRow() throws IOException {
-// doScanAndVerify(table,
-// new GTScanRequestBuilder().setInfo(table.getInfo()).setRanges(null).setDimensions(null)
-// .setFilterPushDown(null).createGTScanRequest(),
-// "[1421193600000, 30, Yang, 10, 10.5]", //
-// "[1421193600000, 30, Luke, 10, 10.5]", //
-// "[1421280000000, 20, Dong, 10, 10.5]", //
-// "[1421280000000, 20, Jason, 10, 10.5]", //
-// "[1421280000000, 30, Xu, 10, 10.5]", //
-// "[1421366400000, 20, Mahone, 10, 10.5]", //
-// "[1421366400000, 20, Qianhao, 10, 10.5]", //
-// "[1421366400000, 30, George, 10, 10.5]", //
-// "[1421366400000, 30, Shaofeng, 10, 10.5]", //
-// "[1421452800000, 10, Kejia, 10, 10.5]");
-// }
-//
-// //for testing GTScanRequest serialization and deserialization
-// public static GTScanRequest useDeserializedGTScanRequest(GTScanRequest origin) {
-// ByteBuffer buffer = ByteBuffer.allocate(BytesSerializer.SERIALIZE_BUFFER_SIZE);
-// GTScanRequest.serializer.serialize(origin, buffer);
-// buffer.flip();
-// GTScanRequest sGTScanRequest = GTScanRequest.serializer.deserialize(buffer);
-//
-// Assert.assertArrayEquals(origin.getAggrMetricsFuncs(), sGTScanRequest.getAggrMetricsFuncs());
-// Assert.assertEquals(origin.getAggCacheMemThreshold(), sGTScanRequest.getAggCacheMemThreshold(), 0.01);
-// return sGTScanRequest;
-// }
-//
-// @Test
-// public void verifyScanWithUnevaluatableFilter() throws IOException {
-// GTInfo info = table.getInfo();
-//
-// CompareTupleFilter fComp = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
-// ExtractTupleFilter fUnevaluatable = unevaluatable(info.colRef(1));
-// LogicalTupleFilter fNotPlusUnevaluatable = not(unevaluatable(info.colRef(1)));
-// LogicalTupleFilter filter = and(fComp, fUnevaluatable, fNotPlusUnevaluatable);
-//
-// GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null)
-// .setAggrGroupBy(setOf(0)).setAggrMetrics(setOf(3)).setAggrMetricsFuncs(new String[] { "sum" })
-// .setFilterPushDown(filter).createGTScanRequest();
-//
-// // note the unEvaluatable column 1 in filter is added to group by
-// assertEquals(
-// "GTScanRequest [range=[[null, null]-[null, null]], columns={0, 1, 3}, filterPushDown=AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], [], []], aggrGroupBy={0, 1}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]",
-// req.toString());
-//
-// doScanAndVerify(table, useDeserializedGTScanRequest(req), "[1421280000000, 20, null, 20, null]",
-// "[1421280000000, 30, null, 10, null]", "[1421366400000, 20, null, 20, null]",
-// "[1421366400000, 30, null, 20, null]", "[1421452800000, 10, null, 10, null]");
-// }
-//
-// @Test
-// public void verifyScanWithEvaluatableFilter() throws IOException {
-// GTInfo info = table.getInfo();
-//
-// CompareTupleFilter fComp1 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
-// CompareTupleFilter fComp2 = compare(info.colRef(1), FilterOperatorEnum.GT, enc(info, 1, "10"));
-// LogicalTupleFilter filter = and(fComp1, fComp2);
-//
-// GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null)
-// .setAggrGroupBy(setOf(0)).setAggrMetrics(setOf(3)).setAggrMetricsFuncs(new String[] { "sum" })
-// .setFilterPushDown(filter).createGTScanRequest();
-// // note the evaluatable column 1 in filter is added to returned columns but not in group by
-// assertEquals(
-// "GTScanRequest [range=[[null, null]-[null, null]], columns={0, 1, 3}, filterPushDown=AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 GT [\\x00]], aggrGroupBy={0}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]",
-// req.toString());
-//
-// doScanAndVerify(table, useDeserializedGTScanRequest(req), "[1421280000000, 20, null, 30, null]",
-// "[1421366400000, 20, null, 40, null]");
-// }
-//
-// @Test
-// public void verifyAggregateAndHavingFilter() throws IOException {
-// GTInfo info = table.getInfo();
-//
-// TblColRef havingCol = TblColRef.newInnerColumn("SUM_OF_BIGDECIMAL", InnerDataTypeEnum.LITERAL);
-// havingCol.getColumnDesc().setId("1"); // point to the first aggregated measure
-// CompareTupleFilter havingFilter = compare(havingCol, FilterOperatorEnum.GT, "20");
-//
-// GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null)
-// .setAggrGroupBy(setOf(1)).setAggrMetrics(setOf(4)).setAggrMetricsFuncs(new String[] { "sum" })
-// .setHavingFilterPushDown(havingFilter).createGTScanRequest();
-//
-// doScanAndVerify(table, useDeserializedGTScanRequest(req), "[null, 20, null, null, 42.0]",
-// "[null, 30, null, null, 52.5]");
-// }
-//
-// @SuppressWarnings("unused")
-// private void testFilterScannerPerfInner(GridTable table, GTInfo info, LogicalTupleFilter filter)
-// throws IOException {
-// long start = System.currentTimeMillis();
-// GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null)
-// .setFilterPushDown(filter).createGTScanRequest();
-// int i = 0;
-// try (IGTScanner scanner = table.scan(req)) {
-// for (GTRecord r : scanner) {
-// i++;
-// }
-// }
-// long end = System.currentTimeMillis();
-// System.out.println(
-// (end - start) + "ms with filter cache enabled=" + FilterResultCache.DEFAULT_OPTION + ", " + i + " rows");
-// }
-//
-// @Test
-// public void verifyConvertFilterConstants1() {
-// GTInfo info = table.getInfo();
-//
-// TableDesc extTable = TableDesc.mockup("ext");
-// TblColRef extColA = TblColRef.mockup(extTable, 1, "A", "timestamp");
-// TblColRef extColB = TblColRef.mockup(extTable, 2, "B", "integer");
-//
-// CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
-// CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.EQ, "10");
-// LogicalTupleFilter filter = and(fComp1, fComp2);
-//
-// List<TblColRef> colMapping = Lists.newArrayList();
-// colMapping.add(extColA);
-// colMapping.add(extColB);
-//
-// TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-// assertEquals(
-// "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 EQ [\\x00]]",
-// newFilter.toString());
-// }
-//
-// @Test
-// public void verifyConvertFilterConstants2() {
-// GTInfo info = table.getInfo();
-//
-// TableDesc extTable = TableDesc.mockup("ext");
-// TblColRef extColA = TblColRef.mockup(extTable, 1, "A", "timestamp");
-// TblColRef extColB = TblColRef.mockup(extTable, 2, "B", "integer");
-//
-// List<TblColRef> colMapping = Lists.newArrayList();
-// colMapping.add(extColA);
-// colMapping.add(extColB);
-//
-// CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
-//
-// // $1<"9" round down to FALSE
-// {
-// LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.LT, "9"));
-// TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-// assertEquals(
-// "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 ISNOTNULL []]",
-// newFilter.toString());
-// }
-//
-// // $1<"10" needs no rounding
-// {
-// LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.LT, "10"));
-// TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-// assertEquals(
-// "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 LT [\\x00]]",
-// newFilter.toString());
-// }
-//
-// // $1<"11" round down to <="10"
-// {
-// LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.LT, "11"));
-// TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-// assertEquals(
-// "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 LTE [\\x00]]",
-// newFilter.toString());
-// }
-//
-// // $1<="9" round down to FALSE
-// {
-// LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.LTE, "9"));
-// TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-// assertEquals(ConstantTupleFilter.FALSE, newFilter);
-// }
-//
-// // $1<="10" needs no rounding
-// {
-// LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.LTE, "10"));
-// TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-// assertEquals(
-// "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 LTE [\\x00]]",
-// newFilter.toString());
-// }
-//
-// // $1<="11" round down to <="10"
-// {
-// LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.LTE, "11"));
-// TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-// assertEquals(
-// "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 LTE [\\x00]]",
-// newFilter.toString());
-// }
-// }
-//
-// @Test
-// public void verifyConvertFilterConstants3() {
-// GTInfo info = table.getInfo();
-//
-// TableDesc extTable = TableDesc.mockup("ext");
-// TblColRef extColA = TblColRef.mockup(extTable, 1, "A", "timestamp");
-// TblColRef extColB = TblColRef.mockup(extTable, 2, "B", "integer");
-//
-// List<TblColRef> colMapping = Lists.newArrayList();
-// colMapping.add(extColA);
-// colMapping.add(extColB);
-//
-// CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
-//
-// // $1>"101" round up to FALSE
-// {
-// LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.GT, "101"));
-// TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-// assertEquals("AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 ISNOTNULL []]",
-// newFilter.toString());
-// }
-//
-// // $1>"100" needs no rounding
-// {
-// LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.GT, "100"));
-// TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-// assertEquals(
-// "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 GT [\\x09]]",
-// newFilter.toString());
-// }
-//
-// // $1>"99" round up to >="100"
-// {
-// LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.GT, "99"));
-// TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-// assertEquals(
-// "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 GTE [\\x09]]",
-// newFilter.toString());
-// }
-//
-// // $1>="101" round up to FALSE
-// {
-// LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.GTE, "101"));
-// TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-// assertEquals(ConstantTupleFilter.FALSE, newFilter);
-// }
-//
-// // $1>="100" needs no rounding
-// {
-// LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.GTE, "100"));
-// TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-// assertEquals(
-// "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 GTE [\\x09]]",
-// newFilter.toString());
-// }
-//
-// // $1>="99" round up to >="100"
-// {
-// LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.GTE, "99"));
-// TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-// assertEquals(
-// "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 GTE [\\x09]]",
-// newFilter.toString());
-// }
-// }
-//
-// @Test
-// public void verifyConvertFilterConstants4() {
-// GTInfo info = table.getInfo();
-//
-// TableDesc extTable = TableDesc.mockup("ext");
-// TblColRef extColA = TblColRef.mockup(extTable, 1, "A", "timestamp");
-// TblColRef extColB = TblColRef.mockup(extTable, 2, "B", "integer");
-//
-// CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
-// CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.IN, "9", "10", "15");
-// LogicalTupleFilter filter = and(fComp1, fComp2);
-//
-// List<TblColRef> colMapping = Lists.newArrayList();
-// colMapping.add(extColA);
-// colMapping.add(extColB);
-//
-// // $1 in ("9", "10", "15") has only "10" left
-// TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-// assertEquals(
-// "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 IN [\\x00]]",
-// newFilter.toString());
-// }
-//
-// private void doScanAndVerify(GridTable table, GTScanRequest req, String... verifyRows) throws IOException {
-// System.out.println(req);
-// try (IGTScanner scanner = table.scan(req)) {
-// int i = 0;
-// for (GTRecord r : scanner) {
-// System.out.println(r);
-// if (verifyRows == null || i >= verifyRows.length) {
-// Assert.fail();
-// }
-// assertEquals(verifyRows[i], r.toString());
-// i++;
-// }
-// }
-// }
-//
-// public static ByteArray enc(GTInfo info, int col, String value) {
-// ByteBuffer buf = ByteBuffer.allocate(info.getMaxColumnLength());
-// info.getCodeSystem().encodeColumnValue(col, value, buf);
-// return ByteArray.copyOf(buf.array(), buf.arrayOffset(), buf.position());
-// }
-//
-// public static ExtractTupleFilter unevaluatable(TblColRef col) {
-// ExtractTupleFilter r = new ExtractTupleFilter(FilterOperatorEnum.EXTRACT);
-// r.addChild(new ColumnTupleFilter(col));
-// return r;
-// }
-//
-// public static CompareTupleFilter compare(TblColRef col, FilterOperatorEnum op, Object... value) {
-// CompareTupleFilter result = new CompareTupleFilter(op);
-// result.addChild(new ColumnTupleFilter(col));
-// result.addChild(new ConstantTupleFilter(Arrays.asList(value)));
-// return result;
-// }
-//
-// public static LogicalTupleFilter and(TupleFilter... children) {
-// return logic(FilterOperatorEnum.AND, children);
-// }
-//
-// public static LogicalTupleFilter or(TupleFilter... children) {
-// return logic(FilterOperatorEnum.OR, children);
-// }
-//
-// public static LogicalTupleFilter not(TupleFilter child) {
-// return logic(FilterOperatorEnum.NOT, child);
-// }
-//
-// public static LogicalTupleFilter logic(FilterOperatorEnum op, TupleFilter... children) {
-// LogicalTupleFilter result = new LogicalTupleFilter(op);
-// for (TupleFilter c : children) {
-// result.addChild(c);
-// }
-// return result;
-// }
-//
-// public static GridTable newTestTable() throws IOException {
-// GTInfo info = newInfo();
-// GTSimpleMemStore store = new GTSimpleMemStore(info);
-// GridTable table = new GridTable(info, store);
-//
-// GTRecord r = new GTRecord(table.getInfo());
-// GTBuilder builder = table.rebuild();
-//
-// builder.write(r.setValues("2015-01-14", "30", "Yang", new Long(10), new BigDecimal("10.5")));
-// builder.write(r.setValues("2015-01-14", "30", "Luke", new Long(10), new BigDecimal("10.5")));
-// builder.write(r.setValues("2015-01-15", "20", "Dong", new Long(10), new BigDecimal("10.5")));
-// builder.write(r.setValues("2015-01-15", "20", "Jason", new Long(10), new BigDecimal("10.5")));
-// builder.write(r.setValues("2015-01-15", "30", "Xu", new Long(10), new BigDecimal("10.5")));
-// builder.write(r.setValues("2015-01-16", "20", "Mahone", new Long(10), new BigDecimal("10.5")));
-// builder.write(r.setValues("2015-01-16", "20", "Qianhao", new Long(10), new BigDecimal("10.5")));
-// builder.write(r.setValues("2015-01-16", "30", "George", new Long(10), new BigDecimal("10.5")));
-// builder.write(r.setValues("2015-01-16", "30", "Shaofeng", new Long(10), new BigDecimal("10.5")));
-// builder.write(r.setValues("2015-01-17", "10", "Kejia", new Long(10), new BigDecimal("10.5")));
-// builder.close();
-//
-// return table;
-// }
-//
-// static GridTable newTestPerfTable() throws IOException {
-// GTInfo info = newInfo();
-// GTSimpleMemStore store = new GTSimpleMemStore(info);
-// GridTable table = new GridTable(info, store);
-//
-// GTRecord r = new GTRecord(table.getInfo());
-// GTBuilder builder = table.rebuild();
-//
-// for (int i = 0; i < 100000; i++) {
-// for (int j = 0; j < 10; j++)
-// builder.write(r.setValues("2015-01-14", "30", "Yang", new Long(10), new BigDecimal("10.5")));
-//
-// for (int j = 0; j < 10; j++)
-// builder.write(r.setValues("2015-01-14", "30", "Luke", new Long(10), new BigDecimal("10.5")));
-//
-// for (int j = 0; j < 10; j++)
-// builder.write(r.setValues("2015-01-15", "20", "Dong", new Long(10), new BigDecimal("10.5")));
-//
-// for (int j = 0; j < 10; j++)
-// builder.write(r.setValues("2015-01-15", "20", "Jason", new Long(10), new BigDecimal("10.5")));
-//
-// for (int j = 0; j < 10; j++)
-// builder.write(r.setValues("2015-01-15", "30", "Xu", new Long(10), new BigDecimal("10.5")));
-//
-// for (int j = 0; j < 10; j++)
-// builder.write(r.setValues("2015-01-16", "20", "Mahone", new Long(10), new BigDecimal("10.5")));
-//
-// for (int j = 0; j < 10; j++)
-// builder.write(r.setValues("2015-01-16", "20", "Qianhao", new Long(10), new BigDecimal("10.5")));
-//
-// for (int j = 0; j < 10; j++)
-// builder.write(r.setValues("2015-01-16", "30", "George", new Long(10), new BigDecimal("10.5")));
-//
-// for (int j = 0; j < 10; j++)
-// builder.write(r.setValues("2015-01-16", "30", "Shaofeng", new Long(10), new BigDecimal("10.5")));
-//
-// for (int j = 0; j < 10; j++)
-// builder.write(r.setValues("2015-01-17", "10", "Kejia", new Long(10), new BigDecimal("10.5")));
-// }
-// builder.close();
-//
-// return table;
-// }
-//
-// static GTInfo newInfo() {
-// Builder builder = GTInfo.builder();
-// builder.setCodeSystem(newDictCodeSystem());
-// builder.setColumns(//
-// DataType.getType("timestamp"), //
-// DataType.getType("integer"), //
-// DataType.getType("varchar(10)"), //
-// DataType.getType("bigint"), //
-// DataType.getType("decimal") //
-// );
-// builder.setPrimaryKey(setOf(0, 1));
-// builder.setColumnPreferIndex(setOf(0));
-// builder.enableColumnBlock(new ImmutableBitSet[] { setOf(0, 1), setOf(2), setOf(3, 4) });
-// builder.enableRowBlock(4);
-// GTInfo info = builder.build();
-// return info;
-// }
-//
-// private static CubeCodeSystem newDictCodeSystem() {
-// DimensionEncoding[] dimEncs = new DimensionEncoding[3];
-// dimEncs[1] = new DictionaryDimEnc(newDictionaryOfInteger());
-// dimEncs[2] = new DictionaryDimEnc(newDictionaryOfString());
-// return new CubeCodeSystem(dimEncs);
-// }
-//
-// private static Dictionary newDictionaryOfString() {
-// TrieDictionaryBuilder<String> builder = new TrieDictionaryBuilder<>(new StringBytesConverter());
-// builder.addValue("Dong");
-// builder.addValue("George");
-// builder.addValue("Jason");
-// builder.addValue("Kejia");
-// builder.addValue("Luke");
-// builder.addValue("Mahone");
-// builder.addValue("Qianhao");
-// builder.addValue("Shaofeng");
-// builder.addValue("Xu");
-// builder.addValue("Yang");
-// return builder.build(0);
-// }
-//
-// private static Dictionary newDictionaryOfInteger() {
-// NumberDictionaryForestBuilder builder = new NumberDictionaryForestBuilder();
-// builder.addValue("10");
-// builder.addValue("20");
-// builder.addValue("30");
-// builder.addValue("40");
-// builder.addValue("50");
-// builder.addValue("60");
-// builder.addValue("70");
-// builder.addValue("80");
-// builder.addValue("90");
-// builder.addValue("100");
-// return builder.build();
-// }
-//
-// public static ImmutableBitSet setOf(int... values) {
-// BitSet set = new BitSet();
-// for (int i : values)
-// set.set(i);
-// return new ImmutableBitSet(set);
-// }
+
+ private GridTable table;
+ private GTInfo info;
+ private CompareTupleFilter timeComp0;
+ private CompareTupleFilter timeComp1;
+ private CompareTupleFilter timeComp2;
+ private CompareTupleFilter timeComp3;
+ private CompareTupleFilter timeComp4;
+ private CompareTupleFilter timeComp5;
+ private CompareTupleFilter timeComp6;
+ private CompareTupleFilter timeComp7;
+ private CompareTupleFilter ageComp1;
+ private CompareTupleFilter ageComp2;
+ private CompareTupleFilter ageComp3;
+ private CompareTupleFilter ageComp4;
+
+ @After
+ public void after() throws Exception {
+
+ this.cleanupTestMetadata();
+ }
+
+ @Before
+ public void setup() throws IOException {
+
+ this.createTestMetadata();
+
+ table = newTestTable();
+ info = table.getInfo();
+
+ timeComp0 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-14"));
+ timeComp1 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
+ timeComp2 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-13"));
+ timeComp3 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-15"));
+ timeComp4 = compare(info.colRef(0), FilterOperatorEnum.EQ, enc(info, 0, "2015-01-15"));
+ timeComp5 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-15"));
+ timeComp6 = compare(info.colRef(0), FilterOperatorEnum.EQ, enc(info, 0, "2015-01-14"));
+ timeComp7 = compare(info.colRef(0), FilterOperatorEnum.EQ, enc(info, 0, "1970-01-01"));
+ ageComp1 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "10"));
+ ageComp2 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "20"));
+ ageComp3 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "30"));
+ ageComp4 = compare(info.colRef(1), FilterOperatorEnum.NEQ, enc(info, 1, "30"));
+
+ }
+
+ @Test
+ public void verifySegmentSkipping() {
+
+ ByteArray segmentStart = enc(info, 0, "2015-01-14");
+ ByteArray segmentStartX = enc(info, 0, "2015-01-14 00:00:00"); // when the partition column is dict-encoded, the time format is flexible
+ assertEquals(segmentStart, segmentStartX);
+
+ {
+ LogicalTupleFilter filter = and(timeComp0, ageComp1);
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
+ List<GTScanRange> r = planner.planScanRanges();
+ assertEquals(1, r.size()); // scan ranges are [closed, closed]
+ assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString());
+ assertEquals(1, r.get(0).fuzzyKeys.size());
+ assertEquals("[[null, 10, null, null, null]]", r.get(0).fuzzyKeys.toString());
+ }
+ {
+ LogicalTupleFilter filter = and(timeComp2, ageComp1);
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
+ List<GTScanRange> r = planner.planScanRanges();
+ assertEquals(1, r.size());
+ }
+ {
+ LogicalTupleFilter filter = and(timeComp4, ageComp1);
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
+ List<GTScanRange> r = planner.planScanRanges();
+ assertEquals(1, r.size());
+ }
+ {
+ LogicalTupleFilter filter = and(timeComp5, ageComp1);
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
+ List<GTScanRange> r = planner.planScanRanges();
+ assertEquals(1, r.size());
+ }
+ {
+ LogicalTupleFilter filter = or(and(timeComp2, ageComp1), and(timeComp1, ageComp1),
+ and(timeComp6, ageComp1));
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
+ List<GTScanRange> r = planner.planScanRanges();
+ assertEquals(2, r.size());
+ assertEquals("[1421193600000, 10]-[null, 10]", r.get(1).toString());
+ assertEquals("[[null, 10, null, null, null], [1421193600000, 10, null, null, null]]",
+ r.get(1).fuzzyKeys.toString());
+ }
+ {
+ LogicalTupleFilter filter = or(and(timeComp3, ageComp3), and(timeComp7, ageComp1));
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
+ List<GTScanRange> r = planner.planScanRanges();
+ assertEquals("[[0, 10]-[1421280000000, 30]]", r.toString());
+ }
+ {
+ LogicalTupleFilter filter = or(timeComp2, timeComp1, timeComp6);
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
+ List<GTScanRange> r = planner.planScanRanges();
+ assertEquals(2, r.size());
+ assertEquals("[1421193600000, null]-[null, null]", r.get(1).toString());
+ assertEquals(0, r.get(1).fuzzyKeys.size());
+ }
+ {
+ //skip FALSE filter
+ LogicalTupleFilter filter = and(ageComp1, ConstantTupleFilter.FALSE);
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
+ List<GTScanRange> r = planner.planScanRanges();
+ assertEquals(0, r.size());
+ }
+ {
+ //TRUE or FALSE filter
+ LogicalTupleFilter filter = or(ConstantTupleFilter.TRUE, ConstantTupleFilter.FALSE);
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
+ List<GTScanRange> r = planner.planScanRanges();
+ assertEquals(1, r.size());
+ assertEquals("[null, null]-[null, null]", r.get(0).toString());
+ }
+ {
+ //TRUE or other filter
+ LogicalTupleFilter filter = or(ageComp1, ConstantTupleFilter.TRUE);
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
+ List<GTScanRange> r = planner.planScanRanges();
+ assertEquals(1, r.size());
+ assertEquals("[null, null]-[null, null]", r.get(0).toString());
+ }
+ }
+
+ @Test
+ public void verifySegmentSkipping2() {
+ {
+ LogicalTupleFilter filter = and(timeComp0, ageComp1);
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
+ List<GTScanRange> r = planner.planScanRanges();
+ assertEquals(1, r.size()); // scan ranges are [closed, closed]
+ assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString());
+ assertEquals(1, r.get(0).fuzzyKeys.size());
+ assertEquals("[[null, 10, null, null, null]]", r.get(0).fuzzyKeys.toString());
+ }
+
+ {
+ LogicalTupleFilter filter = and(timeComp5, ageComp1);
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
+ List<GTScanRange> r = planner.planScanRanges();
+ assertEquals(1, r.size()); // scan ranges are [closed, closed]
+ }
+ }
+
+ @Test
+ public void verifyScanRangePlanner() {
+
+ // flatten or-and & hbase fuzzy value
+ {
+ LogicalTupleFilter filter = and(timeComp1, or(ageComp1, ageComp2));
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
+ List<GTScanRange> r = planner.planScanRanges();
+ assertEquals(1, r.size());
+ assertEquals("[1421193600000, 10]-[null, 20]", r.get(0).toString());
+ assertEquals("[[null, 10, null, null, null], [null, 20, null, null, null]]", r.get(0).fuzzyKeys.toString());
+ }
+
+ // pre-evaluate ever false
+ {
+ LogicalTupleFilter filter = and(timeComp1, timeComp2);
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
+ List<GTScanRange> r = planner.planScanRanges();
+ assertEquals(0, r.size());
+ }
+
+ // pre-evaluate ever true
+ {
+ LogicalTupleFilter filter = or(timeComp1, ageComp4);
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
+ List<GTScanRange> r = planner.planScanRanges();
+ assertEquals("[[null, null]-[null, null]]", r.toString());
+ }
+
+ // merge overlap range
+ {
+ LogicalTupleFilter filter = or(timeComp1, timeComp3);
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
+ List<GTScanRange> r = planner.planScanRanges();
+ assertEquals("[[null, null]-[null, null]]", r.toString());
+ }
+
+ // merge too many ranges
+ {
+ LogicalTupleFilter filter = or(and(timeComp4, ageComp1), and(timeComp4, ageComp2),
+ and(timeComp4, ageComp3));
+ CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
+ List<GTScanRange> r = planner.planScanRanges();
+ assertEquals(3, r.size());
+ assertEquals("[1421280000000, 10]-[1421280000000, 10]", r.get(0).toString());
+ assertEquals("[1421280000000, 20]-[1421280000000, 20]", r.get(1).toString());
+ assertEquals("[1421280000000, 30]-[1421280000000, 30]", r.get(2).toString());
+ planner.setMaxScanRanges(2);
+ List<GTScanRange> r2 = planner.planScanRanges();
+ assertEquals("[[1421280000000, 10]-[1421280000000, 30]]", r2.toString());
+ }
+ }
+
+ @Test
+ public void verifyFirstRow() throws IOException {
+ doScanAndVerify(table,
+ new GTScanRequestBuilder().setInfo(table.getInfo()).setRanges(null).setDimensions(null)
+ .setFilterPushDown(null).createGTScanRequest(),
+ "[1421193600000, 30, Yang, 10, 10.5]", //
+ "[1421193600000, 30, Luke, 10, 10.5]", //
+ "[1421280000000, 20, Dong, 10, 10.5]", //
+ "[1421280000000, 20, Jason, 10, 10.5]", //
+ "[1421280000000, 30, Xu, 10, 10.5]", //
+ "[1421366400000, 20, Mahone, 10, 10.5]", //
+ "[1421366400000, 20, Qianhao, 10, 10.5]", //
+ "[1421366400000, 30, George, 10, 10.5]", //
+ "[1421366400000, 30, Shaofeng, 10, 10.5]", //
+ "[1421452800000, 10, Kejia, 10, 10.5]");
+ }
+
+ //for testing GTScanRequest serialization and deserialization
+ public static GTScanRequest useDeserializedGTScanRequest(GTScanRequest origin) {
+ ByteBuffer buffer = ByteBuffer.allocate(BytesSerializer.SERIALIZE_BUFFER_SIZE);
+ GTScanRequest.serializer.serialize(origin, buffer);
+ buffer.flip();
+ GTScanRequest sGTScanRequest = GTScanRequest.serializer.deserialize(buffer);
+
+ Assert.assertArrayEquals(origin.getAggrMetricsFuncs(), sGTScanRequest.getAggrMetricsFuncs());
+ Assert.assertEquals(origin.getAggCacheMemThreshold(), sGTScanRequest.getAggCacheMemThreshold(), 0.01);
+ return sGTScanRequest;
+ }
+
+ @Test
+ public void verifyScanWithUnevaluatableFilter() throws IOException {
+ GTInfo info = table.getInfo();
+
+ CompareTupleFilter fComp = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
+ ExtractTupleFilter fUnevaluatable = unevaluatable(info.colRef(1));
+ LogicalTupleFilter fNotPlusUnevaluatable = not(unevaluatable(info.colRef(1)));
+ LogicalTupleFilter filter = and(fComp, fUnevaluatable, fNotPlusUnevaluatable);
+
+ GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null)
+ .setAggrGroupBy(setOf(0)).setAggrMetrics(setOf(3)).setAggrMetricsFuncs(new String[] { "sum" })
+ .setFilterPushDown(filter).createGTScanRequest();
+
+ // note the unEvaluatable column 1 in filter is added to group by
+ assertEquals(
+ "GTScanRequest [range=[[null, null]-[null, null]], columns={0, 1, 3}, filterPushDown=AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], [], []], aggrGroupBy={0, 1}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]",
+ req.toString());
+
+ doScanAndVerify(table, useDeserializedGTScanRequest(req), "[1421280000000, 20, null, 20, null]",
+ "[1421280000000, 30, null, 10, null]", "[1421366400000, 20, null, 20, null]",
+ "[1421366400000, 30, null, 20, null]", "[1421452800000, 10, null, 10, null]");
+ }
+
+ @Test
+ public void verifyScanWithEvaluatableFilter() throws IOException {
+ GTInfo info = table.getInfo();
+
+ CompareTupleFilter fComp1 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
+ CompareTupleFilter fComp2 = compare(info.colRef(1), FilterOperatorEnum.GT, enc(info, 1, "10"));
+ LogicalTupleFilter filter = and(fComp1, fComp2);
+
+ GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null)
+ .setAggrGroupBy(setOf(0)).setAggrMetrics(setOf(3)).setAggrMetricsFuncs(new String[] { "sum" })
+ .setFilterPushDown(filter).createGTScanRequest();
+ // note the evaluatable column 1 in filter is added to returned columns but not in group by
+ assertEquals(
+ "GTScanRequest [range=[[null, null]-[null, null]], columns={0, 1, 3}, filterPushDown=AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 GT [\\x00]], aggrGroupBy={0}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]",
+ req.toString());
+
+ doScanAndVerify(table, useDeserializedGTScanRequest(req), "[1421280000000, 20, null, 30, null]",
+ "[1421366400000, 20, null, 40, null]");
+ }
+
+ @Test
+ public void verifyAggregateAndHavingFilter() throws IOException {
+ GTInfo info = table.getInfo();
+
+ TblColRef havingCol = TblColRef.newInnerColumn("SUM_OF_BIGDECIMAL", InnerDataTypeEnum.LITERAL);
+ havingCol.getColumnDesc().setId("1"); // point to the first aggregated measure
+ CompareTupleFilter havingFilter = compare(havingCol, FilterOperatorEnum.GT, "20");
+
+ GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null)
+ .setAggrGroupBy(setOf(1)).setAggrMetrics(setOf(4)).setAggrMetricsFuncs(new String[] { "sum" })
+ .setHavingFilterPushDown(havingFilter).createGTScanRequest();
+
+ doScanAndVerify(table, useDeserializedGTScanRequest(req), "[null, 20, null, null, 42.0]",
+ "[null, 30, null, null, 52.5]");
+ }
+
+ @SuppressWarnings("unused")
+ private void testFilterScannerPerfInner(GridTable table, GTInfo info, LogicalTupleFilter filter)
+ throws IOException {
+ long start = System.currentTimeMillis();
+ GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null)
+ .setFilterPushDown(filter).createGTScanRequest();
+ int i = 0;
+ try (IGTScanner scanner = table.scan(req)) {
+ for (GTRecord r : scanner) {
+ i++;
+ }
+ }
+ long end = System.currentTimeMillis();
+ System.out.println(
+ (end - start) + "ms with filter cache enabled=" + FilterResultCache.DEFAULT_OPTION + ", " + i + " rows");
+ }
+
+ @Test
+ public void verifyConvertFilterConstants1() {
+ GTInfo info = table.getInfo();
+
+ TableDesc extTable = TableDesc.mockup("ext");
+ TblColRef extColA = TblColRef.mockup(extTable, 1, "A", "timestamp");
+ TblColRef extColB = TblColRef.mockup(extTable, 2, "B", "integer");
+
+ CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
+ CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.EQ, "10");
+ LogicalTupleFilter filter = and(fComp1, fComp2);
+
+ List<TblColRef> colMapping = Lists.newArrayList();
+ colMapping.add(extColA);
+ colMapping.add(extColB);
+
+ TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+ assertEquals(
+ "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 EQ [\\x00]]",
+ newFilter.toString());
+ }
+
+ @Test
+ public void verifyConvertFilterConstants2() {
+ GTInfo info = table.getInfo();
+
+ TableDesc extTable = TableDesc.mockup("ext");
+ TblColRef extColA = TblColRef.mockup(extTable, 1, "A", "timestamp");
+ TblColRef extColB = TblColRef.mockup(extTable, 2, "B", "integer");
+
+ List<TblColRef> colMapping = Lists.newArrayList();
+ colMapping.add(extColA);
+ colMapping.add(extColB);
+
+ CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
+
+ // $1<"9" round down to FALSE
+ {
+ LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.LT, "9"));
+ TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+ assertEquals(
+ "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 ISNOTNULL []]",
+ newFilter.toString());
+ }
+
+ // $1<"10" needs no rounding
+ {
+ LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.LT, "10"));
+ TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+ assertEquals(
+ "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 LT [\\x00]]",
+ newFilter.toString());
+ }
+
+ // $1<"11" round down to <="10"
+ {
+ LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.LT, "11"));
+ TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+ assertEquals(
+ "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 LTE [\\x00]]",
+ newFilter.toString());
+ }
+
+ // $1<="9" round down to FALSE
+ {
+ LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.LTE, "9"));
+ TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+ assertEquals(ConstantTupleFilter.FALSE, newFilter);
+ }
+
+ // $1<="10" needs no rounding
+ {
+ LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.LTE, "10"));
+ TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+ assertEquals(
+ "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 LTE [\\x00]]",
+ newFilter.toString());
+ }
+
+ // $1<="11" round down to <="10"
+ {
+ LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.LTE, "11"));
+ TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+ assertEquals(
+ "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 LTE [\\x00]]",
+ newFilter.toString());
+ }
+ }
+
+ @Test
+ public void verifyConvertFilterConstants3() {
+ GTInfo info = table.getInfo();
+
+ TableDesc extTable = TableDesc.mockup("ext");
+ TblColRef extColA = TblColRef.mockup(extTable, 1, "A", "timestamp");
+ TblColRef extColB = TblColRef.mockup(extTable, 2, "B", "integer");
+
+ List<TblColRef> colMapping = Lists.newArrayList();
+ colMapping.add(extColA);
+ colMapping.add(extColB);
+
+ CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
+
+ // $1>"101" round up to FALSE
+ {
+ LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.GT, "101"));
+ TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+ assertEquals("AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 ISNOTNULL []]",
+ newFilter.toString());
+ }
+
+ // $1>"100" needs no rounding
+ {
+ LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.GT, "100"));
+ TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+ assertEquals(
+ "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 GT [\\x09]]",
+ newFilter.toString());
+ }
+
+ // $1>"99" round up to >="100"
+ {
+ LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.GT, "99"));
+ TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+ assertEquals(
+ "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 GTE [\\x09]]",
+ newFilter.toString());
+ }
+
+ // $1>="101" round up to FALSE
+ {
+ LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.GTE, "101"));
+ TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+ assertEquals(ConstantTupleFilter.FALSE, newFilter);
+ }
+
+ // $1>="100" needs no rounding
+ {
+ LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.GTE, "100"));
+ TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+ assertEquals(
+ "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 GTE [\\x09]]",
+ newFilter.toString());
+ }
+
+ // $1>="99" round up to >="100"
+ {
+ LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.GTE, "99"));
+ TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+ assertEquals(
+ "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 GTE [\\x09]]",
+ newFilter.toString());
+ }
+ }
+
+ @Test
+ public void verifyConvertFilterConstants4() {
+ GTInfo info = table.getInfo();
+
+ TableDesc extTable = TableDesc.mockup("ext");
+ TblColRef extColA = TblColRef.mockup(extTable, 1, "A", "timestamp");
+ TblColRef extColB = TblColRef.mockup(extTable, 2, "B", "integer");
+
+ CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
+ CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.IN, "9", "10", "15");
+ LogicalTupleFilter filter = and(fComp1, fComp2);
+
+ List<TblColRef> colMapping = Lists.newArrayList();
+ colMapping.add(extColA);
+ colMapping.add(extColB);
+
+ // $1 in ("9", "10", "15") has only "10" left
+ TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+ assertEquals(
+ "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 IN [\\x00]]",
+ newFilter.toString());
+ }
+
+ private void doScanAndVerify(GridTable table, GTScanRequest req, String... verifyRows) throws IOException {
+ System.out.println(req);
+ try (IGTScanner scanner = table.scan(req)) {
+ int i = 0;
+ for (GTRecord r : scanner) {
+ System.out.println(r);
+ if (verifyRows == null || i >= verifyRows.length) {
+ Assert.fail();
+ }
+ assertEquals(verifyRows[i], r.toString());
+ i++;
+ }
+ }
+ }
+
+ public static ByteArray enc(GTInfo info, int col, String value) {
+ ByteBuffer buf = ByteBuffer.allocate(info.getMaxColumnLength());
+ info.getCodeSystem().encodeColumnValue(col, value, buf);
+ return ByteArray.copyOf(buf.array(), buf.arrayOffset(), buf.position());
+ }
+
+ public static ExtractTupleFilter unevaluatable(TblColRef col) {
+ ExtractTupleFilter r = new ExtractTupleFilter(FilterOperatorEnum.EXTRACT);
+ r.addChild(new ColumnTupleFilter(col));
+ return r;
+ }
+
+ public static CompareTupleFilter compare(TblColRef col, FilterOperatorEnum op, Object... value) {
+ CompareTupleFilter result = new CompareTupleFilter(op);
+ result.addChild(new ColumnTupleFilter(col));
+ result.addChild(new ConstantTupleFilter(Arrays.asList(value)));
+ return result;
+ }
+
+ public static LogicalTupleFilter and(TupleFilter... children) {
+ return logic(FilterOperatorEnum.AND, children);
+ }
+
+ public static LogicalTupleFilter or(TupleFilter... children) {
+ return logic(FilterOperatorEnum.OR, children);
+ }
+
+ public static LogicalTupleFilter not(TupleFilter child) {
+ return logic(FilterOperatorEnum.NOT, child);
+ }
+
+ public static LogicalTupleFilter logic(FilterOperatorEnum op, TupleFilter... children) {
+ LogicalTupleFilter result = new LogicalTupleFilter(op);
+ for (TupleFilter c : children) {
+ result.addChild(c);
+ }
+ return result;
+ }
+
+ public static GridTable newTestTable() throws IOException {
+ GTInfo info = newInfo();
+ GTSimpleMemStore store = new GTSimpleMemStore(info);
+ GridTable table = new GridTable(info, store);
+
+ GTRecord r = new GTRecord(table.getInfo());
+ GTBuilder builder = table.rebuild();
+
+ builder.write(r.setValues("2015-01-14", "30", "Yang", new Long(10), new BigDecimal("10.5")));
+ builder.write(r.setValues("2015-01-14", "30", "Luke", new Long(10), new BigDecimal("10.5")));
+ builder.write(r.setValues("2015-01-15", "20", "Dong", new Long(10), new BigDecimal("10.5")));
+ builder.write(r.setValues("2015-01-15", "20", "Jason", new Long(10), new BigDecimal("10.5")));
+ builder.write(r.setValues("2015-01-15", "30", "Xu", new Long(10), new BigDecimal("10.5")));
+ builder.write(r.setValues("2015-01-16", "20", "Mahone", new Long(10), new BigDecimal("10.5")));
+ builder.write(r.setValues("2015-01-16", "20", "Qianhao", new Long(10), new BigDecimal("10.5")));
+ builder.write(r.setValues("2015-01-16", "30", "George", new Long(10), new BigDecimal("10.5")));
+ builder.write(r.setValues("2015-01-16", "30", "Shaofeng", new Long(10), new BigDecimal("10.5")));
+ builder.write(r.setValues("2015-01-17", "10", "Kejia", new Long(10), new BigDecimal("10.5")));
+ builder.close();
+
+ return table;
+ }
+
+ static GridTable newTestPerfTable() throws IOException {
+ GTInfo info = newInfo();
+ GTSimpleMemStore store = new GTSimpleMemStore(info);
+ GridTable table = new GridTable(info, store);
+
+ GTRecord r = new GTRecord(table.getInfo());
+ GTBuilder builder = table.rebuild();
+
+ for (int i = 0; i < 100000; i++) {
+ for (int j = 0; j < 10; j++)
+ builder.write(r.setValues("2015-01-14", "30", "Yang", new Long(10), new BigDecimal("10.5")));
+
+ for (int j = 0; j < 10; j++)
+ builder.write(r.setValues("2015-01-14", "30", "Luke", new Long(10), new BigDecimal("10.5")));
+
+ for (int j = 0; j < 10; j++)
+ builder.write(r.setValues("2015-01-15", "20", "Dong", new Long(10), new BigDecimal("10.5")));
+
+ for (int j = 0; j < 10; j++)
+ builder.write(r.setValues("2015-01-15", "20", "Jason", new Long(10), new BigDecimal("10.5")));
+
+ for (int j = 0; j < 10; j++)
+ builder.write(r.setValues("2015-01-15", "30", "Xu", new Long(10), new BigDecimal("10.5")));
+
+ for (int j = 0; j < 10; j++)
+ builder.write(r.setValues("2015-01-16", "20", "Mahone", new Long(10), new BigDecimal("10.5")));
+
+ for (int j = 0; j < 10; j++)
+ builder.write(r.setValues("2015-01-16", "20", "Qianhao", new Long(10), new BigDecimal("10.5")));
+
+ for (int j = 0; j < 10; j++)
+ builder.write(r.setValues("2015-01-16", "30", "George", new Long(10), new BigDecimal("10.5")));
+
+ for (int j = 0; j < 10; j++)
+ builder.write(r.setValues("2015-01-16", "30", "Shaofeng", new Long(10), new BigDecimal("10.5")));
+
+ for (int j = 0; j < 10; j++)
+ builder.write(r.setValues("2015-01-17", "10", "Kejia", new Long(10), new BigDecimal("10.5")));
+ }
+ builder.close();
+
+ return table;
+ }
+
+ static GTInfo newInfo() {
+ Builder builder = GTInfo.builder();
+ builder.setCodeSystem(newDictCodeSystem());
+ builder.setColumns(//
+ DataType.getType("timestamp"), //
+ DataType.getType("integer"), //
+ DataType.getType("varchar(10)"), //
+ DataType.getType("bigint"), //
+ DataType.getType("decimal") //
+ );
+ builder.setPrimaryKey(setOf(0, 1));
+ builder.setColumnPreferIndex(setOf(0));
+ builder.enableColumnBlock(new ImmutableBitSet[] { setOf(0, 1), setOf(2), setOf(3, 4) });
+ builder.enableRowBlock(4);
+ GTInfo info = builder.build();
+ return info;
+ }
+
+ private static CubeCodeSystem newDictCodeSystem() {
+ DimensionEncoding[] dimEncs = new DimensionEncoding[3];
+ dimEncs[1] = new DictionaryDimEnc(newDictionaryOfInteger());
+ dimEncs[2] = new DictionaryDimEnc(newDictionaryOfString());
+ return new CubeCodeSystem(dimEncs);
+ }
+
+ private static Dictionary newDictionaryOfString() {
+ TrieDictionaryBuilder<String> builder = new TrieDictionaryBuilder<>(new StringBytesConverter());
+ builder.addValue("Dong");
+ builder.addValue("George");
+ builder.addValue("Jason");
+ builder.addValue("Kejia");
+ builder.addValue("Luke");
+ builder.addValue("Mahone");
+ builder.addValue("Qianhao");
+ builder.addValue("Shaofeng");
+ builder.addValue("Xu");
+ builder.addValue("Yang");
+ return builder.build(0);
+ }
+
+ private static Dictionary newDictionaryOfInteger() {
+ NumberDictionaryForestBuilder builder = new NumberDictionaryForestBuilder();
+ builder.addValue("10");
+ builder.addValue("20");
+ builder.addValue("30");
+ builder.addValue("40");
+ builder.addValue("50");
+ builder.addValue("60");
+ builder.addValue("70");
+ builder.addValue("80");
+ builder.addValue("90");
+ builder.addValue("100");
+ return builder.build();
+ }
+
+ public static ImmutableBitSet setOf(int... values) {
+ BitSet set = new BitSet();
+ for (int i : values)
+ set.set(i);
+ return new ImmutableBitSet(set);
+ }
}
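For readers skimming the restored test, here is a minimal sketch (not part of this revert) of how its helpers compose; it assumes the same newTestTable(), enc(), compare() and setOf() utilities defined in the test above.

    // Illustrative sketch only — not part of the commit.
    GridTable t = newTestTable();
    GTInfo gtInfo = t.getInfo();
    // filter on the dict-encoded second column: age = "20"
    CompareTupleFilter age20 = compare(gtInfo.colRef(1), FilterOperatorEnum.EQ, enc(gtInfo, 1, "20"));
    GTScanRequest request = new GTScanRequestBuilder().setInfo(gtInfo).setRanges(null)
            .setDimensions(null).setFilterPushDown(age20).createGTScanRequest();
    try (IGTScanner scanner = t.scan(request)) {
        for (GTRecord rec : scanner) {
            System.out.println(rec); // e.g. [1421280000000, 20, Dong, 10, 10.5]
        }
    }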
diff --git a/pom.xml b/pom.xml
index 3fe15f2..75d33a9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -284,11 +284,11 @@
<artifactId>kylin-core-metadata</artifactId>
<version>${project.version}</version>
</dependency>
-<!-- <dependency>-->
-<!-- <groupId>org.apache.kylin</groupId>-->
-<!-- <artifactId>kylin-core-dictionary</artifactId>-->
-<!-- <version>${project.version}</version>-->
-<!-- </dependency>-->
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-core-dictionary</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.kylin</groupId>
<artifactId>kylin-core-cube</artifactId>
@@ -1530,7 +1530,7 @@
<module>external</module>
<module>core-common</module>
<module>core-metadata</module>
-<!-- <module>core-dictionary</module>-->
+ <module>core-dictionary</module>
<module>core-cube</module>
<module>core-job</module>
<module>core-storage</module>
@@ -1602,7 +1602,7 @@
<profiles>
<profile>
- <id>spark2</id>
+ <id>sandbox</id>
<activation>
<activeByDefault>true</activeByDefault>
<property>
@@ -1745,6 +1745,114 @@
</plugins>
</build>
</profile>
+ <profile>
+ <id>cdh5.7</id>
+ <properties>
+ <hadoop2.version>2.6.0-cdh5.7.0</hadoop2.version>
+ <yarn.version>2.6.0-cdh5.7.0</yarn.version>
+ <hive.version>1.1.0-cdh5.7.0</hive.version>
+ <hive-hcatalog.version>1.1.0-cdh5.7.0</hive-hcatalog.version>
+ <hbase-hadoop2.version>1.2.0-cdh5.7.0</hbase-hadoop2.version>
+ <zookeeper.version>3.4.5-cdh5.7.0</zookeeper.version>
+ </properties>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <fork>true</fork>
+ <meminitial>1024m</meminitial>
+ <maxmem>2048m</maxmem>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>copy-jamm</id>
+ <goals>
+ <goal>copy</goal>
+ </goals>
+ <phase>generate-test-resources</phase>
+ <configuration>
+ <artifactItems>
+ <artifactItem>
+ <groupId>com.github.jbellis</groupId>
+ <artifactId>jamm</artifactId>
+ <outputDirectory>${project.build.testOutputDirectory}
+ </outputDirectory>
+ <destFileName>jamm.jar</destFileName>
+ </artifactItem>
+ </artifactItems>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.jacoco</groupId>
+ <artifactId>jacoco-maven-plugin</artifactId>
+ <configuration>
+ <append>true</append>
+ <destFile>
+ ${sonar.jacoco.reportPaths}
+ </destFile>
+ </configuration>
+ <executions>
+ <execution>
+ <id>pre-test</id>
+ <goals>
+ <goal>prepare-agent</goal>
+ </goals>
+ <configuration>
+ <propertyName>surefireArgLine</propertyName>
+ </configuration>
+ </execution>
+ <execution>
+ <id>post-test</id>
+ <phase>test</phase>
+ <goals>
+ <goal>report</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.21.0</version>
+ <configuration>
+ <reportsDirectory>${project.basedir}/../target/surefire-reports
+ </reportsDirectory>
+ <excludes>
+ <exclude>**/IT*.java</exclude>
+ <exclude>org.apache.kylin.engine.spark2.NManualBuildAndQueryCuboidTest</exclude>
+ <exclude>org.apache.kylin.engine.spark2.NBuildAndQueryTest</exclude>
+ <exclude>org.apache.kylin.engine.spark2.NBadQueryAndPushDownTest</exclude>
+ </excludes>
+ <systemProperties>
+ <property>
+ <name>buildCubeUsingProvidedData</name>
+ <value>false</value>
+ </property>
+ <property>
+ <name>log4j.configuration</name>
+ <value>
+ file:${project.basedir}/../build/conf/kylin-tools-log4j.properties
+ </value>
+ </property>
+ </systemProperties>
+ <argLine>-javaagent:${project.build.testOutputDirectory}/jamm.jar
+ ${argLine} ${surefireArgLine}
+ </argLine>
+ </configuration>
+ </plugin>
+ </plugins>
+</build>
+</profile>
<profile>
<!-- This profile adds/overrides few features of the 'apache-release'
profile in the parent pom. -->
diff --git a/query/src/main/java/org/apache/kylin/query/enumerator/LookupTableEnumerator.java b/query/src/main/java/org/apache/kylin/query/enumerator/LookupTableEnumerator.java
new file mode 100644
index 0000000..ad2e20c
--- /dev/null
+++ b/query/src/main/java/org/apache/kylin/query/enumerator/LookupTableEnumerator.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.query.enumerator;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.kylin.common.debug.BackdoorToggles;
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.model.DimensionDesc;
+import org.apache.kylin.dict.lookup.ILookupTable;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.project.RealizationEntry;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.metadata.tuple.Tuple;
+import org.apache.kylin.query.relnode.OLAPContext;
+import org.apache.kylin.query.schema.OLAPTable;
+import org.apache.kylin.storage.hybrid.HybridInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class LookupTableEnumerator implements Enumerator<Object[]> {
+ private final static Logger logger = LoggerFactory.getLogger(LookupTableEnumerator.class);
+
+ private final ILookupTable lookupTable;
+ private final List<ColumnDesc> colDescs;
+ private final Object[] current;
+ private Iterator<String[]> iterator;
+
+ public LookupTableEnumerator(OLAPContext olapContext) {
+
+ //TODO: assuming LookupTableEnumerator is handled by a cube
+ CubeInstance cube = null;
+
+ if (olapContext.realization instanceof CubeInstance) {
+ cube = (CubeInstance) olapContext.realization;
+ ProjectInstance project = cube.getProjectInstance();
+ List<RealizationEntry> realizationEntries = project.getRealizationEntries();
+ String lookupTableName = olapContext.firstTableScan.getTableName();
+ CubeManager cubeMgr = CubeManager.getInstance(cube.getConfig());
+
+ // Honor the force-hit-cube toggle when resolving the cube for this lookup table
+ String forceHitCubeName = BackdoorToggles.getForceHitCube();
+ if (!StringUtil.isEmpty(forceHitCubeName)) {
+ String forceHitCubeNameLower = forceHitCubeName.toLowerCase(Locale.ROOT);
+ String[] forceHitCubeNames = forceHitCubeNameLower.split(",");
+ final Set<String> forceHitCubeNameSet = new HashSet<String>(Arrays.asList(forceHitCubeNames));
+ cube = cubeMgr.findLatestSnapshot(
+ (List<RealizationEntry>) realizationEntries.stream()
+ .filter(x -> forceHitCubeNameSet.contains(x.getRealization().toLowerCase(Locale.ROOT))),
+ lookupTableName, cube);
+ olapContext.realization = cube;
+ } else {
+ cube = cubeMgr.findLatestSnapshot(realizationEntries, lookupTableName, cube);
+ olapContext.realization = cube;
+ }
+ } else if (olapContext.realization instanceof HybridInstance) {
+ final HybridInstance hybridInstance = (HybridInstance) olapContext.realization;
+ final IRealization latestRealization = hybridInstance.getLatestRealization();
+ if (latestRealization instanceof CubeInstance) {
+ cube = (CubeInstance) latestRealization;
+ } else {
+ throw new IllegalStateException();
+ }
+ }
+
+ String lookupTableName = olapContext.firstTableScan.getTableName();
+ DimensionDesc dim = cube.getDescriptor().findDimensionByTable(lookupTableName);
+ if (dim == null)
+ throw new IllegalStateException("No dimension with derived columns found for lookup table " + lookupTableName + ", cube desc " + cube.getDescriptor());
+
+ CubeManager cubeMgr = CubeManager.getInstance(cube.getConfig());
+ this.lookupTable = cubeMgr.getLookupTable(cube.getLatestReadySegment(), dim.getJoin());
+
+ OLAPTable olapTable = (OLAPTable) olapContext.firstTableScan.getOlapTable();
+ this.colDescs = olapTable.getSourceColumns();
+ this.current = new Object[colDescs.size()];
+
+ reset();
+ }
+
+ @Override
+ public boolean moveNext() {
+ boolean hasNext = iterator.hasNext();
+ if (hasNext) {
+ String[] row = iterator.next();
+ for (int i = 0, n = colDescs.size(); i < n; i++) {
+ ColumnDesc colDesc = colDescs.get(i);
+ int colIdx = colDesc.getZeroBasedIndex();
+ if (colIdx >= 0) {
+ current[i] = Tuple.convertOptiqCellValue(row[colIdx], colDesc.getUpgradedType().getName());
+ } else {
+ current[i] = null; // fake column
+ }
+ }
+ }
+ return hasNext;
+ }
+
+ @Override
+ public Object[] current() {
+ // NOTE: without the copy, sql_lookup/query03.sql yields a messy result. Very weird, because other lookup queries are all fine.
+ return Arrays.copyOf(current, current.length);
+ }
+
+ @Override
+ public void reset() {
+ this.iterator = lookupTable.iterator();
+ }
+
+ @Override
+ public void close() {
+ try {
+ lookupTable.close();
+ } catch (IOException e) {
+ logger.error("error when close lookup table", e);
+ }
+ }
+
+}
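For context (not part of this revert): LookupTableEnumerator implements Calcite's linq4j Enumerator contract, so the query engine drives it roughly as sketched below; everything except the Enumerator methods themselves is a hypothetical stand-in.

    // Illustrative sketch only — how an Enumerator<Object[]> is consumed.
    Enumerator<Object[]> rows = new LookupTableEnumerator(olapContext); // olapContext assumed available
    try {
        while (rows.moveNext()) {
            Object[] row = rows.current(); // one element per ColumnDesc of the lookup table
            // process row ...
        }
    } finally {
        rows.close(); // releases the underlying ILookupTable
    }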
diff --git a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java
index 6c0b5b1..c094ff5 100644
--- a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java
+++ b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java
@@ -64,6 +64,10 @@ public class OLAPQuery extends AbstractEnumerable<Object[]> implements Enumerabl
case OLAP:
return BackdoorToggles.getPrepareOnly() ? new EmptyEnumerator()
: new OLAPEnumerator(olapContext, optiqContext);
+ case LOOKUP_TABLE:
+ return BackdoorToggles.getPrepareOnly() ? new EmptyEnumerator() : new LookupTableEnumerator(olapContext);
+ case COL_DICT:
+ return BackdoorToggles.getPrepareOnly() ? new EmptyEnumerator() : new DictionaryEnumerator(olapContext);
case HIVE:
return BackdoorToggles.getPrepareOnly() ? new EmptyEnumerator() : new HiveEnumerator(olapContext);
default:
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java
index 6ab86a8..bfd6c4d 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java
@@ -38,6 +38,7 @@ import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexProgram;
import org.apache.calcite.rex.RexProgramBuilder;
+import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.metadata.filter.FilterOptimizeTransformer;
import org.apache.kylin.metadata.filter.LogicalTupleFilter;
import org.apache.kylin.metadata.filter.TupleFilter;
@@ -54,7 +55,7 @@ public class OLAPFilterRel extends Filter implements OLAPRel {
ColumnRowType columnRowType;
OLAPContext context;
- boolean autoJustTimezone = false;
+ boolean autoJustTimezone = KylinConfig.getInstanceFromEnv().getStreamingDerivedTimeTimezone().length() > 0;
public OLAPFilterRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexNode condition) {
super(cluster, traits, child, condition);
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/visitor/TupleFilterVisitor.java b/query/src/main/java/org/apache/kylin/query/relnode/visitor/TupleFilterVisitor.java
index dd79a4f..ffff10b 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/visitor/TupleFilterVisitor.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/visitor/TupleFilterVisitor.java
@@ -37,6 +37,7 @@ import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.calcite.util.NlsString;
+import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.metadata.filter.CaseTupleFilter;
@@ -59,6 +60,7 @@ import java.util.GregorianCalendar;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TimeZone;
public class TupleFilterVisitor extends RexVisitorImpl<TupleFilter> {
@@ -68,7 +70,8 @@ public class TupleFilterVisitor extends RexVisitorImpl<TupleFilter> {
// whether the fact table is a streamingv2 table
private boolean autoJustByTimezone = false;
- private static final long TIME_ZONE_OFFSET = 0;
+ private static final long TIME_ZONE_OFFSET = TimeZone.getTimeZone(KylinConfig.getInstanceFromEnv().getStreamingDerivedTimeTimezone())
+ .getRawOffset();
public TupleFilterVisitor(ColumnRowType inputRowType) {
super(true);
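For context (not part of this revert): the restored TIME_ZONE_OFFSET is simply the raw, non-DST offset of the configured streaming derived-time timezone, in milliseconds.

    // Illustrative sketch only — for example, with a "GMT+8" setting:
    long offsetMs = java.util.TimeZone.getTimeZone("GMT+8").getRawOffset(); // 8 * 60 * 60 * 1000 = 28800000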
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index a8285ae..00370e3 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -36,7 +36,7 @@ import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.cube.CubeInstance;
-//import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.CuboidScheduler;
import org.apache.kylin.cube.cuboid.TreeCuboidScheduler;
@@ -308,20 +308,20 @@ public class CubeController extends BasicController {
*
* @throws IOException
*/
-// @RequestMapping(value = "/{cubeName}/segs/{segmentName}/refresh_lookup", method = {
-// RequestMethod.PUT }, produces = { "application/json" })
-// @ResponseBody
-// public CubeInstance rebuildLookupSnapshot(@PathVariable String cubeName, @PathVariable String segmentName,
-// @RequestParam(value = "lookupTable") String lookupTable) {
-// try {
-// final CubeManager cubeMgr = cubeService.getCubeManager();
-// final CubeInstance cube = cubeMgr.getCube(cubeName);
-// return cubeService.rebuildLookupSnapshot(cube, segmentName, lookupTable);
-// } catch (IOException e) {
-// logger.error(e.getLocalizedMessage(), e);
-// throw new InternalErrorException(e.getLocalizedMessage(), e);
-// }
-// }
+ @RequestMapping(value = "/{cubeName}/segs/{segmentName}/refresh_lookup", method = {
+ RequestMethod.PUT }, produces = { "application/json" })
+ @ResponseBody
+ public CubeInstance rebuildLookupSnapshot(@PathVariable String cubeName, @PathVariable String segmentName,
+ @RequestParam(value = "lookupTable") String lookupTable) {
+ try {
+ final CubeManager cubeMgr = cubeService.getCubeManager();
+ final CubeInstance cube = cubeMgr.getCube(cubeName);
+ return cubeService.rebuildLookupSnapshot(cube, segmentName, lookupTable);
+ } catch (IOException e) {
+ logger.error(e.getLocalizedMessage(), e);
+ throw new InternalErrorException(e.getLocalizedMessage(), e);
+ }
+ }
/**
* Delete a cube segment
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
index dd45599..bd30109 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -64,7 +64,7 @@ import org.apache.kylin.metadata.model.IStorageAware;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
-//import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.metadata.project.ProjectManager;
import org.apache.kylin.metadata.project.RealizationEntry;
@@ -592,20 +592,20 @@ public class CubeService extends BasicService implements InitializingBean {
return getCubeManager().updateCube(update);
}
-// public CubeInstance rebuildLookupSnapshot(CubeInstance cube, String segmentName, String lookupTable)
-// throws IOException {
-// aclEvaluate.checkProjectOperationPermission(cube);
-// Message msg = MsgPicker.getMsg();
-// TableDesc tableDesc = getTableManager().getTableDesc(lookupTable, cube.getProject());
-// if (tableDesc.isView()) {
-// throw new BadRequestException(
-// String.format(Locale.ROOT, msg.getREBUILD_SNAPSHOT_OF_VIEW(), tableDesc.getName()));
-// }
-// CubeSegment seg = cube.getSegment(segmentName, SegmentStatusEnum.READY);
-// getCubeManager().buildSnapshotTable(seg, lookupTable, null);
-//
-// return cube;
-// }
+ public CubeInstance rebuildLookupSnapshot(CubeInstance cube, String segmentName, String lookupTable)
+ throws IOException {
+ aclEvaluate.checkProjectOperationPermission(cube);
+ Message msg = MsgPicker.getMsg();
+ TableDesc tableDesc = getTableManager().getTableDesc(lookupTable, cube.getProject());
+ if (tableDesc.isView()) {
+ throw new BadRequestException(
+ String.format(Locale.ROOT, msg.getREBUILD_SNAPSHOT_OF_VIEW(), tableDesc.getName()));
+ }
+ CubeSegment seg = cube.getSegment(segmentName, SegmentStatusEnum.READY);
+ getCubeManager().buildSnapshotTable(seg, lookupTable, null);
+
+ return cube;
+ }
public CubeInstance deleteSegmentById(CubeInstance cube, String uuid) throws IOException {
aclEvaluate.checkProjectWritePermission(cube);
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
index 966fd1f..6f282ff 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
@@ -47,6 +47,9 @@ import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.dict.lookup.SnapshotManager;
+import org.apache.kylin.dict.lookup.SnapshotTable;
+import org.apache.kylin.engine.spark.source.CsvSource;
import org.apache.kylin.metadata.TableMetadataManager;
import org.apache.kylin.metadata.model.ColumnDesc;
import org.apache.kylin.metadata.model.CsvColumnDesc;
@@ -60,6 +63,8 @@ import org.apache.kylin.rest.msg.MsgPicker;
import org.apache.kylin.rest.response.TableDescResponse;
import org.apache.kylin.rest.response.TableSnapshotResponse;
import org.apache.kylin.rest.util.AclEvaluate;
+import org.apache.kylin.source.IReadableTable;
+import org.apache.kylin.source.IReadableTable.TableSignature;
import org.apache.kylin.source.ISource;
import org.apache.kylin.source.ISourceMetadataExplorer;
import org.apache.kylin.source.SourceManager;
@@ -357,32 +362,38 @@ public class TableService extends BasicService {
// }
public List<TableSnapshotResponse> getLookupTableSnapshots(String project, String tableName) throws IOException {
- return Lists.newArrayList();
+ TableDesc tableDesc = getTableManager().getTableDesc(tableName, project);
+ if (SourceManager.getSource(tableDesc).getClass() == CsvSource.class) {
+ return new ArrayList<>();
+ }
+ IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc, null);
+ TableSignature signature = hiveTable.getSignature();
+ return internalGetLookupTableSnapshots(tableName, signature);
+ }
+
+ List<TableSnapshotResponse> internalGetLookupTableSnapshots(String tableName, TableSignature signature)
+ throws IOException {
+ SnapshotManager snapshotManager = SnapshotManager.getInstance(getConfig());
+ List<SnapshotTable> metaStoreTableSnapshots = snapshotManager.getSnapshots(tableName, signature);
+
+ Map<String, List<String>> snapshotUsageMap = getSnapshotUsages();
+
+ List<TableSnapshotResponse> result = Lists.newArrayList();
+
+ for (SnapshotTable metaStoreTableSnapshot : metaStoreTableSnapshots) {
+ TableSnapshotResponse response = new TableSnapshotResponse();
+ response.setSnapshotID(metaStoreTableSnapshot.getId());
+ response.setSnapshotType(TableSnapshotResponse.TYPE_INNER);
+ response.setLastBuildTime(metaStoreTableSnapshot.getLastBuildTime());
+ response.setStorageType(SnapshotTable.STORAGE_TYPE_METASTORE);
+ response.setSourceTableSize(metaStoreTableSnapshot.getSignature().getSize());
+ response.setSourceTableLastModifyTime(metaStoreTableSnapshot.getSignature().getLastModifiedTime());
+ response.setCubesAndSegmentsUsage(snapshotUsageMap.get(metaStoreTableSnapshot.getResourcePath()));
+ result.add(response);
+ }
+
+ return result;
}
-//
-// List<TableSnapshotResponse> internalGetLookupTableSnapshots(String tableName, TableSignature signature)
-// throws IOException {
-// SnapshotManager snapshotManager = SnapshotManager.getInstance(getConfig());
-// List<SnapshotTable> metaStoreTableSnapshots = snapshotManager.getSnapshots(tableName, signature);
-//
-// Map<String, List<String>> snapshotUsageMap = getSnapshotUsages();
-//
-// List<TableSnapshotResponse> result = Lists.newArrayList();
-//
-// for (SnapshotTable metaStoreTableSnapshot : metaStoreTableSnapshots) {
-// TableSnapshotResponse response = new TableSnapshotResponse();
-// response.setSnapshotID(metaStoreTableSnapshot.getId());
-// response.setSnapshotType(TableSnapshotResponse.TYPE_INNER);
-// response.setLastBuildTime(metaStoreTableSnapshot.getLastBuildTime());
-// response.setStorageType(SnapshotTable.STORAGE_TYPE_METASTORE);
-// response.setSourceTableSize(metaStoreTableSnapshot.getSignature().getSize());
-// response.setSourceTableLastModifyTime(metaStoreTableSnapshot.getSignature().getLastModifiedTime());
-// response.setCubesAndSegmentsUsage(snapshotUsageMap.get(metaStoreTableSnapshot.getResourcePath()));
-// result.add(response);
-// }
-//
-// return result;
-// }
/**
* @return Map of SnapshotID, CubeName or SegmentName list
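For context on the TableService change above: the re-enabled snapshot listing can be consumed roughly as below. The project and table names are placeholders, and the getters are assumed to mirror the setters used in internalGetLookupTableSnapshots; treat this as a sketch, not part of the commit.

import java.io.IOException;
import java.util.List;
import org.apache.kylin.rest.response.TableSnapshotResponse;
import org.apache.kylin.rest.service.TableService;

// Hypothetical consumer of the restored lookup-snapshot listing.
public class LookupSnapshotListingSketch {
    public static void print(TableService tableService) throws IOException {
        List<TableSnapshotResponse> snapshots =
                tableService.getLookupTableSnapshots("learn_kylin", "DEFAULT.KYLIN_CAL_DT");  // assumed names
        for (TableSnapshotResponse snapshot : snapshots) {
            // Each response carries the snapshot id, storage type, build time, source table
            // size/modify time and the cubes or segments that currently reference the snapshot.
            System.out.println(snapshot.getSnapshotID() + " -> " + snapshot.getCubesAndSegmentsUsage());
        }
    }
}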
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
index 6236ef9..8ff5719 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Base64;
+import java.util.Objects;
import java.util.Set;
import java.util.Locale;
import java.util.Collections;
@@ -92,15 +93,15 @@ public class HiveInputBase {
// create global dict
KylinConfig dictConfig = (flatDesc.getSegment()).getConfig();
-// String[] mrHiveDictColumns = dictConfig.getMrHiveDictColumns();
-// if (mrHiveDictColumns.length > 0) {
-// String globalDictDatabase = dictConfig.getMrHiveDictDB();
-// if (null == globalDictDatabase) {
-// throw new IllegalArgumentException("Mr-Hive Global dict database is null.");
-// }
-// String globalDictTable = cubeName + dictConfig.getMrHiveDictTableSuffix();
-// addStepPhase1_DoCreateMrHiveGlobalDict(jobFlow, mrHiveDictColumns, globalDictDatabase, globalDictTable);
-// }
+ String[] mrHiveDictColumns = dictConfig.getMrHiveDictColumns();
+ if (mrHiveDictColumns.length > 0) {
+ String globalDictDatabase = dictConfig.getMrHiveDictDB();
+ if (null == globalDictDatabase) {
+ throw new IllegalArgumentException("Mr-Hive Global dict database is null.");
+ }
+ String globalDictTable = cubeName + dictConfig.getMrHiveDictTableSuffix();
+ addStepPhase1_DoCreateMrHiveGlobalDict(jobFlow, mrHiveDictColumns, globalDictDatabase, globalDictTable);
+ }
// then count and redistribute
if (cubeConfig.isHiveRedistributeEnabled()) {
@@ -288,13 +289,13 @@ public class HiveInputBase {
deleteTables.add(getIntermediateTableIdentity());
// mr-hive dict and inner table do not need delete hdfs
-// String[] mrHiveDicts = flatDesc.getSegment().getConfig().getMrHiveDictColumns();
-// if (Objects.nonNull(mrHiveDicts) && mrHiveDicts.length > 0) {
-// String dictDb = flatDesc.getSegment().getConfig().getMrHiveDictDB();
-// String tableName = dictDb + "." + flatDesc.getTableName() + "_"
-// + MRHiveDictUtil.DictHiveType.GroupBy.getName();
-// deleteTables.add(tableName);
-// }
+ String[] mrHiveDicts = flatDesc.getSegment().getConfig().getMrHiveDictColumns();
+ if (Objects.nonNull(mrHiveDicts) && mrHiveDicts.length > 0) {
+ String dictDb = flatDesc.getSegment().getConfig().getMrHiveDictDB();
+ String tableName = dictDb + "." + flatDesc.getTableName() + "_"
+ + MRHiveDictUtil.DictHiveType.GroupBy.getName();
+ deleteTables.add(tableName);
+ }
step.setIntermediateTables(deleteTables);
step.setExternalDataPaths(Collections.singletonList(JoinedFlatTable.getTableDir(flatDesc, jobWorkingDir)));
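For context on the HiveInputBase change above: the restored Mr-Hive global dictionary branch is driven by kylin.properties. The sketch below shows the shape of that configuration; the property keys follow the Kylin 3.x Hive global dictionary feature but are listed here as assumptions and should be verified against KylinConfigBase before use.

# Columns to build a Hive-based global dictionary for (assumed key; column list is an example)
kylin.dictionary.mr-hive.columns=KYLIN_SALES.BUYER_ID,KYLIN_SALES.SELLER_ID
# Hive database holding the global dictionary tables; if unset, the restored check in
# HiveInputBase throws IllegalArgumentException("Mr-Hive Global dict database is null.")
kylin.dictionary.mr-hive.database=kylin_global_dict
# Suffix appended to the cube name to form the dictionary table name (assumed key and value)
kylin.dictionary.mr-hive.table.suffix=_global_dict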