Posted to commits@sedona.apache.org by ji...@apache.org on 2020/12/09 23:15:08 UTC
[incubator-sedona] branch master updated: [SEDONA-7] Build Sedona for Spark 2.4, 3.0 and Scala 2.11, 2.12 (#494)
This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sedona.git
The following commit(s) were added to refs/heads/master by this push:
new 220aed4 [SEDONA-7] Build Sedona for Spark 2.4, 3.0 and Scala 2.11, 2.12 (#494)
220aed4 is described below
commit 220aed405f34062dd3a6014e440089ff49910fe4
Author: Jia Yu <ji...@apache.org>
AuthorDate: Wed Dec 9 15:15:02 2020 -0800
[SEDONA-7] Build Sedona for Spark 2.4, 3.0 and Scala 2.11, 2.12 (#494)
* Update GitHub action
* Cache Maven packages
* Separate the Maven and Python tests
* Add Python3 script to switch between Spark 2 and Spark 3
* Fix the issue in UdfRegistrator
* Improve the spark version converter
* Use the Java-Scala converter which works for both Scala 2.11 and 2.12
* Fix a bug in the converter
* Add the scala build target in pom
* Remove Hive in test
* spark-version-converter takes spark2 or spark3 as its input (see the invocation sketch after this list)
* Add test matrix
* Add test matrix
* Add test matrix
* Add test matrix
* Add test matrix
* Fix the Python CI
* Fix the Python CI
* Fix the Python CI
* Fix the Python CI
* Fix the bug in the Python version-check function
* Fix the Python test on Spark 2.4
* Drop the test on Spark 2.4. It looks like PySedona does not work on Spark 2.4
* Drop the test on Python 3.8 and 3.9
* Add the test for Python 3.8
* Add the test for Python 3.9
* Revert "Add the test for Python 3.9"
This reverts commit 7e98de1e17408605dbe57e6b6ea7ceb1e9f1c3b0.
* Add the test for Spark 2.4 + Python 3.7
* Add the test for Spark 2.4 + Python 3.7
* Add the test for Spark 2.4 + Python 3.7
* Add the test for Spark 2.4 + Python 3.7
* Add the test for Spark 2.4 + Python 3.7
* Remove the jackson dependency in POM
* Add Spark version to Python adapter artifact name
* Add the test for Python 3.9
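For reference, the converter is invoked from the repository root with the target Spark generation as its only argument, exactly as the updated workflows below do:

    python3 spark-version-converter.py spark2   # switch sources to the Spark 2 code path
    python3 spark-version-converter.py spark3   # switch sources to the Spark 3 code path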
---
.github/workflows/java.yml | 30 ++-
.github/workflows/python.yml | 64 ++++--
core/pom.xml | 2 +-
pom.xml | 42 +---
python-adapter/pom.xml | 24 +--
.../utils/PythonAdapterWrapper.scala | 2 +-
python/Pipfile | 2 +-
python/sedona/core/jvm/config.py | 9 +-
python/tests/format_mapper/test_geo_json_reader.py | 9 +-
python/tests/sql/test_adapter.py | 2 +-
spark-version-converter.py | 62 ++++++
sql/pom.xml | 26 +--
.../scala/org/apache/sedona/sql/UDF/Catalog.scala | 9 +-
.../org/apache/sedona/sql/UDF/UdfRegistrator.scala | 6 +-
.../expressions_udaf/AggregateFunctions.scala | 218 +++++++++++++++++++++
.../strategy/join/JoinQueryDetector.scala | 27 ++-
.../strategy/join/TraitJoinQueryExec.scala | 6 +-
.../org/apache/sedona/sql/TestBaseScala.scala | 1 -
viz/pom.xml | 18 +-
19 files changed, 420 insertions(+), 139 deletions(-)
diff --git a/.github/workflows/java.yml b/.github/workflows/java.yml
index c4ebefa..d75f89d 100644
--- a/.github/workflows/java.yml
+++ b/.github/workflows/java.yml
@@ -1,17 +1,33 @@
name: Scala and Java build
-on: [push, pull_request]
-
+on:
+ push:
+ branches:
+ - master
+ pull_request:
+ branches:
+ - '*'
+
jobs:
build:
runs-on: ubuntu-18.04
+ strategy:
+ matrix:
+ spark: [2.4.7, 3.0.1]
+ scala: [2.11.8, 2.12.8]
+ exclude:
+ - spark: 3.0.1
+ scala: 2.11.8
steps:
- uses: actions/checkout@v2
- uses: actions/setup-java@v1
with:
java-version: '8'
+ - uses: actions/setup-python@v2
+ with:
+ python-version: '3.7'
- name: Cache Maven packages
uses: actions/cache@v2
with:
@@ -19,7 +35,13 @@ jobs:
key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
restore-keys: ${{ runner.os }}-m2
- run: git submodule update --init --recursive # Checkout Git submodule if necessary
- - run: mvn -q clean install
+ - env:
+ SPARK_VERSION: ${{ matrix.spark }}
+ run: python3 spark-version-converter.py spark${SPARK_VERSION:0:1}
+ - env:
+ SPARK_VERSION: ${{ matrix.spark }}
+ SCALA_VERSION: ${{ matrix.scala }}
+ run: mvn -q clean install -Dscala.compat.version=${SCALA_VERSION:0:4} -Dscala.version=$SCALA_VERSION -Dspark.compat.version=${SPARK_VERSION:0:3} -Dspark.version=$SPARK_VERSION
- run: mkdir staging
- run: cp core/target/sedona-*.jar staging
- run: cp sql/target/sedona-*.jar staging
@@ -27,5 +49,5 @@ jobs:
- run: cp python-adapter/target/sedona-*.jar staging
- uses: actions/upload-artifact@v2
with:
- name: Package
+ name: generated-jars
path: staging
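The bash substring expansions above derive the short "compat" versions from the full version strings: ${SPARK_VERSION:0:1} gives the major version handed to the converter, ${SPARK_VERSION:0:3} the Spark compat version, and ${SCALA_VERSION:0:4} the Scala compat version. A minimal Python sketch of the same slicing, for illustration only:

    # Python equivalents of the bash substring expansions in the workflow
    spark_version = "3.0.1"
    scala_version = "2.12.8"
    print(spark_version[0:1])  # "3"    -> argument suffix for spark-version-converter.py
    print(spark_version[0:3])  # "3.0"  -> -Dspark.compat.version
    print(scala_version[0:4])  # "2.12" -> -Dscala.compat.version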
diff --git a/.github/workflows/python.yml b/.github/workflows/python.yml
index 057bfce..ef66649 100644
--- a/.github/workflows/python.yml
+++ b/.github/workflows/python.yml
@@ -1,17 +1,41 @@
name: Python build
-on: [push, pull_request]
-
+on:
+ push:
+ branches:
+ - master
+ pull_request:
+ branches:
+ - '*'
+
jobs:
build:
runs-on: ubuntu-18.04
+ strategy:
+ matrix:
+ include:
+ - spark: 3.0.1
+ scala: 2.12.8
+ python: 3.7
+ - spark: 3.0.1
+ scala: 2.12.8
+ python: 3.8
+ - spark: 3.0.1
+ scala: 2.12.8
+ python: 3.9
+ - spark: 2.4.7
+ scala: 2.11.8
+ python: 3.7
steps:
- uses: actions/checkout@v2
- uses: actions/setup-java@v1
with:
java-version: '8'
+ - uses: actions/setup-python@v2
+ with:
+ python-version: ${{ matrix.python }}
- name: Cache Maven packages
uses: actions/cache@v2
with:
@@ -19,19 +43,31 @@ jobs:
key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
restore-keys: ${{ runner.os }}-m2
- run: git submodule update --init --recursive # Checkout Git submodule if necessary
- - run: mvn -q clean install -DskipTests
- - uses: actions/setup-python@v2
- with:
- python-version: '3.7'
- - uses: vemonet/setup-spark@v1
- with:
- spark-version: '3.0.1' # Exact version
- - run: export PYTHONPATH=$SPARK_HOME/python
- - run: sudo apt-get -y install python3-pip python-dev
+ - env:
+ SPARK_VERSION: ${{ matrix.spark }}
+ run: python3 spark-version-converter.py spark${SPARK_VERSION:0:1}
+ - env:
+ SPARK_VERSION: ${{ matrix.spark }}
+ SCALA_VERSION: ${{ matrix.scala }}
+ run: mvn -q clean install -DskipTests -Dscala.compat.version=${SCALA_VERSION:0:4} -Dscala.version=$SCALA_VERSION -Dspark.compat.version=${SPARK_VERSION:0:3} -Dspark.version=$SPARK_VERSION
+ - env:
+ SPARK_VERSION: ${{ matrix.spark }}
+ run: wget https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop2.7.tgz
+ - env:
+ SPARK_VERSION: ${{ matrix.spark }}
+ run: tar -xzf spark-${SPARK_VERSION}-bin-hadoop2.7.tgz
+ - run: sudo apt-get -y install python3-pip python-dev libgeos-dev
- run: sudo pip3 install -U setuptools
- run: sudo pip3 install -U wheel
- run: sudo pip3 install -U virtualenvwrapper
- run: python3 -m pip install pipenv
- - run: (cd python;pipenv install --dev)
- - run: find python-adapter/target/ -iregex "python-adapter\/target\/sedona-python-adapter-[0-9]\.[0-9]\.[0-9]-incubator\(-SNAPSHOT\)?\.jar" -exec cp {} $SPARK_HOME/jars \;
- - run: (cd python;pipenv run pytest tests)
+ - env:
+ SPARK_VERSION: ${{ matrix.spark }}
+ PYTHON_VERSION: ${{ matrix.python }}
+ run: (cd python;pipenv --python ${PYTHON_VERSION};pipenv install pyspark==${SPARK_VERSION};pipenv install --dev;pipenv graph)
+ - env:
+ SPARK_VERSION: ${{ matrix.spark }}
+ run: find python-adapter/target -name sedona-* -exec cp {} spark-${SPARK_VERSION}-bin-hadoop2.7/jars/ \;
+ - env:
+ SPARK_VERSION: ${{ matrix.spark }}
+ run: (export SPARK_HOME=$PWD/spark-${SPARK_VERSION}-bin-hadoop2.7;export PYTHONPATH=$SPARK_HOME/python;cd python;pipenv run pytest tests)
diff --git a/core/pom.xml b/core/pom.xml
index 95210a0..5f18c55 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -26,7 +26,7 @@
<version>1.0.0-incubator-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
- <artifactId>sedona-core</artifactId>
+ <artifactId>sedona-core_${scala.compat.version}</artifactId>
<name>${project.groupId}:${project.artifactId}</name>
<description>A cluster computing system for processing large-scale spatial data: RDD API</description>
diff --git a/pom.xml b/pom.xml
index 77cd6d0..5543877 100644
--- a/pom.xml
+++ b/pom.xml
@@ -41,15 +41,11 @@
<scala.version>2.12.8</scala.version>
<scala.compat.version>2.12</scala.compat.version>
<geotools.version>24.0</geotools.version>
- <spark.version>3.0.0</spark.version>
+ <spark.version>3.0.1</spark.version>
+ <spark.compat.version>3.0</spark.compat.version>
</properties>
<dependencies>
<dependency>
- <groupId>com.fasterxml.jackson.module</groupId>
- <artifactId>jackson-module-scala_${scala.compat.version}</artifactId>
- <version>2.10.0</version>
- </dependency>
- <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.compat.version}</artifactId>
<version>${spark.version}</version>
@@ -59,47 +55,25 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>*</artifactId>
</exclusion>
- <exclusion>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.fasterxml.jackson.module</groupId>
- <artifactId>*</artifactId>
- </exclusion>
</exclusions>
</dependency>
<dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.compat.version}</artifactId>
+ <version>${spark.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.8.2</version>
<scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.fasterxml.jackson.module</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.8.2</version>
<scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.fasterxml.jackson.module</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
diff --git a/python-adapter/pom.xml b/python-adapter/pom.xml
index 5c9beaf..940b687 100644
--- a/python-adapter/pom.xml
+++ b/python-adapter/pom.xml
@@ -9,32 +9,16 @@
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>sedona-python-adapter</artifactId>
+ <artifactId>sedona-python-adapter-${spark.compat.version}_${scala.compat.version}</artifactId>
<dependencies>
<dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_${scala.compat.version}</artifactId>
- <version>${spark.version}</version>
- <scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.fasterxml.jackson.module</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
<groupId>org.apache.sedona</groupId>
- <artifactId>sedona-core</artifactId>
+ <artifactId>sedona-core_${scala.compat.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.sedona</groupId>
- <artifactId>sedona-sql</artifactId>
+ <artifactId>sedona-sql-${spark.compat.version}_${scala.compat.version}</artifactId>
<version>${project.version}</version>
</dependency>
<!--for CRS transformation-->
@@ -107,4 +91,4 @@
</dependency>
</dependencies>
-</project>
\ No newline at end of file
+</project>
diff --git a/python-adapter/src/main/scala/org.apache.sedona.python.wrapper/utils/PythonAdapterWrapper.scala b/python-adapter/src/main/scala/org.apache.sedona.python.wrapper/utils/PythonAdapterWrapper.scala
index d129997..9a7d512 100644
--- a/python-adapter/src/main/scala/org.apache.sedona.python.wrapper/utils/PythonAdapterWrapper.scala
+++ b/python-adapter/src/main/scala/org.apache.sedona.python.wrapper/utils/PythonAdapterWrapper.scala
@@ -24,7 +24,7 @@ import org.apache.spark.api.java.JavaPairRDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.locationtech.jts.geom.Geometry
-import scala.collection.convert.ImplicitConversions._
+import scala.collection.JavaConversions._
object PythonAdapterWrapper {
def toDf[T <: Geometry](spatialRDD: SpatialRDD[T], fieldNames: java.util.ArrayList[String], sparkSession: SparkSession): DataFrame = {
diff --git a/python/Pipfile b/python/Pipfile
index d1a197b..308e5d5 100644
--- a/python/Pipfile
+++ b/python/Pipfile
@@ -11,7 +11,7 @@ jupyter="*"
[packages]
pandas="*"
geopandas="==0.6.0"
-pyspark="==3.0.0"
+pyspark=">=2.4.0"
attrs="*"
[requires]
diff --git a/python/sedona/core/jvm/config.py b/python/sedona/core/jvm/config.py
index 9a0e226..a265a97 100644
--- a/python/sedona/core/jvm/config.py
+++ b/python/sedona/core/jvm/config.py
@@ -108,12 +108,13 @@ class SedonaMeta:
@classmethod
def get_version(cls, spark_jars: str) -> Optional[str]:
- sedona_version = findall(r"sedona-python-adapter-(\d{1}\.\d{1}\.\d{1}).*?jar", spark_jars)
+ # Find Spark version, Scala version and Sedona version.
+ versions = findall(r"sedona-python-adapter-([^,\n]+)_([^,\n]+)-([^,\n]+)-incubator", spark_jars)
try:
- version = sedona_version[0]
+ sedona_version = versions[0][2]
except IndexError:
- version = None
- return version
+ sedona_version = None
+ return sedona_version
@classproperty
def version(cls):
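Under the artifact naming introduced in this commit (sedona-python-adapter-<spark compat>_<scala compat>-<version>-incubator...), the updated regex captures three groups. A minimal sketch of the extraction, using a hypothetical jar name for illustration:

    import re

    # Hypothetical jar name following the new naming scheme
    spark_jars = "sedona-python-adapter-3.0_2.12-1.0.0-incubator.jar"
    versions = re.findall(
        r"sedona-python-adapter-([^,\n]+)_([^,\n]+)-([^,\n]+)-incubator", spark_jars)
    # versions == [('3.0', '2.12', '1.0.0')]: Spark compat, Scala compat, Sedona version
    print(versions[0][2])  # "1.0.0"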
diff --git a/python/tests/format_mapper/test_geo_json_reader.py b/python/tests/format_mapper/test_geo_json_reader.py
index 7876e99..7516f7e 100644
--- a/python/tests/format_mapper/test_geo_json_reader.py
+++ b/python/tests/format_mapper/test_geo_json_reader.py
@@ -35,8 +35,7 @@ geo_json_with_invalid_geom_with_feature_property = os.path.join(tests_path, "res
class TestGeoJsonReader(TestBase):
def test_read_to_geometry_rdd(self):
- print(SedonaMeta.version)
- if is_greater_or_equal_version(SedonaMeta.version, "1.2.0"):
+ if is_greater_or_equal_version(SedonaMeta.version, "1.0.0"):
geo_json_rdd = GeoJsonReader.readToGeometryRDD(
self.sc,
geo_json_geom_with_feature_property
@@ -52,7 +51,7 @@ class TestGeoJsonReader(TestBase):
assert geo_json_rdd.rawSpatialRDD.count() == 10
def test_read_to_valid_geometry_rdd(self):
- if is_greater_or_equal_version(SedonaMeta.version, "1.2.0"):
+ if is_greater_or_equal_version(SedonaMeta.version, "1.0.0"):
geo_json_rdd = GeoJsonReader.readToGeometryRDD(
self.sc,
geo_json_geom_with_feature_property,
@@ -87,7 +86,7 @@ class TestGeoJsonReader(TestBase):
assert geo_json_rdd.rawSpatialRDD.count() == 3
def test_read_to_include_id_rdd(self):
- if is_greater_or_equal_version(SedonaMeta.version, "1.2.0"):
+ if is_greater_or_equal_version(SedonaMeta.version, "1.0.0"):
geo_json_rdd = GeoJsonReader.readToGeometryRDD(
self.sc,
geo_json_contains_id,
@@ -108,7 +107,7 @@ class TestGeoJsonReader(TestBase):
assert geo_json_rdd.fieldNames.__len__() == 3
def test_read_to_geometry_rdd_invalid_syntax(self):
- if is_greater_or_equal_version(SedonaMeta.version, "1.2.0"):
+ if is_greater_or_equal_version(SedonaMeta.version, "1.0.0"):
geojson_rdd = GeoJsonReader.readToGeometryRDD(
self.sc,
geo_json_with_invalid_geom_with_feature_property,
diff --git a/python/tests/sql/test_adapter.py b/python/tests/sql/test_adapter.py
index fc508fa..0f06117 100644
--- a/python/tests/sql/test_adapter.py
+++ b/python/tests/sql/test_adapter.py
@@ -275,7 +275,7 @@ class TestAdapter(TestBase):
assert spatial_rdd.approximateTotalCount == 121960
assert spatial_rdd.boundaryEnvelope == Envelope(-179.147236, 179.475569, -14.548699, 71.35513400000001)
- @pytest.mark.skipif(is_greater_or_equal_version(version, "1.3.0"), reason="Depreciated after spark 1.2.0")
+ @pytest.mark.skipif(is_greater_or_equal_version(version, "1.0.0"), reason="Deprecated in Sedona")
def test_to_spatial_rdd_df_geom_column_id(self):
df = self.spark.read.\
format("csv").\
diff --git a/spark-version-converter.py b/spark-version-converter.py
new file mode 100644
index 0000000..61af11f
--- /dev/null
+++ b/spark-version-converter.py
@@ -0,0 +1,62 @@
+# Python 3
+import fileinput
+import sys
+
+spark2_anchor = 'SPARK2 anchor'
+spark3_anchor = 'SPARK3 anchor'
+files = ['sql/src/main/scala/org/apache/sedona/sql/UDF/UdfRegistrator.scala',
+ 'sql/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/TraitJoinQueryExec.scala',
+ 'sql/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/JoinQueryDetector.scala']
+
+def switch_version(line):
+ if line[:2] == '//':
+ print(line[2:], end='') # enable code
+ return 'enabled'
+ else:
+ print('//' + line, end='') # disable code
+ return 'disabled'
+
+def enable_version(line):
+ if line[:2] == '//':
+ print(line[2:], end='') # enable code
+ return 'enabled'
+ else:
+ print(line, end='')
+ return 'enabled before'
+
+def disable_version(line):
+ if line[:2] == '//':
+ print(line, end='')
+ return 'disabled before'
+ else:
+ print('//' + line, end='') # disable code
+ return 'disabled'
+
+def parse_file(filepath, argv):
+ conversion_result_spark2 = ''
+ conversion_result_spark3 = ''
+ if argv[1] == 'spark2':
+ with fileinput.FileInput(filepath, inplace=True) as file:
+ for line in file:
+ if spark2_anchor in line:
+ conversion_result_spark2 = spark2_anchor + ' ' + enable_version(line)
+ elif spark3_anchor in line:
+ conversion_result_spark3 = spark3_anchor + ' ' + disable_version(line)
+ else:
+ print(line, end='')
+ return conversion_result_spark2 + ' and ' + conversion_result_spark3
+ elif argv[1] == 'spark3':
+ with fileinput.FileInput(filepath, inplace=True) as file:
+ for line in file:
+ if spark2_anchor in line:
+ conversion_result_spark2 = spark2_anchor + ' ' + disable_version(line)
+ elif spark3_anchor in line:
+ conversion_result_spark3 = spark3_anchor + ' ' + enable_version(line)
+ else:
+ print(line, end='')
+ return conversion_result_spark2 + ' and ' + conversion_result_spark3
+ else:
+ return 'wrong spark version'
+
+for filepath in files:
+ print(filepath + ': ' + parse_file(filepath, sys.argv))
\ No newline at end of file
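Two details of the script are easy to miss: fileinput.FileInput(..., inplace=True) redirects print() into the file being rewritten, which is why the helpers print lines instead of returning them, and the anchored lines in the Scala sources are kept flush-left because the helpers only inspect the first two characters (line[:2]). A stand-alone sketch of the toggle logic, for illustration only:

    # Illustrative pure-function version of switch_version (not part of the script)
    def toggle(line: str) -> str:
        # Uncomment a '//'-prefixed line, or comment out an active one
        return line[2:] if line[:2] == '//' else '//' + line

    assert toggle('//foo() // SPARK2 anchor\n') == 'foo() // SPARK2 anchor\n'
    assert toggle('bar() // SPARK3 anchor\n') == '//bar() // SPARK3 anchor\n'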
diff --git a/sql/pom.xml b/sql/pom.xml
index 5f19029..d814989 100644
--- a/sql/pom.xml
+++ b/sql/pom.xml
@@ -26,7 +26,7 @@
<version>1.0.0-incubator-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
- <artifactId>sedona-sql</artifactId>
+ <artifactId>sedona-sql-${spark.compat.version}_${scala.compat.version}</artifactId>
<name>${project.groupId}:${project.artifactId}</name>
<description>A cluster computing system for processing large-scale spatial data: SQL API</description>
@@ -36,32 +36,10 @@
<dependencies>
<dependency>
<groupId>org.apache.sedona</groupId>
- <artifactId>sedona-core</artifactId>
+ <artifactId>sedona-core_${scala.compat.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_${scala.compat.version}</artifactId>
- <version>${spark.version}</version>
- <scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.fasterxml.jackson.module</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-hive_${scala.compat.version}</artifactId>
- <version>${spark.version}</version>
- <scope>test</scope>
- </dependency>
<!--The following GeoTools dependencies use GNU Lesser General Public License and thus are excluded from the binary distribution-->
<!-- Users have to include them by themselves manually -->
<!-- See https://www.apache.org/legal/resolved.html#category-x -->
diff --git a/sql/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala b/sql/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala
index c07b0b5..a971083 100644
--- a/sql/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala
+++ b/sql/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala
@@ -19,7 +19,7 @@
package org.apache.sedona.sql.UDF
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
-import org.apache.spark.sql.expressions.Aggregator
+import org.apache.spark.sql.expressions.{Aggregator, UserDefinedAggregateFunction}
import org.apache.spark.sql.sedona_sql.expressions._
import org.locationtech.jts.geom.Geometry
@@ -84,4 +84,11 @@ object Catalog {
new ST_Envelope_Aggr,
new ST_Intersection_Aggr
)
+
+ import org.apache.spark.sql.sedona_sql.expressions_udaf
+ val aggregateExpressions_UDAF: Seq[UserDefinedAggregateFunction] = Seq(
+ new expressions_udaf.ST_Union_Aggr,
+ new expressions_udaf.ST_Envelope_Aggr,
+ new expressions_udaf.ST_Intersection_Aggr
+ )
}
diff --git a/sql/src/main/scala/org/apache/sedona/sql/UDF/UdfRegistrator.scala b/sql/src/main/scala/org/apache/sedona/sql/UDF/UdfRegistrator.scala
index a15db81..7074408 100644
--- a/sql/src/main/scala/org/apache/sedona/sql/UDF/UdfRegistrator.scala
+++ b/sql/src/main/scala/org/apache/sedona/sql/UDF/UdfRegistrator.scala
@@ -29,11 +29,13 @@ object UdfRegistrator {
def registerAll(sparkSession: SparkSession): Unit = {
Catalog.expressions.foreach(f => sparkSession.sessionState.functionRegistry.createOrReplaceTempFunction(f.getClass.getSimpleName.dropRight(1), f))
- Catalog.aggregateExpressions.foreach(f => sparkSession.udf.register(f.getClass.getSimpleName, functions.udaf(f)))
+Catalog.aggregateExpressions.foreach(f => sparkSession.udf.register(f.getClass.getSimpleName, functions.udaf(f))) // SPARK3 anchor
+//Catalog.aggregateExpressions_UDAF.foreach(f => sparkSession.udf.register(f.getClass.getSimpleName, f)) // SPARK2 anchor
}
def dropAll(sparkSession: SparkSession): Unit = {
Catalog.expressions.foreach(f => sparkSession.sessionState.functionRegistry.dropFunction(FunctionIdentifier(f.getClass.getSimpleName.dropRight(1))))
- Catalog.aggregateExpressions.foreach(f => sparkSession.sessionState.functionRegistry.dropFunction(FunctionIdentifier(f.getClass.getSimpleName)))
+Catalog.aggregateExpressions.foreach(f => sparkSession.sessionState.functionRegistry.dropFunction(FunctionIdentifier(f.getClass.getSimpleName))) // SPARK3 anchor
+//Catalog.aggregateExpressions_UDAF.foreach(f => sparkSession.sessionState.functionRegistry.dropFunction(FunctionIdentifier(f.getClass.getSimpleName))) // SPARK2 anchor
}
}
\ No newline at end of file
diff --git a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions_udaf/AggregateFunctions.scala b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions_udaf/AggregateFunctions.scala
new file mode 100644
index 0000000..5b3c0f5
--- /dev/null
+++ b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions_udaf/AggregateFunctions.scala
@@ -0,0 +1,218 @@
+/**
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.sedona_sql.expressions_udaf
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
+import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.locationtech.jts.geom.{Coordinate, Geometry, GeometryFactory}
+
+/**
+ * Return the polygon union of all polygons in the given column
+ */
+
+class ST_Union_Aggr extends UserDefinedAggregateFunction {
+ override def inputSchema: StructType = StructType(StructField("Union", new GeometryUDT) :: Nil)
+
+ override def bufferSchema: StructType = StructType(
+ StructField("Union", new GeometryUDT) :: Nil
+ )
+
+ override def dataType: DataType = new GeometryUDT
+
+ override def deterministic: Boolean = true
+
+ override def initialize(buffer: MutableAggregationBuffer): Unit = {
+ val coordinates: Array[Coordinate] = new Array[Coordinate](5)
+ coordinates(0) = new Coordinate(-999999999, -999999999)
+ coordinates(1) = new Coordinate(-999999999, -999999999)
+ coordinates(2) = new Coordinate(-999999999, -999999999)
+ coordinates(3) = new Coordinate(-999999999, -999999999)
+ coordinates(4) = coordinates(0)
+ val geometryFactory = new GeometryFactory()
+ buffer(0) = geometryFactory.createPolygon(coordinates)
+ }
+
+ override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
+ val accumulateUnion = buffer.getAs[Geometry](0)
+ val newPolygon = input.getAs[Geometry](0)
+ if (accumulateUnion.getArea == 0) buffer(0) = newPolygon
+ else buffer(0) = accumulateUnion.union(newPolygon)
+ }
+
+ override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
+ val leftPolygon = buffer1.getAs[Geometry](0)
+ val rightPolygon = buffer2.getAs[Geometry](0)
+ if (leftPolygon.getCoordinates()(0).x == -999999999) buffer1(0) = rightPolygon
+ else if (rightPolygon.getCoordinates()(0).x == -999999999) buffer1(0) = leftPolygon
+ else buffer1(0) = leftPolygon.union(rightPolygon)
+ }
+
+ override def evaluate(buffer: Row): Any = {
+ return buffer.getAs[Geometry](0)
+ }
+}
+
+/**
+ * Return the envelope boundary of the entire column
+ */
+class ST_Envelope_Aggr extends UserDefinedAggregateFunction {
+ // This is the input fields for your aggregate function.
+ override def inputSchema: org.apache.spark.sql.types.StructType =
+ StructType(StructField("Envelope", new GeometryUDT) :: Nil)
+
+ // These are the internal fields kept for computing the aggregate.
+ override def bufferSchema: StructType = StructType(
+ StructField("Envelope", new GeometryUDT) :: Nil
+ )
+
+ // This is the output type of the aggregation function.
+ override def dataType: DataType = new GeometryUDT
+
+ override def deterministic: Boolean = true
+
+ // This is the initial value for your buffer schema.
+ override def initialize(buffer: MutableAggregationBuffer): Unit = {
+ val coordinates: Array[Coordinate] = new Array[Coordinate](5)
+ coordinates(0) = new Coordinate(-999999999, -999999999)
+ coordinates(1) = new Coordinate(-999999999, -999999999)
+ coordinates(2) = new Coordinate(-999999999, -999999999)
+ coordinates(3) = new Coordinate(-999999999, -999999999)
+ coordinates(4) = new Coordinate(-999999999, -999999999)
+ val geometryFactory = new GeometryFactory()
+ buffer(0) = geometryFactory.createPolygon(coordinates)
+ //buffer(0) = new GenericArrayData(GeometrySerializer.serialize(geometryFactory.createPolygon(coordinates)))
+ }
+
+ // This is how to update your buffer schema given an input.
+ override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
+ val accumulateEnvelope = buffer.getAs[Geometry](0).getEnvelopeInternal
+ val newEnvelope = input.getAs[Geometry](0).getEnvelopeInternal
+ val coordinates: Array[Coordinate] = new Array[Coordinate](5)
+ var minX = 0.0
+ var minY = 0.0
+ var maxX = 0.0
+ var maxY = 0.0
+ if (accumulateEnvelope.getMinX == -999999999) {
+ // Found the accumulateEnvelope is the initial value
+ minX = newEnvelope.getMinX
+ minY = newEnvelope.getMinY
+ maxX = newEnvelope.getMaxX
+ maxY = newEnvelope.getMaxY
+ }
+ else {
+ minX = Math.min(accumulateEnvelope.getMinX, newEnvelope.getMinX)
+ minY = Math.min(accumulateEnvelope.getMinY, newEnvelope.getMinY)
+ maxX = Math.max(accumulateEnvelope.getMaxX, newEnvelope.getMaxX)
+ maxY = Math.max(accumulateEnvelope.getMaxY, newEnvelope.getMaxY)
+ }
+ coordinates(0) = new Coordinate(minX, minY)
+ coordinates(1) = new Coordinate(minX, maxY)
+ coordinates(2) = new Coordinate(maxX, maxY)
+ coordinates(3) = new Coordinate(maxX, minY)
+ coordinates(4) = coordinates(0)
+ val geometryFactory = new GeometryFactory()
+ buffer(0) = geometryFactory.createPolygon(coordinates)
+ }
+
+ // This is how to merge two objects with the bufferSchema type.
+ override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
+ val leftEnvelope = buffer1.getAs[Geometry](0).getEnvelopeInternal
+ val rightEnvelope = buffer2.getAs[Geometry](0).getEnvelopeInternal
+ val coordinates: Array[Coordinate] = new Array[Coordinate](5)
+ var minX = 0.0
+ var minY = 0.0
+ var maxX = 0.0
+ var maxY = 0.0
+ if (leftEnvelope.getMinX == -999999999) {
+ // Found the leftEnvelope is the initial value
+ minX = rightEnvelope.getMinX
+ minY = rightEnvelope.getMinY
+ maxX = rightEnvelope.getMaxX
+ maxY = rightEnvelope.getMaxY
+ }
+ else if (rightEnvelope.getMinX == -999999999) {
+ // Found the rightEnvelope is the initial value
+ minX = leftEnvelope.getMinX
+ minY = leftEnvelope.getMinY
+ maxX = leftEnvelope.getMaxX
+ maxY = leftEnvelope.getMaxY
+ }
+ else {
+ minX = Math.min(leftEnvelope.getMinX, rightEnvelope.getMinX)
+ minY = Math.min(leftEnvelope.getMinY, rightEnvelope.getMinY)
+ maxX = Math.max(leftEnvelope.getMaxX, rightEnvelope.getMaxX)
+ maxY = Math.max(leftEnvelope.getMaxY, rightEnvelope.getMaxY)
+ }
+ coordinates(0) = new Coordinate(minX, minY)
+ coordinates(1) = new Coordinate(minX, maxY)
+ coordinates(2) = new Coordinate(maxX, maxY)
+ coordinates(3) = new Coordinate(maxX, minY)
+ coordinates(4) = coordinates(0)
+ val geometryFactory = new GeometryFactory()
+ buffer1(0) = geometryFactory.createPolygon(coordinates)
+ }
+
+ // This is where you output the final value, given the final value of your bufferSchema.
+ override def evaluate(buffer: Row): Any = {
+ return buffer.getAs[Geometry](0)
+ }
+}
+
+/**
+ * Return the polygon intersection of all polygons in the given column
+ */
+class ST_Intersection_Aggr extends UserDefinedAggregateFunction {
+ override def inputSchema: StructType = StructType(StructField("Intersection", new GeometryUDT) :: Nil)
+
+ override def bufferSchema: StructType = StructType(
+ StructField("Intersection", new GeometryUDT) :: Nil
+ )
+
+ override def dataType: DataType = new GeometryUDT
+
+ override def deterministic: Boolean = true
+
+ override def initialize(buffer: MutableAggregationBuffer): Unit = {
+ val coordinates: Array[Coordinate] = new Array[Coordinate](5)
+ coordinates(0) = new Coordinate(-999999999, -999999999)
+ coordinates(1) = new Coordinate(-999999999, -999999999)
+ coordinates(2) = new Coordinate(-999999999, -999999999)
+ coordinates(3) = new Coordinate(-999999999, -999999999)
+ coordinates(4) = new Coordinate(-999999999, -999999999)
+ val geometryFactory = new GeometryFactory()
+ buffer(0) = geometryFactory.createPolygon(coordinates)
+ }
+
+ override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
+ val accumulateIntersection = buffer.getAs[Geometry](0)
+ val newPolygon = input.getAs[Geometry](0)
+ if (accumulateIntersection.getArea == 0) buffer(0) = newPolygon
+ else buffer(0) = accumulateIntersection.intersection(newPolygon)
+ }
+
+ override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
+ val leftPolygon = buffer1.getAs[Geometry](0)
+ val rightPolygon = buffer2.getAs[Geometry](0)
+ if (leftPolygon.getCoordinates()(0).x == -999999999) buffer1(0) = rightPolygon
+ else if (rightPolygon.getCoordinates()(0).x == -999999999) buffer1(0) = leftPolygon
+ else buffer1(0) = leftPolygon.intersection(rightPolygon)
+ }
+
+ override def evaluate(buffer: Row): Any = {
+ buffer.getAs[Geometry](0)
+ }
+}
\ No newline at end of file
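These Spark 2 UDAFs register under the same SQL names as their Spark 3 Aggregator counterparts (the class simple names, e.g. ST_Envelope_Aggr), so user queries are unchanged across Spark versions. A hedged PySpark sketch of typical usage, assuming a SparkSession named spark and a hypothetical temp view polygons with a geometry column geom:

    # Hypothetical usage; the view name and column are assumptions for illustration
    from sedona.register import SedonaRegistrator

    SedonaRegistrator.registerAll(spark)
    envelope = spark.sql("SELECT ST_Envelope_Aggr(geom) FROM polygons").first()[0]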
diff --git a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/JoinQueryDetector.scala b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/JoinQueryDetector.scala
index 16a3480..df624f9 100644
--- a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/JoinQueryDetector.scala
+++ b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/JoinQueryDetector.scala
@@ -36,39 +36,48 @@ object JoinQueryDetector extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
// ST_Contains(a, b) - a contains b
- case Join(left, right, Inner, Some(ST_Contains(Seq(leftShape, rightShape))), _) =>
+case Join(left, right, Inner, Some(ST_Contains(Seq(leftShape, rightShape))), _) => // SPARK3 anchor
+//case Join(left, right, Inner, Some(ST_Contains(Seq(leftShape, rightShape)))) => // SPARK2 anchor
planSpatialJoin(left, right, Seq(leftShape, rightShape), false)
// ST_Intersects(a, b) - a intersects b
- case Join(left, right, Inner, Some(ST_Intersects(Seq(leftShape, rightShape))), _) =>
+case Join(left, right, Inner, Some(ST_Intersects(Seq(leftShape, rightShape))), _) => // SPARK3 anchor
+//case Join(left, right, Inner, Some(ST_Intersects(Seq(leftShape, rightShape)))) => // SPARK2 anchor
planSpatialJoin(left, right, Seq(leftShape, rightShape), true)
// ST_WITHIN(a, b) - a is within b
- case Join(left, right, Inner, Some(ST_Within(Seq(leftShape, rightShape))), _) =>
+case Join(left, right, Inner, Some(ST_Within(Seq(leftShape, rightShape))), _) => // SPARK3 anchor
+//case Join(left, right, Inner, Some(ST_Within(Seq(leftShape, rightShape)))) => // SPARK2 anchor
planSpatialJoin(right, left, Seq(rightShape, leftShape), false)
// ST_Overlaps(a, b) - a overlaps b
- case Join(left, right, Inner, Some(ST_Overlaps(Seq(leftShape, rightShape))), _) =>
+case Join(left, right, Inner, Some(ST_Overlaps(Seq(leftShape, rightShape))), _) => // SPARK3 anchor
+//case Join(left, right, Inner, Some(ST_Overlaps(Seq(leftShape, rightShape)))) => // SPARK2 anchor
planSpatialJoin(right, left, Seq(rightShape, leftShape), false)
// ST_Touches(a, b) - a touches b
- case Join(left, right, Inner, Some(ST_Touches(Seq(leftShape, rightShape))), _) =>
+case Join(left, right, Inner, Some(ST_Touches(Seq(leftShape, rightShape))), _) => // SPARK3 anchor
+//case Join(left, right, Inner, Some(ST_Touches(Seq(leftShape, rightShape)))) => // SPARK2 anchor
planSpatialJoin(left, right, Seq(leftShape, rightShape), true)
// ST_Distance(a, b) <= radius consider boundary intersection
- case Join(left, right, Inner, Some(LessThanOrEqual(ST_Distance(Seq(leftShape, rightShape)), radius)), _) =>
+case Join(left, right, Inner, Some(LessThanOrEqual(ST_Distance(Seq(leftShape, rightShape)), radius)), _) => // SPARK3 anchor
+//case Join(left, right, Inner, Some(LessThanOrEqual(ST_Distance(Seq(leftShape, rightShape)), radius))) => // SPARK2 anchor
planDistanceJoin(left, right, Seq(leftShape, rightShape), radius, true)
// ST_Distance(a, b) < radius don't consider boundary intersection
- case Join(left, right, Inner, Some(LessThan(ST_Distance(Seq(leftShape, rightShape)), radius)), _) =>
+case Join(left, right, Inner, Some(LessThan(ST_Distance(Seq(leftShape, rightShape)), radius)), _) => // SPARK3 anchor
+//case Join(left, right, Inner, Some(LessThan(ST_Distance(Seq(leftShape, rightShape)), radius))) => // SPARK2 anchor
planDistanceJoin(left, right, Seq(leftShape, rightShape), radius, false)
// ST_Equals(a, b) - a is equal to b
- case Join(left, right, Inner, Some(ST_Equals(Seq(leftShape, rightShape))), _) =>
+case Join(left, right, Inner, Some(ST_Equals(Seq(leftShape, rightShape))), _) => // SPARK3 anchor
+//case Join(left, right, Inner, Some(ST_Equals(Seq(leftShape, rightShape)))) => // SPARK2 anchor
planSpatialJoin(left, right, Seq(leftShape, rightShape), false)
// ST_Crosses(a, b) - a crosses b
- case Join(left, right, Inner, Some(ST_Crosses(Seq(leftShape, rightShape))), _) =>
+case Join(left, right, Inner, Some(ST_Crosses(Seq(leftShape, rightShape))), _) => // SPARK3 anchor
+//case Join(left, right, Inner, Some(ST_Crosses(Seq(leftShape, rightShape)))) => // SPARK2 anchor
planSpatialJoin(right, left, Seq(rightShape, leftShape), false)
case _ =>
diff --git a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/TraitJoinQueryExec.scala b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/TraitJoinQueryExec.scala
index 768773b..b6fb28b 100644
--- a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/TraitJoinQueryExec.scala
+++ b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/TraitJoinQueryExec.scala
@@ -38,7 +38,8 @@ trait TraitJoinQueryExec {
// Using lazy val to avoid serialization
@transient private lazy val boundCondition: (InternalRow => Boolean) = {
if (extraCondition.isDefined) {
- Predicate.create(extraCondition.get, left.output ++ right.output).eval _
+Predicate.create(extraCondition.get, left.output ++ right.output).eval _ // SPARK3 anchor
+//newPredicate(extraCondition.get, left.output ++ right.output).eval _ // SPARK2 anchor
} else { (r: InternalRow) =>
true
}
@@ -133,7 +134,8 @@ trait TraitJoinQueryExec {
matches.rdd.mapPartitions { iter =>
val filtered =
if (extraCondition.isDefined) {
- val boundCondition = Predicate.create(extraCondition.get, left.output ++ right.output)
+val boundCondition = Predicate.create(extraCondition.get, left.output ++ right.output) // SPARK3 anchor
+//val boundCondition = newPredicate(extraCondition.get, left.output ++ right.output) // SPARK2 anchor
iter.filter {
case (l, r) =>
val leftRow = l.getUserData.asInstanceOf[UnsafeRow]
diff --git a/sql/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala b/sql/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
index 7d94534..28274bb 100644
--- a/sql/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
+++ b/sql/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
@@ -36,7 +36,6 @@ trait TestBaseScala extends FunSpec with BeforeAndAfterAll {
config("spark.kryo.registrator", classOf[SedonaKryoRegistrator].getName).
master("local[*]").appName("sedonasqlScalaTest")
.config("spark.sql.warehouse.dir", warehouseLocation)
- .enableHiveSupport()
.getOrCreate()
val resourceFolder = System.getProperty("user.dir") + "/src/test/resources/"
diff --git a/viz/pom.xml b/viz/pom.xml
index e1a2836..6129672 100644
--- a/viz/pom.xml
+++ b/viz/pom.xml
@@ -26,7 +26,7 @@
<version>1.0.0-incubator-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
- <artifactId>sedona-viz</artifactId>
+ <artifactId>sedona-viz-${spark.compat.version}_${scala.compat.version}</artifactId>
<name>${project.groupId}:${project.artifactId}</name>
<description>A cluster computing system for processing large-scale spatial data: RDD and SQL for Viz</description>
@@ -36,29 +36,17 @@
<dependencies>
<dependency>
<groupId>org.apache.sedona</groupId>
- <artifactId>sedona-core</artifactId>
+ <artifactId>sedona-core_${scala.compat.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.sedona</groupId>
- <artifactId>sedona-sql</artifactId>
+ <artifactId>sedona-sql-${spark.compat.version}_${scala.compat.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_${scala.compat.version}</artifactId>
- <version>${spark.version}</version>
- <scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
<groupId>org.beryx</groupId>
<artifactId>awt-color-factory</artifactId>
<version>1.0.0</version>