Posted to commits@mnemonic.apache.org by ga...@apache.org on 2017/04/26 18:39:35 UTC

incubator-mnemonic git commit: MNEMONIC-247: Fix the cleanup mechanism for DurableRDD

Repository: incubator-mnemonic
Updated Branches:
  refs/heads/master 3e628a6bb -> 70c53f24b


MNEMONIC-247: Fix the cleanup mechanism for DurableRDD


Project: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/commit/70c53f24
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/tree/70c53f24
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/diff/70c53f24

Branch: refs/heads/master
Commit: 70c53f24bca66047868a4bfbd631be2cab2c5fd8
Parents: 3e628a6
Author: Wang, Gang(Gary) <ga...@intel.com>
Authored: Wed Apr 26 11:38:59 2017 -0700
Committer: Wang, Gang(Gary) <ga...@intel.com>
Committed: Wed Apr 26 11:38:59 2017 -0700

----------------------------------------------------------------------
 .../mnemonic/spark/MneDurableInputSession.scala |   9 +-
 .../spark/MneDurableOutputSession.scala         |  33 ++--
 .../apache/mnemonic/spark/rdd/DurableRDD.scala  | 150 +++++++++++++++----
 .../spark/rdd/DurableRDDFunctions.scala         |  21 ++-
 .../mnemonic/spark/rdd/DurableRDDSpec.scala     |   7 +-
 mnemonic-spark/pom.xml                          |   3 +
 6 files changed, 158 insertions(+), 65 deletions(-)
----------------------------------------------------------------------

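For context on the mechanism being fixed: durable memory pool files now live under a per-application directory derived from the spark.durable-basedir setting, with one sub-directory per DurableRDD, and are rediscovered from disk rather than tracked in a driver-side HashMap. A minimal usage sketch against the new API follows; the base directory, memory service name, slot key id and pool size are placeholders, and it assumes the implicit RDD conversion exported by DurableRDDFunctions is in scope:

    import org.apache.spark.{ SparkConf, SparkContext }
    import org.apache.spark.rdd.RDD
    import org.apache.mnemonic.{ DurableType, NonVolatileMemAllocator }
    import org.apache.mnemonic.sessions.ObjectCreator
    import org.apache.mnemonic.spark.rdd.DurableRDD
    import org.apache.mnemonic.spark.rdd.DurableRDDFunctions._

    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("durable-rdd-demo")
      .set("spark.durable-basedir", "/mnt/pmem0")        // placeholder base directory
    val sc = new SparkContext(conf)

    val durdd: RDD[Long] = sc.parallelize(1 to 1000, 4).makeDurable[Long](
      "pmalloc",                                         // memory service name is an assumption
      Array(DurableType.LONG), Array(),
      2L, 1024L * 1024 * 1024,                           // placeholder slot key id / partition pool size
      (v: Int, oc: ObjectCreator[Long, NonVolatileMemAllocator]) => Some(v.toLong))

    println(durdd.sum)

    DurableRDD.cleanupForRdd(sc, durdd.id)               // drop the pool files of this RDD only
    sys.addShutdownHook { DurableRDD.cleanupForApp(sc) } // or wipe the whole application directory
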

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/70c53f24/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/MneDurableInputSession.scala
----------------------------------------------------------------------
diff --git a/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/MneDurableInputSession.scala b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/MneDurableInputSession.scala
index e3f6f67..ee89d2f 100644
--- a/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/MneDurableInputSession.scala
+++ b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/MneDurableInputSession.scala
@@ -30,7 +30,7 @@ import org.apache.mnemonic.collections.DurableSinglyLinkedListFactory
 import org.apache.mnemonic.sessions.DurableInputSession
 import org.apache.mnemonic.sessions.SessionIterator
 
-class MneDurableInputSession[V: ClassTag] (
+private[spark] class MneDurableInputSession[V: ClassTag] (
     serviceName: String,
     durableTypes: Array[DurableType],
     entityFactoryProxies: Array[EntityFactoryProxy],
@@ -38,8 +38,6 @@ class MneDurableInputSession[V: ClassTag] (
     memPoolList: Array[File] )
     extends DurableInputSession[V, NonVolatileMemAllocator] {
 
-  var memPools: Array[File] = null
-
   private var flistIter:Iterator[File] = null
 
   initialize(serviceName, durableTypes, entityFactoryProxies,
@@ -55,9 +53,8 @@ class MneDurableInputSession[V: ClassTag] (
     setDurableTypes(durableTypes)
     setEntityFactoryProxies(entityFactoryProxies)
     setSlotKeyId(slotKeyId);
-    memPools = memPoolList
-    if (null != memPools) {
-      flistIter = memPools.iterator
+    if (null != memPoolList) {
+      flistIter = memPoolList.iterator
     }
   }
 

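For orientation (not part of this change): with the redundant memPools field gone, the input session simply walks an iterator over whatever pool files it is handed. A rough read-back sketch mirroring DurableRDD.compute below; the file path, service name and other parameters are placeholders, and since the class is now private[spark] the call site has to live under org.apache.mnemonic.spark:

    import java.io.File
    import scala.collection.JavaConverters._
    import org.apache.mnemonic.DurableType
    import org.apache.mnemonic.spark.MneDurableInputSession

    val memPoolFiles = Array(new File("/mnt/pmem0/app-id/durable-rdd-0000000001/mem_0000000007_0000000000.mne"))
    val insess = MneDurableInputSession[Long](
      "pmalloc", Array(DurableType.LONG), Array(), 2L, memPoolFiles)
    val values = insess.iterator.asScala   // iterator over the durable values in those pools
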
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/70c53f24/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/MneDurableOutputSession.scala
----------------------------------------------------------------------
diff --git a/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/MneDurableOutputSession.scala b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/MneDurableOutputSession.scala
index 4383866..0a5b9c0 100644
--- a/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/MneDurableOutputSession.scala
+++ b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/MneDurableOutputSession.scala
@@ -30,24 +30,21 @@ import org.apache.mnemonic.collections.DurableSinglyLinkedList
 import org.apache.mnemonic.collections.DurableSinglyLinkedListFactory
 import org.apache.mnemonic.sessions.DurableOutputSession
 
-class MneDurableOutputSession[V: ClassTag](
+private[spark] class MneDurableOutputSession[V: ClassTag] (
     serviceName: String,
     durableTypes: Array[DurableType],
     entityFactoryProxies: Array[EntityFactoryProxy],
     slotKeyId: Long,
     partitionPoolSize: Long,
-    baseDirectory: String,
-    outputMemPrefix: String)
+    durableDirectory: String,
+    outputMemFileNameGen: (Long)=>String)
     extends DurableOutputSession[V, NonVolatileMemAllocator] {
 
-  var baseDir: String = null
-  var memPools: ArrayBuffer[File] = new ArrayBuffer[File]
-  var outputFile: File = null
-  var outputPrefix: String = null
+  val memPools: ArrayBuffer[File] = new ArrayBuffer[File]
   private var _outidx: Long = 0L
 
   initialize(serviceName, durableTypes, entityFactoryProxies,
-      slotKeyId, partitionPoolSize, baseDirectory, outputMemPrefix)
+      slotKeyId, partitionPoolSize, durableDirectory, outputMemFileNameGen)
 
   def initialize(
     serviceName: String,
@@ -55,22 +52,20 @@ class MneDurableOutputSession[V: ClassTag](
     entityFactoryProxies: Array[EntityFactoryProxy],
     slotKeyId: Long,
     partitionPoolSize: Long,
-    baseDirectory: String,
-    outputMemPrefix: String) {
+    durableDirectory: String,
+    outputMemFileNameGen: (Long)=>String) {
     setServiceName(serviceName)
     setDurableTypes(durableTypes)
     setEntityFactoryProxies(entityFactoryProxies)
     setSlotKeyId(slotKeyId)
     setPoolSize(partitionPoolSize)
-    baseDir = baseDirectory
-    outputPrefix = outputMemPrefix
     if (!initNextPool) {
-      throw new RuntimeException("Firstly init next pool failed")
+      throw new DurableException("Firstly init next pool failed")
     }
   }
 
   protected def genNextPoolFile(): File = {
-    val file = new File(baseDir, f"${outputPrefix}_${_outidx}%05d.mne")
+    val file = new File(durableDirectory, outputMemFileNameGen(_outidx))
     _outidx += 1
     memPools += file
     file
@@ -82,9 +77,9 @@ class MneDurableOutputSession[V: ClassTag](
       getAllocator.close()
       setAllocator(null)
     }
-    outputFile = genNextPoolFile
+    val outputFile = genNextPoolFile
     if (outputFile.exists) {
-      outputFile.delete
+      throw new DurableException(s"Durable memory file already exists ${outputFile}")
     }
     m_act = new NonVolatileMemAllocator(Utils.getNonVolatileMemoryAllocatorService(getServiceName),
       getPoolSize, outputFile.toString, true);
@@ -104,11 +99,11 @@ object MneDurableOutputSession {
     entityFactoryProxies: Array[EntityFactoryProxy],
     slotKeyId: Long,
     partitionPoolSize: Long,
-    baseDirectory: String,
-    outputMemPrefix: String): MneDurableOutputSession[V] = {
+    durableDirectory: String,
+    outputMemFileNameGen: (Long)=>String): MneDurableOutputSession[V] = {
     val ret = new MneDurableOutputSession[V] (
         serviceName, durableTypes, entityFactoryProxies,
-        slotKeyId, partitionPoolSize, baseDirectory, outputMemPrefix)
+        slotKeyId, partitionPoolSize, durableDirectory, outputMemFileNameGen)
     ret
   }
 }

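The prefix-based naming is replaced by an injected (Long) => String generator, so DurableRDD now owns the on-disk naming scheme, and a pre-existing pool file is treated as an error instead of being silently deleted. For illustration, the curried generator defined later in this patch produces, for a split hash of 7:

    val gen: Long => String = DurableRDD.genDurableFileName(7) _
    gen(0L)  // "mem_0000000007_0000000000.mne"
    gen(1L)  // "mem_0000000007_0000000001.mne"
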
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/70c53f24/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDD.scala
----------------------------------------------------------------------
diff --git a/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDD.scala b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDD.scala
index 9975c14..0c9c647 100644
--- a/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDD.scala
+++ b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDD.scala
@@ -18,13 +18,16 @@
 package org.apache.mnemonic.spark.rdd;
 
 import java.io.File
+import scala.util._
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.{ Partition, TaskContext }
+import org.apache.spark.{ Partition, TaskContext, SparkContext }
 import org.apache.spark.internal.Logging
+import org.apache.commons.io.FileUtils
 import scala.reflect.{ classTag, ClassTag }
 import scala.collection.mutable.HashMap
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
 import org.apache.mnemonic.ConfigurationException
 import org.apache.mnemonic.DurableType
 import org.apache.mnemonic.EntityFactoryProxy
@@ -36,27 +39,28 @@ import org.apache.mnemonic.spark.MneDurableInputSession
 import org.apache.mnemonic.spark.MneDurableOutputSession
 import org.apache.mnemonic.spark.DurableException
 
-class DurableRDD[D: ClassTag, T: ClassTag](
-  var prev: RDD[T],
+private[spark] class DurableRDD[D: ClassTag, T: ClassTag] (
+  private var rdd: RDD[T],
   serviceName: String, durableTypes: Array[DurableType],
   entityFactoryProxies: Array[EntityFactoryProxy], slotKeyId: Long,
-  partitionPoolSize: Long, baseDirectory: String,
+  partitionPoolSize: Long, durableDirectory: String,
   f: (T, ObjectCreator[D, NonVolatileMemAllocator]) => Option[D],
   preservesPartitioning: Boolean = false)
-    extends RDD[D](prev) {
+    extends RDD[D](rdd) {
 
-  private val _parmap = HashMap.empty[Partition, Array[File]]
+  val durdddir = DurableRDD.getRddDirName(durableDirectory, id)
+  DurableRDD.resetRddDir(durdddir)
 
   override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
 
   override protected def getPartitions: Array[Partition] = firstParent[T].partitions
 
   def prepareDurablePartition(split: Partition, context: TaskContext,
-      iterator: Iterator[T]) {
+      iterator: Iterator[T]): Array[File] = {
     val outsess = MneDurableOutputSession[D](serviceName,
         durableTypes, entityFactoryProxies, slotKeyId,
-        partitionPoolSize, baseDirectory,
-        f"mem_${this.hashCode()}%010d_${split.hashCode()}%010d")
+        partitionPoolSize, durdddir.toString,
+        DurableRDD.genDurableFileName(split.hashCode)_)
     try {
       for (item <- iterator) {
         f(item, outsess) match {
@@ -64,41 +68,133 @@ class DurableRDD[D: ClassTag, T: ClassTag](
           case None =>
         }
       }
-      _parmap += (split -> outsess.memPools.toArray)
     } finally {
       outsess.close()
     }
+    outsess.memPools.toArray
   }
 
   override def compute(split: Partition, context: TaskContext): Iterator[D] = {
-    if (!(_parmap contains split)) {
-      prepareDurablePartition(split, context, firstParent[T].iterator(split, context))
-      logInfo(s"Done persisting RDD ${prev.id} to ${baseDirectory}")
-    }
-    val memplist = _parmap.get(split) match {
+    val mempListOpt: Option[Array[File]] =
+      DurableRDD.collectMemPoolFileList(durdddir.toString, DurableRDD.genDurableFileName(split.hashCode)_)
+    val memplist = mempListOpt match {
+      case None => {
+        val mplst = prepareDurablePartition(split, context, firstParent[T].iterator(split, context))
+        logInfo(s"Done transformed RDD #${rdd.id} to durableRDD #${id} on ${durdddir.toString}")
+        mplst
+      }
       case Some(mplst) => mplst
-      case None => throw new DurableException("Not construct durable partition properly")
     }
     val insess = MneDurableInputSession[D](serviceName,
         durableTypes, entityFactoryProxies, slotKeyId, memplist)
     insess.iterator.asScala
   }
 
-  override def clearDependencies() {
+  override def clearDependencies {
     super.clearDependencies()
-    prev = null
+    rdd = null
   }
 
-  def close {
-    if (null != _parmap) {
-      _parmap foreach { case (par, mplst) =>
-        mplst foreach {(file) =>
-          if (file.exists) {
-            file.delete
-          }
-        }
+  def reset {
+    DurableRDD.resetRddDir(durdddir)
+  }
+}
+
+object DurableRDD {
+
+  val durableSubDirNameTemplate = "durable-rdd-%010d"
+  val durableFileNameTemplate = "mem_%010d_%010d.mne"
+
+  private var durableDir: Option[String] = None
+
+  def getDefaultDurableBaseDir(sc: SparkContext): String = {
+    try {
+      sc.getConf.get("spark.durable-basedir").trim
+    } catch {
+      case _: NoSuchElementException =>
+        throw new IllegalArgumentException("spark.durable-basedir not specified for DurableRDD")
+      case _: Throwable =>
+        throw new DurableException("transforming durable base directories failed")
+    }
+  }
+
+  def setDurableBaseDir(sc: SparkContext, baseDir: String) {
+    durableDir = Option(baseDir) map { bdir =>
+      val fdir = new File(bdir, sc.applicationId)
+      if (!fdir.exists && !fdir.mkdirs) {
+        throw new DurableException(s"Cannot create durable directory ${fdir.toString}")
       }
-      _parmap.clear()
+      fdir.toString
     }
   }
+
+  def getDurableDir(sc: SparkContext): Option[String] = {
+    durableDir match {
+      case None => setDurableBaseDir(sc, getDefaultDurableBaseDir(sc))
+      case _ =>
+    }
+    durableDir
+  }
+
+  def getRddDirName(durableDir: String, rddid: Int): String = {
+    new File(durableDir, durableSubDirNameTemplate.format(rddid)).toString
+  }
+
+  def resetRddDir(rddDirName: String) {
+    val durdddir = new File(rddDirName)
+    if (durdddir.exists) {
+      FileUtils.deleteDirectory(durdddir)
+    }
+    if (!durdddir.mkdir) {
+      throw new DurableException(s"Durable RDD directory ${durdddir.toString} cannot be created")
+    }
+  }
+
+  def genDurableFileName(splitId: Int)(mempidx: Long): String = {
+    durableFileNameTemplate.format(splitId, mempidx)
+  }
+
+  def collectMemPoolFileList(durddir: String, memFileNameGen: (Long)=>String): Option[Array[File]] = {
+    val flist: ArrayBuffer[File] = new ArrayBuffer[File]
+    var idx: Long = 0L
+    var file: File = null
+    var wstop = true
+    while (wstop) {
+      file = new File(durddir, memFileNameGen(idx))
+      idx = idx + 1
+      if (file.exists) {
+        flist += file
+      } else {
+        wstop = false
+      }
+    }
+    if (flist.isEmpty) {
+      None
+    } else {
+      Some(flist.toArray)
+    }
+  }
+
+  def apply[D: ClassTag, T: ClassTag] (
+      rdd: RDD[T],
+      serviceName: String, durableTypes: Array[DurableType],
+      entityFactoryProxies: Array[EntityFactoryProxy], slotKeyId: Long,
+      partitionPoolSize: Long,
+      f: (T, ObjectCreator[D, NonVolatileMemAllocator]) => Option[D],
+      preservesPartitioning: Boolean = false) = {
+    val sc: SparkContext = rdd.context
+    val ret = new DurableRDD[D, T](rdd,
+      serviceName, durableTypes, entityFactoryProxies, slotKeyId,
+      partitionPoolSize, getDurableDir(sc).get, f, preservesPartitioning)
+    //sc.cleaner.foreach(_.registerRDDForCleanup(ret))
+    ret
+  }
+
+  def cleanupForApp(sc: SparkContext) {
+    FileUtils.deleteDirectory(new File(getDurableDir(sc).get))
+  }
+
+  def cleanupForRdd(sc: SparkContext, rddid: Int) {
+    FileUtils.deleteDirectory(new File(getRddDirName(getDurableDir(sc).get, rddid)))
+  }
 }

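The resulting layout is deterministic, which is what lets compute() probe pool files by consecutive index (collectMemPoolFileList stops at the first gap) and re-read an already materialized partition instead of recomputing it. Roughly, with an example base path:

    /mnt/pmem0/<applicationId>/                      created by setDurableBaseDir
        durable-rdd-0000000042/                      one sub-directory per DurableRDD, reset on construction
            mem_<split.hashCode>_0000000000.mne      first pool file of a partition
            mem_<split.hashCode>_0000000001.mne      next pool file, and so on

Note that registration with the ContextCleaner is still commented out in apply, so cleanup remains explicit through reset, cleanupForRdd or cleanupForApp.
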
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/70c53f24/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDDFunctions.scala b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDDFunctions.scala
index cb459ea..6e60e33 100644
--- a/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDDFunctions.scala
+++ b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDDFunctions.scala
@@ -18,7 +18,6 @@
 package org.apache.mnemonic.spark.rdd
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.TaskContext
 import scala.reflect.ClassTag
 import scala.language.implicitConversions
 import org.apache.mnemonic.NonVolatileMemAllocator
@@ -29,17 +28,17 @@ import org.apache.mnemonic.sessions.ObjectCreator
 class DurableRDDFunctions[T: ClassTag](rdd: RDD[T]) extends Serializable {
 
   def makeDurable[D: ClassTag](
-    serviceName: String,
-    durableTypes: Array[DurableType],
-    entityFactoryProxies: Array[EntityFactoryProxy],
-    slotKeyId: Long,
-    partitionPoolSize: Long,
-    baseDirectory: String,
-    f: (T, ObjectCreator[D, NonVolatileMemAllocator]) => Option[D],
-    preservesPartitioning: Boolean = false) =
-    new DurableRDD[D, T](rdd,
+      serviceName: String,
+      durableTypes: Array[DurableType],
+      entityFactoryProxies: Array[EntityFactoryProxy],
+      slotKeyId: Long,
+      partitionPoolSize: Long,
+      f: (T, ObjectCreator[D, NonVolatileMemAllocator]) => Option[D],
+      preservesPartitioning: Boolean = false) = {
+    DurableRDD[D, T](rdd,
       serviceName, durableTypes, entityFactoryProxies, slotKeyId,
-      partitionPoolSize, baseDirectory, f, preservesPartitioning)
+      partitionPoolSize, f, preservesPartitioning)
+  }
 }
 
 object DurableRDDFunctions {

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/70c53f24/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDSpec.scala
----------------------------------------------------------------------
diff --git a/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDSpec.scala b/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDSpec.scala
index ac33e7c..1937863 100644
--- a/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDSpec.scala
+++ b/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDSpec.scala
@@ -45,6 +45,7 @@ class DurableRDDSpec extends TestSpec {
         .setAppName("Test")
     val sc = new SparkContext(conf)
     // sc.getConf.getAll.foreach(println)
+    // DurableRDD.setDurableBaseDir(sc, defaultBaseDirectory)
     val seed: RDD[Int] = sc.parallelize(
         Seq.fill(defaultNumOfPartitions)(defaultNumOfRecordsPerPartition), defaultNumOfPartitions)
     val data = seed flatMap (recnum => Seq.fill(recnum)(Random.nextInt)) cache //must be cached to fix rand numbers
@@ -52,14 +53,16 @@ class DurableRDDSpec extends TestSpec {
         defaultServiceName,
         Array(DurableType.LONG), Array(),
         defaultSlotKeyId, defaultPartitionSize,
-        defaultBaseDirectory,
         (v: Int, oc: ObjectCreator[Long, NonVolatileMemAllocator])=>
           { Some(v.asInstanceOf[Long]) })
     // data.collect().foreach(println)
     // durdd.collect().foreach(println)
     val (rcnt, rsum) = (data.count, data.sum)
     val (dcnt, dsum) = (durdd.count, durdd.sum)
-    durdd.close
+    durdd.reset
+    /*sys.addShutdownHook({
+      DurableRDD.cleanupForApp(sc)
+    })*/
     assertResult((rcnt, rsum)) {
       (dcnt, dsum)
     }

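The spec no longer passes defaultBaseDirectory into makeDurable; it relies on the spark.durable-basedir system property added to mnemonic-spark/pom.xml below (SparkConf picks up JVM system properties prefixed with spark., and getDefaultDurableBaseDir trims the surrounding whitespace). A test can also set the directory programmatically, as the commented line above hints; defaultBaseDirectory is assumed to be a writable path in the test fixture:

    // equivalent to -Dspark.durable-basedir=<dir>; call before the first makeDurable
    DurableRDD.setDurableBaseDir(sc, defaultBaseDirectory)
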
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/70c53f24/mnemonic-spark/pom.xml
----------------------------------------------------------------------
diff --git a/mnemonic-spark/pom.xml b/mnemonic-spark/pom.xml
index d321839..73978ac 100644
--- a/mnemonic-spark/pom.xml
+++ b/mnemonic-spark/pom.xml
@@ -165,6 +165,9 @@
             <skipTests>${skipTests}</skipTests>
             <argLine>-Djava.ext.dirs=${memory.service.dist.dir}:${computing.service.dist.dir}</argLine>
             <systemProperties>
+              <spark.durable-basedir>
+                .
+              </spark.durable-basedir>
               <spark.service-dist-dirs>
                 ${memory.service.dist.dir}:${computing.service.dist.dir}
               </spark.service-dist-dirs>