You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sedona.apache.org by ji...@apache.org on 2022/08/17 23:27:41 UTC

[incubator-sedona] branch master updated: [SEDONA-146] Add missing output functions to the Flink API (#666)

This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sedona.git


The following commit(s) were added to refs/heads/master by this push:
     new 26aba91e [SEDONA-146] Add missing output functions to the Flink API (#666)
26aba91e is described below

commit 26aba91ead4f06558880be7dda745cf3c05581e8
Author: Kengo Seki <se...@apache.org>
AuthorDate: Thu Aug 18 08:27:35 2022 +0900

    [SEDONA-146] Add missing output functions to the Flink API (#666)
---
 .../java/org/apache/sedona/common/Functions.java   |  4 ++
 .../org/apache/sedona/common/utils/GeomUtils.java  | 12 ++++++
 docs/api/flink/Function.md                         | 48 ++++++++++++++++++++++
 .../main/java/org/apache/sedona/flink/Catalog.java |  3 ++
 .../apache/sedona/flink/expressions/Functions.java | 24 +++++++++++
 .../java/org/apache/sedona/flink/FunctionTest.java | 25 +++++++++++
 .../sql/sedona_sql/expressions/Functions.scala     | 43 ++-----------------
 .../expressions/NullSafeExpressions.scala          |  4 ++
 8 files changed, 123 insertions(+), 40 deletions(-)

diff --git a/common/src/main/java/org/apache/sedona/common/Functions.java b/common/src/main/java/org/apache/sedona/common/Functions.java
index a7777134..581cd65d 100644
--- a/common/src/main/java/org/apache/sedona/common/Functions.java
+++ b/common/src/main/java/org/apache/sedona/common/Functions.java
@@ -117,6 +117,10 @@ public class Functions {
         return GeomUtils.getEWKT(geometry);
     }
 
+    public static byte[] asEWKB(Geometry geometry) {
+        return GeomUtils.getEWKB(geometry);
+    }
+
     public static String asGeoJson(Geometry geometry) {
         if (geometry == null) {
             return null;
diff --git a/common/src/main/java/org/apache/sedona/common/utils/GeomUtils.java b/common/src/main/java/org/apache/sedona/common/utils/GeomUtils.java
index 6c5109fd..d2f0dde6 100644
--- a/common/src/main/java/org/apache/sedona/common/utils/GeomUtils.java
+++ b/common/src/main/java/org/apache/sedona/common/utils/GeomUtils.java
@@ -19,10 +19,13 @@ import org.locationtech.jts.geom.impl.CoordinateArraySequence;
 import org.locationtech.jts.geom.CoordinateSequence;
 import org.locationtech.jts.geom.CoordinateSequenceFilter;
 import org.locationtech.jts.geom.Geometry;
+import org.locationtech.jts.io.ByteOrderValues;
+import org.locationtech.jts.io.WKBWriter;
 import org.locationtech.jts.io.WKTWriter;
 import org.locationtech.jts.operation.polygonize.Polygonizer;
 import org.locationtech.jts.operation.union.UnaryUnionOp;
 
+import java.nio.ByteOrder;
 import java.util.*;
 
 import static org.locationtech.jts.geom.Coordinate.NULL_ORDINATE;
@@ -139,6 +142,15 @@ public class GeomUtils {
         return sridString + new WKTWriter(GeomUtils.getDimension(geometry)).write(geometry);
     }
 
+    public static byte[] getEWKB(Geometry geometry) {
+        if (geometry == null) {
+            return null;
+        }
+        int endian = ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN ? ByteOrderValues.BIG_ENDIAN : ByteOrderValues.LITTLE_ENDIAN;
+        WKBWriter writer = new WKBWriter(GeomUtils.getDimension(geometry), endian, geometry.getSRID() != 0);
+        return writer.write(geometry);
+    }
+
     public static Geometry get2dGeom(Geometry geom) {
         Coordinate[] coordinates = geom.getCoordinates();
         GeometryFactory geometryFactory = new GeometryFactory();
diff --git a/docs/api/flink/Function.md b/docs/api/flink/Function.md
index 181ddfa7..afc0f36d 100644
--- a/docs/api/flink/Function.md
+++ b/docs/api/flink/Function.md
@@ -1,3 +1,36 @@
+## ST_AsBinary
+
+Introduction: Return the Well-Known Binary representation of a geometry
+
+Format: `ST_AsBinary (A:geometry)`
+
+Since: `v1.3.0`
+
+Example:
+
+```SQL
+SELECT ST_AsBinary(polygondf.countyshape)
+FROM polygondf
+```
+
+## ST_AsEWKB
+
+Introduction: Return the Extended Well-Known Binary representation of a geometry.
+EWKB is an extended version of WKB which includes the SRID of the geometry.
+The format originated in PostGIS but is supported by many GIS tools.
+If the geometry is lacking SRID a WKB format is produced.
+
+Format: `ST_AsEWKB (A:geometry)`
+
+Since: `v1.3.0`
+
+Example:
+
+```SQL
+SELECT ST_AsEWKB(polygondf.countyshape)
+FROM polygondf
+```
+
 ## ST_AsEWKT
 
 Introduction: Return the Extended Well-Known Text representation of a geometry.
@@ -30,6 +63,21 @@ SELECT ST_AsGeoJSON(polygondf.countyshape)
 FROM polygondf
 ```
 
+## ST_AsText
+
+Introduction: Return the Well-Known Text string representation of a geometry
+
+Format: `ST_AsText (A:geometry)`
+
+Since: `v1.3.0`
+
+Example:
+
+```SQL
+SELECT ST_AsText(polygondf.countyshape)
+FROM polygondf
+```
+
 ## ST_Buffer
 
 Introduction: Returns a geometry/geography that represents all points whose distance from this Geometry/geography is less than or equal to distance.
diff --git a/flink/src/main/java/org/apache/sedona/flink/Catalog.java b/flink/src/main/java/org/apache/sedona/flink/Catalog.java
index 1a71293b..f435de1f 100644
--- a/flink/src/main/java/org/apache/sedona/flink/Catalog.java
+++ b/flink/src/main/java/org/apache/sedona/flink/Catalog.java
@@ -40,6 +40,9 @@ public class Catalog {
                 new Functions.ST_PointN(),
                 new Functions.ST_ExteriorRing(),
                 new Functions.ST_AsEWKT(),
+                new Functions.ST_AsEWKB(),
+                new Functions.ST_AsText(),
+                new Functions.ST_AsBinary(),
                 new Functions.ST_AsGeoJSON(),
                 new Functions.ST_Force_2D(),
                 new Functions.ST_IsEmpty(),
diff --git a/flink/src/main/java/org/apache/sedona/flink/expressions/Functions.java b/flink/src/main/java/org/apache/sedona/flink/expressions/Functions.java
index f1663e5c..2db89814 100644
--- a/flink/src/main/java/org/apache/sedona/flink/expressions/Functions.java
+++ b/flink/src/main/java/org/apache/sedona/flink/expressions/Functions.java
@@ -133,6 +133,30 @@ public class Functions {
         }
     }
 
+    public static class ST_AsText extends ScalarFunction {
+        @DataTypeHint("String")
+        public String eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o) {
+            Geometry geom = (Geometry) o;
+            return org.apache.sedona.common.Functions.asEWKT(geom);
+        }
+    }
+
+    public static class ST_AsEWKB extends ScalarFunction {
+        @DataTypeHint("Bytes")
+        public byte[] eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o) {
+            Geometry geom = (Geometry) o;
+            return org.apache.sedona.common.Functions.asEWKB(geom);
+        }
+    }
+
+    public static class ST_AsBinary extends ScalarFunction {
+        @DataTypeHint("Bytes")
+        public byte[] eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o) {
+            Geometry geom = (Geometry) o;
+            return org.apache.sedona.common.Functions.asEWKB(geom);
+        }
+    }
+
     public static class ST_AsGeoJSON extends ScalarFunction {
         @DataTypeHint("String")
         public String eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o) {
diff --git a/flink/src/test/java/org/apache/sedona/flink/FunctionTest.java b/flink/src/test/java/org/apache/sedona/flink/FunctionTest.java
index e188d277..e9910bdd 100644
--- a/flink/src/test/java/org/apache/sedona/flink/FunctionTest.java
+++ b/flink/src/test/java/org/apache/sedona/flink/FunctionTest.java
@@ -13,6 +13,7 @@
  */
 package org.apache.sedona.flink;
 
+import org.apache.commons.codec.binary.Hex;
 import org.apache.flink.table.api.Table;
 import org.apache.sedona.flink.expressions.Constructors;
 import org.apache.sedona.flink.expressions.Functions;
@@ -154,6 +155,30 @@ public class FunctionTest extends TestBase{
         assertEquals("POLYGON ((-0.5 -0.5, -0.5 0.5, 0.5 0.5, 0.5 -0.5, -0.5 -0.5))", result);
     }
 
+    @Test
+    public void testAsText() {
+        Table polygonTable = createPolygonTable(testDataSize);
+        polygonTable = polygonTable.select(call(Functions.ST_AsText.class.getSimpleName(), $(polygonColNames[0])));
+        String result = (String) first(polygonTable).getField(0);
+        assertEquals("POLYGON ((-0.5 -0.5, -0.5 0.5, 0.5 0.5, 0.5 -0.5, -0.5 -0.5))", result);
+    }
+
+    @Test
+    public void testAsEWKB() {
+        Table polygonTable = createPolygonTable(testDataSize);
+        polygonTable = polygonTable.select(call(Functions.ST_AsEWKB.class.getSimpleName(), $(polygonColNames[0])));
+        String result = Hex.encodeHexString((byte[]) first(polygonTable).getField(0));
+        assertEquals("01030000000100000005000000000000000000e0bf000000000000e0bf000000000000e0bf000000000000e03f000000000000e03f000000000000e03f000000000000e03f000000000000e0bf000000000000e0bf000000000000e0bf", result);
+    }
+
+    @Test
+    public void testAsBinary() {
+        Table polygonTable = createPolygonTable(testDataSize);
+        polygonTable = polygonTable.select(call(Functions.ST_AsBinary.class.getSimpleName(), $(polygonColNames[0])));
+        String result = Hex.encodeHexString((byte[]) first(polygonTable).getField(0));
+        assertEquals("01030000000100000005000000000000000000e0bf000000000000e0bf000000000000e0bf000000000000e03f000000000000e03f000000000000e03f000000000000e03f000000000000e0bf000000000000e0bf000000000000e0bf", result);
+    }
+
     @Test
     public void testGeoJSON() {
         Table polygonTable = createPolygonTable(testDataSize);
diff --git a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala
index c2ab1fd5..88b46e79 100644
--- a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala
+++ b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala
@@ -19,7 +19,6 @@
 package org.apache.spark.sql.sedona_sql.expressions
 
 import org.apache.sedona.common.Functions
-import org.apache.sedona.common.utils.GeomUtils
 import org.apache.sedona.core.geometryObjects.Circle
 import org.apache.sedona.sql.utils.GeometrySerializer
 import org.apache.spark.internal.Logging
@@ -36,7 +35,6 @@ import org.apache.spark.unsafe.types.UTF8String
 import org.locationtech.jts.algorithm.MinimumBoundingCircle
 import org.locationtech.jts.geom.util.GeometryFixer
 import org.locationtech.jts.geom._
-import org.locationtech.jts.io.{ByteOrderValues, WKBWriter, WKTWriter}
 import org.locationtech.jts.linearref.LengthIndexedLine
 import org.locationtech.jts.operation.IsSimpleOp
 import org.locationtech.jts.operation.buffer.BufferParameters
@@ -48,7 +46,6 @@ import org.locationtech.jts.simplify.TopologyPreservingSimplifier
 import org.locationtech.jts.geom.Geometry
 import org.locationtech.jts.geom.Coordinate
 
-import java.nio.ByteOrder
 import scala.util.{Failure, Success, Try}
 
 /**
@@ -474,17 +471,7 @@ case class ST_PrecisionReduce(inputExpressions: Seq[Expression])
 }
 
 case class ST_AsText(inputExpressions: Seq[Expression])
-  extends UnaryGeometryExpression with CodegenFallback {
-  assert(inputExpressions.length == 1)
-
-  override protected def nullSafeEval(geometry: Geometry): Any = {
-    val writer = new WKTWriter(GeomUtils.getDimension(geometry))
-    UTF8String.fromString(writer.write(geometry))
-  }
-
-  override def dataType: DataType = StringType
-
-  override def children: Seq[Expression] = inputExpressions
+  extends  InferredUnaryExpression(Functions.asEWKT) {
 
   protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]) = {
     copy(inputExpressions = newChildren)
@@ -500,19 +487,7 @@ case class ST_AsGeoJSON(inputExpressions: Seq[Expression])
 }
 
 case class ST_AsBinary(inputExpressions: Seq[Expression])
-  extends UnaryGeometryExpression with CodegenFallback {
-  inputExpressions.validateLength(1)
-
-  override protected def nullSafeEval(geometry: Geometry): Any = {
-    val dimensions = if (geometry.isEmpty() || java.lang.Double.isNaN(geometry.getCoordinate.getZ)) 2 else 3
-    val endian = if (ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN) ByteOrderValues.BIG_ENDIAN else ByteOrderValues.LITTLE_ENDIAN
-    val writer = new WKBWriter(dimensions, endian)
-    writer.write(geometry)
-  }
-
-  override def dataType: DataType = BinaryType
-
-  override def children: Seq[Expression] = inputExpressions
+  extends InferredUnaryExpression(Functions.asEWKB) {
 
   protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]) = {
     copy(inputExpressions = newChildren)
@@ -520,19 +495,7 @@ case class ST_AsBinary(inputExpressions: Seq[Expression])
 }
 
 case class ST_AsEWKB(inputExpressions: Seq[Expression])
-  extends UnaryGeometryExpression with CodegenFallback {
-  inputExpressions.validateLength(1)
-
-  override protected def nullSafeEval(geometry: Geometry): Any = {
-    val dimensions = if (geometry.isEmpty() || java.lang.Double.isNaN(geometry.getCoordinate.getZ)) 2 else 3
-    val endian = if (ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN) ByteOrderValues.BIG_ENDIAN else ByteOrderValues.LITTLE_ENDIAN
-    val writer = new WKBWriter(dimensions, endian, geometry.getSRID != 0)
-    writer.write(geometry)
-  }
-
-  override def dataType: DataType = BinaryType
-
-  override def children: Seq[Expression] = inputExpressions
+  extends InferredUnaryExpression(Functions.asEWKB) {
 
   protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]) = {
     copy(inputExpressions = newChildren)
diff --git a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/NullSafeExpressions.scala b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/NullSafeExpressions.scala
index 87ed9ce1..4736412c 100644
--- a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/NullSafeExpressions.scala
+++ b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/NullSafeExpressions.scala
@@ -80,6 +80,8 @@ object InferrableType {
     new InferrableType[Int] {}
   implicit val stringInstance: InferrableType[String] =
     new InferrableType[String] {}
+  implicit val binaryInstance: InferrableType[Array[Byte]] =
+    new InferrableType[Array[Byte]] {}
 }
 
 object InferredTypes {
@@ -120,6 +122,8 @@ object InferredTypes {
       IntegerType
     } else if (typeOf[T] =:= typeOf[String]) {
       StringType
+    } else if (typeOf[T] =:= typeOf[Array[Byte]]) {
+      BinaryType
     } else {
       BooleanType
     }