Posted to commits@spark.apache.org by ma...@apache.org on 2013/09/01 23:59:45 UTC

[61/69] [abbrv] Move some classes to more appropriate packages:

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 2fe85bc..a9969ab 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -25,8 +25,8 @@ import scala.xml.Node
 
 import org.apache.spark.ui.UIUtils._
 import org.apache.spark.ui.Page._
-import org.apache.spark.util.Distribution
-import org.apache.spark.{ExceptionFailure, Utils}
+import org.apache.spark.util.{Utils, Distribution}
+import org.apache.spark.{ExceptionFailure}
 import org.apache.spark.scheduler.cluster.TaskInfo
 import org.apache.spark.executor.TaskMetrics
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index beb0574..71e58a9 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -5,9 +5,9 @@ import java.util.Date
 import scala.xml.Node
 import scala.collection.mutable.HashSet
 
-import org.apache.spark.Utils
 import org.apache.spark.scheduler.cluster.{SchedulingMode, TaskInfo}
 import org.apache.spark.scheduler.Stage
+import org.apache.spark.util.Utils
 
 
 /** Page showing list of all ongoing and recently finished stages */

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
index 1eb4a7a..c3ec907 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
@@ -22,9 +22,9 @@ import javax.servlet.http.HttpServletRequest
 import scala.xml.Node
 
 import org.apache.spark.storage.{RDDInfo, StorageUtils}
-import org.apache.spark.Utils
 import org.apache.spark.ui.UIUtils._
 import org.apache.spark.ui.Page._
+import org.apache.spark.util.Utils
 
 /** Page showing list of RDD's currently stored in the cluster */
 private[spark] class IndexPage(parent: BlockManagerUI) {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
index 37baf17..43c1257 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
@@ -21,11 +21,11 @@ import javax.servlet.http.HttpServletRequest
 
 import scala.xml.Node
 
-import org.apache.spark.Utils
 import org.apache.spark.storage.{StorageStatus, StorageUtils}
 import org.apache.spark.storage.BlockManagerMasterActor.BlockStatus
 import org.apache.spark.ui.UIUtils._
 import org.apache.spark.ui.Page._
+import org.apache.spark.util.Utils
 
 
 /** Page showing storage details for a given RDD */

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
new file mode 100644
index 0000000..7108595
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.lang.reflect.Field
+
+import scala.collection.mutable.Map
+import scala.collection.mutable.Set
+
+import org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor, Type}
+import org.objectweb.asm.Opcodes._
+import java.io.{InputStream, IOException, ByteArrayOutputStream, ByteArrayInputStream, BufferedInputStream}
+import org.apache.spark.Logging
+
+private[spark] object ClosureCleaner extends Logging {
+  // Get an ASM class reader for a given class from the JAR that loaded it
+  private def getClassReader(cls: Class[_]): ClassReader = {
+    // Copy data over, before delegating to ClassReader - else we can run out of open file handles.
+    val className = cls.getName.replaceFirst("^.*\\.", "") + ".class"
+    val resourceStream = cls.getResourceAsStream(className)
+    // todo: Fixme - continuing with earlier behavior ...
+    if (resourceStream == null) return new ClassReader(resourceStream)
+
+    val baos = new ByteArrayOutputStream(128)
+    Utils.copyStream(resourceStream, baos, true)
+    new ClassReader(new ByteArrayInputStream(baos.toByteArray))
+  }
+
+  // Check whether a class represents a Scala closure
+  private def isClosure(cls: Class[_]): Boolean = {
+    cls.getName.contains("$anonfun$")
+  }
+  
+  // Get a list of the classes of the outer objects of a given closure object, obj;
+  // the outer objects are defined as any closures that obj is nested within, plus
+  // possibly the class that the outermost closure is in, if any. We stop searching
+  // for outer objects beyond that because cloning the user's object is probably
+  // not a good idea (whereas we can clone closure objects just fine since we
+  // understand how all their fields are used).
+  private def getOuterClasses(obj: AnyRef): List[Class[_]] = {
+    for (f <- obj.getClass.getDeclaredFields if f.getName == "$outer") {
+      f.setAccessible(true)
+      if (isClosure(f.getType)) {
+        return f.getType :: getOuterClasses(f.get(obj))
+      } else {
+        return f.getType :: Nil // Stop at the first $outer that is not a closure
+      }
+    }
+    return Nil
+  }
+  
+  // Get a list of the outer objects for a given closure object.
+  private def getOuterObjects(obj: AnyRef): List[AnyRef] = {
+    for (f <- obj.getClass.getDeclaredFields if f.getName == "$outer") {
+      f.setAccessible(true)
+      if (isClosure(f.getType)) {
+        return f.get(obj) :: getOuterObjects(f.get(obj))
+      } else {
+        return f.get(obj) :: Nil // Stop at the first $outer that is not a closure
+      }
+    }
+    return Nil
+  }
+  
+  private def getInnerClasses(obj: AnyRef): List[Class[_]] = {
+    val seen = Set[Class[_]](obj.getClass)
+    var stack = List[Class[_]](obj.getClass)
+    while (!stack.isEmpty) {
+      val cr = getClassReader(stack.head)
+      stack = stack.tail
+      val set = Set[Class[_]]()
+      cr.accept(new InnerClosureFinder(set), 0)
+      for (cls <- set -- seen) {
+        seen += cls
+        stack = cls :: stack
+      }
+    }
+    return (seen - obj.getClass).toList
+  }
+  
+  private def createNullValue(cls: Class[_]): AnyRef = {
+    if (cls.isPrimitive) {
+      new java.lang.Byte(0: Byte) // Should be convertible to any primitive type
+    } else {
+      null
+    }
+  }
+  
+  def clean(func: AnyRef) {
+    // TODO: cache outerClasses / innerClasses / accessedFields
+    val outerClasses = getOuterClasses(func)
+    val innerClasses = getInnerClasses(func)
+    val outerObjects = getOuterObjects(func)
+    
+    val accessedFields = Map[Class[_], Set[String]]()
+    for (cls <- outerClasses)
+      accessedFields(cls) = Set[String]()
+    for (cls <- func.getClass :: innerClasses)
+      getClassReader(cls).accept(new FieldAccessFinder(accessedFields), 0)
+    //logInfo("accessedFields: " + accessedFields)
+
+    val inInterpreter = {
+      try {
+        val interpClass = Class.forName("spark.repl.Main")
+        interpClass.getMethod("interp").invoke(null) != null
+      } catch {
+        case _: ClassNotFoundException => true
+      }
+    }
+
+    var outerPairs: List[(Class[_], AnyRef)] = (outerClasses zip outerObjects).reverse
+    var outer: AnyRef = null
+    if (outerPairs.size > 0 && !isClosure(outerPairs.head._1)) {
+      // The closure is ultimately nested inside a class; keep the object of that
+      // class without cloning it since we don't want to clone the user's objects.
+      outer = outerPairs.head._2
+      outerPairs = outerPairs.tail
+    }
+    // Clone the closure objects themselves, nulling out any fields that are not
+    // used in the closure we're working on or any of its inner closures.
+    for ((cls, obj) <- outerPairs) {
+      outer = instantiateClass(cls, outer, inInterpreter)
+      for (fieldName <- accessedFields(cls)) {
+        val field = cls.getDeclaredField(fieldName)
+        field.setAccessible(true)
+        val value = field.get(obj)
+        //logInfo("1: Setting " + fieldName + " on " + cls + " to " + value);
+        field.set(outer, value)
+      }
+    }
+    
+    if (outer != null) {
+      //logInfo("2: Setting $outer on " + func.getClass + " to " + outer);
+      val field = func.getClass.getDeclaredField("$outer")
+      field.setAccessible(true)
+      field.set(func, outer)
+    }
+  }
+  
+  private def instantiateClass(cls: Class[_], outer: AnyRef, inInterpreter: Boolean): AnyRef = {
+    //logInfo("Creating a " + cls + " with outer = " + outer)
+    if (!inInterpreter) {
+      // This is a bona fide closure class, whose constructor has no effects
+      // other than to set its fields, so use its constructor
+      val cons = cls.getConstructors()(0)
+      val params = cons.getParameterTypes.map(createNullValue).toArray
+      if (outer != null)
+        params(0) = outer // First param is always outer object
+      return cons.newInstance(params: _*).asInstanceOf[AnyRef]
+    } else {
+      // Use reflection to instantiate object without calling constructor
+      val rf = sun.reflect.ReflectionFactory.getReflectionFactory()
+      val parentCtor = classOf[java.lang.Object].getDeclaredConstructor()
+      val newCtor = rf.newConstructorForSerialization(cls, parentCtor)
+      val obj = newCtor.newInstance().asInstanceOf[AnyRef]
+      if (outer != null) {
+        //logInfo("3: Setting $outer on " + cls + " to " + outer);
+        val field = cls.getDeclaredField("$outer")
+        field.setAccessible(true)
+        field.set(obj, outer)
+      }
+      return obj
+    }
+  }
+}
+
+private[spark] class FieldAccessFinder(output: Map[Class[_], Set[String]]) extends ClassVisitor(ASM4) {
+  override def visitMethod(access: Int, name: String, desc: String,
+      sig: String, exceptions: Array[String]): MethodVisitor = {
+    return new MethodVisitor(ASM4) {
+      override def visitFieldInsn(op: Int, owner: String, name: String, desc: String) {
+        if (op == GETFIELD) {
+          for (cl <- output.keys if cl.getName == owner.replace('/', '.')) {
+            output(cl) += name
+          }
+        }
+      }
+      
+      override def visitMethodInsn(op: Int, owner: String, name: String,
+          desc: String) {
+        // Check for calls to a getter method for a variable in an interpreter wrapper object.
+        // This means that the corresponding field will be accessed, so we should save it.
+        if (op == INVOKEVIRTUAL && owner.endsWith("$iwC") && !name.endsWith("$outer")) {
+          for (cl <- output.keys if cl.getName == owner.replace('/', '.')) {
+            output(cl) += name
+          }
+        }
+      }
+    }
+  }
+}
+
+private[spark] class InnerClosureFinder(output: Set[Class[_]]) extends ClassVisitor(ASM4) {
+  var myName: String = null
+  
+  override def visit(version: Int, access: Int, name: String, sig: String,
+      superName: String, interfaces: Array[String]) {
+    myName = name
+  }
+  
+  override def visitMethod(access: Int, name: String, desc: String,
+      sig: String, exceptions: Array[String]): MethodVisitor = {
+    return new MethodVisitor(ASM4) {
+      override def visitMethodInsn(op: Int, owner: String, name: String,
+          desc: String) {
+        val argTypes = Type.getArgumentTypes(desc)
+        if (op == INVOKESPECIAL && name == "<init>" && argTypes.length > 0
+            && argTypes(0).toString.startsWith("L") // is it an object?
+            && argTypes(0).getInternalName == myName)
+          output += Class.forName(
+              owner.replace('/', '.'),
+              false,
+              Thread.currentThread.getContextClassLoader)
+      }
+    }
+  }
+}
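
For readers following this package move, ClosureCleaner.clean is the entry point: it is typically called on a closure before Spark serializes it, cloning any enclosing closure objects and nulling out fields the closure does not use. The sketch below is illustrative only (the demo object and values are hypothetical, not part of this commit), and because ClosureCleaner is private[spark] it must be compiled inside the org.apache.spark namespace.

    package org.apache.spark.util

    // Hypothetical demo object, not part of the commit above.
    object ClosureCleanerDemo {
      def main(args: Array[String]) {
        val bonus = 10
        val addBonus = (x: Int) => x + bonus   // a Scala closure capturing `bonus`
        ClosureCleaner.clean(addBonus)         // strip unused outer references before shipping
        println(Seq(1, 2, 3).map(addBonus))    // List(11, 12, 13)
      }
    }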

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/util/MemoryParam.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/MemoryParam.scala b/core/src/main/scala/org/apache/spark/util/MemoryParam.scala
index 0ee6707..4869c98 100644
--- a/core/src/main/scala/org/apache/spark/util/MemoryParam.scala
+++ b/core/src/main/scala/org/apache/spark/util/MemoryParam.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.util
 
-import org.apache.spark.Utils
-
 /**
  * An extractor object for parsing JVM memory strings, such as "10g", into an Int representing
  * the number of megabytes. Supports the same formats as Utils.memoryStringToMb.
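
As the comment above notes, MemoryParam is an extractor for JVM memory strings such as "10g". Assuming it exposes the conventional unapply(str: String): Option[Int] shape (the method body is not shown in this hunk) and lives behind private[spark] like the other utilities, a pattern match over it might look like this hypothetical sketch:

    package org.apache.spark.util

    // Hypothetical usage, assuming MemoryParam.unapply(str: String): Option[Int].
    object MemoryArgDemo {
      def parseMemory(arg: String): Int = arg match {
        case MemoryParam(mb) => mb   // e.g. "10g" would match as 10240 megabytes
        case _ => sys.error("Invalid memory string: " + arg)
      }
    }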

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
new file mode 100644
index 0000000..a25b37a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.lang.reflect.Field
+import java.lang.reflect.Modifier
+import java.lang.reflect.{Array => JArray}
+import java.util.IdentityHashMap
+import java.util.concurrent.ConcurrentHashMap
+import java.util.Random
+
+import javax.management.MBeanServer
+import java.lang.management.ManagementFactory
+
+import scala.collection.mutable.ArrayBuffer
+
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet
+import org.apache.spark.Logging
+
+/**
+ * Estimates the sizes of Java objects (number of bytes of memory they occupy), for use in 
+ * memory-aware caches.
+ *
+ * Based on the following JavaWorld article:
+ * http://www.javaworld.com/javaworld/javaqa/2003-12/02-qa-1226-sizeof.html
+ */
+private[spark] object SizeEstimator extends Logging {
+
+  // Sizes of primitive types
+  private val BYTE_SIZE    = 1
+  private val BOOLEAN_SIZE = 1
+  private val CHAR_SIZE    = 2
+  private val SHORT_SIZE   = 2
+  private val INT_SIZE     = 4
+  private val LONG_SIZE    = 8
+  private val FLOAT_SIZE   = 4
+  private val DOUBLE_SIZE  = 8
+
+  // Alignment boundary for objects
+  // TODO: Is this arch dependent ?
+  private val ALIGN_SIZE = 8
+
+  // A cache of ClassInfo objects for each class
+  private val classInfos = new ConcurrentHashMap[Class[_], ClassInfo]
+
+  // Object and pointer sizes are arch dependent
+  private var is64bit = false
+
+  // Size of an object reference
+  // Based on https://wikis.oracle.com/display/HotSpotInternals/CompressedOops
+  private var isCompressedOops = false
+  private var pointerSize = 4
+
+  // Minimum size of a java.lang.Object
+  private var objectSize = 8
+
+  initialize()
+
+  // Sets object size, pointer size based on architecture and CompressedOops settings
+  // from the JVM.
+  private def initialize() {
+    is64bit = System.getProperty("os.arch").contains("64")
+    isCompressedOops = getIsCompressedOops
+
+    objectSize = if (!is64bit) 8 else {
+      if(!isCompressedOops) {
+        16
+      } else {
+        12
+      }
+    }
+    pointerSize = if (is64bit && !isCompressedOops) 8 else 4
+    classInfos.clear()
+    classInfos.put(classOf[Object], new ClassInfo(objectSize, Nil))
+  }
+
+  private def getIsCompressedOops : Boolean = {
+    if (System.getProperty("spark.test.useCompressedOops") != null) {
+      return System.getProperty("spark.test.useCompressedOops").toBoolean 
+    }
+
+    try {
+      val hotSpotMBeanName = "com.sun.management:type=HotSpotDiagnostic"
+      val server = ManagementFactory.getPlatformMBeanServer()
+
+      // NOTE: This should throw an exception in non-Sun JVMs
+      val hotSpotMBeanClass = Class.forName("com.sun.management.HotSpotDiagnosticMXBean")
+      val getVMMethod = hotSpotMBeanClass.getDeclaredMethod("getVMOption",
+          Class.forName("java.lang.String"))
+
+      val bean = ManagementFactory.newPlatformMXBeanProxy(server, 
+        hotSpotMBeanName, hotSpotMBeanClass)
+      // TODO: We could use reflection on the VMOption returned ?
+      return getVMMethod.invoke(bean, "UseCompressedOops").toString.contains("true")
+    } catch {
+      case e: Exception => {
+        // Guess whether they've enabled UseCompressedOops based on whether maxMemory < 32 GB
+        val guess = Runtime.getRuntime.maxMemory < (32L*1024*1024*1024)
+        val guessInWords = if (guess) "yes" else "not"
+        logWarning("Failed to check whether UseCompressedOops is set; assuming " + guessInWords)
+        return guess
+      }
+    }
+  }
+
+  /**
+   * The state of an ongoing size estimation. Contains a stack of objects to visit as well as an
+   * IdentityHashMap of visited objects, and provides utility methods for enqueueing new objects
+   * to visit.
+   */
+  private class SearchState(val visited: IdentityHashMap[AnyRef, AnyRef]) {
+    val stack = new ArrayBuffer[AnyRef]
+    var size = 0L
+
+    def enqueue(obj: AnyRef) {
+      if (obj != null && !visited.containsKey(obj)) {
+        visited.put(obj, null)
+        stack += obj
+      }
+    }
+
+    def isFinished(): Boolean = stack.isEmpty
+
+    def dequeue(): AnyRef = {
+      val elem = stack.last
+      stack.trimEnd(1)
+      return elem
+    }
+  }
+
+  /**
+   * Cached information about each class. We remember two things: the "shell size" of the class
+   * (size of all non-static fields plus the java.lang.Object size), and any fields that are
+   * pointers to objects.
+   */
+  private class ClassInfo(
+    val shellSize: Long,
+    val pointerFields: List[Field]) {}
+
+  def estimate(obj: AnyRef): Long = estimate(obj, new IdentityHashMap[AnyRef, AnyRef])
+
+  private def estimate(obj: AnyRef, visited: IdentityHashMap[AnyRef, AnyRef]): Long = {
+    val state = new SearchState(visited)
+    state.enqueue(obj)
+    while (!state.isFinished) {
+      visitSingleObject(state.dequeue(), state)
+    }
+    return state.size
+  }
+
+  private def visitSingleObject(obj: AnyRef, state: SearchState) {
+    val cls = obj.getClass
+    if (cls.isArray) {
+      visitArray(obj, cls, state)
+    } else if (obj.isInstanceOf[ClassLoader] || obj.isInstanceOf[Class[_]]) {
+      // Hadoop JobConfs created in the interpreter have a ClassLoader, which greatly confuses
+      // the size estimator since it references the whole REPL. Do nothing in this case. In
+      // general all ClassLoaders and Classes will be shared between objects anyway.
+    } else {
+      val classInfo = getClassInfo(cls)
+      state.size += classInfo.shellSize
+      for (field <- classInfo.pointerFields) {
+        state.enqueue(field.get(obj))
+      }
+    }
+  }
+
+  // Estimate the size of arrays larger than ARRAY_SIZE_FOR_SAMPLING by sampling.
+  private val ARRAY_SIZE_FOR_SAMPLING = 200
+  private val ARRAY_SAMPLE_SIZE = 100 // should be lower than ARRAY_SIZE_FOR_SAMPLING
+
+  private def visitArray(array: AnyRef, cls: Class[_], state: SearchState) {
+    val length = JArray.getLength(array)
+    val elementClass = cls.getComponentType
+
+    // Arrays have object header and length field which is an integer
+    var arrSize: Long = alignSize(objectSize + INT_SIZE)
+
+    if (elementClass.isPrimitive) {
+      arrSize += alignSize(length * primitiveSize(elementClass))
+      state.size += arrSize
+    } else {
+      arrSize += alignSize(length * pointerSize)
+      state.size += arrSize
+
+      if (length <= ARRAY_SIZE_FOR_SAMPLING) {
+        for (i <- 0 until length) {
+          state.enqueue(JArray.get(array, i))
+        }
+      } else {
+        // Estimate the size of a large array by sampling elements without replacement.
+        var size = 0.0
+        val rand = new Random(42)
+        val drawn = new IntOpenHashSet(ARRAY_SAMPLE_SIZE)
+        for (i <- 0 until ARRAY_SAMPLE_SIZE) {
+          var index = 0
+          do {
+            index = rand.nextInt(length)
+          } while (drawn.contains(index))
+          drawn.add(index)
+          val elem = JArray.get(array, index)
+          size += SizeEstimator.estimate(elem, state.visited)
+        }
+        state.size += ((length / (ARRAY_SAMPLE_SIZE * 1.0)) * size).toLong
+      }
+    }
+  }
+
+  private def primitiveSize(cls: Class[_]): Long = {
+    if (cls == classOf[Byte])
+      BYTE_SIZE
+    else if (cls == classOf[Boolean])
+      BOOLEAN_SIZE
+    else if (cls == classOf[Char])
+      CHAR_SIZE
+    else if (cls == classOf[Short])
+      SHORT_SIZE
+    else if (cls == classOf[Int])
+      INT_SIZE
+    else if (cls == classOf[Long])
+      LONG_SIZE
+    else if (cls == classOf[Float])
+      FLOAT_SIZE
+    else if (cls == classOf[Double])
+      DOUBLE_SIZE
+    else throw new IllegalArgumentException(
+      "Non-primitive class " + cls + " passed to primitiveSize()")
+  }
+
+  /**
+   * Get or compute the ClassInfo for a given class.
+   */
+  private def getClassInfo(cls: Class[_]): ClassInfo = {
+    // Check whether we've already cached a ClassInfo for this class
+    val info = classInfos.get(cls)
+    if (info != null) {
+      return info
+    }
+    
+    val parent = getClassInfo(cls.getSuperclass)
+    var shellSize = parent.shellSize
+    var pointerFields = parent.pointerFields
+
+    for (field <- cls.getDeclaredFields) {
+      if (!Modifier.isStatic(field.getModifiers)) {
+        val fieldClass = field.getType
+        if (fieldClass.isPrimitive) {
+          shellSize += primitiveSize(fieldClass)
+        } else {
+          field.setAccessible(true) // Enable future get()'s on this field
+          shellSize += pointerSize
+          pointerFields = field :: pointerFields
+        }
+      }
+    }
+
+    shellSize = alignSize(shellSize)
+
+    // Create and cache a new ClassInfo
+    val newInfo = new ClassInfo(shellSize, pointerFields)
+    classInfos.put(cls, newInfo)
+    return newInfo
+  }
+
+  private def alignSize(size: Long): Long = {
+    val rem = size % ALIGN_SIZE
+    return if (rem == 0) size else (size + ALIGN_SIZE - rem)
+  }
+}
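
SizeEstimator.estimate walks an object graph and returns an approximate footprint in bytes, which Spark's memory-aware caching uses when deciding whether blocks fit in memory. A rough sketch of calling it directly follows; it is hypothetical (not part of this commit), must sit inside the org.apache.spark namespace because SizeEstimator is private[spark], and the exact numbers depend on the JVM's architecture and UseCompressedOops setting.

    package org.apache.spark.util

    // Hypothetical demo, for illustration only.
    object SizeEstimatorDemo {
      def main(args: Array[String]) {
        // A primitive array: object header + length field + 128 longs, 8-byte aligned.
        println(SizeEstimator.estimate(new Array[Long](128)))
        // A reference array over 200 elements is sampled rather than fully traversed.
        val strings = Array.fill(1000)(new String("spark"))
        println(SizeEstimator.estimate(strings))
      }
    }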

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
new file mode 100644
index 0000000..bb47fc0
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -0,0 +1,781 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io._
+import java.net.{InetAddress, URL, URI, NetworkInterface, Inet4Address, ServerSocket}
+import java.util.{Locale, Random, UUID}
+import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadFactory, ThreadPoolExecutor}
+import java.util.regex.Pattern
+
+import scala.collection.Map
+import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.collection.JavaConversions._
+import scala.io.Source
+
+import com.google.common.io.Files
+import com.google.common.util.concurrent.ThreadFactoryBuilder
+
+import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
+
+import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
+import org.apache.spark.deploy.SparkHadoopUtil
+import java.nio.ByteBuffer
+import org.apache.spark.{SparkEnv, SparkException, Logging}
+
+
+/**
+ * Various utility methods used by Spark.
+ */
+private[spark] object Utils extends Logging {
+
+  /** Serialize an object using Java serialization */
+  def serialize[T](o: T): Array[Byte] = {
+    val bos = new ByteArrayOutputStream()
+    val oos = new ObjectOutputStream(bos)
+    oos.writeObject(o)
+    oos.close()
+    return bos.toByteArray
+  }
+
+  /** Deserialize an object using Java serialization */
+  def deserialize[T](bytes: Array[Byte]): T = {
+    val bis = new ByteArrayInputStream(bytes)
+    val ois = new ObjectInputStream(bis)
+    return ois.readObject.asInstanceOf[T]
+  }
+
+  /** Deserialize an object using Java serialization and the given ClassLoader */
+  def deserialize[T](bytes: Array[Byte], loader: ClassLoader): T = {
+    val bis = new ByteArrayInputStream(bytes)
+    val ois = new ObjectInputStream(bis) {
+      override def resolveClass(desc: ObjectStreamClass) =
+        Class.forName(desc.getName, false, loader)
+    }
+    return ois.readObject.asInstanceOf[T]
+  }
+
+  /** Serialize via nested stream using specific serializer */
+  def serializeViaNestedStream(os: OutputStream, ser: SerializerInstance)(f: SerializationStream => Unit) = {
+    val osWrapper = ser.serializeStream(new OutputStream {
+      def write(b: Int) = os.write(b)
+
+      override def write(b: Array[Byte], off: Int, len: Int) = os.write(b, off, len)
+    })
+    try {
+      f(osWrapper)
+    } finally {
+      osWrapper.close()
+    }
+  }
+
+  /** Deserialize via nested stream using specific serializer */
+  def deserializeViaNestedStream(is: InputStream, ser: SerializerInstance)(f: DeserializationStream => Unit) = {
+    val isWrapper = ser.deserializeStream(new InputStream {
+      def read(): Int = is.read()
+
+      override def read(b: Array[Byte], off: Int, len: Int): Int = is.read(b, off, len)
+    })
+    try {
+      f(isWrapper)
+    } finally {
+      isWrapper.close()
+    }
+  }
+
+  /**
+   * Primitive often used when writing {@link java.nio.ByteBuffer} to {@link java.io.DataOutput}.
+   */
+  def writeByteBuffer(bb: ByteBuffer, out: ObjectOutput) = {
+    if (bb.hasArray) {
+      out.write(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining())
+    } else {
+      val bbval = new Array[Byte](bb.remaining())
+      bb.get(bbval)
+      out.write(bbval)
+    }
+  }
+
+  def isAlpha(c: Char): Boolean = {
+    (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')
+  }
+
+  /** Split a string into words at non-alphabetic characters */
+  def splitWords(s: String): Seq[String] = {
+    val buf = new ArrayBuffer[String]
+    var i = 0
+    while (i < s.length) {
+      var j = i
+      while (j < s.length && isAlpha(s.charAt(j))) {
+        j += 1
+      }
+      if (j > i) {
+        buf += s.substring(i, j)
+      }
+      i = j
+      while (i < s.length && !isAlpha(s.charAt(i))) {
+        i += 1
+      }
+    }
+    return buf
+  }
+
+  private val shutdownDeletePaths = new collection.mutable.HashSet[String]()
+
+  // Register the path to be deleted via shutdown hook
+  def registerShutdownDeleteDir(file: File) {
+    val absolutePath = file.getAbsolutePath()
+    shutdownDeletePaths.synchronized {
+      shutdownDeletePaths += absolutePath
+    }
+  }
+
+  // Is the path already registered to be deleted via a shutdown hook ?
+  def hasShutdownDeleteDir(file: File): Boolean = {
+    val absolutePath = file.getAbsolutePath()
+    shutdownDeletePaths.synchronized {
+      shutdownDeletePaths.contains(absolutePath)
+    }
+  }
+
+  // Note: if the file is a child of some registered path (but not equal to it), return true;
+  // else return false. This ensures that two shutdown hooks do not try to delete each other's
+  // paths, which would result in IOException and incomplete cleanup.
+  def hasRootAsShutdownDeleteDir(file: File): Boolean = {
+    val absolutePath = file.getAbsolutePath()
+    val retval = shutdownDeletePaths.synchronized {
+      shutdownDeletePaths.find { path =>
+        !absolutePath.equals(path) && absolutePath.startsWith(path)
+      }.isDefined
+    }
+    if (retval) {
+      logInfo("path = " + file + ", already present as root for deletion.")
+    }
+    retval
+  }
+
+  /** Create a temporary directory inside the given parent directory */
+  def createTempDir(root: String = System.getProperty("java.io.tmpdir")): File = {
+    var attempts = 0
+    val maxAttempts = 10
+    var dir: File = null
+    while (dir == null) {
+      attempts += 1
+      if (attempts > maxAttempts) {
+        throw new IOException("Failed to create a temp directory (under " + root + ") after " +
+          maxAttempts + " attempts!")
+      }
+      try {
+        dir = new File(root, "spark-" + UUID.randomUUID.toString)
+        if (dir.exists() || !dir.mkdirs()) {
+          dir = null
+        }
+      } catch { case e: IOException => ; }
+    }
+
+    registerShutdownDeleteDir(dir)
+
+    // Add a shutdown hook to delete the temp dir when the JVM exits
+    Runtime.getRuntime.addShutdownHook(new Thread("delete Spark temp dir " + dir) {
+      override def run() {
+        // Attempt to delete only if no path that is a parent of this one is already registered.
+        if (! hasRootAsShutdownDeleteDir(dir)) Utils.deleteRecursively(dir)
+      }
+    })
+    dir
+  }
+
+  /** Copy all data from an InputStream to an OutputStream */
+  def copyStream(in: InputStream,
+                 out: OutputStream,
+                 closeStreams: Boolean = false)
+  {
+    val buf = new Array[Byte](8192)
+    var n = 0
+    while (n != -1) {
+      n = in.read(buf)
+      if (n != -1) {
+        out.write(buf, 0, n)
+      }
+    }
+    if (closeStreams) {
+      in.close()
+      out.close()
+    }
+  }
+
+  /**
+   * Download a file requested by the executor. Supports fetching the file in a variety of ways,
+   * including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
+   *
+   * Throws SparkException if the target file already exists and has different contents than
+   * the requested file.
+   */
+  def fetchFile(url: String, targetDir: File) {
+    val filename = url.split("/").last
+    val tempDir = getLocalDir
+    val tempFile =  File.createTempFile("fetchFileTemp", null, new File(tempDir))
+    val targetFile = new File(targetDir, filename)
+    val uri = new URI(url)
+    uri.getScheme match {
+      case "http" | "https" | "ftp" =>
+        logInfo("Fetching " + url + " to " + tempFile)
+        val in = new URL(url).openStream()
+        val out = new FileOutputStream(tempFile)
+        Utils.copyStream(in, out, true)
+        if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
+          tempFile.delete()
+          throw new SparkException(
+            "File " + targetFile + " exists and does not match contents of" + " " + url)
+        } else {
+          Files.move(tempFile, targetFile)
+        }
+      case "file" | null =>
+        // In the case of a local file, copy the local file to the target directory.
+        // Note the difference between uri vs url.
+        val sourceFile = if (uri.isAbsolute) new File(uri) else new File(url)
+        if (targetFile.exists) {
+          // If the target file already exists, check whether its contents match the source file.
+          if (!Files.equal(sourceFile, targetFile)) {
+            throw new SparkException(
+              "File " + targetFile + " exists and does not match contents of" + " " + url)
+          } else {
+            // Do nothing if the file contents are the same, i.e. this file has been copied
+            // previously.
+            logInfo(sourceFile.getAbsolutePath + " has been previously copied to "
+              + targetFile.getAbsolutePath)
+          }
+        } else {
+          // The file does not exist in the target directory. Copy it there.
+          logInfo("Copying " + sourceFile.getAbsolutePath + " to " + targetFile.getAbsolutePath)
+          Files.copy(sourceFile, targetFile)
+        }
+      case _ =>
+        // Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
+        val env = SparkEnv.get
+        val uri = new URI(url)
+        val conf = env.hadoop.newConfiguration()
+        val fs = FileSystem.get(uri, conf)
+        val in = fs.open(new Path(uri))
+        val out = new FileOutputStream(tempFile)
+        Utils.copyStream(in, out, true)
+        if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
+          tempFile.delete()
+          throw new SparkException("File " + targetFile + " exists and does not match contents of" +
+            " " + url)
+        } else {
+          Files.move(tempFile, targetFile)
+        }
+    }
+    // Decompress the file if it's a .tar, .tar.gz, or .tgz archive
+    if (filename.endsWith(".tar.gz") || filename.endsWith(".tgz")) {
+      logInfo("Untarring " + filename)
+      Utils.execute(Seq("tar", "-xzf", filename), targetDir)
+    } else if (filename.endsWith(".tar")) {
+      logInfo("Untarring " + filename)
+      Utils.execute(Seq("tar", "-xf", filename), targetDir)
+    }
+    // Make the file executable - That's necessary for scripts
+    FileUtil.chmod(targetFile.getAbsolutePath, "a+x")
+  }
+
+  /**
+   * Get a temporary directory using Spark's spark.local.dir property, if set. This will always
+   * return a single directory, even though the spark.local.dir property might be a list of
+   * multiple paths.
+   */
+  def getLocalDir: String = {
+    System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")).split(',')(0)
+  }
+
+  /**
+   * Shuffle the elements of a collection into a random order, returning the
+   * result in a new collection. Unlike scala.util.Random.shuffle, this method
+   * uses a local random number generator, avoiding inter-thread contention.
+   */
+  def randomize[T: ClassManifest](seq: TraversableOnce[T]): Seq[T] = {
+    randomizeInPlace(seq.toArray)
+  }
+
+  /**
+   * Shuffle the elements of an array into a random order, modifying the
+   * original array. Returns the original array.
+   */
+  def randomizeInPlace[T](arr: Array[T], rand: Random = new Random): Array[T] = {
+    for (i <- (arr.length - 1) to 1 by -1) {
+      val j = rand.nextInt(i)
+      val tmp = arr(j)
+      arr(j) = arr(i)
+      arr(i) = tmp
+    }
+    arr
+  }
+
+  /**
+   * Get the local host's IP address in dotted-quad format (e.g. 1.2.3.4).
+   * Note that this is typically not used from within core Spark.
+   */
+  lazy val localIpAddress: String = findLocalIpAddress()
+  lazy val localIpAddressHostname: String = getAddressHostName(localIpAddress)
+
+  private def findLocalIpAddress(): String = {
+    val defaultIpOverride = System.getenv("SPARK_LOCAL_IP")
+    if (defaultIpOverride != null) {
+      defaultIpOverride
+    } else {
+      val address = InetAddress.getLocalHost
+      if (address.isLoopbackAddress) {
+        // Address resolves to something like 127.0.1.1, which happens on Debian; try to find
+        // a better address using the local network interfaces
+        for (ni <- NetworkInterface.getNetworkInterfaces) {
+          for (addr <- ni.getInetAddresses if !addr.isLinkLocalAddress &&
+               !addr.isLoopbackAddress && addr.isInstanceOf[Inet4Address]) {
+            // We've found an address that looks reasonable!
+            logWarning("Your hostname, " + InetAddress.getLocalHost.getHostName + " resolves to" +
+              " a loopback address: " + address.getHostAddress + "; using " + addr.getHostAddress +
+              " instead (on interface " + ni.getName + ")")
+            logWarning("Set SPARK_LOCAL_IP if you need to bind to another address")
+            return addr.getHostAddress
+          }
+        }
+        logWarning("Your hostname, " + InetAddress.getLocalHost.getHostName + " resolves to" +
+          " a loopback address: " + address.getHostAddress + ", but we couldn't find any" +
+          " external IP address!")
+        logWarning("Set SPARK_LOCAL_IP if you need to bind to another address")
+      }
+      address.getHostAddress
+    }
+  }
+
+  private var customHostname: Option[String] = None
+
+  /**
+   * Allow setting a custom host name because when we run on Mesos we need to use the same
+   * hostname it reports to the master.
+   */
+  def setCustomHostname(hostname: String) {
+    // DEBUG code
+    Utils.checkHost(hostname)
+    customHostname = Some(hostname)
+  }
+
+  /**
+   * Get the local machine's hostname.
+   */
+  def localHostName(): String = {
+    customHostname.getOrElse(localIpAddressHostname)
+  }
+
+  def getAddressHostName(address: String): String = {
+    InetAddress.getByName(address).getHostName
+  }
+
+  def localHostPort(): String = {
+    val retval = System.getProperty("spark.hostPort", null)
+    if (retval == null) {
+      logErrorWithStack("spark.hostPort not set but invoking localHostPort")
+      return localHostName()
+    }
+
+    retval
+  }
+
+  def checkHost(host: String, message: String = "") {
+    assert(host.indexOf(':') == -1, message)
+  }
+
+  def checkHostPort(hostPort: String, message: String = "") {
+    assert(hostPort.indexOf(':') != -1, message)
+  }
+
+  // Used by DEBUG code : remove when all testing done
+  def logErrorWithStack(msg: String) {
+    try { throw new Exception } catch { case ex: Exception => { logError(msg, ex) } }
+  }
+
+  // Typically, this will be on the order of the number of nodes in the cluster.
+  // If not, we should change it to LRUCache or something.
+  private val hostPortParseResults = new ConcurrentHashMap[String, (String, Int)]()
+
+  def parseHostPort(hostPort: String): (String,  Int) = {
+    {
+      // Check cache first.
+      var cached = hostPortParseResults.get(hostPort)
+      if (cached != null) return cached
+    }
+
+    val indx: Int = hostPort.lastIndexOf(':')
+    // This is potentially broken when dealing with ipv6 addresses, for example, but Hadoop
+    // does not support ipv6 right now anyway.
+    // For now, we assume that if a port is present it is valid; we do not check that it is an int > 0.
+    if (-1 == indx) {
+      val retval = (hostPort, 0)
+      hostPortParseResults.put(hostPort, retval)
+      return retval
+    }
+
+    val retval = (hostPort.substring(0, indx).trim(), hostPort.substring(indx + 1).trim().toInt)
+    hostPortParseResults.putIfAbsent(hostPort, retval)
+    hostPortParseResults.get(hostPort)
+  }
+
+  private[spark] val daemonThreadFactory: ThreadFactory =
+    new ThreadFactoryBuilder().setDaemon(true).build()
+
+  /**
+   * Wrapper over newCachedThreadPool.
+   */
+  def newDaemonCachedThreadPool(): ThreadPoolExecutor =
+    Executors.newCachedThreadPool(daemonThreadFactory).asInstanceOf[ThreadPoolExecutor]
+
+  /**
+   * Return a string reporting how much time has elapsed since startTimeMs, in milliseconds.
+   */
+  def getUsedTimeMs(startTimeMs: Long): String = {
+    return " " + (System.currentTimeMillis - startTimeMs) + " ms"
+  }
+
+  /**
+   * Wrapper over newFixedThreadPool.
+   */
+  def newDaemonFixedThreadPool(nThreads: Int): ThreadPoolExecutor =
+    Executors.newFixedThreadPool(nThreads, daemonThreadFactory).asInstanceOf[ThreadPoolExecutor]
+
+  /**
+   * Delete a file or directory and its contents recursively.
+   */
+  def deleteRecursively(file: File) {
+    if (file.isDirectory) {
+      for (child <- file.listFiles()) {
+        deleteRecursively(child)
+      }
+    }
+    if (!file.delete()) {
+      throw new IOException("Failed to delete: " + file)
+    }
+  }
+
+  /**
+   * Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes.
+   * This is used to figure out how much memory to claim from Mesos based on the SPARK_MEM
+   * environment variable.
+   */
+  def memoryStringToMb(str: String): Int = {
+    val lower = str.toLowerCase
+    if (lower.endsWith("k")) {
+      (lower.substring(0, lower.length-1).toLong / 1024).toInt
+    } else if (lower.endsWith("m")) {
+      lower.substring(0, lower.length-1).toInt
+    } else if (lower.endsWith("g")) {
+      lower.substring(0, lower.length-1).toInt * 1024
+    } else if (lower.endsWith("t")) {
+      lower.substring(0, lower.length-1).toInt * 1024 * 1024
+    } else {// no suffix, so it's just a number in bytes
+      (lower.toLong / 1024 / 1024).toInt
+    }
+  }
+
+  /**
+   * Convert a quantity in bytes to a human-readable string such as "4.0 MB".
+   */
+  def bytesToString(size: Long): String = {
+    val TB = 1L << 40
+    val GB = 1L << 30
+    val MB = 1L << 20
+    val KB = 1L << 10
+
+    val (value, unit) = {
+      if (size >= 2*TB) {
+        (size.asInstanceOf[Double] / TB, "TB")
+      } else if (size >= 2*GB) {
+        (size.asInstanceOf[Double] / GB, "GB")
+      } else if (size >= 2*MB) {
+        (size.asInstanceOf[Double] / MB, "MB")
+      } else if (size >= 2*KB) {
+        (size.asInstanceOf[Double] / KB, "KB")
+      } else {
+        (size.asInstanceOf[Double], "B")
+      }
+    }
+    "%.1f %s".formatLocal(Locale.US, value, unit)
+  }
+
+  /**
+   * Returns a human-readable string representing a duration such as "35ms"
+   */
+  def msDurationToString(ms: Long): String = {
+    val second = 1000
+    val minute = 60 * second
+    val hour = 60 * minute
+
+    ms match {
+      case t if t < second =>
+        "%d ms".format(t)
+      case t if t < minute =>
+        "%.1f s".format(t.toFloat / second)
+      case t if t < hour =>
+        "%.1f m".format(t.toFloat / minute)
+      case t =>
+        "%.2f h".format(t.toFloat / hour)
+    }
+  }
+
+  /**
+   * Convert a quantity in megabytes to a human-readable string such as "4.0 MB".
+   */
+  def megabytesToString(megabytes: Long): String = {
+    bytesToString(megabytes * 1024L * 1024L)
+  }
+
+  /**
+   * Execute a command in the given working directory, throwing an exception if it completes
+   * with an exit code other than 0.
+   */
+  def execute(command: Seq[String], workingDir: File) {
+    val process = new ProcessBuilder(command: _*)
+        .directory(workingDir)
+        .redirectErrorStream(true)
+        .start()
+    new Thread("read stdout for " + command(0)) {
+      override def run() {
+        for (line <- Source.fromInputStream(process.getInputStream).getLines) {
+          System.err.println(line)
+        }
+      }
+    }.start()
+    val exitCode = process.waitFor()
+    if (exitCode != 0) {
+      throw new SparkException("Process " + command + " exited with code " + exitCode)
+    }
+  }
+
+  /**
+   * Execute a command in the current working directory, throwing an exception if it completes
+   * with an exit code other than 0.
+   */
+  def execute(command: Seq[String]) {
+    execute(command, new File("."))
+  }
+
+  /**
+   * Execute a command and get its output, throwing an exception if it yields a code other than 0.
+   */
+  def executeAndGetOutput(command: Seq[String], workingDir: File = new File("."),
+                          extraEnvironment: Map[String, String] = Map.empty): String = {
+    val builder = new ProcessBuilder(command: _*)
+        .directory(workingDir)
+    val environment = builder.environment()
+    for ((key, value) <- extraEnvironment) {
+      environment.put(key, value)
+    }
+    val process = builder.start()
+    new Thread("read stderr for " + command(0)) {
+      override def run() {
+        for (line <- Source.fromInputStream(process.getErrorStream).getLines) {
+          System.err.println(line)
+        }
+      }
+    }.start()
+    val output = new StringBuffer
+    val stdoutThread = new Thread("read stdout for " + command(0)) {
+      override def run() {
+        for (line <- Source.fromInputStream(process.getInputStream).getLines) {
+          output.append(line)
+        }
+      }
+    }
+    stdoutThread.start()
+    val exitCode = process.waitFor()
+    stdoutThread.join()   // Wait for it to finish reading output
+    if (exitCode != 0) {
+      throw new SparkException("Process " + command + " exited with code " + exitCode)
+    }
+    output.toString
+  }
+
+  /**
+   * A regular expression to match classes of the "core" Spark API that we want to skip when
+   * finding the call site of a method.
+   */
+  private val SPARK_CLASS_REGEX = """^spark(\.api\.java)?(\.rdd)?\.[A-Z]""".r
+
+  private[spark] class CallSiteInfo(val lastSparkMethod: String, val firstUserFile: String,
+                                    val firstUserLine: Int, val firstUserClass: String)
+
+  /**
+   * When called inside a class in the spark package, returns the name of the user code class
+   * (outside the spark package) that called into Spark, as well as which Spark method they called.
+   * This is used, for example, to tell users where in their code each RDD got created.
+   */
+  def getCallSiteInfo: CallSiteInfo = {
+    val trace = Thread.currentThread.getStackTrace().filter( el =>
+      (!el.getMethodName.contains("getStackTrace")))
+
+    // Keep crawling up the stack trace until we find the first function not inside of the spark
+    // package. We track the last (shallowest) contiguous Spark method. This might be an RDD
+    // transformation, a SparkContext function (such as parallelize), or anything else that leads
+    // to instantiation of an RDD. We also track the first (deepest) user method, file, and line.
+    var lastSparkMethod = "<unknown>"
+    var firstUserFile = "<unknown>"
+    var firstUserLine = 0
+    var finished = false
+    var firstUserClass = "<unknown>"
+
+    for (el <- trace) {
+      if (!finished) {
+        if (SPARK_CLASS_REGEX.findFirstIn(el.getClassName) != None) {
+          lastSparkMethod = if (el.getMethodName == "<init>") {
+            // Spark method is a constructor; get its class name
+            el.getClassName.substring(el.getClassName.lastIndexOf('.') + 1)
+          } else {
+            el.getMethodName
+          }
+        }
+        else {
+          firstUserLine = el.getLineNumber
+          firstUserFile = el.getFileName
+          firstUserClass = el.getClassName
+          finished = true
+        }
+      }
+    }
+    new CallSiteInfo(lastSparkMethod, firstUserFile, firstUserLine, firstUserClass)
+  }
+
+  def formatSparkCallSite = {
+    val callSiteInfo = getCallSiteInfo
+    "%s at %s:%s".format(callSiteInfo.lastSparkMethod, callSiteInfo.firstUserFile,
+                         callSiteInfo.firstUserLine)
+  }
+
+  /** Return a string containing part of a file from byte 'start' to 'end'. */
+  def offsetBytes(path: String, start: Long, end: Long): String = {
+    val file = new File(path)
+    val length = file.length()
+    val effectiveEnd = math.min(length, end)
+    val effectiveStart = math.max(0, start)
+    val buff = new Array[Byte]((effectiveEnd-effectiveStart).toInt)
+    val stream = new FileInputStream(file)
+
+    stream.skip(effectiveStart)
+    stream.read(buff)
+    stream.close()
+    Source.fromBytes(buff).mkString
+  }
+
+  /**
+   * Clone an object using a Spark serializer.
+   */
+  def clone[T](value: T, serializer: SerializerInstance): T = {
+    serializer.deserialize[T](serializer.serialize(value))
+  }
+
+  /**
+   * Detect whether this thread might be executing a shutdown hook. Will always return true if
+   * the current thread is running a shutdown hook but may spuriously return true otherwise (e.g.
+   * if System.exit was just called by a concurrent thread).
+   *
+   * Currently, this detects whether the JVM is shutting down by Runtime#addShutdownHook throwing
+   * an IllegalStateException.
+   */
+  def inShutdown(): Boolean = {
+    try {
+      val hook = new Thread {
+        override def run() {}
+      }
+      Runtime.getRuntime.addShutdownHook(hook)
+      Runtime.getRuntime.removeShutdownHook(hook)
+    } catch {
+      case ise: IllegalStateException => return true
+    }
+    return false
+  }
+
+  def isSpace(c: Char): Boolean = {
+    " \t\r\n".indexOf(c) != -1
+  }
+
+  /**
+   * Split a string of potentially quoted arguments from the command line the way that a shell
+   * would do it to determine arguments to a command. For example, if the string is 'a "b c" d',
+   * then it would be parsed as three arguments: 'a', 'b c' and 'd'.
+   */
+  def splitCommandString(s: String): Seq[String] = {
+    val buf = new ArrayBuffer[String]
+    var inWord = false
+    var inSingleQuote = false
+    var inDoubleQuote = false
+    var curWord = new StringBuilder
+    def endWord() {
+      buf += curWord.toString
+      curWord.clear()
+    }
+    var i = 0
+    while (i < s.length) {
+      var nextChar = s.charAt(i)
+      if (inDoubleQuote) {
+        if (nextChar == '"') {
+          inDoubleQuote = false
+        } else if (nextChar == '\\') {
+          if (i < s.length - 1) {
+            // Append the next character directly, because only " and \ may be escaped in
+            // double quotes after the shell's own expansion
+            curWord.append(s.charAt(i + 1))
+            i += 1
+          }
+        } else {
+          curWord.append(nextChar)
+        }
+      } else if (inSingleQuote) {
+        if (nextChar == '\'') {
+          inSingleQuote = false
+        } else {
+          curWord.append(nextChar)
+        }
+        // Backslashes are not treated specially in single quotes
+      } else if (nextChar == '"') {
+        inWord = true
+        inDoubleQuote = true
+      } else if (nextChar == '\'') {
+        inWord = true
+        inSingleQuote = true
+      } else if (!isSpace(nextChar)) {
+        curWord.append(nextChar)
+        inWord = true
+      } else if (inWord && isSpace(nextChar)) {
+        endWord()
+        inWord = false
+      }
+      i += 1
+    }
+    if (inWord || inDoubleQuote || inSingleQuote) {
+      endWord()
+    }
+    return buf
+  }
+
+ /* Calculates 'x' modulo 'mod', taking the sign of x into account:
+  * if 'x' is negative, then 'x' % 'mod' is negative too,
+  * so the function returns (x % mod) + mod in that case.
+  */
+  def nonNegativeMod(x: Int, mod: Int): Int = {
+    val rawMod = x % mod
+    rawMod + (if (rawMod < 0) mod else 0)
+  }
+}
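
Several of the string helpers above are easy to sanity-check in isolation. The sketch below is a hypothetical scratch object (again inside org.apache.spark, since Utils is private[spark]) that exercises a few of the methods defined in this file; expected outputs are noted in comments.

    package org.apache.spark.util

    // Hypothetical demo, not part of the commit.
    object UtilsDemo {
      def main(args: Array[String]) {
        println(Utils.memoryStringToMb("1g"))                  // 1024
        println(Utils.bytesToString(5L * 1024 * 1024))         // 5.0 MB
        println(Utils.splitCommandString("a \"b c\" d"))       // ArrayBuffer(a, b c, d)
        println(Utils.parseHostPort("host.example.com:7077"))  // (host.example.com,7077)
        println(Utils.nonNegativeMod(-3, 5))                   // 2
      }
    }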

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
index 23b14f4..d9103ae 100644
--- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -22,6 +22,7 @@ import java.io.File
 import org.apache.spark.rdd._
 import org.apache.spark.SparkContext._
 import storage.StorageLevel
+import org.apache.spark.util.Utils
 
 class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
   initLogging()

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/test/scala/org/apache/spark/ClosureCleanerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ClosureCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ClosureCleanerSuite.scala
deleted file mode 100644
index 8494899..0000000
--- a/core/src/test/scala/org/apache/spark/ClosureCleanerSuite.scala
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import java.io.NotSerializableException
-
-import org.scalatest.FunSuite
-import org.apache.spark.LocalSparkContext._
-import SparkContext._
-
-class ClosureCleanerSuite extends FunSuite {
-  test("closures inside an object") {
-    assert(TestObject.run() === 30) // 6 + 7 + 8 + 9
-  }
-
-  test("closures inside a class") {
-    val obj = new TestClass
-    assert(obj.run() === 30) // 6 + 7 + 8 + 9
-  }
-
-  test("closures inside a class with no default constructor") {
-    val obj = new TestClassWithoutDefaultConstructor(5)
-    assert(obj.run() === 30) // 6 + 7 + 8 + 9
-  }
-
-  test("closures that don't use fields of the outer class") {
-    val obj = new TestClassWithoutFieldAccess
-    assert(obj.run() === 30) // 6 + 7 + 8 + 9
-  }
-
-  test("nested closures inside an object") {
-    assert(TestObjectWithNesting.run() === 96) // 4 * (1+2+3+4) + 4 * (1+2+3+4) + 16 * 1
-  }
-
-  test("nested closures inside a class") {
-    val obj = new TestClassWithNesting(1)
-    assert(obj.run() === 96) // 4 * (1+2+3+4) + 4 * (1+2+3+4) + 16 * 1
-  }
-}
-
-// A non-serializable class we create in closures to make sure that we aren't
-// keeping references to unneeded variables from our outer closures.
-class NonSerializable {}
-
-object TestObject {
-  def run(): Int = {
-    var nonSer = new NonSerializable
-    var x = 5
-    return withSpark(new SparkContext("local", "test")) { sc =>
-      val nums = sc.parallelize(Array(1, 2, 3, 4))
-      nums.map(_ + x).reduce(_ + _)
-    }
-  }
-}
-
-class TestClass extends Serializable {
-  var x = 5
-  
-  def getX = x
-
-  def run(): Int = {
-    var nonSer = new NonSerializable
-    return withSpark(new SparkContext("local", "test")) { sc =>
-      val nums = sc.parallelize(Array(1, 2, 3, 4))
-      nums.map(_ + getX).reduce(_ + _)
-    }
-  }
-}
-
-class TestClassWithoutDefaultConstructor(x: Int) extends Serializable {
-  def getX = x
-
-  def run(): Int = {
-    var nonSer = new NonSerializable
-    return withSpark(new SparkContext("local", "test")) { sc =>
-      val nums = sc.parallelize(Array(1, 2, 3, 4))
-      nums.map(_ + getX).reduce(_ + _)
-    }
-  }
-}
-
-// This class is not serializable, but we aren't using any of its fields in our
-// closures, so they won't have a $outer pointing to it and should still work.
-class TestClassWithoutFieldAccess {
-  var nonSer = new NonSerializable
-
-  def run(): Int = {
-    var nonSer2 = new NonSerializable
-    var x = 5
-    return withSpark(new SparkContext("local", "test")) { sc =>
-      val nums = sc.parallelize(Array(1, 2, 3, 4))
-      nums.map(_ + x).reduce(_ + _)
-    }
-  }
-}
-
-
-object TestObjectWithNesting {
-  def run(): Int = {
-    var nonSer = new NonSerializable
-    var answer = 0
-    return withSpark(new SparkContext("local", "test")) { sc =>
-      val nums = sc.parallelize(Array(1, 2, 3, 4))
-      var y = 1
-      for (i <- 1 to 4) {
-        var nonSer2 = new NonSerializable
-        var x = i
-        answer += nums.map(_ + x + y).reduce(_ + _)
-      }
-      answer
-    }
-  }
-}
-
-class TestClassWithNesting(val y: Int) extends Serializable {
-  def getY = y
-
-  def run(): Int = {
-    var nonSer = new NonSerializable
-    var answer = 0
-    return withSpark(new SparkContext("local", "test")) { sc =>
-      val nums = sc.parallelize(Array(1, 2, 3, 4))
-      for (i <- 1 to 4) {
-        var nonSer2 = new NonSerializable
-        var x = i
-        answer += nums.map(_ + x + getY).reduce(_ + _)
-      }
-      answer
-    }
-  }
-}
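
For context, the suite deleted above checks that Spark's closure cleaning keeps unneeded objects (such as the NonSerializable helper it defines) out of serialized task closures. A minimal standalone sketch of that pattern, reusing the LocalSparkContext.withSpark helper the deleted file imports (so it assumes the test classpath; the object name is illustrative, and NonSerializable is imported from the post-move location visible in the FailureSuite change further down):

    import org.apache.spark.SparkContext
    import org.apache.spark.LocalSparkContext._
    import org.apache.spark.util.NonSerializable

    object ClosureCaptureSketch {
      def run(): Int = {
        val nonSer = new NonSerializable  // never referenced inside the closure below
        val x = 5
        withSpark(new SparkContext("local", "sketch")) { sc =>
          // Only `x` is needed by the task; if `nonSer` were dragged along,
          // task serialization would fail, which is what the deleted tests guard against.
          sc.parallelize(Array(1, 2, 3, 4)).map(_ + x).reduce(_ + _)  // 6 + 7 + 8 + 9 = 30
        }
      }
    }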

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/test/scala/org/apache/spark/DriverSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/DriverSuite.scala b/core/src/test/scala/org/apache/spark/DriverSuite.scala
index b08aad1..01a72d8 100644
--- a/core/src/test/scala/org/apache/spark/DriverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DriverSuite.scala
@@ -26,6 +26,7 @@ import org.scalatest.FunSuite
 import org.scalatest.concurrent.Timeouts
 import org.scalatest.prop.TableDrivenPropertyChecks._
 import org.scalatest.time.SpanSugar._
+import org.apache.spark.util.Utils
 
 class DriverSuite extends FunSuite with Timeouts {
   test("driver should exit after finishing") {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/test/scala/org/apache/spark/FailureSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FailureSuite.scala b/core/src/test/scala/org/apache/spark/FailureSuite.scala
index ee89a7a..af448fc 100644
--- a/core/src/test/scala/org/apache/spark/FailureSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FailureSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark
 import org.scalatest.FunSuite
 
 import SparkContext._
+import org.apache.spark.util.NonSerializable
 
 // Common state shared by FailureSuite-launched tasks. We use a global object
 // for this because any local variables used in the task closures will rightfully
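
With NonSerializable now living in org.apache.spark.util, here is a minimal sketch (not taken from this suite; the names and the generic catch are illustrative) of the failure mode it is used to provoke:

    import org.apache.spark.SparkContext
    import org.apache.spark.util.NonSerializable

    object NonSerializableClosureSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "sketch")
        val a = new NonSerializable
        try {
          // Referencing `a` captures the non-serializable object in the task closure;
          // the suite asserts that running such a job fails rather than silently succeeding.
          sc.parallelize(1 to 10, 2).map(x => a.toString + x).collect()
        } catch {
          case e: Exception => println("job failed as expected: " + e)
        } finally {
          sc.stop()
        }
      }
    }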

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/test/scala/org/apache/spark/KryoSerializerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/KryoSerializerSuite.scala
deleted file mode 100644
index d7b23c9..0000000
--- a/core/src/test/scala/org/apache/spark/KryoSerializerSuite.scala
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import scala.collection.mutable
-
-import org.scalatest.FunSuite
-import com.esotericsoftware.kryo._
-
-import KryoTest._
-
-class KryoSerializerSuite extends FunSuite with SharedSparkContext {
-  test("basic types") {
-    val ser = (new KryoSerializer).newInstance()
-    def check[T](t: T) {
-      assert(ser.deserialize[T](ser.serialize(t)) === t)
-    }
-    check(1)
-    check(1L)
-    check(1.0f)
-    check(1.0)
-    check(1.toByte)
-    check(1.toShort)
-    check("")
-    check("hello")
-    check(Integer.MAX_VALUE)
-    check(Integer.MIN_VALUE)
-    check(java.lang.Long.MAX_VALUE)
-    check(java.lang.Long.MIN_VALUE)
-    check[String](null)
-    check(Array(1, 2, 3))
-    check(Array(1L, 2L, 3L))
-    check(Array(1.0, 2.0, 3.0))
-    check(Array(1.0f, 2.9f, 3.9f))
-    check(Array("aaa", "bbb", "ccc"))
-    check(Array("aaa", "bbb", null))
-    check(Array(true, false, true))
-    check(Array('a', 'b', 'c'))
-    check(Array[Int]())
-    check(Array(Array("1", "2"), Array("1", "2", "3", "4")))
-  }
-
-  test("pairs") {
-    val ser = (new KryoSerializer).newInstance()
-    def check[T](t: T) {
-      assert(ser.deserialize[T](ser.serialize(t)) === t)
-    }
-    check((1, 1))
-    check((1, 1L))
-    check((1L, 1))
-    check((1L,  1L))
-    check((1.0, 1))
-    check((1, 1.0))
-    check((1.0, 1.0))
-    check((1.0, 1L))
-    check((1L, 1.0))
-    check((1.0, 1L))
-    check(("x", 1))
-    check(("x", 1.0))
-    check(("x", 1L))
-    check((1, "x"))
-    check((1.0, "x"))
-    check((1L, "x"))
-    check(("x", "x"))
-  }
-
-  test("Scala data structures") {
-    val ser = (new KryoSerializer).newInstance()
-    def check[T](t: T) {
-      assert(ser.deserialize[T](ser.serialize(t)) === t)
-    }
-    check(List[Int]())
-    check(List[Int](1, 2, 3))
-    check(List[String]())
-    check(List[String]("x", "y", "z"))
-    check(None)
-    check(Some(1))
-    check(Some("hi"))
-    check(mutable.ArrayBuffer(1, 2, 3))
-    check(mutable.ArrayBuffer("1", "2", "3"))
-    check(mutable.Map())
-    check(mutable.Map(1 -> "one", 2 -> "two"))
-    check(mutable.Map("one" -> 1, "two" -> 2))
-    check(mutable.HashMap(1 -> "one", 2 -> "two"))
-    check(mutable.HashMap("one" -> 1, "two" -> 2))
-    check(List(Some(mutable.HashMap(1->1, 2->2)), None, Some(mutable.HashMap(3->4))))
-    check(List(mutable.HashMap("one" -> 1, "two" -> 2),mutable.HashMap(1->"one",2->"two",3->"three")))
-  }
-
-  test("custom registrator") {
-    import KryoTest._
-    System.setProperty("spark.kryo.registrator", classOf[MyRegistrator].getName)
-
-    val ser = (new KryoSerializer).newInstance()
-    def check[T](t: T) {
-      assert(ser.deserialize[T](ser.serialize(t)) === t)
-    }
-
-    check(CaseClass(17, "hello"))
-
-    val c1 = new ClassWithNoArgConstructor
-    c1.x = 32
-    check(c1)
-
-    val c2 = new ClassWithoutNoArgConstructor(47)
-    check(c2)
-
-    val hashMap = new java.util.HashMap[String, String]
-    hashMap.put("foo", "bar")
-    check(hashMap)
-
-    System.clearProperty("spark.kryo.registrator")
-  }
-
-  test("kryo with collect") {
-    val control = 1 :: 2 :: Nil
-    val result = sc.parallelize(control, 2).map(new ClassWithoutNoArgConstructor(_)).collect().map(_.x)
-    assert(control === result.toSeq)
-  }
-
-  test("kryo with parallelize") {
-    val control = 1 :: 2 :: Nil
-    val result = sc.parallelize(control.map(new ClassWithoutNoArgConstructor(_))).map(_.x).collect()
-    assert (control === result.toSeq)
-  }
-
-  test("kryo with parallelize for specialized tuples") {
-    assert (sc.parallelize( Array((1, 11), (2, 22), (3, 33)) ).count === 3)
-  }
-
-  test("kryo with parallelize for primitive arrays") {
-    assert (sc.parallelize( Array(1, 2, 3) ).count === 3)
-  }
-
-  test("kryo with collect for specialized tuples") {
-    assert (sc.parallelize( Array((1, 11), (2, 22), (3, 33)) ).collect().head === (1, 11))
-  }
-
-  test("kryo with reduce") {
-    val control = 1 :: 2 :: Nil
-    val result = sc.parallelize(control, 2).map(new ClassWithoutNoArgConstructor(_))
-        .reduce((t1, t2) => new ClassWithoutNoArgConstructor(t1.x + t2.x)).x
-    assert(control.sum === result)
-  }
-
-  // TODO: this still doesn't work
-  ignore("kryo with fold") {
-    val control = 1 :: 2 :: Nil
-    val result = sc.parallelize(control, 2).map(new ClassWithoutNoArgConstructor(_))
-        .fold(new ClassWithoutNoArgConstructor(10))((t1, t2) => new ClassWithoutNoArgConstructor(t1.x + t2.x)).x
-    assert(10 + control.sum === result)
-  }
-
-  override def beforeAll() {
-    System.setProperty("spark.serializer", "org.apache.spark.KryoSerializer")
-    System.setProperty("spark.kryo.registrator", classOf[MyRegistrator].getName)
-    super.beforeAll()
-  }
-
-  override def afterAll() {
-    super.afterAll()
-    System.clearProperty("spark.kryo.registrator")
-    System.clearProperty("spark.serializer")
-  }
-}
-
-object KryoTest {
-  case class CaseClass(i: Int, s: String) {}
-
-  class ClassWithNoArgConstructor {
-    var x: Int = 0
-    override def equals(other: Any) = other match {
-      case c: ClassWithNoArgConstructor => x == c.x
-      case _ => false
-    }
-  }
-
-  class ClassWithoutNoArgConstructor(val x: Int) {
-    override def equals(other: Any) = other match {
-      case c: ClassWithoutNoArgConstructor => x == c.x
-      case _ => false
-    }
-  }
-
-  class MyRegistrator extends KryoRegistrator {
-    override def registerClasses(k: Kryo) {
-      k.register(classOf[CaseClass])
-      k.register(classOf[ClassWithNoArgConstructor])
-      k.register(classOf[ClassWithoutNoArgConstructor])
-      k.register(classOf[java.util.HashMap[_, _]])
-    }
-  }
-}
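
The beforeAll()/afterAll() hooks above show the two system properties involved. A minimal sketch of enabling Kryo with a custom registrator in an application, assuming the pre-move package layout used by the deleted file (class names here are illustrative, and these paths may change elsewhere in this commit series):

    import com.esotericsoftware.kryo.Kryo
    import org.apache.spark.{KryoRegistrator, SparkContext}

    class Point(val x: Int, val y: Int)

    class PointRegistrator extends KryoRegistrator {
      override def registerClasses(k: Kryo) {
        k.register(classOf[Point])
      }
    }

    object KryoConfigSketch {
      def main(args: Array[String]) {
        // The same two properties the deleted suite sets in beforeAll().
        System.setProperty("spark.serializer", "org.apache.spark.KryoSerializer")
        System.setProperty("spark.kryo.registrator", classOf[PointRegistrator].getName)
        val sc = new SparkContext("local", "kryo-sketch")
        val total = sc.parallelize(1 to 4).map(i => new Point(i, i)).map(_.x).reduce(_ + _)
        println(total)  // 10
        sc.stop()
      }
    }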

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/test/scala/org/apache/spark/PairRDDFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/PairRDDFunctionsSuite.scala
deleted file mode 100644
index f79752b..0000000
--- a/core/src/test/scala/org/apache/spark/PairRDDFunctionsSuite.scala
+++ /dev/null
@@ -1,299 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashSet
-
-import org.scalatest.FunSuite
-
-import com.google.common.io.Files
-import org.apache.spark.SparkContext._
-
-
-class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
-  test("groupByKey") {
-    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)))
-    val groups = pairs.groupByKey().collect()
-    assert(groups.size === 2)
-    val valuesFor1 = groups.find(_._1 == 1).get._2
-    assert(valuesFor1.toList.sorted === List(1, 2, 3))
-    val valuesFor2 = groups.find(_._1 == 2).get._2
-    assert(valuesFor2.toList.sorted === List(1))
-  }
-
-  test("groupByKey with duplicates") {
-    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
-    val groups = pairs.groupByKey().collect()
-    assert(groups.size === 2)
-    val valuesFor1 = groups.find(_._1 == 1).get._2
-    assert(valuesFor1.toList.sorted === List(1, 1, 2, 3))
-    val valuesFor2 = groups.find(_._1 == 2).get._2
-    assert(valuesFor2.toList.sorted === List(1))
-  }
-
-  test("groupByKey with negative key hash codes") {
-    val pairs = sc.parallelize(Array((-1, 1), (-1, 2), (-1, 3), (2, 1)))
-    val groups = pairs.groupByKey().collect()
-    assert(groups.size === 2)
-    val valuesForMinus1 = groups.find(_._1 == -1).get._2
-    assert(valuesForMinus1.toList.sorted === List(1, 2, 3))
-    val valuesFor2 = groups.find(_._1 == 2).get._2
-    assert(valuesFor2.toList.sorted === List(1))
-  }
-
-  test("groupByKey with many output partitions") {
-    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)))
-    val groups = pairs.groupByKey(10).collect()
-    assert(groups.size === 2)
-    val valuesFor1 = groups.find(_._1 == 1).get._2
-    assert(valuesFor1.toList.sorted === List(1, 2, 3))
-    val valuesFor2 = groups.find(_._1 == 2).get._2
-    assert(valuesFor2.toList.sorted === List(1))
-  }
-
-  test("reduceByKey") {
-    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
-    val sums = pairs.reduceByKey(_+_).collect()
-    assert(sums.toSet === Set((1, 7), (2, 1)))
-  }
-
-  test("reduceByKey with collectAsMap") {
-    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
-    val sums = pairs.reduceByKey(_+_).collectAsMap()
-    assert(sums.size === 2)
-    assert(sums(1) === 7)
-    assert(sums(2) === 1)
-  }
-
-  test("reduceByKey with many output partitons") {
-    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
-    val sums = pairs.reduceByKey(_+_, 10).collect()
-    assert(sums.toSet === Set((1, 7), (2, 1)))
-  }
-
-  test("reduceByKey with partitioner") {
-    val p = new Partitioner() {
-      def numPartitions = 2
-      def getPartition(key: Any) = key.asInstanceOf[Int]
-    }
-    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 1), (0, 1))).partitionBy(p)
-    val sums = pairs.reduceByKey(_+_)
-    assert(sums.collect().toSet === Set((1, 4), (0, 1)))
-    assert(sums.partitioner === Some(p))
-    // count the dependencies to make sure there is only 1 ShuffledRDD
-    val deps = new HashSet[RDD[_]]()
-    def visit(r: RDD[_]) {
-      for (dep <- r.dependencies) {
-        deps += dep.rdd
-        visit(dep.rdd)
-      }
-    }
-    visit(sums)
-    assert(deps.size === 2) // ShuffledRDD, ParallelCollection
-  }
-
-  test("join") {
-    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
-    val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
-    val joined = rdd1.join(rdd2).collect()
-    assert(joined.size === 4)
-    assert(joined.toSet === Set(
-      (1, (1, 'x')),
-      (1, (2, 'x')),
-      (2, (1, 'y')),
-      (2, (1, 'z'))
-    ))
-  }
-
-  test("join all-to-all") {
-    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (1, 3)))
-    val rdd2 = sc.parallelize(Array((1, 'x'), (1, 'y')))
-    val joined = rdd1.join(rdd2).collect()
-    assert(joined.size === 6)
-    assert(joined.toSet === Set(
-      (1, (1, 'x')),
-      (1, (1, 'y')),
-      (1, (2, 'x')),
-      (1, (2, 'y')),
-      (1, (3, 'x')),
-      (1, (3, 'y'))
-    ))
-  }
-
-  test("leftOuterJoin") {
-    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
-    val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
-    val joined = rdd1.leftOuterJoin(rdd2).collect()
-    assert(joined.size === 5)
-    assert(joined.toSet === Set(
-      (1, (1, Some('x'))),
-      (1, (2, Some('x'))),
-      (2, (1, Some('y'))),
-      (2, (1, Some('z'))),
-      (3, (1, None))
-    ))
-  }
-
-  test("rightOuterJoin") {
-    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
-    val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
-    val joined = rdd1.rightOuterJoin(rdd2).collect()
-    assert(joined.size === 5)
-    assert(joined.toSet === Set(
-      (1, (Some(1), 'x')),
-      (1, (Some(2), 'x')),
-      (2, (Some(1), 'y')),
-      (2, (Some(1), 'z')),
-      (4, (None, 'w'))
-    ))
-  }
-
-  test("join with no matches") {
-    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
-    val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w')))
-    val joined = rdd1.join(rdd2).collect()
-    assert(joined.size === 0)
-  }
-
-  test("join with many output partitions") {
-    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
-    val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
-    val joined = rdd1.join(rdd2, 10).collect()
-    assert(joined.size === 4)
-    assert(joined.toSet === Set(
-      (1, (1, 'x')),
-      (1, (2, 'x')),
-      (2, (1, 'y')),
-      (2, (1, 'z'))
-    ))
-  }
-
-  test("groupWith") {
-    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
-    val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
-    val joined = rdd1.groupWith(rdd2).collect()
-    assert(joined.size === 4)
-    assert(joined.toSet === Set(
-      (1, (ArrayBuffer(1, 2), ArrayBuffer('x'))),
-      (2, (ArrayBuffer(1), ArrayBuffer('y', 'z'))),
-      (3, (ArrayBuffer(1), ArrayBuffer())),
-      (4, (ArrayBuffer(), ArrayBuffer('w')))
-    ))
-  }
-
-  test("zero-partition RDD") {
-    val emptyDir = Files.createTempDir()
-    val file = sc.textFile(emptyDir.getAbsolutePath)
-    assert(file.partitions.size == 0)
-    assert(file.collect().toList === Nil)
-    // Test that a shuffle on the file works, because this used to be a bug
-    assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil)
-  }
-
-  test("keys and values") {
-    val rdd = sc.parallelize(Array((1, "a"), (2, "b")))
-    assert(rdd.keys.collect().toList === List(1, 2))
-    assert(rdd.values.collect().toList === List("a", "b"))
-  }
-
-  test("default partitioner uses partition size") {
-    // specify 2000 partitions
-    val a = sc.makeRDD(Array(1, 2, 3, 4), 2000)
-    // do a map, which loses the partitioner
-    val b = a.map(a => (a, (a * 2).toString))
-    // then a group by, and see we didn't revert to 2 partitions
-    val c = b.groupByKey()
-    assert(c.partitions.size === 2000)
-  }
-
-  test("default partitioner uses largest partitioner") {
-    val a = sc.makeRDD(Array((1, "a"), (2, "b")), 2)
-    val b = sc.makeRDD(Array((1, "a"), (2, "b")), 2000)
-    val c = a.join(b)
-    assert(c.partitions.size === 2000)
-  }
-
-  test("subtract") {
-    val a = sc.parallelize(Array(1, 2, 3), 2)
-    val b = sc.parallelize(Array(2, 3, 4), 4)
-    val c = a.subtract(b)
-    assert(c.collect().toSet === Set(1))
-    assert(c.partitions.size === a.partitions.size)
-  }
-
-  test("subtract with narrow dependency") {
-    // use a deterministic partitioner
-    val p = new Partitioner() {
-      def numPartitions = 5
-      def getPartition(key: Any) = key.asInstanceOf[Int]
-    }
-    // partitionBy so we have a narrow dependency
-    val a = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c"))).partitionBy(p)
-    // more partitions/no partitioner so a shuffle dependency
-    val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4)
-    val c = a.subtract(b)
-    assert(c.collect().toSet === Set((1, "a"), (3, "c")))
-    // Ideally we could keep the original partitioner...
-    assert(c.partitioner === None)
-  }
-
-  test("subtractByKey") {
-    val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 2)
-    val b = sc.parallelize(Array((2, 20), (3, 30), (4, 40)), 4)
-    val c = a.subtractByKey(b)
-    assert(c.collect().toSet === Set((1, "a"), (1, "a")))
-    assert(c.partitions.size === a.partitions.size)
-  }
-
-  test("subtractByKey with narrow dependency") {
-    // use a deterministic partitioner
-    val p = new Partitioner() {
-      def numPartitions = 5
-      def getPartition(key: Any) = key.asInstanceOf[Int]
-    }
-    // partitionBy so we have a narrow dependency
-    val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c"))).partitionBy(p)
-    // more partitions/no partitioner so a shuffle dependency
-    val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4)
-    val c = a.subtractByKey(b)
-    assert(c.collect().toSet === Set((1, "a"), (1, "a")))
-    assert(c.partitioner.get === p)
-  }
-
-  test("foldByKey") {
-    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
-    val sums = pairs.foldByKey(0)(_+_).collect()
-    assert(sums.toSet === Set((1, 7), (2, 1)))
-  }
-
-  test("foldByKey with mutable result type") {
-    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
-    val bufs = pairs.mapValues(v => ArrayBuffer(v)).cache()
-    // Fold the values using in-place mutation
-    val sums = bufs.foldByKey(new ArrayBuffer[Int])(_ ++= _).collect()
-    assert(sums.toSet === Set((1, ArrayBuffer(1, 2, 3, 1)), (2, ArrayBuffer(1))))
-    // Check that the mutable objects in the original RDD were not changed
-    assert(bufs.collect().toSet === Set(
-      (1, ArrayBuffer(1)),
-      (1, ArrayBuffer(2)),
-      (1, ArrayBuffer(3)),
-      (1, ArrayBuffer(1)),
-      (2, ArrayBuffer(1))))
-  }
-}
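
Several tests above define throwaway Partitioner instances inline. A standalone sketch of the same pattern as a named class (the class name is illustrative):

    import org.apache.spark.Partitioner

    // Deterministically routes an Int key to partition (key mod numPartitions),
    // kept non-negative so it is a valid partition index for negative keys too.
    class ModPartitioner(parts: Int) extends Partitioner {
      def numPartitions = parts
      def getPartition(key: Any) = {
        val k = key.asInstanceOf[Int]
        ((k % parts) + parts) % parts
      }
    }

It is used the same way as the inline partitioners above, e.g. pairs.partitionBy(new ModPartitioner(5)).reduceByKey(_ + _), which keeps the partitioner on the result just as the "reduceByKey with partitioner" test asserts.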

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/test/scala/org/apache/spark/PartitionPruningRDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/PartitionPruningRDDSuite.scala b/core/src/test/scala/org/apache/spark/PartitionPruningRDDSuite.scala
index adbe805..5a18dd1 100644
--- a/core/src/test/scala/org/apache/spark/PartitionPruningRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PartitionPruningRDDSuite.scala
@@ -2,7 +2,7 @@ package org.apache.spark
 
 import org.scalatest.FunSuite
 import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.PartitionPruningRDD
+import org.apache.spark.rdd.{RDD, PartitionPruningRDD}
 
 
 class PartitionPruningRDDSuite extends FunSuite with SharedSparkContext {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
index 7669cf6..7d93891 100644
--- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -17,11 +17,14 @@
 
 package org.apache.spark
 
-import org.scalatest.FunSuite
+import scala.math.abs
 import scala.collection.mutable.ArrayBuffer
-import SparkContext._
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext._
 import org.apache.spark.util.StatCounter
-import scala.math.abs
+import org.apache.spark.rdd.RDD
 
 class PartitioningSuite extends FunSuite with SharedSparkContext {