Posted to reviews@spark.apache.org by esoroush <gi...@git.apache.org> on 2014/04/19 23:48:43 UTC

[GitHub] spark pull request: Coding Task

GitHub user esoroush opened a pull request:

    https://github.com/apache/spark/pull/456

    Coding Task

    The goal is to improve the performance of the HiveTableScan operator.
    
    As a quick benchmark, run the following code in the Scala interpreter:
    
    scala> :paste
    
    hql("CREATE TABLE IF NOT EXISTS sample (key1 INT, key2 INT,value STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','")
    hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/sample2.txt' INTO TABLE sample")
    println("Result of SELECT * FROM sample:")
    val start = System.nanoTime
    val recs = hql("FROM sample SELECT key1,key2,value").collect()
    val micros = (System.nanoTime - start) / 1000
    println("%d microsecondss".format(micros))
    
    scala> CTRL-D
    
    You can download the test file from here: 
    http://homes.cs.washington.edu/~soroush/sample2.txt
    
    sample2.txt contains about 3.6 million rows. The improved code scans the entire table in about 9 seconds, while the original code scans it in about 22 seconds.
    
    
    Regarding the last item in the task: 
    "Avoid Reading Unneeded Data - Some Hive Serializer/Deserializer (SerDe) interfaces support reading only the required columns from the underlying HDFS files.  We should use ColumnProjectionUtils to configure these correctly." 
    The way to do it should be similar to the following code: 
    
    https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/TableScanOperator.scala
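
    Roughly, the configuration could look like the following sketch (a hypothetical helper, not code in this patch; I am assuming the Hive 0.12-era ColumnProjectionUtils method signatures, which may differ in other Hive versions):
    
    import scala.collection.JavaConverters._
    
    import org.apache.hadoop.hive.serde2.ColumnProjectionUtils
    import org.apache.hadoop.mapred.JobConf
    
    object ColumnPruningSketch {
      // Record which column ids/names the scan actually needs, so that columnar
      // SerDes (e.g. RCFile) can skip the remaining columns when reading from HDFS.
      def addColumnMetadataToConf(
          jobConf: JobConf,
          neededColumnIds: Seq[Int],
          neededColumnNames: Seq[String]): Unit = {
        ColumnProjectionUtils.appendReadColumnIDs(jobConf, neededColumnIds.map(Int.box).asJava)
        ColumnProjectionUtils.appendReadColumnNames(jobConf, neededColumnNames.asJava)
      }
    }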
    
    I tried to take a similar approach, but I am not sure columnar reading is working in hiveOperators.scala right now. It will take me more time to verify that this last feature works. Please note that this was the first time I wrote code in Scala, and it took me some time to get comfortable with the language.
     
    
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/esoroush/spark master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/456.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #456
    
----
commit bd6894171c537627c09d04451ca1725dfc8228ae
Author: Emad Soroush <so...@emads-macbook-pro.local>
Date:   2014-04-19T21:11:08Z

    test code commit

commit ddc1c2398deb56fdc08a36cc032c329ccdedc73b
Author: esoroush <so...@cs.washington.edu>
Date:   2014-04-19T21:30:52Z

    code task 2

----



[GitHub] spark pull request: [SPARK-SQL] HiveTableScan operator Performance...

Posted by esoroush <gi...@git.apache.org>.
Github user esoroush commented on the pull request:

    https://github.com/apache/spark/pull/456#issuecomment-40962375
  
    Sure, I am looking at it right now.



[GitHub] spark pull request: Coding Task

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/456#issuecomment-40882024
  
    Merged build finished. 



[GitHub] spark pull request: [SPARK-SQL] HiveTableScan operator Performance...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/456#issuecomment-41439136
  
    Build started. 



[GitHub] spark pull request: Coding Task

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/456#issuecomment-40881907
  
    Jenkins, this is ok to test



[GitHub] spark pull request: [SPARK-SQL] HiveTableScan operator Performance...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/456#issuecomment-40972499
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14300/



[GitHub] spark pull request: Coding Task

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/456#issuecomment-40881706
  
    Can one of the admins verify this patch?



[GitHub] spark pull request: [SPARK-SQL] HiveTableScan operator Performance...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/456#issuecomment-40984045
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14304/



[GitHub] spark pull request: [SPARK-SQL] HiveTableScan operator Performance...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/456#issuecomment-40968991
  
    Merged build started. 



[GitHub] spark pull request: [SPARK-SQL] HiveTableScan operator Performance...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/456#issuecomment-40972305
  
     Merged build triggered. 



[GitHub] spark pull request: [SPARK-SQL] HiveTableScan operator Performance...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/456#issuecomment-41439290
  
    Build finished. 



[GitHub] spark pull request: [SPARK-SQL] HiveTableScan operator Performance...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:

    https://github.com/apache/spark/pull/456#issuecomment-40970823
  
    BTW, you can check the style locally with `sbt scalastyle`.



[GitHub] spark pull request: [SPARK-SQL] HiveTableScan operator Performance...

Posted by esoroush <gi...@git.apache.org>.
Github user esoroush commented on a diff in the pull request:

    https://github.com/apache/spark/pull/456#discussion_r11863253
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala ---
    @@ -123,40 +159,88 @@ case class HiveTableScan(
        * @return Partitions that are involved in the query plan.
        */
       private[hive] def prunePartitions(partitions: Seq[HivePartition]) = {
    +    /** mutable row implementation to avoid creating row instance at
    +     *  each iteration inside the while loop.
    +     */
    +    val row = new GenericMutableRow(attributes.length)
         boundPruningPred match {
           case None => partitions
           case Some(shouldKeep) => partitions.filter { part =>
    -        val dataTypes = relation.partitionKeys.map(_.dataType)
    -        val castedValues = for ((value, dataType) <- part.getValues.zip(dataTypes)) yield {
    -          castFromString(value, dataType)
    +        val castedValues = mutable.ArrayBuffer[Any]()
    +        var i = 0
    +        var len = relation.partitionKeys.length
    +        val iter: Iterator[String] = part.getValues.iterator
    +        while (i < len) {
    +          castedValues += castFromString(iter.next,relation.partitionKeys(i).dataType)
    +          i += 1
             }
    -
             // Only partitioned values are needed here, since the predicate has already been bound to
             // partition key attribute references.
    -        val row = new GenericRow(castedValues.toArray)
    +        i = 0
    +        len = castedValues.length
    +        // castedValues represents columns in the row.
    +        while (i < len) {
    +          castedValues(i) match {
    +            case n: String if n.toLowerCase == "null" => row.setNullAt(i)
    +            case n: Boolean => row.setBoolean(i,n)
    +            case n: Byte => row.setByte(i,n)
    +            case n: Double => row.setDouble(i,n)
    +            case n: Float => row.setFloat(i,n)
    +            case n: Int => row.setInt(i,n)
    +            case n: Long => row.setLong(i,n)
    +            case n: String  => row.setString(i,n)
    +            case n: Short  => row.setShort(i,n)
    +            case other => row.update(i,other)
    +          }
    +          i += 1
    +        }
             shouldKeep.eval(row).asInstanceOf[Boolean]
           }
         }
       }
     
       def execute() = {
    -    inputRdd.map { row =>
    -      val values = row match {
    -        case Array(deserializedRow: AnyRef, partitionKeys: Array[String]) =>
    -          attributeFunctions.map(_(deserializedRow, partitionKeys))
    -        case deserializedRow: AnyRef =>
    -          attributeFunctions.map(_(deserializedRow, Array.empty))
    +    /**
    +     *  mutableRow is GenericMutableRow type and only created once.
    +     *  mutableRow is upadted at each iteration inside the while loop. 
    +     */ 
    +    val mutableRow = new GenericMutableRow(attributes.length) 
    +    var i = 0
    +    
    +    var res = inputRdd.context.runJob(inputRdd,(iter: Iterator[_]) => {
    +      /** rddBuffer keeps track of all the transformed rows.
    +       *  needed later to create finalRdd 
    +       */ 
    +      val rddBuffer = mutable.ArrayBuffer[Row]()
    +      while (iter.hasNext) {
    +        val row = iter.next()
    +        val values = row match {
    +          case Array(deserializedRow: AnyRef, partitionKeys: Array[String]) =>
    +            attributeFunctions.map(_(deserializedRow, partitionKeys))
    +          case deserializedRow: AnyRef =>
    +            attributeFunctions.map(_(deserializedRow, Array.empty))
    +        }
    +        i = 0
    +        val len = values.length
    +        while ( i < len ) {
    +          values(i) match {
    +            case n: String if n.toLowerCase == "null" => mutableRow.setNullAt(i)
    +            case varchar: org.apache.hadoop.hive.common.`type`.HiveVarchar => 
    +              mutableRow.update(i,varchar.getValue)
    +            case decimal: org.apache.hadoop.hive.common.`type`.HiveDecimal =>
    +              mutableRow.update(i,BigDecimal(decimal.bigDecimalValue))
    +            case other => mutableRow.update(i,other)
    +          }
    +          i += 1
    +        }
    +        rddBuffer +=  mutableRow
    --- End diff --
    
    Thanks Matei for the comments. I will address them today when I find time.
    My major problem is the following; I appreciate any suggestions.
    
    def execute() = {
        /**
     *  mutableRow is a GenericMutableRow and is created only once.
     *  mutableRow is updated at each iteration inside the while loop.
         */
        val mutableRow = new GenericMutableRow(attributes.length)
    
        val res = inputRdd.context.runJob(inputRdd,(iter: Iterator[_]) => {
          /** rddBuffer keeps track of all the transformed rows.
           *  needed later to create finalRdd 
           */
          val rddBuffer = mutable.ArrayBuffer[Row]()
          while (iter.hasNext) {
            val row = iter.next()
            val values = row match {
              case Array(deserializedRow: AnyRef, partitionKeys: Array[String]) =>
                attributeFunctions.map(_(deserializedRow, partitionKeys))
              case deserializedRow: AnyRef =>
                attributeFunctions.map(_(deserializedRow, Array.empty))
            }
            var i = 0
            val len = values.length
            while ( i < len ) {
              values(i) match {
                case n: String if n.toLowerCase == "null" => mutableRow.setNullAt(i)
                case varchar: org.apache.hadoop.hive.common.`type`.HiveVarchar =>
                  mutableRow.update(i,varchar.getValue)
                case decimal: org.apache.hadoop.hive.common.`type`.HiveDecimal =>
                  mutableRow.update(i,BigDecimal(decimal.bigDecimalValue))
                case other => mutableRow.update(i,other)
              }
              i += 1
            }
            rddBuffer += new GenericRow(mutableRow.toArray)
          }
          rddBuffer
        })
        /** finalRdd ... equivalent to Rdd generated from inputRdd.map(...) */
    val concatArray: Array[Row] = res.flatten  // concatenate the per-partition buffers into one Array[Row]
        inputRdd.context.makeRDD(concatArray)
        
      }
    
    In the execute() method of hiveOperators.scala, I pass runJob() a function that consumes each partition's iterator of the inputRDD. The return value of runJob() is an Array[ArrayBuffer[Row]], one buffer per partition. I need to build the output RDD from that, and the only way I can think of is to concatenate all the ArrayBuffer[Row]s into one Array[Row] and then create a new finalRDD from it. I don't think this is the right way to do it (the concatenation pulls the already partitioned data back to the driver, only to partition it again when creating the new RDD). Does anyone have a suggestion?
    
    This is the main reason that I cannot pass the regression tests ... 
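
    As a minimal, self-contained illustration of the pattern on a plain RDD (a hypothetical toy example, not the actual hiveOperators.scala code), this is essentially what the current version does:
    
    import org.apache.spark.{SparkConf, SparkContext}
    
    object RunJobConcatSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("runjob-sketch").setMaster("local[2]"))
        val inputRdd = sc.parallelize(1 to 10, numSlices = 4)
    
        // runJob returns one transformed buffer per partition, collected on the driver.
        val perPartition = sc.runJob(inputRdd, (iter: Iterator[Int]) => iter.map(_ * 2).toArray)
    
        // The driver then concatenates the buffers and re-creates an RDD from the result --
        // the localize-then-repartition step in question.
        val finalRdd = sc.makeRDD(perPartition.flatten)
        println(finalRdd.collect().mkString(", "))
        sc.stop()
      }
    }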




[GitHub] spark pull request: [SPARK-SQL] HiveTableScan operator Performance...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/456#issuecomment-40972316
  
    Merged build started. 



[GitHub] spark pull request: [SPARK-SQL] HiveTableScan operator Performance...

Posted by esoroush <gi...@git.apache.org>.
Github user esoroush closed the pull request at:

    https://github.com/apache/spark/pull/456



[GitHub] spark pull request: [SPARK-SQL] HiveTableScan operator Performance...

Posted by esoroush <gi...@git.apache.org>.
Github user esoroush commented on a diff in the pull request:

    https://github.com/apache/spark/pull/456#discussion_r11866215
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala ---
    @@ -123,40 +159,88 @@ case class HiveTableScan(
        * @return Partitions that are involved in the query plan.
        */
       private[hive] def prunePartitions(partitions: Seq[HivePartition]) = {
    +    /** mutable row implementation to avoid creating row instance at
    +     *  each iteration inside the while loop.
    +     */
    +    val row = new GenericMutableRow(attributes.length)
         boundPruningPred match {
           case None => partitions
           case Some(shouldKeep) => partitions.filter { part =>
    -        val dataTypes = relation.partitionKeys.map(_.dataType)
    -        val castedValues = for ((value, dataType) <- part.getValues.zip(dataTypes)) yield {
    -          castFromString(value, dataType)
    +        val castedValues = mutable.ArrayBuffer[Any]()
    +        var i = 0
    +        var len = relation.partitionKeys.length
    +        val iter: Iterator[String] = part.getValues.iterator
    +        while (i < len) {
    +          castedValues += castFromString(iter.next,relation.partitionKeys(i).dataType)
    +          i += 1
             }
    -
             // Only partitioned values are needed here, since the predicate has already been bound to
             // partition key attribute references.
    -        val row = new GenericRow(castedValues.toArray)
    +        i = 0
    +        len = castedValues.length
    +        // castedValues represents columns in the row.
    +        while (i < len) {
    +          castedValues(i) match {
    +            case n: String if n.toLowerCase == "null" => row.setNullAt(i)
    +            case n: Boolean => row.setBoolean(i,n)
    +            case n: Byte => row.setByte(i,n)
    +            case n: Double => row.setDouble(i,n)
    +            case n: Float => row.setFloat(i,n)
    +            case n: Int => row.setInt(i,n)
    +            case n: Long => row.setLong(i,n)
    +            case n: String  => row.setString(i,n)
    +            case n: Short  => row.setShort(i,n)
    +            case other => row.update(i,other)
    +          }
    +          i += 1
    +        }
             shouldKeep.eval(row).asInstanceOf[Boolean]
           }
         }
       }
     
       def execute() = {
    -    inputRdd.map { row =>
    -      val values = row match {
    -        case Array(deserializedRow: AnyRef, partitionKeys: Array[String]) =>
    -          attributeFunctions.map(_(deserializedRow, partitionKeys))
    -        case deserializedRow: AnyRef =>
    -          attributeFunctions.map(_(deserializedRow, Array.empty))
    +    /**
    +     *  mutableRow is GenericMutableRow type and only created once.
    +     *  mutableRow is upadted at each iteration inside the while loop. 
    +     */ 
    +    val mutableRow = new GenericMutableRow(attributes.length) 
    +    var i = 0
    +    
    +    var res = inputRdd.context.runJob(inputRdd,(iter: Iterator[_]) => {
    +      /** rddBuffer keeps track of all the transformed rows.
    +       *  needed later to create finalRdd 
    +       */ 
    +      val rddBuffer = mutable.ArrayBuffer[Row]()
    +      while (iter.hasNext) {
    +        val row = iter.next()
    +        val values = row match {
    +          case Array(deserializedRow: AnyRef, partitionKeys: Array[String]) =>
    +            attributeFunctions.map(_(deserializedRow, partitionKeys))
    +          case deserializedRow: AnyRef =>
    +            attributeFunctions.map(_(deserializedRow, Array.empty))
    +        }
    +        i = 0
    +        val len = values.length
    +        while ( i < len ) {
    +          values(i) match {
    +            case n: String if n.toLowerCase == "null" => mutableRow.setNullAt(i)
    +            case varchar: org.apache.hadoop.hive.common.`type`.HiveVarchar => 
    +              mutableRow.update(i,varchar.getValue)
    +            case decimal: org.apache.hadoop.hive.common.`type`.HiveDecimal =>
    +              mutableRow.update(i,BigDecimal(decimal.bigDecimalValue))
    +            case other => mutableRow.update(i,other)
    +          }
    +          i += 1
    +        }
    +        rddBuffer +=  mutableRow
    --- End diff --
    
    Thanks Reynold, it worked. I am running the unit tests right now.



[GitHub] spark pull request: [SPARK-SQL] HiveTableScan operator Performance...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/456#issuecomment-40984044
  
    Merged build finished. 



[GitHub] spark pull request: [SPARK-SQL] HiveTableScan operator Performance...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/456#discussion_r11841588
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala ---
    @@ -123,40 +159,88 @@ case class HiveTableScan(
        * @return Partitions that are involved in the query plan.
        */
       private[hive] def prunePartitions(partitions: Seq[HivePartition]) = {
    +    /** mutable row implementation to avoid creating row instance at
    +     *  each iteration inside the while loop.
    +     */
    +    val row = new GenericMutableRow(attributes.length)
         boundPruningPred match {
           case None => partitions
           case Some(shouldKeep) => partitions.filter { part =>
    -        val dataTypes = relation.partitionKeys.map(_.dataType)
    -        val castedValues = for ((value, dataType) <- part.getValues.zip(dataTypes)) yield {
    -          castFromString(value, dataType)
    +        val castedValues = mutable.ArrayBuffer[Any]()
    +        var i = 0
    +        var len = relation.partitionKeys.length
    +        val iter: Iterator[String] = part.getValues.iterator
    +        while (i < len) {
    +          castedValues += castFromString(iter.next,relation.partitionKeys(i).dataType)
    +          i += 1
             }
    -
             // Only partitioned values are needed here, since the predicate has already been bound to
             // partition key attribute references.
    -        val row = new GenericRow(castedValues.toArray)
    +        i = 0
    +        len = castedValues.length
    +        // castedValues represents columns in the row.
    +        while (i < len) {
    +          castedValues(i) match {
    +            case n: String if n.toLowerCase == "null" => row.setNullAt(i)
    +            case n: Boolean => row.setBoolean(i,n)
    +            case n: Byte => row.setByte(i,n)
    +            case n: Double => row.setDouble(i,n)
    +            case n: Float => row.setFloat(i,n)
    +            case n: Int => row.setInt(i,n)
    +            case n: Long => row.setLong(i,n)
    +            case n: String  => row.setString(i,n)
    +            case n: Short  => row.setShort(i,n)
    +            case other => row.update(i,other)
    +          }
    +          i += 1
    +        }
             shouldKeep.eval(row).asInstanceOf[Boolean]
           }
         }
       }
     
       def execute() = {
    -    inputRdd.map { row =>
    -      val values = row match {
    -        case Array(deserializedRow: AnyRef, partitionKeys: Array[String]) =>
    -          attributeFunctions.map(_(deserializedRow, partitionKeys))
    -        case deserializedRow: AnyRef =>
    -          attributeFunctions.map(_(deserializedRow, Array.empty))
    +    /**
    +     *  mutableRow is GenericMutableRow type and only created once.
    +     *  mutableRow is upadted at each iteration inside the while loop. 
    +     */ 
    +    val mutableRow = new GenericMutableRow(attributes.length) 
    +    var i = 0
    +    
    +    var res = inputRdd.context.runJob(inputRdd,(iter: Iterator[_]) => {
    +      /** rddBuffer keeps track of all the transformed rows.
    +       *  needed later to create finalRdd 
    +       */ 
    +      val rddBuffer = mutable.ArrayBuffer[Row]()
    +      while (iter.hasNext) {
    +        val row = iter.next()
    +        val values = row match {
    +          case Array(deserializedRow: AnyRef, partitionKeys: Array[String]) =>
    +            attributeFunctions.map(_(deserializedRow, partitionKeys))
    +          case deserializedRow: AnyRef =>
    +            attributeFunctions.map(_(deserializedRow, Array.empty))
    +        }
    +        i = 0
    +        val len = values.length
    +        while ( i < len ) {
    +          values(i) match {
    +            case n: String if n.toLowerCase == "null" => mutableRow.setNullAt(i)
    +            case varchar: org.apache.hadoop.hive.common.`type`.HiveVarchar => 
    +              mutableRow.update(i,varchar.getValue)
    +            case decimal: org.apache.hadoop.hive.common.`type`.HiveDecimal =>
    +              mutableRow.update(i,BigDecimal(decimal.bigDecimalValue))
    +            case other => mutableRow.update(i,other)
    +          }
    +          i += 1
    +        }
    +        rddBuffer +=  mutableRow
    --- End diff --
    
    Please make sure to follow the [code style](https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide) throughout, e.g. there are some problems here:
    - Comments written in a non-Javadoc like syntax (with text on the `/**` line and weird indent)
    - Extra spaces inside parens and after +=
    - No space after comma



[GitHub] spark pull request: [SPARK-SQL] HiveTableScan operator Performance...

Posted by esoroush <gi...@git.apache.org>.
Github user esoroush commented on the pull request:

    https://github.com/apache/spark/pull/456#issuecomment-40972943
  
    The problem was that I used vim for coding and it messed up the tabbing for some reason.
    
    I ran `sbt scalastyle` and it now succeeds locally.



[GitHub] spark pull request: [SPARK-SQL] HiveTableScan operator Performance...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/456#issuecomment-40972498
  
    Merged build finished. 



[GitHub] spark pull request: [SPARK-SQL] HiveTableScan operator Performance...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/456#issuecomment-40969118
  
    Merged build finished. 



[GitHub] spark pull request: Coding Task

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/456#issuecomment-40882025
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14261/



[GitHub] spark pull request: [SPARK-SQL] HiveTableScan operator Performance...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/456#issuecomment-40969119
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14295/



[GitHub] spark pull request: [SPARK-SQL] HiveTableScan operator Performance...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/456#issuecomment-41446866
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14497/



[GitHub] spark pull request: [SPARK-SQL] HiveTableScan operator Performance...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/456#issuecomment-40968978
  
     Merged build triggered. 



[GitHub] spark pull request: [SPARK-SQL] HiveTableScan operator Performance...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/456#issuecomment-41440433
  
     Build triggered. 



[GitHub] spark pull request: [SPARK-SQL] HiveTableScan operator Performance...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/456#issuecomment-41439291
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14496/



[GitHub] spark pull request: [SPARK-SQL] HiveTableScan operator Performance...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/456#issuecomment-41446864
  
    Build finished. 



[GitHub] spark pull request: [SPARK-SQL] HiveTableScan operator Performance...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/456#issuecomment-41440444
  
    Build started. 



[GitHub] spark pull request: [SPARK-SQL] HiveTableScan operator Performance...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/456#issuecomment-40979712
  
     Merged build triggered. 



[GitHub] spark pull request: [SPARK-SQL] HiveTableScan operator Performance...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/456#discussion_r11864083
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala ---
    @@ -123,40 +159,88 @@ case class HiveTableScan(
        * @return Partitions that are involved in the query plan.
        */
       private[hive] def prunePartitions(partitions: Seq[HivePartition]) = {
    +    /** mutable row implementation to avoid creating row instance at
    +     *  each iteration inside the while loop.
    +     */
    +    val row = new GenericMutableRow(attributes.length)
         boundPruningPred match {
           case None => partitions
           case Some(shouldKeep) => partitions.filter { part =>
    -        val dataTypes = relation.partitionKeys.map(_.dataType)
    -        val castedValues = for ((value, dataType) <- part.getValues.zip(dataTypes)) yield {
    -          castFromString(value, dataType)
    +        val castedValues = mutable.ArrayBuffer[Any]()
    +        var i = 0
    +        var len = relation.partitionKeys.length
    +        val iter: Iterator[String] = part.getValues.iterator
    +        while (i < len) {
    +          castedValues += castFromString(iter.next,relation.partitionKeys(i).dataType)
    +          i += 1
             }
    -
             // Only partitioned values are needed here, since the predicate has already been bound to
             // partition key attribute references.
    -        val row = new GenericRow(castedValues.toArray)
    +        i = 0
    +        len = castedValues.length
    +        // castedValues represents columns in the row.
    +        while (i < len) {
    +          castedValues(i) match {
    +            case n: String if n.toLowerCase == "null" => row.setNullAt(i)
    +            case n: Boolean => row.setBoolean(i,n)
    +            case n: Byte => row.setByte(i,n)
    +            case n: Double => row.setDouble(i,n)
    +            case n: Float => row.setFloat(i,n)
    +            case n: Int => row.setInt(i,n)
    +            case n: Long => row.setLong(i,n)
    +            case n: String  => row.setString(i,n)
    +            case n: Short  => row.setShort(i,n)
    +            case other => row.update(i,other)
    +          }
    +          i += 1
    +        }
             shouldKeep.eval(row).asInstanceOf[Boolean]
           }
         }
       }
     
       def execute() = {
    -    inputRdd.map { row =>
    -      val values = row match {
    -        case Array(deserializedRow: AnyRef, partitionKeys: Array[String]) =>
    -          attributeFunctions.map(_(deserializedRow, partitionKeys))
    -        case deserializedRow: AnyRef =>
    -          attributeFunctions.map(_(deserializedRow, Array.empty))
    +    /**
    +     *  mutableRow is GenericMutableRow type and only created once.
    +     *  mutableRow is upadted at each iteration inside the while loop. 
    +     */ 
    +    val mutableRow = new GenericMutableRow(attributes.length) 
    +    var i = 0
    +    
    +    var res = inputRdd.context.runJob(inputRdd,(iter: Iterator[_]) => {
    +      /** rddBuffer keeps track of all the transformed rows.
    +       *  needed later to create finalRdd 
    +       */ 
    +      val rddBuffer = mutable.ArrayBuffer[Row]()
    +      while (iter.hasNext) {
    +        val row = iter.next()
    +        val values = row match {
    +          case Array(deserializedRow: AnyRef, partitionKeys: Array[String]) =>
    +            attributeFunctions.map(_(deserializedRow, partitionKeys))
    +          case deserializedRow: AnyRef =>
    +            attributeFunctions.map(_(deserializedRow, Array.empty))
    +        }
    +        i = 0
    +        val len = values.length
    +        while ( i < len ) {
    +          values(i) match {
    +            case n: String if n.toLowerCase == "null" => mutableRow.setNullAt(i)
    +            case varchar: org.apache.hadoop.hive.common.`type`.HiveVarchar => 
    +              mutableRow.update(i,varchar.getValue)
    +            case decimal: org.apache.hadoop.hive.common.`type`.HiveDecimal =>
    +              mutableRow.update(i,BigDecimal(decimal.bigDecimalValue))
    +            case other => mutableRow.update(i,other)
    +          }
    +          i += 1
    +        }
    +        rddBuffer +=  mutableRow
    --- End diff --
    
    Why not just use mapPartitions and work on one partition at a time? You don't want to collect all the data back to the driver.
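
    A minimal, self-contained sketch of that suggestion on a plain RDD (a hypothetical toy example, not the HiveTableScan code):
    
    import org.apache.spark.{SparkConf, SparkContext}
    
    object MapPartitionsSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("mappartitions-sketch").setMaster("local[2]"))
        val inputRdd = sc.parallelize(1 to 10, numSlices = 4)
    
        // mapPartitions keeps the per-partition transformation on the executors and
        // preserves the existing partitioning, so there is no driver-side concatenation
        // and no new RDD to build. A single reusable buffer per partition is still
        // possible inside this closure.
        val transformed = inputRdd.mapPartitions(iter => iter.map(_ * 2))
    
        println(transformed.collect().mkString(", "))
        sc.stop()
      }
    }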



[GitHub] spark pull request: Coding Task

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/456#issuecomment-40881999
  
    Merged build started. 



[GitHub] spark pull request: [SPARK-SQL] HiveTableScan operator Performance...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/456#issuecomment-41439121
  
     Build triggered. 



[GitHub] spark pull request: Coding Task

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/456#issuecomment-40881994
  
     Merged build triggered. 



[GitHub] spark pull request: [SPARK-SQL] HiveTableScan operator Performance...

Posted by esoroush <gi...@git.apache.org>.
Github user esoroush commented on the pull request:

    https://github.com/apache/spark/pull/456#issuecomment-40974192
  
    I am a little confused about why it does not complain on my machine locally but produces errors here ... 



[GitHub] spark pull request: [SPARK-SQL] HiveTableScan operator Performance...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:

    https://github.com/apache/spark/pull/456#issuecomment-40952235
  
    Mind fixing the tab characters so we can test this?



[GitHub] spark pull request: [SPARK-SQL] HiveTableScan operator Performance...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/456#issuecomment-40979726
  
    Merged build started. 



[GitHub] spark pull request: [SPARK-SQL] HiveTableScan operator Performance...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/456#discussion_r11841553
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala ---
    @@ -123,40 +159,88 @@ case class HiveTableScan(
        * @return Partitions that are involved in the query plan.
        */
       private[hive] def prunePartitions(partitions: Seq[HivePartition]) = {
    +    /** mutable row implementation to avoid creating row instance at
    +     *  each iteration inside the while loop.
    +     */
    +    val row = new GenericMutableRow(attributes.length)
         boundPruningPred match {
           case None => partitions
           case Some(shouldKeep) => partitions.filter { part =>
    -        val dataTypes = relation.partitionKeys.map(_.dataType)
    -        val castedValues = for ((value, dataType) <- part.getValues.zip(dataTypes)) yield {
    -          castFromString(value, dataType)
    +        val castedValues = mutable.ArrayBuffer[Any]()
    +        var i = 0
    +        var len = relation.partitionKeys.length
    +        val iter: Iterator[String] = part.getValues.iterator
    +        while (i < len) {
    +          castedValues += castFromString(iter.next,relation.partitionKeys(i).dataType)
    +          i += 1
             }
    -
             // Only partitioned values are needed here, since the predicate has already been bound to
             // partition key attribute references.
    -        val row = new GenericRow(castedValues.toArray)
    +        i = 0
    +        len = castedValues.length
    +        // castedValues represents columns in the row.
    +        while (i < len) {
    +          castedValues(i) match {
    +            case n: String if n.toLowerCase == "null" => row.setNullAt(i)
    +            case n: Boolean => row.setBoolean(i,n)
    +            case n: Byte => row.setByte(i,n)
    +            case n: Double => row.setDouble(i,n)
    +            case n: Float => row.setFloat(i,n)
    +            case n: Int => row.setInt(i,n)
    +            case n: Long => row.setLong(i,n)
    +            case n: String  => row.setString(i,n)
    +            case n: Short  => row.setShort(i,n)
    +            case other => row.update(i,other)
    +          }
    +          i += 1
    +        }
             shouldKeep.eval(row).asInstanceOf[Boolean]
           }
         }
       }
     
       def execute() = {
    -    inputRdd.map { row =>
    -      val values = row match {
    -        case Array(deserializedRow: AnyRef, partitionKeys: Array[String]) =>
    -          attributeFunctions.map(_(deserializedRow, partitionKeys))
    -        case deserializedRow: AnyRef =>
    -          attributeFunctions.map(_(deserializedRow, Array.empty))
    +    /**
    +     *  mutableRow is GenericMutableRow type and only created once.
    +     *  mutableRow is upadted at each iteration inside the while loop. 
    +     */ 
    +    val mutableRow = new GenericMutableRow(attributes.length) 
    +    var i = 0
    +    
    +    var res = inputRdd.context.runJob(inputRdd,(iter: Iterator[_]) => {
    +      /** rddBuffer keeps track of all the transformed rows.
    +       *  needed later to create finalRdd 
    +       */ 
    +      val rddBuffer = mutable.ArrayBuffer[Row]()
    +      while (iter.hasNext) {
    +        val row = iter.next()
    +        val values = row match {
    +          case Array(deserializedRow: AnyRef, partitionKeys: Array[String]) =>
    +            attributeFunctions.map(_(deserializedRow, partitionKeys))
    +          case deserializedRow: AnyRef =>
    +            attributeFunctions.map(_(deserializedRow, Array.empty))
    +        }
    +        i = 0
    +        val len = values.length
    +        while ( i < len ) {
    +          values(i) match {
    +            case n: String if n.toLowerCase == "null" => mutableRow.setNullAt(i)
    +            case varchar: org.apache.hadoop.hive.common.`type`.HiveVarchar => 
    +              mutableRow.update(i,varchar.getValue)
    +            case decimal: org.apache.hadoop.hive.common.`type`.HiveDecimal =>
    +              mutableRow.update(i,BigDecimal(decimal.bigDecimalValue))
    +            case other => mutableRow.update(i,other)
    +          }
    +          i += 1
    +        }
    +        rddBuffer +=  mutableRow
           }
    -      buildRow(values.map {
    -        case n: String if n.toLowerCase == "null" => null
    -        case varchar: org.apache.hadoop.hive.common.`type`.HiveVarchar => varchar.getValue
    -        case decimal: org.apache.hadoop.hive.common.`type`.HiveDecimal =>
    -          BigDecimal(decimal.bigDecimalValue)
    -        case other => other
    -      })
    -    }
    +      rddBuffer
    +    })
    +    /** finalRdd ... equivelant to Rdd generated from inputRdd.map(...) */
    --- End diff --
    
    Typo in here

