You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by be...@apache.org on 2023/10/14 03:23:07 UTC
[spark] branch master updated: [SPARK-45513][CORE][SQL][MLLIB][CONNECT] Replace `scala.runtime.Tuple2Zipped` to `scala.collection.LazyZip2`
This is an automated email from the ASF dual-hosted git repository.
beliefer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 6f46ea2f9bb [SPARK-45513][CORE][SQL][MLLIB][CONNECT] Replace `scala.runtime.Tuple2Zipped` to `scala.collection.LazyZip2`
6f46ea2f9bb is described below
commit 6f46ea2f9bbad71077f9f2dbf72fa4e6906ef29a
Author: Jiaan Geng <be...@163.com>
AuthorDate: Sat Oct 14 11:22:40 2023 +0800
[SPARK-45513][CORE][SQL][MLLIB][CONNECT] Replace `scala.runtime.Tuple2Zipped` to `scala.collection.LazyZip2`
### What changes were proposed in this pull request?
Since Scala 2.13.0, `scala.runtime.Tuple2Zipped` has been deprecated, and `scala.collection.LazyZip2` is recommended instead. In practice, each `(a, b).zipped.op(...)` call is rewritten as `a.lazyZip(b).op(...)`.
### Why are the changes needed?
To remove uses of the deprecated `scala.runtime.Tuple2Zipped` API by replacing them with `scala.collection.LazyZip2`.
### Does this PR introduce _any_ user-facing change?
'No'.
### How was this patch tested?
Existing test cases.
### Was this patch authored or co-authored using generative AI tooling?
'No'.
Closes #43351 from beliefer/SPARK-45513.
Authored-by: Jiaan Geng <be...@163.com>
Signed-off-by: Jiaan Geng <be...@163.com>
---
.../org/apache/spark/sql/test/SQLHelper.scala | 2 +-
.../scheduler/EventLoggingListenerSuite.scala | 2 +-
.../spark/shuffle/ShuffleBlockPusherSuite.scala | 6 +++---
.../mllib/feature/ElementwiseProductSuite.scala | 2 +-
.../spark/mllib/feature/NormalizerSuite.scala | 6 +++---
.../spark/mllib/feature/StandardScalerSuite.scala | 24 +++++++++++-----------
.../spark/mllib/optimization/LBFGSSuite.scala | 2 +-
.../catalyst/optimizer/InjectRuntimeFilter.scala | 2 +-
.../spark/sql/catalyst/plans/SQLHelper.scala | 2 +-
.../columnar/compression/IntegralDeltaSuite.scala | 4 ++--
.../datasources/parquet/ParquetSchemaSuite.scala | 2 +-
.../sql/execution/joins/HashedRelationSuite.scala | 24 ++++++++++++++++------
12 files changed, 45 insertions(+), 33 deletions(-)
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/test/SQLHelper.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/test/SQLHelper.scala
index 12212492e37..727e2a4f420 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/test/SQLHelper.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/test/SQLHelper.scala
@@ -41,7 +41,7 @@ trait SQLHelper {
None
}
}
- (keys, values).zipped.foreach { (k, v) =>
+ keys.lazyZip(values).foreach { (k, v) =>
if (spark.conf.isModifiable(k)) {
spark.conf.set(k, v)
} else {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index edc54e60654..bd659363e53 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -385,7 +385,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
8000L, 5000L, 7000L, 4000L, 6000L, 3000L, 10L, 90L, 2L, 20L, 110L)
def max(a: Array[Long], b: Array[Long]): Array[Long] =
- (a, b).zipped.map(Math.max).toArray
+ a.lazyZip(b).map(Math.max).toArray
// calculated metric peaks per stage per executor
// metrics sent during stage 0 for each executor
diff --git a/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala
index c8d89625dd8..18c27ff1269 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala
@@ -82,7 +82,7 @@ class ShuffleBlockPusherSuite extends SparkFunSuite {
pushedBlocks ++= blocks
val managedBuffers = invocation.getArguments()(3).asInstanceOf[Array[ManagedBuffer]]
val blockPushListener = invocation.getArguments()(4).asInstanceOf[BlockPushingListener]
- (blocks, managedBuffers).zipped.foreach((blockId, buffer) => {
+ blocks.lazyZip(managedBuffers).foreach((blockId, buffer) => {
blockPushListener.onBlockPushSuccess(blockId, buffer)
})
})
@@ -91,7 +91,7 @@ class ShuffleBlockPusherSuite extends SparkFunSuite {
private def verifyPushRequests(
pushRequests: Seq[PushRequest],
expectedSizes: Seq[Int]): Unit = {
- (pushRequests, expectedSizes).zipped.foreach((req, size) => {
+ pushRequests.lazyZip(expectedSizes).foreach((req, size) => {
assert(req.size == size)
})
}
@@ -256,7 +256,7 @@ class ShuffleBlockPusherSuite extends SparkFunSuite {
// blocks to be deferred
blockPushListener.onBlockPushSuccess(blocks(0), managedBuffers(0))
} else {
- (blocks, managedBuffers).zipped.foreach((blockId, buffer) => {
+ blocks.lazyZip(managedBuffers).foreach((blockId, buffer) => {
blockPushListener.onBlockPushSuccess(blockId, buffer)
})
}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/ElementwiseProductSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/ElementwiseProductSuite.scala
index ccbf8a91cdd..9eca2d682d6 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/feature/ElementwiseProductSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/ElementwiseProductSuite.scala
@@ -54,7 +54,7 @@ class ElementwiseProductSuite extends SparkFunSuite with MLlibTestSparkContext {
case _ => false
}, "The vector type should be preserved after hadamard product")
- assert((data2, data2RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5))
+ assert(data2.lazyZip(data2RDD.collect()).forall((v1, v2) => v1 ~== v2 absTol 1E-5))
assert(data2(0) ~== Vectors.sparse(3, Seq((1, 0.0), (2, -1.5))) absTol 1E-5)
}
}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/NormalizerSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/NormalizerSuite.scala
index 10f7bafd6cf..71ce26360b8 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/feature/NormalizerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/NormalizerSuite.scala
@@ -49,7 +49,7 @@ class NormalizerSuite extends SparkFunSuite with MLlibTestSparkContext {
case _ => false
}, "The vector type should be preserved after normalization.")
- assert((data1, data1RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5))
+ assert(data1.lazyZip(data1RDD.collect()).forall((v1, v2) => v1 ~== v2 absTol 1E-5))
assert(brzNorm(data1(0).asBreeze, 1) ~== 1.0 absTol 1E-5)
assert(brzNorm(data1(2).asBreeze, 1) ~== 1.0 absTol 1E-5)
@@ -76,7 +76,7 @@ class NormalizerSuite extends SparkFunSuite with MLlibTestSparkContext {
case _ => false
}, "The vector type should be preserved after normalization.")
- assert((data2, data2RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5))
+ assert(data2.lazyZip(data2RDD.collect()).forall((v1, v2) => v1 ~== v2 absTol 1E-5))
assert(brzNorm(data2(0).asBreeze, 2) ~== 1.0 absTol 1E-5)
assert(brzNorm(data2(2).asBreeze, 2) ~== 1.0 absTol 1E-5)
@@ -103,7 +103,7 @@ class NormalizerSuite extends SparkFunSuite with MLlibTestSparkContext {
case _ => false
}, "The vector type should be preserved after normalization.")
- assert((dataInf, dataInfRDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5))
+ assert(dataInf.lazyZip(dataInfRDD.collect()).forall((v1, v2) => v1 ~== v2 absTol 1E-5))
assert(dataInf(0).toArray.map(math.abs).max ~== 1.0 absTol 1E-5)
assert(dataInf(2).toArray.map(math.abs).max ~== 1.0 absTol 1E-5)
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/StandardScalerSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/StandardScalerSuite.scala
index a5769631e51..a2c72de4231 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/feature/StandardScalerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/StandardScalerSuite.scala
@@ -105,9 +105,9 @@ class StandardScalerSuite extends SparkFunSuite with MLlibTestSparkContext {
case _ => false
}, "The vector type should be preserved after standardization.")
- assert((data1, data1RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5))
- assert((data2, data2RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5))
- assert((data3, data3RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5))
+ assert(data1.lazyZip(data1RDD.collect()).forall((v1, v2) => v1 ~== v2 absTol 1E-5))
+ assert(data2.lazyZip(data2RDD.collect()).forall((v1, v2) => v1 ~== v2 absTol 1E-5))
+ assert(data3.lazyZip(data3RDD.collect()).forall((v1, v2) => v1 ~== v2 absTol 1E-5))
assert(summary1.mean ~== Vectors.dense(0.0, 0.0, 0.0) absTol 1E-5)
assert(summary1.variance ~== Vectors.dense(1.0, 1.0, 1.0) absTol 1E-5)
@@ -169,9 +169,9 @@ class StandardScalerSuite extends SparkFunSuite with MLlibTestSparkContext {
case _ => false
}, "The vector type should be preserved after standardization.")
- assert((data1, data1RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5))
- assert((data2, data2RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5))
- assert((data3, data3RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5))
+ assert(data1.lazyZip(data1RDD.collect()).forall((v1, v2) => v1 ~== v2 absTol 1E-5))
+ assert(data2.lazyZip(data2RDD.collect()).forall((v1, v2) => v1 ~== v2 absTol 1E-5))
+ assert(data3.lazyZip(data3RDD.collect()).forall((v1, v2) => v1 ~== v2 absTol 1E-5))
assert(summary1.mean ~== Vectors.dense(0.0, 0.0, 0.0) absTol 1E-5)
assert(summary1.variance ~== Vectors.dense(1.0, 1.0, 1.0) absTol 1E-5)
@@ -225,9 +225,9 @@ class StandardScalerSuite extends SparkFunSuite with MLlibTestSparkContext {
case _ => false
}, "The vector type should be preserved after standardization.")
- assert((data1, data1RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5))
- assert((data2, data2RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5))
- assert((data3, data3RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5))
+ assert(data1.lazyZip(data1RDD.collect()).forall((v1, v2) => v1 ~== v2 absTol 1E-5))
+ assert(data2.lazyZip(data2RDD.collect()).forall((v1, v2) => v1 ~== v2 absTol 1E-5))
+ assert(data3.lazyZip(data3RDD.collect()).forall((v1, v2) => v1 ~== v2 absTol 1E-5))
assert(summary1.mean ~== Vectors.dense(0.0, 0.0, 0.0) absTol 1E-5)
assert(summary1.variance ~== Vectors.dense(1.0, 1.0, 1.0) absTol 1E-5)
@@ -274,9 +274,9 @@ class StandardScalerSuite extends SparkFunSuite with MLlibTestSparkContext {
case _ => false
}, "The vector type should be preserved after standardization.")
- assert((data1, data1RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5))
- assert((data2, data2RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5))
- assert((data3, data3RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5))
+ assert(data1.lazyZip(data1RDD.collect()).forall((v1, v2) => v1 ~== v2 absTol 1E-5))
+ assert(data2.lazyZip(data2RDD.collect()).forall((v1, v2) => v1 ~== v2 absTol 1E-5))
+ assert(data3.lazyZip(data3RDD.collect()).forall((v1, v2) => v1 ~== v2 absTol 1E-5))
assert(summary1.mean ~== Vectors.dense(0.0, 0.0, 0.0) absTol 1E-5)
assert(summary1.variance ~== Vectors.dense(1.0, 1.0, 1.0) absTol 1E-5)
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/optimization/LBFGSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/optimization/LBFGSSuite.scala
index 27e21acc275..1318b23d28c 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/optimization/LBFGSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/optimization/LBFGSSuite.scala
@@ -71,7 +71,7 @@ class LBFGSSuite extends SparkFunSuite with MLlibTestSparkContext with Matchers
// Since the cost function is convex, the loss is guaranteed to be monotonically decreasing
// with L-BFGS optimizer.
// (SGD doesn't guarantee this, and the loss will be fluctuating in the optimization process.)
- assert((loss, loss.tail).zipped.forall(_ > _), "loss should be monotonically decreasing.")
+ assert(loss.lazyZip(loss.tail).forall(_ > _), "loss should be monotonically decreasing.")
val stepSize = 1.0
// Well, GD converges slower, so it requires more iterations!
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
index 13554908379..614ab4a1d01 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
@@ -311,7 +311,7 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
case join @ ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, _, _, left, right, hint) =>
var newLeft = left
var newRight = right
- (leftKeys, rightKeys).zipped.foreach((l, r) => {
+ leftKeys.lazyZip(rightKeys).foreach((l, r) => {
// Check if:
// 1. There is already a DPP filter on the key
// 2. There is already a runtime filter (Bloom filter or IN subquery) on the key
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/SQLHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/SQLHelper.scala
index 75fe61c4980..eb844e6f057 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/SQLHelper.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/SQLHelper.scala
@@ -45,7 +45,7 @@ trait SQLHelper {
None
}
}
- (keys, values).zipped.foreach { (k, v) =>
+ keys.lazyZip(values).foreach { (k, v) =>
if (SQLConf.isStaticConfigKey(k)) {
throw new AnalysisException(s"Cannot modify the value of a static config: $k")
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala
index 655a9d7ff46..385c122e804 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala
@@ -43,7 +43,7 @@ class IntegralDeltaSuite extends SparkFunSuite {
val deltas = if (input.isEmpty) {
Seq.empty[Long]
} else {
- (input.tail, input.init).zipped.map {
+ input.tail.lazyZip(input.init).map {
case (x: Int, y: Int) => (x - y).toLong
case (x: Long, y: Long) => x - y
case other => fail(s"Unexpected input $other")
@@ -80,7 +80,7 @@ class IntegralDeltaSuite extends SparkFunSuite {
assertResult(Byte.MinValue, "The first byte should be an escaping mark")(buffer.get())
assertResult(input.head, "The first value is wrong")(columnType.extract(buffer))
- (input.tail, deltas).zipped.foreach { (value, delta) =>
+ input.tail.lazyZip(deltas).foreach { (value, delta) =>
if (math.abs(delta) <= Byte.MaxValue) {
assertResult(delta, "Wrong delta")(buffer.get())
} else {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index facc9b90ff7..ef06e64d2eb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -968,7 +968,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
val fromCaseClassString = StructType.fromString(caseClassString)
val fromJson = StructType.fromString(jsonString)
- (fromCaseClassString, fromJson).zipped.foreach { (a, b) =>
+ fromCaseClassString.lazyZip(fromJson).foreach { (a, b) =>
assert(a.name == b.name)
assert(a.dataType === b.dataType)
assert(a.nullable === b.nullable)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
index 686fb2d838b..69b07e6b6b3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
@@ -467,7 +467,9 @@ class HashedRelationSuite extends SharedSparkSession {
test("LongToUnsafeRowMap: key set iterator on a contiguous array of keys") {
val rowMap = new LongToUnsafeRowMap(mm, 1)
- (contiguousArray, contiguousRows).zipped.map { (i, row) => rowMap.append(i, row) }
+ contiguousArray.toArray.lazyZip(contiguousRows).map {
+ (i, row) => rowMap.append(i, row)
+ }
var keyIterator = rowMap.keys()
// in sparse mode the keys are unsorted
assert(keyIterator.map(key => key.getLong(0)).toArray.sortWith(_ < _) === contiguousArray)
@@ -479,7 +481,9 @@ class HashedRelationSuite extends SharedSparkSession {
test("LongToUnsafeRowMap: key set iterator on a sparse array with equidistant keys") {
val rowMap = new LongToUnsafeRowMap(mm, 1)
- (sparseArray, sparseRows).zipped.map { (i, row) => rowMap.append(i, row) }
+ sparseArray.toArray.lazyZip(sparseRows).map {
+ (i, row) => rowMap.append(i, row)
+ }
var keyIterator = rowMap.keys()
assert(keyIterator.map(_.getLong(0)).toArray.sortWith(_ < _) === sparseArray)
rowMap.optimize()
@@ -503,7 +507,9 @@ class HashedRelationSuite extends SharedSparkSession {
test("LongToUnsafeRowMap: multiple hasNext calls before calling next() on the key iterator") {
val rowMap = new LongToUnsafeRowMap(mm, 1)
- (randomArray, randomRows).zipped.map { (i, row) => rowMap.append(i, row) }
+ randomArray.toArray.lazyZip(randomRows).map {
+ (i, row) => rowMap.append(i, row)
+ }
val buffer = new ArrayBuffer[Long]()
// hasNext should not change the cursor unless the key was read by a next() call
var keyIterator = rowMap.keys()
@@ -527,7 +533,9 @@ class HashedRelationSuite extends SharedSparkSession {
test("LongToUnsafeRowMap: no explicit hasNext calls on the key iterator") {
val rowMap = new LongToUnsafeRowMap(mm, 1)
- (randomArray, randomRows).zipped.map { (i, row) => rowMap.append(i, row) }
+ randomArray.toArray.lazyZip(randomRows).map {
+ (i, row) => rowMap.append(i, row)
+ }
val buffer = new ArrayBuffer[Long]()
// call next() until the buffer is filled with all keys
var keyIterator = rowMap.keys()
@@ -555,7 +563,9 @@ class HashedRelationSuite extends SharedSparkSession {
test("LongToUnsafeRowMap: call hasNext at the end of the iterator") {
val rowMap = new LongToUnsafeRowMap(mm, 1)
- (sparseArray, sparseRows).zipped.map { (i, row) => rowMap.append(i, row) }
+ sparseArray.toArray.lazyZip(sparseRows).map {
+ (i, row) => rowMap.append(i, row)
+ }
var keyIterator = rowMap.keys()
assert(keyIterator.map(key => key.getLong(0)).toArray.sortWith(_ < _) === sparseArray)
assert(keyIterator.hasNext == false)
@@ -570,7 +580,9 @@ class HashedRelationSuite extends SharedSparkSession {
test("LongToUnsafeRowMap: random sequence of hasNext and next() calls on the key iterator") {
val rowMap = new LongToUnsafeRowMap(mm, 1)
- (randomArray, randomRows).zipped.map { (i, row) => rowMap.append(i, row) }
+ randomArray.toArray.lazyZip(randomRows).map {
+ (i, row) => rowMap.append(i, row)
+ }
val buffer = new ArrayBuffer[Long]()
// call hasNext or next() at random
var keyIterator = rowMap.keys()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org