You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sedona.apache.org by ji...@apache.org on 2022/06/08 17:59:00 UTC
[incubator-sedona] branch master updated: [SEDONA-122] add ST_GeomFromWKB overload for BYTES column containing (e)wkb byte array. (#628)
This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sedona.git
The following commit(s) were added to refs/heads/master by this push:
new 9e6d9aca [SEDONA-122] add ST_GeomFromWKB overload for BYTES column containing (e)wkb byte array. (#628)
9e6d9aca is described below
commit 9e6d9aca9e37ef5af8b80543bcb138a7bdd42b3f
Author: Elephantusparvus <ni...@gmail.com>
AuthorDate: Wed Jun 8 19:58:54 2022 +0200
[SEDONA-122] add ST_GeomFromWKB overload for BYTES column containing (e)wkb byte array. (#628)
Co-authored-by: Lennart Niecksch <le...@dfki.de>
---
docs/api/flink/Constructor.md | 16 +++++++++--
docs/api/sql/Constructor.md | 3 ++-
.../sedona/flink/expressions/Constructors.java | 10 ++++++-
.../org/apache/sedona/flink/ConstructorTest.java | 31 ++++++++++++++++++++++
.../sql/sedona_sql/expressions/Constructors.scala | 25 ++++++++++++-----
.../apache/sedona/sql/constructorTestScala.scala | 12 +++++++--
6 files changed, 84 insertions(+), 13 deletions(-)
diff --git a/docs/api/flink/Constructor.md b/docs/api/flink/Constructor.md
index 05b7d5ef..e02d00ec 100644
--- a/docs/api/flink/Constructor.md
+++ b/docs/api/flink/Constructor.md
@@ -14,10 +14,11 @@ SELECT ST_GeomFromWKT('POINT(40.7128 -74.0060)') AS geometry
## ST_GeomFromWKB
-Introduction: Construct a Geometry from WKB string
+Introduction: Construct a Geometry from a WKB string or a WKB byte array (binary)
Format:
`ST_GeomFromWKB (Wkb:string)`
+`ST_GeomFromWKB (Wkb:binary)`
Since: `v1.2.0`
@@ -27,6 +28,17 @@ SELECT ST_GeomFromWKB(polygontable._c0) AS polygonshape
FROM polygontable
```
+Format:
+`ST_GeomFromWKB (Wkb:bytes)`
+
+Since: `v1.2.1`
+
+SQL example:
+```SQL
+SELECT ST_GeomFromWKB(polygontable._c0) AS polygonshape
+FROM polygontable
+```
+
## ST_GeomFromGeoJSON
Introduction: Construct a Geometry from GeoJson
@@ -93,4 +105,4 @@ Since: `v1.2.1`
SQL example:
```SQL
SELECT ST_GeomFromGeoHash('s00twy01mt', 4) AS geom
-```
\ No newline at end of file
+```
diff --git a/docs/api/sql/Constructor.md b/docs/api/sql/Constructor.md
index cae9bf0c..c8213daa 100644
--- a/docs/api/sql/Constructor.md
+++ b/docs/api/sql/Constructor.md
@@ -19,10 +19,11 @@ SELECT ST_GeomFromWKT('POINT(40.7128 -74.0060)') AS geometry
## ST_GeomFromWKB
-Introduction: Construct a Geometry from WKB string
+Introduction: Construct a Geometry from a WKB string or a WKB byte array (binary)
Format:
`ST_GeomFromWKB (Wkb:string)`
+`ST_GeomFromWKB (Wkb:binary)`
Since: `v1.0.0`
diff --git a/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java b/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java
index 1a9fcb41..2b2815c0 100644
--- a/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java
+++ b/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java
@@ -22,6 +22,7 @@ import org.apache.spark.sql.sedona_sql.expressions.geohash.GeoHashDecoder;
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.geom.GeometryFactory;
+import org.locationtech.jts.io.WKBReader;
import org.locationtech.jts.io.ParseException;
public class Constructors {
@@ -83,6 +84,13 @@ public class Constructors {
FormatUtils formatUtils = new FormatUtils(FileDataSplitter.WKB, false);
return formatUtils.readGeometry(wkbString);
}
+
+ /**
+  * Builds a geometry from a raw WKB byte array.
+  *
+  * @param wkb the WKB-encoded geometry bytes
+  * @return the geometry decoded by a JTS {@link WKBReader}
+  * @throws ParseException if the reader cannot parse the bytes
+  */
+ @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
+ public Geometry eval(@DataTypeHint("Bytes") byte[] wkb) throws ParseException {
+     // A fresh reader per call, as no reader state is shared between invocations.
+     return new WKBReader().read(wkb);
+ }
+
}
public static class ST_GeomFromGeoJSON extends ScalarFunction {
@@ -107,4 +115,4 @@ public class Constructors {
return eval(value, null);
}
}
-}
\ No newline at end of file
+}
diff --git a/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java b/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java
index e046679d..e828a854 100644
--- a/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java
+++ b/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java
@@ -30,6 +30,12 @@ import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;
import static org.junit.Assert.assertEquals;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
public class ConstructorTest extends TestBase{
@BeforeClass
@@ -102,6 +108,31 @@ public class ConstructorTest extends TestBase{
assertEquals(result, expectedGeom);
}
+ /**
+  * Verifies that ST_GeomFromWKB accepts a BYTES column holding a raw WKB byte array.
+  * The fixture bytes encode a two-point line string, which is compared by its WKT text.
+  */
+ @Test
+ public void testGeomFromWKBBytes()
+ {
+     byte[] wkb = new byte[]{1, 2, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, -124, -42, 0, -64, 0, 0, 0, 0, -128, -75, -42, -65, 0, 0, 0, 96, -31, -17, -9, -65, 0, 0, 0, -128, 7, 93, -27, -65};
+     List<Row> data = new ArrayList<>();
+     data.add(Row.of(wkb, "polygon"));
+     // Declare the first column as a primitive byte array so the table sees a BYTES column.
+     TypeInformation<?>[] colTypes = {
+         PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO,
+         BasicTypeInfo.STRING_TYPE_INFO};
+     RowTypeInfo typeInfo = new RowTypeInfo(colTypes, polygonColNames);
+     DataStream<Row> wkbDS = env.fromCollection(data).returns(typeInfo);
+     Table wkbTable = tableEnv.fromDataStream(wkbDS, $(polygonColNames[0]), $(polygonColNames[1]));
+
+     Table geomTable = wkbTable.select(
+         call(Constructors.ST_GeomFromWKB.class.getSimpleName(), $(polygonColNames[0])).
+             as(polygonColNames[0]), $(polygonColNames[1]));
+     String result = first(geomTable).
+         getFieldAs(0).toString();
+
+     String expectedGeom = "LINESTRING (-2.1047439575195312 -0.354827880859375, -1.49606454372406 -0.6676061153411865)";
+
+     // JUnit expects (expected, actual); this order keeps failure messages accurate.
+     assertEquals(expectedGeom, result);
+ }
+
@Test
public void testGeomFromGeoHash() {
Integer precision = 2;
diff --git a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Constructors.scala b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Constructors.scala
index 9025be4a..ff9aa605 100644
--- a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Constructors.scala
+++ b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Constructors.scala
@@ -28,9 +28,10 @@ import org.apache.spark.sql.catalyst.util.GenericArrayData
import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
import org.apache.spark.sql.sedona_sql.expressions.geohash.GeoHashDecoder
import org.apache.spark.sql.sedona_sql.expressions.implicits.{GeometryEnhancer, InputExpressionEnhancer, SequenceEnhancer}
-import org.apache.spark.sql.types.{DataType, Decimal}
+import org.apache.spark.sql.types.{BinaryType, DataType, Decimal, StringType}
import org.apache.spark.unsafe.types.UTF8String
import org.locationtech.jts.geom.{Coordinate, GeometryFactory}
+import org.locationtech.jts.io.WKBReader
/**
* Return a point from a string. The string must be plain string and each coordinate must be separated by a delimiter.
@@ -224,7 +225,7 @@ case class ST_GeomFromText(inputExpressions: Seq[Expression])
/**
* Return a Geometry from a WKB string or a WKB byte array
*
- * @param inputExpressions This function takes 1 parameter which is the geometry string. The string format must be WKB.
+ * @param inputExpressions This function takes 1 parameter, which is either the UTF-8 encoded WKB geometry string or the raw WKB byte array.
*/
case class ST_GeomFromWKB(inputExpressions: Seq[Expression])
extends Expression with CodegenFallback with UserDataGeneratator {
@@ -234,11 +235,21 @@ case class ST_GeomFromWKB(inputExpressions: Seq[Expression])
override def nullable: Boolean = false
override def eval(inputRow: InternalRow): Any = {
- val geomString = inputExpressions.head.eval(inputRow).asInstanceOf[UTF8String].toString
- var fileDataSplitter = FileDataSplitter.WKB
- var formatMapper = new FormatMapper(fileDataSplitter, false)
- var geometry = formatMapper.readGeometry(geomString)
- new GenericArrayData(GeometrySerializer.serialize(geometry))
+ // Dispatch on the declared type of the single input expression. Every branch either
+ // yields a geometry or fails loudly, so this method never falls through and returns Unit.
+ val input = inputExpressions.head
+ val geometry = input.dataType match {
+   case StringType =>
+     // Parse UTF-8 encoded wkb string
+     val geomString = input.eval(inputRow).asInstanceOf[UTF8String].toString
+     val formatMapper = new FormatMapper(FileDataSplitter.WKB, false)
+     formatMapper.readGeometry(geomString)
+   case BinaryType =>
+     // convert raw wkb byte array to geometry
+     val wkb = input.eval(inputRow).asInstanceOf[Array[Byte]]
+     new WKBReader().read(wkb)
+   case other =>
+     // Previously an unsupported type silently produced Unit; report it explicitly instead.
+     throw new IllegalArgumentException(
+       s"ST_GeomFromWKB expects a string or binary argument, but got: $other")
+ }
+ new GenericArrayData(GeometrySerializer.serialize(geometry))
}
override def dataType: DataType = GeometryUDT
diff --git a/sql/src/test/scala/org/apache/sedona/sql/constructorTestScala.scala b/sql/src/test/scala/org/apache/sedona/sql/constructorTestScala.scala
index 50abcc3d..3c754f3e 100644
--- a/sql/src/test/scala/org/apache/sedona/sql/constructorTestScala.scala
+++ b/sql/src/test/scala/org/apache/sedona/sql/constructorTestScala.scala
@@ -114,10 +114,18 @@ class constructorTestScala extends TestBaseScala {
}
it("Passed ST_GeomFromWKB") {
- var polygonWkbDf = sparkSession.read.format("csv").option("delimiter", "\t").option("header", "false").load(mixedWkbGeometryInputLocation)
+ // Case 1: WKB supplied as text, loaded from the tab-delimited fixture file
+ val wkbTextDf = sparkSession.read
+   .format("csv")
+   .option("delimiter", "\t")
+   .option("header", "false")
+   .load(mixedWkbGeometryInputLocation)
+ wkbTextDf.createOrReplaceTempView("polygontable")
- polygonWkbDf.createOrReplaceTempView("polygontable")
- var polygonDf = sparkSession.sql("select ST_GeomFromWKB(polygontable._c0) as countyshape from polygontable")
+ val parsedFromText = sparkSession.sql("select ST_GeomFromWKB(polygontable._c0) as countyshape from polygontable")
+ assert(parsedFromText.count() == 100)
- assert(polygonDf.count() == 100)
+ // Case 2: WKB supplied as a raw byte array in a binary column
+ val rawWkb = Array[Byte](1, 2, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, -124, -42, 0, -64, 0, 0, 0, 0, -128, -75, -42, -65, 0, 0, 0, 96, -31, -17, -9, -65, 0, 0, 0, -128, 7, 93, -27, -65)
+ Seq[Array[Byte]](rawWkb).toDF("wkb").createOrReplaceTempView("rawWKBTable")
+ val parsedFromBytes = sparkSession.sql("SELECT ST_GeomFromWKB(rawWKBTable.wkb) as countyshape from rawWKBTable")
+ val expectedWkt = "LINESTRING (-2.1047439575195312 -0.354827880859375, -1.49606454372406 -0.6676061153411865)"
+ val actualWkt = parsedFromBytes.first().getAs[Geometry](0).toString
+ assert(actualWkt == expectedWkt)
}
it("Passed ST_GeomFromGeoJSON") {