Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/03/26 01:14:14 UTC

[GitHub] [spark] maropu commented on a change in pull request #31965: [SPARK-34843][SQL] Improve stride calculation to decide partitions in JDBCRelation

maropu commented on a change in pull request #31965:
URL: https://github.com/apache/spark/pull/31965#discussion_r601938799



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
##########
@@ -118,13 +119,28 @@ private[sql] object JDBCRelation extends Logging {
           s"Upper bound: ${boundValueToString(upperBound)}.")
         upperBound - lowerBound
       }
-    // Overflow and silliness can happen if you subtract then divide.
-    // Here we get a little roundoff, but that's (hopefully) OK.
-    val stride: Long = upperBound / numPartitions - lowerBound / numPartitions
+
+    // Overflow can happen if you subtract then divide. For example:
+    // (Long.MaxValue - Long.MinValue) / (numPartitions - 2).
+    // Also, using fixed-point decimals here to avoid possible inaccuracy from floating point.
+    val strideUpperCalculation = (upperBound / BigDecimal(numPartitions))

Review comment:
       nit: `strideUpperCalculation` => `upperStride`?

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
##########
@@ -433,6 +433,61 @@ class JDBCSuite extends QueryTest
     assert(ids(2) === 3)
   }
 
+  test("SPARK-34843: columnPartition should generate a correct stride size") {
+
+    val schema = StructType(Seq(
+      StructField("PartitionColumn", DateType, nullable = false)
+    ))
+
+    val numPartitions = 1000
+    val partitionConfig = Map(
+      "lowerBound" -> "1930-01-01",
+      "upperBound" -> "2020-12-31",
+      "numPartitions" -> numPartitions.toString,
+      "partitionColumn" -> "PartitionColumn"
+    )
+
+    val partitions = JDBCRelation.columnPartition(
+      schema,
+      analysis.caseInsensitiveResolution,
+      TimeZone.getDefault.toZoneId.toString,
+      new JDBCOptions(url, "table", partitionConfig)
+    )
+
+    val lastPredicate = partitions(numPartitions - 1).asInstanceOf[JDBCPartition].whereClause
+    assert(lastPredicate == """"PartitionColumn" >= '2020-08-02'""")
+  }
+
+  test("SPARK-34843: columnPartition should realign the first partition for better distribution") {
+

Review comment:
       ditto: unnecessary blank line.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
##########
@@ -118,13 +119,28 @@ private[sql] object JDBCRelation extends Logging {
           s"Upper bound: ${boundValueToString(upperBound)}.")
         upperBound - lowerBound
       }
-    // Overflow and silliness can happen if you subtract then divide.
-    // Here we get a little roundoff, but that's (hopefully) OK.
-    val stride: Long = upperBound / numPartitions - lowerBound / numPartitions
+
+    // Overflow can happen if you subtract then divide. For example:
+    // (Long.MaxValue - Long.MinValue) / (numPartitions - 2).
+    // Also, using fixed-point decimals here to avoid possible inaccuracy from floating point.
+    val strideUpperCalculation = (upperBound / BigDecimal(numPartitions))
+      .setScale(18, RoundingMode.HALF_EVEN)
+    val strideLowerCalculation = (lowerBound / BigDecimal(numPartitions))
+      .setScale(18, RoundingMode.HALF_EVEN)

Review comment:
       Why do we use `scale=18` and `RoundingMode.HALF_EVEN` here?
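
   For readers unfamiliar with the two calls in question, here is a minimal sketch of what `setScale(18, RoundingMode.HALF_EVEN)` does to a quotient in plain Scala. It does not answer the question on the author's behalf. The object and value names are hypothetical and only the standard `scala.math.BigDecimal` API is used.

```scala
import scala.math.BigDecimal.RoundingMode

// Hypothetical illustration; not part of the PR.
object ScaleSketch {
  // 1/3 has no finite decimal expansion; Scala's default MathContext
  // (DECIMAL128) keeps 34 significant digits.
  val quotient: BigDecimal = BigDecimal(1) / BigDecimal(3)

  // setScale(18, ...) keeps exactly 18 digits after the decimal point.
  val scaled: BigDecimal = quotient.setScale(18, RoundingMode.HALF_EVEN)

  // HALF_EVEN and HALF_UP differ only on exact ties.
  val tie: BigDecimal = BigDecimal("2.5")
  val halfEven: BigDecimal = tie.setScale(0, RoundingMode.HALF_EVEN) // tie goes to the even neighbour
  val halfUp: BigDecimal = tie.setScale(0, RoundingMode.HALF_UP)     // tie goes away from zero
}
```

   Whether 18 digits and banker's rounding are the right choices for the stride calculation is exactly what the question above asks; the sketch only shows the mechanics.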

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
##########
@@ -118,13 +119,28 @@ private[sql] object JDBCRelation extends Logging {
           s"Upper bound: ${boundValueToString(upperBound)}.")
         upperBound - lowerBound
       }
-    // Overflow and silliness can happen if you subtract then divide.
-    // Here we get a little roundoff, but that's (hopefully) OK.
-    val stride: Long = upperBound / numPartitions - lowerBound / numPartitions
+
+    // Overflow can happen if you subtract then divide. For example:
+    // (Long.MaxValue - Long.MinValue) / (numPartitions - 2).
+    // Also, using fixed-point decimals here to avoid possible inaccuracy from floating point.
+    val strideUpperCalculation = (upperBound / BigDecimal(numPartitions))
+      .setScale(18, RoundingMode.HALF_EVEN)
+    val strideLowerCalculation = (lowerBound / BigDecimal(numPartitions))
+      .setScale(18, RoundingMode.HALF_EVEN)
+
+    val preciseStride = strideUpperCalculation - strideLowerCalculation
+    val stride = preciseStride.toLong
+
+    // Determine the number of strides the last partition will fall short of compared to the
+    // supplied upper bound. Take half of those strides, and then add them to the lower bound
+    // for better distribution of the first and last partitions.
+    val lostNumOfStrides = (preciseStride - stride) * numPartitions / stride
+    val lowerBoundWithStrideAlignment = lowerBound +
+      ((lostNumOfStrides / 2) * stride).setScale(0, RoundingMode.HALF_UP).toLong

Review comment:
       > This can lead to a big difference between the provided upper bound and the actual start of the last partition.
   
   Could you add a simple calculation example to the PR description showing the difference between the current calculation and the proposed one?
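
   As an illustration of the kind of example being requested, here is a minimal sketch that runs both calculations on the bounds used in the new test (`1930-01-01` to `2020-12-31`, 1000 partitions). It rests on two assumptions that are not stated in this thread: that date bounds are handled as days since 1970-01-01, and that partition `i` starts at the (possibly realigned) lower bound plus `i` strides. The object and helper names are hypothetical; only the arithmetic mirrors the diff above.

```scala
import java.time.LocalDate
import scala.math.BigDecimal.RoundingMode

// Hypothetical illustration; names and structure are not Spark's.
object StrideSketch {
  // Assumption: date bounds are represented as days since 1970-01-01.
  val lowerBound: Long = LocalDate.parse("1930-01-01").toEpochDay
  val upperBound: Long = LocalDate.parse("2020-12-31").toEpochDay
  val numPartitions: Long = 1000L

  // Current calculation: each Long division truncates toward zero.
  val oldStride: Long = upperBound / numPartitions - lowerBound / numPartitions

  // Proposed calculation, mirroring the diff above.
  private val n = BigDecimal(numPartitions)
  val upperStride: BigDecimal =
    (BigDecimal(upperBound) / n).setScale(18, RoundingMode.HALF_EVEN)
  val lowerStride: BigDecimal =
    (BigDecimal(lowerBound) / n).setScale(18, RoundingMode.HALF_EVEN)
  val preciseStride: BigDecimal = upperStride - lowerStride
  val newStride: Long = preciseStride.toLong

  // Strides "lost" to truncation; half of them are added to the lower bound.
  val lostNumOfStrides: BigDecimal =
    (preciseStride - BigDecimal(newStride)) * n / BigDecimal(newStride)
  val alignedLowerBound: Long = lowerBound +
    ((lostNumOfStrides / BigDecimal(2)) * BigDecimal(newStride))
      .setScale(0, RoundingMode.HALF_UP).toLong

  // Assumption: partition i starts at start + i * stride,
  // so the last partition starts after (numPartitions - 1) strides.
  def lastPartitionStart(start: Long, stride: Long): LocalDate =
    LocalDate.ofEpochDay(start + (numPartitions - 1) * stride)

  val oldLast: LocalDate = lastPartitionStart(lowerBound, oldStride)
  val newLast: LocalDate = lastPartitionStart(alignedLowerBound, newStride)
}
```

   Under these assumptions the current formula gives a stride of 32 days and a last partition starting at 2017-07-11, about three and a half years before the upper bound. The proposed formula gives a stride of 33 days and a last partition starting at 2020-08-02, which agrees with the predicate asserted in the new `JDBCSuite` test.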

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
##########
@@ -433,6 +433,61 @@ class JDBCSuite extends QueryTest
     assert(ids(2) === 3)
   }
 
+  test("SPARK-34843: columnPartition should generate a correct stride size") {

Review comment:
       What's the difference between this test and `test("SPARK-34843: columnPartition should realign the first partition for better distribution")`? Do they have different purposes?

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
##########
@@ -433,6 +433,61 @@ class JDBCSuite extends QueryTest
     assert(ids(2) === 3)
   }
 
+  test("SPARK-34843: columnPartition should generate a correct stride size") {
+

Review comment:
       nit: unnecessary blank line.

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
##########
@@ -433,6 +433,61 @@ class JDBCSuite extends QueryTest
     assert(ids(2) === 3)
   }
 
+  test("SPARK-34843: columnPartition should generate a correct stride size") {
+
+    val schema = StructType(Seq(
+      StructField("PartitionColumn", DateType, nullable = false)

Review comment:
       Why `nullable = false`?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
##########
@@ -118,13 +119,28 @@ private[sql] object JDBCRelation extends Logging {
           s"Upper bound: ${boundValueToString(upperBound)}.")
         upperBound - lowerBound
       }
-    // Overflow and silliness can happen if you subtract then divide.
-    // Here we get a little roundoff, but that's (hopefully) OK.
-    val stride: Long = upperBound / numPartitions - lowerBound / numPartitions
+
+    // Overflow can happen if you subtract then divide. For example:
+    // (Long.MaxValue - Long.MinValue) / (numPartitions - 2).
+    // Also, using fixed-point decimals here to avoid possible inaccuracy from floating point.
+    val strideUpperCalculation = (upperBound / BigDecimal(numPartitions))
+      .setScale(18, RoundingMode.HALF_EVEN)
+    val strideLowerCalculation = (lowerBound / BigDecimal(numPartitions))

Review comment:
       ditto: `strideLowerCalculation` => `lowerStride`?

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
##########
@@ -433,6 +433,61 @@ class JDBCSuite extends QueryTest
     assert(ids(2) === 3)
   }
 
+  test("SPARK-34843: columnPartition should generate a correct stride size") {
+
+    val schema = StructType(Seq(
+      StructField("PartitionColumn", DateType, nullable = false)
+    ))
+
+    val numPartitions = 1000
+    val partitionConfig = Map(
+      "lowerBound" -> "1930-01-01",
+      "upperBound" -> "2020-12-31",
+      "numPartitions" -> numPartitions.toString,
+      "partitionColumn" -> "PartitionColumn"
+    )
+
+    val partitions = JDBCRelation.columnPartition(
+      schema,
+      analysis.caseInsensitiveResolution,
+      TimeZone.getDefault.toZoneId.toString,
+      new JDBCOptions(url, "table", partitionConfig)
+    )
+
+    val lastPredicate = partitions(numPartitions - 1).asInstanceOf[JDBCPartition].whereClause
+    assert(lastPredicate == """"PartitionColumn" >= '2020-08-02'""")
+  }
+
+  test("SPARK-34843: columnPartition should realign the first partition for better distribution") {
+
+    val schema = StructType(Seq(
+      StructField("PartitionColumn", DateType, nullable = false)

Review comment:
       ditto: why `nullable = false`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


