You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sedona.apache.org by ji...@apache.org on 2022/09/04 18:50:09 UTC

[incubator-sedona] branch master updated: [SEDONA-157] Add coordinate accessors to the Flink API (#677)

This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sedona.git


The following commit(s) were added to refs/heads/master by this push:
     new ec684457 [SEDONA-157] Add coordinate accessors to the Flink API (#677)
ec684457 is described below

commit ec6844572336bd2860a1d54db068ed0ddadeb430
Author: Kengo Seki <se...@apache.org>
AuthorDate: Mon Sep 5 03:50:04 2022 +0900

    [SEDONA-157] Add coordinate accessors to the Flink API (#677)
---
 .../java/org/apache/sedona/common/Functions.java   | 21 ++++++++++
 docs/api/flink/Function.md                         | 45 ++++++++++++++++++++++
 .../main/java/org/apache/sedona/flink/Catalog.java |  3 ++
 .../apache/sedona/flink/expressions/Functions.java | 24 ++++++++++++
 .../java/org/apache/sedona/flink/FunctionTest.java | 21 ++++++++++
 .../sql/sedona_sql/expressions/Functions.scala     | 42 ++------------------
 .../expressions/NullSafeExpressions.scala          |  4 ++
 7 files changed, 121 insertions(+), 39 deletions(-)

diff --git a/common/src/main/java/org/apache/sedona/common/Functions.java b/common/src/main/java/org/apache/sedona/common/Functions.java
index 7cb9bba7..c6714c75 100644
--- a/common/src/main/java/org/apache/sedona/common/Functions.java
+++ b/common/src/main/java/org/apache/sedona/common/Functions.java
@@ -70,6 +70,27 @@ public class Functions {
         return geometry.getLength();
     }
 
+    public static Double x(Geometry geometry) {
+        if (geometry instanceof Point) {
+            return geometry.getCoordinate().x;
+        }
+        return null;
+    }
+
+    public static Double y(Geometry geometry) {
+        if (geometry instanceof Point) {
+            return geometry.getCoordinate().y;
+        }
+        return null;
+    }
+
+    public static Double z(Geometry geometry) {
+        if (geometry instanceof Point) {
+            return geometry.getCoordinate().z;
+        }
+        return null;
+    }
+
     public static double xMin(Geometry geometry) {
         Coordinate[] points = geometry.getCoordinates();
         double min = Double.MAX_VALUE;
diff --git a/docs/api/flink/Function.md b/docs/api/flink/Function.md
index 059c4ecc..53b4925c 100644
--- a/docs/api/flink/Function.md
+++ b/docs/api/flink/Function.md
@@ -507,6 +507,21 @@ FROM polygondf
 !!!note
 The detailed EPSG information can be searched on [EPSG.io](https://epsg.io/).
 
+## ST_X
+
+Introduction: Returns X Coordinate of given Point, null otherwise.
+
+Format: `ST_X(pointA: Point)`
+
+Since: `v1.3.0`
+
+Example:
+```SQL
+SELECT ST_X(ST_POINT(0.0, 25.0))
+```
+
+Output: `0.0`
+
 ## ST_XMax
 
 Introduction: Returns the maximum X coordinate of a geometry
@@ -545,6 +560,21 @@ Input: `POLYGON ((-1 -11, 0 10, 1 11, 2 12, -1 -11))`
 
 Output: `-1`
 
+## ST_Y
+
+Introduction: Returns Y Coordinate of given Point, null otherwise.
+
+Format: `ST_Y(pointA: Point)`
+
+Since: `v1.3.0`
+
+Example:
+```SQL
+SELECT ST_Y(ST_POINT(0.0, 25.0))
+```
+
+Output: `25.0`
+
 ## ST_YMax
 
 Introduction: Return the minimum Y coordinate of A
@@ -574,3 +604,18 @@ SELECT ST_YMin(ST_GeomFromText('POLYGON((0 0 1, 1 1 1, 1 2 1, 1 1 1, 0 0 1))'))
 ```
 
 Output : 0
+
+## ST_Z
+
+Introduction: Returns Z Coordinate of given Point, null otherwise.
+
+Format: `ST_Z(pointA: Point)`
+
+Since: `v1.3.0`
+
+Example:
+```SQL
+SELECT ST_Z(ST_GeomFromWKT('POINT (0.0 25.0 11.0)'))
+```
+
+Output: `11.0`
diff --git a/flink/src/main/java/org/apache/sedona/flink/Catalog.java b/flink/src/main/java/org/apache/sedona/flink/Catalog.java
index 4bf52347..f4787411 100644
--- a/flink/src/main/java/org/apache/sedona/flink/Catalog.java
+++ b/flink/src/main/java/org/apache/sedona/flink/Catalog.java
@@ -56,6 +56,9 @@ public class Catalog {
                 new Functions.ST_AsKML(),
                 new Functions.ST_Force_2D(),
                 new Functions.ST_IsEmpty(),
+                new Functions.ST_X(),
+                new Functions.ST_Y(),
+                new Functions.ST_Z(),
                 new Functions.ST_YMax(),
                 new Functions.ST_YMin(),
                 new Functions.ST_XMax(),
diff --git a/flink/src/main/java/org/apache/sedona/flink/expressions/Functions.java b/flink/src/main/java/org/apache/sedona/flink/expressions/Functions.java
index 3c6c5cfe..33f890d4 100644
--- a/flink/src/main/java/org/apache/sedona/flink/expressions/Functions.java
+++ b/flink/src/main/java/org/apache/sedona/flink/expressions/Functions.java
@@ -221,6 +221,30 @@ public class Functions {
         }
     }
 
+    public static class ST_X extends ScalarFunction {
+        @DataTypeHint("Double")
+        public Double eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o) {
+            Geometry geom = (Geometry) o;
+            return org.apache.sedona.common.Functions.x(geom);
+        }
+    }
+
+    public static class ST_Y extends ScalarFunction {
+        @DataTypeHint("Double")
+        public Double eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o) {
+            Geometry geom = (Geometry) o;
+            return org.apache.sedona.common.Functions.y(geom);
+        }
+    }
+
+    public static class ST_Z extends ScalarFunction {
+        @DataTypeHint("Double")
+        public Double eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o) {
+            Geometry geom = (Geometry) o;
+            return org.apache.sedona.common.Functions.z(geom);
+        }
+    }
+
     public static class ST_XMax extends ScalarFunction {
         @DataTypeHint("Double")
         public Double eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o) {
diff --git a/flink/src/test/java/org/apache/sedona/flink/FunctionTest.java b/flink/src/test/java/org/apache/sedona/flink/FunctionTest.java
index 099b9173..a5994e5f 100644
--- a/flink/src/test/java/org/apache/sedona/flink/FunctionTest.java
+++ b/flink/src/test/java/org/apache/sedona/flink/FunctionTest.java
@@ -265,6 +265,27 @@ public class FunctionTest extends TestBase{
         assertEquals(false, result);
     }
 
+    @Test
+    public void testX() {
+        Table pointTable = tableEnv.sqlQuery("SELECT ST_GeomFromWKT('POINT (1.23 4.56 7.89)') AS " + pointColNames[0]);
+        pointTable = pointTable.select(call(Functions.ST_X.class.getSimpleName(), $(pointColNames[0])));
+        assertEquals(1.23, first(pointTable).getField(0));
+    }
+
+    @Test
+    public void testY() {
+        Table pointTable = tableEnv.sqlQuery("SELECT ST_GeomFromWKT('POINT (1.23 4.56 7.89)') AS " + pointColNames[0]);
+        pointTable = pointTable.select(call(Functions.ST_Y.class.getSimpleName(), $(pointColNames[0])));
+        assertEquals(4.56, first(pointTable).getField(0));
+    }
+
+    @Test
+    public void testZ() {
+        Table pointTable = tableEnv.sqlQuery("SELECT ST_GeomFromWKT('POINT (1.23 4.56 7.89)') AS " + pointColNames[0]);
+        pointTable = pointTable.select(call(Functions.ST_Z.class.getSimpleName(), $(pointColNames[0])));
+        assertEquals(7.89, first(pointTable).getField(0));
+    }
+
     @Test
     public void testXMax() {
         Table polygonTable = createPolygonTable(1);
diff --git a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala
index aa6c8585..5b742429 100644
--- a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala
+++ b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala
@@ -539,19 +539,7 @@ case class ST_Azimuth(inputExpressions: Seq[Expression])
 }
 
 case class ST_X(inputExpressions: Seq[Expression])
-  extends UnaryGeometryExpression with CodegenFallback {
-  assert(inputExpressions.length == 1)
-
-  override protected def nullSafeEval(geometry: Geometry): Any = {
-    geometry match {
-      case point: Point => point.getX
-      case _ => null
-    }
-  }
-
-  override def dataType: DataType = DoubleType
-
-  override def children: Seq[Expression] = inputExpressions
+  extends InferredUnaryExpression(Functions.x) {
 
   protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]) = {
     copy(inputExpressions = newChildren)
@@ -560,19 +548,7 @@ case class ST_X(inputExpressions: Seq[Expression])
 
 
 case class ST_Y(inputExpressions: Seq[Expression])
-  extends UnaryGeometryExpression with CodegenFallback {
-  assert(inputExpressions.length == 1)
-
-  override protected def nullSafeEval(geometry: Geometry): Any = {
-    geometry match {
-      case point: Point => point.getY
-      case _ => null
-    }
-  }
-
-  override def dataType: DataType = DoubleType
-
-  override def children: Seq[Expression] = inputExpressions
+  extends InferredUnaryExpression(Functions.y) {
 
   protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]) = {
     copy(inputExpressions = newChildren)
@@ -580,19 +556,7 @@ case class ST_Y(inputExpressions: Seq[Expression])
 }
 
 case class ST_Z(inputExpressions: Seq[Expression])
-  extends UnaryGeometryExpression with CodegenFallback {
-  assert(inputExpressions.length == 1)
-
-  override protected def nullSafeEval(geometry: Geometry): Any = {
-    geometry match {
-      case point: Point => point.getCoordinate.getZ
-      case _ => null
-    }
-  }
-
-  override def dataType: DataType = DoubleType
-
-  override def children: Seq[Expression] = inputExpressions
+  extends InferredUnaryExpression(Functions.z) {
 
   protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]) = {
     copy(inputExpressions = newChildren)
diff --git a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/NullSafeExpressions.scala b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/NullSafeExpressions.scala
index 4736412c..4fe2d5b1 100644
--- a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/NullSafeExpressions.scala
+++ b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/NullSafeExpressions.scala
@@ -72,6 +72,8 @@ sealed class InferrableType[T: TypeTag]
 object InferrableType {
   implicit val geometryInstance: InferrableType[Geometry] =
     new InferrableType[Geometry] {}
+  implicit val javaDoubleInstance: InferrableType[java.lang.Double] =
+    new InferrableType[java.lang.Double] {}
   implicit val doubleInstance: InferrableType[Double] =
     new InferrableType[Double] {}
   implicit val booleanInstance: InferrableType[Boolean] =
@@ -116,6 +118,8 @@ object InferredTypes {
   def inferSparkType[T: TypeTag]: DataType = {
     if (typeOf[T] =:= typeOf[Geometry]) {
       GeometryUDT
+    } else if (typeOf[T] =:= typeOf[java.lang.Double]) {
+      DoubleType
     } else if (typeOf[T] =:= typeOf[Double]) {
       DoubleType
     } else if (typeOf[T] =:= typeOf[Int]) {