You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sedona.apache.org by ji...@apache.org on 2022/06/15 21:36:05 UTC

[incubator-sedona] branch master updated: [SEDONA-121] constructors from spark2flink (#630)

This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sedona.git


The following commit(s) were added to refs/heads/master by this push:
     new 18e713b6 [SEDONA-121] constructors from spark2flink (#630)
18e713b6 is described below

commit 18e713b694aceed70c86b1edca93cbff94cab383
Author: Netanel Malka <ne...@gmail.com>
AuthorDate: Thu Jun 16 00:36:00 2022 +0300

    [SEDONA-121] constructors from spark2flink (#630)
---
 docs/api/flink/Constructor.md                      | 54 +++++++++++++++
 .../main/java/org/apache/sedona/flink/Catalog.java |  4 ++
 .../sedona/flink/expressions/Constructors.java     | 79 ++++++++++++++++++----
 .../org/apache/sedona/flink/ConstructorTest.java   | 62 +++++++++++++++++
 .../java/org/apache/sedona/flink/TestBase.java     | 47 +++++++++++--
 5 files changed, 227 insertions(+), 19 deletions(-)

diff --git a/docs/api/flink/Constructor.md b/docs/api/flink/Constructor.md
index e02d00ec..b193fe9f 100644
--- a/docs/api/flink/Constructor.md
+++ b/docs/api/flink/Constructor.md
@@ -1,3 +1,17 @@
+## ST_Point
+
+Introduction: Construct a Point from X and Y
+
+Format: `ST_Point (X:decimal, Y:decimal)`
+
+Since: `v1.2.1`
+
+SQL example:
+```SQL
+SELECT ST_Point(x, y) AS pointshape
+FROM pointtable
+```
+
 ## ST_GeomFromWKT
 
 Introduction: Construct a Geometry from Wkt
@@ -12,6 +26,20 @@ SQL example:
 SELECT ST_GeomFromWKT('POINT(40.7128 -74.0060)') AS geometry
 ```
 
+## ST_GeomFromText
+
+Introduction: Construct a Geometry from Wkt. Alias of [ST_GeomFromWKT](#ST_GeomFromWKT)
+
+Format:
+`ST_GeomFromText (Wkt:string)`
+
+Since: `v1.2.1`
+
+SQL example:
+```SQL
+SELECT ST_GeomFromText('POINT(40.7128 -74.0060)') AS geometry
+```
+
 ## ST_GeomFromWKB
 
 Introduction: Construct a Geometry from WKB string or Binary
@@ -66,6 +94,32 @@ SQL example:
 SELECT ST_PointFromText('40.7128,-74.0060', ',') AS pointshape
 ```
 
+## ST_LineFromText
+
+Introduction: Construct a LineString from Text, delimited by Delimiter (Optional)
+
+Format: `ST_LineFromText (Text:string, Delimiter:char)`
+
+Since: `v1.2.1`
+
+SQL example:
+```SQL
+SELECT ST_LineFromText('Linestring(1 2, 3 4)') AS line
+```
+
+## ST_LineStringFromText
+
+Introduction: Construct a LineString from Text, delimited by Delimiter (Optional). Alias of [ST_LineFromText](#ST_LineFromText)
+
+Format: `ST_LineStringFromText (Text:string, Delimiter:char)`
+
+Since: `v1.2.1`
+
+SQL example:
+```SQL
+SELECT ST_LineStringFromText('Linestring(1 2, 3 4)') AS line
+```
+
 ## ST_PolygonFromText
 
 Introduction: Construct a Polygon from Text, delimited by Delimiter. Path must be closed
diff --git a/flink/src/main/java/org/apache/sedona/flink/Catalog.java b/flink/src/main/java/org/apache/sedona/flink/Catalog.java
index eac9b4f9..81c2a457 100644
--- a/flink/src/main/java/org/apache/sedona/flink/Catalog.java
+++ b/flink/src/main/java/org/apache/sedona/flink/Catalog.java
@@ -20,10 +20,14 @@ import org.apache.sedona.flink.expressions.*;
 public class Catalog {
     public static UserDefinedFunction[] getFuncs() {
         return new UserDefinedFunction[]{
+                new Constructors.ST_Point(),
                 new Constructors.ST_PointFromText(),
+                new Constructors.ST_LineStringFromText(),
+                new Constructors.ST_LineFromText(),
                 new Constructors.ST_PolygonFromText(),
                 new Constructors.ST_PolygonFromEnvelope(),
                 new Constructors.ST_GeomFromWKT(),
+                new Constructors.ST_GeomFromText(),
                 new Constructors.ST_GeomFromWKB(),
                 new Constructors.ST_GeomFromGeoJSON(),
                 new Constructors.ST_GeomFromGeoHash(),
diff --git a/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java b/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java
index 2b2815c0..fc006e18 100644
--- a/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java
+++ b/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java
@@ -26,12 +26,26 @@ import org.locationtech.jts.io.WKBReader;
 import org.locationtech.jts.io.ParseException;
 
 public class Constructors {
+
+    private static Geometry getGeometryByType(String geom, String inputDelimiter, GeometryType geometryType) throws ParseException {
+        FileDataSplitter delimiter = inputDelimiter == null? FileDataSplitter.CSV:FileDataSplitter.getFileDataSplitter(inputDelimiter);
+        FormatUtils<Geometry> formatUtils = new FormatUtils<>(delimiter, false, geometryType);
+        return formatUtils.readGeometry(geom);
+    }
+
+    public static class ST_Point extends ScalarFunction {
+        @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
+        public Geometry eval(@DataTypeHint("Double") Double x, @DataTypeHint("Double") Double y) throws ParseException {
+            Coordinate coordinates = new Coordinate(x, y);
+            GeometryFactory geometryFactory = new GeometryFactory();
+            return geometryFactory.createPoint(coordinates);
+        }
+    }
+
     public static class ST_PointFromText extends ScalarFunction {
         @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
         public Geometry eval(@DataTypeHint("String") String s, @DataTypeHint("String") String inputDelimiter) throws ParseException {
-            FileDataSplitter delimiter = inputDelimiter == null? FileDataSplitter.CSV:FileDataSplitter.getFileDataSplitter(inputDelimiter);
-            FormatUtils<Geometry> formatUtils = new FormatUtils(delimiter, false, GeometryType.POINT);
-            return formatUtils.readGeometry(s);
+            return getGeometryByType(s, inputDelimiter, GeometryType.POINT);
         }
 
         @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
@@ -40,18 +54,44 @@ public class Constructors {
         }
     }
 
+    public static class ST_LineFromText extends ScalarFunction {
+        @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
+        public Geometry eval(@DataTypeHint("String") String lineString,
+                             @DataTypeHint("String") String inputDelimiter) throws ParseException {
+            // The default delimiter is comma. Otherwise, use the delimiter given by the user
+            return getGeometryByType(lineString, inputDelimiter, GeometryType.LINESTRING);
+        }
+
+        @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
+        public Geometry eval(@DataTypeHint("String") String lineString) throws ParseException {
+            return eval(lineString, null);
+        }
+    }
+
+    public static class ST_LineStringFromText extends ScalarFunction {
+        @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
+        public Geometry eval(@DataTypeHint("String") String lineString,
+                             @DataTypeHint("String") String inputDelimiter) throws ParseException {
+            // The default delimiter is comma. Otherwise, use the delimiter given by the user
+            return new ST_LineFromText().eval(lineString, inputDelimiter);
+        }
+
+        @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
+        public Geometry eval(@DataTypeHint("String") String lineString) throws ParseException {
+            return eval(lineString, null);
+        }
+    }
+
     public static class ST_PolygonFromText extends ScalarFunction {
         @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
-        public Geometry eval(@DataTypeHint("String") String s, @DataTypeHint("String") String inputDelimiter) throws ParseException {
+        public Geometry eval(@DataTypeHint("String") String polygonString, @DataTypeHint("String") String inputDelimiter) throws ParseException {
             // The default delimiter is comma. Otherwise, use the delimiter given by the user
-            FileDataSplitter delimiter = inputDelimiter == null? FileDataSplitter.CSV:FileDataSplitter.getFileDataSplitter(inputDelimiter);
-            FormatUtils<Geometry> formatUtils = new FormatUtils(delimiter, false, GeometryType.POLYGON);
-            return formatUtils.readGeometry(s);
+            return getGeometryByType(polygonString, inputDelimiter, GeometryType.POLYGON);
         }
 
         @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
-        public Geometry eval(@DataTypeHint("String") String s) throws ParseException {
-            return eval(s, null);
+        public Geometry eval(@DataTypeHint("String") String polygonString) throws ParseException {
+            return eval(polygonString, null);
         }
     }
 
@@ -70,19 +110,29 @@ public class Constructors {
         }
     }
 
+    private static Geometry getGeometryByFileData(String wktString, FileDataSplitter dataSplitter) throws ParseException {
+        FormatUtils<Geometry> formatUtils = new FormatUtils<>(dataSplitter, false);
+        return formatUtils.readGeometry(wktString);
+    }
+
     public static class ST_GeomFromWKT extends ScalarFunction {
         @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
         public Geometry eval(@DataTypeHint("String") String wktString) throws ParseException {
-            FormatUtils formatUtils = new FormatUtils(FileDataSplitter.WKT, false);
-            return formatUtils.readGeometry(wktString);
+            return getGeometryByFileData(wktString, FileDataSplitter.WKT);
+        }
+    }
+
+    public static class ST_GeomFromText extends ScalarFunction {
+        @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
+        public Geometry eval(@DataTypeHint("String") String wktString) throws ParseException {
+            return new ST_GeomFromWKT().eval(wktString);
         }
     }
 
     public static class ST_GeomFromWKB extends ScalarFunction {
         @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
         public Geometry eval(@DataTypeHint("String") String wkbString) throws ParseException {
-            FormatUtils formatUtils = new FormatUtils(FileDataSplitter.WKB, false);
-            return formatUtils.readGeometry(wkbString);
+            return getGeometryByFileData(wkbString, FileDataSplitter.WKB);
         }
 
         @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
@@ -96,8 +146,7 @@ public class Constructors {
     public static class ST_GeomFromGeoJSON extends ScalarFunction {
         @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
         public Geometry eval(@DataTypeHint("String") String geoJson) throws ParseException {
-            FormatUtils formatUtils = new FormatUtils(FileDataSplitter.GEOJSON, false);
-            return formatUtils.readGeometry(geoJson);
+            return getGeometryByFileData(geoJson, FileDataSplitter.GEOJSON);
         }
     }
 
diff --git a/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java b/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java
index e828a854..077130d5 100644
--- a/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java
+++ b/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java
@@ -43,6 +43,33 @@ public class ConstructorTest extends TestBase{
         initialize();
     }
 
+    @Test
+    public void test2DPoint() {
+        List<Row> data = new ArrayList<>();
+        data.add(Row.of(1.0, 2.0 , "point"));
+        String[] colNames = new String[]{"x", "y", "name_point"};
+
+        TypeInformation<?>[] colTypes = {
+                BasicTypeInfo.DOUBLE_TYPE_INFO,
+                BasicTypeInfo.DOUBLE_TYPE_INFO,
+                BasicTypeInfo.STRING_TYPE_INFO};
+        RowTypeInfo typeInfo = new RowTypeInfo(colTypes, colNames);
+        DataStream<Row> ds = env.fromCollection(data).returns(typeInfo);
+        Table pointTable = tableEnv.fromDataStream(ds);
+
+        Table geomTable = pointTable
+                .select(call(Constructors.ST_Point.class.getSimpleName(), $(colNames[0]), $(colNames[1]))
+                        .as(colNames[2]));
+
+        String result = first(geomTable)
+                .getFieldAs(colNames[2])
+                .toString();
+
+        String expected = "POINT (1 2)";
+
+        assertEquals(result, expected);
+    }
+
     @Test
     public void testPointFromText() {
         List<Row> data = createPointWKT(testDataSize);
@@ -50,6 +77,30 @@ public class ConstructorTest extends TestBase{
         assertEquals(result.toString(), data.get(data.size() - 1).toString());
     }
 
+    @Test
+    public void testLineFromText() {
+        List<Row> data = createLineStringWKT(testDataSize);
+
+        Table lineStringTable = createLineStringTextTable(testDataSize)
+                .select(call(Constructors.ST_LineFromText.class.getSimpleName(), $(linestringColNames[0])).as(linestringColNames[0]),
+                        $(linestringColNames[1]));
+        Row result = last(lineStringTable);
+
+        assertEquals(result.toString(), data.get(data.size() - 1).toString());
+    }
+
+    @Test
+    public void testLineStringFromText() {
+        List<Row> data = createLineStringWKT(testDataSize);
+
+        Table lineStringTable = createLineStringTextTable(testDataSize)
+                .select(call(Constructors.ST_LineStringFromText.class.getSimpleName(), $(linestringColNames[0])).as(linestringColNames[0]),
+                        $(linestringColNames[1]));
+        Row result = last(lineStringTable);
+
+        assertEquals(result.toString(), data.get(data.size() - 1).toString());
+    }
+
     @Test
     public void testPolygonFromText() {
         List<Row> data = createPolygonWKT(testDataSize);
@@ -68,6 +119,17 @@ public class ConstructorTest extends TestBase{
         assertEquals(result.toString(), data.get(data.size() - 1).toString());
     }
 
+    @Test
+    public void testGeomFromText() {
+        List<Row> data = createPolygonWKT(testDataSize);
+        Table wktTable = createTextTable(data, polygonColNames);
+        Table geomTable = wktTable.select(call(Constructors.ST_GeomFromText.class.getSimpleName(),
+                        $(polygonColNames[0])).as(polygonColNames[0]),
+                $(polygonColNames[1]));
+        Row result = last(geomTable);
+        assertEquals(result.toString(), data.get(data.size() - 1).toString());
+    }
+
     @Test
     public void testPolygonFromEnvelope() {
         Double minX = 1.0;
diff --git a/flink/src/test/java/org/apache/sedona/flink/TestBase.java b/flink/src/test/java/org/apache/sedona/flink/TestBase.java
index 9623d231..3dd99432 100644
--- a/flink/src/test/java/org/apache/sedona/flink/TestBase.java
+++ b/flink/src/test/java/org/apache/sedona/flink/TestBase.java
@@ -28,14 +28,10 @@ import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.sedona.flink.expressions.Constructors;
 import org.locationtech.jts.geom.*;
-import org.locationtech.jts.geom.impl.CoordinateArraySequence;
-import org.wololo.geojson.Feature;
 import org.wololo.jts2geojson.GeoJSONWriter;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import static org.apache.flink.table.api.Expressions.$;
 import static org.apache.flink.table.api.Expressions.call;
@@ -45,6 +41,7 @@ public class TestBase {
     protected static StreamTableEnvironment tableEnv;
     static int testDataSize = 1000;
     static String[] pointColNames = {"geom_point", "name_point"};
+    static String[] linestringColNames = {"geom_linestring", "name_linestring"};
     static String[] polygonColNames = {"geom_polygon", "name_polygon"};
     static String pointTableName = "point_table";
     static String polygonTableName = "polygon_table";
@@ -126,6 +123,44 @@ public class TestBase {
         return data;
     }
 
+    static List<Row> createLineStringText(int size) {
+        List<Row> data = new ArrayList<>();
+        for (int i = 0; i < size; i++) {
+            // Create linestrings, each running from (i - 0.5, i - 0.5) to (i + 0.5, i + 0.5)
+            // Each linestring is encoded as comma-separated text: minX,minY,maxX,maxY
+            String minX = String.valueOf(i - 0.5);
+            String minY = String.valueOf(i - 0.5);
+            String maxX = String.valueOf(i + 0.5);
+            String maxY = String.valueOf(i + 0.5);
+            List<String> linestring = new ArrayList<>();
+            linestring.add(minX);
+            linestring.add(minY);
+            linestring.add(maxX);
+            linestring.add(maxY);
+
+            data.add(Row.of(String.join(",", linestring), "linestring" + i));
+        }
+        return data;
+    }
+
+    static List<Row> createLineStringWKT(int size) {
+        List<Row> data = new ArrayList<>();
+        for (int i = 0; i < size; i++) {
+
+            String minX = String.valueOf(i - 0.5);
+            String minY = String.valueOf(i - 0.5);
+            String maxX = String.valueOf(i + 0.5);
+            String maxY = String.valueOf(i + 0.5);
+
+            List<String> linestring = new ArrayList<>();
+            linestring.add(minX + " " + minY);
+            linestring.add(maxX + " " + maxY);
+
+            data.add(Row.of("LINESTRING (" + String.join(", ", linestring) + ")", "linestring" + i));
+        }
+        return data;
+    }
+
     static List<Row> createPolygonWKT(int size) {
         List<Row> data = new ArrayList<>();
         for (int i = 0; i < size; i++) {
@@ -190,6 +225,10 @@ public class TestBase {
         return createTextTable(createPointText_real(size), pointColNames);
     }
 
+    static Table createLineStringTextTable(int size) {
+        return createTextTable(createLineStringText(size), linestringColNames);
+    }
+
     static Table createPolygonTextTable(int size) {
         return createTextTable(createPolygonText(size), polygonColNames);
     }