You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sedona.apache.org by ji...@apache.org on 2022/03/05 04:24:04 UTC
[incubator-sedona] branch flink-table-ds updated: Support Table and DataStream conversion
This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch flink-table-ds
in repository https://gitbox.apache.org/repos/asf/incubator-sedona.git
The following commit(s) were added to refs/heads/flink-table-ds by this push:
new 98f3536 Support Table and DataStream conversion
98f3536 is described below
commit 98f3536b52342d982d44ccf83f47bb7beea0b0e4
Author: Jia Yu <ji...@apache.org>
AuthorDate: Fri Mar 4 20:22:13 2022 -0800
Support Table and DataStream conversion
---
.../sedona/flink/expressions/Constructors.java | 14 ++---
.../apache/sedona/flink/expressions/Functions.java | 17 +++---
.../sedona/flink/expressions/Predicates.java | 8 +--
.../java/org.apache.sedona/flink/AdapterTest.java | 60 ++++++++++++++++++++++
.../java/org.apache.sedona/flink/FunctionTest.java | 1 -
.../java/org.apache.sedona/flink/TestBase.java | 6 +--
6 files changed, 83 insertions(+), 23 deletions(-)
diff --git a/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java b/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java
index 41e744b..b337104 100644
--- a/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java
+++ b/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java
@@ -25,21 +25,21 @@ import org.locationtech.jts.io.ParseException;
public class Constructors {
public static class ST_PointFromText extends ScalarFunction {
- @DataTypeHint("RAW")
+ @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
public Geometry eval(@DataTypeHint("String") String s, @DataTypeHint("String") String inputDelimiter) throws ParseException {
FileDataSplitter delimiter = inputDelimiter == null? FileDataSplitter.CSV:FileDataSplitter.getFileDataSplitter(inputDelimiter);
FormatUtils<Geometry> formatUtils = new FormatUtils(delimiter, false, GeometryType.POINT);
return formatUtils.readGeometry(s);
}
- @DataTypeHint("RAW")
+ @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
public Geometry eval(@DataTypeHint("String") String s) throws ParseException {
return eval(s, null);
}
}
public static class ST_PolygonFromText extends ScalarFunction {
- @DataTypeHint("RAW")
+ @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
public Geometry eval(@DataTypeHint("String") String s, @DataTypeHint("String") String inputDelimiter) throws ParseException {
// The default delimiter is comma. Otherwise, use the delimiter given by the user
FileDataSplitter delimiter = inputDelimiter == null? FileDataSplitter.CSV:FileDataSplitter.getFileDataSplitter(inputDelimiter);
@@ -47,14 +47,14 @@ public class Constructors {
return formatUtils.readGeometry(s);
}
- @DataTypeHint("RAW")
+ @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
public Geometry eval(@DataTypeHint("String") String s) throws ParseException {
return eval(s, null);
}
}
public static class ST_PolygonFromEnvelope extends ScalarFunction {
- @DataTypeHint("RAW")
+ @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
public Geometry eval(@DataTypeHint("Double") Double minX, @DataTypeHint("Double") Double minY,
@DataTypeHint("Double") Double maxX, @DataTypeHint("Double") Double maxY) {
Coordinate[] coordinates = new Coordinate[5];
@@ -69,7 +69,7 @@ public class Constructors {
}
public static class ST_GeomFromWKT extends ScalarFunction {
- @DataTypeHint("RAW")
+ @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
public Geometry eval(@DataTypeHint("String") String wktString) throws ParseException {
FormatUtils formatUtils = new FormatUtils(FileDataSplitter.WKT, false);
return formatUtils.readGeometry(wktString);
@@ -77,7 +77,7 @@ public class Constructors {
}
public static class ST_GeomFromWKB extends ScalarFunction {
- @DataTypeHint("RAW")
+ @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
public Geometry eval(@DataTypeHint("String") String wkbString) throws ParseException {
FormatUtils formatUtils = new FormatUtils(FileDataSplitter.WKB, false);
return formatUtils.readGeometry(wkbString);
diff --git a/flink/src/main/java/org/apache/sedona/flink/expressions/Functions.java b/flink/src/main/java/org/apache/sedona/flink/expressions/Functions.java
index 2b17836..6f43143 100644
--- a/flink/src/main/java/org/apache/sedona/flink/expressions/Functions.java
+++ b/flink/src/main/java/org/apache/sedona/flink/expressions/Functions.java
@@ -31,8 +31,8 @@ import java.util.Optional;
public class Functions {
public static class ST_Buffer extends ScalarFunction {
- @DataTypeHint("RAW")
- public Geometry eval(@DataTypeHint("RAW") Object o, @DataTypeHint("Double") Double radius) {
+ @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
+ public Geometry eval(Object o, @DataTypeHint("Double") Double radius) {
Geometry geom = (Geometry) o;
return geom.buffer(radius);
}
@@ -40,7 +40,8 @@ public class Functions {
public static class ST_Distance extends ScalarFunction {
@DataTypeHint("Double")
- public Double eval(@DataTypeHint("RAW") Object o1, @DataTypeHint("RAW") Object o2) {
+ public Double eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o1,
+ @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o2) {
Geometry geom1 = (Geometry) o1;
Geometry geom2 = (Geometry) o2;
return geom1.distance(geom2);
@@ -48,8 +49,8 @@ public class Functions {
}
public static class ST_Transform extends ScalarFunction {
- @DataTypeHint("RAW")
- public Geometry eval(@DataTypeHint("RAW") Object o, @DataTypeHint("String") String sourceCRS, @DataTypeHint("String") String targetCRS) {
+ @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
+ public Geometry eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o, @DataTypeHint("String") String sourceCRS, @DataTypeHint("String") String targetCRS) {
Geometry geom = (Geometry) o;
try {
CoordinateReferenceSystem sourceCRScode = CRS.decode(sourceCRS);
@@ -64,8 +65,8 @@ public class Functions {
}
public static class ST_FlipCoordinates extends ScalarFunction {
- @DataTypeHint("RAW")
- public Geometry eval(@DataTypeHint("RAW") Object o) {
+ @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
+ public Geometry eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o) {
Geometry geom = (Geometry) o;
GeomUtils.flipCoordinates(geom);
return geom;
@@ -74,7 +75,7 @@ public class Functions {
public static class ST_GeoHash extends ScalarFunction {
@DataTypeHint("RAW")
- public Optional<String> eval(@DataTypeHint("RAW") Object geometry, Integer precision) {
+ public Optional<String> eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object geometry, Integer precision) {
Geometry geom = (Geometry) geometry;
Option<String> geoHash = GeometryGeoHashEncoder.calculate(geom, precision);
if (geoHash.isDefined()){
diff --git a/flink/src/main/java/org/apache/sedona/flink/expressions/Predicates.java b/flink/src/main/java/org/apache/sedona/flink/expressions/Predicates.java
index c1a8e13..eedcc1c 100644
--- a/flink/src/main/java/org/apache/sedona/flink/expressions/Predicates.java
+++ b/flink/src/main/java/org/apache/sedona/flink/expressions/Predicates.java
@@ -42,7 +42,7 @@ public class Predicates {
}
@DataTypeHint("Boolean")
- public Boolean eval(@DataTypeHint("RAW") Object o1, @DataTypeHint("RAW") Object o2) {
+ public Boolean eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o1, @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o2) {
Geometry geom1 = (Geometry) o1;
Geometry geom2 = (Geometry) o2;
return geom1.intersects(geom2);
@@ -56,7 +56,7 @@ public class Predicates {
* @return
*/
@DataTypeHint("Boolean")
- public Boolean eval(@DataTypeHint("INT") Integer key, @DataTypeHint("RAW") Object o1, @DataTypeHint("RAW") Object o2) {
+ public Boolean eval(@DataTypeHint("INT") Integer key, @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o1, @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o2) {
Objects.requireNonNull(grids, "This predicate has to be initialized by a partitioner.");
Geometry geom1 = (Geometry) o1;
Geometry geom2 = (Geometry) o2;
@@ -82,7 +82,7 @@ public class Predicates {
}
@DataTypeHint("Boolean")
- public Boolean eval(@DataTypeHint("RAW") Object o1, @DataTypeHint("RAW") Object o2) {
+ public Boolean eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o1, @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o2) {
Geometry geom1 = (Geometry) o1;
Geometry geom2 = (Geometry) o2;
return geom1.covers(geom2);
@@ -96,7 +96,7 @@ public class Predicates {
* @return
*/
@DataTypeHint("Boolean")
- public Boolean eval(@DataTypeHint("INT") Integer key, @DataTypeHint("RAW") Object o1, @DataTypeHint("RAW") Object o2) {
+ public Boolean eval(@DataTypeHint("INT") Integer key, @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o1, @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o2) {
Objects.requireNonNull(grids, "This predicate has to be initialized by a partitioner.");
Geometry geom1 = (Geometry) o1;
Geometry geom2 = (Geometry) o2;
diff --git a/flink/src/test/java/org.apache.sedona/flink/AdapterTest.java b/flink/src/test/java/org.apache.sedona/flink/AdapterTest.java
new file mode 100644
index 0000000..cb3c4cd
--- /dev/null
+++ b/flink/src/test/java/org.apache.sedona/flink/AdapterTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.flink;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.types.Row;
+import org.apache.sedona.flink.expressions.Constructors;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.apache.flink.table.api.Expressions.$;
+import static org.apache.flink.table.api.Expressions.call;
+import static org.junit.Assert.assertEquals;
+
+public class AdapterTest extends TestBase
+{
+ @BeforeClass
+ public static void onceExecutedBeforeAll() {
+ initialize();
+ }
+
+ @Test
+ public void testTableToDS()
+ throws Exception
+ {
+ List<Row> data = createPolygonWKT(testDataSize);
+ Table wktTable = createTextTable(data, polygonColNames);
+ Table geomTable = wktTable.select(call(Constructors.ST_GeomFromWKT.class.getSimpleName(),
+ $(polygonColNames[0])).as(polygonColNames[0]),
+ $(polygonColNames[1]));
+ Row result = last(geomTable);
+ assertEquals(result.toString(), data.get(data.size() - 1).toString());
+ // GeomTable to GeomDS
+ DataStream<Row> geomStream = tableEnv.toDataStream(geomTable);
+ assertEquals(geomStream.executeAndCollect(1).get(0).toString(), data.get(0).toString());
+ // GeomDS to GeomTable
+ geomTable = tableEnv.fromDataStream(geomStream);
+ result = last(geomTable);
+ assertEquals(data.get(data.size() - 1).toString(), result.toString());
+ }
+}
diff --git a/flink/src/test/java/org.apache.sedona/flink/FunctionTest.java b/flink/src/test/java/org.apache.sedona/flink/FunctionTest.java
index 9b702bc..2d244a7 100644
--- a/flink/src/test/java/org.apache.sedona/flink/FunctionTest.java
+++ b/flink/src/test/java/org.apache.sedona/flink/FunctionTest.java
@@ -18,7 +18,6 @@ import org.apache.sedona.flink.expressions.Functions;
import org.junit.BeforeClass;
import org.junit.Test;
import org.locationtech.jts.geom.Geometry;
-import scala.Some;
import java.util.Optional;
diff --git a/flink/src/test/java/org.apache.sedona/flink/TestBase.java b/flink/src/test/java/org.apache.sedona/flink/TestBase.java
index 372f4ee..97f7fe8 100644
--- a/flink/src/test/java/org.apache.sedona/flink/TestBase.java
+++ b/flink/src/test/java/org.apache.sedona/flink/TestBase.java
@@ -26,10 +26,10 @@ import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
-import org.apache.sedona.core.enums.GridType;
-import org.apache.sedona.core.spatialPartitioning.PartitioningUtils;
import org.apache.sedona.flink.expressions.Constructors;
-import org.locationtech.jts.geom.*;
+import org.locationtech.jts.geom.Coordinate;
+import org.locationtech.jts.geom.GeometryFactory;
+import org.locationtech.jts.geom.Point;
import java.util.ArrayList;
import java.util.List;