You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/10/05 19:47:41 UTC

[GitHub] [spark] MaxGekk commented on a change in pull request #29939: [SPARK-33062][SQL] Make DataFrameReader.jdbc work for DataSource V2

MaxGekk commented on a change in pull request #29939:
URL: https://github.com/apache/spark/pull/29939#discussion_r499819222



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCScanBuilder.scala
##########
@@ -36,6 +37,8 @@ case class JDBCScanBuilder(
 
   private var prunedSchema = schema
 
+  var partition = Array.empty[Partition]

Review comment:
       How about avoiding the `var`: add `partition` as a field of the case class `JDBCScanBuilder` and use `.copy`?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCScanBuilder.scala
##########
@@ -64,7 +67,11 @@ case class JDBCScanBuilder(
   override def build(): Scan = {
     val resolver = session.sessionState.conf.resolver
     val timeZoneId = session.sessionState.conf.sessionLocalTimeZone
-    val parts = JDBCRelation.columnPartition(schema, resolver, timeZoneId, jdbcOptions)
-    JDBCScan(JDBCRelation(schema, parts, jdbcOptions)(session), prunedSchema, pushedFilter)
+    val part = if (partition.isEmpty) {

Review comment:
       Is there any reason for renaming `parts` -> `part`? It is an array of partitions, correct?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
##########
@@ -362,7 +363,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
               }
           }
 
-          val relation = DataSourceV2Relation.create(table, catalog, ident, dsOptions)
+          val relation = DataSourceV2Relation.create(table, catalog, ident, dsOptions,
+            Array.empty[Partition])

Review comment:
       ```suggestion
             val relation = DataSourceV2Relation.create(
               table,
               catalog,
               ident,
               dsOptions,
               Array.empty[Partition])
   ```

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
##########
@@ -43,7 +44,8 @@ case class DataSourceV2Relation(
     output: Seq[AttributeReference],
     catalog: Option[CatalogPlugin],
     identifier: Option[Identifier],
-    options: CaseInsensitiveStringMap)
+    options: CaseInsensitiveStringMap,
+    partitions: Array[Partition] = Array.empty[Partition])

Review comment:
       Why did you add the default value `Array.empty[Partition]`?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
##########
@@ -333,9 +333,26 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     assertNoSpecifiedSchema("jdbc")
     // properties should override settings in extraOptions.
     this.extraOptions ++= properties.asScala
-    // explicit url and dbtable should override all
-    this.extraOptions ++= Seq(JDBCOptions.JDBC_URL -> url, JDBCOptions.JDBC_TABLE_NAME -> table)
-    format("jdbc").load()
+    // explicit url should override all
+    this.extraOptions ++= Seq(JDBCOptions.JDBC_URL -> url)

Review comment:
       There is probably no need to construct the `Seq`.
   ```suggestion
       this.extraOptions += JDBCOptions.JDBC_URL -> url
   ```

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
##########
@@ -333,9 +333,26 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     assertNoSpecifiedSchema("jdbc")
     // properties should override settings in extraOptions.
     this.extraOptions ++= properties.asScala
-    // explicit url and dbtable should override all
-    this.extraOptions ++= Seq(JDBCOptions.JDBC_URL -> url, JDBCOptions.JDBC_TABLE_NAME -> table)
-    format("jdbc").load()
+    // explicit url should override all
+    this.extraOptions ++= Seq(JDBCOptions.JDBC_URL -> url)
+
+    import sparkSession.sessionState.analyzer.NonSessionCatalogAndIdentifier
+
+    this.source = "jdbc"
+    if (table.contains(" ")) { // if table is not a table name, e.g. a SELECT statement

Review comment:
       Is this a reliable check?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
##########
@@ -333,9 +333,26 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     assertNoSpecifiedSchema("jdbc")
     // properties should override settings in extraOptions.
     this.extraOptions ++= properties.asScala
-    // explicit url and dbtable should override all
-    this.extraOptions ++= Seq(JDBCOptions.JDBC_URL -> url, JDBCOptions.JDBC_TABLE_NAME -> table)
-    format("jdbc").load()
+    // explicit url should override all
+    this.extraOptions ++= Seq(JDBCOptions.JDBC_URL -> url)
+
+    import sparkSession.sessionState.analyzer.NonSessionCatalogAndIdentifier
+
+    this.source = "jdbc"
+    if (table.contains(" ")) { // if table is not a table name, e.g. a SELECT statement
+      // explicit dbtable should override all
+      this.extraOptions ++= Seq(JDBCOptions.JDBC_TABLE_NAME -> table)
+      load
+    } else {
+      sparkSession.sessionState.sqlParser.parseMultipartIdentifier(table) match {
+        case nameParts@NonSessionCatalogAndIdentifier(_, _) =>

Review comment:
       ```suggestion
           case nameParts @ NonSessionCatalogAndIdentifier(_, _) =>
   ```

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -957,7 +959,8 @@ class Analyzer(
                 Some(StreamingRelationV2(None, table.name, table, options,
                   table.schema.toAttributes, Some(catalog), Some(ident), None))
               } else {
-                Some(DataSourceV2Relation.create(table, Some(catalog), Some(ident), options))
+                Some(DataSourceV2Relation.create(table, Some(catalog), Some(ident), options,
+                  partitions))

Review comment:
       According to the style guide:
   ```suggestion
                   Some(DataSourceV2Relation.create(
                     table,
                     Some(catalog),
                     Some(ident),
                     options,
                     partitions))
   ```

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
##########
@@ -333,9 +333,26 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     assertNoSpecifiedSchema("jdbc")
     // properties should override settings in extraOptions.
     this.extraOptions ++= properties.asScala
-    // explicit url and dbtable should override all
-    this.extraOptions ++= Seq(JDBCOptions.JDBC_URL -> url, JDBCOptions.JDBC_TABLE_NAME -> table)
-    format("jdbc").load()
+    // explicit url should override all
+    this.extraOptions ++= Seq(JDBCOptions.JDBC_URL -> url)
+
+    import sparkSession.sessionState.analyzer.NonSessionCatalogAndIdentifier
+
+    this.source = "jdbc"
+    if (table.contains(" ")) { // if table is not a table name, e.g. a SELECT statement
+      // explicit dbtable should override all
+      this.extraOptions ++= Seq(JDBCOptions.JDBC_TABLE_NAME -> table)
+      load
+    } else {
+      sparkSession.sessionState.sqlParser.parseMultipartIdentifier(table) match {
+        case nameParts@NonSessionCatalogAndIdentifier(_, _) =>

Review comment:
       `nameParts` is not actually used. Maybe:
   ```suggestion
           case _ @ NonSessionCatalogAndIdentifier(_, _) =>
   ```

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
##########
@@ -333,9 +333,26 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     assertNoSpecifiedSchema("jdbc")
     // properties should override settings in extraOptions.
     this.extraOptions ++= properties.asScala
-    // explicit url and dbtable should override all
-    this.extraOptions ++= Seq(JDBCOptions.JDBC_URL -> url, JDBCOptions.JDBC_TABLE_NAME -> table)
-    format("jdbc").load()
+    // explicit url should override all
+    this.extraOptions ++= Seq(JDBCOptions.JDBC_URL -> url)
+
+    import sparkSession.sessionState.analyzer.NonSessionCatalogAndIdentifier
+
+    this.source = "jdbc"
+    if (table.contains(" ")) { // if table is not a table name, e.g. a SELECT statement
+      // explicit dbtable should override all
+      this.extraOptions ++= Seq(JDBCOptions.JDBC_TABLE_NAME -> table)
+      load
+    } else {
+      sparkSession.sessionState.sqlParser.parseMultipartIdentifier(table) match {
+        case nameParts@NonSessionCatalogAndIdentifier(_, _) =>
+          this.table(table)
+        case _ =>
+          // explicit dbtable should override all
+          this.extraOptions ++= Seq(JDBCOptions.JDBC_TABLE_NAME -> table)

Review comment:
       ```suggestion
             this.extraOptions += JDBCOptions.JDBC_TABLE_NAME -> table
   ```

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
##########
@@ -333,9 +333,26 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     assertNoSpecifiedSchema("jdbc")
     // properties should override settings in extraOptions.
     this.extraOptions ++= properties.asScala
-    // explicit url and dbtable should override all
-    this.extraOptions ++= Seq(JDBCOptions.JDBC_URL -> url, JDBCOptions.JDBC_TABLE_NAME -> table)
-    format("jdbc").load()
+    // explicit url should override all
+    this.extraOptions ++= Seq(JDBCOptions.JDBC_URL -> url)
+
+    import sparkSession.sessionState.analyzer.NonSessionCatalogAndIdentifier
+
+    this.source = "jdbc"
+    if (table.contains(" ")) { // if table is not a table name, e.g. a SELECT statement
+      // explicit dbtable should override all
+      this.extraOptions ++= Seq(JDBCOptions.JDBC_TABLE_NAME -> table)

Review comment:
       ```suggestion
         this.extraOptions += JDBCOptions.JDBC_TABLE_NAME -> table
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org