Posted to commits@zeppelin.apache.org by zj...@apache.org on 2022/07/12 00:49:41 UTC

[zeppelin] branch master updated: [ZEPPELIN-5755] Support Spark 3.3 (#4388)

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new ca216a512e [ZEPPELIN-5755] Support Spark 3.3 (#4388)
ca216a512e is described below

commit ca216a512e5c6ad474ddece5ecc17cfe594de6bc
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Tue Jul 12 08:49:35 2022 +0800

    [ZEPPELIN-5755] Support Spark 3.3 (#4388)
    
    * [ZEPPELIN-5755] Support Spark 3.3
    
    * use hadoop3 profile for spark 3.3
---
 .github/workflows/core.yml                         | 10 +++-
 pom.xml                                            |  2 +-
 spark/interpreter/pom.xml                          | 14 +++++-
 .../apache/zeppelin/spark/IPySparkInterpreter.java |  5 ++
 .../apache/zeppelin/spark/PySparkInterpreter.java  |  6 +++
 .../src/main/resources/python/zeppelin_ipyspark.py |  6 ++-
 .../src/main/resources/python/zeppelin_pyspark.py  |  8 ++--
 .../zeppelin/spark/SparkSqlInterpreterTest.java    |  8 ++--
 spark/scala-2.13/pom.xml                           |  2 +-
 .../org/apache/zeppelin/spark/SparkVersion.java    |  6 ++-
 testing/env_python_3.7_with_R.yml                  |  2 +-
 testing/env_python_3.8_with_R.yml                  |  2 +-
 testing/env_python_3_with_R.yml                    |  2 +-
 testing/env_python_3_with_R_and_tensorflow.yml     |  2 +-
 .../integration/SparkIntegrationTest33.java        | 56 ++++++++++++++++++++++
 .../integration/ZeppelinSparkClusterTest33.java    | 40 ++++++++++++++++
 16 files changed, 155 insertions(+), 16 deletions(-)

diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml
index b9b3953fc8..aceaf9fa11 100644
--- a/.github/workflows/core.yml
+++ b/.github/workflows/core.yml
@@ -325,7 +325,7 @@ jobs:
         run: |
           R -e "IRkernel::installspec()"
       - name: run tests on hadoop${{ matrix.hadoop }}
-        run: ./mvnw test -DskipRat -pl zeppelin-interpreter-integration -Phadoop${{ matrix.hadoop }} -Pintegration -B -Dtest=SparkSubmitIntegrationTest,ZeppelinSparkClusterTest24,SparkIntegrationTest24,ZeppelinSparkClusterTest30,SparkIntegrationTest30,ZeppelinSparkClusterTest31,SparkIntegrationTest31,ZeppelinSparkClusterTest32,SparkIntegrationTest32 -DfailIfNoTests=false
+        run: ./mvnw test -DskipRat -pl zeppelin-interpreter-integration -Phadoop${{ matrix.hadoop }} -Pintegration -B -Dtest=SparkSubmitIntegrationTest,ZeppelinSparkClusterTest24,SparkIntegrationTest24,ZeppelinSparkClusterTest30,SparkIntegrationTest30,ZeppelinSparkClusterTest31,SparkIntegrationTest31,ZeppelinSparkClusterTest32,SparkIntegrationTest32,ZeppelinSparkClusterTest33,SparkIntegrationTest33 -DfailIfNoTests=false
 
   # test on spark for each spark version & scala version
   spark-test:
@@ -395,6 +395,14 @@ jobs:
         run: |
           rm -rf spark/interpreter/metastore_db
           ./mvnw test -DskipRat -pl spark-submit,spark/interpreter -Pspark-3.2 -Pspark-scala-2.13 -Phadoop2 -Pintegration -B -DfailIfNoTests=false
+      - name: run spark-3.3 tests with scala-2.12 and python-${{ matrix.python }}
+        run: |
+          rm -rf spark/interpreter/metastore_db
+          ./mvnw test -DskipRat -pl spark-submit,spark/interpreter -Pspark-3.3 -Pspark-scala-2.12 -Phadoop3 -Pintegration -B -DfailIfNoTests=false
+      - name: run spark-3.3 tests with scala-2.13 and python-${{ matrix.python }}
+        run: |
+          rm -rf spark/interpreter/metastore_db
+          ./mvnw test -DskipRat -pl spark-submit,spark/interpreter -Pspark-3.3 -Pspark-scala-2.13 -Phadoop3 -Pintegration -B -DfailIfNoTests=false
 
   livy-0-5-with-spark-2-2-0-under-python3:
     runs-on: ubuntu-20.04
diff --git a/pom.xml b/pom.xml
index 668bb99d70..78b011339c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -131,7 +131,7 @@
     <httpcomponents.client.version>4.5.13</httpcomponents.client.version>
     <httpcomponents.asyncclient.version>4.0.2</httpcomponents.asyncclient.version>
     <commons.compress.version>1.21</commons.compress.version>
-    <commons.lang3.version>3.10</commons.lang3.version>
+    <commons.lang3.version>3.12.0</commons.lang3.version>
     <commons.text.version>1.8</commons.text.version>
     <commons.configuration2.version>2.7</commons.configuration2.version>
     <commons.exec.version>1.3</commons.exec.version>
diff --git a/spark/interpreter/pom.xml b/spark/interpreter/pom.xml
index e539f99732..8f154ba609 100644
--- a/spark/interpreter/pom.xml
+++ b/spark/interpreter/pom.xml
@@ -572,10 +572,22 @@
     <!-- profile spark-x only affects the Spark version used in tests -->
 
     <profile>
-      <id>spark-3.2</id>
+      <id>spark-3.3</id>
       <activation>
         <activeByDefault>true</activeByDefault>
       </activation>
+      <properties>
+        <datanucleus.core.version>4.1.17</datanucleus.core.version>
+        <datanucleus.rdbms.version>4.1.19</datanucleus.rdbms.version>
+        <datanucleus.apijdo.version>4.2.4</datanucleus.apijdo.version>
+        <spark.version>3.3.0</spark.version>
+        <protobuf.version>2.5.0</protobuf.version>
+        <py4j.version>0.10.9.5</py4j.version>
+      </properties>
+    </profile>
+
+    <profile>
+      <id>spark-3.2</id>
       <properties>
         <datanucleus.core.version>4.1.17</datanucleus.core.version>
         <datanucleus.rdbms.version>4.1.19</datanucleus.rdbms.version>
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
index ab6b3dbf29..2e945ed2bb 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
@@ -161,6 +161,11 @@ public class IPySparkInterpreter extends IPythonInterpreter {
     return sparkInterpreter.getSparkVersion().getMajorVersion() == 3;
   }
 
+  // Used by PySpark
+  public boolean isAfterSpark33() {
+    return sparkInterpreter.getSparkVersion().newerThanEquals(SparkVersion.SPARK_3_3_0);
+  }
+
   public JavaSparkContext getJavaSparkContext() {
     return sparkInterpreter.getJavaSparkContext();
   }
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index 514ffa99fa..737bef8f4b 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -233,7 +233,13 @@ public class PySparkInterpreter extends PythonInterpreter {
     }
   }
 
+  // Used by PySpark
   public boolean isSpark3() {
     return sparkInterpreter.getSparkVersion().getMajorVersion() == 3;
   }
+
+  // Used by PySpark
+  public boolean isAfterSpark33() {
+    return sparkInterpreter.getSparkVersion().newerThanEquals(SparkVersion.SPARK_3_3_0);
+  }
 }
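
Both interpreter classes now expose the same version gate to the Python bootstrap scripts over Py4J. Conceptually, newerThanEquals(SPARK_3_3_0) is an ordered comparison of version components; the standalone sketch below is illustrative only (the real check is the Java SparkVersion call in the hunks above):

    # Illustrative sketch, not Zeppelin's implementation: compare
    # "major.minor.patch" strings the way newerThanEquals implies.
    def version_key(version):
        # "3.3.0" -> (3, 3, 0); drop any "-SNAPSHOT"-style suffix first
        return tuple(int(p) for p in version.split("-")[0].split(".")[:3])

    def is_after_spark_33(version):
        return version_key(version) >= version_key("3.3.0")

    assert is_after_spark_33("3.3.0") and is_after_spark_33("3.3.1")
    assert not is_after_spark_33("3.2.1")
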
diff --git a/spark/interpreter/src/main/resources/python/zeppelin_ipyspark.py b/spark/interpreter/src/main/resources/python/zeppelin_ipyspark.py
index fdf7b97918..958802ccdb 100644
--- a/spark/interpreter/src/main/resources/python/zeppelin_ipyspark.py
+++ b/spark/interpreter/src/main/resources/python/zeppelin_ipyspark.py
@@ -57,8 +57,12 @@ conf = SparkConf(_jvm=gateway.jvm, _jconf=jconf)
 sc = _zsc_ = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
 
 from pyspark.sql import SparkSession
+from pyspark.sql import SQLContext
 spark = __zSpark__ = SparkSession(sc, intp.getSparkSession())
-sqlContext = sqlc = __zSqlc__ = __zSpark__._wrapped
+if intp.isAfterSpark33():
+    sqlContext = sqlc = __zSqlc__ = SQLContext._get_or_create(sc)
+else:
+    sqlContext = sqlc = __zSqlc__ = __zSpark__._wrapped
 
 class IPySparkZeppelinContext(PyZeppelinContext):
 
diff --git a/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py b/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py
index 8dd3224baf..a77c383886 100644
--- a/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py
+++ b/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py
@@ -49,10 +49,12 @@ conf = SparkConf(_jvm = gateway.jvm, _jconf = jconf)
 sc = _zsc_ = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
 
 from pyspark.sql import SparkSession
+from pyspark.sql import SQLContext
 spark = __zSpark__ = SparkSession(sc, intp.getSparkSession())
-sqlc = __zSqlc__ = __zSpark__._wrapped
-
-sqlContext = __zSqlc__
+if intp.isAfterSpark33():
+  sqlContext = sqlc = __zSqlc__ = SQLContext._get_or_create(sc)
+else:
+  sqlContext = sqlc = __zSqlc__ = __zSpark__._wrapped
 
 from zeppelin_context import PyZeppelinContext
 
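The new branch in both bootstrap scripts exists because Spark 3.3 removed the SparkSession._wrapped attribute that older builds relied on to recover an SQLContext; on 3.3+ the scripts instead call SQLContext._get_or_create, which PySpark provides. A minimal sketch of the same pattern, assuming a local pyspark installation and inspecting sc.version directly in place of the intp.isAfterSpark33() callback:

    # Minimal sketch; assumes pyspark is installed. Zeppelin itself asks the
    # JVM-side interpreter (intp.isAfterSpark33()) rather than parsing versions.
    from pyspark.sql import SparkSession, SQLContext

    spark = SparkSession.builder.master("local[1]").getOrCreate()
    sc = spark.sparkContext

    if tuple(int(x) for x in sc.version.split(".")[:2]) >= (3, 3):
        sqlContext = SQLContext._get_or_create(sc)  # _wrapped is gone on 3.3+
    else:
        sqlContext = spark._wrapped
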
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
index f469fc5ff2..c4a1d8e7d0 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
@@ -225,15 +225,17 @@ public class SparkSqlInterpreterTest {
     assertEquals(InterpreterResult.Code.ERROR, ret.code());
     assertEquals(context.out.toString(), 2, context.out.toInterpreterResultMessage().size());
     assertEquals(context.out.toString(), Type.TABLE, context.out.toInterpreterResultMessage().get(0).getType());
-    assertTrue(context.out.toString(), context.out.toInterpreterResultMessage().get(1).getData().contains("mismatched input"));
+    assertTrue(context.out.toString(), context.out.toString().contains("mismatched input") ||
+            context.out.toString().contains("Syntax error"));
 
     // One correct sql + One invalid sql + One valid sql (skipped)
     ret = sqlInterpreter.interpret("select * from gr;invalid_sql; select count(1) from gr", context);
     assertEquals(InterpreterResult.Code.ERROR, ret.code());
     assertEquals(context.out.toString(), 2, context.out.toInterpreterResultMessage().size());
     assertEquals(context.out.toString(), Type.TABLE, context.out.toInterpreterResultMessage().get(0).getType());
-    assertTrue(context.out.toString(), context.out.toInterpreterResultMessage().get(1).getData().contains("mismatched input"));
-
+    assertTrue(context.out.toString(), context.out.toString().contains("mismatched input") ||
+            context.out.toString().contains("Syntax error"));
+    
     // Two comments
     ret = sqlInterpreter.interpret(
             "--comment_1\n--comment_2", context);
diff --git a/spark/scala-2.13/pom.xml b/spark/scala-2.13/pom.xml
index c8f3bbc1f7..d2f337ed1b 100644
--- a/spark/scala-2.13/pom.xml
+++ b/spark/scala-2.13/pom.xml
@@ -31,7 +31,7 @@
   <name>Zeppelin: Spark Interpreter Scala_2.13</name>
 
   <properties>
-    <spark.version>3.2.0</spark.version>
+    <spark.version>3.3.0</spark.version>
     <spark.scala.version>2.13.4</spark.scala.version>
     <spark.scala.binary.version>2.13</spark.scala.binary.version>
     <spark.scala.compile.version>${spark.scala.version}</spark.scala.compile.version>
diff --git a/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkVersion.java b/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
index eb41f43b12..8f88c1b6e1 100644
--- a/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
+++ b/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
@@ -30,11 +30,15 @@ public class SparkVersion {
   public static final SparkVersion SPARK_2_3_0 = SparkVersion.fromVersionString("2.3.0");
   public static final SparkVersion SPARK_2_3_1 = SparkVersion.fromVersionString("2.3.1");
   public static final SparkVersion SPARK_2_4_0 = SparkVersion.fromVersionString("2.4.0");
+
   public static final SparkVersion SPARK_3_1_0 = SparkVersion.fromVersionString("3.1.0");
+
   public static final SparkVersion SPARK_3_3_0 = SparkVersion.fromVersionString("3.3.0");
 
+  public static final SparkVersion SPARK_3_4_0 = SparkVersion.fromVersionString("3.4.0");
+
   public static final SparkVersion MIN_SUPPORTED_VERSION =  SPARK_2_0_0;
-  public static final SparkVersion UNSUPPORTED_FUTURE_VERSION = SPARK_3_3_0;
+  public static final SparkVersion UNSUPPORTED_FUTURE_VERSION = SPARK_3_4_0;
 
   private int version;
   private int majorVersion;
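
UNSUPPORTED_FUTURE_VERSION acts as an exclusive upper bound on the supported range, so raising it from SPARK_3_3_0 to SPARK_3_4_0 is what actually lets a 3.3.x runtime through. An illustrative rendering of that window check (the real logic lives in this Java class):

    # Illustrative only: supported versions fall in [MIN_SUPPORTED, UNSUPPORTED_FUTURE).
    def key(v):
        return tuple(int(x) for x in v.split(".")[:3])

    MIN_SUPPORTED = key("2.0.0")       # SPARK_2_0_0
    UNSUPPORTED_FUTURE = key("3.4.0")  # raised from 3.3.0 by this commit

    def is_supported(v):
        return MIN_SUPPORTED <= key(v) < UNSUPPORTED_FUTURE

    assert is_supported("3.3.0")      # now inside the window
    assert not is_supported("3.4.0")  # still treated as a future version
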
diff --git a/testing/env_python_3.7_with_R.yml b/testing/env_python_3.7_with_R.yml
index 10d46aa9d1..34e228e9d0 100644
--- a/testing/env_python_3.7_with_R.yml
+++ b/testing/env_python_3.7_with_R.yml
@@ -24,7 +24,7 @@ dependencies:
   - plotly
   - jinja2=3.0.3
   - pip
-  - r-base=3
+  - r-base=3.6
   - r-data.table
   - r-evaluate
   - r-base64enc
diff --git a/testing/env_python_3.8_with_R.yml b/testing/env_python_3.8_with_R.yml
index 10d46aa9d1..34e228e9d0 100644
--- a/testing/env_python_3.8_with_R.yml
+++ b/testing/env_python_3.8_with_R.yml
@@ -24,7 +24,7 @@ dependencies:
   - plotly
   - jinja2=3.0.3
   - pip
-  - r-base=3
+  - r-base=3.6
   - r-data.table
   - r-evaluate
   - r-base64enc
diff --git a/testing/env_python_3_with_R.yml b/testing/env_python_3_with_R.yml
index 016e7082b2..bd6a5781c0 100644
--- a/testing/env_python_3_with_R.yml
+++ b/testing/env_python_3_with_R.yml
@@ -25,7 +25,7 @@ dependencies:
   - plotly
   - jinja2=3.0.3
   - pip
-  - r-base=3
+  - r-base=3.6
   - r-data.table
   - r-evaluate
   - r-base64enc
diff --git a/testing/env_python_3_with_R_and_tensorflow.yml b/testing/env_python_3_with_R_and_tensorflow.yml
index 498a00dd01..bbe0d80db3 100644
--- a/testing/env_python_3_with_R_and_tensorflow.yml
+++ b/testing/env_python_3_with_R_and_tensorflow.yml
@@ -25,7 +25,7 @@ dependencies:
   - plotly
   - jinja2=3.0.3
   - pip
-  - r-base=3
+  - r-base=3.6
   - r-data.table
   - r-evaluate
   - r-base64enc
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest33.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest33.java
new file mode 100644
index 0000000000..44fa8ebb99
--- /dev/null
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest33.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.integration;
+
+import org.apache.zeppelin.interpreter.InterpreterSetting;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.List;
+
+@RunWith(value = Parameterized.class)
+public class SparkIntegrationTest33 extends SparkIntegrationTest {
+
+  public SparkIntegrationTest33(String sparkVersion, String hadoopVersion) {
+    super(sparkVersion, hadoopVersion);
+  }
+
+  @Parameterized.Parameters
+  public static List<Object[]> data() {
+    return Arrays.asList(new Object[][]{
+            {"3.3.0", "2"},
+    });
+  }
+
+  @Override
+  protected void setUpSparkInterpreterSetting(InterpreterSetting interpreterSetting) {
+    // spark3 doesn't support yarn-client and yarn-cluster anymore, use
+    // spark.master and spark.submit.deployMode instead
+    String sparkMaster = interpreterSetting.getJavaProperties().getProperty("spark.master");
+    if (sparkMaster.equals("yarn-client")) {
+      interpreterSetting.setProperty("spark.master", "yarn");
+      interpreterSetting.setProperty("spark.submit.deployMode", "client");
+    } else if (sparkMaster.equals("yarn-cluster")){
+      interpreterSetting.setProperty("spark.master", "yarn");
+      interpreterSetting.setProperty("spark.submit.deployMode", "cluster");
+    } else if (sparkMaster.startsWith("local")) {
+      interpreterSetting.setProperty("spark.submit.deployMode", "client");
+    }
+  }
+}
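
The setup override reflects a Spark 3 behavior change: the combined "yarn-client" and "yarn-cluster" master values were dropped, so each must be split into spark.master plus spark.submit.deployMode. The same mapping as a standalone sketch (the property names are Spark's own; a plain dict stands in for the interpreter setting):

    # Standalone sketch of the master/deploy-mode normalization above.
    def normalize(props):
        master = props.get("spark.master", "")
        if master == "yarn-client":
            props["spark.master"] = "yarn"
            props["spark.submit.deployMode"] = "client"
        elif master == "yarn-cluster":
            props["spark.master"] = "yarn"
            props["spark.submit.deployMode"] = "cluster"
        elif master.startswith("local"):
            props["spark.submit.deployMode"] = "client"
        return props

    assert normalize({"spark.master": "yarn-cluster"}) == {
        "spark.master": "yarn", "spark.submit.deployMode": "cluster"}
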
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest33.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest33.java
new file mode 100644
index 0000000000..11f62217e0
--- /dev/null
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest33.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.integration;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.List;
+
+@RunWith(value = Parameterized.class)
+public class ZeppelinSparkClusterTest33 extends ZeppelinSparkClusterTest {
+
+  public ZeppelinSparkClusterTest33(String sparkVersion, String hadoopVersion) throws Exception {
+    super(sparkVersion, hadoopVersion);
+  }
+
+  @Parameterized.Parameters
+  public static List<Object[]> data() {
+    return Arrays.asList(new Object[][]{
+        {"3.3.0", "2"},
+        {"3.3.0", "3"}
+    });
+  }
+}

