You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2019/05/28 14:26:32 UTC
[spark] branch master updated: [SPARK-27776][SQL] Avoid duplicate
Java reflection in DataSource.
This is an automated email from the ASF dual-hosted git repository.
srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new c30b529 [SPARK-27776][SQL] Avoid duplicate Java reflection in DataSource.
c30b529 is described below
commit c30b5297bc607ae33cc2fcf624b127942154e559
Author: gengjiaan <ge...@360.cn>
AuthorDate: Tue May 28 09:26:06 2019 -0500
[SPARK-27776][SQL] Avoid duplicate Java reflection in DataSource.
## What changes were proposed in this pull request?
I checked the code of
`org.apache.spark.sql.execution.datasources.DataSource`
and found duplicated Java reflection.
The methods `sourceSchema`, `createSource`, `createSink`, `resolveRelation`, and `writeAndRead` all call `providingClass.getConstructor().newInstance()`.
Instances of `providingClass` are stateless, for example:
`KafkaSourceProvider`
`RateSourceProvider`
`TextSocketSourceProvider`
`JdbcRelationProvider`
`ConsoleSinkProvider`
AFAIK, Java reflection can cause significant performance overhead.
The Oracle Java reflection tutorial [https://docs.oracle.com/javase/tutorial/reflect/index.html](https://docs.oracle.com/javase/tutorial/reflect/index.html) describes the performance overhead of reflection:
```
Performance Overhead
Because reflection involves types that are dynamically resolved, certain Java virtual machine optimizations can not be performed. Consequently, reflective operations have slower performance than their non-reflective counterparts, and should be avoided in sections of code which are called frequently in performance-sensitive applications.
```
I have also found the following measurements and discussions of the performance cost of Java reflection:
[https://blog.frankel.ch/performance-cost-of-reflection/](https://blog.frankel.ch/performance-cost-of-reflection/) benchmarks the performance cost of reflection.
[https://stackoverflow.com/questions/435553/java-reflection-performance](https://stackoverflow.com/questions/435553/java-reflection-performance) has a discussion of java reflection.
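So I think we should avoid the duplicated Java reflection and reuse the instance of `providingClass`. (Note: as committed, the new `providingInstance()` helper is a `def`, so each call still constructs a new instance via reflection; the patch consolidates the repeated call sites into one helper rather than caching a single instance.)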
## How was this patch tested?
Existing unit tests.
Closes #24647 from beliefer/optimize-DataSource.
Authored-by: gengjiaan <ge...@360.cn>
Signed-off-by: Sean Owen <se...@databricks.com>
---
.../spark/sql/execution/datasources/DataSource.scala | 15 +++++++++------
1 file changed, 9 insertions(+), 6 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index ef430f4..04ae528 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -105,6 +105,9 @@ case class DataSource(
case _ => cls
}
}
+
+ private def providingInstance() = providingClass.getConstructor().newInstance()
+
lazy val sourceInfo: SourceInfo = sourceSchema()
private val caseInsensitiveOptions = CaseInsensitiveMap(options)
private val equality = sparkSession.sessionState.conf.resolver
@@ -210,7 +213,7 @@ case class DataSource(
/** Returns the name and schema of the source that can be used to continually read data. */
private def sourceSchema(): SourceInfo = {
- providingClass.getConstructor().newInstance() match {
+ providingInstance() match {
case s: StreamSourceProvider =>
val (name, schema) = s.sourceSchema(
sparkSession.sqlContext, userSpecifiedSchema, className, caseInsensitiveOptions)
@@ -264,7 +267,7 @@ case class DataSource(
/** Returns a source that can be used to continually read data. */
def createSource(metadataPath: String): Source = {
- providingClass.getConstructor().newInstance() match {
+ providingInstance() match {
case s: StreamSourceProvider =>
s.createSource(
sparkSession.sqlContext,
@@ -293,7 +296,7 @@ case class DataSource(
/** Returns a sink that can be used to continually write data. */
def createSink(outputMode: OutputMode): Sink = {
- providingClass.getConstructor().newInstance() match {
+ providingInstance() match {
case s: StreamSinkProvider =>
s.createSink(sparkSession.sqlContext, caseInsensitiveOptions, partitionColumns, outputMode)
@@ -324,7 +327,7 @@ case class DataSource(
* that files already exist, we don't need to check them again.
*/
def resolveRelation(checkFilesExist: Boolean = true): BaseRelation = {
- val relation = (providingClass.getConstructor().newInstance(), userSpecifiedSchema) match {
+ val relation = (providingInstance(), userSpecifiedSchema) match {
// TODO: Throw when too much is given.
case (dataSource: SchemaRelationProvider, Some(schema)) =>
dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions, schema)
@@ -495,7 +498,7 @@ case class DataSource(
throw new AnalysisException("Cannot save interval data type into external storage.")
}
- providingClass.getConstructor().newInstance() match {
+ providingInstance() match {
case dataSource: CreatableRelationProvider =>
dataSource.createRelation(
sparkSession.sqlContext, mode, caseInsensitiveOptions, Dataset.ofRows(sparkSession, data))
@@ -532,7 +535,7 @@ case class DataSource(
throw new AnalysisException("Cannot save interval data type into external storage.")
}
- providingClass.getConstructor().newInstance() match {
+ providingInstance() match {
case dataSource: CreatableRelationProvider =>
SaveIntoDataSourceCommand(data, dataSource, caseInsensitiveOptions, mode)
case format: FileFormat =>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org