Posted to commits@mnemonic.apache.org by ga...@apache.org on 2017/04/26 22:25:03 UTC

incubator-mnemonic git commit: MNEMONIC-248: Add a test case to verify the durable chunk type dataset & bugfixes

Repository: incubator-mnemonic
Updated Branches:
  refs/heads/master 70c53f24b -> f2565bfcf


MNEMONIC-248: Add a test case to verify the durable chunk type dataset & bugfixes


Project: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/commit/f2565bfc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/tree/f2565bfc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/diff/f2565bfc

Branch: refs/heads/master
Commit: f2565bfcf63ca2c8eb8769478f4614db51d13d5f
Parents: 70c53f2
Author: Wang, Gang(Gary) <ga...@intel.com>
Authored: Wed Apr 26 15:10:43 2017 -0700
Committer: Wang, Gang(Gary) <ga...@intel.com>
Committed: Wed Apr 26 15:10:43 2017 -0700

----------------------------------------------------------------------
 build-tools/test.conf                           |   4 +-
 .../spark/rdd/DurableRDDChunkDataSpec.scala     | 101 +++++++++++++++++++
 .../spark/rdd/DurableRDDLongDataSpec.scala      |  75 ++++++++++++++
 .../mnemonic/spark/rdd/DurableRDDSpec.scala     |  76 --------------
 mnemonic-spark/pom.xml                          |   1 +
 5 files changed, 180 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/f2565bfc/build-tools/test.conf
----------------------------------------------------------------------
diff --git a/build-tools/test.conf b/build-tools/test.conf
index 7645b26..f759c9f 100644
--- a/build-tools/test.conf
+++ b/build-tools/test.conf
@@ -58,5 +58,7 @@ mvn -Dtest=MneMapreduceChunkDataTest test -pl mnemonic-hadoop/mnemonic-hadoop-ma
 mvn -Dtest=MneMapredBufferDataTest test -pl mnemonic-hadoop/mnemonic-hadoop-mapreduce -DskipTests=false
 
 # a testcase for module "mnemonic-spark/mnemonic-spark-core" that requires 'pmalloc' memory service to pass
-mvn -Dtest=DurableRDDSpec test -pl mnemonic-spark/mnemonic-spark-core -DskipTests=false
+mvn -Dsuites=org.apache.mnemonic.spark.rdd.DurableRDDLongDataSpec test -pl mnemonic-spark/mnemonic-spark-core -DskipTests=false
+
+mvn -Dsuites=org.apache.mnemonic.spark.rdd.DurableRDDChunkDataSpec test -pl mnemonic-spark/mnemonic-spark-core -DskipTests=false
 
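Note: ScalaTest suites run through the scalatest-maven-plugin are selected
with the 'suites' property rather than surefire's 'test' property, hence the
switch from -Dtest to -Dsuites above. Assuming the plugin version in use
accepts a comma-separated list (an assumption, not verified against this
build), both suites could also be run in a single invocation:

    mvn -Dsuites='org.apache.mnemonic.spark.rdd.DurableRDDLongDataSpec,org.apache.mnemonic.spark.rdd.DurableRDDChunkDataSpec' test -pl mnemonic-spark/mnemonic-spark-core -DskipTests=false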

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/f2565bfc/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDChunkDataSpec.scala
----------------------------------------------------------------------
diff --git a/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDChunkDataSpec.scala b/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDChunkDataSpec.scala
new file mode 100644
index 0000000..fe4dc88
--- /dev/null
+++ b/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDChunkDataSpec.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mnemonic.spark.rdd
+
+import java.util.zip.CRC32
+import java.util.zip.Checksum
+
+import scala.util._
+import scala.language.existentials
+import org.apache.mnemonic.spark.TestSpec
+import org.apache.spark.SparkConf
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.mnemonic.spark.rdd.DurableRDDFunctions._
+import org.apache.mnemonic.DurableType
+import org.apache.mnemonic.DurableChunk
+import org.apache.mnemonic.Utils
+import org.apache.mnemonic.NonVolatileMemAllocator
+import org.apache.mnemonic.EntityFactoryProxy
+import org.apache.mnemonic.sessions.ObjectCreator
+
+class DurableRDDChunkDataSpec extends TestSpec {
+
+  val defaultServiceName = "pmalloc"
+  val defaultSlotKeyId = 2L
+  val defaultPartitionSize = 1024 * 1024 * 1024L
+  val defaultBaseDirectory = "."
+  val defaultNumOfPartitions = 8
+  val defaultNumOfRecordsPerPartition = 20
+
+  behavior of "A DurableRDD with Chunk Type Data"
+
+  it should "support durable chunks as its data type" in {
+    val dataOffset = 8
+    val conf = new SparkConf()
+        .setMaster("local[*]")
+        .setAppName("Test")
+    val sc = new SparkContext(conf)
+    val seed: RDD[Int] = sc.parallelize(
+          Seq.fill(defaultNumOfPartitions)(defaultNumOfRecordsPerPartition), defaultNumOfPartitions)
+    val data = seed flatMap (recnum => Seq.fill(recnum)(Random.nextInt(1024 * 1024) + 1024 * 1024)) cache // must be cached to pin down the random sizes
+    val durdd = data.makeDurable[DurableChunk[_]](
+        defaultServiceName,
+        Array(DurableType.CHUNK), Array(),
+        defaultSlotKeyId, defaultPartitionSize,
+        (v: Int, oc: ObjectCreator[DurableChunk[_], NonVolatileMemAllocator]) =>
+          {
+            val cs = new CRC32
+            cs.reset
+            val unsafe = Utils.getUnsafe
+            val chunk = oc.newDurableObjectRecord(v)
+            var b: Byte = 0
+            if (null != chunk) {
+              for (i <- dataOffset until chunk.getSize.asInstanceOf[Int]) {
+                b = Random.nextInt(255).asInstanceOf[Byte]
+                unsafe.putByte(chunk.get + i, b)
+                cs.update(b)
+              }
+              unsafe.putLong(chunk.get, cs.getValue)
+            }
+            Option(chunk)
+          })
+    val durtsz = durdd map (_.getSize.asInstanceOf[Int]) sum
+    val derrcount = durdd map (
+        chunk => {
+          val unsafe = Utils.getUnsafe
+          val cs = new CRC32
+          cs.reset
+          if (null != chunk) {
+            var b: Byte = 0
+            for (j <- dataOffset until chunk.getSize.asInstanceOf[Int]) {
+              b = unsafe.getByte(chunk.get + j)
+              cs.update(b)
+            }
+            // read stored checksum inside the null check to avoid an NPE
+            if (unsafe.getLong(chunk.get) != cs.getValue) 1 else 0
+          } else 1
+        }) sum
+    val (rerrcnt: Long, rsz: Long) = (0L, data.sum.asInstanceOf[Long])
+    val (derrcnt: Long, dsz: Long) = (derrcount.asInstanceOf[Long], durtsz.asInstanceOf[Long])
+    durdd.reset
+    assertResult((rerrcnt, rsz)) {
+      (derrcnt, dsz)
+    }
+  }
+}
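
The spec above fills each chunk with random bytes from offset 8 onward,
stores the CRC32 of those bytes as a long in the chunk's first 8 bytes via
Unsafe, and re-verifies the checksum on read-back. Below is a minimal sketch
of the same layout on a plain heap array, with a hand-rolled little-endian
long encoding standing in for Unsafe.putLong/getLong (ChunkChecksumSketch
and its helpers are illustrative names, not part of the commit):

    import java.util.zip.CRC32
    import scala.util.Random

    object ChunkChecksumSketch {
      val dataOffset = 8

      // fill the payload with random bytes and store the payload's CRC32
      // as a long in the first 8 bytes
      def fill(buf: Array[Byte]): Unit = {
        val cs = new CRC32
        for (i <- dataOffset until buf.length) {
          buf(i) = Random.nextInt(255).toByte
          cs.update(buf(i))
        }
        writeLong(buf, cs.getValue)
      }

      // recompute the payload checksum and compare with the stored value
      def verify(buf: Array[Byte]): Boolean = {
        val cs = new CRC32
        for (i <- dataOffset until buf.length) cs.update(buf(i))
        readLong(buf) == cs.getValue
      }

      private def writeLong(buf: Array[Byte], v: Long): Unit =
        for (k <- 0 until 8) buf(k) = ((v >>> (8 * k)) & 0xff).toByte

      private def readLong(buf: Array[Byte]): Long =
        (0 until 8).map(k => (buf(k) & 0xffL) << (8 * k)).sum

      def main(args: Array[String]): Unit = {
        val buf = new Array[Byte](64)
        fill(buf)
        println(s"checksum ok: ${verify(buf)}") // expected: true
      }
    }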

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/f2565bfc/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDLongDataSpec.scala
----------------------------------------------------------------------
diff --git a/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDLongDataSpec.scala b/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDLongDataSpec.scala
new file mode 100644
index 0000000..cb2a0b5
--- /dev/null
+++ b/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDLongDataSpec.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mnemonic.spark.rdd
+
+import scala.util._
+import scala.language.existentials
+import org.apache.mnemonic.spark.TestSpec
+import org.apache.spark.SparkConf
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.mnemonic.spark.rdd.DurableRDDFunctions._
+import org.apache.mnemonic.DurableType
+import org.apache.mnemonic.DurableChunk
+import org.apache.mnemonic.Utils
+import org.apache.mnemonic.NonVolatileMemAllocator
+import org.apache.mnemonic.EntityFactoryProxy
+import org.apache.mnemonic.sessions.ObjectCreator
+
+class DurableRDDLongDataSpec extends TestSpec {
+
+  val defaultServiceName = "pmalloc"
+  val defaultSlotKeyId = 2L
+  val defaultPartitionSize = 1024 * 1024 * 1024L
+  val defaultBaseDirectory = "."
+  val defaultNumOfPartitions = 8
+  val defaultNumOfRecordsPerPartition = 5000
+
+  behavior of "A DurableRDD with Long Type Data"
+
+  it should "have unmodified values if only converting the data type from Int to Long" in {
+    val conf = new SparkConf()
+        .setMaster("local[*]")
+        .setAppName("Test")
+    val sc = new SparkContext(conf)
+    // sc.getConf.getAll.foreach(println)
+    // DurableRDD.setDurableBaseDir(sc, defaultBaseDirectory)
+    val seed: RDD[Int] = sc.parallelize(
+        Seq.fill(defaultNumOfPartitions)(defaultNumOfRecordsPerPartition), defaultNumOfPartitions)
+    val data = seed flatMap (recnum => Seq.fill(recnum)(Random.nextInt)) cache // must be cached to pin down the random values
+    val durdd = data.makeDurable[Long](
+        defaultServiceName,
+        Array(DurableType.LONG), Array(),
+        defaultSlotKeyId, defaultPartitionSize,
+        (v: Int, oc: ObjectCreator[Long, NonVolatileMemAllocator]) =>
+          { Some(v.asInstanceOf[Long]) })
+    // data.collect().foreach(println)
+    // durdd.collect().foreach(println)
+    val (rcnt, rsum) = (data.count, data.sum)
+    val (dcnt, dsum) = (durdd.count, durdd.sum)
+    durdd.reset
+    /*sys.addShutdownHook({
+      DurableRDD.cleanupForApp(sc)
+    })*/
+    assertResult((rcnt, rsum)) {
+      (dcnt, dsum)
+    }
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/f2565bfc/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDSpec.scala
----------------------------------------------------------------------
diff --git a/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDSpec.scala b/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDSpec.scala
deleted file mode 100644
index 1937863..0000000
--- a/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDSpec.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mnemonic.spark.rdd;
-
-import scala.util._
-import org.apache.mnemonic.spark.TestSpec
-import org.apache.spark.SparkConf
-import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
-import org.apache.mnemonic.spark.rdd.DurableRDDFunctions._
-import org.apache.mnemonic.DurableType
-import org.apache.mnemonic.NonVolatileMemAllocator
-import org.apache.mnemonic.EntityFactoryProxy
-import org.apache.mnemonic.sessions.ObjectCreator
-
-class DurableRDDSpec extends TestSpec {
-
-  val defaultServiceName = "pmalloc"
-  val defaultSlotKeyId = 2L
-  val defaultPartitionSize = 1024 * 1024 * 1024L
-  val defaultBaseDirectory = "."
-  val defaultNumOfPartitions = 8
-  val defaultNumOfRecordsPerPartition = 5000
-
-  behavior of "A DurableRDD"
-
-  it should "have the same sum value" in {
-    val conf = new SparkConf()
-        .setMaster("local[*]")
-        .setAppName("Test")
-    val sc = new SparkContext(conf)
-    // sc.getConf.getAll.foreach(println)
-    // DurableRDD.setDurableBaseDir(sc, defaultBaseDirectory)
-    val seed: RDD[Int] = sc.parallelize(
-        Seq.fill(defaultNumOfPartitions)(defaultNumOfRecordsPerPartition), defaultNumOfPartitions)
-    val data = seed flatMap (recnum => Seq.fill(recnum)(Random.nextInt)) cache //must be cached to fix rand numbers
-    val durdd = data.makeDurable[Long](
-        defaultServiceName,
-        Array(DurableType.LONG), Array(),
-        defaultSlotKeyId, defaultPartitionSize,
-        (v: Int, oc: ObjectCreator[Long, NonVolatileMemAllocator])=>
-          { Some(v.asInstanceOf[Long]) })
-    // data.collect().foreach(println)
-    // durdd.collect().foreach(println)
-    val (rcnt, rsum) = (data.count, data.sum)
-    val (dcnt, dsum) = (durdd.count, durdd.sum)
-    durdd.reset
-    /*sys.addShutdownHook({
-      DurableRDD.cleanupForApp(sc)
-    })*/
-    assertResult((rcnt, rsum)) {
-      (dcnt, dsum)
-    }
-  }
-
-  it should "produce NoSuchElementException when head is invoked" in {
-    assertThrows[NoSuchElementException] {
-      Set.empty.head
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/f2565bfc/mnemonic-spark/pom.xml
----------------------------------------------------------------------
diff --git a/mnemonic-spark/pom.xml b/mnemonic-spark/pom.xml
index 73978ac..ac67f93 100644
--- a/mnemonic-spark/pom.xml
+++ b/mnemonic-spark/pom.xml
@@ -160,6 +160,7 @@
           <configuration>
             <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
             <junitxml>.</junitxml>
+            <parallel>false</parallel>
             <filereports>SparkIntegrationTestSuite.txt</filereports>
             <forkMode>always</forkMode>
             <skipTests>${skipTests}</skipTests>
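
A likely motivation for the new <parallel>false</parallel> setting (an
assumption; the commit message does not say): each spec constructs its own
SparkContext, and with default Spark settings only one SparkContext may be
active per JVM, so suites sharing a fork must run serially. A minimal sketch
of that constraint:

    import org.apache.spark.{SparkConf, SparkContext}

    object SingleContextSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setMaster("local[*]").setAppName("first"))
        // With defaults, a second active context in this JVM throws
        // SparkException: "Only one SparkContext may be running in this JVM"
        // new SparkContext(new SparkConf().setMaster("local[*]").setAppName("second"))
        sc.stop()
      }
    }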