You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by concretevitamin <gi...@git.apache.org> on 2014/07/13 01:52:59 UTC

[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

GitHub user concretevitamin opened a pull request:

    https://github.com/apache/spark/pull/1390

    [SPARK-2443][SQL] Fix slow read from partitioned tables 

    This simply incorporates Shark's [#329](https://github.com/amplab/shark/pull/329) into Spark SQL. Implementation credit to @chiragaggarwal.
    
    @marmbrus @rxin @chenghao-intel
    
    ## Benchmarks
    Generated a local text file with 10M rows of simple key-value pairs. The data is loaded as a table through Hive. Results are obtained on my local machine using hive/console.
    
    Without the fix:
    
    Non-partitioned | Partitioned (1 part)
    ------------ | -------------
    First run: 9.52s end-to-end (1.64s Spark job) | First run: 36.6s (28.3s)
    Stablized runs: 1.21s (1.18s) | Stablized runs: 27.6s (27.5s)
    
    With this fix:
    
    Non-partitioned | Partitioned (1 part)
    ------------ | -------------
    First run: 9.57s (1.46s) | First run: 9.30s (1.45s)
    Stablized runs: 1.13s (1.10s) | Stablized runs: 1.18s (1.15s)
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/concretevitamin/spark slow-read

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/1390.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1390
    
----
commit 403f460c644308126b6f3ab5dda66fa6b1872ce9
Author: Zongheng Yang <zo...@gmail.com>
Date:   2014-07-12T22:52:47Z

    Incorporate shark/pull/329 into Spark SQL.
    
    Credit to @chiragaggarwal.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

Posted by concretevitamin <gi...@git.apache.org>.
Github user concretevitamin commented on the pull request:

    https://github.com/apache/spark/pull/1390#issuecomment-48936743
  
    New PR here: #1408 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

Posted by concretevitamin <gi...@git.apache.org>.
Github user concretevitamin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1390#discussion_r14902570
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala ---
    @@ -157,21 +161,60 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon
     
           // Create local references so that the outer object isn't serialized.
           val tableDesc = _tableDesc
    +      val tableSerDeClass = tableDesc.getDeserializerClass
    +
           val broadcastedHiveConf = _broadcastedHiveConf
           val localDeserializer = partDeserializer
     
           val hivePartitionRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)
    -      hivePartitionRDD.mapPartitions { iter =>
    +      hivePartitionRDD.mapPartitions { case iter =>
    --- End diff --
    
    https://gist.github.com/concretevitamin/272fe413dcc06b8cbe9c
    
    It seems the with-case version does have more instructions to do.
    



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

Posted by concretevitamin <gi...@git.apache.org>.
Github user concretevitamin closed the pull request at:

    https://github.com/apache/spark/pull/1390


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on the pull request:

    https://github.com/apache/spark/pull/1390#issuecomment-48860018
  
    @chenghao-intel I am not sure I understand your comment on column pruning. I think for a Hive table, we should use `ColumnProjectionUtils` to set needed columns. So, RCFile and ORC can just read needed columns from HDFS.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on the pull request:

    https://github.com/apache/spark/pull/1390#issuecomment-48859842
  
    And as the Hive SerDe actually provides the feature of `lazy` parsing, hence during the converting of `raw object` to `Row`, we need to support the column pruning
    
    Sorry, some high level comments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1390#discussion_r14898638
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala ---
    @@ -157,21 +161,60 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon
     
           // Create local references so that the outer object isn't serialized.
           val tableDesc = _tableDesc
    +      val tableSerDeClass = tableDesc.getDeserializerClass
    +
           val broadcastedHiveConf = _broadcastedHiveConf
           val localDeserializer = partDeserializer
     
           val hivePartitionRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)
    -      hivePartitionRDD.mapPartitions { iter =>
    +      hivePartitionRDD.mapPartitions { case iter =>
    --- End diff --
    
    oh really?  how does the generated bytecode differ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on the pull request:

    https://github.com/apache/spark/pull/1390#issuecomment-48860420
  
    In general, I suggest adding more comments to explain what we are doing at here because this part of code is pretty Hive-specific.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

Posted by concretevitamin <gi...@git.apache.org>.
Github user concretevitamin commented on the pull request:

    https://github.com/apache/spark/pull/1390#issuecomment-48935494
  
    @yhuai suggested a much simpler fix -- I benchmarked this and it gave the same performance boost. I am closing this and opening a new PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1390#discussion_r14862338
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala ---
    @@ -157,21 +161,60 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon
     
           // Create local references so that the outer object isn't serialized.
           val tableDesc = _tableDesc
    +      val tableSerDeClass = tableDesc.getDeserializerClass
    +
           val broadcastedHiveConf = _broadcastedHiveConf
           val localDeserializer = partDeserializer
     
           val hivePartitionRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)
    -      hivePartitionRDD.mapPartitions { iter =>
    +      hivePartitionRDD.mapPartitions { case iter =>
             val hconf = broadcastedHiveConf.value.value
             val rowWithPartArr = new Array[Object](2)
    -        // Map each tuple to a row object
    -        iter.map { value =>
    -          val deserializer = localDeserializer.newInstance()
    -          deserializer.initialize(hconf, partProps)
    -          val deserializedRow = deserializer.deserialize(value)
    -          rowWithPartArr.update(0, deserializedRow)
    -          rowWithPartArr.update(1, partValues)
    -          rowWithPartArr.asInstanceOf[Object]
    +
    +        val partSerDe = localDeserializer.newInstance()
    +        val tableSerDe = tableSerDeClass.newInstance()
    +        partSerDe.initialize(hconf, partProps)
    +        tableSerDe.initialize(hconf,  tableDesc.getProperties)
    +
    +        val tblConvertedOI = ObjectInspectorConverters.getConvertedOI(
    +          partSerDe.getObjectInspector, tableSerDe.getObjectInspector, true)
    +          .asInstanceOf[StructObjectInspector]
    +        val partTblObjectInspectorConverter = ObjectInspectorConverters.getConverter(
    +          partSerDe.getObjectInspector, tblConvertedOI)
    +
    +        // This is done per partition, and unnecessary to put it in the iterations (in iter.map).
    +        rowWithPartArr.update(1, partValues)
    +
    +        // Map each tuple to a row object.
    +        if (partTblObjectInspectorConverter.isInstanceOf[IdentityConverter]) {
    +          iter.map { case value =>
    +            rowWithPartArr.update(0, partSerDe.deserialize(value))
    +            rowWithPartArr.asInstanceOf[Object]
    +          }
    +        } else {
    +          iter.map { case value =>
    +            val deserializedRow = {
    +              // If partition schema does not match table schema, update the row to match.
    +              val convertedRow =
    +                partTblObjectInspectorConverter.convert(partSerDe.deserialize(value))
    +
    +              // If conversion was performed, convertedRow will be a standard Object, but if
    +              // conversion wasn't necessary, it will still be lazy. We can't have both across
    +              // partitions, so we serialize and deserialize again to make it lazy.
    +              if (tableSerDe.isInstanceOf[OrcSerde]) {
    +                convertedRow
    +              } else {
    +                convertedRow match {
    --- End diff --
    
    I think we need to comment why we need to do this pattern matching. Also, why do we handle `LazyStruct` and `ColumnarStruct` specially? There are similar classes, e.g. `LazyBinaryStruct` and `LazyBinaryColumnarStruct`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

Posted by concretevitamin <gi...@git.apache.org>.
Github user concretevitamin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1390#discussion_r14902569
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala ---
    @@ -157,21 +161,60 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon
     
           // Create local references so that the outer object isn't serialized.
           val tableDesc = _tableDesc
    +      val tableSerDeClass = tableDesc.getDeserializerClass
    +
           val broadcastedHiveConf = _broadcastedHiveConf
           val localDeserializer = partDeserializer
     
           val hivePartitionRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)
    -      hivePartitionRDD.mapPartitions { iter =>
    +      hivePartitionRDD.mapPartitions { case iter =>
    --- End diff --
    
    https://gist.github.com/concretevitamin/272fe413dcc06b8cbe9c
    
    It seems the with-case version does have more instructions to do.
    



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1390#issuecomment-48831466
  
    QA results for PR 1390:<br>- This patch PASSES unit tests.<br>- This patch merges cleanly<br>- This patch adds no public classes<br><br>For more information see test ouptut:<br>https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16599/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1390#discussion_r14862300
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala ---
    @@ -157,21 +161,60 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon
     
           // Create local references so that the outer object isn't serialized.
           val tableDesc = _tableDesc
    +      val tableSerDeClass = tableDesc.getDeserializerClass
    +
           val broadcastedHiveConf = _broadcastedHiveConf
           val localDeserializer = partDeserializer
     
           val hivePartitionRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)
    -      hivePartitionRDD.mapPartitions { iter =>
    +      hivePartitionRDD.mapPartitions { case iter =>
             val hconf = broadcastedHiveConf.value.value
             val rowWithPartArr = new Array[Object](2)
    -        // Map each tuple to a row object
    -        iter.map { value =>
    -          val deserializer = localDeserializer.newInstance()
    -          deserializer.initialize(hconf, partProps)
    -          val deserializedRow = deserializer.deserialize(value)
    -          rowWithPartArr.update(0, deserializedRow)
    -          rowWithPartArr.update(1, partValues)
    -          rowWithPartArr.asInstanceOf[Object]
    +
    +        val partSerDe = localDeserializer.newInstance()
    +        val tableSerDe = tableSerDeClass.newInstance()
    +        partSerDe.initialize(hconf, partProps)
    +        tableSerDe.initialize(hconf,  tableDesc.getProperties)
    +
    +        val tblConvertedOI = ObjectInspectorConverters.getConvertedOI(
    +          partSerDe.getObjectInspector, tableSerDe.getObjectInspector, true)
    +          .asInstanceOf[StructObjectInspector]
    +        val partTblObjectInspectorConverter = ObjectInspectorConverters.getConverter(
    +          partSerDe.getObjectInspector, tblConvertedOI)
    +
    +        // This is done per partition, and unnecessary to put it in the iterations (in iter.map).
    +        rowWithPartArr.update(1, partValues)
    +
    +        // Map each tuple to a row object.
    +        if (partTblObjectInspectorConverter.isInstanceOf[IdentityConverter]) {
    +          iter.map { case value =>
    +            rowWithPartArr.update(0, partSerDe.deserialize(value))
    +            rowWithPartArr.asInstanceOf[Object]
    +          }
    +        } else {
    +          iter.map { case value =>
    +            val deserializedRow = {
    +              // If partition schema does not match table schema, update the row to match.
    +              val convertedRow =
    +                partTblObjectInspectorConverter.convert(partSerDe.deserialize(value))
    +
    +              // If conversion was performed, convertedRow will be a standard Object, but if
    +              // conversion wasn't necessary, it will still be lazy. We can't have both across
    +              // partitions, so we serialize and deserialize again to make it lazy.
    +              if (tableSerDe.isInstanceOf[OrcSerde]) {
    +                convertedRow
    --- End diff --
    
    Why do we need to take care ORC separately?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

Posted by concretevitamin <gi...@git.apache.org>.
Github user concretevitamin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1390#discussion_r14894946
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala ---
    @@ -157,21 +161,60 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon
     
           // Create local references so that the outer object isn't serialized.
           val tableDesc = _tableDesc
    +      val tableSerDeClass = tableDesc.getDeserializerClass
    +
           val broadcastedHiveConf = _broadcastedHiveConf
           val localDeserializer = partDeserializer
     
           val hivePartitionRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)
    -      hivePartitionRDD.mapPartitions { iter =>
    +      hivePartitionRDD.mapPartitions { case iter =>
    --- End diff --
    
    I initially thought in a function context, `{ case x => ... }` will be optimized to `{ x => ... }`. I did a `scalac -print` on a simple program to confirm that this is not the case.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1390#issuecomment-48828384
  
    QA results for PR 1390:<br>- This patch FAILED unit tests.<br>- This patch merges cleanly<br>- This patch adds no public classes<br><br>For more information see test ouptut:<br>https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16598/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

Posted by concretevitamin <gi...@git.apache.org>.
Github user concretevitamin commented on the pull request:

    https://github.com/apache/spark/pull/1390#issuecomment-48828280
  
    Jenkins, test this please. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

Posted by concretevitamin <gi...@git.apache.org>.
Github user concretevitamin commented on the pull request:

    https://github.com/apache/spark/pull/1390#issuecomment-48830080
  
    Jenkins, retest this please.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1390#issuecomment-48828365
  
    QA tests have started for PR 1390. This patch merges cleanly. <br>View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16598/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1390#discussion_r14862289
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala ---
    @@ -157,21 +161,60 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon
     
           // Create local references so that the outer object isn't serialized.
           val tableDesc = _tableDesc
    +      val tableSerDeClass = tableDesc.getDeserializerClass
    +
           val broadcastedHiveConf = _broadcastedHiveConf
           val localDeserializer = partDeserializer
     
           val hivePartitionRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)
    -      hivePartitionRDD.mapPartitions { iter =>
    +      hivePartitionRDD.mapPartitions { case iter =>
             val hconf = broadcastedHiveConf.value.value
             val rowWithPartArr = new Array[Object](2)
    -        // Map each tuple to a row object
    -        iter.map { value =>
    -          val deserializer = localDeserializer.newInstance()
    -          deserializer.initialize(hconf, partProps)
    -          val deserializedRow = deserializer.deserialize(value)
    -          rowWithPartArr.update(0, deserializedRow)
    -          rowWithPartArr.update(1, partValues)
    -          rowWithPartArr.asInstanceOf[Object]
    +
    +        val partSerDe = localDeserializer.newInstance()
    +        val tableSerDe = tableSerDeClass.newInstance()
    +        partSerDe.initialize(hconf, partProps)
    +        tableSerDe.initialize(hconf,  tableDesc.getProperties)
    +
    +        val tblConvertedOI = ObjectInspectorConverters.getConvertedOI(
    +          partSerDe.getObjectInspector, tableSerDe.getObjectInspector, true)
    +          .asInstanceOf[StructObjectInspector]
    +        val partTblObjectInspectorConverter = ObjectInspectorConverters.getConverter(
    +          partSerDe.getObjectInspector, tblConvertedOI)
    +
    +        // This is done per partition, and unnecessary to put it in the iterations (in iter.map).
    +        rowWithPartArr.update(1, partValues)
    +
    +        // Map each tuple to a row object.
    +        if (partTblObjectInspectorConverter.isInstanceOf[IdentityConverter]) {
    +          iter.map { case value =>
    +            rowWithPartArr.update(0, partSerDe.deserialize(value))
    +            rowWithPartArr.asInstanceOf[Object]
    +          }
    +        } else {
    +          iter.map { case value =>
    +            val deserializedRow = {
    +              // If partition schema does not match table schema, update the row to match.
    +              val convertedRow =
    +                partTblObjectInspectorConverter.convert(partSerDe.deserialize(value))
    +
    +              // If conversion was performed, convertedRow will be a standard Object, but if
    +              // conversion wasn't necessary, it will still be lazy. We can't have both across
    +              // partitions, so we serialize and deserialize again to make it lazy.
    +              if (tableSerDe.isInstanceOf[OrcSerde]) {
    +                convertedRow
    +              } else {
    +                convertedRow match {
    +                  case _: LazyStruct => convertedRow
    +                  case _: HiveColumnarStruct => convertedRow
    +                  case _ => tableSerDe.deserialize(
    +                    tableSerDe.asInstanceOf[Serializer].serialize(convertedRow, tblConvertedOI))
    --- End diff --
    
    As mentioned by @chenghao-intel, can we avoid it?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on the pull request:

    https://github.com/apache/spark/pull/1390#issuecomment-48845188
  
    I am reviewing it. Will comment it later today.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:

    https://github.com/apache/spark/pull/1390#issuecomment-48930414
  
    Thanks for reviewing this everyone.  I'm all for commenting and cleaning things up here, but if possible I'd like to merge this in today.  There are a couple of people blocking on this as its a pretty severe performance bug.  How about we just add some TODOs that can be taken care of in a follow up PR?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1390#issuecomment-48830138
  
    QA tests have started for PR 1390. This patch merges cleanly. <br>View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16599/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/1390#issuecomment-48832990
  
    @yhuai can you take a look?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1390#discussion_r14862941
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala ---
    @@ -157,21 +161,60 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon
     
           // Create local references so that the outer object isn't serialized.
           val tableDesc = _tableDesc
    +      val tableSerDeClass = tableDesc.getDeserializerClass
    +
           val broadcastedHiveConf = _broadcastedHiveConf
           val localDeserializer = partDeserializer
     
           val hivePartitionRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)
    -      hivePartitionRDD.mapPartitions { iter =>
    +      hivePartitionRDD.mapPartitions { case iter =>
             val hconf = broadcastedHiveConf.value.value
             val rowWithPartArr = new Array[Object](2)
    -        // Map each tuple to a row object
    -        iter.map { value =>
    -          val deserializer = localDeserializer.newInstance()
    -          deserializer.initialize(hconf, partProps)
    -          val deserializedRow = deserializer.deserialize(value)
    -          rowWithPartArr.update(0, deserializedRow)
    -          rowWithPartArr.update(1, partValues)
    -          rowWithPartArr.asInstanceOf[Object]
    +
    +        val partSerDe = localDeserializer.newInstance()
    +        val tableSerDe = tableSerDeClass.newInstance()
    +        partSerDe.initialize(hconf, partProps)
    +        tableSerDe.initialize(hconf,  tableDesc.getProperties)
    +
    +        val tblConvertedOI = ObjectInspectorConverters.getConvertedOI(
    +          partSerDe.getObjectInspector, tableSerDe.getObjectInspector, true)
    +          .asInstanceOf[StructObjectInspector]
    +        val partTblObjectInspectorConverter = ObjectInspectorConverters.getConverter(
    +          partSerDe.getObjectInspector, tblConvertedOI)
    +
    +        // This is done per partition, and unnecessary to put it in the iterations (in iter.map).
    +        rowWithPartArr.update(1, partValues)
    +
    +        // Map each tuple to a row object.
    +        if (partTblObjectInspectorConverter.isInstanceOf[IdentityConverter]) {
    +          iter.map { case value =>
    +            rowWithPartArr.update(0, partSerDe.deserialize(value))
    +            rowWithPartArr.asInstanceOf[Object]
    +          }
    +        } else {
    +          iter.map { case value =>
    +            val deserializedRow = {
    +              // If partition schema does not match table schema, update the row to match.
    +              val convertedRow =
    +                partTblObjectInspectorConverter.convert(partSerDe.deserialize(value))
    +
    +              // If conversion was performed, convertedRow will be a standard Object, but if
    +              // conversion wasn't necessary, it will still be lazy. We can't have both across
    +              // partitions, so we serialize and deserialize again to make it lazy.
    +              if (tableSerDe.isInstanceOf[OrcSerde]) {
    +                convertedRow
    +              } else {
    +                convertedRow match {
    --- End diff --
    
    Yeah, the code is from Shark, and it is a little bit tricky. I think the logic here is:
    * We assumes the table object inspector is always a lazy objectinspector, and the deserializer always produces the lazy objects
    * We assumes the ObjectInspectorConverter always produces NON LAZY objects if the objectinspectors ARE NOT compatible.
    * If `convertedRow` is the lazy object, which means partition objectinspector is compatible with the table objectinspector(`convertedRow` can be retrieved directly), otherwise, the non lazy `convertedRow` is not acceptable by the table object inspector( table object inspector is the lazy object inspector), hence we need to convert it by serializing and de-serializing again.
    
    I don't think we need to maintain the logic here, as we can provide a better solution for the partition based table scanning. All we need to do is converting the `raw object` into `MutableRow` directly, as we did in `HiveTableScan` of Spark.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1390#discussion_r14856885
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala ---
    @@ -157,21 +161,60 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon
     
           // Create local references so that the outer object isn't serialized.
           val tableDesc = _tableDesc
    +      val tableSerDeClass = tableDesc.getDeserializerClass
    +
           val broadcastedHiveConf = _broadcastedHiveConf
           val localDeserializer = partDeserializer
     
           val hivePartitionRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)
    -      hivePartitionRDD.mapPartitions { iter =>
    +      hivePartitionRDD.mapPartitions { case iter =>
    --- End diff --
    
    is the pattern matching here necessary?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2443][SQL] Fix slow read from partition...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on the pull request:

    https://github.com/apache/spark/pull/1390#issuecomment-48859675
  
    The code looks good to me. However, I think we can avoid the work around solution (de-serializing (with partition serde) and then serialize (with table serde) again) for adapting the higher level table scan (`TableScanOperator` in Shark), which have to providing a unique `ObjectInspector` for the downstream Operators.
    
    Not like `TableScanOperator`, `HiveTableScan` in `Spark-Hive` doesn't reply on `ObjectInspector`, 
    and its output type is `GenericMutableRow`, I think we could make the object conversion (from raw type to `Row` object) directly.
    



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---