You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/06/24 05:43:44 UTC

[zeppelin] branch master updated: [ZEPPELIN-4893]. Upgrade to spark 3.0.0

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 82e4057  [ZEPPELIN-4893]. Upgrade to spark 3.0.0
82e4057 is described below

commit 82e4057d3744974b7b16b6f1f68c018355f3c765
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Sat Jun 20 17:58:52 2020 +0800

    [ZEPPELIN-4893]. Upgrade to spark 3.0.0
    
    ### What is this PR for?
    
    Simple PR to upgrade to the officially released Spark 3.0.0.
    
    ### What type of PR is it?
    [ Improvement ]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4893
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Does the licenses files need update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3809 from zjffdu/ZEPPELIN-4893 and squashes the following commits:
    
    e34cd7984 [Jeff Zhang] save
    0918aebfb [Jeff Zhang] [ZEPPELIN-4893]. Upgrade to spark 3.0.0
---
 .travis.yml                                        |  6 ++
 docs/interpreter/spark.md                          | 32 ++++++++-
 .../src/main/resources/interpreter-setting.json    |  7 ++
 spark/pom.xml                                      |  4 +-
 spark/spark3-shims/pom.xml                         |  2 +-
 testing/install_R.sh                               |  7 ++
 .../zeppelin/integration/SparkIntegrationTest.java | 73 +++++++++++++--------
 .../integration/SparkIntegrationTest16.java        |  6 +-
 .../integration/SparkIntegrationTest20.java        |  6 +-
 .../integration/SparkIntegrationTest21.java        |  6 +-
 .../integration/SparkIntegrationTest22.java        |  6 +-
 .../integration/SparkIntegrationTest23.java        |  6 +-
 .../integration/SparkIntegrationTest24.java        |  6 +-
 .../integration/SparkIntegrationTest30.java        | 23 ++++++-
 .../integration/ZeppelinSparkClusterTest.java      | 75 +++++++++++++++-------
 .../integration/ZeppelinSparkClusterTest16.java    |  6 +-
 .../integration/ZeppelinSparkClusterTest20.java    |  6 +-
 .../integration/ZeppelinSparkClusterTest21.java    |  6 +-
 .../integration/ZeppelinSparkClusterTest22.java    |  6 +-
 .../integration/ZeppelinSparkClusterTest23.java    |  6 +-
 .../integration/ZeppelinSparkClusterTest24.java    |  6 +-
 .../integration/ZeppelinSparkClusterTest30.java    |  7 +-
 .../apache/zeppelin/rest/AbstractTestRestApi.java  |  2 +-
 .../interpreter/integration/DownloadUtils.java     | 11 +---
 .../launcher/SparkInterpreterLauncherTest.java     |  2 +-
 25 files changed, 215 insertions(+), 108 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 05779db..c732915 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -97,6 +97,11 @@ jobs:
 
     # Run Spark integration test and unit test
 
+    # Run spark integration test in one zeppelin instance: Spark 3.0
+    - jdk: "openjdk8"
+      dist: xenial
+      env: BUILD_PLUGINS="true" PYTHON="3" SCALA_VER="2.12" PROFILE="-Phadoop2 -Pintegration" R="true" BUILD_FLAG="install -DskipTests -DskipRat" TEST_FLAG="test -DskipRat" MODULES="-pl zeppelin-interpreter-integration,zeppelin-web,spark/spark-dependencies,markdown -am" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest30,SparkIntegrationTest30 -DfailIfNoTests=false"
+
     # Run spark integration of in one zeppelin instance (2.4, 2.3, 2.2)
     - jdk: "openjdk8"
       dist: xenial
@@ -162,6 +167,7 @@ before_install:
   - clearcache=$(echo $gitlog | grep -c -E "clear bower|bower clear" || true)
   - if [ "$hasbowerchanged" -gt 0 ] || [ "$clearcache" -gt 0 ]; then echo "Clearing bower_components cache"; rm -r zeppelin-web/bower_components; npm cache verify; else echo "Using cached bower_components."; fi
   - echo "MAVEN_OPTS='-Xms1024M -Xmx2048M -XX:MaxMetaspaceSize=1024m -XX:-UseGCOverheadLimit -Dorg.slf4j.simpleLogger.defaultLogLevel=warn'" >> ~/.mavenrc
+  - if [[ -n $R ]]; then ./testing/install_R.sh; fi
   - bash -x ./testing/install_external_dependencies.sh
   - ls -la .spark-dist ${HOME}/.m2/repository/.cache/maven-download-plugin || true
   - ls .node_modules && cp -r .node_modules zeppelin-web/node_modules || echo "node_modules are not cached"
