You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2020/12/17 09:25:40 UTC

[kylin] branch kylin-on-parquet-v2 updated: KYLIN-4818 Check lookup table duplicate key when building job

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new fb94ec4  KYLIN-4818 Check lookup table duplicate key when building job
fb94ec4 is described below

commit fb94ec44cee704f79e6a1796a44b4d5c04f9699d
Author: yaqian.zhang <59...@qq.com>
AuthorDate: Wed Dec 16 18:59:28 2020 +0800

    KYLIN-4818 Check lookup table duplicate key when building job
---
 .../engine/spark/builder/CubeSnapshotBuilder.scala    | 19 ++++++++++++++++++-
 .../kylin/engine/spark/job/ParentSourceChooser.scala  |  1 +
 2 files changed, 19 insertions(+), 1 deletion(-)

diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeSnapshotBuilder.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeSnapshotBuilder.scala
index b2f0620..d1b62f0 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeSnapshotBuilder.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeSnapshotBuilder.scala
@@ -31,10 +31,11 @@ import org.apache.kylin.common.util.HadoopUtil
 import org.apache.kylin.engine.spark.metadata.{SegmentInfo, TableDesc}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.hive.utils.ResourceDetectUtils
-import org.apache.spark.sql.{SparkSession}
+import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.utils.ProxyThreadUtils
 import org.apache.kylin.engine.spark.utils.SparkDataSource._
 import org.apache.kylin.engine.spark.utils.FileNames
+import org.apache.spark.sql.functions.{count, countDistinct}
 
 import scala.concurrent.duration._
 import scala.concurrent.{ExecutionContext, Future}
@@ -183,6 +184,44 @@ class CubeSnapshotBuilder extends Logging {
   }
   import org.apache.kylin.engine.spark.utils.SparkDataSource._
 
+  /**
+   * Verifies that every lookup table joined by this segment has unique primary keys.
+   *
+   * Each lookup table is grouped by its join's PK columns and the first key that occurs
+   * more than once is reported. Joins that declare no PK columns are skipped with a
+   * warning, since there is no key to check.
+   *
+   * @throws IllegalStateException if a lookup table holds two rows with the same key
+   */
+  def checkDupKey(): Unit = {
+    seg.joindescs.foreach { joinDesc =>
+      val tableInfo = joinDesc.lookupTable
+      val lookupTableName = tableInfo.tableName
+      val lookupTablePKS = joinDesc.PKS.map(lookupTablePK => lookupTablePK.columnName)
+      if (lookupTablePKS.isEmpty) {
+        logWarning(s"Skip duplicate key check for lookup table ${lookupTableName}: " +
+          "no primary key columns declared")
+      } else {
+        // One aggregation job that also yields a concrete offending key. groupBy treats
+        // null keys as equal, so a single row with a null key passes; comparing count()
+        // with countDistinct would wrongly flag it, because countDistinct ignores rows
+        // whose key contains a null.
+        val dupKeys = ss.table(tableInfo)
+          .groupBy(lookupTablePKS.head, lookupTablePKS.tail: _*)
+          .agg(count("*").as("__dup_key_cnt"))
+          .where("__dup_key_cnt > 1")
+          .limit(1)
+          .collect()
+        dupKeys.headOption.foreach { row =>
+          // Grouping columns precede the aggregate column, in PK order.
+          val keyValues = lookupTablePKS.indices.map(i => row.get(i)).mkString(", ")
+          throw new IllegalStateException(s"Failed to build lookup table ${lookupTableName} " +
+            s"snapshot for Dup key found, key= ${lookupTablePKS.mkString(", ")}, value= ${keyValues}")
+        }
+      }
+    }
+  }
+
   def buildSnapshotWithoutMd5(tableInfo: TableDesc, baseDir: String): (String, String) = {
     val sourceData = ss.table(tableInfo)
     val tablePath = FileNames.snapshotFile(tableInfo, seg.project)
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala
index 7d719de..7697fd6 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala
@@ -69,6 +69,7 @@ class ParentSourceChooser(
         // eg: resource detect
         // Move this to a more suitable place
         val builder = new CubeSnapshotBuilder(seg, ss)
+        builder.checkDupKey()
         seg = builder.buildSnapshot
       }
       flatTableSource = getFlatTable()