You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sedona.apache.org by ji...@apache.org on 2022/06/08 17:59:00 UTC

[incubator-sedona] branch master updated: [SEDONA-122] add ST_GeomFromWKB overload for BYTES column containing (e)wkb byte array. (#628)

This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sedona.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e6d9aca [SEDONA-122] add ST_GeomFromWKB overload for BYTES column containing (e)wkb byte array. (#628)
9e6d9aca is described below

commit 9e6d9aca9e37ef5af8b80543bcb138a7bdd42b3f
Author: Elephantusparvus <ni...@gmail.com>
AuthorDate: Wed Jun 8 19:58:54 2022 +0200

    [SEDONA-122] add ST_GeomFromWKB overload for BYTES column containing (e)wkb byte array. (#628)
    
    Co-authored-by: Lennart Niecksch <le...@dfki.de>
---
 docs/api/flink/Constructor.md                      | 16 +++++++++--
 docs/api/sql/Constructor.md                        |  3 ++-
 .../sedona/flink/expressions/Constructors.java     | 10 ++++++-
 .../org/apache/sedona/flink/ConstructorTest.java   | 31 ++++++++++++++++++++++
 .../sql/sedona_sql/expressions/Constructors.scala  | 25 ++++++++++++-----
 .../apache/sedona/sql/constructorTestScala.scala   | 12 +++++++--
 6 files changed, 84 insertions(+), 13 deletions(-)

diff --git a/docs/api/flink/Constructor.md b/docs/api/flink/Constructor.md
index 05b7d5ef..e02d00ec 100644
--- a/docs/api/flink/Constructor.md
+++ b/docs/api/flink/Constructor.md
@@ -14,10 +14,11 @@ SELECT ST_GeomFromWKT('POINT(40.7128 -74.0060)') AS geometry
 
 ## ST_GeomFromWKB
 
-Introduction: Construct a Geometry from WKB string
+Introduction: Construct a Geometry from a WKB string or from raw WKB bytes
 
 Format:
 `ST_GeomFromWKB (Wkb:string)`
+
 
 Since: `v1.2.0`
 
@@ -27,6 +28,17 @@ SELECT ST_GeomFromWKB(polygontable._c0) AS polygonshape
 FROM polygontable
 ```
 
+Format:
+`ST_GeomFromWKB (Wkb:bytes)`
+
+Since: `v1.2.1`
+
+SQL example:
+```SQL
+SELECT ST_GeomFromWKB(polygontable._c0) AS polygonshape
+FROM polygontable
+```
+
 ## ST_GeomFromGeoJSON
 
 Introduction: Construct a Geometry from GeoJson
@@ -93,4 +105,4 @@ Since: `v1.2.1`
 SQL example:
 ```SQL
 SELECT ST_GeomFromGeoHash('s00twy01mt', 4) AS geom
-```
\ No newline at end of file
+```
diff --git a/docs/api/sql/Constructor.md b/docs/api/sql/Constructor.md
index cae9bf0c..c8213daa 100644
--- a/docs/api/sql/Constructor.md
+++ b/docs/api/sql/Constructor.md
@@ -19,10 +19,11 @@ SELECT ST_GeomFromWKT('POINT(40.7128 -74.0060)') AS geometry
 
 ## ST_GeomFromWKB
 
-Introduction: Construct a Geometry from WKB string
+Introduction: Construct a Geometry from a WKB string or from a raw WKB byte array (binary)
 
 Format:
 `ST_GeomFromWKB (Wkb:string)`
+or `ST_GeomFromWKB (Wkb:binary)`
 
 Since: `v1.0.0`
 
diff --git a/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java b/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java
index 1a9fcb41..2b2815c0 100644
--- a/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java
+++ b/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java
@@ -22,6 +22,7 @@ import org.apache.spark.sql.sedona_sql.expressions.geohash.GeoHashDecoder;
 import org.locationtech.jts.geom.Coordinate;
 import org.locationtech.jts.geom.Geometry;
 import org.locationtech.jts.geom.GeometryFactory;
+import org.locationtech.jts.io.WKBReader;
 import org.locationtech.jts.io.ParseException;
 
 public class Constructors {
@@ -83,6 +84,13 @@ public class Constructors {
             FormatUtils formatUtils = new FormatUtils(FileDataSplitter.WKB, false);
             return formatUtils.readGeometry(wkbString);
         }
+
+        @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
+        public Geometry eval(@DataTypeHint("Bytes") byte[] wkb) throws ParseException {
+            // BYTES overload: decode the raw WKB bytes directly with JTS instead of going through FormatUtils.
+            return new WKBReader().read(wkb);
+        }
+
     }
 
     public static class ST_GeomFromGeoJSON extends ScalarFunction {
@@ -107,4 +115,4 @@ public class Constructors {
             return eval(value, null);
         }
     }
-}
\ No newline at end of file
+}
diff --git a/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java b/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java
index e046679d..e828a854 100644
--- a/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java
+++ b/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java
@@ -30,6 +30,12 @@ import static org.apache.flink.table.api.Expressions.$;
 import static org.apache.flink.table.api.Expressions.call;
 import static org.junit.Assert.assertEquals;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
 public class ConstructorTest extends TestBase{
 
     @BeforeClass
@@ -102,6 +108,31 @@ public class ConstructorTest extends TestBase{
         assertEquals(result, expectedGeom);
     }
 
+    @Test
+    public void testGeomFromWKBBytes()
+    {
+        // Little-endian WKB (byte-order flag 1) of a 2-point LINESTRING (geometry type 2).
+        byte[] wkb = new byte[]{1, 2, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, -124, -42, 0, -64, 0, 0, 0, 0, -128, -75, -42, -65, 0, 0, 0, 96, -31, -17, -9, -65, 0, 0, 0, -128, 7, 93, -27, -65};
+        List<Row> data = new ArrayList<>();
+        data.add(Row.of(wkb, "polygon"));
+        TypeInformation<?>[] colTypes = {
+                PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO,
+                BasicTypeInfo.STRING_TYPE_INFO};
+        RowTypeInfo typeInfo = new RowTypeInfo(colTypes, polygonColNames);
+        DataStream<Row> wkbDS = env.fromCollection(data).returns(typeInfo);
+        Table wkbTable = tableEnv.fromDataStream(wkbDS, $(polygonColNames[0]), $(polygonColNames[1]));
+
+        Table geomTable = wkbTable.select(
+                call(Constructors.ST_GeomFromWKB.class.getSimpleName(), $(polygonColNames[0])).
+                    as(polygonColNames[0]), $(polygonColNames[1]));
+        String result = first(geomTable).
+            getFieldAs(0).toString();
+
+        String expectedGeom = "LINESTRING (-2.1047439575195312 -0.354827880859375, -1.49606454372406 -0.6676061153411865)";
+
+        assertEquals(expectedGeom, result);
+    }
+
     @Test
     public void testGeomFromGeoHash() {
         Integer precision = 2;
diff --git a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Constructors.scala b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Constructors.scala
index 9025be4a..ff9aa605 100644
--- a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Constructors.scala
+++ b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Constructors.scala
@@ -28,9 +28,10 @@ import org.apache.spark.sql.catalyst.util.GenericArrayData
 import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
 import org.apache.spark.sql.sedona_sql.expressions.geohash.GeoHashDecoder
 import org.apache.spark.sql.sedona_sql.expressions.implicits.{GeometryEnhancer, InputExpressionEnhancer, SequenceEnhancer}
-import org.apache.spark.sql.types.{DataType, Decimal}
+import org.apache.spark.sql.types.{BinaryType, DataType, Decimal, StringType}
 import org.apache.spark.unsafe.types.UTF8String
 import org.locationtech.jts.geom.{Coordinate, GeometryFactory}
+import org.locationtech.jts.io.WKBReader
 
 /**
   * Return a point from a string. The string must be plain string and each coordinate must be separated by a delimiter.
@@ -224,7 +225,7 @@ case class ST_GeomFromText(inputExpressions: Seq[Expression])
 /**
   * Return a Geometry from a WKB string
   *
-  * @param inputExpressions This function takes 1 parameter which is the geometry string. The string format must be WKB.
+  * @param inputExpressions This function takes 1 parameter: the geometry as a UTF-8 encoded WKB string (StringType) or as a raw WKB byte array (BinaryType).
   */
 case class ST_GeomFromWKB(inputExpressions: Seq[Expression])
   extends Expression with CodegenFallback with UserDataGeneratator {
@@ -234,11 +235,21 @@ case class ST_GeomFromWKB(inputExpressions: Seq[Expression])
   override def nullable: Boolean = false
 
   override def eval(inputRow: InternalRow): Any = {
-    val geomString = inputExpressions.head.eval(inputRow).asInstanceOf[UTF8String].toString
-    var fileDataSplitter = FileDataSplitter.WKB
-    var formatMapper = new FormatMapper(fileDataSplitter, false)
-    var geometry = formatMapper.readGeometry(geomString)
-    new GenericArrayData(GeometrySerializer.serialize(geometry))
+    // Accept a WKB string (StringType) or a raw WKB byte array (BinaryType); reject anything else.
+    val geometry = inputExpressions.head.dataType match {
+      case StringType =>
+        // Parse UTF-8 encoded wkb string
+        val geomString = inputExpressions.head.eval(inputRow).asInstanceOf[UTF8String].toString
+        new FormatMapper(FileDataSplitter.WKB, false).readGeometry(geomString)
+      case BinaryType =>
+        // convert raw wkb byte array to geometry
+        val wkb = inputExpressions.head.eval(inputRow).asInstanceOf[Array[Byte]]
+        new WKBReader().read(wkb)
+      case other =>
+        // Without this case an unsupported type would make eval return Unit instead of a geometry.
+        throw new IllegalArgumentException(s"ST_GeomFromWKB expects a string or binary argument, but got $other")
+    }
+    new GenericArrayData(GeometrySerializer.serialize(geometry))
   }
 
   override def dataType: DataType = GeometryUDT
diff --git a/sql/src/test/scala/org/apache/sedona/sql/constructorTestScala.scala b/sql/src/test/scala/org/apache/sedona/sql/constructorTestScala.scala
index 50abcc3d..3c754f3e 100644
--- a/sql/src/test/scala/org/apache/sedona/sql/constructorTestScala.scala
+++ b/sql/src/test/scala/org/apache/sedona/sql/constructorTestScala.scala
@@ -114,10 +114,18 @@ class constructorTestScala extends TestBaseScala {
     }
 
     it("Passed ST_GeomFromWKB") {
-      var polygonWkbDf = sparkSession.read.format("csv").option("delimiter", "\t").option("header", "false").load(mixedWkbGeometryInputLocation)
+      // UTF-8 encoded WKB String
+      val polygonWkbDf = sparkSession.read.format("csv").option("delimiter", "\t").option("header", "false").load(mixedWkbGeometryInputLocation)
       polygonWkbDf.createOrReplaceTempView("polygontable")
-      var polygonDf = sparkSession.sql("select ST_GeomFromWKB(polygontable._c0) as countyshape from polygontable")
+      val polygonDf = sparkSession.sql("select ST_GeomFromWKB(polygontable._c0) as countyshape from polygontable")
       assert(polygonDf.count() == 100)
+      // Raw WKB byte array (BinaryType column): little-endian WKB of a 2-point LINESTRING
+      val wkbSeq = Seq[Array[Byte]](Array[Byte](1, 2, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, -124, -42, 0, -64, 0, 0, 0, 0, -128, -75, -42, -65, 0, 0, 0, 96, -31, -17, -9, -65, 0, 0, 0, -128, 7, 93, -27, -65))
+      val rawWkbDf = wkbSeq.toDF("wkb")
+      rawWkbDf.createOrReplaceTempView("rawWKBTable")
+      val geometries = sparkSession.sql("SELECT ST_GeomFromWKB(rawWKBTable.wkb) as countyshape from rawWKBTable")
+      val expectedGeom = "LINESTRING (-2.1047439575195312 -0.354827880859375, -1.49606454372406 -0.6676061153411865)"
+      assert(geometries.first().getAs[Geometry](0).toString == expectedGeom)
     }
 
     it("Passed ST_GeomFromGeoJSON") {