You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sedona.apache.org by ji...@apache.org on 2022/02/27 23:59:33 UTC

[incubator-sedona] branch master updated: [SEDONA-85] Geohash function streaming/pyspark streaming test geospatial sql functions. (#586)

This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sedona.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c7222a  [SEDONA-85] Geohash function streaming/pyspark streaming test geospatial sql functions. (#586)
5c7222a is described below

commit 5c7222a5e755b51844db975186984551764d25c7
Author: Paweł Kociński <pa...@gmail.com>
AuthorDate: Mon Feb 28 00:59:27 2022 +0100

    [SEDONA-85] Geohash function streaming/pyspark streaming test geospatial sql functions. (#586)
---
 ...2218-4c80-a179-77f67180b08c-c000.snappy.parquet | Bin 0 -> 810 bytes
 .../main/java/org/apache/sedona/flink/Catalog.java |   1 +
 .../apache/sedona/flink/expressions/Functions.java |  17 ++
 .../java/org.apache.sedona/flink/FunctionTest.java |  12 +
 python/tests/streaming/__init__.py                 |   0
 python/tests/streaming/spark/__init__.py           |   0
 python/tests/streaming/spark/cases_builder.py      |  41 +++
 .../streaming/spark/test_constructor_functions.py  | 319 +++++++++++++++++++++
 8 files changed, 390 insertions(+)

diff --git a/core/src/test/resources/streaming/geometry_example/part-00000-936c6c14-2218-4c80-a179-77f67180b08c-c000.snappy.parquet b/core/src/test/resources/streaming/geometry_example/part-00000-936c6c14-2218-4c80-a179-77f67180b08c-c000.snappy.parquet
new file mode 100644
index 0000000..77dface
Binary files /dev/null and b/core/src/test/resources/streaming/geometry_example/part-00000-936c6c14-2218-4c80-a179-77f67180b08c-c000.snappy.parquet differ
diff --git a/flink/src/main/java/org/apache/sedona/flink/Catalog.java b/flink/src/main/java/org/apache/sedona/flink/Catalog.java
index f9428fe..0ab4607 100644
--- a/flink/src/main/java/org/apache/sedona/flink/Catalog.java
+++ b/flink/src/main/java/org/apache/sedona/flink/Catalog.java
@@ -29,6 +29,7 @@ public class Catalog {
                 new Functions.ST_Distance(),
                 new Functions.ST_Transform(),
                 new Functions.ST_FlipCoordinates(),
+                new Functions.ST_GeoHash()
         };
     }
 
diff --git a/flink/src/main/java/org/apache/sedona/flink/expressions/Functions.java b/flink/src/main/java/org/apache/sedona/flink/expressions/Functions.java
index 8bd4b89..2b17836 100644
--- a/flink/src/main/java/org/apache/sedona/flink/expressions/Functions.java
+++ b/flink/src/main/java/org/apache/sedona/flink/expressions/Functions.java
@@ -16,6 +16,8 @@ package org.apache.sedona.flink.expressions;
 import org.apache.flink.table.annotation.DataTypeHint;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.sedona.core.utils.GeomUtils;
+import org.apache.spark.sql.sedona_sql.expressions.geohash.GeometryGeoHashEncoder;
+import org.apache.spark.sql.sedona_sql.expressions.geohash.PointGeoHashEncoder;
 import org.geotools.geometry.jts.JTS;
 import org.geotools.referencing.CRS;
 import org.locationtech.jts.geom.Geometry;
@@ -23,6 +25,9 @@ import org.opengis.referencing.FactoryException;
 import org.opengis.referencing.crs.CoordinateReferenceSystem;
 import org.opengis.referencing.operation.MathTransform;
 import org.opengis.referencing.operation.TransformException;
+import scala.Option;
+
+import java.util.Optional;
 
 public class Functions {
     public static class ST_Buffer extends ScalarFunction {
@@ -66,4 +71,16 @@ public class Functions {
             return geom;
         }
     }
+
+    /**
+     * Flink scalar function that encodes a geometry as a geohash string.
+     */
+    public static class ST_GeoHash extends ScalarFunction {
+        /**
+         * Delegates to {@code GeometryGeoHashEncoder.calculate} and converts the returned
+         * Scala {@code Option} into a Java {@code Optional}.
+         *
+         * @param geometry  value that is cast to a JTS {@code Geometry}
+         * @param precision precision passed through to the encoder
+         * @return the geohash when the encoder yields one, otherwise an empty Optional
+         */
+        @DataTypeHint("RAW")
+        public Optional<String> eval(@DataTypeHint("RAW") Object geometry, Integer precision) {
+            Option<String> encoded = GeometryGeoHashEncoder.calculate((Geometry) geometry, precision);
+            return encoded.isDefined() ? Optional.of(encoded.get()) : Optional.<String>empty();
+        }
+    }
 }
diff --git a/flink/src/test/java/org.apache.sedona/flink/FunctionTest.java b/flink/src/test/java/org.apache.sedona/flink/FunctionTest.java
index c8074bb..9b702bc 100644
--- a/flink/src/test/java/org.apache.sedona/flink/FunctionTest.java
+++ b/flink/src/test/java/org.apache.sedona/flink/FunctionTest.java
@@ -18,6 +18,9 @@ import org.apache.sedona.flink.expressions.Functions;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.locationtech.jts.geom.Geometry;
+import scala.Some;
+
+import java.util.Optional;
 
 import static org.apache.flink.table.api.Expressions.$;
 import static org.apache.flink.table.api.Expressions.call;
@@ -53,4 +56,13 @@ public class FunctionTest extends TestBase{
                 , call("ST_GeomFromWKT", "POINT (0 0)")));
         assertEquals(0.0, first(pointTable).getField(0));
     }
