Posted to commits@mnemonic.apache.org by ga...@apache.org on 2017/04/25 20:17:55 UTC
incubator-mnemonic git commit: MNEMONIC-242: Add a test case to verify the summation of Long type dataset & bugfixes
Repository: incubator-mnemonic
Updated Branches:
refs/heads/master 3a8c52cef -> 1e6bf9558
MNEMONIC-242: Add a test case to verify the summation of Long type dataset & bugfixes
Project: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/commit/1e6bf955
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/tree/1e6bf955
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/diff/1e6bf955
Branch: refs/heads/master
Commit: 1e6bf955827e3eb10586f4279722689735966735
Parents: 3a8c52c
Author: Wang, Gang(Gary) <ga...@intel.com>
Authored: Tue Apr 25 13:01:23 2017 -0700
Committer: Wang, Gang(Gary) <ga...@intel.com>
Committed: Tue Apr 25 13:01:23 2017 -0700
----------------------------------------------------------------------
.gitignore | 1 +
.../mnemonic/NonVolatileMemAllocator.java | 6 +-
.../main/java/org/apache/mnemonic/Utils.java | 10 +--
.../apache/mnemonic/VolatileMemAllocator.java | 6 +-
.../hadoop/MneDurableOutputSession.java | 3 -
.../mnemonic/sessions/DurableOutputSession.java | 4 +-
.../mnemonic/spark/DurableException.scala | 22 +++++++
.../mnemonic/spark/MneDurableInputSession.scala | 45 ++++++++++----
.../spark/MneDurableOutputSession.scala | 65 ++++++++++++++------
.../apache/mnemonic/spark/rdd/DurableRDD.scala | 31 +++++++---
.../spark/rdd/DurableRDDFunctions.scala | 5 +-
.../mnemonic/spark/rdd/DurableRDDSpec.scala | 39 +++++++++++-
mnemonic-spark/pom.xml | 10 +++
pom.xml | 1 +
14 files changed, 194 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/1e6bf955/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 182beb7..368b13a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -46,4 +46,5 @@ testlog/
*.pyc
.cache-tests
.cache-main
+*.mne
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/1e6bf955/mnemonic-core/src/main/java/org/apache/mnemonic/NonVolatileMemAllocator.java
----------------------------------------------------------------------
diff --git a/mnemonic-core/src/main/java/org/apache/mnemonic/NonVolatileMemAllocator.java b/mnemonic-core/src/main/java/org/apache/mnemonic/NonVolatileMemAllocator.java
index cd75055..25d69ba 100644
--- a/mnemonic-core/src/main/java/org/apache/mnemonic/NonVolatileMemAllocator.java
+++ b/mnemonic-core/src/main/java/org/apache/mnemonic/NonVolatileMemAllocator.java
@@ -56,9 +56,11 @@ public class NonVolatileMemAllocator extends RestorableAllocator<NonVolatileMemA
* a place holder, always specify it as true
*/
public NonVolatileMemAllocator(NonVolatileMemoryAllocatorService nvmasvc, long capacity, String uri, boolean isnew) {
- assert null != nvmasvc : "NonVolatileMemoryAllocatorService object is null";
+ if (null == nvmasvc) {
+ throw new IllegalArgumentException("NonVolatileMemoryAllocatorService object is null");
+ }
if (capacity <= 0) {
- throw new IllegalArgumentException("BigDataPMemAllocator cannot be initialized with capacity <= 0.");
+ throw new IllegalArgumentException("NonVolatileMemAllocator cannot be initialized with capacity <= 0.");
}
m_nvmasvc = nvmasvc;
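
The assert-to-exception change above matters because JVM assertions are stripped unless the runtime is started with -ea, so an assert-based null check is a no-op on a default production JVM. A minimal Scala sketch of the always-on guard (illustrative names, not code from this commit):

    def requireService(svc: AnyRef): Unit = {
      // Unlike `assert`, this check runs regardless of JVM flags such as -ea.
      if (svc == null) {
        throw new IllegalArgumentException("allocator service object is null")
      }
    }
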
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/1e6bf955/mnemonic-core/src/main/java/org/apache/mnemonic/Utils.java
----------------------------------------------------------------------
diff --git a/mnemonic-core/src/main/java/org/apache/mnemonic/Utils.java b/mnemonic-core/src/main/java/org/apache/mnemonic/Utils.java
index faadec4..41b361e 100644
--- a/mnemonic-core/src/main/java/org/apache/mnemonic/Utils.java
+++ b/mnemonic-core/src/main/java/org/apache/mnemonic/Utils.java
@@ -113,17 +113,18 @@ public class Utils {
*
* @return the non-volatile memory allocator service instance
*/
- public static NonVolatileMemoryAllocatorService getNonVolatileMemoryAllocatorService(String id) {
+ public static synchronized NonVolatileMemoryAllocatorService getNonVolatileMemoryAllocatorService(String id) {
NonVolatileMemoryAllocatorService ret = null;
if (null == m_nvmasvcloader) {
m_nvmasvcloader = ServiceLoader.load(NonVolatileMemoryAllocatorService.class);
}
Iterator<NonVolatileMemoryAllocatorService> svcit = m_nvmasvcloader.iterator();
NonVolatileMemoryAllocatorService svc = null;
- while (null == ret && svcit.hasNext()) {
+ while (svcit.hasNext()) {
svc = svcit.next();
if (svc.getServiceId().equals(id)) {
ret = svc;
+ break;
}
}
assert null != ret : "NonVolatileMemoryAllocatorService \'" + id + "\' not found!";
@@ -138,17 +139,18 @@ public class Utils {
*
* @return the durable general computing service instance
*/
- public static GeneralComputingService getGeneralComputingService(String id) {
+ public static synchronized GeneralComputingService getGeneralComputingService(String id) {
GeneralComputingService ret = null;
if (null == m_gcompsvcloader) {
m_gcompsvcloader = ServiceLoader.load(GeneralComputingService.class);
}
Iterator<GeneralComputingService> svcit = m_gcompsvcloader.iterator();
GeneralComputingService svc = null;
- while (null == ret && svcit.hasNext()) {
+ while (svcit.hasNext()) {
svc = svcit.next();
if (svc.getServiceId().equals(id)) {
ret = svc;
+ break;
}
}
assert null != ret : "GeneralComputingService \'" + id + "\' not found!";
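
Marking both lookup methods synchronized closes a race: the shared ServiceLoader field is lazily initialized and ServiceLoader iterators are not thread-safe, so two concurrent callers could interleave loader creation and iteration. The added break also stops scanning once the id matches. A hedged Scala sketch of the same pattern, using a stand-in service trait rather than the real interface:

    import java.util.ServiceLoader
    import scala.collection.JavaConverters._

    trait AllocatorService { def getServiceId: String } // stand-in interface

    object ServiceRegistry {
      private var loader: ServiceLoader[AllocatorService] = null

      // One monitor guards both the lazy initialization and the iteration,
      // since ServiceLoader and its iterator are not thread-safe.
      def lookup(id: String): AllocatorService = this.synchronized {
        if (loader == null) loader = ServiceLoader.load(classOf[AllocatorService])
        loader.iterator.asScala
          .find(_.getServiceId == id)  // stop at the first match
          .getOrElse(throw new IllegalArgumentException(s"service '$id' not found"))
      }
    }
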
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/1e6bf955/mnemonic-core/src/main/java/org/apache/mnemonic/VolatileMemAllocator.java
----------------------------------------------------------------------
diff --git a/mnemonic-core/src/main/java/org/apache/mnemonic/VolatileMemAllocator.java b/mnemonic-core/src/main/java/org/apache/mnemonic/VolatileMemAllocator.java
index e4c04cc..a94b751 100644
--- a/mnemonic-core/src/main/java/org/apache/mnemonic/VolatileMemAllocator.java
+++ b/mnemonic-core/src/main/java/org/apache/mnemonic/VolatileMemAllocator.java
@@ -56,9 +56,11 @@ public class VolatileMemAllocator extends RestorableAllocator<VolatileMemAllocat
* a place holder, always specify it as true
*/
public VolatileMemAllocator(VolatileMemoryAllocatorService vmasvc, long capacity, String uri, boolean isnew) {
- assert null != vmasvc : "VolatileMemoryAllocatorService object is null";
+ if (null == vmasvc) {
+ throw new IllegalArgumentException("VolatileMemoryAllocatorService object is null");
+ }
if (capacity <= 0) {
- throw new IllegalArgumentException("BigDataMemAllocator cannot be initialized with capacity <= 0.");
+ throw new IllegalArgumentException("VolatileMemAllocator cannot be initialized with capacity <= 0.");
}
m_vmasvc = vmasvc;
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/1e6bf955/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java
index a34d036..12e0b3d 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java
@@ -22,11 +22,9 @@ import org.apache.mnemonic.ConfigurationException;
import org.apache.mnemonic.DurableType;
import org.apache.mnemonic.NonVolatileMemAllocator;
import org.apache.mnemonic.Utils;
-import org.apache.mnemonic.collections.DurableSinglyLinkedList;
import org.apache.mnemonic.sessions.DurableOutputSession;
import java.text.NumberFormat;
-import java.util.HashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -57,7 +55,6 @@ public class MneDurableOutputSession<V>
}
public void initialize(String prefix) {
- m_recordmap = new HashMap<V, DurableSinglyLinkedList<V>>();
readConfig(prefix);
initNextPool();
}
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/1e6bf955/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/DurableOutputSession.java
----------------------------------------------------------------------
diff --git a/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/DurableOutputSession.java b/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/DurableOutputSession.java
index 71f8e58..e21b5a8 100644
--- a/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/DurableOutputSession.java
+++ b/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/DurableOutputSession.java
@@ -26,6 +26,7 @@ import org.apache.mnemonic.RestorableAllocator;
import org.apache.mnemonic.collections.DurableSinglyLinkedList;
import org.apache.mnemonic.collections.DurableSinglyLinkedListFactory;
+import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.tuple.Pair;
@@ -39,7 +40,8 @@ public abstract class DurableOutputSession<V, A extends RestorableAllocator<A>>
private EntityFactoryProxy[] entityFactoryProxies;
private long slotKeyId;
- protected Map<V, DurableSinglyLinkedList<V>> m_recordmap;
+ protected Map<V, DurableSinglyLinkedList<V>> m_recordmap =
+ new HashMap<V, DurableSinglyLinkedList<V>>();
protected boolean m_newpool;
protected long m_poolidx = 0L;
protected Pair<DurableType[], EntityFactoryProxy[]> m_recparmpair;
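
Together with the MneDurableOutputSession change above, this moves m_recordmap construction from one subclass's initialize() into the base-class field declaration, so every output session (including the new Spark one, which never calls the Hadoop initialize) starts with a non-null map instead of risking a NullPointerException. A minimal sketch of the idea, with placeholder types:

    abstract class OutputSessionBase[V] {
      // Constructed at declaration, so it is never null in any subclass.
      protected val recordMap = new java.util.HashMap[V, java.util.List[V]]()
    }

    class SparkOutputSession[V] extends OutputSessionBase[V] {
      def track(v: V): Unit = recordMap.put(v, new java.util.ArrayList[V]())
    }
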
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/1e6bf955/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/DurableException.scala
----------------------------------------------------------------------
diff --git a/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/DurableException.scala b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/DurableException.scala
new file mode 100644
index 0000000..c96aa73
--- /dev/null
+++ b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/DurableException.scala
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mnemonic.spark
+
+class DurableException(msg: String) extends RuntimeException(msg) {
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/1e6bf955/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/MneDurableInputSession.scala
----------------------------------------------------------------------
diff --git a/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/MneDurableInputSession.scala b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/MneDurableInputSession.scala
index d027c52..e3f6f67 100644
--- a/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/MneDurableInputSession.scala
+++ b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/MneDurableInputSession.scala
@@ -30,10 +30,36 @@ import org.apache.mnemonic.collections.DurableSinglyLinkedListFactory
import org.apache.mnemonic.sessions.DurableInputSession
import org.apache.mnemonic.sessions.SessionIterator
-class MneDurableInputSession[V: ClassTag]
+class MneDurableInputSession[V: ClassTag] (
+ serviceName: String,
+ durableTypes: Array[DurableType],
+ entityFactoryProxies: Array[EntityFactoryProxy],
+ slotKeyId: Long,
+ memPoolList: Array[File] )
extends DurableInputSession[V, NonVolatileMemAllocator] {
- var fileList: Array[File] = null
+ var memPools: Array[File] = null
+
+ private var flistIter:Iterator[File] = null
+
+ initialize(serviceName, durableTypes, entityFactoryProxies,
+ slotKeyId, memPoolList)
+
+ def initialize (
+ serviceName: String,
+ durableTypes: Array[DurableType],
+ entityFactoryProxies: Array[EntityFactoryProxy],
+ slotKeyId: Long,
+ memPoolList: Array[File]) {
+ setServiceName(serviceName)
+ setDurableTypes(durableTypes)
+ setEntityFactoryProxies(entityFactoryProxies)
+ setSlotKeyId(slotKeyId);
+ memPools = memPoolList
+ if (null != memPools) {
+ flistIter = memPools.iterator
+ }
+ }
override def initNextPool(sessiter: SessionIterator[V, NonVolatileMemAllocator]): Boolean = {
var ret: Boolean = false
@@ -41,9 +67,9 @@ class MneDurableInputSession[V: ClassTag]
sessiter.getAllocator.close
sessiter.setAllocator(null)
}
- for (file <- fileList) {
+ if (null != flistIter && flistIter.hasNext) {
sessiter.setAllocator(new NonVolatileMemAllocator(Utils.getNonVolatileMemoryAllocatorService(
- getServiceName), 1024000L, file.toString, true));
+ getServiceName), 1024000L, flistIter.next.toString, true));
if (null != sessiter.getAllocator) {
sessiter.setHandler(sessiter.getAllocator.getHandler(getSlotKeyId))
if (0L != sessiter.getHandler) {
@@ -67,13 +93,10 @@ object MneDurableInputSession {
durableTypes: Array[DurableType],
entityFactoryProxies: Array[EntityFactoryProxy],
slotKeyId: Long,
- files: Array[File]): MneDurableInputSession[V] = {
- var ret = new MneDurableInputSession[V]
- ret.setServiceName(serviceName)
- ret.setDurableTypes(durableTypes)
- ret.setEntityFactoryProxies(entityFactoryProxies)
- ret.setSlotKeyId(slotKeyId);
- ret.fileList = files
+ memPoolList: Array[File]): MneDurableInputSession[V] = {
+ val ret = new MneDurableInputSession[V] (
+ serviceName, durableTypes, entityFactoryProxies,
+ slotKeyId, memPoolList)
ret
}
}
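
The input-session rewrite replaces a for-loop over fileList, which reassigned the allocator for every file within a single initNextPool call, with a persistent Iterator[File] that survives across calls, so each call opens exactly one memory pool and successive calls walk the list. A reduced Scala sketch of the cursor pattern (stand-in names):

    import java.io.File

    class PoolCursor(memPools: Array[File]) {
      private val it: Iterator[File] = memPools.iterator // kept across calls

      // Consumes at most one pool file per call, mirroring initNextPool.
      def openNext(): Option[File] =
        if (it.hasNext) Some(it.next()) else None
    }
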
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/1e6bf955/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/MneDurableOutputSession.scala
----------------------------------------------------------------------
diff --git a/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/MneDurableOutputSession.scala b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/MneDurableOutputSession.scala
index 657cb16..4383866 100644
--- a/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/MneDurableOutputSession.scala
+++ b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/MneDurableOutputSession.scala
@@ -16,6 +16,7 @@
*/
package org.apache.mnemonic.spark
+
import java.io.File
import scala.reflect.{ classTag, ClassTag }
import scala.collection.mutable.ArrayBuffer
@@ -29,34 +30,67 @@ import org.apache.mnemonic.collections.DurableSinglyLinkedList
import org.apache.mnemonic.collections.DurableSinglyLinkedListFactory
import org.apache.mnemonic.sessions.DurableOutputSession
-class MneDurableOutputSession[V: ClassTag]
+class MneDurableOutputSession[V: ClassTag](
+ serviceName: String,
+ durableTypes: Array[DurableType],
+ entityFactoryProxies: Array[EntityFactoryProxy],
+ slotKeyId: Long,
+ partitionPoolSize: Long,
+ baseDirectory: String,
+ outputMemPrefix: String)
extends DurableOutputSession[V, NonVolatileMemAllocator] {
var baseDir: String = null
- var fileList: ArrayBuffer[File] = new ArrayBuffer[File]
+ var memPools: ArrayBuffer[File] = new ArrayBuffer[File]
var outputFile: File = null
var outputPrefix: String = null
private var _outidx: Long = 0L
+ initialize(serviceName, durableTypes, entityFactoryProxies,
+ slotKeyId, partitionPoolSize, baseDirectory, outputMemPrefix)
+
+ def initialize(
+ serviceName: String,
+ durableTypes: Array[DurableType],
+ entityFactoryProxies: Array[EntityFactoryProxy],
+ slotKeyId: Long,
+ partitionPoolSize: Long,
+ baseDirectory: String,
+ outputMemPrefix: String) {
+ setServiceName(serviceName)
+ setDurableTypes(durableTypes)
+ setEntityFactoryProxies(entityFactoryProxies)
+ setSlotKeyId(slotKeyId)
+ setPoolSize(partitionPoolSize)
+ baseDir = baseDirectory
+ outputPrefix = outputMemPrefix
+ if (!initNextPool) {
+ throw new RuntimeException("Firstly init next pool failed")
+ }
+ }
+
protected def genNextPoolFile(): File = {
val file = new File(baseDir, f"${outputPrefix}_${_outidx}%05d.mne")
- _outidx = _outidx + 1
- fileList += file
+ _outidx += 1
+ memPools += file
file
}
override def initNextPool(): Boolean = {
var ret: Boolean = false
if (null != getAllocator) {
- getAllocator.close();
- setAllocator(null);
+ getAllocator.close()
+ setAllocator(null)
+ }
+ outputFile = genNextPoolFile
+ if (outputFile.exists) {
+ outputFile.delete
}
- outputFile = genNextPoolFile;
m_act = new NonVolatileMemAllocator(Utils.getNonVolatileMemoryAllocatorService(getServiceName),
getPoolSize, outputFile.toString, true);
if (null != getAllocator) {
m_newpool = true;
- ret = true;
+ ret = true
}
ret
}
@@ -70,16 +104,11 @@ object MneDurableOutputSession {
entityFactoryProxies: Array[EntityFactoryProxy],
slotKeyId: Long,
partitionPoolSize: Long,
- baseDir: String,
- outputPrefix: String): MneDurableOutputSession[V] = {
- var ret = new MneDurableOutputSession[V]
- ret.setServiceName(serviceName)
- ret.setDurableTypes(durableTypes)
- ret.setEntityFactoryProxies(entityFactoryProxies)
- ret.setSlotKeyId(slotKeyId)
- ret.setPoolSize(partitionPoolSize)
- ret.baseDir = baseDir
- ret.outputPrefix = outputPrefix
+ baseDirectory: String,
+ outputMemPrefix: String): MneDurableOutputSession[V] = {
+ val ret = new MneDurableOutputSession[V] (
+ serviceName, durableTypes, entityFactoryProxies,
+ slotKeyId, partitionPoolSize, baseDirectory, outputMemPrefix)
ret
}
}
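
On the output side, initNextPool now deletes a leftover pool file from an earlier run before handing the path to the allocator, since creating a pool on top of stale contents with isnew set to true could fail or yield a corrupt pool. A hedged sketch of the rollover step (illustrative helper, not commit code):

    import java.io.File

    def nextPoolFile(baseDir: String, prefix: String, idx: Long): File = {
      val file = new File(baseDir, f"${prefix}_${idx}%05d.mne")
      if (file.exists) file.delete() // never create a pool over leftovers
      file
    }
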
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/1e6bf955/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDD.scala
----------------------------------------------------------------------
diff --git a/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDD.scala b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDD.scala
index 4c0adb4..9975c14 100644
--- a/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDD.scala
+++ b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDD.scala
@@ -21,6 +21,7 @@ import java.io.File
import org.apache.spark.rdd.RDD
import org.apache.spark.{ Partition, TaskContext }
+import org.apache.spark.internal.Logging
import scala.reflect.{ classTag, ClassTag }
import scala.collection.mutable.HashMap
import scala.collection.JavaConverters._
@@ -33,12 +34,13 @@ import org.apache.mnemonic.sessions.SessionIterator
import org.apache.mnemonic.sessions.ObjectCreator
import org.apache.mnemonic.spark.MneDurableInputSession
import org.apache.mnemonic.spark.MneDurableOutputSession
+import org.apache.mnemonic.spark.DurableException
class DurableRDD[D: ClassTag, T: ClassTag](
var prev: RDD[T],
serviceName: String, durableTypes: Array[DurableType],
entityFactoryProxies: Array[EntityFactoryProxy], slotKeyId: Long,
- partitionPoolSize: Long, baseDir: String,
+ partitionPoolSize: Long, baseDirectory: String,
f: (T, ObjectCreator[D, NonVolatileMemAllocator]) => Option[D],
preservesPartitioning: Boolean = false)
extends RDD[D](prev) {
@@ -53,8 +55,8 @@ class DurableRDD[D: ClassTag, T: ClassTag](
iterator: Iterator[T]) {
val outsess = MneDurableOutputSession[D](serviceName,
durableTypes, entityFactoryProxies, slotKeyId,
- partitionPoolSize, baseDir,
- f"mem_${this.hashCode()}%10d_${split.hashCode()}%10d")
+ partitionPoolSize, baseDirectory,
+ f"mem_${this.hashCode()}%010d_${split.hashCode()}%010d")
try {
for (item <- iterator) {
f(item, outsess) match {
@@ -62,7 +64,7 @@ class DurableRDD[D: ClassTag, T: ClassTag](
case None =>
}
}
- _parmap += (split -> outsess.fileList.toArray)
+ _parmap += (split -> outsess.memPools.toArray)
} finally {
outsess.close()
}
@@ -71,13 +73,14 @@ class DurableRDD[D: ClassTag, T: ClassTag](
override def compute(split: Partition, context: TaskContext): Iterator[D] = {
if (!(_parmap contains split)) {
prepareDurablePartition(split, context, firstParent[T].iterator(split, context))
+ logInfo(s"Done persisting RDD ${prev.id} to ${baseDirectory}")
}
- val flist = _parmap.get(split) match {
- case Some(flst) => flst
- case None => throw new RuntimeException("Not construct durable partition properly")
+ val memplist = _parmap.get(split) match {
+ case Some(mplst) => mplst
+ case None => throw new DurableException("Not construct durable partition properly")
}
val insess = MneDurableInputSession[D](serviceName,
- durableTypes, entityFactoryProxies, slotKeyId, flist)
+ durableTypes, entityFactoryProxies, slotKeyId, memplist)
insess.iterator.asScala
}
@@ -86,4 +89,16 @@ class DurableRDD[D: ClassTag, T: ClassTag](
prev = null
}
+ def close {
+ if (null != _parmap) {
+ _parmap foreach { case (par, mplst) =>
+ mplst foreach {(file) =>
+ if (file.exists) {
+ file.delete
+ }
+ }
+ }
+ _parmap.clear()
+ }
+ }
}
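
The format-string fix in prepareDurablePartition is easy to verify in a REPL: %10d pads with spaces, which embeds blanks in the generated pool-file name, while %010d zero-pads it. For example:

    val h = 12345
    println(f"mem_${h}%10d")  // "mem_     12345"  (spaces inside the file name)
    println(f"mem_${h}%010d") // "mem_0000012345"  (zero-padded, filesystem-safe)

The new close method complements this by deleting each partition's pool files once the durable RDD is no longer needed.
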
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/1e6bf955/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDDFunctions.scala b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDDFunctions.scala
index 7ef8325..cb459ea 100644
--- a/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDDFunctions.scala
+++ b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDDFunctions.scala
@@ -34,13 +34,12 @@ class DurableRDDFunctions[T: ClassTag](rdd: RDD[T]) extends Serializable {
entityFactoryProxies: Array[EntityFactoryProxy],
slotKeyId: Long,
partitionPoolSize: Long,
- baseDir: String,
+ baseDirectory: String,
f: (T, ObjectCreator[D, NonVolatileMemAllocator]) => Option[D],
preservesPartitioning: Boolean = false) =
new DurableRDD[D, T](rdd,
serviceName, durableTypes, entityFactoryProxies, slotKeyId,
- partitionPoolSize, baseDir, f, preservesPartitioning)
-
+ partitionPoolSize, baseDirectory, f, preservesPartitioning)
}
object DurableRDDFunctions {
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/1e6bf955/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDSpec.scala
----------------------------------------------------------------------
diff --git a/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDSpec.scala b/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDSpec.scala
index 41b9449..6349ce5 100644
--- a/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDSpec.scala
+++ b/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDSpec.scala
@@ -18,13 +18,48 @@
package org.apache.mnemonic.spark.rdd;
import org.apache.mnemonic.spark.TestSpec
+import org.apache.spark.SparkConf
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.mnemonic.spark.rdd.DurableRDDFunctions._
+import org.apache.mnemonic.DurableType
+import org.apache.mnemonic.NonVolatileMemAllocator
+import org.apache.mnemonic.EntityFactoryProxy
+import org.apache.mnemonic.sessions.ObjectCreator
class DurableRDDSpec extends TestSpec {
+ val defaultServiceName = "pmalloc"
+ val defaultSlotKeyId = 2L
+ val defaultPartitionSize = 1024 * 1024 * 1024L
+ val defaultBaseDirectory = "."
+ val defaultNumOfRecordsPerPartition = 5000L
+
behavior of "A DurableRDD"
- it should "have size 0" in {
- assert(Set.empty.size == 0)
+ it should "have the same sum value" in {
+ val conf = new SparkConf()
+ .setMaster("local[*]")
+ .setAppName("Test")
+ val sc = new SparkContext(conf)
+ // sc.getConf.getAll.foreach(println)
+ val ds = Seq(5, 6, 3, 8, 4)
+ val data: RDD[Int] = sc.parallelize(ds)
+ val durdd = data.makeDurable[Long](
+ defaultServiceName,
+ Array(DurableType.LONG), Array(),
+ defaultSlotKeyId, defaultPartitionSize,
+ defaultBaseDirectory,
+ (v: Int, oc: ObjectCreator[Long, NonVolatileMemAllocator])=>
+ { Some(v.asInstanceOf[Long]) })
+ // data.collect().foreach(println)
+ // durdd.collect().foreach(println)
+ val (rcnt, rsum) = (data.count, data.sum)
+ val (dcnt, dsum) = (durdd.count, durdd.sum)
+ durdd.close
+ assertResult((rcnt, rsum)) {
+ (dcnt, dsum)
+ }
}
it should "produce NoSuchElementException when head is invoked" in {
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/1e6bf955/mnemonic-spark/pom.xml
----------------------------------------------------------------------
diff --git a/mnemonic-spark/pom.xml b/mnemonic-spark/pom.xml
index 76beef8..d321839 100644
--- a/mnemonic-spark/pom.xml
+++ b/mnemonic-spark/pom.xml
@@ -161,7 +161,17 @@
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
<junitxml>.</junitxml>
<filereports>SparkIntegrationTestSuite.txt</filereports>
+ <forkMode>always</forkMode>
<skipTests>${skipTests}</skipTests>
+ <argLine>-Djava.ext.dirs=${memory.service.dist.dir}:${computing.service.dist.dir}</argLine>
+ <systemProperties>
+ <spark.service-dist-dirs>
+ ${memory.service.dist.dir}:${computing.service.dist.dir}
+ </spark.service-dist-dirs>
+ <spark.executorEnv.SPARK_JAVA_OPTS>
+ -D${memory.service.dist.dir}:${computing.service.dist.dir}
+ </spark.executorEnv.SPARK_JAVA_OPTS>
+ </systemProperties>
</configuration>
<executions>
<execution>
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/1e6bf955/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ad101f6..a9ba680 100644
--- a/pom.xml
+++ b/pom.xml
@@ -290,6 +290,7 @@
<configuration>
<excludes>
<exclude>.git/**/*</exclude>
+ <exclude>**/.project</exclude>
<exclude>**/*.</exclude>
<exclude>**/target/**/*</exclude>
<exclude>**/README*</exclude>