Posted to commits@livy.apache.org by js...@apache.org on 2017/06/27 06:39:21 UTC

[40/50] [abbrv] incubator-livy git commit: LIVY-355. Refactor statement progress tracker to fix binary compatibility issue (#323)

LIVY-355. Refactor statement progress tracker to fix binary compatibility issue (#323)

* Refactor statement progress tracker to fix binary compatibility issue

Change-Id: Ie91fd77472aeebe138bd6711a0baa82269a6b247

* Refactor again to simplify the code

Change-Id: I9380bcb8dd2b594250783633a3c68e290ac7ea28

* Isolate statementId-to-job-group mapping logic

Change-Id: If554aee2c0b3d96b54804f94cbb8df9af7843ab4
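
For context: this refactor replaces the custom SparkListener-based progress
tracker (deleted below) with an on-demand calculation built on Spark's stable
SparkStatusTracker API, keyed by a per-statement job group; presumably this is
what resolves the binary compatibility issue, since the REPL no longer
subclasses a Spark listener class whose binary interface can change across
Spark versions. A minimal sketch of the new calculation, mirroring the
Session.scala hunk in this diff (assumes an active SparkContext `sc`; not the
committed code verbatim):

    import org.apache.spark.SparkContext

    // Derive a statement's progress from the status tracker, using the
    // statement id as the job group name (as this commit does).
    def progressOfStatement(sc: SparkContext, statementId: Int): Double = {
      val jobGroup = statementId.toString
      val jobs = sc.statusTracker.getJobIdsForGroup(jobGroup)
        .flatMap(id => sc.statusTracker.getJobInfo(id))
      val stages = jobs.flatMap(_.stageIds().flatMap(sc.statusTracker.getStageInfo))
      val totalTasks = stages.map(_.numTasks).sum
      val completedTasks = stages.map(_.numCompletedTasks).sum
      if (totalTasks == 0) 0.0 else completedTasks.toDouble / totalTasks
    }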


Project: http://git-wip-us.apache.org/repos/asf/incubator-livy/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-livy/commit/0ddcaf68
Tree: http://git-wip-us.apache.org/repos/asf/incubator-livy/tree/0ddcaf68
Diff: http://git-wip-us.apache.org/repos/asf/incubator-livy/diff/0ddcaf68

Branch: refs/heads/master
Commit: 0ddcaf68d28a5120b1b74692bd92dde2301f5170
Parents: f5ef489
Author: Saisai Shao <sa...@gmail.com>
Authored: Wed May 10 04:25:57 2017 +0800
Committer: Jeff Zhang <zj...@gmail.com>
Committed: Tue May 9 13:25:57 2017 -0700

----------------------------------------------------------------------
 .../cloudera/livy/repl/SparkInterpreter.scala   |   4 +-
 .../livy/repl/SparkInterpreterSpec.scala        |   2 +-
 .../cloudera/livy/repl/SparkInterpreter.scala   |   4 +-
 .../livy/repl/SparkInterpreterSpec.scala        |   2 +-
 .../com/cloudera/livy/repl/Interpreter.scala    |  10 -
 .../cloudera/livy/repl/ProcessInterpreter.scala |   7 +-
 .../cloudera/livy/repl/PythonInterpreter.scala  |   9 +-
 .../com/cloudera/livy/repl/ReplDriver.scala     |  10 +-
 .../scala/com/cloudera/livy/repl/Session.scala  |  40 +++-
 .../cloudera/livy/repl/SparkRInterpreter.scala  |  10 +-
 .../livy/repl/StatementProgressListener.scala   | 162 -------------
 .../livy/repl/PythonInterpreterSpec.scala       |   6 +-
 .../cloudera/livy/repl/PythonSessionSpec.scala  |   6 +-
 .../livy/repl/ScalaInterpreterSpec.scala        |   2 +-
 .../livy/repl/SparkRInterpreterSpec.scala       |   3 +-
 .../cloudera/livy/repl/SparkRSessionSpec.scala  |   3 +-
 .../cloudera/livy/repl/SparkSessionSpec.scala   |  35 ++-
 .../repl/StatementProgressListenerSpec.scala    | 227 -------------------
 18 files changed, 93 insertions(+), 449 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/0ddcaf68/repl/scala-2.10/src/main/scala/com/cloudera/livy/repl/SparkInterpreter.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.10/src/main/scala/com/cloudera/livy/repl/SparkInterpreter.scala b/repl/scala-2.10/src/main/scala/com/cloudera/livy/repl/SparkInterpreter.scala
index 5ef5491..d736125 100644
--- a/repl/scala-2.10/src/main/scala/com/cloudera/livy/repl/SparkInterpreter.scala
+++ b/repl/scala-2.10/src/main/scala/com/cloudera/livy/repl/SparkInterpreter.scala
@@ -33,8 +33,7 @@ import org.apache.spark.repl.SparkIMain
 /**
  * This represents a Spark interpreter. It is not thread safe.
  */
-class SparkInterpreter(conf: SparkConf,
-    override val statementProgressListener: StatementProgressListener)
+class SparkInterpreter(conf: SparkConf)
   extends AbstractSparkInterpreter with SparkContextInitializer {
 
   private var sparkIMain: SparkIMain = _
@@ -108,7 +107,6 @@ class SparkInterpreter(conf: SparkConf,
       createSparkContext(conf)
     }
 
-    sparkContext.addSparkListener(statementProgressListener)
     sparkContext
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/0ddcaf68/repl/scala-2.10/src/test/scala/com/cloudera/livy/repl/SparkInterpreterSpec.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.10/src/test/scala/com/cloudera/livy/repl/SparkInterpreterSpec.scala b/repl/scala-2.10/src/test/scala/com/cloudera/livy/repl/SparkInterpreterSpec.scala
index 3df35b5..e2b783a 100644
--- a/repl/scala-2.10/src/test/scala/com/cloudera/livy/repl/SparkInterpreterSpec.scala
+++ b/repl/scala-2.10/src/test/scala/com/cloudera/livy/repl/SparkInterpreterSpec.scala
@@ -24,7 +24,7 @@ import com.cloudera.livy.LivyBaseUnitTestSuite
 
 class SparkInterpreterSpec extends FunSpec with Matchers with LivyBaseUnitTestSuite {
   describe("SparkInterpreter") {
-    val interpreter = new SparkInterpreter(null, null)
+    val interpreter = new SparkInterpreter(null)
 
     it("should parse Scala compile error.") {
       // Regression test for LIVY-260.

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/0ddcaf68/repl/scala-2.11/src/main/scala/com/cloudera/livy/repl/SparkInterpreter.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.11/src/main/scala/com/cloudera/livy/repl/SparkInterpreter.scala b/repl/scala-2.11/src/main/scala/com/cloudera/livy/repl/SparkInterpreter.scala
index 6735b3a..f08a46e 100644
--- a/repl/scala-2.11/src/main/scala/com/cloudera/livy/repl/SparkInterpreter.scala
+++ b/repl/scala-2.11/src/main/scala/com/cloudera/livy/repl/SparkInterpreter.scala
@@ -33,8 +33,7 @@ import org.apache.spark.repl.SparkILoop
 /**
  * Scala 2.11 version of SparkInterpreter
  */
-class SparkInterpreter(conf: SparkConf,
-    override val statementProgressListener: StatementProgressListener)
+class SparkInterpreter(conf: SparkConf)
   extends AbstractSparkInterpreter with SparkContextInitializer {
 
   protected var sparkContext: SparkContext = _
@@ -94,7 +93,6 @@ class SparkInterpreter(conf: SparkConf,
       createSparkContext(conf)
     }
 
-    sparkContext.addSparkListener(statementProgressListener)
     sparkContext
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/0ddcaf68/repl/scala-2.11/src/test/scala/com/cloudera/livy/repl/SparkInterpreterSpec.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.11/src/test/scala/com/cloudera/livy/repl/SparkInterpreterSpec.scala b/repl/scala-2.11/src/test/scala/com/cloudera/livy/repl/SparkInterpreterSpec.scala
index 56656d7..5cb88e3 100644
--- a/repl/scala-2.11/src/test/scala/com/cloudera/livy/repl/SparkInterpreterSpec.scala
+++ b/repl/scala-2.11/src/test/scala/com/cloudera/livy/repl/SparkInterpreterSpec.scala
@@ -24,7 +24,7 @@ import com.cloudera.livy.LivyBaseUnitTestSuite
 
 class SparkInterpreterSpec extends FunSpec with Matchers with LivyBaseUnitTestSuite {
   describe("SparkInterpreter") {
-    val interpreter = new SparkInterpreter(null, null)
+    val interpreter = new SparkInterpreter(null)
 
     it("should parse Scala compile error.") {
      // Regression test for LIVY-260.

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/0ddcaf68/repl/src/main/scala/com/cloudera/livy/repl/Interpreter.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/com/cloudera/livy/repl/Interpreter.scala b/repl/src/main/scala/com/cloudera/livy/repl/Interpreter.scala
index 59ad878..fa3b640 100644
--- a/repl/src/main/scala/com/cloudera/livy/repl/Interpreter.scala
+++ b/repl/src/main/scala/com/cloudera/livy/repl/Interpreter.scala
@@ -37,8 +37,6 @@ trait Interpreter {
 
   def kind: String
 
-  def statementProgressListener: StatementProgressListener
-
   /**
    * Start the Interpreter.
    *
@@ -47,14 +45,6 @@ trait Interpreter {
   def start(): SparkContext
 
   /**
-   * Execute the code and return the result.
-   */
-  def execute(statementId: Int, code: String): ExecuteResponse = {
-    statementProgressListener.setCurrentStatementId(statementId)
-    execute(code)
-  }
-
-  /**
   * Execute the code and return the result; it may
    * take some time to execute.
    */

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/0ddcaf68/repl/src/main/scala/com/cloudera/livy/repl/ProcessInterpreter.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/com/cloudera/livy/repl/ProcessInterpreter.scala b/repl/src/main/scala/com/cloudera/livy/repl/ProcessInterpreter.scala
index 0414bbb..fe10697 100644
--- a/repl/src/main/scala/com/cloudera/livy/repl/ProcessInterpreter.scala
+++ b/repl/src/main/scala/com/cloudera/livy/repl/ProcessInterpreter.scala
@@ -41,8 +41,7 @@ private case class ShutdownRequest(promise: Promise[Unit]) extends Request
  *
  * @param process
  */
-abstract class ProcessInterpreter(process: Process,
-    override val statementProgressListener: StatementProgressListener)
+abstract class ProcessInterpreter(process: Process)
   extends Interpreter with Logging {
   protected[this] val stdin = new PrintWriter(process.getOutputStream)
   protected[this] val stdout = new BufferedReader(new InputStreamReader(process.getInputStream), 1)
@@ -53,9 +52,7 @@ abstract class ProcessInterpreter(process: Process,
     if (ClientConf.TEST_MODE) {
       null.asInstanceOf[SparkContext]
     } else {
-      val sc = SparkContext.getOrCreate()
-      sc.addSparkListener(statementProgressListener)
-      sc
+      SparkContext.getOrCreate()
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/0ddcaf68/repl/src/main/scala/com/cloudera/livy/repl/PythonInterpreter.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/com/cloudera/livy/repl/PythonInterpreter.scala b/repl/src/main/scala/com/cloudera/livy/repl/PythonInterpreter.scala
index 6e80c09..a04dfef 100644
--- a/repl/src/main/scala/com/cloudera/livy/repl/PythonInterpreter.scala
+++ b/repl/src/main/scala/com/cloudera/livy/repl/PythonInterpreter.scala
@@ -45,7 +45,7 @@ import com.cloudera.livy.sessions._
 // scalastyle:off println
 object PythonInterpreter extends Logging {
 
-  def apply(conf: SparkConf, kind: Kind, listener: StatementProgressListener): Interpreter = {
+  def apply(conf: SparkConf, kind: Kind): Interpreter = {
     val pythonExec = kind match {
         case PySpark() => sys.env.getOrElse("PYSPARK_PYTHON", "python")
         case PySpark3() => sys.env.getOrElse("PYSPARK3_PYTHON", "python3")
@@ -72,7 +72,7 @@ object PythonInterpreter extends Logging {
     env.put("LIVY_SPARK_MAJOR_VERSION", conf.get("spark.livy.spark_major_version", "1"))
     builder.redirectError(Redirect.PIPE)
     val process = builder.start()
-    new PythonInterpreter(process, gatewayServer, kind.toString, listener)
+    new PythonInterpreter(process, gatewayServer, kind.toString)
   }
 
   private def findPySparkArchives(): Seq[String] = {
@@ -190,9 +190,8 @@ object PythonInterpreter extends Logging {
 private class PythonInterpreter(
     process: Process,
     gatewayServer: GatewayServer,
-    pyKind: String,
-    listener: StatementProgressListener)
-  extends ProcessInterpreter(process, listener)
+    pyKind: String)
+  extends ProcessInterpreter(process)
   with Logging
 {
   implicit val formats = DefaultFormats

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/0ddcaf68/repl/src/main/scala/com/cloudera/livy/repl/ReplDriver.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/com/cloudera/livy/repl/ReplDriver.scala b/repl/src/main/scala/com/cloudera/livy/repl/ReplDriver.scala
index d368c6a..695a9d0 100644
--- a/repl/src/main/scala/com/cloudera/livy/repl/ReplDriver.scala
+++ b/repl/src/main/scala/com/cloudera/livy/repl/ReplDriver.scala
@@ -44,11 +44,11 @@ class ReplDriver(conf: SparkConf, livyConf: RSCConf)
 
   override protected def initializeContext(): JavaSparkContext = {
     interpreter = kind match {
-      case PySpark() => PythonInterpreter(conf, PySpark(), new StatementProgressListener(livyConf))
+      case PySpark() => PythonInterpreter(conf, PySpark())
       case PySpark3() =>
-        PythonInterpreter(conf, PySpark3(), new StatementProgressListener(livyConf))
-      case Spark() => new SparkInterpreter(conf, new StatementProgressListener(livyConf))
-      case SparkR() => SparkRInterpreter(conf, new StatementProgressListener(livyConf))
+        PythonInterpreter(conf, PySpark3())
+      case Spark() => new SparkInterpreter(conf)
+      case SparkR() => SparkRInterpreter(conf)
     }
     session = new Session(livyConf, interpreter, { s => broadcast(new ReplState(s.toString)) })
 
@@ -94,7 +94,7 @@ class ReplDriver(conf: SparkConf, livyConf: RSCConf)
 
     // Update progress of statements when queried
     statements.foreach { s =>
-      s.updateProgress(interpreter.statementProgressListener.progressOfStatement(s.id))
+      s.updateProgress(session.progressOfStatement(s.id))
     }
 
     new ReplJobResults(statements.sortBy(_.id))
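
Since progress is now computed lazily from the status tracker, every query
returns a fresh value and the driver keeps no listener state. An illustrative
poll loop against the new Session API (not part of this commit; `session` and
`stmtId` are assumed to be a started Session and a submitted statement id):

    var progress = 0.0
    while (progress < 1.0) {
      progress = session.progressOfStatement(stmtId)
      println(f"statement $stmtId progress: ${progress * 100}%.0f%%")
      Thread.sleep(500)  // illustrative polling interval
    }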

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/0ddcaf68/repl/src/main/scala/com/cloudera/livy/repl/Session.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/com/cloudera/livy/repl/Session.scala b/repl/src/main/scala/com/cloudera/livy/repl/Session.scala
index 54056a3..31e520c 100644
--- a/repl/src/main/scala/com/cloudera/livy/repl/Session.scala
+++ b/repl/src/main/scala/com/cloudera/livy/repl/Session.scala
@@ -170,6 +170,29 @@ class Session(
     interpreter.close()
   }
 
+  /**
+   * Get the current progress of the given statement id.
+   */
+  def progressOfStatement(stmtId: Int): Double = {
+    val jobGroup = statementIdToJobGroup(stmtId)
+
+    _sc.map { sc =>
+      val jobIds = sc.statusTracker.getJobIdsForGroup(jobGroup)
+      val jobs = jobIds.flatMap { id => sc.statusTracker.getJobInfo(id) }
+      val stages = jobs.flatMap { job =>
+        job.stageIds().flatMap(sc.statusTracker.getStageInfo)
+      }
+
+      val taskCount = stages.map(_.numTasks).sum
+      val completedTaskCount = stages.map(_.numCompletedTasks).sum
+      if (taskCount == 0) {
+        0.0
+      } else {
+        completedTaskCount.toDouble / taskCount
+      }
+    }.getOrElse(0.0)
+  }
+
   private def changeState(newState: SessionState): Unit = {
     synchronized {
       _state = newState
@@ -188,7 +211,7 @@ class Session(
     }
 
     val resultInJson = try {
-      interpreter.execute(executionCount, code) match {
+      interpreter.execute(code) match {
         case Interpreter.ExecuteSuccess(data) =>
           transitToIdle()
 
@@ -240,23 +263,28 @@ class Session(
   }
 
   private def setJobGroup(statementId: Int): String = {
+    val jobGroup = statementIdToJobGroup(statementId)
     val cmd = Kind(interpreter.kind) match {
       case Spark() =>
         // A dummy value to avoid automatic value binding in scala REPL.
-        s"""val _livyJobGroup$statementId = sc.setJobGroup("$statementId",""" +
-          s""""Job group for statement $statementId")"""
+        s"""val _livyJobGroup$jobGroup = sc.setJobGroup("$jobGroup",""" +
+          s""""Job group for statement $jobGroup")"""
       case PySpark() | PySpark3() =>
-        s"""sc.setJobGroup("$statementId", "Job group for statement $statementId")"""
+        s"""sc.setJobGroup("$jobGroup", "Job group for statement $jobGroup")"""
       case SparkR() =>
         interpreter.asInstanceOf[SparkRInterpreter].sparkMajorVersion match {
           case "1" =>
-            s"""setJobGroup(sc, "$statementId", "Job group for statement $statementId", """ +
+            s"""setJobGroup(sc, "$jobGroup", "Job group for statement $jobGroup", """ +
               "FALSE)"
           case "2" =>
-            s"""setJobGroup("$statementId", "Job group for statement $statementId", FALSE)"""
+            s"""setJobGroup("$jobGroup", "Job group for statement $jobGroup", FALSE)"""
         }
     }
     // Set the job group
     executeCode(statementId, cmd)
   }
+
+  private def statementIdToJobGroup(statementId: Int): String = {
+    statementId.toString
+  }
 }
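
As a worked example of the templates in setJobGroup above, for statementId = 7
the session would send the interpreter one of the following commands before
running the user's code:

    Spark      : val _livyJobGroup7 = sc.setJobGroup("7","Job group for statement 7")
    PySpark    : sc.setJobGroup("7", "Job group for statement 7")
    SparkR (v1): setJobGroup(sc, "7", "Job group for statement 7", FALSE)
    SparkR (v2): setJobGroup("7", "Job group for statement 7", FALSE)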

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/0ddcaf68/repl/src/main/scala/com/cloudera/livy/repl/SparkRInterpreter.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/com/cloudera/livy/repl/SparkRInterpreter.scala b/repl/src/main/scala/com/cloudera/livy/repl/SparkRInterpreter.scala
index 7318b1e..469d0a5 100644
--- a/repl/src/main/scala/com/cloudera/livy/repl/SparkRInterpreter.scala
+++ b/repl/src/main/scala/com/cloudera/livy/repl/SparkRInterpreter.scala
@@ -68,7 +68,7 @@ object SparkRInterpreter {
     ")"
     ).r.unanchored
 
-  def apply(conf: SparkConf, listener: StatementProgressListener): SparkRInterpreter = {
+  def apply(conf: SparkConf): SparkRInterpreter = {
     val backendTimeout = sys.env.getOrElse("SPARKR_BACKEND_TIMEOUT", "120").toInt
     val mirror = universe.runtimeMirror(getClass.getClassLoader)
     val sparkRBackendClass = mirror.classLoader.loadClass("org.apache.spark.api.r.RBackend")
@@ -121,8 +121,7 @@ object SparkRInterpreter {
       val process = builder.start()
       new SparkRInterpreter(process, backendInstance, backendThread,
         conf.get("spark.livy.spark_major_version", "1"),
-        conf.getBoolean("spark.repl.enableHiveContext", false),
-        listener)
+        conf.getBoolean("spark.repl.enableHiveContext", false))
     } catch {
       case e: Exception =>
         if (backendThread != null) {
@@ -137,9 +136,8 @@ class SparkRInterpreter(process: Process,
     backendInstance: Any,
     backendThread: Thread,
     val sparkMajorVersion: String,
-    hiveEnabled: Boolean,
-    statementProgressListener: StatementProgressListener)
-  extends ProcessInterpreter(process, statementProgressListener) {
+    hiveEnabled: Boolean)
+  extends ProcessInterpreter(process) {
   import SparkRInterpreter._
 
   implicit val formats = DefaultFormats

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/0ddcaf68/repl/src/main/scala/com/cloudera/livy/repl/StatementProgressListener.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/com/cloudera/livy/repl/StatementProgressListener.scala b/repl/src/main/scala/com/cloudera/livy/repl/StatementProgressListener.scala
deleted file mode 100644
index ae2147b..0000000
--- a/repl/src/main/scala/com/cloudera/livy/repl/StatementProgressListener.scala
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to Cloudera, Inc. under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  Cloudera, Inc. licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.repl
-
-import scala.collection.mutable
-
-import com.google.common.annotations.VisibleForTesting
-import org.apache.spark.Success
-import org.apache.spark.scheduler._
-
-import com.cloudera.livy.rsc.RSCConf
-
-/**
- * [[StatementProgressListener]] is a SparkListener implementation used to track the progress
- * of submitted statements. It builds a mapping between statements, jobs, stages and tasks,
- * and uses the number of finished tasks to calculate statement progress.
- *
- * By default the 100 most recent statement progresses are kept; users can configure
- * livy.rsc.retained_statements to change this number.
- *
- * The progress only reflects statements that trigger Spark jobs; if a submitted
- * statement doesn't generate any Spark job, the reported progress stays at 0.0
- * until completion.
- *
- * Also, if a statement spawns several Spark jobs, the progress may move backwards,
- * because the actual number of jobs/tasks is unknown before the statement is executed.
- */
-class StatementProgressListener(conf: RSCConf) extends SparkListener {
-
-  case class TaskCount(var currFinishedTasks: Int, var totalTasks: Int)
-  case class JobState(jobId: Int, var isCompleted: Boolean)
-
-  private val retainedStatements = conf.getInt(RSCConf.Entry.RETAINED_STATEMENT_NUMBER)
-
-  /** Statement id to list of jobs map */
-  @VisibleForTesting
-  private[repl] val statementToJobs = new mutable.LinkedHashMap[Int, Seq[JobState]]()
-  @VisibleForTesting
-  private[repl] val jobIdToStatement = new mutable.HashMap[Int, Int]()
-  /** Job id to list of stage ids map */
-  @VisibleForTesting
-  private[repl] val jobIdToStages = new mutable.HashMap[Int, Seq[Int]]()
-  /** Stage id to number of finished/total tasks map */
-  @VisibleForTesting
-  private[repl] val stageIdToTaskCount = new mutable.HashMap[Int, TaskCount]()
-
-  @transient private var currentStatementId: Int = _
-
-  /**
-   * Set the current statement id; onJobStart() will use it to build the
-   * mapping relations.
-   */
-  def setCurrentStatementId(stmtId: Int): Unit = {
-    currentStatementId = stmtId
-  }
-
-  /**
-   * Get the current progress of the given statement id.
-   */
-  def progressOfStatement(stmtId: Int): Double = synchronized {
-    var finishedTasks = 0
-    var totalTasks = 0
-
-    for {
-      job <- statementToJobs.getOrElse(stmtId, Seq.empty)
-      stageId <- jobIdToStages.getOrElse(job.jobId, Seq.empty)
-      taskCount <- stageIdToTaskCount.get(stageId)
-    } yield {
-      finishedTasks += taskCount.currFinishedTasks
-      totalTasks += taskCount.totalTasks
-    }
-
-    if (totalTasks == 0) {
-      0.0
-    } else {
-      finishedTasks.toDouble / totalTasks
-    }
-  }
-
-  /**
-   * Get the active job ids of the given statement id.
-   */
-  def activeJobsOfStatement(stmtId: Int): Seq[Int] = synchronized {
-    statementToJobs.getOrElse(stmtId, Seq.empty).filter(!_.isCompleted).map(_.jobId)
-  }
-
-  override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
-    val jobs = statementToJobs.getOrElseUpdate(currentStatementId, Seq.empty) :+
-      JobState(jobStart.jobId, isCompleted = false)
-    statementToJobs.put(currentStatementId, jobs)
-    jobIdToStatement(jobStart.jobId) = currentStatementId
-
-    jobIdToStages(jobStart.jobId) = jobStart.stageInfos.map(_.stageId)
-    jobStart.stageInfos.foreach { s => stageIdToTaskCount(s.stageId) = TaskCount(0, s.numTasks) }
-  }
-
-  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
-    taskEnd.reason match {
-      case Success =>
-        stageIdToTaskCount.get(taskEnd.stageId).foreach { t => t.currFinishedTasks += 1 }
-      case _ =>
-        // If the task failed, it will be retried, so don't count it.
-    }
-  }
-
-  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = synchronized {
-    // If stage is resubmitted, we should reset the task count of this stage.
-    stageIdToTaskCount.get(stageSubmitted.stageInfo.stageId).foreach { t =>
-      t.currFinishedTasks = 0
-      t.totalTasks = stageSubmitted.stageInfo.numTasks
-    }
-  }
-
-  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = synchronized {
-    stageIdToTaskCount.get(stageCompleted.stageInfo.stageId).foreach { t =>
-      t.currFinishedTasks = t.totalTasks
-    }
-  }
-
-  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = synchronized {
-    jobIdToStatement.get(jobEnd.jobId).foreach { stmtId =>
-      statementToJobs.get(stmtId).foreach { jobs =>
-        jobs.filter(_.jobId == jobEnd.jobId).foreach(_.isCompleted = true)
-      }
-    }
-
-    // Try to clean the old data when job is finished. This will trigger data cleaning in LRU
-    // policy.
-    cleanOldMetadata()
-  }
-
-  private def cleanOldMetadata(): Unit = {
-    if (statementToJobs.size > retainedStatements) {
-      val toRemove = statementToJobs.size - retainedStatements
-      statementToJobs.take(toRemove).foreach { case (_, jobs) =>
-        jobs.foreach { job =>
-          jobIdToStatement.remove(job.jobId)
-          jobIdToStages.remove(job.jobId).foreach { stages =>
-            stages.foreach(s => stageIdToTaskCount.remove(s))
-          }
-        }
-      }
-      (0 until toRemove).foreach(_ => statementToJobs.remove(statementToJobs.head._1))
-    }
-  }
-}
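
A worked example of the "progress may move backwards" caveat from the scaladoc
above (numbers illustrative): if a statement's first job has 4 tasks and they
all finish, progress reads 4/4 = 1.0; when a second, previously unknown job
with 4 more tasks starts, the denominator grows and progress falls back to
4/8 = 0.5. The statusTracker-based replacement in Session.scala appears to
share this limitation, since the total task count only covers jobs already
submitted under the statement's job group.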

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/0ddcaf68/repl/src/test/scala/com/cloudera/livy/repl/PythonInterpreterSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/com/cloudera/livy/repl/PythonInterpreterSpec.scala b/repl/src/test/scala/com/cloudera/livy/repl/PythonInterpreterSpec.scala
index c67d580..a4a40af 100644
--- a/repl/src/test/scala/com/cloudera/livy/repl/PythonInterpreterSpec.scala
+++ b/repl/src/test/scala/com/cloudera/livy/repl/PythonInterpreterSpec.scala
@@ -245,8 +245,7 @@ class Python2InterpreterSpec extends PythonBaseInterpreterSpec {
 
   implicit val formats = DefaultFormats
 
-  override def createInterpreter(): Interpreter =
-    PythonInterpreter(new SparkConf(), PySpark(), new StatementProgressListener(new RSCConf()))
+  override def createInterpreter(): Interpreter = PythonInterpreter(new SparkConf(), PySpark())
 
   // Scalastyle is treating unicode escape as non ascii characters. Turn off the check.
   // scalastyle:off non.ascii.character.disallowed
@@ -273,8 +272,7 @@ class Python3InterpreterSpec extends PythonBaseInterpreterSpec {
     test()
   }
 
-  override def createInterpreter(): Interpreter =
-    PythonInterpreter(new SparkConf(), PySpark3(), new StatementProgressListener(new RSCConf()))
+  override def createInterpreter(): Interpreter = PythonInterpreter(new SparkConf(), PySpark3())
 
   it should "check python version is 3.x" in withInterpreter { interpreter =>
     val response = interpreter.execute("""import sys

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/0ddcaf68/repl/src/test/scala/com/cloudera/livy/repl/PythonSessionSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/com/cloudera/livy/repl/PythonSessionSpec.scala b/repl/src/test/scala/com/cloudera/livy/repl/PythonSessionSpec.scala
index 28f457f..1e5958d 100644
--- a/repl/src/test/scala/com/cloudera/livy/repl/PythonSessionSpec.scala
+++ b/repl/src/test/scala/com/cloudera/livy/repl/PythonSessionSpec.scala
@@ -174,8 +174,7 @@ abstract class PythonSessionSpec extends BaseSessionSpec {
 }
 
 class Python2SessionSpec extends PythonSessionSpec {
-  override def createInterpreter(): Interpreter =
-    PythonInterpreter(new SparkConf(), PySpark(), new StatementProgressListener(new RSCConf()))
+  override def createInterpreter(): Interpreter = PythonInterpreter(new SparkConf(), PySpark())
 }
 
 class Python3SessionSpec extends PythonSessionSpec {
@@ -185,8 +184,7 @@ class Python3SessionSpec extends PythonSessionSpec {
     test()
   }
 
-  override def createInterpreter(): Interpreter =
-    PythonInterpreter(new SparkConf(), PySpark3(), new StatementProgressListener(new RSCConf()))
+  override def createInterpreter(): Interpreter = PythonInterpreter(new SparkConf(), PySpark3())
 
   it should "check python version is 3.x" in withSession { session =>
     val statement = execute(session)(

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/0ddcaf68/repl/src/test/scala/com/cloudera/livy/repl/ScalaInterpreterSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/com/cloudera/livy/repl/ScalaInterpreterSpec.scala b/repl/src/test/scala/com/cloudera/livy/repl/ScalaInterpreterSpec.scala
index a9e1e8b..0126796 100644
--- a/repl/src/test/scala/com/cloudera/livy/repl/ScalaInterpreterSpec.scala
+++ b/repl/src/test/scala/com/cloudera/livy/repl/ScalaInterpreterSpec.scala
@@ -29,7 +29,7 @@ class ScalaInterpreterSpec extends BaseInterpreterSpec {
   implicit val formats = DefaultFormats
 
   override def createInterpreter(): Interpreter =
-    new SparkInterpreter(new SparkConf(), new StatementProgressListener(new RSCConf()))
+    new SparkInterpreter(new SparkConf())
 
   it should "execute `1 + 2` == 3" in withInterpreter { interpreter =>
     val response = interpreter.execute("1 + 2")

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/0ddcaf68/repl/src/test/scala/com/cloudera/livy/repl/SparkRInterpreterSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/com/cloudera/livy/repl/SparkRInterpreterSpec.scala b/repl/src/test/scala/com/cloudera/livy/repl/SparkRInterpreterSpec.scala
index a513867..61f1a36 100644
--- a/repl/src/test/scala/com/cloudera/livy/repl/SparkRInterpreterSpec.scala
+++ b/repl/src/test/scala/com/cloudera/livy/repl/SparkRInterpreterSpec.scala
@@ -34,8 +34,7 @@ class SparkRInterpreterSpec extends BaseInterpreterSpec {
     super.withFixture(test)
   }
 
-  override def createInterpreter(): Interpreter =
-    SparkRInterpreter(new SparkConf(), new StatementProgressListener(new RSCConf()))
+  override def createInterpreter(): Interpreter = SparkRInterpreter(new SparkConf())
 
   it should "execute `1 + 2` == 3" in withInterpreter { interpreter =>
     val response = interpreter.execute("1 + 2")

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/0ddcaf68/repl/src/test/scala/com/cloudera/livy/repl/SparkRSessionSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/com/cloudera/livy/repl/SparkRSessionSpec.scala b/repl/src/test/scala/com/cloudera/livy/repl/SparkRSessionSpec.scala
index a6091d0..c604205 100644
--- a/repl/src/test/scala/com/cloudera/livy/repl/SparkRSessionSpec.scala
+++ b/repl/src/test/scala/com/cloudera/livy/repl/SparkRSessionSpec.scala
@@ -31,8 +31,7 @@ class SparkRSessionSpec extends BaseSessionSpec {
     super.withFixture(test)
   }
 
-  override def createInterpreter(): Interpreter =
-    SparkRInterpreter(new SparkConf(), new StatementProgressListener(new RSCConf()))
+  override def createInterpreter(): Interpreter = SparkRInterpreter(new SparkConf())
 
   it should "execute `1 + 2` == 3" in withSession { session =>
     val statement = execute(session)("1 + 2")

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/0ddcaf68/repl/src/test/scala/com/cloudera/livy/repl/SparkSessionSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/com/cloudera/livy/repl/SparkSessionSpec.scala b/repl/src/test/scala/com/cloudera/livy/repl/SparkSessionSpec.scala
index a051513..52b6b42 100644
--- a/repl/src/test/scala/com/cloudera/livy/repl/SparkSessionSpec.scala
+++ b/repl/src/test/scala/com/cloudera/livy/repl/SparkSessionSpec.scala
@@ -32,8 +32,7 @@ import com.cloudera.livy.rsc.driver.StatementState
 
 class SparkSessionSpec extends BaseSessionSpec {
 
-  override def createInterpreter(): Interpreter =
-    new SparkInterpreter(new SparkConf(), new StatementProgressListener(new RSCConf()))
+  override def createInterpreter(): Interpreter = new SparkInterpreter(new SparkConf())
 
   it should "execute `1 + 2` == 3" in withSession { session =>
     val statement = execute(session)("1 + 2")
@@ -240,4 +239,36 @@ class SparkSessionSpec extends BaseSessionSpec {
         "Job 0 cancelled part of cancelled job group 0")
     }
   }
+
+  it should "correctly calculate progress" in withSession { session =>
+    val executeCode =
+      """
+        |sc.parallelize(1 to 2, 2).map(i => (i, 1)).collect()
+      """.stripMargin
+
+    val stmtId = session.execute(executeCode)
+    eventually(timeout(30 seconds), interval(100 millis)) {
+      session.progressOfStatement(stmtId) should be(1.0)
+    }
+  }
+
+  it should "not generate Spark jobs for plain Scala code" in withSession { session =>
+    val executeCode = """1 + 1"""
+
+    val stmtId = session.execute(executeCode)
+    session.progressOfStatement(stmtId) should be (0.0)
+  }
+
+  it should "handle multiple jobs in one statement" in withSession { session =>
+    val executeCode =
+      """
+        |sc.parallelize(1 to 2, 2).map(i => (i, 1)).collect()
+        |sc.parallelize(1 to 2, 2).map(i => (i, 1)).collect()
+      """.stripMargin
+
+    val stmtId = session.execute(executeCode)
+    eventually(timeout(30 seconds), interval(100 millis)) {
+      session.progressOfStatement(stmtId) should be(1.0)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/0ddcaf68/repl/src/test/scala/com/cloudera/livy/repl/StatementProgressListenerSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/com/cloudera/livy/repl/StatementProgressListenerSpec.scala b/repl/src/test/scala/com/cloudera/livy/repl/StatementProgressListenerSpec.scala
deleted file mode 100644
index 2acee4c..0000000
--- a/repl/src/test/scala/com/cloudera/livy/repl/StatementProgressListenerSpec.scala
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed to Cloudera, Inc. under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  Cloudera, Inc. licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.repl
-
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.duration._
-import scala.language.{postfixOps, reflectiveCalls}
-
-import org.apache.spark.SparkConf
-import org.apache.spark.scheduler._
-import org.scalatest._
-import org.scalatest.concurrent.Eventually._
-
-import com.cloudera.livy.LivyBaseUnitTestSuite
-import com.cloudera.livy.rsc.RSCConf
-
-class StatementProgressListenerSpec extends FlatSpec
-    with Matchers
-    with BeforeAndAfterAll
-    with BeforeAndAfter
-    with LivyBaseUnitTestSuite {
-  private val rscConf = new RSCConf()
-    .set(RSCConf.Entry.RETAINED_STATEMENT_NUMBER, 2)
-
-  private val testListener = new StatementProgressListener(rscConf) {
-    var onJobStartedCallback: Option[() => Unit] = None
-    var onJobEndCallback: Option[() => Unit] = None
-    var onStageEndCallback: Option[() => Unit] = None
-    var onTaskEndCallback: Option[() => Unit] = None
-
-    override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
-      super.onJobStart(jobStart)
-      onJobStartedCallback.foreach(f => f())
-    }
-
-    override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
-      super.onJobEnd(jobEnd)
-      onJobEndCallback.foreach(f => f())
-    }
-
-    override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
-      super.onStageCompleted(stageCompleted)
-      onStageEndCallback.foreach(f => f())
-    }
-
-    override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
-      super.onTaskEnd(taskEnd)
-      onTaskEndCallback.foreach(f => f())
-    }
-  }
-
-  private val statementId = new AtomicInteger(0)
-
-  private def getStatementId = statementId.getAndIncrement()
-
-  private var sparkInterpreter: SparkInterpreter = _
-
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-    sparkInterpreter = new SparkInterpreter(new SparkConf(), testListener)
-    sparkInterpreter.start()
-  }
-
-  override def afterAll(): Unit = {
-    sparkInterpreter.close()
-    super.afterAll()
-  }
-
-  after {
-    testListener.onJobStartedCallback = None
-    testListener.onJobEndCallback = None
-    testListener.onStageEndCallback = None
-    testListener.onTaskEndCallback = None
-  }
-
-  it should "correctly calculate progress" in {
-    val executeCode =
-      """
-        |sc.parallelize(1 to 2, 2).map(i => (i, 1)).collect()
-      """.stripMargin
-    val stmtId = getStatementId
-
-    def verifyJobs(): Unit = {
-      testListener.statementToJobs.get(stmtId) should not be (None)
-
-      // One job will be submitted
-      testListener.statementToJobs(stmtId).size should be (1)
-      val jobId = testListener.statementToJobs(stmtId).head.jobId
-      testListener.jobIdToStatement(jobId) should be (stmtId)
-
-      // 1 stage will be generated
-      testListener.jobIdToStages(jobId).size should be (1)
-      val stageIds = testListener.jobIdToStages(jobId)
-
-      // 2 tasks per stage will be generated
-      stageIds.foreach { id =>
-        testListener.stageIdToTaskCount(id).currFinishedTasks should be (0)
-        testListener.stageIdToTaskCount(id).totalTasks should be (2)
-      }
-    }
-
-    var taskEndCalls = 0
-    def verifyTasks(): Unit = {
-      taskEndCalls += 1
-      testListener.progressOfStatement(stmtId) should be (taskEndCalls.toDouble / 2)
-    }
-
-    var stageEndCalls = 0
-    def verifyStages(): Unit = {
-      stageEndCalls += 1
-      testListener.progressOfStatement(stmtId) should be (stageEndCalls.toDouble / 1)
-    }
-
-    testListener.onJobStartedCallback = Some(verifyJobs)
-    testListener.onTaskEndCallback = Some(verifyTasks)
-    testListener.onStageEndCallback = Some(verifyStages)
-    sparkInterpreter.execute(stmtId, executeCode)
-
-    eventually(timeout(30 seconds), interval(100 millis)) {
-      testListener.progressOfStatement(stmtId) should be(1.0)
-    }
-  }
-
-  it should "not generate Spark jobs for plain Scala code" in {
-    val executeCode = """1 + 1"""
-    val stmtId = getStatementId
-
-    def verifyJobs(): Unit = {
-      fail("No job will be submitted")
-    }
-
-    testListener.onJobStartedCallback = Some(verifyJobs)
-    testListener.progressOfStatement(stmtId) should be (0.0)
-    sparkInterpreter.execute(stmtId, executeCode)
-    testListener.progressOfStatement(stmtId) should be (0.0)
-  }
-
-  it should "handle multiple jobs in one statement" in {
-    val executeCode =
-      """
-        |sc.parallelize(1 to 2, 2).map(i => (i, 1)).collect()
-        |sc.parallelize(1 to 2, 2).map(i => (i, 1)).collect()
-      """.stripMargin
-    val stmtId = getStatementId
-
-    var jobs = 0
-    def verifyJobs(): Unit = {
-      jobs += 1
-
-      testListener.statementToJobs.get(stmtId) should not be (None)
-      // One more job will be submitted
-      testListener.statementToJobs(stmtId).size should be (jobs)
-      val jobId = testListener.statementToJobs(stmtId)(jobs - 1).jobId
-      testListener.jobIdToStatement(jobId) should be (stmtId)
-
-      // 1 stage will be generated
-      testListener.jobIdToStages(jobId).size should be (1)
-      val stageIds = testListener.jobIdToStages(jobId)
-
-      // 2 tasks per stage will be generated
-      stageIds.foreach { id =>
-        testListener.stageIdToTaskCount(id).currFinishedTasks should be (0)
-        testListener.stageIdToTaskCount(id).totalTasks should be (2)
-      }
-    }
-
-    val taskProgress = ArrayBuffer[Double]()
-    def verifyTasks(): Unit = {
-      taskProgress += testListener.progressOfStatement(stmtId)
-    }
-
-    val stageProgress = ArrayBuffer[Double]()
-    def verifyStages(): Unit = {
-      stageProgress += testListener.progressOfStatement(stmtId)
-    }
-
-    testListener.onJobStartedCallback = Some(verifyJobs)
-    testListener.onTaskEndCallback = Some(verifyTasks)
-    testListener.onStageEndCallback = Some(verifyStages)
-    sparkInterpreter.execute(stmtId, executeCode)
-
-    taskProgress.toArray should be (Array(0.5, 1.0, 0.75, 1.0))
-    stageProgress.toArray should be (Array(1.0, 1.0))
-
-    eventually(timeout(30 seconds), interval(100 millis)) {
-      testListener.progressOfStatement(stmtId) should be(1.0)
-    }
-  }
-
-  it should "remove old statement progress" in {
-    val executeCode =
-      """
-        |sc.parallelize(1 to 2, 2).map(i => (i, 1)).collect()
-      """.stripMargin
-    val stmtId = getStatementId
-
-    def onJobEnd(): Unit = {
-      testListener.statementToJobs(stmtId).size should be (1)
-      testListener.statementToJobs(stmtId).head.isCompleted should be (true)
-
-      testListener.statementToJobs.size should be (2)
-      testListener.statementToJobs.get(0) should be (None)
-      testListener.jobIdToStatement.filter(_._2 == 0) should be (Map.empty)
-    }
-
-    testListener.onJobEndCallback = Some(onJobEnd)
-    sparkInterpreter.execute(stmtId, executeCode)
-  }
-}