Posted to commits@kylin.apache.org by xx...@apache.org on 2021/03/24 08:54:06 UTC

[kylin] 01/02: KYLIN-4927 Forbid to use AE when building Global Dict

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 04e216994b9c03e2f1eee44eab3427ab6b46d619
Author: Zhichao Zhang <zh...@apache.org>
AuthorDate: Thu Mar 11 15:00:26 2021 +0800

    KYLIN-4927 Forbid to use AE when building Global Dict
---
 .../features/step_impl/auto_config/auto_config.py  | 17 ++++++++
 .../features/step_impl/happy_path/happy_path.py    | 17 ++++++++
 .../features/step_impl/project_model/model.py      | 17 ++++++++
 .../features/step_impl/project_model/project.py    | 17 ++++++++
 .../apache/spark/sql/common/LocalMetadata.scala    | 10 +----
 .../spark/builder/CubeDictionaryBuilder.scala      | 18 +++++++-
 .../engine/spark/builder/TestGlobalDictBuild.scala | 50 ++++++++++++----------
 7 files changed, 113 insertions(+), 33 deletions(-)

diff --git a/build/CI/kylin-system-testing/features/step_impl/auto_config/auto_config.py b/build/CI/kylin-system-testing/features/step_impl/auto_config/auto_config.py
index 30ad76f..3cab963 100644
--- a/build/CI/kylin-system-testing/features/step_impl/auto_config/auto_config.py
+++ b/build/CI/kylin-system-testing/features/step_impl/auto_config/auto_config.py
@@ -1,3 +1,20 @@
+#!/usr/bin/python
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 from getgauge.python import step
 import os
 import json
diff --git a/build/CI/kylin-system-testing/features/step_impl/happy_path/happy_path.py b/build/CI/kylin-system-testing/features/step_impl/happy_path/happy_path.py
index 1d59e05..57e64d1 100644
--- a/build/CI/kylin-system-testing/features/step_impl/happy_path/happy_path.py
+++ b/build/CI/kylin-system-testing/features/step_impl/happy_path/happy_path.py
@@ -1,3 +1,20 @@
+#!/usr/bin/python
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 from getgauge.python import step
 import os
 import json
diff --git a/build/CI/kylin-system-testing/features/step_impl/project_model/model.py b/build/CI/kylin-system-testing/features/step_impl/project_model/model.py
index f159d93..2bc842c 100644
--- a/build/CI/kylin-system-testing/features/step_impl/project_model/model.py
+++ b/build/CI/kylin-system-testing/features/step_impl/project_model/model.py
@@ -1,3 +1,20 @@
+#!/usr/bin/python
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 from getgauge.python import step
 import os
 import json
diff --git a/build/CI/kylin-system-testing/features/step_impl/project_model/project.py b/build/CI/kylin-system-testing/features/step_impl/project_model/project.py
index 61a088f..9d46725 100644
--- a/build/CI/kylin-system-testing/features/step_impl/project_model/project.py
+++ b/build/CI/kylin-system-testing/features/step_impl/project_model/project.py
@@ -1,3 +1,20 @@
+#!/usr/bin/python
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 from getgauge.python import step
 import os
 import json
diff --git a/kylin-spark-project/kylin-spark-common/src/test/java/org/apache/spark/sql/common/LocalMetadata.scala b/kylin-spark-project/kylin-spark-common/src/test/java/org/apache/spark/sql/common/LocalMetadata.scala
index df2cc68..ab6b182 100644
--- a/kylin-spark-project/kylin-spark-common/src/test/java/org/apache/spark/sql/common/LocalMetadata.scala
+++ b/kylin-spark-project/kylin-spark-common/src/test/java/org/apache/spark/sql/common/LocalMetadata.scala
@@ -49,14 +49,6 @@ trait LocalMetadata extends BeforeAndAfterAll with BeforeAndAfterEach {
   }
 
   def cleanAfterClass(): Unit = {
-    val directory = new File(LocalFileMetadataTestCase.LOCALMETA_TEMP_DATA)
-    try
-      FileUtils.deleteDirectory(directory)
-    catch {
-      case e: IOException =>
-        if (directory.exists && directory.list.length > 0) throw new IllegalStateException("Can't delete directory " + directory, e)
-    }
-    System.clearProperty(KylinConfig.KYLIN_CONF)
-    KylinConfig.destroyInstance()
+    LocalFileMetadataTestCase.cleanAfterClass();
   }
 }
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeDictionaryBuilder.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeDictionaryBuilder.scala
index 0b8ca3d..5c15f7d 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeDictionaryBuilder.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeDictionaryBuilder.scala
@@ -44,9 +44,18 @@ class CubeDictionaryBuilder(val dataset: Dataset[Row],
 
   @throws[IOException]
   def buildDictSet(): Unit = {
+    // Temporarily disable 'spark.sql.adaptive.enabled' if it is currently enabled, because
+    // adaptive execution changes the partition number dynamically and would produce
+    // wrong Global Dictionary results.
+    val aeOriginalValue = ss.conf.get("spark.sql.adaptive.enabled", "false").toBoolean
+    if (aeOriginalValue) {
+      ss.conf.set("spark.sql.adaptive.enabled", false);
+    }
     logInfo(s"Start building global dictionaries V2 for seg $seg")
     val m = s"Build global dictionaries V2 for seg $seg succeeded"
     time(m, colRefSet.asScala.foreach(col => safeBuild(col)))
+    // restore the original value of 'spark.sql.adaptive.enabled'
+    ss.conf.set("spark.sql.adaptive.enabled", aeOriginalValue);
   }
 
   @throws[IOException]
@@ -55,7 +64,7 @@ class CubeDictionaryBuilder(val dataset: Dataset[Row],
     lock.lock(getLockPath(sourceColumn), Long.MaxValue)
     try
       if (lock.lock(getLockPath(sourceColumn))) {
-        val dictColDistinct = dataset.select(wrapCol(ref)).distinct
+        val dictColDistinct = dataset.select(CubeDictionaryBuilder.wrapCol(ref)).distinct
         ss.sparkContext.setJobDescription("Calculate bucket size " + ref.identity)
         val bucketPartitionSize = DictionaryBuilderHelper.calculateBucketSize(seg, ref, dictColDistinct)
         val m = s"Build global dictionaries V2 for column $sourceColumn succeeded"
@@ -66,6 +75,8 @@ class CubeDictionaryBuilder(val dataset: Dataset[Row],
 
   @throws[IOException]
   private[builder] def build(ref: ColumnDesc, bucketPartitionSize: Int, afterDistinct: Dataset[Row]): Unit = {
+    assert(!ss.conf.get("spark.sql.adaptive.enabled", "false").toBoolean,
+      "Parameter 'spark.sql.adaptive.enabled' must be false when building global dictionary.")
     val columnName = ref.identity
     logInfo(s"Start building global dictionaries V2 for column $columnName.")
 
@@ -88,9 +99,12 @@ class CubeDictionaryBuilder(val dataset: Dataset[Row],
 
   private def getLockPath(pathName: String) = s"/${seg.project}${HadoopUtil.GLOBAL_DICT_STORAGE_ROOT}/$pathName/lock"
 
+}
+
+object CubeDictionaryBuilder {
+
   def wrapCol(ref: ColumnDesc): Column = {
     val colName = NSparkCubingUtil.convertFromDot(ref.identity)
     expr(colName).cast(StringType)
   }
-
 }
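
The change above saves the current value of 'spark.sql.adaptive.enabled', forces it off while buildDictSet() runs, and writes the saved value back once the dictionaries are built. A minimal, self-contained sketch of that save/disable/restore idea follows; the helper name withAdaptiveExecutionDisabled, the local SparkSession setup, and the try/finally restore are illustrative assumptions, not code from this commit (the commit restores the flag after the timed build step).

import org.apache.spark.sql.SparkSession

object AdaptiveExecutionToggle {

  // Hypothetical helper: run `body` with adaptive execution (AE) disabled and
  // restore the caller's original setting afterwards, even if `body` throws.
  def withAdaptiveExecutionDisabled[T](ss: SparkSession)(body: => T): T = {
    val key = "spark.sql.adaptive.enabled"
    val original = ss.conf.get(key, "false").toBoolean
    if (original) {
      ss.conf.set(key, false)
    }
    try {
      body
    } finally {
      // Put back whatever the caller had configured.
      ss.conf.set(key, original)
    }
  }

  def main(args: Array[String]): Unit = {
    val ss = SparkSession.builder().master("local[1]")
      .appName("ae-toggle-sketch").getOrCreate()
    ss.conf.set("spark.sql.adaptive.enabled", true)

    withAdaptiveExecutionDisabled(ss) {
      // Inside this block the distinct/repartition stages see a fixed partition
      // count, which is what the global dictionary bucketing relies on.
      assert(!ss.conf.get("spark.sql.adaptive.enabled").toBoolean)
    }

    // The caller's original value is restored for the rest of the job.
    assert(ss.conf.get("spark.sql.adaptive.enabled").toBoolean)
    ss.stop()
  }
}
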
diff --git a/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestGlobalDictBuild.scala b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestGlobalDictBuild.scala
index a0b33dc..0ed26cf 100644
--- a/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestGlobalDictBuild.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestGlobalDictBuild.scala
@@ -26,6 +26,7 @@ import org.apache.commons.lang3.RandomStringUtils
 import org.apache.kylin.common.KylinConfig
 import org.apache.kylin.common.util.DateFormat
 import org.apache.kylin.cube.{CubeInstance, CubeManager, CubeSegment}
+import org.apache.kylin.engine.spark.job.NSparkCubingUtil
 import org.apache.kylin.engine.spark.metadata.{ColumnDesc, MetadataConverter, SegmentInfo}
 import org.apache.kylin.job.engine.JobEngineConfig
 import org.apache.kylin.job.impl.threadpool.DefaultScheduler
@@ -36,8 +37,8 @@ import org.apache.spark.dict.{NGlobalDictMetaInfo, NGlobalDictionary}
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.common.{LocalMetadata, SharedSparkSession, SparderBaseFunSuite}
 import org.apache.spark.sql.functions.col
-import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql.types.StructType
 import org.junit.Assert
 
 import scala.collection.JavaConverters.setAsJavaSetConverter
@@ -69,63 +70,67 @@ class TestGlobalDictBuild extends SparderBaseFunSuite with SharedSparkSession wi
 
     // When to resize the dictionary, please refer to the description of DictionaryBuilderHelper.calculateBucketSize
 
+    val dictCol = dictColSet.iterator().next()
     // First build dictionary, no dictionary file exists
-    var randomDataSet = generateOriginData(1000, 21)
+    var randomDataSet = generateOriginData(dictCol, 1000, 21)
     val meta1 = buildDict(segInfo, seg, randomDataSet, dictColSet)
     Assert.assertEquals(20, meta1.getBucketSize)
     Assert.assertEquals(1000, meta1.getDictCount)
 
     // apply rule #1
-    randomDataSet = generateOriginData(3000, 22)
+    randomDataSet = generateOriginData(dictCol, 3000, 22)
     val meta2 = buildDict(segInfo, seg, randomDataSet, dictColSet)
     Assert.assertEquals(60, meta2.getBucketSize)
     Assert.assertEquals(4000, meta2.getDictCount)
 
-    randomDataSet = generateOriginData(3000, 23)
+    randomDataSet = generateOriginData(dictCol, 3000, 23)
     val meta3 = buildDict(segInfo, seg, randomDataSet, dictColSet)
     Assert.assertEquals(60, meta3.getBucketSize)
     Assert.assertEquals(7000, meta3.getDictCount)
 
     // apply rule #2
-    randomDataSet = generateOriginData(200, 24)
+    randomDataSet = generateOriginData(dictCol, 200, 24)
     val meta4 = buildDict(segInfo, seg, randomDataSet, dictColSet)
     Assert.assertEquals(140, meta4.getBucketSize)
     Assert.assertEquals(7200, meta4.getDictCount)
 
     // apply rule #3
-    randomDataSet = generateHotOriginData(200, 140)
+    randomDataSet = generateHotOriginData(dictCol, 200, 140)
     val meta5 = buildDict(segInfo, seg, randomDataSet, dictColSet)
     Assert.assertEquals(140, meta5.getBucketSize)
     Assert.assertEquals(7400, meta5.getDictCount)
 
     // apply rule #3
-    randomDataSet = generateOriginData(200, 25)
+    spark.conf.set("spark.sql.adaptive.enabled", false)
+    randomDataSet = generateOriginData(dictCol, 200, 25)
     val meta6 = buildDict(segInfo, seg, randomDataSet, dictColSet)
     Assert.assertEquals(280, meta6.getBucketSize)
     Assert.assertEquals(7600, meta6.getDictCount)
+    Assert.assertFalse(spark.conf.get("spark.sql.adaptive.enabled").toBoolean)
 
-    randomDataSet = generateOriginData(2000, 26)
+    spark.conf.set("spark.sql.adaptive.enabled", true)
+    randomDataSet = generateOriginData(dictCol, 2000, 26)
     val meta7 = buildDict(segInfo, seg, randomDataSet, dictColSet)
     Assert.assertEquals(280, meta7.getBucketSize)
     Assert.assertEquals(9600, meta7.getDictCount)
+    Assert.assertTrue(spark.conf.get("spark.sql.adaptive.enabled").toBoolean)
     DefaultScheduler.destroyInstance()
   }
 
-  def buildDict(segInfo: SegmentInfo, seg: CubeSegment, randomDataSet: Dataset[Row], dictColSet: Set[ColumnDesc]): NGlobalDictMetaInfo = {
+  def buildDict(segInfo: SegmentInfo, seg: CubeSegment, randomDataSet: Dataset[Row],
+                dictColSet: Set[ColumnDesc]): NGlobalDictMetaInfo = {
     val dictionaryBuilder = new CubeDictionaryBuilder(randomDataSet, segInfo, randomDataSet.sparkSession, dictColSet)
-    val col = dictColSet.iterator().next()
-    val ds = randomDataSet.select("26").distinct()
-    val bucketPartitionSize = DictionaryBuilderHelper.calculateBucketSize(segInfo, col, ds)
-    dictionaryBuilder.build(col, bucketPartitionSize, ds)
-    val dict = new NGlobalDictionary(seg.getProject, col.tableName, col.columnName,
-      seg.getConfig.getHdfsWorkingDirectory)
+    dictionaryBuilder.buildDictSet()
+    val columnDesc = dictColSet.iterator().next()
+    val dict = new NGlobalDictionary(seg.getProject, columnDesc.tableName,
+      columnDesc.columnName, seg.getConfig.getHdfsWorkingDirectory)
     dict.getMetaInfo
   }
 
-  def generateOriginData(count: Int, length: Int): Dataset[Row] = {
+  def generateOriginData(colDesc: ColumnDesc, count: Int, length: Int): Dataset[Row] = {
     var schema = new StructType
-
-    schema = schema.add("26", StringType)
+    val colName = NSparkCubingUtil.convertFromDot(colDesc.identity)
+    schema = schema.add(colName, colDesc.dataType)
     var set = new mutable.LinkedHashSet[Row]
     while (set.size != count) {
       val objects = new Array[String](1)
@@ -136,11 +141,12 @@ class TestGlobalDictBuild extends SparderBaseFunSuite with SharedSparkSession wi
     spark.createDataFrame(spark.sparkContext.parallelize(set.toSeq), schema)
   }
 
-  def generateHotOriginData(threshold: Int, bucketSize: Int): Dataset[Row] = {
+  def generateHotOriginData(colDesc: ColumnDesc, threshold: Int, bucketSize: Int): Dataset[Row] = {
     var schema = new StructType
-    schema = schema.add("26", StringType)
-    var ds = generateOriginData(threshold * bucketSize * 2, 30)
-    ds = ds.repartition(bucketSize, col("26"))
+    val colName = NSparkCubingUtil.convertFromDot(colDesc.identity)
+    schema = schema.add(colName, colDesc.dataType)
+    var ds = generateOriginData(colDesc, threshold * bucketSize * 2, 30)
+    ds = ds.repartition(bucketSize, col(colName))
       .mapPartitions {
         iter =>
           val partitionID = TaskContext.get().partitionId()
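
For context, the refactored test helpers now derive the column name and type from a ColumnDesc instead of hard-coding column "26", and they generate a one-column DataFrame of distinct random strings to feed the dictionary builder. A standalone sketch of that data-generation step is below; the object and method names, the literal column name DICT_COL, and the local-mode session are illustrative assumptions rather than the test code itself.

import scala.collection.mutable

import org.apache.commons.lang3.RandomStringUtils
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object DictTestDataSketch {

  // Hypothetical stand-in for generateOriginData: builds `count` distinct random
  // alphabetic strings of the given `length` in a single string column `colName`.
  def randomStringColumn(spark: SparkSession, colName: String,
                         count: Int, length: Int): DataFrame = {
    val schema = new StructType().add(colName, StringType)
    val rows = new mutable.LinkedHashSet[Row]
    while (rows.size != count) {
      rows += Row(RandomStringUtils.randomAlphabetic(length))
    }
    spark.createDataFrame(spark.sparkContext.parallelize(rows.toSeq), schema)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]")
      .appName("dict-test-data-sketch").getOrCreate()
    // 1000 distinct values, 21 characters each, mirroring the first test step.
    val ds = randomStringColumn(spark, "DICT_COL", 1000, 21)
    assert(ds.distinct().count() == 1000L)
    spark.stop()
  }
}
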