Posted to commits@spark.apache.org by we...@apache.org on 2018/02/01 04:05:20 UTC

spark git commit: [SPARK-23247][SQL] combines Unsafe operations and statistics operations in Scan Data Source

Repository: spark
Updated Branches:
  refs/heads/master 52e00f706 -> 2ac895be9


[SPARK-23247][SQL] combines Unsafe operations and statistics operations in Scan Data Source

## What changes were proposed in this pull request?

Currently, when scanning a data source, the physical plan first applies the unsafe projection to every row and then traverses the rows a second time just to count them. This second pass is unnecessary overhead. This PR combines the two operations: the row count is updated while the unsafe projection is performed.

Before the change:

```
val unsafeRow = rdd.mapPartitionsWithIndexInternal { (index, iter) =>
  val proj = UnsafeProjection.create(schema)
  proj.initialize(index)
  iter.map(proj)
}

val numOutputRows = longMetric("numOutputRows")
unsafeRow.map { r =>
  numOutputRows += 1
  r
}
```
After the change:

```
val numOutputRows = longMetric("numOutputRows")

rdd.mapPartitionsWithIndexInternal { (index, iter) =>
  val proj = UnsafeProjection.create(schema)
  proj.initialize(index)
  iter.map { r =>
    numOutputRows += 1
    proj(r)
  }
}
```
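
The pattern generalizes beyond Spark: counting as a side effect of the projecting `map` keeps a single pass over the iterator instead of layering a second counting `map` on top. Below is a minimal, self-contained sketch of the idea in plain Scala; the local `numOutputRows` variable and the uppercase function are stand-ins for `SQLMetric` and `UnsafeProjection`, not the actual Spark code:

```
// Minimal sketch: fuse a per-row transform with a row counter in one pass,
// mirroring the shape of the patched doExecute(). Stand-ins only.
object FusedScanSketch {
  def main(args: Array[String]): Unit = {
    val rows = Iterator("a", "b", "c")          // stand-in for an iterator of InternalRow
    var numOutputRows = 0L                      // stand-in for the SQLMetric accumulator
    val proj: String => String = _.toUpperCase  // stand-in for UnsafeProjection

    // Single map: project the row and bump the counter as a side effect.
    val out = rows.map { r =>
      numOutputRows += 1
      proj(r)
    }

    println(out.toList)    // List(A, B, C)
    println(numOutputRows) // 3
  }
}
```

Because the `map` is lazy, the counter only advances as rows are actually consumed downstream, which matches the intended semantics of "number of output rows".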

## How was this patch tested?

Existing test cases.

Author: caoxuewen <ca...@zte.com.cn>

Closes #20415 from heary-cao/DataSourceScanExec.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2ac895be
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2ac895be
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2ac895be

Branch: refs/heads/master
Commit: 2ac895be909de7e58e1051dc2a1bba98a25bf4be
Parents: 52e00f7
Author: caoxuewen <ca...@zte.com.cn>
Authored: Thu Feb 1 12:05:12 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Feb 1 12:05:12 2018 +0800

----------------------------------------------------------------------
 .../sql/execution/DataSourceScanExec.scala      | 45 ++++++++++----------
 1 file changed, 22 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2ac895be/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index f7732e2..ba1157d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -90,16 +90,15 @@ case class RowDataSourceScanExec(
     Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   protected override def doExecute(): RDD[InternalRow] = {
-    val unsafeRow = rdd.mapPartitionsWithIndexInternal { (index, iter) =>
+    val numOutputRows = longMetric("numOutputRows")
+
+    rdd.mapPartitionsWithIndexInternal { (index, iter) =>
       val proj = UnsafeProjection.create(schema)
       proj.initialize(index)
-      iter.map(proj)
-    }
-
-    val numOutputRows = longMetric("numOutputRows")
-    unsafeRow.map { r =>
-      numOutputRows += 1
-      r
+      iter.map( r => {
+        numOutputRows += 1
+        proj(r)
+      })
     }
   }
 
@@ -326,22 +325,22 @@ case class FileSourceScanExec(
       // 2) the number of columns should be smaller than spark.sql.codegen.maxFields
       WholeStageCodegenExec(this)(codegenStageId = 0).execute()
     } else {
-      val unsafeRows = {
-        val scan = inputRDD
-        if (needsUnsafeRowConversion) {
-          scan.mapPartitionsWithIndexInternal { (index, iter) =>
-            val proj = UnsafeProjection.create(schema)
-            proj.initialize(index)
-            iter.map(proj)
-          }
-        } else {
-          scan
-        }
-      }
       val numOutputRows = longMetric("numOutputRows")
-      unsafeRows.map { r =>
-        numOutputRows += 1
-        r
+
+      if (needsUnsafeRowConversion) {
+        inputRDD.mapPartitionsWithIndexInternal { (index, iter) =>
+          val proj = UnsafeProjection.create(schema)
+          proj.initialize(index)
+          iter.map( r => {
+            numOutputRows += 1
+            proj(r)
+          })
+        }
+      } else {
+        inputRDD.map { r =>
+          numOutputRows += 1
+          r
+        }
       }
     }
   }
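
One subtlety the new code relies on: `numOutputRows` is a `SQLMetric`, which is a Spark accumulator, so the `+= 1` inside the task closure updates a per-task copy that is merged back on the driver; it is not a shared mutable variable across executors. A hedged sketch of that add/merge contract follows — `LocalCounter` here is a hypothetical stand-in, not Spark's real `SQLMetric`:

    // Sketch of the accumulator-style add/merge contract that makes
    // "numOutputRows += 1" inside a task closure safe. Hypothetical class.
    final class LocalCounter {
      private var count = 0L
      def add(v: Long): Unit = count += v                          // per-row, inside a task
      def merge(other: LocalCounter): Unit = count += other.count  // driver-side merge
      def value: Long = count
    }

    object MergeSketch {
      def main(args: Array[String]): Unit = {
        // Simulate two partitions, each counting its own rows locally.
        val partitions = Seq(Seq(1, 2, 3), Seq(4, 5))
        val perTask = partitions.map { part =>
          val c = new LocalCounter
          part.foreach(_ => c.add(1))
          c
        }
        // The driver merges the per-task counters into the final metric value.
        val total = new LocalCounter
        perTask.foreach(total.merge)
        println(total.value) // 5
      }
    }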

