You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sedona.apache.org by ji...@apache.org on 2022/08/24 19:14:37 UTC

[incubator-sedona] branch master updated: [SEDONA-154] Add measurement functions to the Flink API (#674)

This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sedona.git


The following commit(s) were added to refs/heads/master by this push:
     new 7623e7b9 [SEDONA-154] Add measurement functions to the Flink API (#674)
7623e7b9 is described below

commit 7623e7b9599e8da9394af8425d82e647429b1ae9
Author: Kengo Seki <se...@apache.org>
AuthorDate: Thu Aug 25 04:14:32 2022 +0900

    [SEDONA-154] Add measurement functions to the Flink API (#674)
    
    Co-authored-by: Jia Yu <ji...@apache.org>
---
 .../java/org/apache/sedona/common/Functions.java   | 22 ++++++++
 docs/api/flink/Function.md                         | 61 ++++++++++++++++++++++
 docs/api/sql/Function.md                           |  2 +-
 .../main/java/org/apache/sedona/flink/Catalog.java |  4 ++
 .../apache/sedona/flink/expressions/Functions.java | 48 ++++++++++++-----
 .../java/org/apache/sedona/flink/FunctionTest.java | 34 ++++++++++--
 .../sql/sedona_sql/expressions/Functions.scala     | 57 ++------------------
 7 files changed, 158 insertions(+), 70 deletions(-)

diff --git a/common/src/main/java/org/apache/sedona/common/Functions.java b/common/src/main/java/org/apache/sedona/common/Functions.java
index f51d7e79..7cb9bba7 100644
--- a/common/src/main/java/org/apache/sedona/common/Functions.java
+++ b/common/src/main/java/org/apache/sedona/common/Functions.java
@@ -28,6 +28,7 @@ import org.locationtech.jts.geom.MultiPoint;
 import org.locationtech.jts.geom.MultiPolygon;
 import org.locationtech.jts.geom.Point;
 import org.locationtech.jts.geom.Polygon;
+import org.locationtech.jts.operation.distance3d.Distance3DOp;
 import org.locationtech.jts.io.gml2.GMLWriter;
 import org.locationtech.jts.io.kml.KMLWriter;
 import org.locationtech.jts.operation.valid.IsSimpleOp;
@@ -40,6 +41,19 @@ import org.wololo.jts2geojson.GeoJSONWriter;
 
 
 public class Functions {
+    public static double area(Geometry geometry) {
+        return geometry.getArea();
+    }
+
+    public static double azimuth(Geometry left, Geometry right) {
+        Coordinate leftCoordinate = left.getCoordinate();
+        Coordinate rightCoordinate = right.getCoordinate();
+        double deltaX = rightCoordinate.x - leftCoordinate.x;
+        double deltaY = rightCoordinate.y - leftCoordinate.y;
+        double azimuth = Math.atan2(deltaX, deltaY);
+        return azimuth < 0 ? azimuth + (2 * Math.PI) : azimuth;
+    }
+
     public static Geometry buffer(Geometry geometry, double radius) {
         return geometry.buffer(radius);
     }
@@ -48,6 +62,14 @@ public class Functions {
         return left.distance(right);
     }
 
+    public static double distance3d(Geometry left, Geometry right) {
+        return new Distance3DOp(left, right).distance();
+    }
+
+    public static double length(Geometry geometry) {
+        return geometry.getLength();
+    }
+
     public static double xMin(Geometry geometry) {
         Coordinate[] points = geometry.getCoordinates();
         double min = Double.MAX_VALUE;
diff --git a/docs/api/flink/Function.md b/docs/api/flink/Function.md
index b575448d..059c4ecc 100644
--- a/docs/api/flink/Function.md
+++ b/docs/api/flink/Function.md
@@ -1,3 +1,33 @@
+## ST_3DDistance
+
+Introduction: Return the 3-dimensional minimum Cartesian distance between A and B
+
+Format: `ST_3DDistance (A:geometry, B:geometry)`
+
+Since: `v1.3.0`
+
+Example:
+
+```SQL
+SELECT ST_3DDistance(polygondf.countyshape, polygondf.countyshape)
+FROM polygondf
+```
+
+## ST_Area
+
+Introduction: Return the area of A
+
+Format: `ST_Area (A:geometry)`
+
+Since: `v1.3.0`
+
+Example:
+
+```SQL
+SELECT ST_Area(polygondf.countyshape)
+FROM polygondf
+```
+
 ## ST_AsBinary
 
 Introduction: Return the Well-Known Binary representation of a geometry
@@ -106,6 +136,22 @@ SELECT ST_AsText(polygondf.countyshape)
 FROM polygondf
 ```
 
+## ST_Azimuth
+
+Introduction: Returns the azimuth, in radians, for two given points; null otherwise.
+
+Format: `ST_Azimuth(pointA: Point, pointB: Point)`
+
+Since: `v1.3.0`
+
+Example:
+
+```SQL
+SELECT ST_Azimuth(ST_POINT(0.0, 25.0), ST_POINT(0.0, 0.0))
+```
+
+Output: `3.141592653589793`
+
 ## ST_Buffer
 
 Introduction: Returns a geometry/geography that represents all points whose distance from this Geometry/geography is less than or equal to distance.
@@ -310,6 +356,21 @@ SELECT ST_IsValid(polygondf.countyshape)
 FROM polygondf
 ```
 
+## ST_Length
+
+Introduction: Return the perimeter of A
+
+Format: `ST_Length (A:geometry)`
+
+Since: `v1.3.0`
+
+Example:
+
+```SQL
+SELECT ST_Length(polygondf.countyshape)
+FROM polygondf
+```
+
 ## ST_PointN
 
 Introduction: Return the Nth point in a single linestring or circular linestring in the geometry. Negative values are counted backwards from the end of the LineString, so that -1 is the last point. Returns NULL if there is no linestring in the geometry.
diff --git a/docs/api/sql/Function.md b/docs/api/sql/Function.md
index f60ca1e2..49dae112 100644
--- a/docs/api/sql/Function.md
+++ b/docs/api/sql/Function.md
@@ -165,7 +165,7 @@ Since: `v1.0.0`
 
 Spark SQL example:
 ```SQL
-SELECT ST_Azimuth(ST_POINT(0.0 25.0), ST_POINT(0.0 0.0))
+SELECT ST_Azimuth(ST_POINT(0.0, 25.0), ST_POINT(0.0, 0.0))
 ```
 
 Output: `3.141592653589793`
diff --git a/flink/src/main/java/org/apache/sedona/flink/Catalog.java b/flink/src/main/java/org/apache/sedona/flink/Catalog.java
index a2e365c9..4bf52347 100644
--- a/flink/src/main/java/org/apache/sedona/flink/Catalog.java
+++ b/flink/src/main/java/org/apache/sedona/flink/Catalog.java
@@ -32,10 +32,14 @@ public class Catalog {
                 new Constructors.ST_GeomFromWKB(),
                 new Constructors.ST_GeomFromGeoJSON(),
                 new Constructors.ST_GeomFromGeoHash(),
+                new Functions.ST_Area(),
+                new Functions.ST_Azimuth(),
                 new Constructors.ST_GeomFromGML(),
                 new Constructors.ST_GeomFromKML(),
                 new Functions.ST_Buffer(),
                 new Functions.ST_Distance(),
+                new Functions.ST_3DDistance(),
+                new Functions.ST_Length(),
                 new Functions.ST_Transform(),
                 new Functions.ST_FlipCoordinates(),
                 new Functions.ST_GeoHash(),
diff --git a/flink/src/main/java/org/apache/sedona/flink/expressions/Functions.java b/flink/src/main/java/org/apache/sedona/flink/expressions/Functions.java
index ee0b4177..3c6c5cfe 100644
--- a/flink/src/main/java/org/apache/sedona/flink/expressions/Functions.java
+++ b/flink/src/main/java/org/apache/sedona/flink/expressions/Functions.java
@@ -15,23 +15,29 @@ package org.apache.sedona.flink.expressions;
 
 import org.apache.flink.table.annotation.DataTypeHint;
 import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.sedona.common.utils.GeomUtils;
-import org.geotools.geometry.jts.JTS;
-import org.geotools.referencing.CRS;
-import org.locationtech.jts.io.WKTWriter;
-import org.locationtech.jts.geom.Coordinate;
 import org.locationtech.jts.geom.Geometry;
-import org.locationtech.jts.geom.LineString;
 import org.opengis.referencing.FactoryException;
-import org.opengis.referencing.crs.CoordinateReferenceSystem;
-import org.opengis.referencing.operation.MathTransform;
 import org.opengis.referencing.operation.TransformException;
-import org.wololo.jts2geojson.GeoJSONWriter;
-import scala.Option;
-
-import static org.locationtech.jts.geom.Coordinate.NULL_ORDINATE;
 
 public class Functions {
+    public static class ST_Area extends ScalarFunction {
+        @DataTypeHint("Double")
+        public Double eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o) {
+            Geometry geom = (Geometry) o;
+            return org.apache.sedona.common.Functions.area(geom);
+        }
+    }
+
+    public static class ST_Azimuth extends ScalarFunction {
+        @DataTypeHint("Double")
+        public Double eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o1,
+                           @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o2) {
+            Geometry geom1 = (Geometry) o1;
+            Geometry geom2 = (Geometry) o2;
+            return org.apache.sedona.common.Functions.azimuth(geom1, geom2);
+        }
+    }
+
     public static class ST_Buffer extends ScalarFunction {
         @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
         public Geometry eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
@@ -51,6 +57,24 @@ public class Functions {
         }
     }
 
+    public static class ST_3DDistance extends ScalarFunction {
+        @DataTypeHint("Double")
+        public Double eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o1,
+                           @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o2) {
+            Geometry geom1 = (Geometry) o1;
+            Geometry geom2 = (Geometry) o2;
+            return org.apache.sedona.common.Functions.distance3d(geom1, geom2);
+        }
+    }
+
+    public static class ST_Length extends ScalarFunction {
+        @DataTypeHint("Double")
+        public Double eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o) {
+            Geometry geom = (Geometry) o;
+            return org.apache.sedona.common.Functions.length(geom);
+        }
+    }
+
     public static class ST_YMin extends ScalarFunction {
         @DataTypeHint("Double")
         public Double eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o){
diff --git a/flink/src/test/java/org/apache/sedona/flink/FunctionTest.java b/flink/src/test/java/org/apache/sedona/flink/FunctionTest.java
index 51987835..099b9173 100644
--- a/flink/src/test/java/org/apache/sedona/flink/FunctionTest.java
+++ b/flink/src/test/java/org/apache/sedona/flink/FunctionTest.java
@@ -15,7 +15,6 @@ package org.apache.sedona.flink;
 
 import org.apache.commons.codec.binary.Hex;
 import org.apache.flink.table.api.Table;
-import org.apache.sedona.flink.expressions.Constructors;
 import org.apache.sedona.flink.expressions.Functions;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -25,9 +24,6 @@ import org.locationtech.jts.geom.LinearRing;
 import org.locationtech.jts.geom.Point;
 import org.locationtech.jts.geom.Polygon;
 
-import java.util.Arrays;
-import java.util.Optional;
-
 import static org.apache.flink.table.api.Expressions.$;
 import static org.apache.flink.table.api.Expressions.call;
 import static org.junit.Assert.assertEquals;
@@ -41,6 +37,21 @@ public class FunctionTest extends TestBase{
         initialize();
     }
 
+    @Test
+    public void testArea() {
+        Table polygonTable = createPolygonTable(1);
+        Table ResultTable = polygonTable.select(call(Functions.ST_Area.class.getSimpleName(), $(polygonColNames[0])));
+        assertNotNull(first(ResultTable).getField(0));
+        double result = (double) first(ResultTable).getField(0);
+        assertEquals(1.0, result, 0);
+    }
+
+    @Test
+    public void testAzimuth() {
+        Table pointTable = tableEnv.sqlQuery("SELECT ST_Azimuth(ST_GeomFromWKT('POINT (0 0)'), ST_GeomFromWKT('POINT (1 1)'))");
+        assertEquals(45, ((double) first(pointTable).getField(0)) / (Math.PI * 2) * 360, 0);
+    }
+
     @Test
     public void testBuffer() {
         Table pointTable = createPointTable_real(testDataSize);
@@ -74,6 +85,21 @@ public class FunctionTest extends TestBase{
         assertEquals(0.0, first(pointTable).getField(0));
     }
 
+    @Test
+    public void test3dDistance() {
+        Table pointTable = tableEnv.sqlQuery("SELECT ST_3DDistance(ST_GeomFromWKT('POINT (0 0 0)'), ST_GeomFromWKT('POINT (1 1 1)'))");
+        assertEquals(Math.sqrt(3), first(pointTable).getField(0));
+    }
+
+    @Test
+    public void testLength() {
+        Table polygonTable = createPolygonTable(1);
+        Table resultTable = polygonTable.select(call(Functions.ST_Length.class.getSimpleName(), $(polygonColNames[0])));
+        assertNotNull(first(resultTable).getField(0));
+        double result = (double) first(resultTable).getField(0);
+        assertEquals(4, result, 0);
+    }
+
     @Test
     public void testYMax() {
         Table polygonTable = createPolygonTable(1);
diff --git a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala
index 8564895e..aa6c8585 100644
--- a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala
+++ b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala
@@ -36,9 +36,7 @@ import org.locationtech.jts.geom.util.GeometryFixer
 import org.locationtech.jts.geom._
 import org.locationtech.jts.linearref.LengthIndexedLine
 import org.locationtech.jts.operation.buffer.BufferParameters
-import org.locationtech.jts.operation.distance3d.Distance3DOp
 import org.locationtech.jts.operation.linemerge.LineMerger
-import org.locationtech.jts.operation.valid.IsValidOp
 import org.locationtech.jts.precision.GeometryPrecisionReducer
 import org.locationtech.jts.simplify.TopologyPreservingSimplifier
 import org.locationtech.jts.geom.Geometry
@@ -77,18 +75,7 @@ case class ST_YMin(inputExpressions: Seq[Expression])
 }
 
 case class ST_3DDistance(inputExpressions: Seq[Expression])
-  extends BinaryGeometryExpression with CodegenFallback {
-  assert(inputExpressions.length == 2)
-
-  override def toString: String = s" **${ST_3DDistance.getClass.getName}**  "
-
-  override def nullSafeEval(leftGeometry: Geometry, rightGeometry: Geometry): Any = {
-    Distance3DOp.distance(leftGeometry, rightGeometry)
-  }
-
-  override def dataType = DoubleType
-
-  override def children: Seq[Expression] = inputExpressions
+  extends InferredBinaryExpression(Functions.distance3d) {
 
   protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]) = {
     copy(inputExpressions = newChildren)
@@ -180,16 +167,7 @@ case class ST_Envelope(inputExpressions: Seq[Expression])
   * @param inputExpressions
   */
 case class ST_Length(inputExpressions: Seq[Expression])
-  extends UnaryGeometryExpression with CodegenFallback {
-  assert(inputExpressions.length == 1)
-
-  override def nullSafeEval(geometry: Geometry): Any = {
-    geometry.getLength
-  }
-
-  override def dataType: DataType = DoubleType
-
-  override def children: Seq[Expression] = inputExpressions
+  extends InferredUnaryExpression(Functions.length) with CodegenFallback {
 
   protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]) = {
     copy(inputExpressions = newChildren)
@@ -202,16 +180,7 @@ case class ST_Length(inputExpressions: Seq[Expression])
   * @param inputExpressions
   */
 case class ST_Area(inputExpressions: Seq[Expression])
-  extends UnaryGeometryExpression with CodegenFallback {
-  assert(inputExpressions.length == 1)
-
-  override def nullSafeEval(geometry: Geometry): Any = {
-    geometry.getArea
-  }
-
-  override def dataType: DataType = DoubleType
-
-  override def children: Seq[Expression] = inputExpressions
+  extends InferredUnaryExpression(Functions.area) {
 
   protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]) = {
     copy(inputExpressions = newChildren)
@@ -562,25 +531,7 @@ case class ST_LineMerge(inputExpressions: Seq[Expression])
 }
 
 case class ST_Azimuth(inputExpressions: Seq[Expression])
-  extends BinaryGeometryExpression with CodegenFallback {
-  assert(inputExpressions.length == 2)
-
-  override def nullSafeEval(leftGeometry: Geometry, rightGeometry: Geometry): Any = {
-    (leftGeometry, rightGeometry) match {
-      case (pointA: Point, pointB: Point) => calculateAzimuth(pointA, pointB)
-    }
-  }
-
-  private def calculateAzimuth(pointA: Point, pointB: Point): Double = {
-    val deltaX = pointB.getX - pointA.getX
-    val deltaY = pointB.getY - pointA.getY
-    val azimuth = math.atan2(deltaX, deltaY)
-    if (azimuth < 0) azimuth + (2 * math.Pi) else azimuth
-  }
-
-  override def dataType: DataType = DoubleType
-
-  override def children: Seq[Expression] = inputExpressions
+  extends InferredBinaryExpression(Functions.azimuth) with CodegenFallback {
 
   protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]) = {
     copy(inputExpressions = newChildren)