diff --git a/docs/interpreter/spark.md b/docs/interpreter/spark.md
index 990f466..5fc9305 100644
--- a/docs/interpreter/spark.md
+++ b/docs/interpreter/spark.md
@@ -85,6 +85,11 @@ You can also set other Spark properties which are not listed in the table. For a
     <td>local[*]</td>
     <td>Spark master uri. <br/> e.g. spark://master_host:7077</td>
   <tr>
+  <tr>
+    <td>spark.submit.deployMode</td>
+    <td></td>
+    <td>The deploy mode of the Spark driver program, either "client" or "cluster", which means to launch the driver program locally ("client") or remotely ("cluster") on one of the nodes inside the cluster.</td>
+  <tr>
     <td>spark.app.name</td>
     <td>Zeppelin</td>
     <td>The name of spark application.</td>
@@ -254,8 +259,8 @@ For example,
 
  * **local[*]** in local mode
  * **spark://master:7077** in standalone cluster
- * **yarn-client** in Yarn client mode
- * **yarn-cluster** in Yarn cluster mode
+ * **yarn-client** in Yarn client mode  (Not supported in spark 3.x, refer below for how to configure yarn-client in Spark 3.x)
+ * **yarn-cluster** in Yarn cluster mode  (Not supported in spark 3.x, refer below for how to configure yarn-cluster in Spark 3.x)
  * **mesos://host:5050** in Mesos cluster
 
 That's it. Zeppelin will work with any version of Spark and any deployment type without rebuilding Zeppelin in this way.
@@ -265,6 +270,29 @@ For the further information about Spark & Zeppelin version compatibility, please
 
 > Yarn client mode and local mode will run driver in the same machine with zeppelin server, this would be dangerous for production. Because it may run out of memory when there's many spark interpreters running at the same time. So we suggest you only allow yarn-cluster mode via setting `zeppelin.spark.only_yarn_cluster` in `zeppelin-site.xml`.
 
+#### Configure yarn mode for Spark 3.x
+
+Specifying `yarn-client` & `yarn-cluster` in `spark.master` is not supported in Spark 3.x any more; instead, you need to use `spark.master` and `spark.submit.deployMode` together.
+
+<table class="table-configuration">
+  <tr>
+    <th>Mode</th>
+    <th>spark.master</th>
+    <th>spark.submit.deployMode</th>
+  </tr>
+  <tr>
+    <td>Yarn Client</td>
+    <td>yarn</td>
+    <td>client</td>
+  </tr>
+  <tr>
+    <td>Yarn Cluster</td>
+    <td>yarn</td>
+    <td>cluster</td>
+  </tr>  
+</table>
+
+
 ## SparkContext, SQLContext, SparkSession, ZeppelinContext
 
 SparkContext, SQLContext, SparkSession (for spark 2.x) and ZeppelinContext are automatically created and exposed as variable names `sc`, `sqlContext`, `spark` and `z`, respectively, in Scala, Kotlin, Python and R environments.
diff --git a/spark/interpreter/src/main/resources/interpreter-setting.json b/spark/interpreter/src/main/resources/interpreter-setting.json
index dfe09d6..db078c5 100644
--- a/spark/interpreter/src/main/resources/interpreter-setting.json
+++ b/spark/interpreter/src/main/resources/interpreter-setting.json
@@ -19,6 +19,13 @@
         "description": "Spark master uri. local | yarn-client | yarn-cluster | spark master address of standalone mode, ex) spark://master_host:7077",
         "type": "string"
       },
