You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@mnemonic.apache.org by ga...@apache.org on 2017/04/26 18:39:35 UTC
incubator-mnemonic git commit: MNEMONIC-247: Fix the cleanup mechanism for DurableRDD
Repository: incubator-mnemonic
Updated Branches:
refs/heads/master 3e628a6bb -> 70c53f24b
MNEMONIC-247: Fix the cleanup mechanism for DurableRDD
Project: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/commit/70c53f24
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/tree/70c53f24
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/diff/70c53f24
Branch: refs/heads/master
Commit: 70c53f24bca66047868a4bfbd631be2cab2c5fd8
Parents: 3e628a6
Author: Wang, Gang(Gary) <ga...@intel.com>
Authored: Wed Apr 26 11:38:59 2017 -0700
Committer: Wang, Gang(Gary) <ga...@intel.com>
Committed: Wed Apr 26 11:38:59 2017 -0700
----------------------------------------------------------------------
.../mnemonic/spark/MneDurableInputSession.scala | 9 +-
.../spark/MneDurableOutputSession.scala | 33 ++--
.../apache/mnemonic/spark/rdd/DurableRDD.scala | 150 +++++++++++++++----
.../spark/rdd/DurableRDDFunctions.scala | 21 ++-
.../mnemonic/spark/rdd/DurableRDDSpec.scala | 7 +-
mnemonic-spark/pom.xml | 3 +
6 files changed, 158 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/70c53f24/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/MneDurableInputSession.scala
----------------------------------------------------------------------
diff --git a/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/MneDurableInputSession.scala b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/MneDurableInputSession.scala
index e3f6f67..ee89d2f 100644
--- a/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/MneDurableInputSession.scala
+++ b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/MneDurableInputSession.scala
@@ -30,7 +30,7 @@ import org.apache.mnemonic.collections.DurableSinglyLinkedListFactory
import org.apache.mnemonic.sessions.DurableInputSession
import org.apache.mnemonic.sessions.SessionIterator
-class MneDurableInputSession[V: ClassTag] (
+private[spark] class MneDurableInputSession[V: ClassTag] (
serviceName: String,
durableTypes: Array[DurableType],
entityFactoryProxies: Array[EntityFactoryProxy],
@@ -38,8 +38,6 @@ class MneDurableInputSession[V: ClassTag] (
memPoolList: Array[File] )
extends DurableInputSession[V, NonVolatileMemAllocator] {
- var memPools: Array[File] = null
-
private var flistIter:Iterator[File] = null
initialize(serviceName, durableTypes, entityFactoryProxies,
@@ -55,9 +53,8 @@ class MneDurableInputSession[V: ClassTag] (
setDurableTypes(durableTypes)
setEntityFactoryProxies(entityFactoryProxies)
setSlotKeyId(slotKeyId);
- memPools = memPoolList
- if (null != memPools) {
- flistIter = memPools.iterator
+ if (null != memPoolList) {
+ flistIter = memPoolList.iterator
}
}
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/70c53f24/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/MneDurableOutputSession.scala
----------------------------------------------------------------------
diff --git a/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/MneDurableOutputSession.scala b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/MneDurableOutputSession.scala
index 4383866..0a5b9c0 100644
--- a/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/MneDurableOutputSession.scala
+++ b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/MneDurableOutputSession.scala
@@ -30,24 +30,21 @@ import org.apache.mnemonic.collections.DurableSinglyLinkedList
import org.apache.mnemonic.collections.DurableSinglyLinkedListFactory
import org.apache.mnemonic.sessions.DurableOutputSession
-class MneDurableOutputSession[V: ClassTag](
+private[spark] class MneDurableOutputSession[V: ClassTag] (
serviceName: String,
durableTypes: Array[DurableType],
entityFactoryProxies: Array[EntityFactoryProxy],
slotKeyId: Long,
partitionPoolSize: Long,
- baseDirectory: String,
- outputMemPrefix: String)
+ durableDirectory: String,
+ outputMemFileNameGen: (Long)=>String)
extends DurableOutputSession[V, NonVolatileMemAllocator] {
- var baseDir: String = null
- var memPools: ArrayBuffer[File] = new ArrayBuffer[File]
- var outputFile: File = null
- var outputPrefix: String = null
+ val memPools: ArrayBuffer[File] = new ArrayBuffer[File]
private var _outidx: Long = 0L
initialize(serviceName, durableTypes, entityFactoryProxies,
- slotKeyId, partitionPoolSize, baseDirectory, outputMemPrefix)
+ slotKeyId, partitionPoolSize, durableDirectory, outputMemFileNameGen)
def initialize(
serviceName: String,
@@ -55,22 +52,20 @@ class MneDurableOutputSession[V: ClassTag](
entityFactoryProxies: Array[EntityFactoryProxy],
slotKeyId: Long,
partitionPoolSize: Long,
- baseDirectory: String,
- outputMemPrefix: String) {
+ durableDirectory: String,
+ outputMemFileNameGen: (Long)=>String) {
setServiceName(serviceName)
setDurableTypes(durableTypes)
setEntityFactoryProxies(entityFactoryProxies)
setSlotKeyId(slotKeyId)
setPoolSize(partitionPoolSize)
- baseDir = baseDirectory
- outputPrefix = outputMemPrefix
if (!initNextPool) {
- throw new RuntimeException("Firstly init next pool failed")
+ throw new DurableException("Firstly init next pool failed")
}
}
protected def genNextPoolFile(): File = {
- val file = new File(baseDir, f"${outputPrefix}_${_outidx}%05d.mne")
+ val file = new File(durableDirectory, outputMemFileNameGen(_outidx))
_outidx += 1
memPools += file
file
@@ -82,9 +77,9 @@ class MneDurableOutputSession[V: ClassTag](
getAllocator.close()
setAllocator(null)
}
- outputFile = genNextPoolFile
+ val outputFile = genNextPoolFile
if (outputFile.exists) {
- outputFile.delete
+ throw new DurableException(s"Durable memory file already exists ${outputFile}")
}
m_act = new NonVolatileMemAllocator(Utils.getNonVolatileMemoryAllocatorService(getServiceName),
getPoolSize, outputFile.toString, true);
@@ -104,11 +99,11 @@ object MneDurableOutputSession {
entityFactoryProxies: Array[EntityFactoryProxy],
slotKeyId: Long,
partitionPoolSize: Long,
- baseDirectory: String,
- outputMemPrefix: String): MneDurableOutputSession[V] = {
+ durableDirectory: String,
+ outputMemFileNameGen: (Long)=>String): MneDurableOutputSession[V] = {
val ret = new MneDurableOutputSession[V] (
serviceName, durableTypes, entityFactoryProxies,
- slotKeyId, partitionPoolSize, baseDirectory, outputMemPrefix)
+ slotKeyId, partitionPoolSize, durableDirectory, outputMemFileNameGen)
ret
}
}
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/70c53f24/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDD.scala
----------------------------------------------------------------------
diff --git a/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDD.scala b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDD.scala
index 9975c14..0c9c647 100644
--- a/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDD.scala
+++ b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDD.scala
@@ -18,13 +18,16 @@
package org.apache.mnemonic.spark.rdd;
import java.io.File
+import scala.util._
import org.apache.spark.rdd.RDD
-import org.apache.spark.{ Partition, TaskContext }
+import org.apache.spark.{ Partition, TaskContext, SparkContext }
import org.apache.spark.internal.Logging
+import org.apache.commons.io.FileUtils
import scala.reflect.{ classTag, ClassTag }
import scala.collection.mutable.HashMap
import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
import org.apache.mnemonic.ConfigurationException
import org.apache.mnemonic.DurableType
import org.apache.mnemonic.EntityFactoryProxy
@@ -36,27 +39,28 @@ import org.apache.mnemonic.spark.MneDurableInputSession
import org.apache.mnemonic.spark.MneDurableOutputSession
import org.apache.mnemonic.spark.DurableException
-class DurableRDD[D: ClassTag, T: ClassTag](
- var prev: RDD[T],
+private[spark] class DurableRDD[D: ClassTag, T: ClassTag] (
+ private var rdd: RDD[T],
serviceName: String, durableTypes: Array[DurableType],
entityFactoryProxies: Array[EntityFactoryProxy], slotKeyId: Long,
- partitionPoolSize: Long, baseDirectory: String,
+ partitionPoolSize: Long, durableDirectory: String,
f: (T, ObjectCreator[D, NonVolatileMemAllocator]) => Option[D],
preservesPartitioning: Boolean = false)
- extends RDD[D](prev) {
+ extends RDD[D](rdd) {
- private val _parmap = HashMap.empty[Partition, Array[File]]
+ val durdddir = DurableRDD.getRddDirName(durableDirectory, id)
+ DurableRDD.resetRddDir(durdddir)
override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
override protected def getPartitions: Array[Partition] = firstParent[T].partitions
def prepareDurablePartition(split: Partition, context: TaskContext,
- iterator: Iterator[T]) {
+ iterator: Iterator[T]): Array[File] = {
val outsess = MneDurableOutputSession[D](serviceName,
durableTypes, entityFactoryProxies, slotKeyId,
- partitionPoolSize, baseDirectory,
- f"mem_${this.hashCode()}%010d_${split.hashCode()}%010d")
+ partitionPoolSize, durdddir.toString,
+ DurableRDD.genDurableFileName(split.hashCode)_)
try {
for (item <- iterator) {
f(item, outsess) match {
@@ -64,41 +68,133 @@ class DurableRDD[D: ClassTag, T: ClassTag](
case None =>
}
}
- _parmap += (split -> outsess.memPools.toArray)
} finally {
outsess.close()
}
+ outsess.memPools.toArray
}
override def compute(split: Partition, context: TaskContext): Iterator[D] = {
- if (!(_parmap contains split)) {
- prepareDurablePartition(split, context, firstParent[T].iterator(split, context))
- logInfo(s"Done persisting RDD ${prev.id} to ${baseDirectory}")
- }
- val memplist = _parmap.get(split) match {
+ val mempListOpt: Option[Array[File]] =
+ DurableRDD.collectMemPoolFileList(durdddir.toString, DurableRDD.genDurableFileName(split.hashCode)_)
+ val memplist = mempListOpt match {
+ case None => {
+ val mplst = prepareDurablePartition(split, context, firstParent[T].iterator(split, context))
+ logInfo(s"Done transformed RDD #${rdd.id} to durableRDD #${id} on ${durdddir.toString}")
+ mplst
+ }
case Some(mplst) => mplst
- case None => throw new DurableException("Not construct durable partition properly")
}
val insess = MneDurableInputSession[D](serviceName,
durableTypes, entityFactoryProxies, slotKeyId, memplist)
insess.iterator.asScala
}
- override def clearDependencies() {
+ override def clearDependencies {
super.clearDependencies()
- prev = null
+ rdd = null
}
- def close {
- if (null != _parmap) {
- _parmap foreach { case (par, mplst) =>
- mplst foreach {(file) =>
- if (file.exists) {
- file.delete
- }
- }
+ def reset {
+ DurableRDD.resetRddDir(durdddir)
+ }
+}
+
+object DurableRDD {
+
+ val durableSubDirNameTemplate = "durable-rdd-%010d"
+ val durableFileNameTemplate = "mem_%010d_%010d.mne"
+
+ private var durableDir: Option[String] = None
+
+ def getDefaultDurableBaseDir(sc: SparkContext): String = {
+ try {
+ sc.getConf.get("spark.durable-basedir").trim
+ } catch {
+ case _: NoSuchElementException =>
+ throw new IllegalArgumentException("spark.durable-basedir not specified for DurableRDD")
+ case _: Throwable =>
+ throw new DurableException("transforming durable base directories failed")
+ }
+ }
+
+ def setDurableBaseDir(sc: SparkContext, baseDir: String) {
+ durableDir = Option(baseDir) map { bdir =>
+ val fdir = new File(bdir, sc.applicationId)
+ if (!fdir.exists && !fdir.mkdirs) {
+ throw new DurableException(s"Cannot create durable directory ${fdir.toString}")
}
- _parmap.clear()
+ fdir.toString
}
}
+
+ def getDurableDir(sc: SparkContext): Option[String] = {
+ durableDir match {
+ case None => setDurableBaseDir(sc, getDefaultDurableBaseDir(sc))
+ case _ =>
+ }
+ durableDir
+ }
+
+ def getRddDirName(durableDir: String, rddid: Int): String = {
+ new File(durableDir, durableSubDirNameTemplate.format(rddid)).toString
+ }
+
+ def resetRddDir(rddDirName: String) {
+ val durdddir = new File(rddDirName)
+ if (durdddir.exists) {
+ FileUtils.deleteDirectory(durdddir)
+ }
+ if (!durdddir.mkdir) {
+ throw new DurableException(s"Durable RDD directory ${durdddir.toString} cannot be created")
+ }
+ }
+
+ def genDurableFileName(splitId: Int)(mempidx: Long): String = {
+ durableFileNameTemplate.format(splitId, mempidx)
+ }
+
+ def collectMemPoolFileList(durddir: String, memFileNameGen: (Long)=>String): Option[Array[File]] = {
+ val flist: ArrayBuffer[File] = new ArrayBuffer[File]
+ var idx: Long = 0L
+ var file: File = null
+ var wstop = true
+ while (wstop) {
+ file = new File(durddir, memFileNameGen(idx))
+ idx = idx + 1
+ if (file.exists) {
+ flist += file
+ } else {
+ wstop = false
+ }
+ }
+ if (flist.isEmpty) {
+ None
+ } else {
+ Some(flist.toArray)
+ }
+ }
+
+ def apply[D: ClassTag, T: ClassTag] (
+ rdd: RDD[T],
+ serviceName: String, durableTypes: Array[DurableType],
+ entityFactoryProxies: Array[EntityFactoryProxy], slotKeyId: Long,
+ partitionPoolSize: Long,
+ f: (T, ObjectCreator[D, NonVolatileMemAllocator]) => Option[D],
+ preservesPartitioning: Boolean = false) = {
+ val sc: SparkContext = rdd.context
+ val ret = new DurableRDD[D, T](rdd,
+ serviceName, durableTypes, entityFactoryProxies, slotKeyId,
+ partitionPoolSize, getDurableDir(sc).get, f, preservesPartitioning)
+ //sc.cleaner.foreach(_.registerRDDForCleanup(ret))
+ ret
+ }
+
+ def cleanupForApp(sc: SparkContext) {
+ FileUtils.deleteDirectory(new File(getDurableDir(sc).get))
+ }
+
+ def cleanupForRdd(sc: SparkContext, rddid: Int) {
+ FileUtils.deleteDirectory(new File(getRddDirName(getDurableDir(sc).get, rddid)))
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/70c53f24/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDDFunctions.scala b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDDFunctions.scala
index cb459ea..6e60e33 100644
--- a/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDDFunctions.scala
+++ b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDDFunctions.scala
@@ -18,7 +18,6 @@
package org.apache.mnemonic.spark.rdd
import org.apache.spark.rdd.RDD
-import org.apache.spark.TaskContext
import scala.reflect.ClassTag
import scala.language.implicitConversions
import org.apache.mnemonic.NonVolatileMemAllocator
@@ -29,17 +28,17 @@ import org.apache.mnemonic.sessions.ObjectCreator
class DurableRDDFunctions[T: ClassTag](rdd: RDD[T]) extends Serializable {
def makeDurable[D: ClassTag](
- serviceName: String,
- durableTypes: Array[DurableType],
- entityFactoryProxies: Array[EntityFactoryProxy],
- slotKeyId: Long,
- partitionPoolSize: Long,
- baseDirectory: String,
- f: (T, ObjectCreator[D, NonVolatileMemAllocator]) => Option[D],
- preservesPartitioning: Boolean = false) =
- new DurableRDD[D, T](rdd,
+ serviceName: String,
+ durableTypes: Array[DurableType],
+ entityFactoryProxies: Array[EntityFactoryProxy],
+ slotKeyId: Long,
+ partitionPoolSize: Long,
+ f: (T, ObjectCreator[D, NonVolatileMemAllocator]) => Option[D],
+ preservesPartitioning: Boolean = false) = {
+ DurableRDD[D, T](rdd,
serviceName, durableTypes, entityFactoryProxies, slotKeyId,
- partitionPoolSize, baseDirectory, f, preservesPartitioning)
+ partitionPoolSize, f, preservesPartitioning)
+ }
}
object DurableRDDFunctions {
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/70c53f24/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDSpec.scala
----------------------------------------------------------------------
diff --git a/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDSpec.scala b/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDSpec.scala
index ac33e7c..1937863 100644
--- a/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDSpec.scala
+++ b/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDSpec.scala
@@ -45,6 +45,7 @@ class DurableRDDSpec extends TestSpec {
.setAppName("Test")
val sc = new SparkContext(conf)
// sc.getConf.getAll.foreach(println)
+ // DurableRDD.setDurableBaseDir(sc, defaultBaseDirectory)
val seed: RDD[Int] = sc.parallelize(
Seq.fill(defaultNumOfPartitions)(defaultNumOfRecordsPerPartition), defaultNumOfPartitions)
val data = seed flatMap (recnum => Seq.fill(recnum)(Random.nextInt)) cache //must be cached to fix rand numbers
@@ -52,14 +53,16 @@ class DurableRDDSpec extends TestSpec {
defaultServiceName,
Array(DurableType.LONG), Array(),
defaultSlotKeyId, defaultPartitionSize,
- defaultBaseDirectory,
(v: Int, oc: ObjectCreator[Long, NonVolatileMemAllocator])=>
{ Some(v.asInstanceOf[Long]) })
// data.collect().foreach(println)
// durdd.collect().foreach(println)
val (rcnt, rsum) = (data.count, data.sum)
val (dcnt, dsum) = (durdd.count, durdd.sum)
- durdd.close
+ durdd.reset
+ /*sys.addShutdownHook({
+ DurableRDD.cleanupForApp(sc)
+ })*/
assertResult((rcnt, rsum)) {
(dcnt, dsum)
}
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/70c53f24/mnemonic-spark/pom.xml
----------------------------------------------------------------------
diff --git a/mnemonic-spark/pom.xml b/mnemonic-spark/pom.xml
index d321839..73978ac 100644
--- a/mnemonic-spark/pom.xml
+++ b/mnemonic-spark/pom.xml
@@ -165,6 +165,9 @@
<skipTests>${skipTests}</skipTests>
<argLine>-Djava.ext.dirs=${memory.service.dist.dir}:${computing.service.dist.dir}</argLine>
<systemProperties>
+ <spark.durable-basedir>
+ .
+ </spark.durable-basedir>
<spark.service-dist-dirs>
${memory.service.dist.dir}:${computing.service.dist.dir}
</spark.service-dist-dirs>