You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@toree.apache.org by lb...@apache.org on 2016/01/22 23:07:30 UTC
[22/51] [abbrv] incubator-toree git commit: Moved scala files to new
locations based on new package
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/security/KernelSecurityManager.scala
----------------------------------------------------------------------
diff --git a/kernel-api/src/main/scala/org/apache/toree/security/KernelSecurityManager.scala b/kernel-api/src/main/scala/org/apache/toree/security/KernelSecurityManager.scala
new file mode 100644
index 0000000..2529866
--- /dev/null
+++ b/kernel-api/src/main/scala/org/apache/toree/security/KernelSecurityManager.scala
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.security
+
+import java.security.Permission
+import java.util.UUID
+
+import scala.collection.immutable.HashMap
+
+object KernelSecurityManager {
+  /**
+   * Name given to restricted thread groups. A random UUID is appended, so the
+   * value differs every time this object is initialized.
+   */
+  val RestrictedGroupName = s"restricted-${UUID.randomUUID().toString}"
+
+  /**
+   * Prefix of the exit permission name. It is handled as a special case
+   * because the full name changes with each status code.
+   */
+  private val SystemExitPermissionName = "exitVM." // + status
+
+  /**
+   * Permission names mapped to a flag stating whether they are checked. A
+   * name is only checked through this table when it is present as a key and
+   * its value is true.
+   */
+  private val permissionsToCheck: Map[String, Boolean] = HashMap(
+    "modifyThreadGroup" -> true
+  )
+
+  /**
+   * Decides whether the permission with the provided name should be checked.
+   * Names listed in the table use their flag; any other name falls through
+   * to the special-case test.
+   *
+   * @param name The name of the permission
+   *
+   * @return True if the permission is to be checked, false otherwise
+   */
+  private def shouldCheckPermission(name: String): Boolean =
+    if (permissionsToCheck.contains(name)) permissionsToCheck(name)
+    else shouldCheckPermissionSpecialCases(name)
+
+  /**
+   * Tests the permission name against the special cases that do not follow
+   * the normal naming conventions (currently only the exit permission, which
+   * is matched by prefix).
+   *
+   * @param name The name of the permission
+   *
+   * @return True if the permission is to be checked, false otherwise
+   */
+  private def shouldCheckPermissionSpecialCases(name: String): Boolean =
+    name.startsWith(SystemExitPermissionName)
+}
+
+class KernelSecurityManager extends SecurityManager {
+  import KernelSecurityManager._
+
+  /**
+   * Identifies "read" permissions whose name mentions this class. Such
+   * permissions are let through without any further checking.
+   *
+   * @param perm The permission being checked
+   *
+   * @return True if the permission should bypass all checks
+   */
+  private def isReadOfThisClass(perm: Permission): Boolean =
+    perm.getActions == "read" &&
+      perm.getName.contains(this.getClass.getSimpleName)
+
+  /**
+   * Delegates to the parent security manager only for permissions that are
+   * listed to be checked; every other permission is silently allowed.
+   *
+   * @param perm The permission being checked
+   * @param context The context forwarded to the parent implementation
+   */
+  override def checkPermission(perm: Permission, context: scala.Any): Unit = {
+    // TODO: Investigate why the StackOverflowError occurs in IntelliJ without
+    //       the bypass for FilePermission related to this class
+    // NOTE: The above problem does not happen when built with sbt pack
+    if (!isReadOfThisClass(perm) && shouldCheckPermission(perm.getName))
+      super.checkPermission(perm, context)
+  }
+
+  /**
+   * Delegates to the parent security manager only for permissions that are
+   * listed to be checked; every other permission is silently allowed.
+   *
+   * @param perm The permission being checked
+   */
+  override def checkPermission(perm: Permission): Unit = {
+    // TODO: Investigate why the StackOverflowError occurs in IntelliJ without
+    //       the bypass for FilePermission related to this class
+    // NOTE: The above problem does not happen when built with sbt pack
+    if (!isReadOfThisClass(perm) && shouldCheckPermission(perm.getName))
+      super.checkPermission(perm)
+  }
+
+  /**
+   * Provides the thread group for the calling thread. When the current
+   * thread belongs to a group carrying the restricted name, a new child group
+   * with that same name is created and returned; otherwise the parent
+   * implementation decides.
+   *
+   * @return The thread group as described above
+   */
+  override def getThreadGroup: ThreadGroup = {
+    val activeGroup = Thread.currentThread().getThreadGroup
+
+    // For restricted groups, we can only catch them in the checkAccess if we
+    // set the current group as the parent (to make sure all groups have a
+    // consistent name)
+    if (activeGroup.getName != RestrictedGroupName) super.getThreadGroup
+    else new ThreadGroup(activeGroup, activeGroup.getName)
+  }
+
+  /**
+   * Rejects access to a thread group whose parent carries the restricted name
+   * while the group itself does not. Null groups and every other combination
+   * are accepted; the parent implementation is intentionally not invoked.
+   *
+   * @param g The thread group being accessed
+   *
+   * @throws SecurityException If the group is a differently named child of a
+   *                           restricted group
+   */
+  override def checkAccess(g: ThreadGroup): Unit = {
+    //super.checkAccess(g)
+    val parent = if (g == null) null else g.getParent
+
+    val renamedChildOfRestricted =
+      parent != null &&
+        parent.getName == RestrictedGroupName &&
+        g.getName != RestrictedGroupName
+
+    if (renamedChildOfRestricted)
+      throw new SecurityException("Not allowed to modify ThreadGroups!")
+  }
+
+  /**
+   * Reports (but does not block) exit attempts made from a thread whose group
+   * carries the restricted name.
+   *
+   * @param status The exit status (unused)
+   */
+  override def checkExit(status: Int): Unit = {
+    val callerGroupName = Thread.currentThread().getThreadGroup.getName
+
+    if (callerGroupName == RestrictedGroupName) {
+      // TODO: Determine why System.exit(...) is being blocked in the ShutdownHandler
+      System.out.println("Unauthorized system.exit detected!")
+      //throw new SecurityException("Not allowed to invoke System.exit!")
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/utils/ArgumentParsingSupport.scala
----------------------------------------------------------------------
diff --git a/kernel-api/src/main/scala/org/apache/toree/utils/ArgumentParsingSupport.scala b/kernel-api/src/main/scala/org/apache/toree/utils/ArgumentParsingSupport.scala
new file mode 100644
index 0000000..a748fe4
--- /dev/null
+++ b/kernel-api/src/main/scala/org/apache/toree/utils/ArgumentParsingSupport.scala
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.utils
+
+import joptsimple.{OptionSpec, OptionParser}
+import scala.collection.JavaConverters._
+import scala.language.implicitConversions
+import java.io.{PrintStream, OutputStream}
+
+trait ArgumentParsingSupport {
+  protected lazy val parser = new OptionParser()
+  private var options: joptsimple.OptionSet = _
+  parser.allowsUnrecognizedOptions()
+
+  /**
+   * Returns the most recently parsed option set, failing if parseArgs has
+   * not been invoked yet.
+   */
+  private def parsedOptions: joptsimple.OptionSet = {
+    require(options != null, "Arguments not parsed yet!")
+    options
+  }
+
+  /**
+   * Splits the provided string into individual arguments and parses them,
+   * replacing the option set consulted by `has` and `get`.
+   *
+   * @param args The arguments as a single string
+   * @param delimiter The separator between arguments; it is handed to
+   *                  String.split and is therefore treated as a regular
+   *                  expression
+   *
+   * @return The string form of every non-option argument
+   */
+  def parseArgs(args: String, delimiter: String = " ") = {
+    val tokens = args.split(delimiter)
+    options = parser.parse(tokens: _*)
+
+    options.nonOptionArguments().asScala.map(_.toString)
+  }
+
+  /**
+   * Writes a usage line followed by the parser's generated help text.
+   *
+   * @param outputStream The stream receiving the usage line and the help
+   * @param usage The usage text printed after the "Usage: " prefix
+   */
+  def printHelp(outputStream: OutputStream, usage: String) = {
+    new PrintStream(outputStream).println(s"Usage: $usage\n")
+    parser.printHelpOn(outputStream)
+  }
+
+  /**
+   * Implicitly converts an option spec into a flag stating whether the option
+   * was present in the parsed arguments.
+   *
+   * @param spec The option to look up
+   *
+   * @return True if the parsed arguments contain the option
+   */
+  implicit def has[T](spec: OptionSpec[T]): Boolean =
+    parsedOptions.has(spec)
+
+  /**
+   * Implicitly converts an option spec into its parsed value.
+   *
+   * @param spec The option to look up
+   *
+   * @return Some(value), or None when the parser reports a null value
+   */
+  implicit def get[T](spec: OptionSpec[T]): Option[T] =
+    Option(parsedOptions.valueOf(spec))
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/utils/ConditionalOutputStream.scala
----------------------------------------------------------------------
diff --git a/kernel-api/src/main/scala/org/apache/toree/utils/ConditionalOutputStream.scala b/kernel-api/src/main/scala/org/apache/toree/utils/ConditionalOutputStream.scala
new file mode 100644
index 0000000..65f7650
--- /dev/null
+++ b/kernel-api/src/main/scala/org/apache/toree/utils/ConditionalOutputStream.scala
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.utils
+
+import java.io.OutputStream
+
+/**
+ * An output stream that forwards each byte to the wrapped stream only while
+ * the supplied condition holds. The condition is re-evaluated on every write.
+ *
+ * @param outputStream The stream receiving the bytes that pass the condition
+ * @param condition Evaluated once per written byte; bytes are dropped when it
+ *                  yields false
+ */
+class ConditionalOutputStream(
+  private val outputStream: OutputStream,
+  condition: => Boolean
+) extends OutputStream {
+  require(outputStream != null)
+
+  override def write(b: Int): Unit = {
+    val forward = condition
+    if (forward) outputStream.write(b)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/utils/DownloadSupport.scala
----------------------------------------------------------------------
diff --git a/kernel-api/src/main/scala/org/apache/toree/utils/DownloadSupport.scala b/kernel-api/src/main/scala/org/apache/toree/utils/DownloadSupport.scala
new file mode 100644
index 0000000..9d36326
--- /dev/null
+++ b/kernel-api/src/main/scala/org/apache/toree/utils/DownloadSupport.scala
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.utils
+
+import java.net.URL
+import java.nio.channels._
+import java.io.FileOutputStream
+
+/**
+ * A utility for downloading the contents of a file to a specified location.
+ */
+trait DownloadSupport {
+  /**
+   * Download a file located at the given URL to the specified destination file.
+   * The file type of the downloadDestination should match the file type
+   * of the file located at fileUrl. Throws a FileNotFoundException if the
+   * fileUrl or downloadDestination are invalid.
+   *
+   * Both the source channel and the destination file are closed before this
+   * method returns, whether or not the transfer succeeds.
+   *
+   * @param fileUrl A URL for the file to be downloaded
+   * @param destinationUrl URL whose path component names the local file to
+   *                       write (e.g. file:///tmp/file.txt)
+   *
+   * @return The URL representing the location of the downloaded file
+   */
+  def downloadFile(fileUrl: URL, destinationUrl: URL): URL = {
+    val source = Channels.newChannel(fileUrl.openStream())
+    try {
+      val target = new FileOutputStream(destinationUrl.getPath)
+      try {
+        target.getChannel.transferFrom(source, 0, Long.MaxValue)
+      } finally {
+        // Releases the file handle even if the transfer fails part-way
+        target.close()
+      }
+    } finally {
+      // Closing the channel also closes the stream opened from fileUrl
+      source.close()
+    }
+
+    destinationUrl
+  }
+
+  /**
+   * Download a file given a URL string to the specified downloadDestination.
+   * Both strings are converted with `new URL(...)`, so each must be a
+   * well-formed URL including a protocol; otherwise a MalformedURLException
+   * is thrown.
+   *
+   * @param fileToDownload A URL in string format (e.g. file:///tmp/foo, http://ibm.com)
+   * @param destination URL in string format naming the file to write
+   *                    (e.g. file:///tmp/file.txt)
+   *
+   * @return The URL representing the location of the downloaded file
+   */
+  def downloadFile(fileToDownload: String, destination: String): URL = {
+    downloadFile(new URL(fileToDownload), new URL(destination))
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/utils/DynamicReflectionSupport.scala
----------------------------------------------------------------------
diff --git a/kernel-api/src/main/scala/org/apache/toree/utils/DynamicReflectionSupport.scala b/kernel-api/src/main/scala/org/apache/toree/utils/DynamicReflectionSupport.scala
new file mode 100644
index 0000000..73e02b6
--- /dev/null
+++ b/kernel-api/src/main/scala/org/apache/toree/utils/DynamicReflectionSupport.scala
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.utils
+
+import language.dynamics
+import language.existentials
+
+import java.lang.reflect.Method
+
+/**
+ * Represents dynamic support capabilities for a class. Provides field and
+ * method catchers to reflectively execute potentially-missing cases.
+ * @param klass The class whose fields and methods to access
+ * @param instance The specific instance whose fields and methods to access
+ */
+case class DynamicReflectionSupport(
+  private val klass: Class[_], private val instance: Any
+) extends Dynamic {
+
+  /**
+   * Handles cases of field access not found from type-checks. Attempts to
+   * reflectively access field from provided class (or instance). Will first
+   * check for a zero-argument method matching the field name due to Scala
+   * transformation process.
+   * @param name The name of the field
+   * @return The content held by the field
+   * @throws NoSuchFieldException If neither a matching method nor a declared
+   *                              field with that name exists on the class
+   */
+  def selectDynamic(name: String): Any = {
+    getMethod(name, Nil) match {
+      case Some(m) => invokeMethod(m, Nil)
+      case None =>
+        try {
+          val field = klass.getDeclaredField(name)
+          field.setAccessible(true)
+          field.get(instance)
+        } catch {
+          // Re-thrown with a message naming the class; other errors propagate
+          case _: NoSuchFieldException =>
+            throw new NoSuchFieldException(
+              klass.getName + "." + name + " does not exist!"
+            )
+        }
+    }
+  }
+
+  /**
+   * Handles cases of method not found from type-checks. Attempts to
+   * reflectively execute the method from the provided class (or instance).
+   * Will search for a method signature that has matching parameters with
+   * arguments allowed to be subclasses of parameter types.
+   *
+   * @note Chaining in not supported. You must first cast the result of the
+   *       execution before attempting to apply a method upon it.
+   * @note Arguments that are sequences are flattened into individual
+   *       arguments before the method is looked up and invoked.
+   *
+   * @param name The name of the method
+   * @param args The list of arguments for the method
+   * @return The result of the method's execution
+   * @throws NoSuchMethodException If the class declares no method with that
+   *                               name and number of arguments
+   */
+  def applyDynamic(name: String)(args: Any*) = {
+    val method = getMethod(name, args.toList)
+
+    method match {
+      case Some(m) => invokeMethod(m, args.toList)
+      case _ =>
+        // Null arguments have no class to report, so they are shown as "null"
+        val argTypeNames =
+          args.map(arg => if (arg == null) "null" else arg.getClass.getName)
+        throw new NoSuchMethodException(
+          klass.getName + "." + name + "(" + argTypeNames.mkString(",") + ")"
+        )
+    }
+  }
+
+  /**
+   * Finds a declared method with the given name whose parameter count equals
+   * the number of (flattened) arguments. An overload whose parameter types
+   * accept every argument is preferred; when none qualifies, the first
+   * overload with the right arity is returned so that the reflective call
+   * itself decides whether the arguments are acceptable.
+   */
+  private def getMethod(name: String, args: Any*): Option[Method] = {
+    val flatArgs = flatten(args)
+    val sameArity = klass.getDeclaredMethods.filter { m =>
+      m.getName == name && m.getParameterTypes.length == flatArgs.size
+    }
+
+    sameArity
+      .find(_.getParameterTypes.zip(flatArgs).forall {
+        case (paramType, arg) => isCompatible(paramType, arg)
+      })
+      .orElse(sameArity.headOption)
+  }
+
+  /** Invokes the method on the wrapped instance with flattened arguments. */
+  private def invokeMethod(method: Method, args: Any*) = {
+    val flatArgs = flatten(args).map(_.asInstanceOf[AnyRef])
+    method.invoke(instance, flatArgs: _*)
+  }
+
+  /** Recursively expands nested sequences into one flat argument list. */
+  private def flatten(l: Seq[Any]): Seq[Any] = {
+    l.flatMap {
+      case nested: Seq[_] => flatten(nested)
+      case x => List(x)
+    }
+  }
+
+  /**
+   * Checks whether the argument can be passed to a parameter of the given
+   * type: null fits any non-primitive parameter, and any other value must be
+   * an instance of the parameter type (primitives are compared through their
+   * wrapper classes because arguments always arrive boxed).
+   */
+  private def isCompatible(paramType: Class[_], arg: Any): Boolean =
+    if (arg == null) !paramType.isPrimitive
+    else boxedType(paramType).isInstance(arg)
+
+  /** Maps a primitive class to its wrapper; other classes are returned as-is. */
+  private def boxedType(c: Class[_]): Class[_] =
+    if (!c.isPrimitive) c
+    else if (c == classOf[Int]) classOf[java.lang.Integer]
+    else if (c == classOf[Long]) classOf[java.lang.Long]
+    else if (c == classOf[Double]) classOf[java.lang.Double]
+    else if (c == classOf[Float]) classOf[java.lang.Float]
+    else if (c == classOf[Boolean]) classOf[java.lang.Boolean]
+    else if (c == classOf[Char]) classOf[java.lang.Character]
+    else if (c == classOf[Byte]) classOf[java.lang.Byte]
+    else if (c == classOf[Short]) classOf[java.lang.Short]
+    else c
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/utils/KeyValuePairUtils.scala
----------------------------------------------------------------------
diff --git a/kernel-api/src/main/scala/org/apache/toree/utils/KeyValuePairUtils.scala b/kernel-api/src/main/scala/org/apache/toree/utils/KeyValuePairUtils.scala
new file mode 100644
index 0000000..7869aa6
--- /dev/null
+++ b/kernel-api/src/main/scala/org/apache/toree/utils/KeyValuePairUtils.scala
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.utils
+
+import joptsimple.util.KeyValuePair
+
+/**
+ * Provides utility methods for jsimple-opt key pair values.
+ */
+object KeyValuePairUtils {
+  /** Separator used between pairs when none is supplied by the caller. */
+  val DefaultDelimiter = ","
+
+  /**
+   * Splits the provided string on the delimiter and converts every
+   * non-blank, trimmed piece into a jopt-simple KeyValuePair.
+   * @param keyValuePairString The string representing the series of keys and
+   *                           values
+   * @param delimiter The delimiter used for splitting up the string; it is
+   *                  handed to String.split and is therefore treated as a
+   *                  regular expression
+   * @return The sequence of KeyValuePair objects
+   */
+  def stringToKeyValuePairSeq(
+    keyValuePairString: String,
+    delimiter: String = DefaultDelimiter
+  ): Seq[KeyValuePair] = {
+    require(keyValuePairString != null, "KeyValuePair string cannot be null!")
+
+    val pairs = for {
+      piece <- keyValuePairString.split(delimiter)
+      trimmed = piece.trim
+      if !trimmed.isEmpty
+    } yield KeyValuePair.valueOf(trimmed)
+
+    pairs.toSeq
+  }
+
+  /**
+   * Renders every pair as "key=value" (both sides trimmed) and joins the
+   * results with the delimiter.
+   * @param keyValuePairSeq The sequence of KeyValuePair objects
+   * @param delimiter The separator between string KeyValuePair
+   * @return The resulting string from the sequence of KeyValuePair objects
+   */
+  def keyValuePairSeqToString(
+    keyValuePairSeq: Seq[KeyValuePair],
+    delimiter: String = DefaultDelimiter
+  ): String = {
+    require(keyValuePairSeq != null, "KeyValuePair sequence cannot be null!")
+
+    val rendered = keyValuePairSeq.map { pair =>
+      s"${pair.key.trim}=${pair.value.trim}"
+    }
+
+    rendered.mkString(delimiter)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/utils/MultiClassLoader.scala
----------------------------------------------------------------------
diff --git a/kernel-api/src/main/scala/org/apache/toree/utils/MultiClassLoader.scala b/kernel-api/src/main/scala/org/apache/toree/utils/MultiClassLoader.scala
new file mode 100644
index 0000000..9ee4da1
--- /dev/null
+++ b/kernel-api/src/main/scala/org/apache/toree/utils/MultiClassLoader.scala
@@ -0,0 +1,143 @@
+/*
+ * Copyright 2015 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.utils
+
+import java.lang.reflect.Method
+import java.net.{URL, URLClassLoader}
+import java.util
+import org.slf4j.LoggerFactory
+
+import scala.collection.JavaConverters._
+import scala.util.{Failure, Success, Try}
+
+import scala.language.existentials
+
+/**
+ * Represents a class loader that supports delegating to multiple class loaders.
+ *
+ * @note Implements URLClassLoader purely to support the Guava requirement for
+ * detecting all classes.
+ *
+ * @param urls The URLs to use for the underlying URLClassLoader
+ * @param classLoaders The class loaders to use as the underlying
+ * implementations of this class loader
+ */
+// NOTE(review): `urls` is not forwarded to the URLClassLoader constructor
+// below; only URLs collected from `classLoaders` are - confirm intent.
+class MultiClassLoader(
+  private val urls: Seq[URL],
+  private val classLoaders: Seq[ClassLoader]
+) extends URLClassLoader(
+  classLoaders.flatMap({
+    case urlClassLoader: URLClassLoader => urlClassLoader.getURLs.toSeq
+    case _                              => Nil
+  }).distinct.toArray,
+  /* Create a parent chain based on a each classloader's parent */ {
+    val parents = classLoaders.flatMap(cl => Option(cl.getParent))
+
+    // If multiple parents, set the parent to another multi class loader
+    if (parents.size > 1) new MultiClassLoader(Nil, parents)
+
+    // If a single parent, set the parent to that single parent
+    else if (parents.size == 1) parents.head
+
+    // If no parent, set to null (default if parent not provided)
+    else null
+  }: ClassLoader
+) { self =>
+  private val logger = LoggerFactory.getLogger(this.getClass)
+
+  /**
+   * Creates a new multi class loader with no URLs of its own, although it may
+   * still expose URLs from provided class loaders.
+   *
+   * @param classLoaders The class loaders to use as the underlying
+   *                     implementations of this class loader
+   */
+  def this(classLoaders: ClassLoader*) = {
+    this(Nil, classLoaders)
+  }
+
+  /**
+   * Asks each underlying class loader in order for the class (without
+   * initializing it) and returns the first one found.
+   *
+   * @param name The fully-qualified name of the class
+   *
+   * @throws ClassNotFoundException If no underlying class loader has it
+   */
+  override protected def findClass(name: String): Class[_] = {
+    @inline def tryFindClass(classLoader: ClassLoader, name: String) = {
+      Try(Class.forName(name, false, classLoader))
+    }
+
+    // NOTE: Using iterator to evaluate elements one at a time
+    classLoaders.toIterator
+      .map(classLoader => tryFindClass(classLoader, name))
+      .find(_.isSuccess)
+      .map(_.get)
+      .getOrElse(throw new ClassNotFoundException(name))
+  }
+
+  /**
+   * Returns the first non-null resource reported by the underlying class
+   * loaders, falling back to this loader's own URLClassLoader lookup.
+   *
+   * @param name The name of the resource
+   */
+  override protected def findResource(name: String): URL = {
+    // NOTE: Using iterator to evaluate elements one at a time
+    classLoaders.toIterator.map(cl => _findResource(cl, name)).find(_ != null)
+      .getOrElse(super.findResource(name))
+  }
+
+  /**
+   * Collects the resources reported by every underlying class loader whose
+   * lookup succeeds, followed by those of this loader's own URLClassLoader
+   * lookup. Yields an empty enumeration when nothing is found, including
+   * when there are no underlying class loaders at all.
+   *
+   * @param name The name of the resource
+   */
+  override protected def findResources(name: String): util.Enumeration[URL] = {
+    // Fold from an empty iterator: with no class loaders (or when every
+    // lookup fails) there is nothing to combine and a reduce would throw
+    val internalResources = classLoaders
+      .flatMap(cl => Try(_findResources(cl, name)).toOption)
+      .map(_.asScala)
+      .foldLeft[Iterator[URL]](Iterator.empty)(_ ++ _)
+
+    (
+      internalResources
+        ++
+      Try(super.findResources(name)).map(_.asScala).getOrElse(Nil)
+    ).asJavaEnumeration
+  }
+
+  /** Reflectively calls the (protected) findResource of the class loader. */
+  private def _findResource[T <: ClassLoader](classLoader: T, name: String) = {
+    _getDeclaredMethod(classLoader.getClass, "findResource", classOf[String])
+      .invoke(classLoader, name).asInstanceOf[URL]
+  }
+
+  /** Reflectively calls the (protected) findResources of the class loader. */
+  private def _findResources[T <: ClassLoader](classLoader: T, name: String) = {
+    _getDeclaredMethod(classLoader.getClass, "findResources", classOf[String])
+      .invoke(classLoader, name).asInstanceOf[util.Enumeration[URL]]
+  }
+
+  /** Reflectively calls the (protected) loadClass of the class loader. */
+  private def _loadClass[T <: ClassLoader](
+    classLoader: T,
+    name: String,
+    resolve: Boolean
+  ) = {
+    _getDeclaredMethod(classLoader.getClass, "loadClass",
+      classOf[String], classOf[Boolean]
+    ).invoke(classLoader, name, resolve: java.lang.Boolean).asInstanceOf[Class[_]]
+  }
+
+  /**
+   * Looks up a method declared on the class or, failing that, on one of its
+   * super classes, and makes it accessible.
+   *
+   * @param klass The class at which to start the search
+   * @param name The name of the method
+   * @param classes The parameter types of the method
+   *
+   * @return The accessible method; the lookup error is thrown if not found
+   */
+  private def _getDeclaredMethod(
+    klass: Class[_],
+    name: String,
+    classes: Class[_]*
+  ): Method = {
+    // Attempt to retrieve the method (public/protected/private) for the class,
+    // trying the super class if the method is not available
+    val potentialMethod = Try(klass.getDeclaredMethod(name, classes: _*))
+      .orElse(Try(_getDeclaredMethod(klass.getSuperclass, name, classes: _*)))
+
+    // Allow access to protected/private methods
+    potentialMethod.foreach(_.setAccessible(true))
+
+    potentialMethod match {
+      case Success(method) => method
+      case Failure(error) => throw error
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/utils/MultiOutputStream.scala
----------------------------------------------------------------------
diff --git a/kernel-api/src/main/scala/org/apache/toree/utils/MultiOutputStream.scala b/kernel-api/src/main/scala/org/apache/toree/utils/MultiOutputStream.scala
new file mode 100644
index 0000000..6ebaf3e
--- /dev/null
+++ b/kernel-api/src/main/scala/org/apache/toree/utils/MultiOutputStream.scala
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.utils
+
+import java.io.OutputStream
+
+/**
+ * An output stream that repeats every operation on each of the wrapped
+ * streams, in list order.
+ *
+ * @param outputStreams The streams receiving every write, flush and close
+ */
+case class MultiOutputStream(outputStreams: List[OutputStream])
+  extends OutputStream
+{
+  require(outputStreams != null)
+
+  /** Writes the whole buffer to every wrapped stream. */
+  override def write(cbuf: Array[Byte]): Unit =
+    outputStreams.foreach(_.write(cbuf))
+
+  /** Writes len bytes of the buffer, starting at off, to every stream. */
+  override def write(cbuf: Array[Byte], off: Int, len: Int): Unit =
+    outputStreams.foreach(_.write(cbuf, off, len))
+
+  /** Writes the single byte to every wrapped stream. */
+  override def write(b: Int): Unit =
+    outputStreams.foreach(_.write(b))
+
+  /** Flushes every wrapped stream. */
+  override def flush(): Unit =
+    outputStreams.foreach(_.flush())
+
+  /** Closes every wrapped stream. */
+  override def close(): Unit =
+    outputStreams.foreach(_.close())
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/utils/ScheduledTaskManager.scala
----------------------------------------------------------------------
diff --git a/kernel-api/src/main/scala/org/apache/toree/utils/ScheduledTaskManager.scala b/kernel-api/src/main/scala/org/apache/toree/utils/ScheduledTaskManager.scala
new file mode 100644
index 0000000..a91e2ba
--- /dev/null
+++ b/kernel-api/src/main/scala/org/apache/toree/utils/ScheduledTaskManager.scala
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ibm.spark.utils
+
+import scala.language.existentials
+import java.util.concurrent._
+import java.util.UUID
+import ScheduledTaskManager._
+
+import scala.util.Try
+
+/**
+ * Constructs timing-based events that are periodically executed. Does not
+ * support hard-killing tasks that are not interruptable.
+ * @param totalThreads The total threads to create for the underlying thread
+ * pool
+ * @param defaultExecutionDelay The default delay to use before all added tasks
+ * @param defaultTimeInterval The default time interval between tasks in
+ * milliseconds
+ */
+class ScheduledTaskManager(
+  private val totalThreads: Int = DefaultMaxThreads,
+  private val defaultExecutionDelay: Long = DefaultExecutionDelay,
+  private val defaultTimeInterval: Long = DefaultTimeInterval
+) {
+  private[utils] val _scheduler = new ScheduledThreadPoolExecutor(totalThreads)
+  // Cancelled tasks are dropped from the scheduler's queue right away
+  _scheduler.setRemoveOnCancelPolicy(true)
+
+  /** Scheduled futures keyed by the task id handed back from addTask. */
+  private val _taskMap = new ConcurrentHashMap[String, ScheduledFuture[_]]()
+
+  /**
+   * Adds the specified task to the queued list to execute at the specified
+   * time interval. Anything thrown by the task is captured and discarded, so
+   * a failing run does not stop the following runs.
+   * @param executionDelay The time delay (in milliseconds) before starting
+   * @param timeInterval The time interval (in milliseconds)
+   * @param task The task to execute
+   * @tparam T The type of return result (currently ignored)
+   * @return The id of the task
+   */
+  def addTask[T](
+    executionDelay: Long = defaultExecutionDelay,
+    timeInterval: Long = defaultTimeInterval,
+    task: => T
+  ) = {
+    val taskId = UUID.randomUUID().toString
+    val runnable: Runnable = new Runnable {
+      override def run(): Unit = Try(task)
+    }
+
+    // Schedule our task at the desired interval
+    _taskMap.put(taskId, _scheduler.scheduleAtFixedRate(
+      runnable, executionDelay, timeInterval, TimeUnit.MILLISECONDS))
+
+    taskId
+  }
+
+  /**
+   * Removes the specified task from the manager.
+   * @param taskId The id of the task to remove
+   * @return True if the task was removed, otherwise false (including when
+   *         the id is null or unknown)
+   */
+  def removeTask(taskId: String): Boolean = {
+    // ConcurrentHashMap rejects null keys, so guard before touching the map
+    if (taskId == null) false
+    else {
+      // A single remove both tests for and claims the task, so two callers
+      // racing on the same id cannot both obtain (or miss) the future
+      Option(_taskMap.remove(taskId)) match {
+        case Some(future) =>
+          // Stop the future, but allow the current task to finish
+          future.cancel(false)
+          true
+        case None => false
+      }
+    }
+  }
+
+  /**
+   * Forgets every registered task and shuts down the thread pool used for
+   * task execution.
+   */
+  def stop() = {
+    _taskMap.clear()
+    _scheduler.shutdown()
+  }
+}
+
+/** Default settings used when a ScheduledTaskManager is built without arguments. */
+object ScheduledTaskManager {
+  /** Default size of the underlying thread pool. */
+  val DefaultMaxThreads: Int = 4
+
+  /** Default delay before a task first runs: 10 milliseconds. */
+  val DefaultExecutionDelay: Int = 10
+
+  /** Default interval between runs of a task: 100 milliseconds. */
+  val DefaultTimeInterval: Int = 100
+}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/org/apache/toree/utils/TaskManager.scala
----------------------------------------------------------------------
diff --git a/kernel-api/src/main/scala/org/apache/toree/utils/TaskManager.scala b/kernel-api/src/main/scala/org/apache/toree/utils/TaskManager.scala
new file mode 100644
index 0000000..c3a43fc
--- /dev/null
+++ b/kernel-api/src/main/scala/org/apache/toree/utils/TaskManager.scala
@@ -0,0 +1,237 @@
+package com.ibm.spark.utils
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.slf4j.LoggerFactory
+
+import scala.concurrent.{promise, Future}
+import java.util.concurrent._
+
+import com.ibm.spark.security.KernelSecurityManager._
+import TaskManager._
+
+import scala.util.Try
+
+/**
+ * Represents a processor of tasks that has X worker threads dedicated to
+ * executing the tasks.
+ *
+ * @param threadGroup The thread group to use with all worker threads
+ * @param maxTasks The capacity of the queue holding tasks that wait for a
+ *                 worker
+ * @param minimumWorkers The number of workers to spawn initially and keep
+ *                       alive even when idle
+ * @param maximumWorkers The max number of worker threads to spawn, defaulting
+ *                       to the number of processors on the machine
+ * @param keepAliveTime The maximum time in milliseconds for workers to remain
+ *                      idle before shutting down
+ */
+class TaskManager(
+  private val threadGroup: ThreadGroup = DefaultThreadGroup,
+  private val maxTasks: Int = DefaultMaxTasks,
+  private val minimumWorkers: Int = DefaultMinimumWorkers,
+  private val maximumWorkers: Int = DefaultMaximumWorkers,
+  private val keepAliveTime: Long = DefaultKeepAliveTime
+) {
+  protected val logger = LoggerFactory.getLogger(this.getClass.getName)
+
+  private class TaskManagerThreadFactory extends ThreadFactory {
+    override def newThread(r: Runnable): Thread = {
+      val thread = new Thread(threadGroup, r)
+      logger.trace(s"Creating new thread named ${thread.getName}!")
+      thread
+    }
+  }
+
+  private[utils] class ScalingThreadPoolExecutor extends ThreadPoolExecutor(
+    minimumWorkers, maximumWorkers, keepAliveTime, TimeUnit.MILLISECONDS,
+    taskQueue, taskManagerThreadFactory
+  ) {
+    protected val logger = LoggerFactory.getLogger(this.getClass.getName)
+
+    /** Used to keep track of tasks separately from the task queue. */
+    private val taskCount = new AtomicInteger(0)
+
+    /**
+     * Syncs the core pool size of this executor with its current number of
+     * tasks, using the minimum worker size and maximum worker size as the
+     * bounds.
+     */
+    private def syncPoolLimits(): Unit = {
+      val totalTasks = taskCount.get()
+      val newCorePoolSize =
+        math.max(minimumWorkers, math.min(totalTasks, maximumWorkers))
+
+      logger.trace(s"Task execution count is $totalTasks! " +
+        s"Updating core pool size to $newCorePoolSize!")
+
+      // Resize this instance, not whichever executor the manager holds now:
+      // an executor replaced by restart() must not resize its successor
+      setCorePoolSize(newCorePoolSize)
+    }
+
+    /** Forgets one task and rescales the pool to the remaining count. */
+    private def releaseTask(): Unit = synchronized {
+      taskCount.decrementAndGet()
+      syncPoolLimits()
+    }
+
+    override def execute(r: Runnable): Unit = {
+      synchronized {
+        if (taskCount.incrementAndGet() > maximumWorkers)
+          logger.warn(s"Exceeded $maximumWorkers workers during processing!")
+
+        syncPoolLimits()
+      }
+
+      // A task that is not accepted (e.g. the queue is full) never reaches
+      // afterExecute, so take it back out of the count before rethrowing
+      try super.execute(r) catch {
+        case notAccepted: Throwable =>
+          releaseTask()
+          throw notAccepted
+      }
+    }
+
+    override def afterExecute(r: Runnable, t: Throwable): Unit = {
+      super.afterExecute(r, t)
+      releaseTask()
+    }
+  }
+
+  private val taskManagerThreadFactory = new TaskManagerThreadFactory
+  private val taskQueue = new ArrayBlockingQueue[Runnable](maxTasks)
+
+  @volatile private[utils] var executor: Option[ScalingThreadPoolExecutor] = None
+
+  /**
+   * Adds a new task to the list to execute.
+   *
+   * @param taskFunction The new task as a block of code
+   * @return Future representing the return value (or error) from the task
+   */
+  def add[T <: Any](taskFunction: => T): Future[T] = {
+    // Read the volatile field once so the executor that was checked is the
+    // one that receives the task, even if stop() runs concurrently
+    val activeExecutor = executor
+    assert(activeExecutor.nonEmpty, "Task manager not started!")
+
+    // Promise.apply replaces the deprecated scala.concurrent.promise method
+    val taskPromise = scala.concurrent.Promise[T]()
+
+    // Construct runnable that completes the promise
+    logger.trace("Queueing new task to be processed!")
+    activeExecutor.foreach(_.execute(new Runnable {
+      override def run(): Unit = {
+        var threadName: String = "???"
+        try {
+          threadName = Try(Thread.currentThread().getName).getOrElse(threadName)
+          logger.trace(s"(Thread $threadName) Executing task!")
+          val result = taskFunction
+
+          logger.trace(s"(Thread $threadName) Task finished successfully!")
+          taskPromise.success(result)
+        } catch {
+          case ex: Throwable =>
+            val exName = ex.getClass.getName
+            val exMessage = Option(ex.getLocalizedMessage).getOrElse("???")
+            logger.trace(
+              s"(Thread $threadName) Task failed: ($exName) = $exMessage")
+            taskPromise.tryFailure(ex)
+        }
+      }
+    }))
+
+    taskPromise.future
+  }
+
+  /**
+   * Returns the count of tasks including the currently-running ones.
+   * @return The count of tasks
+   */
+  def size: Int = taskQueue.size() + executor.map(_.getActiveCount).getOrElse(0)
+
+  /**
+   * Returns whether or not there is a task in the queue to be processed.
+   * @return True if the internal queue is not empty, otherwise false
+   */
+  def hasTaskInQueue: Boolean = !taskQueue.isEmpty
+
+  /**
+   * Whether or not there is a task being executed currently.
+   * @return True if there is a task being executed, otherwise false
+   */
+  def isExecutingTask: Boolean = executor.exists(_.getActiveCount > 0)
+
+  /**
+   * Block execution (by sleeping) until all tasks currently queued up for
+   * execution are processed.
+   */
+  def await(): Unit =
+    while (!taskQueue.isEmpty || isExecutingTask) Thread.sleep(1)
+
+  /**
+   * Starts the task manager (begins processing tasks). Creates X new threads
+   * in the process.
+   */
+  def start(): Unit = {
+    logger.trace(
+      s"""
+         |Initializing with the following settings:
+         |- $minimumWorkers core worker pool
+         |- $maximumWorkers maximum workers
+         |- $keepAliveTime milliseconds keep alive time
+       """.stripMargin.trim)
+    executor = Some(new ScalingThreadPoolExecutor)
+  }
+
+  /** Restarts internal processing of tasks (removing current task). */
+  def restart(): Unit = {
+    stop()
+    start()
+  }
+
+  /** Stops internal processing of tasks. */
+  def stop(): Unit = {
+    executor.foreach(_.shutdownNow())
+    executor = None
+  }
+}
+
+/**
+ * Holds the default settings and timeouts used with the task manager.
+ */
+object TaskManager {
+  /** Thread group that worker threads are placed in by default. */
+  val DefaultThreadGroup: ThreadGroup = new ThreadGroup(RestrictedGroupName)
+
+  /** Default upper bound on the tasks the task manager accepts. */
+  val DefaultMaxTasks: Int = 200
+
+  /**
+   * Default count of workers that are spawned initially and kept alive
+   * even when idle.
+   */
+  val DefaultMinimumWorkers: Int = 1
+
+  /** Default cap on spawned workers: the available processor count. */
+  val DefaultMaximumWorkers: Int = Runtime.getRuntime.availableProcessors()
+
+  /** Default time in milliseconds that workers wait for tasks. */
+  val DefaultKeepAliveTime: Int = 1000
+
+  /**
+   * Default time in milliseconds to wait before stopping a thread
+   * if it cannot be interrupted.
+   */
+  val InterruptTimeout: Int = 5000
+
+  /** Longest wait, in milliseconds, to add a task to the queue. */
+  val MaximumTaskQueueTimeout: Int = 10000
+
+  /**
+   * Longest wait, in milliseconds, to queue up a thread in the
+   * thread factory.
+   */
+  val MaximumThreadQueueTimeout: Int = 10000
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/test/scala/com/ibm/spark/interpreter/broker/BrokerBridgeSpec.scala
----------------------------------------------------------------------
diff --git a/kernel-api/src/test/scala/com/ibm/spark/interpreter/broker/BrokerBridgeSpec.scala b/kernel-api/src/test/scala/com/ibm/spark/interpreter/broker/BrokerBridgeSpec.scala
deleted file mode 100644
index d74867c..0000000
--- a/kernel-api/src/test/scala/com/ibm/spark/interpreter/broker/BrokerBridgeSpec.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.interpreter.broker
-
-import com.ibm.spark.interpreter.broker.producer.{SQLContextProducerLike, JavaSparkContextProducerLike}
-import com.ibm.spark.kernel.api.KernelLike
-import org.apache.spark.api.java.JavaSparkContext
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.{SparkConf, SparkContext}
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{FunSpec, Matchers, OneInstancePerTest}
-import org.mockito.Mockito._
-
-class BrokerBridgeSpec extends FunSpec with Matchers with OneInstancePerTest
- with MockitoSugar
-{
- private val mockBrokerState = mock[BrokerState]
- private val mockKernel = mock[KernelLike]
-
- private val brokerBridge = new BrokerBridge(
- mockBrokerState,
- mockKernel
- )
-
- describe("BrokerBridge") {
- describe("#state") {
- it("should return the broker state from the constructor") {
- brokerBridge.state should be (mockBrokerState)
- }
- }
-
- describe("#kernel") {
- it("should return the kernel from the constructor") {
- brokerBridge.kernel should be (mockKernel)
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/test/scala/com/ibm/spark/interpreter/broker/BrokerProcessHandlerSpec.scala
----------------------------------------------------------------------
diff --git a/kernel-api/src/test/scala/com/ibm/spark/interpreter/broker/BrokerProcessHandlerSpec.scala b/kernel-api/src/test/scala/com/ibm/spark/interpreter/broker/BrokerProcessHandlerSpec.scala
deleted file mode 100644
index 35d3235..0000000
--- a/kernel-api/src/test/scala/com/ibm/spark/interpreter/broker/BrokerProcessHandlerSpec.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.interpreter.broker
-
-import org.apache.commons.exec.ExecuteException
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{FunSpec, Matchers, OneInstancePerTest}
-import org.mockito.Mockito._
-import org.mockito.Matchers._
-
-class BrokerProcessHandlerSpec extends FunSpec with Matchers
- with OneInstancePerTest with MockitoSugar
-{
- private val mockBrokerBridge = mock[BrokerBridge]
- private val brokerProcessHandler = new BrokerProcessHandler(
- mockBrokerBridge,
- restartOnFailure = true,
- restartOnCompletion = true
- )
-
- describe("BrokerProcessHandler") {
- describe("#onProcessFailed") {
- it("should invoke the reset method") {
- val mockResetMethod = mock[String => Unit]
- brokerProcessHandler.setResetMethod(mockResetMethod)
-
- brokerProcessHandler.onProcessFailed(mock[ExecuteException])
-
- verify(mockResetMethod).apply(anyString())
- }
-
- it("should invoke the restart method if the proper flag is set to true") {
- val mockRestartMethod = mock[() => Unit]
- brokerProcessHandler.setRestartMethod(mockRestartMethod)
-
- brokerProcessHandler.onProcessFailed(mock[ExecuteException])
-
- verify(mockRestartMethod).apply()
- }
- }
-
- describe("#onProcessComplete") {
- it("should invoke the reset method") {
- val mockResetMethod = mock[String => Unit]
- brokerProcessHandler.setResetMethod(mockResetMethod)
-
- brokerProcessHandler.onProcessComplete(0)
-
- verify(mockResetMethod).apply(anyString())
- }
-
- it("should invoke the restart method if the proper flag is set to true") {
- val mockRestartMethod = mock[() => Unit]
- brokerProcessHandler.setRestartMethod(mockRestartMethod)
-
- brokerProcessHandler.onProcessComplete(0)
-
- verify(mockRestartMethod).apply()
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/test/scala/com/ibm/spark/interpreter/broker/BrokerProcessSpec.scala
----------------------------------------------------------------------
diff --git a/kernel-api/src/test/scala/com/ibm/spark/interpreter/broker/BrokerProcessSpec.scala b/kernel-api/src/test/scala/com/ibm/spark/interpreter/broker/BrokerProcessSpec.scala
deleted file mode 100644
index 83face3..0000000
--- a/kernel-api/src/test/scala/com/ibm/spark/interpreter/broker/BrokerProcessSpec.scala
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.interpreter.broker
-
-import java.io.{OutputStream, InputStream, File}
-
-import org.apache.commons.exec._
-import org.apache.commons.io.FilenameUtils
-import org.mockito.ArgumentCaptor
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{FunSpec, Matchers, OneInstancePerTest}
-
-import org.mockito.Mockito._
-import org.mockito.Matchers._
-
-class BrokerProcessSpec extends FunSpec with Matchers
- with OneInstancePerTest with MockitoSugar
-{
- private val TestProcessName = "test_process"
- private val TestEntryResource = "test/entry/resource"
- private val TestOtherResources = Seq("test/resource/1", "test/resource/2")
- private val TestArguments = Seq("a", "b", "c")
- private val TestEnvironment = Map(
- "e1" -> "1",
- "e2" -> "2"
- )
-
- private val mockBrokerBridge = mock[BrokerBridge]
- private val mockBrokerProcessHandler = mock[BrokerProcessHandler]
-
- private val mockExecutor = mock[Executor]
-
- private val brokerProcess = new BrokerProcess(
- processName = TestProcessName,
- entryResource = TestEntryResource,
- otherResources = TestOtherResources,
- brokerBridge = mockBrokerBridge,
- brokerProcessHandler = mockBrokerProcessHandler,
- arguments = TestArguments
- ) {
- @volatile private var _tmpDir: String =
- System.getProperty("java.io.tmpdir")
-
- def setTmpDirectory(newDir: String) = _tmpDir = newDir
- override protected def getTmpDirectory: String = _tmpDir
- override protected def newExecutor(): Executor = mockExecutor
- override protected def copy(
- inputStream: InputStream,
- outputStream: OutputStream
- ): Int = 0
-
- override protected def newProcessEnvironment(): Map[String, String] =
- TestEnvironment
- override protected lazy val getSubDirectory: String = ""
- def doCopyResourceToTmp(resource: String) = copyResourceToTmp(resource)
- }
-
- describe("BrokerProcess") {
- describe("constructor") {
- it("should fail if the process name is null") {
- intercept[IllegalArgumentException] {
- new BrokerProcess(
- processName = null,
- entryResource = TestEntryResource,
- otherResources = TestOtherResources,
- brokerBridge = mockBrokerBridge,
- brokerProcessHandler = mockBrokerProcessHandler,
- arguments = TestArguments
- )
- }
- }
-
- it("should fail if the process name is empty") {
- intercept[IllegalArgumentException] {
- new BrokerProcess(
- processName = " \t\n\r",
- entryResource = TestEntryResource,
- otherResources = TestOtherResources,
- brokerBridge = mockBrokerBridge,
- brokerProcessHandler = mockBrokerProcessHandler,
- arguments = TestArguments
- )
- }
- }
-
- it("should fail if the entry resource is null") {
- intercept[IllegalArgumentException] {
- new BrokerProcess(
- processName = TestProcessName,
- entryResource = null,
- otherResources = TestOtherResources,
- brokerBridge = mockBrokerBridge,
- brokerProcessHandler = mockBrokerProcessHandler,
- arguments = TestArguments
- )
- }
- }
-
- it("should fail if the entry resource is empty") {
- intercept[IllegalArgumentException] {
- new BrokerProcess(
- processName = TestProcessName,
- entryResource = " \t\n\r",
- otherResources = TestOtherResources,
- brokerBridge = mockBrokerBridge,
- brokerProcessHandler = mockBrokerProcessHandler,
- arguments = TestArguments
- )
- }
- }
- }
-
- describe("#copyResourceToTmp") {
- it("should fail if a directory with the resource name already exists") {
- val baseDir = System.getProperty("java.io.tmpdir")
- val newResourceName = "some_resource/"
-
- val resourceFile = new File(baseDir + s"/$newResourceName")
- resourceFile.delete() // Ensure that there is not a file or something
- resourceFile.mkdir()
-
- intercept[BrokerException] {
- brokerProcess.doCopyResourceToTmp(resourceFile.getPath)
- }
-
- resourceFile.delete()
- }
-
- it("should throw an exception if the tmp directory is not set") {
- brokerProcess.setTmpDirectory(null)
-
- intercept[BrokerException] {
- brokerProcess.doCopyResourceToTmp("some file")
- }
- }
-
- it("should return the resulting destination of the resource") {
- val rootDir = System.getProperty("java.io.tmpdir")
- val fileName = FilenameUtils.getBaseName(TestEntryResource)
- val fullPath = Seq(rootDir, fileName).mkString("/")
- val expected = new File(fullPath)
-
- expected.delete()
-
- brokerProcess.setTmpDirectory(rootDir)
- val destination = brokerProcess.doCopyResourceToTmp(TestEntryResource)
-
- val actual = new File(destination)
- actual should be (expected)
- }
- }
-
- describe("#start") {
- it("should throw an exception if the process is already started") {
- brokerProcess.start()
-
- intercept[AssertionError] {
- brokerProcess.start()
- }
- }
-
- it("should execute the process using the entry and provided arguments") {
- val finalResourceDestination = FilenameUtils.concat(
- System.getProperty("java.io.tmpdir"),
- FilenameUtils.getBaseName(TestEntryResource)
- )
- val expected = finalResourceDestination +: TestArguments
-
- val commandLineCaptor = ArgumentCaptor.forClass(classOf[CommandLine])
-
- brokerProcess.start()
- verify(mockExecutor).execute(commandLineCaptor.capture(), any(), any())
-
- val commandLine = commandLineCaptor.getValue
- val actual = commandLine.getArguments
-
- actual should contain theSameElementsAs expected
- }
-
- it("should execute using the environment provided") {
- val finalResourceDestination = FilenameUtils.concat(
- System.getProperty("java.io.tmpdir"),
- FilenameUtils.getBaseName(TestEntryResource)
- )
-
- val environmentCaptor =
- ArgumentCaptor.forClass(classOf[java.util.Map[String, String]])
-
- brokerProcess.start()
- verify(mockExecutor).execute(any(),environmentCaptor.capture() , any())
-
- import scala.collection.JavaConverters._
- val environment = environmentCaptor.getValue.asScala
-
- environment should contain theSameElementsAs TestEnvironment
- }
-
- it("should use the process handler provided to listen for events") {
- val expected = mockBrokerProcessHandler
- val finalResourceDestination = FilenameUtils.concat(
- System.getProperty("java.io.tmpdir"),
- FilenameUtils.getBaseName(TestEntryResource)
- )
-
- val executeRequestHandlerCaptor =
- ArgumentCaptor.forClass(classOf[ExecuteResultHandler])
-
- brokerProcess.start()
- verify(mockExecutor).execute(
- any(), any(), executeRequestHandlerCaptor.capture())
-
- val actual = executeRequestHandlerCaptor.getValue
- actual should be (expected)
- }
- }
-
- describe("#stop") {
- it("should destroy the process if it is running") {
- brokerProcess.start()
-
- val mockExecuteWatchdog = mock[ExecuteWatchdog]
- doReturn(mockExecuteWatchdog).when(mockExecutor).getWatchdog
-
- brokerProcess.stop()
-
- verify(mockExecuteWatchdog).destroyProcess()
- }
-
- it("should not try to destroy the process if it is not running") {
- val mockExecuteWatchdog = mock[ExecuteWatchdog]
- doReturn(mockExecuteWatchdog).when(mockExecutor).getWatchdog
-
- brokerProcess.stop()
-
- verify(mockExecuteWatchdog, never()).destroyProcess()
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/test/scala/com/ibm/spark/interpreter/broker/BrokerStateSpec.scala
----------------------------------------------------------------------
diff --git a/kernel-api/src/test/scala/com/ibm/spark/interpreter/broker/BrokerStateSpec.scala b/kernel-api/src/test/scala/com/ibm/spark/interpreter/broker/BrokerStateSpec.scala
deleted file mode 100644
index 84a1ae7..0000000
--- a/kernel-api/src/test/scala/com/ibm/spark/interpreter/broker/BrokerStateSpec.scala
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.interpreter.broker
-
-import org.scalatest.{OneInstancePerTest, Matchers, FunSpec}
-
-import scala.util.{Failure, Success}
-
-class BrokerStateSpec extends FunSpec with Matchers with OneInstancePerTest {
-
- private val TestMaxQueuedCode = 5
- private val brokerState = new BrokerState(TestMaxQueuedCode)
-
- describe("BrokerState") {
- describe("#pushCode") {
- it("should throw an exception if the queue is maxed out") {
- val code = "some code"
-
- // Fill up to the max of our queue
- for (i <- 1 to TestMaxQueuedCode)
- brokerState.pushCode(code)
-
- // Adding an additional code should throw an exception
- intercept[IllegalStateException] {
- brokerState.pushCode(code)
- }
- }
-
- it("should queue up the code to eventually be executed") {
- val code = "some code"
-
- brokerState.totalQueuedCode() should be (0)
- brokerState.pushCode(code)
- brokerState.totalQueuedCode() should be (1)
- }
- }
-
- describe("#totalQueuedCode") {
- it("should return the total queued code elements") {
- val code = "some code"
-
- // Queue up to the maximum test elements, verifying that the total
- // queued element count increases per push
- for (i <- 1 to TestMaxQueuedCode) {
- brokerState.pushCode(code)
- brokerState.totalQueuedCode() should be (i)
- }
- }
- }
-
- describe("#nextCode") {
- it("should return the next code element if available") {
- val code = "some code"
-
- brokerState.pushCode(code)
-
- brokerState.nextCode().code should be (code)
- }
-
- it("should return null if no code element is available") {
- brokerState.nextCode() should be (null)
- }
- }
-
- describe("#isReady") {
- it("should return true if the broker state is marked as ready") {
- brokerState.markReady()
- brokerState.isReady should be (true)
- }
-
- it("should return false if the broker state is not marked as ready") {
- brokerState.isReady should be (false)
- }
- }
-
- describe("#markReady") {
- it("should mark the state of the broker as ready") {
- // Mark once to make sure that the state gets set
- brokerState.markReady()
- brokerState.isReady should be (true)
-
- // Mark a second time to ensure that the state does not change
- brokerState.markReady()
- brokerState.isReady should be (true)
- }
- }
-
- describe("#markSuccess") {
- it("should mark the future for the code as successful") {
- val future = brokerState.pushCode("some code")
- val BrokerCode(codeId, _) = brokerState.nextCode()
-
- brokerState.markSuccess(codeId)
- future.value.get.isSuccess should be (true)
- }
-
- it("should use the provided message as the contents of the future") {
- val future = brokerState.pushCode("some code")
- val BrokerCode(codeId, _) = brokerState.nextCode()
-
- val message = "some message"
- brokerState.markSuccess(codeId, message)
- future.value.get should be (Success(message))
- }
-
- it("should do nothing if the code id is invalid") {
- val future = brokerState.pushCode("some code")
- val BrokerCode(codeId, _) = brokerState.nextCode()
-
- brokerState.markSuccess(codeId + "1")
- future.isCompleted should be (false)
- }
- }
-
- describe("#markFailure") {
- it("should mark the future for the code as failure") {
- val future = brokerState.pushCode("some code")
- val BrokerCode(codeId, _) = brokerState.nextCode()
-
- brokerState.markFailure(codeId)
- future.value.get.isSuccess should be (false)
- }
-
- it("should use the provided message as the contents of the exception") {
- val future = brokerState.pushCode("some code")
- val BrokerCode(codeId, _) = brokerState.nextCode()
-
- val message = "some message"
- brokerState.markFailure(codeId, message)
-
- val failure = future.value.get.failed.get
- failure.getLocalizedMessage should be (message)
- }
-
- it("should do nothing if the code id is invalid") {
- val future = brokerState.pushCode("some code")
- val BrokerCode(codeId, _) = brokerState.nextCode()
-
- brokerState.markFailure(codeId + "1")
- future.isCompleted should be (false)
- }
- }
-
- describe("#reset") {
- it("should clear any code still in the queue") {
- brokerState.pushCode("some code")
-
- brokerState.reset("")
-
- brokerState.totalQueuedCode() should be (0)
- }
-
- it("should mark any evaluating code as a failure if marked true") {
- val future = brokerState.pushCode("some code")
-
- brokerState.reset("")
-
- future.value.get.isFailure should be (true)
- }
-
- it("should use the message as the contents of the failed code futures") {
- val future = brokerState.pushCode("some code")
-
- val message = "some message"
- brokerState.reset(message)
-
- val failure = future.value.get.failed.get
- failure.getLocalizedMessage should be (message)
- }
-
- it("should mark any evaluating code as a success if marked false") {
- val future = brokerState.pushCode("some code")
-
- brokerState.reset("", markAllAsFailure = false)
-
- future.value.get.isSuccess should be (true)
- }
-
- it("should use the message as the contents of the successful code futures") {
- val future = brokerState.pushCode("some code")
-
- val message = "some message"
- brokerState.reset(message, markAllAsFailure = false)
-
- future.value.get should be (Success(message))
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/test/scala/com/ibm/spark/interpreter/broker/BrokerTransformerSpec.scala
----------------------------------------------------------------------
diff --git a/kernel-api/src/test/scala/com/ibm/spark/interpreter/broker/BrokerTransformerSpec.scala b/kernel-api/src/test/scala/com/ibm/spark/interpreter/broker/BrokerTransformerSpec.scala
deleted file mode 100644
index 266c753..0000000
--- a/kernel-api/src/test/scala/com/ibm/spark/interpreter/broker/BrokerTransformerSpec.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Copyright 2015 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.spark.interpreter.broker
-
-import com.ibm.spark.interpreter.{ExecuteError, Results}
-import org.scalatest.concurrent.Eventually
-import org.scalatest.{OneInstancePerTest, Matchers, FunSpec}
-
-import scala.concurrent.promise
-
-class BrokerTransformerSpec extends FunSpec with Matchers
- with OneInstancePerTest with Eventually
-{
- private val brokerTransformer = new BrokerTransformer
-
- describe("BrokerTransformer") {
- describe("#transformToInterpreterResult") {
- it("should convert to success with result output if no failure") {
- val codeResultPromise = promise[BrokerTypes.CodeResults]()
-
- val transformedFuture = brokerTransformer.transformToInterpreterResult(
- codeResultPromise.future
- )
-
- val successOutput = "some success"
- codeResultPromise.success(successOutput)
-
- eventually {
- val result = transformedFuture.value.get.get
- result should be((Results.Success, Left(successOutput)))
- }
- }
-
- it("should convert to error with broker exception if failure") {
- val codeResultPromise = promise[BrokerTypes.CodeResults]()
-
- val transformedFuture = brokerTransformer.transformToInterpreterResult(
- codeResultPromise.future
- )
-
- val failureException = new BrokerException("some failure")
- codeResultPromise.failure(failureException)
-
- eventually {
- val result = transformedFuture.value.get.get
- result should be((Results.Error, Right(ExecuteError(
- name = failureException.getClass.getName,
- value = failureException.getLocalizedMessage,
- stackTrace = failureException.getStackTrace.map(_.toString).toList
- ))))
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/test/scala/com/ibm/spark/magic/InternalClassLoaderSpec.scala
----------------------------------------------------------------------
diff --git a/kernel-api/src/test/scala/com/ibm/spark/magic/InternalClassLoaderSpec.scala b/kernel-api/src/test/scala/com/ibm/spark/magic/InternalClassLoaderSpec.scala
deleted file mode 100644
index c568c6d..0000000
--- a/kernel-api/src/test/scala/com/ibm/spark/magic/InternalClassLoaderSpec.scala
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.magic
-
-import org.scalatest.{Matchers, FunSpec}
-import org.scalatest.mock.MockitoSugar
-
-class InternalClassLoaderSpec extends FunSpec with Matchers with MockitoSugar {
-
- abstract class MockClassLoader extends ClassLoader(null) {
- override def loadClass(name: String): Class[_] = null
- }
-
- describe("InternalClassLoader") {
- describe("#loadClass") {
- it("should invoke super loadClass with loader's package prepended") {
- val expected = classOf[Class[_]]
- val packageName = "com.ibm.spark.magic"
- val className = "SomeClass"
-
- var parentLoadClassCorrectlyInvoked = false
-
- val internalClassLoader = new InternalClassLoader(null) {
- override private[magic] def parentLoadClass(name: String, resolve: Boolean): Class[_] = {
- parentLoadClassCorrectlyInvoked =
- name == s"$packageName.$className" && resolve
- expected
- }
- }
-
- internalClassLoader.loadClass(className, true) should be (expected)
-
- parentLoadClassCorrectlyInvoked should be (true)
- }
-
- it("should use loader's package instead of provided package first") {
- val expected = classOf[Class[_]]
- val forcedPackageName = "com.ibm.spark.magic"
- val packageName = "some.other.package"
- val className = "SomeClass"
-
- var parentLoadClassCorrectlyInvoked = false
-
- val internalClassLoader = new InternalClassLoader(null) {
- override private[magic] def parentLoadClass(name: String, resolve: Boolean): Class[_] = {
- parentLoadClassCorrectlyInvoked =
- name == s"$forcedPackageName.$className" && resolve
- expected
- }
- }
-
- internalClassLoader.loadClass(s"$packageName.$className", true) should be (expected)
-
- parentLoadClassCorrectlyInvoked should be (true)
- }
-
- it("should invoke super loadClass with given package if internal missing") {
- val expected = classOf[Class[_]]
- val packageName = "some.other.package"
- val className = "SomeClass"
-
- var parentLoadClassCorrectlyInvoked = false
-
- var methodCalled = false
- val internalClassLoader = new InternalClassLoader(null) {
- override private[magic] def parentLoadClass(name: String, resolve: Boolean): Class[_] = {
- if (!methodCalled) {
- methodCalled = true
- throw new ClassNotFoundException()
- }
-
- parentLoadClassCorrectlyInvoked =
- name == s"$packageName.$className" && resolve
- expected
- }
- }
-
- internalClassLoader.loadClass(s"$packageName.$className", true) should
- be (expected)
-
- parentLoadClassCorrectlyInvoked should be (true)
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/test/scala/com/ibm/spark/magic/MagicLoaderSpec.scala
----------------------------------------------------------------------
diff --git a/kernel-api/src/test/scala/com/ibm/spark/magic/MagicLoaderSpec.scala b/kernel-api/src/test/scala/com/ibm/spark/magic/MagicLoaderSpec.scala
deleted file mode 100644
index 0c2b894..0000000
--- a/kernel-api/src/test/scala/com/ibm/spark/magic/MagicLoaderSpec.scala
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
-* Copyright 2014 IBM Corp.
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package com.ibm.spark.magic
-
-import java.io.OutputStream
-
-import com.ibm.spark.dependencies.DependencyDownloader
-import com.ibm.spark.interpreter.Interpreter
-import com.ibm.spark.magic.dependencies._
-import org.apache.spark.SparkContext
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{FunSpec, Matchers}
-
-
-/**
-* Used for verification of dependency injection. Calls toString on each
-* dependency to assert that they are provided.
-*/
-class LineMagicWithDependencies extends LineMagic
- with IncludeDependencyDownloader
- with IncludeSparkContext
- with IncludeInterpreter
- with IncludeOutputStream
-{
- override def execute(code: String): Unit = {
- sparkContext.cancelAllJobs()
- interpreter.classServerURI
- outputStream.close()
- dependencyDownloader.setPrintStream(null)
- }
-}
-
-class MockLineMagic extends LineMagic {
- override def execute(code: String): Unit = {}
-}
-
-class MockCellMagic extends CellMagic {
- override def execute(code: String): CellMagicOutput =
- CellMagicOutput()
-}
-
-class MagicLoaderSpec extends FunSpec with Matchers with MockitoSugar {
- describe("MagicLoader") {
- describe("#hasLineMagic") {
- it("should return false if a class with the magic name is not found") {
- val magicLoader = new MagicLoader() {
- override def findClass(name: String): Class[_] =
- throw new ClassNotFoundException()
- }
-
- magicLoader.hasLineMagic("potato") should be (false)
- }
-
- it("should return true if a class with the magic name is found") {
- val magicLoader = new MagicLoader() {
- override def findClass(name: String): Class[_] =
- classOf[MockLineMagic]
- }
-
- magicLoader.hasLineMagic("potato") should be (true)
- }
-
- it("should return true if a class with the magic name is found regardless of case"){
- // Only loads a class named "Potato"
- val classLoader = new ClassLoader() {
- override def findClass(name: String) =
- if (name == "Potato") classOf[MockLineMagic]
- else throw new ClassNotFoundException
- }
-
- // Case insensitive matching should be performed on "Potato"
- val magicLoader = new MagicLoader(parentLoader = classLoader) {
- override def magicClassNames = List("Potato")
- }
-
- magicLoader.hasLineMagic("Potato") should be (true)
- magicLoader.hasLineMagic("potato") should be (true)
- magicLoader.hasLineMagic("pOTatO") should be (true)
- }
- }
-
- describe("#hasCellMagic") {
- it("should return false if a class with the magic name is not found") {
- val magicLoader = new MagicLoader() {
- override def findClass(name: String): Class[_] =
- throw new ClassNotFoundException()
- }
-
- magicLoader.hasCellMagic("potato") should be (false)
- }
-
- it("should return true if a class with the magic name is found") {
- val magicLoader = new MagicLoader() {
- override def findClass(name: String): Class[_] =
- classOf[MockCellMagic]
- }
-
- magicLoader.hasCellMagic("potato") should be (true)
- }
-
- it("should return true if a class with the magic name is found regardless of case"){
- // Only loads a class named "Potato"
- val classLoader = new ClassLoader() {
- override def findClass(name: String) =
- if (name == "Potato") classOf[MockCellMagic]
- else throw new ClassNotFoundException
- }
-
- // Case insensitive matching should be performed on "Potato"
- val magicLoader = new MagicLoader(parentLoader = classLoader) {
- override def magicClassNames = List("Potato")
- }
-
- magicLoader.hasCellMagic("Potato") should be (true)
- magicLoader.hasCellMagic("potato") should be (true)
- magicLoader.hasCellMagic("pOTatO") should be (true)
- }
- }
-
- describe("#magicClassName"){
- it("should return the correctly-cased version of the requested magic name") {
- val magicLoader = new MagicLoader() {
- override def magicClassNames = List("Potato")
- }
-
- magicLoader.magicClassName("Potato") should be ("Potato")
- magicLoader.magicClassName("potato") should be ("Potato")
- magicLoader.magicClassName("pOTatO") should be ("Potato")
- }
-
- it("should return the query if a corresponding magic class does not exist") {
- val magicLoader = new MagicLoader() {
- override def magicClassNames = List()
- }
-
- magicLoader.magicClassName("dne") should be ("dne")
- magicLoader.magicClassName("dNE") should be ("dNE")
- }
- }
-
- describe("#createMagicInstance") {
- it("should correctly insert dependencies into a class") {
- val mockInterpreter = mock[Interpreter]
- val mockSparkContext = mock[SparkContext]
- val mockOutputStream = mock[OutputStream]
- val mockDependencyDownloader = mock[DependencyDownloader]
-
- val dependencyMap = new DependencyMap()
- .setInterpreter(mockInterpreter)
- .setSparkContext(mockSparkContext)
- .setOutputStream(mockOutputStream)
- .setDependencyDownloader(mockDependencyDownloader)
-
- val magicLoader = new MagicLoader(
- dependencyMap = dependencyMap,
- parentLoader = new InternalClassLoader(getClass.getClassLoader)
- )
-
- val magicName = "LineMagicWithDependencies"
- val instance = magicLoader.createMagicInstance(magicName)
- .asInstanceOf[LineMagicWithDependencies]
- instance.interpreter should be(mockInterpreter)
- instance.outputStream should be(mockOutputStream)
- instance.sparkContext should be(mockSparkContext)
- instance.dependencyDownloader should be(mockDependencyDownloader)
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/test/scala/com/ibm/spark/utils/ArgumentParsingSupportSpec.scala
----------------------------------------------------------------------
diff --git a/kernel-api/src/test/scala/com/ibm/spark/utils/ArgumentParsingSupportSpec.scala b/kernel-api/src/test/scala/com/ibm/spark/utils/ArgumentParsingSupportSpec.scala
deleted file mode 100644
index 144e90f..0000000
--- a/kernel-api/src/test/scala/com/ibm/spark/utils/ArgumentParsingSupportSpec.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.utils
-
-import org.scalatest.{BeforeAndAfter, Matchers, FunSpec}
-import joptsimple.{OptionSet, OptionSpec, OptionParser}
-import org.scalatest.mock.MockitoSugar
-
-import org.mockito.Mockito._
-import org.mockito.Matchers._
-
-import collection.JavaConverters._
-
-class ArgumentParsingSupportSpec extends FunSpec with Matchers
- with BeforeAndAfter with MockitoSugar
-{
- private var mockOptions: OptionSet = _
- private var mockParser: OptionParser = _
- private var argumentParsingInstance: ArgumentParsingSupport = _
-
- before {
- mockOptions = mock[OptionSet]
- mockParser = mock[OptionParser]
- doReturn(mockOptions).when(mockParser).parse(anyVararg[String]())
-
- argumentParsingInstance = new Object() with ArgumentParsingSupport {
- override protected lazy val parser: OptionParser = mockParser
- }
- }
-
- describe("ArgumentParsingSupport") {
- describe("#parseArgs") {
- it("should invoke the underlying parser's parse method") {
- doReturn(Nil.asJava).when(mockOptions).nonOptionArguments()
- argumentParsingInstance.parseArgs("")
-
- verify(mockParser).parse(anyString())
- }
-
- it("should return an empty list if there are no non-option arguments") {
- val expected = Nil
- doReturn(expected.asJava).when(mockOptions).nonOptionArguments()
- val actual = argumentParsingInstance.parseArgs((
- "--transitive" :: expected
- ).mkString(" "))
-
- actual should be (expected)
- }
-
- it("should return a list containing non-option arguments") {
- val expected = "non-option" :: Nil
- doReturn(expected.asJava).when(mockOptions).nonOptionArguments()
- val actual = argumentParsingInstance.parseArgs((
- "--transitive" :: expected
- ).mkString(" "))
-
- actual should be (expected)
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/test/scala/com/ibm/spark/utils/ConditionalOutputStreamSpec.scala
----------------------------------------------------------------------
diff --git a/kernel-api/src/test/scala/com/ibm/spark/utils/ConditionalOutputStreamSpec.scala b/kernel-api/src/test/scala/com/ibm/spark/utils/ConditionalOutputStreamSpec.scala
deleted file mode 100644
index d5c4e4c..0000000
--- a/kernel-api/src/test/scala/com/ibm/spark/utils/ConditionalOutputStreamSpec.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.utils
-
-import java.io.OutputStream
-
-import org.scalatest.mock.MockitoSugar
-import org.mockito.Mockito._
-import org.mockito.Matchers._
-import org.scalatest.{Matchers, FunSpec}
-
-class ConditionalOutputStreamSpec extends FunSpec with Matchers with MockitoSugar {
- describe("ConditionalOutputStream") {
- describe("#()") {
- it("should throw an exception if the output stream is null") {
- intercept[IllegalArgumentException] {
- new ConditionalOutputStream(null, true)
- }
- }
- }
-
- describe("#write") {
- it("should call the underlying write if the condition is true") {
- val mockOutputStream = mock[OutputStream]
- val conditionalOutputStream =
- new ConditionalOutputStream(mockOutputStream, true)
-
- val expected = 101
- conditionalOutputStream.write(expected)
-
- verify(mockOutputStream).write(expected)
- }
-
- it("should call the underlying write if the condition becomes true") {
- val mockOutputStream = mock[OutputStream]
- var condition = false
-
- val conditionalOutputStream =
- new ConditionalOutputStream(mockOutputStream, condition)
-
- condition = true
-
- val expected = 101
- conditionalOutputStream.write(expected)
-
- verify(mockOutputStream).write(expected)
- }
-
- it("should not call the underlying write if the condition is false") {
- val mockOutputStream = mock[OutputStream]
- val conditionalOutputStream =
- new ConditionalOutputStream(mockOutputStream, false)
-
- val expected = 101
- conditionalOutputStream.write(expected)
-
- verify(mockOutputStream, never()).write(any[Byte])
- }
-
- it("should not call the underlying write if the condition becomes false") {
- val mockOutputStream = mock[OutputStream]
- var condition = true
-
- val conditionalOutputStream =
- new ConditionalOutputStream(mockOutputStream, condition)
-
- condition = false
-
- val expected = 101
- conditionalOutputStream.write(expected)
-
- verify(mockOutputStream, never()).write(any[Byte])
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/test/scala/com/ibm/spark/utils/DownloadSupportSpec.scala
----------------------------------------------------------------------
diff --git a/kernel-api/src/test/scala/com/ibm/spark/utils/DownloadSupportSpec.scala b/kernel-api/src/test/scala/com/ibm/spark/utils/DownloadSupportSpec.scala
deleted file mode 100644
index 4b0cc3d..0000000
--- a/kernel-api/src/test/scala/com/ibm/spark/utils/DownloadSupportSpec.scala
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Copyright 2014 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ibm.spark.utils
-
-import java.io.FileNotFoundException
-import java.net.URL
-
-import org.scalatest.{BeforeAndAfter, Matchers, FunSpec}
-import scala.io.Source
-import scala.tools.nsc.io.File
-
-class DownloadSupportSpec extends FunSpec with Matchers with BeforeAndAfter {
- val downloadDestinationUrl = new URL("file:///tmp/testfile2.ext")
-
- val testFileContent = "This is a test"
- val testFileName = "/tmp/testfile.txt"
-
- // Create a test file for downloading
- before {
- File(testFileName).writeAll(testFileContent)
- }
-
- // Cleanup what we made
- after {
- File(testFileName).deleteIfExists()
- File(downloadDestinationUrl.getPath).deleteIfExists()
- }
-
- describe("DownloadSupport"){
- describe("#downloadFile( String, String )"){
- it("should download a file to the download directory"){
- val testFileUrl = "file:///tmp/testfile.txt"
-
- // Create our utility and download the file
- val downloader = new Object with DownloadSupport
- downloader.downloadFile(
- testFileUrl,
- downloadDestinationUrl.getProtocol + "://" +
- downloadDestinationUrl.getPath)
-
- // Verify the file contents are what was in the original file
- val downloadedFileContent: String =
- Source.fromFile(downloadDestinationUrl.getPath).mkString
-
- downloadedFileContent should be (testFileContent)
- }
-
- }
-
- describe("#downloadFile( URL, URL )"){
- it("should download a file to the download directory"){
- val testFileUrl = new URL("file:///tmp/testfile.txt")
-
- val downloader = new Object with DownloadSupport
- downloader.downloadFile(testFileUrl, downloadDestinationUrl)
-
- // Verify the file contents are what was in the original file
- val downloadedFileContent: String =
- Source.fromFile(downloadDestinationUrl.getPath).mkString
-
- downloadedFileContent should be (testFileContent)
- }
-
- it("should throw FileNotFoundException if the download URL is bad"){
- val badFilename = "file:///tmp/testbadfile.txt"
- File(badFilename).deleteIfExists()
- val badFileUrl = new URL(badFilename)
-
- val downloader = new Object with DownloadSupport
- intercept[FileNotFoundException] {
- downloader.downloadFile(badFileUrl, downloadDestinationUrl)
- }
- }
-
- it("should throw FileNotFoundException if the download ") {
- val testFileUrl = new URL("file:///tmp/testfile.txt")
- val badDestinationUrl =
- new URL("file:///tmp/badloc/that/doesnt/exist.txt")
-
- val downloader = new Object with DownloadSupport
- intercept[FileNotFoundException] {
- downloader.downloadFile(testFileUrl, badDestinationUrl)
- }
- }
- }
- }
-
-}