You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mnemonic.apache.org by ga...@apache.org on 2017/04/27 22:32:21 UTC

incubator-mnemonic git commit: MNEMONIC-250: Add a test case to verify buffer type dataset & bugfixes

Repository: incubator-mnemonic
Updated Branches:
  refs/heads/master b862d1241 -> 5631b75b6


MNEMONIC-250: Add a test case to verify buffer type dataset & bugfixes


Project: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/commit/5631b75b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/tree/5631b75b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/diff/5631b75b

Branch: refs/heads/master
Commit: 5631b75b61797d2745b56b4307844dae4c8cc49f
Parents: b862d12
Author: Wang, Gang(Gary) <ga...@intel.com>
Authored: Thu Apr 27 15:15:15 2017 -0700
Committer: Wang, Gang(Gary) <ga...@intel.com>
Committed: Thu Apr 27 15:15:15 2017 -0700

----------------------------------------------------------------------
 build-tools/test.conf                           |  2 +
 .../spark/rdd/DurableRDDBufferDataSpec.scala    | 97 ++++++++++++++++++++
 .../spark/rdd/DurableRDDChunkDataSpec.scala     | 18 ++--
 .../spark/rdd/DurableRDDLongDataSpec.scala      |  3 +-
 .../spark/rdd/DurableRDDPersonDataSpec.scala    |  3 +-
 5 files changed, 110 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/5631b75b/build-tools/test.conf
----------------------------------------------------------------------
diff --git a/build-tools/test.conf b/build-tools/test.conf
index 98a9318..3663847 100644
--- a/build-tools/test.conf
+++ b/build-tools/test.conf
@@ -64,3 +64,5 @@ mvn -Dsuites=org.apache.mnemonic.spark.rdd.DurableRDDChunkDataSpec test -pl mnem
 
 mvn -Dsuites=org.apache.mnemonic.spark.rdd.DurableRDDPersonDataSpec test -pl mnemonic-spark/mnemonic-spark-core -DskipTests=false
 
