You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Yuming Wang (Jira)" <ji...@apache.org> on 2022/06/11 13:41:00 UTC

[jira] [Updated] (SPARK-39448) Add ReplaceCTERefWithRepartition into nonExcludableRules list

     [ https://issues.apache.org/jira/browse/SPARK-39448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yuming Wang updated SPARK-39448:
--------------------------------
    Description: 
A unit test that runs the TPC-DS queries once per excludable optimizer rule, with that rule excluded:
{code:scala}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql

import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.sql.catalyst.util.resourceToString
import org.apache.spark.sql.internal.SQLConf

// scalastyle:off println
/**
 * Standalone driver that executes TPC-DS queries once per excludable optimizer rule,
 * excluding exactly one rule per run through `SQLConf.OPTIMIZER_EXCLUDED_RULES`.
 *
 * For every (query, rule) pair it prints the pair, runs the query and, if the run fails,
 * prints the failure message and carries on with the next pair. Nothing is asserted; the
 * printed output is meant to be inspected for rules whose exclusion breaks a query.
 */
object ExcludedRulesTestSuite extends TPCDSBase {
  // Local import so that this object does not depend on the file-level import block.
  import scala.util.control.NonFatal

  override val spark = SparkSession.builder()
    .appName(this.getClass.getSimpleName)
    .master("local[40]")
    .config(SparkLauncher.DRIVER_MEMORY, "4g")
    .config(SQLConf.SHUFFLE_PARTITIONS.key, 2)
    .getOrCreate()

  override def injectStats: Boolean = true

  createTables()

  /**
   * Names of all rules found in the optimizer's batches, de-duplicated, minus the rules the
   * optimizer itself lists in `nonExcludableRules`.
   */
  private lazy val excludableRules = spark.sessionState.optimizer.batches
    .flatMap(_.rules.map(_.ruleName))
    .distinct
    .filterNot(spark.sessionState.optimizer.nonExcludableRules.contains)

  /**
   * Runs every query in `queryNames` once per excludable rule, with that single rule excluded.
   *
   * @param resourceDir classpath directory holding the `<name>.sql` query files
   * @param label       text printed in front of the query name; pass "" for no prefix
   * @param queryNames  query names, without the `.sql` extension
   */
  private def runExcludingEachRule(
      resourceDir: String,
      label: String,
      queryNames: Iterable[String]): Unit = {
    queryNames.foreach { name =>
      val queryString = resourceToString(s"$resourceDir/$name.sql",
        classLoader = Thread.currentThread().getContextClassLoader)
      excludableRules.foreach { rule =>
        println(s"$label$name: $rule")
        try {
          withSQLConf(SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> rule) {
            sql(queryString).collect()
          }
        } catch {
          // NonFatal instead of Exception: interruption and JVM-fatal errors propagate, while
          // ordinary failures (including assertion errors) are reported and the sweep goes on.
          case NonFatal(e) =>
            println(s"Exception: ${e.getMessage}")
        }
      }
    }
  }

  /**
   * Entry point: sweeps q83 from `tpcds/`, then the v2.7.0 queries, then the modified queries.
   *
   * @param args ignored
   */
  def main(args: Array[String]): Unit = {
    // NOTE(review): the object initializer (session creation, createTables()) has already run
    // by the time this line executes -- confirm that setting IS_TESTING this late is enough.
    System.setProperty(IS_TESTING.key, "true")

    runExcludingEachRule("tpcds", "", Seq("q83"))
    runExcludingEachRule("tpcds-v2.7.0", "tpcds-v2.7.0 ", tpcdsQueriesV2_7_0)
    runExcludingEachRule("tpcds-modifiedQueries", "tpcds-modifiedQueries ", modifiedTPCDSQueries)

    // scalastyle:on println
  }
}

{code}


> Add ReplaceCTERefWithRepartition into nonExcludableRules list
> -------------------------------------------------------------
>
>                 Key: SPARK-39448
>                 URL: https://issues.apache.org/jira/browse/SPARK-39448
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.4.0
>            Reporter: Yuming Wang
>            Priority: Major
>
> A unit test that runs the TPC-DS queries once per excludable optimizer rule, with that rule excluded:
> {code:scala}
> /*
>  * Licensed to the Apache Software Foundation (ASF) under one or more
>  * contributor license agreements.  See the NOTICE file distributed with
>  * this work for additional information regarding copyright ownership.
>  * The ASF licenses this file to You under the Apache License, Version 2.0
>  * (the "License"); you may not use this file except in compliance with
>  * the License.  You may obtain a copy of the License at
>  *
>  *    http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
> package org.apache.spark.sql
> import org.apache.spark.internal.config.Tests.IS_TESTING
> import org.apache.spark.launcher.SparkLauncher
> import org.apache.spark.sql.catalyst.util.resourceToString
> import org.apache.spark.sql.internal.SQLConf
> // scalastyle:off println
> object ExcludedRulesTestSuite extends TPCDSBase {
>   override val spark = SparkSession.builder()
>     .appName(this.getClass.getSimpleName)
>     .master("local[40]")
>     .config(SparkLauncher.DRIVER_MEMORY, "4g")
>     .config(SQLConf.SHUFFLE_PARTITIONS.key, 2)
>     .getOrCreate()
>   override def injectStats: Boolean = true
>   createTables()
>   private lazy val excludableRules = spark.sessionState.optimizer.batches
>     .flatMap(_.rules.map(_.ruleName))
>     .distinct
>     .filterNot(spark.sessionState.optimizer.nonExcludableRules.contains)
>   def main(args: Array[String]): Unit = {
>     System.setProperty(IS_TESTING.key, "true")
>     Seq("q83").foreach { name =>
>       val queryString = resourceToString(s"tpcds/$name.sql",
>         classLoader = Thread.currentThread().getContextClassLoader)
>       excludableRules.foreach { rule =>
>         println(name + ": " + rule)
>         try {
>           withSQLConf(SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> rule) {
>             sql(queryString).collect()
>           }
>         } catch {
>           case e: Exception =>
>             println("Exception: " + e.getMessage)
>         }
>       }
>     }
>     tpcdsQueriesV2_7_0.foreach { name =>
>       val queryString = resourceToString(s"tpcds-v2.7.0/$name.sql",
>         classLoader = Thread.currentThread().getContextClassLoader)
>       excludableRules.foreach { rule =>
>         println("tpcds-v2.7.0 " + name + ": " + rule)
>         try {
>           withSQLConf(SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> rule) {
>             sql(queryString).collect()
>           }
>         } catch {
>           case e: Exception =>
>             println("Exception: " + e.getMessage)
>         }
>       }
>     }
>     modifiedTPCDSQueries.foreach { name =>
>       val queryString = resourceToString(s"tpcds-modifiedQueries/$name.sql",
>         classLoader = Thread.currentThread().getContextClassLoader)
>       excludableRules.foreach { rule =>
>         println("tpcds-modifiedQueries " + name + ": " + rule)
>         try {
>           withSQLConf(SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> rule) {
>             sql(queryString).collect()
>           }
>         } catch {
>           case e: Exception =>
>             println("Exception: " + e.getMessage)
>         }
>       }
>     }
>     // scalastyle:on println
>   }
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org