Posted to commits@mnemonic.apache.org by ga...@apache.org on 2017/04/22 16:48:49 UTC

incubator-mnemonic git commit: MNEMONIC-238: Implement DurableRDD

Repository: incubator-mnemonic
Updated Branches:
  refs/heads/master 0d96c7500 -> cc4ed3fa1


MNEMONIC-238: Implement DurableRDD


Project: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/commit/cc4ed3fa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/tree/cc4ed3fa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/diff/cc4ed3fa

Branch: refs/heads/master
Commit: cc4ed3fa1b223bd55d23024e5fa5820685c1a924
Parents: 0d96c75
Author: Wang, Gang(Gary) <ga...@intel.com>
Authored: Sat Apr 22 09:45:11 2017 -0700
Committer: Wang, Gang(Gary) <ga...@intel.com>
Committed: Sat Apr 22 09:47:59 2017 -0700

----------------------------------------------------------------------
 build-tools/test.conf                           |   2 +-
 .../mnemonic/sessions/DurableOutputSession.java |   4 +
 .../apache/mnemonic/sessions/ObjectCreator.java |  28 +++++
 .../apache/mnemonic/sessions/OutputSession.java |   6 +-
 .../mnemonic/spark/MneDurableInputSession.scala |  83 +++++++++++++++
 .../spark/MneDurableOutputSession.scala         | 103 +++++++++++++++++++
 .../apache/mnemonic/spark/rdd/DurableRDD.scala  |  70 ++++++++++++-
 .../spark/rdd/DurableRDDFunctions.scala         |  48 +++++++++
 .../org/apache/mnemonic/spark/TestSpec.scala    |  26 +++++
 .../mnemonic/spark/rdd/DurableRDDSpec.scala     |  35 +++++++
 .../mnemonic/spark/rdd/DurableRDDSuite.scala    |  35 -------
 11 files changed, 398 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/cc4ed3fa/build-tools/test.conf
----------------------------------------------------------------------
diff --git a/build-tools/test.conf b/build-tools/test.conf
index ccf997f..7645b26 100644
--- a/build-tools/test.conf
+++ b/build-tools/test.conf
@@ -58,5 +58,5 @@ mvn -Dtest=MneMapreduceChunkDataTest test -pl mnemonic-hadoop/mnemonic-hadoop-ma
 mvn -Dtest=MneMapredBufferDataTest test -pl mnemonic-hadoop/mnemonic-hadoop-mapreduce -DskipTests=false
 
 # a testcase for module "mnemonic-spark/mnemonic-spark-core" that requires 'pmalloc' memory service to pass
-mvn -Dsuites=org.apache.mnemonic.spark.rdd.DurableRDDSuite test -pl mnemonic-spark/mnemonic-spark-core -DskipTests=false
+mvn -Dtest=DurableRDDSpec test -pl mnemonic-spark/mnemonic-spark-core -DskipTests=false
 

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/cc4ed3fa/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/DurableOutputSession.java
----------------------------------------------------------------------
diff --git a/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/DurableOutputSession.java b/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/DurableOutputSession.java
index 7e8b1be..e8bb17d 100644
--- a/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/DurableOutputSession.java
+++ b/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/DurableOutputSession.java
@@ -58,6 +58,10 @@ public abstract class DurableOutputSession<V, A extends RestorableAllocator<A>>
     return m_act;
   }
 
+  public void setAllocator(A alloc) {
+    m_act = alloc;
+  }
+
   @Override
   public long getHandler() {
     long ret = 0L;

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/cc4ed3fa/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/ObjectCreator.java
----------------------------------------------------------------------
diff --git a/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/ObjectCreator.java b/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/ObjectCreator.java
new file mode 100644
index 0000000..71f8774
--- /dev/null
+++ b/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/ObjectCreator.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mnemonic.sessions;
+
+public interface ObjectCreator<V> {
+
+  V newDurableObjectRecord();
+
+  V newDurableObjectRecord(long size);
+
+}
+
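
The extracted ObjectCreator interface is what user-supplied transformation functions see when they need to allocate durable records without holding the whole output session. A minimal sketch of such a function (not part of this commit), assuming a hypothetical @DurableEntity-generated Person entity with a setName(String, Boolean) setter:

    import org.apache.mnemonic.sessions.ObjectCreator

    // Hypothetical entity class Person is assumed to be generated elsewhere.
    val buildPerson = (name: String, oc: ObjectCreator[Person]) => {
      val p = oc.newDurableObjectRecord()   // allocates a record in the current memory pool
      if (p != null) {
        p.setName(name, true)               // hypothetical setter on the generated entity
        Some(p)
      } else {
        None
      }
    }

This is the shape expected by the f parameter of the DurableRDD introduced later in this diff.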

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/cc4ed3fa/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/OutputSession.java
----------------------------------------------------------------------
diff --git a/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/OutputSession.java b/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/OutputSession.java
index 598becc..dcc0ab3 100644
--- a/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/OutputSession.java
+++ b/mnemonic-sessions/src/main/java/org/apache/mnemonic/sessions/OutputSession.java
@@ -20,11 +20,7 @@ package org.apache.mnemonic.sessions;
 
 import java.io.Closeable;
 
-public interface OutputSession<V> extends Closeable {
-
-  V newDurableObjectRecord();
-
-  V newDurableObjectRecord(long size);
+public interface OutputSession<V> extends ObjectCreator<V>, Closeable {
 
   void post(V v);
 

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/cc4ed3fa/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/MneDurableInputSession.scala
----------------------------------------------------------------------
diff --git a/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/MneDurableInputSession.scala b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/MneDurableInputSession.scala
new file mode 100644
index 0000000..d56bddc
--- /dev/null
+++ b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/MneDurableInputSession.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mnemonic.spark
+
+import java.io.File
+import scala.reflect.{ classTag, ClassTag }
+
+import org.apache.mnemonic.ConfigurationException
+import org.apache.mnemonic.DurableType
+import org.apache.mnemonic.EntityFactoryProxy
+import org.apache.mnemonic.NonVolatileMemAllocator
+import org.apache.mnemonic.Utils
+import org.apache.mnemonic.collections.DurableSinglyLinkedList
+import org.apache.mnemonic.collections.DurableSinglyLinkedListFactory
+import org.apache.mnemonic.sessions.DurableInputSession
+import org.apache.mnemonic.sessions.SessionIterator
+
+class MneDurableInputSession[V: ClassTag]
+    extends DurableInputSession[V, NonVolatileMemAllocator] {
+
+  private var _flist: Array[File] = null
+
+  def fileList = _flist
+
+  def fileList_=(value: Array[File]): Unit = _flist = value
+
+  override def initNextPool(sessiter: SessionIterator[V, NonVolatileMemAllocator]): Boolean = {
+    var ret: Boolean = false
+    if (null != sessiter.getAllocator) {
+      sessiter.getAllocator.close
+      sessiter.setAllocator(null)
+    }
+    for (file <- fileList) {
+      sessiter.setAllocator(new NonVolatileMemAllocator(Utils.getNonVolatileMemoryAllocatorService(
+        getServiceName), 1024000L, file.toString, true));
+      if (null != sessiter.getAllocator) {
+        sessiter.setHandler(sessiter.getAllocator.getHandler(getSlotKeyId))
+        if (0L != sessiter.getHandler) {
+          val dsllist: DurableSinglyLinkedList[V] = DurableSinglyLinkedListFactory.restore(
+            sessiter.getAllocator, getEntityFactoryProxies, getDurableTypes, sessiter.getHandler, false)
+          if (null != dsllist) {
+            sessiter.setIterator(dsllist.iterator)
+            ret = null != sessiter.getIterator
+          }
+        }
+      }
+    }
+    ret
+  }
+
+}
+
+object MneDurableInputSession {
+  def apply[V: ClassTag](
+    serviceName: String,
+    durableTypes: Array[DurableType],
+    entityFactoryProxies: Array[EntityFactoryProxy],
+    slotKeyId: Long,
+    files: Array[File]): MneDurableInputSession[V] = {
+    var ret = new MneDurableInputSession[V]
+    ret.setServiceName(serviceName)
+    ret.setDurableTypes(durableTypes)
+    ret.setEntityFactoryProxies(entityFactoryProxies)
+    ret.setSlotKeyId(slotKeyId);
+    ret.fileList = files
+    ret
+  }
+}
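
A minimal read-back sketch for MneDurableInputSession (not part of this commit), assuming pool files previously written with the 'pmalloc' service and a plain Long payload; the directory, slot key id, and null entity factory proxies are placeholder assumptions:

    import java.io.File
    import scala.collection.JavaConverters._
    import org.apache.mnemonic.DurableType
    import org.apache.mnemonic.spark.MneDurableInputSession

    val files = new File("/tmp/mne").listFiles.filter(_.getName.endsWith(".mne"))
    val insess = MneDurableInputSession[Long](
      "pmalloc", Array(DurableType.LONG), null, 2L, files)
    insess.iterator.asScala.foreach(println)  // restores each pool's durable list and iterates it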

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/cc4ed3fa/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/MneDurableOutputSession.scala
----------------------------------------------------------------------
diff --git a/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/MneDurableOutputSession.scala b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/MneDurableOutputSession.scala
new file mode 100644
index 0000000..e25f8ce
--- /dev/null
+++ b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/MneDurableOutputSession.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mnemonic.spark
+import java.io.File
+import java.nio.file.Path
+import scala.reflect.{ classTag, ClassTag }
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.mnemonic.ConfigurationException
+import org.apache.mnemonic.DurableType
+import org.apache.mnemonic.EntityFactoryProxy
+import org.apache.mnemonic.NonVolatileMemAllocator
+import org.apache.mnemonic.Utils
+import org.apache.mnemonic.collections.DurableSinglyLinkedList
+import org.apache.mnemonic.collections.DurableSinglyLinkedListFactory
+import org.apache.mnemonic.sessions.DurableOutputSession
+
+class MneDurableOutputSession[V: ClassTag]
+    extends DurableOutputSession[V, NonVolatileMemAllocator] {
+
+  private var _baseDir: Path = null
+  private var _flist: ArrayBuffer[File] = new ArrayBuffer[File]
+  private var _outputFile: File = null
+  private var _outPrefix: String = null
+  private var _outidx: Long = 0L
+
+  def baseDir = _baseDir
+
+  def baseDir_=(value: Path): Unit = _baseDir = value
+
+  def fileList = _flist.toArray
+
+  def outputPrefix = _outPrefix
+
+  def outputPrefix_=(value: String): Unit = _outPrefix = value
+
+  protected def genNextPoolFile(): File = {
+    val file = new File(baseDir.toFile(), f"${_outPrefix}_${_outidx}%05d.mne")
+    _outidx = _outidx + 1
+    _flist += file
+    file
+  }
+
+  def setOutputFile(file: File): Unit = {
+    _outputFile = file
+  }
+
+  def getOutputFile: File = _outputFile
+
+  override def initNextPool(): Boolean = {
+    var ret: Boolean = false
+    if (null != getAllocator) {
+      getAllocator.close();
+      setAllocator(null);
+    }
+    setOutputFile(genNextPoolFile);
+    m_act = new NonVolatileMemAllocator(Utils.getNonVolatileMemoryAllocatorService(getServiceName),
+      getPoolSize, getOutputFile.toString, true);
+    if (null != getAllocator) {
+      m_newpool = true;
+      ret = true;
+    }
+    ret
+  }
+
+}
+
+object MneDurableOutputSession {
+  def apply[V: ClassTag](
+    serviceName: String,
+    durableTypes: Array[DurableType],
+    entityFactoryProxies: Array[EntityFactoryProxy],
+    slotKeyId: Long,
+    partitionPoolSize: Long,
+    baseDir: Path,
+    outputPrefix: String): MneDurableOutputSession[V] = {
+    var ret = new MneDurableOutputSession[V]
+    ret.setServiceName(serviceName)
+    ret.setDurableTypes(durableTypes)
+    ret.setEntityFactoryProxies(entityFactoryProxies)
+    ret.setSlotKeyId(slotKeyId)
+    ret.setPoolSize(partitionPoolSize)
+    ret.baseDir = baseDir
+    ret.outputPrefix = outputPrefix
+    ret
+  }
+}
+
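
A matching write-path sketch for MneDurableOutputSession (not part of this commit), under the same assumptions of the 'pmalloc' service and a Long payload; the pool size, slot key id, base directory, and prefix are placeholders:

    import java.nio.file.Paths
    import org.apache.mnemonic.DurableType
    import org.apache.mnemonic.spark.MneDurableOutputSession

    val outsess = MneDurableOutputSession[Long](
      "pmalloc", Array(DurableType.LONG), null, 2L,
      64L * 1024 * 1024, Paths.get("/tmp", "mne"), "mem_demo")
    try {
      // Each post appends to the durable singly linked list; initNextPool creates
      // a new mem_demo_NNNNN.mne file under the base directory as needed.
      (1L to 100L).foreach(v => outsess.post(v))
    } finally {
      outsess.close()
    }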

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/cc4ed3fa/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDD.scala
----------------------------------------------------------------------
diff --git a/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDD.scala b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDD.scala
index 164033e..40e94b7 100644
--- a/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDD.scala
+++ b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDD.scala
@@ -17,6 +17,74 @@
 
 package org.apache.mnemonic.spark.rdd;
 
-class DurableRDD {
+import java.io.File
+import java.nio.file.Path
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{ Partition, TaskContext }
+import scala.reflect.{ classTag, ClassTag }
+import scala.collection.mutable.HashMap
+import scala.collection.JavaConverters._
+import org.apache.mnemonic.ConfigurationException;
+import org.apache.mnemonic.DurableType;
+import org.apache.mnemonic.EntityFactoryProxy;
+import org.apache.mnemonic.NonVolatileMemAllocator;
+import org.apache.mnemonic.sessions.DurableInputSession;
+import org.apache.mnemonic.sessions.SessionIterator;
+import org.apache.mnemonic.sessions.ObjectCreator
+import org.apache.mnemonic.spark.MneDurableInputSession
+import org.apache.mnemonic.spark.MneDurableOutputSession
+
+class DurableRDD[D: ClassTag, T: ClassTag](
+  var prev: RDD[T],
+  serviceName: String, durableTypes: Array[DurableType],
+  entityFactoryProxies: Array[EntityFactoryProxy], slotKeyId: Long,
+  partitionPoolSize: Long, baseDir: Path,
+  f: (T, ObjectCreator[D]) => Option[D],
+  preservesPartitioning: Boolean = false)
+    extends RDD[D](prev) {
+
+  private val _parmap = HashMap.empty[Partition, Array[File]]
+
+  override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
+
+  override protected def getPartitions: Array[Partition] = firstParent[T].partitions
+
+  def prepareDurablePartition(split: Partition, context: TaskContext,
+      iterator: Iterator[T]) {
+    val outsess = MneDurableOutputSession[D](serviceName,
+        durableTypes, entityFactoryProxies, slotKeyId,
+        partitionPoolSize, baseDir,
+        f"mem_${this.hashCode()}%10d_${split.hashCode()}%10d")
+    try {
+      for (item <- iterator) {
+        f(item, outsess) match {
+          case Some(res) => outsess.post(res)
+          case None =>
+        }
+      }
+      _parmap += (split -> outsess.fileList)
+    } finally {
+      outsess.close()
+    }
+  }
+
+  override def compute(split: Partition, context: TaskContext): Iterator[D] = {
+    if (!(_parmap contains split)) {
+      prepareDurablePartition(split, context, firstParent[T].iterator(split, context))
+    }
+    val flist = _parmap.get(split) match {
+      case Some(flst) => flst
+      case None => throw new RuntimeException("Durable partition was not constructed properly")
+    }
+    val insess = MneDurableInputSession[D](serviceName,
+        durableTypes, entityFactoryProxies, slotKeyId, flist)
+    insess.iterator.asScala
+  }
+
+  override def clearDependencies() {
+    super.clearDependencies()
+    prev = null
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/cc4ed3fa/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDDFunctions.scala b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDDFunctions.scala
new file mode 100644
index 0000000..e86d7f8
--- /dev/null
+++ b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDDFunctions.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mnemonic.spark.rdd
+
+import java.nio.file.Path
+import org.apache.spark.rdd.RDD
+import org.apache.spark.TaskContext
+import scala.reflect.ClassTag
+import scala.language.implicitConversions
+import org.apache.mnemonic.DurableType
+import org.apache.mnemonic.EntityFactoryProxy;
+import org.apache.mnemonic.sessions.ObjectCreator
+
+class DurableRDDFunctions[T: ClassTag](rdd: RDD[T]) extends Serializable {
+
+  def makeDurable[D: ClassTag](
+    serviceName: String,
+    durableTypes: Array[DurableType],
+    entityFactoryProxies: Array[EntityFactoryProxy],
+    slotKeyId: Long,
+    partitionPoolSize: Long,
+    baseDir: Path,
+    f: (T, ObjectCreator[D]) => Option[D],
+    preservesPartitioning: Boolean = false) =
+    new DurableRDD[D, T](rdd,
+      serviceName, durableTypes, entityFactoryProxies, slotKeyId,
+      partitionPoolSize, baseDir, f, preservesPartitioning)
+
+}
+
+object DurableRDDFunctions {
+  implicit def addDurableFunctions[T: ClassTag](rdd: RDD[T]) = new DurableRDDFunctions[T](rdd)
+}
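
An end-to-end usage sketch of the new makeDurable API (not part of this commit), assuming a local Spark context, the 'pmalloc' service, and a primitive Long payload; types, sizes, and paths are placeholders:

    import java.nio.file.Paths
    import org.apache.spark.{ SparkConf, SparkContext }
    import org.apache.mnemonic.DurableType
    import org.apache.mnemonic.sessions.ObjectCreator
    import org.apache.mnemonic.spark.rdd.DurableRDDFunctions._  // brings makeDurable into scope

    val sc = new SparkContext(new SparkConf().setAppName("durable-rdd-demo").setMaster("local[2]"))
    val durable = sc.parallelize(1L to 1000L, 4).makeDurable[Long](
      "pmalloc",
      Array(DurableType.LONG),
      null,                              // entity factory proxies (none assumed for a primitive payload)
      2L,                                // slot key id
      1024L * 1024 * 1024,               // per-partition pool size
      Paths.get("/tmp", "mne"),
      (v: Long, oc: ObjectCreator[Long]) => if (v % 2 == 0) Some(v) else None)
    println(durable.count())             // partitions are materialized into pool files on first compute
    sc.stop()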

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/cc4ed3fa/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/TestSpec.scala
----------------------------------------------------------------------
diff --git a/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/TestSpec.scala b/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/TestSpec.scala
new file mode 100644
index 0000000..95b894d
--- /dev/null
+++ b/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/TestSpec.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mnemonic.spark
+
+import org.scalatest.FlatSpec
+import org.scalatest.Matchers
+import org.scalatest.BeforeAndAfter
+
+abstract class TestSpec extends FlatSpec with Matchers with BeforeAndAfter {
+  
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/cc4ed3fa/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDSpec.scala
----------------------------------------------------------------------
diff --git a/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDSpec.scala b/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDSpec.scala
new file mode 100644
index 0000000..41b9449
--- /dev/null
+++ b/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDSpec.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mnemonic.spark.rdd;
+
+import org.apache.mnemonic.spark.TestSpec
+
+class DurableRDDSpec extends TestSpec {
+
+  behavior of "A DurableRDD"
+
+  it should "have size 0" in {
+    assert(Set.empty.size == 0)
+  }
+
+  it should "produce NoSuchElementException when head is invoked" in {
+    assertThrows[NoSuchElementException] {
+      Set.empty.head
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/cc4ed3fa/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDSuite.scala
----------------------------------------------------------------------
diff --git a/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDSuite.scala b/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDSuite.scala
deleted file mode 100644
index e59aad7..0000000
--- a/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDSuite.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mnemonic.spark.rdd;
-
-import org.scalatest.FlatSpec
-import org.scalatest.Matchers
-import org.scalatest.BeforeAndAfter
-
-class DurableRDDSuite extends FlatSpec with Matchers with BeforeAndAfter {
-
-  "An empty Set" should "have size 0" in {
-    assert(Set.empty.size == 0)
-  }
-
-  it should "produce NoSuchElementException when head is invoked" in {
-    assertThrows[NoSuchElementException] {
-      Set.empty.head
-    }
-  }
-}