You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2020/12/17 09:25:40 UTC
[kylin] branch kylin-on-parquet-v2 updated: KYLIN-4818 Check lookup
table duplicate key when building job
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
new fb94ec4 KYLIN-4818 Check lookup table duplicate key when building job
fb94ec4 is described below
commit fb94ec44cee704f79e6a1796a44b4d5c04f9699d
Author: yaqian.zhang <59...@qq.com>
AuthorDate: Wed Dec 16 18:59:28 2020 +0800
KYLIN-4818 Check lookup table duplicate key when building job
---
.../engine/spark/builder/CubeSnapshotBuilder.scala | 19 ++++++++++++++++++-
.../kylin/engine/spark/job/ParentSourceChooser.scala | 1 +
2 files changed, 19 insertions(+), 1 deletion(-)
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeSnapshotBuilder.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeSnapshotBuilder.scala
index b2f0620..d1b62f0 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeSnapshotBuilder.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeSnapshotBuilder.scala
@@ -31,10 +31,11 @@ import org.apache.kylin.common.util.HadoopUtil
import org.apache.kylin.engine.spark.metadata.{SegmentInfo, TableDesc}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.hive.utils.ResourceDetectUtils
-import org.apache.spark.sql.{SparkSession}
+import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.utils.ProxyThreadUtils
import org.apache.kylin.engine.spark.utils.SparkDataSource._
import org.apache.kylin.engine.spark.utils.FileNames
+import org.apache.spark.sql.functions.{count, countDistinct}
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
@@ -183,6 +184,22 @@ class CubeSnapshotBuilder extends Logging {
}
import org.apache.kylin.engine.spark.utils.SparkDataSource._
+ /** Fails the build with IllegalStateException if a lookup table joined by this segment holds duplicate primary keys.
+  *  Rows with a null key column are ignored: they can never match in the join, and countDistinct skips them anyway. */
+ def checkDupKey(): Unit = {
+   seg.joindescs.foreach { joinDesc =>
+     val pkNames = joinDesc.PKS.map(_.columnName)
+     if (pkNames.nonEmpty) { // a join without keys has nothing to check (and `head` below would throw)
+       val df = ss.table(joinDesc.lookupTable)
+       val keyedRows = df.filter(pkNames.map(name => df(name).isNotNull).reduce(_ && _))
+       val stats = keyedRows.agg(count("*"), countDistinct(pkNames.head, pkNames.tail: _*)).head() // single scan: (rows, distinct keys)
+       if (stats.getLong(0) != stats.getLong(1)) {
+         throw new IllegalStateException(s"Failed to build lookup table ${joinDesc.lookupTable.tableName} snapshot for Dup key found, key= ${pkNames.mkString(",")}")
+       }
+     }
+   }
+ }
+
def buildSnapshotWithoutMd5(tableInfo: TableDesc, baseDir: String): (String, String) = {
val sourceData = ss.table(tableInfo)
val tablePath = FileNames.snapshotFile(tableInfo, seg.project)
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala
index 7d719de..7697fd6 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala
@@ -69,6 +69,7 @@ class ParentSourceChooser(
// eg: resource detect
// Move this to a more suitable place
val builder = new CubeSnapshotBuilder(seg, ss)
+ builder.checkDupKey()
seg = builder.buildSnapshot
}
flatTableSource = getFlatTable()