Posted to commits@mnemonic.apache.org by ga...@apache.org on 2017/04/25 20:17:55 UTC

incubator-mnemonic git commit: MNEMONIC-242: Add a test case to verify the summation of Long type dataset & bugfixes

Repository: incubator-mnemonic
Updated Branches:
  refs/heads/master 3a8c52cef -> 1e6bf9558


MNEMONIC-242: Add a test case to verify the summation of Long type dataset & bugfixes


Project: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/commit/1e6bf955
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/tree/1e6bf955
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/diff/1e6bf955

Branch: refs/heads/master
Commit: 1e6bf955827e3eb10586f4279722689735966735
Parents: 3a8c52c
Author: Wang, Gang(Gary) <ga...@intel.com>
Authored: Tue Apr 25 13:01:23 2017 -0700
Committer: Wang, Gang(Gary) <ga...@intel.com>
Committed: Tue Apr 25 13:01:23 2017 -0700

----------------------------------------------------------------------
 .gitignore                                      |  1 +
 .../mnemonic/NonVolatileMemAllocator.java       |  6 +-
 .../main/java/org/apache/mnemonic/Utils.java    | 10 +--
 .../apache/mnemonic/VolatileMemAllocator.java   |  6 +-
 .../hadoop/MneDurableOutputSession.java         |  3 -
 .../mnemonic/sessions/DurableOutputSession.java |  4 +-
 .../mnemonic/spark/DurableException.scala       | 22 +++++++
 .../mnemonic/spark/MneDurableInputSession.scala | 45 ++++++++++----
 .../spark/MneDurableOutputSession.scala         | 65 ++++++++++++++------
 .../apache/mnemonic/spark/rdd/DurableRDD.scala  | 31 +++++++---
 .../spark/rdd/DurableRDDFunctions.scala         |  5 +-
 .../mnemonic/spark/rdd/DurableRDDSpec.scala     | 39 +++++++++++-
 mnemonic-spark/pom.xml                          | 10 +++
 pom.xml                                         |  1 +
 14 files changed, 194 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/1e6bf955/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 182beb7..368b13a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -46,4 +46,5 @@ testlog/
 *.pyc
 .cache-tests
 .cache-main
+*.mne
 

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/1e6bf955/mnemonic-core/src/main/java/org/apache/mnemonic/NonVolatileMemAllocator.java
----------------------------------------------------------------------
diff --git a/mnemonic-core/src/main/java/org/apache/mnemonic/NonVolatileMemAllocator.java b/mnemonic-core/src/main/java/org/apache/mnemonic/NonVolatileMemAllocator.java
index cd75055..25d69ba 100644
--- a/mnemonic-core/src/main/java/org/apache/mnemonic/NonVolatileMemAllocator.java
+++ b/mnemonic-core/src/main/java/org/apache/mnemonic/NonVolatileMemAllocator.java
@@ -56,9 +56,11 @@ public class NonVolatileMemAllocator extends RestorableAllocator<NonVolatileMemA
    *          a place holder, always specify it as true
    */
   public NonVolatileMemAllocator(NonVolatileMemoryAllocatorService nvmasvc, long capacity, String uri, boolean isnew) {
-    assert null != nvmasvc : "NonVolatileMemoryAllocatorService object is null";
+    if (null == nvmasvc) {
+      throw new IllegalArgumentException("NonVolatileMemoryAllocatorService object is null");
+    }
     if (capacity <= 0) {
-      throw new IllegalArgumentException("BigDataPMemAllocator cannot be initialized with capacity <= 0.");
+      throw new IllegalArgumentException("NonVolatileMemAllocator cannot be initialized with capacity <= 0.");
     }
 
     m_nvmasvc = nvmasvc;
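
Note on the hunk above: JVM assertions are stripped unless the process runs with -ea, so the old assert made the null check a no-op in a default production run; an IllegalArgumentException always fires. The message text was also corrected from the stale "BigDataPMemAllocator" name. A minimal sketch of the new fail-fast behavior (pool path and capacity are illustrative only):

    import org.apache.mnemonic.NonVolatileMemAllocator
    import org.apache.mnemonic.NonVolatileMemoryAllocatorService

    val svc: NonVolatileMemoryAllocatorService = null
    try {
      // rejected immediately now, even without -ea on the command line
      new NonVolatileMemAllocator(svc, 1024L * 1024L, "./sample_pool.mne", true)
    } catch {
      case e: IllegalArgumentException => println(s"rejected: ${e.getMessage}")
    }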

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/1e6bf955/mnemonic-core/src/main/java/org/apache/mnemonic/Utils.java
----------------------------------------------------------------------
diff --git a/mnemonic-core/src/main/java/org/apache/mnemonic/Utils.java b/mnemonic-core/src/main/java/org/apache/mnemonic/Utils.java
index faadec4..41b361e 100644
--- a/mnemonic-core/src/main/java/org/apache/mnemonic/Utils.java
+++ b/mnemonic-core/src/main/java/org/apache/mnemonic/Utils.java
@@ -113,17 +113,18 @@ public class Utils {
    *
    * @return the non-volatile memory allocator service instance
    */
-  public static NonVolatileMemoryAllocatorService getNonVolatileMemoryAllocatorService(String id) {
+  public static synchronized NonVolatileMemoryAllocatorService getNonVolatileMemoryAllocatorService(String id) {
     NonVolatileMemoryAllocatorService ret = null;
     if (null == m_nvmasvcloader) {
       m_nvmasvcloader = ServiceLoader.load(NonVolatileMemoryAllocatorService.class);
     }
     Iterator<NonVolatileMemoryAllocatorService> svcit = m_nvmasvcloader.iterator();
     NonVolatileMemoryAllocatorService svc = null;
-    while (null == ret && svcit.hasNext()) {
+    while (svcit.hasNext()) {
       svc = svcit.next();
       if (svc.getServiceId().equals(id)) {
         ret = svc;
+        break;
       }
     }
     assert null != ret : "NonVolatileMemoryAllocatorService \'" + id + "\' not found!";
@@ -138,17 +139,18 @@ public class Utils {
    *
    * @return the durable general computing service instance
    */
-  public static GeneralComputingService getGeneralComputingService(String id) {
+  public static synchronized GeneralComputingService getGeneralComputingService(String id) {
     GeneralComputingService ret = null;
     if (null == m_gcompsvcloader) {
       m_gcompsvcloader = ServiceLoader.load(GeneralComputingService.class);
     }
     Iterator<GeneralComputingService> svcit = m_gcompsvcloader.iterator();
     GeneralComputingService svc = null;
-    while (null == ret && svcit.hasNext()) {
+    while (svcit.hasNext()) {
       svc = svcit.next();
       if (svc.getServiceId().equals(id)) {
         ret = svc;
+        break;
       }
     }
     assert null != ret : "GeneralComputingService \'" + id + "\' not found!";
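
Note on the two hunks above: the lookups are now synchronized, because the lazily created ServiceLoader and its iterator are not thread-safe, and the loop breaks at the first matching service id instead of re-testing the loop condition on every pass. A minimal sketch of the same pattern, using a hypothetical DemoService trait:

    import java.util.ServiceLoader
    import scala.collection.JavaConverters._

    trait DemoService { def getServiceId: String }

    object DemoServices {
      private var loader: ServiceLoader[DemoService] = null

      // synchronized: the loader is created lazily and its iterator must
      // not be advanced by two threads at once
      def lookup(id: String): Option[DemoService] = this.synchronized {
        if (loader == null) loader = ServiceLoader.load(classOf[DemoService])
        loader.iterator.asScala.find(_.getServiceId == id) // stops at first hit
      }
    }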

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/1e6bf955/mnemonic-core/src/main/java/org/apache/mnemonic/VolatileMemAllocator.java
----------------------------------------------------------------------
diff --git a/mnemonic-core/src/main/java/org/apache/mnemonic/VolatileMemAllocator.java b/mnemonic-core/src/main/java/org/apache/mnemonic/VolatileMemAllocator.java
index e4c04cc..a94b751 100644
--- a/mnemonic-core/src/main/java/org/apache/mnemonic/VolatileMemAllocator.java
+++ b/mnemonic-core/src/main/java/org/apache/mnemonic/VolatileMemAllocator.java
@@ -56,9 +56,11 @@ public class VolatileMemAllocator extends RestorableAllocator<VolatileMemAllocat
    *          a place holder, always specify it as true
    */
   public VolatileMemAllocator(VolatileMemoryAllocatorService vmasvc, long capacity, String uri, boolean isnew) {
-    assert null != vmasvc : "VolatileMemoryAllocatorService object is null";
+    if (null == vmasvc) {
+      throw new IllegalArgumentException("VolatileMemoryAllocatorService object is null");
+    }
     if (capacity <= 0) {
-      throw new IllegalArgumentException("BigDataMemAllocator cannot be initialized with capacity <= 0.");
+      throw new IllegalArgumentException("VolatileMemAllocator cannot be initialized with capacity <= 0.");
     }
 
     m_vmasvc = vmasvc;

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/1e6bf955/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java
index a34d036..12e0b3d 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java
@@ -22,11 +22,9 @@ import org.apache.mnemonic.ConfigurationException;
 import org.apache.mnemonic.DurableType;
 import org.apache.mnemonic.NonVolatileMemAllocator;
 import org.apache.mnemonic.Utils;
-import org.apache.mnemonic.collections.DurableSinglyLinkedList;
 import org.apache.mnemonic.sessions.DurableOutputSession;
 
 import java.text.NumberFormat;
-import java.util.HashMap;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -57,7 +55,6 @@ public class MneDurableOutputSession<V>
   }
 
   public void initialize(String prefix) {
-    m_recordmap = new HashMap<V, DurableSinglyLinkedList<V>>();
     readConfig(prefix);
     initNextPool();
   }

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/1e6bf955/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/DurableOutputSession.java
----------------------------------------------------------------------
diff --git a/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/DurableOutputSession.java b/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/DurableOutputSession.java
index 71f8e58..e21b5a8 100644
--- a/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/DurableOutputSession.java
+++ b/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/DurableOutputSession.java
@@ -26,6 +26,7 @@ import org.apache.mnemonic.RestorableAllocator;
 import org.apache.mnemonic.collections.DurableSinglyLinkedList;
 import org.apache.mnemonic.collections.DurableSinglyLinkedListFactory;
 
+import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.commons.lang3.tuple.Pair;
@@ -39,7 +40,8 @@ public abstract class DurableOutputSession<V, A extends RestorableAllocator<A>>
   private EntityFactoryProxy[] entityFactoryProxies;
   private long slotKeyId;
 
-  protected Map<V, DurableSinglyLinkedList<V>> m_recordmap;
+  protected Map<V, DurableSinglyLinkedList<V>> m_recordmap =
+      new HashMap<V, DurableSinglyLinkedList<V>>();
   protected boolean m_newpool;
   protected long m_poolidx = 0L;
   protected Pair<DurableType[], EntityFactoryProxy[]> m_recparmpair;
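
Note: this hunk pairs with the MneDurableOutputSession change above. m_recordmap used to be created inside the Hadoop session's initialize(), so every other subclass of DurableOutputSession, including the new Spark session below, had to remember to build the map or hit a NullPointerException on first use. Initializing it at the declaration removes that trap. A minimal sketch of the bug class this fixes, with hypothetical names:

    import java.util.{ HashMap => JHashMap, Map => JMap }

    abstract class BaseSession[V] {
      // after: created at declaration, every subclass inherits a live map
      protected var recordMap: JMap[V, AnyRef] = new JHashMap[V, AnyRef]()
    }

    class SparkLikeSession[V] extends BaseSession[V] {
      // before: forgetting "recordMap = new JHashMap(...)" here meant an
      // immediate NullPointerException; now it just works
      def remember(v: V): Unit = recordMap.put(v, null)
    }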

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/1e6bf955/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/DurableException.scala
----------------------------------------------------------------------
diff --git a/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/DurableException.scala b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/DurableException.scala
new file mode 100644
index 0000000..c96aa73
--- /dev/null
+++ b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/DurableException.scala
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mnemonic.spark
+
+class DurableException(msg: String) extends RuntimeException(msg) {
+
+}
\ No newline at end of file
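
Note: the new DurableException lets callers tell durable-memory failures apart from generic runtime errors; DurableRDD.compute below throws it when a partition's memory pools were not constructed. A small usage sketch (the thrown message is a stand-in for a real failure):

    import org.apache.mnemonic.spark.DurableException

    try {
      throw new DurableException("pool file missing") // stand-in for a real failure
    } catch {
      case e: DurableException =>
        println(s"durable layer failed: ${e.getMessage}") // e.g. recompute the partition
    }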

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/1e6bf955/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/MneDurableInputSession.scala
----------------------------------------------------------------------
diff --git a/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/MneDurableInputSession.scala b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/MneDurableInputSession.scala
index d027c52..e3f6f67 100644
--- a/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/MneDurableInputSession.scala
+++ b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/MneDurableInputSession.scala
@@ -30,10 +30,36 @@ import org.apache.mnemonic.collections.DurableSinglyLinkedListFactory
 import org.apache.mnemonic.sessions.DurableInputSession
 import org.apache.mnemonic.sessions.SessionIterator
 
-class MneDurableInputSession[V: ClassTag]
+class MneDurableInputSession[V: ClassTag] (
+    serviceName: String,
+    durableTypes: Array[DurableType],
+    entityFactoryProxies: Array[EntityFactoryProxy],
+    slotKeyId: Long,
+    memPoolList: Array[File] )
     extends DurableInputSession[V, NonVolatileMemAllocator] {
 
-  var fileList: Array[File] = null
+  var memPools: Array[File] = null
+
+  private var flistIter: Iterator[File] = null
+
+  initialize(serviceName, durableTypes, entityFactoryProxies,
+      slotKeyId, memPoolList)
+
+  def initialize (
+    serviceName: String,
+    durableTypes: Array[DurableType],
+    entityFactoryProxies: Array[EntityFactoryProxy],
+    slotKeyId: Long,
+    memPoolList: Array[File]) {
+    setServiceName(serviceName)
+    setDurableTypes(durableTypes)
+    setEntityFactoryProxies(entityFactoryProxies)
+    setSlotKeyId(slotKeyId);
+    memPools = memPoolList
+    if (null != memPools) {
+      flistIter = memPools.iterator
+    }
+  }
 
   override def initNextPool(sessiter: SessionIterator[V, NonVolatileMemAllocator]): Boolean = {
     var ret: Boolean = false
@@ -41,9 +67,9 @@ class MneDurableInputSession[V: ClassTag]
       sessiter.getAllocator.close
       sessiter.setAllocator(null)
     }
-    for (file <- fileList) {
+    if (null != flistIter && flistIter.hasNext) {
       sessiter.setAllocator(new NonVolatileMemAllocator(Utils.getNonVolatileMemoryAllocatorService(
-        getServiceName), 1024000L, file.toString, true));
+        getServiceName), 1024000L, flistIter.next.toString, true));
       if (null != sessiter.getAllocator) {
         sessiter.setHandler(sessiter.getAllocator.getHandler(getSlotKeyId))
         if (0L != sessiter.getHandler) {
@@ -67,13 +93,10 @@ object MneDurableInputSession {
     durableTypes: Array[DurableType],
     entityFactoryProxies: Array[EntityFactoryProxy],
     slotKeyId: Long,
-    files: Array[File]): MneDurableInputSession[V] = {
-    var ret = new MneDurableInputSession[V]
-    ret.setServiceName(serviceName)
-    ret.setDurableTypes(durableTypes)
-    ret.setEntityFactoryProxies(entityFactoryProxies)
-    ret.setSlotKeyId(slotKeyId);
-    ret.fileList = files
+    memPoolList: Array[File]): MneDurableInputSession[V] = {
+    val ret = new MneDurableInputSession[V] (
+        serviceName, durableTypes, entityFactoryProxies,
+        slotKeyId, memPoolList)
     ret
   }
 }
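
Note on the hunks above: the session is now fully initialized by its constructor, and initNextPool advances through the pool files one at a time via flistIter; the old "for (file <- fileList)" loop re-opened every file on each call instead of moving to the next pool. A construction sketch via the companion object (service name and slot key id match the spec below; the pool-file array is illustrative and assumes the pmalloc service is on the class path):

    import java.io.File
    import scala.collection.JavaConverters._
    import org.apache.mnemonic.DurableType
    import org.apache.mnemonic.spark.MneDurableInputSession

    val pools = Array(new File("mem_0000000001_0000000002_00000.mne"))
    val insess = MneDurableInputSession[Long](
      "pmalloc", Array(DurableType.LONG), Array(), 2L, pools)
    insess.iterator.asScala.foreach(println) // every record across all pools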

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/1e6bf955/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/MneDurableOutputSession.scala
----------------------------------------------------------------------
diff --git a/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/MneDurableOutputSession.scala b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/MneDurableOutputSession.scala
index 657cb16..4383866 100644
--- a/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/MneDurableOutputSession.scala
+++ b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/MneDurableOutputSession.scala
@@ -16,6 +16,7 @@
  */
 
 package org.apache.mnemonic.spark
+
 import java.io.File
 import scala.reflect.{ classTag, ClassTag }
 import scala.collection.mutable.ArrayBuffer
@@ -29,34 +30,67 @@ import org.apache.mnemonic.collections.DurableSinglyLinkedList
 import org.apache.mnemonic.collections.DurableSinglyLinkedListFactory
 import org.apache.mnemonic.sessions.DurableOutputSession
 
-class MneDurableOutputSession[V: ClassTag]
+class MneDurableOutputSession[V: ClassTag](
+    serviceName: String,
+    durableTypes: Array[DurableType],
+    entityFactoryProxies: Array[EntityFactoryProxy],
+    slotKeyId: Long,
+    partitionPoolSize: Long,
+    baseDirectory: String,
+    outputMemPrefix: String)
     extends DurableOutputSession[V, NonVolatileMemAllocator] {
 
   var baseDir: String = null
-  var fileList: ArrayBuffer[File] = new ArrayBuffer[File]
+  var memPools: ArrayBuffer[File] = new ArrayBuffer[File]
   var outputFile: File = null
   var outputPrefix: String = null
   private var _outidx: Long = 0L
 
+  initialize(serviceName, durableTypes, entityFactoryProxies,
+      slotKeyId, partitionPoolSize, baseDirectory, outputMemPrefix)
+
+  def initialize(
+    serviceName: String,
+    durableTypes: Array[DurableType],
+    entityFactoryProxies: Array[EntityFactoryProxy],
+    slotKeyId: Long,
+    partitionPoolSize: Long,
+    baseDirectory: String,
+    outputMemPrefix: String) {
+    setServiceName(serviceName)
+    setDurableTypes(durableTypes)
+    setEntityFactoryProxies(entityFactoryProxies)
+    setSlotKeyId(slotKeyId)
+    setPoolSize(partitionPoolSize)
+    baseDir = baseDirectory
+    outputPrefix = outputMemPrefix
+    if (!initNextPool) {
+      throw new RuntimeException("Failed to initialize the first memory pool")
+    }
+  }
+
   protected def genNextPoolFile(): File = {
     val file = new File(baseDir, f"${outputPrefix}_${_outidx}%05d.mne")
-    _outidx = _outidx + 1
-    fileList += file
+    _outidx += 1
+    memPools += file
     file
   }
 
   override def initNextPool(): Boolean = {
     var ret: Boolean = false
     if (null != getAllocator) {
-      getAllocator.close();
-      setAllocator(null);
+      getAllocator.close()
+      setAllocator(null)
+    }
+    outputFile = genNextPoolFile
+    if (outputFile.exists) {
+      outputFile.delete
     }
-    outputFile = genNextPoolFile;
     m_act = new NonVolatileMemAllocator(Utils.getNonVolatileMemoryAllocatorService(getServiceName),
       getPoolSize, outputFile.toString, true);
     if (null != getAllocator) {
       m_newpool = true;
-      ret = true;
+      ret = true
     }
     ret
   }
@@ -70,16 +104,11 @@ object MneDurableOutputSession {
     entityFactoryProxies: Array[EntityFactoryProxy],
     slotKeyId: Long,
     partitionPoolSize: Long,
-    baseDir: String,
-    outputPrefix: String): MneDurableOutputSession[V] = {
-    var ret = new MneDurableOutputSession[V]
-    ret.setServiceName(serviceName)
-    ret.setDurableTypes(durableTypes)
-    ret.setEntityFactoryProxies(entityFactoryProxies)
-    ret.setSlotKeyId(slotKeyId)
-    ret.setPoolSize(partitionPoolSize)
-    ret.baseDir = baseDir
-    ret.outputPrefix = outputPrefix
+    baseDirectory: String,
+    outputMemPrefix: String): MneDurableOutputSession[V] = {
+    val ret = new MneDurableOutputSession[V] (
+        serviceName, durableTypes, entityFactoryProxies,
+        slotKeyId, partitionPoolSize, baseDirectory, outputMemPrefix)
     ret
   }
 }
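
Note on the hunks above: the output session now runs initNextPool from its constructor, so a failure to create the first pool surfaces at construction rather than on the first write, and a stale pool file left over from a previous run is deleted before the allocator recreates it. Pool files follow the outputPrefix plus zero-padded index pattern from genNextPoolFile; a quick sketch of that naming (the prefix is illustrative):

    val outputPrefix = "mem_0000000001_0000000002"
    for (idx <- 0L to 2L)
      println(f"${outputPrefix}_${idx}%05d.mne")
    // mem_0000000001_0000000002_00000.mne
    // mem_0000000001_0000000002_00001.mne
    // mem_0000000001_0000000002_00002.mne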

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/1e6bf955/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDD.scala
----------------------------------------------------------------------
diff --git a/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDD.scala b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDD.scala
index 4c0adb4..9975c14 100644
--- a/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDD.scala
+++ b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDD.scala
@@ -21,6 +21,7 @@ import java.io.File
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.{ Partition, TaskContext }
+import org.apache.spark.internal.Logging
 import scala.reflect.{ classTag, ClassTag }
 import scala.collection.mutable.HashMap
 import scala.collection.JavaConverters._
@@ -33,12 +34,13 @@ import org.apache.mnemonic.sessions.SessionIterator
 import org.apache.mnemonic.sessions.ObjectCreator
 import org.apache.mnemonic.spark.MneDurableInputSession
 import org.apache.mnemonic.spark.MneDurableOutputSession
+import org.apache.mnemonic.spark.DurableException
 
 class DurableRDD[D: ClassTag, T: ClassTag](
   var prev: RDD[T],
   serviceName: String, durableTypes: Array[DurableType],
   entityFactoryProxies: Array[EntityFactoryProxy], slotKeyId: Long,
-  partitionPoolSize: Long, baseDir: String,
+  partitionPoolSize: Long, baseDirectory: String,
   f: (T, ObjectCreator[D, NonVolatileMemAllocator]) => Option[D],
   preservesPartitioning: Boolean = false)
     extends RDD[D](prev) {
@@ -53,8 +55,8 @@ class DurableRDD[D: ClassTag, T: ClassTag](
       iterator: Iterator[T]) {
     val outsess = MneDurableOutputSession[D](serviceName,
         durableTypes, entityFactoryProxies, slotKeyId,
-        partitionPoolSize, baseDir,
-        f"mem_${this.hashCode()}%10d_${split.hashCode()}%10d")
+        partitionPoolSize, baseDirectory,
+        f"mem_${this.hashCode()}%010d_${split.hashCode()}%010d")
     try {
       for (item <- iterator) {
         f(item, outsess) match {
@@ -62,7 +64,7 @@ class DurableRDD[D: ClassTag, T: ClassTag](
           case None =>
         }
       }
-      _parmap += (split -> outsess.fileList.toArray)
+      _parmap += (split -> outsess.memPools.toArray)
     } finally {
       outsess.close()
     }
@@ -71,13 +73,14 @@ class DurableRDD[D: ClassTag, T: ClassTag](
   override def compute(split: Partition, context: TaskContext): Iterator[D] = {
     if (!(_parmap contains split)) {
       prepareDurablePartition(split, context, firstParent[T].iterator(split, context))
+      logInfo(s"Done persisting RDD ${prev.id} to ${baseDirectory}")
     }
-    val flist = _parmap.get(split) match {
-      case Some(flst) => flst
-      case None => throw new RuntimeException("Not construct durable partition properly")
+    val memplist = _parmap.get(split) match {
+      case Some(mplst) => mplst
+      case None => throw new DurableException("Durable partition was not constructed properly")
     }
     val insess = MneDurableInputSession[D](serviceName,
-        durableTypes, entityFactoryProxies, slotKeyId, flist)
+        durableTypes, entityFactoryProxies, slotKeyId, memplist)
     insess.iterator.asScala
   }
 
@@ -86,4 +89,16 @@ class DurableRDD[D: ClassTag, T: ClassTag](
     prev = null
   }
 
+  def close {
+    if (null != _parmap) {
+      _parmap foreach { case (par, mplst) =>
+        mplst foreach {(file) =>
+          if (file.exists) {
+            file.delete
+          }
+        }
+      }
+      _parmap.clear()
+    }
+  }
 }
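
Note on the hunks above: besides routing the missing-partition error through DurableException and adding close() to delete a partition's pool files, the format width changed from %10d to %010d. %10d pads with spaces, so hash codes shorter than ten digits put spaces inside the pool-file name; %010d zero-pads. A one-line illustration:

    println(f"mem_${12345}%10d")  // "mem_     12345" -> spaces in a file name
    println(f"mem_${12345}%010d") // "mem_0000012345" -> zero-padded, safe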

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/1e6bf955/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDDFunctions.scala b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDDFunctions.scala
index 7ef8325..cb459ea 100644
--- a/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDDFunctions.scala
+++ b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDDFunctions.scala
@@ -34,13 +34,12 @@ class DurableRDDFunctions[T: ClassTag](rdd: RDD[T]) extends Serializable {
     entityFactoryProxies: Array[EntityFactoryProxy],
     slotKeyId: Long,
     partitionPoolSize: Long,
-    baseDir: String,
+    baseDirectory: String,
     f: (T, ObjectCreator[D, NonVolatileMemAllocator]) => Option[D],
     preservesPartitioning: Boolean = false) =
     new DurableRDD[D, T](rdd,
       serviceName, durableTypes, entityFactoryProxies, slotKeyId,
-      partitionPoolSize, baseDir, f, preservesPartitioning)
-
+      partitionPoolSize, baseDirectory, f, preservesPartitioning)
 }
 
 object DurableRDDFunctions {

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/1e6bf955/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDSpec.scala
----------------------------------------------------------------------
diff --git a/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDSpec.scala b/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDSpec.scala
index 41b9449..6349ce5 100644
--- a/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDSpec.scala
+++ b/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDSpec.scala
@@ -18,13 +18,48 @@
 package org.apache.mnemonic.spark.rdd;
 
 import org.apache.mnemonic.spark.TestSpec
+import org.apache.spark.SparkConf
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.mnemonic.spark.rdd.DurableRDDFunctions._
+import org.apache.mnemonic.DurableType
+import org.apache.mnemonic.NonVolatileMemAllocator
+import org.apache.mnemonic.EntityFactoryProxy
+import org.apache.mnemonic.sessions.ObjectCreator
 
 class DurableRDDSpec extends TestSpec {
 
+  val defaultServiceName = "pmalloc"
+  val defaultSlotKeyId = 2L
+  val defaultPartitionSize = 1024 * 1024 * 1024L
+  val defaultBaseDirectory = "."
+  val defaultNumOfRecordsPerPartition = 5000L
+
   behavior of "A DurableRDD"
 
-  it should "have size 0" in {
-    assert(Set.empty.size == 0)
+  it should "have the same sum value" in {
+    val conf = new SparkConf()
+        .setMaster("local[*]")
+        .setAppName("Test")
+    val sc = new SparkContext(conf)
+    // sc.getConf.getAll.foreach(println)
+    val ds = Seq(5, 6, 3, 8, 4)
+    val data: RDD[Int] = sc.parallelize(ds)
+    val durdd = data.makeDurable[Long](
+        defaultServiceName,
+        Array(DurableType.LONG), Array(),
+        defaultSlotKeyId, defaultPartitionSize,
+        defaultBaseDirectory,
+        (v: Int, oc: ObjectCreator[Long, NonVolatileMemAllocator])=>
+          { Some(v.asInstanceOf[Long]) })
+    // data.collect().foreach(println)
+    // durdd.collect().foreach(println)
+    val (rcnt, rsum) = (data.count, data.sum)
+    val (dcnt, dsum) = (durdd.count, durdd.sum)
+    durdd.close
+    assertResult((rcnt, rsum)) {
+      (dcnt, dsum)
+    }
   }
 
   it should "produce NoSuchElementException when head is invoked" in {

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/1e6bf955/mnemonic-spark/pom.xml
----------------------------------------------------------------------
diff --git a/mnemonic-spark/pom.xml b/mnemonic-spark/pom.xml
index 76beef8..d321839 100644
--- a/mnemonic-spark/pom.xml
+++ b/mnemonic-spark/pom.xml
@@ -161,7 +161,17 @@
             <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
             <junitxml>.</junitxml>
             <filereports>SparkIntegrationTestSuite.txt</filereports>
+            <forkMode>always</forkMode>
             <skipTests>${skipTests}</skipTests>
+            <argLine>-Djava.ext.dirs=${memory.service.dist.dir}:${computing.service.dist.dir}</argLine>
+            <systemProperties>
+              <spark.service-dist-dirs>
+                ${memory.service.dist.dir}:${computing.service.dist.dir}
+              </spark.service-dist-dirs>
+              <spark.executorEnv.SPARK_JAVA_OPTS>
+                -D${memory.service.dist.dir}:${computing.service.dist.dir}
+              </spark.executorEnv.SPARK_JAVA_OPTS>
+            </systemProperties>
           </configuration>
           <executions>
             <execution>
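
Note on the hunk above: forkMode "always" gives each suite a fresh JVM so native allocator state cannot leak between tests, the argLine puts the memory and computing service jars on the extension class path via java.ext.dirs, and the same directories are exposed to test code as system properties. A hedged sketch of reading that property (the property name comes from the configuration above; trim handles the whitespace surrounding the value in the pom):

    val dirs = Option(System.getProperty("spark.service-dist-dirs"))
      .map(_.trim.split(':').toSeq)
      .getOrElse(Seq.empty)
    dirs.foreach(d => println(s"service dist dir: $d"))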

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/1e6bf955/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ad101f6..a9ba680 100644
--- a/pom.xml
+++ b/pom.xml
@@ -290,6 +290,7 @@
           <configuration>
             <excludes>
               <exclude>.git/**/*</exclude>
+	      <exclude>**/.project</exclude>
               <exclude>**/*.</exclude>
 	      <exclude>**/target/**/*</exclude>
               <exclude>**/README*</exclude>