Posted to reviews@spark.apache.org by davies <gi...@git.apache.org> on 2016/08/04 22:27:00 UTC

[GitHub] spark pull request #14500: [SPARK-] SQL DDL: MSCK REPAIR TABLE

GitHub user davies opened a pull request:

    https://github.com/apache/spark/pull/14500

    [SPARK-] SQL DDL: MSCK REPAIR TABLE

    ## What changes were proposed in this pull request?
    
    MSCK REPAIR TABLE can be used to recover the partitions in the external catalog based on the partition directories found in the file system.
    
    An equivalent syntax is: ALTER TABLE table RECOVER PARTITIONS
    
    The implementation in this PR only lists partition directories (not the files within a partition) on the driver (in parallel if needed).
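
    A minimal usage sketch (the table name `logs` and its layout are hypothetical; assumes an external, partitioned Hive table whose partition directories already exist on the file system):

        // e.g. in spark-shell with Hive support enabled
        spark.sql("CREATE EXTERNAL TABLE logs (value STRING) PARTITIONED BY (year INT, month INT) LOCATION '/data/logs'")
        // directories such as /data/logs/year=2016/month=08 exist on disk but are not yet in the catalog
        spark.sql("MSCK REPAIR TABLE logs")
        // equivalent form:
        spark.sql("ALTER TABLE logs RECOVER PARTITIONS")
        spark.sql("SHOW PARTITIONS logs").show()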
    
    ## How was this patch tested?
    
    Added unit tests for it and the Hive compatibility test suite.
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/davies/spark repair_table

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/14500.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #14500
    
----
commit c5edbdfc3a9b11fce8b427488f106158133e40a0
Author: Davies Liu <da...@databricks.com>
Date:   2016-08-04T22:12:37Z

    support ddl: MSCK REPAIR TABLE

----




[GitHub] spark pull request #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14500#discussion_r73780281
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala ---
    @@ -864,6 +864,55 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         testAddPartitions(isDatasourceTable = true)
       }
     
    +  test("alter table: recover partitions (sequential)") {
    +    withSQLConf("spark.rdd.parallelListingThreshold" -> "1") {
    +      testRecoverPartitions()
    +    }
    +  }
    +
    +  test("after table: recover partition (parallel)") {
    --- End diff --
    
    after -> alter




[GitHub] spark issue #14500: [SPARK-] SQL DDL: MSCK REPAIR TABLE

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/14500
  
    Merged build finished. Test FAILed.




[GitHub] spark issue #14500: [SPARK-] SQL DDL: MSCK REPAIR TABLE

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/14500
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/63242/
    Test FAILed.




[GitHub] spark issue #14500: [SPARK-] SQL DDL: MSCK REPAIR TABLE

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/14500
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/63246/
    Test FAILed.




[GitHub] spark pull request #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14500#discussion_r74099592
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
    @@ -425,6 +430,111 @@ case class AlterTableDropPartitionCommand(
     
     }
     
    +/**
    + * Recover Partitions in ALTER TABLE: recover all the partition in the directory of a table and
    + * update the catalog.
    + *
    + * The syntax of this command is:
    + * {{{
    + *   ALTER TABLE table RECOVER PARTITIONS;
    + *   MSCK REPAIR TABLE table;
    + * }}}
    + */
    +case class AlterTableRecoverPartitionsCommand(
    +    tableName: TableIdentifier,
    +    cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends RunnableCommand {
    +  override def run(spark: SparkSession): Seq[Row] = {
    +    val catalog = spark.sessionState.catalog
    +    if (!catalog.tableExists(tableName)) {
    +      throw new AnalysisException(s"Table $tableName in $cmd does not exist.")
    +    }
    +    val table = catalog.getTableMetadata(tableName)
    +    if (catalog.isTemporaryTable(tableName)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd on temporary tables: $tableName")
    +    }
    +    if (DDLUtils.isDatasourceTable(table)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd on datasource tables: $tableName")
    +    }
    +    if (table.tableType != CatalogTableType.EXTERNAL) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on external tables: $tableName")
    +    }
    +    if (!DDLUtils.isTablePartitioned(table)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on partitioned tables: $tableName")
    +    }
    +    if (table.storage.locationUri.isEmpty) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on table with location provided: $tableName")
    +    }
    +
    +    val root = new Path(table.storage.locationUri.get)
    +    val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
    +    // Dummy jobconf to get to the pathFilter defined in configuration
    +    // It's very expensive to create a JobConf(ClassUtil.findContainingJar() is slow)
    +    val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, this.getClass)
    +    val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
    +    val partitionSpecsAndLocs = scanPartitions(
    +      spark, fs, pathFilter, root, Map(), table.partitionColumnNames.map(_.toLowerCase))
    +    val parts = partitionSpecsAndLocs.map { case (spec, location) =>
    +      // inherit table storage format (possibly except for location)
    +      CatalogTablePartition(spec, table.storage.copy(locationUri = Some(location.toUri.toString)))
    +    }
    +    spark.sessionState.catalog.createPartitions(tableName,
    +      parts.toArray[CatalogTablePartition], ignoreIfExists = true)
    +    Seq.empty[Row]
    +  }
    +
    +  @transient private lazy val evalTaskSupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
    +
    +  private def scanPartitions(
    +      spark: SparkSession,
    +      fs: FileSystem,
    +      filter: PathFilter,
    +      path: Path,
    +      spec: TablePartitionSpec,
    +      partitionNames: Seq[String]): GenSeq[(TablePartitionSpec, Path)] = {
    +    if (partitionNames.length == 0) {
    +      return Seq(spec -> path)
    +    }
    +
    +    val statuses = fs.listStatus(path)
    +    val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
    +    val statusPar: GenSeq[FileStatus] =
    +      if (partitionNames.length > 1 && statuses.length > threshold || partitionNames.length > 2) {
    +        val parArray = statuses.par
    +        parArray.tasksupport = evalTaskSupport
    +        parArray
    +      } else {
    +        statuses
    +      }
    +    statusPar.flatMap { st =>
    +      val name = st.getPath.getName
    +      if (st.isDirectory && name.contains("=")) {
    +        val ps = name.split("=", 2)
    +        val columnName = PartitioningUtils.unescapePathName(ps(0)).toLowerCase
    +        // TODO: Validate the value
    +        val value = PartitioningUtils.unescapePathName(ps(1))
    --- End diff --
    
    If the partition directories are generated by Spark, they can be unescaped back correctly. For directories generated by other systems, there could be compatibility issues. For example, Spark does not escape ` ` on Linux, so the unescaping of `%20` could be wrong (we could show a warning?). I think these are out of the scope of this PR.
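
    To illustrate the ambiguity, a self-contained sketch (the `unescape` helper below is a simplified, hypothetical stand-in for the unescaping logic, not Spark's actual implementation):

        // A directory written by another tool may contain a literal "%20" that was never meant as an escape.
        def unescape(s: String): String = java.net.URLDecoder.decode(s, "UTF-8")

        val dirName = "city=New%20York"                  // produced outside Spark
        val Array(col, value) = dirName.split("=", 2)
        // Unescaping turns "%20" into a space, which may not be what the writer intended.
        println(col -> unescape(value))                  // (city,New York)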




[GitHub] spark pull request #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14500#discussion_r74100170
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
    @@ -425,6 +430,110 @@ case class AlterTableDropPartitionCommand(
     
     }
     
    +/**
    + * Recover Partitions in ALTER TABLE: recover all the partition in the directory of a table and
    + * update the catalog.
    + *
    + * The syntax of this command is:
    + * {{{
    + *   ALTER TABLE table RECOVER PARTITIONS;
    + *   MSCK REPAIR TABLE table;
    + * }}}
    + */
    +case class AlterTableRecoverPartitionsCommand(
    +    tableName: TableIdentifier,
    +    cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends RunnableCommand {
    +  override def run(spark: SparkSession): Seq[Row] = {
    +    val catalog = spark.sessionState.catalog
    +    if (!catalog.tableExists(tableName)) {
    +      throw new AnalysisException(s"Table $tableName in $cmd does not exist.")
    +    }
    +    val table = catalog.getTableMetadata(tableName)
    +    if (catalog.isTemporaryTable(tableName)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd on temporary tables: $tableName")
    +    }
    +    if (DDLUtils.isDatasourceTable(table)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd on datasource tables: $tableName")
    +    }
    +    if (table.tableType != CatalogTableType.EXTERNAL) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on external tables: $tableName")
    +    }
    +    if (!DDLUtils.isTablePartitioned(table)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on partitioned tables: $tableName")
    +    }
    +    if (table.storage.locationUri.isEmpty) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on table with location provided: $tableName")
    +    }
    +
    +    val root = new Path(table.storage.locationUri.get)
    +    val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
    +    // Dummy jobconf to get to the pathFilter defined in configuration
    +    // It's very expensive to create a JobConf(ClassUtil.findContainingJar() is slow)
    +    val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, this.getClass)
    +    val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
    +    val partitionSpecsAndLocs = scanPartitions(
    +      spark, fs, pathFilter, root, Map(), table.partitionColumnNames.map(_.toLowerCase))
    +    val parts = partitionSpecsAndLocs.map { case (spec, location) =>
    +      // inherit table storage format (possibly except for location)
    +      CatalogTablePartition(spec, table.storage.copy(locationUri = Some(location.toUri.toString)))
    +    }
    +    spark.sessionState.catalog.createPartitions(tableName,
    +      parts.toArray[CatalogTablePartition], ignoreIfExists = true)
    +    Seq.empty[Row]
    +  }
    +
    +  @transient private lazy val evalTaskSupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
    +
    +  private def scanPartitions(
    +      spark: SparkSession,
    +      fs: FileSystem,
    +      filter: PathFilter,
    +      path: Path,
    +      spec: TablePartitionSpec,
    +      partitionNames: Seq[String]): GenSeq[(TablePartitionSpec, Path)] = {
    +    if (partitionNames.length == 0) {
    +      return Seq(spec -> path)
    +    }
    +
    +    val statuses = fs.listStatus(path)
    +    val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
    +    val statusPar: GenSeq[FileStatus] =
    +      if (partitionNames.length > 1 && statuses.length > threshold || partitionNames.length > 2) {
    +        val parArray = statuses.par
    --- End diff --
    
    I did not figure out how to make that work; at least `statuses.par(evalTaskSupport)` does not work.
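
    For reference, a standalone sketch of the pattern that does work with the Scala 2.11-era parallel collections (the data and pool size below are arbitrary): `.par` takes no arguments, so the custom pool has to be attached through the `tasksupport` field after the parallel collection is created.

        import scala.collection.parallel.ForkJoinTaskSupport
        import scala.concurrent.forkjoin.ForkJoinPool

        val statuses = (1 to 100).toArray                // stand-in for fs.listStatus(path)
        val parArray = statuses.par
        parArray.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
        val doubled = parArray.map(_ * 2)                // mapped on the custom pool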




[GitHub] spark issue #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/14500
  
    Build finished. Test PASSed.




[GitHub] spark issue #14500: [SPARK-] SQL DDL: MSCK REPAIR TABLE

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/14500
  
    Merged build finished. Test FAILed.




[GitHub] spark issue #14500: [SPARK-] SQL DDL: MSCK REPAIR TABLE

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/14500
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/63244/
    Test FAILed.




[GitHub] spark pull request #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14500#discussion_r74132414
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
    @@ -425,6 +430,111 @@ case class AlterTableDropPartitionCommand(
     
     }
     
    +/**
    + * Recover Partitions in ALTER TABLE: recover all the partition in the directory of a table and
    + * update the catalog.
    + *
    + * The syntax of this command is:
    + * {{{
    + *   ALTER TABLE table RECOVER PARTITIONS;
    + *   MSCK REPAIR TABLE table;
    + * }}}
    + */
    +case class AlterTableRecoverPartitionsCommand(
    +    tableName: TableIdentifier,
    +    cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends RunnableCommand {
    +  override def run(spark: SparkSession): Seq[Row] = {
    +    val catalog = spark.sessionState.catalog
    +    if (!catalog.tableExists(tableName)) {
    +      throw new AnalysisException(s"Table $tableName in $cmd does not exist.")
    +    }
    +    val table = catalog.getTableMetadata(tableName)
    +    if (catalog.isTemporaryTable(tableName)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd on temporary tables: $tableName")
    +    }
    +    if (DDLUtils.isDatasourceTable(table)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd on datasource tables: $tableName")
    +    }
    +    if (table.tableType != CatalogTableType.EXTERNAL) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on external tables: $tableName")
    +    }
    +    if (!DDLUtils.isTablePartitioned(table)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on partitioned tables: $tableName")
    +    }
    +    if (table.storage.locationUri.isEmpty) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on table with location provided: $tableName")
    +    }
    +
    +    val root = new Path(table.storage.locationUri.get)
    +    val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
    +    // Dummy jobconf to get to the pathFilter defined in configuration
    +    // It's very expensive to create a JobConf(ClassUtil.findContainingJar() is slow)
    +    val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, this.getClass)
    +    val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
    +    val partitionSpecsAndLocs = scanPartitions(
    +      spark, fs, pathFilter, root, Map(), table.partitionColumnNames.map(_.toLowerCase))
    +    val parts = partitionSpecsAndLocs.map { case (spec, location) =>
    +      // inherit table storage format (possibly except for location)
    +      CatalogTablePartition(spec, table.storage.copy(locationUri = Some(location.toUri.toString)))
    +    }
    +    spark.sessionState.catalog.createPartitions(tableName,
    +      parts.toArray[CatalogTablePartition], ignoreIfExists = true)
    --- End diff --
    
    No, this is true for Hive <= 0.12; for Hive 0.13+, they are sent in a single RPC, so we should verify what the limit is for a single RPC.
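
    If the single-RPC payload does turn out to be a problem, one possible mitigation (not part of this PR; the batch size below is an arbitrary assumption) would be to chunk the partitions before handing them to the catalog:

        // hypothetical batching sketch
        val batchSize = 100
        parts.seq.grouped(batchSize).foreach { batch =>
          spark.sessionState.catalog.createPartitions(
            tableName, batch.toArray[CatalogTablePartition], ignoreIfExists = true)
        }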




[GitHub] spark issue #14500: [SPARK-] SQL DDL: MSCK REPAIR TABLE

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14500
  
    **[Test build #63244 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/63244/consoleFull)** for PR 14500 at commit [`89d22f4`](https://github.com/apache/spark/commit/89d22f4d2640a3f8bcd0f25a9eb84aaa9df73e48).




[GitHub] spark issue #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

Posted by sameeragarwal <gi...@git.apache.org>.
Github user sameeragarwal commented on the issue:

    https://github.com/apache/spark/pull/14500
  
    LGTM




[GitHub] spark issue #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/14500
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/63283/
    Test PASSed.




[GitHub] spark issue #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14500
  
    **[Test build #63283 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/63283/consoleFull)** for PR 14500 at commit [`e478c3a`](https://github.com/apache/spark/commit/e478c3a50e0d1564fc78939c5c1fb47798c47a3e).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `public class ShuffleIndexInformation `
      * `public class ShuffleIndexRecord `
      * `case class Least(children: Seq[Expression]) extends Expression `
      * `case class Greatest(children: Seq[Expression]) extends Expression `
      * `case class CreateTable(tableDesc: CatalogTable, mode: SaveMode, query: Option[LogicalPlan])`
      * `case class PreprocessDDL(conf: SQLConf) extends Rule[LogicalPlan] `




[GitHub] spark pull request #14500: [SPARK-] SQL DDL: MSCK REPAIR TABLE

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14500#discussion_r73729927
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala ---
    @@ -827,6 +827,45 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         testAddPartitions(isDatasourceTable = true)
       }
     
    +  test("alter table: recover partitions (sequential)") {
    +    withSQLConf("spark.rdd.parallelListingThreshold" -> "1") {
    +      testRecoverPartitions()
    +    }
    +  }
    +
    +  test("after table: recover partition (parallel)") {
    +    withSQLConf("spark.rdd.parallelListingThreshold" -> "10") {
    +      testRecoverPartitions()
    +    }
    +  }
    +
    +  private def testRecoverPartitions() {
    +    val catalog = spark.sessionState.catalog
    +    // table to alter does not exist
    +    intercept[AnalysisException] {
    +      sql("ALTER TABLE does_not_exist RECOVER PARTITIONS")
    +    }
    +
    +    val tableIdent = TableIdentifier("tab1")
    +    createTable(catalog, tableIdent)
    +    val part1 = Map("a" -> "1", "b" -> "5")
    +    createTablePartition(catalog, part1, tableIdent)
    +    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
    +
    +    val part2 = Map("a" -> "2", "b" -> "6")
    +    val root = new Path(catalog.getTableMetadata(tableIdent).storage.locationUri.get)
    +    val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
    +    fs.mkdirs(new Path(new Path(root, "a=1"), "b=5"))
    +    fs.mkdirs(new Path(new Path(root, "a=2"), "b=6"))
    +    try {
    +      sql("ALTER TABLE tab1 RECOVER PARTITIONS")
    +      assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
    +        Set(part1, part2))
    +    } finally {
    +      fs.delete(root, true)
    +    }
    +  }
    --- End diff --
    
    Let's add tests to exercise the command more. Here are four examples (a rough sketch of a test for the first case follows below).
    1. There is a partition dir with a bad name (not in the format of key=value).
    2. Say that we have two partition columns. We have some files under the first layer (e.g. _SUCCESS, parquet's metadata files, and/or regular data files).
    3. Some dirs do not have the expected number of partition columns. For example, the schema specifies 3 partition columns, but a path only has two partition columns.
    4. The partition column names encoded in the path do not match the names specified in the schema. For example, when we create the table, we specify `c1` as the first partition column, but the dir in the fs has `c2` as the first partition column.
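
    A rough sketch of a test for case 1 (reusing the `createTable` helper and the structure of `testRecoverPartitions` above; this is a suggestion, not code from the PR):

        test("alter table: recover partitions ignores badly named dirs") {
          val catalog = spark.sessionState.catalog
          val tableIdent = TableIdentifier("tab1")
          createTable(catalog, tableIdent)
          val root = new Path(catalog.getTableMetadata(tableIdent).storage.locationUri.get)
          val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
          fs.mkdirs(new Path(new Path(root, "a=1"), "b=5"))   // valid partition dir
          fs.mkdirs(new Path(root, "not_a_partition_dir"))    // bad name, no key=value
          try {
            sql("ALTER TABLE tab1 RECOVER PARTITIONS")
            assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
              Set(Map("a" -> "1", "b" -> "5")))
          } finally {
            fs.delete(root, true)
          }
        }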




[GitHub] spark issue #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14500
  
    **[Test build #63375 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/63375/consoleFull)** for PR 14500 at commit [`e5906cf`](https://github.com/apache/spark/commit/e5906cf408f225b5023ed6d250f850fcd447168a).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14500
  
    **[Test build #63283 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/63283/consoleFull)** for PR 14500 at commit [`e478c3a`](https://github.com/apache/spark/commit/e478c3a50e0d1564fc78939c5c1fb47798c47a3e).




[GitHub] spark pull request #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14500#discussion_r74094235
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
    @@ -425,6 +430,111 @@ case class AlterTableDropPartitionCommand(
     
     }
     
    +/**
    + * Recover Partitions in ALTER TABLE: recover all the partition in the directory of a table and
    + * update the catalog.
    + *
    + * The syntax of this command is:
    + * {{{
    + *   ALTER TABLE table RECOVER PARTITIONS;
    + *   MSCK REPAIR TABLE table;
    + * }}}
    + */
    +case class AlterTableRecoverPartitionsCommand(
    +    tableName: TableIdentifier,
    +    cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends RunnableCommand {
    +  override def run(spark: SparkSession): Seq[Row] = {
    +    val catalog = spark.sessionState.catalog
    +    if (!catalog.tableExists(tableName)) {
    +      throw new AnalysisException(s"Table $tableName in $cmd does not exist.")
    +    }
    +    val table = catalog.getTableMetadata(tableName)
    +    if (catalog.isTemporaryTable(tableName)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd on temporary tables: $tableName")
    +    }
    +    if (DDLUtils.isDatasourceTable(table)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd on datasource tables: $tableName")
    +    }
    +    if (table.tableType != CatalogTableType.EXTERNAL) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on external tables: $tableName")
    +    }
    +    if (!DDLUtils.isTablePartitioned(table)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on partitioned tables: $tableName")
    +    }
    +    if (table.storage.locationUri.isEmpty) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on table with location provided: $tableName")
    +    }
    +
    +    val root = new Path(table.storage.locationUri.get)
    +    val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
    +    // Dummy jobconf to get to the pathFilter defined in configuration
    +    // It's very expensive to create a JobConf(ClassUtil.findContainingJar() is slow)
    +    val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, this.getClass)
    +    val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
    +    val partitionSpecsAndLocs = scanPartitions(
    +      spark, fs, pathFilter, root, Map(), table.partitionColumnNames.map(_.toLowerCase))
    +    val parts = partitionSpecsAndLocs.map { case (spec, location) =>
    +      // inherit table storage format (possibly except for location)
    +      CatalogTablePartition(spec, table.storage.copy(locationUri = Some(location.toUri.toString)))
    +    }
    +    spark.sessionState.catalog.createPartitions(tableName,
    +      parts.toArray[CatalogTablePartition], ignoreIfExists = true)
    +    Seq.empty[Row]
    +  }
    +
    +  @transient private lazy val evalTaskSupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
    +
    +  private def scanPartitions(
    +      spark: SparkSession,
    +      fs: FileSystem,
    +      filter: PathFilter,
    +      path: Path,
    +      spec: TablePartitionSpec,
    +      partitionNames: Seq[String]): GenSeq[(TablePartitionSpec, Path)] = {
    +    if (partitionNames.length == 0) {
    +      return Seq(spec -> path)
    +    }
    +
    +    val statuses = fs.listStatus(path)
    +    val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
    +    val statusPar: GenSeq[FileStatus] =
    +      if (partitionNames.length > 1 && statuses.length > threshold || partitionNames.length > 2) {
    +        val parArray = statuses.par
    +        parArray.tasksupport = evalTaskSupport
    +        parArray
    +      } else {
    +        statuses
    +      }
    +    statusPar.flatMap { st =>
    +      val name = st.getPath.getName
    +      if (st.isDirectory && name.contains("=")) {
    +        val ps = name.split("=", 2)
    +        val columnName = PartitioningUtils.unescapePathName(ps(0)).toLowerCase
    +        // TODO: Validate the value
    +        val value = PartitioningUtils.unescapePathName(ps(1))
    +        // comparing with case-insensitive, but preserve the case
    +        if (columnName == partitionNames(0)) {
    +          scanPartitions(
    +            spark, fs, filter, st.getPath, spec ++ Map(columnName -> value), partitionNames.drop(1))
    +        } else {
    +          logWarning(s"expect partition column ${partitionNames(0)}, but got ${ps(0)}, ignore it")
    +          Seq()
    --- End diff --
    
    Hive only throws an exception when there are disallowed characters in the value, not in other cases. I'd like to avoid adding any configs if there is no serious problem here.




[GitHub] spark issue #14500: [SPARK-] SQL DDL: MSCK REPAIR TABLE

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14500
  
    **[Test build #63246 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/63246/consoleFull)** for PR 14500 at commit [`f338516`](https://github.com/apache/spark/commit/f3385166ff57143ef8b3cfac82aba7a8e958d6eb).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14500#discussion_r73962887
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
    @@ -425,6 +430,110 @@ case class AlterTableDropPartitionCommand(
     
     }
     
    +/**
    + * Recover Partitions in ALTER TABLE: recover all the partition in the directory of a table and
    + * update the catalog.
    + *
    + * The syntax of this command is:
    + * {{{
    + *   ALTER TABLE table RECOVER PARTITIONS;
    + *   MSCK REPAIR TABLE table;
    + * }}}
    + */
    +case class AlterTableRecoverPartitionsCommand(
    +    tableName: TableIdentifier,
    +    cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends RunnableCommand {
    +  override def run(spark: SparkSession): Seq[Row] = {
    +    val catalog = spark.sessionState.catalog
    +    if (!catalog.tableExists(tableName)) {
    +      throw new AnalysisException(s"Table $tableName in $cmd does not exist.")
    +    }
    +    val table = catalog.getTableMetadata(tableName)
    +    if (catalog.isTemporaryTable(tableName)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd on temporary tables: $tableName")
    +    }
    +    if (DDLUtils.isDatasourceTable(table)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd on datasource tables: $tableName")
    +    }
    +    if (table.tableType != CatalogTableType.EXTERNAL) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on external tables: $tableName")
    +    }
    +    if (!DDLUtils.isTablePartitioned(table)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on partitioned tables: $tableName")
    +    }
    +    if (table.storage.locationUri.isEmpty) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on table with location provided: $tableName")
    +    }
    +
    +    val root = new Path(table.storage.locationUri.get)
    +    val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
    +    // Dummy jobconf to get to the pathFilter defined in configuration
    +    // It's very expensive to create a JobConf(ClassUtil.findContainingJar() is slow)
    +    val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, this.getClass)
    +    val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
    +    val partitionSpecsAndLocs = scanPartitions(
    +      spark, fs, pathFilter, root, Map(), table.partitionColumnNames.map(_.toLowerCase))
    +    val parts = partitionSpecsAndLocs.map { case (spec, location) =>
    +      // inherit table storage format (possibly except for location)
    +      CatalogTablePartition(spec, table.storage.copy(locationUri = Some(location.toUri.toString)))
    +    }
    +    spark.sessionState.catalog.createPartitions(tableName,
    +      parts.toArray[CatalogTablePartition], ignoreIfExists = true)
    +    Seq.empty[Row]
    +  }
    +
    +  @transient private lazy val evalTaskSupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
    +
    +  private def scanPartitions(
    +      spark: SparkSession,
    +      fs: FileSystem,
    +      filter: PathFilter,
    +      path: Path,
    +      spec: TablePartitionSpec,
    +      partitionNames: Seq[String]): GenSeq[(TablePartitionSpec, Path)] = {
    +    if (partitionNames.length == 0) {
    +      return Seq(spec -> path)
    +    }
    +
    +    val statuses = fs.listStatus(path)
    +    val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
    +    val statusPar: GenSeq[FileStatus] =
    +      if (partitionNames.length > 1 && statuses.length > threshold || partitionNames.length > 2) {
    +        val parArray = statuses.par
    --- End diff --
    
    cool. can we make it explicit, e.g. `statuses.par(evalTaskSupport)`?




[GitHub] spark issue #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on the issue:

    https://github.com/apache/spark/pull/14500
  
    LGTM




[GitHub] spark pull request #14500: [SPARK-] SQL DDL: MSCK REPAIR TABLE

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14500#discussion_r73619320
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala ---
    @@ -389,6 +389,50 @@ case class TruncateTableCommand(
     }
     
     /**
    + * A command to repair a table by discovery all the partitions in the directory.
    + *
    + * The syntax of this command is:
    + * {{{
    + *   MSCK REPAIR TABLE table_name;
    + * }}}
    + *
    + * This command is the same as AlterTableRecoverPartitions
    + */
    +case class RepairTableCommand(tableName: TableIdentifier) extends RunnableCommand {
    +  override def run(spark: SparkSession): Seq[Row] = {
    +    val catalog = spark.sessionState.catalog
    +    val table = catalog.getTableMetadata(tableName)
    +    if (!catalog.tableExists(tableName)) {
    +      throw new AnalysisException(s"Table $tableName in MSCK REPAIR TABLE does not exist.")
    +    }
    +    if (catalog.isTemporaryTable(tableName)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: MSCK REPAIR TABLE on temporary tables: $tableName")
    +    }
    +    if (DDLUtils.isDatasourceTable(table)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: MSCK REPAIR TABLE on datasource tables: $tableName")
    +    }
    +    if (table.tableType != CatalogTableType.EXTERNAL) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: MSCK REPAIR TABLE only works on external tables: $tableName")
    +    }
    +    if (DDLUtils.isTablePartitioned(table)) {
    --- End diff --
    
    This check conflicts with the error message reported below. Currently, if the table is partitioned, we will report the message: `MSCK REPAIR TABLE only works on partitioned tables`
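
    Presumably the guard should be negated so that it matches the message, as the later revision of the command does:

        if (!DDLUtils.isTablePartitioned(table)) {
          throw new AnalysisException(
            s"Operation not allowed: MSCK REPAIR TABLE only works on partitioned tables: $tableName")
        }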




[GitHub] spark pull request #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14500#discussion_r73800495
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
    @@ -425,6 +430,110 @@ case class AlterTableDropPartitionCommand(
     
     }
     
    +/**
    + * Recover Partitions in ALTER TABLE: recover all the partition in the directory of a table and
    + * update the catalog.
    + *
    + * The syntax of this command is:
    + * {{{
    + *   ALTER TABLE table RECOVER PARTITIONS;
    + *   MSCK REPAIR TABLE table;
    + * }}}
    + */
    +case class AlterTableRecoverPartitionsCommand(
    +    tableName: TableIdentifier,
    +    cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends RunnableCommand {
    +  override def run(spark: SparkSession): Seq[Row] = {
    +    val catalog = spark.sessionState.catalog
    +    if (!catalog.tableExists(tableName)) {
    +      throw new AnalysisException(s"Table $tableName in $cmd does not exist.")
    +    }
    +    val table = catalog.getTableMetadata(tableName)
    +    if (catalog.isTemporaryTable(tableName)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd on temporary tables: $tableName")
    +    }
    +    if (DDLUtils.isDatasourceTable(table)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd on datasource tables: $tableName")
    +    }
    +    if (table.tableType != CatalogTableType.EXTERNAL) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on external tables: $tableName")
    +    }
    +    if (!DDLUtils.isTablePartitioned(table)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on partitioned tables: $tableName")
    +    }
    +    if (table.storage.locationUri.isEmpty) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on table with location provided: $tableName")
    +    }
    +
    +    val root = new Path(table.storage.locationUri.get)
    +    val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
    +    // Dummy jobconf to get to the pathFilter defined in configuration
    +    // It's very expensive to create a JobConf(ClassUtil.findContainingJar() is slow)
    +    val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, this.getClass)
    +    val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
    +    val partitionSpecsAndLocs = scanPartitions(
    +      spark, fs, pathFilter, root, Map(), table.partitionColumnNames.map(_.toLowerCase))
    +    val parts = partitionSpecsAndLocs.map { case (spec, location) =>
    +      // inherit table storage format (possibly except for location)
    +      CatalogTablePartition(spec, table.storage.copy(locationUri = Some(location.toUri.toString)))
    +    }
    +    spark.sessionState.catalog.createPartitions(tableName,
    +      parts.toArray[CatalogTablePartition], ignoreIfExists = true)
    +    Seq.empty[Row]
    +  }
    +
    +  @transient private lazy val evalTaskSupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
    +
    +  private def scanPartitions(
    +      spark: SparkSession,
    +      fs: FileSystem,
    +      filter: PathFilter,
    +      path: Path,
    +      spec: TablePartitionSpec,
    +      partitionNames: Seq[String]): GenSeq[(TablePartitionSpec, Path)] = {
    +    if (partitionNames.length == 0) {
    +      return Seq(spec -> path)
    +    }
    +
    +    val statuses = fs.listStatus(path)
    +    val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
    +    val statusPar: GenSeq[FileStatus] =
    +      if (partitionNames.length > 1 && statuses.length > threshold || partitionNames.length > 2) {
    +        val parArray = statuses.par
    --- End diff --
    
    A new one is created here: https://github.com/apache/spark/pull/14500/files#diff-54979ed5797b4a6193cf663dc23baca5R490




[GitHub] spark issue #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/14500
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/63279/
    Test PASSed.




[GitHub] spark pull request #14500: [SPARK-] SQL DDL: MSCK REPAIR TABLE

Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14500#discussion_r73654508
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
    @@ -425,6 +431,96 @@ case class AlterTableDropPartitionCommand(
     
     }
     
    +/**
    + * Discover Partitions in ALTER TABLE: discover all the partition in the directory of a table and
    --- End diff --
    
    Discover or recover? Let's update the doc accordingly.




[GitHub] spark issue #14500: [SPARK-] SQL DDL: MSCK REPAIR TABLE

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14500
  
    **[Test build #63244 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/63244/consoleFull)** for PR 14500 at commit [`89d22f4`](https://github.com/apache/spark/commit/89d22f4d2640a3f8bcd0f25a9eb84aaa9df73e48).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #14500: [SPARK-] SQL DDL: MSCK REPAIR TABLE

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14500
  
    **[Test build #63243 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/63243/consoleFull)** for PR 14500 at commit [`c5edbdf`](https://github.com/apache/spark/commit/c5edbdfc3a9b11fce8b427488f106158133e40a0).




[GitHub] spark pull request #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14500#discussion_r73780357
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
    @@ -425,6 +430,110 @@ case class AlterTableDropPartitionCommand(
     
     }
     
    +/**
    + * Recover Partitions in ALTER TABLE: recover all the partition in the directory of a table and
    + * update the catalog.
    + *
    + * The syntax of this command is:
    + * {{{
    + *   ALTER TABLE table RECOVER PARTITIONS;
    + *   MSCK REPAIR TABLE table;
    + * }}}
    + */
    +case class AlterTableRecoverPartitionsCommand(
    +    tableName: TableIdentifier,
    +    cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends RunnableCommand {
    +  override def run(spark: SparkSession): Seq[Row] = {
    +    val catalog = spark.sessionState.catalog
    +    if (!catalog.tableExists(tableName)) {
    +      throw new AnalysisException(s"Table $tableName in $cmd does not exist.")
    +    }
    +    val table = catalog.getTableMetadata(tableName)
    +    if (catalog.isTemporaryTable(tableName)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd on temporary tables: $tableName")
    +    }
    +    if (DDLUtils.isDatasourceTable(table)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd on datasource tables: $tableName")
    +    }
    +    if (table.tableType != CatalogTableType.EXTERNAL) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on external tables: $tableName")
    +    }
    +    if (!DDLUtils.isTablePartitioned(table)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on partitioned tables: $tableName")
    +    }
    +    if (table.storage.locationUri.isEmpty) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on table with location provided: $tableName")
    +    }
    +
    +    val root = new Path(table.storage.locationUri.get)
    +    val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
    +    // Dummy jobconf to get to the pathFilter defined in configuration
    +    // It's very expensive to create a JobConf(ClassUtil.findContainingJar() is slow)
    +    val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, this.getClass)
    +    val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
    +    val partitionSpecsAndLocs = scanPartitions(
    +      spark, fs, pathFilter, root, Map(), table.partitionColumnNames.map(_.toLowerCase))
    +    val parts = partitionSpecsAndLocs.map { case (spec, location) =>
    +      // inherit table storage format (possibly except for location)
    +      CatalogTablePartition(spec, table.storage.copy(locationUri = Some(location.toUri.toString)))
    +    }
    +    spark.sessionState.catalog.createPartitions(tableName,
    +      parts.toArray[CatalogTablePartition], ignoreIfExists = true)
    +    Seq.empty[Row]
    +  }
    +
    +  @transient private lazy val evalTaskSupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
    +
    +  private def scanPartitions(
    +      spark: SparkSession,
    +      fs: FileSystem,
    +      filter: PathFilter,
    +      path: Path,
    +      spec: TablePartitionSpec,
    +      partitionNames: Seq[String]): GenSeq[(TablePartitionSpec, Path)] = {
    +    if (partitionNames.length == 0) {
    +      return Seq(spec -> path)
    +    }
    +
    +    val statuses = fs.listStatus(path)
    +    val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
    +    val statusPar: GenSeq[FileStatus] =
    +      if (partitionNames.length > 1 && statuses.length > threshold || partitionNames.length > 2) {
    +        val parArray = statuses.par
    +        parArray.tasksupport = evalTaskSupport
    +        parArray
    +      } else {
    +        statuses
    +      }
    +    statusPar.flatMap { st =>
    +      val name = st.getPath.getName
    +      if (st.isDirectory && name.contains("=")) {
    +        val ps = name.split("=", 2)
    +        val columnName = PartitioningUtils.unescapePathName(ps(0)).toLowerCase
    +        val value = PartitioningUtils.unescapePathName(ps(1))
    --- End diff --
    
    Do we need to check if the value is valid? E.g., for a partition column "a" of IntegerType, "a=abc" is invalid.
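
    For illustration, a minimal sketch of such a check (not part of this patch; the helper name and the Cast-based approach are assumptions, and pre-ANSI Cast semantics are assumed, where an unparseable string casts to null):

    ```
    import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
    import org.apache.spark.sql.types.DataType

    // Hypothetical helper: "abc" fails to cast to IntegerType and is rejected,
    // while "123" casts successfully and is accepted.
    def isValidPartitionValue(raw: String, dataType: DataType): Boolean = {
      Cast(Literal(raw), dataType).eval() != null
    }
    ```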




[GitHub] spark issue #14500: [SPARK-] SQL DDL: MSCK REPAIR TABLE

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the issue:

    https://github.com/apache/spark/pull/14500
  
    @yhuai Just checked repair.q; it's kind of useless, already covered by our unit tests, so we could just ignore it.




[GitHub] spark pull request #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14500#discussion_r74481008
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
    @@ -425,6 +430,111 @@ case class AlterTableDropPartitionCommand(
     
     }
     
    +/**
    + * Recover Partitions in ALTER TABLE: recover all the partition in the directory of a table and
    + * update the catalog.
    + *
    + * The syntax of this command is:
    + * {{{
    + *   ALTER TABLE table RECOVER PARTITIONS;
    + *   MSCK REPAIR TABLE table;
    + * }}}
    + */
    +case class AlterTableRecoverPartitionsCommand(
    +    tableName: TableIdentifier,
    +    cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends RunnableCommand {
    +  override def run(spark: SparkSession): Seq[Row] = {
    +    val catalog = spark.sessionState.catalog
    +    if (!catalog.tableExists(tableName)) {
    +      throw new AnalysisException(s"Table $tableName in $cmd does not exist.")
    +    }
    +    val table = catalog.getTableMetadata(tableName)
    +    if (catalog.isTemporaryTable(tableName)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd on temporary tables: $tableName")
    +    }
    +    if (DDLUtils.isDatasourceTable(table)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd on datasource tables: $tableName")
    +    }
    +    if (table.tableType != CatalogTableType.EXTERNAL) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on external tables: $tableName")
    +    }
    +    if (!DDLUtils.isTablePartitioned(table)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on partitioned tables: $tableName")
    +    }
    +    if (table.storage.locationUri.isEmpty) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on table with location provided: $tableName")
    +    }
    +
    +    val root = new Path(table.storage.locationUri.get)
    +    val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
    +    // Dummy jobconf to get to the pathFilter defined in configuration
    +    // It's very expensive to create a JobConf(ClassUtil.findContainingJar() is slow)
    +    val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, this.getClass)
    +    val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
    +    val partitionSpecsAndLocs = scanPartitions(
    +      spark, fs, pathFilter, root, Map(), table.partitionColumnNames.map(_.toLowerCase))
    +    val parts = partitionSpecsAndLocs.map { case (spec, location) =>
    +      // inherit table storage format (possibly except for location)
    +      CatalogTablePartition(spec, table.storage.copy(locationUri = Some(location.toUri.toString)))
    +    }
    +    spark.sessionState.catalog.createPartitions(tableName,
    +      parts.toArray[CatalogTablePartition], ignoreIfExists = true)
    --- End diff --
    
    It seems that the Hive Metastore can't handle an RPC with millions of partitions; I will send a patch to do it in batches.
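
    For reference, a rough sketch of what batching could look like (`parts`, `tableName` and `spark` are the names from the diff above; the batch size of 100 is an arbitrary illustrative value, not necessarily what the follow-up patch uses):

    ```
    // Send partitions to the external catalog in fixed-size batches instead of
    // one giant RPC.
    val batchSize = 100
    parts.seq.grouped(batchSize).foreach { batch =>
      spark.sessionState.catalog.createPartitions(
        tableName, batch, ignoreIfExists = true)
    }
    ```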




[GitHub] spark issue #14500: [SPARK-] SQL DDL: MSCK REPAIR TABLE

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/14500
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14500#discussion_r73753834
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala ---
    @@ -827,6 +827,45 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         testAddPartitions(isDatasourceTable = true)
       }
     
    +  test("alter table: recover partitions (sequential)") {
    +    withSQLConf("spark.rdd.parallelListingThreshold" -> "1") {
    +      testRecoverPartitions()
    +    }
    +  }
    +
    +  test("after table: recover partition (parallel)") {
    +    withSQLConf("spark.rdd.parallelListingThreshold" -> "10") {
    +      testRecoverPartitions()
    +    }
    +  }
    +
    +  private def testRecoverPartitions() {
    +    val catalog = spark.sessionState.catalog
    +    // table to alter does not exist
    +    intercept[AnalysisException] {
    +      sql("ALTER TABLE does_not_exist RECOVER PARTITIONS")
    +    }
    +
    +    val tableIdent = TableIdentifier("tab1")
    +    createTable(catalog, tableIdent)
    +    val part1 = Map("a" -> "1", "b" -> "5")
    +    createTablePartition(catalog, part1, tableIdent)
    +    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
    +
    +    val part2 = Map("a" -> "2", "b" -> "6")
    +    val root = new Path(catalog.getTableMetadata(tableIdent).storage.locationUri.get)
    +    val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
    +    fs.mkdirs(new Path(new Path(root, "a=1"), "b=5"))
    +    fs.mkdirs(new Path(new Path(root, "a=2"), "b=6"))
    +    try {
    +      sql("ALTER TABLE tab1 RECOVER PARTITIONS")
    +      assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
    +        Set(part1, part2))
    +    } finally {
    +      fs.delete(root, true)
    +    }
    +  }
    --- End diff --
    
    done




[GitHub] spark issue #14500: [SPARK-] SQL DDL: MSCK REPAIR TABLE

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14500
  
    **[Test build #63243 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/63243/consoleFull)** for PR 14500 at commit [`c5edbdf`](https://github.com/apache/spark/commit/c5edbdfc3a9b11fce8b427488f106158133e40a0).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `case class AlterTableRecoverPartitionsCommand(`
      * `case class RepairTableCommand(tableName: TableIdentifier) extends RunnableCommand `




[GitHub] spark issue #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/14500
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/63375/
    Test PASSed.




[GitHub] spark pull request #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14500#discussion_r73945656
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
    @@ -425,6 +430,110 @@ case class AlterTableDropPartitionCommand(
     
     }
     
    +/**
    + * Recover Partitions in ALTER TABLE: recover all the partition in the directory of a table and
    + * update the catalog.
    + *
    + * The syntax of this command is:
    + * {{{
    + *   ALTER TABLE table RECOVER PARTITIONS;
    + *   MSCK REPAIR TABLE table;
    + * }}}
    + */
    +case class AlterTableRecoverPartitionsCommand(
    +    tableName: TableIdentifier,
    +    cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends RunnableCommand {
    +  override def run(spark: SparkSession): Seq[Row] = {
    +    val catalog = spark.sessionState.catalog
    +    if (!catalog.tableExists(tableName)) {
    +      throw new AnalysisException(s"Table $tableName in $cmd does not exist.")
    +    }
    +    val table = catalog.getTableMetadata(tableName)
    +    if (catalog.isTemporaryTable(tableName)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd on temporary tables: $tableName")
    +    }
    +    if (DDLUtils.isDatasourceTable(table)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd on datasource tables: $tableName")
    +    }
    +    if (table.tableType != CatalogTableType.EXTERNAL) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on external tables: $tableName")
    +    }
    +    if (!DDLUtils.isTablePartitioned(table)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on partitioned tables: $tableName")
    +    }
    +    if (table.storage.locationUri.isEmpty) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on table with location provided: $tableName")
    +    }
    +
    +    val root = new Path(table.storage.locationUri.get)
    +    val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
    +    // Dummy jobconf to get to the pathFilter defined in configuration
    +    // It's very expensive to create a JobConf(ClassUtil.findContainingJar() is slow)
    +    val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, this.getClass)
    +    val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
    +    val partitionSpecsAndLocs = scanPartitions(
    +      spark, fs, pathFilter, root, Map(), table.partitionColumnNames.map(_.toLowerCase))
    +    val parts = partitionSpecsAndLocs.map { case (spec, location) =>
    +      // inherit table storage format (possibly except for location)
    +      CatalogTablePartition(spec, table.storage.copy(locationUri = Some(location.toUri.toString)))
    +    }
    +    spark.sessionState.catalog.createPartitions(tableName,
    +      parts.toArray[CatalogTablePartition], ignoreIfExists = true)
    +    Seq.empty[Row]
    +  }
    +
    +  @transient private lazy val evalTaskSupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
    +
    +  private def scanPartitions(
    +      spark: SparkSession,
    +      fs: FileSystem,
    +      filter: PathFilter,
    +      path: Path,
    +      spec: TablePartitionSpec,
    +      partitionNames: Seq[String]): GenSeq[(TablePartitionSpec, Path)] = {
    +    if (partitionNames.length == 0) {
    +      return Seq(spec -> path)
    +    }
    +
    +    val statuses = fs.listStatus(path)
    +    val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
    +    val statusPar: GenSeq[FileStatus] =
    +      if (partitionNames.length > 1 && statuses.length > threshold || partitionNames.length > 2) {
    +        val parArray = statuses.par
    +        parArray.tasksupport = evalTaskSupport
    +        parArray
    +      } else {
    +        statuses
    +      }
    +    statusPar.flatMap { st =>
    +      val name = st.getPath.getName
    +      if (st.isDirectory && name.contains("=")) {
    +        val ps = name.split("=", 2)
    +        val columnName = PartitioningUtils.unescapePathName(ps(0)).toLowerCase
    +        val value = PartitioningUtils.unescapePathName(ps(1))
    +        // comparing with case-insensitive, but preserve the case
    +        if (columnName == partitionNames(0)) {
    --- End diff --
    
    I think it's valid.




[GitHub] spark pull request #14500: [SPARK-] SQL DDL: MSCK REPAIR TABLE

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14500#discussion_r73619025
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala ---
    @@ -389,6 +389,50 @@ case class TruncateTableCommand(
     }
     
     /**
    + * A command to repair a table by discovery all the partitions in the directory.
    + *
    + * The syntax of this command is:
    + * {{{
    + *   MSCK REPAIR TABLE table_name;
    + * }}}
    + *
    + * This command is the same as AlterTableRecoverPartitions
    + */
    +case class RepairTableCommand(tableName: TableIdentifier) extends RunnableCommand {
    +  override def run(spark: SparkSession): Seq[Row] = {
    +    val catalog = spark.sessionState.catalog
    +    val table = catalog.getTableMetadata(tableName)
    +    if (!catalog.tableExists(tableName)) {
    --- End diff --
    
    This is dead code. The previous line already checks whether the table exists or not.
    ```
    val table = catalog.getTableMetadata(tableName)
    ```
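
    A minimal reordering sketch, using the names from the diff above (this is the order the later revision of the PR uses in ddl.scala): check existence first, then fetch the metadata.

    ```
    // Check existence before fetching metadata, so the intended error is raised
    // instead of whatever getTableMetadata throws for a missing table.
    if (!catalog.tableExists(tableName)) {
      throw new AnalysisException(s"Table $tableName in MSCK REPAIR TABLE does not exist.")
    }
    val table = catalog.getTableMetadata(tableName)
    ```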




[GitHub] spark issue #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/14500
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14500#discussion_r74132132
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
    @@ -425,6 +430,111 @@ case class AlterTableDropPartitionCommand(
     
     }
     
    +/**
    + * Recover Partitions in ALTER TABLE: recover all the partition in the directory of a table and
    + * update the catalog.
    + *
    + * The syntax of this command is:
    + * {{{
    + *   ALTER TABLE table RECOVER PARTITIONS;
    + *   MSCK REPAIR TABLE table;
    + * }}}
    + */
    +case class AlterTableRecoverPartitionsCommand(
    +    tableName: TableIdentifier,
    +    cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends RunnableCommand {
    +  override def run(spark: SparkSession): Seq[Row] = {
    +    val catalog = spark.sessionState.catalog
    +    if (!catalog.tableExists(tableName)) {
    +      throw new AnalysisException(s"Table $tableName in $cmd does not exist.")
    +    }
    +    val table = catalog.getTableMetadata(tableName)
    +    if (catalog.isTemporaryTable(tableName)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd on temporary tables: $tableName")
    +    }
    +    if (DDLUtils.isDatasourceTable(table)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd on datasource tables: $tableName")
    +    }
    +    if (table.tableType != CatalogTableType.EXTERNAL) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on external tables: $tableName")
    +    }
    +    if (!DDLUtils.isTablePartitioned(table)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on partitioned tables: $tableName")
    +    }
    +    if (table.storage.locationUri.isEmpty) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on table with location provided: $tableName")
    +    }
    +
    +    val root = new Path(table.storage.locationUri.get)
    +    val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
    +    // Dummy jobconf to get to the pathFilter defined in configuration
    +    // It's very expensive to create a JobConf(ClassUtil.findContainingJar() is slow)
    +    val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, this.getClass)
    +    val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
    +    val partitionSpecsAndLocs = scanPartitions(
    +      spark, fs, pathFilter, root, Map(), table.partitionColumnNames.map(_.toLowerCase))
    +    val parts = partitionSpecsAndLocs.map { case (spec, location) =>
    +      // inherit table storage format (possibly except for location)
    +      CatalogTablePartition(spec, table.storage.copy(locationUri = Some(location.toUri.toString)))
    +    }
    +    spark.sessionState.catalog.createPartitions(tableName,
    +      parts.toArray[CatalogTablePartition], ignoreIfExists = true)
    --- End diff --
    
    Good question, see the implementation in HiveShim:
    ```
      // Follows exactly the same logic of DDLTask.createPartitions in Hive 0.12
      override def createPartitions(
          hive: Hive,
          database: String,
          tableName: String,
          parts: Seq[CatalogTablePartition],
          ignoreIfExists: Boolean): Unit = {
        val table = hive.getTable(database, tableName)
        parts.foreach { s =>
          val location = s.storage.locationUri.map(new Path(table.getPath, _)).orNull
          val spec = s.spec.asJava
          if (hive.getPartition(table, spec, false) != null && ignoreIfExists) {
            // Ignore this partition since it already exists and ignoreIfExists == true
          } else {
            if (location == null && table.isView()) {
              throw new HiveException("LOCATION clause illegal for view partition");
            }
    
            createPartitionMethod.invoke(
              hive,
              table,
              spec,
              location,
              null, // partParams
              null, // inputFormat
              null, // outputFormat
              -1: JInteger, // numBuckets
              null, // cols
              null, // serializationLib
              null, // serdeParams
              null, // bucketCols
              null) // sortCols
          }
        }
      }
    ```
    All these partitions will be inserted into Hive sequentially, so grouping them into batches will not help here.




[GitHub] spark issue #14500: [SPARK-] SQL DDL: MSCK REPAIR TABLE

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14500
  
    **[Test build #63246 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/63246/consoleFull)** for PR 14500 at commit [`f338516`](https://github.com/apache/spark/commit/f3385166ff57143ef8b3cfac82aba7a8e958d6eb).




[GitHub] spark issue #14500: [SPARK-] SQL DDL: MSCK REPAIR TABLE

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/14500
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/63243/
    Test FAILed.




[GitHub] spark issue #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14500
  
    **[Test build #63375 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/63375/consoleFull)** for PR 14500 at commit [`e5906cf`](https://github.com/apache/spark/commit/e5906cf408f225b5023ed6d250f850fcd447168a).




[GitHub] spark pull request #14500: [SPARK-] SQL DDL: MSCK REPAIR TABLE

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14500#discussion_r73730807
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
    @@ -425,6 +431,96 @@ case class AlterTableDropPartitionCommand(
     
     }
     
    +/**
    + * Discover Partitions in ALTER TABLE: discover all the partition in the directory of a table and
    + * update the catalog.
    + *
    + * The syntax of this command is:
    + * {{{
    + *   ALTER TABLE table DISCOVER PARTITIONS;
    + * }}}
    + */
    +case class AlterTableRecoverPartitionsCommand(
    +    tableName: TableIdentifier) extends RunnableCommand {
    +  override def run(spark: SparkSession): Seq[Row] = {
    +    val catalog = spark.sessionState.catalog
    +    if (!catalog.tableExists(tableName)) {
    +      throw new AnalysisException(
    +        s"Table $tableName in ALTER TABLE RECOVER PARTITIONS does not exist.")
    +    }
    +    val table = catalog.getTableMetadata(tableName)
    +    if (catalog.isTemporaryTable(tableName)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: ALTER TABLE RECOVER PARTITIONS on temporary tables: $tableName")
    +    }
    +    if (DDLUtils.isDatasourceTable(table)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: ALTER TABLE RECOVER PARTITIONS on datasource tables: $tableName")
    +    }
    +    if (table.tableType != CatalogTableType.EXTERNAL) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: ALTER TABLE RECOVER PARTITIONS only works on external " +
    +          s"tables: $tableName")
    +    }
    +    if (DDLUtils.isTablePartitioned(table)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: ALTER TABLE RECOVER PARTITIONS only works on partitioned " +
    +          s"tables: $tableName")
    +    }
    +    if (table.storage.locationUri.isEmpty) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: ALTER TABLE RECOVER PARTITIONS only works on tables with " +
    +          s"location provided: $tableName")
    +    }
    +
    +    recoverPartitions(spark, table)
    +    Seq.empty[Row]
    +  }
    +
    +  def recoverPartitions(spark: SparkSession, table: CatalogTable): Unit = {
    +    val root = new Path(table.storage.locationUri.get)
    +    val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
    +    val partitionSpecsAndLocs = scanPartitions(spark, fs, root, Map(), table.partitionSchema.size)
    +    val parts = partitionSpecsAndLocs.map { case (spec, location) =>
    +      // inherit table storage format (possibly except for location)
    +      CatalogTablePartition(spec, table.storage.copy(locationUri = Some(location.toUri.toString)))
    +    }
    +    spark.sessionState.catalog.createPartitions(tableName,
    +      parts.toArray[CatalogTablePartition], ignoreIfExists = true)
    +  }
    +
    +  @transient private lazy val evalTaskSupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
    +
    +  private def scanPartitions(
    +      spark: SparkSession,
    +      fs: FileSystem,
    +      path: Path,
    +      spec: TablePartitionSpec,
    +      numPartitionsLeft: Int): GenSeq[(TablePartitionSpec, Path)] = {
    --- End diff --
    
    Let's see if we can reuse code in `PartitioningUtils`. Also, path names can be escaped. We need to handle this kind of case (we have `unescapePathName` in `PartitioningUtils`).
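
    For reference, a minimal round-trip illustration, assuming the `escapePathName`/`unescapePathName` helpers in `org.apache.spark.sql.execution.datasources.PartitioningUtils` (these may be package-private, so the snippet assumes it runs inside the sql module; the values are made up):

    ```
    import org.apache.spark.sql.execution.datasources.PartitioningUtils

    // Partition directory names escape characters such as '/' and '=', so the
    // raw value has to be unescaped when the path is parsed back into a spec.
    val escaped = PartitioningUtils.escapePathName("2016/08/04")     // "2016%2F08%2F04"
    val value   = PartitioningUtils.unescapePathName(escaped)        // "2016/08/04"
    ```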




[GitHub] spark pull request #14500: [SPARK-] SQL DDL: MSCK REPAIR TABLE

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14500#discussion_r73729864
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
    @@ -409,6 +409,18 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       }
     
       /**
    +   * Create a [[RepairTableCommand]] command.
    +   *
    +   * For example:
    +   * {{{
    +   *   MSCK REPAIR TABLE tablename
    +   * }}}
    +   */
    +  override def visitRepairTable(ctx: RepairTableContext): LogicalPlan = withOrigin(ctx) {
    +    RepairTableCommand(visitTableIdentifier(ctx.tableIdentifier))
    --- End diff --
    
    Yes, Hive supports MSCK REPAIR TABLE, and EMR supports ALTER TABLE RECOVER PARTITIONS.
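
    For context, a minimal usage sketch showing both spellings once this patch is in (the table name `logs` is hypothetical):

    ```
    // Both statements scan the table's root directory and add any missing
    // partitions to the catalog.
    spark.sql("MSCK REPAIR TABLE logs")
    spark.sql("ALTER TABLE logs RECOVER PARTITIONS")
    ```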




[GitHub] spark issue #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the issue:

    https://github.com/apache/spark/pull/14500
  
    Merging into master, thanks!




[GitHub] spark pull request #14500: [SPARK-] SQL DDL: MSCK REPAIR TABLE

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14500#discussion_r73619427
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala ---
    @@ -389,6 +389,50 @@ case class TruncateTableCommand(
     }
     
     /**
    + * A command to repair a table by discovery all the partitions in the directory.
    + *
    + * The syntax of this command is:
    + * {{{
    + *   MSCK REPAIR TABLE table_name;
    + * }}}
    + *
    + * This command is the same as AlterTableRecoverPartitions
    + */
    +case class RepairTableCommand(tableName: TableIdentifier) extends RunnableCommand {
    +  override def run(spark: SparkSession): Seq[Row] = {
    +    val catalog = spark.sessionState.catalog
    +    val table = catalog.getTableMetadata(tableName)
    +    if (!catalog.tableExists(tableName)) {
    +      throw new AnalysisException(s"Table $tableName in MSCK REPAIR TABLE does not exist.")
    +    }
    +    if (catalog.isTemporaryTable(tableName)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: MSCK REPAIR TABLE on temporary tables: $tableName")
    +    }
    +    if (DDLUtils.isDatasourceTable(table)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: MSCK REPAIR TABLE on datasource tables: $tableName")
    +    }
    +    if (table.tableType != CatalogTableType.EXTERNAL) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: MSCK REPAIR TABLE only works on external tables: $tableName")
    +    }
    +    if (DDLUtils.isTablePartitioned(table)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: MSCK REPAIR TABLE only works on partitioned tables: $tableName")
    +    }
    +    if (table.storage.locationUri.isEmpty) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: MSCK REPAIR TABLE only works on tables with location provided: " +
    --- End diff --
    
    Nit: we can remove `s`




[GitHub] spark pull request #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

Posted by sameeragarwal <gi...@git.apache.org>.
Github user sameeragarwal commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14500#discussion_r73971757
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
    @@ -425,6 +430,111 @@ case class AlterTableDropPartitionCommand(
     
     }
     
    +/**
    + * Recover Partitions in ALTER TABLE: recover all the partition in the directory of a table and
    + * update the catalog.
    + *
    + * The syntax of this command is:
    + * {{{
    + *   ALTER TABLE table RECOVER PARTITIONS;
    + *   MSCK REPAIR TABLE table;
    + * }}}
    + */
    +case class AlterTableRecoverPartitionsCommand(
    +    tableName: TableIdentifier,
    +    cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends RunnableCommand {
    +  override def run(spark: SparkSession): Seq[Row] = {
    +    val catalog = spark.sessionState.catalog
    +    if (!catalog.tableExists(tableName)) {
    +      throw new AnalysisException(s"Table $tableName in $cmd does not exist.")
    +    }
    +    val table = catalog.getTableMetadata(tableName)
    +    if (catalog.isTemporaryTable(tableName)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd on temporary tables: $tableName")
    +    }
    +    if (DDLUtils.isDatasourceTable(table)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd on datasource tables: $tableName")
    +    }
    +    if (table.tableType != CatalogTableType.EXTERNAL) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on external tables: $tableName")
    +    }
    +    if (!DDLUtils.isTablePartitioned(table)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on partitioned tables: $tableName")
    +    }
    +    if (table.storage.locationUri.isEmpty) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on table with location provided: $tableName")
    +    }
    +
    +    val root = new Path(table.storage.locationUri.get)
    +    val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
    +    // Dummy jobconf to get to the pathFilter defined in configuration
    +    // It's very expensive to create a JobConf(ClassUtil.findContainingJar() is slow)
    +    val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, this.getClass)
    +    val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
    +    val partitionSpecsAndLocs = scanPartitions(
    +      spark, fs, pathFilter, root, Map(), table.partitionColumnNames.map(_.toLowerCase))
    +    val parts = partitionSpecsAndLocs.map { case (spec, location) =>
    +      // inherit table storage format (possibly except for location)
    +      CatalogTablePartition(spec, table.storage.copy(locationUri = Some(location.toUri.toString)))
    +    }
    +    spark.sessionState.catalog.createPartitions(tableName,
    +      parts.toArray[CatalogTablePartition], ignoreIfExists = true)
    +    Seq.empty[Row]
    +  }
    +
    +  @transient private lazy val evalTaskSupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
    +
    +  private def scanPartitions(
    +      spark: SparkSession,
    +      fs: FileSystem,
    +      filter: PathFilter,
    +      path: Path,
    +      spec: TablePartitionSpec,
    +      partitionNames: Seq[String]): GenSeq[(TablePartitionSpec, Path)] = {
    +    if (partitionNames.length == 0) {
    +      return Seq(spec -> path)
    +    }
    +
    +    val statuses = fs.listStatus(path)
    +    val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
    +    val statusPar: GenSeq[FileStatus] =
    +      if (partitionNames.length > 1 && statuses.length > threshold || partitionNames.length > 2) {
    +        val parArray = statuses.par
    +        parArray.tasksupport = evalTaskSupport
    +        parArray
    +      } else {
    +        statuses
    +      }
    +    statusPar.flatMap { st =>
    +      val name = st.getPath.getName
    +      if (st.isDirectory && name.contains("=")) {
    +        val ps = name.split("=", 2)
    +        val columnName = PartitioningUtils.unescapePathName(ps(0)).toLowerCase
    +        // TODO: Validate the value
    +        val value = PartitioningUtils.unescapePathName(ps(1))
    --- End diff --
    
    Can this escaping cause problems in (say) S3 for objects of the form "foo%20bar"?
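
    One way to read the concern, as a sketch (illustrative values only, assuming `PartitioningUtils.unescapePathName` behaves as in the diff above):

    ```
    import org.apache.spark.sql.execution.datasources.PartitioningUtils

    // A value Spark escaped itself round-trips cleanly...
    PartitioningUtils.unescapePathName("2016%2F08")   // "2016/08"
    // ...but an object literally named "foo%20bar" by an external writer is
    // unescaped to "foo bar", which no longer matches the original S3 key.
    PartitioningUtils.unescapePathName("foo%20bar")   // "foo bar"
    ```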




[GitHub] spark pull request #14500: [SPARK-] SQL DDL: MSCK REPAIR TABLE

Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14500#discussion_r73654367
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
    @@ -779,6 +791,23 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       }
     
       /**
    +   * Create an [[AlterTableDiscoverPartitionsCommand]] command
    +   *
    +   * For example:
    +   * {{{
    +   *   ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE];
    --- End diff --
    
    Nit: Update the syntax and the comments here.




[GitHub] spark pull request #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14500#discussion_r73775993
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
    @@ -425,6 +430,110 @@ case class AlterTableDropPartitionCommand(
     
     }
     
    +/**
    + * Recover Partitions in ALTER TABLE: recover all the partition in the directory of a table and
    + * update the catalog.
    + *
    + * The syntax of this command is:
    + * {{{
    + *   ALTER TABLE table RECOVER PARTITIONS;
    + *   MSCK REPAIR TABLE table;
    + * }}}
    + */
    +case class AlterTableRecoverPartitionsCommand(
    +    tableName: TableIdentifier,
    +    cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends RunnableCommand {
    +  override def run(spark: SparkSession): Seq[Row] = {
    +    val catalog = spark.sessionState.catalog
    +    if (!catalog.tableExists(tableName)) {
    +      throw new AnalysisException(s"Table $tableName in $cmd does not exist.")
    +    }
    +    val table = catalog.getTableMetadata(tableName)
    +    if (catalog.isTemporaryTable(tableName)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd on temporary tables: $tableName")
    +    }
    +    if (DDLUtils.isDatasourceTable(table)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd on datasource tables: $tableName")
    +    }
    +    if (table.tableType != CatalogTableType.EXTERNAL) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on external tables: $tableName")
    +    }
    +    if (!DDLUtils.isTablePartitioned(table)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on partitioned tables: $tableName")
    +    }
    +    if (table.storage.locationUri.isEmpty) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on table with location provided: $tableName")
    +    }
    +
    +    val root = new Path(table.storage.locationUri.get)
    +    val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
    +    // Dummy jobconf to get to the pathFilter defined in configuration
    +    // It's very expensive to create a JobConf(ClassUtil.findContainingJar() is slow)
    +    val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, this.getClass)
    +    val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
    +    val partitionSpecsAndLocs = scanPartitions(
    +      spark, fs, pathFilter, root, Map(), table.partitionColumnNames.map(_.toLowerCase))
    +    val parts = partitionSpecsAndLocs.map { case (spec, location) =>
    +      // inherit table storage format (possibly except for location)
    +      CatalogTablePartition(spec, table.storage.copy(locationUri = Some(location.toUri.toString)))
    +    }
    +    spark.sessionState.catalog.createPartitions(tableName,
    +      parts.toArray[CatalogTablePartition], ignoreIfExists = true)
    +    Seq.empty[Row]
    +  }
    +
    +  @transient private lazy val evalTaskSupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
    +
    +  private def scanPartitions(
    +      spark: SparkSession,
    +      fs: FileSystem,
    +      filter: PathFilter,
    +      path: Path,
    +      spec: TablePartitionSpec,
    +      partitionNames: Seq[String]): GenSeq[(TablePartitionSpec, Path)] = {
    +    if (partitionNames.length == 0) {
    +      return Seq(spec -> path)
    +    }
    +
    +    val statuses = fs.listStatus(path)
    +    val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
    +    val statusPar: GenSeq[FileStatus] =
    +      if (partitionNames.length > 1 && statuses.length > threshold || partitionNames.length > 2) {
    +        val parArray = statuses.par
    --- End diff --
    
    I didn't look carefully - but if you are using the default exec context, please create a new one. Otherwise it'd block.
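
    For reference, a minimal sketch of the dedicated-pool pattern (which the diff above already follows via `evalTaskSupport`; `statuses` is the name from the diff):

    ```
    import scala.collection.parallel.ForkJoinTaskSupport
    import scala.concurrent.forkjoin.ForkJoinPool

    // Give the parallel collection its own pool so the listing work does not
    // run on (or block) the default global ForkJoinPool.
    val parStatuses = statuses.par
    parStatuses.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
    ```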





[GitHub] spark pull request #14500: [SPARK-] SQL DDL: MSCK REPAIR TABLE

Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14500#discussion_r73655191
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala ---
    @@ -389,6 +389,50 @@ case class TruncateTableCommand(
     }
     
     /**
    + * A command to repair a table by discovery all the partitions in the directory.
    + *
    + * The syntax of this command is:
    + * {{{
    + *   MSCK REPAIR TABLE table_name;
    + * }}}
    + *
    + * This command is the same as AlterTableRecoverPartitions
    + */
    +case class RepairTableCommand(tableName: TableIdentifier) extends RunnableCommand {
    --- End diff --
    
    Why do we have this command? It is exactly the same as `AlterTableRecoverPartitionsCommand`, except for the wording of the errors.
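
    For what it's worth, the later revision of this PR folds both syntaxes into the one command, parameterized by the user-facing wording used in error messages - roughly as below (`tableIdent` is a placeholder; the exact string the MSCK path passes is an assumption):

    ```
    // ALTER TABLE ... RECOVER PARTITIONS uses the default `cmd` string;
    // MSCK REPAIR TABLE presumably passes its own wording for error messages.
    AlterTableRecoverPartitionsCommand(tableIdent)
    AlterTableRecoverPartitionsCommand(tableIdent, "MSCK REPAIR TABLE")
    ```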




[GitHub] spark issue #14500: [SPARK-] SQL DDL: MSCK REPAIR TABLE

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the issue:

    https://github.com/apache/spark/pull/14500
  
    @yhuai Could you help to generate the golden result for this suite?




[GitHub] spark issue #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on the issue:

    https://github.com/apache/spark/pull/14500
  
    @liancheng Can you do a post-hoc review?




[GitHub] spark pull request #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14500#discussion_r73945744
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
    @@ -425,6 +430,110 @@ case class AlterTableDropPartitionCommand(
     
     }
     
    +/**
    + * Recover Partitions in ALTER TABLE: recover all the partition in the directory of a table and
    + * update the catalog.
    + *
    + * The syntax of this command is:
    + * {{{
    + *   ALTER TABLE table RECOVER PARTITIONS;
    + *   MSCK REPAIR TABLE table;
    + * }}}
    + */
    +case class AlterTableRecoverPartitionsCommand(
    +    tableName: TableIdentifier,
    +    cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends RunnableCommand {
    +  override def run(spark: SparkSession): Seq[Row] = {
    +    val catalog = spark.sessionState.catalog
    +    if (!catalog.tableExists(tableName)) {
    +      throw new AnalysisException(s"Table $tableName in $cmd does not exist.")
    +    }
    +    val table = catalog.getTableMetadata(tableName)
    +    if (catalog.isTemporaryTable(tableName)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd on temporary tables: $tableName")
    +    }
    +    if (DDLUtils.isDatasourceTable(table)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd on datasource tables: $tableName")
    +    }
    +    if (table.tableType != CatalogTableType.EXTERNAL) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on external tables: $tableName")
    +    }
    +    if (!DDLUtils.isTablePartitioned(table)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on partitioned tables: $tableName")
    +    }
    +    if (table.storage.locationUri.isEmpty) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on table with location provided: $tableName")
    +    }
    +
    +    val root = new Path(table.storage.locationUri.get)
    +    val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
    +    // Dummy jobconf to get to the pathFilter defined in configuration
    +    // It's very expensive to create a JobConf(ClassUtil.findContainingJar() is slow)
    +    val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, this.getClass)
    +    val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
    +    val partitionSpecsAndLocs = scanPartitions(
    +      spark, fs, pathFilter, root, Map(), table.partitionColumnNames.map(_.toLowerCase))
    +    val parts = partitionSpecsAndLocs.map { case (spec, location) =>
    +      // inherit table storage format (possibly except for location)
    +      CatalogTablePartition(spec, table.storage.copy(locationUri = Some(location.toUri.toString)))
    +    }
    +    spark.sessionState.catalog.createPartitions(tableName,
    +      parts.toArray[CatalogTablePartition], ignoreIfExists = true)
    +    Seq.empty[Row]
    +  }
    +
    +  @transient private lazy val evalTaskSupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
    +
    +  private def scanPartitions(
    +      spark: SparkSession,
    +      fs: FileSystem,
    +      filter: PathFilter,
    +      path: Path,
    +      spec: TablePartitionSpec,
    +      partitionNames: Seq[String]): GenSeq[(TablePartitionSpec, Path)] = {
    +    if (partitionNames.length == 0) {
    +      return Seq(spec -> path)
    +    }
    +
    +    val statuses = fs.listStatus(path)
    +    val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
    +    val statusPar: GenSeq[FileStatus] =
    +      if (partitionNames.length > 1 && statuses.length > threshold || partitionNames.length > 2) {
    +        val parArray = statuses.par
    +        parArray.tasksupport = evalTaskSupport
    +        parArray
    +      } else {
    +        statuses
    +      }
    +    statusPar.flatMap { st =>
    +      val name = st.getPath.getName
    +      if (st.isDirectory && name.contains("=")) {
    +        val ps = name.split("=", 2)
    +        val columnName = PartitioningUtils.unescapePathName(ps(0)).toLowerCase
    +        val value = PartitioningUtils.unescapePathName(ps(1))
    --- End diff --
    
    We could have a TODO here.




[GitHub] spark pull request #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

Posted by sameeragarwal <gi...@git.apache.org>.
Github user sameeragarwal commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14500#discussion_r74110917
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
    @@ -425,6 +430,111 @@ case class AlterTableDropPartitionCommand(
     
     }
     
    +/**
    + * Recover Partitions in ALTER TABLE: recover all the partition in the directory of a table and
    + * update the catalog.
    + *
    + * The syntax of this command is:
    + * {{{
    + *   ALTER TABLE table RECOVER PARTITIONS;
    + *   MSCK REPAIR TABLE table;
    + * }}}
    + */
    +case class AlterTableRecoverPartitionsCommand(
    +    tableName: TableIdentifier,
    +    cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends RunnableCommand {
    +  override def run(spark: SparkSession): Seq[Row] = {
    +    val catalog = spark.sessionState.catalog
    +    if (!catalog.tableExists(tableName)) {
    +      throw new AnalysisException(s"Table $tableName in $cmd does not exist.")
    +    }
    +    val table = catalog.getTableMetadata(tableName)
    +    if (catalog.isTemporaryTable(tableName)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd on temporary tables: $tableName")
    +    }
    +    if (DDLUtils.isDatasourceTable(table)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd on datasource tables: $tableName")
    +    }
    +    if (table.tableType != CatalogTableType.EXTERNAL) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on external tables: $tableName")
    +    }
    +    if (!DDLUtils.isTablePartitioned(table)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on partitioned tables: $tableName")
    +    }
    +    if (table.storage.locationUri.isEmpty) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on table with location provided: $tableName")
    +    }
    +
    +    val root = new Path(table.storage.locationUri.get)
    +    val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
    +    // Dummy jobconf to get to the pathFilter defined in configuration
    +    // It's very expensive to create a JobConf(ClassUtil.findContainingJar() is slow)
    +    val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, this.getClass)
    +    val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
    +    val partitionSpecsAndLocs = scanPartitions(
    +      spark, fs, pathFilter, root, Map(), table.partitionColumnNames.map(_.toLowerCase))
    +    val parts = partitionSpecsAndLocs.map { case (spec, location) =>
    +      // inherit table storage format (possibly except for location)
    +      CatalogTablePartition(spec, table.storage.copy(locationUri = Some(location.toUri.toString)))
    +    }
    +    spark.sessionState.catalog.createPartitions(tableName,
    +      parts.toArray[CatalogTablePartition], ignoreIfExists = true)
    +    Seq.empty[Row]
    +  }
    +
    +  @transient private lazy val evalTaskSupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
    +
    +  private def scanPartitions(
    +      spark: SparkSession,
    +      fs: FileSystem,
    +      filter: PathFilter,
    +      path: Path,
    +      spec: TablePartitionSpec,
    +      partitionNames: Seq[String]): GenSeq[(TablePartitionSpec, Path)] = {
    +    if (partitionNames.length == 0) {
    +      return Seq(spec -> path)
    +    }
    +
    +    val statuses = fs.listStatus(path)
    +    val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
    +    val statusPar: GenSeq[FileStatus] =
    +      if (partitionNames.length > 1 && statuses.length > threshold || partitionNames.length > 2) {
    +        val parArray = statuses.par
    +        parArray.tasksupport = evalTaskSupport
    +        parArray
    +      } else {
    +        statuses
    +      }
    +    statusPar.flatMap { st =>
    +      val name = st.getPath.getName
    +      if (st.isDirectory && name.contains("=")) {
    +        val ps = name.split("=", 2)
    +        val columnName = PartitioningUtils.unescapePathName(ps(0)).toLowerCase
    +        // TODO: Validate the value
    +        val value = PartitioningUtils.unescapePathName(ps(1))
    --- End diff --
    
    yes, that makes sense.
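
    For illustration, the validation that TODO points at could take roughly the
    following shape (a plain Scala sketch; the type names and the helper are
    assumptions for this example, not code from the PR):

        // Check that the unescaped directory value can be parsed as the
        // partition column's type before accepting the partition.
        def isValidPartitionValue(dataTypeName: String, value: String): Boolean = {
          try {
            dataTypeName match {
              case "int"    => value.toInt; true
              case "bigint" => value.toLong; true
              case "double" => value.toDouble; true
              case _        => true  // other types are accepted as-is in this sketch
            }
          } catch {
            case _: NumberFormatException => false
          }
        }

        // isValidPartitionValue("int", "2016") == true
        // isValidPartitionValue("int", "abc")  == false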




[GitHub] spark issue #14500: [SPARK-] SQL DDL: MSCK REPAIR TABLE

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on the issue:

    https://github.com/apache/spark/pull/14500
  
    We do not generate golden files anymore. Let's port those tests. Thanks.
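
    For instance, one of those golden-file cases could be ported as an ordinary
    ScalaTest case roughly like the following (illustrative only; the table name,
    location, and helpers such as withTable, sql, and listPartitions are
    assumptions about the test harness, and TableIdentifier is assumed imported
    from org.apache.spark.sql.catalyst):

        test("MSCK REPAIR TABLE recovers partitions from the file system") {
          withTable("repaired") {
            sql("CREATE EXTERNAL TABLE repaired (key INT) PARTITIONED BY (p INT) " +
              "LOCATION '/tmp/repaired'")
            // ... create the directories /tmp/repaired/p=1 and /tmp/repaired/p=2 ...
            sql("MSCK REPAIR TABLE repaired")
            val specs = spark.sessionState.catalog
              .listPartitions(TableIdentifier("repaired")).map(_.spec)
            assert(specs.toSet == Set(Map("p" -> "1"), Map("p" -> "2")))
          }
        }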




[GitHub] spark issue #14500: [SPARK-] SQL DDL: MSCK REPAIR TABLE

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14500
  
    **[Test build #63242 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/63242/consoleFull)** for PR 14500 at commit [`c5edbdf`](https://github.com/apache/spark/commit/c5edbdfc3a9b11fce8b427488f106158133e40a0).




[GitHub] spark issue #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14500
  
    **[Test build #63279 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/63279/consoleFull)** for PR 14500 at commit [`7f4f38d`](https://github.com/apache/spark/commit/7f4f38d0051c0ad25bcd8406a18f6ef0b759b0ef).
     * This patch passes all tests.
     * This patch **does not merge cleanly**.
     * This patch adds no public classes.




[GitHub] spark pull request #14500: [SPARK-] SQL DDL: MSCK REPAIR TABLE

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14500#discussion_r73619605
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
    @@ -425,6 +431,96 @@ case class AlterTableDropPartitionCommand(
     
     }
     
    +/**
    + * Discover Partitions in ALTER TABLE: discover all the partition in the directory of a table and
    + * update the catalog.
    + *
    + * The syntax of this command is:
    + * {{{
    + *   ALTER TABLE table DISCOVER PARTITIONS;
    + * }}}
    + */
    +case class AlterTableRecoverPartitionsCommand(
    +    tableName: TableIdentifier) extends RunnableCommand {
    +  override def run(spark: SparkSession): Seq[Row] = {
    +    val catalog = spark.sessionState.catalog
    +    if (!catalog.tableExists(tableName)) {
    +      throw new AnalysisException(
    +        s"Table $tableName in ALTER TABLE RECOVER PARTITIONS does not exist.")
    +    }
    +    val table = catalog.getTableMetadata(tableName)
    +    if (catalog.isTemporaryTable(tableName)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: ALTER TABLE RECOVER PARTITIONS on temporary tables: $tableName")
    +    }
    +    if (DDLUtils.isDatasourceTable(table)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: ALTER TABLE RECOVER PARTITIONS on datasource tables: $tableName")
    +    }
    +    if (table.tableType != CatalogTableType.EXTERNAL) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: ALTER TABLE RECOVER PARTITIONS only works on external " +
    +          s"tables: $tableName")
    +    }
    +    if (DDLUtils.isTablePartitioned(table)) {
    --- End diff --
    
    Here, the same issue.
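
    In other words, the guard needs the missing negation, which the later
    revision of the diff adds:

        if (!DDLUtils.isTablePartitioned(table)) {
          throw new AnalysisException(
            s"Operation not allowed: ALTER TABLE RECOVER PARTITIONS only works on " +
              s"partitioned tables: $tableName")
        }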




[GitHub] spark issue #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14500
  
    **[Test build #63279 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/63279/consoleFull)** for PR 14500 at commit [`7f4f38d`](https://github.com/apache/spark/commit/7f4f38d0051c0ad25bcd8406a18f6ef0b759b0ef).




[GitHub] spark issue #14500: [SPARK-] SQL DDL: MSCK REPAIR TABLE

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the issue:

    https://github.com/apache/spark/pull/14500
  
    Jenkins, retest this please.




[GitHub] spark issue #14500: [SPARK-] SQL DDL: MSCK REPAIR TABLE

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/14500
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request #14500: [SPARK-] SQL DDL: MSCK REPAIR TABLE

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14500#discussion_r73730136
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
    @@ -409,6 +409,18 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       }
     
       /**
    +   * Create a [[RepairTableCommand]] command.
    +   *
    +   * For example:
    +   * {{{
    +   *   MSCK REPAIR TABLE tablename
    +   * }}}
    +   */
    +  override def visitRepairTable(ctx: RepairTableContext): LogicalPlan = withOrigin(ctx) {
    +    RepairTableCommand(visitTableIdentifier(ctx.tableIdentifier))
    --- End diff --
    
    I see. How about we use a single command internally?
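
    Concretely, both syntaxes can be parsed into the same command, with the
    surface syntax passed along so error messages stay accurate. A rough sketch
    of the parser side, consistent with the cmd parameter on
    AlterTableRecoverPartitionsCommand shown elsewhere in this diff (not
    necessarily the exact final code):

        override def visitRepairTable(ctx: RepairTableContext): LogicalPlan = withOrigin(ctx) {
          AlterTableRecoverPartitionsCommand(
            visitTableIdentifier(ctx.tableIdentifier),
            "MSCK REPAIR TABLE")
        }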




[GitHub] spark pull request #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14500#discussion_r74094542
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
    @@ -425,6 +430,110 @@ case class AlterTableDropPartitionCommand(
     
     }
     
    +/**
    + * Recover Partitions in ALTER TABLE: recover all the partition in the directory of a table and
    + * update the catalog.
    + *
    + * The syntax of this command is:
    + * {{{
    + *   ALTER TABLE table RECOVER PARTITIONS;
    + *   MSCK REPAIR TABLE table;
    + * }}}
    + */
    +case class AlterTableRecoverPartitionsCommand(
    +    tableName: TableIdentifier,
    +    cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends RunnableCommand {
    +  override def run(spark: SparkSession): Seq[Row] = {
    +    val catalog = spark.sessionState.catalog
    +    if (!catalog.tableExists(tableName)) {
    +      throw new AnalysisException(s"Table $tableName in $cmd does not exist.")
    +    }
    +    val table = catalog.getTableMetadata(tableName)
    +    if (catalog.isTemporaryTable(tableName)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd on temporary tables: $tableName")
    +    }
    +    if (DDLUtils.isDatasourceTable(table)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd on datasource tables: $tableName")
    +    }
    +    if (table.tableType != CatalogTableType.EXTERNAL) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on external tables: $tableName")
    +    }
    +    if (!DDLUtils.isTablePartitioned(table)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on partitioned tables: $tableName")
    +    }
    +    if (table.storage.locationUri.isEmpty) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on table with location provided: $tableName")
    +    }
    +
    +    val root = new Path(table.storage.locationUri.get)
    +    val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
    +    // Dummy jobconf to get to the pathFilter defined in configuration
    +    // It's very expensive to create a JobConf(ClassUtil.findContainingJar() is slow)
    +    val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, this.getClass)
    +    val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
    +    val partitionSpecsAndLocs = scanPartitions(
    +      spark, fs, pathFilter, root, Map(), table.partitionColumnNames.map(_.toLowerCase))
    +    val parts = partitionSpecsAndLocs.map { case (spec, location) =>
    +      // inherit table storage format (possibly except for location)
    +      CatalogTablePartition(spec, table.storage.copy(locationUri = Some(location.toUri.toString)))
    +    }
    +    spark.sessionState.catalog.createPartitions(tableName,
    +      parts.toArray[CatalogTablePartition], ignoreIfExists = true)
    +    Seq.empty[Row]
    +  }
    +
    +  @transient private lazy val evalTaskSupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
    +
    +  private def scanPartitions(
    +      spark: SparkSession,
    +      fs: FileSystem,
    +      filter: PathFilter,
    +      path: Path,
    +      spec: TablePartitionSpec,
    +      partitionNames: Seq[String]): GenSeq[(TablePartitionSpec, Path)] = {
    +    if (partitionNames.length == 0) {
    +      return Seq(spec -> path)
    +    }
    +
    +    val statuses = fs.listStatus(path)
    +    val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
    +    val statusPar: GenSeq[FileStatus] =
    +      if (partitionNames.length > 1 && statuses.length > threshold || partitionNames.length > 2) {
    +        val parArray = statuses.par
    --- End diff --
    
    This is copied from UnionRDD.
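
    In isolation, the pattern being reused looks roughly like this (a standalone
    sketch for the Scala 2.11-era parallel collections that the quoted diff
    relies on):

        import scala.collection.GenSeq
        import scala.collection.parallel.ForkJoinTaskSupport
        import scala.concurrent.forkjoin.ForkJoinPool

        // Switch to a parallel collection backed by a bounded pool once the
        // input is large enough to be worth the thread overhead.
        val pool = new ForkJoinPool(8)

        def maybeParallel[T](items: Seq[T], threshold: Int): GenSeq[T] =
          if (items.length > threshold) {
            val par = items.par
            par.tasksupport = new ForkJoinTaskSupport(pool)
            par
          } else {
            items
          }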




[GitHub] spark pull request #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

Posted by sameeragarwal <gi...@git.apache.org>.
Github user sameeragarwal commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14500#discussion_r73975249
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
    @@ -425,6 +430,111 @@ case class AlterTableDropPartitionCommand(
     
     }
     
    +/**
    + * Recover Partitions in ALTER TABLE: recover all the partition in the directory of a table and
    + * update the catalog.
    + *
    + * The syntax of this command is:
    + * {{{
    + *   ALTER TABLE table RECOVER PARTITIONS;
    + *   MSCK REPAIR TABLE table;
    + * }}}
    + */
    +case class AlterTableRecoverPartitionsCommand(
    +    tableName: TableIdentifier,
    +    cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends RunnableCommand {
    +  override def run(spark: SparkSession): Seq[Row] = {
    +    val catalog = spark.sessionState.catalog
    +    if (!catalog.tableExists(tableName)) {
    +      throw new AnalysisException(s"Table $tableName in $cmd does not exist.")
    +    }
    +    val table = catalog.getTableMetadata(tableName)
    +    if (catalog.isTemporaryTable(tableName)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd on temporary tables: $tableName")
    +    }
    +    if (DDLUtils.isDatasourceTable(table)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd on datasource tables: $tableName")
    +    }
    +    if (table.tableType != CatalogTableType.EXTERNAL) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on external tables: $tableName")
    +    }
    +    if (!DDLUtils.isTablePartitioned(table)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on partitioned tables: $tableName")
    +    }
    +    if (table.storage.locationUri.isEmpty) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on table with location provided: $tableName")
    +    }
    +
    +    val root = new Path(table.storage.locationUri.get)
    +    val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
    +    // Dummy jobconf to get to the pathFilter defined in configuration
    +    // It's very expensive to create a JobConf(ClassUtil.findContainingJar() is slow)
    +    val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, this.getClass)
    +    val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
    +    val partitionSpecsAndLocs = scanPartitions(
    +      spark, fs, pathFilter, root, Map(), table.partitionColumnNames.map(_.toLowerCase))
    +    val parts = partitionSpecsAndLocs.map { case (spec, location) =>
    +      // inherit table storage format (possibly except for location)
    +      CatalogTablePartition(spec, table.storage.copy(locationUri = Some(location.toUri.toString)))
    +    }
    +    spark.sessionState.catalog.createPartitions(tableName,
    +      parts.toArray[CatalogTablePartition], ignoreIfExists = true)
    +    Seq.empty[Row]
    +  }
    +
    +  @transient private lazy val evalTaskSupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
    +
    +  private def scanPartitions(
    +      spark: SparkSession,
    +      fs: FileSystem,
    +      filter: PathFilter,
    +      path: Path,
    +      spec: TablePartitionSpec,
    +      partitionNames: Seq[String]): GenSeq[(TablePartitionSpec, Path)] = {
    +    if (partitionNames.length == 0) {
    +      return Seq(spec -> path)
    +    }
    +
    +    val statuses = fs.listStatus(path)
    +    val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
    +    val statusPar: GenSeq[FileStatus] =
    +      if (partitionNames.length > 1 && statuses.length > threshold || partitionNames.length > 2) {
    +        val parArray = statuses.par
    +        parArray.tasksupport = evalTaskSupport
    +        parArray
    +      } else {
    +        statuses
    +      }
    +    statusPar.flatMap { st =>
    +      val name = st.getPath.getName
    +      if (st.isDirectory && name.contains("=")) {
    +        val ps = name.split("=", 2)
    +        val columnName = PartitioningUtils.unescapePathName(ps(0)).toLowerCase
    +        // TODO: Validate the value
    +        val value = PartitioningUtils.unescapePathName(ps(1))
    +        // comparing with case-insensitive, but preserve the case
    +        if (columnName == partitionNames(0)) {
    +          scanPartitions(
    +            spark, fs, filter, st.getPath, spec ++ Map(columnName -> value), partitionNames.drop(1))
    +        } else {
    +          logWarning(s"expect partition column ${partitionNames(0)}, but got ${ps(0)}, ignore it")
    +          Seq()
    --- End diff --
    
    Like Hive, we may consider throwing an exception here (one that could be turned off via a config).
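
    A rough sketch of that idea (the config key below is hypothetical, not an
    existing Spark setting):

        // Fail fast on an unexpected directory name unless the user opts out.
        def checkPartitionDirectory(expectedColumn: String, foundColumn: String, strict: Boolean): Unit = {
          if (foundColumn != expectedColumn) {
            val msg = s"expected partition column '$expectedColumn' but found '$foundColumn'"
            if (strict) throw new IllegalArgumentException(msg)
            else Console.err.println(s"WARN: $msg, ignoring it")
          }
        }

        // strict could be driven by something like (hypothetical key):
        //   spark.conf.get("spark.sql.recoverPartitions.strictMode", "true").toBoolean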




[GitHub] spark pull request #14500: [SPARK-] SQL DDL: MSCK REPAIR TABLE

Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14500#discussion_r73670923
  
    --- Diff: sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 ---
    @@ -121,6 +122,7 @@ statement
         | LOAD DATA LOCAL? INPATH path=STRING OVERWRITE? INTO TABLE
             tableIdentifier partitionSpec?                                 #loadData
         | TRUNCATE TABLE tableIdentifier partitionSpec?                    #truncateTable
    +    | MSCK REPAIR TABLE tableIdentifier                                     #repairTable
    --- End diff --
    
    Nit: outline




[GitHub] spark pull request #14500: [SPARK-] SQL DDL: MSCK REPAIR TABLE

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14500#discussion_r73621345
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
    @@ -425,6 +431,96 @@ case class AlterTableDropPartitionCommand(
     
     }
     
    +/**
    + * Discover Partitions in ALTER TABLE: discover all the partition in the directory of a table and
    + * update the catalog.
    + *
    + * The syntax of this command is:
    + * {{{
    + *   ALTER TABLE table DISCOVER PARTITIONS;
    + * }}}
    + */
    +case class AlterTableRecoverPartitionsCommand(
    +    tableName: TableIdentifier) extends RunnableCommand {
    +  override def run(spark: SparkSession): Seq[Row] = {
    +    val catalog = spark.sessionState.catalog
    +    if (!catalog.tableExists(tableName)) {
    +      throw new AnalysisException(
    +        s"Table $tableName in ALTER TABLE RECOVER PARTITIONS does not exist.")
    +    }
    +    val table = catalog.getTableMetadata(tableName)
    +    if (catalog.isTemporaryTable(tableName)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: ALTER TABLE RECOVER PARTITIONS on temporary tables: $tableName")
    +    }
    +    if (DDLUtils.isDatasourceTable(table)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: ALTER TABLE RECOVER PARTITIONS on datasource tables: $tableName")
    +    }
    +    if (table.tableType != CatalogTableType.EXTERNAL) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: ALTER TABLE RECOVER PARTITIONS only works on external " +
    +          s"tables: $tableName")
    +    }
    +    if (DDLUtils.isTablePartitioned(table)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: ALTER TABLE RECOVER PARTITIONS only works on partitioned " +
    +          s"tables: $tableName")
    +    }
    +    if (table.storage.locationUri.isEmpty) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: ALTER TABLE RECOVER PARTITIONS only works on tables with " +
    +          s"location provided: $tableName")
    +    }
    +
    +    recoverPartitions(spark, table)
    +    Seq.empty[Row]
    +  }
    +
    +  def recoverPartitions(spark: SparkSession, table: CatalogTable): Unit = {
    +    val root = new Path(table.storage.locationUri.get)
    +    val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
    +    val partitionSpecsAndLocs = scanPartitions(spark, fs, root, Map(), table.partitionSchema.size)
    +    val parts = partitionSpecsAndLocs.map { case (spec, location) =>
    +      // inherit table storage format (possibly except for location)
    +      CatalogTablePartition(spec, table.storage.copy(locationUri = Some(location.toUri.toString)))
    +    }
    +    spark.sessionState.catalog.createPartitions(tableName,
    +      parts.toArray[CatalogTablePartition], ignoreIfExists = true)
    +  }
    +
    +  @transient private lazy val evalTaskSupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
    +
    +  private def scanPartitions(
    +      spark: SparkSession,
    +      fs: FileSystem,
    +      path: Path,
    +      spec: TablePartitionSpec,
    +      numPartitionsLeft: Int): GenSeq[(TablePartitionSpec, Path)] = {
    +    if (numPartitionsLeft == 0) {
    +      return Seq(spec -> path)
    +    }
    +
    +    val statuses = fs.listStatus(path)
    +    val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
    +    val statusPar: GenSeq[FileStatus] =
    +      if (numPartitionsLeft > 1 && statuses.length > threshold || numPartitionsLeft > 2) {
    --- End diff --
    
    This condition looks confusing.
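
    One way to make the intent readable is to name the two cases; a readability
    sketch only, equivalent to the quoted condition:

        def shouldListInParallel(numPartitionsLeft: Int, numChildren: Int, threshold: Int): Boolean = {
          val manyChildrenAtThisLevel = numPartitionsLeft > 1 && numChildren > threshold
          val deepPartitionTree = numPartitionsLeft > 2
          manyChildrenAtThisLevel || deepPartitionTree
        }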




[GitHub] spark pull request #14500: [SPARK-] SQL DDL: MSCK REPAIR TABLE

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14500#discussion_r73728488
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
    @@ -409,6 +409,18 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       }
     
       /**
    +   * Create a [[RepairTableCommand]] command.
    +   *
    +   * For example:
    +   * {{{
    +   *   MSCK REPAIR TABLE tablename
    +   * }}}
    +   */
    +  override def visitRepairTable(ctx: RepairTableContext): LogicalPlan = withOrigin(ctx) {
    +    RepairTableCommand(visitTableIdentifier(ctx.tableIdentifier))
    --- End diff --
    
    Are AlterTableRecoverPartitionsCommand and RepairTableCommand the same?




[GitHub] spark pull request #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/14500




[GitHub] spark pull request #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14500#discussion_r74123952
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
    @@ -425,6 +430,111 @@ case class AlterTableDropPartitionCommand(
     
     }
     
    +/**
    + * Recover Partitions in ALTER TABLE: recover all the partition in the directory of a table and
    + * update the catalog.
    + *
    + * The syntax of this command is:
    + * {{{
    + *   ALTER TABLE table RECOVER PARTITIONS;
    + *   MSCK REPAIR TABLE table;
    + * }}}
    + */
    +case class AlterTableRecoverPartitionsCommand(
    +    tableName: TableIdentifier,
    +    cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends RunnableCommand {
    +  override def run(spark: SparkSession): Seq[Row] = {
    +    val catalog = spark.sessionState.catalog
    +    if (!catalog.tableExists(tableName)) {
    +      throw new AnalysisException(s"Table $tableName in $cmd does not exist.")
    +    }
    +    val table = catalog.getTableMetadata(tableName)
    +    if (catalog.isTemporaryTable(tableName)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd on temporary tables: $tableName")
    +    }
    +    if (DDLUtils.isDatasourceTable(table)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd on datasource tables: $tableName")
    +    }
    +    if (table.tableType != CatalogTableType.EXTERNAL) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on external tables: $tableName")
    +    }
    +    if (!DDLUtils.isTablePartitioned(table)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on partitioned tables: $tableName")
    +    }
    +    if (table.storage.locationUri.isEmpty) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on table with location provided: $tableName")
    +    }
    +
    +    val root = new Path(table.storage.locationUri.get)
    +    val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
    +    // Dummy jobconf to get to the pathFilter defined in configuration
    +    // It's very expensive to create a JobConf(ClassUtil.findContainingJar() is slow)
    +    val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, this.getClass)
    +    val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
    +    val partitionSpecsAndLocs = scanPartitions(
    +      spark, fs, pathFilter, root, Map(), table.partitionColumnNames.map(_.toLowerCase))
    +    val parts = partitionSpecsAndLocs.map { case (spec, location) =>
    +      // inherit table storage format (possibly except for location)
    +      CatalogTablePartition(spec, table.storage.copy(locationUri = Some(location.toUri.toString)))
    +    }
    +    spark.sessionState.catalog.createPartitions(tableName,
    +      parts.toArray[CatalogTablePartition], ignoreIfExists = true)
    --- End diff --
    
    What will happen if we get thousands or tens of thousands of new partitions?
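
    If that single metastore call turns out to be a problem at that scale, one
    mitigation would be to create the recovered partitions in fixed-size batches.
    A sketch against the names in the diff above (the batch size is arbitrary and
    this is not what the PR as shown does):

        val batchSize = 100
        parts.toIterator.grouped(batchSize).foreach { batch =>
          spark.sessionState.catalog.createPartitions(
            tableName, batch.toArray[CatalogTablePartition], ignoreIfExists = true)
        }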




[GitHub] spark issue #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/14500
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request #14500: [SPARK-16905] SQL DDL: MSCK REPAIR TABLE

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14500#discussion_r73780390
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
    @@ -425,6 +430,110 @@ case class AlterTableDropPartitionCommand(
     
     }
     
    +/**
    + * Recover Partitions in ALTER TABLE: recover all the partition in the directory of a table and
    + * update the catalog.
    + *
    + * The syntax of this command is:
    + * {{{
    + *   ALTER TABLE table RECOVER PARTITIONS;
    + *   MSCK REPAIR TABLE table;
    + * }}}
    + */
    +case class AlterTableRecoverPartitionsCommand(
    +    tableName: TableIdentifier,
    +    cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends RunnableCommand {
    +  override def run(spark: SparkSession): Seq[Row] = {
    +    val catalog = spark.sessionState.catalog
    +    if (!catalog.tableExists(tableName)) {
    +      throw new AnalysisException(s"Table $tableName in $cmd does not exist.")
    +    }
    +    val table = catalog.getTableMetadata(tableName)
    +    if (catalog.isTemporaryTable(tableName)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd on temporary tables: $tableName")
    +    }
    +    if (DDLUtils.isDatasourceTable(table)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd on datasource tables: $tableName")
    +    }
    +    if (table.tableType != CatalogTableType.EXTERNAL) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on external tables: $tableName")
    +    }
    +    if (!DDLUtils.isTablePartitioned(table)) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on partitioned tables: $tableName")
    +    }
    +    if (table.storage.locationUri.isEmpty) {
    +      throw new AnalysisException(
    +        s"Operation not allowed: $cmd only works on table with location provided: $tableName")
    +    }
    +
    +    val root = new Path(table.storage.locationUri.get)
    +    val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
    +    // Dummy jobconf to get to the pathFilter defined in configuration
    +    // It's very expensive to create a JobConf(ClassUtil.findContainingJar() is slow)
    +    val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, this.getClass)
    +    val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
    +    val partitionSpecsAndLocs = scanPartitions(
    +      spark, fs, pathFilter, root, Map(), table.partitionColumnNames.map(_.toLowerCase))
    +    val parts = partitionSpecsAndLocs.map { case (spec, location) =>
    +      // inherit table storage format (possibly except for location)
    +      CatalogTablePartition(spec, table.storage.copy(locationUri = Some(location.toUri.toString)))
    +    }
    +    spark.sessionState.catalog.createPartitions(tableName,
    +      parts.toArray[CatalogTablePartition], ignoreIfExists = true)
    +    Seq.empty[Row]
    +  }
    +
    +  @transient private lazy val evalTaskSupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
    +
    +  private def scanPartitions(
    +      spark: SparkSession,
    +      fs: FileSystem,
    +      filter: PathFilter,
    +      path: Path,
    +      spec: TablePartitionSpec,
    +      partitionNames: Seq[String]): GenSeq[(TablePartitionSpec, Path)] = {
    +    if (partitionNames.length == 0) {
    +      return Seq(spec -> path)
    +    }
    +
    +    val statuses = fs.listStatus(path)
    +    val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
    +    val statusPar: GenSeq[FileStatus] =
    +      if (partitionNames.length > 1 && statuses.length > threshold || partitionNames.length > 2) {
    +        val parArray = statuses.par
    +        parArray.tasksupport = evalTaskSupport
    +        parArray
    +      } else {
    +        statuses
    +      }
    +    statusPar.flatMap { st =>
    +      val name = st.getPath.getName
    +      if (st.isDirectory && name.contains("=")) {
    +        val ps = name.split("=", 2)
    +        val columnName = PartitioningUtils.unescapePathName(ps(0)).toLowerCase
    +        val value = PartitioningUtils.unescapePathName(ps(1))
    +        // comparing with case-insensitive, but preserve the case
    +        if (columnName == partitionNames(0)) {
    --- End diff --
    
    A directory name like "a=" will pass this condition and yield an empty partition value.
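
    A small guard along these lines would filter such names out (plain Scala,
    illustration only):

        // Only accept directory names where both the column and the value are
        // non-empty after splitting on the first '='.
        def splitPartitionDir(name: String): Option[(String, String)] = {
          name.split("=", 2) match {
            case Array(col, value) if col.nonEmpty && value.nonEmpty => Some(col -> value)
            case _ => None  // covers "a=", "=1", and names without '='
          }
        }

        // splitPartitionDir("p=1") == Some(("p", "1"))
        // splitPartitionDir("a=")  == None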




[GitHub] spark issue #14500: [SPARK-] SQL DDL: MSCK REPAIR TABLE

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/14500
  
    **[Test build #63242 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/63242/consoleFull)** for PR 14500 at commit [`c5edbdf`](https://github.com/apache/spark/commit/c5edbdfc3a9b11fce8b427488f106158133e40a0).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `case class AlterTableRecoverPartitionsCommand(`
      * `case class RepairTableCommand(tableName: TableIdentifier) extends RunnableCommand `

