Posted to commits@livy.apache.org by js...@apache.org on 2017/07/05 03:06:57 UTC
[28/33] incubator-livy git commit: LIVY-375. Change Livy code package name to org.apache.livy
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/integration-test/src/main/scala/org/apache/livy/test/framework/RealCluster.scala
----------------------------------------------------------------------
diff --git a/integration-test/src/main/scala/org/apache/livy/test/framework/RealCluster.scala b/integration-test/src/main/scala/org/apache/livy/test/framework/RealCluster.scala
new file mode 100644
index 0000000..0acc580
--- /dev/null
+++ b/integration-test/src/main/scala/org/apache/livy/test/framework/RealCluster.scala
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.test.framework
+
+import java.io.{File, IOException}
+import java.nio.file.Files
+import java.security.PrivilegedExceptionAction
+import javax.servlet.http.HttpServletResponse._
+
+import scala.collection.JavaConverters._
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.sys.process._
+import scala.util.Random
+
+import com.decodified.scalassh._
+import com.ning.http.client.AsyncHttpClient
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.hadoop.security.UserGroupInformation
+import org.scalatest.concurrent.Eventually._
+
+import org.apache.livy.{LivyConf, Logging}
+
+private class RealClusterConfig(config: Map[String, String]) {
+ val ip = config("ip")
+
+ val sshLogin = config("ssh.login")
+ val sshPubKey = config("ssh.pubkey")
+ val livyPort = config.getOrElse(LivyConf.SERVER_PORT.key, "8998").toInt
+ val livyClasspath = config.getOrElse("livy.classpath", "")
+
+ val deployLivy = config.getOrElse("deploy-livy", "true").toBoolean
+ val noDeployLivyHome = config.get("livy-home")
+
+ val sparkHome = config("env.spark_home")
+ val sparkConf = config.getOrElse("env.spark_conf", "/etc/spark/conf")
+ val hadoopConf = config.getOrElse("env.hadoop_conf", "/etc/hadoop/conf")
+
+ val javaHome = config.getOrElse("env.java_home", "/usr/java/default")
+}
+
+class RealCluster(_config: Map[String, String])
+ extends Cluster with ClusterUtils with Logging {
+
+ private val config = new RealClusterConfig(_config)
+
+ private var livyIsRunning = false
+ private var livyHomePath: String = _
+ private var livyEpoch = 0
+
+ private var _configDir: File = _
+
+ private var hdfsScratch: Path = _
+
+ private var sparkConfDir: String = _
+ private var tempDirPath: String = _
+ private var _hasSparkR: Boolean = _
+
+ override def isRealSpark(): Boolean = true
+
+ override def hasSparkR(): Boolean = _hasSparkR
+
+ override def configDir(): File = _configDir
+
+ override def hdfsScratchDir(): Path = hdfsScratch
+
+ override def doAsClusterUser[T](task: => T): T = {
+ val user = UserGroupInformation.createRemoteUser(config.sshLogin)
+ user.doAs(new PrivilegedExceptionAction[T] {
+ override def run(): T = task
+ })
+ }
+
+ private def sshClient[T](body: SshClient => SSH.Result[T]): T = {
+ val sshLogin = PublicKeyLogin(
+ config.sshLogin, None, config.sshPubKey :: Nil)
+ val hostConfig = HostConfig(login = sshLogin, hostKeyVerifier = HostKeyVerifiers.DontVerify)
+ SSH(config.ip, hostConfig)(body) match {
+ case Left(err) => throw new IOException(err)
+ case Right(result) => result
+ }
+ }
+
+ private def exec(cmd: String): CommandResult = {
+ info(s"Running command: $cmd")
+ val result = sshClient(_.exec(cmd))
+ result.exitCode match {
+ case Some(ec) if ec > 0 =>
+ throw new IOException(s"Command '$cmd' failed: $ec\n" +
+ s"stdout: ${result.stdOutAsString()}\n" +
+ s"stderr: ${result.stdErrAsString()}\n")
+ case _ =>
+ }
+ result
+ }
+
+ private def upload(local: String, remote: String): Unit = {
+ info(s"Uploading local path $local")
+ sshClient(_.upload(local, remote))
+ }
+
+ private def download(remote: String, local: String): Unit = {
+ info(s"Downloading remote path $remote")
+ sshClient(_.download(remote, local))
+ }
+
+ override def deploy(): Unit = {
+ // Make sure Livy is not running.
+ stopLivy()
+
+ // Check whether SparkR is supported in YARN (needs the sparkr.zip archive).
+ _hasSparkR = try {
+ exec(s"test -f ${config.sparkHome}/R/lib/sparkr.zip")
+ true
+ } catch {
+ case e: IOException => false
+ }
+
+ // Copy the remote Hadoop configuration to a local temp dir so that tests can use it to
+ // talk to HDFS and YARN.
+ val localTemp = new File(sys.props("java.io.tmpdir") + File.separator + "hadoop-conf")
+ download(config.hadoopConf, localTemp.getAbsolutePath())
+ _configDir = localTemp
+
+ // Create a temp directory where test files will be written.
+ tempDirPath = s"/tmp/livy-it-${Random.alphanumeric.take(16).mkString}"
+ exec(s"mkdir -p $tempDirPath")
+
+ // Also create an HDFS scratch directory for tests.
+ doAsClusterUser {
+ hdfsScratch = fs.makeQualified(
+ new Path(s"/tmp/livy-it-${Random.alphanumeric.take(16).mkString}"))
+ fs.mkdirs(hdfsScratch)
+ fs.setPermission(hdfsScratch, new FsPermission("777"))
+ }
+
+ // Create a copy of the Spark configuration, and make sure the master is "yarn-cluster".
+ sparkConfDir = s"$tempDirPath/spark-conf"
+ val sparkProps = s"$sparkConfDir/spark-defaults.conf"
+ Seq(
+ s"cp -Lr ${config.sparkConf} $sparkConfDir",
+ s"touch $sparkProps",
+ s"sed -i.old '/spark.master.*/d' $sparkProps",
+ s"sed -i.old '/spark.submit.deployMode.*/d' $sparkProps",
+ s"echo 'spark.master=yarn-cluster' >> $sparkProps"
+ ).foreach(exec)
+
+ if (config.deployLivy) {
+ try {
+ info(s"Deploying Livy to ${config.ip}...")
+ val version = sys.props("project.version")
+ val assemblyZip = new File(s"../assembly/target/livy-server-$version.zip")
+ assert(assemblyZip.isFile,
+ s"Can't find livy assembly zip at ${assemblyZip.getCanonicalPath}")
+ val assemblyName = assemblyZip.getName()
+
+ // SSH to the node to unzip and install Livy.
+ upload(assemblyZip.getCanonicalPath, s"$tempDirPath/$assemblyName")
+ exec(s"unzip -o $tempDirPath/$assemblyName -d $tempDirPath")
+ livyHomePath = s"$tempDirPath/livy-server-$version"
+ info(s"Deployed Livy to ${config.ip} at $livyHomePath.")
+ } catch {
+ case e: Exception =>
+ error(s"Failed to deploy Livy to ${config.ip}.", e)
+ cleanUp()
+ throw e
+ }
+ } else {
+ livyHomePath = config.noDeployLivyHome.get
+ info("Skipping deployment.")
+ }
+
+ runLivy()
+ }
+
+ override def cleanUp(): Unit = {
+ stopLivy()
+ if (tempDirPath != null) {
+ exec(s"rm -rf $tempDirPath")
+ }
+ if (hdfsScratch != null) {
+ doAsClusterUser {
+ // Cannot use the shared `fs` since this runs in a shutdown hook, and that instance
+ // may have been closed already.
+ val fs = FileSystem.newInstance(hadoopConf)
+ try {
+ fs.delete(hdfsScratch, true)
+ } finally {
+ fs.close()
+ }
+ }
+ }
+ }
+
+ override def runLivy(): Unit = synchronized {
+ assert(!livyIsRunning, "Livy is running already.")
+
+ val livyConf = Map(
+ "livy.server.port" -> config.livyPort.toString,
+ // "livy.server.recovery.mode=local",
+ "livy.environment" -> "development",
+ LivyConf.LIVY_SPARK_MASTER.key -> "yarn",
+ LivyConf.LIVY_SPARK_DEPLOY_MODE.key -> "cluster")
+ val livyConfFile = File.createTempFile("livy.", ".properties")
+ saveProperties(livyConf, livyConfFile)
+ upload(livyConfFile.getAbsolutePath(), s"$livyHomePath/conf/livy.conf")
+
+ val env = Map(
+ "JAVA_HOME" -> config.javaHome,
+ "HADOOP_CONF_DIR" -> config.hadoopConf,
+ "SPARK_CONF_DIR" -> sparkConfDir,
+ "SPARK_HOME" -> config.sparkHome,
+ "CLASSPATH" -> config.livyClasspath,
+ "LIVY_PID_DIR" -> s"$tempDirPath/pid",
+ "LIVY_LOG_DIR" -> s"$tempDirPath/logs",
+ "LIVY_MAX_LOG_FILES" -> "16",
+ "LIVY_IDENT_STRING" -> "it"
+ )
+ val livyEnvFile = File.createTempFile("livy-env.", ".sh")
+ saveProperties(env, livyEnvFile)
+ upload(livyEnvFile.getAbsolutePath(), s"$livyHomePath/conf/livy-env.sh")
+
+ info(s"Starting Livy @ port ${config.livyPort}...")
+ exec(s"env -i $livyHomePath/bin/livy-server start")
+ livyIsRunning = true
+
+ val httpClient = new AsyncHttpClient()
+ eventually(timeout(1 minute), interval(1 second)) {
+ assert(httpClient.prepareGet(livyEndpoint).execute().get().getStatusCode == SC_OK)
+ }
+ info(s"Started Livy.")
+ }
+
+ override def stopLivy(): Unit = synchronized {
+ info("Stopping Livy Server")
+ try {
+ exec(s"$livyHomePath/bin/livy-server stop")
+ } catch {
+ case e: Exception =>
+ if (livyIsRunning) {
+ throw e
+ }
+ }
+
+ if (livyIsRunning) {
+ // Wait a tiny bit so that the process finishes closing its output files.
+ Thread.sleep(2)
+
+ livyEpoch += 1
+ val logName = "livy-it-server.out"
+ val localName = s"livy-it-server-$livyEpoch.out"
+ val logPath = s"$tempDirPath/logs/$logName"
+ val localLog = sys.props("java.io.tmpdir") + File.separator + localName
+ download(logPath, localLog)
+ info(s"Log for epoch $livyEpoch available at $localLog")
+ }
+ }
+
+ override def livyEndpoint: String = s"http://${config.ip}:${config.livyPort}"
+}
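
For reference, the map handed to RealCluster reduces to the lookups in RealClusterConfig above. A minimal sketch of such a spec, with every value invented for illustration (only the key names are taken from the class; the four keys read without a getOrElse default are mandatory):

    // Hypothetical values; key names mirror RealClusterConfig.
    val spec: Map[String, String] = Map(
      // mandatory: config("...") throws if these are missing
      "ip"             -> "10.0.0.5",
      "ssh.login"      -> "livy-it",
      "ssh.pubkey"     -> "/home/livy-it/.ssh/id_rsa",
      "env.spark_home" -> "/opt/spark",
      // optional, shown here with their defaults
      "livy.server.port" -> "8998",
      "env.spark_conf"   -> "/etc/spark/conf",
      "env.hadoop_conf"  -> "/etc/hadoop/conf",
      "env.java_home"    -> "/usr/java/default",
      "deploy-livy"      -> "true"  // set to "false" plus "livy-home" to reuse an install
    )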
+
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/integration-test/src/test/scala/com/cloudera/livy/test/BatchIT.scala
----------------------------------------------------------------------
diff --git a/integration-test/src/test/scala/com/cloudera/livy/test/BatchIT.scala b/integration-test/src/test/scala/com/cloudera/livy/test/BatchIT.scala
deleted file mode 100644
index 179239a..0000000
--- a/integration-test/src/test/scala/com/cloudera/livy/test/BatchIT.scala
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.test
-
-import java.io.File
-import java.util.UUID
-
-import scala.language.postfixOps
-
-import org.apache.commons.io.IOUtils
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
-import org.scalatest.BeforeAndAfterAll
-import org.scalatest.OptionValues._
-
-import com.cloudera.livy.sessions.SessionState
-import com.cloudera.livy.test.apps._
-import com.cloudera.livy.test.framework.{BaseIntegrationTestSuite, LivyRestClient}
-
-class BatchIT extends BaseIntegrationTestSuite with BeforeAndAfterAll {
- private var testLibPath: String = _
-
- protected override def beforeAll() = {
- super.beforeAll()
- testLibPath = uploadToHdfs(new File(testLib))
- }
-
- test("submit spark app") {
- val output = newOutputPath()
- withTestLib(classOf[SimpleSparkApp], List(output)) { s =>
- s.verifySessionSuccess()
-
- // Make sure the test lib has created the test output.
- cluster.fs.isDirectory(new Path(output)) shouldBe true
-
- // Make sure appInfo is reported correctly.
- val state = s.snapshot()
- state.appInfo.sparkUiUrl.value should startWith ("http")
- }
- }
-
- test("submit an app that fails") {
- val output = newOutputPath()
- withTestLib(classOf[FailingApp], List(output)) { s =>
- // At this point the application has exited. State should be 'dead' instead of 'error'.
- s.verifySessionDead()
-
- // The file is written to make sure the app actually ran, instead of just failing for
- // some other reason.
- cluster.fs.isFile(new Path(output)) shouldBe true
- }
- }
-
- pytest("submit a pyspark application") {
- val scriptPath = uploadResource("batch.py")
- val output = newOutputPath()
- withScript(scriptPath, List(output)) { s =>
- s.verifySessionSuccess()
- cluster.fs.isDirectory(new Path(output)) shouldBe true
- }
- }
-
- // This is disabled since R scripts don't seem to work in yarn-cluster mode. There's a
- // TODO comment in Spark's ApplicationMaster.scala.
- ignore("submit a SparkR application") {
- val hdfsPath = uploadResource("rtest.R")
- withScript(hdfsPath, List.empty) { s =>
- s.verifySessionSuccess()
- }
- }
-
- test("deleting a session should kill YARN app") {
- val output = newOutputPath()
- withTestLib(classOf[SimpleSparkApp], List(output, "false")) { s =>
- s.verifySessionState(SessionState.Running())
- s.snapshot().appInfo.driverLogUrl.value should include ("containerlogs")
-
- val appId = s.appId()
-
- // Delete the session then verify the YARN app state is KILLED.
- s.stop()
- cluster.yarnClient.getApplicationReport(appId).getFinalApplicationStatus shouldBe
- FinalApplicationStatus.KILLED
- }
- }
-
- test("killing YARN app should change batch state to dead") {
- val output = newOutputPath()
- withTestLib(classOf[SimpleSparkApp], List(output, "false")) { s =>
- s.verifySessionState(SessionState.Running())
- val appId = s.appId()
-
- // Kill the YARN app; the batch session should become dead and YARN should report KILLED.
- cluster.yarnClient.killApplication(appId)
- s.verifySessionDead()
-
- cluster.yarnClient.getApplicationReport(appId).getFinalApplicationStatus shouldBe
- FinalApplicationStatus.KILLED
-
- s.snapshot().log should contain ("Application killed by user.")
- }
- }
-
- test("recover batch sessions") {
- val output1 = newOutputPath()
- val output2 = newOutputPath()
- withTestLib(classOf[SimpleSparkApp], List(output1)) { s1 =>
- s1.stop()
- withTestLib(classOf[SimpleSparkApp], List(output2, "false")) { s2 =>
- s2.verifySessionRunning()
-
- restartLivy()
-
- // Verify previous active session still appears after restart.
- s2.verifySessionRunning()
- // Verify deleted session doesn't show up.
- s1.verifySessionDoesNotExist()
-
- s2.stop()
-
- // Verify new session doesn't reuse old session id.
- withTestLib(classOf[SimpleSparkApp], List(output2)) { s3 =>
- s3.id should be > s2.id
- }
- }
- }
- }
-
- private def newOutputPath(): String = {
- cluster.hdfsScratchDir().toString() + "/" + UUID.randomUUID().toString()
- }
-
- private def uploadResource(name: String): String = {
- val hdfsPath = new Path(cluster.hdfsScratchDir(), UUID.randomUUID().toString + "-" + name)
- val in = getClass.getResourceAsStream("/" + name)
- val out = cluster.fs.create(hdfsPath)
- try {
- IOUtils.copy(in, out)
- } finally {
- in.close()
- out.close()
- }
- hdfsPath.toUri().getPath()
- }
-
- private def withScript[R]
- (scriptPath: String, args: List[String], sparkConf: Map[String, String] = Map.empty)
- (f: (LivyRestClient#BatchSession) => R): R = {
- val s = livyClient.startBatch(scriptPath, None, args, sparkConf)
- withSession(s)(f)
- }
-
- private def withTestLib[R]
- (testClass: Class[_], args: List[String], sparkConf: Map[String, String] = Map.empty)
- (f: (LivyRestClient#BatchSession) => R): R = {
- val s = livyClient.startBatch(testLibPath, Some(testClass.getName()), args, sparkConf)
- withSession(s)(f)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/integration-test/src/test/scala/com/cloudera/livy/test/InteractiveIT.scala
----------------------------------------------------------------------
diff --git a/integration-test/src/test/scala/com/cloudera/livy/test/InteractiveIT.scala b/integration-test/src/test/scala/com/cloudera/livy/test/InteractiveIT.scala
deleted file mode 100644
index 88a995c..0000000
--- a/integration-test/src/test/scala/com/cloudera/livy/test/InteractiveIT.scala
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.test
-
-import java.util.concurrent.atomic.AtomicInteger
-import java.util.regex.Pattern
-
-import org.apache.hadoop.yarn.api.records.YarnApplicationState
-import org.scalatest.concurrent.Eventually._
-import org.scalatest.OptionValues._
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-import com.cloudera.livy.rsc.RSCConf
-import com.cloudera.livy.sessions._
-import com.cloudera.livy.test.framework.{BaseIntegrationTestSuite, LivyRestClient}
-
-class InteractiveIT extends BaseIntegrationTestSuite {
- test("basic interactive session") {
- withNewSession(Spark()) { s =>
- s.run("val sparkVersion = sc.version").result().left.foreach(info(_))
- s.run("1+1").verifyResult("res0: Int = 2")
- s.run("""sc.getConf.get("spark.executor.instances")""").verifyResult("res1: String = 1")
- s.run("val sql = new org.apache.spark.sql.SQLContext(sc)").verifyResult(
- ".*" + Pattern.quote(
- "sql: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext") + ".*")
- s.run("abcde").verifyError(evalue = ".*?:[0-9]+: error: not found: value abcde.*")
- s.run("throw new IllegalStateException()")
- .verifyError(evalue = ".*java\\.lang\\.IllegalStateException.*")
-
- // Verify Livy internal configurations are not exposed.
- // TODO separate all these checks to different sub tests after merging new IT code.
- s.run("""sc.getConf.getAll.exists(_._1.startsWith("spark.__livy__."))""")
- .verifyResult(".*false")
- s.run("""sys.props.exists(_._1.startsWith("spark.__livy__."))""").verifyResult(".*false")
-
- // Make sure appInfo is reported correctly.
- val state = s.snapshot()
- state.appInfo.driverLogUrl.value should include ("containerlogs")
- state.appInfo.sparkUiUrl.value should startWith ("http")
-
- // Stop session and verify the YARN app state is finished.
- // This is important because if YARN app state is killed, Spark history is not archived.
- val appId = s.appId()
- s.stop()
- val appReport = cluster.yarnClient.getApplicationReport(appId)
- appReport.getYarnApplicationState() shouldEqual YarnApplicationState.FINISHED
- }
- }
-
- pytest("pyspark interactive session") {
- withNewSession(PySpark()) { s =>
- s.run("1+1").verifyResult("2")
- s.run("sqlContext").verifyResult(startsWith("<pyspark.sql.context.HiveContext"))
- s.run("sc.parallelize(range(100)).map(lambda x: x * 2).reduce(lambda x, y: x + y)")
- .verifyResult("9900")
- s.run("from pyspark.sql.types import Row").verifyResult("")
- s.run("x = [Row(age=1, name=u'a'), Row(age=2, name=u'b'), Row(age=3, name=u'c')]")
- .verifyResult("")
- s.run("%table x").verifyResult(".*headers.*type.*name.*data.*")
- s.run("abcde").verifyError(ename = "NameError", evalue = "name 'abcde' is not defined")
- s.run("raise KeyError, 'foo'").verifyError(ename = "KeyError", evalue = "'foo'")
- }
- }
-
- rtest("R interactive session") {
- withNewSession(SparkR()) { s =>
- // R's output sometimes includes the count of statements, which makes it annoying to test
- // things. This helps a bit.
- val curr = new AtomicInteger()
- def count: Int = curr.incrementAndGet()
-
- s.run("1+1").verifyResult(startsWith(s"[$count] 2"))
- s.run("sqlContext <- sparkRSQL.init(sc)").verifyResult(null)
- s.run("hiveContext <- sparkRHive.init(sc)").verifyResult(null)
- s.run("""localDF <- data.frame(name=c("John", "Smith", "Sarah"), age=c(19, 23, 18))""")
- .verifyResult(null)
- s.run("df <- createDataFrame(sqlContext, localDF)").verifyResult(null)
- s.run("printSchema(df)").verifyResult(literal(
- """|root
- | |-- name: string (nullable = true)
- | |-- age: double (nullable = true)""".stripMargin))
- }
- }
-
- test("application kills session") {
- withNewSession(Spark()) { s =>
- s.runFatalStatement("System.exit(0)")
- }
- }
-
- test("should kill RSCDriver if it doesn't respond to end session") {
- val testConfName = s"${RSCConf.LIVY_SPARK_PREFIX}${RSCConf.Entry.TEST_STUCK_END_SESSION.key()}"
- withNewSession(Spark(), Map(testConfName -> "true")) { s =>
- val appId = s.appId()
- s.stop()
- val appReport = cluster.yarnClient.getApplicationReport(appId)
- appReport.getYarnApplicationState() shouldBe YarnApplicationState.KILLED
- }
- }
-
- test("should kill RSCDriver if it didn't register itself in time") {
- val testConfName =
- s"${RSCConf.LIVY_SPARK_PREFIX}${RSCConf.Entry.TEST_STUCK_START_DRIVER.key()}"
- withNewSession(Spark(), Map(testConfName -> "true"), false) { s =>
- eventually(timeout(2 minutes), interval(5 seconds)) {
- val appId = s.appId()
- appId should not be null
- val appReport = cluster.yarnClient.getApplicationReport(appId)
- appReport.getYarnApplicationState() shouldBe YarnApplicationState.KILLED
- }
- }
- }
-
- test("user jars are properly imported in Scala interactive sessions") {
- // Include a popular Java library to test importing user jars.
- val sparkConf = Map("spark.jars.packages" -> "org.codehaus.plexus:plexus-utils:3.0.24")
- withNewSession(Spark(), sparkConf) { s =>
- // Check that the library is loaded into the JVM by the proper class loader.
- s.run("Thread.currentThread.getContextClassLoader.loadClass" +
- """("org.codehaus.plexus.util.FileUtils")""")
- .verifyResult(".*Class\\[_\\] = class org.codehaus.plexus.util.FileUtils")
-
- // Check that the Scala interpreter sees the library.
- s.run("import org.codehaus.plexus.util._").verifyResult("import org.codehaus.plexus.util._")
-
- // Check that SparkContext sees classes defined by the Scala interpreter.
- s.run("case class Item(i: Int)").verifyResult("defined class Item")
- s.run("val rdd = sc.parallelize(Array.fill(10){new Item(scala.util.Random.nextInt(1000))})")
- .verifyResult("rdd.*")
- s.run("rdd.count()").verifyResult(".*= 10")
- }
- }
-
- test("heartbeat should kill expired session") {
- // Set it to 2s because verifySessionIdle() polls via GET every second.
- val heartbeatTimeout = Duration.create("2s")
- withNewSession(Spark(), Map.empty, true, heartbeatTimeout.toSeconds.toInt) { s =>
- // If the test reaches here, it means verifySessionIdle() successfully kept the
- // session alive. Now verify that the heartbeat kills the expired session.
- Thread.sleep(heartbeatTimeout.toMillis * 2)
- s.verifySessionDoesNotExist()
- }
- }
-
- test("recover interactive session") {
- withNewSession(Spark()) { s =>
- val stmt1 = s.run("1")
- stmt1.verifyResult("res0: Int = 1")
-
- restartLivy()
-
- // Verify session still exists.
- s.verifySessionIdle()
- s.run("2").verifyResult("res1: Int = 2")
- // Verify statement result is preserved.
- stmt1.verifyResult("res0: Int = 1")
-
- s.stop()
-
- restartLivy()
-
- // Verify deleted session doesn't show up after recovery.
- s.verifySessionDoesNotExist()
-
- // Verify new session doesn't reuse old session id.
- withNewSession(Spark(), Map.empty, false) { s1 =>
- s1.id should be > s.id
- }
- }
- }
-
- private def withNewSession[R] (
- kind: Kind,
- sparkConf: Map[String, String] = Map.empty,
- waitForIdle: Boolean = true,
- heartbeatTimeoutInSecond: Int = 0)
- (f: (LivyRestClient#InteractiveSession) => R): R = {
- withSession(livyClient.startSession(kind, sparkConf, heartbeatTimeoutInSecond)) { s =>
- if (waitForIdle) {
- s.verifySessionIdle()
- }
- f(s)
- }
- }
-
- private def startsWith(result: String): String = Pattern.quote(result) + ".*"
-
- private def literal(result: String): String = Pattern.quote(result)
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/integration-test/src/test/scala/com/cloudera/livy/test/JobApiIT.scala
----------------------------------------------------------------------
diff --git a/integration-test/src/test/scala/com/cloudera/livy/test/JobApiIT.scala b/integration-test/src/test/scala/com/cloudera/livy/test/JobApiIT.scala
deleted file mode 100644
index 87dcfe1..0000000
--- a/integration-test/src/test/scala/com/cloudera/livy/test/JobApiIT.scala
+++ /dev/null
@@ -1,294 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.test
-
-import java.io.{File, InputStream}
-import java.net.URI
-import java.nio.charset.StandardCharsets.UTF_8
-import java.nio.file.{Files, StandardCopyOption}
-import java.util.concurrent.{Future => JFuture, TimeUnit}
-import javax.servlet.http.HttpServletResponse
-
-import scala.collection.JavaConverters._
-import scala.util.{Properties, Try}
-
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.module.scala.DefaultScalaModule
-import org.scalatest.BeforeAndAfterAll
-
-import com.cloudera.livy._
-import com.cloudera.livy.client.common.HttpMessages._
-import com.cloudera.livy.sessions.SessionKindModule
-import com.cloudera.livy.test.framework.BaseIntegrationTestSuite
-import com.cloudera.livy.test.jobs._
-import com.cloudera.livy.utils.LivySparkUtils
-
-// Proper type representing the return value of "GET /sessions". At some point we should make
-// SessionServlet use something like this.
-class SessionList {
- val from: Int = -1
- val total: Int = -1
- val sessions: List[SessionInfo] = Nil
-}
-
-class JobApiIT extends BaseIntegrationTestSuite with BeforeAndAfterAll with Logging {
-
- private var client: LivyClient = _
- private var sessionId: Int = _
- private var client2: LivyClient = _
- private val mapper = new ObjectMapper()
- .registerModule(DefaultScalaModule)
- .registerModule(new SessionKindModule())
-
- override def afterAll(): Unit = {
- super.afterAll()
- Seq(client, client2).foreach { c =>
- if (c != null) {
- c.stop(true)
- }
- }
-
- livyClient.connectSession(sessionId).stop()
- }
-
- test("create a new session and upload test jar") {
- val tempClient = createClient(livyEndpoint)
-
- try {
- // Figure out the session ID by poking at the REST endpoint. We should probably expose this
- // in the Java API.
- val list = sessionList()
- assert(list.total === 1)
- val tempSessionId = list.sessions(0).id
-
- livyClient.connectSession(tempSessionId).verifySessionIdle()
- waitFor(tempClient.uploadJar(new File(testLib)))
-
- client = tempClient
- sessionId = tempSessionId
- } finally {
- if (client == null) {
- try {
- tempClient.stop(true)
- } catch {
- case e: Exception => warn("Error stopping client.", e)
- }
- }
- }
- }
-
- test("upload file") {
- assume(client != null, "Client not active.")
-
- val file = Files.createTempFile("filetest", ".txt")
- Files.write(file, "hello".getBytes(UTF_8))
-
- waitFor(client.uploadFile(file.toFile()))
-
- val result = waitFor(client.submit(new FileReader(file.toFile().getName(), false)))
- assert(result === "hello")
- }
-
- test("add file from HDFS") {
- assume(client != null, "Client not active.")
- val file = Files.createTempFile("filetest2", ".txt")
- Files.write(file, "hello".getBytes(UTF_8))
-
- val uri = new URI(uploadToHdfs(file.toFile()))
- waitFor(client.addFile(uri))
-
- val task = new FileReader(new File(uri.getPath()).getName(), false)
- val result = waitFor(client.submit(task))
- assert(result === "hello")
- }
-
- test("run simple jobs") {
- assume(client != null, "Client not active.")
-
- val result = waitFor(client.submit(new Echo("hello")))
- assert(result === "hello")
-
- val result2 = waitFor(client.run(new Echo("hello")))
- assert(result2 === "hello")
- }
-
- test("run spark job") {
- assume(client != null, "Client not active.")
- val result = waitFor(client.submit(new SmallCount(100)))
- assert(result === 100)
- }
-
- test("run spark sql job") {
- assume(client != null, "Client not active.")
- val result = waitFor(client.submit(new SQLGetTweets(false)))
- assert(result.size() > 0)
- }
-
- test("stop a client without destroying the session") {
- assume(client != null, "Client not active.")
- client.stop(false)
- client = null
- }
-
- test("connect to an existing session") {
- livyClient.connectSession(sessionId).verifySessionIdle()
- val sessionUri = s"$livyEndpoint/sessions/$sessionId"
- client2 = createClient(sessionUri)
- }
-
- test("submit job using new client") {
- assume(client2 != null, "Client not active.")
- val result = waitFor(client2.submit(new Echo("hello")))
- assert(result === "hello")
- }
-
- scalaTest("run scala jobs") {
- assume(client2 != null, "Client not active.")
-
- val jobs = Seq(
- new ScalaEcho("abcde"),
- new ScalaEcho(Seq(1, 2, 3, 4)),
- new ScalaEcho(Map(1 -> 2, 3 -> 4)),
- new ScalaEcho(ValueHolder("abcde")),
- new ScalaEcho(ValueHolder(Seq(1, 2, 3, 4))),
- new ScalaEcho(Some("abcde"))
- )
-
- jobs.foreach { job =>
- val result = waitFor(client2.submit(job))
- assert(result === job.value)
- }
- }
-
- protected def scalaTest(desc: String)(testFn: => Unit): Unit = {
- test(desc) {
- assume(sys.env("LIVY_SPARK_SCALA_VERSION") ==
- LivySparkUtils.formatScalaVersion(Properties.versionNumberString),
- s"Scala test can only be run with ${Properties.versionString}")
- testFn
- }
- }
-
- test("ensure failing jobs do not affect session state") {
- assume(client2 != null, "Client not active.")
-
- try {
- waitFor(client2.submit(new Failure()))
- fail("Job should have failued.")
- } catch {
- case e: Exception =>
- assert(e.getMessage().contains(classOf[Failure.JobFailureException].getName()))
- }
-
- val result = waitFor(client2.submit(new Echo("foo")))
- assert(result === "foo")
- }
-
- test("return null should not throw NPE") {
- assume(client2 != null, "Client not active.")
-
- val result = waitFor(client2.submit(new VoidJob()))
- assert(result === null)
- }
-
- test("destroy the session") {
- assume(client2 != null, "Client not active.")
- client2.stop(true)
-
- val list = sessionList()
- assert(list.total === 0)
-
- val sessionUri = s"$livyEndpoint/sessions/$sessionId"
- Try(createClient(sessionUri)).foreach { client =>
- client.stop(true)
- fail("Should not have been able to connect to destroyed session.")
- }
-
- sessionId = -1
- }
-
- pytest("validate Python-API requests") {
- val addFileContent = "hello from addfile"
- val addFilePath = createTempFilesForTest("add_file", ".txt", addFileContent, true)
- val addPyFileContent = "def test_add_pyfile(): return \"hello from addpyfile\""
- val addPyFilePath = createTempFilesForTest("add_pyfile", ".py", addPyFileContent, true)
- val uploadFileContent = "hello from uploadfile"
- val uploadFilePath = createTempFilesForTest("upload_pyfile", ".py", uploadFileContent, false)
- val uploadPyFileContent = "def test_upload_pyfile(): return \"hello from uploadpyfile\""
- val uploadPyFilePath = createTempFilesForTest("upload_pyfile", ".py",
- uploadPyFileContent, false)
-
- val builder = new ProcessBuilder(Seq("python", createPyTestsForPythonAPI().toString).asJava)
-
- val env = builder.environment()
- env.put("LIVY_END_POINT", livyEndpoint)
- env.put("ADD_FILE_URL", addFilePath)
- env.put("ADD_PYFILE_URL", addPyFilePath)
- env.put("UPLOAD_FILE_URL", uploadFilePath)
- env.put("UPLOAD_PYFILE_URL", uploadPyFilePath)
-
- builder.redirectOutput(new File(sys.props("java.io.tmpdir") + "/pytest_results.txt"))
- builder.redirectErrorStream(true)
-
- val process = builder.start()
-
- process.waitFor()
-
- assert(process.exitValue() === 0)
- }
-
- private def createPyTestsForPythonAPI(): File = {
- var source: InputStream = null
- try {
- source = getClass.getClassLoader.getResourceAsStream("test_python_api.py")
- val file = Files.createTempFile("", "").toFile
- Files.copy(source, file.toPath, StandardCopyOption.REPLACE_EXISTING)
- file
- } finally {
- source.close()
- }
- }
-
- private def createTempFilesForTest(
- fileName: String,
- fileExtension: String,
- fileContent: String,
- uploadFileToHdfs: Boolean): String = {
- val path = Files.createTempFile(fileName, fileExtension)
- Files.write(path, fileContent.getBytes(UTF_8))
- if (uploadFileToHdfs) {
- uploadToHdfs(path.toFile())
- } else {
- path.toString
- }
- }
-
- private def waitFor[T](future: JFuture[T]): T = {
- future.get(30, TimeUnit.SECONDS)
- }
-
- private def sessionList(): SessionList = {
- val response = httpClient.prepareGet(s"$livyEndpoint/sessions/").execute().get()
- assert(response.getStatusCode === HttpServletResponse.SC_OK)
- mapper.readValue(response.getResponseBodyAsStream, classOf[SessionList])
- }
-
- private def createClient(uri: String): LivyClient = {
- new LivyClientBuilder().setURI(new URI(uri)).build()
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/integration-test/src/test/scala/org/apache/livy/test/BatchIT.scala
----------------------------------------------------------------------
diff --git a/integration-test/src/test/scala/org/apache/livy/test/BatchIT.scala b/integration-test/src/test/scala/org/apache/livy/test/BatchIT.scala
new file mode 100644
index 0000000..0b3a061
--- /dev/null
+++ b/integration-test/src/test/scala/org/apache/livy/test/BatchIT.scala
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.test
+
+import java.io.File
+import java.util.UUID
+
+import scala.language.postfixOps
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.OptionValues._
+
+import org.apache.livy.sessions.SessionState
+import org.apache.livy.test.apps._
+import org.apache.livy.test.framework.{BaseIntegrationTestSuite, LivyRestClient}
+
+class BatchIT extends BaseIntegrationTestSuite with BeforeAndAfterAll {
+ private var testLibPath: String = _
+
+ protected override def beforeAll() = {
+ super.beforeAll()
+ testLibPath = uploadToHdfs(new File(testLib))
+ }
+
+ test("submit spark app") {
+ val output = newOutputPath()
+ withTestLib(classOf[SimpleSparkApp], List(output)) { s =>
+ s.verifySessionSuccess()
+
+ // Make sure the test lib has created the test output.
+ cluster.fs.isDirectory(new Path(output)) shouldBe true
+
+ // Make sure appInfo is reported correctly.
+ val state = s.snapshot()
+ state.appInfo.sparkUiUrl.value should startWith ("http")
+ }
+ }
+
+ test("submit an app that fails") {
+ val output = newOutputPath()
+ withTestLib(classOf[FailingApp], List(output)) { s =>
+ // At this point the application has exited. State should be 'dead' instead of 'error'.
+ s.verifySessionDead()
+
+ // The file is written to make sure the app actually ran, instead of just failing for
+ // some other reason.
+ cluster.fs.isFile(new Path(output)) shouldBe true
+ }
+ }
+
+ pytest("submit a pyspark application") {
+ val scriptPath = uploadResource("batch.py")
+ val output = newOutputPath()
+ withScript(scriptPath, List(output)) { s =>
+ s.verifySessionSuccess()
+ cluster.fs.isDirectory(new Path(output)) shouldBe true
+ }
+ }
+
+ // This is disabled since R scripts don't seem to work in yarn-cluster mode. There's a
+ // TODO comment in Spark's ApplicationMaster.scala.
+ ignore("submit a SparkR application") {
+ val hdfsPath = uploadResource("rtest.R")
+ withScript(hdfsPath, List.empty) { s =>
+ s.verifySessionSuccess()
+ }
+ }
+
+ test("deleting a session should kill YARN app") {
+ val output = newOutputPath()
+ withTestLib(classOf[SimpleSparkApp], List(output, "false")) { s =>
+ s.verifySessionState(SessionState.Running())
+ s.snapshot().appInfo.driverLogUrl.value should include ("containerlogs")
+
+ val appId = s.appId()
+
+ // Delete the session then verify the YARN app state is KILLED.
+ s.stop()
+ cluster.yarnClient.getApplicationReport(appId).getFinalApplicationStatus shouldBe
+ FinalApplicationStatus.KILLED
+ }
+ }
+
+ test("killing YARN app should change batch state to dead") {
+ val output = newOutputPath()
+ withTestLib(classOf[SimpleSparkApp], List(output, "false")) { s =>
+ s.verifySessionState(SessionState.Running())
+ val appId = s.appId()
+
+ // Kill the YARN app; the batch session should become dead and YARN should report KILLED.
+ cluster.yarnClient.killApplication(appId)
+ s.verifySessionDead()
+
+ cluster.yarnClient.getApplicationReport(appId).getFinalApplicationStatus shouldBe
+ FinalApplicationStatus.KILLED
+
+ s.snapshot().log should contain ("Application killed by user.")
+ }
+ }
+
+ test("recover batch sessions") {
+ val output1 = newOutputPath()
+ val output2 = newOutputPath()
+ withTestLib(classOf[SimpleSparkApp], List(output1)) { s1 =>
+ s1.stop()
+ withTestLib(classOf[SimpleSparkApp], List(output2, "false")) { s2 =>
+ s2.verifySessionRunning()
+
+ restartLivy()
+
+ // Verify previous active session still appears after restart.
+ s2.verifySessionRunning()
+ // Verify deleted session doesn't show up.
+ s1.verifySessionDoesNotExist()
+
+ s2.stop()
+
+ // Verify new session doesn't reuse old session id.
+ withTestLib(classOf[SimpleSparkApp], List(output2)) { s3 =>
+ s3.id should be > s2.id
+ }
+ }
+ }
+ }
+
+ private def newOutputPath(): String = {
+ cluster.hdfsScratchDir().toString() + "/" + UUID.randomUUID().toString()
+ }
+
+ private def uploadResource(name: String): String = {
+ val hdfsPath = new Path(cluster.hdfsScratchDir(), UUID.randomUUID().toString + "-" + name)
+ val in = getClass.getResourceAsStream("/" + name)
+ val out = cluster.fs.create(hdfsPath)
+ try {
+ IOUtils.copy(in, out)
+ } finally {
+ in.close()
+ out.close()
+ }
+ hdfsPath.toUri().getPath()
+ }
+
+ private def withScript[R]
+ (scriptPath: String, args: List[String], sparkConf: Map[String, String] = Map.empty)
+ (f: (LivyRestClient#BatchSession) => R): R = {
+ val s = livyClient.startBatch(scriptPath, None, args, sparkConf)
+ withSession(s)(f)
+ }
+
+ private def withTestLib[R]
+ (testClass: Class[_], args: List[String], sparkConf: Map[String, String] = Map.empty)
+ (f: (LivyRestClient#BatchSession) => R): R = {
+ val s = livyClient.startBatch(testLibPath, Some(testClass.getName()), args, sparkConf)
+ withSession(s)(f)
+ }
+}
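
The kill assertions above go through cluster.yarnClient, a Hadoop YarnClient. As a rough sketch of the underlying API (killAndVerify is a made-up helper, and a yarn-site.xml for the test cluster is assumed to be on the classpath):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.yarn.api.records.{ApplicationId, FinalApplicationStatus}
    import org.apache.hadoop.yarn.client.api.YarnClient

    // Hypothetical helper, not part of the suite.
    def killAndVerify(appId: ApplicationId): Unit = {
      val yarn = YarnClient.createYarnClient()
      yarn.init(new Configuration())  // reads yarn-site.xml from the classpath
      yarn.start()
      try {
        yarn.killApplication(appId)
        // The kill is asynchronous; the tests above first poll the session
        // state (verifySessionDead) before reading the final report.
        val status = yarn.getApplicationReport(appId).getFinalApplicationStatus
        assert(status == FinalApplicationStatus.KILLED)
      } finally {
        yarn.stop()
      }
    }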
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/integration-test/src/test/scala/org/apache/livy/test/InteractiveIT.scala
----------------------------------------------------------------------
diff --git a/integration-test/src/test/scala/org/apache/livy/test/InteractiveIT.scala b/integration-test/src/test/scala/org/apache/livy/test/InteractiveIT.scala
new file mode 100644
index 0000000..728fd1c
--- /dev/null
+++ b/integration-test/src/test/scala/org/apache/livy/test/InteractiveIT.scala
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.test
+
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.regex.Pattern
+
+import org.apache.hadoop.yarn.api.records.YarnApplicationState
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.OptionValues._
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import org.apache.livy.rsc.RSCConf
+import org.apache.livy.sessions._
+import org.apache.livy.test.framework.{BaseIntegrationTestSuite, LivyRestClient}
+
+class InteractiveIT extends BaseIntegrationTestSuite {
+ test("basic interactive session") {
+ withNewSession(Spark()) { s =>
+ s.run("val sparkVersion = sc.version").result().left.foreach(info(_))
+ s.run("1+1").verifyResult("res0: Int = 2")
+ s.run("""sc.getConf.get("spark.executor.instances")""").verifyResult("res1: String = 1")
+ s.run("val sql = new org.apache.spark.sql.SQLContext(sc)").verifyResult(
+ ".*" + Pattern.quote(
+ "sql: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext") + ".*")
+ s.run("abcde").verifyError(evalue = ".*?:[0-9]+: error: not found: value abcde.*")
+ s.run("throw new IllegalStateException()")
+ .verifyError(evalue = ".*java\\.lang\\.IllegalStateException.*")
+
+ // Verify Livy internal configurations are not exposed.
+ // TODO separate all these checks to different sub tests after merging new IT code.
+ s.run("""sc.getConf.getAll.exists(_._1.startsWith("spark.__livy__."))""")
+ .verifyResult(".*false")
+ s.run("""sys.props.exists(_._1.startsWith("spark.__livy__."))""").verifyResult(".*false")
+
+ // Make sure appInfo is reported correctly.
+ val state = s.snapshot()
+ state.appInfo.driverLogUrl.value should include ("containerlogs")
+ state.appInfo.sparkUiUrl.value should startWith ("http")
+
+ // Stop session and verify the YARN app state is finished.
+ // This is important because if YARN app state is killed, Spark history is not archived.
+ val appId = s.appId()
+ s.stop()
+ val appReport = cluster.yarnClient.getApplicationReport(appId)
+ appReport.getYarnApplicationState() shouldEqual YarnApplicationState.FINISHED
+ }
+ }
+
+ pytest("pyspark interactive session") {
+ withNewSession(PySpark()) { s =>
+ s.run("1+1").verifyResult("2")
+ s.run("sqlContext").verifyResult(startsWith("<pyspark.sql.context.HiveContext"))
+ s.run("sc.parallelize(range(100)).map(lambda x: x * 2).reduce(lambda x, y: x + y)")
+ .verifyResult("9900")
+ s.run("from pyspark.sql.types import Row").verifyResult("")
+ s.run("x = [Row(age=1, name=u'a'), Row(age=2, name=u'b'), Row(age=3, name=u'c')]")
+ .verifyResult("")
+ s.run("%table x").verifyResult(".*headers.*type.*name.*data.*")
+ s.run("abcde").verifyError(ename = "NameError", evalue = "name 'abcde' is not defined")
+ s.run("raise KeyError, 'foo'").verifyError(ename = "KeyError", evalue = "'foo'")
+ }
+ }
+
+ rtest("R interactive session") {
+ withNewSession(SparkR()) { s =>
+ // R's output sometimes includes the count of statements, which makes it annoying to test
+ // things. This helps a bit.
+ val curr = new AtomicInteger()
+ def count: Int = curr.incrementAndGet()
+
+ s.run("1+1").verifyResult(startsWith(s"[$count] 2"))
+ s.run("sqlContext <- sparkRSQL.init(sc)").verifyResult(null)
+ s.run("hiveContext <- sparkRHive.init(sc)").verifyResult(null)
+ s.run("""localDF <- data.frame(name=c("John", "Smith", "Sarah"), age=c(19, 23, 18))""")
+ .verifyResult(null)
+ s.run("df <- createDataFrame(sqlContext, localDF)").verifyResult(null)
+ s.run("printSchema(df)").verifyResult(literal(
+ """|root
+ | |-- name: string (nullable = true)
+ | |-- age: double (nullable = true)""".stripMargin))
+ }
+ }
+
+ test("application kills session") {
+ withNewSession(Spark()) { s =>
+ s.runFatalStatement("System.exit(0)")
+ }
+ }
+
+ test("should kill RSCDriver if it doesn't respond to end session") {
+ val testConfName = s"${RSCConf.LIVY_SPARK_PREFIX}${RSCConf.Entry.TEST_STUCK_END_SESSION.key()}"
+ withNewSession(Spark(), Map(testConfName -> "true")) { s =>
+ val appId = s.appId()
+ s.stop()
+ val appReport = cluster.yarnClient.getApplicationReport(appId)
+ appReport.getYarnApplicationState() shouldBe YarnApplicationState.KILLED
+ }
+ }
+
+ test("should kill RSCDriver if it didn't register itself in time") {
+ val testConfName =
+ s"${RSCConf.LIVY_SPARK_PREFIX}${RSCConf.Entry.TEST_STUCK_START_DRIVER.key()}"
+ withNewSession(Spark(), Map(testConfName -> "true"), false) { s =>
+ eventually(timeout(2 minutes), interval(5 seconds)) {
+ val appId = s.appId()
+ appId should not be null
+ val appReport = cluster.yarnClient.getApplicationReport(appId)
+ appReport.getYarnApplicationState() shouldBe YarnApplicationState.KILLED
+ }
+ }
+ }
+
+ test("user jars are properly imported in Scala interactive sessions") {
+ // Include a popular Java library to test importing user jars.
+ val sparkConf = Map("spark.jars.packages" -> "org.codehaus.plexus:plexus-utils:3.0.24")
+ withNewSession(Spark(), sparkConf) { s =>
+ // Check that the library is loaded into the JVM by the proper class loader.
+ s.run("Thread.currentThread.getContextClassLoader.loadClass" +
+ """("org.codehaus.plexus.util.FileUtils")""")
+ .verifyResult(".*Class\\[_\\] = class org.codehaus.plexus.util.FileUtils")
+
+ // Check that the Scala interpreter sees the library.
+ s.run("import org.codehaus.plexus.util._").verifyResult("import org.codehaus.plexus.util._")
+
+ // Check that SparkContext sees classes defined by the Scala interpreter.
+ s.run("case class Item(i: Int)").verifyResult("defined class Item")
+ s.run("val rdd = sc.parallelize(Array.fill(10){new Item(scala.util.Random.nextInt(1000))})")
+ .verifyResult("rdd.*")
+ s.run("rdd.count()").verifyResult(".*= 10")
+ }
+ }
+
+ test("heartbeat should kill expired session") {
+ // Set it to 2s because verifySessionIdle() polls via GET every second.
+ val heartbeatTimeout = Duration.create("2s")
+ withNewSession(Spark(), Map.empty, true, heartbeatTimeout.toSeconds.toInt) { s =>
+ // If the test reaches here, it means verifySessionIdle() successfully kept the
+ // session alive. Now verify that the heartbeat kills the expired session.
+ Thread.sleep(heartbeatTimeout.toMillis * 2)
+ s.verifySessionDoesNotExist()
+ }
+ }
+
+ test("recover interactive session") {
+ withNewSession(Spark()) { s =>
+ val stmt1 = s.run("1")
+ stmt1.verifyResult("res0: Int = 1")
+
+ restartLivy()
+
+ // Verify session still exists.
+ s.verifySessionIdle()
+ s.run("2").verifyResult("res1: Int = 2")
+ // Verify statement result is preserved.
+ stmt1.verifyResult("res0: Int = 1")
+
+ s.stop()
+
+ restartLivy()
+
+ // Verify deleted session doesn't show up after recovery.
+ s.verifySessionDoesNotExist()
+
+ // Verify new session doesn't reuse old session id.
+ withNewSession(Spark(), Map.empty, false) { s1 =>
+ s1.id should be > s.id
+ }
+ }
+ }
+
+ private def withNewSession[R] (
+ kind: Kind,
+ sparkConf: Map[String, String] = Map.empty,
+ waitForIdle: Boolean = true,
+ heartbeatTimeoutInSecond: Int = 0)
+ (f: (LivyRestClient#InteractiveSession) => R): R = {
+ withSession(livyClient.startSession(kind, sparkConf, heartbeatTimeoutInSecond)) { s =>
+ if (waitForIdle) {
+ s.verifySessionIdle()
+ }
+ f(s)
+ }
+ }
+
+ private def startsWith(result: String): String = Pattern.quote(result) + ".*"
+
+ private def literal(result: String): String = Pattern.quote(result)
+}
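
Several of the checks above ("should kill RSCDriver ...") rely on ScalaTest's Eventually to poll until a condition holds. A self-contained sketch of that idiom, using the same imports this suite compiles with:

    import org.scalatest.concurrent.Eventually._
    import scala.concurrent.duration._
    import scala.language.postfixOps

    var attempts = 0
    // The block is retried every 100 ms until it stops throwing an assertion
    // error, or the enclosing test fails once the 5-second timeout elapses.
    eventually(timeout(5 seconds), interval(100 millis)) {
      attempts += 1
      assert(attempts >= 3)
    }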
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/integration-test/src/test/scala/org/apache/livy/test/JobApiIT.scala
----------------------------------------------------------------------
diff --git a/integration-test/src/test/scala/org/apache/livy/test/JobApiIT.scala b/integration-test/src/test/scala/org/apache/livy/test/JobApiIT.scala
new file mode 100644
index 0000000..becaaac
--- /dev/null
+++ b/integration-test/src/test/scala/org/apache/livy/test/JobApiIT.scala
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.test
+
+import java.io.{File, InputStream}
+import java.net.URI
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.{Files, StandardCopyOption}
+import java.util.concurrent.{Future => JFuture, TimeUnit}
+import javax.servlet.http.HttpServletResponse
+
+import scala.collection.JavaConverters._
+import scala.util.{Properties, Try}
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.livy._
+import org.apache.livy.client.common.HttpMessages._
+import org.apache.livy.sessions.SessionKindModule
+import org.apache.livy.test.framework.BaseIntegrationTestSuite
+import org.apache.livy.test.jobs._
+import org.apache.livy.utils.LivySparkUtils
+
+// Proper type representing the return value of "GET /sessions". At some point we should make
+// SessionServlet use something like this.
+class SessionList {
+ val from: Int = -1
+ val total: Int = -1
+ val sessions: List[SessionInfo] = Nil
+}
+
+class JobApiIT extends BaseIntegrationTestSuite with BeforeAndAfterAll with Logging {
+
+ private var client: LivyClient = _
+ private var sessionId: Int = _
+ private var client2: LivyClient = _
+ private val mapper = new ObjectMapper()
+ .registerModule(DefaultScalaModule)
+ .registerModule(new SessionKindModule())
+
+ override def afterAll(): Unit = {
+ super.afterAll()
+ Seq(client, client2).foreach { c =>
+ if (c != null) {
+ c.stop(true)
+ }
+ }
+
+ livyClient.connectSession(sessionId).stop()
+ }
+
+ test("create a new session and upload test jar") {
+ val tempClient = createClient(livyEndpoint)
+
+ try {
+ // Figure out the session ID by poking at the REST endpoint. We should probably expose this
+ // in the Java API.
+ val list = sessionList()
+ assert(list.total === 1)
+ val tempSessionId = list.sessions(0).id
+
+ livyClient.connectSession(tempSessionId).verifySessionIdle()
+ waitFor(tempClient.uploadJar(new File(testLib)))
+
+ client = tempClient
+ sessionId = tempSessionId
+ } finally {
+ if (client == null) {
+ try {
+ tempClient.stop(true)
+ } catch {
+ case e: Exception => warn("Error stopping client.", e)
+ }
+ }
+ }
+ }
+
+ test("upload file") {
+ assume(client != null, "Client not active.")
+
+ val file = Files.createTempFile("filetest", ".txt")
+ Files.write(file, "hello".getBytes(UTF_8))
+
+ waitFor(client.uploadFile(file.toFile()))
+
+ val result = waitFor(client.submit(new FileReader(file.toFile().getName(), false)))
+ assert(result === "hello")
+ }
+
+ test("add file from HDFS") {
+ assume(client != null, "Client not active.")
+ val file = Files.createTempFile("filetest2", ".txt")
+ Files.write(file, "hello".getBytes(UTF_8))
+
+ val uri = new URI(uploadToHdfs(file.toFile()))
+ waitFor(client.addFile(uri))
+
+ val task = new FileReader(new File(uri.getPath()).getName(), false)
+ val result = waitFor(client.submit(task))
+ assert(result === "hello")
+ }
+
+ test("run simple jobs") {
+ assume(client != null, "Client not active.")
+
+ val result = waitFor(client.submit(new Echo("hello")))
+ assert(result === "hello")
+
+ val result2 = waitFor(client.run(new Echo("hello")))
+ assert(result2 === "hello")
+ }
+
+ test("run spark job") {
+ assume(client != null, "Client not active.")
+ val result = waitFor(client.submit(new SmallCount(100)))
+ assert(result === 100)
+ }
+
+ test("run spark sql job") {
+ assume(client != null, "Client not active.")
+ val result = waitFor(client.submit(new SQLGetTweets(false)))
+ assert(result.size() > 0)
+ }
+
+ test("stop a client without destroying the session") {
+ assume(client != null, "Client not active.")
+ client.stop(false)
+ client = null
+ }
+
+ test("connect to an existing session") {
+ livyClient.connectSession(sessionId).verifySessionIdle()
+ val sessionUri = s"$livyEndpoint/sessions/$sessionId"
+ client2 = createClient(sessionUri)
+ }
+
+ test("submit job using new client") {
+ assume(client2 != null, "Client not active.")
+ val result = waitFor(client2.submit(new Echo("hello")))
+ assert(result === "hello")
+ }
+
+ scalaTest("run scala jobs") {
+ assume(client2 != null, "Client not active.")
+
+ val jobs = Seq(
+ new ScalaEcho("abcde"),
+ new ScalaEcho(Seq(1, 2, 3, 4)),
+ new ScalaEcho(Map(1 -> 2, 3 -> 4)),
+ new ScalaEcho(ValueHolder("abcde")),
+ new ScalaEcho(ValueHolder(Seq(1, 2, 3, 4))),
+ new ScalaEcho(Some("abcde"))
+ )
+
+ jobs.foreach { job =>
+ val result = waitFor(client2.submit(job))
+ assert(result === job.value)
+ }
+ }
+
+ protected def scalaTest(desc: String)(testFn: => Unit): Unit = {
+ test(desc) {
+ assume(sys.env("LIVY_SPARK_SCALA_VERSION") ==
+ LivySparkUtils.formatScalaVersion(Properties.versionNumberString),
+ s"Scala test can only be run with ${Properties.versionString}")
+ testFn
+ }
+ }
+
+ test("ensure failing jobs do not affect session state") {
+ assume(client2 != null, "Client not active.")
+
+ try {
+ waitFor(client2.submit(new Failure()))
+ fail("Job should have failued.")
+ } catch {
+ case e: Exception =>
+ assert(e.getMessage().contains(classOf[Failure.JobFailureException].getName()))
+ }
+
+ val result = waitFor(client2.submit(new Echo("foo")))
+ assert(result === "foo")
+ }
+
+ test("return null should not throw NPE") {
+ assume(client2 != null, "Client not active.")
+
+ val result = waitFor(client2.submit(new VoidJob()))
+ assert(result === null)
+ }
+
+ test("destroy the session") {
+ assume(client2 != null, "Client not active.")
+ client2.stop(true)
+
+ val list = sessionList()
+ assert(list.total === 0)
+
+ val sessionUri = s"$livyEndpoint/sessions/$sessionId"
+ Try(createClient(sessionUri)).foreach { client =>
+ client.stop(true)
+ fail("Should not have been able to connect to destroyed session.")
+ }
+
+ sessionId = -1
+ }
+
+ pytest("validate Python-API requests") {
+ val addFileContent = "hello from addfile"
+ val addFilePath = createTempFilesForTest("add_file", ".txt", addFileContent, true)
+ val addPyFileContent = "def test_add_pyfile(): return \"hello from addpyfile\""
+ val addPyFilePath = createTempFilesForTest("add_pyfile", ".py", addPyFileContent, true)
+ val uploadFileContent = "hello from uploadfile"
+ val uploadFilePath = createTempFilesForTest("upload_pyfile", ".py", uploadFileContent, false)
+ val uploadPyFileContent = "def test_upload_pyfile(): return \"hello from uploadpyfile\""
+ val uploadPyFilePath = createTempFilesForTest("upload_pyfile", ".py",
+ uploadPyFileContent, false)
+
+ val builder = new ProcessBuilder(Seq("python", createPyTestsForPythonAPI().toString).asJava)
+
+ val env = builder.environment()
+ env.put("LIVY_END_POINT", livyEndpoint)
+ env.put("ADD_FILE_URL", addFilePath)
+ env.put("ADD_PYFILE_URL", addPyFilePath)
+ env.put("UPLOAD_FILE_URL", uploadFilePath)
+ env.put("UPLOAD_PYFILE_URL", uploadPyFilePath)
+
+ builder.redirectOutput(new File(sys.props("java.io.tmpdir") + "/pytest_results.txt"))
+ builder.redirectErrorStream(true)
+
+ val process = builder.start()
+
+ process.waitFor()
+
+ assert(process.exitValue() === 0)
+ }
+
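+ // Copies the test_python_api.py resource to a temp file so it can be run with a local
+ // python interpreter.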
+ private def createPyTestsForPythonAPI(): File = {
+ var source: InputStream = null
+ try {
+ source = getClass.getClassLoader.getResourceAsStream("test_python_api.py")
+ val file = Files.createTempFile("", "").toFile
+ Files.copy(source, file.toPath, StandardCopyOption.REPLACE_EXISTING)
+ file
+ } finally {
+ // Guard the close so a missing resource doesn't mask the real failure with an NPE.
+ if (source != null) {
+ source.close()
+ }
+ }
+ }
+
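+ // Writes the given content to a temp file; returns its HDFS location after upload, or
+ // the local path otherwise.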
+ private def createTempFilesForTest(
+ fileName: String,
+ fileExtension: String,
+ fileContent: String,
+ uploadFileToHdfs: Boolean): String = {
+ val path = Files.createTempFile(fileName, fileExtension)
+ Files.write(path, fileContent.getBytes(UTF_8))
+ if (uploadFileToHdfs) {
+ uploadToHdfs(path.toFile())
+ } else {
+ path.toString
+ }
+ }
+
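+ // Caps each job wait at 30 seconds so a hung job fails the test instead of stalling
+ // the whole suite.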
+ private def waitFor[T](future: JFuture[T]): T = {
+ future.get(30, TimeUnit.SECONDS)
+ }
+
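+ // Fetches the active session list from the REST API and deserializes the JSON response.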
+ private def sessionList(): SessionList = {
+ val response = httpClient.prepareGet(s"$livyEndpoint/sessions/").execute().get()
+ assert(response.getStatusCode === HttpServletResponse.SC_OK)
+ mapper.readValue(response.getResponseBodyAsStream, classOf[SessionList])
+ }
+
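+ // Builds a LivyClient connected to the given session URI.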
+ private def createClient(uri: String): LivyClient = {
+ new LivyClientBuilder().setURI(new URI(uri)).build()
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/integration-test/src/test/spark2/scala/Spark2JobApiIT.scala
----------------------------------------------------------------------
diff --git a/integration-test/src/test/spark2/scala/Spark2JobApiIT.scala b/integration-test/src/test/spark2/scala/Spark2JobApiIT.scala
index 0501685..aa30b88 100644
--- a/integration-test/src/test/spark2/scala/Spark2JobApiIT.scala
+++ b/integration-test/src/test/spark2/scala/Spark2JobApiIT.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package com.cloudera.livy.test
+package org.apache.livy.test
import java.io.File
import java.net.URI
@@ -26,11 +26,11 @@ import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.scalatest.BeforeAndAfterAll
-import com.cloudera.livy._
-import com.cloudera.livy.client.common.HttpMessages._
-import com.cloudera.livy.sessions.SessionKindModule
-import com.cloudera.livy.test.framework.BaseIntegrationTestSuite
-import com.cloudera.livy.test.jobs.spark2._
+import org.apache.livy._
+import org.apache.livy.client.common.HttpMessages._
+import org.apache.livy.sessions.SessionKindModule
+import org.apache.livy.test.framework.BaseIntegrationTestSuite
+import org.apache.livy.test.jobs.spark2._
class Spark2JobApiIT extends BaseIntegrationTestSuite with BeforeAndAfterAll with Logging {
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d139099..9c1263b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -20,9 +20,9 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
- <groupId>com.cloudera.livy</groupId>
+ <groupId>org.apache.livy</groupId>
<artifactId>livy-main</artifactId>
- <version>0.4.0-SNAPSHOT</version>
+ <version>0.4.0-incubating-SNAPSHOT</version>
<packaging>pom</packaging>
<name>livy-main</name>
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/python-api/pom.xml
----------------------------------------------------------------------
diff --git a/python-api/pom.xml b/python-api/pom.xml
index 3ea50c1..86379ce 100644
--- a/python-api/pom.xml
+++ b/python-api/pom.xml
@@ -21,15 +21,15 @@
<modelVersion>4.0.0</modelVersion>
<parent>
- <groupId>com.cloudera.livy</groupId>
+ <groupId>org.apache.livy</groupId>
<artifactId>livy-main</artifactId>
- <version>0.4.0-SNAPSHOT</version>
+ <version>0.4.0-incubating-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
- <groupId>com.cloudera.livy</groupId>
+ <groupId>org.apache.livy</groupId>
<artifactId>livy-python-api</artifactId>
- <version>0.4.0-SNAPSHOT</version>
+ <version>0.4.0-incubating-SNAPSHOT</version>
<packaging>pom</packaging>
<build>
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/python-api/setup.py
----------------------------------------------------------------------
diff --git a/python-api/setup.py b/python-api/setup.py
index 5f55ffd..a7e3aab 100644
--- a/python-api/setup.py
+++ b/python-api/setup.py
@@ -39,14 +39,14 @@ requirements = [
setup(
name='livy-python-api',
- version="0.4.0-SNAPSHOT",
+ version="0.4.0-incubating-SNAPSHOT",
packages=["livy", "livy-tests"],
package_dir={
"": "src/main/python",
"livy-tests": "src/test/python/livy-tests",
},
- url='https://github.com/cloudera/livy',
- author_email='livy-user@cloudera.org',
+ url='https://github.com/apache/incubator-livy',
+ author_email='user@livy.incubator.apache.org',
license='Apache License, Version 2.0',
description=DESCRIPTION,
platforms=['any'],
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/repl/pom.xml
----------------------------------------------------------------------
diff --git a/repl/pom.xml b/repl/pom.xml
index 3af51bd..14931ba 100644
--- a/repl/pom.xml
+++ b/repl/pom.xml
@@ -20,33 +20,33 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
- <groupId>com.cloudera.livy</groupId>
+ <groupId>org.apache.livy</groupId>
<artifactId>multi-scala-project-root</artifactId>
<relativePath>../scala/pom.xml</relativePath>
- <version>0.4.0-SNAPSHOT</version>
+ <version>0.4.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>livy-repl-parent</artifactId>
- <version>0.4.0-SNAPSHOT</version>
+ <version>0.4.0-incubating-SNAPSHOT</version>
<packaging>pom</packaging>
<dependencies>
<dependency>
- <groupId>com.cloudera.livy</groupId>
+ <groupId>org.apache.livy</groupId>
<artifactId>livy-rsc</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
- <groupId>com.cloudera.livy</groupId>
+ <groupId>org.apache.livy</groupId>
<artifactId>livy-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<exclusions>
<!-- Provided and shaded by livy-rsc already. -->
<exclusion>
- <groupId>com.cloudera.livy</groupId>
+ <groupId>org.apache.livy</groupId>
<artifactId>livy-client-common</artifactId>
</exclusion>
</exclusions>
@@ -189,7 +189,7 @@
<relocations>
<relocation>
<pattern>org.json4s</pattern>
- <shadedPattern>com.cloudera.livy.shaded.json4s</shadedPattern>
+ <shadedPattern>org.apache.livy.shaded.json4s</shadedPattern>
</relocation>
</relocations>
</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/repl/scala-2.10/pom.xml
----------------------------------------------------------------------
diff --git a/repl/scala-2.10/pom.xml b/repl/scala-2.10/pom.xml
index c81659b..4d84611 100644
--- a/repl/scala-2.10/pom.xml
+++ b/repl/scala-2.10/pom.xml
@@ -19,15 +19,15 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
- <groupId>com.cloudera.livy</groupId>
+ <groupId>org.apache.livy</groupId>
<artifactId>livy-repl_2.10</artifactId>
- <version>0.4.0-SNAPSHOT</version>
+ <version>0.4.0-incubating-SNAPSHOT</version>
<packaging>jar</packaging>
<parent>
- <groupId>com.cloudera.livy</groupId>
+ <groupId>org.apache.livy</groupId>
<artifactId>livy-repl-parent</artifactId>
- <version>0.4.0-SNAPSHOT</version>
+ <version>0.4.0-incubating-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/repl/scala-2.10/src/main/scala/com/cloudera/livy/repl/SparkInterpreter.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.10/src/main/scala/com/cloudera/livy/repl/SparkInterpreter.scala b/repl/scala-2.10/src/main/scala/com/cloudera/livy/repl/SparkInterpreter.scala
deleted file mode 100644
index dc761cc..0000000
--- a/repl/scala-2.10/src/main/scala/com/cloudera/livy/repl/SparkInterpreter.scala
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.repl
-
-import java.io._
-import java.net.URLClassLoader
-import java.nio.file.Paths
-
-import scala.tools.nsc.Settings
-import scala.tools.nsc.interpreter.JPrintWriter
-import scala.tools.nsc.interpreter.Results.Result
-import scala.util.{Failure, Success, Try}
-
-import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.repl.SparkIMain
-
-/**
- * This represents a Spark interpreter. It is not thread safe.
- */
-class SparkInterpreter(conf: SparkConf)
- extends AbstractSparkInterpreter with SparkContextInitializer {
-
- private var sparkIMain: SparkIMain = _
- protected var sparkContext: SparkContext = _
-
- override def start(): SparkContext = {
- require(sparkIMain == null && sparkContext == null)
-
- val settings = new Settings()
- settings.embeddedDefaults(Thread.currentThread().getContextClassLoader())
- settings.usejavacp.value = true
-
- sparkIMain = new SparkIMain(settings, new JPrintWriter(outputStream, true))
- sparkIMain.initializeSynchronous()
-
- // Spark 1.6 does not have "classServerUri"; instead, the local directory where class files
- // are stored needs to be registered in SparkConf. See comment in
- // SparkILoop::createSparkContext().
- Try(sparkIMain.getClass().getMethod("classServerUri")) match {
- case Success(method) =>
- method.setAccessible(true)
- conf.set("spark.repl.class.uri", method.invoke(sparkIMain).asInstanceOf[String])
-
- case Failure(_) =>
- val outputDir = sparkIMain.getClass().getMethod("getClassOutputDirectory")
- outputDir.setAccessible(true)
- conf.set("spark.repl.class.outputDir",
- outputDir.invoke(sparkIMain).asInstanceOf[File].getAbsolutePath())
- }
-
- restoreContextClassLoader {
- // Call sparkIMain.setContextClassLoader() to make sure SparkContext and repl are using the
- // same ClassLoader. Otherwise if someone defined a new class in interactive shell,
- // SparkContext cannot see them and will result in job stage failure.
- val setContextClassLoaderMethod = sparkIMain.getClass().getMethod("setContextClassLoader")
- setContextClassLoaderMethod.setAccessible(true)
- setContextClassLoaderMethod.invoke(sparkIMain)
-
- // With usejavacp=true, the Scala interpreter looks for jars under System Classpath. But it
- // doesn't look for jars added to MutableURLClassLoader. Thus extra jars are not visible to
- // the interpreter. SparkContext can use them via JVM ClassLoaders but users cannot import
- // them using Scala import statement.
- //
- // For instance: If we import a package using SparkConf:
- // "spark.jars.packages": "com.databricks:spark-csv_2.10:1.4.0"
- // then "import com.databricks.spark.csv._" in the interpreter, it will throw an error.
- //
- // Adding them to the interpreter manually to fix this issue.
- var classLoader = Thread.currentThread().getContextClassLoader
- while (classLoader != null) {
- if (classLoader.getClass.getCanonicalName == "org.apache.spark.util.MutableURLClassLoader")
- {
- val extraJarPath = classLoader.asInstanceOf[URLClassLoader].getURLs()
- // Check if the file exists. Otherwise an exception will be thrown.
- .filter { u => u.getProtocol == "file" && new File(u.getPath).isFile }
- // Livy rsc and repl are also in the extra jars list. Filter them out.
- .filterNot { u => Paths.get(u.toURI).getFileName.toString.startsWith("livy-") }
- // Some bad spark packages depend on the wrong version of scala-reflect. Blacklist it.
- .filterNot { u =>
- Paths.get(u.toURI).getFileName.toString.contains("org.scala-lang_scala-reflect")
- }
-
- extraJarPath.foreach { p => debug(s"Adding $p to Scala interpreter's class path...") }
- sparkIMain.addUrlsToClassPath(extraJarPath: _*)
- classLoader = null
- } else {
- classLoader = classLoader.getParent
- }
- }
-
- createSparkContext(conf)
- }
-
- sparkContext
- }
-
- protected def bind(name: String, tpe: String, value: Object, modifier: List[String]): Unit = {
- sparkIMain.beQuietDuring {
- sparkIMain.bind(name, tpe, value, modifier)
- }
- }
-
- override def close(): Unit = synchronized {
- if (sparkContext != null) {
- sparkContext.stop()
- sparkContext = null
- }
-
- if (sparkIMain != null) {
- sparkIMain.close()
- sparkIMain = null
- }
- }
-
- override protected def isStarted(): Boolean = {
- sparkContext != null && sparkIMain != null
- }
-
- override protected def interpret(code: String): Result = {
- sparkIMain.interpret(code)
- }
-
- override protected[repl] def parseError(stdout: String): (String, Seq[String]) = {
- // An example of Scala 2.10 runtime exception error message:
- // java.lang.Exception: message
- // at $iwC$$iwC$$iwC$$iwC$$iwC.error(<console>:25)
- // at $iwC$$iwC$$iwC.error2(<console>:27)
- // at $iwC$$iwC.<init>(<console>:41)
- // at $iwC.<init>(<console>:43)
- // at <init>(<console>:45)
- // at .<init>(<console>:49)
- // at .<clinit>(<console>)
- // at .<init>(<console>:7)
- // at .<clinit>(<console>)
- // at $print(<console>)
- // at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
- // at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
- // ...
-
- val (ename, traceback) = super.parseError(stdout)
-
- // Remove internal frames.
- val startOfInternalFrames = traceback.indexWhere(_.contains("$iwC$$iwC.<init>"))
- var endOfInternalFrames = traceback.indexWhere(!_.trim.startsWith("at"), startOfInternalFrames)
- if (endOfInternalFrames == -1) {
- endOfInternalFrames = traceback.length
- }
-
- val cleanedTraceback = if (startOfInternalFrames == -1) {
- traceback
- } else {
- traceback.view.zipWithIndex
- .filterNot { z => z._2 >= startOfInternalFrames && z._2 < endOfInternalFrames }
- .map { _._1.replaceAll("(\\$iwC\\$)*\\$iwC", "<user code>") }
- }
-
- (ename, cleanedTraceback)
- }
-
- override protected def valueOfTerm(name: String): Option[Any] = {
- sparkIMain.valueOfTerm(name)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/repl/scala-2.10/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.10/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala b/repl/scala-2.10/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala
new file mode 100644
index 0000000..f5b5b32
--- /dev/null
+++ b/repl/scala-2.10/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.repl
+
+import java.io._
+import java.net.URLClassLoader
+import java.nio.file.Paths
+
+import scala.tools.nsc.Settings
+import scala.tools.nsc.interpreter.JPrintWriter
+import scala.tools.nsc.interpreter.Results.Result
+import scala.util.{Failure, Success, Try}
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.repl.SparkIMain
+
+/**
+ * This represents a Spark interpreter. It is not thread safe.
+ */
+class SparkInterpreter(conf: SparkConf)
+ extends AbstractSparkInterpreter with SparkContextInitializer {
+
+ private var sparkIMain: SparkIMain = _
+ protected var sparkContext: SparkContext = _
+
+ override def start(): SparkContext = {
+ require(sparkIMain == null && sparkContext == null)
+
+ val settings = new Settings()
+ settings.embeddedDefaults(Thread.currentThread().getContextClassLoader())
+ settings.usejavacp.value = true
+
+ sparkIMain = new SparkIMain(settings, new JPrintWriter(outputStream, true))
+ sparkIMain.initializeSynchronous()
+
+ // Spark 1.6 does not have "classServerUri"; instead, the local directory where class files
+ // are stored needs to be registered in SparkConf. See comment in
+ // SparkILoop::createSparkContext().
+ Try(sparkIMain.getClass().getMethod("classServerUri")) match {
+ case Success(method) =>
+ method.setAccessible(true)
+ conf.set("spark.repl.class.uri", method.invoke(sparkIMain).asInstanceOf[String])
+
+ case Failure(_) =>
+ val outputDir = sparkIMain.getClass().getMethod("getClassOutputDirectory")
+ outputDir.setAccessible(true)
+ conf.set("spark.repl.class.outputDir",
+ outputDir.invoke(sparkIMain).asInstanceOf[File].getAbsolutePath())
+ }
+
+ restoreContextClassLoader {
+ // Call sparkIMain.setContextClassLoader() to make sure SparkContext and the repl use the
+ // same ClassLoader. Otherwise, if someone defines a new class in the interactive shell,
+ // SparkContext cannot see it and job stages will fail.
+ val setContextClassLoaderMethod = sparkIMain.getClass().getMethod("setContextClassLoader")
+ setContextClassLoaderMethod.setAccessible(true)
+ setContextClassLoaderMethod.invoke(sparkIMain)
+
+ // With usejavacp=true, the Scala interpreter looks for jars on the system classpath, but
+ // not for jars added to a MutableURLClassLoader, so those extra jars are invisible to the
+ // interpreter. SparkContext can still use them via JVM ClassLoaders, but users cannot
+ // reference them with a Scala import statement.
+ //
+ // For instance, if we pull in a package via SparkConf:
+ // "spark.jars.packages": "com.databricks:spark-csv_2.10:1.4.0"
+ // then "import com.databricks.spark.csv._" in the interpreter throws an error.
+ //
+ // To fix this, add those jars to the interpreter's classpath manually.
+ var classLoader = Thread.currentThread().getContextClassLoader
+ while (classLoader != null) {
+ if (classLoader.getClass.getCanonicalName == "org.apache.spark.util.MutableURLClassLoader")
+ {
+ val extraJarPath = classLoader.asInstanceOf[URLClassLoader].getURLs()
+ // Check if the file exists. Otherwise an exception will be thrown.
+ .filter { u => u.getProtocol == "file" && new File(u.getPath).isFile }
+ // Livy rsc and repl are also in the extra jars list. Filter them out.
+ .filterNot { u => Paths.get(u.toURI).getFileName.toString.startsWith("livy-") }
+ // Some bad spark packages depend on the wrong version of scala-reflect. Blacklist it.
+ .filterNot { u =>
+ Paths.get(u.toURI).getFileName.toString.contains("org.scala-lang_scala-reflect")
+ }
+
+ extraJarPath.foreach { p => debug(s"Adding $p to Scala interpreter's class path...") }
+ sparkIMain.addUrlsToClassPath(extraJarPath: _*)
+ classLoader = null
+ } else {
+ classLoader = classLoader.getParent
+ }
+ }
+
+ createSparkContext(conf)
+ }
+
+ sparkContext
+ }
+
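+ // Binds a value into the interpreter under the given name, suppressing REPL echo output
+ // while doing so.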
+ protected def bind(name: String, tpe: String, value: Object, modifier: List[String]): Unit = {
+ sparkIMain.beQuietDuring {
+ sparkIMain.bind(name, tpe, value, modifier)
+ }
+ }
+
+ override def close(): Unit = synchronized {
+ if (sparkContext != null) {
+ sparkContext.stop()
+ sparkContext = null
+ }
+
+ if (sparkIMain != null) {
+ sparkIMain.close()
+ sparkIMain = null
+ }
+ }
+
+ override protected def isStarted(): Boolean = {
+ sparkContext != null && sparkIMain != null
+ }
+
+ override protected def interpret(code: String): Result = {
+ sparkIMain.interpret(code)
+ }
+
+ override protected[repl] def parseError(stdout: String): (String, Seq[String]) = {
+ // An example of Scala 2.10 runtime exception error message:
+ // java.lang.Exception: message
+ // at $iwC$$iwC$$iwC$$iwC$$iwC.error(<console>:25)
+ // at $iwC$$iwC$$iwC.error2(<console>:27)
+ // at $iwC$$iwC.<init>(<console>:41)
+ // at $iwC.<init>(<console>:43)
+ // at <init>(<console>:45)
+ // at .<init>(<console>:49)
+ // at .<clinit>(<console>)
+ // at .<init>(<console>:7)
+ // at .<clinit>(<console>)
+ // at $print(<console>)
+ // at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
+ // at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
+ // ...
+
+ val (ename, traceback) = super.parseError(stdout)
+
+ // Remove internal frames.
+ val startOfInternalFrames = traceback.indexWhere(_.contains("$iwC$$iwC.<init>"))
+ var endOfInternalFrames = traceback.indexWhere(!_.trim.startsWith("at"), startOfInternalFrames)
+ if (endOfInternalFrames == -1) {
+ endOfInternalFrames = traceback.length
+ }
+
+ val cleanedTraceback = if (startOfInternalFrames == -1) {
+ traceback
+ } else {
+ traceback.view.zipWithIndex
+ .filterNot { z => z._2 >= startOfInternalFrames && z._2 < endOfInternalFrames }
+ .map { _._1.replaceAll("(\\$iwC\\$)*\\$iwC", "<user code>") }
+ }
+
+ (ename, cleanedTraceback)
+ }
+
+ override protected def valueOfTerm(name: String): Option[Any] = {
+ sparkIMain.valueOfTerm(name)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/repl/scala-2.10/src/test/scala/com/cloudera/livy/repl/SparkInterpreterSpec.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.10/src/test/scala/com/cloudera/livy/repl/SparkInterpreterSpec.scala b/repl/scala-2.10/src/test/scala/com/cloudera/livy/repl/SparkInterpreterSpec.scala
deleted file mode 100644
index 960b59e..0000000
--- a/repl/scala-2.10/src/test/scala/com/cloudera/livy/repl/SparkInterpreterSpec.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.repl
-
-import org.scalatest._
-
-import com.cloudera.livy.LivyBaseUnitTestSuite
-
-class SparkInterpreterSpec extends FunSpec with Matchers with LivyBaseUnitTestSuite {
- describe("SparkInterpreter") {
- val interpreter = new SparkInterpreter(null)
-
- it("should parse Scala compile error.") {
- // Regression test for LIVY-260.
- val error =
- """<console>:27: error: type mismatch;
- | found : Int
- | required: String
- | sc.setJobGroup(groupName, groupName, true)
- | ^
- |<console>:27: error: type mismatch;
- | found : Int
- | required: String
- | sc.setJobGroup(groupName, groupName, true)
- | ^
- |""".stripMargin
-
- val expectedTraceback = AbstractSparkInterpreter.KEEP_NEWLINE_REGEX.split(
- """ found : Int
- | required: String
- | sc.setJobGroup(groupName, groupName, true)
- | ^
- |<console>:27: error: type mismatch;
- | found : Int
- | required: String
- | sc.setJobGroup(groupName, groupName, true)
- | ^
- |""".stripMargin)
-
- val (ename, traceback) = interpreter.parseError(error)
- ename shouldBe "<console>:27: error: type mismatch;"
- traceback shouldBe expectedTraceback
- }
-
- it("should parse Scala runtime error and remove internal frames.") {
- val error =
- """java.lang.RuntimeException: message
- | at $iwC$$iwC$$iwC$$iwC$$iwC.error(<console>:25)
- | at $iwC$$iwC$$iwC.error2(<console>:27)
- | at $iwC$$iwC.<init>(<console>:41)
- | at $iwC.<init>(<console>:43)
- | at <init>(<console>:45)
- | at .<init>(<console>:49)
- | at .<clinit>(<console>)
- | at .<init>(<console>:7)
- | at .<clinit>(<console>)
- | at $print(<console>)
- | at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
- |""".stripMargin
-
- val expectedTraceback = AbstractSparkInterpreter.KEEP_NEWLINE_REGEX.split(
- """ at <user code>.error(<console>:25)
- | at <user code>.error2(<console>:27)
- |""".stripMargin)
-
- val (ename, traceback) = interpreter.parseError(error)
- ename shouldBe "java.lang.RuntimeException: message"
- traceback shouldBe expectedTraceback
- }
- }
-}