You are viewing a plain-text version of this content. The canonical link for it was provided as a hyperlink in the original archive page and is not preserved in this plain-text copy.
Posted to commits@sedona.apache.org by ji...@apache.org on 2022/03/05 07:05:42 UTC

[incubator-sedona] branch master updated: [SEDONA-87] Support Flink Table and DataStream conversion (#588)

This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sedona.git


The following commit(s) were added to refs/heads/master by this push:
     new 358ae35  [SEDONA-87] Support Flink Table and DataStream conversion (#588)
358ae35 is described below

commit 358ae3548032078205cf0d00d950cbd76977f33e
Author: Jia Yu <ji...@apache.org>
AuthorDate: Fri Mar 4 23:05:34 2022 -0800

    [SEDONA-87] Support Flink Table and DataStream conversion (#588)
---
 .../sedona/flink/expressions/Constructors.java     | 14 ++---
 .../apache/sedona/flink/expressions/Functions.java | 17 +++---
 .../sedona/flink/expressions/Predicates.java       |  8 +--
 .../java/org.apache.sedona/flink/AdapterTest.java  | 60 ++++++++++++++++++++++
 .../java/org.apache.sedona/flink/FunctionTest.java |  1 -
 .../java/org.apache.sedona/flink/TestBase.java     |  6 +--
 6 files changed, 83 insertions(+), 23 deletions(-)

diff --git a/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java b/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java
index 41e744b..b337104 100644
--- a/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java
+++ b/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java
@@ -25,21 +25,21 @@ import org.locationtech.jts.io.ParseException;
 
 public class Constructors {
     public static class ST_PointFromText extends ScalarFunction {
-        @DataTypeHint("RAW")
+        @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
         public Geometry eval(@DataTypeHint("String") String s, @DataTypeHint("String") String inputDelimiter) throws ParseException {
             FileDataSplitter delimiter = inputDelimiter == null? FileDataSplitter.CSV:FileDataSplitter.getFileDataSplitter(inputDelimiter);
             FormatUtils<Geometry> formatUtils = new FormatUtils(delimiter, false, GeometryType.POINT);
             return formatUtils.readGeometry(s);
         }
 
-        @DataTypeHint("RAW")
+        @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
         public Geometry eval(@DataTypeHint("String") String s) throws ParseException {
             return eval(s, null);
         }
     }
 
     public static class ST_PolygonFromText extends ScalarFunction {
-        @DataTypeHint("RAW")
+        @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
         public Geometry eval(@DataTypeHint("String") String s, @DataTypeHint("String") String inputDelimiter) throws ParseException {
             // The default delimiter is comma. Otherwise, use the delimiter given by the user
             FileDataSplitter delimiter = inputDelimiter == null? FileDataSplitter.CSV:FileDataSplitter.getFileDataSplitter(inputDelimiter);
@@ -47,14 +47,14 @@ public class Constructors {
             return formatUtils.readGeometry(s);
         }
 
-        @DataTypeHint("RAW")
+        @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
         public Geometry eval(@DataTypeHint("String") String s) throws ParseException {
             return eval(s, null);
         }
     }
 
     public static class ST_PolygonFromEnvelope extends ScalarFunction {
-        @DataTypeHint("RAW")
+        @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
         public Geometry eval(@DataTypeHint("Double") Double minX, @DataTypeHint("Double") Double minY,
                              @DataTypeHint("Double") Double maxX, @DataTypeHint("Double") Double maxY) {
             Coordinate[] coordinates = new Coordinate[5];
@@ -69,7 +69,7 @@ public class Constructors {
     }
 
     public static class ST_GeomFromWKT extends ScalarFunction {
-        @DataTypeHint("RAW")
+        @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
         public Geometry eval(@DataTypeHint("String") String wktString) throws ParseException {
             FormatUtils formatUtils = new FormatUtils(FileDataSplitter.WKT, false);
             return formatUtils.readGeometry(wktString);
@@ -77,7 +77,7 @@ public class Constructors {
     }
 
     public static class ST_GeomFromWKB extends ScalarFunction {
-        @DataTypeHint("RAW")
+        @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
         public Geometry eval(@DataTypeHint("String") String wkbString) throws ParseException {
             FormatUtils formatUtils = new FormatUtils(FileDataSplitter.WKB, false);
             return formatUtils.readGeometry(wkbString);
diff --git a/flink/src/main/java/org/apache/sedona/flink/expressions/Functions.java b/flink/src/main/java/org/apache/sedona/flink/expressions/Functions.java
index 2b17836..6f43143 100644
--- a/flink/src/main/java/org/apache/sedona/flink/expressions/Functions.java
+++ b/flink/src/main/java/org/apache/sedona/flink/expressions/Functions.java
@@ -31,8 +31,8 @@ import java.util.Optional;
 
 public class Functions {
     public static class ST_Buffer extends ScalarFunction {
-        @DataTypeHint("RAW")
-        public Geometry eval(@DataTypeHint("RAW") Object o, @DataTypeHint("Double") Double radius) {
+        @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
+        public Geometry eval(Object o, @DataTypeHint("Double") Double radius) {
             Geometry geom = (Geometry) o;
             return geom.buffer(radius);
         }
@@ -40,7 +40,8 @@ public class Functions {
 
     public static class ST_Distance extends ScalarFunction {
         @DataTypeHint("Double")
-        public Double eval(@DataTypeHint("RAW") Object o1, @DataTypeHint("RAW") Object o2) {
+        public Double eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o1,
+                @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o2) {
             Geometry geom1 = (Geometry) o1;
             Geometry geom2 = (Geometry) o2;
             return geom1.distance(geom2);
@@ -48,8 +49,8 @@ public class Functions {
     }
 
     public static class ST_Transform extends ScalarFunction {
-        @DataTypeHint("RAW")
-        public Geometry eval(@DataTypeHint("RAW") Object o, @DataTypeHint("String") String sourceCRS, @DataTypeHint("String") String targetCRS) {
+        @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
+        public Geometry eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o, @DataTypeHint("String") String sourceCRS, @DataTypeHint("String") String targetCRS) {
             Geometry geom = (Geometry) o;
             try {
                 CoordinateReferenceSystem sourceCRScode = CRS.decode(sourceCRS);
@@ -64,8 +65,8 @@ public class Functions {
     }
 
     public static class ST_FlipCoordinates extends ScalarFunction {
-        @DataTypeHint("RAW")
-        public Geometry eval(@DataTypeHint("RAW") Object o) {
+        @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
+        public Geometry eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o) {
             Geometry geom = (Geometry) o;
             GeomUtils.flipCoordinates(geom);
             return geom;
@@ -74,7 +75,7 @@ public class Functions {
 
     public static class ST_GeoHash extends ScalarFunction {
         @DataTypeHint("RAW")
-        public Optional<String> eval(@DataTypeHint("RAW") Object geometry, Integer precision) {
+        public Optional<String> eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object geometry, Integer precision) {
             Geometry geom = (Geometry) geometry;
             Option<String> geoHash = GeometryGeoHashEncoder.calculate(geom, precision);
             if (geoHash.isDefined()){
diff --git a/flink/src/main/java/org/apache/sedona/flink/expressions/Predicates.java b/flink/src/main/java/org/apache/sedona/flink/expressions/Predicates.java
index c1a8e13..eedcc1c 100644
--- a/flink/src/main/java/org/apache/sedona/flink/expressions/Predicates.java
+++ b/flink/src/main/java/org/apache/sedona/flink/expressions/Predicates.java
@@ -42,7 +42,7 @@ public class Predicates {
         }
 
         @DataTypeHint("Boolean")
-        public Boolean eval(@DataTypeHint("RAW") Object o1, @DataTypeHint("RAW") Object o2) {
+        public Boolean eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o1, @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o2) {
             Geometry geom1 = (Geometry) o1;
             Geometry geom2 = (Geometry) o2;
             return geom1.intersects(geom2);
@@ -56,7 +56,7 @@ public class Predicates {
          * @return
          */
         @DataTypeHint("Boolean")
-        public Boolean eval(@DataTypeHint("INT") Integer key, @DataTypeHint("RAW") Object o1, @DataTypeHint("RAW") Object o2) {
+        public Boolean eval(@DataTypeHint("INT") Integer key, @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o1, @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o2) {
             Objects.requireNonNull(grids, "This predicate has to be initialized by a partitioner.");
             Geometry geom1 = (Geometry) o1;
             Geometry geom2 = (Geometry) o2;
@@ -82,7 +82,7 @@ public class Predicates {
         }
 
         @DataTypeHint("Boolean")
-        public Boolean eval(@DataTypeHint("RAW") Object o1, @DataTypeHint("RAW") Object o2) {
+        public Boolean eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o1, @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o2) {
             Geometry geom1 = (Geometry) o1;
             Geometry geom2 = (Geometry) o2;
             return geom1.covers(geom2);
@@ -96,7 +96,7 @@ public class Predicates {
          * @return
          */
         @DataTypeHint("Boolean")
-        public Boolean eval(@DataTypeHint("INT") Integer key, @DataTypeHint("RAW") Object o1, @DataTypeHint("RAW") Object o2) {
+        public Boolean eval(@DataTypeHint("INT") Integer key, @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o1, @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o2) {
             Objects.requireNonNull(grids, "This predicate has to be initialized by a partitioner.");
             Geometry geom1 = (Geometry) o1;
             Geometry geom2 = (Geometry) o2;
diff --git a/flink/src/test/java/org.apache.sedona/flink/AdapterTest.java b/flink/src/test/java/org.apache.sedona/flink/AdapterTest.java
new file mode 100644
index 0000000..cb3c4cd
--- /dev/null
+++ b/flink/src/test/java/org.apache.sedona/flink/AdapterTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.flink;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.types.Row;
+import org.apache.sedona.flink.expressions.Constructors;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.apache.flink.table.api.Expressions.$;
+import static org.apache.flink.table.api.Expressions.call;
+import static org.junit.Assert.assertEquals;
+
+public class AdapterTest extends TestBase
+{
+    @BeforeClass
+    public static void onceExecutedBeforeAll() {
+        initialize();
+    }
+
+    @Test
+    public void testTableToDS()
+            throws Exception
+    {
+        List<Row> data = createPolygonWKT(testDataSize);
+        Table wktTable = createTextTable(data, polygonColNames);
+        Table geomTable = wktTable.select(call(Constructors.ST_GeomFromWKT.class.getSimpleName(),
+                        $(polygonColNames[0])).as(polygonColNames[0]),
+                $(polygonColNames[1]));
+        Row result = last(geomTable);
+        assertEquals(result.toString(), data.get(data.size() - 1).toString());
+        // GeomTable to GeomDS
+        DataStream<Row> geomStream = tableEnv.toDataStream(geomTable);
+        assertEquals(geomStream.executeAndCollect(1).get(0).toString(), data.get(0).toString());
+        // GeomDS to GeomTable
+        geomTable = tableEnv.fromDataStream(geomStream);
+        result = last(geomTable);
+        assertEquals(data.get(data.size() - 1).toString(), result.toString());
+    }
+}
diff --git a/flink/src/test/java/org.apache.sedona/flink/FunctionTest.java b/flink/src/test/java/org.apache.sedona/flink/FunctionTest.java
index 9b702bc..2d244a7 100644
--- a/flink/src/test/java/org.apache.sedona/flink/FunctionTest.java
+++ b/flink/src/test/java/org.apache.sedona/flink/FunctionTest.java
@@ -18,7 +18,6 @@ import org.apache.sedona.flink.expressions.Functions;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.locationtech.jts.geom.Geometry;
-import scala.Some;
 
 import java.util.Optional;
 
diff --git a/flink/src/test/java/org.apache.sedona/flink/TestBase.java b/flink/src/test/java/org.apache.sedona/flink/TestBase.java
index 372f4ee..97f7fe8 100644
--- a/flink/src/test/java/org.apache.sedona/flink/TestBase.java
+++ b/flink/src/test/java/org.apache.sedona/flink/TestBase.java
@@ -26,10 +26,10 @@ import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
-import org.apache.sedona.core.enums.GridType;
-import org.apache.sedona.core.spatialPartitioning.PartitioningUtils;
 import org.apache.sedona.flink.expressions.Constructors;
-import org.locationtech.jts.geom.*;
+import org.locationtech.jts.geom.Coordinate;
+import org.locationtech.jts.geom.GeometryFactory;
+import org.locationtech.jts.geom.Point;
 
 import java.util.ArrayList;
 import java.util.List;