Posted to reviews@spark.apache.org by cloud-fan <gi...@git.apache.org> on 2015/12/28 15:28:59 UTC

[GitHub] spark pull request: [SPARK-12539][SQL][WIP] support writing bucket...

GitHub user cloud-fan opened a pull request:

    https://github.com/apache/spark/pull/10498

    [SPARK-12539][SQL][WIP] support writing bucketed table

    Done:
    
    * add bucket info in write path
    * support writing bucketed `HadoopFsRelation`
    
    TODO:
    
    * figure out a way to store metadata for `HadoopFsRelation` without the Hive metastore.
    * figure out how to insert bucketed data into an empty Hive table. (currently we just use a hash map to [keep all writers](https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala#L249), which may be expensive when we have bucketing)
    * when we append bucketed data to an existing `HadoopFsRelation`, we may have multiple files for one bucket; is that safe?
    * Hive supports bucketing without partitioning; should we support it?
    * more safety checks (fail fast if the given bucket info is wrong)
    * code refinement (a better way to deliver partitioning and bucketing info through the write path)
    * add tests (how to test it before implementing the read path?)
    * add docs


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/cloud-fan/spark bucket-write

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/10498.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #10498
    
----
commit 8cb24942890153bad70e003110a28d6111f0c407
Author: Wenchen Fan <we...@databricks.com>
Date:   2015-12-28T12:15:34Z

    write bucketed table

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r49002660
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala ---
    @@ -335,6 +339,117 @@ private[sql] class DynamicPartitionWriterContainer(
           val partitionName = Literal(c.name + "=") :: str :: Nil
           if (i == 0) partitionName else Literal(Path.SEPARATOR) :: partitionName
         }
    +  }
    +
    +  private def getBucketIdFromKey(key: InternalRow): Option[Int] = {
    +    if (bucketSpec.isDefined) {
    +      Some(key.getInt(partitionColumns.length))
    +    } else {
    +      None
    +    }
    +  }
    +
    +  private def sameBucket(key1: UnsafeRow, key2: UnsafeRow): Boolean = {
    +    val bucketIdIndex = partitionColumns.length
    +    if (key1.getInt(bucketIdIndex) != key2.getInt(bucketIdIndex)) {
    +      false
    +    } else {
    +      var i = partitionColumns.length - 1
    +      while (i >= 0) {
    +        val dt = partitionColumns(i).dataType
    +        if (key1.get(i, dt) != key2.get(i, dt)) return false
    +        i -= 1
    +      }
    +      true
    +    }
    +  }
    +
    +  private def sortBasedWrite(
    +      sorter: UnsafeKVExternalSorter,
    +      iterator: Iterator[InternalRow],
    +      getSortingKey: UnsafeProjection,
    +      getOutputRow: UnsafeProjection,
    +      getPartitionString: UnsafeProjection,
    +      outputWriters: java.util.HashMap[InternalRow, OutputWriter]): Unit = {
    +    while (iterator.hasNext) {
    +      val currentRow = iterator.next()
    +      sorter.insertKV(getSortingKey(currentRow), getOutputRow(currentRow))
    +    }
    +
    +    logInfo(s"Sorting complete. Writing out partition files one at a time.")
    +
    +    val needNewWriter: (UnsafeRow, UnsafeRow) => Boolean = if (sortColumns.isEmpty) {
    +      (key1, key2) => key1 != key2
    +    } else {
    +      (key1, key2) => key1 == null || !sameBucket(key1, key2)
    +    }
    +
    +    val sortedIterator = sorter.sortedIterator()
    +    var currentKey: UnsafeRow = null
    +    var currentWriter: OutputWriter = null
    +    try {
    +      while (sortedIterator.next()) {
    +        if (needNewWriter(currentKey, sortedIterator.getKey)) {
    --- End diff --
    
    I think this is fine. The rest of this path is much more expensive than this function call.




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48996768
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala ---
    @@ -161,6 +161,20 @@ trait HadoopFsRelationProvider {
           dataSchema: Option[StructType],
           partitionColumns: Option[StructType],
           parameters: Map[String, String]): HadoopFsRelation
    +
    +  // TODO: expose bucket API to users.
    --- End diff --
    
    I'd remove this TODO.





[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-169313893
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/48854/
    Test FAILed.




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r49021259
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala ---
    @@ -342,22 +461,34 @@ private[sql] class DynamicPartitionWriterContainer(
     
         // If anything below fails, we should abort the task.
         try {
    -      // This will be filled in if we have to fall back on sorting.
    -      var sorter: UnsafeKVExternalSorter = null
    +      // If there is no sorting columns, we set sorter to null and try the hash-based writing first,
    +      // and fill the sorter if there are too many writers and we need to fall back on sorting.
    +      // If there are sorting columns, then we have to sort the data anyway, and no need to try the
    --- End diff --
    
    +1.
    I'd vote for always doing the sorting. In my benchmark, even for 1 or 2 partitions, the hash-based path is only 10% faster than the sort-based one, and always sorting would reduce the complexity a lot, especially when considering how to manage the memory for writers.  cc @marmbrus
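
A minimal sketch of the sort-based write path discussed above, using plain Scala collections instead of Spark's `UnsafeKVExternalSorter`. The names `Row`, `writeSorted`, and the in-memory buffers standing in for output writers are hypothetical and are not part of the patch. The point it illustrates is that, once rows are sorted by key, only one writer needs to be open at a time.

```scala
import scala.collection.mutable.ArrayBuffer

object SortBasedWriteSketch {
  // Hypothetical row: a partition value, a bucket id, and a payload.
  final case class Row(partition: String, bucketId: Int, value: String)

  // Sorts rows by (partition, bucketId) and "writes" them group by group.
  // Only one output buffer is open at any time, which is what keeps writer
  // memory bounded compared with one open writer per key.
  def writeSorted(rows: Seq[Row]): Seq[((String, Int), Seq[String])] = {
    val sorted = rows.sortBy(r => (r.partition, r.bucketId))
    val closed = ArrayBuffer.empty[((String, Int), Seq[String])]
    var currentKey: Option[(String, Int)] = None
    var current = ArrayBuffer.empty[String]

    for (row <- sorted) {
      val key = (row.partition, row.bucketId)
      if (!currentKey.contains(key)) {
        // Key changed: close the previous "writer" before opening a new one.
        currentKey.foreach(k => closed += (k -> current.toSeq))
        currentKey = Some(key)
        current = ArrayBuffer.empty[String]
      }
      current += row.value
    }
    // Close the last open "writer", if any rows were seen.
    currentKey.foreach(k => closed += (k -> current.toSeq))
    closed.toSeq
  }
}
```

A hash-based path would instead keep a map from key to open writer, so its memory use grows with the number of distinct keys seen by a task.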




[GitHub] spark pull request: [SPARK-12539][SQL][WIP] support writing bucket...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-167596712
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48609411
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala ---
    @@ -351,7 +352,11 @@ abstract class OutputWriterFactory extends Serializable {
        *
        * @since 1.4.0
        */
    -  def newInstance(path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter
    +  def newInstance(
    +      path: String,
    +      bucketId: Option[Int],
    +      dataSchema: StructType,
    +      context: TaskAttemptContext): OutputWriter
    --- End diff --
    
    We probably should overload `newInstance` here and fall back to the old one, since `OutputWriterFactory.newInstance()` is part of the public data sources API (it's marked as experimental, though).
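
One way the suggested overload could look, sketched with simplified stand-in types. `Writer`, `WriterFactory`, `LegacyFactory`, and `BucketedFactory` are hypothetical names for illustration and do not match Spark's real signatures. The idea is that the new bucket-aware method gets a default body that delegates to the existing method, so implementations written against the old API keep compiling.

```scala
object OverloadSketch {
  // Hypothetical, simplified stand-in for an output writer.
  trait Writer { def describe: String }

  abstract class WriterFactory {
    // The pre-existing entry point that external data sources already implement.
    def newInstance(path: String): Writer

    // New bucket-aware overload. The default ignores bucketId and falls back
    // to the old method, so existing subclasses are unaffected.
    def newInstance(path: String, bucketId: Option[Int]): Writer =
      newInstance(path)
  }

  // An "old" implementation that knows nothing about buckets.
  object LegacyFactory extends WriterFactory {
    def newInstance(path: String): Writer =
      new Writer { val describe = s"legacy:$path" }
  }

  // A "new" implementation that overrides the bucket-aware overload.
  object BucketedFactory extends WriterFactory {
    def newInstance(path: String): Writer = newInstance(path, None)

    override def newInstance(path: String, bucketId: Option[Int]): Writer =
      new Writer {
        val describe = s"bucketed:$path:" + bucketId.map(_.toString).getOrElse("none")
      }
  }
}
```

Callers can then always invoke the two-argument form; a legacy factory silently drops the bucket id, which may or may not be acceptable depending on whether the caller needs to fail fast for sources that cannot write bucketed output.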




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48819110
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala ---
    @@ -239,12 +240,21 @@ object ResolvedDataSource extends Logging {
             val equality = columnNameEquality(caseSensitive)
             val dataSchema = StructType(
               data.schema.filterNot(f => partitionColumns.exists(equality(_, f.name))))
    -        val r = dataSource.createRelation(
    -          sqlContext,
    -          Array(outputPath.toString),
    -          Some(dataSchema.asNullable),
    -          Some(partitionColumnsSchema(data.schema, partitionColumns, caseSensitive)),
    -          caseInsensitiveOptions)
    +        val r = dataSource match {
    +          case provider: BucketedHadoopFsRelationProvider => provider.createRelation(
    +            sqlContext,
    +            Array(outputPath.toString),
    +            Some(dataSchema.asNullable),
    --- End diff --
    
    replace Some with Option
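
The comment does not give a reason, but a likely one is null safety: `Some(x)` wraps whatever it is given, including `null`, while `Option(x)` converts `null` to `None`. A small sketch of the difference (the object and method names are made up for illustration):

```scala
object OptionVsSome {
  // Some(x) always wraps its argument, even when the argument is null.
  def wrapWithSome(s: String): Option[String] = Some(s)

  // Option(x) maps null to None and any other value to Some(x).
  def wrapWithOption(s: String): Option[String] = Option(s)
}
```

With a `null` argument, `wrapWithSome` yields a defined `Some(null)` that will only fail later when the value is used, whereas `wrapWithOption` yields `None`.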




[GitHub] spark pull request: [SPARK-12539][SQL][WIP] support writing bucket...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48588647
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala ---
    @@ -317,14 +318,39 @@ private[sql] class DynamicPartitionWriterContainer(
         isAppend: Boolean)
       extends BaseWriterContainer(relation, job, isAppend) {
     
    +  private def toAttribute(columnName: String) = inputSchema.find(_.name == columnName).get
    +
       def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = {
         val outputWriters = new java.util.HashMap[InternalRow, OutputWriter]
         executorSideSetup(taskContext)
     
         var outputWritersCleared = false
     
    -    // Returns the partition key given an input row
    -    val getPartitionKey = UnsafeProjection.create(partitionColumns, inputSchema)
    +    val getKey = if (bucketSpec.isEmpty) {
    --- End diff --
    
    This logic is a bit dense. Might be nice to add a quick comment with how you are setting these up.




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48820170
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
    @@ -189,13 +220,43 @@ final class DataFrameWriter private[sql](df: DataFrame) {
             ifNotExists = false)).toRdd
       }
     
    -  private def normalizedParCols: Option[Seq[String]] = partitioningColumns.map { parCols =>
    -    parCols.map { col =>
    -      df.logicalPlan.output
    -        .map(_.name)
    -        .find(df.sqlContext.analyzer.resolver(_, col))
    -        .getOrElse(throw new AnalysisException(s"Partition column $col not found in existing " +
    -          s"columns (${df.logicalPlan.output.map(_.name).mkString(", ")})"))
    +  private def normalizedParCols: Option[Seq[String]] = partitioningColumns.map { cols =>
    +    cols.map(normalize(_, "Partition"))
    +  }
    +
    +  private def normalizedBucketCols: Option[Seq[String]] = bucketingColumns.map { cols =>
    +    cols.map(normalize(_, "Bucketing"))
    +  }
    +
    +  private def normalizedSortCols: Option[Seq[String]] = sortingColumns.map { cols =>
    +    cols.map(normalize(_, "Sorting"))
    +  }
    +
    +  private def getBucketSpec: Option[BucketSpec] = {
    +    if (sortingColumns.isDefined) {
    +      require(numBuckets.isDefined, "sortBy must be used together with bucketBy")
    +    }
    +
    +    for {
    +      n <- numBuckets
    +      cols <- normalizedBucketCols
    +    } yield {
    +      require(n > 0, "Bucket number must be greater than 0.")
    +      BucketSpec(n, cols, normalizedSortCols)
    +    }
    +  }
    +
    +  private def normalize(columnName: String, columnType: String): String = {
    +    val validColumnNames = df.logicalPlan.output.map(_.name)
    +    validColumnNames.find(df.sqlContext.analyzer.resolver(_, columnName))
    +      .getOrElse(throw new AnalysisException(s"$columnType column $columnName not found in " +
    +        s"existing columns (${validColumnNames.mkString(", ")})"))
    +  }
    +
    +  private def assertNotBucketed(): Unit = {
    +    if (numBuckets.isDefined || sortingColumns.isDefined) {
    --- End diff --
    
    I think sorting columns make no sense without bucketing columns, cc @nongli @yhuai 




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48818349
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
    @@ -117,7 +117,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
     
       /**
        * Partitions the output by the given columns on the file system. If specified, the output is
    -   * laid out on the file system similar to Hive's partitioning scheme.
    +   * laid out on the file system similar to Hive's partitioning schema.
    --- End diff --
    
    I think scheme is actually the correct word here.




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48819162
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala ---
    @@ -0,0 +1,89 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.datasources
    +
    +import org.apache.hadoop.mapreduce.TaskAttemptContext
    +import org.apache.spark.sql.SQLContext
    +import org.apache.spark.sql.catalyst.expressions.Attribute
    +import org.apache.spark.sql.sources.{OutputWriter, OutputWriterFactory, HadoopFsRelationProvider, HadoopFsRelation}
    +import org.apache.spark.sql.types.StructType
    +
    +private[sql] case class BucketSpec(
    --- End diff --
    
    you should have some internal javadoc that explains what a bucket is, and what the 3 parameters mean.
    
    also it would be great to be consistent w.r.t. naming throughout the code base. Right now you are using bucketingColumns, bucketCols in different places.





[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-168878213
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48819062
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala ---
    @@ -0,0 +1,89 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.datasources
    +
    +import org.apache.hadoop.mapreduce.TaskAttemptContext
    +import org.apache.spark.sql.SQLContext
    +import org.apache.spark.sql.catalyst.expressions.Attribute
    +import org.apache.spark.sql.sources.{OutputWriter, OutputWriterFactory, HadoopFsRelationProvider, HadoopFsRelation}
    +import org.apache.spark.sql.types.StructType
    +
    +private[sql] case class BucketSpec(
    +    numBuckets: Int,
    +    bucketingColumns: Seq[String],
    +    sortingColumns: Option[Seq[String]]) {
    +
    +  def resolvedBucketingColumns(inputSchema: Seq[Attribute]): Seq[Attribute] = {
    +    bucketingColumns.map { col =>
    +      inputSchema.find(_.name == col).get
    +    }
    +  }
    +
    +  def resolvedSortingColumns(inputSchema: Seq[Attribute]): Seq[Attribute] = {
    +    if (sortingColumns.isDefined) {
    --- End diff --
    
    is this just sortingColumns.flatMap ... ?
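
The diff is cut off at this point, so the body of `resolvedSortingColumns` is not visible. The sketch below assumes it maps the column names when `sortingColumns` is defined and returns an empty sequence otherwise. It uses plain strings in place of `Attribute`, and the object and method names are hypothetical. Under that assumption, the explicit branch and the combinator form give the same result:

```scala
object SortColumnsSketch {
  // Explicit branching, in the style of the code under review.
  def explicitStyle(sortingColumns: Option[Seq[String]], schema: Seq[String]): Seq[String] =
    if (sortingColumns.isDefined) {
      sortingColumns.get.map(col => schema.find(_ == col).get)
    } else {
      Nil
    }

  // The same result via combinators: the Option is viewed as a collection of
  // zero or one Seq, and flatMap flattens it into a single Seq.
  def combinatorStyle(sortingColumns: Option[Seq[String]], schema: Seq[String]): Seq[String] =
    sortingColumns.toSeq.flatMap(cols => cols.map(col => schema.find(_ == col).get))
}
```

Both versions still throw on an unknown column name because of the `.get` on `find`, mirroring the lookup shown in `resolvedBucketingColumns`.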




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48924160
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala ---
    @@ -0,0 +1,83 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.datasources
    +
    +import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
    +import org.apache.spark.sql.SQLContext
    +import org.apache.spark.sql.catalyst.expressions.Attribute
    +import org.apache.spark.sql.sources.{OutputWriter, OutputWriterFactory, HadoopFsRelationProvider, HadoopFsRelation}
    +import org.apache.spark.sql.types.StructType
    +
    +/**
    + * A container for bucketing information.
    + * Bucketing is a technology for decomposing data sets into more manageable parts, and the number
    + * of buckets is fixed so it does not fluctuate with data.
    + *
    + * @param numBuckets number of buckets.
    + * @param bucketColumnNames the names of the columns that used to generate the bucket id.
    + * @param sortColumnNames the names of the columns that used to sort data in each bucket.
    + */
    +private[sql] case class BucketSpec(
    +    numBuckets: Int,
    +    bucketColumnNames: Seq[String],
    +    sortColumnNames: Seq[String])
    +
    +private[sql] trait BucketedHadoopFsRelationProvider extends HadoopFsRelationProvider {
    --- End diff --
    
    Not right now.





[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48821710
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---
    @@ -240,6 +241,23 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
           }
         }
     
    +    if (userSpecifiedSchema.isDefined && bucketSpec.isDefined) {
    +      val BucketSpec(numBuckets, bucketColumns, sortColumns) = bucketSpec.get
    +
    +      tableProperties.put("spark.sql.sources.schema.numBuckets", numBuckets.toString)
    +      tableProperties.put("spark.sql.sources.schema.numBucketCols", bucketColumns.length.toString)
    +      bucketColumns.zipWithIndex.foreach { case (bucketCol, index) =>
    +        tableProperties.put(s"spark.sql.sources.schema.bucketCol.$index", bucketCol)
    +      }
    +
    +      if (sortColumns.isDefined) {
    +        tableProperties.put("spark.sql.sources.schema.numSortCols", sortColumns.get.length.toString)
    --- End diff --
    
    are we worried about the 4k limit and as a result want to limit the size of each property?
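
If a per-property size limit is the concern, one common workaround is to split a long value across several numbered properties and record the part count. The sketch below is illustrative only: the key layout (`<prefix>.numParts`, `<prefix>.part.<i>`), the `maxLen` parameter, and the object name are assumptions, not what this patch does.

```scala
object SplitPropertySketch {
  // Splits `value` into chunks of at most `maxLen` characters and stores them
  // under the hypothetical keys "<prefix>.numParts" and "<prefix>.part.<i>".
  def split(prefix: String, value: String, maxLen: Int): Map[String, String] = {
    require(maxLen > 0, "maxLen must be positive")
    val parts = value.grouped(maxLen).toSeq
    val numbered = parts.zipWithIndex.map { case (p, i) => s"$prefix.part.$i" -> p }
    numbered.toMap + (s"$prefix.numParts" -> parts.length.toString)
  }

  // Reassembles the original value by concatenating the numbered parts in order.
  def join(prefix: String, props: Map[String, String]): String = {
    val n = props(s"$prefix.numParts").toInt
    (0 until n).map(i => props(s"$prefix.part.$i")).mkString
  }
}
```

The patch already stores one bucket or sort column per property, so each value is a single column name and is unlikely to approach such a limit; splitting would matter mainly for values that grow with the schema.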





[GitHub] spark issue #10498: [SPARK-12539][SQL] support writing bucketed table

Posted by infinitymittal <gi...@git.apache.org>.
Github user infinitymittal commented on the issue:

    https://github.com/apache/spark/pull/10498
  
    @tejasapatil Thanks for the response. SPARK-17729 says "Spark still won't produce bucketed data as per Hive's bucketing guarantees". I want the data to be bucketed when written. Any further leads?




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-168685716
  
    **[Test build #48659 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/48659/consoleFull)** for PR 10498 at commit [`21e0c48`](https://github.com/apache/spark/commit/21e0c48e83319f7319ba339deb2bffde0188583d).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/10498




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-168717374
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-12539][SQL][WIP] support writing bucket...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48588277
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
    @@ -129,6 +129,19 @@ final class DataFrameWriter private[sql](df: DataFrame) {
         this
       }
     
    +  @scala.annotation.varargs
    --- End diff --
    
    and the @since annotations.




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r49002565
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala ---
    @@ -335,6 +339,121 @@ private[sql] class DynamicPartitionWriterContainer(
           val partitionName = Literal(c.name + "=") :: str :: Nil
           if (i == 0) partitionName else Literal(Path.SEPARATOR) :: partitionName
         }
    +  }
    +
    +  private def getBucketIdFromKey(key: InternalRow): Option[Int] = {
    +    if (bucketSpec.isDefined) {
    +      Some(key.getInt(partitionColumns.length))
    +    } else {
    +      None
    +    }
    +  }
    +
    +  private def sameBucket(key1: UnsafeRow, key2: UnsafeRow): Boolean = {
    +    val bucketIdIndex = partitionColumns.length
    +    if (key1.getInt(bucketIdIndex) != key2.getInt(bucketIdIndex)) {
    +      false
    +    } else {
    +      var i = partitionColumns.length - 1
    +      while (i >= 0) {
    +        val dt = partitionColumns(i).dataType
    +        if (key1.get(i, dt) != key2.get(i, dt)) return false
    +        i -= 1
    +      }
    +      true
    +    }
    +  }
    +
    +  private def sortBasedWrite(
    +      sorter: UnsafeKVExternalSorter,
    +      iterator: Iterator[InternalRow],
    +      getSortingKey: UnsafeProjection,
    +      getOutputRow: UnsafeProjection,
    +      getPartitionString: UnsafeProjection,
    +      outputWriters: java.util.HashMap[InternalRow, OutputWriter]): Unit = {
    --- End diff --
    
    unused?




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-169309910
  
    **[Test build #48854 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/48854/consoleFull)** for PR 10498 at commit [`d3200cf`](https://github.com/apache/spark/commit/d3200cf8bdffaf0025b25c30d7c3d9fef8a6f9a8).




[GitHub] spark pull request: [SPARK-12539][SQL][WIP] support writing bucket...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-167823882
  
    **[Test build #48415 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/48415/consoleFull)** for PR 10498 at commit [`a9dc997`](https://github.com/apache/spark/commit/a9dc99722bfea886c6381abbd2e1e9366fcf9064).




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r49026005
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala ---
    @@ -335,6 +339,121 @@ private[sql] class DynamicPartitionWriterContainer(
           val partitionName = Literal(c.name + "=") :: str :: Nil
           if (i == 0) partitionName else Literal(Path.SEPARATOR) :: partitionName
         }
    +  }
    +
    +  private def getBucketIdFromKey(key: InternalRow): Option[Int] = {
    +    if (bucketSpec.isDefined) {
    +      Some(key.getInt(partitionColumns.length))
    +    } else {
    +      None
    +    }
    +  }
    +
    +  private def sameBucket(key1: UnsafeRow, key2: UnsafeRow): Boolean = {
    +    val bucketIdIndex = partitionColumns.length
    +    if (key1.getInt(bucketIdIndex) != key2.getInt(bucketIdIndex)) {
    +      false
    +    } else {
    +      var i = partitionColumns.length - 1
    +      while (i >= 0) {
    +        val dt = partitionColumns(i).dataType
    +        if (key1.get(i, dt) != key2.get(i, dt)) return false
    +        i -= 1
    +      }
    +      true
    +    }
    +  }
    +
    +  private def sortBasedWrite(
    +      sorter: UnsafeKVExternalSorter,
    +      iterator: Iterator[InternalRow],
    +      getSortingKey: UnsafeProjection,
    +      getOutputRow: UnsafeProjection,
    +      getPartitionString: UnsafeProjection,
    +      outputWriters: java.util.HashMap[InternalRow, OutputWriter]): Unit = {
    +    while (iterator.hasNext) {
    +      val currentRow = iterator.next()
    +      sorter.insertKV(getSortingKey(currentRow), getOutputRow(currentRow))
    +    }
    +
    +    logInfo(s"Sorting complete. Writing out partition files one at a time.")
    +
    +    val needNewWriter: (UnsafeRow, UnsafeRow) => Boolean = if (sortColumns.isEmpty) {
    +      (key1, key2) => key1 != key2
    +    } else {
    +      (key1, key2) => key1 == null || !sameBucket(key1, key2)
    +    }
    +
    +    val sortedIterator = sorter.sortedIterator()
    +    var currentKey: UnsafeRow = null
    +    var currentWriter: OutputWriter = null
    +    try {
    +      while (sortedIterator.next()) {
    +        if (needNewWriter(currentKey, sortedIterator.getKey)) {
    +          if (currentWriter != null) {
    +            currentWriter.close()
    +          }
    +          currentKey = sortedIterator.getKey.copy()
    +          logDebug(s"Writing partition: $currentKey")
    +
    +          // Either use an existing file from before, or open a new one.
    +          currentWriter = outputWriters.remove(currentKey)
    +          if (currentWriter == null) {
    +            currentWriter = newOutputWriter(currentKey, getPartitionString)
    +          }
    +        }
    +
    +        currentWriter.writeInternal(sortedIterator.getValue)
    +      }
    +    } finally {
    +      if (currentWriter != null) { currentWriter.close() }
    +    }
    +  }
    +
    +  /**
    +   * Open and returns a new OutputWriter given a partition key and optional bucket id.
    +   * If bucket id is specified, we will append it to the end of the file name, but before the
    +   * file extension, e.g. part-r-00009-ea518ad4-455a-4431-b471-d24e03814677-00002.gz.parquet
    --- End diff --
    
    Putting the bucket id at the very end of the file name could break other applications that rely on the file extension (to recognize the file format), so don't do that.
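
To illustrate the naming scheme from the doc comment in the diff above, here is a minimal sketch that places the bucket id before the file extension, so tools that inspect the extension still see `.gz.parquet`. The helper `bucketedFileName` and its parameters are hypothetical and are not part of the patch.

```scala
// Hypothetical helper, not part of the patch: builds an output file name that
// carries the bucket id ahead of the file extension.
object BucketFileNames {
  def bucketedFileName(prefix: String, bucketId: Option[Int], extension: String): String = {
    // Zero-pad the bucket id to five digits, matching the example in the doc comment.
    val bucketSuffix = bucketId.map(id => f"-$id%05d").getOrElse("")
    s"$prefix$bucketSuffix$extension"
  }
}
```

Under these assumptions, `BucketFileNames.bucketedFileName("part-r-00009-ea518ad4-455a-4431-b471-d24e03814677", Some(2), ".gz.parquet")` yields the file name used as the example in the doc comment, and passing `None` leaves the name unchanged.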




[GitHub] spark pull request: [SPARK-12539][SQL][WIP] support writing bucket...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48591458
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala ---
    @@ -317,14 +318,39 @@ private[sql] class DynamicPartitionWriterContainer(
         isAppend: Boolean)
       extends BaseWriterContainer(relation, job, isAppend) {
     
    +  private def toAttribute(columnName: String) = inputSchema.find(_.name == columnName).get
    +
       def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = {
         val outputWriters = new java.util.HashMap[InternalRow, OutputWriter]
         executorSideSetup(taskContext)
     
         var outputWritersCleared = false
     
    -    // Returns the partition key given an input row
    -    val getPartitionKey = UnsafeProjection.create(partitionColumns, inputSchema)
    +    val getKey = if (bucketSpec.isEmpty) {
    +      UnsafeProjection.create(partitionColumns, inputSchema)
    +    } else {
    +      val BucketSpec(numBuckets, bucketColumns, sortColumns) = bucketSpec.get
    +      val bucketIdExpr = Pmod(Hash(bucketColumns.map(toAttribute)), Literal(numBuckets))
    +      val sortingAttrs = sortColumns.map(_.map(toAttribute)).getOrElse(Nil)
    +      UnsafeProjection.create(partitionColumns ++ (bucketIdExpr +: sortingAttrs), inputSchema)
    --- End diff --
    
    The logic is: if we have sorting columns, then we will immediately sort input rows, and won't use the hash map to keep all writers.
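
The sort-based path described here can be sketched with plain Scala collections: rows are sorted by (partition, bucket id, sort column), and a new writer is opened only when the (partition, bucket id) pair changes, so at most one writer is open at a time. `Row`, `writeSorted`, and the in-memory "files" are hypothetical stand-ins for the patch's `UnsafeKVExternalSorter` and `OutputWriter`; this is an illustration, not the patch's code.

```scala
object SortBasedWriteSketch {
  // Hypothetical row: one partition value, a precomputed bucket id, one sort column, a payload.
  final case class Row(partition: String, bucketId: Int, sortKey: Int, payload: String)

  // Returns one ((partition, bucketId), payloads) entry per "file", in the order files are opened.
  def writeSorted(rows: Seq[Row]): Vector[((String, Int), Vector[String])] = {
    // Sort once up front instead of keeping a hash map of open writers.
    val sorted = rows.sortBy(r => (r.partition, r.bucketId, r.sortKey))
    sorted.foldLeft(Vector.empty[((String, Int), Vector[String])]) { (files, row) =>
      val key = (row.partition, row.bucketId)
      files.lastOption match {
        // Same partition and bucket: keep appending to the currently open "writer".
        case Some((`key`, written)) => files.init :+ (key -> (written :+ row.payload))
        // Partition or bucket changed: the old "writer" is done, open a new one.
        case _ => files :+ (key -> Vector(row.payload))
      }
    }
  }
}
```

Because the sort column is only part of the ordering and not of the grouping key, rows with different sort values still land in the same file, in sorted order.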




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-168696115
  
    **[Test build #48663 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/48663/consoleFull)** for PR 10498 at commit [`70ebd69`](https://github.com/apache/spark/commit/70ebd69190e1ebd27362e17240b20bf60b5fdf16).




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-168878214
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/48701/
    Test FAILed.




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48823466
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---
    @@ -240,6 +241,23 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
           }
         }
     
    +    if (userSpecifiedSchema.isDefined && bucketSpec.isDefined) {
    +      val BucketSpec(numBuckets, bucketColumns, sortColumns) = bucketSpec.get
    +
    +      tableProperties.put("spark.sql.sources.schema.numBuckets", numBuckets.toString)
    +      tableProperties.put("spark.sql.sources.schema.numBucketCols", bucketColumns.length.toString)
    +      bucketColumns.zipWithIndex.foreach { case (bucketCol, index) =>
    +        tableProperties.put(s"spark.sql.sources.schema.bucketCol.$index", bucketCol)
    +      }
    +
    +      if (sortColumns.isDefined) {
    +        tableProperties.put("spark.sql.sources.schema.numSortCols", sortColumns.get.length.toString)
    --- End diff --
    
    I think so. It looks like that's not a problem for partition/bucket/sort columns, so shall we just use a delimiter and store all the column names in one line?
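
The two encodings under discussion can be compared in a small sketch. The per-index layout mirrors the property keys in the diff above; the single-property layout and its key `spark.sql.sources.schema.bucketCols` are hypothetical, and the comma delimiter assumes that column names never contain a comma.

```scala
object BucketPropsSketch {
  // Per-index layout, as in the diff: one property per bucket column plus a count.
  def perIndex(bucketColumns: Seq[String]): Map[String, String] = {
    val count = "spark.sql.sources.schema.numBucketCols" -> bucketColumns.length.toString
    val cols = bucketColumns.zipWithIndex.map { case (col, index) =>
      s"spark.sql.sources.schema.bucketCol.$index" -> col
    }
    (count +: cols).toMap
  }

  // Hypothetical single-property layout: all column names joined with a delimiter.
  def delimited(bucketColumns: Seq[String]): Map[String, String] =
    Map("spark.sql.sources.schema.bucketCols" -> bucketColumns.mkString(","))

  // Reads the hypothetical delimited layout back into a list of column names.
  def readDelimited(props: Map[String, String]): Seq[String] =
    props.get("spark.sql.sources.schema.bucketCols")
      .filter(_.nonEmpty)
      .map(_.split(",").toSeq)
      .getOrElse(Seq.empty)
}
```

The delimited form needs a single property and no count, at the cost of having to pick a delimiter that cannot appear in a column name.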




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48818408
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
    @@ -189,13 +220,43 @@ final class DataFrameWriter private[sql](df: DataFrame) {
             ifNotExists = false)).toRdd
       }
     
    -  private def normalizedParCols: Option[Seq[String]] = partitioningColumns.map { parCols =>
    -    parCols.map { col =>
    -      df.logicalPlan.output
    -        .map(_.name)
    -        .find(df.sqlContext.analyzer.resolver(_, col))
    -        .getOrElse(throw new AnalysisException(s"Partition column $col not found in existing " +
    -          s"columns (${df.logicalPlan.output.map(_.name).mkString(", ")})"))
    +  private def normalizedParCols: Option[Seq[String]] = partitioningColumns.map { cols =>
    +    cols.map(normalize(_, "Partition"))
    +  }
    +
    +  private def normalizedBucketCols: Option[Seq[String]] = bucketingColumns.map { cols =>
    +    cols.map(normalize(_, "Bucketing"))
    +  }
    +
    +  private def normalizedSortCols: Option[Seq[String]] = sortingColumns.map { cols =>
    +    cols.map(normalize(_, "Sorting"))
    +  }
    +
    +  private def getBucketSpec: Option[BucketSpec] = {
    +    if (sortingColumns.isDefined) {
    +      require(numBuckets.isDefined, "sortBy must be used together with bucketBy")
    +    }
    +
    +    for {
    +      n <- numBuckets
    +      cols <- normalizedBucketCols
    +    } yield {
    +      require(n > 0, "Bucket number must be greater than 0.")
    +      BucketSpec(n, cols, normalizedSortCols)
    +    }
    +  }
    +
    +  private def normalize(columnName: String, columnType: String): String = {
    +    val validColumnNames = df.logicalPlan.output.map(_.name)
    +    validColumnNames.find(df.sqlContext.analyzer.resolver(_, columnName))
    +      .getOrElse(throw new AnalysisException(s"$columnType column $columnName not found in " +
    +        s"existing columns (${validColumnNames.mkString(", ")})"))
    +  }
    +
    +  private def assertNotBucketed(): Unit = {
    +    if (numBuckets.isDefined || sortingColumns.isDefined) {
    --- End diff --
    
    If bucket == false and sort == true, we should just write the normal Spark (not Hive) partitions, but sorted within each partition, no?





[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-168675138
  
    **[Test build #48658 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/48658/consoleFull)** for PR 10498 at commit [`ba23292`](https://github.com/apache/spark/commit/ba2329261740b08bd1a19dc8be0ef281281b84c9).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `case class Murmur3Hash(children: Seq[Expression], seed: Int) extends Expression `




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r49034956
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala ---
    @@ -125,7 +125,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
                  |Actual: ${partitionColumns.mkString(", ")}
               """.stripMargin)
     
    -        val writerContainer = if (partitionColumns.isEmpty) {
    +        val writerContainer = if (partitionColumns.isEmpty && relation.bucketSpec.isEmpty) {
    --- End diff --
    
    I see, thanks




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-168017792
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48887994
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
    @@ -130,6 +130,34 @@ final class DataFrameWriter private[sql](df: DataFrame) {
       }
     
       /**
    +   * Buckets the output by the given columns on the file system. If specified, the output is
    --- End diff --
    
    Remove "on the file system" in the first sentence.




[GitHub] spark issue #10498: [SPARK-12539][SQL] support writing bucketed table

Posted by infinitymittal <gi...@git.apache.org>.
Github user infinitymittal commented on the issue:

    https://github.com/apache/spark/pull/10498
  
    Hi,
    
    There is a documented limitation: "Can't insert bucketed data into existing hive tables." Are there any plans to relax it? I want to insert data into an already existing table using a query.




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-169317899
  
    **[Test build #48856 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/48856/consoleFull)** for PR 10498 at commit [`1afd3ee`](https://github.com/apache/spark/commit/1afd3ee78484ce56dd04446bd43adab96a677411).




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-169021962
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/48770/
    Test PASSed.




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-168921838
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/48737/
    Test PASSed.




[GitHub] spark pull request: [SPARK-12539][SQL][WIP] support writing bucket...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48588632
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala ---
    @@ -317,14 +318,39 @@ private[sql] class DynamicPartitionWriterContainer(
         isAppend: Boolean)
       extends BaseWriterContainer(relation, job, isAppend) {
     
    +  private def toAttribute(columnName: String) = inputSchema.find(_.name == columnName).get
    +
       def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = {
         val outputWriters = new java.util.HashMap[InternalRow, OutputWriter]
         executorSideSetup(taskContext)
     
         var outputWritersCleared = false
     
    -    // Returns the partition key given an input row
    -    val getPartitionKey = UnsafeProjection.create(partitionColumns, inputSchema)
    +    val getKey = if (bucketSpec.isEmpty) {
    +      UnsafeProjection.create(partitionColumns, inputSchema)
    +    } else {
    +      val BucketSpec(numBuckets, bucketColumns, sortColumns) = bucketSpec.get
    +      val bucketIdExpr = Pmod(Hash(bucketColumns.map(toAttribute)), Literal(numBuckets))
    +      val sortingAttrs = sortColumns.map(_.map(toAttribute)).getOrElse(Nil)
    +      UnsafeProjection.create(partitionColumns ++ (bucketIdExpr +: sortingAttrs), inputSchema)
    --- End diff --
    
    I don't think the sorting columns should be part of the key.
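
One way to read this suggestion: the writer key would hold only the partition values and the bucket id, with sort columns used for ordering alone. A minimal sketch follows, in which `pmod` mimics the non-negative modulus that the `Pmod` expression in the diff provides and `hashCode` stands in for the `Hash` expression; `WriterKey` and `writerKey` are hypothetical names, not part of the patch.

```scala
object WriterKeySketch {
  // Non-negative modulus: the result is in [0, n) even when the hash is negative.
  def pmod(hash: Int, n: Int): Int = {
    val r = hash % n
    if (r < 0) r + n else r
  }

  // Hypothetical key: partition values plus bucket id, with no sort columns.
  final case class WriterKey(partitionValues: Seq[Any], bucketId: Int)

  def writerKey(partitionValues: Seq[Any], bucketValues: Seq[Any], numBuckets: Int): WriterKey = {
    require(numBuckets > 0, "Bucket number must be greater than 0.")
    // hashCode is a stand-in here; the patch hashes the bucket columns with a Spark expression.
    WriterKey(partitionValues, pmod(bucketValues.hashCode(), numBuckets))
  }
}
```

With a key of this shape, two rows that differ only in their sort column values map to the same writer. A positive modulus also avoids the edge case of `math.abs(hash) % n`, which can be negative when `hash` is `Int.MinValue`.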




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48610578
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala ---
    @@ -344,66 +371,129 @@ private[sql] class DynamicPartitionWriterContainer(
     
         // If anything below fails, we should abort the task.
         try {
    -      // This will be filled in if we have to fall back on sorting.
    -      var sorter: UnsafeKVExternalSorter = null
    -      while (iterator.hasNext && sorter == null) {
    -        val inputRow = iterator.next()
    -        val currentKey = getPartitionKey(inputRow)
    -        var currentWriter = outputWriters.get(currentKey)
    -
    -        if (currentWriter == null) {
    -          if (outputWriters.size < maxOpenFiles) {
    -            currentWriter = newOutputWriter(currentKey)
    -            outputWriters.put(currentKey.copy(), currentWriter)
    -            currentWriter.writeInternal(getOutputRow(inputRow))
    -          } else {
    -            logInfo(s"Maximum partitions reached, falling back on sorting.")
    -            sorter = new UnsafeKVExternalSorter(
    -              StructType.fromAttributes(partitionColumns),
    -              StructType.fromAttributes(dataColumns),
    -              SparkEnv.get.blockManager,
    -              TaskContext.get().taskMemoryManager().pageSizeBytes)
    -            sorter.insertKV(currentKey, getOutputRow(inputRow))
    +      val mustSort = bucketSpec.isDefined && bucketSpec.get.sortingColumns.isDefined
    +      // TODO: remove duplicated code.
    +      if (mustSort) {
    +        val bucketColumns = bucketSpec.get.resolvedBucketingColumns(inputSchema)
    +        val sortColumns = bucketSpec.get.resolvedSortingColumns(inputSchema)
    +
    +        val getSortingKey = {
    +          val getBucketKey = UnsafeProjection.create(bucketColumns, inputSchema)
    +          val getResultRow = UnsafeProjection.create(
    +            (partitionColumns :+ Literal(-1)) ++ sortColumns, inputSchema)
    +          (row: InternalRow) => {
    +            val bucketId = math.abs(getBucketKey(row).hashCode()) % bucketSpec.get.numBuckets
    +            val result = getResultRow(row)
    +            result.setInt(partitionColumns.length, bucketId)
    +            result
               }
    -        } else {
    -          currentWriter.writeInternal(getOutputRow(inputRow))
             }
    -      }
     
    -      // If the sorter is not null that means that we reached the maxFiles above and need to finish
    -      // using external sort.
    -      if (sorter != null) {
    +        val sortingKeySchema = {
    +          val fields = StructType.fromAttributes(partitionColumns)
    +            .add("bucketId", IntegerType, nullable = false) ++
    +            StructType.fromAttributes(sortColumns)
    +          StructType(fields)
    +        }
    +
    +        val sorter = new UnsafeKVExternalSorter(
    +          sortingKeySchema,
    +          StructType.fromAttributes(dataColumns),
    +          SparkEnv.get.blockManager,
    +          TaskContext.get().taskMemoryManager().pageSizeBytes)
    +
             while (iterator.hasNext) {
               val currentRow = iterator.next()
    -          sorter.insertKV(getPartitionKey(currentRow), getOutputRow(currentRow))
    +          sorter.insertKV(getSortingKey(currentRow), getOutputRow(currentRow))
             }
     
             logInfo(s"Sorting complete. Writing out partition files one at a time.")
     
    +        def sameBucket(row1: InternalRow, row2: InternalRow): Boolean = {
    +          partitionColumns.map(_.dataType).zipWithIndex.forall { case (dt, index) =>
    +            row1.get(index, dt) == row2.get(index, dt)
    +          } && row1.getInt(partitionColumns.length) == row2.getInt(partitionColumns.length)
    +        }
    --- End diff --
    
    Please avoid functional style on performance-critical code paths.
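
A rough sketch of the imperative alternative, in plain Scala rather than Spark code. Rows are modelled as `Array[Any]` with the partition values first and the bucket id right after them; `SameBucketSketch` and this row layout are assumptions made for illustration only, not the `InternalRow` API.

```scala
object SameBucketSketch {
  // Compares the bucket id first (a single cheap check), then the partition values,
  // using a while loop so no intermediate collections or closures are created per row.
  def sameBucket(row1: Array[Any], row2: Array[Any], numPartitionCols: Int): Boolean = {
    if (row1(numPartitionCols) != row2(numPartitionCols)) return false
    var i = 0
    while (i < numPartitionCols) {
      if (row1(i) != row2(i)) return false
      i += 1
    }
    true
  }
}
```

The `map(...).zipWithIndex.forall` version in the diff builds two temporary sequences on every call, which is presumably the per-row overhead the comment is about.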




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-168907148
  
    **[Test build #48737 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/48737/consoleFull)** for PR 10498 at commit [`3df61dc`](https://github.com/apache/spark/commit/3df61dcf76f7991a3fc47254a54e135ad2c044dd).




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48819093
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
    @@ -189,13 +220,43 @@ final class DataFrameWriter private[sql](df: DataFrame) {
             ifNotExists = false)).toRdd
       }
     
    -  private def normalizedParCols: Option[Seq[String]] = partitioningColumns.map { parCols =>
    -    parCols.map { col =>
    -      df.logicalPlan.output
    -        .map(_.name)
    -        .find(df.sqlContext.analyzer.resolver(_, col))
    -        .getOrElse(throw new AnalysisException(s"Partition column $col not found in existing " +
    -          s"columns (${df.logicalPlan.output.map(_.name).mkString(", ")})"))
    +  private def normalizedParCols: Option[Seq[String]] = partitioningColumns.map { cols =>
    +    cols.map(normalize(_, "Partition"))
    +  }
    +
    +  private def normalizedBucketCols: Option[Seq[String]] = bucketingColumns.map { cols =>
    +    cols.map(normalize(_, "Bucketing"))
    +  }
    +
    +  private def normalizedSortCols: Option[Seq[String]] = sortingColumns.map { cols =>
    +    cols.map(normalize(_, "Sorting"))
    +  }
    +
    +  private def getBucketSpec: Option[BucketSpec] = {
    +    if (sortingColumns.isDefined) {
    +      require(numBuckets.isDefined, "sortBy must be used together with bucketBy")
    +    }
    +
    +    for {
    +      n <- numBuckets
    +      cols <- normalizedBucketCols
    +    } yield {
    +      require(n > 0, "Bucket number must be greater than 0.")
    +      BucketSpec(n, cols, normalizedSortCols)
    +    }
    +  }
    +
    +  private def normalize(columnName: String, columnType: String): String = {
    --- End diff --
    
    Can you add a comment explaining what "normalize" does?
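
Judging from the removed `normalizedParCols` body in the diff, `normalize` most likely resolves a user-supplied column name against the DataFrame's output columns and returns the matching existing name. A self-contained sketch of that behaviour follows; `NormalizeSketch`, the `existing` parameter, and the case-insensitive default resolver are assumptions, and `IllegalArgumentException` stands in for Spark's `AnalysisException`.

```scala
object NormalizeSketch {
  // Hypothetical resolver: treats names as equal when they differ only in case.
  val caseInsensitive: (String, String) => Boolean = _.equalsIgnoreCase(_)

  // Returns the name of the existing column that the resolver matches against
  // `columnName`; fails fast when no existing column matches.
  def normalize(
      columnName: String,
      columnType: String,
      existing: Seq[String],
      resolver: (String, String) => Boolean = caseInsensitive): String = {
    existing.find(resolver(_, columnName)).getOrElse(
      throw new IllegalArgumentException(
        s"$columnType column $columnName not found in existing columns " +
          s"(${existing.mkString(", ")})"))
  }
}
```

With the default resolver, `normalize("ID", "Bucketing", Seq("id", "name"))` yields the existing spelling `id`.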





[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by l15k4 <gi...@git.apache.org>.
Github user l15k4 commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-220782023
  
    Guys, do you have a rough estimate of when bucketing will be implemented for `org.apache.spark.sql.DataFrameWriter#save`?




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48747698
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala ---
    @@ -177,3 +179,36 @@ case class Crc32(child: Expression) extends UnaryExpression with ImplicitCastInp
         })
       }
     }
    +
    +case class Murmur3Hash(children: Seq[Expression], seed: Int) extends Expression {
    +  def this(arguments: Seq[Expression]) = this(arguments, 42)
    +
    +  override def dataType: DataType = IntegerType
    +
    +  override def foldable: Boolean = children.forall(_.foldable)
    +
    +  override def nullable: Boolean = false
    +
    +  override def checkInputDataTypes(): TypeCheckResult = {
    +    if (children.isEmpty) {
    +      TypeCheckResult.TypeCheckFailure("arguments of function hash cannot be empty")
    --- End diff --
    
    Nit: hash => Murmur3Hash




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-168028613
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/48489/
    Test FAILed.




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-168703688
  
    The failed test cases pass locally; the failures seem to be caused by a problem with `TestHiveContext.warehousePath` on the Jenkins machines. cc @JoshRosen @yhuai




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-168685886
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-168028513
  
    **[Test build #48489 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/48489/consoleFull)** for PR 10498 at commit [`d2dc9b3`](https://github.com/apache/spark/commit/d2dc9b3ce51bd84ed5f59137d2eb76c8b6bd4f9c).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-169313888
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-12539][SQL][WIP] support writing bucket...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48588458
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
    @@ -129,6 +129,19 @@ final class DataFrameWriter private[sql](df: DataFrame) {
         this
       }
     
    +  @scala.annotation.varargs
    +  def bucketBy(numBuckets: Int, colNames: String*): DataFrameWriter = {
    +    this.numBuckets = Some(numBuckets)
    +    this.bucketingColumns = Option(colNames)
    --- End diff --
    
    Might be worth checking that numBuckets is None (similar to sortBy)
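
One possible reading of this suggestion, sketched in plain Scala: reject a second `bucketBy` call instead of silently overwriting the first. `WriterSketch` is a hypothetical stand-in for `DataFrameWriter`, and the error messages (other than the bucket-number one, which appears elsewhere in the diff) are made up.

```scala
final class WriterSketch {
  private var numBuckets: Option[Int] = None
  private var bucketingColumns: Option[Seq[String]] = None

  def bucketBy(n: Int, colNames: String*): WriterSketch = {
    // Guard: the bucket spec may only be set once per writer.
    require(numBuckets.isEmpty, "bucketBy was already called on this writer")
    require(n > 0, "Bucket number must be greater than 0.")
    numBuckets = Some(n)
    bucketingColumns = Some(colNames)
    this
  }

  // Present only when bucketBy has been called.
  def bucketSpec: Option[(Int, Seq[String])] =
    for (n <- numBuckets; cols <- bucketingColumns) yield (n, cols)
}
```

Failing fast here is a design choice; overwriting the earlier setting would also be defensible, but it hides mistakes in chained builder calls.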




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48818904
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala ---
    @@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
     import org.apache.spark.sql.catalyst.expressions._
     import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
     import org.apache.spark.sql.execution.{FileRelation, RDDConversions}
    -import org.apache.spark.sql.execution.datasources.{PartitioningUtils, PartitionSpec, Partition}
    +import org.apache.spark.sql.execution.datasources.{BucketSpec, PartitioningUtils, PartitionSpec, Partition}
    --- End diff --
    
    not really used here?





[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48819115
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala ---
    @@ -247,7 +252,7 @@ private[sql] class DefaultWriterContainer(
         executorSideSetup(taskContext)
         val configuration = taskAttemptContext.getConfiguration
         configuration.set("spark.sql.sources.output.path", outputPath)
    -    val writer = newOutputWriter(getWorkPath)
    +    val writer = newOutputWriter(getWorkPath, None)
    --- End diff --
    
    Use a named argument here.




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-168683054
  
    **[Test build #48662 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/48662/consoleFull)** for PR 10498 at commit [`e3c3728`](https://github.com/apache/spark/commit/e3c3728fd67aea1849c8d4d1dab3658b1efb7417).




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-169002388
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48951242
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala ---
    @@ -351,7 +365,18 @@ abstract class OutputWriterFactory extends Serializable {
        *
        * @since 1.4.0
        */
    -  def newInstance(path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter
    +  def newInstance(
    +      path: String,
    +      dataSchema: StructType,
    +      context: TaskAttemptContext): OutputWriter
    +
    +  // TODO: expose bucket API to users.
    +  private[sql] def newInstance(
    +      path: String,
    +      bucketId: Option[Int],
    --- End diff --
    
    Use `Option[Int]` so that this method can support both bucketed and non-bucketed writes.
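
A small sketch of how a single method can cover both cases with `Option[Int]`. The `BucketFileNameSketch` object and the file naming scheme are purely illustrative assumptions; they are not the names Spark writes.

```scala
object BucketFileNameSketch {
  // bucketId = None means a plain, non-bucketed output file;
  // Some(id) appends a zero-padded bucket suffix to the file name.
  def fileName(taskId: Int, bucketId: Option[Int], extension: String): String = {
    val bucketSuffix = bucketId.map(id => f"_$id%05d").getOrElse("")
    f"part-r-$taskId%05d$bucketSuffix$extension"
  }
}
```

Callers that do not bucket pass `None` (ideally as a named argument, `bucketId = None`), so no second overload is needed.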




[GitHub] spark pull request: [SPARK-12539][SQL][WIP] support writing bucket...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-167843318
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/48415/
    Test PASSed.




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-169313791
  
    **[Test build #48854 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/48854/consoleFull)** for PR 10498 at commit [`d3200cf`](https://github.com/apache/spark/commit/d3200cf8bdffaf0025b25c30d7c3d9fef8a6f9a8).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-168925986
  
    Would be great if @davies can take a look at this too.





[GitHub] spark pull request: [SPARK-12539][SQL][WIP] support writing bucket...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-167614005
  
    This one also includes https://github.com/apache/spark/pull/10435, right?




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-169512619
  
    I'm going to merge this. @cloud-fan can you create a follow-up PR to address some of the comments above?




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r49023042
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala ---
    @@ -342,22 +461,34 @@ private[sql] class DynamicPartitionWriterContainer(
     
         // If anything below fails, we should abort the task.
         try {
    -      // This will be filled in if we have to fall back on sorting.
    -      var sorter: UnsafeKVExternalSorter = null
    +      // If there is no sorting columns, we set sorter to null and try the hash-based writing first,
    +      // and fill the sorter if there are too many writers and we need to fall back on sorting.
    +      // If there are sorting columns, then we have to sort the data anyway, and no need to try the
    --- End diff --
    
    In that case, should we just inject a local sort operator during planning and not do the sorting in the writer?





[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48889504
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala ---
    @@ -451,3 +457,147 @@ private[sql] class DynamicPartitionWriterContainer(
         }
       }
     }
    +
    +/**
    + * A writer that dynamically opens files based on the given partition columns.  Internally this is
    + * done by maintaining a HashMap of open files until `maxFiles` is reached.  If this occurs, the
    + * writer externally sorts the remaining rows and then writes out them out one file at a time.
    + */
    +private[sql] class BucketedPartitionWriterContainer(
    +    relation: BucketedHadoopFsRelation,
    +    job: Job,
    +    partitionColumns: Seq[Attribute],
    +    bucketSpec: BucketSpec,
    +    dataColumns: Seq[Attribute],
    +    inputSchema: Seq[Attribute],
    +    defaultPartitionName: String,
    +    isAppend: Boolean)
    +  extends BaseWriterContainer(relation, job, isAppend) {
    +
    +  def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = {
    +    executorSideSetup(taskContext)
    +
    +    val bucketColumns = bucketSpec.bucketColumnNames.map(c => inputSchema.find(_.name == c).get)
    +    val numBuckets = bucketSpec.numBuckets
    +    val sortColumns = bucketSpec.sortColumnNames.map(c => inputSchema.find(_.name == c).get)
    +    val bucketIdExpr = Pmod(new Murmur3Hash(bucketColumns), Literal(numBuckets))
    +
    +    val getSortingKey =
    +      UnsafeProjection.create((partitionColumns :+ bucketIdExpr) ++ sortColumns, inputSchema)
    +
    +    val sortingKeySchema = {
    +      val fields = StructType.fromAttributes(partitionColumns)
    +        .add("bucketId", IntegerType, nullable = false) ++
    +        StructType.fromAttributes(sortColumns)
    +      StructType(fields)
    +    }
    +
    +    // Returns the data columns to be written given an input row
    +    val getOutputRow = UnsafeProjection.create(dataColumns, inputSchema)
    +
    +    // Expressions that given a partition key build a string like: col1=val/col2=val/...
    +    val partitionStringExpression = partitionColumns.zipWithIndex.flatMap { case (c, i) =>
    +      val escaped =
    +        ScalaUDF(
    +          PartitioningUtils.escapePathName _,
    +          StringType,
    +          Seq(Cast(c, StringType)),
    +          Seq(StringType))
    +      val str = If(IsNull(c), Literal(defaultPartitionName), escaped)
    +      val partitionName = Literal(c.name + "=") :: str :: Nil
    +      if (i == 0) partitionName else Literal(Path.SEPARATOR) :: partitionName
    +    }
    +
    +    // Returns the partition path given a partition key.
    +    val getPartitionString =
    +      UnsafeProjection.create(Concat(partitionStringExpression) :: Nil, partitionColumns)
    +
    +    // If anything below fails, we should abort the task.
    +    try {
    +      // TODO: remove duplicated code.
    +      // TODO: if sorting columns are empty, we can keep all writers in a hash map and avoid sorting
    +      // here.
    +      val sorter = new UnsafeKVExternalSorter(
    +        sortingKeySchema,
    +        StructType.fromAttributes(dataColumns),
    +        SparkEnv.get.blockManager,
    +        TaskContext.get().taskMemoryManager().pageSizeBytes)
    +
    +      while (iterator.hasNext) {
    +        val currentRow = iterator.next()
    +        sorter.insertKV(getSortingKey(currentRow), getOutputRow(currentRow))
    +      }
    +
    +      logInfo(s"Sorting complete. Writing out partition files one at a time.")
    +
    +      def sameBucket(row1: InternalRow, row2: InternalRow): Boolean = {
    +        if (row1.getInt(partitionColumns.length) != row2.getInt(partitionColumns.length)) {
    +          false
    +        } else {
    +          var i = partitionColumns.length - 1
    +          while (i >= 0) {
    +            val dt = partitionColumns(i).dataType
    +            if (row1.get(i, dt) != row2.get(i, dt)) return false
    +            i -= 1
    +          }
    +          true
    +        }
    +      }
    +      val sortedIterator = sorter.sortedIterator()
    +      var currentKey: InternalRow = null
    +      var currentWriter: OutputWriter = null
    +      try {
    +        while (sortedIterator.next()) {
    +          if (currentKey == null || !sameBucket(currentKey, sortedIterator.getKey)) {
    +            if (currentWriter != null) {
    +              currentWriter.close()
    +            }
    +            currentKey = sortedIterator.getKey.copy()
    +            logDebug(s"Writing partition: $currentKey")
    +            currentWriter = newOutputWriter(currentKey)
    +          }
    +          currentWriter.writeInternal(sortedIterator.getValue)
    +        }
    +      } finally {
    +        if (currentWriter != null) { currentWriter.close() }
    +      }
    +
    +      commitTask()
    +    } catch {
    +      case cause: Throwable =>
    +        logError("Aborting task.", cause)
    +        abortTask()
    +        throw new SparkException("Task failed while writing rows.", cause)
    +    }
    +
    +    /** Open and returns a new OutputWriter given a partition key. */
    +    def newOutputWriter(key: InternalRow): OutputWriter = {
    +      val configuration = taskAttemptContext.getConfiguration
    +      val path = if (partitionColumns.nonEmpty) {
    +        val partitionPath = getPartitionString(key).getString(0)
    +        configuration.set(
    +          "spark.sql.sources.output.path", new Path(outputPath, partitionPath).toString)
    +        new Path(getWorkPath, partitionPath).toString
    +      } else {
    +        configuration.set("spark.sql.sources.output.path", outputPath)
    +        getWorkPath
    +      }
    +      val bucketId = key.getInt(partitionColumns.length)
    +      val newWriter = improveErrorMessage {
    +        outputWriterFactory.asInstanceOf[BucketedOutputWriterFactory].newInstance(
    +          path, Some(bucketId), dataSchema, taskAttemptContext)
    +      }
    +      newWriter.initConverter(dataSchema)
    +      newWriter
    +    }
    +
    +    def commitTask(): Unit = {
    --- End diff --
    
    Remove? Does this do anything?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12539][SQL][WIP] support writing bucket...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48588812
  
    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala ---
    @@ -0,0 +1,67 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources
    +
    +import org.apache.spark.sql.hive.test.TestHiveSingleton
    +import org.apache.spark.sql.test.SQLTestUtils
    +import org.apache.spark.sql.{AnalysisException, QueryTest}
    +
    +class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
    +  import testImplicits._
    +
    +  test("bucketed by non-existing column") {
    +    val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
    +    intercept[AnalysisException](df.write.bucketBy(2, "k").saveAsTable("tt"))
    +  }
    +
    +  test("numBuckets not greater than 0") {
    +    val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
    +    intercept[IllegalArgumentException](df.write.bucketBy(0, "i").saveAsTable("tt"))
    +  }
    +
    +  test("specify sorting columns without bucketing columns") {
    +    val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
    +    intercept[IllegalArgumentException](df.write.sortBy("j").saveAsTable("tt"))
    +  }
    +
    +  test("sorting by non-orderable column") {
    +    val df = Seq("a" -> Map(1 -> 1), "b" -> Map(2 -> 2)).toDF("i", "j")
    +    intercept[AnalysisException](df.write.bucketBy(2, "i").sortBy("j").saveAsTable("tt"))
    +  }
    +
    +  test("write bucketed data to non-hive-table or existing hive table") {
    +    val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
    +    intercept[IllegalArgumentException](df.write.bucketBy(2, "i").parquet("/tmp/path"))
    +    intercept[IllegalArgumentException](df.write.bucketBy(2, "i").json("/tmp/path"))
    +    intercept[IllegalArgumentException](df.write.bucketBy(2, "i").insertInto("tt"))
    +  }
    +
    +  test("write bucketed data") {
    +    val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
    +    withTable("bucketedTable") {
    --- End diff --
    
    withTempTable




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-169002057
  
    **[Test build #48765 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/48765/consoleFull)** for PR 10498 at commit [`3ff968b`](https://github.com/apache/spark/commit/3ff968b29d3852c92952454254ae6e1f7ba6599d).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-168660437
  
    **[Test build #48658 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/48658/consoleFull)** for PR 10498 at commit [`ba23292`](https://github.com/apache/spark/commit/ba2329261740b08bd1a19dc8be0ef281281b84c9).




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r49022763
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala ---
    @@ -342,22 +461,34 @@ private[sql] class DynamicPartitionWriterContainer(
     
         // If anything below fails, we should abort the task.
         try {
    -      // This will be filled in if we have to fall back on sorting.
    -      var sorter: UnsafeKVExternalSorter = null
    +      // If there is no sorting columns, we set sorter to null and try the hash-based writing first,
    +      // and fill the sorter if there are too many writers and we need to fall back on sorting.
    +      // If there are sorting columns, then we have to sort the data anyway, and no need to try the
    --- End diff --
    
    Yeah, this is probably too complex for the benefit.
    /cc @rxin 
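
For readers following the thread, the hash-first, sort-as-fallback idea described in the diff comment can be sketched roughly as below. This is a simplified illustration and not the code in the PR: `Writer`, `write`, and `maxOpenWriters` are hypothetical names, rows are plain `(key, value)` pairs, and rows that cannot get a writer are only deferred to a sorted pass rather than spilled through `UnsafeKVExternalSorter`.

```scala
import scala.collection.mutable

object HashThenSortSketch {
  // Hypothetical stand-in for an output writer: it only records the rows it receives.
  final class Writer { val rows = mutable.ArrayBuffer.empty[String] }

  /** Writes rows grouped by key, keeping at most `maxOpenWriters` writers in a hash map. */
  def write(rows: Iterator[(Int, String)], maxOpenWriters: Int): Map[Int, Seq[String]] = {
    val open = mutable.HashMap.empty[Int, Writer]
    val deferred = mutable.ArrayBuffer.empty[(Int, String)]

    // Hash-based phase: reuse an open writer, or open a new one while under the limit.
    rows.foreach { case (key, value) =>
      if (open.contains(key) || open.size < maxOpenWriters) {
        open.getOrElseUpdate(key, new Writer).rows += value
      } else {
        deferred += ((key, value))
      }
    }

    val out = mutable.HashMap.empty[Int, Seq[String]]
    open.foreach { case (key, writer) => out(key) = writer.rows.toSeq }

    // Sort-based fallback: after sorting by key, each key's rows are contiguous,
    // so a single writer at a time is enough.
    var currentKey: Option[Int] = None
    var current: Writer = null
    def closeCurrent(): Unit = currentKey.foreach(key => out(key) = current.rows.toSeq)

    deferred.sortBy(_._1).foreach { case (key, value) =>
      if (!currentKey.contains(key)) {
        closeCurrent()
        currentKey = Some(key)
        current = new Writer
      }
      current.rows += value
    }
    closeCurrent()
    out.toMap
  }
}
```

When sorting columns are present the data has to be sorted anyway, so the hash-based phase would be skipped entirely; that extra branching is the complexity being questioned here.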




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-168717379
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/48663/




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-168859783
  
    **[Test build #48701 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/48701/consoleFull)** for PR 10498 at commit [`6e3c1c0`](https://github.com/apache/spark/commit/6e3c1c0370dec30002992a3a83b2066f4d5278df).




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-168924177
  
    If I'm reading it correctly, the writeRows function is almost 500 lines of code. Can we break it into logical parts and create a function for each of them?





[GitHub] spark pull request: [SPARK-12539][SQL][WIP] support writing bucket...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-167843065
  
    **[Test build #48415 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/48415/consoleFull)** for PR 10498 at commit [`a9dc997`](https://github.com/apache/spark/commit/a9dc99722bfea886c6381abbd2e1e9366fcf9064).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `case class BucketSpec(`




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48921611
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala ---
    @@ -0,0 +1,83 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.datasources
    +
    +import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
    +import org.apache.spark.sql.SQLContext
    +import org.apache.spark.sql.catalyst.expressions.Attribute
    +import org.apache.spark.sql.sources.{OutputWriter, OutputWriterFactory, HadoopFsRelationProvider, HadoopFsRelation}
    +import org.apache.spark.sql.types.StructType
    +
    +/**
    + * A container for bucketing information.
    + * Bucketing is a technology for decomposing data sets into more manageable parts, and the number
    + * of buckets is fixed so it does not fluctuate with data.
    + *
    + * @param numBuckets number of buckets.
    + * @param bucketColumnNames the names of the columns that used to generate the bucket id.
    + * @param sortColumnNames the names of the columns that used to sort data in each bucket.
    + */
    +private[sql] case class BucketSpec(
    +    numBuckets: Int,
    +    bucketColumnNames: Seq[String],
    +    sortColumnNames: Seq[String])
    +
    +private[sql] trait BucketedHadoopFsRelationProvider extends HadoopFsRelationProvider {
    --- End diff --
    
    Should we expose the bucket API to users so that they can implement data sources that support bucketing?
    
    cc @rxin @nongli 




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48889876
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala ---
    @@ -0,0 +1,83 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.datasources
    +
    +import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
    +import org.apache.spark.sql.SQLContext
    +import org.apache.spark.sql.catalyst.expressions.Attribute
    +import org.apache.spark.sql.sources.{OutputWriter, OutputWriterFactory, HadoopFsRelationProvider, HadoopFsRelation}
    +import org.apache.spark.sql.types.StructType
    +
    +/**
    + * A container for bucketing information.
    + * Bucketing is a technology for decomposing data sets into more manageable parts, and the number
    + * of buckets is fixed so it does not fluctuate with data.
    + *
    + * @param numBuckets number of buckets.
    + * @param bucketColumnNames the names of the columns that used to generate the bucket id.
    + * @param sortColumnNames the names of the columns that used to sort data in each bucket.
    + */
    +private[sql] case class BucketSpec(
    +    numBuckets: Int,
    +    bucketColumnNames: Seq[String],
    +    sortColumnNames: Seq[String])
    +
    +private[sql] trait BucketedHadoopFsRelationProvider extends HadoopFsRelationProvider {
    +  final override def createRelation(
    +      sqlContext: SQLContext,
    +      paths: Array[String],
    +      dataSchema: Option[StructType],
    +      partitionColumns: Option[StructType],
    +      parameters: Map[String, String]): HadoopFsRelation =
    +    createRelation(sqlContext, paths, dataSchema, partitionColumns, None, parameters)
    +
    +  def createRelation(
    +      sqlContext: SQLContext,
    +      paths: Array[String],
    +      dataSchema: Option[StructType],
    +      partitionColumns: Option[StructType],
    +      bucketSpec: Option[BucketSpec],
    +      parameters: Map[String, String]): BucketedHadoopFsRelation
    +}
    +
    +private[sql] abstract class BucketedHadoopFsRelation(
    +    maybePartitionSpec: Option[PartitionSpec],
    +    parameters: Map[String, String])
    +  extends HadoopFsRelation(maybePartitionSpec, parameters) {
    +  def this() = this(None, Map.empty[String, String])
    +
    +  def this(parameters: Map[String, String]) = this(None, parameters)
    +
    +  def bucketSpec: Option[BucketSpec]
    +
    +  def prepareJobForWrite(job: Job): BucketedOutputWriterFactory
    +}
    +
    +private[sql] abstract class BucketedOutputWriterFactory extends OutputWriterFactory {
    +  final override def newInstance(
    +      path: String,
    +      dataSchema: StructType,
    +      context: TaskAttemptContext): OutputWriter =
    +    newInstance(path, None, dataSchema, context)
    +
    +  def newInstance(
    +      path: String,
    +      bucketId: Option[Int],
    --- End diff --
    
    Should this just be `bucketId: Int`? What does it mean to pass `None` as the bucket id?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-168017794
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/48488/




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48895396
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala ---
    @@ -87,7 +91,8 @@ private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext
               val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID")
               val taskAttemptId = context.getTaskAttemptID
               val split = taskAttemptId.getTaskID.getId
    -          new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
    +          val bucketString = bucketId.map(id => f"-$id%05d").getOrElse("")
    --- End diff --
    
    This means the maximum number of buckets is capped at 99999, yes? That is definitely big enough, but can you add some validation in bucketBy?
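
A minimal sketch of the kind of validation being asked for, under stated assumptions: `BucketNaming`, `MaxNumBuckets`, and `validateNumBuckets` are hypothetical names that do not appear in the PR, and the bound assumes bucket ids run from 0 to `numBuckets - 1`. Only the `f"-$id%05d"` suffix is taken from the diff above.

```scala
object BucketNaming {
  // Assumption: ids run from 0 to numBuckets - 1, so allowing up to 100000 buckets
  // keeps every id within the five digits produced by "%05d".
  val MaxNumBuckets: Int = 100000

  /** Fails fast on a bucket count that is non-positive or would overflow the suffix width. */
  def validateNumBuckets(numBuckets: Int): Int = {
    require(
      numBuckets > 0 && numBuckets <= MaxNumBuckets,
      s"Bucket number must be greater than 0 and at most $MaxNumBuckets, got $numBuckets.")
    numBuckets
  }

  /** File-name suffix from the diff: empty when there is no bucket id. */
  def bucketString(bucketId: Option[Int]): String =
    bucketId.map(id => f"-$id%05d").getOrElse("")
}
```

Note that `%05d` pads to at least five digits and does not truncate, so a larger id would still yield a valid but wider suffix; the check mainly keeps file names at a uniform width.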


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-168921832
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-168877974
  
    **[Test build #48701 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/48701/consoleFull)** for PR 10498 at commit [`6e3c1c0`](https://github.com/apache/spark/commit/6e3c1c0370dec30002992a3a83b2066f4d5278df).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-12539][SQL][WIP] support writing bucket...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-167783752
  
    This one also includes #10435; we can merge that first.




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48723143
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BucketSpec.scala ---
    @@ -0,0 +1,42 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.datasources
    +
    +import org.apache.spark.sql.catalyst.expressions.Attribute
    +
    +private[sql] case class BucketSpec(
    +    numBuckets: Int,
    +    bucketingColumns: Seq[String],
    +    sortingColumns: Option[Seq[String]]) {
    +
    +  def resolvedBucketingColumns(inputSchema: Seq[Attribute]): Seq[Attribute] = {
    +    bucketingColumns.map { col =>
    +      inputSchema.find(_.name == col).get
    +    }
    +  }
    +
    +  def resolvedSortingColumns(inputSchema: Seq[Attribute]): Seq[Attribute] = {
    +    if (sortingColumns.isDefined) {
    +      sortingColumns.get.map { col =>
    +        inputSchema.find(_.name == col).get
    +      }
    +    } else {
    +      Nil
    +    }
    +  }
    --- End diff --
    
    The columns are already normalized in `DataFrameWriter`, see https://github.com/apache/spark/pull/10498/files#diff-94fbd986b04087223f53697d4b6cab24R221
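
To illustrate why the `find(...).get` calls in the diff above are expected to be safe, here is a small standalone sketch of that normalization step. It is an approximation, not the `DataFrameWriter` code: `ColumnNormalization` and `caseInsensitive` are hypothetical names, the resolver is a plain function standing in for the analyzer's resolver, and `IllegalArgumentException` replaces `AnalysisException` so the example runs without Spark.

```scala
object ColumnNormalization {
  // A resolver decides whether an existing column name matches a user-supplied one.
  type Resolver = (String, String) => Boolean

  // Assumed default for this sketch: case-insensitive matching.
  val caseInsensitive: Resolver = (existing, given) => existing.equalsIgnoreCase(given)

  /** Returns the existing column name matching `columnName`, or fails with a descriptive error. */
  def normalize(
      validColumnNames: Seq[String],
      columnName: String,
      columnType: String,
      resolver: Resolver = caseInsensitive): String = {
    validColumnNames.find(resolver(_, columnName)).getOrElse(
      throw new IllegalArgumentException(
        s"$columnType column $columnName not found in " +
          s"existing columns (${validColumnNames.mkString(", ")})"))
  }
}
```

Because every name in the bucket spec has already been replaced by the exact name of an existing column, a later exact-match lookup against the same schema should always find it.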




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r49023295
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala ---
    @@ -342,22 +461,34 @@ private[sql] class DynamicPartitionWriterContainer(
     
         // If anything below fails, we should abort the task.
         try {
    -      // This will be filled in if we have to fall back on sorting.
    -      var sorter: UnsafeKVExternalSorter = null
    +      // If there is no sorting columns, we set sorter to null and try the hash-based writing first,
    +      // and fill the sorter if there are too many writers and we need to fall back on sorting.
    +      // If there are sorting columns, then we have to sort the data anyway, and no need to try the
    --- End diff --
    
    Yes




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48822540
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
    @@ -189,13 +220,43 @@ final class DataFrameWriter private[sql](df: DataFrame) {
             ifNotExists = false)).toRdd
       }
     
    -  private def normalizedParCols: Option[Seq[String]] = partitioningColumns.map { parCols =>
    -    parCols.map { col =>
    -      df.logicalPlan.output
    -        .map(_.name)
    -        .find(df.sqlContext.analyzer.resolver(_, col))
    -        .getOrElse(throw new AnalysisException(s"Partition column $col not found in existing " +
    -          s"columns (${df.logicalPlan.output.map(_.name).mkString(", ")})"))
    +  private def normalizedParCols: Option[Seq[String]] = partitioningColumns.map { cols =>
    +    cols.map(normalize(_, "Partition"))
    +  }
    +
    +  private def normalizedBucketCols: Option[Seq[String]] = bucketingColumns.map { cols =>
    +    cols.map(normalize(_, "Bucketing"))
    +  }
    +
    +  private def normalizedSortCols: Option[Seq[String]] = sortingColumns.map { cols =>
    +    cols.map(normalize(_, "Sorting"))
    +  }
    +
    +  private def getBucketSpec: Option[BucketSpec] = {
    +    if (sortingColumns.isDefined) {
    +      require(numBuckets.isDefined, "sortBy must be used together with bucketBy")
    +    }
    +
    +    for {
    +      n <- numBuckets
    +      cols <- normalizedBucketCols
    +    } yield {
    +      require(n > 0, "Bucket number must be greater than 0.")
    +      BucketSpec(n, cols, normalizedSortCols)
    +    }
    +  }
    +
    +  private def normalize(columnName: String, columnType: String): String = {
    +    val validColumnNames = df.logicalPlan.output.map(_.name)
    +    validColumnNames.find(df.sqlContext.analyzer.resolver(_, columnName))
    +      .getOrElse(throw new AnalysisException(s"$columnType column $columnName not found in " +
    +        s"existing columns (${validColumnNames.mkString(", ")})"))
    +  }
    +
    +  private def assertNotBucketed(): Unit = {
    +    if (numBuckets.isDefined || sortingColumns.isDefined) {
    --- End diff --
    
    Maybe we need a better name than `sortBy`, one that makes clear the given columns are used to sort the data within each bucket.




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r49022978
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala ---
    @@ -342,22 +461,34 @@ private[sql] class DynamicPartitionWriterContainer(
     
         // If anything below fails, we should abort the task.
         try {
    -      // This will be filled in if we have to fall back on sorting.
    -      var sorter: UnsafeKVExternalSorter = null
    +      // If there is no sorting columns, we set sorter to null and try the hash-based writing first,
    +      // and fill the sorter if there are too many writers and we need to fall back on sorting.
    +      // If there are sorting columns, then we have to sort the data anyway, and no need to try the
    --- End diff --
    
    +1




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48818640
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala ---
    @@ -312,14 +318,35 @@ private[sql] class DynamicPartitionWriterContainer(
         isAppend: Boolean)
       extends BaseWriterContainer(relation, job, isAppend) {
     
    +  private def bucketColumns = bucketSpec.get.resolvedBucketingColumns(inputSchema)
    +  private def sortColumns = bucketSpec.get.resolvedSortingColumns(inputSchema)
    +  private def numBuckets = bucketSpec.get.numBuckets
    +  private def bucketIdExpr = Pmod(new Murmur3Hash(bucketColumns), Literal(numBuckets))
    +
       def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = {
         val outputWriters = new java.util.HashMap[InternalRow, OutputWriter]
         executorSideSetup(taskContext)
     
         var outputWritersCleared = false
     
    -    // Returns the partition key given an input row
    -    val getPartitionKey = UnsafeProjection.create(partitionColumns, inputSchema)
    +    val getKey = if (bucketSpec.isEmpty) {
    +      UnsafeProjection.create(partitionColumns, inputSchema)
    +    } else { // If it's bucketed, we should also consider bucket id as part of the key.
    +      UnsafeProjection.create(partitionColumns :+ bucketIdExpr, inputSchema)
    +    }
    +
    +    val keySchema = if (bucketSpec.isEmpty) {
    +      StructType.fromAttributes(partitionColumns)
    +    } else { // If it's bucketed, we should also consider bucket id as part of the key.
    +      StructType.fromAttributes(partitionColumns).add("bucketId", IntegerType, nullable = false)
    +    }
    +
    +    def getBucketId(key: InternalRow): Option[Int] = if (bucketSpec.isDefined) {
    --- End diff --
    
    Put the entire `if`/`else` on one line, e.g.
    ```
    def getBucketId(key: InternalRow): Option[Int] = 
      if (bucketSpec.isDefined) Some(key.getInt(partitionColumns.length)) else None
    ```
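
The diff above appends a bucket id to the partition key, computed as `Pmod(new Murmur3Hash(bucketColumns), Literal(numBuckets))`. The following is only a minimal sketch of why a positive modulo is used instead of a plain `%`. It assumes the Scala standard library's `MurmurHash3` as a stand-in for Spark's `Murmur3Hash` (the two do not produce the same values), and `BucketIdSketch`, `pmod`, and `bucketId` are hypothetical names that do not appear in the PR.

```scala
import scala.util.hashing.MurmurHash3

object BucketIdSketch {
  // Non-negative remainder: a plain `%` keeps the sign of the dividend,
  // so a negative hash would otherwise yield a negative bucket id.
  def pmod(a: Int, n: Int): Int = {
    val r = a % n
    if (r < 0) r + n else r
  }

  // Hypothetical helper: hash the bucketing column values, then map the hash
  // into [0, numBuckets). MurmurHash3.seqHash is an assumption standing in
  // for Spark's Murmur3Hash expression.
  def bucketId(bucketValues: Seq[Any], numBuckets: Int): Int = {
    require(numBuckets > 0, "Bucket number must be greater than 0.")
    pmod(MurmurHash3.seqHash(bucketValues), numBuckets)
  }
}
```

With this shape, `BucketIdSketch.bucketId(Seq("a", 1), 8)` always falls between 0 and 7, whatever the sign of the underlying hash.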





[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-168982645
  
    **[Test build #48765 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/48765/consoleFull)** for PR 10498 at commit [`3ff968b`](https://github.com/apache/spark/commit/3ff968b29d3852c92952454254ae6e1f7ba6599d).




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48818942
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---
    @@ -240,6 +241,23 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
           }
         }
     
    +    if (userSpecifiedSchema.isDefined && bucketSpec.isDefined) {
    +      val BucketSpec(numBuckets, bucketColumns, sortColumns) = bucketSpec.get
    +
    +      tableProperties.put("spark.sql.sources.schema.numBuckets", numBuckets.toString)
    +      tableProperties.put("spark.sql.sources.schema.numBucketCols", bucketColumns.length.toString)
    +      bucketColumns.zipWithIndex.foreach { case (bucketCol, index) =>
    +        tableProperties.put(s"spark.sql.sources.schema.bucketCol.$index", bucketCol)
    +      }
    +
    +      if (sortColumns.isDefined) {
    +        tableProperties.put("spark.sql.sources.schema.numSortCols", sortColumns.get.length.toString)
    --- End diff --
    
    Why do we care about the number of sort columns?
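
One possible reading, which is not stated in this thread: table properties are a flat string-to-string map, so a column list has to be stored as indexed keys, and a stored count tells a reader how many of those keys to look up. The sketch below illustrates that round trip. The `numBucketCols` and `bucketCol.$index` keys follow the diff; the `sortCol.$index` key and the names `BucketPropsSketch`, `encode`, and `decode` are assumptions made for illustration.

```scala
object BucketPropsSketch {
  private val Prefix = "spark.sql.sources.schema"

  // Flatten a column list into indexed string properties plus a count,
  // mirroring the numBucketCols / bucketCol.$index layout in the diff.
  // `kind` is "bucket" or "sort"; the "sortCol.$index" key is assumed.
  def encode(kind: String, cols: Seq[String]): Map[String, String] = {
    val count = s"$Prefix.num${kind.capitalize}Cols" -> cols.length.toString
    val indexed = cols.zipWithIndex.map { case (c, i) => s"$Prefix.${kind}Col.$i" -> c }
    (count +: indexed).toMap
  }

  // Hypothetical reader: the stored count says how many indexed keys to fetch.
  def decode(kind: String, props: Map[String, String]): Seq[String] = {
    val n = props(s"$Prefix.num${kind.capitalize}Cols").toInt
    (0 until n).map(i => props(s"$Prefix.${kind}Col.$i"))
  }
}
```

An alternative that avoids the count would be to probe indexed keys until one is missing; which of the two the PR finally adopts is not visible in this excerpt.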





[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r49003254
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala ---
    @@ -342,22 +461,34 @@ private[sql] class DynamicPartitionWriterContainer(
     
         // If anything below fails, we should abort the task.
         try {
    -      // This will be filled in if we have to fall back on sorting.
    -      var sorter: UnsafeKVExternalSorter = null
    +      // If there is no sorting columns, we set sorter to null and try the hash-based writing first,
    +      // and fill the sorter if there are too many writers and we need to fall back on sorting.
    +      // If there are sorting columns, then we have to sort the data anyway, and no need to try the
    --- End diff --
    
    I don't think it's useful to try the hash-based path when the output is bucketed. It's very unlikely that the number of buckets is smaller than numFiles, and because rows are hashed into buckets, they won't be clumped together by chance the way partition values can be.
    
    Feel free to address later.
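
The strategy described in the quoted code comment (try hash-based writing first, fall back to sorting when there are too many writers) can be sketched with plain collections. This is a simplified model under stated assumptions, not the PR's implementation: in-memory buffers stand in for `OutputWriter`s, an in-memory sort stands in for `UnsafeKVExternalSorter`, and `HashThenSortSketch`, `write`, and `maxOpenWriters` are hypothetical names.

```scala
import scala.collection.mutable

object HashThenSortSketch {
  // Returns the values grouped per output key, plus a flag that says whether
  // the sort fallback was needed.
  def write(
      rows: Iterator[(String, Int)],
      maxOpenWriters: Int): (Map[String, Vector[Int]], Boolean) = {
    val open = mutable.LinkedHashMap.empty[String, Vector[Int]]
    var fellBack = false
    while (rows.hasNext && !fellBack) {
      val (key, value) = rows.next()
      if (open.contains(key) || open.size < maxOpenWriters) {
        // Hash-based path: one open "writer" (here, a buffer) per distinct key.
        open(key) = open.getOrElse(key, Vector.empty) :+ value
      } else {
        // Too many distinct keys: sort the current row and all remaining rows
        // by key, then append them one key at a time.
        fellBack = true
        val sorted = ((key, value) +: rows.toVector).sortBy(_._1)
        sorted.foreach { case (k, v) => open(k) = open.getOrElse(k, Vector.empty) :+ v }
      }
    }
    (open.toMap, fellBack)
  }
}
```

In this model, a key made of partition values plus a hashed bucket id tends to take many distinct values within one task, so the fallback branch is reached quickly; that is the situation the comment above is concerned with.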




[GitHub] spark pull request: [SPARK-12539][SQL][WIP] support writing bucket...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48588795
  
    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala ---
    @@ -0,0 +1,67 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources
    +
    +import org.apache.spark.sql.hive.test.TestHiveSingleton
    +import org.apache.spark.sql.test.SQLTestUtils
    +import org.apache.spark.sql.{AnalysisException, QueryTest}
    +
    +class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
    +  import testImplicits._
    +
    +  test("bucketed by non-existing column") {
    +    val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
    +    intercept[AnalysisException](df.write.bucketBy(2, "k").saveAsTable("tt"))
    +  }
    +
    +  test("numBuckets not greater than 0") {
    +    val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
    +    intercept[IllegalArgumentException](df.write.bucketBy(0, "i").saveAsTable("tt"))
    +  }
    +
    +  test("specify sorting columns without bucketing columns") {
    +    val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
    +    intercept[IllegalArgumentException](df.write.sortBy("j").saveAsTable("tt"))
    +  }
    +
    +  test("sorting by non-orderable column") {
    +    val df = Seq("a" -> Map(1 -> 1), "b" -> Map(2 -> 2)).toDF("i", "j")
    +    intercept[AnalysisException](df.write.bucketBy(2, "i").sortBy("j").saveAsTable("tt"))
    +  }
    +
    +  test("write bucketed data to non-hive-table or existing hive table") {
    +    val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
    +    intercept[IllegalArgumentException](df.write.bucketBy(2, "i").parquet("/tmp/path"))
    +    intercept[IllegalArgumentException](df.write.bucketBy(2, "i").json("/tmp/path"))
    +    intercept[IllegalArgumentException](df.write.bucketBy(2, "i").insertInto("tt"))
    +  }
    +
    +  test("write bucketed data") {
    +    val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
    +    withTable("bucketedTable") {
    +      df.write.partitionBy("i").bucketBy(8, "j").saveAsTable("bucketedTable")
    +    }
    --- End diff --
    
    Can you verify that the right number of files exists?
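
A check of this kind could count the data files under the table directory. The helper below is a hedged sketch only: it assumes data files are named with a `part-` prefix (as in the file name example elsewhere in this thread) and that partitions appear as subdirectories. `FileCountSketch` and `dataFiles` are hypothetical names, and how the suite would locate the table directory is not shown here.

```scala
import java.io.File

object FileCountSketch {
  // Recursively collect files whose names start with "part-", skipping
  // marker files such as _SUCCESS. listFiles() returns null for a path
  // that is not a directory, hence the Option wrapper.
  def dataFiles(dir: File): Seq[File] = {
    val children = Option(dir.listFiles()).map(_.toSeq).getOrElse(Seq.empty)
    children.flatMap { f =>
      if (f.isDirectory) dataFiles(f)
      else if (f.getName.startsWith("part-")) Seq(f)
      else Seq.empty
    }
  }
}
```

A test could then compare `FileCountSketch.dataFiles(tableDir).size` against the number of distinct (partition, bucket) pairs present in the input, where `tableDir` is whatever location the test resolves for the table.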




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48953481
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala ---
    @@ -335,6 +339,121 @@ private[sql] class DynamicPartitionWriterContainer(
           val partitionName = Literal(c.name + "=") :: str :: Nil
           if (i == 0) partitionName else Literal(Path.SEPARATOR) :: partitionName
         }
    +  }
    +
    +  private def getBucketIdFromKey(key: InternalRow): Option[Int] = {
    +    if (bucketSpec.isDefined) {
    +      Some(key.getInt(partitionColumns.length))
    +    } else {
    +      None
    +    }
    +  }
    +
    +  private def sameBucket(key1: UnsafeRow, key2: UnsafeRow): Boolean = {
    +    val bucketIdIndex = partitionColumns.length
    +    if (key1.getInt(bucketIdIndex) != key2.getInt(bucketIdIndex)) {
    +      false
    +    } else {
    +      var i = partitionColumns.length - 1
    +      while (i >= 0) {
    +        val dt = partitionColumns(i).dataType
    +        if (key1.get(i, dt) != key2.get(i, dt)) return false
    +        i -= 1
    +      }
    +      true
    +    }
    +  }
    +
    +  private def sortBasedWrite(
    +      sorter: UnsafeKVExternalSorter,
    +      iterator: Iterator[InternalRow],
    +      getSortingKey: UnsafeProjection,
    +      getOutputRow: UnsafeProjection,
    +      getPartitionString: UnsafeProjection,
    +      outputWriters: java.util.HashMap[InternalRow, OutputWriter]): Unit = {
    +    while (iterator.hasNext) {
    +      val currentRow = iterator.next()
    +      sorter.insertKV(getSortingKey(currentRow), getOutputRow(currentRow))
    +    }
    +
    +    logInfo(s"Sorting complete. Writing out partition files one at a time.")
    +
    +    val needNewWriter: (UnsafeRow, UnsafeRow) => Boolean = if (sortColumns.isEmpty) {
    +      (key1, key2) => key1 != key2
    +    } else {
    +      (key1, key2) => key1 == null || !sameBucket(key1, key2)
    +    }
    +
    +    val sortedIterator = sorter.sortedIterator()
    +    var currentKey: UnsafeRow = null
    +    var currentWriter: OutputWriter = null
    +    try {
    +      while (sortedIterator.next()) {
    +        if (needNewWriter(currentKey, sortedIterator.getKey)) {
    +          if (currentWriter != null) {
    +            currentWriter.close()
    +          }
    +          currentKey = sortedIterator.getKey.copy()
    +          logDebug(s"Writing partition: $currentKey")
    +
    +          // Either use an existing file from before, or open a new one.
    +          currentWriter = outputWriters.remove(currentKey)
    +          if (currentWriter == null) {
    +            currentWriter = newOutputWriter(currentKey, getPartitionString)
    +          }
    +        }
    +
    +        currentWriter.writeInternal(sortedIterator.getValue)
    +      }
    +    } finally {
    +      if (currentWriter != null) { currentWriter.close() }
    +    }
    +  }
    +
    +  /**
    +   * Open and returns a new OutputWriter given a partition key and optional bucket id.
    +   * If bucket id is specified, we will append it to the end of the file name, but before the
    +   * file extension, e.g. part-r-00009-ea518ad4-455a-4431-b471-d24e03814677-00002.gz.parquet
    --- End diff --
    
    Shall we put the bucket id at the very end, i.e. after the file extension? That would make it much easier to get the bucket id from a file name, e.g. `part-r-00009-ea518ad4-455a-4431-b471-d24e03814677.gz.parquet.00002`
    
    cc @nongli
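
To make the trade-off concrete, here is a rough sketch of extracting the bucket id under each naming scheme. The regular expressions are assumptions written for the two example file names in this thread and are not taken from the PR; `BucketFileNameSketch` and its methods are hypothetical names.

```scala
object BucketFileNameSketch {
  // Scheme in the diff: bucket id before the extension,
  // e.g. part-r-00009-<uuid>-00002.gz.parquet
  private val BeforeExtension = """^[^.]*-(\d+)\..*$""".r
  // Alternative raised in the comment: bucket id after the extension,
  // e.g. part-r-00009-<uuid>.gz.parquet.00002
  private val AfterExtension = """^.*\.(\d+)$""".r

  def bucketIdBeforeExtension(fileName: String): Option[Int] = fileName match {
    case BeforeExtension(id) => Some(id.toInt)
    case _ => None
  }

  def bucketIdAfterExtension(fileName: String): Option[Int] = fileName match {
    case AfterExtension(id) => Some(id.toInt)
    case _ => None
  }
}
```

One limitation of the first pattern in this sketch: it cannot distinguish a bucket id from a UUID whose last group happens to be all digits, whereas the suffix form only has to inspect the text after the final dot.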




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-168702656
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/48662/
    Test FAILed.




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-168702652
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-168675285
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/48658/
    Test FAILed.




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-168685889
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/48659/
    Test FAILed.




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-169317848
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/48855/
    Test FAILed.




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48820679
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
    @@ -189,13 +220,43 @@ final class DataFrameWriter private[sql](df: DataFrame) {
             ifNotExists = false)).toRdd
       }
     
    -  private def normalizedParCols: Option[Seq[String]] = partitioningColumns.map { parCols =>
    -    parCols.map { col =>
    -      df.logicalPlan.output
    -        .map(_.name)
    -        .find(df.sqlContext.analyzer.resolver(_, col))
    -        .getOrElse(throw new AnalysisException(s"Partition column $col not found in existing " +
    -          s"columns (${df.logicalPlan.output.map(_.name).mkString(", ")})"))
    +  private def normalizedParCols: Option[Seq[String]] = partitioningColumns.map { cols =>
    +    cols.map(normalize(_, "Partition"))
    +  }
    +
    +  private def normalizedBucketCols: Option[Seq[String]] = bucketingColumns.map { cols =>
    +    cols.map(normalize(_, "Bucketing"))
    +  }
    +
    +  private def normalizedSortCols: Option[Seq[String]] = sortingColumns.map { cols =>
    +    cols.map(normalize(_, "Sorting"))
    +  }
    +
    +  private def getBucketSpec: Option[BucketSpec] = {
    +    if (sortingColumns.isDefined) {
    +      require(numBuckets.isDefined, "sortBy must be used together with bucketBy")
    +    }
    +
    +    for {
    +      n <- numBuckets
    +      cols <- normalizedBucketCols
    +    } yield {
    +      require(n > 0, "Bucket number must be greater than 0.")
    +      BucketSpec(n, cols, normalizedSortCols)
    +    }
    +  }
    +
    +  private def normalize(columnName: String, columnType: String): String = {
    +    val validColumnNames = df.logicalPlan.output.map(_.name)
    +    validColumnNames.find(df.sqlContext.analyzer.resolver(_, columnName))
    +      .getOrElse(throw new AnalysisException(s"$columnType column $columnName not found in " +
    +        s"existing columns (${validColumnNames.mkString(", ")})"))
    +  }
    +
    +  private def assertNotBucketed(): Unit = {
    +    if (numBuckets.isDefined || sortingColumns.isDefined) {
    --- End diff --
    
    If users just want to sort the data, they can call `DataFrame.sort` before writing. In this context, `sortingColumns` is part of the bucketing information and should be used together with `bucketingColumns`.
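
The distinction drawn here (an ordering inside each bucket, as opposed to a sort of the whole data set before writing) can be illustrated with plain collections. This is a sketch under assumptions: `Math.floorMod` on the integer key stands in for the hashed bucket id, rows are `(i, j)` pairs bucketed by `i` and sorted by `j`, and `SortWithinBucketSketch`, `bucketId`, and `layout` are hypothetical names.

```scala
object SortWithinBucketSketch {
  // Hypothetical stand-in for the bucket id expression:
  // a non-negative remainder of the bucketing value.
  def bucketId(key: Int, numBuckets: Int): Int = Math.floorMod(key, numBuckets)

  // Rows are (bucketing column i, sorting column j). Rows are grouped by
  // bucket id, and each group is ordered by j independently of the others.
  def layout(rows: Seq[(Int, String)], numBuckets: Int): Map[Int, Seq[(Int, String)]] =
    rows
      .groupBy { case (i, _) => bucketId(i, numBuckets) }
      .map { case (bucket, group) => bucket -> group.sortBy(_._2) }
}
```

Each bucket ends up ordered by `j` on its own, and no ordering is implied across buckets, which is why the sort columns only make sense as part of the bucket specification.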




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r49030540
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala ---
    @@ -34,13 +34,13 @@ import org.apache.spark.rdd.RDD
     import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
     import org.apache.spark.sql.catalyst.InternalRow
     import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
    -import org.apache.spark.sql.execution.datasources.PartitionSpec
    +import org.apache.spark.sql.execution.datasources._
     import org.apache.spark.sql.sources._
     import org.apache.spark.sql.types.StructType
     import org.apache.spark.util.SerializableConfiguration
     
     
    -class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister {
    +class DefaultSource extends BucketedHadoopFsRelationProvider with DataSourceRegister {
    --- End diff --
    
    Just because we can... I felt it was cheap to add bucketing support for JSON, so I went for it.




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-168675283
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-12539][SQL][WIP] support writing bucket...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48588590
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
    @@ -129,6 +129,19 @@ final class DataFrameWriter private[sql](df: DataFrame) {
         this
       }
     
    +  @scala.annotation.varargs
    +  def bucketBy(numBuckets: Int, colNames: String*): DataFrameWriter = {
    +    this.numBuckets = Some(numBuckets)
    +    this.bucketingColumns = Option(colNames)
    +    this
    +  }
    +
    +  @scala.annotation.varargs
    --- End diff --
    
    is that needed?





[GitHub] spark pull request: [SPARK-12539][SQL][WIP] support writing bucket...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48588450
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
    @@ -189,13 +205,44 @@ final class DataFrameWriter private[sql](df: DataFrame) {
             ifNotExists = false)).toRdd
       }
     
    -  private def normalizedParCols: Option[Seq[String]] = partitioningColumns.map { parCols =>
    -    parCols.map { col =>
    -      df.logicalPlan.output
    -        .map(_.name)
    -        .find(df.sqlContext.analyzer.resolver(_, col))
    -        .getOrElse(throw new AnalysisException(s"Partition column $col not found in existing " +
    -          s"columns (${df.logicalPlan.output.map(_.name).mkString(", ")})"))
    +  private def normalizedParCols: Option[Seq[String]] = partitioningColumns.map { cols =>
    +    cols.map(normalize(_, "Partition"))
    +  }
    +
    +  private def normalizedBucketCols: Option[Seq[String]] = bucketingColumns.map { cols =>
    +    cols.map(normalize(_, "Bucketing"))
    +  }
    +
    +  private def normalizedSortCols: Option[Seq[String]] = sortingColumns.map { cols =>
    +    cols.map(normalize(_, "Sorting"))
    +  }
    +
    +  private def getBucketSpec: Option[BucketSpec] = {
    +    if (numBuckets.isEmpty && sortingColumns.isDefined) {
    +      throw new IllegalArgumentException("Specify numBuckets and bucketing columns first.")
    --- End diff --
    
    This error message is not quite right. The order of bucketBy and sortBy doesn't matter, right?
    
    e.g.
    do df.write.bucketBy().sortBy() and df.write.sortBy().bucketBy() both work?
    
    Should be something like sortBy requires bucketBy.
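
A minimal sketch of the order-independent check suggested above, assuming a simplified stand-in for the writer. `BucketByOrderSketch`, `WriterSketch`, and this cut-down `BucketSpec` are hypothetical names for illustration, not Spark's actual classes. Validation is deferred until the spec is built, so either call order gives the same result:

```scala
object BucketByOrderSketch {
  // Hypothetical, simplified model of a bucketing spec (not Spark's class).
  final case class BucketSpec(
      numBuckets: Int,
      bucketColumnNames: Seq[String],
      sortColumnNames: Seq[String])

  // Hypothetical stand-in for the builder-style state kept by the writer.
  final class WriterSketch {
    private var numBuckets: Option[Int] = None
    private var bucketColumnNames: Option[Seq[String]] = None
    private var sortColumnNames: Option[Seq[String]] = None

    def bucketBy(n: Int, colName: String, colNames: String*): WriterSketch = {
      numBuckets = Some(n)
      bucketColumnNames = Some(colName +: colNames)
      this
    }

    def sortBy(colName: String, colNames: String*): WriterSketch = {
      sortColumnNames = Some(colName +: colNames)
      this
    }

    // Checked only when the spec is requested, so bucketBy/sortBy order is irrelevant.
    def getBucketSpec: Option[BucketSpec] = {
      if (sortColumnNames.isDefined) {
        require(numBuckets.isDefined, "sortBy must be used together with bucketBy")
      }
      numBuckets.map { n =>
        require(n > 0, "Bucket number must be greater than 0.")
        BucketSpec(n, bucketColumnNames.get, sortColumnNames.getOrElse(Nil))
      }
    }
  }
}
```

With this shape, calling `sortBy` alone fails with a message that names the real requirement instead of implying a call order.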




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-168716880
  
    **[Test build #48663 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/48663/consoleFull)** for PR 10498 at commit [`70ebd69`](https://github.com/apache/spark/commit/70ebd69190e1ebd27362e17240b20bf60b5fdf16).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-12539][SQL][WIP] support writing bucket...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-167843317
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-168921702
  
    **[Test build #48737 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/48737/consoleFull)** for PR 10498 at commit [`3df61dc`](https://github.com/apache/spark/commit/3df61dcf76f7991a3fc47254a54e135ad2c044dd).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-12539][SQL][WIP] support writing bucket...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48588321
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
    @@ -129,6 +129,19 @@ final class DataFrameWriter private[sql](df: DataFrame) {
         this
       }
     
    +  @scala.annotation.varargs
    +  def bucketBy(numBuckets: Int, colNames: String*): DataFrameWriter = {
    +    this.numBuckets = Some(numBuckets)
    +    this.bucketingColumns = Option(colNames)
    +    this
    +  }
    +
    +  @scala.annotation.varargs
    --- End diff --
    
    Did you intentionally not provide a way to specify the sort order?




[GitHub] spark pull request: [SPARK-12539][SQL][WIP] support writing bucket...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48588334
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
    @@ -189,13 +205,44 @@ final class DataFrameWriter private[sql](df: DataFrame) {
             ifNotExists = false)).toRdd
       }
     
    -  private def normalizedParCols: Option[Seq[String]] = partitioningColumns.map { parCols =>
    -    parCols.map { col =>
    -      df.logicalPlan.output
    -        .map(_.name)
    -        .find(df.sqlContext.analyzer.resolver(_, col))
    -        .getOrElse(throw new AnalysisException(s"Partition column $col not found in existing " +
    -          s"columns (${df.logicalPlan.output.map(_.name).mkString(", ")})"))
    +  private def normalizedParCols: Option[Seq[String]] = partitioningColumns.map { cols =>
    +    cols.map(normalize(_, "Partition"))
    +  }
    +
    +  private def normalizedBucketCols: Option[Seq[String]] = bucketingColumns.map { cols =>
    +    cols.map(normalize(_, "Bucketing"))
    +  }
    +
    +  private def normalizedSortCols: Option[Seq[String]] = sortingColumns.map { cols =>
    +    cols.map(normalize(_, "Sorting"))
    +  }
    +
    +  private def getBucketSpec: Option[BucketSpec] = {
    +    if (numBuckets.isEmpty && sortingColumns.isDefined) {
    +      throw new IllegalArgumentException("Specify numBuckets and bucketing columns first.")
    +    }
    +    if (numBuckets.isDefined && numBuckets.get <= 0) {
    +      throw new IllegalArgumentException("numBuckets must be greater than 0.")
    +    }
    +
    +    if (numBuckets.isDefined) {
    +      Some(BucketSpec(numBuckets.get, normalizedBucketCols.get, normalizedSortCols))
    +    } else {
    +      None
    +    }
    +  }
    +
    +  private def normalize(columnName: String, columnType: String): String = {
    +    val validColumnNames = df.logicalPlan.output.map(_.name)
    +    validColumnNames.find(df.sqlContext.analyzer.resolver(_, columnName))
    +      .getOrElse(throw new AnalysisException(s"$columnType column $columnName not found in " +
    +        s"existing columns (${validColumnNames.mkString(", ")})"))
    +  }
    +
    +  private def assertNoBucketing(): Unit = {
    --- End diff --
    
    assertNotBucketed()?




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48818992
  
    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala ---
    @@ -219,16 +231,22 @@ class CommitFailureTestRelation(
         override val paths: Array[String],
         maybeDataSchema: Option[StructType],
         override val userDefinedPartitionColumns: Option[StructType],
    +    bucketSpec: Option[BucketSpec],
         parameters: Map[String, String])(
         @transient sqlContext: SQLContext)
       extends SimpleTextRelation(
    -    paths, maybeDataSchema, userDefinedPartitionColumns, parameters)(sqlContext) {
    -  override def prepareJobForWrite(job: Job): OutputWriterFactory = new OutputWriterFactory {
    +    paths,
    +    maybeDataSchema,
    +    userDefinedPartitionColumns,
    +    bucketSpec,
    +    parameters)(sqlContext) {
    +  override def prepareJobForWrite(job: Job): OutputWriterFactory = new BucketedOutputWriterFactory {
    --- End diff --
    
    add a blank line here




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-169021958
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #10498: [SPARK-12539][SQL] support writing bucketed table

Posted by tejasapatil <gi...@git.apache.org>.
Github user tejasapatil commented on the issue:

    https://github.com/apache/spark/pull/10498
  
    @infinitymittal : See https://issues.apache.org/jira/browse/SPARK-17729




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-169002391
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/48765/




[GitHub] spark pull request: [SPARK-12539][SQL][WIP] support writing bucket...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-167581087
  
    cc @yhuai @nongli 




[GitHub] spark pull request: [SPARK-12539][SQL][WIP] support writing bucket...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-167582335
  
    **[Test build #48367 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/48367/consoleFull)** for PR 10498 at commit [`8cb2494`](https://github.com/apache/spark/commit/8cb24942890153bad70e003110a28d6111f0c407).




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48888112
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
    @@ -130,6 +130,34 @@ final class DataFrameWriter private[sql](df: DataFrame) {
       }
     
       /**
    +   * Buckets the output by the given columns on the file system. If specified, the output is
    +   * laid out on the file system similar to Hive's bucketing scheme.
    +   *
    +   * This is applicable for Parquet, JSON and ORC.
    +   *
    +   * @since 2.0
    +   */
    +  @scala.annotation.varargs
    +  def bucketBy(numBuckets: Int, colName: String, colNames: String*): DataFrameWriter = {
    +    this.numBuckets = Option(numBuckets)
    +    this.bucketColumnNames = Option(colName +: colNames)
    +    this
    +  }
    +
    +  /**
    +   * Sorts the bucketed output by the given columns.
    +   *
    +   * This is applicable for Parquet, JSON and ORC.
    --- End diff --
    
    Why the limitation? The comment makes it seem as though bucketBy must be called first. If that is true, make it clearer.




[GitHub] spark pull request: [SPARK-12539][SQL][WIP] support writing bucket...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-167596714
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/48367/




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48951109
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala ---
    @@ -335,6 +339,117 @@ private[sql] class DynamicPartitionWriterContainer(
           val partitionName = Literal(c.name + "=") :: str :: Nil
           if (i == 0) partitionName else Literal(Path.SEPARATOR) :: partitionName
         }
    +  }
    +
    +  private def getBucketIdFromKey(key: InternalRow): Option[Int] = {
    +    if (bucketSpec.isDefined) {
    +      Some(key.getInt(partitionColumns.length))
    +    } else {
    +      None
    +    }
    +  }
    +
    +  private def sameBucket(key1: UnsafeRow, key2: UnsafeRow): Boolean = {
    +    val bucketIdIndex = partitionColumns.length
    +    if (key1.getInt(bucketIdIndex) != key2.getInt(bucketIdIndex)) {
    +      false
    +    } else {
    +      var i = partitionColumns.length - 1
    +      while (i >= 0) {
    +        val dt = partitionColumns(i).dataType
    +        if (key1.get(i, dt) != key2.get(i, dt)) return false
    +        i -= 1
    +      }
    +      true
    +    }
    +  }
    +
    +  private def sortBasedWrite(
    +      sorter: UnsafeKVExternalSorter,
    +      iterator: Iterator[InternalRow],
    +      getSortingKey: UnsafeProjection,
    +      getOutputRow: UnsafeProjection,
    +      getPartitionString: UnsafeProjection,
    +      outputWriters: java.util.HashMap[InternalRow, OutputWriter]): Unit = {
    +    while (iterator.hasNext) {
    +      val currentRow = iterator.next()
    +      sorter.insertKV(getSortingKey(currentRow), getOutputRow(currentRow))
    +    }
    +
    +    logInfo(s"Sorting complete. Writing out partition files one at a time.")
    +
    +    val needNewWriter: (UnsafeRow, UnsafeRow) => Boolean = if (sortColumns.isEmpty) {
    +      (key1, key2) => key1 != key2
    +    } else {
    +      (key1, key2) => key1 == null || !sameBucket(key1, key2)
    +    }
    +
    +    val sortedIterator = sorter.sortedIterator()
    +    var currentKey: UnsafeRow = null
    +    var currentWriter: OutputWriter = null
    +    try {
    +      while (sortedIterator.next()) {
    +        if (needNewWriter(currentKey, sortedIterator.getKey)) {
    --- End diff --
    
    I'm a little worried here. It was a simple `!=` operator for non-bucket path before, but now it's a function call.
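
To make the concern concrete, here is a minimal sketch of the writer-switching loop, assuming a hypothetical `Key` case class in place of Spark's `UnsafeRow` and counting writers instead of opening files. The comparison function is picked once before the loop, as in the diff; whether the extra function call costs anything measurable on the non-bucketed path is not something this sketch establishes.

```scala
object SortBasedWriteSketch {
  // Hypothetical sorting key: partition path, bucket id, then the sort column value.
  final case class Key(partition: String, bucketId: Int, sortValue: Int)

  // Two keys share a writer when both the bucket id and the partition match.
  def sameBucket(a: Key, b: Key): Boolean =
    a.bucketId == b.bucketId && a.partition == b.partition

  /** Counts how many writers a run of already-sorted keys would open. */
  def countWriters(sortedKeys: Seq[Key], hasSortColumns: Boolean): Int = {
    // Selected once, outside the loop. Without sort columns the whole key
    // identifies the output file, so plain inequality is enough.
    val needNewWriter: (Key, Key) => Boolean =
      if (!hasSortColumns) (k1, k2) => k1 != k2
      else (k1, k2) => k1 == null || !sameBucket(k1, k2)

    var currentKey: Key = null
    var writers = 0
    sortedKeys.foreach { key =>
      if (needNewWriter(currentKey, key)) {
        writers += 1 // a real implementation would close the old writer and open a new one
        currentKey = key
      }
    }
    writers
  }
}
```

With sort columns present, rows that differ only in `sortValue` stay in the same file, which is why full-key inequality cannot be used on that path.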




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-169338192
  
    **[Test build #48856 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/48856/consoleFull)** for PR 10498 at commit [`1afd3ee`](https://github.com/apache/spark/commit/1afd3ee78484ce56dd04446bd43adab96a677411).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48842483
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---
    @@ -240,6 +241,25 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
           }
         }
     
    +    if (userSpecifiedSchema.isDefined && bucketSpec.isDefined) {
    +      val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = bucketSpec.get
    +
    +      tableProperties.put("spark.sql.sources.schema.numBuckets", numBuckets.toString)
    +      tableProperties.put("spark.sql.sources.schema.numBucketCols",
    +        bucketColumnNames.length.toString)
    +      bucketColumnNames.zipWithIndex.foreach { case (bucketCol, index) =>
    +        tableProperties.put(s"spark.sql.sources.schema.bucketCol.$index", bucketCol)
    +      }
    +
    +      if (sortColumnNames.nonEmpty) {
    +        tableProperties.put("spark.sql.sources.schema.numSortCols",
    --- End diff --
    
    previous discussion: https://github.com/apache/spark/pull/10498#discussion_r48818942
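
For readers following the diff above, this is a rough sketch of how a bucket spec could be flattened into string table properties and read back. The `numBuckets`, `numBucketCols`, `bucketCol.$index`, and `numSortCols` keys come from the diff; the `sortCol.$index` key and the `fromProperties` reader are assumptions added for illustration, and `BucketPropsSketch` with its simplified `BucketSpec` is hypothetical rather than Spark's code.

```scala
object BucketPropsSketch {
  // Hypothetical, simplified model of a bucketing spec (not Spark's class).
  final case class BucketSpec(
      numBuckets: Int,
      bucketColumnNames: Seq[String],
      sortColumnNames: Seq[String])

  private def key(suffix: String): String = "spark.sql.sources.schema." + suffix

  /** Flattens the spec into string properties, one entry per column. */
  def toProperties(spec: BucketSpec): Map[String, String] = {
    val base = Map(
      key("numBuckets") -> spec.numBuckets.toString,
      key("numBucketCols") -> spec.bucketColumnNames.length.toString)
    val bucketCols = spec.bucketColumnNames.zipWithIndex.map {
      case (col, index) => key(s"bucketCol.$index") -> col
    }
    // Sort properties are written only when sort columns exist, as in the diff.
    val sortCols: Seq[(String, String)] =
      if (spec.sortColumnNames.isEmpty) {
        Seq.empty
      } else {
        val count = key("numSortCols") -> spec.sortColumnNames.length.toString
        // "sortCol.$index" is an assumed key name; the diff is cut off before it.
        count +: spec.sortColumnNames.zipWithIndex.map {
          case (col, index) => key(s"sortCol.$index") -> col
        }
      }
    base ++ bucketCols ++ sortCols
  }

  /** Assumed inverse of toProperties; returns None when no bucketing was recorded. */
  def fromProperties(props: Map[String, String]): Option[BucketSpec] =
    props.get(key("numBuckets")).map { n =>
      def cols(countKey: String, colKey: String): Seq[String] = {
        val count = props.get(key(countKey)).map(_.toInt).getOrElse(0)
        (0 until count).map(i => props(key(s"$colKey.$i")))
      }
      BucketSpec(n.toInt, cols("numBucketCols", "bucketCol"), cols("numSortCols", "sortCol"))
    }
}
```

Storing an explicit count next to the indexed entries lets a reader recover column order without scanning or sorting property keys.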




[GitHub] spark pull request: [SPARK-12539][SQL][WIP] support writing bucket...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-167664157
  
    BTW, on GitHub you can use square brackets to create a checklist, e.g.
    
    ```
    - [ ] item a
    - [ ] item b
    ```
    
    becomes
    
    - [ ] item a
    - [ ] item b




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48842647
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
    @@ -189,13 +220,47 @@ final class DataFrameWriter private[sql](df: DataFrame) {
             ifNotExists = false)).toRdd
       }
     
    -  private def normalizedParCols: Option[Seq[String]] = partitioningColumns.map { parCols =>
    -    parCols.map { col =>
    -      df.logicalPlan.output
    -        .map(_.name)
    -        .find(df.sqlContext.analyzer.resolver(_, col))
    -        .getOrElse(throw new AnalysisException(s"Partition column $col not found in existing " +
    -          s"columns (${df.logicalPlan.output.map(_.name).mkString(", ")})"))
    +  private def normalizedParCols: Option[Seq[String]] = partitioningColumns.map { cols =>
    +    cols.map(normalize(_, "Partition"))
    +  }
    +
    +  private def normalizedBucketColNames: Option[Seq[String]] = bucketColumnNames.map { cols =>
    +    cols.map(normalize(_, "Bucketing"))
    +  }
    +
    +  private def normalizedSortColNames: Option[Seq[String]] = sortColumnNames.map { cols =>
    +    cols.map(normalize(_, "Sorting"))
    +  }
    +
    +  private def getBucketSpec: Option[BucketSpec] = {
    +    if (sortColumnNames.isDefined) {
    +      require(numBuckets.isDefined, "sortBy must be used together with bucketBy")
    +    }
    +
    +    for {
    +      n <- numBuckets
    +    } yield {
    +      require(n > 0, "Bucket number must be greater than 0.")
    +      BucketSpec(n, normalizedBucketColNames.get, normalizedSortColNames.getOrElse(Nil))
    +    }
    +  }
    +
    +  /**
    +   * The given column name may not be equal to any of the existing column names if we were in
    +   * case-insensitive context.  Normalize the given column name to the real one so that we don't
    +   * need to care about case sensitivity afterwards.
    +   */
    +  private def normalize(columnName: String, columnType: String): String = {
    +    val validColumnNames = df.logicalPlan.output.map(_.name)
    +    validColumnNames.find(df.sqlContext.analyzer.resolver(_, columnName))
    +      .getOrElse(throw new AnalysisException(s"$columnType column $columnName not found in " +
    +        s"existing columns (${validColumnNames.mkString(", ")})"))
    +  }
    +
    +  private def assertNotBucketed(): Unit = {
    +    if (numBuckets.isDefined || sortColumnNames.isDefined) {
    --- End diff --
    
    previous discussion: https://github.com/apache/spark/pull/10498#discussion_r48818408
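
The `normalize` helper in the diff above maps a user-supplied column name onto the real column name through a resolver. A minimal self-contained sketch of that idea follows, assuming a resolver is just a `(String, String) => Boolean` function. `NormalizeSketch` and its two resolvers are hypothetical stand-ins, and the sketch throws `IllegalArgumentException` where the diff uses Spark's `AnalysisException`.

```scala
object NormalizeSketch {
  // A resolver decides whether an existing column name matches the requested one.
  type Resolver = (String, String) => Boolean

  val caseInsensitive: Resolver = (existing, requested) => existing.equalsIgnoreCase(requested)
  val caseSensitive: Resolver = (existing, requested) => existing == requested

  /** Returns the existing column name that the requested name resolves to. */
  def normalize(
      validColumnNames: Seq[String],
      resolver: Resolver,
      columnName: String,
      columnType: String): String = {
    val known = validColumnNames.mkString(", ")
    validColumnNames
      .find(resolver(_, columnName))
      .getOrElse(throw new IllegalArgumentException(
        s"$columnType column $columnName not found in existing columns ($known)"))
  }
}
```

After normalization, downstream code can compare column names with plain equality regardless of the session's case-sensitivity setting.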




[GitHub] spark issue #10498: [SPARK-12539][SQL] support writing bucketed table

Posted by tejasapatil <gi...@git.apache.org>.
Github user tejasapatil commented on the issue:

    https://github.com/apache/spark/pull/10498
  
    @infinitymittal : It will take time to add fully functional support. I have initiated a design proposal to get consensus on how this could be done: https://issues.apache.org/jira/browse/SPARK-19256
    
    In Spark, "hive.enforce.bucketing" is not respected. #15300 won't guarantee that the data written adheres to Hive's bucketing spec, so the approach taken there is to fail if the user sets configs to enforce bucketing. This avoids writing wrong data when the user expects correct output after setting "hive.enforce.bucketing" to true. The longer-term plan is to get rid of these configs and always write properly bucketed data (Hive 2.x follows this model).
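
The fail-fast behaviour described above can be illustrated with a small sketch. This is not the code from #15300: `EnforceBucketingSketch`, `checkWrite`, the exception type, and the choice to treat an unset key as "false" are all assumptions made for illustration. Only the `hive.enforce.bucketing` key is taken from the comment.

```scala
object EnforceBucketingSketch {
  // Config key quoted in the comment above.
  val EnforceBucketingKey = "hive.enforce.bucketing"

  // Assumption: an unset key is treated as "false".
  def enforcementRequested(conf: Map[String, String]): Boolean =
    conf.get(EnforceBucketingKey).exists(_.trim.equalsIgnoreCase("true"))

  /** Fails fast instead of writing data that may not follow Hive's bucketing spec. */
  def checkWrite(conf: Map[String, String], tableIsBucketed: Boolean): Unit =
    if (tableIsBucketed && enforcementRequested(conf)) {
      throw new UnsupportedOperationException(
        s"Cannot write to a bucketed table while $EnforceBucketingKey is true: " +
          "the output is not guaranteed to follow Hive's bucketing spec")
    }
}
```

Rejecting the write up front trades convenience for safety: a user who asked for enforcement gets an error instead of files that look bucketed but may not be.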




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48996772
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala ---
    @@ -351,7 +365,18 @@ abstract class OutputWriterFactory extends Serializable {
        *
        * @since 1.4.0
        */
    -  def newInstance(path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter
    +  def newInstance(
    +      path: String,
    +      dataSchema: StructType,
    +      context: TaskAttemptContext): OutputWriter
    +
    +  // TODO: expose bucket API to users.
    --- End diff --
    
    I'd remove this TODO.





[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48818983
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala ---
    @@ -151,6 +151,7 @@ case class CreateMetastoreDataSource(
           tableIdent,
           userSpecifiedSchema,
           Array.empty[String],
    +      None,
    --- End diff --
    
    Use a named argument here.




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-168702433
  
    **[Test build #48662 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/48662/consoleFull)** for PR 10498 at commit [`e3c3728`](https://github.com/apache/spark/commit/e3c3728fd67aea1849c8d4d1dab3658b1efb7417).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-169021584
  
    **[Test build #48770 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/48770/consoleFull)** for PR 10498 at commit [`74bd524`](https://github.com/apache/spark/commit/74bd52461f381f67da737b8c9db595b09c77ad8d).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister `
      * `class TextOutputWriter(path: String, dataSchema: StructType, context: TaskAttemptContext)`




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-169495337
  
    @cloud-fan Can we write a bucketed table without partitions?





[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48818260
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala ---
    @@ -109,6 +109,7 @@ class DDLParser(parseQuery: String => LogicalPlan)
                 provider,
                 temp.isDefined,
                 Array.empty[String],
    +            None,
    --- End diff --
    
    I'd use a named argument here so we know what is `None`.
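
The point about named arguments can be illustrated with a minimal sketch. `CreateTableSpec` and its fields are hypothetical stand-ins invented for this example, not the actual constructor touched by the diff; the only claim is that naming a bare `None` at the call site tells the reader which parameter it fills.

```scala
object NamedArgumentSketch {
  // Hypothetical stand-ins; the real classes in the PR have different parameter lists.
  final case class BucketSpec(numBuckets: Int, bucketColumnNames: Seq[String])

  final case class CreateTableSpec(
      provider: String,
      temporary: Boolean,
      partitionColumns: Array[String],
      bucketSpec: Option[BucketSpec])

  // Positional: the reader must look up the signature to learn what `None` means.
  val positional = CreateTableSpec("parquet", false, Array.empty[String], None)

  // Named: the call site documents itself.
  val named = CreateTableSpec(
    provider = "parquet",
    temporary = false,
    partitionColumns = Array.empty[String],
    bucketSpec = None)
}
```

Both calls build the same value; only the second survives a later reordering or insertion of parameters without silently changing meaning.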




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r49025627
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala ---
    @@ -34,13 +34,13 @@ import org.apache.spark.rdd.RDD
     import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
     import org.apache.spark.sql.catalyst.InternalRow
     import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
    -import org.apache.spark.sql.execution.datasources.PartitionSpec
    +import org.apache.spark.sql.execution.datasources._
     import org.apache.spark.sql.sources._
     import org.apache.spark.sql.types.StructType
     import org.apache.spark.util.SerializableConfiguration
     
     
    -class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister {
    +class DefaultSource extends BucketedHadoopFsRelationProvider with DataSourceRegister {
    --- End diff --
    
    Minor question: why do we support bucketing for the JSON writer? The bucketing can only be recognized by Spark SQL, and in that case Parquet is much more efficient.




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r49002810
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala ---
    @@ -335,6 +339,121 @@ private[sql] class DynamicPartitionWriterContainer(
           val partitionName = Literal(c.name + "=") :: str :: Nil
           if (i == 0) partitionName else Literal(Path.SEPARATOR) :: partitionName
         }
    +  }
    +
    +  private def getBucketIdFromKey(key: InternalRow): Option[Int] = {
    +    if (bucketSpec.isDefined) {
    +      Some(key.getInt(partitionColumns.length))
    +    } else {
    +      None
    +    }
    +  }
    +
    +  private def sameBucket(key1: UnsafeRow, key2: UnsafeRow): Boolean = {
    +    val bucketIdIndex = partitionColumns.length
    +    if (key1.getInt(bucketIdIndex) != key2.getInt(bucketIdIndex)) {
    +      false
    +    } else {
    +      var i = partitionColumns.length - 1
    +      while (i >= 0) {
    +        val dt = partitionColumns(i).dataType
    +        if (key1.get(i, dt) != key2.get(i, dt)) return false
    +        i -= 1
    +      }
    +      true
    +    }
    +  }
    +
    +  private def sortBasedWrite(
    +      sorter: UnsafeKVExternalSorter,
    +      iterator: Iterator[InternalRow],
    +      getSortingKey: UnsafeProjection,
    +      getOutputRow: UnsafeProjection,
    +      getPartitionString: UnsafeProjection,
    +      outputWriters: java.util.HashMap[InternalRow, OutputWriter]): Unit = {
    +    while (iterator.hasNext) {
    +      val currentRow = iterator.next()
    +      sorter.insertKV(getSortingKey(currentRow), getOutputRow(currentRow))
    +    }
    +
    +    logInfo(s"Sorting complete. Writing out partition files one at a time.")
    +
    +    val needNewWriter: (UnsafeRow, UnsafeRow) => Boolean = if (sortColumns.isEmpty) {
    +      (key1, key2) => key1 != key2
    +    } else {
    +      (key1, key2) => key1 == null || !sameBucket(key1, key2)
    +    }
    +
    +    val sortedIterator = sorter.sortedIterator()
    +    var currentKey: UnsafeRow = null
    +    var currentWriter: OutputWriter = null
    +    try {
    +      while (sortedIterator.next()) {
    +        if (needNewWriter(currentKey, sortedIterator.getKey)) {
    +          if (currentWriter != null) {
    +            currentWriter.close()
    +          }
    +          currentKey = sortedIterator.getKey.copy()
    +          logDebug(s"Writing partition: $currentKey")
    +
    +          // Either use an existing file from before, or open a new one.
    +          currentWriter = outputWriters.remove(currentKey)
    +          if (currentWriter == null) {
    +            currentWriter = newOutputWriter(currentKey, getPartitionString)
    +          }
    +        }
    +
    +        currentWriter.writeInternal(sortedIterator.getValue)
    +      }
    +    } finally {
    +      if (currentWriter != null) { currentWriter.close() }
    +    }
    +  }
    +
    +  /**
    +   * Open and returns a new OutputWriter given a partition key and optional bucket id.
    +   * If bucket id is specified, we will append it to the end of the file name, but before the
    +   * file extension, e.g. part-r-00009-ea518ad4-455a-4431-b471-d24e03814677-00002.gz.parquet
    --- End diff --
    
    I don't have a strong opinion here. Let's go either way for now and take another pass before shipping this. We should try this with Hive as well just to get another data point.
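
The naming scheme described in the doc comment of the diff (bucket id appended to the file name, before the extension) can be sketched as follows. The helper `fileName` and its parameters are hypothetical; in the PR the name is assembled inside the output writers, and the five-digit zero padding is inferred only from the example name in that comment.

```scala
object BucketFileNameSketch {
  // Hypothetical helper, not part of the PR.
  def fileName(
      split: Int,
      uniqueJobId: String,
      bucketId: Option[Int],
      extension: String): String = {
    // Zero-pad the bucket id to five digits, as in the example from the doc comment.
    val bucketSuffix = bucketId.map(id => f"-$id%05d").getOrElse("")
    f"part-r-$split%05d-$uniqueJobId$bucketSuffix$extension"
  }
}
```

With no bucket id the suffix is empty, so non-bucketed output keeps its existing name.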




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48820313
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
    @@ -189,13 +220,43 @@ final class DataFrameWriter private[sql](df: DataFrame) {
             ifNotExists = false)).toRdd
       }
     
    -  private def normalizedParCols: Option[Seq[String]] = partitioningColumns.map { parCols =>
    -    parCols.map { col =>
    -      df.logicalPlan.output
    -        .map(_.name)
    -        .find(df.sqlContext.analyzer.resolver(_, col))
    -        .getOrElse(throw new AnalysisException(s"Partition column $col not found in existing " +
    -          s"columns (${df.logicalPlan.output.map(_.name).mkString(", ")})"))
    +  private def normalizedParCols: Option[Seq[String]] = partitioningColumns.map { cols =>
    +    cols.map(normalize(_, "Partition"))
    +  }
    +
    +  private def normalizedBucketCols: Option[Seq[String]] = bucketingColumns.map { cols =>
    +    cols.map(normalize(_, "Bucketing"))
    +  }
    +
    +  private def normalizedSortCols: Option[Seq[String]] = sortingColumns.map { cols =>
    +    cols.map(normalize(_, "Sorting"))
    +  }
    +
    +  private def getBucketSpec: Option[BucketSpec] = {
    +    if (sortingColumns.isDefined) {
    +      require(numBuckets.isDefined, "sortBy must be used together with bucketBy")
    +    }
    +
    +    for {
    +      n <- numBuckets
    +      cols <- normalizedBucketCols
    +    } yield {
    +      require(n > 0, "Bucket number must be greater than 0.")
    +      BucketSpec(n, cols, normalizedSortCols)
    +    }
    +  }
    +
    +  private def normalize(columnName: String, columnType: String): String = {
    +    val validColumnNames = df.logicalPlan.output.map(_.name)
    +    validColumnNames.find(df.sqlContext.analyzer.resolver(_, columnName))
    +      .getOrElse(throw new AnalysisException(s"$columnType column $columnName not found in " +
    +        s"existing columns (${validColumnNames.mkString(", ")})"))
    +  }
    +
    +  private def assertNotBucketed(): Unit = {
    +    if (numBuckets.isDefined || sortingColumns.isDefined) {
    --- End diff --
    
    Isn't it the same as our normal `DataFrame.sort`? It still increases the compression ratio for Parquet.





[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48609296
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
    @@ -189,13 +205,45 @@ final class DataFrameWriter private[sql](df: DataFrame) {
             ifNotExists = false)).toRdd
       }
     
    -  private def normalizedParCols: Option[Seq[String]] = partitioningColumns.map { parCols =>
    -    parCols.map { col =>
    -      df.logicalPlan.output
    -        .map(_.name)
    -        .find(df.sqlContext.analyzer.resolver(_, col))
    -        .getOrElse(throw new AnalysisException(s"Partition column $col not found in existing " +
    -          s"columns (${df.logicalPlan.output.map(_.name).mkString(", ")})"))
    +  private def normalizedParCols: Option[Seq[String]] = partitioningColumns.map { cols =>
    +    cols.map(normalize(_, "Partition"))
    +  }
    +
    +  private def normalizedBucketCols: Option[Seq[String]] = bucketingColumns.map { cols =>
    +    cols.map(normalize(_, "Bucketing"))
    +  }
    +
    +  private def normalizedSortCols: Option[Seq[String]] = sortingColumns.map { cols =>
    +    cols.map(normalize(_, "Sorting"))
    +  }
    +
    +  private def getBucketSpec: Option[BucketSpec] = {
    +    if (numBuckets.isEmpty && sortingColumns.isDefined) {
    +      throw new IllegalArgumentException(
    +        "Specify bucketing information by bucketBy when use sortBy.")
    +    }
    +    if (numBuckets.isDefined && numBuckets.get <= 0) {
    +      throw new IllegalArgumentException("numBuckets must be greater than 0.")
    +    }
    +
    +    if (numBuckets.isDefined) {
    +      Some(BucketSpec(numBuckets.get, normalizedBucketCols.get, normalizedSortCols))
    +    } else {
    +      None
    +    }
    --- End diff --
    
    This method can be simplified to:
    
    ```scala
    if (sortingColumns.isDefined) {
      require(numBuckets.isDefined, "sortBy must be used together with bucketBy")
    }
    
    for {
      n <- numBuckets
      cols <- normalizedBucketCols
    } yield {
      require(n > 0, "Bucket number must be greater than 0.")
      BucketSpec(n, cols, normalizedSortCols)
    }
    ```
    
    (`require` throws `IllegalArgumentException` when the condition is not met.)
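
To see how the simplified version behaves, here is a self-contained sketch that runs without Spark. The writer's fields are passed in as parameters and `BucketSpec` is a simplified stand-in, so the names and signature are assumptions made for illustration only.

```scala
object BucketSpecSketch {
  // Simplified stand-in for the BucketSpec class in the PR.
  final case class BucketSpec(
      numBuckets: Int,
      bucketColumnNames: Seq[String],
      sortColumnNames: Option[Seq[String]])

  // Hypothetical free-standing variant of getBucketSpec.
  def getBucketSpec(
      numBuckets: Option[Int],
      bucketCols: Option[Seq[String]],
      sortCols: Option[Seq[String]]): Option[BucketSpec] = {
    if (sortCols.isDefined) {
      require(numBuckets.isDefined, "sortBy must be used together with bucketBy")
    }
    // The for-comprehension yields None as soon as either Option is empty,
    // which replaces the explicit isDefined / get / else None branches.
    for {
      n <- numBuckets
      cols <- bucketCols
    } yield {
      require(n > 0, "Bucket number must be greater than 0.")
      BucketSpec(n, cols, sortCols)
    }
  }
}
```

The `n > 0` check only runs when a bucket count was actually supplied, because it sits inside the `yield` body.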




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-168028609
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-12539][SQL][WIP] support writing bucket...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-167649759
  
    @cloud-fan 
    > currently we don't shuffle before writing partitioned data, which means we will have same partition data in different RDD blocks, and that's why we have multi-files for one partition, and we will also have multi-files for one bucket, is that safe?
    
    This is safe, but how can we get into this state from a single write? There must have been a partitionBy before, right?
    
    > hive support having bucketing without partitioning, should we support it?
    
    Why not? Is this hard to support?
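
On the first quoted point, a small simulation may help show how one bucket can end up with several files when rows are not shuffled before the write. This is a sketch under stated assumptions: each inner sequence stands for the rows held by one task, every task is assumed to open its own file per bucket, and Scala's `MurmurHash3` is used as a stand-in that is not the hash Spark applies.

```scala
object FilesPerBucketSketch {
  // Non-negative modulus of the hash, similar in spirit to a pmod over numBuckets.
  def bucketId(key: String, numBuckets: Int): Int =
    Math.floorMod(scala.util.hashing.MurmurHash3.stringHash(key), numBuckets)

  // Counts, per bucket, how many tasks saw at least one row for it.
  // Without a shuffle that is also the number of files written for the bucket.
  def filesPerBucket(tasks: Seq[Seq[String]], numBuckets: Int): Map[Int, Int] =
    tasks
      .flatMap(rows => rows.map(bucketId(_, numBuckets)).distinct)
      .groupBy(identity)
      .map { case (bucket, occurrences) => bucket -> occurrences.size }
}
```

If the same key appears in two tasks, its bucket is counted twice; repartitioning by bucket id before the write would bring that count back to one.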




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r49002988
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala ---
    @@ -335,6 +339,121 @@ private[sql] class DynamicPartitionWriterContainer(
           val partitionName = Literal(c.name + "=") :: str :: Nil
           if (i == 0) partitionName else Literal(Path.SEPARATOR) :: partitionName
         }
    +  }
    +
    +  private def getBucketIdFromKey(key: InternalRow): Option[Int] = {
    +    if (bucketSpec.isDefined) {
    +      Some(key.getInt(partitionColumns.length))
    +    } else {
    +      None
    +    }
    +  }
    +
    +  private def sameBucket(key1: UnsafeRow, key2: UnsafeRow): Boolean = {
    +    val bucketIdIndex = partitionColumns.length
    +    if (key1.getInt(bucketIdIndex) != key2.getInt(bucketIdIndex)) {
    +      false
    +    } else {
    +      var i = partitionColumns.length - 1
    +      while (i >= 0) {
    +        val dt = partitionColumns(i).dataType
    +        if (key1.get(i, dt) != key2.get(i, dt)) return false
    +        i -= 1
    +      }
    +      true
    +    }
    +  }
    +
    +  private def sortBasedWrite(
    +      sorter: UnsafeKVExternalSorter,
    +      iterator: Iterator[InternalRow],
    +      getSortingKey: UnsafeProjection,
    +      getOutputRow: UnsafeProjection,
    +      getPartitionString: UnsafeProjection,
    +      outputWriters: java.util.HashMap[InternalRow, OutputWriter]): Unit = {
    +    while (iterator.hasNext) {
    +      val currentRow = iterator.next()
    +      sorter.insertKV(getSortingKey(currentRow), getOutputRow(currentRow))
    +    }
    +
    +    logInfo(s"Sorting complete. Writing out partition files one at a time.")
    +
    +    val needNewWriter: (UnsafeRow, UnsafeRow) => Boolean = if (sortColumns.isEmpty) {
    +      (key1, key2) => key1 != key2
    +    } else {
    +      (key1, key2) => key1 == null || !sameBucket(key1, key2)
    +    }
    +
    +    val sortedIterator = sorter.sortedIterator()
    +    var currentKey: UnsafeRow = null
    +    var currentWriter: OutputWriter = null
    +    try {
    +      while (sortedIterator.next()) {
    +        if (needNewWriter(currentKey, sortedIterator.getKey)) {
    +          if (currentWriter != null) {
    +            currentWriter.close()
    +          }
    +          currentKey = sortedIterator.getKey.copy()
    +          logDebug(s"Writing partition: $currentKey")
    +
    +          // Either use an existing file from before, or open a new one.
    +          currentWriter = outputWriters.remove(currentKey)
    +          if (currentWriter == null) {
    +            currentWriter = newOutputWriter(currentKey, getPartitionString)
    +          }
    +        }
    +
    +        currentWriter.writeInternal(sortedIterator.getValue)
    +      }
    +    } finally {
    +      if (currentWriter != null) { currentWriter.close() }
    +    }
    +  }
    +
    +  /**
    +   * Open and returns a new OutputWriter given a partition key and optional bucket id.
    +   * If bucket id is specified, we will append it to the end of the file name, but before the
    +   * file extension, e.g. part-r-00009-ea518ad4-455a-4431-b471-d24e03814677-00002.gz.parquet
    +   */
    +  private def newOutputWriter(
    +      key: InternalRow,
    +      getPartitionString: UnsafeProjection): OutputWriter = {
    +    val configuration = taskAttemptContext.getConfiguration
    +    val path = if (partitionColumns.nonEmpty) {
    +      val partitionPath = getPartitionString(key).getString(0)
    +      configuration.set(
    +        "spark.sql.sources.output.path", new Path(outputPath, partitionPath).toString)
    +      new Path(getWorkPath, partitionPath).toString
    +    } else {
    +      configuration.set("spark.sql.sources.output.path", outputPath)
    +      getWorkPath
    +    }
    +    val bucketId = getBucketIdFromKey(key)
    +    val newWriter = super.newOutputWriter(path, bucketId)
    +    newWriter.initConverter(dataSchema)
    +    newWriter
    +  }
    +
    +  def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = {
    +    val outputWriters = new java.util.HashMap[InternalRow, OutputWriter]
    +    executorSideSetup(taskContext)
    +
    +    var outputWritersCleared = false
    +
    +    // We should first sort by partition columns, then bucket id, and finally sorting columns.
    +    val getSortingKey =
    +      UnsafeProjection.create(partitionColumns ++ bucketIdExpression ++ sortColumns, inputSchema)
    +
    +    val sortingKeySchema = if (bucketSpec.isEmpty) {
    +      StructType.fromAttributes(partitionColumns)
    +    } else { // If it's bucketed, we should also consider bucket id as part of the key.
    +      val fields = StructType.fromAttributes(partitionColumns)
    +        .add("bucketId", IntegerType, nullable = false) ++ StructType.fromAttributes(sortColumns)
    +      StructType(fields)
    +    }
    --- End diff --
    
    Can this be simplified to
    
    sortingKeySchema = // what you have
    val getSortingKey = UnsafeProjection.create(sortingKeySchema)
    ?




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48888699
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala ---
    @@ -451,3 +457,147 @@ private[sql] class DynamicPartitionWriterContainer(
         }
       }
     }
    +
    +/**
    + * A writer that dynamically opens files based on the given partition columns.  Internally this is
    --- End diff --
    
    Update comment.




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48818977
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---
    @@ -596,6 +614,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
                 conf.defaultDataSourceName,
                 temporary = false,
                 Array.empty[String],
    +            None,
    --- End diff --
    
    Again, use a named argument here.




[GitHub] spark pull request: [SPARK-12539][SQL][WIP] support writing bucket...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48588391
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
    @@ -129,6 +129,19 @@ final class DataFrameWriter private[sql](df: DataFrame) {
         this
       }
     
    +  @scala.annotation.varargs
    +  def bucketBy(numBuckets: Int, colNames: String*): DataFrameWriter = {
    --- End diff --
    
    I think this is better as `bucketBy(numBuckets: Int, col: String, cols: String*)`. We need to have at least one column.
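
A minimal sketch of the suggested signature follows. `Writer` is a hypothetical, stripped-down class written for this example; the real `DataFrameWriter` wraps a `DataFrame` and carries many more options.

```scala
object BucketBySketch {
  // Hypothetical writer that only records bucketing settings.
  final class Writer {
    private var numBuckets: Option[Int] = None
    private var bucketingColumns: Option[Seq[String]] = None

    // Taking the first column as its own parameter makes `bucketBy(4)` a compile
    // error, so "at least one column" is enforced by the compiler, not at runtime.
    @scala.annotation.varargs
    def bucketBy(numBuckets: Int, colName: String, colNames: String*): Writer = {
      this.numBuckets = Some(numBuckets)
      this.bucketingColumns = Some(colName +: colNames)
      this
    }

    def bucketColumns: Seq[String] = bucketingColumns.getOrElse(Nil)
    def bucketCount: Option[Int] = numBuckets
  }
}
```

Callers still write `bucketBy(4, "a", "b")` as before; only the zero-column call stops compiling.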




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48818865
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala ---
    @@ -192,7 +192,14 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
             }
     
             PartitioningUtils.validatePartitionColumnDataTypes(
    -          query.schema, partitionColumns, catalog.conf.caseSensitiveAnalysis)
    +          c.child.schema, c.partitionColumns, catalog.conf.caseSensitiveAnalysis)
    +
    +        c.bucketSpec.foreach(_.sortingColumns.foreach(_.foreach { sortCol =>
    --- End diff --
    
    These three `foreach` calls nested on one line are too hard to read. Let's simplify this (even if it means more lines of code).
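
One way to flatten the nesting is sketched below. The diff is cut off before the loop body, so the check performed here (reporting sort columns missing from the schema) is purely an assumption, as are the simplified `BucketSpec` and the helper name.

```scala
object FlattenForeachSketch {
  // Simplified stand-in: an optional spec holding an optional list of sort columns.
  final case class BucketSpec(numBuckets: Int, sortingColumns: Option[Seq[String]])

  // Hypothetical check; the body of the real loop is not shown in the diff.
  def unknownSortColumns(
      bucketSpec: Option[BucketSpec],
      schema: Set[String]): Seq[String] = {
    // Step 1: collapse Option[BucketSpec] -> Option[Seq[String]] -> Seq[String].
    val sortCols: Seq[String] = bucketSpec.flatMap(_.sortingColumns).getOrElse(Nil)
    // Step 2: a single flat traversal instead of three nested foreach calls.
    sortCols.filterNot(schema.contains)
  }
}
```

Naming the intermediate `sortCols` costs one line and makes each level of optionality visible.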




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-168995524
  
    **[Test build #48770 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/48770/consoleFull)** for PR 10498 at commit [`74bd524`](https://github.com/apache/spark/commit/74bd52461f381f67da737b8c9db595b09c77ad8d).




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48889469
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala ---
    @@ -451,3 +457,147 @@ private[sql] class DynamicPartitionWriterContainer(
         }
       }
     }
    +
    +/**
    + * A writer that dynamically opens files based on the given partition columns.  Internally this is
    + * done by maintaining a HashMap of open files until `maxFiles` is reached.  If this occurs, the
    + * writer externally sorts the remaining rows and then writes out them out one file at a time.
    + */
    +private[sql] class BucketedPartitionWriterContainer(
    +    relation: BucketedHadoopFsRelation,
    +    job: Job,
    +    partitionColumns: Seq[Attribute],
    +    bucketSpec: BucketSpec,
    +    dataColumns: Seq[Attribute],
    +    inputSchema: Seq[Attribute],
    +    defaultPartitionName: String,
    +    isAppend: Boolean)
    +  extends BaseWriterContainer(relation, job, isAppend) {
    +
    +  def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = {
    +    executorSideSetup(taskContext)
    +
    +    val bucketColumns = bucketSpec.bucketColumnNames.map(c => inputSchema.find(_.name == c).get)
    +    val numBuckets = bucketSpec.numBuckets
    +    val sortColumns = bucketSpec.sortColumnNames.map(c => inputSchema.find(_.name == c).get)
    +    val bucketIdExpr = Pmod(new Murmur3Hash(bucketColumns), Literal(numBuckets))
    +
    +    val getSortingKey =
    +      UnsafeProjection.create((partitionColumns :+ bucketIdExpr) ++ sortColumns, inputSchema)
    +
    +    val sortingKeySchema = {
    +      val fields = StructType.fromAttributes(partitionColumns)
    +        .add("bucketId", IntegerType, nullable = false) ++
    +        StructType.fromAttributes(sortColumns)
    +      StructType(fields)
    +    }
    +
    +    // Returns the data columns to be written given an input row
    +    val getOutputRow = UnsafeProjection.create(dataColumns, inputSchema)
    +
    +    // Expressions that given a partition key build a string like: col1=val/col2=val/...
    +    val partitionStringExpression = partitionColumns.zipWithIndex.flatMap { case (c, i) =>
    +      val escaped =
    +        ScalaUDF(
    +          PartitioningUtils.escapePathName _,
    +          StringType,
    +          Seq(Cast(c, StringType)),
    +          Seq(StringType))
    +      val str = If(IsNull(c), Literal(defaultPartitionName), escaped)
    +      val partitionName = Literal(c.name + "=") :: str :: Nil
    +      if (i == 0) partitionName else Literal(Path.SEPARATOR) :: partitionName
    +    }
    +
    +    // Returns the partition path given a partition key.
    +    val getPartitionString =
    +      UnsafeProjection.create(Concat(partitionStringExpression) :: Nil, partitionColumns)
    +
    +    // If anything below fails, we should abort the task.
    +    try {
    +      // TODO: remove duplicated code.
    +      // TODO: if sorting columns are empty, we can keep all writers in a hash map and avoid sorting
    +      // here.
    +      val sorter = new UnsafeKVExternalSorter(
    +        sortingKeySchema,
    +        StructType.fromAttributes(dataColumns),
    +        SparkEnv.get.blockManager,
    +        TaskContext.get().taskMemoryManager().pageSizeBytes)
    +
    +      while (iterator.hasNext) {
    +        val currentRow = iterator.next()
    +        sorter.insertKV(getSortingKey(currentRow), getOutputRow(currentRow))
    +      }
    +
    +      logInfo(s"Sorting complete. Writing out partition files one at a time.")
    +
    +      def sameBucket(row1: InternalRow, row2: InternalRow): Boolean = {
    +        if (row1.getInt(partitionColumns.length) != row2.getInt(partitionColumns.length)) {
    +          false
    +        } else {
    +          var i = partitionColumns.length - 1
    +          while (i >= 0) {
    +            val dt = partitionColumns(i).dataType
    +            if (row1.get(i, dt) != row2.get(i, dt)) return false
    +            i -= 1
    +          }
    +          true
    +        }
    +      }
    +      val sortedIterator = sorter.sortedIterator()
    +      var currentKey: InternalRow = null
    +      var currentWriter: OutputWriter = null
    +      try {
    +        while (sortedIterator.next()) {
    +          if (currentKey == null || !sameBucket(currentKey, sortedIterator.getKey)) {
    +            if (currentWriter != null) {
    +              currentWriter.close()
    +            }
    +            currentKey = sortedIterator.getKey.copy()
    +            logDebug(s"Writing partition: $currentKey")
    +            currentWriter = newOutputWriter(currentKey)
    +          }
    +          currentWriter.writeInternal(sortedIterator.getValue)
    +        }
    +      } finally {
    +        if (currentWriter != null) { currentWriter.close() }
    +      }
    +
    +      commitTask()
    +    } catch {
    +      case cause: Throwable =>
    +        logError("Aborting task.", cause)
    +        abortTask()
    +        throw new SparkException("Task failed while writing rows.", cause)
    +    }
    +
    +    /** Open and returns a new OutputWriter given a partition key. */
    --- End diff --
    
    This comment is not accurate. Do you explain the on-disk file naming anywhere?
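
A minimal sketch of the sort-then-write pattern in the diff above, without Spark dependencies. It assumes a hypothetical `Row` type and uses `scala.util.hashing.MurmurHash3` as a stand-in for the `Murmur3Hash` expression, so the bucket ids it produces are not claimed to match Spark's. The grouping step mimics "close the current writer and open a new one whenever the (partition, bucketId) key changes".

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.hashing.MurmurHash3

object BucketedWriteSketch {
  // Hypothetical row: one partition value, one bucketing key, one payload.
  final case class Row(partition: String, key: String, value: Int)

  // Positive modulo, so a negative hash still maps to a bucket in [0, n).
  def pmod(a: Int, n: Int): Int = {
    val r = a % n
    if (r < 0) r + n else r
  }

  // Stand-in hash (assumption); Spark's Murmur3Hash may yield other values.
  def bucketId(key: String, numBuckets: Int): Int =
    pmod(MurmurHash3.stringHash(key), numBuckets)

  // Sorts by (partition, bucketId), then starts a new "file" each time the
  // key changes, so only one output is open at any moment.
  def writeSorted(rows: Seq[Row], numBuckets: Int): Seq[((String, Int), Seq[Int])] = {
    val sorted = rows
      .map(r => ((r.partition, bucketId(r.key, numBuckets)), r.value))
      .sortBy(_._1)
    val files = ArrayBuffer.empty[((String, Int), ArrayBuffer[Int])]
    for ((key, value) <- sorted) {
      if (files.isEmpty || files.last._1 != key) {
        files += ((key, ArrayBuffer.empty[Int]))
      }
      files.last._2 += value
    }
    files.map { case (k, vs) => (k, vs.toSeq) }.toSeq
  }
}
```

Because the input is fully sorted first, each (partition, bucketId) key appears in exactly one contiguous run, which is what lets the writer keep a single file open.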




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-168017643
  
    **[Test build #48489 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/48489/consoleFull)** for PR 10498 at commit [`d2dc9b3`](https://github.com/apache/spark/commit/d2dc9b3ce51bd84ed5f59137d2eb76c8b6bd4f9c).




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48821401
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---
    @@ -240,6 +241,23 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
           }
         }
     
    +    if (userSpecifiedSchema.isDefined && bucketSpec.isDefined) {
    +      val BucketSpec(numBuckets, bucketColumns, sortColumns) = bucketSpec.get
    +
    +      tableProperties.put("spark.sql.sources.schema.numBuckets", numBuckets.toString)
    +      tableProperties.put("spark.sql.sources.schema.numBucketCols", bucketColumns.length.toString)
    +      bucketColumns.zipWithIndex.foreach { case (bucketCol, index) =>
    +        tableProperties.put(s"spark.sql.sources.schema.bucketCol.$index", bucketCol)
    +      }
    +
    +      if (sortColumns.isDefined) {
    +        tableProperties.put("spark.sql.sources.schema.numSortCols", sortColumns.get.length.toString)
    --- End diff --
    
    It's only used to read the sorting columns back, which is the same technique we use to store partition columns.
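
A rough sketch of the round trip described here, using a plain `Map` in place of the metastore table properties. The `numBuckets`, `numBucketCols`, `bucketCol.$index` and `numSortCols` keys come from the diff; the `sortCol.$index` key and the `fromProps` reader are assumptions made for illustration, since the quoted diff ends before that part.

```scala
object BucketPropsSketch {
  // Simplified stand-in for the BucketSpec in this pull request.
  final case class BucketSpec(
      numBuckets: Int,
      bucketCols: Seq[String],
      sortCols: Option[Seq[String]])

  private val prefix = "spark.sql.sources.schema."

  // Flattens the spec into string properties: a count plus one key per column.
  def toProps(spec: BucketSpec): Map[String, String] = {
    val base = Map(
      (prefix + "numBuckets") -> spec.numBuckets.toString,
      (prefix + "numBucketCols") -> spec.bucketCols.length.toString)
    val bucket = spec.bucketCols.zipWithIndex.map { case (c, i) =>
      (prefix + "bucketCol." + i) -> c
    }
    // "sortCol.<i>" is a hypothetical key name, not taken from the diff.
    val sort = spec.sortCols.toSeq.flatMap { cols =>
      ((prefix + "numSortCols") -> cols.length.toString) +:
        cols.zipWithIndex.map { case (c, i) => (prefix + "sortCol." + i) -> c }
    }
    base ++ bucket ++ sort
  }

  // Reads the counts first, then uses them to look up each indexed column.
  def fromProps(props: Map[String, String]): Option[BucketSpec] =
    for {
      n <- props.get(prefix + "numBuckets")
      numCols <- props.get(prefix + "numBucketCols")
    } yield {
      val bucketCols = (0 until numCols.toInt).map(i => props(prefix + "bucketCol." + i))
      val sortCols = props.get(prefix + "numSortCols").map { m =>
        (0 until m.toInt).map(i => props(prefix + "sortCol." + i))
      }
      BucketSpec(n.toInt, bucketCols, sortCols)
    }
}
```

Storing the count alongside the indexed keys is what lets the reader recover the columns in their original order without scanning all properties.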




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-168669606
  
    **[Test build #48659 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/48659/consoleFull)** for PR 10498 at commit [`21e0c48`](https://github.com/apache/spark/commit/21e0c48e83319f7319ba339deb2bffde0188583d).




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48820931
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
    @@ -189,13 +220,43 @@ final class DataFrameWriter private[sql](df: DataFrame) {
             ifNotExists = false)).toRdd
       }
     
    -  private def normalizedParCols: Option[Seq[String]] = partitioningColumns.map { parCols =>
    -    parCols.map { col =>
    -      df.logicalPlan.output
    -        .map(_.name)
    -        .find(df.sqlContext.analyzer.resolver(_, col))
    -        .getOrElse(throw new AnalysisException(s"Partition column $col not found in existing " +
    -          s"columns (${df.logicalPlan.output.map(_.name).mkString(", ")})"))
    +  private def normalizedParCols: Option[Seq[String]] = partitioningColumns.map { cols =>
    +    cols.map(normalize(_, "Partition"))
    +  }
    +
    +  private def normalizedBucketCols: Option[Seq[String]] = bucketingColumns.map { cols =>
    +    cols.map(normalize(_, "Bucketing"))
    +  }
    +
    +  private def normalizedSortCols: Option[Seq[String]] = sortingColumns.map { cols =>
    +    cols.map(normalize(_, "Sorting"))
    +  }
    +
    +  private def getBucketSpec: Option[BucketSpec] = {
    +    if (sortingColumns.isDefined) {
    +      require(numBuckets.isDefined, "sortBy must be used together with bucketBy")
    +    }
    +
    +    for {
    +      n <- numBuckets
    +      cols <- normalizedBucketCols
    +    } yield {
    +      require(n > 0, "Bucket number must be greater than 0.")
    +      BucketSpec(n, cols, normalizedSortCols)
    +    }
    +  }
    +
    +  private def normalize(columnName: String, columnType: String): String = {
    +    val validColumnNames = df.logicalPlan.output.map(_.name)
    +    validColumnNames.find(df.sqlContext.analyzer.resolver(_, columnName))
    +      .getOrElse(throw new AnalysisException(s"$columnType column $columnName not found in " +
    +        s"existing columns (${validColumnNames.mkString(", ")})"))
    +  }
    +
    +  private def assertNotBucketed(): Unit = {
    +    if (numBuckets.isDefined || sortingColumns.isDefined) {
    --- End diff --
    
    Your point makes sense from the implementation's perspective, but as a user, why do I have to call `bucketBy` in order to use `sortBy`?
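
To make the behaviour under discussion concrete, here is a standalone sketch of the `getBucketSpec` logic from the diff, with the writer's fields turned into plain parameters (an assumption made so that it runs without a `DataFrameWriter`). It shows that sort columns alone are rejected, while bucketing alone is accepted.

```scala
object BucketSpecValidationSketch {
  // Simplified stand-in for the BucketSpec in this pull request.
  final case class BucketSpec(
      numBuckets: Int,
      bucketCols: Seq[String],
      sortCols: Option[Seq[String]])

  def getBucketSpec(
      numBuckets: Option[Int],
      bucketCols: Option[Seq[String]],
      sortCols: Option[Seq[String]]): Option[BucketSpec] = {
    // sortBy without bucketBy fails fast, which is the restriction questioned above.
    if (sortCols.isDefined) {
      require(numBuckets.isDefined, "sortBy must be used together with bucketBy")
    }
    // Yields None unless both the bucket count and the bucket columns are set.
    for {
      n <- numBuckets
      cols <- bucketCols
    } yield {
      require(n > 0, "Bucket number must be greater than 0.")
      BucketSpec(n, cols, sortCols)
    }
  }
}
```

With this shape, `getBucketSpec(None, None, Some(Seq("ts")))` throws an `IllegalArgumentException`, whereas `getBucketSpec(Some(4), Some(Seq("id")), None)` returns a spec with no sort columns.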





[GitHub] spark pull request: [SPARK-12539][SQL][WIP] support writing bucket...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-167596617
  
    **[Test build #48367 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/48367/consoleFull)** for PR 10498 at commit [`8cb2494`](https://github.com/apache/spark/commit/8cb24942890153bad70e003110a28d6111f0c407).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
       * `case class Hash(children: Seq[Expression]) extends Expression`
       * `class TextOutputWriter(`




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48609301
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BucketSpec.scala ---
    @@ -0,0 +1,42 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.datasources
    +
    +import org.apache.spark.sql.catalyst.expressions.Attribute
    +
    +private[sql] case class BucketSpec(
    +    numBuckets: Int,
    +    bucketingColumns: Seq[String],
    +    sortingColumns: Option[Seq[String]]) {
    +
    +  def resolvedBucketingColumns(inputSchema: Seq[Attribute]): Seq[Attribute] = {
    +    bucketingColumns.map { col =>
    +      inputSchema.find(_.name == col).get
    +    }
    +  }
    +
    +  def resolvedSortingColumns(inputSchema: Seq[Attribute]): Seq[Attribute] = {
    +    if (sortingColumns.isDefined) {
    +      sortingColumns.get.map { col =>
    +        inputSchema.find(_.name == col).get
    +      }
    +    } else {
    +      Nil
    +    }
    +  }
    --- End diff --
    
    Do we need to consider case sensitivity in the above two `resolvedX` methods?
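
One way to picture the concern: the two `resolvedX` methods compare names with `==`, so a column written as `userid` would not match a schema column `userId`, and `.get` would throw. The sketch below is only an illustration under assumptions. It works on plain strings rather than `Attribute`s, and the `Resolver` type alias and both resolver values are hypothetical stand-ins for the analyzer's resolver.

```scala
object ResolveColumnsSketch {
  // Hypothetical stand-in for the analyzer's name-matching function.
  type Resolver = (String, String) => Boolean

  val caseSensitive: Resolver = _ == _
  val caseInsensitive: Resolver = _.equalsIgnoreCase(_)

  // Maps each requested name to the schema's spelling of it, or reports
  // every name that could not be matched instead of throwing on .get.
  def resolve(
      names: Seq[String],
      schema: Seq[String],
      resolver: Resolver): Either[String, Seq[String]] = {
    val resolved = names.map(n => n -> schema.find(resolver(_, n)))
    val missing = resolved.collect { case (n, None) => n }
    if (missing.nonEmpty) Left("Columns not found: " + missing.mkString(", "))
    else Right(resolved.flatMap(_._2))
  }
}
```

Under the case-insensitive resolver, `resolve(Seq("userid"), Seq("userId", "ts"), caseInsensitive)` returns the schema's own spelling, while the case-sensitive one reports the column as missing.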




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r49030037
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala ---
    @@ -125,7 +125,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
                  |Actual: ${partitionColumns.mkString(", ")}
               """.stripMargin)
     
    -        val writerContainer = if (partitionColumns.isEmpty) {
    +        val writerContainer = if (partitionColumns.isEmpty && relation.bucketSpec.isEmpty) {
    --- End diff --
    
    cc @davies: if we bucket without partitioning, we will go to the `DynamicPartitionWriterContainer` branch.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-169317846
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-12539][SQL] support writing bucketed ta...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on the pull request:

    https://github.com/apache/spark/pull/10498#issuecomment-169444645
  
    This looks good to me to merge. 

