You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sedona.apache.org by ji...@apache.org on 2021/09/21 19:45:53 UTC

[incubator-sedona] branch master updated: [SEDONA-64] Broadcast dedupParams to improve performance (#545)

This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sedona.git


The following commit(s) were added to refs/heads/master by this push:
     new 4761ac0  [SEDONA-64] Broadcast dedupParams to improve performance (#545)
4761ac0 is described below

commit 4761ac02b3a1b8f6752ecfb0b3dfde5f9e8f9af8
Author: Martin Andersson <u....@gmail.com>
AuthorDate: Tue Sep 21 21:44:57 2021 +0200

    [SEDONA-64] Broadcast dedupParams to improve performance (#545)
---
 .../sedona/core/joinJudgement/JudgementBase.java   | 30 +++++++++++++++++++---
 .../sedona/core/spatialOperator/JoinQuery.java     |  5 ++++
 2 files changed, 32 insertions(+), 3 deletions(-)

diff --git a/core/src/main/java/org/apache/sedona/core/joinJudgement/JudgementBase.java b/core/src/main/java/org/apache/sedona/core/joinJudgement/JudgementBase.java
index 0b73bfd..582766f 100644
--- a/core/src/main/java/org/apache/sedona/core/joinJudgement/JudgementBase.java
+++ b/core/src/main/java/org/apache/sedona/core/joinJudgement/JudgementBase.java
@@ -22,7 +22,10 @@ package org.apache.sedona.core.joinJudgement;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.sedona.core.utils.HalfOpenRectangle;
+import org.apache.spark.SparkContext;
 import org.apache.spark.TaskContext;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
 import org.locationtech.jts.geom.Coordinate;
 import org.locationtech.jts.geom.Envelope;
 import org.locationtech.jts.geom.Geometry;
@@ -33,6 +36,7 @@ import javax.annotation.Nullable;
 
 import java.io.Serializable;
 import java.util.List;
+import java.util.function.Supplier;
 
 /**
  * Base class for partition level join implementations.
@@ -57,10 +61,14 @@ import java.util.List;
 abstract class JudgementBase
         implements Serializable
 {
+    private interface SerializableSupplier<T> extends Serializable, Supplier<T> {}
+
     private static final Logger log = LogManager.getLogger(JudgementBase.class);
 
     private final boolean considerBoundaryIntersection;
-    private final DedupParams dedupParams;
+    // The supplier returns the broadcast reference once broadcastDedupParams() has been called;
+    // until then it returns the local reference.
+    private SerializableSupplier<DedupParams> dedupParams;
 
     transient private HalfOpenRectangle extent;
 
@@ -71,7 +79,23 @@ abstract class JudgementBase
     protected JudgementBase(boolean considerBoundaryIntersection, @Nullable DedupParams dedupParams)
     {
         this.considerBoundaryIntersection = considerBoundaryIntersection;
-        this.dedupParams = dedupParams;
+        this.dedupParams = dedupParams == null ? null : () -> dedupParams;
+    }
+
+    /**
+     * Broadcasts <code>dedupParams</code> and replaces the local reference with
+     * a reference to the broadcast variable.
+     *
+     * Broadcast variables are deserialized once per executor instead of once per task.
+     * Broadcasting can reduce execution time significantly for jobs with a large number of partitions.
+     *
+     * @param cxt the Spark context used to create the broadcast variable
+     */
+    public void broadcastDedupParams(SparkContext cxt) {
+        if (dedupParams != null) {
+            Broadcast<DedupParams> broadcast = new JavaSparkContext(cxt).broadcast(dedupParams.get());
+            dedupParams = () -> broadcast.value();
+        }
     }
 
     /**
@@ -89,7 +113,7 @@ abstract class JudgementBase
 
         final int partitionId = TaskContext.getPartitionId();
 
-        final List<Envelope> partitionExtents = dedupParams.getPartitionExtents();
+        final List<Envelope> partitionExtents = dedupParams.get().getPartitionExtents();
         if (partitionId < partitionExtents.size()) {
             extent = new HalfOpenRectangle(partitionExtents.get(partitionId));
         }
diff --git a/core/src/main/java/org/apache/sedona/core/spatialOperator/JoinQuery.java b/core/src/main/java/org/apache/sedona/core/spatialOperator/JoinQuery.java
index 4cd70be..78aa017 100644
--- a/core/src/main/java/org/apache/sedona/core/spatialOperator/JoinQuery.java
+++ b/core/src/main/java/org/apache/sedona/core/spatialOperator/JoinQuery.java
@@ -373,17 +373,20 @@ public class JoinQuery
         final SpatialPartitioner partitioner =
                 (SpatialPartitioner) rightRDD.spatialPartitionedRDD.partitioner().get();
         final DedupParams dedupParams = partitioner.getDedupParams();
+        final SparkContext cxt = leftRDD.rawSpatialRDD.context();
 
         final JavaRDD<Pair<U, T>> joinResult;
         if (joinParams.useIndex) {
             if (rightRDD.indexedRDD != null) {
                 final RightIndexLookupJudgement judgement =
                         new RightIndexLookupJudgement(joinParams.considerBoundaryIntersection, dedupParams);
+                judgement.broadcastDedupParams(cxt);
                 joinResult = leftRDD.spatialPartitionedRDD.zipPartitions(rightRDD.indexedRDD, judgement);
             }
             else if (leftRDD.indexedRDD != null) {
                 final LeftIndexLookupJudgement judgement =
                         new LeftIndexLookupJudgement(joinParams.considerBoundaryIntersection, dedupParams);
+                judgement.broadcastDedupParams(cxt);
                 joinResult = leftRDD.indexedRDD.zipPartitions(rightRDD.spatialPartitionedRDD, judgement);
             }
             else {
@@ -395,11 +398,13 @@ public class JoinQuery
                                 joinParams.joinBuildSide,
                                 dedupParams,
                                 buildCount, streamCount, resultCount, candidateCount);
+                judgement.broadcastDedupParams(cxt);
                 joinResult = leftRDD.spatialPartitionedRDD.zipPartitions(rightRDD.spatialPartitionedRDD, judgement);
             }
         }
         else {
             NestedLoopJudgement judgement = new NestedLoopJudgement(joinParams.considerBoundaryIntersection, dedupParams);
+            judgement.broadcastDedupParams(cxt);
             joinResult = rightRDD.spatialPartitionedRDD.zipPartitions(leftRDD.spatialPartitionedRDD, judgement);
         }