You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2021/03/24 08:54:06 UTC
[kylin] 01/02: KYLIN-4927 Forbid to use AE when building Global Dict
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 04e216994b9c03e2f1eee44eab3427ab6b46d619
Author: Zhichao Zhang <zh...@apache.org>
AuthorDate: Thu Mar 11 15:00:26 2021 +0800
KYLIN-4927 Forbid to use AE when building Global Dict
---
.../features/step_impl/auto_config/auto_config.py | 17 ++++++++
.../features/step_impl/happy_path/happy_path.py | 17 ++++++++
.../features/step_impl/project_model/model.py | 17 ++++++++
.../features/step_impl/project_model/project.py | 17 ++++++++
.../apache/spark/sql/common/LocalMetadata.scala | 10 +----
.../spark/builder/CubeDictionaryBuilder.scala | 18 +++++++-
.../engine/spark/builder/TestGlobalDictBuild.scala | 50 ++++++++++++----------
7 files changed, 113 insertions(+), 33 deletions(-)
diff --git a/build/CI/kylin-system-testing/features/step_impl/auto_config/auto_config.py b/build/CI/kylin-system-testing/features/step_impl/auto_config/auto_config.py
index 30ad76f..3cab963 100644
--- a/build/CI/kylin-system-testing/features/step_impl/auto_config/auto_config.py
+++ b/build/CI/kylin-system-testing/features/step_impl/auto_config/auto_config.py
@@ -1,3 +1,20 @@
+#!/usr/bin/python
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
from getgauge.python import step
import os
import json
diff --git a/build/CI/kylin-system-testing/features/step_impl/happy_path/happy_path.py b/build/CI/kylin-system-testing/features/step_impl/happy_path/happy_path.py
index 1d59e05..57e64d1 100644
--- a/build/CI/kylin-system-testing/features/step_impl/happy_path/happy_path.py
+++ b/build/CI/kylin-system-testing/features/step_impl/happy_path/happy_path.py
@@ -1,3 +1,20 @@
+#!/usr/bin/python
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
from getgauge.python import step
import os
import json
diff --git a/build/CI/kylin-system-testing/features/step_impl/project_model/model.py b/build/CI/kylin-system-testing/features/step_impl/project_model/model.py
index f159d93..2bc842c 100644
--- a/build/CI/kylin-system-testing/features/step_impl/project_model/model.py
+++ b/build/CI/kylin-system-testing/features/step_impl/project_model/model.py
@@ -1,3 +1,20 @@
+#!/usr/bin/python
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
from getgauge.python import step
import os
import json
diff --git a/build/CI/kylin-system-testing/features/step_impl/project_model/project.py b/build/CI/kylin-system-testing/features/step_impl/project_model/project.py
index 61a088f..9d46725 100644
--- a/build/CI/kylin-system-testing/features/step_impl/project_model/project.py
+++ b/build/CI/kylin-system-testing/features/step_impl/project_model/project.py
@@ -1,3 +1,20 @@
+#!/usr/bin/python
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
from getgauge.python import step
import os
import json
diff --git a/kylin-spark-project/kylin-spark-common/src/test/java/org/apache/spark/sql/common/LocalMetadata.scala b/kylin-spark-project/kylin-spark-common/src/test/java/org/apache/spark/sql/common/LocalMetadata.scala
index df2cc68..ab6b182 100644
--- a/kylin-spark-project/kylin-spark-common/src/test/java/org/apache/spark/sql/common/LocalMetadata.scala
+++ b/kylin-spark-project/kylin-spark-common/src/test/java/org/apache/spark/sql/common/LocalMetadata.scala
@@ -49,14 +49,6 @@ trait LocalMetadata extends BeforeAndAfterAll with BeforeAndAfterEach {
}
def cleanAfterClass(): Unit = {
- val directory = new File(LocalFileMetadataTestCase.LOCALMETA_TEMP_DATA)
- try
- FileUtils.deleteDirectory(directory)
- catch {
- case e: IOException =>
- if (directory.exists && directory.list.length > 0) throw new IllegalStateException("Can't delete directory " + directory, e)
- }
- System.clearProperty(KylinConfig.KYLIN_CONF)
- KylinConfig.destroyInstance()
+ LocalFileMetadataTestCase.cleanAfterClass();
}
}
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeDictionaryBuilder.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeDictionaryBuilder.scala
index 0b8ca3d..5c15f7d 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeDictionaryBuilder.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeDictionaryBuilder.scala
@@ -44,9 +44,18 @@ class CubeDictionaryBuilder(val dataset: Dataset[Row],
@throws[IOException]
def buildDictSet(): Unit = {
+ // Temporarily set 'spark.sql.adaptive.enabled' to false if it is currently true,
+ // because when it is enabled Spark changes the partition number dynamically,
+ // which leads to wrong Global Dictionary results.
+ val aeOriginalValue = ss.conf.get("spark.sql.adaptive.enabled", "false").toBoolean
+ if (aeOriginalValue) {
+ ss.conf.set("spark.sql.adaptive.enabled", false);
+ }
logInfo(s"Start building global dictionaries V2 for seg $seg")
val m = s"Build global dictionaries V2 for seg $seg succeeded"
time(m, colRefSet.asScala.foreach(col => safeBuild(col)))
+ // Restore the original value of 'spark.sql.adaptive.enabled'.
+ ss.conf.set("spark.sql.adaptive.enabled", aeOriginalValue);
}
@throws[IOException]
@@ -55,7 +64,7 @@ class CubeDictionaryBuilder(val dataset: Dataset[Row],
lock.lock(getLockPath(sourceColumn), Long.MaxValue)
try
if (lock.lock(getLockPath(sourceColumn))) {
- val dictColDistinct = dataset.select(wrapCol(ref)).distinct
+ val dictColDistinct = dataset.select(CubeDictionaryBuilder.wrapCol(ref)).distinct
ss.sparkContext.setJobDescription("Calculate bucket size " + ref.identity)
val bucketPartitionSize = DictionaryBuilderHelper.calculateBucketSize(seg, ref, dictColDistinct)
val m = s"Build global dictionaries V2 for column $sourceColumn succeeded"
@@ -66,6 +75,8 @@ class CubeDictionaryBuilder(val dataset: Dataset[Row],
@throws[IOException]
private[builder] def build(ref: ColumnDesc, bucketPartitionSize: Int, afterDistinct: Dataset[Row]): Unit = {
+ assert(!ss.conf.get("spark.sql.adaptive.enabled", "false").toBoolean,
+ "Parameter 'spark.sql.adaptive.enabled' must be false when building global dictionary.")
val columnName = ref.identity
logInfo(s"Start building global dictionaries V2 for column $columnName.")
@@ -88,9 +99,12 @@ class CubeDictionaryBuilder(val dataset: Dataset[Row],
private def getLockPath(pathName: String) = s"/${seg.project}${HadoopUtil.GLOBAL_DICT_STORAGE_ROOT}/$pathName/lock"
+}
+
+object CubeDictionaryBuilder {
+
def wrapCol(ref: ColumnDesc): Column = {
val colName = NSparkCubingUtil.convertFromDot(ref.identity)
expr(colName).cast(StringType)
}
-
}
diff --git a/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestGlobalDictBuild.scala b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestGlobalDictBuild.scala
index a0b33dc..0ed26cf 100644
--- a/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestGlobalDictBuild.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestGlobalDictBuild.scala
@@ -26,6 +26,7 @@ import org.apache.commons.lang3.RandomStringUtils
import org.apache.kylin.common.KylinConfig
import org.apache.kylin.common.util.DateFormat
import org.apache.kylin.cube.{CubeInstance, CubeManager, CubeSegment}
+import org.apache.kylin.engine.spark.job.NSparkCubingUtil
import org.apache.kylin.engine.spark.metadata.{ColumnDesc, MetadataConverter, SegmentInfo}
import org.apache.kylin.job.engine.JobEngineConfig
import org.apache.kylin.job.impl.threadpool.DefaultScheduler
@@ -36,8 +37,8 @@ import org.apache.spark.dict.{NGlobalDictMetaInfo, NGlobalDictionary}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.common.{LocalMetadata, SharedSparkSession, SparderBaseFunSuite}
import org.apache.spark.sql.functions.col
-import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql.types.StructType
import org.junit.Assert
import scala.collection.JavaConverters.setAsJavaSetConverter
@@ -69,63 +70,67 @@ class TestGlobalDictBuild extends SparderBaseFunSuite with SharedSparkSession wi
// When to resize the dictionary, please refer to the description of DictionaryBuilderHelper.calculateBucketSize
+ val dictCol = dictColSet.iterator().next()
// First build dictionary, no dictionary file exists
- var randomDataSet = generateOriginData(1000, 21)
+ var randomDataSet = generateOriginData(dictCol, 1000, 21)
val meta1 = buildDict(segInfo, seg, randomDataSet, dictColSet)
Assert.assertEquals(20, meta1.getBucketSize)
Assert.assertEquals(1000, meta1.getDictCount)
// apply rule #1
- randomDataSet = generateOriginData(3000, 22)
+ randomDataSet = generateOriginData(dictCol, 3000, 22)
val meta2 = buildDict(segInfo, seg, randomDataSet, dictColSet)
Assert.assertEquals(60, meta2.getBucketSize)
Assert.assertEquals(4000, meta2.getDictCount)
- randomDataSet = generateOriginData(3000, 23)
+ randomDataSet = generateOriginData(dictCol, 3000, 23)
val meta3 = buildDict(segInfo, seg, randomDataSet, dictColSet)
Assert.assertEquals(60, meta3.getBucketSize)
Assert.assertEquals(7000, meta3.getDictCount)
// apply rule #2
- randomDataSet = generateOriginData(200, 24)
+ randomDataSet = generateOriginData(dictCol, 200, 24)
val meta4 = buildDict(segInfo, seg, randomDataSet, dictColSet)
Assert.assertEquals(140, meta4.getBucketSize)
Assert.assertEquals(7200, meta4.getDictCount)
// apply rule #3
- randomDataSet = generateHotOriginData(200, 140)
+ randomDataSet = generateHotOriginData(dictCol, 200, 140)
val meta5 = buildDict(segInfo, seg, randomDataSet, dictColSet)
Assert.assertEquals(140, meta5.getBucketSize)
Assert.assertEquals(7400, meta5.getDictCount)
// apply rule #3
- randomDataSet = generateOriginData(200, 25)
+ spark.conf.set("spark.sql.adaptive.enabled", false)
+ randomDataSet = generateOriginData(dictCol, 200, 25)
val meta6 = buildDict(segInfo, seg, randomDataSet, dictColSet)
Assert.assertEquals(280, meta6.getBucketSize)
Assert.assertEquals(7600, meta6.getDictCount)
+ Assert.assertFalse(spark.conf.get("spark.sql.adaptive.enabled").toBoolean)
- randomDataSet = generateOriginData(2000, 26)
+ spark.conf.set("spark.sql.adaptive.enabled", true)
+ randomDataSet = generateOriginData(dictCol, 2000, 26)
val meta7 = buildDict(segInfo, seg, randomDataSet, dictColSet)
Assert.assertEquals(280, meta7.getBucketSize)
Assert.assertEquals(9600, meta7.getDictCount)
+ Assert.assertTrue(spark.conf.get("spark.sql.adaptive.enabled").toBoolean)
DefaultScheduler.destroyInstance()
}
- def buildDict(segInfo: SegmentInfo, seg: CubeSegment, randomDataSet: Dataset[Row], dictColSet: Set[ColumnDesc]): NGlobalDictMetaInfo = {
+ def buildDict(segInfo: SegmentInfo, seg: CubeSegment, randomDataSet: Dataset[Row],
+ dictColSet: Set[ColumnDesc]): NGlobalDictMetaInfo = {
val dictionaryBuilder = new CubeDictionaryBuilder(randomDataSet, segInfo, randomDataSet.sparkSession, dictColSet)
- val col = dictColSet.iterator().next()
- val ds = randomDataSet.select("26").distinct()
- val bucketPartitionSize = DictionaryBuilderHelper.calculateBucketSize(segInfo, col, ds)
- dictionaryBuilder.build(col, bucketPartitionSize, ds)
- val dict = new NGlobalDictionary(seg.getProject, col.tableName, col.columnName,
- seg.getConfig.getHdfsWorkingDirectory)
+ dictionaryBuilder.buildDictSet()
+ val columnDesc = dictColSet.iterator().next()
+ val dict = new NGlobalDictionary(seg.getProject, columnDesc.tableName,
+ columnDesc.columnName, seg.getConfig.getHdfsWorkingDirectory)
dict.getMetaInfo
}
- def generateOriginData(count: Int, length: Int): Dataset[Row] = {
+ def generateOriginData(colDesc: ColumnDesc, count: Int, length: Int): Dataset[Row] = {
var schema = new StructType
-
- schema = schema.add("26", StringType)
+ val colName = NSparkCubingUtil.convertFromDot(colDesc.identity)
+ schema = schema.add(colName, colDesc.dataType)
var set = new mutable.LinkedHashSet[Row]
while (set.size != count) {
val objects = new Array[String](1)
@@ -136,11 +141,12 @@ class TestGlobalDictBuild extends SparderBaseFunSuite with SharedSparkSession wi
spark.createDataFrame(spark.sparkContext.parallelize(set.toSeq), schema)
}
- def generateHotOriginData(threshold: Int, bucketSize: Int): Dataset[Row] = {
+ def generateHotOriginData(colDesc: ColumnDesc, threshold: Int, bucketSize: Int): Dataset[Row] = {
var schema = new StructType
- schema = schema.add("26", StringType)
- var ds = generateOriginData(threshold * bucketSize * 2, 30)
- ds = ds.repartition(bucketSize, col("26"))
+ val colName = NSparkCubingUtil.convertFromDot(colDesc.identity)
+ schema = schema.add(colName, colDesc.dataType)
+ var ds = generateOriginData(colDesc, threshold * bucketSize * 2, 30)
+ ds = ds.repartition(bucketSize, col(colName))
.mapPartitions {
iter =>
val partitionID = TaskContext.get().partitionId()