Posted to commits@sedona.apache.org by ji...@apache.org on 2020/12/09 23:15:08 UTC

[incubator-sedona] branch master updated: [SEDONA-7] Build Sedona for Spark 2.4, 3.0 and Scala 2.11, 2.12 (#494)

This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sedona.git


The following commit(s) were added to refs/heads/master by this push:
     new 220aed4  [SEDONA-7] Build Sedona for Spark 2.4, 3.0 and Scala 2.11, 2.12 (#494)
220aed4 is described below

commit 220aed405f34062dd3a6014e440089ff49910fe4
Author: Jia Yu <ji...@apache.org>
AuthorDate: Wed Dec 9 15:15:02 2020 -0800

    [SEDONA-7] Build Sedona for Spark 2.4, 3.0 and Scala 2.11, 2.12 (#494)
    
    * Update GitHub action
    
    * Cache Maven package
    
    * Separate maven and python test
    
    * Add Python3 script to switch between Spark 2 and Spark 3
    
    * Fix the issue in UdfRegistrator
    
    * Improve the spark version converter
    
    * Use the Java-Scala converter which works for both Scala 2.11 and 2.12
    
    * Fix a bug in the converter
    
    * Add the scala build target in pom
    
    * Remove Hive in test
    
    * spark-version-converter takes spark2 or spark3 as the input
    
    * Add test matrix
    
    * Add test matrix
    
    * Add test matrix
    
    * Add test matrix
    
    * Add test matrix
    
    * Fix the python CI
    
    * Fix the python CI
    
    * Fix the python CI
    
    * Fix the python CI
    
    * Fix the bug in Python check version function
    
    * Fix the python test on Spark 2.4
    
    * Drop the test on Spark 2.4. It looks like PySedona does not work on Spark 2.4
    
    * Drop the test on Python 3.8 and 3.9
    
    * Add the test for Python 3.8
    
    * Add the test for Python 3.9
    
    * Revert "Add the test for Python 3.9"
    
    This reverts commit 7e98de1e17408605dbe57e6b6ea7ceb1e9f1c3b0.
    
    * Add the test for Spark 2.4 + Python 3.7
    
    * Add the test for Spark 2.4 + Python 3.7
    
    * Add the test for Spark 2.4 + Python 3.7
    
    * Add the test for Spark 2.4 + Python 3.7
    
    * Add the test for Spark 2.4 + Python 3.7
    
    * Remove the jackson dependency in POM
    
    * Add Spark version to Python adapter artifact name
    
    * Add the test for Python 3.9
---
 .github/workflows/java.yml                         |  30 ++-
 .github/workflows/python.yml                       |  64 ++++--
 core/pom.xml                                       |   2 +-
 pom.xml                                            |  42 +---
 python-adapter/pom.xml                             |  24 +--
 .../utils/PythonAdapterWrapper.scala               |   2 +-
 python/Pipfile                                     |   2 +-
 python/sedona/core/jvm/config.py                   |   9 +-
 python/tests/format_mapper/test_geo_json_reader.py |   9 +-
 python/tests/sql/test_adapter.py                   |   2 +-
 spark-version-converter.py                         |  62 ++++++
 sql/pom.xml                                        |  26 +--
 .../scala/org/apache/sedona/sql/UDF/Catalog.scala  |   9 +-
 .../org/apache/sedona/sql/UDF/UdfRegistrator.scala |   6 +-
 .../expressions_udaf/AggregateFunctions.scala      | 218 +++++++++++++++++++++
 .../strategy/join/JoinQueryDetector.scala          |  27 ++-
 .../strategy/join/TraitJoinQueryExec.scala         |   6 +-
 .../org/apache/sedona/sql/TestBaseScala.scala      |   1 -
 viz/pom.xml                                        |  18 +-
 19 files changed, 420 insertions(+), 139 deletions(-)

diff --git a/.github/workflows/java.yml b/.github/workflows/java.yml
index c4ebefa..d75f89d 100644
--- a/.github/workflows/java.yml
+++ b/.github/workflows/java.yml
@@ -1,17 +1,33 @@
 name: Scala and Java build
 
-on: [push, pull_request]
-
+on:
+  push:
+    branches:
+      - master
+  pull_request:
+    branches:
+      - '*'
+      
 jobs:
   build:
 
     runs-on: ubuntu-18.04
+    strategy:
+      matrix:
+        spark: [2.4.7, 3.0.1]
+        scala: [2.11.8, 2.12.8]
+        exclude:
+          - spark: 3.0.1
+            scala: 2.11.8
 
     steps:
     - uses: actions/checkout@v2
     - uses: actions/setup-java@v1
       with:
         java-version: '8'
+    - uses: actions/setup-python@v2
+      with:
+        python-version: '3.7'
     - name: Cache Maven packages
       uses: actions/cache@v2
       with:
@@ -19,7 +35,13 @@ jobs:
         key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
         restore-keys: ${{ runner.os }}-m2
     - run: git submodule update --init --recursive # Checkout Git submodule if necessary
-    - run: mvn -q clean install
+    - env:
+        SPARK_VERSION: ${{ matrix.spark }}
+      run: python3 spark-version-converter.py spark${SPARK_VERSION:0:1}
+    - env:
+        SPARK_VERSION: ${{ matrix.spark }}
+        SCALA_VERSION: ${{ matrix.scala }}
+      run: mvn -q clean install -Dscala.compat.version=${SCALA_VERSION:0:4} -Dscala.version=$SCALA_VERSION -Dspark.compat.version=${SPARK_VERSION:0:3} -Dspark.version=$SPARK_VERSION
     - run: mkdir staging
     - run: cp core/target/sedona-*.jar staging
     - run: cp sql/target/sedona-*.jar staging
@@ -27,5 +49,5 @@ jobs:
     - run: cp python-adapter/target/sedona-*.jar staging
     - uses: actions/upload-artifact@v2
       with:
-        name: Package
+        name: generated-jars
         path: staging
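
A quick illustration of the shell substring expansions used in the new build steps above (a sketch, not part of the commit; the versions are sample values taken from the matrix). The same slicing in Python shows how the full versions map to the converter argument and the -D properties passed to Maven:

    # Sketch only: mirrors ${SPARK_VERSION:0:1}, ${SPARK_VERSION:0:3} and
    # ${SCALA_VERSION:0:4} from the workflow above, using Python slicing.
    spark_version, scala_version = "3.0.1", "2.12.8"   # sample matrix entries
    print("spark" + spark_version[:1])  # spark3 -> argument to spark-version-converter.py
    print(spark_version[:3])            # 3.0    -> -Dspark.compat.version
    print(scala_version[:4])            # 2.12   -> -Dscala.compat.version
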
diff --git a/.github/workflows/python.yml b/.github/workflows/python.yml
index 057bfce..ef66649 100644
--- a/.github/workflows/python.yml
+++ b/.github/workflows/python.yml
@@ -1,17 +1,41 @@
 name: Python build
 
-on: [push, pull_request]
-
+on:
+  push:
+    branches:
+      - master
+  pull_request:
+    branches:
+      - '*'
+      
 jobs:
   build:
 
     runs-on: ubuntu-18.04
+    strategy:
+      matrix:
+        include:
+          - spark: 3.0.1
+            scala: 2.12.8
+            python: 3.7
+          - spark: 3.0.1
+            scala: 2.12.8
+            python: 3.8
+          - spark: 3.0.1
+            scala: 2.12.8
+            python: 3.9
+          - spark: 2.4.7
+            scala: 2.11.8
+            python: 3.7
 
     steps:
     - uses: actions/checkout@v2
     - uses: actions/setup-java@v1
       with:
         java-version: '8'
+    - uses: actions/setup-python@v2
+      with:
+        python-version: ${{ matrix.python }}
     - name: Cache Maven packages
       uses: actions/cache@v2
       with:
@@ -19,19 +43,31 @@ jobs:
         key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
         restore-keys: ${{ runner.os }}-m2
     - run: git submodule update --init --recursive # Checkout Git submodule if necessary
-    - run: mvn -q clean install -DskipTests
-    - uses: actions/setup-python@v2
-      with:
-        python-version: '3.7'
-    - uses: vemonet/setup-spark@v1
-      with:
-        spark-version: '3.0.1' # Exact version
-    - run: export PYTHONPATH=$SPARK_HOME/python
-    - run: sudo apt-get -y install python3-pip python-dev
+    - env:
+        SPARK_VERSION: ${{ matrix.spark }}
+      run: python3 spark-version-converter.py spark${SPARK_VERSION:0:1}
+    - env:
+        SPARK_VERSION: ${{ matrix.spark }}
+        SCALA_VERSION: ${{ matrix.scala }}
+      run: mvn -q clean install -DskipTests -Dscala.compat.version=${SCALA_VERSION:0:4} -Dscala.version=$SCALA_VERSION -Dspark.compat.version=${SPARK_VERSION:0:3} -Dspark.version=$SPARK_VERSION
+    - env:
+        SPARK_VERSION: ${{ matrix.spark }}
+      run: wget https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop2.7.tgz
+    - env:
+        SPARK_VERSION: ${{ matrix.spark }}
+      run: tar -xzf spark-${SPARK_VERSION}-bin-hadoop2.7.tgz
+    - run: sudo apt-get -y install python3-pip python-dev libgeos-dev
     - run: sudo pip3 install -U setuptools
     - run: sudo pip3 install -U wheel
     - run: sudo pip3 install -U virtualenvwrapper
     - run: python3 -m pip install pipenv
-    - run: (cd python;pipenv install --dev)
-    - run: find python-adapter/target/ -iregex "python-adapter\/target\/sedona-python-adapter-[0-9]\.[0-9]\.[0-9]-incubator\(-SNAPSHOT\)?\.jar" -exec cp {} $SPARK_HOME/jars \;
-    - run: (cd python;pipenv run pytest tests)
+    - env:
+        SPARK_VERSION: ${{ matrix.spark }}
+        PYTHON_VERSION: ${{ matrix.python }}
+      run: (cd python;pipenv --python ${PYTHON_VERSION};pipenv install pyspark==${SPARK_VERSION};pipenv install --dev;pipenv graph)
+    - env:
+        SPARK_VERSION: ${{ matrix.spark }}
+      run: find python-adapter/target -name sedona-* -exec cp {} spark-${SPARK_VERSION}-bin-hadoop2.7/jars/ \;
+    - env:
+        SPARK_VERSION: ${{ matrix.spark }}
+      run: (export SPARK_HOME=$PWD/spark-${SPARK_VERSION}-bin-hadoop2.7;export PYTHONPATH=$SPARK_HOME/python;cd python;pipenv run pytest tests)
diff --git a/core/pom.xml b/core/pom.xml
index 95210a0..5f18c55 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -26,7 +26,7 @@
         <version>1.0.0-incubator-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
-    <artifactId>sedona-core</artifactId>
+    <artifactId>sedona-core_${scala.compat.version}</artifactId>
 
     <name>${project.groupId}:${project.artifactId}</name>
     <description>A cluster computing system for processing large-scale spatial data: RDD API</description>
diff --git a/pom.xml b/pom.xml
index 77cd6d0..5543877 100644
--- a/pom.xml
+++ b/pom.xml
@@ -41,15 +41,11 @@
         <scala.version>2.12.8</scala.version>
         <scala.compat.version>2.12</scala.compat.version>
         <geotools.version>24.0</geotools.version>
-        <spark.version>3.0.0</spark.version>
+        <spark.version>3.0.1</spark.version>
+        <spark.compat.version>3.0</spark.compat.version>
     </properties>
     <dependencies>
         <dependency>
-            <groupId>com.fasterxml.jackson.module</groupId>
-            <artifactId>jackson-module-scala_${scala.compat.version}</artifactId>
-            <version>2.10.0</version>
-        </dependency>
-        <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-core_${scala.compat.version}</artifactId>
             <version>${spark.version}</version>
@@ -59,47 +55,25 @@
                     <groupId>org.apache.hadoop</groupId>
                     <artifactId>*</artifactId>
                 </exclusion>
-                <exclusion>
-                    <groupId>com.fasterxml.jackson.core</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.fasterxml.jackson.module</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
             </exclusions>
         </dependency>
         <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.compat.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-mapreduce-client-core</artifactId>
             <version>2.8.2</version>
             <scope>provided</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.fasterxml.jackson.core</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.fasterxml.jackson.module</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-common</artifactId>
             <version>2.8.2</version>
             <scope>provided</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.fasterxml.jackson.core</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.fasterxml.jackson.module</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.hadoop</groupId>
diff --git a/python-adapter/pom.xml b/python-adapter/pom.xml
index 5c9beaf..940b687 100644
--- a/python-adapter/pom.xml
+++ b/python-adapter/pom.xml
@@ -9,32 +9,16 @@
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>sedona-python-adapter</artifactId>
+    <artifactId>sedona-python-adapter-${spark.compat.version}_${scala.compat.version}</artifactId>
     <dependencies>
         <dependency>
-            <groupId>org.apache.spark</groupId>
-            <artifactId>spark-sql_${scala.compat.version}</artifactId>
-            <version>${spark.version}</version>
-            <scope>provided</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.fasterxml.jackson.core</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.fasterxml.jackson.module</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
             <groupId>org.apache.sedona</groupId>
-            <artifactId>sedona-core</artifactId>
+            <artifactId>sedona-core_${scala.compat.version}</artifactId>
             <version>${project.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.sedona</groupId>
-            <artifactId>sedona-sql</artifactId>
+            <artifactId>sedona-sql-${spark.compat.version}_${scala.compat.version}</artifactId>
             <version>${project.version}</version>
         </dependency>
         <!--for CRS transformation-->
@@ -107,4 +91,4 @@
         </dependency>
     </dependencies>
 
-</project>
\ No newline at end of file
+</project>
diff --git a/python-adapter/src/main/scala/org.apache.sedona.python.wrapper/utils/PythonAdapterWrapper.scala b/python-adapter/src/main/scala/org.apache.sedona.python.wrapper/utils/PythonAdapterWrapper.scala
index d129997..9a7d512 100644
--- a/python-adapter/src/main/scala/org.apache.sedona.python.wrapper/utils/PythonAdapterWrapper.scala
+++ b/python-adapter/src/main/scala/org.apache.sedona.python.wrapper/utils/PythonAdapterWrapper.scala
@@ -24,7 +24,7 @@ import org.apache.spark.api.java.JavaPairRDD
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.locationtech.jts.geom.Geometry
 
-import scala.collection.convert.ImplicitConversions._
+import scala.collection.JavaConversions._
 
 object PythonAdapterWrapper {
   def toDf[T <: Geometry](spatialRDD: SpatialRDD[T], fieldNames: java.util.ArrayList[String], sparkSession: SparkSession): DataFrame = {
diff --git a/python/Pipfile b/python/Pipfile
index d1a197b..308e5d5 100644
--- a/python/Pipfile
+++ b/python/Pipfile
@@ -11,7 +11,7 @@ jupyter="*"
 [packages]
 pandas="*"
 geopandas="==0.6.0"
-pyspark="==3.0.0"
+pyspark=">=2.4.0"
 attrs="*"
 
 [requires]
diff --git a/python/sedona/core/jvm/config.py b/python/sedona/core/jvm/config.py
index 9a0e226..a265a97 100644
--- a/python/sedona/core/jvm/config.py
+++ b/python/sedona/core/jvm/config.py
@@ -108,12 +108,13 @@ class SedonaMeta:
 
     @classmethod
     def get_version(cls, spark_jars: str) -> Optional[str]:
-        sedona_version = findall(r"sedona-python-adapter-(\d{1}\.\d{1}\.\d{1}).*?jar", spark_jars)
+        # Find Spark version, Scala version and Sedona version.
+        versions = findall(r"sedona-python-adapter-([^,\n]+)_([^,\n]+)-([^,\n]+)-incubator", spark_jars)
         try:
-            version = sedona_version[0]
+            sedona_version = versions[0][2]
         except IndexError:
-            version = None
-        return version
+            sedona_version = None
+        return sedona_version
 
     @classproperty
     def version(cls):
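
For context, a small sketch (not part of the commit) of what the broadened pattern extracts; the jar name below is a hypothetical example assembled from the new sedona-python-adapter-${spark.compat.version}_${scala.compat.version} artifact naming introduced elsewhere in this diff:

    # Sketch only: the regex above applied to a hypothetical jar name that
    # follows the new sedona-python-adapter-<spark>_<scala> naming scheme.
    from re import findall
    jars = "sedona-python-adapter-3.0_2.12-1.0.0-incubator-SNAPSHOT.jar"
    versions = findall(r"sedona-python-adapter-([^,\n]+)_([^,\n]+)-([^,\n]+)-incubator", jars)
    print(versions[0])     # ('3.0', '2.12', '1.0.0')
    print(versions[0][2])  # '1.0.0' -> reported as the Sedona version
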
diff --git a/python/tests/format_mapper/test_geo_json_reader.py b/python/tests/format_mapper/test_geo_json_reader.py
index 7876e99..7516f7e 100644
--- a/python/tests/format_mapper/test_geo_json_reader.py
+++ b/python/tests/format_mapper/test_geo_json_reader.py
@@ -35,8 +35,7 @@ geo_json_with_invalid_geom_with_feature_property = os.path.join(tests_path, "res
 class TestGeoJsonReader(TestBase):
 
     def test_read_to_geometry_rdd(self):
-        print(SedonaMeta.version)
-        if is_greater_or_equal_version(SedonaMeta.version, "1.2.0"):
+        if is_greater_or_equal_version(SedonaMeta.version, "1.0.0"):
             geo_json_rdd = GeoJsonReader.readToGeometryRDD(
                 self.sc,
                 geo_json_geom_with_feature_property
@@ -52,7 +51,7 @@ class TestGeoJsonReader(TestBase):
             assert geo_json_rdd.rawSpatialRDD.count() == 10
 
     def test_read_to_valid_geometry_rdd(self):
-        if is_greater_or_equal_version(SedonaMeta.version, "1.2.0"):
+        if is_greater_or_equal_version(SedonaMeta.version, "1.0.0"):
             geo_json_rdd = GeoJsonReader.readToGeometryRDD(
                 self.sc,
                 geo_json_geom_with_feature_property,
@@ -87,7 +86,7 @@ class TestGeoJsonReader(TestBase):
             assert geo_json_rdd.rawSpatialRDD.count() == 3
 
     def test_read_to_include_id_rdd(self):
-        if is_greater_or_equal_version(SedonaMeta.version, "1.2.0"):
+        if is_greater_or_equal_version(SedonaMeta.version, "1.0.0"):
             geo_json_rdd = GeoJsonReader.readToGeometryRDD(
                 self.sc,
                 geo_json_contains_id,
@@ -108,7 +107,7 @@ class TestGeoJsonReader(TestBase):
                 assert geo_json_rdd.fieldNames.__len__() == 3
 
     def test_read_to_geometry_rdd_invalid_syntax(self):
-        if is_greater_or_equal_version(SedonaMeta.version, "1.2.0"):
+        if is_greater_or_equal_version(SedonaMeta.version, "1.0.0"):
             geojson_rdd = GeoJsonReader.readToGeometryRDD(
                 self.sc,
                 geo_json_with_invalid_geom_with_feature_property,
diff --git a/python/tests/sql/test_adapter.py b/python/tests/sql/test_adapter.py
index fc508fa..0f06117 100644
--- a/python/tests/sql/test_adapter.py
+++ b/python/tests/sql/test_adapter.py
@@ -275,7 +275,7 @@ class TestAdapter(TestBase):
         assert spatial_rdd.approximateTotalCount == 121960
         assert spatial_rdd.boundaryEnvelope == Envelope(-179.147236, 179.475569, -14.548699, 71.35513400000001)
 
-    @pytest.mark.skipif(is_greater_or_equal_version(version, "1.3.0"), reason="Depreciated after spark 1.2.0")
+    @pytest.mark.skipif(is_greater_or_equal_version(version, "1.0.0"), reason="Deprecated in Sedona")
     def test_to_spatial_rdd_df_geom_column_id(self):
         df = self.spark.read.\
             format("csv").\
diff --git a/spark-version-converter.py b/spark-version-converter.py
new file mode 100644
index 0000000..61af11f
--- /dev/null
+++ b/spark-version-converter.py
@@ -0,0 +1,62 @@
+# Python 3
+import fileinput
+import sys
+
+spark2_anchor = 'SPARK2 anchor'
+spark3_anchor = 'SPARK3 anchor'
+files = ['sql/src/main/scala/org/apache/sedona/sql/UDF/UdfRegistrator.scala',
+         'sql/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/TraitJoinQueryExec.scala',
+         'sql/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/JoinQueryDetector.scala']
+
+def switch_version(line):
+    if line[:2] == '//':
+        print(line[2:], end='')  # enable code
+        return 'enabled'
+    else:
+        print('//' + line, end='')  # disable code
+        return 'disabled'
+
+def enable_version(line):
+    if line[:2] == '//':
+        print(line[2:], end='')  # enable code
+        return 'enabled'
+    else:
+        print(line, end='')
+        return 'enabled before'
+
+def disable_version(line):
+    if line[:2] == '//':
+        print(line, end='')
+        return 'disabled before'
+    else:
+        print('//' + line, end='')  # disable code
+        return 'disabled'
+
+def parse_file(filepath, argv):
+    conversion_result_spark2 = ''
+    conversion_result_spark3 = ''
+    if argv[1] == 'spark2':
+        with fileinput.FileInput(filepath, inplace=True) as file:
+            for line in file:
+                if spark2_anchor in line:
+                    conversion_result_spark2 = spark2_anchor + ' ' + enable_version(line)
+                elif spark3_anchor in line:
+                    conversion_result_spark3 = spark3_anchor + ' ' + disable_version(line)
+                else:
+                    print(line, end='')
+            return conversion_result_spark2 + ' and ' + conversion_result_spark3
+    elif argv[1] == 'spark3':
+        with fileinput.FileInput(filepath, inplace=True) as file:
+            for line in file:
+                if spark2_anchor in line:
+                    conversion_result_spark2 = spark2_anchor + ' ' + disable_version(line)
+                elif spark3_anchor in line:
+                    conversion_result_spark3 = spark3_anchor + ' ' + enable_version(line)
+                else:
+                    print(line, end='')
+            return conversion_result_spark2 + ' and ' + conversion_result_spark3
+    else:
+        return 'wrong spark version'
+
+for filepath in files:
+    print(filepath + ': ' + parse_file(filepath, sys.argv))
\ No newline at end of file
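
To make the anchor mechanism concrete, a minimal sketch (not part of the commit) applies the same comment/uncomment rule to the two UdfRegistrator anchor lines changed later in this diff:

    # Sketch only: the enable/disable rule used above, applied to anchored lines.
    spark3_line = "Catalog.aggregateExpressions.foreach(...) // SPARK3 anchor\n"
    spark2_line = "//Catalog.aggregateExpressions_UDAF.foreach(...) // SPARK2 anchor\n"

    def disable(line):  # prepend '//' unless the line is already commented out
        return line if line.startswith('//') else '//' + line

    def enable(line):   # strip a leading '//' if present
        return line[2:] if line.startswith('//') else line

    # Converting to spark2: SPARK3 anchors are commented out, SPARK2 anchors uncommented.
    print(disable(spark3_line), end='')  # //Catalog.aggregateExpressions.foreach(...) // SPARK3 anchor
    print(enable(spark2_line), end='')   # Catalog.aggregateExpressions_UDAF.foreach(...) // SPARK2 anchor
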
diff --git a/sql/pom.xml b/sql/pom.xml
index 5f19029..d814989 100644
--- a/sql/pom.xml
+++ b/sql/pom.xml
@@ -26,7 +26,7 @@
         <version>1.0.0-incubator-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
-	<artifactId>sedona-sql</artifactId>
+	<artifactId>sedona-sql-${spark.compat.version}_${scala.compat.version}</artifactId>
 
 	<name>${project.groupId}:${project.artifactId}</name>
 	<description>A cluster computing system for processing large-scale spatial data: SQL API</description>
@@ -36,32 +36,10 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.sedona</groupId>
-            <artifactId>sedona-core</artifactId>
+            <artifactId>sedona-core_${scala.compat.version}</artifactId>
             <version>${project.version}</version>
             <scope>provided</scope>
         </dependency>
-        <dependency>
-            <groupId>org.apache.spark</groupId>
-            <artifactId>spark-sql_${scala.compat.version}</artifactId>
-            <version>${spark.version}</version>
-            <scope>provided</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.fasterxml.jackson.core</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.fasterxml.jackson.module</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.spark</groupId>
-            <artifactId>spark-hive_${scala.compat.version}</artifactId>
-            <version>${spark.version}</version>
-            <scope>test</scope>
-        </dependency>
         <!--The following GeoTools dependencies use GNU Lesser General Public License and thus are excluded from the binary distribution-->
         <!-- Users have to include them by themselves manually -->
         <!-- See https://www.apache.org/legal/resolved.html#category-x -->
diff --git a/sql/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala b/sql/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala
index c07b0b5..a971083 100644
--- a/sql/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala
+++ b/sql/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala
@@ -19,7 +19,7 @@
 package org.apache.sedona.sql.UDF
 
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
-import org.apache.spark.sql.expressions.Aggregator
+import org.apache.spark.sql.expressions.{Aggregator, UserDefinedAggregateFunction}
 import org.apache.spark.sql.sedona_sql.expressions._
 import org.locationtech.jts.geom.Geometry
 
@@ -84,4 +84,11 @@ object Catalog {
     new ST_Envelope_Aggr,
     new ST_Intersection_Aggr
   )
+
+  import org.apache.spark.sql.sedona_sql.expressions_udaf
+  val aggregateExpressions_UDAF: Seq[UserDefinedAggregateFunction] = Seq(
+    new expressions_udaf.ST_Union_Aggr,
+    new expressions_udaf.ST_Envelope_Aggr,
+    new expressions_udaf.ST_Intersection_Aggr
+  )
 }
diff --git a/sql/src/main/scala/org/apache/sedona/sql/UDF/UdfRegistrator.scala b/sql/src/main/scala/org/apache/sedona/sql/UDF/UdfRegistrator.scala
index a15db81..7074408 100644
--- a/sql/src/main/scala/org/apache/sedona/sql/UDF/UdfRegistrator.scala
+++ b/sql/src/main/scala/org/apache/sedona/sql/UDF/UdfRegistrator.scala
@@ -29,11 +29,13 @@ object UdfRegistrator {
 
   def registerAll(sparkSession: SparkSession): Unit = {
     Catalog.expressions.foreach(f => sparkSession.sessionState.functionRegistry.createOrReplaceTempFunction(f.getClass.getSimpleName.dropRight(1), f))
-    Catalog.aggregateExpressions.foreach(f => sparkSession.udf.register(f.getClass.getSimpleName, functions.udaf(f)))
+Catalog.aggregateExpressions.foreach(f => sparkSession.udf.register(f.getClass.getSimpleName, functions.udaf(f))) // SPARK3 anchor
+//Catalog.aggregateExpressions_UDAF.foreach(f => sparkSession.udf.register(f.getClass.getSimpleName, f)) // SPARK2 anchor
   }
 
   def dropAll(sparkSession: SparkSession): Unit = {
     Catalog.expressions.foreach(f => sparkSession.sessionState.functionRegistry.dropFunction(FunctionIdentifier(f.getClass.getSimpleName.dropRight(1))))
-    Catalog.aggregateExpressions.foreach(f => sparkSession.sessionState.functionRegistry.dropFunction(FunctionIdentifier(f.getClass.getSimpleName)))
+Catalog.aggregateExpressions.foreach(f => sparkSession.sessionState.functionRegistry.dropFunction(FunctionIdentifier(f.getClass.getSimpleName))) // SPARK3 anchor
+//Catalog.aggregateExpressions_UDAF.foreach(f => sparkSession.sessionState.functionRegistry.dropFunction(FunctionIdentifier(f.getClass.getSimpleName))) // SPARK2 anchor
   }
 }
\ No newline at end of file
diff --git a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions_udaf/AggregateFunctions.scala b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions_udaf/AggregateFunctions.scala
new file mode 100644
index 0000000..5b3c0f5
--- /dev/null
+++ b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions_udaf/AggregateFunctions.scala
@@ -0,0 +1,218 @@
+/**
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package org.apache.spark.sql.sedona_sql.expressions_udaf
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
+import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.locationtech.jts.geom.{Coordinate, Geometry, GeometryFactory}
+
+/**
+  * Return the polygon union of all Polygon in the given column
+  */
+
+class ST_Union_Aggr extends UserDefinedAggregateFunction {
+  override def inputSchema: StructType = StructType(StructField("Union", new GeometryUDT) :: Nil)
+
+  override def bufferSchema: StructType = StructType(
+    StructField("Union", new GeometryUDT) :: Nil
+  )
+
+  override def dataType: DataType = new GeometryUDT
+
+  override def deterministic: Boolean = true
+
+  override def initialize(buffer: MutableAggregationBuffer): Unit = {
+    val coordinates: Array[Coordinate] = new Array[Coordinate](5)
+    coordinates(0) = new Coordinate(-999999999, -999999999)
+    coordinates(1) = new Coordinate(-999999999, -999999999)
+    coordinates(2) = new Coordinate(-999999999, -999999999)
+    coordinates(3) = new Coordinate(-999999999, -999999999)
+    coordinates(4) = coordinates(0)
+    val geometryFactory = new GeometryFactory()
+    buffer(0) = geometryFactory.createPolygon(coordinates)
+  }
+
+  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
+    val accumulateUnion = buffer.getAs[Geometry](0)
+    val newPolygon = input.getAs[Geometry](0)
+    if (accumulateUnion.getArea == 0) buffer(0) = newPolygon
+    else buffer(0) = accumulateUnion.union(newPolygon)
+  }
+
+  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
+    val leftPolygon = buffer1.getAs[Geometry](0)
+    val rightPolygon = buffer2.getAs[Geometry](0)
+    if (leftPolygon.getCoordinates()(0).x == -999999999) buffer1(0) = rightPolygon
+    else if (rightPolygon.getCoordinates()(0).x == -999999999) buffer1(0) = leftPolygon
+    else buffer1(0) = leftPolygon.union(rightPolygon)
+  }
+
+  override def evaluate(buffer: Row): Any = {
+    return buffer.getAs[Geometry](0)
+  }
+}
+
+/**
+  * Return the envelope boundary of the entire column
+  */
+class ST_Envelope_Aggr extends UserDefinedAggregateFunction {
+  // These are the input fields for your aggregate function.
+  override def inputSchema: org.apache.spark.sql.types.StructType =
+    StructType(StructField("Envelope", new GeometryUDT) :: Nil)
+
+  // These are the internal fields you keep for computing your aggregate.
+  override def bufferSchema: StructType = StructType(
+    StructField("Envelope", new GeometryUDT) :: Nil
+  )
+
+  // This is the output type of your aggregation function.
+  override def dataType: DataType = new GeometryUDT
+
+  override def deterministic: Boolean = true
+
+  // This is the initial value for your buffer schema.
+  override def initialize(buffer: MutableAggregationBuffer): Unit = {
+    val coordinates: Array[Coordinate] = new Array[Coordinate](5)
+    coordinates(0) = new Coordinate(-999999999, -999999999)
+    coordinates(1) = new Coordinate(-999999999, -999999999)
+    coordinates(2) = new Coordinate(-999999999, -999999999)
+    coordinates(3) = new Coordinate(-999999999, -999999999)
+    coordinates(4) = new Coordinate(-999999999, -999999999)
+    val geometryFactory = new GeometryFactory()
+    buffer(0) = geometryFactory.createPolygon(coordinates)
+    //buffer(0) = new GenericArrayData(GeometrySerializer.serialize(geometryFactory.createPolygon(coordinates)))
+  }
+
+  // This is how to update your buffer schema given an input.
+  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
+    val accumulateEnvelope = buffer.getAs[Geometry](0).getEnvelopeInternal
+    val newEnvelope = input.getAs[Geometry](0).getEnvelopeInternal
+    val coordinates: Array[Coordinate] = new Array[Coordinate](5)
+    var minX = 0.0
+    var minY = 0.0
+    var maxX = 0.0
+    var maxY = 0.0
+    if (accumulateEnvelope.getMinX == -999999999) {
+      // Found the accumulateEnvelope is the initial value
+      minX = newEnvelope.getMinX
+      minY = newEnvelope.getMinY
+      maxX = newEnvelope.getMaxX
+      maxY = newEnvelope.getMaxY
+    }
+    else {
+      minX = Math.min(accumulateEnvelope.getMinX, newEnvelope.getMinX)
+      minY = Math.min(accumulateEnvelope.getMinY, newEnvelope.getMinY)
+      maxX = Math.max(accumulateEnvelope.getMaxX, newEnvelope.getMaxX)
+      maxY = Math.max(accumulateEnvelope.getMaxY, newEnvelope.getMaxY)
+    }
+    coordinates(0) = new Coordinate(minX, minY)
+    coordinates(1) = new Coordinate(minX, maxY)
+    coordinates(2) = new Coordinate(maxX, maxY)
+    coordinates(3) = new Coordinate(maxX, minY)
+    coordinates(4) = coordinates(0)
+    val geometryFactory = new GeometryFactory()
+    buffer(0) = geometryFactory.createPolygon(coordinates)
+  }
+
+  // This is how to merge two objects with the bufferSchema type.
+  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
+    val leftEnvelope = buffer1.getAs[Geometry](0).getEnvelopeInternal
+    val rightEnvelope = buffer2.getAs[Geometry](0).getEnvelopeInternal
+    val coordinates: Array[Coordinate] = new Array[Coordinate](5)
+    var minX = 0.0
+    var minY = 0.0
+    var maxX = 0.0
+    var maxY = 0.0
+    if (leftEnvelope.getMinX == -999999999) {
+      // Found the leftEnvelope is the initial value
+      minX = rightEnvelope.getMinX
+      minY = rightEnvelope.getMinY
+      maxX = rightEnvelope.getMaxX
+      maxY = rightEnvelope.getMaxY
+    }
+    else if (rightEnvelope.getMinX == -999999999) {
+      // Found the rightEnvelope is the initial value
+      minX = leftEnvelope.getMinX
+      minY = leftEnvelope.getMinY
+      maxX = leftEnvelope.getMaxX
+      maxY = leftEnvelope.getMaxY
+    }
+    else {
+      minX = Math.min(leftEnvelope.getMinX, rightEnvelope.getMinX)
+      minY = Math.min(leftEnvelope.getMinY, rightEnvelope.getMinY)
+      maxX = Math.max(leftEnvelope.getMaxX, rightEnvelope.getMaxX)
+      maxY = Math.max(leftEnvelope.getMaxY, rightEnvelope.getMaxY)
+    }
+    coordinates(0) = new Coordinate(minX, minY)
+    coordinates(1) = new Coordinate(minX, maxY)
+    coordinates(2) = new Coordinate(maxX, maxY)
+    coordinates(3) = new Coordinate(maxX, minY)
+    coordinates(4) = coordinates(0)
+    val geometryFactory = new GeometryFactory()
+    buffer1(0) = geometryFactory.createPolygon(coordinates)
+  }
+
+  // This is where you output the final value, given the final value of your bufferSchema.
+  override def evaluate(buffer: Row): Any = {
+    return buffer.getAs[Geometry](0)
+  }
+}
+
+/**
+  * Return the polygon intersection of all Polygon in the given column
+  */
+class ST_Intersection_Aggr extends UserDefinedAggregateFunction {
+  override def inputSchema: StructType = StructType(StructField("Intersection", new GeometryUDT) :: Nil)
+
+  override def bufferSchema: StructType = StructType(
+    StructField("Intersection", new GeometryUDT) :: Nil
+  )
+
+  override def dataType: DataType = new GeometryUDT
+
+  override def deterministic: Boolean = true
+
+  override def initialize(buffer: MutableAggregationBuffer): Unit = {
+    val coordinates: Array[Coordinate] = new Array[Coordinate](5)
+    coordinates(0) = new Coordinate(-999999999, -999999999)
+    coordinates(1) = new Coordinate(-999999999, -999999999)
+    coordinates(2) = new Coordinate(-999999999, -999999999)
+    coordinates(3) = new Coordinate(-999999999, -999999999)
+    coordinates(4) = new Coordinate(-999999999, -999999999)
+    val geometryFactory = new GeometryFactory()
+    buffer(0) = geometryFactory.createPolygon(coordinates)
+  }
+
+  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
+    val accumulateIntersection = buffer.getAs[Geometry](0)
+    val newPolygon = input.getAs[Geometry](0)
+    if (accumulateIntersection.getArea == 0) buffer(0) = newPolygon
+    else buffer(0) = accumulateIntersection.intersection(newPolygon)
+  }
+
+  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
+    val leftPolygon = buffer1.getAs[Geometry](0)
+    val rightPolygon = buffer2.getAs[Geometry](0)
+    if (leftPolygon.getCoordinates()(0).x == -999999999) buffer1(0) = rightPolygon
+    else if (rightPolygon.getCoordinates()(0).x == -999999999) buffer1(0) = leftPolygon
+    else buffer1(0) = leftPolygon.intersection(rightPolygon)
+  }
+
+  override def evaluate(buffer: Row): Any = {
+    buffer.getAs[Geometry](0)
+  }
+}
\ No newline at end of file
diff --git a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/JoinQueryDetector.scala b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/JoinQueryDetector.scala
index 16a3480..df624f9 100644
--- a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/JoinQueryDetector.scala
+++ b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/JoinQueryDetector.scala
@@ -36,39 +36,48 @@ object JoinQueryDetector extends Strategy {
   def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
 
     // ST_Contains(a, b) - a contains b
-    case Join(left, right, Inner, Some(ST_Contains(Seq(leftShape, rightShape))), _) =>
+case Join(left, right, Inner, Some(ST_Contains(Seq(leftShape, rightShape))), _) => // SPARK3 anchor
+//case Join(left, right, Inner, Some(ST_Contains(Seq(leftShape, rightShape)))) => // SPARK2 anchor
       planSpatialJoin(left, right, Seq(leftShape, rightShape), false)
 
     // ST_Intersects(a, b) - a intersects b
-    case Join(left, right, Inner, Some(ST_Intersects(Seq(leftShape, rightShape))), _) =>
+case Join(left, right, Inner, Some(ST_Intersects(Seq(leftShape, rightShape))), _) => // SPARK3 anchor
+//case Join(left, right, Inner, Some(ST_Intersects(Seq(leftShape, rightShape)))) => // SPARK2 anchor
       planSpatialJoin(left, right, Seq(leftShape, rightShape), true)
 
     // ST_WITHIN(a, b) - a is within b
-    case Join(left, right, Inner, Some(ST_Within(Seq(leftShape, rightShape))), _) =>
+case Join(left, right, Inner, Some(ST_Within(Seq(leftShape, rightShape))), _) => // SPARK3 anchor
+//case Join(left, right, Inner, Some(ST_Within(Seq(leftShape, rightShape)))) => // SPARK2 anchor
       planSpatialJoin(right, left, Seq(rightShape, leftShape), false)
 
     // ST_Overlaps(a, b) - a overlaps b
-    case Join(left, right, Inner, Some(ST_Overlaps(Seq(leftShape, rightShape))), _) =>
+case Join(left, right, Inner, Some(ST_Overlaps(Seq(leftShape, rightShape))), _) => // SPARK3 anchor
+//case Join(left, right, Inner, Some(ST_Overlaps(Seq(leftShape, rightShape)))) => // SPARK2 anchor
       planSpatialJoin(right, left, Seq(rightShape, leftShape), false)
 
     // ST_Touches(a, b) - a touches b
-    case Join(left, right, Inner, Some(ST_Touches(Seq(leftShape, rightShape))), _) =>
+case Join(left, right, Inner, Some(ST_Touches(Seq(leftShape, rightShape))), _) => // SPARK3 anchor
+//case Join(left, right, Inner, Some(ST_Touches(Seq(leftShape, rightShape)))) => // SPARK2 anchor
       planSpatialJoin(left, right, Seq(leftShape, rightShape), true)
 
     // ST_Distance(a, b) <= radius consider boundary intersection
-    case Join(left, right, Inner, Some(LessThanOrEqual(ST_Distance(Seq(leftShape, rightShape)), radius)), _) =>
+case Join(left, right, Inner, Some(LessThanOrEqual(ST_Distance(Seq(leftShape, rightShape)), radius)), _) => // SPARK3 anchor
+//case Join(left, right, Inner, Some(LessThanOrEqual(ST_Distance(Seq(leftShape, rightShape)), radius))) => // SPARK2 anchor
       planDistanceJoin(left, right, Seq(leftShape, rightShape), radius, true)
 
     // ST_Distance(a, b) < radius don't consider boundary intersection
-    case Join(left, right, Inner, Some(LessThan(ST_Distance(Seq(leftShape, rightShape)), radius)), _) =>
+case Join(left, right, Inner, Some(LessThan(ST_Distance(Seq(leftShape, rightShape)), radius)), _) => // SPARK3 anchor
+//case Join(left, right, Inner, Some(LessThan(ST_Distance(Seq(leftShape, rightShape)), radius))) => // SPARK2 anchor
       planDistanceJoin(left, right, Seq(leftShape, rightShape), radius, false)
 
     // ST_Equals(a, b) - a is equal to b
-    case Join(left, right, Inner, Some(ST_Equals(Seq(leftShape, rightShape))), _) =>
+case Join(left, right, Inner, Some(ST_Equals(Seq(leftShape, rightShape))), _) => // SPARK3 anchor
+//case Join(left, right, Inner, Some(ST_Equals(Seq(leftShape, rightShape)))) => // SPARK2 anchor
       planSpatialJoin(left, right, Seq(leftShape, rightShape), false)
 
     // ST_Crosses(a, b) - a crosses b
-    case Join(left, right, Inner, Some(ST_Crosses(Seq(leftShape, rightShape))), _) =>
+case Join(left, right, Inner, Some(ST_Crosses(Seq(leftShape, rightShape))), _) => // SPARK3 anchor
+//case Join(left, right, Inner, Some(ST_Crosses(Seq(leftShape, rightShape)))) => // SPARK2 anchor
       planSpatialJoin(right, left, Seq(rightShape, leftShape), false)
 
     case _ =>
diff --git a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/TraitJoinQueryExec.scala b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/TraitJoinQueryExec.scala
index 768773b..b6fb28b 100644
--- a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/TraitJoinQueryExec.scala
+++ b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/TraitJoinQueryExec.scala
@@ -38,7 +38,8 @@ trait TraitJoinQueryExec {
   // Using lazy val to avoid serialization
   @transient private lazy val boundCondition: (InternalRow => Boolean) = {
     if (extraCondition.isDefined) {
-      Predicate.create(extraCondition.get, left.output ++ right.output).eval _
+Predicate.create(extraCondition.get, left.output ++ right.output).eval _ // SPARK3 anchor
+//newPredicate(extraCondition.get, left.output ++ right.output).eval _ // SPARK2 anchor
     } else { (r: InternalRow) =>
       true
     }
@@ -133,7 +134,8 @@ trait TraitJoinQueryExec {
     matches.rdd.mapPartitions { iter =>
       val filtered =
         if (extraCondition.isDefined) {
-          val boundCondition = Predicate.create(extraCondition.get, left.output ++ right.output)
+val boundCondition = Predicate.create(extraCondition.get, left.output ++ right.output) // SPARK3 anchor
+//val boundCondition = newPredicate(extraCondition.get, left.output ++ right.output) // SPARK2 anchor
           iter.filter {
             case (l, r) =>
               val leftRow = l.getUserData.asInstanceOf[UnsafeRow]
diff --git a/sql/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala b/sql/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
index 7d94534..28274bb 100644
--- a/sql/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
+++ b/sql/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
@@ -36,7 +36,6 @@ trait TestBaseScala extends FunSpec with BeforeAndAfterAll {
     config("spark.kryo.registrator", classOf[SedonaKryoRegistrator].getName).
     master("local[*]").appName("sedonasqlScalaTest")
     .config("spark.sql.warehouse.dir", warehouseLocation)
-    .enableHiveSupport()
     .getOrCreate()
 
   val resourceFolder = System.getProperty("user.dir") + "/src/test/resources/"
diff --git a/viz/pom.xml b/viz/pom.xml
index e1a2836..6129672 100644
--- a/viz/pom.xml
+++ b/viz/pom.xml
@@ -26,7 +26,7 @@
         <version>1.0.0-incubator-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
-	<artifactId>sedona-viz</artifactId>
+	<artifactId>sedona-viz-${spark.compat.version}_${scala.compat.version}</artifactId>
 
 	<name>${project.groupId}:${project.artifactId}</name>
 	<description>A cluster computing system for processing large-scale spatial data: RDD and SQL for Viz</description>
@@ -36,29 +36,17 @@
 	<dependencies>
         <dependency>
             <groupId>org.apache.sedona</groupId>
-            <artifactId>sedona-core</artifactId>
+            <artifactId>sedona-core_${scala.compat.version}</artifactId>
             <version>${project.version}</version>
             <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.sedona</groupId>
-            <artifactId>sedona-sql</artifactId>
+            <artifactId>sedona-sql-${spark.compat.version}_${scala.compat.version}</artifactId>
             <version>${project.version}</version>
             <scope>provided</scope>
         </dependency>
         <dependency>
-            <groupId>org.apache.spark</groupId>
-            <artifactId>spark-sql_${scala.compat.version}</artifactId>
-            <version>${spark.version}</version>
-            <scope>provided</scope>
-	    <exclusions>
-                <exclusion>
-                    <groupId>com.fasterxml.jackson.core</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
             <groupId>org.beryx</groupId>
             <artifactId>awt-color-factory</artifactId>
             <version>1.0.0</version>