+      "spark.submit.deployMode": {
+        "envName": "",
+        "propertyName": "spark.submit.deployMode",
+        "defaultValue": "",
+        "description": "The deploy mode of Spark driver program, either \"client\" or \"cluster\", Which means to launch driver program locally (\"client\") or remotely (\"cluster\") on one of the nodes inside the cluster.",
+        "type": "string"
+      },
       "spark.app.name": {
         "envName": "",
         "propertyName": "spark.app.name",
diff --git a/spark/pom.xml b/spark/pom.xml
index 3102eb8..25301b2 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -195,9 +195,9 @@
         <profile>
             <id>spark-3.0</id>
             <properties>
-                <spark.version>3.0.0-preview2</spark.version>
+                <spark.version>3.0.0</spark.version>
                 <protobuf.version>2.5.0</protobuf.version>
-                <py4j.version>0.10.8.1</py4j.version>
+                <py4j.version>0.10.9</py4j.version>
             </properties>
         </profile>
 
diff --git a/spark/spark3-shims/pom.xml b/spark/spark3-shims/pom.xml
index 645a83e..853bf71 100644
--- a/spark/spark3-shims/pom.xml
+++ b/spark/spark3-shims/pom.xml
@@ -34,7 +34,7 @@
 
   <properties>
     <scala.binary.version>2.12</scala.binary.version>
-    <spark.version>3.0.0-preview2</spark.version>
+    <spark.version>3.0.0</spark.version>
   </properties>
 
   <dependencies>
diff --git a/testing/install_R.sh b/testing/install_R.sh
new file mode 100755
index 0000000..d6bcb86
--- /dev/null
+++ b/testing/install_R.sh
@@ -0,0 +1,7 @@
+#!/usr/bin/env bash
+# Install instruction from here https://cran.r-project.org/bin/linux/ubuntu/README.html
+
+sudo apt-key adv --keyserver keyserver.ubuntu.com --recv-keys E298A3A825C0D65DFD57CBB651716619E084DAB9
+echo "deb https://cloud.r-project.org/bin/linux/ubuntu xenial-cran35/" | sudo tee -a /etc/apt/sources.list
+sudo apt-get update
+sudo apt-get install -y r-base
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
index dd86529..42f35a6 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
@@ -44,7 +44,6 @@ import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
 import java.util.EnumSet;
-import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -61,10 +60,11 @@ public abstract class SparkIntegrationTest {
   private String sparkVersion;
   private String sparkHome;
 
-  public SparkIntegrationTest(String sparkVersion) {
-    LOGGER.info("Testing SparkVersion: " + sparkVersion);
+  public SparkIntegrationTest(String sparkVersion, String hadoopVersion) {
+    LOGGER.info("Testing Spark Version: " + sparkVersion);
+    LOGGER.info("Testing Hadoop Version: " + hadoopVersion);
     this.sparkVersion = sparkVersion;
-    this.sparkHome = DownloadUtils.downloadSpark(sparkVersion);
+    this.sparkHome = DownloadUtils.downloadSpark(sparkVersion, hadoopVersion);
   }
 
   @BeforeClass
@@ -88,6 +88,10 @@ public abstract class SparkIntegrationTest {
     }
   }
 
+  protected void setUpSparkInterpreterSetting(InterpreterSetting interpreterSetting) {
+    // sub class can customize spark interpreter setting.
+  }
+
   private void testInterpreterBasics() throws IOException, InterpreterException, XmlPullParserException {
     // add jars & packages for testing
     InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark");
@@ -133,7 +137,7 @@ public abstract class SparkIntegrationTest {
 
     // test SparkRInterpreter
     Interpreter sparkrInterpreter = interpreterFactory.getInterpreter("spark.r", new ExecutionContext("user1", "note1", "test"));
-    if (isSpark2()) {
+    if (isSpark2() || isSpark3()) {
       interpreterResult = sparkrInterpreter.interpret("df <- as.DataFrame(faithful)\nhead(df)", context);
     } else {
       interpreterResult = sparkrInterpreter.interpret("df <- createDataFrame(sqlContext, faithful)\nhead(df)", context);
@@ -154,14 +158,17 @@ public abstract class SparkIntegrationTest {
     sparkInterpreterSetting.setProperty("zeppelin.spark.scala.color", "false");
     sparkInterpreterSetting.setProperty("zeppelin.spark.deprecatedMsg.show", "false");
 
-    testInterpreterBasics();
+    try {
+      setUpSparkInterpreterSetting(sparkInterpreterSetting);
+      testInterpreterBasics();
 
-    // no yarn application launched
-    GetApplicationsRequest request = GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING));
-    GetApplicationsResponse response = hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request);
-    assertEquals(0, response.getApplicationList().size());
-
-    interpreterSettingManager.close();
+      // no yarn application launched
+      GetApplicationsRequest request = GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING));
+      GetApplicationsResponse response = hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request);
+      assertEquals(0, response.getApplicationList().size());
+    } finally {
+      interpreterSettingManager.close();
+    }
   }
 
   @Test