+
+    /** Checks that ST_GeoHash with precision 5 on the first point column yields "s0000". */
+    @Test
+    public void testGeomToGeoHash() {
+        Table pointTable = createPointTable(testDataSize);
+        pointTable = pointTable.select(
+                call("ST_GeoHash", $(pointColNames[0]), 5)
+        );
+        // assertEquals takes (expected, actual); this order keeps failure messages accurate.
+        assertEquals(Optional.of("s0000"), first(pointTable).getField(0));
+    }
 }
diff --git a/python/tests/streaming/__init__.py b/python/tests/streaming/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/python/tests/streaming/spark/__init__.py b/python/tests/streaming/spark/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/python/tests/streaming/spark/cases_builder.py b/python/tests/streaming/spark/cases_builder.py
new file mode 100644
index 0000000..44313d2
--- /dev/null
+++ b/python/tests/streaming/spark/cases_builder.py
@@ -0,0 +1,41 @@
+from typing import Dict, Any, List
+
+
+class SuiteContainer:
+
+    def __init__(self, container: Dict[str, Any]):
+        self.container = container
+
+    @classmethod
+    def empty(cls):
+        return cls(container=dict(function_name=None, arguments=None, expected_result=None, transform=None))
+
+    def with_function_name(self, function_name: str):
+        self.container["function_name"] = function_name
+        return self.__class__(
+            container=self.container
+        )
+
+    def with_expected_result(self, expected_result: Any):
+        self.container["expected_result"] = expected_result
+        return self.__class__(
+            container=self.container
+        )
+
+    def with_arguments(self, arguments: List[str]):
+        self.container["arguments"] = arguments
+        return self.__class__(
+            container=self.container
+        )
+
+    def with_transform(self, transform: str):
+        self.container["transform"] = transform
+        return self.__class__(
+            container=self.container
+        )
+
+    def __iter__(self):
+        return self.container.__iter__()
+
+    def __getitem__(self, name):
+        return self.container.__getitem__(name)
diff --git a/python/tests/streaming/spark/test_constructor_functions.py b/python/tests/streaming/spark/test_constructor_functions.py
new file mode 100644
index 0000000..3884e5a
--- /dev/null
+++ b/python/tests/streaming/spark/test_constructor_functions.py
@@ -0,0 +1,319 @@
+import os
+import random
+from typing import List, Any, Optional
+
+import pytest
+from pyspark.sql.types import StructType, StructField, Row
+from shapely import wkt, wkb
+
+from sedona.sql.types import GeometryType
+from tests import tests_resource
+from tests.streaming.spark.cases_builder import SuiteContainer
+from tests.test_base import TestBase
+
+SCHEMA = StructType(
+    [
+        StructField("geom", GeometryType())
+    ]
+)
+
+
+SEDONA_LISTED_SQL_FUNCTIONS = [
+    (SuiteContainer.empty()
+     .with_function_name("ST_AsText")
+     .with_arguments(["ST_GeomFromText('POINT (21 52)')"])
+     .with_expected_result("POINT (21 52)")),
+    (SuiteContainer.empty()
+     .with_function_name("ST_Buffer")
+     .with_arguments(["ST_GeomFromText('POINT (21 52)')", "1.0"])
+     .with_expected_result(3.1214451522580533)
+     .with_transform("ST_AREA")),
+    (SuiteContainer.empty()
+     .with_function_name("ST_Distance")
+     .with_arguments(["ST_GeomFromText('POINT (21 52)')", "ST_GeomFromText('POINT (21 53)')"])
+     .with_expected_result(1.0)),
+    (SuiteContainer.empty()
+     .with_function_name("ST_ConvexHull")
+     .with_arguments(["ST_GeomFromText('POLYGON ((21 52, 21 53, 22 53, 22 52, 21 52))')"])
+     .with_expected_result(1.0)
+     .with_transform("ST_AREA")),
+    (SuiteContainer.empty()
+     .with_function_name("ST_Envelope")
+     .with_arguments(["ST_GeomFromText('POLYGON ((21 52, 21 53, 22 53, 22 52, 21 52))')"])
+     .with_expected_result(1.0)
+     .with_transform("ST_AREA")),
+    (SuiteContainer.empty()
+     .with_function_name("ST_LENGTH")
+     .with_arguments(["ST_GeomFromText('POLYGON ((21 52, 21 53, 22 53, 22 52, 21 52))')"])
+     .with_expected_result(4.0)),
+    (SuiteContainer.empty()
+     .with_function_name("ST_Area")
+     .with_arguments(["ST_GeomFromText('POLYGON ((21 52, 21 53, 22 53, 22 52, 21 52))')"])
+     .with_expected_result(1.0)),
+    (SuiteContainer.empty()
+     .with_function_name("ST_Centroid")
+     .with_arguments(["ST_GeomFromText('POINT(21.5 52.5)')"])
+     .with_expected_result("POINT (21.5 52.5)")
+     .with_transform("ST_ASText")),
+    (SuiteContainer.empty()
+     .with_function_name("ST_Transform")
+     .with_arguments(["ST_GeomFromText('POINT(21.5 52.5)')", "'epsg:4326'", "'epsg:2180'"])
+     .with_expected_result("POINT (-2501415.806893427 4119952.52325666)")
+     .with_transform("ST_ASText")),
+    (SuiteContainer.empty()
+     .with_function_name("ST_Intersection")
+     .with_arguments(["ST_GeomFromText('POINT(21.5 52.5)')", "ST_GeomFromText('POINT(21.5 52.5)')"])
+     .with_expected_result(0)
+     .with_transform("ST_AREA")),
+    (SuiteContainer.empty()
+     .with_function_name("ST_IsValid")
+     .with_arguments(["ST_GeomFromText('POLYGON ((21 53, 22 53, 22 52, 21 52, 21 53))')"])
+     .with_expected_result(True)),
+    (SuiteContainer.empty()
+     .with_function_name("ST_MakeValid")
+     .with_arguments(["ST_GeomFromText('POLYGON ((21 53, 22 53, 22 52, 21 52, 21 53))')", "false"])
+     .with_expected_result(1.0)
+     .with_transform("ST_AREA")),
+    (SuiteContainer.empty()
+     .with_function_name("ST_PrecisionReduce")
+     .with_arguments(["ST_GeomFromText('POLYGON ((21 53, 22 53, 22 52, 21 52, 21 53))')", "9"])
+     .with_expected_result(1.0)
+     .with_transform("ST_AREA")),
+    (SuiteContainer.empty()
+     .with_function_name("ST_IsSimple")
+     .with_arguments(["ST_GeomFromText('POLYGON ((21 53, 22 53, 22 52, 21 52, 21 53))')"])
+     .with_expected_result(True)),
+    (SuiteContainer.empty()
+     .with_function_name("ST_Buffer")
+     .with_arguments(["ST_GeomFromText('POLYGON ((21 53, 22 53, 22 52, 21 52, 21 53))')", "0.9"])
+     .with_expected_result(7.128370573329018)
+     .with_transform("ST_AREA")),
+    (SuiteContainer.empty()
+     .with_function_name("ST_AsText")
+     .with_arguments(["ST_GeomFromText('POLYGON ((21 53, 22 53, 22 52, 21 52, 21 53))')"])
+     .with_expected_result("POLYGON ((21 53, 22 53, 22 52, 21 52, 21 53))")),
+    (SuiteContainer.empty()
+        .with_function_name("ST_AsGeoJSON")
+        .with_arguments(["ST_GeomFromText('POLYGON ((21 52, 21 53, 22 53, 22 52, 21 52))')"])
+        .with_expected_result(
+        """{"type":"Polygon","coordinates":[[[21.0,52.0],[21.0,53.0],[22.0,53.0],[22.0,52.0],[21.0,52.0]]]}""")),
+    (SuiteContainer.empty()
+     .with_function_name("ST_AsBinary")
+     .with_arguments(["ST_GeomFromText('POINT(21 52)')"])
+     .with_expected_result(wkb.dumps(wkt.loads("POINT(21 52)")))),
+    (SuiteContainer.empty()
+     .with_function_name("ST_AsEWKB")
+     .with_arguments(["ST_GeomFromText('POINT(21 52)')"])
+     .with_expected_result(wkb.dumps(wkt.loads("POINT(21 52)")))),
+    (SuiteContainer.empty()
+     .with_function_name("ST_SRID")
+     .with_arguments(["ST_GeomFromText('POINT(21 52)')"])
+     .with_expected_result(0)),
+    (SuiteContainer.empty()
+     .with_function_name("ST_SetSRID")
+     .with_arguments(["ST_GeomFromText('POINT(21 52)')", "4326"])
+     .with_expected_result(0)
+     .with_transform("ST_AREA")),
+    (SuiteContainer.empty()
+     .with_function_name("ST_NPoints")
+     .with_arguments(["ST_GeomFromText('POLYGON ((21 53, 22 53, 22 52, 21 52, 21 53))')"])
+     .with_expected_result(5)),
+    (SuiteContainer.empty()
+     .with_function_name("ST_SimplifyPreserveTopology")
+     .with_arguments(["ST_GeomFromText('POLYGON ((21 53, 22 53, 22 52, 21 52, 21 53))')", "1.0"])
+     .with_expected_result(1)
+     .with_transform("ST_AREA")),
+    (SuiteContainer.empty()
+     .with_function_name("ST_GeometryType")
+     .with_arguments(["ST_GeomFromText('POLYGON ((21 53, 22 53, 22 52, 21 52, 21 53))')"])
+     .with_expected_result("ST_Polygon")),
+    (SuiteContainer.empty()
+     .with_function_name("ST_LineMerge")
+     .with_arguments(["ST_GeomFromText('LINESTRING(-29 -27,-30 -29.7,-36 -31,-45 -33,-46 -32)')"])
+     .with_expected_result(0.0)
+     .with_transform("ST_LENGTH")),
+    (SuiteContainer.empty()
+     .with_function_name("ST_Azimuth")
+     .with_arguments(["ST_GeomFromText('POINT(21 52)')", "ST_GeomFromText('POINT(21 53)')"])
+     .with_expected_result(0.0)),
+    (SuiteContainer.empty()
+     .with_function_name("ST_X")
+     .with_arguments(["ST_GeomFromText('POINT(21 52)')"])
+     .with_expected_result(21.0)),
+    (SuiteContainer.empty()
+     .with_function_name("ST_Y")
+     .with_arguments(["ST_GeomFromText('POINT(21 52)')"])
+     .with_expected_result(52.0)),
+    (SuiteContainer.empty()
+     .with_function_name("ST_StartPoint")
+     .with_arguments(["ST_GeomFromText('LINESTRING(100 150,50 60, 70 80, 160 170)')"])
+     .with_expected_result("POINT (100 150)")
+     .with_transform("ST_ASText")),
+    (SuiteContainer.empty()
+     .with_function_name("ST_Endpoint")
+     .with_arguments(["ST_GeomFromText('LINESTRING(100 150,50 60, 70 80, 160 170)')"])
+     .with_expected_result("POINT (160 170)")
+     .with_transform("ST_ASText")),
+    (SuiteContainer.empty()
+     .with_function_name("ST_Boundary")
+     .with_arguments(["ST_GeomFromText('POLYGON ((21 53, 22 53, 22 52, 21 52, 21 53))')"])
+     .with_expected_result(4)
+     .with_transform("ST_LENGTH")),
+    (SuiteContainer.empty()
+     .with_function_name("ST_ExteriorRing")
+     .with_arguments(["ST_GeomFromText('POLYGON ((21 53, 22 53, 22 52, 21 52, 21 53))')"])
+     .with_expected_result(4)
+     .with_transform("ST_LENGTH")),
+    (SuiteContainer.empty()
+     .with_function_name("ST_GeometryN")
+     .with_arguments(["ST_GeomFromText('MULTIPOINT((1 2), (3 4), (5 6), (8 9))')", "0"])
+     .with_expected_result(1)
+     .with_transform("ST_X")),
+    (SuiteContainer.empty()
+     .with_function_name("ST_InteriorRingN")
+     .with_arguments([
+        "ST_GeomFromText('POLYGON((0 0, 0 5, 5 5, 5 0, 0 0), (1 1, 2 1, 2 2, 1 2, 1 1))')",
+        "0"])
+     .with_expected_result(4.0)
+     .with_transform("ST_LENGTH")),
+    (SuiteContainer.empty()
+     .with_function_name("ST_Dump")
+     .with_arguments([
+        "ST_GeomFromText('MULTIPOINT ((10 40), (40 30), (20 20), (30 10))')"])
+     .with_expected_result(4)
+     .with_transform("SIZE")),
+    (SuiteContainer.empty()
+     .with_function_name("ST_DumpPoints")
+     .with_arguments([
+        "ST_GeomFromTEXT('LINESTRING (0 0, 1 1, 1 0)')"])
+     .with_expected_result(3)
+     .with_transform("SIZE")),
+    (SuiteContainer.empty()
+     .with_function_name("ST_IsClosed")
+     .with_arguments([
+        "ST_GeomFROMTEXT('LINESTRING(0 0, 1 1, 1 0)')"])
+     .with_expected_result(False)),
+    (SuiteContainer.empty()
+     .with_function_name("ST_NumInteriorRings")
+     .with_arguments([
+        "ST_GeomFROMTEXT('POLYGON ((0 0, 0 5, 5 5, 5 0, 0 0), (1 1, 2 1, 2 2, 1 2, 1 1))')"])
+     .with_expected_result(1)),
+    (SuiteContainer.empty()
+     .with_function_name("ST_AddPoint")
+     .with_arguments([
+        "ST_GeomFromText('LINESTRING(0 0, 1 1, 1 0)')",
+        "ST_GeomFromText('Point(21 52)')",
+        "1"])
+     .with_expected_result(111.86168327044916)
+     .with_transform("ST_Length")),
+    (SuiteContainer.empty()
+     .with_function_name("ST_RemovePoint")
+     .with_arguments([
+        "ST_GeomFromText('LINESTRING(0 0, 1 1, 1 0)')",
+        "1"
+    ])
+     .with_expected_result("LINESTRING (0 0, 1 0)")
+     .with_transform("ST_AsText")),
+    (SuiteContainer.empty()
+     .with_function_name("ST_IsRing")
+     .with_arguments([
+        "ST_GeomFromText('LINESTRING(0 0, 0 1, 1 1, 1 0, 0 0)')"
+    ])
+     .with_expected_result(True)),
+    (SuiteContainer.empty()
+     .with_function_name("ST_NumGeometries")
+     .with_arguments([
+        "ST_GeomFromText('LINESTRING(0 0, 0 1, 1 1, 1 0, 0 0)')"
+    ])
+     .with_expected_result(1)),
+    (SuiteContainer.empty()
+     .with_function_name("ST_FlipCoordinates")
+     .with_arguments([
+        "ST_GeomFromText('POINT(21 52)')"
+    ])
+     .with_expected_result(52.0)
+     .with_transform("ST_X")),
+    (SuiteContainer.empty()
+     .with_function_name("ST_MinimumBoundingRadius")
+     .with_arguments([
+        "ST_GeomFromText('POLYGON((1 1,0 0, -1 1, 1 1))')"
+    ])
+     .with_expected_result(Row(center=wkt.loads("POINT(0 1)"), radius=1.0))),
+    (SuiteContainer.empty()
+     .with_function_name("ST_MinimumBoundingCircle")
+     .with_arguments([
+        "ST_GeomFromText('POLYGON((1 1,0 0, -1 1, 1 1))')"
+    ])
+     .with_expected_result(3.121445152258052)
+     .with_transform("ST_AREA")),
+    (SuiteContainer.empty()
+     .with_function_name("ST_MinimumBoundingCircle")
+     .with_arguments(["ST_GeomFromText('POLYGON((1 1,0 0, -1 1, 1 1))')"])
+     .with_expected_result(3.121445152258052)
+     .with_transform("ST_AREA")),
+    (SuiteContainer.empty()
+     .with_function_name("ST_SubDivide")
+     .with_arguments([
+        "ST_GeomFromText('POLYGON((35 10, 45 45, 15 40, 10 20, 35 10), (20 30, 35 35, 30 20, 20 30))')",
+        "5"])
+     .with_expected_result(14)
+     .with_transform("SIZE")),
+    (SuiteContainer.empty()
+     .with_function_name("ST_SubDivideExplode")
+     .with_arguments([
+        "ST_GeomFromText('LINESTRING(0 0, 85 85, 100 100, 120 120, 21 21, 10 10, 5 5)')",
+        "5"])
+     .with_expected_result(2)
+     .with_transform("ST_NPoints")),
+    (SuiteContainer.empty()
+     .with_function_name("ST_GeoHash")
+     .with_arguments([
+        "ST_GeomFromText('POINT(21.427834 52.042576573)')",
+        "5"])
+     .with_expected_result("u3r0p")),
+    (SuiteContainer.empty()
+     .with_function_name("ST_Collect")
+     .with_arguments([
+        "ST_GeomFromText('POINT(21.427834 52.042576573)')",
+        "ST_GeomFromText('POINT(45.342524 56.342354355)')"])
+     .with_expected_result(0.0)
+     .with_transform("ST_LENGTH"))
+]
+
+
+def pytest_generate_tests(metafunc):
+    funcarglist = metafunc.cls.params[metafunc.function.__name__]
+    argnames = sorted(funcarglist[0])
+    metafunc.parametrize(
+        argnames, [[funcargs[name] for name in argnames] for funcargs in funcarglist]
+    )
+
+
+class TestConstructorFunctions(TestBase):
+    params = {
+        "test_geospatial_function_on_stream": SEDONA_LISTED_SQL_FUNCTIONS
+    }
+
+    @pytest.mark.sparkstreaming
+    def test_geospatial_function_on_stream(self, function_name: str, arguments: List[str],
+                                           expected_result: Any, transform: Optional[str]):
+        # given input stream
+
+        input_stream = self.spark.readStream.schema(SCHEMA).parquet(os.path.join(
+            tests_resource,
+            "streaming/geometry_example")
+        ).selectExpr(f"{function_name}({', '.join(arguments)}) AS result")
+
+        # and target table
+        random_table_name = f"view_{random.randint(0, 100000)}"
+
+        # when saving stream to memory
+        streaming_query = input_stream.writeStream.format("memory") \
+            .queryName(random_table_name) \
+            .outputMode("append").start()
+
+        streaming_query.processAllAvailable()
+
+        # then result should be as expected
+        transform_query = "result" if not transform else f"{transform}(result)"
+        assert self.spark.sql(f"select {transform_query} from {random_table_name}").collect()[0][0] == expected_result