You are viewing a plain-text version of this content. (The hyperlink to the canonical version was lost in this plain-text rendering.)
Posted to commits@sedona.apache.org by ji...@apache.org on 2021/09/21 19:45:53 UTC
[incubator-sedona] branch master updated: [SEDONA-64] Broadcast dedupParams to improve performance (#545)
This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sedona.git
The following commit(s) were added to refs/heads/master by this push:
new 4761ac0 [SEDONA-64] Broadcast dedupParams to improve performance (#545)
4761ac0 is described below
commit 4761ac02b3a1b8f6752ecfb0b3dfde5f9e8f9af8
Author: Martin Andersson <u....@gmail.com>
AuthorDate: Tue Sep 21 21:44:57 2021 +0200
[SEDONA-64] Broadcast dedupParams to improve performance (#545)
---
.../sedona/core/joinJudgement/JudgementBase.java | 30 +++++++++++++++++++---
.../sedona/core/spatialOperator/JoinQuery.java | 5 ++++
2 files changed, 32 insertions(+), 3 deletions(-)
diff --git a/core/src/main/java/org/apache/sedona/core/joinJudgement/JudgementBase.java b/core/src/main/java/org/apache/sedona/core/joinJudgement/JudgementBase.java
index 0b73bfd..582766f 100644
--- a/core/src/main/java/org/apache/sedona/core/joinJudgement/JudgementBase.java
+++ b/core/src/main/java/org/apache/sedona/core/joinJudgement/JudgementBase.java
@@ -22,7 +22,10 @@ package org.apache.sedona.core.joinJudgement;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.sedona.core.utils.HalfOpenRectangle;
+import org.apache.spark.SparkContext;
import org.apache.spark.TaskContext;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.geom.Envelope;
import org.locationtech.jts.geom.Geometry;
@@ -33,6 +36,7 @@ import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.List;
+import java.util.function.Supplier;
/**
* Base class for partition level join implementations.
@@ -57,10 +61,14 @@ import java.util.List;
abstract class JudgementBase
implements Serializable
{
+ private interface SerializableSupplier<T> extends Serializable, Supplier<T> {}
+
private static final Logger log = LogManager.getLogger(JudgementBase.class);
private final boolean considerBoundaryIntersection;
- private final DedupParams dedupParams;
+ // Supplier will return a broadcasted reference if broadcastDedupParams() is called,
+ // otherwise a local reference is returned.
+ private SerializableSupplier<DedupParams> dedupParams;
transient private HalfOpenRectangle extent;
@@ -71,7 +79,23 @@ abstract class JudgementBase
protected JudgementBase(boolean considerBoundaryIntersection, @Nullable DedupParams dedupParams)
{
this.considerBoundaryIntersection = considerBoundaryIntersection;
- this.dedupParams = dedupParams;
+ this.dedupParams = dedupParams == null ? null : () -> dedupParams;
+ }
+
+ /**
+ * Broadcasts <code>dedupParams</code> and replaces the local reference with a supplier that
+ * reads the broadcast variable. Does nothing if this judgement was built without dedupParams.
+ *
+ * <p>Broadcast variables are deserialized once per executor instead of once per task.
+ * Broadcasting can reduce execution time significantly for jobs with a large number of partitions.
+ *
+ * @param cxt the SparkContext used to create the broadcast variable (JoinQuery calls this before the judgement is passed to zipPartitions)
+ */
+ public void broadcastDedupParams(SparkContext cxt) {
+ if (dedupParams != null) { // null when the constructor received no dedupParams; nothing to broadcast
+ Broadcast<DedupParams> broadcast = new JavaSparkContext(cxt).broadcast(dedupParams.get());
+ dedupParams = () -> broadcast.value(); // serializable lambda (SerializableSupplier) capturing only the Broadcast handle, not the DedupParams value
+ }
 }
/**
@@ -89,7 +113,7 @@ abstract class JudgementBase
final int partitionId = TaskContext.getPartitionId();
- final List<Envelope> partitionExtents = dedupParams.getPartitionExtents();
+ final List<Envelope> partitionExtents = dedupParams.get().getPartitionExtents();
if (partitionId < partitionExtents.size()) {
extent = new HalfOpenRectangle(partitionExtents.get(partitionId));
}
diff --git a/core/src/main/java/org/apache/sedona/core/spatialOperator/JoinQuery.java b/core/src/main/java/org/apache/sedona/core/spatialOperator/JoinQuery.java
index 4cd70be..78aa017 100644
--- a/core/src/main/java/org/apache/sedona/core/spatialOperator/JoinQuery.java
+++ b/core/src/main/java/org/apache/sedona/core/spatialOperator/JoinQuery.java
@@ -373,17 +373,20 @@ public class JoinQuery
final SpatialPartitioner partitioner =
(SpatialPartitioner) rightRDD.spatialPartitionedRDD.partitioner().get();
final DedupParams dedupParams = partitioner.getDedupParams();
+ final SparkContext cxt = leftRDD.rawSpatialRDD.context();
final JavaRDD<Pair<U, T>> joinResult;
if (joinParams.useIndex) {
if (rightRDD.indexedRDD != null) {
final RightIndexLookupJudgement judgement =
new RightIndexLookupJudgement(joinParams.considerBoundaryIntersection, dedupParams);
+ judgement.broadcastDedupParams(cxt);
joinResult = leftRDD.spatialPartitionedRDD.zipPartitions(rightRDD.indexedRDD, judgement);
}
else if (leftRDD.indexedRDD != null) {
final LeftIndexLookupJudgement judgement =
new LeftIndexLookupJudgement(joinParams.considerBoundaryIntersection, dedupParams);
+ judgement.broadcastDedupParams(cxt);
joinResult = leftRDD.indexedRDD.zipPartitions(rightRDD.spatialPartitionedRDD, judgement);
}
else {
@@ -395,11 +398,13 @@ public class JoinQuery
joinParams.joinBuildSide,
dedupParams,
buildCount, streamCount, resultCount, candidateCount);
+ judgement.broadcastDedupParams(cxt);
joinResult = leftRDD.spatialPartitionedRDD.zipPartitions(rightRDD.spatialPartitionedRDD, judgement);
}
}
else {
NestedLoopJudgement judgement = new NestedLoopJudgement(joinParams.considerBoundaryIntersection, dedupParams);
+ judgement.broadcastDedupParams(cxt);
joinResult = rightRDD.spatialPartitionedRDD.zipPartitions(leftRDD.spatialPartitionedRDD, judgement);
}