@@ -178,16 +185,19 @@ public abstract class SparkIntegrationTest {
     sparkInterpreterSetting.setProperty("zeppelin.spark.scala.color", "false");
     sparkInterpreterSetting.setProperty("zeppelin.spark.deprecatedMsg.show", "false");
 
-    testInterpreterBasics();
+    try {
+      setUpSparkInterpreterSetting(sparkInterpreterSetting);
+      testInterpreterBasics();
 
-    // 1 yarn application launched
-    GetApplicationsRequest request = GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING));
-    GetApplicationsResponse response = hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request);
-    assertEquals(1, response.getApplicationList().size());
-
-    interpreterSettingManager.close();
+      // 1 yarn application launched
+      GetApplicationsRequest request = GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING));
+      GetApplicationsResponse response = hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request);
+      assertEquals(1, response.getApplicationList().size());
 
-    waitForYarnAppCompleted(30 * 1000);
+    } finally {
+      interpreterSettingManager.close();
+      waitForYarnAppCompleted(30 * 1000);
+    }
   }
 
   private void waitForYarnAppCompleted(int timeout) throws YarnException {
@@ -223,22 +233,29 @@ public abstract class SparkIntegrationTest {
     sparkInterpreterSetting.setProperty("zeppelin.spark.scala.color", "false");
     sparkInterpreterSetting.setProperty("zeppelin.spark.deprecatedMsg.show", "false");
 
-    testInterpreterBasics();
+    try {
+      setUpSparkInterpreterSetting(sparkInterpreterSetting);
+      testInterpreterBasics();
 
-    // 1 yarn application launched
-    GetApplicationsRequest request = GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING));
-    GetApplicationsResponse response = hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request);
-    assertEquals(1, response.getApplicationList().size());
-
-    interpreterSettingManager.close();
+      // 1 yarn application launched
+      GetApplicationsRequest request = GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING));
+      GetApplicationsResponse response = hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request);
+      assertEquals(1, response.getApplicationList().size());
 
-    waitForYarnAppCompleted(30 * 1000);
+    } finally {
+      interpreterSettingManager.close();
+      waitForYarnAppCompleted(30 * 1000);
+    }
   }
 
   private boolean isSpark2() {
     return this.sparkVersion.startsWith("2.");
   }
 
+  private boolean isSpark3() {
+    return this.sparkVersion.startsWith("3.");
+  }
+
   private String getPythonExec() throws IOException, InterruptedException {
     Process process = Runtime.getRuntime().exec(new String[]{"which", "python"});
     if (process.waitFor() != 0) {
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest16.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest16.java
index 8f5aacb..6574ed6 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest16.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest16.java
@@ -26,14 +26,14 @@ import java.util.List;
 @RunWith(value = Parameterized.class)
 public class SparkIntegrationTest16 extends SparkIntegrationTest{
 
-  public SparkIntegrationTest16(String sparkVersion) {
-    super(sparkVersion);
+  public SparkIntegrationTest16(String sparkVersion, String hadoopVersion) {
+    super(sparkVersion, hadoopVersion);
   }
 
   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-            {"1.6.3"}
+            {"1.6.3", "2.6"}
     });
   }
 
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest20.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest20.java
index 4f3ebd8..b9c7cb0 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest20.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest20.java
@@ -26,14 +26,14 @@ import java.util.List;
 @RunWith(value = Parameterized.class)
 public class SparkIntegrationTest20 extends SparkIntegrationTest{
 
-  public SparkIntegrationTest20(String sparkVersion) {
-    super(sparkVersion);
+  public SparkIntegrationTest20(String sparkVersion, String hadoopVersion) {
+    super(sparkVersion, hadoopVersion);
   }
 
   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-            {"2.0.2"}
+            {"2.0.2", "2.7"}
     });
   }
 
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest21.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest21.java
index 37305cd..5f0fdfc 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest21.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest21.java
@@ -26,14 +26,14 @@ import java.util.List;
 @RunWith(value = Parameterized.class)
 public class SparkIntegrationTest21 extends SparkIntegrationTest{
 
-  public SparkIntegrationTest21(String sparkVersion) {
-    super(sparkVersion);
+  public SparkIntegrationTest21(String sparkVersion, String hadoopVersion) {
+    super(sparkVersion, hadoopVersion);
   }
 
   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-            {"2.1.3"}
+            {"2.1.3", "2.7"}
     });
   }
 
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest22.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest22.java
index a400118..cddd2a7 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest22.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest22.java
@@ -26,14 +26,14 @@ import java.util.List;
 @RunWith(value = Parameterized.class)
 public class SparkIntegrationTest22 extends SparkIntegrationTest{
 
-  public SparkIntegrationTest22(String sparkVersion) {
-    super(sparkVersion);
+  public SparkIntegrationTest22(String sparkVersion, String hadoopVersion) {
+    super(sparkVersion, hadoopVersion);
   }
 
   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-            {"2.2.2"}
+            {"2.2.2", "2.7"}
     });
   }
 
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest23.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest23.java
index ca960d3..834e3d8 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest23.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest23.java
@@ -26,14 +26,14 @@ import java.util.List;
 @RunWith(value = Parameterized.class)
 public class SparkIntegrationTest23 extends SparkIntegrationTest{
 
-  public SparkIntegrationTest23(String sparkVersion) {
-    super(sparkVersion);
+  public SparkIntegrationTest23(String sparkVersion, String hadoopVersion) {
+    super(sparkVersion, hadoopVersion);
   }
 
   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-            {"2.3.2"}
+            {"2.3.2", "2.7"}
     });
   }
 
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest24.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest24.java
index aae4951..6920c20 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest24.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest24.java
@@ -26,14 +26,14 @@ import java.util.List;
 @RunWith(value = Parameterized.class)
 public class SparkIntegrationTest24 extends SparkIntegrationTest{
 
-  public SparkIntegrationTest24(String sparkVersion) {
-    super(sparkVersion);
+  public SparkIntegrationTest24(String sparkVersion, String hadoopVersion) {
+    super(sparkVersion, hadoopVersion);
   }
 
   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-            {"2.4.4"}
+            {"2.4.4", "2.7"}
     });
   }
 
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest30.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest30.java
index 4371023..8e3c8c8 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest30.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest30.java
@@ -17,6 +17,7 @@
 
 package org.apache.zeppelin.integration;
 
