You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2020/02/21 03:21:30 UTC
[kylin] 01/06: KYLIN-4336 Global domain dict for MR build engine
This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 4a484f861550f07417e5890bc8478f182f922052
Author: wangxiaojing <wa...@didichuxing.com>
AuthorDate: Mon Jan 13 19:39:31 2020 +0800
KYLIN-4336 Global domain dict for MR build engine
---
.../java/org/apache/kylin/cube/CubeManager.java | 15 +-
.../java/org/apache/kylin/cube/model/CubeDesc.java | 26 ++++
.../CubeDescTiretreeGlobalDomainDictUtil.java | 164 +++++++++++++++++++++
.../apache/kylin/cube/model/DictionaryDesc.java | 42 ++++++
.../cube/model/validation/rule/DictionaryRule.java | 12 +-
.../kylin/measure/bitmap/BitmapMeasureType.java | 8 +-
.../apache/kylin/metadata/model/DataModelDesc.java | 20 ++-
.../kylin/engine/mr/common/AbstractHadoopJob.java | 4 +
8 files changed, 278 insertions(+), 13 deletions(-)
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index d057982..7a44f60 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -27,6 +27,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -45,6 +46,7 @@ import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeDescTiretreeGlobalDomainDictUtil;
import org.apache.kylin.cube.model.DimensionDesc;
import org.apache.kylin.cube.model.SnapshotTableDesc;
import org.apache.kylin.dict.DictionaryInfo;
@@ -1185,9 +1187,20 @@ public class CubeManager implements IRealizationProvider {
@SuppressWarnings("unchecked")
public Dictionary<String> getDictionary(CubeSegment cubeSeg, TblColRef col) {
DictionaryInfo info = null;
+ String dictResPath = null;
try {
DictionaryManager dictMgr = getDictionaryManager();
- String dictResPath = cubeSeg.getDictResPath(col);
+
+ //tiretree global domain dic
+ List<CubeDescTiretreeGlobalDomainDictUtil.GlobalDict> globalDicts = cubeSeg.getCubeDesc().listDomainDict();
+ if (!globalDicts.isEmpty()) {
+ dictResPath = CubeDescTiretreeGlobalDomainDictUtil.globalReuseDictPath(cubeSeg.getConfig(), col, cubeSeg.getCubeDesc());
+ }
+
+ if (Objects.isNull(dictResPath)){
+ dictResPath = cubeSeg.getDictResPath(col);
+ }
+
if (dictResPath == null)
return null;
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index f5bb427..7c16d33 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -40,6 +40,7 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
@@ -1375,6 +1376,13 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware {
result.remove(dictDesc.getColumnRef());
result.add(dictDesc.getResuseColumnRef());
}
+
+ //tiretree global domain dic
+ if (Objects.isNull(dictDesc.getResuseColumnRef()) && Objects.nonNull(dictDesc.getReuseColumn())) {
+ logger.info("tiretree global domain dic : column {} use tiretree global domain dic, reuse column {} ", dictDesc.getColumnRef(), dictDesc.getReuseColumn());
+ result.remove(dictDesc.getColumnRef());
+ }
+
}
}
@@ -1382,6 +1390,24 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware {
}
/**
+  * Lists the dictionary settings of this cube that are global domain dictionaries,
+  * i.e. those for which {@link DictionaryDesc#isDomain()} is true.
+  *
+  * @return one {@code GlobalDict} per domain dictionary, built from the descriptor's column ref,
+  *         reuse column name, cube name and model name; an empty list when there are none
+  */
+ public List<CubeDescTiretreeGlobalDomainDictUtil.GlobalDict> listDomainDict() {
+     List<CubeDescTiretreeGlobalDomainDictUtil.GlobalDict> domainDicts = new ArrayList<>();
+     if (dictionaries == null) {
+         return domainDicts;
+     }
+     for (DictionaryDesc candidate : dictionaries) {
+         if (!candidate.isDomain()) {
+             continue;
+         }
+         domainDicts.add(new CubeDescTiretreeGlobalDomainDictUtil.GlobalDict(candidate.getColumnRef(), candidate.getReuseColumn(), candidate.getCube(), candidate.getModel()));
+     }
+     return domainDicts;
+ }
+
+
+ /**
* A column may reuse dictionary of another column, find the dict column, return same col if there's no reuse column
*/
public TblColRef getDictionaryReuseColumn(TblColRef col) {
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDescTiretreeGlobalDomainDictUtil.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDescTiretreeGlobalDomainDictUtil.java
new file mode 100644
index 0000000..999fcae
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDescTiretreeGlobalDomainDictUtil.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.model;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.DataModelManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TableRef;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.source.SourceManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+public class CubeDescTiretreeGlobalDomainDictUtil {
+    private static final Logger logger = LoggerFactory.getLogger(CubeDescTiretreeGlobalDomainDictUtil.class);
+
+    /**
+     * Resolves the dictionary resource path that a column reuses from another cube
+     * (a "global domain dictionary").
+     *
+     * <p>The first domain dictionary of {@code cubeDesc} whose source column identity equals the
+     * identity of {@code tblColRef} (ignoring case) decides the result: the reused column is
+     * resolved in the referenced model and its dictionary path is read from the latest ready
+     * segment of the referenced cube.
+     *
+     * @param config    config used to obtain the {@link DataModelManager}
+     * @param tblColRef column whose dictionary path is requested
+     * @param cubeDesc  descriptor of the cube that declares the domain dictionaries
+     * @return the dictionary path reported by the referenced segment (an error is logged when it
+     *         is blank), or {@code null} when no domain dictionary matches the column or when the
+     *         referenced model, cube, ready segment or column cannot be found
+     */
+    public static String globalReuseDictPath(KylinConfig config, TblColRef tblColRef, CubeDesc cubeDesc) {
+        DataModelManager metadataManager = DataModelManager.getInstance(config);
+        // NOTE(review): the cube manager is resolved from the environment config rather than from
+        // the config argument; kept unchanged, confirm whether that is intentional.
+        CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+
+        for (GlobalDict dict : cubeDesc.listDomainDict()) {
+            if (!dict.getSrc().getIdentity().equalsIgnoreCase(tblColRef.getIdentity())) {
+                continue;
+            }
+
+            String model = dict.getModel();
+            String cube = dict.getCube();
+            logger.info("cube:{} column:{} tiretree global domain dic reuse model:{} cube{} column:{} ", cubeDesc.getName(), tblColRef.getName(), model, cube, dict.getDesc());
+
+            DataModelDesc dataModel = metadataManager.getDataModelDesc(model);
+            if (Objects.isNull(dataModel)) {
+                logger.error("get cube:{} column:{} tiretree global domain dic reuse DataModelDesc error", cubeDesc.getName(), tblColRef.getName());
+                return null;
+            }
+
+            // Guard the referenced cube and its segment: a missing cube or a cube without a
+            // ready segment is reported like the other lookup failures instead of raising an NPE.
+            CubeInstance cubeInstance = cubeManager.getCube(cube);
+            if (Objects.isNull(cubeInstance)) {
+                logger.error("get cube:{} column:{} tiretree global domain dic reuse cube:{} error", cubeDesc.getName(), tblColRef.getName(), cube);
+                return null;
+            }
+            CubeSegment cubeSegment = cubeInstance.getLatestReadySegment();
+            if (Objects.isNull(cubeSegment)) {
+                logger.error("get cube:{} column:{} tiretree global domain dic reuse cube:{} ready segment error", cubeDesc.getName(), tblColRef.getName(), cube);
+                return null;
+            }
+
+            TblColRef colRef = dataModel.findColumn(dict.getDesc());
+            if (Objects.isNull(colRef)) {
+                logger.error("get cube:{} column:{} tiretree global domain dic TblColRef error", cubeDesc.getName(), tblColRef.getName());
+                return null;
+            }
+
+            String globalResumeDictPath = cubeSegment.getDictResPath(colRef);
+            if (StringUtils.isBlank(globalResumeDictPath)) {
+                logger.error("get cube:{} column:{} tiretree global domain dic resume dict path error", cubeDesc.getName(), tblColRef.getName());
+            } else {
+                logger.info("get cube:{} column:{} tiretree global domain dic resume dict path is {}", cubeDesc.getName(), tblColRef.getName(), globalResumeDictPath);
+            }
+            // Only the first matching domain dictionary is considered.
+            return globalResumeDictPath;
+        }
+        return null;
+    }
+
+    /**
+     * Adds to {@code dumpList} the metadata resources of every cube whose dictionary is reused
+     * as a global domain dictionary by {@code cubeDesc}.
+     *
+     * <p>For each domain dictionary the following resource paths are added: the referenced cube
+     * instance, its model, its cube descriptor, its project, every table of its model together
+     * with the table's MR dependent resources, and, when present, the dictionary path of the
+     * reused column in the referenced cube's latest ready segment.
+     *
+     * @param cubeDesc descriptor of the cube that declares the domain dictionaries
+     * @param dumpList set of resource paths to extend
+     * @throws IllegalStateException when a referenced cube, model, column or ready segment
+     *                               cannot be found
+     */
+    public static void cuboidJob(CubeDesc cubeDesc, Set<String> dumpList) {
+        logger.info("cube {} start to add global domain dic", cubeDesc.getName());
+        CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+        DataModelManager metadataManager = DataModelManager.getInstance(KylinConfig.getInstanceFromEnv());
+
+        for (GlobalDict dict : cubeDesc.listDomainDict()) {
+            String cube = dict.getCube();
+            String model = dict.getModel();
+            logger.debug("cube {} column {} start to add global domain dic ,reuse {}.{}.{}", cubeDesc.getName(), dict.getSrc(), model, cube, dict.getDesc());
+
+            CubeInstance instance = cubeManager.getCube(cube);
+            if (instance == null) {
+                throw new IllegalStateException("Global domain dict of cube " + cubeDesc.getName() + " column " + dict.getSrc() + " references unknown cube " + cube);
+            }
+            logger.debug("cube {} column {} start to add global domain dic ,reuse cube{} dict", cubeDesc.getName(), dict.getSrc(), instance.getName());
+
+            // cube, model_desc, cube_desc, project
+            dumpList.add(instance.getResourcePath());
+            dumpList.add(instance.getDescriptor().getModel().getResourcePath());
+            dumpList.add(instance.getDescriptor().getResourcePath());
+            dumpList.add(instance.getProjectInstance().getResourcePath());
+
+            // every table of the referenced cube's model, plus what the source needs for MR
+            for (TableRef tableRef : instance.getDescriptor().getModel().getAllTables()) {
+                TableDesc table = tableRef.getTableDesc();
+                dumpList.add(table.getResourcePath());
+                dumpList.addAll(SourceManager.getMRDependentResources(table));
+            }
+
+            DataModelDesc dataModelDesc = metadataManager.getDataModelDesc(model);
+            if (dataModelDesc == null) {
+                throw new IllegalStateException("Global domain dict of cube " + cubeDesc.getName() + " column " + dict.getSrc() + " references unknown model " + model);
+            }
+            logger.debug("cube {} column {} start to add global domain dic ,reuse model{} dict", cubeDesc.getName(), dict.getSrc(), dataModelDesc.getName());
+
+            TblColRef tblColRef = dataModelDesc.findColumn(dict.getDesc());
+            if (tblColRef == null) {
+                throw new IllegalStateException("Global domain dict of cube " + cubeDesc.getName() + " column " + dict.getSrc() + " references unknown column " + dict.getDesc() + " in model " + model);
+            }
+            CubeSegment segment = instance.getLatestReadySegment();
+            if (segment == null) {
+                throw new IllegalStateException("Global domain dict of cube " + cubeDesc.getName() + " column " + dict.getSrc() + " references cube " + cube + " which has no ready segment");
+            }
+            logger.debug("cube {} column {} start to add global domain dic ,reuse mode:{} cube:{} segment:{} dict,tblColRef:{}", cubeDesc.getName(), dict.getSrc(), dataModelDesc.getName(), cube, segment.getName(), tblColRef.getIdentity());
+
+            // Look the path up once; a segment without a dictionary for the column adds nothing.
+            String dictResPath = segment.getDictResPath(tblColRef);
+            if (dictResPath != null) {
+                dumpList.addAll(ImmutableList.of(dictResPath));
+            }
+        }
+    }
+
+    /**
+     * One global domain dictionary mapping: column {@code src} of the declaring cube reuses the
+     * dictionary of the column named {@code desc}, which is looked up in {@code model} and read
+     * from {@code cube}.
+     */
+    public static class GlobalDict implements Serializable {
+        private final TblColRef src;
+        private final String desc;
+        private final String cube;
+        private final String model;
+
+        /**
+         * @param src   column of the declaring cube that reuses a dictionary
+         * @param desc  name of the reused column, resolved against {@code model}
+         * @param cube  name of the cube whose segment holds the reused dictionary
+         * @param model name of the model in which {@code desc} is resolved
+         */
+        public GlobalDict(TblColRef src, String desc, String cube, String model) {
+            this.src = src;
+            this.desc = desc;
+            this.cube = cube;
+            this.model = model;
+        }
+
+        /** @return the column of the declaring cube */
+        public TblColRef getSrc() {
+            return src;
+        }
+
+        /** @return the name of the reused column */
+        public String getDesc() {
+            return desc;
+        }
+
+        /** @return the name of the cube that holds the reused dictionary */
+        public String getCube() {
+            return cube;
+        }
+
+        /** @return the name of the model in which the reused column is resolved */
+        public String getModel() {
+            return model;
+        }
+    }
+}
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/DictionaryDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/DictionaryDesc.java
index a700e10..2d1ba99 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/DictionaryDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/DictionaryDesc.java
@@ -19,6 +19,7 @@
package org.apache.kylin.cube.model;
import java.util.Locale;
+import java.util.Objects;
import org.apache.kylin.metadata.model.DataModelDesc;
import org.apache.kylin.metadata.model.TblColRef;
@@ -40,6 +41,15 @@ public class DictionaryDesc implements java.io.Serializable {
@JsonInclude(JsonInclude.Include.NON_NULL)
private String builderClass;
+ //for tiretree global domain dic
+ @JsonProperty("cube")
+ private String cube;
+
+ //for tiretree global domain dic
+ @JsonProperty("model")
+ private String model;
+
+
// computed content
private TblColRef colRef;
private TblColRef reuseColRef;
@@ -68,6 +78,38 @@ public class DictionaryDesc implements java.io.Serializable {
return builderClass;
}
+ /**
+  * @return the value of the {@code model} JSON property, used by global domain dictionaries;
+  *         may be {@code null} when the property is not set
+  */
+ public String getModel() {
+     return this.model;
+ }
+
+ /**
+  * Sets the {@code model} JSON property used by global domain dictionaries.
+  *
+  * @param model the model name to store
+  */
+ public void setModel(String model) {
+     this.model = model;
+ }
+
+ /**
+  * @return the value of the {@code cube} JSON property, used by global domain dictionaries;
+  *         may be {@code null} when the property is not set
+  */
+ public String getCube() {
+     return this.cube;
+ }
+
+ /**
+  * Sets the {@code cube} JSON property used by global domain dictionaries.
+  *
+  * @param cube the cube name to store
+  */
+ public void setCube(String cube) {
+     this.cube = cube;
+ }
+
+ /**
+  * @return the reuse column name held by this descriptor, as opposed to the resolved
+  *         reuse column reference; may be {@code null}
+  */
+ public String getReuseColumn() {
+     return this.reuseColumn;
+ }
+
+ /**
+  * Tells whether this descriptor is a global domain dictionary: a reuse column name is set
+  * but no reuse column reference has been resolved for it.
+  *
+  * @return {@code true} when the reuse column reference is null and the reuse column name is not
+  */
+ public boolean isDomain() {
+     return Objects.isNull(reuseColRef) && Objects.nonNull(reuseColumn);
+ }
+
+
// for test
public static DictionaryDesc create(String column, String reuseColumn, String builderClass) {
DictionaryDesc desc = new DictionaryDesc();
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java
index df1316d..9023f28 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java
@@ -32,6 +32,8 @@ import org.apache.kylin.cube.model.validation.ResultLevel;
import org.apache.kylin.cube.model.validation.ValidateContext;
import org.apache.kylin.dict.GlobalDictionaryBuilder;
import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Validate Dictionary Settings:
@@ -43,6 +45,8 @@ import org.apache.kylin.metadata.model.TblColRef;
* </ul>
*/
public class DictionaryRule implements IValidatorRule<CubeDesc> {
+ private static final Logger logger = LoggerFactory.getLogger(DictionaryRule.class);
+
static final String ERROR_DUPLICATE_DICTIONARY_COLUMN = "Duplicated dictionary specification for column: ";
static final String ERROR_REUSE_BUILDER_BOTH_SET = "REUSE and BUILDER both set on dictionary for column: ";
static final String ERROR_REUSE_BUILDER_BOTH_EMPTY = "REUSE and BUILDER both empty on dictionary for column: ";
@@ -80,8 +84,12 @@ public class DictionaryRule implements IValidatorRule<CubeDesc> {
}
if (reuseCol == null && StringUtils.isEmpty(builderClass)) {
- context.addResult(ResultLevel.ERROR, ERROR_REUSE_BUILDER_BOTH_EMPTY + dictCol);
- return;
+ if (dictDesc.isDomain()) {
+     // A global domain dictionary legitimately has neither a resolved reuse column
+     // nor a builder class, so it is logged instead of being reported as an error.
+     // SLF4J substitutes arguments only for "{}" placeholders.
+     logger.info("{} is tiretree global domain dic", dictCol);
+ } else {
+     context.addResult(ResultLevel.ERROR, ERROR_REUSE_BUILDER_BOTH_EMPTY + dictCol);
+     return;
+ }
}
if (StringUtils.isNotEmpty(builderClass) && builderClass.equalsIgnoreCase(GlobalDictionaryBuilder.class.getName()) && dimensionColumns.contains(dictCol) && rowKeyDesc.isUseDictionary(dictCol)) {
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java
index 7134c6b..450a62a 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java
@@ -110,10 +110,10 @@ public class BitmapMeasureType extends MeasureType<BitmapCounter> {
}
int id;
- if (needDictionaryColumn(measureDesc.getFunction())) {
- TblColRef literalCol = measureDesc.getFunction().getParameter().getColRefs().get(0);
- Dictionary<String> dictionary = dictionaryMap.get(literalCol);
- id = dictionary.getIdFromValue(values[0]);
+ TblColRef literalCol = measureDesc.getFunction().getParameter().getColRefs().get(0);
+ if (needDictionaryColumn(measureDesc.getFunction()) && dictionaryMap.containsKey(literalCol)) {
+ Dictionary<String> dictionary = dictionaryMap.get(literalCol);
+ id = dictionary.getIdFromValue(values[0]);
} else {
id = Integer.parseInt(values[0]);
}
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
index 818afdf..12dd63f 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
@@ -27,6 +27,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Objects;
import java.util.Queue;
import java.util.Set;
@@ -285,10 +286,16 @@ public class DataModelDesc extends RootPersistentEntity {
}
public TblColRef findColumn(String table, String column) throws IllegalArgumentException {
+ // Returns null (after logging a warning) when the table or the column is unknown,
+ // instead of throwing as before; callers must handle a null result.
+ TblColRef result = null;
TableRef tableRef = findTable(table);
- TblColRef result = tableRef.getColumn(column.toUpperCase(Locale.ROOT));
- if (result == null)
- throw new IllegalArgumentException("Column not found by " + table + "." + column);
+ if (Objects.nonNull(tableRef)) {
+     // Column names are looked up in upper case.
+     result = tableRef.getColumn(column.toUpperCase(Locale.ROOT));
+ }
+
+ if (result == null) {//tiretree global domain dic
+     logger.warn("table {} column {} not found in its's model {} , maybe it's a tiretree global domain dict. ", table, column, getName() );
+ }
+
return result;
}
@@ -310,8 +317,9 @@ public class DataModelDesc extends RootPersistentEntity {
}
}
- if (result == null)
- throw new IllegalArgumentException("Column not found by " + input);
+ if (result == null) {
+ logger.warn("Column {} not found in its's model {} , maybe it's a tiretree global domain dict. ", column, getName() );
+ }
return result;
}
@@ -320,7 +328,7 @@ public class DataModelDesc extends RootPersistentEntity {
public TableRef findTable(String table) throws IllegalArgumentException {
TableRef result = tableNameMap.get(table.toUpperCase(Locale.ROOT));
if (result == null) {
- throw new IllegalArgumentException("Table not found by " + table);
+ logger.warn("table {} not found in its's model {} , maybe it's a tiretree global domain dict. ", table, getName() );
}
return result;
}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 5e49c76..fd4d413 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -69,6 +69,7 @@ import org.apache.kylin.common.util.StringSplitter;
import org.apache.kylin.common.util.StringUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDescTiretreeGlobalDomainDictUtil;
import org.apache.kylin.job.JobInstance;
import org.apache.kylin.job.exception.JobException;
import org.apache.kylin.metadata.model.TableDesc;
@@ -586,6 +587,9 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
if (ifStatsIncluded) {
dumpList.add(segment.getStatisticsResourcePath());
}
+ //tiretree global domain dic
+ CubeDescTiretreeGlobalDomainDictUtil.cuboidJob(segment.getCubeDesc(), dumpList);
+
dumpKylinPropsAndMetadata(segment.getProject(), dumpList, segment.getConfig(), conf);
}