You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sedona.apache.org by ji...@apache.org on 2020/12/19 07:45:15 UTC

[incubator-sedona] branch master updated: [SEDONA-8] Solve the lock contention in ST_Transform (#497)

This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sedona.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d0dcf9  [SEDONA-8] Solve the lock contention in ST_Transform (#497)
6d0dcf9 is described below

commit 6d0dcf995021c6327c71e943dfe384de4557a278
Author: Jia Yu <ji...@apache.org>
AuthorDate: Fri Dec 18 23:45:05 2020 -0800

    [SEDONA-8] Solve the lock contention in ST_Transform (#497)
    
    * Update GitHub action
    
    * Cache Maven package
    
    * Separate maven and python test
    
    * Fix ST_Transform lock contention and add ST_FlipCoordinates to remedy the missing function in new ST_Transform
    
    * Fix test cases
---
 .../apache/sedona/core/spatialRDD/SpatialRDD.java  |  8 ++++
 .../org/apache/sedona/core/utils/GeomUtils.java    | 28 ++++++++++++
 docs/api/sql/GeoSparkSQL-Function.md               | 25 +++++++++--
 python/tests/sql/test_function.py                  |  4 +-
 .../scala/org/apache/sedona/sql/UDF/Catalog.scala  |  3 +-
 .../sql/sedona_sql/expressions/Functions.scala     | 50 ++++++++++++----------
 .../org/apache/sedona/sql/functionTestScala.scala  | 20 +++++----
 .../sedona/viz/sql/standardVizOperatorTest.scala   |  2 +-
 8 files changed, 102 insertions(+), 38 deletions(-)

diff --git a/core/src/main/java/org/apache/sedona/core/spatialRDD/SpatialRDD.java b/core/src/main/java/org/apache/sedona/core/spatialRDD/SpatialRDD.java
index dd93b5c..ca7fdee 100644
--- a/core/src/main/java/org/apache/sedona/core/spatialRDD/SpatialRDD.java
+++ b/core/src/main/java/org/apache/sedona/core/spatialRDD/SpatialRDD.java
@@ -33,6 +33,7 @@ import org.apache.sedona.core.spatialPartitioning.quadtree.QuadTreePartitioner;
 import org.apache.sedona.core.spatialPartitioning.quadtree.StandardQuadTree;
 import org.apache.sedona.core.spatialRddTool.IndexBuilder;
 import org.apache.sedona.core.spatialRddTool.StatCalculator;
+import org.apache.sedona.core.utils.GeomUtils;
 import org.apache.sedona.core.utils.RDDSampleUtils;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.function.FlatMapFunction;
@@ -661,4 +662,21 @@ public class SpatialRDD<T extends Geometry>
     {
         return targetEpgsgCode;
     }
+
+    /**
+     * Swaps the X and Y ordinates of every geometry in {@code rawSpatialRDD}.
+     *
+     * <p>Each geometry is copied before it is flipped. The elements of {@code rawSpatialRDD}
+     * may be shared with a cached parent RDD; flipping them in place would corrupt that cache
+     * and flip them again every time this lineage is recomputed. User data is carried over.
+     */
+    @SuppressWarnings("unchecked")
+    public void flipCoordinates() {
+        this.rawSpatialRDD = this.rawSpatialRDD.map(f -> {
+            Geometry flipped = f.copy();
+            flipped.setUserData(f.getUserData());
+            GeomUtils.flipCoordinates(flipped);
+            return (T) flipped;
+        });
+    }
 }
diff --git a/core/src/main/java/org/apache/sedona/core/utils/GeomUtils.java b/core/src/main/java/org/apache/sedona/core/utils/GeomUtils.java
index 30c22ee..dfdc8cb 100644
--- a/core/src/main/java/org/apache/sedona/core/utils/GeomUtils.java
+++ b/core/src/main/java/org/apache/sedona/core/utils/GeomUtils.java
@@ -13,6 +13,8 @@
  */
 package org.apache.sedona.core.utils;
 
+import org.locationtech.jts.geom.CoordinateSequence;
+import org.locationtech.jts.geom.CoordinateSequenceFilter;
 import org.locationtech.jts.geom.Geometry;
 
 import java.util.Objects;
@@ -43,4 +45,40 @@ public class GeomUtils
         if (Objects.equals(geom1.getUserData(), g.getUserData())) return geom1.equalsExact(g);
         else return false;
     }
+
+    /**
+     * Swaps the X and Y ordinates of every coordinate of a geometry, in place.
+     *
+     * <p>Ordinates are read and written with {@link CoordinateSequence#getX(int)},
+     * {@link CoordinateSequence#getY(int)} and {@link CoordinateSequence#setOrdinate}:
+     * {@link CoordinateSequence#getCoordinate(int)} may return a copy of the coordinate
+     * (depending on the sequence implementation), so mutating its result is not guaranteed
+     * to change the geometry.
+     *
+     * @param g the geometry to modify
+     */
+    public static void flipCoordinates(Geometry g) {
+        g.apply(new CoordinateSequenceFilter() {
+
+            @Override
+            public void filter(CoordinateSequence seq, int i) {
+                double oldX = seq.getX(i);
+                double oldY = seq.getY(i);
+                seq.setOrdinate(i, CoordinateSequence.X, oldY);
+                seq.setOrdinate(i, CoordinateSequence.Y, oldX);
+            }
+
+            @Override
+            public boolean isGeometryChanged() {
+                // Coordinates were modified, so the geometry's cached state must be refreshed.
+                return true;
+            }
+
+            @Override
+            public boolean isDone() {
+                // Every coordinate has to be visited.
+                return false;
+            }
+        });
+    }
 }
diff --git a/docs/api/sql/GeoSparkSQL-Function.md b/docs/api/sql/GeoSparkSQL-Function.md
index 0c43743..4a10ca9 100644
--- a/docs/api/sql/GeoSparkSQL-Function.md
+++ b/docs/api/sql/GeoSparkSQL-Function.md
@@ -92,12 +92,12 @@ Introduction:
 Transform the Spatial Reference System / Coordinate Reference System of A, from SourceCRS to TargetCRS
 
 !!!note
-	By default, ==ST_Transform== assumes Longitude/Latitude is your coordinate X/Y. If this is not the case, set UseLongitudeLatitudeOrder as "false".
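+	By default, ==ST_Transform== uses the EPSG axis order, which for EPSG:4326 is Latitude/Longitude (X = latitude, Y = longitude). If your geometries store Longitude/Latitude as X/Y, apply ==ST_FlipCoordinates== to swap X and Y before calling ==ST_Transform==.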
 
 !!!note
 	If ==ST_Transform== throws an Exception called "Bursa wolf parameters required", you need to disable the error notification in ST_Transform. You can append a boolean value at the end.
 
-Format: `ST_Transform (A:geometry, SourceCRS:string, TargetCRS:string, [Optional] UseLongitudeLatitudeOrder:Boolean, [Optional] DisableError)`
+Format: `ST_Transform (A:geometry, SourceCRS:string, TargetCRS:string, [Optional] DisableError:Boolean)`
 
 Since: `v1.0.0`
 
@@ -109,7 +109,7 @@ FROM polygondf
 
 Spark SQL example (with optional parameters):
 ```SQL
-SELECT ST_Transform(polygondf.countyshape, 'epsg:4326','epsg:3857',true, false)
+SELECT ST_Transform(polygondf.countyshape, 'epsg:4326','epsg:3857', false)
 FROM polygondf
 ```
 
@@ -565,3 +565,22 @@ Since: `v1.0.0`
 SELECT ST_NumGeometries(df.geometry)
 FROM df
 ```
+
+
+## ST_FlipCoordinates
+
+Introduction: Returns a version of the given geometry with its X and Y coordinates swapped.
+
+Format: `ST_FlipCoordinates(A:geometry)`
+
+Since: `v1.0.0`
+
+Spark SQL example:
+```SQL
+SELECT ST_FlipCoordinates(df.geometry)
+FROM df
+```
+
+Input: `POINT (1 2)`
+
+Output: `POINT (2 1)`
\ No newline at end of file
diff --git a/python/tests/sql/test_function.py b/python/tests/sql/test_function.py
index 75be2c4..1eb485b 100644
--- a/python/tests/sql/test_function.py
+++ b/python/tests/sql/test_function.py
@@ -167,10 +167,10 @@ class TestPredicateJoin(TestBase):
         polygon_df.createOrReplaceTempView("polygondf")
         polygon_df.show()
         try:
-            function_df = self.spark.sql("select ST_Transform(polygondf.countyshape, 'epsg:4326','epsg:3857',true, false) from polygondf")
+            function_df = self.spark.sql("select ST_Transform(polygondf.countyshape, 'epsg:4326','epsg:3857', false) from polygondf")
             function_df.show()
         except Exception:
-            function_df = self.spark.sql("select ST_Transform(polygondf.countyshape, 'epsg:4326','epsg:3857',true, false) from polygondf")
+            function_df = self.spark.sql("select ST_Transform(polygondf.countyshape, 'epsg:4326','epsg:3857', false) from polygondf")
             function_df.show()
 
     def test_st_intersection_intersects_but_not_contains(self):
diff --git a/sql/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala b/sql/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala
index a971083..696a4ef 100644
--- a/sql/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala
+++ b/sql/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala
@@ -76,7 +76,8 @@ object Catalog {
     ST_NumInteriorRings,
     ST_AddPoint,
     ST_RemovePoint,
-    ST_IsRing
+    ST_IsRing,
+    ST_FlipCoordinates
   )
 
   val aggregateExpressions: Seq[Aggregator[Geometry, Geometry, Geometry]] = Seq(
diff --git a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala
index 83af40a..2071eed 100644
--- a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala
+++ b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala
@@ -21,6 +21,7 @@ package org.apache.spark.sql.sedona_sql.expressions
 import java.util
 
 import org.apache.sedona.core.geometryObjects.{Circle, GeoJSONWriterNew}
+import org.apache.sedona.core.utils.GeomUtils
 import org.apache.sedona.sql.utils.GeometrySerializer
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
@@ -239,41 +240,23 @@ case class ST_Transform(inputExpressions: Seq[Expression])
   override def nullable: Boolean = false
 
   override def eval(input: InternalRow): Any = {
-    assert(inputExpressions.length >= 3 && inputExpressions.length <= 5)
-
-    val hints = if (inputExpressions.length >= 4) {
-      new Hints(Hints.FORCE_LONGITUDE_FIRST_AXIS_ORDER, inputExpressions(3).eval(input).asInstanceOf[Boolean])
-    }
-    else {
-      new Hints(Hints.FORCE_LONGITUDE_FIRST_AXIS_ORDER, true)
-    }
-
-    val originalTargetCode = inputExpressions(2).eval(input).asInstanceOf[UTF8String].toString
-    val originalSourceCode = inputExpressions(1).eval(input).asInstanceOf[UTF8String].toString
-    val targetCode = originalTargetCode.split(":")(1).toInt
+    assert(inputExpressions.length >= 3 && inputExpressions.length <= 4) // args: geometry, source CRS code, target CRS code, [optional] DisableError

     val originalGeometry = GeometrySerializer.deserialize(inputExpressions(0).eval(input).asInstanceOf[ArrayData])
-    val sourceCRScode = getCRSFromCodeString(originalSourceCode, hints)
-    val targetCRScode = getCRSFromCodeString(originalTargetCode, hints)
+    val sourceCRScode = CRS.decode(inputExpressions(1).eval(input).asInstanceOf[UTF8String].toString) // NOTE(review): default GeoTools axis order, presumably lat/lon for EPSG:4326; lon/lat input needs ST_FlipCoordinates first
+    val targetCRScode = CRS.decode(inputExpressions(2).eval(input).asInstanceOf[UTF8String].toString) // NOTE(review): the result's SRID is not set to this target code — confirm this is intended

     var transform: MathTransform = null
-    if (inputExpressions.length == 5) {
-      transform = CRS.findMathTransform(sourceCRScode, targetCRScode, inputExpressions(4).eval(input).asInstanceOf[Boolean])
+    if (inputExpressions.length == 4) { // optional 4th argument (DisableError) was supplied
+      transform = CRS.findMathTransform(sourceCRScode, targetCRScode, inputExpressions(3).eval(input).asInstanceOf[Boolean]) // DisableError is passed as findMathTransform's lenient flag
     }
     else {
       transform = CRS.findMathTransform(sourceCRScode, targetCRScode, false)
     }
     val geom = JTS.transform(originalGeometry, transform)
-    geom.setSRID(targetCode)
     new GenericArrayData(GeometrySerializer.serialize(geom))
   }
 
-  private def getCRSFromCodeString(codeString: String, hints: Hints): CoordinateReferenceSystem = {
-    val targetAuthority = codeString.split(":")(0)
-    val targetFactory = ReferencingFactoryFinder.getCRSAuthorityFactory(targetAuthority, hints)
-    targetFactory.createCoordinateReferenceSystem(codeString)
-  }
-
   override def dataType: DataType = GeometryUDT
 
   override def children: Seq[Expression] = inputExpressions
@@ -966,3 +949,24 @@ case class ST_NumGeometries(inputExpressions: Seq[Expression])
 
   override def children: Seq[Expression] = inputExpressions
 }
+
+/**
+  * Returns a version of the given geometry with the X and Y ordinates of every coordinate swapped.
+  *
+  * @param inputExpressions exactly one expression, evaluating to a serialized geometry
+  */
+case class ST_FlipCoordinates(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback {
+  override def nullable: Boolean = false // NOTE(review): a null input geometry is presumably not handled — confirm
+
+  override def eval(input: InternalRow): Any = {
+    assert(inputExpressions.length == 1) // takes a single geometry argument
+    val geometry = GeometrySerializer.deserialize(inputExpressions(0).eval(input).asInstanceOf[ArrayData])
+    GeomUtils.flipCoordinates(geometry) // flips the freshly deserialized geometry in place
+    geometry.toGenericArrayData
+  }
+
+  override def dataType: DataType = GeometryUDT
+
+  override def children: Seq[Expression] = inputExpressions
+}
\ No newline at end of file
diff --git a/sql/src/test/scala/org/apache/sedona/sql/functionTestScala.scala b/sql/src/test/scala/org/apache/sedona/sql/functionTestScala.scala
index bfabb5d..47018ad 100644
--- a/sql/src/test/scala/org/apache/sedona/sql/functionTestScala.scala
+++ b/sql/src/test/scala/org/apache/sedona/sql/functionTestScala.scala
@@ -100,8 +100,6 @@ class functionTestScala extends TestBaseScala with Matchers with GeometrySample
       polygonWktDf.createOrReplaceTempView("polygontable")
       var polygonDf = sparkSession.sql("select ST_GeomFromWKT(polygontable._c0) as countyshape from polygontable")
       polygonDf.createOrReplaceTempView("polygondf")
-      var functionDf = sparkSession.sql("select ST_Transform(polygondf.countyshape, 'epsg:4326','epsg:3857',true, false) from polygondf")
-
       val polygon = "POLYGON ((110.54671 55.818002, 110.54671 55.143743, 110.940494 55.143743, 110.940494 55.818002, 110.54671 55.818002))"
       val forceXYExpect = "POLYGON ((471596.69167460164 6185916.951191288, 471107.5623640998 6110880.974228167, 496207.109151055 6110788.804712435, 496271.31937046186 6185825.60569904, 471596.69167460164 6185916.951191288))"
 
@@ -109,13 +107,8 @@ class functionTestScala extends TestBaseScala with Matchers with GeometrySample
         .withColumn("geom", expr("ST_GeomFromWKT(value)"))
         .createOrReplaceTempView("df")
 
-      sparkSession.sql("select ST_Transform(geom, 'EPSG:4326', 'EPSG:32649', false, false)  from df")
-
-      sparkSession.sql("select ST_Transform(geom, 'EPSG:4326', 'EPSG:32649', true, false)  from df")
-
-      val forceXYResult = sparkSession.sql(s"""select ST_Transform(ST_geomFromWKT('$polygon'),'EPSG:4326', 'EPSG:32649', true, false)""").rdd.map(row => row.getAs[Geometry](0).toString).collect()(0)
+      val forceXYResult = sparkSession.sql(s"""select ST_Transform(ST_FlipCoordinates(ST_geomFromWKT('$polygon')),'EPSG:4326', 'EPSG:32649', false)""").rdd.map(row => row.getAs[Geometry](0).toString).collect()(0)
       assert(forceXYResult == forceXYExpect)
-
     }
 
     it("Passed ST_Intersection - intersects but not contains") {
@@ -846,4 +839,15 @@ class functionTestScala extends TestBaseScala with Matchers with GeometrySample
         "GEOMETRYCOLLECTION EMPTY")
   }
 
+  it("Should pass ST_FlipCoordinates") {
+    val pointDF = createSamplePointDf(5, "geom")
+    // Compute the original and the flipped geometry in one query so both columns come from the
+    // same evaluation of the sample data (avoids comparing rows from separate take() calls).
+    val row = pointDF.withColumn("flipped", callUDF("ST_FlipCoordinates", col("geom"))).take(1)(0)
+    val original = row.getAs[Geometry]("geom").getCoordinate
+    val flipped = row.getAs[Geometry]("flipped").getCoordinate
+    assert(flipped.x == original.y)
+    assert(flipped.y == original.x)
+  }
+
 }
diff --git a/viz/src/test/scala/org/apache/sedona/viz/sql/standardVizOperatorTest.scala b/viz/src/test/scala/org/apache/sedona/viz/sql/standardVizOperatorTest.scala
index 8b416ed..304525c 100644
--- a/viz/src/test/scala/org/apache/sedona/viz/sql/standardVizOperatorTest.scala
+++ b/viz/src/test/scala/org/apache/sedona/viz/sql/standardVizOperatorTest.scala
@@ -98,7 +98,7 @@ class standardVizOperatorTest extends TestBaseScala {
         """
           |CREATE OR REPLACE TEMP VIEW pixels AS
           |SELECT pixel, shape FROM pointtable
-          |LATERAL VIEW ST_Pixelize(ST_Transform(shape, 'epsg:4326','epsg:3857'), 256, 256, (SELECT ST_Transform(bound, 'epsg:4326','epsg:3857') FROM boundtable)) AS pixel
+          |LATERAL VIEW ST_Pixelize(ST_Transform(ST_FlipCoordinates(shape), 'epsg:4326','epsg:3857'), 256, 256, (SELECT ST_Transform(ST_FlipCoordinates(bound), 'epsg:4326','epsg:3857') FROM boundtable)) AS pixel
         """.stripMargin)
       spark.sql(
         """