You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2021/03/24 08:54:07 UTC
[kylin] 02/02: KYLIN-4937 Verify the uniqueness of the global
dictionary after building global dictionary
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 1d5f176fc3c940309dd9fd91f68b2bcd14f37975
Author: Zhichao Zhang <zh...@apache.org>
AuthorDate: Sat Mar 20 15:53:02 2021 +0800
KYLIN-4937 Verify the uniqueness of the global dictionary after building global dictionary
---
.../org/apache/kylin/common/KylinConfigBase.java | 4 +++
.../spark/dict/NGlobalDictBuilderAssist.scala | 30 ++++++++++++++++++++++
.../spark/builder/CubeDictionaryBuilder.scala | 12 ++++++---
.../engine/spark/builder/TestCreateFlatTable.scala | 2 +-
4 files changed, 43 insertions(+), 5 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 2ed1cb5..f255c1c 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -699,6 +699,10 @@ public abstract class KylinConfigBase implements Serializable {
return Long.parseLong(getOptional("kylin.dictionary.globalV2-version-ttl", "259200000"));
}
+    /**
+     * Tells whether the global dictionary V2 check is switched on.
+     *
+     * Reads the "kylin.dictionary.globalV2-check" setting through getOptional, passing "true"
+     * as the second (fallback) argument, and parses the resulting string as a boolean.
+     *
+     * @return the parsed boolean value of the setting
+     */
+    public boolean isCheckGlobalDictV2() {
+        final String checkEnabled = getOptional("kylin.dictionary.globalV2-check", "true");
+        return Boolean.parseBoolean(checkEnabled);
+    }
+
// ============================================================================
// CUBE
// ============================================================================
diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictBuilderAssist.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictBuilderAssist.scala
index a16a9da..dbac8b8 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictBuilderAssist.scala
+++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictBuilderAssist.scala
@@ -71,4 +71,34 @@ object NGlobalDictBuilderAssist extends Logging {
desc.kylinconf.getGlobalDictV2MaxVersions, desc.kylinconf.getGlobalDictV2VersionTTL)
}
+  /**
+   * Sanity-checks the global dictionary of one column after it has been written.
+   *
+   * When `desc.kylinconf.isCheckGlobalDictV2` is true, the bucket dictionaries with ids
+   * `0 to bucketPartitionSize` are loaded inside a Spark job, the entries of each bucket's
+   * absolute dict map are flattened into (key, value) pairs, and the number of distinct keys
+   * is compared with the number of distinct values. A mismatch is logged and then raised as a
+   * RuntimeException. When the switch is off the method does nothing.
+   *
+   * The `@throws[IOException]` annotation is kept unchanged for callers.
+   *
+   * @param ref                 column whose global dictionary is checked
+   * @param desc                segment supplying the project name and the Kylin config
+   * @param bucketPartitionSize inclusive upper bound of the bucket ids that are loaded
+   * @param ss                  Spark session used to run the check
+   * @throws RuntimeException if the distinct key count differs from the distinct value count
+   */
+  @throws[IOException]
+  def checkGlobalDict(ref: ColumnDesc, desc: SegmentInfo, bucketPartitionSize: Int,
+                      ss: SparkSession): Unit = {
+    if (desc.kylinconf.isCheckGlobalDictV2) {
+      val globalDict = new NGlobalDictionary(desc.project, ref.tableAliasName, ref.columnName,
+        desc.kylinconf.getHdfsWorkingDirectory)
+      val broadcastDict = ss.sparkContext.broadcast(globalDict)
+      import ss.implicits._
+      // NOTE(review): `0 to bucketPartitionSize` is inclusive, so bucketPartitionSize + 1 bucket
+      // ids are loaded. Kept as in the original - confirm whether `until` was intended.
+      val dictEntryDs = ss.createDataset(0 to bucketPartitionSize)
+        .flatMap {
+          bucketId =>
+            val bucketDict: NBucketDictionary = broadcastDict.value.loadBucketDictionary(bucketId)
+            // Stream the entries straight out of the map; no intermediate list copy is needed.
+            val entries: Iterator[(String, Long)] =
+              bucketDict.getAbsoluteDictMap.object2LongEntrySet.asScala.iterator
+                .map(entry => (entry.getKey, entry.getLongValue))
+            entries
+        }
+      // Column "_1" is filled from getKey and "_2" from getLongValue, so the distinct count
+      // over "_1" is the key count and the one over "_2" is the value count.
+      val keyCount = dictEntryDs.dropDuplicates("_1").count()
+      val valueCount = dictEntryDs.dropDuplicates("_2").count()
+      if (keyCount != valueCount) {
+        // Build the message once so the log line and the exception always agree.
+        val errorMsg = s"Global dict build error on column ${ref.columnName}, " +
+          s"key distinct count is ${keyCount}, and value distinct count is ${valueCount}."
+        logError(errorMsg)
+        throw new RuntimeException(errorMsg)
+      }
+    }
+  }
}
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeDictionaryBuilder.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeDictionaryBuilder.scala
index 5c15f7d..b39486b 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeDictionaryBuilder.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeDictionaryBuilder.scala
@@ -19,14 +19,13 @@ package org.apache.kylin.engine.spark.builder
import java.io.IOException
import java.util
-
import org.apache.kylin.common.KylinConfig
import org.apache.kylin.common.lock.DistributedLock
import org.apache.kylin.common.util.HadoopUtil
import org.apache.kylin.engine.spark.builder.CubeBuilderHelper._
import org.apache.kylin.engine.spark.job.NSparkCubingUtil
import org.apache.kylin.engine.spark.metadata.{ColumnDesc, SegmentInfo}
-import org.apache.spark.dict.NGlobalDictionary
+import org.apache.spark.dict.{NGlobalDictBuilderAssist, NGlobalDictionary}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.functions.{col, expr}
import org.apache.spark.sql.types.StringType
@@ -54,8 +53,10 @@ class CubeDictionaryBuilder(val dataset: Dataset[Row],
logInfo(s"Start building global dictionaries V2 for seg $seg")
val m = s"Build global dictionaries V2 for seg $seg succeeded"
time(m, colRefSet.asScala.foreach(col => safeBuild(col)))
- // set the original value to 'spark.sql.adaptive.enabled'
- ss.conf.set("spark.sql.adaptive.enabled", aeOriginalValue);
+ if (aeOriginalValue) {
+ // set the original value to 'spark.sql.adaptive.enabled'
+ ss.conf.set("spark.sql.adaptive.enabled", aeOriginalValue);
+ }
}
@throws[IOException]
@@ -95,6 +96,9 @@ class CubeDictionaryBuilder(val dataset: Dataset[Row],
}
globalDict.writeMetaDict(bucketPartitionSize, seg.kylinconf.getGlobalDictV2MaxVersions, seg.kylinconf.getGlobalDictV2VersionTTL)
+
+ // after writing global dict, check the uniqueness for global dict
+ NGlobalDictBuilderAssist.checkGlobalDict(ref, seg, bucketPartitionSize, ss)
}
// Builds the lock path for `pathName`: "/" + seg.project + HadoopUtil.GLOBAL_DICT_STORAGE_ROOT + "/" + pathName + "/lock".
private def getLockPath(pathName: String) = s"/${seg.project}${HadoopUtil.GLOBAL_DICT_STORAGE_ROOT}/$pathName/lock"
diff --git a/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestCreateFlatTable.scala b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestCreateFlatTable.scala
index fe84f82..3ecdfb5 100644
--- a/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestCreateFlatTable.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestCreateFlatTable.scala
@@ -97,7 +97,7 @@ class TestCreateFlatTable extends SparderBaseFunSuite with SharedSparkSession wi
afterJoin1.collect()
val jobs = helper.getJobsByGroupId(groupId)
- Assert.assertEquals(jobs.length, 13)
+ Assert.assertEquals(jobs.length, 15)
DefaultScheduler.destroyInstance()
}