+mvn -Dsuites=org.apache.mnemonic.spark.rdd.DurableRDDBufferDataSpec test -pl mnemonic-spark/mnemonic-spark-core -DskipTests=false
+

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/5631b75b/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDBufferDataSpec.scala
----------------------------------------------------------------------
diff --git a/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDBufferDataSpec.scala b/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDBufferDataSpec.scala
new file mode 100644
index 0000000..d4b478a
--- /dev/null
+++ b/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDBufferDataSpec.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mnemonic.spark.rdd
+
+import java.util.zip.CRC32
+import java.util.zip.Checksum
+
+import scala.util._
+import scala.language.existentials
+import org.apache.mnemonic.spark.TestSpec
+import org.apache.spark.SparkConf
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.mnemonic.spark.rdd.DurableRDDFunctions._
+import org.apache.mnemonic.DurableType
+import org.apache.mnemonic.DurableBuffer
+import org.apache.mnemonic.Utils
+import org.apache.mnemonic.EntityFactoryProxy
+import org.apache.mnemonic.sessions.ObjectCreator
+
+/**
+ * Verifies that an RDD of Int sizes can be made durable as DurableBuffer records: each
+ * buffer stores a CRC32 of its random payload, which is re-checked when read back.
+ */
+class DurableRDDBufferDataSpec extends TestSpec {
+  val defaultServiceName = "pmalloc"
+  val defaultSlotKeyId = 2L
+  val defaultPartitionSize = 1024 * 1024 * 1024L
+  val defaultBaseDirectory = "."
+  val defaultNumOfPartitions = 8
+  val defaultNumOfRecordsPerPartition = 20
+
+  behavior of "A DurableRDD with Buffer Type Data"
+
+  it should "supports durable buffer as its data type" in {
+    val dataOffset = 8 // bytes taken by the CRC32 value stored ahead of the payload
+    val conf = new SparkConf().setMaster("local[*]").setAppName("Test")
+    val sc = new SparkContext(conf)
+    try {
+      val seed: RDD[Int] = sc.parallelize(
+          Seq.fill(defaultNumOfPartitions)(defaultNumOfRecordsPerPartition), defaultNumOfPartitions)
+      // every record is a requested buffer size in [1 MiB, 2 MiB)
+      val data = seed.flatMap(recnum => Seq.fill(recnum)(Random.nextInt(1024 * 1024) + 1024 * 1024)).cache()
+      val durdd = data.makeDurable[DurableBuffer[_]](
+          defaultServiceName,
+          Array(DurableType.BUFFER), Array(),
+          defaultSlotKeyId, defaultPartitionSize,
+          (v: Int, oc: ObjectCreator[DurableBuffer[_], _]) => {
+            val buffer = oc.newDurableObjectRecord(v)
+            if (null != buffer) {
+              val bary = new Array[Byte](v - dataOffset)
+              Random.nextBytes(bary)
+              val cs = new CRC32
+              cs.update(bary, 0, bary.length)
+              buffer.clear
+              buffer.get.putLong(cs.getValue)
+              buffer.get.put(bary)
+            }
+            Option(buffer)
+          })
+      val durtsz = durdd.map(_.getSize.toInt).sum
+      val derrcount = durdd.map(buffer => {
+        val cs = new CRC32
+        var chksum: Long = -1L // -1 never equals a CRC32, so null buffers count as errors
+        if (null != buffer) {
+          buffer.clear
+          chksum = buffer.get.getLong
+          val bary = new Array[Byte](buffer.get.remaining)
+          buffer.get.get(bary)
+          cs.update(bary)
+        }
+        if (chksum != cs.getValue) 1 else 0
+      }).sum
+      val (rerrcnt: Long, rsz: Long) = (0L, data.sum.toLong)
+      val (derrcnt: Long, dsz: Long) = (derrcount.toLong, durtsz.toLong)
+      durdd.reset
+      assertResult((rerrcnt, rsz)) { (derrcnt, dsz) } // no checksum errors, same total size
+    } finally {
+      sc.stop() // release the local SparkContext even when the check above fails
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/5631b75b/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDChunkDataSpec.scala
----------------------------------------------------------------------
diff --git a/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDChunkDataSpec.scala b/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDChunkDataSpec.scala
index fe4dc88..f76e8ba 100644
--- a/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDChunkDataSpec.scala
+++ b/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDChunkDataSpec.scala
@@ -30,7 +30,6 @@ import org.apache.mnemonic.spark.rdd.DurableRDDFunctions._
 import org.apache.mnemonic.DurableType
 import org.apache.mnemonic.DurableChunk
 import org.apache.mnemonic.Utils
-import org.apache.mnemonic.NonVolatileMemAllocator
 import org.apache.mnemonic.EntityFactoryProxy
 import org.apache.mnemonic.sessions.ObjectCreator
 
@@ -58,7 +57,7 @@ class DurableRDDChunkDataSpec extends TestSpec {
         defaultServiceName,
         Array(DurableType.CHUNK), Array(),
         defaultSlotKeyId, defaultPartitionSize,
-        (v: Int, oc: ObjectCreator[DurableChunk[_], NonVolatileMemAllocator])=>
+        (v: Int, oc: ObjectCreator[DurableChunk[_], _])=>
           {
             val cs = new CRC32
             cs.reset
@@ -66,7 +65,7 @@ class DurableRDDChunkDataSpec extends TestSpec {
             val chunk = oc.newDurableObjectRecord(v)
             var b: Byte = 0
             if (null != chunk) {
-              for (i <- dataOffset until chunk.getSize.asInstanceOf[Int]) {
+              for (i <- dataOffset until chunk.getSize.toInt) {
                 b = Random.nextInt(255).asInstanceOf[Byte]
                 unsafe.putByte(chunk.get + i, b)
                 cs.update(b)
@@ -75,24 +74,25 @@ class DurableRDDChunkDataSpec extends TestSpec {
             }
             Option(chunk)
           })
-    val durtsz = durdd map (_.getSize.asInstanceOf[Int]) sum
+    val durtsz = durdd map (_.getSize.toInt) sum
     val derrcount = durdd map (
         chunk => {
           val unsafe = Utils.getUnsafe
+          var chksum: Long = -1L
           val cs = new CRC32
           cs.reset
           if (null != chunk) {
             var b: Byte = 0
-            for (j <- dataOffset until chunk.getSize.asInstanceOf[Int]) {
+            for (j <- dataOffset until chunk.getSize.toInt) {
               b = unsafe.getByte(chunk.get + j)
               cs.update(b)
             }
+            chksum = unsafe.getLong(chunk.get)
           }
-          val res = unsafe.getLong(chunk.get)
-          if (res != cs.getValue) 1 else 0
+          if (chksum != cs.getValue) 1 else 0
         }) sum
-    val (rerrcnt: Long, rsz: Long) = (0L, data.sum.asInstanceOf[Long])
-    val (derrcnt: Long, dsz: Long) = (derrcount.asInstanceOf[Long], durtsz.asInstanceOf[Long])
+    val (rerrcnt: Long, rsz: Long) = (0L, data.sum.toLong)
+    val (derrcnt: Long, dsz: Long) = (derrcount.toLong, durtsz.toLong)
     durdd.reset
     assertResult((rerrcnt, rsz)) {
       (derrcnt, dsz)

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/5631b75b/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDLongDataSpec.scala
----------------------------------------------------------------------
diff --git a/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDLongDataSpec.scala b/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDLongDataSpec.scala
index cb2a0b5..52f0e05 100644
--- a/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDLongDataSpec.scala
+++ b/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDLongDataSpec.scala
@@ -27,7 +27,6 @@ import org.apache.mnemonic.spark.rdd.DurableRDDFunctions._
 import org.apache.mnemonic.DurableType
 import org.apache.mnemonic.DurableChunk
 import org.apache.mnemonic.Utils
-import org.apache.mnemonic.NonVolatileMemAllocator
 import org.apache.mnemonic.EntityFactoryProxy
 import org.apache.mnemonic.sessions.ObjectCreator
 
@@ -56,7 +55,7 @@ class DurableRDDLongDataSpec extends TestSpec {
         defaultServiceName,
         Array(DurableType.LONG), Array(),
         defaultSlotKeyId, defaultPartitionSize,
-        (v: Int, oc: ObjectCreator[Long, NonVolatileMemAllocator])=>
+        (v: Int, oc: ObjectCreator[Long, _])=>
           { Some(v.asInstanceOf[Long]) })
     // data.collect().foreach(println)
     // durdd.collect().foreach(println)

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/5631b75b/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDPersonDataSpec.scala
----------------------------------------------------------------------
diff --git a/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDPersonDataSpec.scala b/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDPersonDataSpec.scala
index fff0849..118ebe4 100644
--- a/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDPersonDataSpec.scala
+++ b/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDPersonDataSpec.scala
@@ -28,7 +28,6 @@ import org.apache.mnemonic.spark.Person
 import org.apache.mnemonic.spark.PersonListEFProxy
 import org.apache.mnemonic.DurableType
 import org.apache.mnemonic.Utils
-import org.apache.mnemonic.NonVolatileMemAllocator
 import org.apache.mnemonic.EntityFactoryProxy
 import org.apache.mnemonic.sessions.ObjectCreator
 
@@ -55,7 +54,7 @@ class DurableRDDPersonDataSpec extends TestSpec {
         defaultServiceName,
         Array(DurableType.DURABLE), Array(new PersonListEFProxy),
         defaultSlotKeyId, defaultPartitionSize,
-        (v: Long, oc: ObjectCreator[Person[Long], NonVolatileMemAllocator]) =>
+        (v: Long, oc: ObjectCreator[Person[Long], _]) =>
           {
             val person = oc.newDurableObjectRecord
             person.setAge(v.toShort)