You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2018/02/11 01:48:41 UTC
zeppelin git commit: ZEPPELIN-3108. Support Spark 2.3
Repository: zeppelin
Updated Branches:
refs/heads/master 29b9b10f3 -> d9faef108
ZEPPELIN-3108. Support Spark 2.3
### What is this PR for?
Spark 2.3 removes `JobProgressListener`, which makes Zeppelin unable to run against Spark 2.3.
This PR tries to make Zeppelin support Spark 2.3 by using `sc.statusTracker` instead; see `JobProgressUtil.scala`.
### What type of PR is it?
[Improvement]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3108
### How should this be tested?
* Verified manually.
### Screenshots (if appropriate)
![screen shot 2018-01-30 at 9 45 01 pm](https://user-images.githubusercontent.com/164491/35569317-dce6f348-0606-11e8-9b18-74a847d64ac9.png)
### Questions:
* Do the license files need updating? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #2750 from zjffdu/ZEPPELIN-3108 and squashes the following commits:
43ae78a [Jeff Zhang] ZEPPELIN-3108. Support Spark 2.3
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/d9faef10
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/d9faef10
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/d9faef10
Branch: refs/heads/master
Commit: d9faef1085e4ade496ff7f3d7f8472a28678f8e7
Parents: 29b9b10
Author: Jeff Zhang <zj...@apache.org>
Authored: Tue Nov 14 15:29:58 2017 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Sun Feb 11 09:48:32 2018 +0800
----------------------------------------------------------------------
spark/interpreter/pom.xml | 10 +-
.../zeppelin/spark/OldSparkInterpreter.java | 153 +++++++++++++------
spark/pom.xml | 19 ++-
spark/spark-dependencies/pom.xml | 10 +-
.../spark/BaseSparkScalaInterpreter.scala | 16 +-
.../apache/zeppelin/spark/JobProgressUtil.scala | 37 +++++
6 files changed, 164 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9faef10/spark/interpreter/pom.xml
----------------------------------------------------------------------
diff --git a/spark/interpreter/pom.xml b/spark/interpreter/pom.xml
index 4496462..758f697 100644
--- a/spark/interpreter/pom.xml
+++ b/spark/interpreter/pom.xml
@@ -53,15 +53,7 @@
<pyspark.test.exclude>**/PySparkInterpreterMatplotlibTest.java</pyspark.test.exclude>
<pyspark.test.include>**/*Test.*</pyspark.test.include>
-
- <spark.archive>spark-${spark.version}</spark.archive>
- <spark.src.download.url>
- http://d3kbcqa49mib13.cloudfront.net/${spark.archive}.tgz
- </spark.src.download.url>
- <spark.bin.download.url>
- http://d3kbcqa49mib13.cloudfront.net/spark-${spark.version}-bin-without-hadoop.tgz
- </spark.bin.download.url>
-
+
</properties>
<dependencies>
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9faef10/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java
index 6a54c3b..da332fe 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java
@@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.spark.JobProgressUtil;
import org.apache.spark.SecurityManager;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
@@ -44,10 +45,26 @@ import org.apache.spark.repl.SparkILoop;
import org.apache.spark.scheduler.ActiveJob;
import org.apache.spark.scheduler.DAGScheduler;
import org.apache.spark.scheduler.Pool;
+import org.apache.spark.scheduler.SparkListenerApplicationEnd;
+import org.apache.spark.scheduler.SparkListenerApplicationStart;
+import org.apache.spark.scheduler.SparkListenerBlockManagerAdded;
+import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved;
+import org.apache.spark.scheduler.SparkListenerBlockUpdated;
+import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate;
+import org.apache.spark.scheduler.SparkListenerExecutorAdded;
+import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate;
+import org.apache.spark.scheduler.SparkListenerExecutorRemoved;
+import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerJobStart;
+import org.apache.spark.scheduler.SparkListenerStageCompleted;
+import org.apache.spark.scheduler.SparkListenerStageSubmitted;
+import org.apache.spark.scheduler.SparkListenerTaskEnd;
+import org.apache.spark.scheduler.SparkListenerTaskGettingResult;
+import org.apache.spark.scheduler.SparkListenerTaskStart;
+import org.apache.spark.scheduler.SparkListenerUnpersistRDD;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.ui.SparkUI;
-import org.apache.spark.ui.jobs.JobProgressListener;
+import org.apache.spark.scheduler.SparkListener;
import org.apache.zeppelin.interpreter.BaseZeppelinContext;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
@@ -113,7 +130,7 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter {
private static InterpreterHookRegistry hooks;
private static SparkEnv env;
private static Object sparkSession; // spark 2.x
- private static JobProgressListener sparkListener;
+ private static SparkListener sparkListener;
private static AbstractFile classOutputDir;
private static Integer sharedInterpreterLock = new Integer(0);
private static AtomicInteger numReferenceOfSparkContext = new AtomicInteger(0);
@@ -173,11 +190,10 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter {
}
}
- static JobProgressListener setupListeners(SparkContext context) {
- JobProgressListener pl = new JobProgressListener(context.getConf()) {
+ static SparkListener setupListeners(SparkContext context) {
+ SparkListener pl = new SparkListener() {
@Override
public synchronized void onJobStart(SparkListenerJobStart jobStart) {
- super.onJobStart(jobStart);
int jobId = jobStart.jobId();
String jobGroupId = jobStart.properties().getProperty("spark.jobGroup.id");
String uiEnabled = jobStart.properties().getProperty("spark.ui.enabled");
@@ -207,6 +223,85 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter {
return jobUrl;
}
+ @Override
+ public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) {
+
+ }
+
+ @Override
+ public void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) {
+
+ }
+
+ @Override
+ public void onExecutorAdded(SparkListenerExecutorAdded executorAdded) {
+
+ }
+
+ @Override
+ public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate executorMetricsUpdate) {
+
+ }
+
+ @Override
+ public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
+
+ }
+
+ @Override
+ public void onApplicationStart(SparkListenerApplicationStart applicationStart) {
+
+ }
+
+ @Override
+ public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) {
+
+ }
+
+ @Override
+ public void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) {
+
+ }
+
+ @Override
+ public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) {
+
+ }
+
+ @Override
+ public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) {
+
+ }
+
+ @Override
+ public void onJobEnd(SparkListenerJobEnd jobEnd) {
+
+ }
+
+ @Override
+ public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
+
+ }
+
+ @Override
+ public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) {
+
+ }
+
+ @Override
+ public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
+
+ }
+
+ @Override
+ public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) {
+
+ }
+
+ @Override
+ public void onTaskStart(SparkListenerTaskStart taskStart) {
+
+ }
};
try {
Object listenerBus = context.getClass().getMethod("listenerBus").invoke(context);
@@ -224,7 +319,7 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter {
continue;
}
- if (!parameterTypes[0].isAssignableFrom(JobProgressListener.class)) {
+ if (!parameterTypes[0].isAssignableFrom(SparkListener.class)) {
continue;
}
@@ -1274,48 +1369,10 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter {
@Override
public int getProgress(InterpreterContext context) {
String jobGroup = Utils.buildJobGroupId(context);
- int completedTasks = 0;
- int totalTasks = 0;
-
- DAGScheduler scheduler = sc.dagScheduler();
- if (scheduler == null) {
- return 0;
- }
- HashSet<ActiveJob> jobs = scheduler.activeJobs();
- if (jobs == null || jobs.size() == 0) {
- return 0;
- }
- Iterator<ActiveJob> it = jobs.iterator();
- while (it.hasNext()) {
- ActiveJob job = it.next();
- String g = (String) job.properties().get("spark.jobGroup.id");
- if (jobGroup.equals(g)) {
- int[] progressInfo = null;
- try {
- Object finalStage = job.getClass().getMethod("finalStage").invoke(job);
- if (sparkVersion.getProgress1_0()) {
- progressInfo = getProgressFromStage_1_0x(sparkListener, finalStage);
- } else {
- progressInfo = getProgressFromStage_1_1x(sparkListener, finalStage);
- }
- } catch (IllegalAccessException | IllegalArgumentException
- | InvocationTargetException | NoSuchMethodException
- | SecurityException e) {
- logger.error("Can't get progress info", e);
- return 0;
- }
- totalTasks += progressInfo[0];
- completedTasks += progressInfo[1];
- }
- }
-
- if (totalTasks == 0) {
- return 0;
- }
- return completedTasks * 100 / totalTasks;
+ return JobProgressUtil.progress(sc, jobGroup);
}
- private int[] getProgressFromStage_1_0x(JobProgressListener sparkListener, Object stage)
+ private int[] getProgressFromStage_1_0x(SparkListener sparkListener, Object stage)
throws IllegalAccessException, IllegalArgumentException,
InvocationTargetException, NoSuchMethodException, SecurityException {
int numTasks = (int) stage.getClass().getMethod("numTasks").invoke(stage);
@@ -1345,7 +1402,7 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter {
return new int[] {numTasks, completedTasks};
}
- private int[] getProgressFromStage_1_1x(JobProgressListener sparkListener, Object stage)
+ private int[] getProgressFromStage_1_1x(SparkListener sparkListener, Object stage)
throws IllegalAccessException, IllegalArgumentException,
InvocationTargetException, NoSuchMethodException, SecurityException {
int numTasks = (int) stage.getClass().getMethod("numTasks").invoke(stage);
@@ -1421,7 +1478,7 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter {
return FormType.NATIVE;
}
- public JobProgressListener getJobProgressListener() {
+ public SparkListener getJobProgressListener() {
return sparkListener;
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9faef10/spark/pom.xml
----------------------------------------------------------------------
diff --git a/spark/pom.xml b/spark/pom.xml
index 06b7d9f..c55e453 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -47,6 +47,14 @@
<!-- spark versions -->
<spark.version>2.2.0</spark.version>
<py4j.version>0.10.4</py4j.version>
+
+ <spark.archive>spark-${spark.version}</spark.archive>
+ <spark.src.download.url>
+ http://d3kbcqa49mib13.cloudfront.net/${spark.archive}.tgz
+ </spark.src.download.url>
+ <spark.bin.download.url>
+ http://d3kbcqa49mib13.cloudfront.net/${spark.archive}-bin-without-hadoop.tgz
+ </spark.bin.download.url>
</properties>
<dependencies>
@@ -57,7 +65,6 @@
<version>${project.version}</version>
</dependency>
- <!--test libraries-->
<dependency>
<groupId>org.apache.zeppelin</groupId>
<artifactId>zeppelin-display</artifactId>
@@ -187,6 +194,16 @@
<profiles>
+
+ <profile>
+ <id>spark-2.3</id>
+ <properties>
+ <spark.version>2.3.0</spark.version>
+ <protobuf.version>2.5.0</protobuf.version>
+ <spark.py4j.version>0.10.6</spark.py4j.version>
+ </properties>
+ </profile>
+
<profile>
<id>spark-2.2</id>
<activation>
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9faef10/spark/spark-dependencies/pom.xml
----------------------------------------------------------------------
diff --git a/spark/spark-dependencies/pom.xml b/spark/spark-dependencies/pom.xml
index 7643dc9..58977b4 100644
--- a/spark/spark-dependencies/pom.xml
+++ b/spark/spark-dependencies/pom.xml
@@ -28,7 +28,7 @@
</parent>
<groupId>org.apache.zeppelin</groupId>
- <artifactId>zeppelin-spark-dependencies_2.10</artifactId>
+ <artifactId>zeppelin-spark-dependencies</artifactId>
<packaging>jar</packaging>
<version>0.9.0-SNAPSHOT</version>
<name>Zeppelin: Spark dependencies</name>
@@ -54,14 +54,6 @@
<akka.group>org.spark-project.akka</akka.group>
<akka.version>2.3.4-spark</akka.version>
- <spark.archive>spark-${spark.version}</spark.archive>
- <spark.src.download.url>
- http://d3kbcqa49mib13.cloudfront.net/${spark.archive}.tgz
- </spark.src.download.url>
- <spark.bin.download.url>
- http://d3kbcqa49mib13.cloudfront.net/${spark.archive}-bin-without-hadoop.tgz
- </spark.bin.download.url>
-
<!--plugin versions-->
<plugin.shade.version>2.3</plugin.shade.version>
</properties>
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9faef10/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
----------------------------------------------------------------------
diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
index 3ef4fe7..883beb0 100644
--- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
+++ b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
@@ -21,7 +21,7 @@ package org.apache.zeppelin.spark
import java.io.File
import org.apache.spark.sql.SQLContext
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.{JobProgressUtil, SparkConf, SparkContext}
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream
import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterResult}
@@ -93,19 +93,7 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
}
protected def getProgress(jobGroup: String, context: InterpreterContext): Int = {
- val jobIds = sc.statusTracker.getJobIdsForGroup(jobGroup)
- val jobs = jobIds.flatMap { id => sc.statusTracker.getJobInfo(id) }
- val stages = jobs.flatMap { job =>
- job.stageIds().flatMap(sc.statusTracker.getStageInfo)
- }
-
- val taskCount = stages.map(_.numTasks).sum
- val completedTaskCount = stages.map(_.numCompletedTasks).sum
- if (taskCount == 0) {
- 0
- } else {
- (100 * completedTaskCount.toDouble / taskCount).toInt
- }
+ JobProgressUtil.progress(sc, jobGroup)
}
protected def bind(name: String, tpe: String, value: Object, modifier: List[String]): Unit
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9faef10/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/JobProgressUtil.scala
----------------------------------------------------------------------
diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/JobProgressUtil.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/JobProgressUtil.scala
new file mode 100644
index 0000000..517bed0
--- /dev/null
+++ b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/JobProgressUtil.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+object JobProgressUtil {
+
+ def progress(sc: SparkContext, jobGroup : String):Int = {
+ val jobIds = sc.statusTracker.getJobIdsForGroup(jobGroup)
+ val jobs = jobIds.flatMap { id => sc.statusTracker.getJobInfo(id) }
+ val stages = jobs.flatMap { job =>
+ job.stageIds().flatMap(sc.statusTracker.getStageInfo)
+ }
+
+ val taskCount = stages.map(_.numTasks).sum
+ val completedTaskCount = stages.map(_.numCompletedTasks).sum
+ if (taskCount == 0) {
+ 0
+ } else {
+ (100 * completedTaskCount.toDouble / taskCount).toInt
+ }
+ }
+}