You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2021/03/24 08:54:07 UTC

[kylin] 02/02: KYLIN-4937 Verify the uniqueness of the global dictionary after building global dictionary

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 1d5f176fc3c940309dd9fd91f68b2bcd14f37975
Author: Zhichao Zhang <zh...@apache.org>
AuthorDate: Sat Mar 20 15:53:02 2021 +0800

    KYLIN-4937 Verify the uniqueness of the global dictionary after building global dictionary
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  4 +++
 .../spark/dict/NGlobalDictBuilderAssist.scala      | 30 ++++++++++++++++++++++
 .../spark/builder/CubeDictionaryBuilder.scala      | 12 ++++++---
 .../engine/spark/builder/TestCreateFlatTable.scala |  2 +-
 4 files changed, 43 insertions(+), 5 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 2ed1cb5..f255c1c 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -699,6 +699,10 @@ public abstract class KylinConfigBase implements Serializable {
         return Long.parseLong(getOptional("kylin.dictionary.globalV2-version-ttl", "259200000"));
     }
 
+    public boolean isCheckGlobalDictV2() { // switch consulted by checkGlobalDict before it verifies a global dict
+        return Boolean.parseBoolean(getOptional("kylin.dictionary.globalV2-check", "true")); // "true" is presumably the fallback when unset -- confirm in getOptional
+    }
+
     // ============================================================================
     // CUBE
     // ============================================================================
diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictBuilderAssist.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictBuilderAssist.scala
index a16a9da..dbac8b8 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictBuilderAssist.scala
+++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictBuilderAssist.scala
@@ -71,4 +71,34 @@ object NGlobalDictBuilderAssist extends Logging {
       desc.kylinconf.getGlobalDictV2MaxVersions, desc.kylinconf.getGlobalDictV2VersionTTL)
   }
 
+  /**
+   * Verify the global dict of `ref`: distinct keys must equal distinct values across all buckets.
+   * No-op unless `desc.kylinconf.isCheckGlobalDictV2`; a mismatch raises a RuntimeException.
+   */
+  @throws[IOException]
+  def checkGlobalDict(ref: ColumnDesc, desc: SegmentInfo, bucketPartitionSize: Int,
+                      ss: SparkSession): Unit = {
+    if (desc.kylinconf.isCheckGlobalDictV2) {
+      val globalDict = new NGlobalDictionary(desc.project, ref.tableAliasName, ref.columnName, desc.kylinconf.getHdfsWorkingDirectory)
+      val broadcastDict = ss.sparkContext.broadcast(globalDict)
+      import ss.implicits._
+      // NOTE(review): bucket ids assumed 0-based, so `until` avoids probing one id past the last bucket
+      val existsDictDs = ss.createDataset(0 until bucketPartitionSize)
+        .flatMap { bucketId =>
+          // emit one (key, value) pair per entry of this bucket's dictionary, without an extra copy
+          broadcastDict.value.loadBucketDictionary(bucketId).getAbsoluteDictMap
+            .object2LongEntrySet.asScala.iterator
+            .map(entry => (entry.getKey, entry.getLongValue))
+        }
+      // "_1" holds the dictionary key (String), "_2" the Long value stored for that key
+      val keyCount = existsDictDs.dropDuplicates("_1").count()
+      val valueCount = existsDictDs.dropDuplicates("_2").count()
+      if (keyCount != valueCount) {
+        val msg = s"Global dict build error on column ${ref.columnName}, " +
+          s"key distinct count is ${keyCount}, and value distinct count is ${valueCount}."
+        logError(msg)
+        throw new RuntimeException(msg)
+      }
+    }
+  }
 }
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeDictionaryBuilder.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeDictionaryBuilder.scala
index 5c15f7d..b39486b 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeDictionaryBuilder.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeDictionaryBuilder.scala
@@ -19,14 +19,13 @@ package org.apache.kylin.engine.spark.builder
 
 import java.io.IOException
 import java.util
-
 import org.apache.kylin.common.KylinConfig
 import org.apache.kylin.common.lock.DistributedLock
 import org.apache.kylin.common.util.HadoopUtil
 import org.apache.kylin.engine.spark.builder.CubeBuilderHelper._
 import org.apache.kylin.engine.spark.job.NSparkCubingUtil
 import org.apache.kylin.engine.spark.metadata.{ColumnDesc, SegmentInfo}
-import org.apache.spark.dict.NGlobalDictionary
+import org.apache.spark.dict.{NGlobalDictBuilderAssist, NGlobalDictionary}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.functions.{col, expr}
 import org.apache.spark.sql.types.StringType
@@ -54,8 +53,10 @@ class CubeDictionaryBuilder(val dataset: Dataset[Row],
     logInfo(s"Start building global dictionaries V2 for seg $seg")
     val m = s"Build global dictionaries V2 for seg $seg succeeded"
     time(m, colRefSet.asScala.foreach(col => safeBuild(col)))
-    // set the original value to 'spark.sql.adaptive.enabled'
-    ss.conf.set("spark.sql.adaptive.enabled", aeOriginalValue);
+    if (aeOriginalValue) {
+      // restore 'spark.sql.adaptive.enabled' to its original value
+      ss.conf.set("spark.sql.adaptive.enabled", aeOriginalValue);
+    }
   }
 
   @throws[IOException]
@@ -95,6 +96,9 @@ class CubeDictionaryBuilder(val dataset: Dataset[Row],
       }
 
     globalDict.writeMetaDict(bucketPartitionSize, seg.kylinconf.getGlobalDictV2MaxVersions, seg.kylinconf.getGlobalDictV2VersionTTL)
+
+    // after writing the global dict, verify its uniqueness
+    NGlobalDictBuilderAssist.checkGlobalDict(ref, seg, bucketPartitionSize, ss)
   }
 
   private def getLockPath(pathName: String) = s"/${seg.project}${HadoopUtil.GLOBAL_DICT_STORAGE_ROOT}/$pathName/lock"
diff --git a/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestCreateFlatTable.scala b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestCreateFlatTable.scala
index fe84f82..3ecdfb5 100644
--- a/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestCreateFlatTable.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestCreateFlatTable.scala
@@ -97,7 +97,7 @@ class TestCreateFlatTable extends SparderBaseFunSuite with SharedSparkSession wi
     afterJoin1.collect()
 
     val jobs = helper.getJobsByGroupId(groupId)
-    Assert.assertEquals(jobs.length, 13)
+    Assert.assertEquals(jobs.length, 15)
     DefaultScheduler.destroyInstance()
   }