Posted to commits@spark.apache.org by rx...@apache.org on 2016/01/07 05:50:34 UTC

spark git commit: [SPARK-7689] Remove TTL-based metadata cleaning in Spark 2.0

Repository: spark
Updated Branches:
  refs/heads/master 6b6d02be0 -> 8e19c7663


[SPARK-7689] Remove TTL-based metadata cleaning in Spark 2.0

This PR removes `spark.cleaner.ttl` and the associated TTL-based metadata cleaning code.

Now that we have the `ContextCleaner` and a timer to trigger periodic GCs, I don't think that `spark.cleaner.ttl` is necessary anymore. The TTL-based cleaning isn't enabled by default, isn't included in our end-to-end tests, and has been a source of user confusion when it is misconfigured. If the TTL is set too low, data that is still in use may be evicted or deleted, leading to hard-to-diagnose bugs.

For all of these reasons, I think that we should remove this functionality in Spark 2.0. Additional benefits of doing this include marginally reduced memory usage, since we no longer need to store timestamps in hashmaps, and a handful fewer threads.
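
As a rough standalone sketch of the replacement pattern (the same Guava `MapMaker().weakValues()` idiom the SparkContext diff below uses for `persistentRdds`; the object and value names here are illustrative, and Guava is assumed to be on the classpath):

    import java.util.concurrent.ConcurrentMap

    import scala.collection.JavaConverters._

    import com.google.common.collect.MapMaker

    object WeakValueMapSketch {
      def main(args: Array[String]): Unit = {
        // Values are held only by weak references: once nothing else points at a
        // value, the GC may drop its entry, so no timestamps and no cleanup timer
        // are needed.
        val map: ConcurrentMap[Int, Object] = new MapMaker().weakValues().makeMap[Int, Object]()
        val cache = map.asScala

        var value: Object = new Object
        cache.put(1, value)
        println(cache.get(1).isDefined)  // true while a strong reference exists

        value = null                     // drop the only strong reference
        System.gc()                      // best effort; the entry may now be collected
        println(cache.get(1))            // typically None after collection
      }
    }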

Author: Josh Rosen <jo...@databricks.com>

Closes #10534 from JoshRosen/remove-ttl-based-cleaning.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8e19c766
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8e19c766
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8e19c766

Branch: refs/heads/master
Commit: 8e19c7663a067d55b32af68d62da42c7cd5d6009
Parents: 6b6d02b
Author: Josh Rosen <jo...@databricks.com>
Authored: Wed Jan 6 20:50:31 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Jan 6 20:50:31 2016 -0800

----------------------------------------------------------------------
 .../org/apache/spark/MapOutputTracker.scala     |  25 +--
 .../scala/org/apache/spark/SparkContext.scala   |  21 +--
 .../shuffle/FileShuffleBlockResolver.scala      |  28 ++-
 .../org/apache/spark/storage/BlockManager.scala |  63 ++-----
 .../org/apache/spark/util/MetadataCleaner.scala | 110 ------------
 .../apache/spark/util/TimeStampedHashSet.scala  |  86 ----------
 .../util/TimeStampedWeakValueHashMap.scala      | 171 -------------------
 .../spark/util/TimeStampedHashMapSuite.scala    |  86 ----------
 docs/configuration.md                           |  11 --
 .../org/apache/spark/streaming/Checkpoint.scala |   3 +-
 .../spark/streaming/dstream/DStream.scala       |  14 +-
 .../spark/streaming/StreamingContextSuite.scala |  19 ++-
 12 files changed, 48 insertions(+), 589 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8e19c766/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 8670f70..1b59beb 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -18,7 +18,6 @@
 package org.apache.spark
 
 import java.io._
-import java.util.Arrays
 import java.util.concurrent.ConcurrentHashMap
 import java.util.zip.{GZIPInputStream, GZIPOutputStream}
 
@@ -267,8 +266,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
 }
 
 /**
- * MapOutputTracker for the driver. This uses TimeStampedHashMap to keep track of map
- * output information, which allows old output information based on a TTL.
+ * MapOutputTracker for the driver.
  */
 private[spark] class MapOutputTrackerMaster(conf: SparkConf)
   extends MapOutputTracker(conf) {
@@ -291,17 +289,10 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
   // can be read locally, but may lead to more delay in scheduling if those locations are busy.
   private val REDUCER_PREF_LOCS_FRACTION = 0.2
 
-  /**
-   * Timestamp based HashMap for storing mapStatuses and cached serialized statuses in the driver,
-   * so that statuses are dropped only by explicit de-registering or by TTL-based cleaning (if set).
-   * Other than these two scenarios, nothing should be dropped from this HashMap.
-   */
-  protected val mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]()
-  private val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]]()
-
-  // For cleaning up TimeStampedHashMaps
-  private val metadataCleaner =
-    new MetadataCleaner(MetadataCleanerType.MAP_OUTPUT_TRACKER, this.cleanup, conf)
+  // HashMaps for storing mapStatuses and cached serialized statuses in the driver.
+  // Statuses are dropped only by explicit de-registering.
+  protected val mapStatuses = new ConcurrentHashMap[Int, Array[MapStatus]]().asScala
+  private val cachedSerializedStatuses = new ConcurrentHashMap[Int, Array[Byte]]().asScala
 
   def registerShuffle(shuffleId: Int, numMaps: Int) {
     if (mapStatuses.put(shuffleId, new Array[MapStatus](numMaps)).isDefined) {
@@ -462,14 +453,8 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
     sendTracker(StopMapOutputTracker)
     mapStatuses.clear()
     trackerEndpoint = null
-    metadataCleaner.cancel()
     cachedSerializedStatuses.clear()
   }
-
-  private def cleanup(cleanupTime: Long) {
-    mapStatuses.clearOldValues(cleanupTime)
-    cachedSerializedStatuses.clearOldValues(cleanupTime)
-  }
 }
 
 /**
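
A rough standalone sketch of the `.asScala` idiom used above for `mapStatuses` (names here are illustrative): the wrapper is a live `scala.collection.concurrent.Map` view over the same thread-safe Java map, and its `put` returns an `Option`, which is what the `isDefined` check in `registerShuffle` relies on.

    import java.util.concurrent.ConcurrentHashMap

    import scala.collection.JavaConverters._

    object AsScalaConcurrentMapSketch {
      def main(args: Array[String]): Unit = {
        // Writes go through the Scala view but land in the underlying Java map.
        val mapStatuses = new ConcurrentHashMap[Int, Array[String]]().asScala

        println(mapStatuses.put(0, new Array[String](4)).isDefined)  // false: new shuffle id
        println(mapStatuses.put(0, new Array[String](4)).isDefined)  // true: already registered
      }
    }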

http://git-wip-us.apache.org/repos/asf/spark/blob/8e19c766/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 4a99c0b..98075ce 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -21,6 +21,7 @@ import java.io._
 import java.lang.reflect.Constructor
 import java.net.URI
 import java.util.{Arrays, Properties, UUID}
+import java.util.concurrent.ConcurrentMap
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}
 import java.util.UUID.randomUUID
 
@@ -32,6 +33,7 @@ import scala.language.implicitConversions
 import scala.reflect.{classTag, ClassTag}
 import scala.util.control.NonFatal
 
+import com.google.common.collect.MapMaker
 import org.apache.commons.lang.SerializationUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -199,7 +201,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   private var _eventLogDir: Option[URI] = None
   private var _eventLogCodec: Option[String] = None
   private var _env: SparkEnv = _
-  private var _metadataCleaner: MetadataCleaner = _
   private var _jobProgressListener: JobProgressListener = _
   private var _statusTracker: SparkStatusTracker = _
   private var _progressBar: Option[ConsoleProgressBar] = None
@@ -271,8 +272,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   private[spark] val addedJars = HashMap[String, Long]()
 
   // Keeps track of all persisted RDDs
-  private[spark] val persistentRdds = new TimeStampedWeakValueHashMap[Int, RDD[_]]
-  private[spark] def metadataCleaner: MetadataCleaner = _metadataCleaner
+  private[spark] val persistentRdds = {
+    val map: ConcurrentMap[Int, RDD[_]] = new MapMaker().weakValues().makeMap[Int, RDD[_]]()
+    map.asScala
+  }
   private[spark] def jobProgressListener: JobProgressListener = _jobProgressListener
 
   def statusTracker: SparkStatusTracker = _statusTracker
@@ -439,8 +442,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       _conf.set("spark.repl.class.uri", replUri)
     }
 
-    _metadataCleaner = new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, _conf)
-
     _statusTracker = new SparkStatusTracker(this)
 
     _progressBar =
@@ -1674,11 +1675,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
         env.metricsSystem.report()
       }
     }
-    if (metadataCleaner != null) {
-      Utils.tryLogNonFatalError {
-        metadataCleaner.cancel()
-      }
-    }
     Utils.tryLogNonFatalError {
       _cleaner.foreach(_.stop())
     }
@@ -2085,11 +2081,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     }
   }
 
-  /** Called by MetadataCleaner to clean up the persistentRdds map periodically */
-  private[spark] def cleanup(cleanupTime: Long) {
-    persistentRdds.clearOldValues(cleanupTime)
-  }
-
   // In order to prevent multiple SparkContexts from being active at the same time, mark this
   // context as having finished construction.
   // NOTE: this must be placed at the end of the SparkContext constructor.

http://git-wip-us.apache.org/repos/asf/spark/blob/8e19c766/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
index 7abcb29..294e16c 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.shuffle
 
-import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue}
 
 import scala.collection.JavaConverters._
 
@@ -27,7 +27,7 @@ import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.storage._
-import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils}
+import org.apache.spark.util.Utils
 
 /** A group of writers for a ShuffleMapTask, one writer per reducer. */
 private[spark] trait ShuffleWriterGroup {
@@ -63,10 +63,7 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)
     val completedMapTasks = new ConcurrentLinkedQueue[Int]()
   }
 
-  private val shuffleStates = new TimeStampedHashMap[ShuffleId, ShuffleState]
-
-  private val metadataCleaner =
-    new MetadataCleaner(MetadataCleanerType.SHUFFLE_BLOCK_MANAGER, this.cleanup, conf)
+  private val shuffleStates = new ConcurrentHashMap[ShuffleId, ShuffleState]
 
   /**
    * Get a ShuffleWriterGroup for the given map task, which will register it as complete
@@ -75,9 +72,12 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)
   def forMapTask(shuffleId: Int, mapId: Int, numReducers: Int, serializer: Serializer,
       writeMetrics: ShuffleWriteMetrics): ShuffleWriterGroup = {
     new ShuffleWriterGroup {
-      shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numReducers))
-      private val shuffleState = shuffleStates(shuffleId)
-
+      private val shuffleState: ShuffleState = {
+        // Note: we do _not_ want to just wrap this java ConcurrentHashMap into a Scala map and use
+        // .getOrElseUpdate() because that's actually NOT atomic.
+        shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numReducers))
+        shuffleStates.get(shuffleId)
+      }
       val openStartTime = System.nanoTime
       val serializerInstance = serializer.newInstance()
       val writers: Array[DiskBlockObjectWriter] = {
@@ -114,7 +114,7 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)
 
   /** Remove all the blocks / files related to a particular shuffle. */
   private def removeShuffleBlocks(shuffleId: ShuffleId): Boolean = {
-    shuffleStates.get(shuffleId) match {
+    Option(shuffleStates.get(shuffleId)) match {
       case Some(state) =>
         for (mapId <- state.completedMapTasks.asScala; reduceId <- 0 until state.numReducers) {
           val blockId = new ShuffleBlockId(shuffleId, mapId, reduceId)
@@ -131,11 +131,5 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)
     }
   }
 
-  private def cleanup(cleanupTime: Long) {
-    shuffleStates.clearOldValues(cleanupTime, (shuffleId, state) => removeShuffleBlocks(shuffleId))
-  }
-
-  override def stop() {
-    metadataCleaner.cancel()
-  }
+  override def stop(): Unit = {}
 }
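
A rough standalone sketch of the atomic get-or-create idiom noted in the comment above (the `ShuffleState` here is a stand-in for illustration, not Spark's class):

    import java.util.concurrent.ConcurrentHashMap

    object AtomicGetOrCreateSketch {
      final class ShuffleState(val numReducers: Int)

      private val shuffleStates = new ConcurrentHashMap[Int, ShuffleState]()

      // putIfAbsent is atomic on the Java map: whichever thread wins the race
      // installs its instance, every other thread's call is a no-op, and the
      // follow-up get observes the single canonical ShuffleState. By contrast,
      // getOrElseUpdate on a wrapped Scala view is a separate check and insert,
      // so two threads could briefly create two different states.
      def stateFor(shuffleId: Int, numReducers: Int): ShuffleState = {
        shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numReducers))
        shuffleStates.get(shuffleId)
      }
    }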

http://git-wip-us.apache.org/repos/asf/spark/blob/8e19c766/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 8caf9e5..5c80ac1 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -19,7 +19,9 @@ package org.apache.spark.storage
 
 import java.io._
 import java.nio.{ByteBuffer, MappedByteBuffer}
+import java.util.concurrent.ConcurrentHashMap
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.concurrent.duration._
@@ -75,7 +77,7 @@ private[spark] class BlockManager(
 
   val diskBlockManager = new DiskBlockManager(this, conf)
 
-  private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]
+  private val blockInfo = new ConcurrentHashMap[BlockId, BlockInfo]
 
   private val futureExecutionContext = ExecutionContext.fromExecutorService(
     ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128))
@@ -147,11 +149,6 @@ private[spark] class BlockManager(
   private var asyncReregisterTask: Future[Unit] = null
   private val asyncReregisterLock = new Object
 
-  private val metadataCleaner = new MetadataCleaner(
-    MetadataCleanerType.BLOCK_MANAGER, this.dropOldNonBroadcastBlocks, conf)
-  private val broadcastCleaner = new MetadataCleaner(
-    MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks, conf)
-
   // Field related to peer block managers that are necessary for block replication
   @volatile private var cachedPeers: Seq[BlockManagerId] = _
   private val peerFetchLock = new Object
@@ -232,7 +229,7 @@ private[spark] class BlockManager(
    */
   private def reportAllBlocks(): Unit = {
     logInfo(s"Reporting ${blockInfo.size} blocks to the master.")
-    for ((blockId, info) <- blockInfo) {
+    for ((blockId, info) <- blockInfo.asScala) {
       val status = getCurrentBlockStatus(blockId, info)
       if (!tryToReportBlockStatus(blockId, info, status)) {
         logError(s"Failed to report $blockId to master; giving up.")
@@ -313,7 +310,7 @@ private[spark] class BlockManager(
    * NOTE: This is mainly for testing, and it doesn't fetch information from external block store.
    */
   def getStatus(blockId: BlockId): Option[BlockStatus] = {
-    blockInfo.get(blockId).map { info =>
+    blockInfo.asScala.get(blockId).map { info =>
       val memSize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
       val diskSize = if (diskStore.contains(blockId)) diskStore.getSize(blockId) else 0L
       // Assume that block is not in external block store
@@ -327,7 +324,7 @@ private[spark] class BlockManager(
    * may not know of).
    */
   def getMatchingBlockIds(filter: BlockId => Boolean): Seq[BlockId] = {
-    (blockInfo.keys ++ diskBlockManager.getAllBlocks()).filter(filter).toSeq
+    (blockInfo.asScala.keys ++ diskBlockManager.getAllBlocks()).filter(filter).toSeq
   }
 
   /**
@@ -439,7 +436,7 @@ private[spark] class BlockManager(
   }
 
   private def doGetLocal(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
-    val info = blockInfo.get(blockId).orNull
+    val info = blockInfo.get(blockId)
     if (info != null) {
       info.synchronized {
         // Double check to make sure the block is still there. There is a small chance that the
@@ -447,7 +444,7 @@ private[spark] class BlockManager(
         // Note that this only checks metadata tracking. If user intentionally deleted the block
         // on disk or from off heap storage without using removeBlock, this conditional check will
         // still pass but eventually we will get an exception because we can't find the block.
-        if (blockInfo.get(blockId).isEmpty) {
+        if (blockInfo.asScala.get(blockId).isEmpty) {
           logWarning(s"Block $blockId had been removed")
           return None
         }
@@ -731,7 +728,7 @@ private[spark] class BlockManager(
     val putBlockInfo = {
       val tinfo = new BlockInfo(level, tellMaster)
       // Do atomically !
-      val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
+      val oldBlockOpt = Option(blockInfo.putIfAbsent(blockId, tinfo))
       if (oldBlockOpt.isDefined) {
         if (oldBlockOpt.get.waitForReady()) {
           logWarning(s"Block $blockId already exists on this machine; not re-adding it")
@@ -1032,7 +1029,7 @@ private[spark] class BlockManager(
       data: () => Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {
 
     logInfo(s"Dropping block $blockId from memory")
-    val info = blockInfo.get(blockId).orNull
+    val info = blockInfo.get(blockId)
 
     // If the block has not already been dropped
     if (info != null) {
@@ -1043,7 +1040,7 @@ private[spark] class BlockManager(
           // If we get here, the block write failed.
           logWarning(s"Block $blockId was marked as failure. Nothing to drop")
           return None
-        } else if (blockInfo.get(blockId).isEmpty) {
+        } else if (blockInfo.asScala.get(blockId).isEmpty) {
           logWarning(s"Block $blockId was already dropped.")
           return None
         }
@@ -1095,7 +1092,7 @@ private[spark] class BlockManager(
   def removeRdd(rddId: Int): Int = {
     // TODO: Avoid a linear scan by creating another mapping of RDD.id to blocks.
     logInfo(s"Removing RDD $rddId")
-    val blocksToRemove = blockInfo.keys.flatMap(_.asRDDId).filter(_.rddId == rddId)
+    val blocksToRemove = blockInfo.asScala.keys.flatMap(_.asRDDId).filter(_.rddId == rddId)
     blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster = false) }
     blocksToRemove.size
   }
@@ -1105,7 +1102,7 @@ private[spark] class BlockManager(
    */
   def removeBroadcast(broadcastId: Long, tellMaster: Boolean): Int = {
     logDebug(s"Removing broadcast $broadcastId")
-    val blocksToRemove = blockInfo.keys.collect {
+    val blocksToRemove = blockInfo.asScala.keys.collect {
       case bid @ BroadcastBlockId(`broadcastId`, _) => bid
     }
     blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster) }
@@ -1117,7 +1114,7 @@ private[spark] class BlockManager(
    */
   def removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit = {
     logDebug(s"Removing block $blockId")
-    val info = blockInfo.get(blockId).orNull
+    val info = blockInfo.get(blockId)
     if (info != null) {
       info.synchronized {
         // Removals are idempotent in disk store and memory store. At worst, we get a warning.
@@ -1141,36 +1138,6 @@ private[spark] class BlockManager(
     }
   }
 
-  private def dropOldNonBroadcastBlocks(cleanupTime: Long): Unit = {
-    logInfo(s"Dropping non broadcast blocks older than $cleanupTime")
-    dropOldBlocks(cleanupTime, !_.isBroadcast)
-  }
-
-  private def dropOldBroadcastBlocks(cleanupTime: Long): Unit = {
-    logInfo(s"Dropping broadcast blocks older than $cleanupTime")
-    dropOldBlocks(cleanupTime, _.isBroadcast)
-  }
-
-  private def dropOldBlocks(cleanupTime: Long, shouldDrop: (BlockId => Boolean)): Unit = {
-    val iterator = blockInfo.getEntrySet.iterator
-    while (iterator.hasNext) {
-      val entry = iterator.next()
-      val (id, info, time) = (entry.getKey, entry.getValue.value, entry.getValue.timestamp)
-      if (time < cleanupTime && shouldDrop(id)) {
-        info.synchronized {
-          val level = info.level
-          if (level.useMemory) { memoryStore.remove(id) }
-          if (level.useDisk) { diskStore.remove(id) }
-          if (level.useOffHeap) { externalBlockStore.remove(id) }
-          iterator.remove()
-          logInfo(s"Dropped block $id")
-        }
-        val status = getCurrentBlockStatus(id, info)
-        reportBlockStatus(id, info, status)
-      }
-    }
-  }
-
   private def shouldCompress(blockId: BlockId): Boolean = {
     blockId match {
       case _: ShuffleBlockId => compressShuffle
@@ -1248,8 +1215,6 @@ private[spark] class BlockManager(
     if (externalBlockStoreInitialized) {
       externalBlockStore.clear()
     }
-    metadataCleaner.cancel()
-    broadcastCleaner.cancel()
     futureExecutionContext.shutdownNow()
     logInfo("BlockManager stopped")
   }
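
A rough standalone sketch of the access patterns used above once `blockInfo` is a plain `ConcurrentHashMap` (names here are illustrative): a raw `get` that returns the value or null, an `Option(...)` wrapper where an `Option` is wanted, and an `.asScala` view for iteration.

    import java.util.concurrent.ConcurrentHashMap

    import scala.collection.JavaConverters._

    object NullVsOptionSketch {
      def main(args: Array[String]): Unit = {
        val blockInfo = new ConcurrentHashMap[String, String]()
        blockInfo.put("rdd_0_0", "MEMORY_ONLY")

        // The raw Java get returns the value or null, so callers either
        // null-check directly ...
        val info = blockInfo.get("rdd_0_1")
        if (info != null) println(s"found $info")

        // ... or wrap the result when an Option is more convenient.
        println(Option(blockInfo.get("rdd_0_1")))  // None

        // Iteration and key scans go through a read-through .asScala view.
        for ((id, status) <- blockInfo.asScala) println(s"$id -> $status")
      }
    }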

http://git-wip-us.apache.org/repos/asf/spark/blob/8e19c766/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
deleted file mode 100644
index a8bbad0..0000000
--- a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import java.util.{Timer, TimerTask}
-
-import org.apache.spark.{Logging, SparkConf}
-
-/**
- * Runs a timer task to periodically clean up metadata (e.g. old files or hashtable entries)
- */
-private[spark] class MetadataCleaner(
-    cleanerType: MetadataCleanerType.MetadataCleanerType,
-    cleanupFunc: (Long) => Unit,
-    conf: SparkConf)
-  extends Logging
-{
-  val name = cleanerType.toString
-
-  private val delaySeconds = MetadataCleaner.getDelaySeconds(conf, cleanerType)
-  private val periodSeconds = math.max(10, delaySeconds / 10)
-  private val timer = new Timer(name + " cleanup timer", true)
-
-
-  private val task = new TimerTask {
-    override def run() {
-      try {
-        cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000))
-        logInfo("Ran metadata cleaner for " + name)
-      } catch {
-        case e: Exception => logError("Error running cleanup task for " + name, e)
-      }
-    }
-  }
-
-  if (delaySeconds > 0) {
-    logDebug(
-      "Starting metadata cleaner for " + name + " with delay of " + delaySeconds + " seconds " +
-      "and period of " + periodSeconds + " secs")
-    timer.schedule(task, delaySeconds * 1000, periodSeconds * 1000)
-  }
-
-  def cancel() {
-    timer.cancel()
-  }
-}
-
-private[spark] object MetadataCleanerType extends Enumeration {
-
-  val MAP_OUTPUT_TRACKER, SPARK_CONTEXT, HTTP_BROADCAST, BLOCK_MANAGER,
-  SHUFFLE_BLOCK_MANAGER, BROADCAST_VARS = Value
-
-  type MetadataCleanerType = Value
-
-  def systemProperty(which: MetadataCleanerType.MetadataCleanerType): String = {
-    "spark.cleaner.ttl." + which.toString
-  }
-}
-
-// TODO: This mutates a Conf to set properties right now, which is kind of ugly when used in the
-// initialization of StreamingContext. It's okay for users trying to configure stuff themselves.
-private[spark] object MetadataCleaner {
-  def getDelaySeconds(conf: SparkConf): Int = {
-    conf.getTimeAsSeconds("spark.cleaner.ttl", "-1").toInt
-  }
-
-  def getDelaySeconds(
-      conf: SparkConf,
-      cleanerType: MetadataCleanerType.MetadataCleanerType): Int = {
-    conf.get(MetadataCleanerType.systemProperty(cleanerType), getDelaySeconds(conf).toString).toInt
-  }
-
-  def setDelaySeconds(
-      conf: SparkConf,
-      cleanerType: MetadataCleanerType.MetadataCleanerType,
-      delay: Int) {
-    conf.set(MetadataCleanerType.systemProperty(cleanerType), delay.toString)
-  }
-
-  /**
-   * Set the default delay time (in seconds).
-   * @param conf SparkConf instance
-   * @param delay default delay time to set
-   * @param resetAll whether to reset all to default
-   */
-  def setDelaySeconds(conf: SparkConf, delay: Int, resetAll: Boolean = true) {
-    conf.set("spark.cleaner.ttl", delay.toString)
-    if (resetAll) {
-      for (cleanerType <- MetadataCleanerType.values) {
-        System.clearProperty(MetadataCleanerType.systemProperty(cleanerType))
-      }
-    }
-  }
-}
-
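
For context, a rough sketch of the "timer to trigger periodic GCs" idea that the commit message leans on instead of per-subsystem MetadataCleaner timers; this is not Spark's actual `ContextCleaner` code, and the interval, thread name, and object name are illustrative:

    import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}

    object PeriodicGcSketch {
      private val scheduler: ScheduledExecutorService =
        Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
          override def newThread(r: Runnable): Thread = {
            val t = new Thread(r, "periodic-gc")
            t.setDaemon(true)  // daemon thread, so it never blocks JVM shutdown
            t
          }
        })

      // Triggering a full GC on a fixed interval lets weakly referenced
      // entries (e.g. unreferenced RDDs in a weak-values map) be collected
      // promptly instead of waiting for memory pressure.
      def start(intervalMinutes: Long): Unit = {
        scheduler.scheduleAtFixedRate(new Runnable {
          override def run(): Unit = System.gc()
        }, intervalMinutes, intervalMinutes, TimeUnit.MINUTES)
      }

      def stop(): Unit = scheduler.shutdownNow()
    }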

http://git-wip-us.apache.org/repos/asf/spark/blob/8e19c766/core/src/main/scala/org/apache/spark/util/TimeStampedHashSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/TimeStampedHashSet.scala b/core/src/main/scala/org/apache/spark/util/TimeStampedHashSet.scala
deleted file mode 100644
index 65efeb1..0000000
--- a/core/src/main/scala/org/apache/spark/util/TimeStampedHashSet.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import java.util.concurrent.ConcurrentHashMap
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.Set
-
-private[spark] class TimeStampedHashSet[A] extends Set[A] {
-  val internalMap = new ConcurrentHashMap[A, Long]()
-
-  def contains(key: A): Boolean = {
-    internalMap.contains(key)
-  }
-
-  def iterator: Iterator[A] = {
-    val jIterator = internalMap.entrySet().iterator()
-    jIterator.asScala.map(_.getKey)
-  }
-
-  override def + (elem: A): Set[A] = {
-    val newSet = new TimeStampedHashSet[A]
-    newSet ++= this
-    newSet += elem
-    newSet
-  }
-
-  override def - (elem: A): Set[A] = {
-    val newSet = new TimeStampedHashSet[A]
-    newSet ++= this
-    newSet -= elem
-    newSet
-  }
-
-  override def += (key: A): this.type = {
-    internalMap.put(key, currentTime)
-    this
-  }
-
-  override def -= (key: A): this.type = {
-    internalMap.remove(key)
-    this
-  }
-
-  override def empty: Set[A] = new TimeStampedHashSet[A]()
-
-  override def size(): Int = internalMap.size()
-
-  override def foreach[U](f: (A) => U): Unit = {
-    val iterator = internalMap.entrySet().iterator()
-    while(iterator.hasNext) {
-      f(iterator.next.getKey)
-    }
-  }
-
-  /**
-   * Removes old values that have timestamp earlier than `threshTime`
-   */
-  def clearOldValues(threshTime: Long) {
-    val iterator = internalMap.entrySet().iterator()
-    while(iterator.hasNext) {
-      val entry = iterator.next()
-      if (entry.getValue < threshTime) {
-        iterator.remove()
-      }
-    }
-  }
-
-  private def currentTime: Long = System.currentTimeMillis()
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/8e19c766/core/src/main/scala/org/apache/spark/util/TimeStampedWeakValueHashMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/TimeStampedWeakValueHashMap.scala b/core/src/main/scala/org/apache/spark/util/TimeStampedWeakValueHashMap.scala
deleted file mode 100644
index 310c0c1..0000000
--- a/core/src/main/scala/org/apache/spark/util/TimeStampedWeakValueHashMap.scala
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import java.lang.ref.WeakReference
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.collection.mutable
-import scala.language.implicitConversions
-
-import org.apache.spark.Logging
-
-/**
- * A wrapper of TimeStampedHashMap that ensures the values are weakly referenced and timestamped.
- *
- * If the value is garbage collected and the weak reference is null, get() will return a
- * non-existent value. These entries are removed from the map periodically (every N inserts), as
- * their values are no longer strongly reachable. Further, key-value pairs whose timestamps are
- * older than a particular threshold can be removed using the clearOldValues method.
- *
- * TimeStampedWeakValueHashMap exposes a scala.collection.mutable.Map interface, which allows it
- * to be a drop-in replacement for Scala HashMaps. Internally, it uses a Java ConcurrentHashMap,
- * so all operations on this HashMap are thread-safe.
- *
- * @param updateTimeStampOnGet Whether timestamp of a pair will be updated when it is accessed.
- */
-private[spark] class TimeStampedWeakValueHashMap[A, B](updateTimeStampOnGet: Boolean = false)
-  extends mutable.Map[A, B]() with Logging {
-
-  import TimeStampedWeakValueHashMap._
-
-  private val internalMap = new TimeStampedHashMap[A, WeakReference[B]](updateTimeStampOnGet)
-  private val insertCount = new AtomicInteger(0)
-
-  /** Return a map consisting only of entries whose values are still strongly reachable. */
-  private def nonNullReferenceMap = internalMap.filter { case (_, ref) => ref.get != null }
-
-  def get(key: A): Option[B] = internalMap.get(key)
-
-  def iterator: Iterator[(A, B)] = nonNullReferenceMap.iterator
-
-  override def + [B1 >: B](kv: (A, B1)): mutable.Map[A, B1] = {
-    val newMap = new TimeStampedWeakValueHashMap[A, B1]
-    val oldMap = nonNullReferenceMap.asInstanceOf[mutable.Map[A, WeakReference[B1]]]
-    newMap.internalMap.putAll(oldMap.toMap)
-    newMap.internalMap += kv
-    newMap
-  }
-
-  override def - (key: A): mutable.Map[A, B] = {
-    val newMap = new TimeStampedWeakValueHashMap[A, B]
-    newMap.internalMap.putAll(nonNullReferenceMap.toMap)
-    newMap.internalMap -= key
-    newMap
-  }
-
-  override def += (kv: (A, B)): this.type = {
-    internalMap += kv
-    if (insertCount.incrementAndGet() % CLEAR_NULL_VALUES_INTERVAL == 0) {
-      clearNullValues()
-    }
-    this
-  }
-
-  override def -= (key: A): this.type = {
-    internalMap -= key
-    this
-  }
-
-  override def update(key: A, value: B): Unit = this += ((key, value))
-
-  override def apply(key: A): B = internalMap.apply(key)
-
-  override def filter(p: ((A, B)) => Boolean): mutable.Map[A, B] = nonNullReferenceMap.filter(p)
-
-  override def empty: mutable.Map[A, B] = new TimeStampedWeakValueHashMap[A, B]()
-
-  override def size: Int = internalMap.size
-
-  override def foreach[U](f: ((A, B)) => U): Unit = nonNullReferenceMap.foreach(f)
-
-  def putIfAbsent(key: A, value: B): Option[B] = internalMap.putIfAbsent(key, value)
-
-  def toMap: Map[A, B] = iterator.toMap
-
-  /** Remove old key-value pairs with timestamps earlier than `threshTime`. */
-  def clearOldValues(threshTime: Long): Unit = internalMap.clearOldValues(threshTime)
-
-  /** Remove entries with values that are no longer strongly reachable. */
-  def clearNullValues() {
-    val it = internalMap.getEntrySet.iterator
-    while (it.hasNext) {
-      val entry = it.next()
-      if (entry.getValue.value.get == null) {
-        logDebug("Removing key " + entry.getKey + " because it is no longer strongly reachable.")
-        it.remove()
-      }
-    }
-  }
-
-  // For testing
-
-  def getTimestamp(key: A): Option[Long] = {
-    internalMap.getTimeStampedValue(key).map(_.timestamp)
-  }
-
-  def getReference(key: A): Option[WeakReference[B]] = {
-    internalMap.getTimeStampedValue(key).map(_.value)
-  }
-}
-
-/**
- * Helper methods for converting to and from WeakReferences.
- */
-private object TimeStampedWeakValueHashMap {
-
-  // Number of inserts after which entries with null references are removed
-  val CLEAR_NULL_VALUES_INTERVAL = 100
-
-  /* Implicit conversion methods to WeakReferences. */
-
-  implicit def toWeakReference[V](v: V): WeakReference[V] = new WeakReference[V](v)
-
-  implicit def toWeakReferenceTuple[K, V](kv: (K, V)): (K, WeakReference[V]) = {
-    kv match { case (k, v) => (k, toWeakReference(v)) }
-  }
-
-  implicit def toWeakReferenceFunction[K, V, R](p: ((K, V)) => R): ((K, WeakReference[V])) => R = {
-    (kv: (K, WeakReference[V])) => p(kv)
-  }
-
-  /* Implicit conversion methods from WeakReferences. */
-
-  implicit def fromWeakReference[V](ref: WeakReference[V]): V = ref.get
-
-  implicit def fromWeakReferenceOption[V](v: Option[WeakReference[V]]): Option[V] = {
-    v match {
-      case Some(ref) => Option(fromWeakReference(ref))
-      case None => None
-    }
-  }
-
-  implicit def fromWeakReferenceTuple[K, V](kv: (K, WeakReference[V])): (K, V) = {
-    kv match { case (k, v) => (k, fromWeakReference(v)) }
-  }
-
-  implicit def fromWeakReferenceIterator[K, V](
-      it: Iterator[(K, WeakReference[V])]): Iterator[(K, V)] = {
-    it.map(fromWeakReferenceTuple)
-  }
-
-  implicit def fromWeakReferenceMap[K, V](
-      map: mutable.Map[K, WeakReference[V]]) : mutable.Map[K, V] = {
-    mutable.Map(map.mapValues(fromWeakReference).toSeq: _*)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/8e19c766/core/src/test/scala/org/apache/spark/util/TimeStampedHashMapSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/TimeStampedHashMapSuite.scala b/core/src/test/scala/org/apache/spark/util/TimeStampedHashMapSuite.scala
index 9b31690..25fc15d 100644
--- a/core/src/test/scala/org/apache/spark/util/TimeStampedHashMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/TimeStampedHashMapSuite.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.util
 
-import java.lang.ref.WeakReference
-
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.util.Random
@@ -34,10 +32,6 @@ class TimeStampedHashMapSuite extends SparkFunSuite {
   testMap(new TimeStampedHashMap[String, String]())
   testMapThreadSafety(new TimeStampedHashMap[String, String]())
 
-  // Test TimeStampedWeakValueHashMap basic functionality
-  testMap(new TimeStampedWeakValueHashMap[String, String]())
-  testMapThreadSafety(new TimeStampedWeakValueHashMap[String, String]())
-
   test("TimeStampedHashMap - clearing by timestamp") {
     // clearing by insertion time
     val map = new TimeStampedHashMap[String, String](updateTimeStampOnGet = false)
@@ -68,86 +62,6 @@ class TimeStampedHashMapSuite extends SparkFunSuite {
     assert(map1.get("k2").isDefined)
   }
 
-  test("TimeStampedWeakValueHashMap - clearing by timestamp") {
-    // clearing by insertion time
-    val map = new TimeStampedWeakValueHashMap[String, String](updateTimeStampOnGet = false)
-    map("k1") = "v1"
-    assert(map("k1") === "v1")
-    Thread.sleep(10)
-    val threshTime = System.currentTimeMillis
-    assert(map.getTimestamp("k1").isDefined)
-    assert(map.getTimestamp("k1").get < threshTime)
-    map.clearOldValues(threshTime)
-    assert(map.get("k1") === None)
-
-    // clearing by modification time
-    val map1 = new TimeStampedWeakValueHashMap[String, String](updateTimeStampOnGet = true)
-    map1("k1") = "v1"
-    map1("k2") = "v2"
-    assert(map1("k1") === "v1")
-    Thread.sleep(10)
-    val threshTime1 = System.currentTimeMillis
-    Thread.sleep(10)
-    assert(map1("k2") === "v2")     // access k2 to update its access time to > threshTime
-    assert(map1.getTimestamp("k1").isDefined)
-    assert(map1.getTimestamp("k1").get < threshTime1)
-    assert(map1.getTimestamp("k2").isDefined)
-    assert(map1.getTimestamp("k2").get >= threshTime1)
-    map1.clearOldValues(threshTime1) // should only clear k1
-    assert(map1.get("k1") === None)
-    assert(map1.get("k2").isDefined)
-  }
-
-  test("TimeStampedWeakValueHashMap - clearing weak references") {
-    var strongRef = new Object
-    val weakRef = new WeakReference(strongRef)
-    val map = new TimeStampedWeakValueHashMap[String, Object]
-    map("k1") = strongRef
-    map("k2") = "v2"
-    map("k3") = "v3"
-    val isEquals = map("k1") == strongRef
-    assert(isEquals)
-
-    // clear strong reference to "k1"
-    strongRef = null
-    val startTime = System.currentTimeMillis
-    System.gc() // Make a best effort to run the garbage collection. It *usually* runs GC.
-    System.runFinalization()  // Make a best effort to call finalizer on all cleaned objects.
-    while(System.currentTimeMillis - startTime < 10000 && weakRef.get != null) {
-      System.gc()
-      System.runFinalization()
-      Thread.sleep(100)
-    }
-    assert(map.getReference("k1").isDefined)
-    val ref = map.getReference("k1").get
-    assert(ref.get === null)
-    assert(map.get("k1") === None)
-
-    // operations should only display non-null entries
-    assert(map.iterator.forall { case (k, v) => k != "k1" })
-    assert(map.filter { case (k, v) => k != "k2" }.size === 1)
-    assert(map.filter { case (k, v) => k != "k2" }.head._1 === "k3")
-    assert(map.toMap.size === 2)
-    assert(map.toMap.forall { case (k, v) => k != "k1" })
-    val buffer = new ArrayBuffer[String]
-    map.foreach { case (k, v) => buffer += v.toString }
-    assert(buffer.size === 2)
-    assert(buffer.forall(_ != "k1"))
-    val plusMap = map + (("k4", "v4"))
-    assert(plusMap.size === 3)
-    assert(plusMap.forall { case (k, v) => k != "k1" })
-    val minusMap = map - "k2"
-    assert(minusMap.size === 1)
-    assert(minusMap.head._1 == "k3")
-
-    // clear null values - should only clear k1
-    map.clearNullValues()
-    assert(map.getReference("k1") === None)
-    assert(map.get("k1") === None)
-    assert(map.get("k2").isDefined)
-    assert(map.get("k2").get === "v2")
-  }
-
   /** Test basic operations of a Scala mutable Map. */
   def testMap(hashMapConstructor: => mutable.Map[String, String]) {
     def newMap() = hashMapConstructor

http://git-wip-us.apache.org/repos/asf/spark/blob/8e19c766/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 7d743d5..3ffc77d 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -824,17 +824,6 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
-  <td><code>spark.cleaner.ttl</code></td>
-  <td>(infinite)</td>
-  <td>
-    Duration (seconds) of how long Spark will remember any metadata (stages generated, tasks
-    generated, etc.). Periodic cleanups will ensure that metadata older than this duration will be
-    forgotten. This is useful for running Spark for many hours / days (for example, running 24/7 in
-    case of Spark Streaming applications). Note that any RDD that persists in memory for more than
-    this duration will be cleared as well.
-  </td>
-</tr>
-<tr>
   <td><code>spark.executor.cores</code></td>
   <td>1 in YARN mode, all the available cores on the worker in standalone mode.</td>
   <td>

http://git-wip-us.apache.org/repos/asf/spark/blob/8e19c766/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 61b230a..b186d29 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -27,8 +27,8 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.{Logging, SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
 import org.apache.spark.streaming.scheduler.JobGenerator
-import org.apache.spark.util.{MetadataCleaner, Utils}
 
 private[streaming]
 class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
@@ -40,7 +40,6 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
   val checkpointDir = ssc.checkpointDir
   val checkpointDuration = ssc.checkpointDuration
   val pendingTimes = ssc.scheduler.getPendingTimes().toArray
-  val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
   val sparkConfPairs = ssc.conf.getAll
 
   def createSparkConf(): SparkConf = {

http://git-wip-us.apache.org/repos/asf/spark/blob/8e19c766/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 91a43e1..c59348a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -32,7 +32,7 @@ import org.apache.spark.streaming._
 import org.apache.spark.streaming.StreamingContext.rddToFileName
 import org.apache.spark.streaming.scheduler.Job
 import org.apache.spark.streaming.ui.UIUtils
-import org.apache.spark.util.{CallSite, MetadataCleaner, Utils}
+import org.apache.spark.util.{CallSite, Utils}
 
 /**
  * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
@@ -271,18 +271,6 @@ abstract class DStream[T: ClassTag] (
         checkpointDuration + "). Please set it to higher than " + checkpointDuration + "."
     )
 
-    val metadataCleanerDelay = MetadataCleaner.getDelaySeconds(ssc.conf)
-    logInfo("metadataCleanupDelay = " + metadataCleanerDelay)
-    require(
-      metadataCleanerDelay < 0 || rememberDuration.milliseconds < metadataCleanerDelay * 1000,
-      "It seems you are doing some DStream window operation or setting a checkpoint interval " +
-        "which requires " + this.getClass.getSimpleName + " to remember generated RDDs for more " +
-        "than " + rememberDuration.milliseconds / 1000 + " seconds. But Spark's metadata cleanup" +
-        "delay is set to " + metadataCleanerDelay + " seconds, which is not sufficient. Please " +
-        "set the Java cleaner delay to more than " +
-        math.ceil(rememberDuration.milliseconds / 1000.0).toInt + " seconds."
-    )
-
     dependencies.foreach(_.validateAtStart())
 
     logInfo("Slide time = " + slideDuration)

http://git-wip-us.apache.org/repos/asf/spark/blob/8e19c766/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 860fac2..0ae4c45 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -81,9 +81,9 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
 
   test("from conf with settings") {
     val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
-    myConf.set("spark.cleaner.ttl", "10s")
+    myConf.set("spark.dummyTimeConfig", "10s")
     ssc = new StreamingContext(myConf, batchDuration)
-    assert(ssc.conf.getTimeAsSeconds("spark.cleaner.ttl", "-1") === 10)
+    assert(ssc.conf.getTimeAsSeconds("spark.dummyTimeConfig", "-1") === 10)
   }
 
   test("from existing SparkContext") {
@@ -93,26 +93,27 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
 
   test("from existing SparkContext with settings") {
     val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
-    myConf.set("spark.cleaner.ttl", "10s")
+    myConf.set("spark.dummyTimeConfig", "10s")
     ssc = new StreamingContext(myConf, batchDuration)
-    assert(ssc.conf.getTimeAsSeconds("spark.cleaner.ttl", "-1") === 10)
+    assert(ssc.conf.getTimeAsSeconds("spark.dummyTimeConfig", "-1") === 10)
   }
 
   test("from checkpoint") {
     val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
-    myConf.set("spark.cleaner.ttl", "10s")
+    myConf.set("spark.dummyTimeConfig", "10s")
     val ssc1 = new StreamingContext(myConf, batchDuration)
     addInputStream(ssc1).register()
     ssc1.start()
     val cp = new Checkpoint(ssc1, Time(1000))
     assert(
       Utils.timeStringAsSeconds(cp.sparkConfPairs
-          .toMap.getOrElse("spark.cleaner.ttl", "-1")) === 10)
+          .toMap.getOrElse("spark.dummyTimeConfig", "-1")) === 10)
     ssc1.stop()
     val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))
-    assert(newCp.createSparkConf().getTimeAsSeconds("spark.cleaner.ttl", "-1") === 10)
+    assert(
+      newCp.createSparkConf().getTimeAsSeconds("spark.dummyTimeConfig", "-1") === 10)
     ssc = new StreamingContext(null, newCp, null)
-    assert(ssc.conf.getTimeAsSeconds("spark.cleaner.ttl", "-1") === 10)
+    assert(ssc.conf.getTimeAsSeconds("spark.dummyTimeConfig", "-1") === 10)
   }
 
   test("checkPoint from conf") {
@@ -288,7 +289,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
 
   test("stop gracefully") {
     val conf = new SparkConf().setMaster(master).setAppName(appName)
-    conf.set("spark.cleaner.ttl", "3600s")
+    conf.set("spark.dummyTimeConfig", "3600s")
     sc = new SparkContext(conf)
     for (i <- 1 to 4) {
       logInfo("==================================\n\n\n")

