You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Yuming Wang (Jira)" <ji...@apache.org> on 2022/06/11 13:41:00 UTC
[jira] [Updated] (SPARK-39448) Add ReplaceCTERefWithRepartition into nonExcludableRules list
[ https://issues.apache.org/jira/browse/SPARK-39448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yuming Wang updated SPARK-39448:
--------------------------------
Description:
A unit test to test excluded rules:
{code:scala}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql
import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.sql.catalyst.util.resourceToString
import org.apache.spark.sql.internal.SQLConf
// scalastyle:off println
object ExcludedRulesTestSuite extends TPCDSBase {
override val spark = SparkSession.builder()
.appName(this.getClass.getSimpleName)
.master("local[40]")
.config(SparkLauncher.DRIVER_MEMORY, "4g")
.config(SQLConf.SHUFFLE_PARTITIONS.key, 2)
.getOrCreate()
override def injectStats: Boolean = true
createTables()
private lazy val excludableRules = spark.sessionState.optimizer.batches
.flatMap(_.rules.map(_.ruleName))
.distinct
.filterNot(spark.sessionState.optimizer.nonExcludableRules.contains)
def main(args: Array[String]): Unit = {
System.setProperty(IS_TESTING.key, "true")
Seq("q83").foreach { name =>
val queryString = resourceToString(s"tpcds/$name.sql",
classLoader = Thread.currentThread().getContextClassLoader)
excludableRules.foreach { rule =>
println(name + ": " + rule)
try {
withSQLConf(SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> rule) {
sql(queryString).collect()
}
} catch {
case e: Exception =>
println("Exception: " + e.getMessage)
}
}
}
tpcdsQueriesV2_7_0.foreach { name =>
val queryString = resourceToString(s"tpcds-v2.7.0/$name.sql",
classLoader = Thread.currentThread().getContextClassLoader)
excludableRules.foreach { rule =>
println("tpcds-v2.7.0 " + name + ": " + rule)
try {
withSQLConf(SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> rule) {
sql(queryString).collect()
}
} catch {
case e: Exception =>
println("Exception: " + e.getMessage)
}
}
}
modifiedTPCDSQueries.foreach { name =>
val queryString = resourceToString(s"tpcds-modifiedQueries/$name.sql",
classLoader = Thread.currentThread().getContextClassLoader)
excludableRules.foreach { rule =>
println("tpcds-modifiedQueries " + name + ": " + rule)
try {
withSQLConf(SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> rule) {
sql(queryString).collect()
}
} catch {
case e: Exception =>
println("Exception: " + e.getMessage)
}
}
}
// scalastyle:on println
}
}
{code}
> Add ReplaceCTERefWithRepartition into nonExcludableRules list
> -------------------------------------------------------------
>
> Key: SPARK-39448
> URL: https://issues.apache.org/jira/browse/SPARK-39448
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.4.0
> Reporter: Yuming Wang
> Priority: Major
>
> A unit test to test excluded rules:
> {code:scala}
> /*
> * Licensed to the Apache Software Foundation (ASF) under one or more
> * contributor license agreements. See the NOTICE file distributed with
> * this work for additional information regarding copyright ownership.
> * The ASF licenses this file to You under the Apache License, Version 2.0
> * (the "License"); you may not use this file except in compliance with
> * the License. You may obtain a copy of the License at
> *
> * http://www.apache.org/licenses/LICENSE-2.0
> *
> * Unless required by applicable law or agreed to in writing, software
> * distributed under the License is distributed on an "AS IS" BASIS,
> * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> * See the License for the specific language governing permissions and
> * limitations under the License.
> */
> package org.apache.spark.sql
> import org.apache.spark.internal.config.Tests.IS_TESTING
> import org.apache.spark.launcher.SparkLauncher
> import org.apache.spark.sql.catalyst.util.resourceToString
> import org.apache.spark.sql.internal.SQLConf
> // scalastyle:off println
> object ExcludedRulesTestSuite extends TPCDSBase {
> override val spark = SparkSession.builder()
> .appName(this.getClass.getSimpleName)
> .master("local[40]")
> .config(SparkLauncher.DRIVER_MEMORY, "4g")
> .config(SQLConf.SHUFFLE_PARTITIONS.key, 2)
> .getOrCreate()
> override def injectStats: Boolean = true
> createTables()
> private lazy val excludableRules = spark.sessionState.optimizer.batches
> .flatMap(_.rules.map(_.ruleName))
> .distinct
> .filterNot(spark.sessionState.optimizer.nonExcludableRules.contains)
> def main(args: Array[String]): Unit = {
> System.setProperty(IS_TESTING.key, "true")
> Seq("q83").foreach { name =>
> val queryString = resourceToString(s"tpcds/$name.sql",
> classLoader = Thread.currentThread().getContextClassLoader)
> excludableRules.foreach { rule =>
> println(name + ": " + rule)
> try {
> withSQLConf(SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> rule) {
> sql(queryString).collect()
> }
> } catch {
> case e: Exception =>
> println("Exception: " + e.getMessage)
> }
> }
> }
> tpcdsQueriesV2_7_0.foreach { name =>
> val queryString = resourceToString(s"tpcds-v2.7.0/$name.sql",
> classLoader = Thread.currentThread().getContextClassLoader)
> excludableRules.foreach { rule =>
> println("tpcds-v2.7.0 " + name + ": " + rule)
> try {
> withSQLConf(SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> rule) {
> sql(queryString).collect()
> }
> } catch {
> case e: Exception =>
> println("Exception: " + e.getMessage)
> }
> }
> }
> modifiedTPCDSQueries.foreach { name =>
> val queryString = resourceToString(s"tpcds-modifiedQueries/$name.sql",
> classLoader = Thread.currentThread().getContextClassLoader)
> excludableRules.foreach { rule =>
> println("tpcds-modifiedQueries " + name + ": " + rule)
> try {
> withSQLConf(SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> rule) {
> sql(queryString).collect()
> }
> } catch {
> case e: Exception =>
> println("Exception: " + e.getMessage)
> }
> }
> }
> // scalastyle:on println
> }
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.20.7#820007)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org