You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sedona.apache.org by ma...@apache.org on 2022/06/12 19:06:25 UTC

[incubator-sedona] branch SEDONA-121]-constructors-from-spark2flink created (now d5ba3e71)

This is an automated email from the ASF dual-hosted git repository.

malka pushed a change to branch SEDONA-121]-constructors-from-spark2flink
in repository https://gitbox.apache.org/repos/asf/incubator-sedona.git


      at d5ba3e71 Add ST_Point

This branch includes the following new commits:

     new 81353acd ST_GeomFromText and ST_LineFromText, ST_LineStringFromText
     new d5ba3e71 Add ST_Point

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-sedona] 02/02: Add ST_Point

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

malka pushed a commit to branch SEDONA-121]-constructors-from-spark2flink
in repository https://gitbox.apache.org/repos/asf/incubator-sedona.git

commit d5ba3e71dc09ebc0c3dfe82c704684c6d2b714f7
Author: Netanel Malka <ne...@gmail.com>
AuthorDate: Sun Jun 12 22:01:53 2022 +0300

    Add ST_Point
---
 docs/api/flink/Constructor.md                      | 14 +++++++++++
 .../main/java/org/apache/sedona/flink/Catalog.java |  1 +
 .../sedona/flink/expressions/Constructors.java     |  9 ++++++++
 .../org/apache/sedona/flink/ConstructorTest.java   | 27 ++++++++++++++++++++++
 4 files changed, 51 insertions(+)

diff --git a/docs/api/flink/Constructor.md b/docs/api/flink/Constructor.md
index e8568b01..b193fe9f 100644
--- a/docs/api/flink/Constructor.md
+++ b/docs/api/flink/Constructor.md
@@ -1,3 +1,17 @@
+## ST_Point
+
+Introduction: Construct a Point from X and Y
+
+Format: `ST_Point (X:decimal, Y:decimal)`
+
+Since: `v1.2.1`
+
+SQL example:
+```SQL
+SELECT ST_Point(x, y) AS pointshape
+FROM pointtable
+```
+
 ## ST_GeomFromWKT
 
 Introduction: Construct a Geometry from Wkt
