Posted to reviews@spark.apache.org by baishuo <gi...@git.apache.org> on 2014/08/13 11:15:02 UTC

[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

GitHub user baishuo opened a pull request:

    https://github.com/apache/spark/pull/1919

    [SPARK-3007][SQL]Add "Dynamic Partition" support to Spark Sql hive

    For details, please refer to the comments on https://issues.apache.org/jira/browse/SPARK-3007

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/baishuo/spark patch-1

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/1919.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1919
    
----
commit d3e206e1a2fadc271e365462bd93730e31a094eb
Author: baishuo(白硕) <vc...@hotmail.com>
Date:   2014-08-12T17:27:54Z

    Update HiveQl.scala

commit b22857a365925a428c41dd3e93d0da3613053071
Author: baishuo(白硕) <vc...@hotmail.com>
Date:   2014-08-12T17:29:36Z

    Update SparkHadoopWriter.scala

commit bade51d4726b8c55de83fef5c3e42c48f5af8f59
Author: baishuo(白硕) <vc...@hotmail.com>
Date:   2014-08-12T17:31:01Z

    Update InsertIntoHiveTable.scala

commit d211d330550260d93752349682e7c8447691a9e5
Author: baishuo(白硕) <vc...@hotmail.com>
Date:   2014-08-12T17:53:04Z

    Update InsertIntoHiveTable.scala

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1919#discussion_r16343341
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---
    @@ -201,17 +270,79 @@ case class InsertIntoHiveTable(
           }
         }
     
    -    // ORC stores compression information in table properties. While, there are other formats
    -    // (e.g. RCFile) that rely on hadoop configurations to store compression information.
    -    val jobConf = new JobConf(sc.hiveconf)
    -    saveAsHiveFile(
    -      rdd,
    -      outputClass,
    -      fileSinkConf,
    -      jobConf,
    -      sc.hiveconf.getBoolean("hive.exec.compress.output", false))
    -
    -    // TODO: Handle dynamic partitioning.
    +    if (dynamicPartNum > 0) {
    +      if (outputClass == null) {
    +        throw new SparkException("Output value class not set")
    +      }
    +      jobConfSer.value.setOutputValueClass(outputClass)
    +      if (fileSinkConf.getTableInfo.getOutputFileFormatClassName == null) {
    +        throw new SparkException("Output format class not set")
    +      }
    +      // Doesn't work in Scala 2.9 due to what may be a generics bug
    +      // TODO: Should we uncomment this for Scala 2.10?
    +      // conf.setOutputFormat(outputFormatClass)
    +      jobConfSer.value.set("mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName)
    +      if (sc.hiveconf.getBoolean("hive.exec.compress.output", false)) {
    +        // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
    +        // and "mapred.output.compression.type" have no impact on ORC because it uses table properties
    +        // to store compression information.
    +        jobConfSer.value.set("mapred.output.compress", "true")
    +        fileSinkConf.setCompressed(true)
    +        fileSinkConf.setCompressCodec(jobConfSer.value.get("mapred.output.compression.codec"))
    +        fileSinkConf.setCompressType(jobConfSer.value.get("mapred.output.compression.type"))
    +      }
    +      jobConfSer.value.setOutputCommitter(classOf[FileOutputCommitter])
    +
    +      FileOutputFormat.setOutputPath(
    +        jobConfSer.value,
    +        SparkHiveHadoopWriter.createPathFromString(fileSinkConf.getDirName, jobConfSer.value))
    +
    +      var writerMap =  new scala.collection.mutable.HashMap[String, SparkHiveHadoopWriter]
    +      def writeToFile2(context: TaskContext, iter: Iterator[Writable]) {
    +        // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
    +        // around by taking a mod. We expect that no task will be attempted 2 billion times.
    +        val attemptNumber = (context.attemptId % Int.MaxValue).toInt
    +        val serializer = newSerializer(fileSinkConf.getTableInfo)
    +        var count = 0
    +        var writer2:SparkHiveHadoopWriter = null
    +        while(iter.hasNext) {
    +          val record = iter.next();
    +          val location = fileSinkConf.getDirName
    +          val partLocation = location + dynamicPartPath
    +          writer2=writerMap.get(dynamicPartPath) match {
    --- End diff --
    
    Spaces around `=`.




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1919#discussion_r16346630
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---
    @@ -191,7 +257,10 @@ case class InsertIntoHiveTable(
           val outputData = new Array[Any](fieldOIs.length)
           iter.map { row =>
             var i = 0
    -        while (i < row.length) {
    +        while (i < fieldOIs.length) {
    +          if (fieldOIs.length < row.length && row.length - fieldOIs.length == dynamicPartNum) {
    +              dynamicPartPath = getDynamicPartDir(fileSinkConf.getTableInfo, row, dynamicPartNum, jobConfSer.value)
    --- End diff --
    
    OK, I finally understand the trick here... Although `dynamicPartPath` is defined outside the closure, the `dynamicPartPath` assigned on this line and the one used in `writeToFile2` are the same instance for a single row, since the two iterators are pipelined.

    I admit I had never thought we could use Spark this way :) But this is too hacky to follow. I'd suggest defining `dynamicPartPath` within this closure and passing it as part of the output of this RDD. Namely, change [this line](https://github.com/apache/spark/pull/1919/files#diff-d579db9a8f27e0bbef37720ab14ec3f6L200) to:
    
    ```scala
    serializer.serialize(outputData, standardOI) -> dynamicPartPath
    ```
    
    Then make `writeToFile2` receive an `Iterator[(Writable, String)]` instead of an `Iterator[Writable]`.
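
    For reference, a minimal sketch of what the receiving side might then look like (the identifiers `writerMap`, `jobConfSer`, `fileSinkConf`, and `SparkHiveHadoopWriter` are taken from the diff above; the exact setup calls are an assumption, not the final patch):

    ```scala
    // Sketch only: assumes the mapping closure now emits (Writable, String) pairs,
    // where the String is the per-row dynamic partition path.
    def writeToFile2(context: TaskContext, iter: Iterator[(Writable, String)]) {
      val attemptNumber = (context.attemptId % Int.MaxValue).toInt
      iter.foreach { case (record, dynamicPartPath) =>
        val partLocation = fileSinkConf.getDirName + dynamicPartPath
        // getOrElseUpdate returns a cached writer, or builds and registers one on a miss
        val writer = writerMap.getOrElseUpdate(dynamicPartPath, {
          val tempWriter = new SparkHiveHadoopWriter(
            jobConfSer.value,
            new FileSinkDesc(partLocation, fileSinkConf.getTableInfo, false))
          tempWriter.setup(context.stageId, context.partitionId, attemptNumber)
          tempWriter.open(dynamicPartPath)
          tempWriter
        })
        writer.write(record)
      }
      writerMap.values.foreach { w => w.close(); w.commit() }
    }
    ```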




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1919#discussion_r16340297
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---
    @@ -159,6 +162,36 @@ case class InsertIntoHiveTable(
         writer.commitJob()
       }
     
    +  def getDynamicPartDir(tableInfo: TableDesc, row: Row, dynamicPartNum2: Int, jobConf: JobConf) :String = {
    --- End diff --
    
    Should be `private`.




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:

    https://github.com/apache/spark/pull/1919#issuecomment-53942610
  
    ok to test




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/1919#issuecomment-52026260
  
    Can one of the admins verify this patch?




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1919#discussion_r16339176
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---
    @@ -204,12 +235,81 @@ case class InsertIntoHiveTable(
         // ORC stores compression information in table properties. While, there are other formats
         // (e.g. RCFile) that rely on hadoop configurations to store compression information.
         val jobConf = new JobConf(sc.hiveconf)
    -    saveAsHiveFile(
    -      rdd,
    -      outputClass,
    -      fileSinkConf,
    -      jobConf,
    -      sc.hiveconf.getBoolean("hive.exec.compress.output", false))
    +    val jobConfSer = new SerializableWritable(jobConf)
    +    if (dynamicPartNum>0) {
    +      if (outputClass == null) {
    +        throw new SparkException("Output value class not set")
    +      }
    +      jobConfSer.value.setOutputValueClass(outputClass)
    +      if (fileSinkConf.getTableInfo.getOutputFileFormatClassName == null) {
    +        throw new SparkException("Output format class not set")
    +      }
    +      // Doesn't work in Scala 2.9 due to what may be a generics bug
    +      // TODO: Should we uncomment this for Scala 2.10?
    +      // conf.setOutputFormat(outputFormatClass)
    +      jobConfSer.value.set("mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName)
    +      if (sc.hiveconf.getBoolean("hive.exec.compress.output", false)) {
    +        // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
    +        // and "mapred.output.compression.type" have no impact on ORC because it uses table properties
    +        // to store compression information.
    +        jobConfSer.value.set("mapred.output.compress", "true")
    +        fileSinkConf.setCompressed(true)
    +        fileSinkConf.setCompressCodec(jobConfSer.value.get("mapred.output.compression.codec"))
    +        fileSinkConf.setCompressType(jobConfSer.value.get("mapred.output.compression.type"))
    +      }
    +      jobConfSer.value.setOutputCommitter(classOf[FileOutputCommitter])
    +
    +      FileOutputFormat.setOutputPath(
    +        jobConfSer.value,
    +        SparkHiveHadoopWriter.createPathFromString(fileSinkConf.getDirName, jobConfSer.value))
    +
    +      var writerMap =  new scala.collection.mutable.HashMap[String, SparkHiveHadoopWriter]
    +      def writeToFile2(context: TaskContext, iter: Iterator[Writable]) {
    +        // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
    +        // around by taking a mod. We expect that no task will be attempted 2 billion times.
    +        val attemptNumber = (context.attemptId % Int.MaxValue).toInt
    +        val serializer = newSerializer(fileSinkConf.getTableInfo)
    +        var count = 0
    +        var writer2:SparkHiveHadoopWriter = null
    +        while(iter.hasNext) {
    +          val record = iter.next();
    +          val location = fileSinkConf.getDirName
    +          val partLocation = location + dynamicPartPath
    +          writer2=writerMap.get(dynamicPartPath) match {
    +            case Some(writer)=> writer
    +            case None => {
    +              val tempWriter = new SparkHiveHadoopWriter(jobConfSer.value, new FileSinkDesc(partLocation, fileSinkConf.getTableInfo, false))
    +              tempWriter.setup(context.stageId, context.partitionId, attemptNumber)
    +              tempWriter.open(dynamicPartPath);
    +              writerMap += (dynamicPartPath -> tempWriter)
    +              tempWriter
    +            }
    +          }
    +          count += 1
    +          writer2.write(record)
    +        }
    +        for((k,v) <- writerMap) {
    +          v.close()
    +          v.commit()
    +        }
    +      }
    +
    +      sc.sparkContext.runJob(rdd, writeToFile2 _)
    +
    +      for((k,v) <- writerMap) {
    +        v.commitJob()
    +      }
    +      writerMap.clear()
    +      //writer.commitJob()
    +
    --- End diff --
    
    Remove this newline.
    
    It seems that we duplicated `saveAsHiveFile` here and added a new version of `writeToFile`, together with some other minor modifications, to enable dynamic partitioning. I think we can move `writeToFile2` (also, please rename it to something like `writeWithDynamicPartitions`) into `saveAsHiveFile` without hurting performance, as long as we keep the critical path of writing without dynamic partitioning clean.
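
    One possible shape for that merge, purely as a sketch (the extra `dynamicPartNum` parameter and its default are assumptions; the other parameters mirror the existing call site in the diff):

    ```scala
    // Sketch: keep a single entry point and branch on dynamicPartNum, so the
    // critical path without dynamic partitioning stays unchanged.
    def saveAsHiveFile(
        rdd: RDD[Writable],
        valueClass: Class[_],
        fileSinkConf: FileSinkDesc,
        conf: JobConf,
        isCompressed: Boolean,
        dynamicPartNum: Int = 0): Unit = {
      if (dynamicPartNum > 0) {
        sc.sparkContext.runJob(rdd, writeWithDynamicPartitions _) // the renamed writeToFile2
      } else {
        sc.sparkContext.runJob(rdd, writeToFile _)                // existing path, untouched
      }
    }
    ```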




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by baishuo <gi...@git.apache.org>.
Github user baishuo commented on the pull request:

    https://github.com/apache/spark/pull/1919#issuecomment-52583642
  
    thanks a lot @yhuai and @liancheng :)




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1919#discussion_r16343705
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---
    @@ -201,17 +270,79 @@ case class InsertIntoHiveTable(
           }
         }
     
    -    // ORC stores compression information in table properties. While, there are other formats
    -    // (e.g. RCFile) that rely on hadoop configurations to store compression information.
    -    val jobConf = new JobConf(sc.hiveconf)
    -    saveAsHiveFile(
    -      rdd,
    -      outputClass,
    -      fileSinkConf,
    -      jobConf,
    -      sc.hiveconf.getBoolean("hive.exec.compress.output", false))
    -
    -    // TODO: Handle dynamic partitioning.
    +    if (dynamicPartNum > 0) {
    +      if (outputClass == null) {
    +        throw new SparkException("Output value class not set")
    +      }
    +      jobConfSer.value.setOutputValueClass(outputClass)
    +      if (fileSinkConf.getTableInfo.getOutputFileFormatClassName == null) {
    +        throw new SparkException("Output format class not set")
    +      }
    +      // Doesn't work in Scala 2.9 due to what may be a generics bug
    +      // TODO: Should we uncomment this for Scala 2.10?
    +      // conf.setOutputFormat(outputFormatClass)
    +      jobConfSer.value.set("mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName)
    +      if (sc.hiveconf.getBoolean("hive.exec.compress.output", false)) {
    +        // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
    +        // and "mapred.output.compression.type" have no impact on ORC because it uses table properties
    +        // to store compression information.
    +        jobConfSer.value.set("mapred.output.compress", "true")
    +        fileSinkConf.setCompressed(true)
    +        fileSinkConf.setCompressCodec(jobConfSer.value.get("mapred.output.compression.codec"))
    +        fileSinkConf.setCompressType(jobConfSer.value.get("mapred.output.compression.type"))
    +      }
    +      jobConfSer.value.setOutputCommitter(classOf[FileOutputCommitter])
    +
    +      FileOutputFormat.setOutputPath(
    +        jobConfSer.value,
    +        SparkHiveHadoopWriter.createPathFromString(fileSinkConf.getDirName, jobConfSer.value))
    +
    +      var writerMap =  new scala.collection.mutable.HashMap[String, SparkHiveHadoopWriter]
    +      def writeToFile2(context: TaskContext, iter: Iterator[Writable]) {
    +        // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
    +        // around by taking a mod. We expect that no task will be attempted 2 billion times.
    +        val attemptNumber = (context.attemptId % Int.MaxValue).toInt
    +        val serializer = newSerializer(fileSinkConf.getTableInfo)
    +        var count = 0
    +        var writer2:SparkHiveHadoopWriter = null
    +        while(iter.hasNext) {
    +          val record = iter.next();
    +          val location = fileSinkConf.getDirName
    +          val partLocation = location + dynamicPartPath
    +          writer2=writerMap.get(dynamicPartPath) match {
    +            case Some(writer)=> writer
    +            case None => {
    --- End diff --
    
    We can replace the `writerMap.get(...) match { case None => ... }` structure here with `writerMap.getOrElse(..., { ... })`.
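
    A small self-contained illustration of the pattern (toy types; `getOrElseUpdate` is shown because, like the original `case None` branch, it also caches the newly built value):

    ```scala
    import scala.collection.mutable

    // Toy stand-in for the writer cache in the diff above.
    val cache = mutable.HashMap.empty[String, StringBuilder]

    def lookup(partPath: String): StringBuilder =
      // Replaces: cache.get(partPath) match { case Some(w) => w; case None => ... }
      cache.getOrElseUpdate(partPath, new StringBuilder(s"writer for $partPath"))

    println(lookup("/ds=2014-08-13") eq lookup("/ds=2014-08-13"))  // true: second call hits the cache
    ```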




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1919#discussion_r16346660
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---
    @@ -191,7 +257,10 @@ case class InsertIntoHiveTable(
           val outputData = new Array[Any](fieldOIs.length)
           iter.map { row =>
             var i = 0
    -        while (i < row.length) {
    +        while (i < fieldOIs.length) {
    +          if (fieldOIs.length < row.length && row.length - fieldOIs.length == dynamicPartNum) {
    --- End diff --
    
    Another thing is that we only need to calculate `dynamicPartPath` once per row. Just move this block before the `while`.




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1919#discussion_r16343370
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---
    @@ -201,17 +270,79 @@ case class InsertIntoHiveTable(
           }
         }
     
    -    // ORC stores compression information in table properties. While, there are other formats
    -    // (e.g. RCFile) that rely on hadoop configurations to store compression information.
    -    val jobConf = new JobConf(sc.hiveconf)
    -    saveAsHiveFile(
    -      rdd,
    -      outputClass,
    -      fileSinkConf,
    -      jobConf,
    -      sc.hiveconf.getBoolean("hive.exec.compress.output", false))
    -
    -    // TODO: Handle dynamic partitioning.
    +    if (dynamicPartNum > 0) {
    +      if (outputClass == null) {
    +        throw new SparkException("Output value class not set")
    +      }
    +      jobConfSer.value.setOutputValueClass(outputClass)
    +      if (fileSinkConf.getTableInfo.getOutputFileFormatClassName == null) {
    +        throw new SparkException("Output format class not set")
    +      }
    +      // Doesn't work in Scala 2.9 due to what may be a generics bug
    +      // TODO: Should we uncomment this for Scala 2.10?
    +      // conf.setOutputFormat(outputFormatClass)
    +      jobConfSer.value.set("mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName)
    +      if (sc.hiveconf.getBoolean("hive.exec.compress.output", false)) {
    +        // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
    +        // and "mapred.output.compression.type" have no impact on ORC because it uses table properties
    +        // to store compression information.
    +        jobConfSer.value.set("mapred.output.compress", "true")
    +        fileSinkConf.setCompressed(true)
    +        fileSinkConf.setCompressCodec(jobConfSer.value.get("mapred.output.compression.codec"))
    +        fileSinkConf.setCompressType(jobConfSer.value.get("mapred.output.compression.type"))
    +      }
    +      jobConfSer.value.setOutputCommitter(classOf[FileOutputCommitter])
    +
    +      FileOutputFormat.setOutputPath(
    +        jobConfSer.value,
    +        SparkHiveHadoopWriter.createPathFromString(fileSinkConf.getDirName, jobConfSer.value))
    +
    +      var writerMap =  new scala.collection.mutable.HashMap[String, SparkHiveHadoopWriter]
    +      def writeToFile2(context: TaskContext, iter: Iterator[Writable]) {
    +        // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
    +        // around by taking a mod. We expect that no task will be attempted 2 billion times.
    +        val attemptNumber = (context.attemptId % Int.MaxValue).toInt
    +        val serializer = newSerializer(fileSinkConf.getTableInfo)
    +        var count = 0
    +        var writer2:SparkHiveHadoopWriter = null
    +        while(iter.hasNext) {
    --- End diff --
    
    Space after `(`.




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1919#discussion_r16343293
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---
    @@ -191,7 +257,10 @@ case class InsertIntoHiveTable(
           val outputData = new Array[Any](fieldOIs.length)
           iter.map { row =>
             var i = 0
    -        while (i < row.length) {
    +        while (i < fieldOIs.length) {
    +          if (fieldOIs.length < row.length && row.length - fieldOIs.length == dynamicPartNum) {
    --- End diff --
    
    The first comparison is redundant.




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1919#discussion_r16345513
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---
    @@ -191,7 +257,10 @@ case class InsertIntoHiveTable(
           val outputData = new Array[Any](fieldOIs.length)
           iter.map { row =>
             var i = 0
    -        while (i < row.length) {
    +        while (i < fieldOIs.length) {
    +          if (fieldOIs.length < row.length && row.length - fieldOIs.length == dynamicPartNum) {
    +              dynamicPartPath = getDynamicPartDir(fileSinkConf.getTableInfo, row, dynamicPartNum, jobConfSer.value)
    --- End diff --
    
    Wrong indentation (should be 2 spaces).
    
    And more importantly, `dynamicPartPath` is defined outside the closure, but the assignment here is executed on the executor side.




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1919#discussion_r16339230
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---
    @@ -204,12 +235,81 @@ case class InsertIntoHiveTable(
         // ORC stores compression information in table properties. While, there are other formats
         // (e.g. RCFile) that rely on hadoop configurations to store compression information.
         val jobConf = new JobConf(sc.hiveconf)
    -    saveAsHiveFile(
    -      rdd,
    -      outputClass,
    -      fileSinkConf,
    -      jobConf,
    -      sc.hiveconf.getBoolean("hive.exec.compress.output", false))
    +    val jobConfSer = new SerializableWritable(jobConf)
    +    if (dynamicPartNum>0) {
    +      if (outputClass == null) {
    +        throw new SparkException("Output value class not set")
    +      }
    +      jobConfSer.value.setOutputValueClass(outputClass)
    +      if (fileSinkConf.getTableInfo.getOutputFileFormatClassName == null) {
    +        throw new SparkException("Output format class not set")
    +      }
    +      // Doesn't work in Scala 2.9 due to what may be a generics bug
    +      // TODO: Should we uncomment this for Scala 2.10?
    +      // conf.setOutputFormat(outputFormatClass)
    +      jobConfSer.value.set("mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName)
    +      if (sc.hiveconf.getBoolean("hive.exec.compress.output", false)) {
    +        // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
    +        // and "mapred.output.compression.type" have no impact on ORC because it uses table properties
    +        // to store compression information.
    +        jobConfSer.value.set("mapred.output.compress", "true")
    +        fileSinkConf.setCompressed(true)
    +        fileSinkConf.setCompressCodec(jobConfSer.value.get("mapred.output.compression.codec"))
    +        fileSinkConf.setCompressType(jobConfSer.value.get("mapred.output.compression.type"))
    +      }
    +      jobConfSer.value.setOutputCommitter(classOf[FileOutputCommitter])
    +
    +      FileOutputFormat.setOutputPath(
    +        jobConfSer.value,
    +        SparkHiveHadoopWriter.createPathFromString(fileSinkConf.getDirName, jobConfSer.value))
    +
    +      var writerMap =  new scala.collection.mutable.HashMap[String, SparkHiveHadoopWriter]
    +      def writeToFile2(context: TaskContext, iter: Iterator[Writable]) {
    +        // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
    +        // around by taking a mod. We expect that no task will be attempted 2 billion times.
    +        val attemptNumber = (context.attemptId % Int.MaxValue).toInt
    +        val serializer = newSerializer(fileSinkConf.getTableInfo)
    +        var count = 0
    +        var writer2:SparkHiveHadoopWriter = null
    +        while(iter.hasNext) {
    +          val record = iter.next();
    +          val location = fileSinkConf.getDirName
    +          val partLocation = location + dynamicPartPath
    +          writer2=writerMap.get(dynamicPartPath) match {
    +            case Some(writer)=> writer
    +            case None => {
    +              val tempWriter = new SparkHiveHadoopWriter(jobConfSer.value, new FileSinkDesc(partLocation, fileSinkConf.getTableInfo, false))
    +              tempWriter.setup(context.stageId, context.partitionId, attemptNumber)
    +              tempWriter.open(dynamicPartPath);
    +              writerMap += (dynamicPartPath -> tempWriter)
    +              tempWriter
    +            }
    +          }
    +          count += 1
    +          writer2.write(record)
    +        }
    +        for((k,v) <- writerMap) {
    +          v.close()
    +          v.commit()
    +        }
    +      }
    +
    +      sc.sparkContext.runJob(rdd, writeToFile2 _)
    +
    +      for((k,v) <- writerMap) {
    +        v.commitJob()
    +      }
    +      writerMap.clear()
    +      //writer.commitJob()
    +
    +    } else {
    +      saveAsHiveFile(
    +        rdd,
    +        outputClass,
    +        fileSinkConf,
    +        jobConf,
    +        sc.hiveconf.getBoolean("hive.exec.compress.output", false))
    +    }
     
         // TODO: Handle dynamic partitioning.
    --- End diff --
    
    Remove this comment since we are adding support for dynamic partitioning.




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1919#discussion_r16338950
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---
    @@ -159,6 +159,28 @@ case class InsertIntoHiveTable(
         writer.commitJob()
       }
     
    +  def getDynamicPartDir(tableInfo: TableDesc, row: Row, dynamicPartNum2: Int) :String = {
    +    dynamicPartNum2 match {
    +      case 0 =>""
    --- End diff --
    
    Please add a space before the quotes.




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1919#discussion_r16340359
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---
    @@ -201,15 +270,78 @@ case class InsertIntoHiveTable(
           }
         }
     
    -    // ORC stores compression information in table properties. While, there are other formats
    -    // (e.g. RCFile) that rely on hadoop configurations to store compression information.
    -    val jobConf = new JobConf(sc.hiveconf)
    -    saveAsHiveFile(
    -      rdd,
    -      outputClass,
    -      fileSinkConf,
    -      jobConf,
    -      sc.hiveconf.getBoolean("hive.exec.compress.output", false))
    +    if (dynamicPartNum>0) {
    --- End diff --
    
    Spaces around `>`.




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on the pull request:

    https://github.com/apache/spark/pull/1919#issuecomment-54236783
  
    Would you mind closing this PR, since #2226 was opened as a replacement?




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1919#discussion_r16338992
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---
    @@ -178,6 +200,12 @@ case class InsertIntoHiveTable(
         val tableLocation = table.hiveQlTable.getDataLocation
         val tmpLocation = hiveContext.getExternalTmpFileURI(tableLocation)
         val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
    +    var dynamicPartNum = 0
    +    var dynamicPartPath = "";
    +    val partitionSpec = partition.map {
    +      case (key, Some(value)) => key -> value
    +      case (key, None) => { dynamicPartNum += 1; key -> ""  }// Should not reach here right now.
    --- End diff --
    
    Remove the comment and put `dynamicPartNum += 1` and `key -> ""` on two separate lines (without braces).
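
    In other words, the reformatted version of the snippet above would read:

    ```scala
    val partitionSpec = partition.map {
      case (key, Some(value)) => key -> value
      case (key, None) =>
        dynamicPartNum += 1
        key -> ""
    }
    ```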




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1919#discussion_r16353163
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---
    @@ -24,7 +24,9 @@ import java.util.{HashMap => JHashMap}
     import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar}
     import org.apache.hadoop.hive.metastore.MetaStoreUtils
     import org.apache.hadoop.hive.ql.Context
    +import org.apache.hadoop.hive.ql.ErrorMsg
     import org.apache.hadoop.hive.ql.metadata.Hive
    +import org.apache.hadoop.hive.ql.parse.SemanticException
    --- End diff --
    
    I feel we will confuse users if we throw Hive's `SemanticException`.
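
    Purely as a hedged sketch of the alternative (the surrounding condition and variable names are hypothetical, and the message text is only illustrative):

    ```scala
    import org.apache.spark.SparkException

    // Surface the failure as a Spark-side exception instead of Hive's SemanticException.
    if (allPartitionsAreDynamic && dynamicPartitionStrictMode) {
      throw new SparkException(
        "Dynamic partition strict mode requires at least one static partition column. " +
          "To turn this off set hive.exec.dynamic.partition.mode=nonstrict")
    }
    ```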




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on the pull request:

    https://github.com/apache/spark/pull/1919#issuecomment-54207500
  
    @baishuo Scala style check failed. See [here](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19507/consoleFull) for details.




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by baishuo <gi...@git.apache.org>.
Github user baishuo commented on the pull request:

    https://github.com/apache/spark/pull/1919#issuecomment-54032088
  
    Hi @




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1919#discussion_r16340258
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala ---
    @@ -93,6 +93,33 @@ private[hive] class SparkHiveHadoopWriter(
           null)
       }
     
    +  def open(dynamicPartPath: String) {
    +    val numfmt = NumberFormat.getInstance()
    --- End diff --
    
    Just realized this function is a variant of the original `open()` method within the same file; this looks like a bug in the master branch.

    Another issue is that `SparkHadoopWriter` resides in the `core` project, which is an indirect dependency of `sql/hive`. Logically, then, it's not proper to put `open(dynamicPartPath: String)` here.




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on the pull request:

    https://github.com/apache/spark/pull/1919#issuecomment-52460973
  
    Please don't forget to add golden answer files for the test cases newly added to the whitelist in `HiveCompatibilitySuite`.




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1919#discussion_r16339566
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---
    @@ -231,14 +327,26 @@ case class InsertIntoHiveTable(
           val inheritTableSpecs = true
           // TODO: Correctly set isSkewedStoreAsSubdir.
           val isSkewedStoreAsSubdir = false
    -      db.loadPartition(
    -        outputPath,
    -        qualifiedTableName,
    -        partitionSpec,
    -        overwrite,
    -        holdDDLTime,
    -        inheritTableSpecs,
    -        isSkewedStoreAsSubdir)
    +      if (dynamicPartNum>0) {
    --- End diff --
    
    Please add spaces around `>`.




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1919#discussion_r16345316
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---
    @@ -159,6 +162,36 @@ case class InsertIntoHiveTable(
         writer.commitJob()
       }
     
    +  def getDynamicPartDir(tableInfo: TableDesc, row: Row, dynamicPartNum2: Int, jobConf: JobConf) :String = {
    +    dynamicPartNum2 match {
    --- End diff --
    
    An `if` would be more appropriate than a `match` here.
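
    A sketch of the suggested shape (`buildPartitionPath` is a hypothetical helper standing in for whatever the non-zero branch of the original `match` computes):

    ```scala
    private def getDynamicPartDir(
        tableInfo: TableDesc, row: Row, dynamicPartNum: Int, jobConf: JobConf): String =
      if (dynamicPartNum == 0) "" else buildPartitionPath(tableInfo, row, dynamicPartNum, jobConf)
    ```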




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1919#discussion_r16338944
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala ---
    @@ -93,6 +93,33 @@ private[hive] class SparkHiveHadoopWriter(
           null)
       }
     
    +  def open(dynamicPartPath: String) {
    +    val numfmt = NumberFormat.getInstance()
    --- End diff --
    
    `NumberFormat.getInstance()` is not thread-safe. We can use a thread-local variable to hold this object, similar to [`Cast.threadLocalDateFormat`](https://github.com/apache/spark/blob/eef779b8d631de971d440051cae21040f4de558f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala#L269-L273)
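
    A minimal self-contained sketch of that approach (the object and field names are assumptions; the formatting options are illustrative, mirroring the `Cast.threadLocalDateFormat` pattern):

    ```scala
    import java.text.NumberFormat

    object WriterIds {
      // Each thread lazily gets its own NumberFormat instead of sharing one mutable instance.
      private val threadLocalNumberFormat = new ThreadLocal[NumberFormat] {
        override def initialValue(): NumberFormat = {
          val fmt = NumberFormat.getInstance()
          fmt.setMinimumIntegerDigits(5)  // zero-padded ids such as "part-00003"
          fmt.setGroupingUsed(false)
          fmt
        }
      }

      def formatSplit(splitID: Int): String =
        "part-" + threadLocalNumberFormat.get().format(splitID)
    }
    ```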




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1919#discussion_r16347194
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---
    @@ -191,7 +257,10 @@ case class InsertIntoHiveTable(
           val outputData = new Array[Any](fieldOIs.length)
           iter.map { row =>
             var i = 0
    -        while (i < row.length) {
    +        while (i < fieldOIs.length) {
    +          if (fieldOIs.length < row.length && row.length - fieldOIs.length == dynamicPartNum) {
    --- End diff --
    
    Sorry, too many comments for a single line :)
    
    Just realized this line is on the critical path, so please forget about the assertion I mentioned above. To avoid unnecessary computations and branches, I'd suggest something like:
    
    ```scala
    if (dynamicPartNum > 0) {
      iter.map {
        // With DP
      }
    } else {
      iter.map {
        // Without DP
      }
    }
    ```




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1919#discussion_r16340321
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---
    @@ -159,6 +162,36 @@ case class InsertIntoHiveTable(
         writer.commitJob()
       }
     
    +  def getDynamicPartDir(tableInfo: TableDesc, row: Row, dynamicPartNum2: Int, jobConf: JobConf) :String = {
    --- End diff --
    
    `) :String` => `): String`




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1919#discussion_r16344256
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---
    @@ -191,7 +257,10 @@ case class InsertIntoHiveTable(
           val outputData = new Array[Any](fieldOIs.length)
           iter.map { row =>
             var i = 0
    -        while (i < row.length) {
    +        while (i < fieldOIs.length) {
    +          if (fieldOIs.length < row.length && row.length - fieldOIs.length == dynamicPartNum) {
    --- End diff --
    
    I think instead of an `if`, we should add an assertion here to make sure `row.length - fieldOIs.length == dynamicPartNum`.




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1919#discussion_r16343814
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---
    @@ -201,17 +270,79 @@ case class InsertIntoHiveTable(
           }
         }
     
    -    // ORC stores compression information in table properties. While, there are other formats
    -    // (e.g. RCFile) that rely on hadoop configurations to store compression information.
    -    val jobConf = new JobConf(sc.hiveconf)
    -    saveAsHiveFile(
    -      rdd,
    -      outputClass,
    -      fileSinkConf,
    -      jobConf,
    -      sc.hiveconf.getBoolean("hive.exec.compress.output", false))
    -
    -    // TODO: Handle dynamic partitioning.
    +    if (dynamicPartNum > 0) {
    +      if (outputClass == null) {
    +        throw new SparkException("Output value class not set")
    +      }
    +      jobConfSer.value.setOutputValueClass(outputClass)
    +      if (fileSinkConf.getTableInfo.getOutputFileFormatClassName == null) {
    +        throw new SparkException("Output format class not set")
    +      }
    +      // Doesn't work in Scala 2.9 due to what may be a generics bug
    +      // TODO: Should we uncomment this for Scala 2.10?
    +      // conf.setOutputFormat(outputFormatClass)
    +      jobConfSer.value.set("mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName)
    +      if (sc.hiveconf.getBoolean("hive.exec.compress.output", false)) {
    +        // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
    +        // and "mapred.output.compression.type" have no impact on ORC because it uses table properties
    +        // to store compression information.
    +        jobConfSer.value.set("mapred.output.compress", "true")
    +        fileSinkConf.setCompressed(true)
    +        fileSinkConf.setCompressCodec(jobConfSer.value.get("mapred.output.compression.codec"))
    +        fileSinkConf.setCompressType(jobConfSer.value.get("mapred.output.compression.type"))
    +      }
    +      jobConfSer.value.setOutputCommitter(classOf[FileOutputCommitter])
    +
    +      FileOutputFormat.setOutputPath(
    +        jobConfSer.value,
    +        SparkHiveHadoopWriter.createPathFromString(fileSinkConf.getDirName, jobConfSer.value))
    +
    +      var writerMap =  new scala.collection.mutable.HashMap[String, SparkHiveHadoopWriter]
    +      def writeToFile2(context: TaskContext, iter: Iterator[Writable]) {
    +        // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
    +        // around by taking a mod. We expect that no task will be attempted 2 billion times.
    +        val attemptNumber = (context.attemptId % Int.MaxValue).toInt
    +        val serializer = newSerializer(fileSinkConf.getTableInfo)
    +        var count = 0
    +        var writer2:SparkHiveHadoopWriter = null
    +        while(iter.hasNext) {
    +          val record = iter.next();
    --- End diff --
    
    Remove the trailing `;`




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1919#discussion_r16343397
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---
    @@ -201,17 +270,79 @@ case class InsertIntoHiveTable(
           }
         }
     
    -    // ORC stores compression information in table properties. While, there are other formats
    -    // (e.g. RCFile) that rely on hadoop configurations to store compression information.
    -    val jobConf = new JobConf(sc.hiveconf)
    -    saveAsHiveFile(
    -      rdd,
    -      outputClass,
    -      fileSinkConf,
    -      jobConf,
    -      sc.hiveconf.getBoolean("hive.exec.compress.output", false))
    -
    -    // TODO: Handle dynamic partitioning.
    +    if (dynamicPartNum > 0) {
    +      if (outputClass == null) {
    +        throw new SparkException("Output value class not set")
    +      }
    +      jobConfSer.value.setOutputValueClass(outputClass)
    +      if (fileSinkConf.getTableInfo.getOutputFileFormatClassName == null) {
    +        throw new SparkException("Output format class not set")
    +      }
    +      // Doesn't work in Scala 2.9 due to what may be a generics bug
    +      // TODO: Should we uncomment this for Scala 2.10?
    +      // conf.setOutputFormat(outputFormatClass)
    +      jobConfSer.value.set("mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName)
    +      if (sc.hiveconf.getBoolean("hive.exec.compress.output", false)) {
    +        // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
    +        // and "mapred.output.compression.type" have no impact on ORC because it uses table properties
    +        // to store compression information.
    +        jobConfSer.value.set("mapred.output.compress", "true")
    +        fileSinkConf.setCompressed(true)
    +        fileSinkConf.setCompressCodec(jobConfSer.value.get("mapred.output.compression.codec"))
    +        fileSinkConf.setCompressType(jobConfSer.value.get("mapred.output.compression.type"))
    +      }
    +      jobConfSer.value.setOutputCommitter(classOf[FileOutputCommitter])
    +
    +      FileOutputFormat.setOutputPath(
    +        jobConfSer.value,
    +        SparkHiveHadoopWriter.createPathFromString(fileSinkConf.getDirName, jobConfSer.value))
    +
    +      var writerMap =  new scala.collection.mutable.HashMap[String, SparkHiveHadoopWriter]
    +      def writeToFile2(context: TaskContext, iter: Iterator[Writable]) {
    +        // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
    +        // around by taking a mod. We expect that no task will be attempted 2 billion times.
    +        val attemptNumber = (context.attemptId % Int.MaxValue).toInt
    +        val serializer = newSerializer(fileSinkConf.getTableInfo)
    +        var count = 0
    +        var writer2:SparkHiveHadoopWriter = null
    +        while(iter.hasNext) {
    +          val record = iter.next();
    +          val location = fileSinkConf.getDirName
    +          val partLocation = location + dynamicPartPath
    +          writer2=writerMap.get(dynamicPartPath) match {
    +            case Some(writer)=> writer
    +            case None => {
    +              val tempWriter = new SparkHiveHadoopWriter(jobConfSer.value, new FileSinkDesc(partLocation, fileSinkConf.getTableInfo, false))
    +              tempWriter.setup(context.stageId, context.partitionId, attemptNumber)
    +              tempWriter.open(dynamicPartPath);
    +              writerMap += (dynamicPartPath -> tempWriter)
    +              tempWriter
    +            }
    +          }
    +          count += 1
    +          writer2.write(record)
    +        }
    +        for((k,v) <- writerMap) {
    --- End diff --
    
    Space before `(`




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by baishuo <gi...@git.apache.org>.
Github user baishuo commented on the pull request:

    https://github.com/apache/spark/pull/1919#issuecomment-53390280
  
    Hi @marmbrus, I have updated the test-related files and all tests pass on my machine. Would you please help verify this patch when you have time? :) I have also written up the thinking behind the code. Thank you.
    @rxin @liancheng




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on the pull request:

    https://github.com/apache/spark/pull/1919#issuecomment-52509183
  
    @baishuo Thank you for working on it.
    
    I have three general comments.
    
    1. Hive has a lot of confs that are used to influence how the semantic analyzer works. `HiveConf.ConfVars.DYNAMICPARTITIONING (hive.exec.dynamic.partition)` and `HiveConf.ConfVars.DYNAMICPARTITIONINGMODE (hive.exec.dynamic.partition.mode)` are two examples. As long as we generate correct results and we can make sure the execution is robust, I think it is not necessary to follow those confs.
    
    2. For `hive.exec.dynamic.partition.mode`, I think its purpose is to avoid having too many concurrent file writers in a task. Actually, even if `hive.exec.dynamic.partition.mode=strict`, we can still have many distinct values in those dynamic partitioning columns and thus too many file writers in a task. For columnar file formats, like RCFile, ORC, and Parquet, every file writer internally maintains a memory buffer. Many file writers can significantly increase the memory footprint of a task and can introduce OOMs. Instead of relying on Hive's confs, it is better to provide a way to group data based on those dynamic partitioning columns, so that we do not end up with many concurrent file writers. Here are two primitive ideas: we can shuffle the data before inserting, or we can do local grouping and write data in a group-by-group fashion (a rough sketch of the shuffle idea follows after these comments). Anyway, I feel we may need to introduce changes to the planner.
    
    3. The last comment is not quite related to this PR. I think it is better to have a general design for how a table is partitioned, and (hopefully) Hive's directory layout in HDFS will be just a special case. I am not sure that creating a single file for every combination of values of the partitioning columns is a good approach. It introduces potential stability issues for the insert operation (too many file writers) and performance issues for both insert and table scan operations. With this approach, we can easily create a lot of small files in HDFS, which puts memory pressure on the HDFS namenode.
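    A rough sketch of the shuffle idea, assuming hypothetical `partitionPath` and `writeRowsTo` helpers (these names are illustrative, not part of the PR):
    
    ```scala
    import scala.reflect.ClassTag
    
    import org.apache.spark.SparkContext._
    import org.apache.spark.rdd.RDD
    
    // Sketch only: group rows by their dynamic partition path before writing, so a
    // task handles one partition's rows at a time instead of keeping one open
    // writer per distinct partition value.
    def writeGroupedByPartition[T: ClassTag](
        rows: RDD[T],
        partitionPath: T => String,
        writeRowsTo: (String, Iterator[T]) => Unit): Unit = {
      rows
        .map(row => (partitionPath(row), row)) // key each row by its partition directory
        .groupByKey()                          // shuffle so one task sees one partition's rows
        .foreach { case (path, rowsForKey) =>
          writeRowsTo(path, rowsForKey.iterator)
        }
    }
    ```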




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1919#discussion_r16340105
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala ---
    @@ -93,6 +93,33 @@ private[hive] class SparkHiveHadoopWriter(
           null)
       }
     
    +  def open(dynamicPartPath: String) {
    +    val numfmt = NumberFormat.getInstance()
    +    numfmt.setMinimumIntegerDigits(5)
    +    numfmt.setGroupingUsed(false)
    +
    +    val extension = Utilities.getFileExtension(
    +      conf.value,
    +      fileSinkConf.getCompressed,
    +      getOutputFormat())
    +
    +    val outputName = "part-"  + numfmt.format(splitID) + extension
    +    val outputPath: Path = FileOutputFormat.getOutputPath(conf.value)
    +    if (outputPath == null) {
    +      throw new IOException("Undefined job output-path")
    +    }
    +    val workPath = new Path(outputPath, dynamicPartPath.substring(1))//remove "/"
    +    val path = new Path(workPath, outputName)
    +    getOutputCommitter().setupTask(getTaskContext())
    +    writer = HiveFileFormatUtils.getHiveRecordWriter(
    +      conf.value,
    +      fileSinkConf.getTableInfo,
    +      conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
    +      fileSinkConf,
    +      path,
    +      null)
    --- End diff --
    
    Maybe `Reporter.NULL` instead of `null`.




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:

    https://github.com/apache/spark/pull/1919#issuecomment-52123315
  
    There are a couple of ways we can add tests, ideally we would do a little of both:
     - Find [existing hive tests](https://github.com/apache/spark/tree/master/sql/hive/src/test/resources/ql/src/test/queries/clientpositive) that test dynamic partitioning and add them to [our whitelist](https://github.com/apache/spark/blob/master/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala#L209).  The test harness will automatically invoke hive to calculate the correct answers.  You need to make sure you have Hadoop and Hive compiled and the environment variables set correctly as described in other [dependencies for developers](https://github.com/apache/spark/tree/master/sql).
     - Add tests to [HiveQuerySuite](https://github.com/apache/spark/blob/master/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala).
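    As an illustration of the second option, a minimal sketch of such a test in HiveQuerySuite, assuming the usual `createQueryTest` helper; the table name and query are hypothetical placeholders, and the golden answer would be generated by the test harness:
    
    ```scala
    // Sketch only: a golden-answer style test; dynamic_part_table is made up.
    createQueryTest("dynamic partition insert",
      """
        |INSERT OVERWRITE TABLE dynamic_part_table PARTITION (part1='val1', part2)
        |SELECT key, value, key % 10 FROM src
      """.stripMargin)
    ```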





[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by baishuo <gi...@git.apache.org>.
Github user baishuo commented on the pull request:

    https://github.com/apache/spark/pull/1919#issuecomment-52758525
  
    Here I try to explain my design idea (the code is mostly in InsertIntoHiveTable.scala):
    Let's assume there is a table called table1, which has 2 columns (col1, col2) and two partition columns (part1, part2).
    
    ONE:
    When inserting into a static partition only, I found that once "saveAsHiveFile" finishes, the data has been written to a temporary location, a directory like /tmp/hive-root/hive_****/-ext-10000; let's call it TMPLOCATION. Under TMPLOCATION there is a sub-directory /part1=.../part2=..., and all data is stored under TMPLOCATION/part1=.../part2=.... Spark then calls the Hive API "loadPartition" to move the files to {hivewarehouse}/{tablename}/part1=.../part2=... and update the metadata, and the whole process is done.
    
    If we want to implement dynamic partitioning, we need to use the Hive API "loadDynamicPartitions" to move the data and update the metadata. But the directory layout that "loadDynamicPartitions" expects differs slightly from "loadPartition":
    
    1: With one static partition and one dynamic partition (HQL like "insert overwrite table table1 partition(part1=val1,part2) select a,b,c from ..."), loadDynamicPartitions needs the temporary data located at TMPLOCATION/part2=c1, TMPLOCATION/part2=c2, ..., and will move it to {hivewarehouse}/{tablename}/part1=val1/part2=c1, {hivewarehouse}/{tablename}/part1=val1/part2=c2, ..., updating the metadata. Note that in this case loadDynamicPartitions does not need a sub-directory like part1=val1 under TMPLOCATION.
    
    2: With zero static partitions and two dynamic partitions (HQL like "insert overwrite table table1 partition(part1,part2) select a,b,x,c from ..."), loadDynamicPartitions needs the temporary data located at TMPLOCATION/part1=../part2=c1, TMPLOCATION/part1=../part2=c2, ..., and will move it to {hivewarehouse}/{tablename}/part1=../part2=....
    
    So whether the HQL contains static partitions determines how we create sub-directories under TMPLOCATION. That is why the function "getDynamicPartDir" exists.
    
    TWO:
    Where should we call "getDynamicPartDir"? It must be somewhere we can read the values of the dynamic partition columns, so we call it in "iter.map { row => ..." inside the closure of "val rdd = childRdd.mapPartitions". Once we have the row, we have the values of the dynamic partition columns. After we compute dynamicPartPath with getDynamicPartDir, we pass it to the next RDD through this RDD's output: serializer.serialize(outputData, standardOI) -> dynamicPartPath. (For a purely static partition, dynamicPartPath is null.)
    
    When the next RDD (the closure in writeToFile) gets the data and dynamicPartPath, we check whether dynamicPartPath is null. If it is not, we look for an existing writer in writerMap, which stores one writer per partition; if one exists, we use it to write the record. This ensures that data belonging to the same partition is written to the same directory.
    
    loadDynamicPartitions requires that there are no files under TMPLOCATION other than the dynamic partition sub-directories. That is why there are several "if (dynamicPartNum == 0)" checks in writeToFile.
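    To make the "one writer per partition" part concrete, here is a minimal sketch of the idea, assuming a hypothetical `openWriterFor` factory that stands in for setting up a SparkHiveHadoopWriter for a partition sub-directory (not the PR's actual code):
    
    ```scala
    import scala.collection.mutable
    
    // Sketch only: cache one writer per dynamicPartPath so that all rows of the
    // same partition are written to the same directory under TMPLOCATION.
    def writePartitioned[R](
        records: Iterator[(R, String)],        // (record, dynamicPartPath; null if static)
        openWriterFor: String => (R => Unit)): Unit = {
      val writers = mutable.HashMap.empty[String, R => Unit]
      records.foreach { case (record, dynamicPartPath) =>
        val key = if (dynamicPartPath == null) "" else dynamicPartPath
        // Reuse an already-open writer for this partition if one exists.
        val write = writers.getOrElseUpdate(key, openWriterFor(key))
        write(record)
      }
      // In the real closure every cached writer is closed and committed here.
    }
    ```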




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1919#discussion_r16358186
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---
    @@ -201,17 +270,79 @@ case class InsertIntoHiveTable(
           }
         }
     
    -    // ORC stores compression information in table properties. While, there are other formats
    -    // (e.g. RCFile) that rely on hadoop configurations to store compression information.
    -    val jobConf = new JobConf(sc.hiveconf)
    -    saveAsHiveFile(
    -      rdd,
    -      outputClass,
    -      fileSinkConf,
    -      jobConf,
    -      sc.hiveconf.getBoolean("hive.exec.compress.output", false))
    -
    -    // TODO: Handle dynamic partitioning.
    +    if (dynamicPartNum > 0) {
    +      if (outputClass == null) {
    --- End diff --
    
    Can we generalize `saveAsHiveFile` instead of duplicating its code?




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1919#discussion_r16338954
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---
    @@ -159,6 +159,28 @@ case class InsertIntoHiveTable(
         writer.commitJob()
       }
     
    +  def getDynamicPartDir(tableInfo: TableDesc, row: Row, dynamicPartNum2: Int) :String = {
    +    dynamicPartNum2 match {
    +      case 0 =>""
    +      case i => {
    +        val colsNum = tableInfo.getProperties.getProperty("columns").split("\\,").length
    +        val partColStr = tableInfo.getProperties.getProperty("partition_columns")
    +        val partCols = partColStr.split("/")
    +        var buf = new StringBuffer()
    --- End diff --
    
    I think we can simplify line 169 to line 179 to:
    
    ```scala
    partCols
      .takeRight(dynamicPartNum2)
      .zip(row.takeRight(dynamicPartNum2))
      .map { case (c, v) => s"/$c=$v" }
      .mkString
    ```
    
    Also, I'm not sure whether `toString` is adequate for producing the partition directory name...
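    As a quick illustration of what the suggested expression produces (using plain Seqs and made-up values instead of a catalyst Row):
    
    ```scala
    val partCols = Seq("part1", "part2")
    val row = Seq("a", "b", "2014", "08")  // data columns followed by dynamic partition values
    val dynamicPartNum2 = 2
    
    val dir = partCols
      .takeRight(dynamicPartNum2)
      .zip(row.takeRight(dynamicPartNum2))
      .map { case (c, v) => s"/$c=$v" }
      .mkString
    // dir == "/part1=2014/part2=08"
    ```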




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by baishuo <gi...@git.apache.org>.
Github user baishuo commented on the pull request:

    https://github.com/apache/spark/pull/1919#issuecomment-52026271
  
    I didn't add the related tests since I don't know how to write them, but I have tested the functionality through SparkSQLCLIDriver.




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1919#issuecomment-53942815
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19507/consoleFull) for   PR 1919 at commit [`0c324be`](https://github.com/apache/spark/commit/0c324beaa38abfd089257466a0a0ddd6e57c5fad).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `case class Sqrt(child: Expression) extends UnaryExpression `
      * `  class TreeNodeRef(val obj: TreeNode[_]) `





[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by baishuo <gi...@git.apache.org>.
Github user baishuo commented on the pull request:

    https://github.com/apache/spark/pull/1919#issuecomment-54244065
  
    No problem, closing this PR.




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1919#issuecomment-53942774
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19507/consoleFull) for   PR 1919 at commit [`0c324be`](https://github.com/apache/spark/commit/0c324beaa38abfd089257466a0a0ddd6e57c5fad).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1919#discussion_r16339580
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---
    @@ -231,14 +327,26 @@ case class InsertIntoHiveTable(
           val inheritTableSpecs = true
           // TODO: Correctly set isSkewedStoreAsSubdir.
           val isSkewedStoreAsSubdir = false
    -      db.loadPartition(
    -        outputPath,
    -        qualifiedTableName,
    -        partitionSpec,
    -        overwrite,
    -        holdDDLTime,
    -        inheritTableSpecs,
    -        isSkewedStoreAsSubdir)
    +      if (dynamicPartNum>0) {
    +        db.loadDynamicPartitions(
    +          outputPath,
    +          qualifiedTableName,
    +          partitionSpec,
    +          overwrite,
    +          dynamicPartNum/*dpCtx.getNumDPCols()*/,
    --- End diff --
    
    Remove the comment.




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1919#discussion_r16339200
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---
    @@ -204,12 +235,81 @@ case class InsertIntoHiveTable(
         // ORC stores compression information in table properties. While, there are other formats
         // (e.g. RCFile) that rely on hadoop configurations to store compression information.
         val jobConf = new JobConf(sc.hiveconf)
    -    saveAsHiveFile(
    -      rdd,
    -      outputClass,
    -      fileSinkConf,
    -      jobConf,
    -      sc.hiveconf.getBoolean("hive.exec.compress.output", false))
    +    val jobConfSer = new SerializableWritable(jobConf)
    +    if (dynamicPartNum>0) {
    +      if (outputClass == null) {
    +        throw new SparkException("Output value class not set")
    +      }
    +      jobConfSer.value.setOutputValueClass(outputClass)
    +      if (fileSinkConf.getTableInfo.getOutputFileFormatClassName == null) {
    +        throw new SparkException("Output format class not set")
    +      }
    +      // Doesn't work in Scala 2.9 due to what may be a generics bug
    +      // TODO: Should we uncomment this for Scala 2.10?
    +      // conf.setOutputFormat(outputFormatClass)
    +      jobConfSer.value.set("mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName)
    +      if (sc.hiveconf.getBoolean("hive.exec.compress.output", false)) {
    +        // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
    +        // and "mapred.output.compression.type" have no impact on ORC because it uses table properties
    +        // to store compression information.
    +        jobConfSer.value.set("mapred.output.compress", "true")
    +        fileSinkConf.setCompressed(true)
    +        fileSinkConf.setCompressCodec(jobConfSer.value.get("mapred.output.compression.codec"))
    +        fileSinkConf.setCompressType(jobConfSer.value.get("mapred.output.compression.type"))
    +      }
    +      jobConfSer.value.setOutputCommitter(classOf[FileOutputCommitter])
    +
    +      FileOutputFormat.setOutputPath(
    +        jobConfSer.value,
    +        SparkHiveHadoopWriter.createPathFromString(fileSinkConf.getDirName, jobConfSer.value))
    +
    +      var writerMap =  new scala.collection.mutable.HashMap[String, SparkHiveHadoopWriter]
    +      def writeToFile2(context: TaskContext, iter: Iterator[Writable]) {
    +        // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
    +        // around by taking a mod. We expect that no task will be attempted 2 billion times.
    +        val attemptNumber = (context.attemptId % Int.MaxValue).toInt
    +        val serializer = newSerializer(fileSinkConf.getTableInfo)
    +        var count = 0
    +        var writer2:SparkHiveHadoopWriter = null
    +        while(iter.hasNext) {
    +          val record = iter.next();
    +          val location = fileSinkConf.getDirName
    +          val partLocation = location + dynamicPartPath
    +          writer2=writerMap.get(dynamicPartPath) match {
    +            case Some(writer)=> writer
    +            case None => {
    +              val tempWriter = new SparkHiveHadoopWriter(jobConfSer.value, new FileSinkDesc(partLocation, fileSinkConf.getTableInfo, false))
    +              tempWriter.setup(context.stageId, context.partitionId, attemptNumber)
    +              tempWriter.open(dynamicPartPath);
    +              writerMap += (dynamicPartPath -> tempWriter)
    +              tempWriter
    +            }
    +          }
    +          count += 1
    +          writer2.write(record)
    +        }
    +        for((k,v) <- writerMap) {
    +          v.close()
    +          v.commit()
    +        }
    +      }
    +
    +      sc.sparkContext.runJob(rdd, writeToFile2 _)
    +
    +      for((k,v) <- writerMap) {
    +        v.commitJob()
    +      }
    +      writerMap.clear()
    +      //writer.commitJob()
    --- End diff --
    
    Remove this comment.




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1919#discussion_r16339943
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala ---
    @@ -271,4 +272,9 @@ object Cast {
           new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
         }
       }
    +  private[sql] val threadLocalNumberFormat = new ThreadLocal[NumberFormat] {
    --- End diff --
    
    Ah, sorry, I didn't make myself clear enough. I mean you can refer to `Cast.threadLocalDateFormat`, not add the thread-local version of `NumberFormat` here, since it's not related to `Cast`. A better place to hold this could be `object SparkHadoopWriter`.
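    For context, the kind of helper being suggested might look roughly like this; `WriterFormats` is a hypothetical object name standing in for the writer's companion object, and this is a sketch rather than the PR's actual code:
    
    ```scala
    import java.text.NumberFormat
    
    // Sketch only: a thread-local NumberFormat, mirroring the
    // Cast.threadLocalDateFormat pattern, since NumberFormat is not thread-safe.
    object WriterFormats {
      private val threadLocalNumberFormat = new ThreadLocal[NumberFormat] {
        override def initialValue(): NumberFormat = {
          val fmt = NumberFormat.getInstance()
          fmt.setMinimumIntegerDigits(5)  // "part-00000" style numbering
          fmt.setGroupingUsed(false)
          fmt
        }
      }
    
      // e.g. formatSplitId(7) == "00007"
      def formatSplitId(splitId: Int): String = threadLocalNumberFormat.get().format(splitId)
    }
    ```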




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by baishuo <gi...@git.apache.org>.
Github user baishuo commented on the pull request:

    https://github.com/apache/spark/pull/1919#issuecomment-52670830
  
    Hi @marmbrus and @liancheng, I have made some modifications and run the tests with "sbt/sbt catalyst/test sql/test hive/test". Please help me check whether it is proper when you have time. Thank you :)




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1919#discussion_r16338953
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---
    @@ -159,6 +159,28 @@ case class InsertIntoHiveTable(
         writer.commitJob()
       }
     
    +  def getDynamicPartDir(tableInfo: TableDesc, row: Row, dynamicPartNum2: Int) :String = {
    +    dynamicPartNum2 match {
    +      case 0 =>""
    +      case i => {
    --- End diff --
    
    This pair of braces is redundant.




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1919#discussion_r16340415
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---
    @@ -159,6 +162,36 @@ case class InsertIntoHiveTable(
         writer.commitJob()
       }
     
    +  def getDynamicPartDir(tableInfo: TableDesc, row: Row, dynamicPartNum2: Int, jobConf: JobConf) :String = {
    --- End diff --
    
    nit: why the `2` in `dynamicPartNum2`?...




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on the pull request:

    https://github.com/apache/spark/pull/1919#issuecomment-52732204
  
    Hmm, I see 17 newly whitelisted test cases, but only golden answers for the `dynamic_partition` case were submitted.




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1919#discussion_r16343411
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---
    @@ -201,17 +270,79 @@ case class InsertIntoHiveTable(
           }
         }
     
    -    // ORC stores compression information in table properties. While, there are other formats
    -    // (e.g. RCFile) that rely on hadoop configurations to store compression information.
    -    val jobConf = new JobConf(sc.hiveconf)
    -    saveAsHiveFile(
    -      rdd,
    -      outputClass,
    -      fileSinkConf,
    -      jobConf,
    -      sc.hiveconf.getBoolean("hive.exec.compress.output", false))
    -
    -    // TODO: Handle dynamic partitioning.
    +    if (dynamicPartNum > 0) {
    +      if (outputClass == null) {
    +        throw new SparkException("Output value class not set")
    +      }
    +      jobConfSer.value.setOutputValueClass(outputClass)
    +      if (fileSinkConf.getTableInfo.getOutputFileFormatClassName == null) {
    +        throw new SparkException("Output format class not set")
    +      }
    +      // Doesn't work in Scala 2.9 due to what may be a generics bug
    +      // TODO: Should we uncomment this for Scala 2.10?
    +      // conf.setOutputFormat(outputFormatClass)
    +      jobConfSer.value.set("mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName)
    +      if (sc.hiveconf.getBoolean("hive.exec.compress.output", false)) {
    +        // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
    +        // and "mapred.output.compression.type" have no impact on ORC because it uses table properties
    +        // to store compression information.
    +        jobConfSer.value.set("mapred.output.compress", "true")
    +        fileSinkConf.setCompressed(true)
    +        fileSinkConf.setCompressCodec(jobConfSer.value.get("mapred.output.compression.codec"))
    +        fileSinkConf.setCompressType(jobConfSer.value.get("mapred.output.compression.type"))
    +      }
    +      jobConfSer.value.setOutputCommitter(classOf[FileOutputCommitter])
    +
    +      FileOutputFormat.setOutputPath(
    +        jobConfSer.value,
    +        SparkHiveHadoopWriter.createPathFromString(fileSinkConf.getDirName, jobConfSer.value))
    +
    +      var writerMap =  new scala.collection.mutable.HashMap[String, SparkHiveHadoopWriter]
    +      def writeToFile2(context: TaskContext, iter: Iterator[Writable]) {
    +        // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
    +        // around by taking a mod. We expect that no task will be attempted 2 billion times.
    +        val attemptNumber = (context.attemptId % Int.MaxValue).toInt
    +        val serializer = newSerializer(fileSinkConf.getTableInfo)
    +        var count = 0
    +        var writer2:SparkHiveHadoopWriter = null
    +        while(iter.hasNext) {
    +          val record = iter.next();
    +          val location = fileSinkConf.getDirName
    +          val partLocation = location + dynamicPartPath
    +          writer2=writerMap.get(dynamicPartPath) match {
    +            case Some(writer)=> writer
    +            case None => {
    +              val tempWriter = new SparkHiveHadoopWriter(jobConfSer.value, new FileSinkDesc(partLocation, fileSinkConf.getTableInfo, false))
    +              tempWriter.setup(context.stageId, context.partitionId, attemptNumber)
    +              tempWriter.open(dynamicPartPath);
    +              writerMap += (dynamicPartPath -> tempWriter)
    +              tempWriter
    +            }
    +          }
    +          count += 1
    +          writer2.write(record)
    +        }
    +        for((k,v) <- writerMap) {
    +          v.close()
    +          v.commit()
    +        }
    +      }
    +
    +      sc.sparkContext.runJob(rdd, writeToFile2 _)
    +
    +      for((k,v) <- writerMap) {
    --- End diff --
    
    Space before `(`




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1919#discussion_r16339036
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---
    @@ -204,12 +235,81 @@ case class InsertIntoHiveTable(
         // ORC stores compression information in table properties. While, there are other formats
         // (e.g. RCFile) that rely on hadoop configurations to store compression information.
         val jobConf = new JobConf(sc.hiveconf)
    -    saveAsHiveFile(
    -      rdd,
    -      outputClass,
    -      fileSinkConf,
    -      jobConf,
    -      sc.hiveconf.getBoolean("hive.exec.compress.output", false))
    +    val jobConfSer = new SerializableWritable(jobConf)
    +    if (dynamicPartNum>0) {
    --- End diff --
    
    Please add spaces around `>`.




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1919#discussion_r16340505
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---
    @@ -159,6 +162,36 @@ case class InsertIntoHiveTable(
         writer.commitJob()
       }
     
    +  def getDynamicPartDir(tableInfo: TableDesc, row: Row, dynamicPartNum2: Int, jobConf: JobConf) :String = {
    +    dynamicPartNum2 match {
    +      case 0 =>""
    +      case i => {
    +        val colsNum = tableInfo.getProperties.getProperty("columns").split("\\,").length
    +        val partColStr = tableInfo.getProperties.getProperty("partition_columns")
    +        val partCols = partColStr.split("/")
    +        var buf = new StringBuffer()
    +        if (partCols.length == dynamicPartNum2) {
    +          for (j <- 0 until partCols.length) {
    +            buf.append("/").append(partCols(j)).append("=").append(handleNull(row(colsNum + j ), jobConf))
    +          }
    +        } else {
    +          for (j <- 0 until dynamicPartNum2) {
    +            buf.append("/").append(partCols(j + partCols.length - dynamicPartNum2)).append("=").append(handleNull(row(colsNum + j), jobConf))
    +          }
    +        }
    +        buf.toString
    +      }
    +    }
    +  }
    +
    +  def handleNull(obj :Any, jobConf: JobConf) :String = {
    --- End diff --
    
    Should be `private`.




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1919#discussion_r16343382
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---
    @@ -201,17 +270,79 @@ case class InsertIntoHiveTable(
           }
         }
     
    -    // ORC stores compression information in table properties. While, there are other formats
    -    // (e.g. RCFile) that rely on hadoop configurations to store compression information.
    -    val jobConf = new JobConf(sc.hiveconf)
    -    saveAsHiveFile(
    -      rdd,
    -      outputClass,
    -      fileSinkConf,
    -      jobConf,
    -      sc.hiveconf.getBoolean("hive.exec.compress.output", false))
    -
    -    // TODO: Handle dynamic partitioning.
    +    if (dynamicPartNum > 0) {
    +      if (outputClass == null) {
    +        throw new SparkException("Output value class not set")
    +      }
    +      jobConfSer.value.setOutputValueClass(outputClass)
    +      if (fileSinkConf.getTableInfo.getOutputFileFormatClassName == null) {
    +        throw new SparkException("Output format class not set")
    +      }
    +      // Doesn't work in Scala 2.9 due to what may be a generics bug
    +      // TODO: Should we uncomment this for Scala 2.10?
    +      // conf.setOutputFormat(outputFormatClass)
    +      jobConfSer.value.set("mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName)
    +      if (sc.hiveconf.getBoolean("hive.exec.compress.output", false)) {
    +        // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
    +        // and "mapred.output.compression.type" have no impact on ORC because it uses table properties
    +        // to store compression information.
    +        jobConfSer.value.set("mapred.output.compress", "true")
    +        fileSinkConf.setCompressed(true)
    +        fileSinkConf.setCompressCodec(jobConfSer.value.get("mapred.output.compression.codec"))
    +        fileSinkConf.setCompressType(jobConfSer.value.get("mapred.output.compression.type"))
    +      }
    +      jobConfSer.value.setOutputCommitter(classOf[FileOutputCommitter])
    +
    +      FileOutputFormat.setOutputPath(
    +        jobConfSer.value,
    +        SparkHiveHadoopWriter.createPathFromString(fileSinkConf.getDirName, jobConfSer.value))
    +
    +      var writerMap =  new scala.collection.mutable.HashMap[String, SparkHiveHadoopWriter]
    +      def writeToFile2(context: TaskContext, iter: Iterator[Writable]) {
    +        // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
    +        // around by taking a mod. We expect that no task will be attempted 2 billion times.
    +        val attemptNumber = (context.attemptId % Int.MaxValue).toInt
    +        val serializer = newSerializer(fileSinkConf.getTableInfo)
    +        var count = 0
    +        var writer2:SparkHiveHadoopWriter = null
    +        while(iter.hasNext) {
    +          val record = iter.next();
    +          val location = fileSinkConf.getDirName
    +          val partLocation = location + dynamicPartPath
    +          writer2=writerMap.get(dynamicPartPath) match {
    +            case Some(writer)=> writer
    --- End diff --
    
    Space after `)`




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by baishuo <gi...@git.apache.org>.
Github user baishuo commented on the pull request:

    https://github.com/apache/spark/pull/1919#issuecomment-52728496
  
    Here I try to express my design idea clearly:
    Let's assume there is a table called table1, which has 2 columns (col1, col2) and two partition columns (part1, part2).





[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by baishuo <gi...@git.apache.org>.
Github user baishuo closed the pull request at:

    https://github.com/apache/spark/pull/1919




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on the pull request:

    https://github.com/apache/spark/pull/1919#issuecomment-52459171
  
    @yhuai It would be nice if you could have a look at this PR, as you're the expert here :)
    
    @baishuo You can refer to `sql/README.md` for details about setting up the testing environment.




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1919#discussion_r16340005
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala ---
    @@ -93,6 +93,33 @@ private[hive] class SparkHiveHadoopWriter(
           null)
       }
     
    +  def open(dynamicPartPath: String) {
    +    val numfmt = NumberFormat.getInstance()
    +    numfmt.setMinimumIntegerDigits(5)
    +    numfmt.setGroupingUsed(false)
    +
    +    val extension = Utilities.getFileExtension(
    +      conf.value,
    +      fileSinkConf.getCompressed,
    +      getOutputFormat())
    +
    +    val outputName = "part-"  + numfmt.format(splitID) + extension
    +    val outputPath: Path = FileOutputFormat.getOutputPath(conf.value)
    +    if (outputPath == null) {
    +      throw new IOException("Undefined job output-path")
    +    }
    +    val workPath = new Path(outputPath, dynamicPartPath.substring(1))//remove "/"
    --- End diff --
    
    Add spaces around `//`.




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1919#discussion_r16339033
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---
    @@ -204,12 +235,81 @@ case class InsertIntoHiveTable(
         // ORC stores compression information in table properties. While, there are other formats
         // (e.g. RCFile) that rely on hadoop configurations to store compression information.
    --- End diff --
    
    Move the above comments before line 306.




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:

    https://github.com/apache/spark/pull/1919#issuecomment-53471102
  
    Thanks for working on this!  We will have more time to review it after the Spark 1.1 release.




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1919#discussion_r16351653
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala ---
    @@ -93,6 +93,33 @@ private[hive] class SparkHiveHadoopWriter(
           null)
       }
     
    +  def open(dynamicPartPath: String) {
    +    val numfmt = NumberFormat.getInstance()
    --- End diff --
    
    Oh, it is actually `SparkHiveHadoopWriter` in `sql/hive`. Seems we need to rename this file.




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1919#discussion_r16343351
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---
    @@ -201,17 +270,79 @@ case class InsertIntoHiveTable(
           }
         }
     
    -    // ORC stores compression information in table properties. While, there are other formats
    -    // (e.g. RCFile) that rely on hadoop configurations to store compression information.
    -    val jobConf = new JobConf(sc.hiveconf)
    -    saveAsHiveFile(
    -      rdd,
    -      outputClass,
    -      fileSinkConf,
    -      jobConf,
    -      sc.hiveconf.getBoolean("hive.exec.compress.output", false))
    -
    -    // TODO: Handle dynamic partitioning.
    +    if (dynamicPartNum > 0) {
    +      if (outputClass == null) {
    +        throw new SparkException("Output value class not set")
    +      }
    +      jobConfSer.value.setOutputValueClass(outputClass)
    +      if (fileSinkConf.getTableInfo.getOutputFileFormatClassName == null) {
    +        throw new SparkException("Output format class not set")
    +      }
    +      // Doesn't work in Scala 2.9 due to what may be a generics bug
    +      // TODO: Should we uncomment this for Scala 2.10?
    +      // conf.setOutputFormat(outputFormatClass)
    +      jobConfSer.value.set("mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName)
    +      if (sc.hiveconf.getBoolean("hive.exec.compress.output", false)) {
    +        // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
    +        // and "mapred.output.compression.type" have no impact on ORC because it uses table properties
    +        // to store compression information.
    +        jobConfSer.value.set("mapred.output.compress", "true")
    +        fileSinkConf.setCompressed(true)
    +        fileSinkConf.setCompressCodec(jobConfSer.value.get("mapred.output.compression.codec"))
    +        fileSinkConf.setCompressType(jobConfSer.value.get("mapred.output.compression.type"))
    +      }
    +      jobConfSer.value.setOutputCommitter(classOf[FileOutputCommitter])
    +
    +      FileOutputFormat.setOutputPath(
    +        jobConfSer.value,
    +        SparkHiveHadoopWriter.createPathFromString(fileSinkConf.getDirName, jobConfSer.value))
    +
    +      var writerMap =  new scala.collection.mutable.HashMap[String, SparkHiveHadoopWriter]
    +      def writeToFile2(context: TaskContext, iter: Iterator[Writable]) {
    +        // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
    +        // around by taking a mod. We expect that no task will be attempted 2 billion times.
    +        val attemptNumber = (context.attemptId % Int.MaxValue).toInt
    +        val serializer = newSerializer(fileSinkConf.getTableInfo)
    +        var count = 0
    +        var writer2:SparkHiveHadoopWriter = null
    --- End diff --
    
    Space after `:`
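    
    For context on the `writerMap` in this diff: each task keeps one writer per dynamic-partition path, reuses it across rows, and closes them all when the iterator is exhausted. A small runnable sketch of that lookup-or-create pattern follows; the `Writer` class and `writePartitioned` helper are hypothetical stand-ins for SparkHiveHadoopWriter and writeToFile2, not the PR's code.
    
        import scala.collection.mutable
        
        // Stand-in for SparkHiveHadoopWriter: one instance per dynamic-partition path.
        class Writer(val partPath: String) {
          def write(record: String): Unit = println(s"$partPath <- $record")
          def close(): Unit = ()
        }
        
        def writePartitioned(rows: Iterator[(String, String)]): Unit = {
          val writerMap = mutable.HashMap.empty[String, Writer]
          for ((dynamicPartPath, record) <- rows) {
            // Reuse the writer for this partition value, or open a new one on first sight.
            val writer = writerMap.getOrElseUpdate(dynamicPartPath, new Writer(dynamicPartPath))
            writer.write(record)
          }
          // Close every per-partition writer once the task's iterator is exhausted.
          writerMap.values.foreach(_.close())
        }
        
        // writePartitioned(Iterator("/ds=1" -> "a", "/ds=2" -> "b", "/ds=1" -> "c"))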




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by baishuo <gi...@git.apache.org>.
Github user baishuo commented on the pull request:

    https://github.com/apache/spark/pull/1919#issuecomment-52141005
  
    hi @marmbrus, while studying HiveQuerySuite.scala I found there is an important table, src, but I didn't find where or how that table is created. Would you please give more instruction? Thank you :)
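    
    For context: in Spark's Hive test harness the `src` table is normally created lazily by TestHive the first time a test references it, via a CREATE TABLE followed by a LOAD DATA of Hive's sample kv1.txt. The sketch below shows roughly equivalent statements; the exact API calls and file path are assumptions based on TestHive conventions, not verified against this branch.
    
        import org.apache.spark.sql.hive.test.TestHive
        
        // Assumed fixture behavior, written out as explicit statements for illustration only.
        TestHive.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
        TestHive.hql(
          s"LOAD DATA LOCAL INPATH '${TestHive.getHiveFile("data/files/kv1.txt")}' INTO TABLE src")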




[GitHub] spark pull request: [SPARK-3007][SQL]Add "Dynamic Partition" suppo...

Posted by baishuo <gi...@git.apache.org>.
Github user baishuo commented on the pull request:

    https://github.com/apache/spark/pull/1919#issuecomment-52734543
  
    I am also curious about that.
    I downloaded the master branch and checked the folder sql/hive/src/test/resources/golden,
    and I found that files beginning with dynamic_partition_skip_default* or load_dyn_part* already exist.
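    
    To reproduce that check, something like the following lists the golden files with those prefixes; the relative path assumes the command is run from the repository root.
    
        import java.io.File
        
        // List golden files whose names start with the dynamic-partition test prefixes.
        val golden = new File("sql/hive/src/test/resources/golden")
        val prefixes = Seq("dynamic_partition_skip_default", "load_dyn_part")
        golden.listFiles()
          .filter(f => prefixes.exists(p => f.getName.startsWith(p)))
          .foreach(f => println(f.getName))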

