Posted to dev@zeppelin.apache.org by emmanuel warreng <em...@gmail.com> on 2022/07/20 17:34:19 UTC

Re: [zeppelin] branch master updated: [ZEPPELIN-5755] Support Spark 3.3 (#4388)

Unsubscribe

On Tue, Jul 12, 2022, 02:49 <zj...@apache.org> wrote:

> This is an automated email from the ASF dual-hosted git repository.
>
> zjffdu pushed a commit to branch master
> in repository https://gitbox.apache.org/repos/asf/zeppelin.git
>
>
> The following commit(s) were added to refs/heads/master by this push:
>      new ca216a512e [ZEPPELIN-5755] Support Spark 3.3 (#4388)
> ca216a512e is described below
>
> commit ca216a512e5c6ad474ddece5ecc17cfe594de6bc
> Author: Jeff Zhang <zj...@apache.org>
> AuthorDate: Tue Jul 12 08:49:35 2022 +0800
>
>     [ZEPPELIN-5755] Support Spark 3.3 (#4388)
>
>     * [ZEPPELIN-5755] Support Spark 3.3
>
>     * use hadoop3 profile for spark 3.3
> ---
>  .github/workflows/core.yml                         | 10 +++-
>  pom.xml                                            |  2 +-
>  spark/interpreter/pom.xml                          | 14 +++++-
>  .../apache/zeppelin/spark/IPySparkInterpreter.java |  5 ++
>  .../apache/zeppelin/spark/PySparkInterpreter.java  |  6 +++
>  .../src/main/resources/python/zeppelin_ipyspark.py |  6 ++-
>  .../src/main/resources/python/zeppelin_pyspark.py  |  8 ++--
>  .../zeppelin/spark/SparkSqlInterpreterTest.java    |  8 ++--
>  spark/scala-2.13/pom.xml                           |  2 +-
>  .../org/apache/zeppelin/spark/SparkVersion.java    |  6 ++-
>  testing/env_python_3.7_with_R.yml                  |  2 +-
>  testing/env_python_3.8_with_R.yml                  |  2 +-
>  testing/env_python_3_with_R.yml                    |  2 +-
>  testing/env_python_3_with_R_and_tensorflow.yml     |  2 +-
>  .../integration/SparkIntegrationTest33.java        | 56 ++++++++++++++++++++++
>  .../integration/ZeppelinSparkClusterTest33.java    | 40 ++++++++++++++++
>  16 files changed, 155 insertions(+), 16 deletions(-)
>
> diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml
> index b9b3953fc8..aceaf9fa11 100644
> --- a/.github/workflows/core.yml
> +++ b/.github/workflows/core.yml
> @@ -325,7 +325,7 @@ jobs:
>          run: |
>            R -e "IRkernel::installspec()"
>        - name: run tests on hadoop${{ matrix.hadoop }}
> -        run: ./mvnw test -DskipRat -pl zeppelin-interpreter-integration -Phadoop${{ matrix.hadoop }} -Pintegration -B -Dtest=SparkSubmitIntegrationTest,ZeppelinSparkClusterTest24,SparkIntegrationTest24,ZeppelinSparkClusterTest30,SparkIntegrationTest30,ZeppelinSparkClusterTest31,SparkIntegrationTest31,ZeppelinSparkClusterTest32,SparkIntegrationTest32 -DfailIfNoTests=false
> +        run: ./mvnw test -DskipRat -pl zeppelin-interpreter-integration -Phadoop${{ matrix.hadoop }} -Pintegration -B -Dtest=SparkSubmitIntegrationTest,ZeppelinSparkClusterTest24,SparkIntegrationTest24,ZeppelinSparkClusterTest30,SparkIntegrationTest30,ZeppelinSparkClusterTest31,SparkIntegrationTest31,ZeppelinSparkClusterTest32,SparkIntegrationTest32,ZeppelinSparkClusterTest33,SparkIntegrationTest33 -DfailIfNoTests=false
>
>    # test on spark for each spark version & scala version
>    spark-test:
> @@ -395,6 +395,14 @@ jobs:
>          run: |
>            rm -rf spark/interpreter/metastore_db
>            ./mvnw test -DskipRat -pl spark-submit,spark/interpreter -Pspark-3.2 -Pspark-scala-2.13 -Phadoop2 -Pintegration -B -DfailIfNoTests=false
> +      - name: run spark-3.3 tests with scala-2.12 and python-${{ matrix.python }}
> +        run: |
> +          rm -rf spark/interpreter/metastore_db
> +          ./mvnw test -DskipRat -pl spark-submit,spark/interpreter -Pspark-3.3 -Pspark-scala-2.12 -Phadoop3 -Pintegration -B -DfailIfNoTests=false
> +      - name: run spark-3.3 tests with scala-2.13 and python-${{ matrix.python }}
> +        run: |
> +          rm -rf spark/interpreter/metastore_db
> +          ./mvnw test -DskipRat -pl spark-submit,spark/interpreter -Pspark-3.3 -Pspark-scala-2.13 -Phadoop3 -Pintegration -B -DfailIfNoTests=false
>
>    livy-0-5-with-spark-2-2-0-under-python3:
>      runs-on: ubuntu-20.04
> diff --git a/pom.xml b/pom.xml
> index 668bb99d70..78b011339c 100644
> --- a/pom.xml
> +++ b/pom.xml
> @@ -131,7 +131,7 @@
>      <httpcomponents.client.version>4.5.13</httpcomponents.client.version>
>      <httpcomponents.asyncclient.version>4.0.2</httpcomponents.asyncclient.version>
>      <commons.compress.version>1.21</commons.compress.version>
> -    <commons.lang3.version>3.10</commons.lang3.version>
> +    <commons.lang3.version>3.12.0</commons.lang3.version>
>      <commons.text.version>1.8</commons.text.version>
>      <commons.configuration2.version>2.7</commons.configuration2.version>
>      <commons.exec.version>1.3</commons.exec.version>
> diff --git a/spark/interpreter/pom.xml b/spark/interpreter/pom.xml
> index e539f99732..8f154ba609 100644
> --- a/spark/interpreter/pom.xml
> +++ b/spark/interpreter/pom.xml
> @@ -572,10 +572,22 @@
>      <!-- profile spark-x only affect spark version used in test -->
>
>      <profile>
> -      <id>spark-3.2</id>
> +      <id>spark-3.3</id>
>        <activation>
>          <activeByDefault>true</activeByDefault>
>        </activation>
> +      <properties>
> +        <datanucleus.core.version>4.1.17</datanucleus.core.version>
> +        <datanucleus.rdbms.version>4.1.19</datanucleus.rdbms.version>
> +        <datanucleus.apijdo.version>4.2.4</datanucleus.apijdo.version>
> +        <spark.version>3.3.0</spark.version>
> +        <protobuf.version>2.5.0</protobuf.version>
> +        <py4j.version>0.10.9.5</py4j.version>
> +      </properties>
> +    </profile>
> +
> +    <profile>
> +      <id>spark-3.2</id>
>        <properties>
>          <datanucleus.core.version>4.1.17</datanucleus.core.version>
>          <datanucleus.rdbms.version>4.1.19</datanucleus.rdbms.version>
> diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
> index ab6b3dbf29..2e945ed2bb 100644
> --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
> +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
> @@ -161,6 +161,11 @@ public class IPySparkInterpreter extends IPythonInterpreter {
>      return sparkInterpreter.getSparkVersion().getMajorVersion() == 3;
>    }
>
> +  // Used by PySpark
> +  public boolean isAfterSpark33() {
> +    return sparkInterpreter.getSparkVersion().newerThanEquals(SparkVersion.SPARK_3_3_0);
> +  }
> +
>    public JavaSparkContext getJavaSparkContext() {
>      return sparkInterpreter.getJavaSparkContext();
>    }
> diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
> index 514ffa99fa..737bef8f4b 100644
> --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
> +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
> @@ -233,7 +233,13 @@ public class PySparkInterpreter extends PythonInterpreter {
>      }
>    }
>
> +  // Used by PySpark
>    public boolean isSpark3() {
>      return sparkInterpreter.getSparkVersion().getMajorVersion() == 3;
>    }
> +
> +  // Used by PySpark
> +  public boolean isAfterSpark33() {
> +    return sparkInterpreter.getSparkVersion().newerThanEquals(SparkVersion.SPARK_3_3_0);
> +  }
>  }
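
Both interpreters expose the same flag to the Python bootstrap scripts
through Py4J. Despite the name, isAfterSpark33() is backed by
newerThanEquals, so it is true for Spark 3.3.0 itself, not only for later
releases. A minimal Python sketch of that contract, where FakeIntp is a
hypothetical stand-in for the Py4J stub ("intp") the scripts actually
receive from the gateway:

    # FakeIntp is a hypothetical stand-in for the Py4J stub "intp";
    # it mimics the Java method's newerThanEquals semantics.
    class FakeIntp:
        def __init__(self, version):
            self._v = tuple(int(p) for p in version.split(".")[:3])

        def isAfterSpark33(self):
            # >= comparison: true for 3.3.0 itself
            return self._v >= (3, 3, 0)

    assert FakeIntp("3.3.0").isAfterSpark33()
    assert not FakeIntp("3.2.1").isAfterSpark33()
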
> diff --git a/spark/interpreter/src/main/resources/python/zeppelin_ipyspark.py b/spark/interpreter/src/main/resources/python/zeppelin_ipyspark.py
> index fdf7b97918..958802ccdb 100644
> --- a/spark/interpreter/src/main/resources/python/zeppelin_ipyspark.py
> +++ b/spark/interpreter/src/main/resources/python/zeppelin_ipyspark.py
> @@ -57,8 +57,12 @@ conf = SparkConf(_jvm=gateway.jvm, _jconf=jconf)
>  sc = _zsc_ = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
>
>  from pyspark.sql import SparkSession
> +from pyspark.sql import SQLContext
>  spark = __zSpark__ = SparkSession(sc, intp.getSparkSession())
> -sqlContext = sqlc = __zSqlc__ = __zSpark__._wrapped
> +if intp.isAfterSpark33():
> +    sqlContext = sqlc = __zSqlc__ = SQLContext._get_or_create(sc)
> +else:
> +    sqlContext = sqlc = __zSqlc__ = __zSpark__._wrapped
>
>  class IPySparkZeppelinContext(PyZeppelinContext):
>
> diff --git a/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py b/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py
> index 8dd3224baf..a77c383886 100644
> --- a/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py
> +++ b/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py
> @@ -49,10 +49,12 @@ conf = SparkConf(_jvm = gateway.jvm, _jconf = jconf)
>  sc = _zsc_ = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
>
>  from pyspark.sql import SparkSession
> +from pyspark.sql import SQLContext
>  spark = __zSpark__ = SparkSession(sc, intp.getSparkSession())
> -sqlc = __zSqlc__ = __zSpark__._wrapped
> -
> -sqlContext = __zSqlc__
> +if intp.isAfterSpark33():
> +  sqlContext = sqlc = __zSqlc__ = SQLContext._get_or_create(sc)
> +else:
> +  sqlContext = sqlc = __zSqlc__ = __zSpark__._wrapped
>
>  from zeppelin_context import PyZeppelinContext
>
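
Background for the branch above: PySpark 3.3 removed SparkSession._wrapped,
so on 3.3+ the bootstrap scripts obtain the SQLContext via
SQLContext._get_or_create(sc) instead. A self-contained sketch of the same
pattern, assuming only a local pyspark install; the hasattr() probe is an
illustrative substitute for asking the JVM via intp.isAfterSpark33():

    from pyspark.sql import SparkSession, SQLContext

    spark = SparkSession.builder.master("local[1]").getOrCreate()
    sc = spark.sparkContext
    if hasattr(SQLContext, "_get_or_create"):  # PySpark >= 3.3
        sqlc = SQLContext._get_or_create(sc)
    else:                                      # older releases keep _wrapped
        sqlc = spark._wrapped
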
> diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
> index f469fc5ff2..c4a1d8e7d0 100644
> --- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
> +++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
> @@ -225,15 +225,17 @@ public class SparkSqlInterpreterTest {
>      assertEquals(InterpreterResult.Code.ERROR, ret.code());
>      assertEquals(context.out.toString(), 2, context.out.toInterpreterResultMessage().size());
>      assertEquals(context.out.toString(), Type.TABLE, context.out.toInterpreterResultMessage().get(0).getType());
> -    assertTrue(context.out.toString(), context.out.toInterpreterResultMessage().get(1).getData().contains("mismatched input"));
> +    assertTrue(context.out.toString(), context.out.toString().contains("mismatched input") ||
> +            context.out.toString().contains("Syntax error"));
>
>      // One correct sql + One invalid sql + One valid sql (skipped)
>      ret = sqlInterpreter.interpret("select * from gr;invalid_sql; select count(1) from gr", context);
>      assertEquals(InterpreterResult.Code.ERROR, ret.code());
>      assertEquals(context.out.toString(), 2, context.out.toInterpreterResultMessage().size());
>      assertEquals(context.out.toString(), Type.TABLE, context.out.toInterpreterResultMessage().get(0).getType());
> -    assertTrue(context.out.toString(), context.out.toInterpreterResultMessage().get(1).getData().contains("mismatched input"));
> -
> +    assertTrue(context.out.toString(), context.out.toString().contains("mismatched input") ||
> +            context.out.toString().contains("Syntax error"));
> +
>      // Two 2 comments
>      ret = sqlInterpreter.interpret(
>              "--comment_1\n--comment_2", context);
> diff --git a/spark/scala-2.13/pom.xml b/spark/scala-2.13/pom.xml
> index c8f3bbc1f7..d2f337ed1b 100644
> --- a/spark/scala-2.13/pom.xml
> +++ b/spark/scala-2.13/pom.xml
> @@ -31,7 +31,7 @@
>    <name>Zeppelin: Spark Interpreter Scala_2.13</name>
>
>    <properties>
> -    <spark.version>3.2.0</spark.version>
> +    <spark.version>3.3.0</spark.version>
>      <spark.scala.version>2.13.4</spark.scala.version>
>      <spark.scala.binary.version>2.13</spark.scala.binary.version>
>      <spark.scala.compile.version>${spark.scala.version}</spark.scala.compile.version>
> diff --git a/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkVersion.java b/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
> index eb41f43b12..8f88c1b6e1 100644
> --- a/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
> +++ b/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
> @@ -30,11 +30,15 @@ public class SparkVersion {
>    public static final SparkVersion SPARK_2_3_0 = SparkVersion.fromVersionString("2.3.0");
>    public static final SparkVersion SPARK_2_3_1 = SparkVersion.fromVersionString("2.3.1");
>    public static final SparkVersion SPARK_2_4_0 = SparkVersion.fromVersionString("2.4.0");
> +
>    public static final SparkVersion SPARK_3_1_0 = SparkVersion.fromVersionString("3.1.0");
> +
>    public static final SparkVersion SPARK_3_3_0 = SparkVersion.fromVersionString("3.3.0");
>
> +  public static final SparkVersion SPARK_3_4_0 = SparkVersion.fromVersionString("3.4.0");
> +
>    public static final SparkVersion MIN_SUPPORTED_VERSION =  SPARK_2_0_0;
> -  public static final SparkVersion UNSUPPORTED_FUTURE_VERSION = SPARK_3_3_0;
> +  public static final SparkVersion UNSUPPORTED_FUTURE_VERSION = SPARK_3_4_0;
>
>    private int version;
>    private int majorVersion;
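
The net effect of the constant changes above: a Spark version is accepted
when MIN_SUPPORTED_VERSION <= v < UNSUPPORTED_FUTURE_VERSION, so 3.3.x
moves inside the window and 3.4.0 becomes the new exclusive upper bound.
An illustrative Python sketch of that windowing (version_key is a
simplification, not Zeppelin's actual parsing):

    # Simplified version parsing; SparkVersion does this in Java.
    def version_key(v):
        return tuple(int(p) for p in v.split("-")[0].split(".")[:3])

    MIN_SUPPORTED = version_key("2.0.0")       # inclusive lower bound
    UNSUPPORTED_FUTURE = version_key("3.4.0")  # exclusive upper bound

    def is_supported(v):
        return MIN_SUPPORTED <= version_key(v) < UNSUPPORTED_FUTURE

    assert is_supported("3.3.0")      # newly supported by this commit
    assert not is_supported("3.4.0")  # still treated as a future version
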
> diff --git a/testing/env_python_3.7_with_R.yml b/testing/env_python_3.7_with_R.yml
> index 10d46aa9d1..34e228e9d0 100644
> --- a/testing/env_python_3.7_with_R.yml
> +++ b/testing/env_python_3.7_with_R.yml
> @@ -24,7 +24,7 @@ dependencies:
>    - plotly
>    - jinja2=3.0.3
>    - pip
> -  - r-base=3
> +  - r-base=3.6
>    - r-data.table
>    - r-evaluate
>    - r-base64enc
> diff --git a/testing/env_python_3.8_with_R.yml b/testing/env_python_3.8_with_R.yml
> index 10d46aa9d1..34e228e9d0 100644
> --- a/testing/env_python_3.8_with_R.yml
> +++ b/testing/env_python_3.8_with_R.yml
> @@ -24,7 +24,7 @@ dependencies:
>    - plotly
>    - jinja2=3.0.3
>    - pip
> -  - r-base=3
> +  - r-base=3.6
>    - r-data.table
>    - r-evaluate
>    - r-base64enc
> diff --git a/testing/env_python_3_with_R.yml b/testing/env_python_3_with_R.yml
> index 016e7082b2..bd6a5781c0 100644
> --- a/testing/env_python_3_with_R.yml
> +++ b/testing/env_python_3_with_R.yml
> @@ -25,7 +25,7 @@ dependencies:
>    - plotly
>    - jinja2=3.0.3
>    - pip
> -  - r-base=3
> +  - r-base=3.6
>    - r-data.table
>    - r-evaluate
>    - r-base64enc
> diff --git a/testing/env_python_3_with_R_and_tensorflow.yml b/testing/env_python_3_with_R_and_tensorflow.yml
> index 498a00dd01..bbe0d80db3 100644
> --- a/testing/env_python_3_with_R_and_tensorflow.yml
> +++ b/testing/env_python_3_with_R_and_tensorflow.yml
> @@ -25,7 +25,7 @@ dependencies:
>    - plotly
>    - jinja2=3.0.3
>    - pip
> -  - r-base=3
> +  - r-base=3.6
>    - r-data.table
>    - r-evaluate
>    - r-base64enc
> diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest33.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest33.java
> new file mode 100644
> index 0000000000..44fa8ebb99
> --- /dev/null
> +++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest33.java
> @@ -0,0 +1,56 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *    http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package org.apache.zeppelin.integration;
> +
> +import org.apache.zeppelin.interpreter.InterpreterSetting;
> +import org.junit.runner.RunWith;
> +import org.junit.runners.Parameterized;
> +
> +import java.util.Arrays;
> +import java.util.List;
> +
> +@RunWith(value = Parameterized.class)
> +public class SparkIntegrationTest33 extends SparkIntegrationTest {
> +
> +  public SparkIntegrationTest33(String sparkVersion, String hadoopVersion) {
> +    super(sparkVersion, hadoopVersion);
> +  }
> +
> +  @Parameterized.Parameters
> +  public static List<Object[]> data() {
> +    return Arrays.asList(new Object[][]{
> +            {"3.3.0", "2"},
> +    });
> +  }
> +
> +  @Override
> +  protected void setUpSparkInterpreterSetting(InterpreterSetting interpreterSetting) {
> +    // spark3 doesn't support yarn-client and yarn-cluster anymore, use
> +    // spark.master and spark.submit.deployMode instead
> +    String sparkMaster = interpreterSetting.getJavaProperties().getProperty("spark.master");
> +    if (sparkMaster.equals("yarn-client")) {
> +      interpreterSetting.setProperty("spark.master", "yarn");
> +      interpreterSetting.setProperty("spark.submit.deployMode", "client");
> +    } else if (sparkMaster.equals("yarn-cluster")){
> +      interpreterSetting.setProperty("spark.master", "yarn");
> +      interpreterSetting.setProperty("spark.submit.deployMode",
> "cluster");
> +    } else if (sparkMaster.startsWith("local")) {
> +      interpreterSetting.setProperty("spark.submit.deployMode", "client");
> +    }
> +  }
> +}
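
The normalization above is needed because Spark 3 no longer accepts the
legacy "yarn-client" and "yarn-cluster" master strings. A standalone Python
mirror of the same mapping, with a plain dict standing in for the
InterpreterSetting properties (normalize is illustrative, not the test's
actual API):

    def normalize(props):
        master = props.get("spark.master", "")
        if master == "yarn-client":
            props["spark.master"] = "yarn"
            props["spark.submit.deployMode"] = "client"
        elif master == "yarn-cluster":
            props["spark.master"] = "yarn"
            props["spark.submit.deployMode"] = "cluster"
        elif master.startswith("local"):
            # local masters always run the driver client-side
            props["spark.submit.deployMode"] = "client"
        return props

    assert normalize({"spark.master": "yarn-cluster"}) == {
        "spark.master": "yarn", "spark.submit.deployMode": "cluster"}
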
> diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest33.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest33.java
> new file mode 100644
> index 0000000000..11f62217e0
> --- /dev/null
> +++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest33.java
> @@ -0,0 +1,40 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *    http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package org.apache.zeppelin.integration;
> +
> +import org.junit.runner.RunWith;
> +import org.junit.runners.Parameterized;
> +
> +import java.util.Arrays;
> +import java.util.List;
> +
> +@RunWith(value = Parameterized.class)
> +public class ZeppelinSparkClusterTest33 extends ZeppelinSparkClusterTest {
> +
> +  public ZeppelinSparkClusterTest33(String sparkVersion, String hadoopVersion) throws Exception {
> +    super(sparkVersion, hadoopVersion);
> +  }
> +
> +  @Parameterized.Parameters
> +  public static List<Object[]> data() {
> +    return Arrays.asList(new Object[][]{
> +        {"3.3.0", "2"},
> +        {"3.3.0", "3"}
> +    });
> +  }
> +}
>
>