diff --git a/flink/src/main/java/org/apache/sedona/flink/Catalog.java b/flink/src/main/java/org/apache/sedona/flink/Catalog.java
index 9e4ff44a..81c2a457 100644
--- a/flink/src/main/java/org/apache/sedona/flink/Catalog.java
+++ b/flink/src/main/java/org/apache/sedona/flink/Catalog.java
@@ -20,6 +20,7 @@ import org.apache.sedona.flink.expressions.*;
 public class Catalog {
     public static UserDefinedFunction[] getFuncs() {
         return new UserDefinedFunction[]{
+                new Constructors.ST_Point(),
                 new Constructors.ST_PointFromText(),
                 new Constructors.ST_LineStringFromText(),
                 new Constructors.ST_LineFromText(),
diff --git a/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java b/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java
index 97900ede..fc006e18 100644
--- a/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java
+++ b/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java
@@ -33,6 +33,15 @@ public class Constructors {
         return formatUtils.readGeometry(geom);
     }
 
+    public static class ST_Point extends ScalarFunction {
+        @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
+        public Geometry eval(@DataTypeHint("Double") Double x, @DataTypeHint("Double") Double y) throws ParseException {
+            Coordinate coordinates = new Coordinate(x, y);
+            GeometryFactory geometryFactory = new GeometryFactory();
+            return geometryFactory.createPoint(coordinates);
+        }
+    }
+
     public static class ST_PointFromText extends ScalarFunction {
         @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
         public Geometry eval(@DataTypeHint("String") String s, @DataTypeHint("String") String inputDelimiter) throws ParseException {
diff --git a/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java b/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java
index dd8925d6..077130d5 100644
--- a/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java
+++ b/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java
@@ -43,6 +43,33 @@ public class ConstructorTest extends TestBase{
         initialize();
     }
 
+    @Test
+    public void test2DPoint() {
+        List<Row> data = new ArrayList<>();
+        data.add(Row.of(1.0, 2.0 , "point"));
+        String[] colNames = new String[]{"x", "y", "name_point"};
+
+        TypeInformation<?>[] colTypes = {
+                BasicTypeInfo.DOUBLE_TYPE_INFO,
+                BasicTypeInfo.DOUBLE_TYPE_INFO,
+                BasicTypeInfo.STRING_TYPE_INFO};
+        RowTypeInfo typeInfo = new RowTypeInfo(colTypes, colNames);
+        DataStream<Row> ds = env.fromCollection(data).returns(typeInfo);
+        Table pointTable = tableEnv.fromDataStream(ds);
+
+        Table geomTable = pointTable
+                .select(call(Constructors.ST_Point.class.getSimpleName(), $(colNames[0]), $(colNames[1]))
+                        .as(colNames[2]));
+
+        String result = first(geomTable)
+                .getFieldAs(colNames[2])
+                .toString();
+
+        String expected = "POINT (1 2)";
+
+        assertEquals(result, expected);
+    }
+
     @Test
     public void testPointFromText() {
         List<Row> data = createPointWKT(testDataSize);


[incubator-sedona] 01/02: ST_GeomFromText and ST_LineFromText, ST_LineStringFromText

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

malka pushed a commit to branch SEDONA-121]-constructors-from-spark2flink
in repository https://gitbox.apache.org/repos/asf/incubator-sedona.git

commit 81353acd6ddcd43f34d549841e0327961373f7aa
Author: Netanel Malka <ne...@gmail.com>
AuthorDate: Wed Jun 8 23:02:22 2022 +0300

    ST_GeomFromText and ST_LineFromText, ST_LineStringFromText
---
 docs/api/flink/Constructor.md                      | 40 +++++++++++++
 .../main/java/org/apache/sedona/flink/Catalog.java |  3 +
 .../sedona/flink/expressions/Constructors.java     | 70 +++++++++++++++++-----
 .../org/apache/sedona/flink/ConstructorTest.java   | 35 +++++++++++
 .../java/org/apache/sedona/flink/TestBase.java     | 47 +++++++++++++--
 5 files changed, 176 insertions(+), 19 deletions(-)

diff --git a/docs/api/flink/Constructor.md b/docs/api/flink/Constructor.md
index e02d00ec..e8568b01 100644
--- a/docs/api/flink/Constructor.md
+++ b/docs/api/flink/Constructor.md
@@ -12,6 +12,20 @@ SQL example:
 SELECT ST_GeomFromWKT('POINT(40.7128 -74.0060)') AS geometry
 ```
 
+## ST_GeomFromText
+
+Introduction: Construct a Geometry from Wkt. Alias of [ST_GeomFromWKT](#ST_GeomFromWKT)
+
+Format:
+`ST_GeomFromText (Wkt:string)`
+
+Since: `v1.2.1`
+
+SQL example:
+```SQL
+SELECT ST_GeomFromText('POINT(40.7128 -74.0060)') AS geometry
+```
+
 ## ST_GeomFromWKB
 
 Introduction: Construct a Geometry from WKB string or Binary
@@ -66,6 +80,32 @@ SQL example:
 SELECT ST_PointFromText('40.7128,-74.0060', ',') AS pointshape
 ```
 
+## ST_LineFromText
+
+Introduction: Construct a LineString from Text, delimited by Delimiter (Optional)
+
+Format: `ST_LineFromText (Text:string, Delimiter:char)`
+
+Since: `v1.2.1`
+
+SQL example:
+```SQL
+SELECT ST_LineFromText('Linestring(1 2, 3 4)') AS line
+```
+
+## ST_LineStringFromText
+
+Introduction: Construct a LineString from Text, delimited by Delimiter (Optional). Alias of [ST_LineFromText](#ST_LineFromText)
+
+Format: `ST_LineStringFromText (Text:string, Delimiter:char)`
+
+Since: `v1.2.1`
+
+SQL example:
+```SQL
+SELECT ST_LineStringFromText('Linestring(1 2, 3 4)') AS line
+```
+
 ## ST_PolygonFromText
 
 Introduction: Construct a Polygon from Text, delimited by Delimiter. Path must be closed
diff --git a/flink/src/main/java/org/apache/sedona/flink/Catalog.java b/flink/src/main/java/org/apache/sedona/flink/Catalog.java
index eac9b4f9..9e4ff44a 100644
--- a/flink/src/main/java/org/apache/sedona/flink/Catalog.java
+++ b/flink/src/main/java/org/apache/sedona/flink/Catalog.java
@@ -21,9 +21,12 @@ public class Catalog {
     public static UserDefinedFunction[] getFuncs() {
         return new UserDefinedFunction[]{
                 new Constructors.ST_PointFromText(),
+                new Constructors.ST_LineStringFromText(),
+                new Constructors.ST_LineFromText(),
                 new Constructors.ST_PolygonFromText(),
                 new Constructors.ST_PolygonFromEnvelope(),
                 new Constructors.ST_GeomFromWKT(),
+                new Constructors.ST_GeomFromText(),
                 new Constructors.ST_GeomFromWKB(),
                 new Constructors.ST_GeomFromGeoJSON(),
                 new Constructors.ST_GeomFromGeoHash(),
diff --git a/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java b/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java
index 2b2815c0..97900ede 100644
--- a/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java
+++ b/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java
@@ -26,12 +26,17 @@ import org.locationtech.jts.io.WKBReader;
 import org.locationtech.jts.io.ParseException;
 
 public class Constructors {
+
+    private static Geometry getGeometryByType(String geom, String inputDelimiter, GeometryType geometryType) throws ParseException {
+        FileDataSplitter delimiter = inputDelimiter == null? FileDataSplitter.CSV:FileDataSplitter.getFileDataSplitter(inputDelimiter);
+        FormatUtils<Geometry> formatUtils = new FormatUtils<>(delimiter, false, geometryType);
+        return formatUtils.readGeometry(geom);
+    }
+
     public static class ST_PointFromText extends ScalarFunction {
         @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
         public Geometry eval(@DataTypeHint("String") String s, @DataTypeHint("String") String inputDelimiter) throws ParseException {
-            FileDataSplitter delimiter = inputDelimiter == null? FileDataSplitter.CSV:FileDataSplitter.getFileDataSplitter(inputDelimiter);
-            FormatUtils<Geometry> formatUtils = new FormatUtils(delimiter, false, GeometryType.POINT);
-            return formatUtils.readGeometry(s);
+            return getGeometryByType(s, inputDelimiter, GeometryType.POINT);
         }
 
         @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
@@ -40,18 +45,44 @@ public class Constructors {
         }
     }
 
+    public static class ST_LineFromText extends ScalarFunction {
+        @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
+        public Geometry eval(@DataTypeHint("String") String lineString,
+                             @DataTypeHint("String") String inputDelimiter) throws ParseException {
+            // The default delimiter is comma. Otherwise, use the delimiter given by the user
+            return getGeometryByType(lineString, inputDelimiter, GeometryType.LINESTRING);
+        }
+
+        @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
+        public Geometry eval(@DataTypeHint("String") String lineString) throws ParseException {
+            return eval(lineString, null);
+        }
+    }
+
+    public static class ST_LineStringFromText extends ScalarFunction {
+        @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
+        public Geometry eval(@DataTypeHint("String") String lineString,
+                             @DataTypeHint("String") String inputDelimiter) throws ParseException {
+            // The default delimiter is comma. Otherwise, use the delimiter given by the user
+            return new ST_LineFromText().eval(lineString, inputDelimiter);
+        }
+
+        @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
+        public Geometry eval(@DataTypeHint("String") String lineString) throws ParseException {
+            return eval(lineString, null);
+        }
+    }
+
     public static class ST_PolygonFromText extends ScalarFunction {
         @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
-        public Geometry eval(@DataTypeHint("String") String s, @DataTypeHint("String") String inputDelimiter) throws ParseException {
+        public Geometry eval(@DataTypeHint("String") String polygonString, @DataTypeHint("String") String inputDelimiter) throws ParseException {
             // The default delimiter is comma. Otherwise, use the delimiter given by the user
-            FileDataSplitter delimiter = inputDelimiter == null? FileDataSplitter.CSV:FileDataSplitter.getFileDataSplitter(inputDelimiter);
-            FormatUtils<Geometry> formatUtils = new FormatUtils(delimiter, false, GeometryType.POLYGON);
-            return formatUtils.readGeometry(s);
+            return getGeometryByType(polygonString, inputDelimiter, GeometryType.POLYGON);
         }
 
         @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
-        public Geometry eval(@DataTypeHint("String") String s) throws ParseException {
-            return eval(s, null);
+        public Geometry eval(@DataTypeHint("String") String polygonString) throws ParseException {
+            return eval(polygonString, null);
         }
     }
 
@@ -70,19 +101,29 @@ public class Constructors {
         }
     }
 
+    private static Geometry getGeometryByFileData(String wktString, FileDataSplitter dataSplitter) throws ParseException {
+        FormatUtils<Geometry> formatUtils = new FormatUtils<>(dataSplitter, false);
+        return formatUtils.readGeometry(wktString);
+    }
+
     public static class ST_GeomFromWKT extends ScalarFunction {
         @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
         public Geometry eval(@DataTypeHint("String") String wktString) throws ParseException {
-            FormatUtils formatUtils = new FormatUtils(FileDataSplitter.WKT, false);
-            return formatUtils.readGeometry(wktString);
+            return getGeometryByFileData(wktString, FileDataSplitter.WKT);
+        }
+    }
+
+    public static class ST_GeomFromText extends ScalarFunction {
+        @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
+        public Geometry eval(@DataTypeHint("String") String wktString) throws ParseException {
+            return new ST_GeomFromWKT().eval(wktString);
         }
     }
 
     public static class ST_GeomFromWKB extends ScalarFunction {
         @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
         public Geometry eval(@DataTypeHint("String") String wkbString) throws ParseException {
-            FormatUtils formatUtils = new FormatUtils(FileDataSplitter.WKB, false);
-            return formatUtils.readGeometry(wkbString);
+            return getGeometryByFileData(wkbString, FileDataSplitter.WKB);
         }
 
         @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
@@ -96,8 +137,7 @@ public class Constructors {
     public static class ST_GeomFromGeoJSON extends ScalarFunction {
         @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
         public Geometry eval(@DataTypeHint("String") String geoJson) throws ParseException {
-            FormatUtils formatUtils = new FormatUtils(FileDataSplitter.GEOJSON, false);
-            return formatUtils.readGeometry(geoJson);
+            return getGeometryByFileData(geoJson, FileDataSplitter.GEOJSON);
         }
     }
 
diff --git a/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java b/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java
index e828a854..dd8925d6 100644
--- a/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java
+++ b/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java
@@ -50,6 +50,30 @@ public class ConstructorTest extends TestBase{
         assertEquals(result.toString(), data.get(data.size() - 1).toString());
     }
 
+    @Test
+    public void testLineFromText() {
+        List<Row> data = createLineStringWKT(testDataSize);
+
+        Table lineStringTable = createLineStringTextTable(testDataSize)
+                .select(call(Constructors.ST_LineFromText.class.getSimpleName(), $(linestringColNames[0])).as(linestringColNames[0]),
+                        $(linestringColNames[1]));
+        Row result = last(lineStringTable);
+
+        assertEquals(result.toString(), data.get(data.size() - 1).toString());
+    }
+
+    @Test
+    public void testLineStringFromText() {
+        List<Row> data = createLineStringWKT(testDataSize);
+
+        Table lineStringTable = createLineStringTextTable(testDataSize)
+                .select(call(Constructors.ST_LineStringFromText.class.getSimpleName(), $(linestringColNames[0])).as(linestringColNames[0]),
+                        $(linestringColNames[1]));
+        Row result = last(lineStringTable);
+
+        assertEquals(result.toString(), data.get(data.size() - 1).toString());
+    }
+
     @Test
     public void testPolygonFromText() {
         List<Row> data = createPolygonWKT(testDataSize);
@@ -68,6 +92,17 @@ public class ConstructorTest extends TestBase{
         assertEquals(result.toString(), data.get(data.size() - 1).toString());
     }
 
+    @Test
+    public void testGeomFromText() {
+        List<Row> data = createPolygonWKT(testDataSize);
+        Table wktTable = createTextTable(data, polygonColNames);
+        Table geomTable = wktTable.select(call(Constructors.ST_GeomFromText.class.getSimpleName(),
+                        $(polygonColNames[0])).as(polygonColNames[0]),
+                $(polygonColNames[1]));
+        Row result = last(geomTable);
+        assertEquals(result.toString(), data.get(data.size() - 1).toString());
+    }
+
     @Test
     public void testPolygonFromEnvelope() {
         Double minX = 1.0;
diff --git a/flink/src/test/java/org/apache/sedona/flink/TestBase.java b/flink/src/test/java/org/apache/sedona/flink/TestBase.java
index 9623d231..3dd99432 100644
--- a/flink/src/test/java/org/apache/sedona/flink/TestBase.java
+++ b/flink/src/test/java/org/apache/sedona/flink/TestBase.java
@@ -28,14 +28,10 @@ import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.sedona.flink.expressions.Constructors;
 import org.locationtech.jts.geom.*;
-import org.locationtech.jts.geom.impl.CoordinateArraySequence;
-import org.wololo.geojson.Feature;
 import org.wololo.jts2geojson.GeoJSONWriter;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import static org.apache.flink.table.api.Expressions.$;
 import static org.apache.flink.table.api.Expressions.call;
@@ -45,6 +41,7 @@ public class TestBase {
     protected static StreamTableEnvironment tableEnv;
     static int testDataSize = 1000;
     static String[] pointColNames = {"geom_point", "name_point"};
+    static String[] linestringColNames = {"geom_linestring", "name_linestring"};
     static String[] polygonColNames = {"geom_polygon", "name_polygon"};
     static String pointTableName = "point_table";
     static String polygonTableName = "polygon_table";
@@ -126,6 +123,44 @@ public class TestBase {
         return data;
     }
 
+    static List<Row> createLineStringText(int size) {
+        List<Row> data = new ArrayList<>();
+        for (int i = 0; i < size; i++) {
+            // Create line strings as delimited coordinate text: minX,minY,maxX,maxY
+            // For i = 0 the generated text is "-0.5,-0.5,0.5,0.5"
+            String minX = String.valueOf(i - 0.5);
+            String minY = String.valueOf(i - 0.5);
+            String maxX = String.valueOf(i + 0.5);
+            String maxY = String.valueOf(i + 0.5);
+            List<String> linestring = new ArrayList<>();
+            linestring.add(minX);
+            linestring.add(minY);
+            linestring.add(maxX);
+            linestring.add(maxY);
+
+            data.add(Row.of(String.join(",", linestring), "linestring" + i));
+        }
+        return data;
+    }
+
+    static List<Row> createLineStringWKT(int size) {
+        List<Row> data = new ArrayList<>();
+        for (int i = 0; i < size; i++) {
+
+            String minX = String.valueOf(i - 0.5);
+            String minY = String.valueOf(i - 0.5);
+            String maxX = String.valueOf(i + 0.5);
+            String maxY = String.valueOf(i + 0.5);
+
+            List<String> linestring = new ArrayList<>();
+            linestring.add(minX + " " + minY);
+            linestring.add(maxX + " " + maxY);
+
+            data.add(Row.of("LINESTRING (" + String.join(", ", linestring) + ")", "linestring" + i));
+        }
+        return data;
+    }
+
     static List<Row> createPolygonWKT(int size) {
         List<Row> data = new ArrayList<>();
         for (int i = 0; i < size; i++) {
@@ -190,6 +225,10 @@ public class TestBase {
         return createTextTable(createPointText_real(size), pointColNames);
     }
 
+    static Table createLineStringTextTable(int size) {
+        return createTextTable(createLineStringText(size), linestringColNames);
+    }
+
     static Table createPolygonTextTable(int size) {
         return createTextTable(createPolygonText(size), polygonColNames);
     }