You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by wu...@apache.org on 2021/04/29 03:47:17 UTC
[spark] branch master updated: [SPARK-35135][CORE] Turn the
`WritablePartitionedIterator` from a trait into a default implementation
class
This is an automated email from the ASF dual-hosted git repository.
wuyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 74b9326 [SPARK-35135][CORE] Turn the `WritablePartitionedIterator` from a trait into a default implementation class
74b9326 is described below
commit 74b93261af29e76b9da31b1c9f20900a818d97e6
Author: yangjie01 <ya...@baidu.com>
AuthorDate: Thu Apr 29 11:46:24 2021 +0800
[SPARK-35135][CORE] Turn the `WritablePartitionedIterator` from a trait into a default implementation class
### What changes were proposed in this pull request?
`WritablePartitionedIterator` is defined in `WritablePartitionedPairCollection.scala`, and there are two implementations of this trait, but the code for these two implementations is duplicated.
The main change of this PR is to turn `WritablePartitionedIterator` from a trait into a default implementation class, because there is only one implementation now.
### Why are the changes needed?
Clean up duplicate code.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass the Jenkins or GitHub Actions checks.
Closes #32232 from LuciferYang/writable-partitioned-iterator.
Authored-by: yangjie01 <ya...@baidu.com>
Signed-off-by: yi.wu <yi...@databricks.com>
---
.../spark/util/collection/ExternalSorter.scala | 17 +++----------
.../WritablePartitionedPairCollection.scala | 28 +++++++++-------------
project/MimaExcludes.scala | 5 +++-
3 files changed, 18 insertions(+), 32 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 66bc3e5..1913637 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -263,7 +263,7 @@ private[spark] class ExternalSorter[K, V, C](
/**
* Spill contents of in-memory iterator to a temporary file on disk.
*/
- private[this] def spillMemoryIteratorToDisk(inMemoryIterator: WritablePartitionedIterator)
+ private[this] def spillMemoryIteratorToDisk(inMemoryIterator: WritablePartitionedIterator[K, C])
: SpilledFile = {
// Because these files may be read during shuffle, their compression must be controlled by
// spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
@@ -750,7 +750,7 @@ private[spark] class ExternalSorter[K, V, C](
// Case where we only have in-memory data
val collection = if (aggregator.isDefined) map else buffer
val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
- while (it.hasNext()) {
+ while (it.hasNext) {
val partitionId = it.nextPartition()
var partitionWriter: ShufflePartitionWriter = null
var partitionPairsWriter: ShufflePartitionPairsWriter = null
@@ -866,18 +866,7 @@ private[spark] class ExternalSorter[K, V, C](
if (hasSpilled) {
false
} else {
- val inMemoryIterator = new WritablePartitionedIterator {
- private[this] var cur = if (upstream.hasNext) upstream.next() else null
-
- def writeNext(writer: PairsWriter): Unit = {
- writer.write(cur._1._2, cur._2)
- cur = if (upstream.hasNext) upstream.next() else null
- }
-
- def hasNext(): Boolean = cur != null
-
- def nextPartition(): Int = cur._1._1
- }
+ val inMemoryIterator = new WritablePartitionedIterator[K, C](upstream)
logInfo(s"Task ${TaskContext.get().taskAttemptId} force spilling in-memory map to disk " +
s"and it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
diff --git a/core/src/main/scala/org/apache/spark/util/collection/WritablePartitionedPairCollection.scala b/core/src/main/scala/org/apache/spark/util/collection/WritablePartitionedPairCollection.scala
index 9624b02..3472a08 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/WritablePartitionedPairCollection.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/WritablePartitionedPairCollection.scala
@@ -46,20 +46,9 @@ private[spark] trait WritablePartitionedPairCollection[K, V] {
* This may destroy the underlying collection.
*/
def destructiveSortedWritablePartitionedIterator(keyComparator: Option[Comparator[K]])
- : WritablePartitionedIterator = {
+ : WritablePartitionedIterator[K, V] = {
val it = partitionedDestructiveSortedIterator(keyComparator)
- new WritablePartitionedIterator {
- private[this] var cur = if (it.hasNext) it.next() else null
-
- def writeNext(writer: PairsWriter): Unit = {
- writer.write(cur._1._2, cur._2)
- cur = if (it.hasNext) it.next() else null
- }
-
- def hasNext(): Boolean = cur != null
-
- def nextPartition(): Int = cur._1._1
- }
+ new WritablePartitionedIterator[K, V](it)
}
}
@@ -87,10 +76,15 @@ private[spark] object WritablePartitionedPairCollection {
* Iterator that writes elements to a DiskBlockObjectWriter instead of returning them. Each element
* has an associated partition.
*/
-private[spark] trait WritablePartitionedIterator {
- def writeNext(writer: PairsWriter): Unit
+private[spark] class WritablePartitionedIterator[K, V](it: Iterator[((Int, K), V)]) {
+ private[this] var cur = if (it.hasNext) it.next() else null
+
+ def writeNext(writer: PairsWriter): Unit = {
+ writer.write(cur._1._2, cur._2)
+ cur = if (it.hasNext) it.next() else null
+ }
- def hasNext(): Boolean
+ def hasNext: Boolean = cur != null
- def nextPartition(): Int
+ def nextPartition(): Int = cur._1._1
}
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index dc11f33..7c168c1 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -55,7 +55,10 @@ object MimaExcludes {
ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getBinary"),
ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getArray"),
ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getMap"),
- ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getChild")
+ ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.vectorized.ColumnVector.getChild"),
+
+ // [SPARK-35135][CORE] Turn WritablePartitionedIterator from trait into a default implementation class
+ ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.util.collection.WritablePartitionedIterator")
)
// Exclude rules for 3.1.x
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org