You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/11/18 19:19:54 UTC

spark git commit: [SPARK-18457][SQL] ORC and other columnar formats using HiveShim read all columns when doing a simple count

Repository: spark
Updated Branches:
  refs/heads/master 51baca221 -> 795e9fc92


[SPARK-18457][SQL] ORC and other columnar formats using HiveShim read all columns when doing a simple count

## What changes were proposed in this pull request?

When reading zero columns (e.g., count(*)) from ORC or any other format that uses HiveShim, actually set the read column list to empty for Hive to use.

## How was this patch tested?

Query correctness is handled by existing unit tests. I'm happy to add more if anyone can point out some case that is not covered.

The reduction in data read can be verified in the UI when built with a recent version of Hadoop, for example:
```
build/mvn -Pyarn -Phadoop-2.7 -Dhadoop.version=2.7.0 -Phive -DskipTests clean package
```
However, the default Hadoop 2.2 that is used for unit tests does not report the actual bytes read; it just reports full file sizes (see FileScanRDD.scala line 80). Therefore, I don't think there is a good way to add a unit test for this.

I tested with the following setup, using the above build options:
```
case class OrcData(intField: Long, stringField: String)
spark.range(1,1000000).map(i => OrcData(i, s"part-$i")).toDF().write.format("orc").save("orc_test")

sql(
      s"""CREATE EXTERNAL TABLE orc_test(
         |  intField LONG,
         |  stringField STRING
         |)
         |STORED AS ORC
         |LOCATION '${System.getProperty("user.dir") + "/orc_test"}'
       """.stripMargin)
```

## Results

query | Spark 2.0.2 | this PR
---|---|---
`sql("select count(*) from orc_test").collect`|4.4 MB|199.4 KB
`sql("select intField from orc_test").collect`|743.4 KB|743.4 KB
`sql("select * from orc_test").collect`|4.4 MB|4.4 MB

Author: Andrew Ray <ra...@gmail.com>

Closes #15898 from aray/sql-orc-no-col.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/795e9fc9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/795e9fc9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/795e9fc9

Branch: refs/heads/master
Commit: 795e9fc9213cb9941ae131aadcafddb94bde5f74
Parents: 51baca2
Author: Andrew Ray <ra...@gmail.com>
Authored: Fri Nov 18 11:19:49 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Fri Nov 18 11:19:49 2016 -0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/hive/HiveShim.scala    |  6 ++---
 .../spark/sql/hive/orc/OrcQuerySuite.scala      | 25 +++++++++++++++++++-
 2 files changed, 27 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/795e9fc9/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
index 0d2a765..9e98948 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
@@ -69,13 +69,13 @@ private[hive] object HiveShim {
   }
 
   /*
-   * Cannot use ColumnProjectionUtils.appendReadColumns directly, if ids is null or empty
+   * Cannot use ColumnProjectionUtils.appendReadColumns directly, if ids is null
    */
   def appendReadColumns(conf: Configuration, ids: Seq[Integer], names: Seq[String]) {
-    if (ids != null && ids.nonEmpty) {
+    if (ids != null) {
       ColumnProjectionUtils.appendReadColumns(conf, ids.asJava)
     }
-    if (names != null && names.nonEmpty) {
+    if (names != null) {
       appendReadColumnNames(conf, names)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/795e9fc9/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index ecb5972..a628977 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -20,11 +20,13 @@ package org.apache.spark.sql.hive.orc
 import java.nio.charset.StandardCharsets
 import java.sql.Timestamp
 
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hive.ql.io.orc.{OrcStruct, SparkOrcNewRecordReader}
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.datasources.{LogicalRelation, RecordReaderIterator}
 import org.apache.spark.sql.hive.{HiveUtils, MetastoreRelation}
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
@@ -577,4 +579,25 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
       assert(spark.table(tableName).schema == schema.copy(fields = expectedFields))
     }
   }
+
+  test("Empty schema does not read data from ORC file") {
+    val data = Seq((1, 1), (2, 2))
+    withOrcFile(data) { path =>
+      val requestedSchema = StructType(Nil)
+      val conf = new Configuration()
+      val physicalSchema = OrcFileOperator.readSchema(Seq(path), Some(conf)).get
+      OrcRelation.setRequiredColumns(conf, physicalSchema, requestedSchema)
+      val maybeOrcReader = OrcFileOperator.getFileReader(path, Some(conf))
+      assert(maybeOrcReader.isDefined)
+      val orcRecordReader = new SparkOrcNewRecordReader(
+        maybeOrcReader.get, conf, 0, maybeOrcReader.get.getContentLength)
+
+      val recordsIterator = new RecordReaderIterator[OrcStruct](orcRecordReader)
+      try {
+        assert(recordsIterator.next().toString == "{null, null}")
+      } finally {
+        recordsIterator.close()
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org