You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sedona.apache.org by ji...@apache.org on 2022/08/17 23:27:41 UTC
[incubator-sedona] branch master updated: [SEDONA-146] Add missing output functions to the Flink API (#666)
This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sedona.git
The following commit(s) were added to refs/heads/master by this push:
new 26aba91e [SEDONA-146] Add missing output functions to the Flink API (#666)
26aba91e is described below
commit 26aba91ead4f06558880be7dda745cf3c05581e8
Author: Kengo Seki <se...@apache.org>
AuthorDate: Thu Aug 18 08:27:35 2022 +0900
[SEDONA-146] Add missing output functions to the Flink API (#666)
---
.../java/org/apache/sedona/common/Functions.java | 4 ++
.../org/apache/sedona/common/utils/GeomUtils.java | 12 ++++++
docs/api/flink/Function.md | 48 ++++++++++++++++++++++
.../main/java/org/apache/sedona/flink/Catalog.java | 3 ++
.../apache/sedona/flink/expressions/Functions.java | 24 +++++++++++
.../java/org/apache/sedona/flink/FunctionTest.java | 25 +++++++++++
.../sql/sedona_sql/expressions/Functions.scala | 43 ++-----------------
.../expressions/NullSafeExpressions.scala | 4 ++
8 files changed, 123 insertions(+), 40 deletions(-)
diff --git a/common/src/main/java/org/apache/sedona/common/Functions.java b/common/src/main/java/org/apache/sedona/common/Functions.java
index a7777134..581cd65d 100644
--- a/common/src/main/java/org/apache/sedona/common/Functions.java
+++ b/common/src/main/java/org/apache/sedona/common/Functions.java
@@ -117,6 +117,10 @@ public class Functions {
return GeomUtils.getEWKT(geometry);
}
+ public static byte[] asEWKB(Geometry geometry) {
+ return GeomUtils.getEWKB(geometry);
+ }
+
public static String asGeoJson(Geometry geometry) {
if (geometry == null) {
return null;
diff --git a/common/src/main/java/org/apache/sedona/common/utils/GeomUtils.java b/common/src/main/java/org/apache/sedona/common/utils/GeomUtils.java
index 6c5109fd..d2f0dde6 100644
--- a/common/src/main/java/org/apache/sedona/common/utils/GeomUtils.java
+++ b/common/src/main/java/org/apache/sedona/common/utils/GeomUtils.java
@@ -19,10 +19,13 @@ import org.locationtech.jts.geom.impl.CoordinateArraySequence;
import org.locationtech.jts.geom.CoordinateSequence;
import org.locationtech.jts.geom.CoordinateSequenceFilter;
import org.locationtech.jts.geom.Geometry;
+import org.locationtech.jts.io.ByteOrderValues;
+import org.locationtech.jts.io.WKBWriter;
import org.locationtech.jts.io.WKTWriter;
import org.locationtech.jts.operation.polygonize.Polygonizer;
import org.locationtech.jts.operation.union.UnaryUnionOp;
+import java.nio.ByteOrder;
import java.util.*;
import static org.locationtech.jts.geom.Coordinate.NULL_ORDINATE;
@@ -139,6 +142,15 @@ public class GeomUtils {
return sridString + new WKTWriter(GeomUtils.getDimension(geometry)).write(geometry);
}
+ public static byte[] getEWKB(Geometry geometry) {
+ if (geometry == null) {
+ return null;
+ }
+ int endian = ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN ? ByteOrderValues.BIG_ENDIAN : ByteOrderValues.LITTLE_ENDIAN;
+ WKBWriter writer = new WKBWriter(GeomUtils.getDimension(geometry), endian, geometry.getSRID() != 0);
+ return writer.write(geometry);
+ }
+
public static Geometry get2dGeom(Geometry geom) {
Coordinate[] coordinates = geom.getCoordinates();
GeometryFactory geometryFactory = new GeometryFactory();
diff --git a/docs/api/flink/Function.md b/docs/api/flink/Function.md
index 181ddfa7..afc0f36d 100644
--- a/docs/api/flink/Function.md
+++ b/docs/api/flink/Function.md
@@ -1,3 +1,36 @@
+## ST_AsBinary
+
+Introduction: Return the Well-Known Binary representation of a geometry.
+
+Format: `ST_AsBinary (A:geometry)`
+
+Since: `v1.3.0`
+
+Example:
+
+```SQL
+SELECT ST_AsBinary(polygondf.countyshape)
+FROM polygondf
+```
+
+## ST_AsEWKB
+
+Introduction: Return the Extended Well-Known Binary representation of a geometry.
+EWKB is an extended version of WKB which includes the SRID of the geometry.
+The format originated in PostGIS but is supported by many GIS tools.
+If the geometry has no SRID, plain WKB is produced instead.
+
+Format: `ST_AsEWKB (A:geometry)`
+
+Since: `v1.3.0`
+
+Example:
+
+```SQL
+SELECT ST_AsEWKB(polygondf.countyshape)
+FROM polygondf
+```
+
## ST_AsEWKT
Introduction: Return the Extended Well-Known Text representation of a geometry.
@@ -30,6 +63,21 @@ SELECT ST_AsGeoJSON(polygondf.countyshape)
FROM polygondf
```
+## ST_AsText
+
+Introduction: Return the Well-Known Text string representation of a geometry.
+
+Format: `ST_AsText (A:geometry)`
+
+Since: `v1.3.0`
+
+Example:
+
+```SQL
+SELECT ST_AsText(polygondf.countyshape)
+FROM polygondf
+```
+
## ST_Buffer
Introduction: Returns a geometry/geography that represents all points whose distance from this Geometry/geography is less than or equal to distance.
diff --git a/flink/src/main/java/org/apache/sedona/flink/Catalog.java b/flink/src/main/java/org/apache/sedona/flink/Catalog.java
index 1a71293b..f435de1f 100644
--- a/flink/src/main/java/org/apache/sedona/flink/Catalog.java
+++ b/flink/src/main/java/org/apache/sedona/flink/Catalog.java
@@ -40,6 +40,9 @@ public class Catalog {
new Functions.ST_PointN(),
new Functions.ST_ExteriorRing(),
new Functions.ST_AsEWKT(),
+ new Functions.ST_AsEWKB(),
+ new Functions.ST_AsText(),
+ new Functions.ST_AsBinary(),
new Functions.ST_AsGeoJSON(),
new Functions.ST_Force_2D(),
new Functions.ST_IsEmpty(),
diff --git a/flink/src/main/java/org/apache/sedona/flink/expressions/Functions.java b/flink/src/main/java/org/apache/sedona/flink/expressions/Functions.java
index f1663e5c..2db89814 100644
--- a/flink/src/main/java/org/apache/sedona/flink/expressions/Functions.java
+++ b/flink/src/main/java/org/apache/sedona/flink/expressions/Functions.java
@@ -133,6 +133,30 @@ public class Functions {
}
}
+ public static class ST_AsText extends ScalarFunction {
+ @DataTypeHint("String")
+ public String eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o) {
+ Geometry geom = (Geometry) o;
+ return org.apache.sedona.common.Functions.asEWKT(geom);
+ }
+ }
+
+ public static class ST_AsEWKB extends ScalarFunction {
+ @DataTypeHint("Bytes")
+ public byte[] eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o) {
+ Geometry geom = (Geometry) o;
+ return org.apache.sedona.common.Functions.asEWKB(geom);
+ }
+ }
+
+ public static class ST_AsBinary extends ScalarFunction {
+ @DataTypeHint("Bytes")
+ public byte[] eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o) {
+ Geometry geom = (Geometry) o;
+ return org.apache.sedona.common.Functions.asEWKB(geom);
+ }
+ }
+
public static class ST_AsGeoJSON extends ScalarFunction {
@DataTypeHint("String")
public String eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o) {
diff --git a/flink/src/test/java/org/apache/sedona/flink/FunctionTest.java b/flink/src/test/java/org/apache/sedona/flink/FunctionTest.java
index e188d277..e9910bdd 100644
--- a/flink/src/test/java/org/apache/sedona/flink/FunctionTest.java
+++ b/flink/src/test/java/org/apache/sedona/flink/FunctionTest.java
@@ -13,6 +13,7 @@
*/
package org.apache.sedona.flink;
+import org.apache.commons.codec.binary.Hex;
import org.apache.flink.table.api.Table;
import org.apache.sedona.flink.expressions.Constructors;
import org.apache.sedona.flink.expressions.Functions;
@@ -154,6 +155,30 @@ public class FunctionTest extends TestBase{
assertEquals("POLYGON ((-0.5 -0.5, -0.5 0.5, 0.5 0.5, 0.5 -0.5, -0.5 -0.5))", result);
}
+ @Test
+ public void testAsText() {
+ Table polygonTable = createPolygonTable(testDataSize);
+ polygonTable = polygonTable.select(call(Functions.ST_AsText.class.getSimpleName(), $(polygonColNames[0])));
+ String result = (String) first(polygonTable).getField(0);
+ assertEquals("POLYGON ((-0.5 -0.5, -0.5 0.5, 0.5 0.5, 0.5 -0.5, -0.5 -0.5))", result);
+ }
+
+ @Test
+ public void testAsEWKB() {
+ Table polygonTable = createPolygonTable(testDataSize);
+ polygonTable = polygonTable.select(call(Functions.ST_AsEWKB.class.getSimpleName(), $(polygonColNames[0])));
+ String result = Hex.encodeHexString((byte[]) first(polygonTable).getField(0));
+ assertEquals("01030000000100000005000000000000000000e0bf000000000000e0bf000000000000e0bf000000000000e03f000000000000e03f000000000000e03f000000000000e03f000000000000e0bf000000000000e0bf000000000000e0bf", result);
+ }
+
+ @Test
+ public void testAsBinary() {
+ Table polygonTable = createPolygonTable(testDataSize);
+ polygonTable = polygonTable.select(call(Functions.ST_AsBinary.class.getSimpleName(), $(polygonColNames[0])));
+ String result = Hex.encodeHexString((byte[]) first(polygonTable).getField(0));
+ assertEquals("01030000000100000005000000000000000000e0bf000000000000e0bf000000000000e0bf000000000000e03f000000000000e03f000000000000e03f000000000000e03f000000000000e0bf000000000000e0bf000000000000e0bf", result);
+ }
+
@Test
public void testGeoJSON() {
Table polygonTable = createPolygonTable(testDataSize);
diff --git a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala
index c2ab1fd5..88b46e79 100644
--- a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala
+++ b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala
@@ -19,7 +19,6 @@
package org.apache.spark.sql.sedona_sql.expressions
import org.apache.sedona.common.Functions
-import org.apache.sedona.common.utils.GeomUtils
import org.apache.sedona.core.geometryObjects.Circle
import org.apache.sedona.sql.utils.GeometrySerializer
import org.apache.spark.internal.Logging
@@ -36,7 +35,6 @@ import org.apache.spark.unsafe.types.UTF8String
import org.locationtech.jts.algorithm.MinimumBoundingCircle
import org.locationtech.jts.geom.util.GeometryFixer
import org.locationtech.jts.geom._
-import org.locationtech.jts.io.{ByteOrderValues, WKBWriter, WKTWriter}
import org.locationtech.jts.linearref.LengthIndexedLine
import org.locationtech.jts.operation.IsSimpleOp
import org.locationtech.jts.operation.buffer.BufferParameters
@@ -48,7 +46,6 @@ import org.locationtech.jts.simplify.TopologyPreservingSimplifier
import org.locationtech.jts.geom.Geometry
import org.locationtech.jts.geom.Coordinate
-import java.nio.ByteOrder
import scala.util.{Failure, Success, Try}
/**
@@ -474,17 +471,7 @@ case class ST_PrecisionReduce(inputExpressions: Seq[Expression])
}
case class ST_AsText(inputExpressions: Seq[Expression])
- extends UnaryGeometryExpression with CodegenFallback {
- assert(inputExpressions.length == 1)
-
- override protected def nullSafeEval(geometry: Geometry): Any = {
- val writer = new WKTWriter(GeomUtils.getDimension(geometry))
- UTF8String.fromString(writer.write(geometry))
- }
-
- override def dataType: DataType = StringType
-
- override def children: Seq[Expression] = inputExpressions
+ extends InferredUnaryExpression(Functions.asEWKT) {
protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]) = {
copy(inputExpressions = newChildren)
@@ -500,19 +487,7 @@ case class ST_AsGeoJSON(inputExpressions: Seq[Expression])
}
case class ST_AsBinary(inputExpressions: Seq[Expression])
- extends UnaryGeometryExpression with CodegenFallback {
- inputExpressions.validateLength(1)
-
- override protected def nullSafeEval(geometry: Geometry): Any = {
- val dimensions = if (geometry.isEmpty() || java.lang.Double.isNaN(geometry.getCoordinate.getZ)) 2 else 3
- val endian = if (ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN) ByteOrderValues.BIG_ENDIAN else ByteOrderValues.LITTLE_ENDIAN
- val writer = new WKBWriter(dimensions, endian)
- writer.write(geometry)
- }
-
- override def dataType: DataType = BinaryType
-
- override def children: Seq[Expression] = inputExpressions
+ extends InferredUnaryExpression(Functions.asEWKB) {
protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]) = {
copy(inputExpressions = newChildren)
@@ -520,19 +495,7 @@ case class ST_AsBinary(inputExpressions: Seq[Expression])
}
case class ST_AsEWKB(inputExpressions: Seq[Expression])
- extends UnaryGeometryExpression with CodegenFallback {
- inputExpressions.validateLength(1)
-
- override protected def nullSafeEval(geometry: Geometry): Any = {
- val dimensions = if (geometry.isEmpty() || java.lang.Double.isNaN(geometry.getCoordinate.getZ)) 2 else 3
- val endian = if (ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN) ByteOrderValues.BIG_ENDIAN else ByteOrderValues.LITTLE_ENDIAN
- val writer = new WKBWriter(dimensions, endian, geometry.getSRID != 0)
- writer.write(geometry)
- }
-
- override def dataType: DataType = BinaryType
-
- override def children: Seq[Expression] = inputExpressions
+ extends InferredUnaryExpression(Functions.asEWKB) {
protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]) = {
copy(inputExpressions = newChildren)
diff --git a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/NullSafeExpressions.scala b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/NullSafeExpressions.scala
index 87ed9ce1..4736412c 100644
--- a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/NullSafeExpressions.scala
+++ b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/NullSafeExpressions.scala
@@ -80,6 +80,8 @@ object InferrableType {
new InferrableType[Int] {}
implicit val stringInstance: InferrableType[String] =
new InferrableType[String] {}
+ implicit val binaryInstance: InferrableType[Array[Byte]] =
+ new InferrableType[Array[Byte]] {}
}
object InferredTypes {
@@ -120,6 +122,8 @@ object InferredTypes {
IntegerType
} else if (typeOf[T] =:= typeOf[String]) {
StringType
+ } else if (typeOf[T] =:= typeOf[Array[Byte]]) {
+ BinaryType
} else {
BooleanType
}