You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/09/27 05:51:59 UTC

[GitHub] [spark] sadikovi commented on a diff in pull request #35764: [SPARK-38444][SQL]Automatically calculate the upper and lower bounds of partitions when no specified partition related params

sadikovi commented on code in PR #35764:
URL: https://github.com/apache/spark/pull/35764#discussion_r980766168


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala:
##########
@@ -168,6 +177,71 @@ private[sql] object JDBCRelation extends Logging {
     partitions
   }
 
+  /**
+   * get the min and max value by the column
+   * @param schema resolved schema of a JDBC table
+   * @param resolver function used to determine if two identifiers are equal
+   * @param timeZoneId timezone ID to be used if a partition column type is date or timestamp
+   * @param jdbcOptions JDBC options that contains url
+   * @param filters filters in Where clause
+   * @return JDBCPartitioningInfo
+   */
+  def getPartitionBound(
+      schema: StructType,
+      resolver: Resolver,
+      timeZoneId: String,
+      jdbcOptions: JDBCOptions,
+      filters: Array[Filter] = Array.empty): JDBCPartitioningInfo = {
+    // columns in filters
+    val filterColumns = new util.ArrayList[String]()
+    filters.map(filter => filter.references.distinct.map(r => filterColumns.add(r)))
+    // primary keys used for partitioning
+    val prks = schema.fields.filter(
+      f => f.metadata.getBoolean("isIndexKey") &&
+        !filterColumns.contains(f.name) &&
+        (f.dataType.isInstanceOf[NumericType] ||
+          f.dataType.isInstanceOf[DateType] ||
+          f.dataType.isInstanceOf[TimestampType]))
+
+    if (prks.length > 0) {
+      val prk = prks.head
+      val dataType = prk.dataType
+      var lBound: String = null
+      var uBound: String = null
+      val sql = s"select min(${prk.name}) as lBound, max(${prk.name}) as uBound " +

Review Comment:
   Can you explain this logic in the javadoc for this method? Also, what happens if the table is empty?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala:
##########
@@ -168,6 +177,71 @@ private[sql] object JDBCRelation extends Logging {
     partitions
   }
 
+  /**
+   * get the min and max value by the column

Review Comment:
   nit: Get the min and max values for the column.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala:
##########
@@ -168,6 +177,71 @@ private[sql] object JDBCRelation extends Logging {
     partitions
   }
 
+  /**
+   * get the min and max value by the column
+   * @param schema resolved schema of a JDBC table
+   * @param resolver function used to determine if two identifiers are equal
+   * @param timeZoneId timezone ID to be used if a partition column type is date or timestamp
+   * @param jdbcOptions JDBC options that contains url
+   * @param filters filters in Where clause
+   * @return JDBCPartitioningInfo
+   */
+  def getPartitionBound(
+      schema: StructType,
+      resolver: Resolver,
+      timeZoneId: String,
+      jdbcOptions: JDBCOptions,
+      filters: Array[Filter] = Array.empty): JDBCPartitioningInfo = {
+    // columns in filters
+    val filterColumns = new util.ArrayList[String]()
+    filters.map(filter => filter.references.distinct.map(r => filterColumns.add(r)))
+    // primary keys used for partitioning
+    val prks = schema.fields.filter(
+      f => f.metadata.getBoolean("isIndexKey") &&
+        !filterColumns.contains(f.name) &&
+        (f.dataType.isInstanceOf[NumericType] ||
+          f.dataType.isInstanceOf[DateType] ||
+          f.dataType.isInstanceOf[TimestampType]))
+
+    if (prks.length > 0) {
+      val prk = prks.head
+      val dataType = prk.dataType
+      var lBound: String = null
+      var uBound: String = null
+      val sql = s"select min(${prk.name}) as lBound, max(${prk.name}) as uBound " +
+        s"from ${jdbcOptions.tableOrQuery} limit 1"
+      val conn = JdbcDialects.get(jdbcOptions.url).createConnectionFactory(jdbcOptions)(-1)
+      try {
+        val statement = conn.prepareStatement(sql)
+        try {
+          statement.setQueryTimeout(jdbcOptions.queryTimeout)
+          val resultSet = statement.executeQuery()
+          while (resultSet.next()) {
+            lBound = resultSet.getString("lBound")

Review Comment:
   Would it work for primary keys that are integers or timestamps?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala:
##########
@@ -168,6 +177,71 @@ private[sql] object JDBCRelation extends Logging {
     partitions
   }
 
+  /**
+   * get the min and max value by the column
+   * @param schema resolved schema of a JDBC table
+   * @param resolver function used to determine if two identifiers are equal
+   * @param timeZoneId timezone ID to be used if a partition column type is date or timestamp
+   * @param jdbcOptions JDBC options that contains url
+   * @param filters filters in Where clause
+   * @return JDBCPartitioningInfo
+   */
+  def getPartitionBound(
+      schema: StructType,
+      resolver: Resolver,
+      timeZoneId: String,
+      jdbcOptions: JDBCOptions,
+      filters: Array[Filter] = Array.empty): JDBCPartitioningInfo = {
+    // columns in filters
+    val filterColumns = new util.ArrayList[String]()
+    filters.map(filter => filter.references.distinct.map(r => filterColumns.add(r)))
+    // primary keys used for partitioning
+    val prks = schema.fields.filter(
+      f => f.metadata.getBoolean("isIndexKey") &&

Review Comment:
   Does the code handle composite primary keys or any multi-column indexes, e.g. with 2 or more columns?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala:
##########
@@ -111,6 +111,9 @@ class JDBCOptions(
   // the number of partitions
   val numPartitions = parameters.get(JDBC_NUM_PARTITIONS).map(_.toInt)
 
+  // the default number of partitions

Review Comment:
   Can you update this comment? It is unclear which default number of partitions this is: is it the overall number of partitions in the RDD, or is it specifically for primary keys in the table and pushed filters?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala:
##########
@@ -168,6 +177,71 @@ private[sql] object JDBCRelation extends Logging {
     partitions
   }
 
+  /**
+   * get the min and max value by the column
+   * @param schema resolved schema of a JDBC table
+   * @param resolver function used to determine if two identifiers are equal
+   * @param timeZoneId timezone ID to be used if a partition column type is date or timestamp
+   * @param jdbcOptions JDBC options that contains url
+   * @param filters filters in Where clause
+   * @return JDBCPartitioningInfo
+   */
+  def getPartitionBound(
+      schema: StructType,
+      resolver: Resolver,
+      timeZoneId: String,
+      jdbcOptions: JDBCOptions,
+      filters: Array[Filter] = Array.empty): JDBCPartitioningInfo = {
+    // columns in filters
+    val filterColumns = new util.ArrayList[String]()
+    filters.map(filter => filter.references.distinct.map(r => filterColumns.add(r)))
+    // primary keys used for partitioning
+    val prks = schema.fields.filter(
+      f => f.metadata.getBoolean("isIndexKey") &&
+        !filterColumns.contains(f.name) &&
+        (f.dataType.isInstanceOf[NumericType] ||
+          f.dataType.isInstanceOf[DateType] ||
+          f.dataType.isInstanceOf[TimestampType]))
+
+    if (prks.length > 0) {
+      val prk = prks.head
+      val dataType = prk.dataType
+      var lBound: String = null
+      var uBound: String = null
+      val sql = s"select min(${prk.name}) as lBound, max(${prk.name}) as uBound " +
+        s"from ${jdbcOptions.tableOrQuery} limit 1"
+      val conn = JdbcDialects.get(jdbcOptions.url).createConnectionFactory(jdbcOptions)(-1)
+      try {
+        val statement = conn.prepareStatement(sql)
+        try {
+          statement.setQueryTimeout(jdbcOptions.queryTimeout)
+          val resultSet = statement.executeQuery()
+          while (resultSet.next()) {
+            lBound = resultSet.getString("lBound")
+            uBound = resultSet.getString("uBound")
+          }
+        } catch {
+          case _: SQLException =>

Review Comment:
   Maybe it is worth at least logging the exception, but I would consider re-throwing it.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala:
##########
@@ -306,6 +318,12 @@ object JdbcUtils extends Logging with SQLConfHelper {
       val columnType =
         dialect.getCatalystType(dataType, typeName, fieldSize, metadata).getOrElse(
           getCatalystType(dataType, fieldSize, fieldScale, isSigned))
+      list.contains(columnName) match {

Review Comment:
   Is it the same as:
   ```scala
   metadata.putBoolean("isIndexKey", list.contains(columnName))
   ```
   
   Also, can we make list a set?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala:
##########
@@ -168,6 +177,71 @@ private[sql] object JDBCRelation extends Logging {
     partitions
   }
 
+  /**
+   * get the min and max value by the column
+   * @param schema resolved schema of a JDBC table
+   * @param resolver function used to determine if two identifiers are equal
+   * @param timeZoneId timezone ID to be used if a partition column type is date or timestamp
+   * @param jdbcOptions JDBC options that contains url
+   * @param filters filters in Where clause
+   * @return JDBCPartitioningInfo
+   */
+  def getPartitionBound(
+      schema: StructType,
+      resolver: Resolver,
+      timeZoneId: String,
+      jdbcOptions: JDBCOptions,
+      filters: Array[Filter] = Array.empty): JDBCPartitioningInfo = {

Review Comment:
   Shall we return `Option[JDBCPartitioningInfo]` instead?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org