+import org.apache.zeppelin.interpreter.InterpreterSetting;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -26,15 +27,31 @@ import java.util.List;
 @RunWith(value = Parameterized.class)
 public class SparkIntegrationTest30 extends SparkIntegrationTest {
 
-  public SparkIntegrationTest30(String sparkVersion) {
-    super(sparkVersion);
+  public SparkIntegrationTest30(String sparkVersion, String hadoopVersion) {
+    super(sparkVersion, hadoopVersion);
   }
 
   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-            {"3.0.0-preview2"}
+            {"3.0.0", "2.7"},
+            {"3.0.0", "3.2"}
     });
   }
 
+  @Override
+  protected void setUpSparkInterpreterSetting(InterpreterSetting interpreterSetting) {
+    // spark3 doesn't support yarn-client and yarn-cluster any more, use
+    // spark.master and spark.submit.deployMode instead
+    String sparkMaster = interpreterSetting.getJavaProperties().getProperty("spark.master");
+    if (sparkMaster.equals("yarn-client")) {
+      interpreterSetting.setProperty("spark.master", "yarn");
+      interpreterSetting.setProperty("spark.submit.deployMode", "client");
+    } else if (sparkMaster.equals("yarn-cluster")){
+      interpreterSetting.setProperty("spark.master", "yarn");
+      interpreterSetting.setProperty("spark.submit.deployMode", "cluster");
+    } else if (sparkMaster.startsWith("local")) {
+      interpreterSetting.setProperty("spark.submit.deployMode", "client");
+    }
+  }
 }
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java
index 599727f..2eb505a 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java
@@ -76,10 +76,10 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi {
   private String sparkHome;
   private AuthenticationInfo anonymous = new AuthenticationInfo("anonymous");
 
-  public ZeppelinSparkClusterTest(String sparkVersion) throws Exception {
+  public ZeppelinSparkClusterTest(String sparkVersion, String hadoopVersion) throws Exception {
     this.sparkVersion = sparkVersion;
     LOGGER.info("Testing SparkVersion: " + sparkVersion);
-    this.sparkHome = DownloadUtils.downloadSpark(sparkVersion);
+    this.sparkHome = DownloadUtils.downloadSpark(sparkVersion, hadoopVersion);
     if (!verifiedSparkVersions.contains(sparkVersion)) {
       verifiedSparkVersions.add(sparkVersion);
       setupSparkInterpreter(sparkHome);
@@ -224,14 +224,14 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi {
       IOUtils.copy(new StringReader("{\"metadata\": { \"key\": 84896, \"value\": 54 }}\n"),
               jsonFileWriter);
       jsonFileWriter.close();
-      if (isSpark2()) {
+      if (isSpark2() || isSpark3()) {
         p.setText("%spark spark.read.json(\"file://" + tmpJsonFile.getAbsolutePath() + "\")");
       } else {
         p.setText("%spark sqlContext.read.json(\"file://" + tmpJsonFile.getAbsolutePath() + "\")");
       }
       note.run(p.getId(), true);
       assertEquals(Status.FINISHED, p.getStatus());
-      if (isSpark2()) {
+      if (isSpark2() || isSpark3()) {
         assertTrue(p.getReturn().message().get(0).getData().contains(
                 "org.apache.spark.sql.DataFrame = [metadata: struct<key: bigint, value: bigint>]"));
       } else {
@@ -247,7 +247,7 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi {
 
   @Test
   public void sparkReadCSVTest() throws IOException {
-    if (!isSpark2()) {
+    if (isSpark1()) {
       // csv if not supported in spark 1.x natively
       return;
     }
@@ -279,7 +279,7 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi {
       note = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous);
       // test basic dataframe api
       Paragraph p = note.addNewParagraph(anonymous);
-      if (isSpark2()) {
+      if (isSpark2() || isSpark3()) {
         p.setText("%spark val df=spark.createDataFrame(Seq((\"hello\",20)))" +
                 ".toDF(\"name\", \"age\")\n" +
                 "df.collect()");
@@ -295,7 +295,7 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi {
 
       // test display DataFrame
       p = note.addNewParagraph(anonymous);
-      if (isSpark2()) {
+      if (isSpark2() || isSpark3()) {
         p.setText("%spark val df=spark.createDataFrame(Seq((\"hello\",20)))" +
                 ".toDF(\"name\", \"age\")\n" +
                 "df.createOrReplaceTempView(\"test_table\")\n" +
@@ -353,7 +353,7 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi {
               p.getReturn().message().get(0).getData().contains("name age\n1 hello  20"));
 
       // test display DataSet
-      if (isSpark2()) {
+      if (isSpark2() || isSpark3()) {
         p = note.addNewParagraph(anonymous);
         p.setText("%spark val ds=spark.createDataset(Seq((\"hello\",20)))\n" +
             "z.show(ds)");
@@ -374,16 +374,24 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi {
     Note note = null;
     try {
       note = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous);
+      Paragraph p = note.addNewParagraph(anonymous);
 
-      String sqlContextName = "sqlContext";
-      if (isSpark2()) {
-        sqlContextName = "spark";
+      if (isSpark3()) {
+        p.setText("%spark.r localDF <- data.frame(name=c(\"a\", \"b\", \"c\"), age=c(19, 23, 18))\n" +
+                "df <- createDataFrame(localDF)\n" +
+                "count(df)"
+        );
+      } else {
+        String sqlContextName = "sqlContext";
+        if (isSpark2() || isSpark3()) {
+          sqlContextName = "spark";
+        }
+        p.setText("%spark.r localDF <- data.frame(name=c(\"a\", \"b\", \"c\"), age=c(19, 23, 18))\n" +
+                "df <- createDataFrame(" + sqlContextName + ", localDF)\n" +
+                "count(df)"
+        );
       }
-      Paragraph p = note.addNewParagraph(anonymous);
-      p.setText("%spark.r localDF <- data.frame(name=c(\"a\", \"b\", \"c\"), age=c(19, 23, 18))\n" +
-          "df <- createDataFrame(" + sqlContextName + ", localDF)\n" +
-          "count(df)"
-      );
+
       note.run(p.getId(), true);
       assertEquals(Status.FINISHED, p.getStatus());
       assertEquals("[1] 3", p.getReturn().message().get(0).getData().trim());
@@ -415,7 +423,7 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi {
       assertEquals(Status.FINISHED, p.getStatus());
       assertEquals("name_abc\n", p.getReturn().message().get(0).getData());
 
-      if (!isSpark2()) {
+      if (isSpark1()) {
         // run sqlContext test
         p = note.addNewParagraph(anonymous);
         p.setText("%pyspark from pyspark.sql import Row\n" +
@@ -461,7 +469,7 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi {
             .contains("Fail to execute line 3: print(a2)"));
         assertTrue(p.getReturn().message().get(0).getData()
             .contains("name 'a2' is not defined"));
-      } else {
+      } else if (isSpark2()){
         // run SparkSession test
         p = note.addNewParagraph(anonymous);
         p.setText("%pyspark from pyspark.sql import Row\n" +
@@ -480,6 +488,25 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi {
         assertEquals(Status.FINISHED, p.getStatus());
         assertTrue("[Row(len=u'3')]\n".equals(p.getReturn().message().get(0).getData()) ||
             "[Row(len='3')]\n".equals(p.getReturn().message().get(0).getData()));
+      } else {
+        // run SparkSession test
+        p = note.addNewParagraph(anonymous);
+        p.setText("%pyspark from pyspark.sql import Row\n" +
+                "df=sqlContext.createDataFrame([Row(id=1, age=20)])\n" +
+                "df.collect()");
+        note.run(p.getId(), true);
+        assertEquals(Status.FINISHED, p.getStatus());
+        assertEquals("[Row(id=1, age=20)]\n", p.getReturn().message().get(0).getData());
+
+        // test udf
+        p = note.addNewParagraph(anonymous);
+        // use SQLContext to register UDF but use this UDF through SparkSession
+        p.setText("%pyspark sqlContext.udf.register(\"f1\", lambda x: len(x))\n" +
+                "spark.sql(\"select f1(\\\"abc\\\") as len\").collect()");
+        note.run(p.getId(), true);
+        assertEquals(Status.FINISHED, p.getStatus());
+        assertTrue("[Row(len=u'3')]\n".equals(p.getReturn().message().get(0).getData()) ||
+                "[Row(len='3')]\n".equals(p.getReturn().message().get(0).getData()));
       }
     } finally {
       if (null != note) {
@@ -680,14 +707,16 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi {
     }
   }
 
-  private int toIntSparkVersion(String sparkVersion) {
-    String[] split = sparkVersion.split("\\.");
-    int version = Integer.parseInt(split[0]) * 10 + Integer.parseInt(split[1]);
-    return version;
+  private boolean isSpark1() {
+    return sparkVersion.startsWith("1.");
   }
 
   private boolean isSpark2() {
-    return toIntSparkVersion(sparkVersion) >= 20;
+    return sparkVersion.startsWith("2.");
+  }
+
+  private boolean isSpark3() {
+    return sparkVersion.startsWith("3.");
   }
 
   @Test
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest16.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest16.java
index 954f024..777c166 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest16.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest16.java
@@ -26,15 +26,15 @@ import java.util.List;
 @RunWith(value = Parameterized.class)
 public class ZeppelinSparkClusterTest16 extends ZeppelinSparkClusterTest {
 
-  public ZeppelinSparkClusterTest16(String sparkVersion) throws Exception {
-    super(sparkVersion);
+  public ZeppelinSparkClusterTest16(String sparkVersion, String hadoopVersion) throws Exception {
+    super(sparkVersion, hadoopVersion);
   }
 
 
   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-            {"1.6.3"}
+            {"1.6.3", "2.6"}
     });
   }
 }
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest20.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest20.java
index 22687d9..be2b5c6 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest20.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest20.java
@@ -26,15 +26,15 @@ import java.util.List;
 @RunWith(value = Parameterized.class)
 public class ZeppelinSparkClusterTest20 extends ZeppelinSparkClusterTest {
 
-  public ZeppelinSparkClusterTest20(String sparkVersion) throws Exception {
-    super(sparkVersion);
+  public ZeppelinSparkClusterTest20(String sparkVersion, String hadoopVersion) throws Exception {
+    super(sparkVersion, hadoopVersion);
   }
 
 
   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-            {"2.0.2"}
+            {"2.0.2", "2.7"}
     });
   }
 }
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest21.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest21.java
index fd98364..c127312 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest21.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest21.java
@@ -26,15 +26,15 @@ import java.util.List;
 @RunWith(value = Parameterized.class)
 public class ZeppelinSparkClusterTest21 extends ZeppelinSparkClusterTest {
 
-  public ZeppelinSparkClusterTest21(String sparkVersion) throws Exception {
-    super(sparkVersion);
+  public ZeppelinSparkClusterTest21(String sparkVersion, String hadoopVersion) throws Exception {
+    super(sparkVersion, hadoopVersion);
   }
 
 
   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-            {"2.1.3"}
+            {"2.1.3", "2.7"}
     });
   }
 }
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest22.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest22.java
index 9b51e17..d7a63af 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest22.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest22.java
@@ -26,15 +26,15 @@ import java.util.List;
 @RunWith(value = Parameterized.class)
 public class ZeppelinSparkClusterTest22 extends ZeppelinSparkClusterTest {
 
-  public ZeppelinSparkClusterTest22(String sparkVersion) throws Exception {
-    super(sparkVersion);
+  public ZeppelinSparkClusterTest22(String sparkVersion, String hadoopVersion) throws Exception {
+    super(sparkVersion, hadoopVersion);
   }
 
 
   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-            {"2.2.2"}
+            {"2.2.2", "2.7"}
     });
   }
 }
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest23.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest23.java
index 22ef673..7b15af7 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest23.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest23.java
@@ -26,15 +26,15 @@ import java.util.List;
 @RunWith(value = Parameterized.class)
 public class ZeppelinSparkClusterTest23 extends ZeppelinSparkClusterTest {
 
-  public ZeppelinSparkClusterTest23(String sparkVersion) throws Exception {
-    super(sparkVersion);
+  public ZeppelinSparkClusterTest23(String sparkVersion, String hadoopVersion) throws Exception {
+    super(sparkVersion, hadoopVersion);
   }
 
 
   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-            {"2.3.2"}
+            {"2.3.2", "2.7"}
     });
   }
 }
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest24.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest24.java
index a55a504..e1f05ff 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest24.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest24.java
@@ -26,14 +26,14 @@ import java.util.List;
 @RunWith(value = Parameterized.class)
 public class ZeppelinSparkClusterTest24 extends ZeppelinSparkClusterTest {
 
-  public ZeppelinSparkClusterTest24(String sparkVersion) throws Exception {
-    super(sparkVersion);
+  public ZeppelinSparkClusterTest24(String sparkVersion, String hadoopVersion) throws Exception {
+    super(sparkVersion, hadoopVersion);
   }
 
   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-            {"2.4.4"}
+            {"2.4.4", "2.7"}
     });
   }
 }
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest30.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest30.java
index 0a09ad5..8405900 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest30.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest30.java
@@ -26,14 +26,15 @@ import java.util.List;
 @RunWith(value = Parameterized.class)
 public class ZeppelinSparkClusterTest30 extends ZeppelinSparkClusterTest {
 
-  public ZeppelinSparkClusterTest30(String sparkVersion) throws Exception {
-    super(sparkVersion);
+  public ZeppelinSparkClusterTest30(String sparkVersion, String hadoopVersion) throws Exception {
+    super(sparkVersion, hadoopVersion);
   }
 
   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-            {"3.0.0-preview2"}
+            {"3.0.0", "2.7"},
+            {"3.0.0", "3.2"}
     });
   }
 }
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
index 5658c01..2d4d861 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
@@ -261,7 +261,7 @@ public abstract class AbstractTestRestApi {
       LOG.info("Zeppelin Server is started.");
 
       // set up spark interpreter
-      String sparkHome = DownloadUtils.downloadSpark("2.4.4");
+      String sparkHome = DownloadUtils.downloadSpark("2.4.4", "2.7");
       InterpreterSettingManager interpreterSettingManager = TestUtils.getInstance(InterpreterSettingManager.class);
       InterpreterSetting interpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark");
       interpreterSetting.setProperty("SPARK_HOME", sparkHome);
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/DownloadUtils.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/DownloadUtils.java
index db75c24..4e6211c 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/DownloadUtils.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/DownloadUtils.java
@@ -47,20 +47,15 @@ public class DownloadUtils {
     }
   }
 
-  public static String downloadSpark(String version) {
-    String hadoopVersion = "2.7";
-    if (version.startsWith("1.")) {
-      // Use hadoop 2.6 for spark 1.x
-      hadoopVersion = "2.6";
-    }
+  public static String downloadSpark(String sparkVersion, String hadoopVersion) {
     String sparkDownloadFolder = downloadFolder + "/spark";
     File targetSparkHomeFolder =
-            new File(sparkDownloadFolder + "/spark-" + version + "-bin-hadoop" + hadoopVersion);
+            new File(sparkDownloadFolder + "/spark-" + sparkVersion + "-bin-hadoop" + hadoopVersion);
     if (targetSparkHomeFolder.exists()) {
       LOGGER.info("Skip to download spark as it is already downloaded.");
       return targetSparkHomeFolder.getAbsolutePath();
     }
-    download("spark", version, "-bin-hadoop" + hadoopVersion + ".tgz");
+    download("spark", sparkVersion, "-bin-hadoop" + hadoopVersion + ".tgz");
     return targetSparkHomeFolder.getAbsolutePath();
   }
 
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
index ab2ebae..736c11a 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
@@ -52,7 +52,7 @@ public class SparkInterpreterLauncherTest {
       System.clearProperty(confVar.getVarName());
     }
 
-    sparkHome = DownloadUtils.downloadSpark("2.3.2");
+    sparkHome = DownloadUtils.downloadSpark("2.3.2", "2.7");
     System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(),
             new File("..").getAbsolutePath());