You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sedona.apache.org by ji...@apache.org on 2020/12/19 07:45:15 UTC
[incubator-sedona] branch master updated: [SEDONA-8] Solve the lock
contention in ST_Transform (#497)
This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sedona.git
The following commit(s) were added to refs/heads/master by this push:
new 6d0dcf9 [SEDONA-8] Solve the lock contention in ST_Transform (#497)
6d0dcf9 is described below
commit 6d0dcf995021c6327c71e943dfe384de4557a278
Author: Jia Yu <ji...@apache.org>
AuthorDate: Fri Dec 18 23:45:05 2020 -0800
[SEDONA-8] Solve the lock contention in ST_Transform (#497)
* Update GitHub action
* Cache Maven package
* Separate maven and python test
* Fix ST_Transform lock contention and add ST_FlipCoordinates to remedy the missing function in new ST_Transform
* Fix test cases
---
.../apache/sedona/core/spatialRDD/SpatialRDD.java | 8 ++++
.../org/apache/sedona/core/utils/GeomUtils.java | 28 ++++++++++++
docs/api/sql/GeoSparkSQL-Function.md | 25 +++++++++--
python/tests/sql/test_function.py | 4 +-
.../scala/org/apache/sedona/sql/UDF/Catalog.scala | 3 +-
.../sql/sedona_sql/expressions/Functions.scala | 50 ++++++++++++----------
.../org/apache/sedona/sql/functionTestScala.scala | 20 +++++----
.../sedona/viz/sql/standardVizOperatorTest.scala | 2 +-
8 files changed, 102 insertions(+), 38 deletions(-)
diff --git a/core/src/main/java/org/apache/sedona/core/spatialRDD/SpatialRDD.java b/core/src/main/java/org/apache/sedona/core/spatialRDD/SpatialRDD.java
index dd93b5c..ca7fdee 100644
--- a/core/src/main/java/org/apache/sedona/core/spatialRDD/SpatialRDD.java
+++ b/core/src/main/java/org/apache/sedona/core/spatialRDD/SpatialRDD.java
@@ -33,6 +33,7 @@ import org.apache.sedona.core.spatialPartitioning.quadtree.QuadTreePartitioner;
import org.apache.sedona.core.spatialPartitioning.quadtree.StandardQuadTree;
import org.apache.sedona.core.spatialRddTool.IndexBuilder;
import org.apache.sedona.core.spatialRddTool.StatCalculator;
+import org.apache.sedona.core.utils.GeomUtils;
import org.apache.sedona.core.utils.RDDSampleUtils;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
@@ -661,4 +662,11 @@ public class SpatialRDD<T extends Geometry>
{
return targetEpgsgCode;
}
+
+ /**
+ * Swaps the X and Y coordinates of every geometry held in rawSpatialRDD.
+ * Each geometry is modified in place by GeomUtils.flipCoordinates, and the field is
+ * reassigned to the resulting mapped RDD.
+ * NOTE(review): only rawSpatialRDD is replaced; any previously computed boundary,
+ * partitioning or index presumably still describes the unflipped data — confirm that
+ * callers invoke this before computing them.
+ */
+ public void flipCoordinates() {
+ rawSpatialRDD = rawSpatialRDD.map(geometry -> {
+ // flipCoordinates mutates the geometry in place, so hand back the same instance.
+ GeomUtils.flipCoordinates(geometry);
+ return geometry;
+ });
+ }
}
diff --git a/core/src/main/java/org/apache/sedona/core/utils/GeomUtils.java b/core/src/main/java/org/apache/sedona/core/utils/GeomUtils.java
index 30c22ee..dfdc8cb 100644
--- a/core/src/main/java/org/apache/sedona/core/utils/GeomUtils.java
+++ b/core/src/main/java/org/apache/sedona/core/utils/GeomUtils.java
@@ -13,6 +13,8 @@
*/
package org.apache.sedona.core.utils;
+import org.locationtech.jts.geom.CoordinateSequence;
+import org.locationtech.jts.geom.CoordinateSequenceFilter;
import org.locationtech.jts.geom.Geometry;
import java.util.Objects;
@@ -43,4 +45,30 @@ public class GeomUtils
if (Objects.equals(geom1.getUserData(), g.getUserData())) return geom1.equalsExact(g);
else return false;
}
+
+ /**
+ * Swaps the X and Y ordinates of every coordinate of a geometry, in place.
+ *
+ * The swap goes through {@link CoordinateSequence#getOrdinate} and
+ * {@link CoordinateSequence#setOrdinate} rather than mutating the object returned by
+ * {@link CoordinateSequence#getCoordinate}. JTS documents that getCoordinate may return
+ * either the underlying Coordinate or merely a copy, depending on the implementation
+ * (for example, packed sequences). Mutating a copy would silently leave the geometry
+ * unchanged. Z/M ordinates, if present, are not touched.
+ *
+ * @param g the geometry to modify; its coordinates are changed in place
+ */
+ public static void flipCoordinates(Geometry g) {
+ g.apply(new CoordinateSequenceFilter() {
+
+ @Override
+ public void filter(CoordinateSequence seq, int i) {
+ double oldX = seq.getOrdinate(i, CoordinateSequence.X);
+ double oldY = seq.getOrdinate(i, CoordinateSequence.Y);
+ seq.setOrdinate(i, CoordinateSequence.X, oldY);
+ seq.setOrdinate(i, CoordinateSequence.Y, oldX);
+ }
+
+ @Override
+ public boolean isGeometryChanged() {
+ // Coordinates were modified, so JTS must invalidate any cached envelope.
+ return true;
+ }
+
+ @Override
+ public boolean isDone() {
+ // Never stop early: every coordinate must be visited.
+ return false;
+ }
+ });
+ }
}
diff --git a/docs/api/sql/GeoSparkSQL-Function.md b/docs/api/sql/GeoSparkSQL-Function.md
index 0c43743..4a10ca9 100644
--- a/docs/api/sql/GeoSparkSQL-Function.md
+++ b/docs/api/sql/GeoSparkSQL-Function.md
@@ -92,12 +92,12 @@ Introduction:
Transform the Spatial Reference System / Coordinate Reference System of A, from SourceCRS to TargetCRS
!!!note
- By default, ==ST_Transform== assumes Longitude/Latitude is your coordinate X/Y. If this is not the case, set UseLongitudeLatitudeOrder as "false".
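+ ==ST_Transform== uses the axis order defined by the CRS authority. With GeoTools defaults, EPSG:4326 is Latitude/Longitude, so X is expected to hold the latitude. If your geometries store Longitude/Latitude as X/Y, first apply ==ST_FlipCoordinates== to swap X and Y.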
!!!note
If ==ST_Transform== throws an exception saying "Bursa wolf parameters required", you can suppress it by appending the optional boolean DisableError argument set to `true`, which makes the transformation lenient.
-Format: `ST_Transform (A:geometry, SourceCRS:string, TargetCRS:string, [Optional] UseLongitudeLatitudeOrder:Boolean, [Optional] DisableError)`
+Format: `ST_Transform (A:geometry, SourceCRS:string, TargetCRS:string, [Optional] DisableError:Boolean)`
Since: `v1.0.0`
@@ -109,7 +109,7 @@ FROM polygondf
Spark SQL example (with optional parameters):
```SQL
-SELECT ST_Transform(polygondf.countyshape, 'epsg:4326','epsg:3857',true, false)
+SELECT ST_Transform(polygondf.countyshape, 'epsg:4326','epsg:3857', false)
FROM polygondf
```
@@ -565,3 +565,22 @@ Since: `v1.0.0`
SELECT ST_NumGeometries(df.geometry)
FROM df
```
+
+
+## ST_FlipCoordinates
+
+Introduction: Returns a version of the given geometry with its X and Y axes swapped.
+
+Format: `ST_FlipCoordinates(A:geometry)`
+
+Since: `v1.0.0`
+
+Spark SQL example:
+```SQL
+SELECT ST_FlipCoordinates(df.geometry)
+FROM df
+```
+
+Input: `POINT (1 2)`
+
+Output: `POINT (2 1)`
\ No newline at end of file
diff --git a/python/tests/sql/test_function.py b/python/tests/sql/test_function.py
index 75be2c4..1eb485b 100644
--- a/python/tests/sql/test_function.py
+++ b/python/tests/sql/test_function.py
@@ -167,10 +167,10 @@ class TestPredicateJoin(TestBase):
polygon_df.createOrReplaceTempView("polygondf")
polygon_df.show()
try:
- function_df = self.spark.sql("select ST_Transform(polygondf.countyshape, 'epsg:4326','epsg:3857',true, false) from polygondf")
+ function_df = self.spark.sql("select ST_Transform(polygondf.countyshape, 'epsg:4326','epsg:3857', false) from polygondf")
function_df.show()
except Exception:
- function_df = self.spark.sql("select ST_Transform(polygondf.countyshape, 'epsg:4326','epsg:3857',true, false) from polygondf")
+ function_df = self.spark.sql("select ST_Transform(polygondf.countyshape, 'epsg:4326','epsg:3857', false) from polygondf")
function_df.show()
def test_st_intersection_intersects_but_not_contains(self):
diff --git a/sql/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala b/sql/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala
index a971083..696a4ef 100644
--- a/sql/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala
+++ b/sql/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala
@@ -76,7 +76,8 @@ object Catalog {
ST_NumInteriorRings,
ST_AddPoint,
ST_RemovePoint,
- ST_IsRing
+ ST_IsRing,
+ ST_FlipCoordinates
)
val aggregateExpressions: Seq[Aggregator[Geometry, Geometry, Geometry]] = Seq(
diff --git a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala
index 83af40a..2071eed 100644
--- a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala
+++ b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala
@@ -21,6 +21,7 @@ package org.apache.spark.sql.sedona_sql.expressions
import java.util
import org.apache.sedona.core.geometryObjects.{Circle, GeoJSONWriterNew}
+import org.apache.sedona.core.utils.GeomUtils
import org.apache.sedona.sql.utils.GeometrySerializer
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
@@ -239,41 +240,23 @@ case class ST_Transform(inputExpressions: Seq[Expression])
override def nullable: Boolean = false
+ // Expected arguments: (geometry, source CRS code, target CRS code[, Boolean]).
+ // The optional 4th argument is forwarded as the third (lenient) parameter of CRS.findMathTransform.
override def eval(input: InternalRow): Any = {
- assert(inputExpressions.length >= 3 && inputExpressions.length <= 5)
-
- val hints = if (inputExpressions.length >= 4) {
- new Hints(Hints.FORCE_LONGITUDE_FIRST_AXIS_ORDER, inputExpressions(3).eval(input).asInstanceOf[Boolean])
- }
- else {
- new Hints(Hints.FORCE_LONGITUDE_FIRST_AXIS_ORDER, true)
- }
-
- val originalTargetCode = inputExpressions(2).eval(input).asInstanceOf[UTF8String].toString
- val originalSourceCode = inputExpressions(1).eval(input).asInstanceOf[UTF8String].toString
- val targetCode = originalTargetCode.split(":")(1).toInt
+ assert(inputExpressions.length >= 3 && inputExpressions.length <= 4)
val originalGeometry = GeometrySerializer.deserialize(inputExpressions(0).eval(input).asInstanceOf[ArrayData])
- val sourceCRScode = getCRSFromCodeString(originalSourceCode, hints)
- val targetCRScode = getCRSFromCodeString(originalTargetCode, hints)
+ // CRS.decode is called without a longitude-first hint, so axis order follows the CRS definition
+ // (with GeoTools defaults, presumably latitude first for EPSG:4326); lon/lat data needs ST_FlipCoordinates first.
+ val sourceCRScode = CRS.decode(inputExpressions(1).eval(input).asInstanceOf[UTF8String].toString)
+ val targetCRScode = CRS.decode(inputExpressions(2).eval(input).asInstanceOf[UTF8String].toString)
+ // Without the optional 4th argument, the transform is looked up non-leniently (false).
var transform: MathTransform = null
- if (inputExpressions.length == 5) {
- transform = CRS.findMathTransform(sourceCRScode, targetCRScode, inputExpressions(4).eval(input).asInstanceOf[Boolean])
+ if (inputExpressions.length == 4) {
+ transform = CRS.findMathTransform(sourceCRScode, targetCRScode, inputExpressions(3).eval(input).asInstanceOf[Boolean])
}
else {
transform = CRS.findMathTransform(sourceCRScode, targetCRScode, false)
}
val geom = JTS.transform(originalGeometry, transform)
- geom.setSRID(targetCode)
+ // NOTE(review): the output SRID is no longer set to the target EPSG code (the setSRID call was
+ // removed); confirm that no downstream code relies on the SRID of transformed geometries.
new GenericArrayData(GeometrySerializer.serialize(geom))
}
- private def getCRSFromCodeString(codeString: String, hints: Hints): CoordinateReferenceSystem = {
- val targetAuthority = codeString.split(":")(0)
- val targetFactory = ReferencingFactoryFinder.getCRSAuthorityFactory(targetAuthority, hints)
- targetFactory.createCoordinateReferenceSystem(codeString)
- }
-
override def dataType: DataType = GeometryUDT
override def children: Seq[Expression] = inputExpressions
@@ -966,3 +949,24 @@ case class ST_NumGeometries(inputExpressions: Seq[Expression])
override def children: Seq[Expression] = inputExpressions
}
+
+/**
+ * Returns the given geometry with the X and Y ordinates of every coordinate swapped,
+ * e.g. POINT (1 2) becomes POINT (2 1).
+ *
+ * @param inputExpressions exactly one expression that evaluates to a serialized geometry
+ */
+case class ST_FlipCoordinates(inputExpressions: Seq[Expression])
+ extends Expression with CodegenFallback {
+ override def nullable: Boolean = false
+
+ override def eval(input: InternalRow): Any = {
+ assert(inputExpressions.length == 1)
+ // Deserialization produces a new Geometry object, so flipping it in place leaves the input row untouched.
+ val geometry = GeometrySerializer.deserialize(inputExpressions(0).eval(input).asInstanceOf[ArrayData])
+ GeomUtils.flipCoordinates(geometry)
+ // Serialize explicitly, the same way ST_Transform does in this file.
+ new GenericArrayData(GeometrySerializer.serialize(geometry))
+ }
+
+ override def dataType: DataType = GeometryUDT
+
+ override def children: Seq[Expression] = inputExpressions
+}
\ No newline at end of file
diff --git a/sql/src/test/scala/org/apache/sedona/sql/functionTestScala.scala b/sql/src/test/scala/org/apache/sedona/sql/functionTestScala.scala
index bfabb5d..47018ad 100644
--- a/sql/src/test/scala/org/apache/sedona/sql/functionTestScala.scala
+++ b/sql/src/test/scala/org/apache/sedona/sql/functionTestScala.scala
@@ -100,8 +100,6 @@ class functionTestScala extends TestBaseScala with Matchers with GeometrySample
polygonWktDf.createOrReplaceTempView("polygontable")
var polygonDf = sparkSession.sql("select ST_GeomFromWKT(polygontable._c0) as countyshape from polygontable")
polygonDf.createOrReplaceTempView("polygondf")
- var functionDf = sparkSession.sql("select ST_Transform(polygondf.countyshape, 'epsg:4326','epsg:3857',true, false) from polygondf")
-
val polygon = "POLYGON ((110.54671 55.818002, 110.54671 55.143743, 110.940494 55.143743, 110.940494 55.818002, 110.54671 55.818002))"
val forceXYExpect = "POLYGON ((471596.69167460164 6185916.951191288, 471107.5623640998 6110880.974228167, 496207.109151055 6110788.804712435, 496271.31937046186 6185825.60569904, 471596.69167460164 6185916.951191288))"
@@ -109,13 +107,8 @@ class functionTestScala extends TestBaseScala with Matchers with GeometrySample
.withColumn("geom", expr("ST_GeomFromWKT(value)"))
.createOrReplaceTempView("df")
- sparkSession.sql("select ST_Transform(geom, 'EPSG:4326', 'EPSG:32649', false, false) from df")
-
- sparkSession.sql("select ST_Transform(geom, 'EPSG:4326', 'EPSG:32649', true, false) from df")
-
- val forceXYResult = sparkSession.sql(s"""select ST_Transform(ST_geomFromWKT('$polygon'),'EPSG:4326', 'EPSG:32649', true, false)""").rdd.map(row => row.getAs[Geometry](0).toString).collect()(0)
+ val forceXYResult = sparkSession.sql(s"""select ST_Transform(ST_FlipCoordinates(ST_geomFromWKT('$polygon')),'EPSG:4326', 'EPSG:32649', false)""").rdd.map(row => row.getAs[Geometry](0).toString).collect()(0)
assert(forceXYResult == forceXYExpect)
-
}
it("Passed ST_Intersection - intersects but not contains") {
@@ -846,4 +839,15 @@ class functionTestScala extends TestBaseScala with Matchers with GeometrySample
"GEOMETRYCOLLECTION EMPTY")
}
+ it("Should pass ST_FlipCoordinates") {
+ val pointDF = createSamplePointDf(5, "geom")
+ // Collect the first row once per DataFrame instead of running a separate Spark job per ordinate.
+ val oldCoordinate = pointDF.take(1)(0).get(0).asInstanceOf[Geometry].getCoordinate
+ val newDf = pointDF.withColumn("geom", callUDF("ST_FlipCoordinates", col("geom")))
+ val newCoordinate = newDf.take(1)(0).get(0).asInstanceOf[Geometry].getCoordinate
+ // After flipping, the new X must equal the old Y and vice versa.
+ assert(newCoordinate.x == oldCoordinate.y)
+ assert(newCoordinate.y == oldCoordinate.x)
+ }
+
}
diff --git a/viz/src/test/scala/org/apache/sedona/viz/sql/standardVizOperatorTest.scala b/viz/src/test/scala/org/apache/sedona/viz/sql/standardVizOperatorTest.scala
index 8b416ed..304525c 100644
--- a/viz/src/test/scala/org/apache/sedona/viz/sql/standardVizOperatorTest.scala
+++ b/viz/src/test/scala/org/apache/sedona/viz/sql/standardVizOperatorTest.scala
@@ -98,7 +98,7 @@ class standardVizOperatorTest extends TestBaseScala {
"""
|CREATE OR REPLACE TEMP VIEW pixels AS
|SELECT pixel, shape FROM pointtable
- |LATERAL VIEW ST_Pixelize(ST_Transform(shape, 'epsg:4326','epsg:3857'), 256, 256, (SELECT ST_Transform(bound, 'epsg:4326','epsg:3857') FROM boundtable)) AS pixel
+ |LATERAL VIEW ST_Pixelize(ST_Transform(ST_FlipCoordinates(shape), 'epsg:4326','epsg:3857'), 256, 256, (SELECT ST_Transform(ST_FlipCoordinates(bound), 'epsg:4326','epsg:3857') FROM boundtable)) AS pixel
""".stripMargin)
spark.sql(
"""