You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mnemonic.apache.org by ga...@apache.org on 2017/04/27 22:32:21 UTC
incubator-mnemonic git commit: MNEMONIC-250: Add a test case to
verify buffer type dataset & bugfixes
Repository: incubator-mnemonic
Updated Branches:
refs/heads/master b862d1241 -> 5631b75b6
MNEMONIC-250: Add a test case to verify buffer type dataset & bugfixes
Project: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/commit/5631b75b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/tree/5631b75b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/diff/5631b75b
Branch: refs/heads/master
Commit: 5631b75b61797d2745b56b4307844dae4c8cc49f
Parents: b862d12
Author: Wang, Gang(Gary) <ga...@intel.com>
Authored: Thu Apr 27 15:15:15 2017 -0700
Committer: Wang, Gang(Gary) <ga...@intel.com>
Committed: Thu Apr 27 15:15:15 2017 -0700
----------------------------------------------------------------------
build-tools/test.conf | 2 +
.../spark/rdd/DurableRDDBufferDataSpec.scala | 97 ++++++++++++++++++++
.../spark/rdd/DurableRDDChunkDataSpec.scala | 18 ++--
.../spark/rdd/DurableRDDLongDataSpec.scala | 3 +-
.../spark/rdd/DurableRDDPersonDataSpec.scala | 3 +-
5 files changed, 110 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/5631b75b/build-tools/test.conf
----------------------------------------------------------------------
diff --git a/build-tools/test.conf b/build-tools/test.conf
index 98a9318..3663847 100644
--- a/build-tools/test.conf
+++ b/build-tools/test.conf
@@ -64,3 +64,5 @@ mvn -Dsuites=org.apache.mnemonic.spark.rdd.DurableRDDChunkDataSpec test -pl mnem
mvn -Dsuites=org.apache.mnemonic.spark.rdd.DurableRDDPersonDataSpec test -pl mnemonic-spark/mnemonic-spark-core -DskipTests=false
+mvn -Dsuites=org.apache.mnemonic.spark.rdd.DurableRDDBufferDataSpec test -pl mnemonic-spark/mnemonic-spark-core -DskipTests=false
+
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/5631b75b/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDBufferDataSpec.scala
----------------------------------------------------------------------
diff --git a/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDBufferDataSpec.scala b/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDBufferDataSpec.scala
new file mode 100644
index 0000000..d4b478a
--- /dev/null
+++ b/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDBufferDataSpec.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mnemonic.spark.rdd
+
+import java.util.zip.CRC32
+import java.util.zip.Checksum
+
+import scala.util._
+import scala.language.existentials
+import org.apache.mnemonic.spark.TestSpec
+import org.apache.spark.SparkConf
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.mnemonic.spark.rdd.DurableRDDFunctions._
+import org.apache.mnemonic.DurableType
+import org.apache.mnemonic.DurableBuffer
+import org.apache.mnemonic.Utils
+import org.apache.mnemonic.EntityFactoryProxy
+import org.apache.mnemonic.sessions.ObjectCreator
+
+class DurableRDDBufferDataSpec extends TestSpec {
+
+ val defaultServiceName = "pmalloc"
+ val defaultSlotKeyId = 2L
+ val defaultPartitionSize = 1024 * 1024 * 1024L
+ val defaultBaseDirectory = "."
+ val defaultNumOfPartitions = 8
+ val defaultNumOfRecordsPerPartition = 20
+
+ behavior of "A DurableRDD with Buffer Type Data"
+
+ it should "supports durable buffer as its data type" in {
+ val dataOffset = 8
+ val conf = new SparkConf()
+ .setMaster("local[*]")
+ .setAppName("Test")
+ val sc = new SparkContext(conf)
+ val seed: RDD[Int] = sc.parallelize(
+ Seq.fill(defaultNumOfPartitions)(defaultNumOfRecordsPerPartition), defaultNumOfPartitions)
+ val data = seed flatMap (recnum => Seq.fill(recnum)(Random.nextInt(1024 * 1024) + 1024 * 1024)) cache
+ val durdd = data.makeDurable[DurableBuffer[_]](
+ defaultServiceName,
+ Array(DurableType.BUFFER), Array(),
+ defaultSlotKeyId, defaultPartitionSize,
+ (v: Int, oc: ObjectCreator[DurableBuffer[_], _])=>
+ {
+ val cs = new CRC32
+ cs.reset
+ val buffer = oc.newDurableObjectRecord(v)
+ val bary = new Array[Byte](v - dataOffset)
+ if (null != buffer) {
+ buffer.clear
+ Random.nextBytes(bary)
+ cs.update(bary, 0, bary.length)
+ buffer.get.putLong(cs.getValue)
+ buffer.get.put(bary)
+ }
+ Option(buffer)
+ })
+ val durtsz = durdd map (_.getSize.toInt) sum
+ val derrcount = durdd map (
+ buffer => {
+ var chksum: Long = -1L
+ val cs = new CRC32
+ cs.reset
+ if (null != buffer) {
+ buffer.clear
+ chksum = buffer.get.getLong
+ val bary = new Array[Byte](buffer.get.remaining)
+ buffer.get.get(bary)
+ cs.update(bary)
+ }
+ if (chksum != cs.getValue) 1 else 0
+ }) sum
+ val (rerrcnt: Long, rsz: Long) = (0L, data.sum.toLong)
+ val (derrcnt: Long, dsz: Long) = (derrcount.toLong, durtsz.toLong)
+ durdd.reset
+ assertResult((rerrcnt, rsz)) {
+ (derrcnt, dsz)
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/5631b75b/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDChunkDataSpec.scala
----------------------------------------------------------------------
diff --git a/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDChunkDataSpec.scala b/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDChunkDataSpec.scala
index fe4dc88..f76e8ba 100644
--- a/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDChunkDataSpec.scala
+++ b/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDChunkDataSpec.scala
@@ -30,7 +30,6 @@ import org.apache.mnemonic.spark.rdd.DurableRDDFunctions._
import org.apache.mnemonic.DurableType
import org.apache.mnemonic.DurableChunk
import org.apache.mnemonic.Utils
-import org.apache.mnemonic.NonVolatileMemAllocator
import org.apache.mnemonic.EntityFactoryProxy
import org.apache.mnemonic.sessions.ObjectCreator
@@ -58,7 +57,7 @@ class DurableRDDChunkDataSpec extends TestSpec {
defaultServiceName,
Array(DurableType.CHUNK), Array(),
defaultSlotKeyId, defaultPartitionSize,
- (v: Int, oc: ObjectCreator[DurableChunk[_], NonVolatileMemAllocator])=>
+ (v: Int, oc: ObjectCreator[DurableChunk[_], _])=>
{
val cs = new CRC32
cs.reset
@@ -66,7 +65,7 @@ class DurableRDDChunkDataSpec extends TestSpec {
val chunk = oc.newDurableObjectRecord(v)
var b: Byte = 0
if (null != chunk) {
- for (i <- dataOffset until chunk.getSize.asInstanceOf[Int]) {
+ for (i <- dataOffset until chunk.getSize.toInt) {
b = Random.nextInt(255).asInstanceOf[Byte]
unsafe.putByte(chunk.get + i, b)
cs.update(b)
@@ -75,24 +74,25 @@ class DurableRDDChunkDataSpec extends TestSpec {
}
Option(chunk)
})
- val durtsz = durdd map (_.getSize.asInstanceOf[Int]) sum
+ val durtsz = durdd map (_.getSize.toInt) sum
val derrcount = durdd map (
chunk => {
val unsafe = Utils.getUnsafe
+ var chksum: Long = -1L
val cs = new CRC32
cs.reset
if (null != chunk) {
var b: Byte = 0
- for (j <- dataOffset until chunk.getSize.asInstanceOf[Int]) {
+ for (j <- dataOffset until chunk.getSize.toInt) {
b = unsafe.getByte(chunk.get + j)
cs.update(b)
}
+ chksum = unsafe.getLong(chunk.get)
}
- val res = unsafe.getLong(chunk.get)
- if (res != cs.getValue) 1 else 0
+ if (chksum != cs.getValue) 1 else 0
}) sum
- val (rerrcnt: Long, rsz: Long) = (0L, data.sum.asInstanceOf[Long])
- val (derrcnt: Long, dsz: Long) = (derrcount.asInstanceOf[Long], durtsz.asInstanceOf[Long])
+ val (rerrcnt: Long, rsz: Long) = (0L, data.sum.toLong)
+ val (derrcnt: Long, dsz: Long) = (derrcount.toLong, durtsz.toLong)
durdd.reset
assertResult((rerrcnt, rsz)) {
(derrcnt, dsz)
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/5631b75b/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDLongDataSpec.scala
----------------------------------------------------------------------
diff --git a/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDLongDataSpec.scala b/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDLongDataSpec.scala
index cb2a0b5..52f0e05 100644
--- a/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDLongDataSpec.scala
+++ b/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDLongDataSpec.scala
@@ -27,7 +27,6 @@ import org.apache.mnemonic.spark.rdd.DurableRDDFunctions._
import org.apache.mnemonic.DurableType
import org.apache.mnemonic.DurableChunk
import org.apache.mnemonic.Utils
-import org.apache.mnemonic.NonVolatileMemAllocator
import org.apache.mnemonic.EntityFactoryProxy
import org.apache.mnemonic.sessions.ObjectCreator
@@ -56,7 +55,7 @@ class DurableRDDLongDataSpec extends TestSpec {
defaultServiceName,
Array(DurableType.LONG), Array(),
defaultSlotKeyId, defaultPartitionSize,
- (v: Int, oc: ObjectCreator[Long, NonVolatileMemAllocator])=>
+ (v: Int, oc: ObjectCreator[Long, _])=>
{ Some(v.asInstanceOf[Long]) })
// data.collect().foreach(println)
// durdd.collect().foreach(println)
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/5631b75b/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDPersonDataSpec.scala
----------------------------------------------------------------------
diff --git a/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDPersonDataSpec.scala b/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDPersonDataSpec.scala
index fff0849..118ebe4 100644
--- a/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDPersonDataSpec.scala
+++ b/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDPersonDataSpec.scala
@@ -28,7 +28,6 @@ import org.apache.mnemonic.spark.Person
import org.apache.mnemonic.spark.PersonListEFProxy
import org.apache.mnemonic.DurableType
import org.apache.mnemonic.Utils
-import org.apache.mnemonic.NonVolatileMemAllocator
import org.apache.mnemonic.EntityFactoryProxy
import org.apache.mnemonic.sessions.ObjectCreator
@@ -55,7 +54,7 @@ class DurableRDDPersonDataSpec extends TestSpec {
defaultServiceName,
Array(DurableType.DURABLE), Array(new PersonListEFProxy),
defaultSlotKeyId, defaultPartitionSize,
- (v: Long, oc: ObjectCreator[Person[Long], NonVolatileMemAllocator]) =>
+ (v: Long, oc: ObjectCreator[Person[Long], _]) =>
{
val person = oc.newDurableObjectRecord
person.setAge(v.toShort)