You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sedona.apache.org by ji...@apache.org on 2022/02/27 23:59:33 UTC
[incubator-sedona] branch master updated: [SEDONA-85] Geohash function streaming/pyspark streaming test geospatial sql functions. (#586)
This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sedona.git
The following commit(s) were added to refs/heads/master by this push:
new 5c7222a [SEDONA-85] Geohash function streaming/pyspark streaming test geospatial sql functions. (#586)
5c7222a is described below
commit 5c7222a5e755b51844db975186984551764d25c7
Author: Paweł Kociński <pa...@gmail.com>
AuthorDate: Mon Feb 28 00:59:27 2022 +0100
[SEDONA-85] Geohash function streaming/pyspark streaming test geospatial sql functions. (#586)
---
...2218-4c80-a179-77f67180b08c-c000.snappy.parquet | Bin 0 -> 810 bytes
.../main/java/org/apache/sedona/flink/Catalog.java | 1 +
.../apache/sedona/flink/expressions/Functions.java | 17 ++
.../java/org.apache.sedona/flink/FunctionTest.java | 12 +
python/tests/streaming/__init__.py | 0
python/tests/streaming/spark/__init__.py | 0
python/tests/streaming/spark/cases_builder.py | 41 +++
.../streaming/spark/test_constructor_functions.py | 319 +++++++++++++++++++++
8 files changed, 390 insertions(+)
diff --git a/core/src/test/resources/streaming/geometry_example/part-00000-936c6c14-2218-4c80-a179-77f67180b08c-c000.snappy.parquet b/core/src/test/resources/streaming/geometry_example/part-00000-936c6c14-2218-4c80-a179-77f67180b08c-c000.snappy.parquet
new file mode 100644
index 0000000..77dface
Binary files /dev/null and b/core/src/test/resources/streaming/geometry_example/part-00000-936c6c14-2218-4c80-a179-77f67180b08c-c000.snappy.parquet differ
diff --git a/flink/src/main/java/org/apache/sedona/flink/Catalog.java b/flink/src/main/java/org/apache/sedona/flink/Catalog.java
index f9428fe..0ab4607 100644
--- a/flink/src/main/java/org/apache/sedona/flink/Catalog.java
+++ b/flink/src/main/java/org/apache/sedona/flink/Catalog.java
@@ -29,6 +29,7 @@ public class Catalog {
new Functions.ST_Distance(),
new Functions.ST_Transform(),
new Functions.ST_FlipCoordinates(),
+ new Functions.ST_GeoHash()
};
}
diff --git a/flink/src/main/java/org/apache/sedona/flink/expressions/Functions.java b/flink/src/main/java/org/apache/sedona/flink/expressions/Functions.java
index 8bd4b89..2b17836 100644
--- a/flink/src/main/java/org/apache/sedona/flink/expressions/Functions.java
+++ b/flink/src/main/java/org/apache/sedona/flink/expressions/Functions.java
@@ -16,6 +16,8 @@ package org.apache.sedona.flink.expressions;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.sedona.core.utils.GeomUtils;
+import org.apache.spark.sql.sedona_sql.expressions.geohash.GeometryGeoHashEncoder;
+import org.apache.spark.sql.sedona_sql.expressions.geohash.PointGeoHashEncoder;
import org.geotools.geometry.jts.JTS;
import org.geotools.referencing.CRS;
import org.locationtech.jts.geom.Geometry;
@@ -23,6 +25,9 @@ import org.opengis.referencing.FactoryException;
import org.opengis.referencing.crs.CoordinateReferenceSystem;
import org.opengis.referencing.operation.MathTransform;
import org.opengis.referencing.operation.TransformException;
+import scala.Option;
+
+import java.util.Optional;
public class Functions {
public static class ST_Buffer extends ScalarFunction {
@@ -66,4 +71,16 @@ public class Functions {
return geom;
}
}
+
+ public static class ST_GeoHash extends ScalarFunction {
+     /**
+      * Computes the geohash of a geometry at the requested precision.
+      *
+      * @param geometry  the input value; cast to a JTS {@link Geometry}
+      * @param precision the precision forwarded to {@code GeometryGeoHashEncoder.calculate}
+      * @return the geohash wrapped in an {@link Optional}, or {@link Optional#empty()} when
+      *         either argument is {@code null} or the encoder yields no value
+      */
+     @DataTypeHint("RAW")
+     public Optional<String> eval(@DataTypeHint("RAW") Object geometry, Integer precision) {
+         // A NULL input has no geohash; report it as "absent" instead of passing it
+         // on to the encoder.
+         // NOTE(review): assumes the encoder does not accept null arguments - confirm.
+         if (geometry == null || precision == null) {
+             return Optional.empty();
+         }
+         Geometry geom = (Geometry) geometry;
+         Option<String> geoHash = GeometryGeoHashEncoder.calculate(geom, precision);
+         // Bridge scala.Option to java.util.Optional.
+         return geoHash.isDefined() ? Optional.of(geoHash.get()) : Optional.empty();
+     }
+ }
}
diff --git a/flink/src/test/java/org.apache.sedona/flink/FunctionTest.java b/flink/src/test/java/org.apache.sedona/flink/FunctionTest.java
index c8074bb..9b702bc 100644
--- a/flink/src/test/java/org.apache.sedona/flink/FunctionTest.java
+++ b/flink/src/test/java/org.apache.sedona/flink/FunctionTest.java
@@ -18,6 +18,9 @@ import org.apache.sedona.flink.expressions.Functions;
import org.junit.BeforeClass;
import org.junit.Test;
import org.locationtech.jts.geom.Geometry;
+import scala.Some;
+
+import java.util.Optional;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;
@@ -53,4 +56,13 @@ public class FunctionTest extends TestBase{
, call("ST_GeomFromWKT", "POINT (0 0)")));
assertEquals(0.0, first(pointTable).getField(0));
}
+
+ /**
+  * Checks that ST_GeoHash, called with precision 5 on the first point column of the
+  * generated point table, yields the geohash "s0000" for the first row.
+  */
+ @Test
+ public void testGeomToGeoHash() {
+     Table pointTable = createPointTable(testDataSize);
+     pointTable = pointTable.select(
+         call("ST_GeoHash", $(pointColNames[0]), 5)
+     );
+     // Expected value first, actual second - same order as the other assertions in
+     // this class, so a failure message reports the two values the right way round.
+     assertEquals(Optional.of("s0000"), first(pointTable).getField(0));
+ }
}
diff --git a/python/tests/streaming/__init__.py b/python/tests/streaming/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/python/tests/streaming/spark/__init__.py b/python/tests/streaming/spark/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/python/tests/streaming/spark/cases_builder.py b/python/tests/streaming/spark/cases_builder.py
new file mode 100644
index 0000000..44313d2
--- /dev/null
+++ b/python/tests/streaming/spark/cases_builder.py
@@ -0,0 +1,41 @@
+from typing import Dict, Any, List
+
+
+class SuiteContainer:
+    """Fluent builder describing one parametrized test case.
+
+    The case is stored as a dict with the keys ``function_name``, ``arguments``,
+    ``expected_result`` and ``transform``. Every ``with_*`` method returns a new
+    ``SuiteContainer`` holding a copy of the dict with one key replaced, so the
+    instance it was called on is left untouched.
+    """
+
+    def __init__(self, container: Dict[str, Any]):
+        self.container = container
+
+    @classmethod
+    def empty(cls):
+        """Return a container with all four keys present and set to ``None``."""
+        return cls(container=dict(function_name=None, arguments=None, expected_result=None, transform=None))
+
+    def _with(self, key: str, value: Any):
+        """Return a new container whose ``key`` is ``value``; ``self`` is not modified."""
+        # Copy before updating: sharing one dict between the old and the new
+        # instance would let a later with_* call alter the earlier builder.
+        return self.__class__(container={**self.container, key: value})
+
+    def with_function_name(self, function_name: str):
+        """Return a copy of this case with ``function_name`` set."""
+        return self._with("function_name", function_name)
+
+    def with_expected_result(self, expected_result: Any):
+        """Return a copy of this case with ``expected_result`` set."""
+        return self._with("expected_result", expected_result)
+
+    def with_arguments(self, arguments: List[str]):
+        """Return a copy of this case with ``arguments`` set."""
+        return self._with("arguments", arguments)
+
+    def with_transform(self, transform: str):
+        """Return a copy of this case with ``transform`` set."""
+        return self._with("transform", transform)
+
+    def __iter__(self):
+        # Iterates over the dict keys, i.e. the parameter names of the case.
+        return self.container.__iter__()
+
+    def __getitem__(self, name):
+        return self.container.__getitem__(name)
diff --git a/python/tests/streaming/spark/test_constructor_functions.py b/python/tests/streaming/spark/test_constructor_functions.py
new file mode 100644
index 0000000..3884e5a
--- /dev/null
+++ b/python/tests/streaming/spark/test_constructor_functions.py
@@ -0,0 +1,319 @@
+import os
+import random
+from typing import List, Any, Optional
+
+import pytest
+from pyspark.sql.types import StructType, StructField, Row
+from shapely import wkt, wkb
+
+from sedona.sql.types import GeometryType
+from tests import tests_resource
+from tests.streaming.spark.cases_builder import SuiteContainer
+from tests.test_base import TestBase
+
+# Schema used when reading the streaming parquet input: a single column named
+# "geom" of Sedona's GeometryType.
+SCHEMA = StructType([StructField("geom", GeometryType())])
+
+
+# Parametrization table for TestConstructorFunctions.test_geospatial_function_on_stream.
+# Each entry is a SuiteContainer naming the SQL function under test, the SQL argument
+# expressions passed to it, the expected value of the first result row and, optionally,
+# a SQL function ("transform") that is applied to the result before the comparison.
+SEDONA_LISTED_SQL_FUNCTIONS = [
+ (SuiteContainer.empty()
+ .with_function_name("ST_AsText")
+ .with_arguments(["ST_GeomFromText('POINT (21 52)')"])
+ .with_expected_result("POINT (21 52)")),
+ (SuiteContainer.empty()
+ .with_function_name("ST_Buffer")
+ .with_arguments(["ST_GeomFromText('POINT (21 52)')", "1.0"])
+ .with_expected_result(3.1214451522580533)
+ .with_transform("ST_AREA")),
+ (SuiteContainer.empty()
+ .with_function_name("ST_Distance")
+ .with_arguments(["ST_GeomFromText('POINT (21 52)')", "ST_GeomFromText('POINT (21 53)')"])
+ .with_expected_result(1.0)),
+ (SuiteContainer.empty()
+ .with_function_name("ST_ConvexHull")
+ .with_arguments(["ST_GeomFromText('POLYGON ((21 52, 21 53, 22 53, 22 52, 21 52))')"])
+ .with_expected_result(1.0)
+ .with_transform("ST_AREA")),
+ (SuiteContainer.empty()
+ .with_function_name("ST_Envelope")
+ .with_arguments(["ST_GeomFromText('POLYGON ((21 52, 21 53, 22 53, 22 52, 21 52))')"])
+ .with_expected_result(1.0)
+ .with_transform("ST_AREA")),
+ (SuiteContainer.empty()
+ .with_function_name("ST_LENGTH")
+ .with_arguments(["ST_GeomFromText('POLYGON ((21 52, 21 53, 22 53, 22 52, 21 52))')"])
+ .with_expected_result(4.0)),
+ (SuiteContainer.empty()
+ .with_function_name("ST_Area")
+ .with_arguments(["ST_GeomFromText('POLYGON ((21 52, 21 53, 22 53, 22 52, 21 52))')"])
+ .with_expected_result(1.0)),
+ (SuiteContainer.empty()
+ .with_function_name("ST_Centroid")
+ .with_arguments(["ST_GeomFromText('POINT(21.5 52.5)')"])
+ .with_expected_result("POINT (21.5 52.5)")
+ .with_transform("ST_ASText")),
+ (SuiteContainer.empty()
+ .with_function_name("ST_Transform")
+ .with_arguments(["ST_GeomFromText('POINT(21.5 52.5)')", "'epsg:4326'", "'epsg:2180'"])
+ .with_expected_result("POINT (-2501415.806893427 4119952.52325666)")
+ .with_transform("ST_ASText")),
+ (SuiteContainer.empty()
+ .with_function_name("ST_Intersection")
+ .with_arguments(["ST_GeomFromText('POINT(21.5 52.5)')", "ST_GeomFromText('POINT(21.5 52.5)')"])
+ .with_expected_result(0)
+ .with_transform("ST_AREA")),
+ (SuiteContainer.empty()
+ .with_function_name("ST_IsValid")
+ .with_arguments(["ST_GeomFromText('POLYGON ((21 53, 22 53, 22 52, 21 52, 21 53))')"])
+ .with_expected_result(True)),
+ (SuiteContainer.empty()
+ .with_function_name("ST_MakeValid")
+ .with_arguments(["ST_GeomFromText('POLYGON ((21 53, 22 53, 22 52, 21 52, 21 53))')", "false"])
+ .with_expected_result(1.0)
+ .with_transform("ST_AREA")),
+ (SuiteContainer.empty()
+ .with_function_name("ST_PrecisionReduce")
+ .with_arguments(["ST_GeomFromText('POLYGON ((21 53, 22 53, 22 52, 21 52, 21 53))')", "9"])
+ .with_expected_result(1.0)
+ .with_transform("ST_AREA")),
+ (SuiteContainer.empty()
+ .with_function_name("ST_IsSimple")
+ .with_arguments(["ST_GeomFromText('POLYGON ((21 53, 22 53, 22 52, 21 52, 21 53))')"])
+ .with_expected_result(True)),
+ (SuiteContainer.empty()
+ .with_function_name("ST_Buffer")
+ .with_arguments(["ST_GeomFromText('POLYGON ((21 53, 22 53, 22 52, 21 52, 21 53))')", "0.9"])
+ .with_expected_result(7.128370573329018)
+ .with_transform("ST_AREA")),
+ (SuiteContainer.empty()
+ .with_function_name("ST_AsText")
+ .with_arguments(["ST_GeomFromText('POLYGON ((21 53, 22 53, 22 52, 21 52, 21 53))')"])
+ .with_expected_result("POLYGON ((21 53, 22 53, 22 52, 21 52, 21 53))")),
+ (SuiteContainer.empty()
+ .with_function_name("ST_AsGeoJSON")
+ .with_arguments(["ST_GeomFromText('POLYGON ((21 52, 21 53, 22 53, 22 52, 21 52))')"])
+ .with_expected_result(
+ """{"type":"Polygon","coordinates":[[[21.0,52.0],[21.0,53.0],[22.0,53.0],[22.0,52.0],[21.0,52.0]]]}""")),
+ (SuiteContainer.empty()
+ .with_function_name("ST_AsBinary")
+ .with_arguments(["ST_GeomFromText('POINT(21 52)')"])
+ .with_expected_result(wkb.dumps(wkt.loads("POINT(21 52)")))),
+ (SuiteContainer.empty()
+ .with_function_name("ST_AsEWKB")
+ .with_arguments(["ST_GeomFromText('POINT(21 52)')"])
+ .with_expected_result(wkb.dumps(wkt.loads("POINT(21 52)")))),
+ (SuiteContainer.empty()
+ .with_function_name("ST_SRID")
+ .with_arguments(["ST_GeomFromText('POINT(21 52)')"])
+ .with_expected_result(0)),
+ (SuiteContainer.empty()
+ .with_function_name("ST_SetSRID")
+ .with_arguments(["ST_GeomFromText('POINT(21 52)')", "4326"])
+ .with_expected_result(0)
+ .with_transform("ST_AREA")),
+ (SuiteContainer.empty()
+ .with_function_name("ST_NPoints")
+ .with_arguments(["ST_GeomFromText('POLYGON ((21 53, 22 53, 22 52, 21 52, 21 53))')"])
+ .with_expected_result(5)),
+ (SuiteContainer.empty()
+ .with_function_name("ST_SimplifyPreserveTopology")
+ .with_arguments(["ST_GeomFromText('POLYGON ((21 53, 22 53, 22 52, 21 52, 21 53))')", "1.0"])
+ .with_expected_result(1)
+ .with_transform("ST_AREA")),
+ (SuiteContainer.empty()
+ .with_function_name("ST_GeometryType")
+ .with_arguments(["ST_GeomFromText('POLYGON ((21 53, 22 53, 22 52, 21 52, 21 53))')"])
+ .with_expected_result("ST_Polygon")),
+ (SuiteContainer.empty()
+ .with_function_name("ST_LineMerge")
+ .with_arguments(["ST_GeomFromText('LINESTRING(-29 -27,-30 -29.7,-36 -31,-45 -33,-46 -32)')"])
+ .with_expected_result(0.0)
+ .with_transform("ST_LENGTH")),
+ (SuiteContainer.empty()
+ .with_function_name("ST_Azimuth")
+ .with_arguments(["ST_GeomFromText('POINT(21 52)')", "ST_GeomFromText('POINT(21 53)')"])
+ .with_expected_result(0.0)),
+ (SuiteContainer.empty()
+ .with_function_name("ST_X")
+ .with_arguments(["ST_GeomFromText('POINT(21 52)')"])
+ .with_expected_result(21.0)),
+ (SuiteContainer.empty()
+ .with_function_name("ST_Y")
+ .with_arguments(["ST_GeomFromText('POINT(21 52)')"])
+ .with_expected_result(52.0)),
+ (SuiteContainer.empty()
+ .with_function_name("ST_StartPoint")
+ .with_arguments(["ST_GeomFromText('LINESTRING(100 150,50 60, 70 80, 160 170)')"])
+ .with_expected_result("POINT (100 150)")
+ .with_transform("ST_ASText")),
+ (SuiteContainer.empty()
+ .with_function_name("ST_Endpoint")
+ .with_arguments(["ST_GeomFromText('LINESTRING(100 150,50 60, 70 80, 160 170)')"])
+ .with_expected_result("POINT (160 170)")
+ .with_transform("ST_ASText")),
+ (SuiteContainer.empty()
+ .with_function_name("ST_Boundary")
+ .with_arguments(["ST_GeomFromText('POLYGON ((21 53, 22 53, 22 52, 21 52, 21 53))')"])
+ .with_expected_result(4)
+ .with_transform("ST_LENGTH")),
+ (SuiteContainer.empty()
+ .with_function_name("ST_ExteriorRing")
+ .with_arguments(["ST_GeomFromText('POLYGON ((21 53, 22 53, 22 52, 21 52, 21 53))')"])
+ .with_expected_result(4)
+ .with_transform("ST_LENGTH")),
+ (SuiteContainer.empty()
+ .with_function_name("ST_GeometryN")
+ .with_arguments(["ST_GeomFromText('MULTIPOINT((1 2), (3 4), (5 6), (8 9))')", "0"])
+ .with_expected_result(1)
+ .with_transform("ST_X")),
+ (SuiteContainer.empty()
+ .with_function_name("ST_InteriorRingN")
+ .with_arguments([
+ "ST_GeomFromText('POLYGON((0 0, 0 5, 5 5, 5 0, 0 0), (1 1, 2 1, 2 2, 1 2, 1 1))')",
+ "0"])
+ .with_expected_result(4.0)
+ .with_transform("ST_LENGTH")),
+ (SuiteContainer.empty()
+ .with_function_name("ST_Dump")
+ .with_arguments([
+ "ST_GeomFromText('MULTIPOINT ((10 40), (40 30), (20 20), (30 10))')"])
+ .with_expected_result(4)
+ .with_transform("SIZE")),
+ (SuiteContainer.empty()
+ .with_function_name("ST_DumpPoints")
+ .with_arguments([
+ "ST_GeomFromTEXT('LINESTRING (0 0, 1 1, 1 0)')"])
+ .with_expected_result(3)
+ .with_transform("SIZE")),
+ (SuiteContainer.empty()
+ .with_function_name("ST_IsClosed")
+ .with_arguments([
+ "ST_GeomFROMTEXT('LINESTRING(0 0, 1 1, 1 0)')"])
+ .with_expected_result(False)),
+ (SuiteContainer.empty()
+ .with_function_name("ST_NumInteriorRings")
+ .with_arguments([
+ "ST_GeomFROMTEXT('POLYGON ((0 0, 0 5, 5 5, 5 0, 0 0), (1 1, 2 1, 2 2, 1 2, 1 1))')"])
+ .with_expected_result(1)),
+ (SuiteContainer.empty()
+ .with_function_name("ST_AddPoint")
+ .with_arguments([
+ "ST_GeomFromText('LINESTRING(0 0, 1 1, 1 0)')",
+ "ST_GeomFromText('Point(21 52)')",
+ "1"])
+ .with_expected_result(111.86168327044916)
+ .with_transform("ST_Length")),
+ (SuiteContainer.empty()
+ .with_function_name("ST_RemovePoint")
+ .with_arguments([
+ "ST_GeomFromText('LINESTRING(0 0, 1 1, 1 0)')",
+ "1"
+ ])
+ .with_expected_result("LINESTRING (0 0, 1 0)")
+ .with_transform("ST_AsText")),
+ (SuiteContainer.empty()
+ .with_function_name("ST_IsRing")
+ .with_arguments([
+ "ST_GeomFromText('LINESTRING(0 0, 0 1, 1 1, 1 0, 0 0)')"
+ ])
+ .with_expected_result(True)),
+ (SuiteContainer.empty()
+ .with_function_name("ST_NumGeometries")
+ .with_arguments([
+ "ST_GeomFromText('LINESTRING(0 0, 0 1, 1 1, 1 0, 0 0)')"
+ ])
+ .with_expected_result(1)),
+ (SuiteContainer.empty()
+ .with_function_name("ST_FlipCoordinates")
+ .with_arguments([
+ "ST_GeomFromText('POINT(21 52)')"
+ ])
+ .with_expected_result(52.0)
+ .with_transform("ST_X")),
+ (SuiteContainer.empty()
+ .with_function_name("ST_MinimumBoundingRadius")
+ .with_arguments([
+ "ST_GeomFromText('POLYGON((1 1,0 0, -1 1, 1 1))')"
+ ])
+ .with_expected_result(Row(center=wkt.loads("POINT(0 1)"), radius=1.0))),
+ (SuiteContainer.empty()
+ .with_function_name("ST_MinimumBoundingCircle")
+ .with_arguments([
+ "ST_GeomFromText('POLYGON((1 1,0 0, -1 1, 1 1))')"
+ ])
+ .with_expected_result(3.121445152258052)
+ .with_transform("ST_AREA")),
+ # NOTE(review): same function, arguments, expected result and transform as the
+ # previous entry, so this case runs twice - confirm whether that is intended.
+ (SuiteContainer.empty()
+ .with_function_name("ST_MinimumBoundingCircle")
+ .with_arguments(["ST_GeomFromText('POLYGON((1 1,0 0, -1 1, 1 1))')"])
+ .with_expected_result(3.121445152258052)
+ .with_transform("ST_AREA")),
+ (SuiteContainer.empty()
+ .with_function_name("ST_SubDivide")
+ .with_arguments([
+ "ST_GeomFromText('POLYGON((35 10, 45 45, 15 40, 10 20, 35 10), (20 30, 35 35, 30 20, 20 30))')",
+ "5"])
+ .with_expected_result(14)
+ .with_transform("SIZE")),
+ (SuiteContainer.empty()
+ .with_function_name("ST_SubDivideExplode")
+ .with_arguments([
+ "ST_GeomFromText('LINESTRING(0 0, 85 85, 100 100, 120 120, 21 21, 10 10, 5 5)')",
+ "5"])
+ .with_expected_result(2)
+ .with_transform("ST_NPoints")),
+ (SuiteContainer.empty()
+ .with_function_name("ST_GeoHash")
+ .with_arguments([
+ "ST_GeomFromText('POINT(21.427834 52.042576573)')",
+ "5"])
+ .with_expected_result("u3r0p")),
+ (SuiteContainer.empty()
+ .with_function_name("ST_Collect")
+ .with_arguments([
+ "ST_GeomFromText('POINT(21.427834 52.042576573)')",
+ "ST_GeomFromText('POINT(45.342524 56.342354355)')"])
+ .with_expected_result(0.0)
+ .with_transform("ST_LENGTH"))
+]
+
+
+def pytest_generate_tests(metafunc):
+    """Parametrize a test method from the ``params`` mapping of its class.
+
+    Looks up the list of cases registered under the test function's name in
+    ``metafunc.cls.params``, takes the sorted keys of the first case as the
+    argument names, and passes one row of values per case to
+    ``metafunc.parametrize``.
+    """
+    cases = metafunc.cls.params[metafunc.function.__name__]
+    # The first case defines the argument names for all cases.
+    names = sorted(cases[0])
+    rows = []
+    for case in cases:
+        rows.append([case[name] for name in names])
+    metafunc.parametrize(names, rows)
+
+
+class TestConstructorFunctions(TestBase):
+    """Runs every case in SEDONA_LISTED_SQL_FUNCTIONS against a Spark structured stream."""
+
+    # Maps a test-method name to its list of parameter sets; read by
+    # pytest_generate_tests in this module.
+    params = {
+        "test_geospatial_function_on_stream": SEDONA_LISTED_SQL_FUNCTIONS
+    }
+
+    @pytest.mark.sparkstreaming
+    def test_geospatial_function_on_stream(self, function_name: str, arguments: List[str],
+                                           expected_result: Any, transform: Optional[str]):
+        """Evaluate one SQL function on a streaming source and check the first result row.
+
+        :param function_name: SQL function under test
+        :param arguments: SQL expressions passed to the function, joined with ", "
+        :param expected_result: value the first row of the output must equal
+        :param transform: optional SQL function applied to the result before comparing
+        """
+        # given input stream
+
+        input_stream = self.spark.readStream.schema(SCHEMA).parquet(os.path.join(
+            tests_resource,
+            "streaming/geometry_example")
+        ).selectExpr(f"{function_name}({', '.join(arguments)}) AS result")
+
+        # and target table
+        random_table_name = f"view_{random.randint(0, 100000)}"
+
+        # when saving stream to memory
+        streaming_query = input_stream.writeStream.format("memory") \
+            .queryName(random_table_name) \
+            .outputMode("append").start()
+
+        try:
+            streaming_query.processAllAvailable()
+
+            # then result should be as expected
+            transform_query = "result" if not transform else f"{transform}(result)"
+            assert self.spark.sql(f"select {transform_query} from {random_table_name}").collect()[0][0] == expected_result
+        finally:
+            # Stop the query even when the assertion fails, so started queries
+            # do not pile up across the parametrized cases.
+            streaming_query.stop()