Posted to commits@spark.apache.org by wu...@apache.org on 2021/04/29 03:47:17 UTC

[spark] branch master updated: [SPARK-35135][CORE] Turn the `WritablePartitionedIterator` from a trait into a default implementation class

This is an automated email from the ASF dual-hosted git repository.

wuyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 74b9326  [SPARK-35135][CORE] Turn the `WritablePartitionedIterator` from a trait into a default implementation class
74b9326 is described below

commit 74b93261af29e76b9da31b1c9f20900a818d97e6
Author: yangjie01 <ya...@baidu.com>
AuthorDate: Thu Apr 29 11:46:24 2021 +0800

    [SPARK-35135][CORE] Turn the `WritablePartitionedIterator` from a trait into a default implementation class
    
    ### What changes were proposed in this pull request?
    `WritablePartitionedIterator` is defined in `WritablePartitionedPairCollection.scala`, and there are two implementations of this trait, but the code of the two implementations is duplicated.
    
    The main change of this PR is to turn `WritablePartitionedIterator` from a trait into a default implementation class, because there is now only one implementation.
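    
    To make the change concrete, here is a simplified excerpt of the pattern (adapted from the `ExternalSorter` hunk in the diff below; `upstream`, `K`, and `C` refer to names in that surrounding code):
    
        // Before: each call site instantiated the trait with a near-identical
        // anonymous class.
        val inMemoryIterator = new WritablePartitionedIterator {
          private[this] var cur = if (upstream.hasNext) upstream.next() else null
    
          def writeNext(writer: PairsWriter): Unit = {
            writer.write(cur._1._2, cur._2)
            cur = if (upstream.hasNext) upstream.next() else null
          }
    
          def hasNext(): Boolean = cur != null
    
          def nextPartition(): Int = cur._1._1
        }
    
        // After: one shared implementation, parameterized by the upstream iterator.
        val inMemoryIterator = new WritablePartitionedIterator[K, C](upstream)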
    
    ### Why are the changes needed?
    Clean up duplicate code.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Pass the Jenkins or GitHub Actions checks.
    
    Closes #32232 from LuciferYang/writable-partitioned-iterator.
    
    Authored-by: yangjie01 <ya...@baidu.com>
    Signed-off-by: yi.wu <yi...@databricks.com>
---
 .../spark/util/collection/ExternalSorter.scala     | 17 +++----------
 .../WritablePartitionedPairCollection.scala        | 28 +++++++++-------------
 project/MimaExcludes.scala                         |  5 +++-
 3 files changed, 18 insertions(+), 32 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 66bc3e5..1913637 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -263,7 +263,7 @@ private[spark] class ExternalSorter[K, V, C](
   /**
    * Spill contents of in-memory iterator to a temporary file on disk.
    */
-  private[this] def spillMemoryIteratorToDisk(inMemoryIterator: WritablePartitionedIterator)
+  private[this] def spillMemoryIteratorToDisk(inMemoryIterator: WritablePartitionedIterator[K, C])
       : SpilledFile = {
     // Because these files may be read during shuffle, their compression must be controlled by
     // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
@@ -750,7 +750,7 @@ private[spark] class ExternalSorter[K, V, C](
       // Case where we only have in-memory data
       val collection = if (aggregator.isDefined) map else buffer
       val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
-      while (it.hasNext()) {
+      while (it.hasNext) {
         val partitionId = it.nextPartition()
         var partitionWriter: ShufflePartitionWriter = null
         var partitionPairsWriter: ShufflePartitionPairsWriter = null
@@ -866,18 +866,7 @@ private[spark] class ExternalSorter[K, V, C](
       if (hasSpilled) {
         false
       } else {
-        val inMemoryIterator = new WritablePartitionedIterator {
-          private[this] var cur = if (upstream.hasNext) upstream.next() else null
-
-          def writeNext(writer: PairsWriter): Unit = {
-            writer.write(cur._1._2, cur._2)
-            cur = if (upstream.hasNext) upstream.next() else null
-          }
-
-          def hasNext(): Boolean = cur != null
-
-          def nextPartition(): Int = cur._1._1
-        }
+        val inMemoryIterator = new WritablePartitionedIterator[K, C](upstream)
         logInfo(s"Task ${TaskContext.get().taskAttemptId} force spilling in-memory map to disk " +
           s"and it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
         val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
diff --git a/core/src/main/scala/org/apache/spark/util/collection/WritablePartitionedPairCollection.scala b/core/src/main/scala/org/apache/spark/util/collection/WritablePartitionedPairCollection.scala
index 9624b02..3472a08 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/WritablePartitionedPairCollection.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/WritablePartitionedPairCollection.scala
@@ -46,20 +46,9 @@ private[spark] trait WritablePartitionedPairCollection[K, V] {
    * This may destroy the underlying collection.
    */
   def destructiveSortedWritablePartitionedIterator(keyComparator: Option[Comparator[K]])
-    : WritablePartitionedIterator = {
+    : WritablePartitionedIterator[K, V] = {
     val it = partitionedDestructiveSortedIterator(keyComparator)
-    new WritablePartitionedIterator {
-      private[this] var cur = if (it.hasNext) it.next() else null
-
-      def writeNext(writer: PairsWriter): Unit = {
-        writer.write(cur._1._2, cur._2)
-        cur = if (it.hasNext) it.next() else null
-      }
-
-      def hasNext(): Boolean = cur != null
-
-      def nextPartition(): Int = cur._1._1
-    }
+    new WritablePartitionedIterator[K, V](it)
   }
 }
 
@@ -87,10 +76,15 @@ private[spark] object WritablePartitionedPairCollection {
  * Iterator that writes elements to a DiskBlockObjectWriter instead of returning them. Each element
  * has an associated partition.
  */
-private[spark] trait WritablePartitionedIterator {
-  def writeNext(writer: PairsWriter): Unit
+private[spark] class WritablePartitionedIterator[K, V](it: Iterator[((Int, K), V)]) {
+  private[this] var cur = if (it.hasNext) it.next() else null
+
+  def writeNext(writer: PairsWriter): Unit = {
+    writer.write(cur._1._2, cur._2)
+    cur = if (it.hasNext) it.next() else null
+  }
 
-  def hasNext(): Boolean
+  def hasNext: Boolean = cur != null
 
-  def nextPartition(): Int
+  def nextPartition(): Int = cur._1._1
 }
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index dc11f33..7c168c1 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -55,7 +55,10 @@ object MimaExcludes {
     ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getBinary"),
     ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getArray"),
     ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getMap"),
-    ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getChild")
+    ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getChild"),
+
+    // [SPARK-35135][CORE] Turn WritablePartitionedIterator from trait into a default implementation class
+    ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.util.collection.WritablePartitionedIterator")
   )
 
   // Exclude rules for 3.1.x
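
For context: MiMa reports the trait-to-class conversion as an `IncompatibleTemplateDefProblem` because it is a binary-incompatible change, so the rule above excludes it; the type is `private[spark]`, an internal API. For readers outside the Spark codebase, below is a minimal, self-contained sketch of how the resulting class behaves. The `PairsWriter` stub and the sample data are illustrative stand-ins, not Spark's real shuffle machinery; the iterator class itself mirrors the one added in the diff above.

    // Illustrative stub mirroring the write(key, value) shape of Spark's PairsWriter.
    trait PairsWriter {
      def write(key: Any, value: Any): Unit
    }

    // Mirrors the new class from the diff: drains an iterator of
    // ((partitionId, key), value) records, writing one record per call.
    class WritablePartitionedIterator[K, V](it: Iterator[((Int, K), V)]) {
      private[this] var cur = if (it.hasNext) it.next() else null

      def writeNext(writer: PairsWriter): Unit = {
        writer.write(cur._1._2, cur._2)
        cur = if (it.hasNext) it.next() else null
      }

      def hasNext: Boolean = cur != null

      def nextPartition(): Int = cur._1._1
    }

    // Example: three records across two partitions, drained the way
    // ExternalSorter drains a spill -- read the partition id, then write the pair.
    object Demo extends App {
      val data = Iterator(((0, "a"), 1), ((0, "b"), 2), ((1, "c"), 3))
      val wpi = new WritablePartitionedIterator(data)
      val printer = new PairsWriter {
        def write(key: Any, value: Any): Unit = println(s"$key -> $value")
      }
      while (wpi.hasNext) {
        print(s"partition ${wpi.nextPartition()}: ")
        wpi.writeNext(printer)
      }
      // Prints: partition 0: a -> 1 / partition 0: b -> 2 / partition 1: c -> 3
    }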
