Posted to commits@mnemonic.apache.org by ga...@apache.org on 2017/04/26 22:25:03 UTC
incubator-mnemonic git commit: MNEMONIC-248: Add a test case to verify the durable chunk type dataset & bugfixes
Repository: incubator-mnemonic
Updated Branches:
refs/heads/master 70c53f24b -> f2565bfcf
MNEMONIC-248: Add a test case to verify the durable chunk type dataset & bugfixes
Project: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/commit/f2565bfc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/tree/f2565bfc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/diff/f2565bfc
Branch: refs/heads/master
Commit: f2565bfcf63ca2c8eb8769478f4614db51d13d5f
Parents: 70c53f2
Author: Wang, Gang(Gary) <ga...@intel.com>
Authored: Wed Apr 26 15:10:43 2017 -0700
Committer: Wang, Gang(Gary) <ga...@intel.com>
Committed: Wed Apr 26 15:10:43 2017 -0700
----------------------------------------------------------------------
build-tools/test.conf | 4 +-
.../spark/rdd/DurableRDDChunkDataSpec.scala | 101 +++++++++++++++++++
.../spark/rdd/DurableRDDLongDataSpec.scala | 75 ++++++++++++++
.../mnemonic/spark/rdd/DurableRDDSpec.scala | 76 --------------
mnemonic-spark/pom.xml | 1 +
5 files changed, 180 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/f2565bfc/build-tools/test.conf
----------------------------------------------------------------------
diff --git a/build-tools/test.conf b/build-tools/test.conf
index 7645b26..f759c9f 100644
--- a/build-tools/test.conf
+++ b/build-tools/test.conf
@@ -58,5 +58,7 @@ mvn -Dtest=MneMapreduceChunkDataTest test -pl mnemonic-hadoop/mnemonic-hadoop-ma
mvn -Dtest=MneMapredBufferDataTest test -pl mnemonic-hadoop/mnemonic-hadoop-mapreduce -DskipTests=false
# a testcase for module "mnemonic-spark/mnemonic-spark-core" that requires 'pmalloc' memory service to pass
-mvn -Dtest=DurableRDDSpec test -pl mnemonic-spark/mnemonic-spark-core -DskipTests=false
+mvn -Dsuites=org.apache.mnemonic.spark.rdd.DurableRDDLongDataSpec test -pl mnemonic-spark/mnemonic-spark-core -DskipTests=false
+
+mvn -Dsuites=org.apache.mnemonic.spark.rdd.DurableRDDChunkDataSpec test -pl mnemonic-spark/mnemonic-spark-core -DskipTests=false
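Note: the old Surefire-style "-Dtest=DurableRDDSpec" selector is replaced above by the scalatest-maven-plugin's "-Dsuites" property, which takes fully qualified suite class names, one mvn invocation per suite. If the plugin version in this build also accepts a comma-separated list for -Dsuites (an assumption, not verified here), both suites could be run in a single invocation, for example:

mvn -Dsuites="org.apache.mnemonic.spark.rdd.DurableRDDLongDataSpec,org.apache.mnemonic.spark.rdd.DurableRDDChunkDataSpec" test -pl mnemonic-spark/mnemonic-spark-core -DskipTests=false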
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/f2565bfc/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDChunkDataSpec.scala
----------------------------------------------------------------------
diff --git a/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDChunkDataSpec.scala b/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDChunkDataSpec.scala
new file mode 100644
index 0000000..fe4dc88
--- /dev/null
+++ b/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDChunkDataSpec.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mnemonic.spark.rdd
+
+import java.util.zip.CRC32
+import java.util.zip.Checksum
+
+import scala.util._
+import scala.language.existentials
+import org.apache.mnemonic.spark.TestSpec
+import org.apache.spark.SparkConf
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.mnemonic.spark.rdd.DurableRDDFunctions._
+import org.apache.mnemonic.DurableType
+import org.apache.mnemonic.DurableChunk
+import org.apache.mnemonic.Utils
+import org.apache.mnemonic.NonVolatileMemAllocator
+import org.apache.mnemonic.EntityFactoryProxy
+import org.apache.mnemonic.sessions.ObjectCreator
+
+class DurableRDDChunkDataSpec extends TestSpec {
+
+ val defaultServiceName = "pmalloc"
+ val defaultSlotKeyId = 2L
+ val defaultPartitionSize = 1024 * 1024 * 1024L
+ val defaultBaseDirectory = "."
+ val defaultNumOfPartitions = 8
+ val defaultNumOfRecordsPerPartition = 20
+
+ behavior of "A DurableRDD with Chunk Type Data"
+
+ it should "supports durable chunk as its data type" in {
+ val dataOffset = 8
+ val conf = new SparkConf()
+ .setMaster("local[*]")
+ .setAppName("Test")
+ val sc = new SparkContext(conf)
+ val seed: RDD[Int] = sc.parallelize(
+ Seq.fill(defaultNumOfPartitions)(defaultNumOfRecordsPerPartition), defaultNumOfPartitions)
+ val data = seed flatMap (recnum => Seq.fill(recnum)(Random.nextInt(1024 * 1024) + 1024 * 1024)) cache
+ val durdd = data.makeDurable[DurableChunk[_]](
+ defaultServiceName,
+ Array(DurableType.CHUNK), Array(),
+ defaultSlotKeyId, defaultPartitionSize,
+ (v: Int, oc: ObjectCreator[DurableChunk[_], NonVolatileMemAllocator])=>
+ {
+ val cs = new CRC32
+ cs.reset
+ val unsafe = Utils.getUnsafe
+ val chunk = oc.newDurableObjectRecord(v)
+ var b: Byte = 0
+ if (null != chunk) {
+ for (i <- dataOffset until chunk.getSize.asInstanceOf[Int]) {
+ b = Random.nextInt(255).asInstanceOf[Byte]
+ unsafe.putByte(chunk.get + i, b)
+ cs.update(b)
+ }
+ unsafe.putLong(chunk.get, cs.getValue)
+ }
+ Option(chunk)
+ })
+ val durtsz = durdd map (_.getSize.asInstanceOf[Int]) sum
+ val derrcount = durdd map (
+ chunk => {
+ val unsafe = Utils.getUnsafe
+ val cs = new CRC32
+ cs.reset
+ if (null != chunk) {
+ var b: Byte = 0
+ for (j <- dataOffset until chunk.getSize.asInstanceOf[Int]) {
+ b = unsafe.getByte(chunk.get + j)
+ cs.update(b)
+ }
+ }
+ val res = unsafe.getLong(chunk.get)
+ if (res != cs.getValue) 1 else 0
+ }) sum
+ val (rerrcnt: Long, rsz: Long) = (0L, data.sum.asInstanceOf[Long])
+ val (derrcnt: Long, dsz: Long) = (derrcount.asInstanceOf[Long], durtsz.asInstanceOf[Long])
+ durdd.reset
+ assertResult((rerrcnt, rsz)) {
+ (derrcnt, dsz)
+ }
+ }
+}
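The new chunk spec verifies data integrity by reserving the first 8 bytes of each durable chunk (dataOffset) for a CRC32 checksum of the remaining payload: the write side fills the payload with random bytes while accumulating the checksum and stores it at offset 0, and the read side recomputes the checksum over the payload and compares it with the stored header value. A minimal standalone sketch of the same scheme on a plain byte array (hypothetical helper names, using java.nio instead of Unsafe; not part of this commit):

import java.nio.ByteBuffer
import java.util.zip.CRC32
import scala.util.Random

object ChunkChecksumSketch {
  val DataOffset = 8 // first 8 bytes hold the CRC32 of the payload

  // Fill bytes [DataOffset, size) with random data and store their CRC32 in the header.
  def writeRecord(size: Int): Array[Byte] = {
    val buf = new Array[Byte](size)
    val cs = new CRC32
    for (i <- DataOffset until size) {
      val b = Random.nextInt(255).toByte
      buf(i) = b
      cs.update(b)
    }
    ByteBuffer.wrap(buf).putLong(0, cs.getValue)
    buf
  }

  // Recompute the CRC32 over the payload and compare it with the stored header value.
  def verifyRecord(buf: Array[Byte]): Boolean = {
    val cs = new CRC32
    cs.update(buf, DataOffset, buf.length - DataOffset)
    ByteBuffer.wrap(buf).getLong(0) == cs.getValue
  }

  def main(args: Array[String]): Unit = {
    val rec = writeRecord(1024 * 1024)
    println(s"checksum matches: ${verifyRecord(rec)}") // expected: true
  }
}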
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/f2565bfc/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDLongDataSpec.scala
----------------------------------------------------------------------
diff --git a/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDLongDataSpec.scala b/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDLongDataSpec.scala
new file mode 100644
index 0000000..cb2a0b5
--- /dev/null
+++ b/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDLongDataSpec.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mnemonic.spark.rdd
+
+import scala.util._
+import scala.language.existentials
+import org.apache.mnemonic.spark.TestSpec
+import org.apache.spark.SparkConf
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.mnemonic.spark.rdd.DurableRDDFunctions._
+import org.apache.mnemonic.DurableType
+import org.apache.mnemonic.DurableChunk
+import org.apache.mnemonic.Utils
+import org.apache.mnemonic.NonVolatileMemAllocator
+import org.apache.mnemonic.EntityFactoryProxy
+import org.apache.mnemonic.sessions.ObjectCreator
+
+class DurableRDDLongDataSpec extends TestSpec {
+
+ val defaultServiceName = "pmalloc"
+ val defaultSlotKeyId = 2L
+ val defaultPartitionSize = 1024 * 1024 * 1024L
+ val defaultBaseDirectory = "."
+ val defaultNumOfPartitions = 8
+ val defaultNumOfRecordsPerPartition = 5000
+
+ behavior of "A DurableRDD with Long Type Data"
+
+ it should "have unmodified values if only converting the data type from Int to Long" in {
+ val conf = new SparkConf()
+ .setMaster("local[*]")
+ .setAppName("Test")
+ val sc = new SparkContext(conf)
+ // sc.getConf.getAll.foreach(println)
+ // DurableRDD.setDurableBaseDir(sc, defaultBaseDirectory)
+ val seed: RDD[Int] = sc.parallelize(
+ Seq.fill(defaultNumOfPartitions)(defaultNumOfRecordsPerPartition), defaultNumOfPartitions)
+ val data = seed flatMap (recnum => Seq.fill(recnum)(Random.nextInt)) cache //must be cached to fix rand numbers
+ val durdd = data.makeDurable[Long](
+ defaultServiceName,
+ Array(DurableType.LONG), Array(),
+ defaultSlotKeyId, defaultPartitionSize,
+ (v: Int, oc: ObjectCreator[Long, NonVolatileMemAllocator])=>
+ { Some(v.asInstanceOf[Long]) })
+ // data.collect().foreach(println)
+ // durdd.collect().foreach(println)
+ val (rcnt, rsum) = (data.count, data.sum)
+ val (dcnt, dsum) = (durdd.count, durdd.sum)
+ durdd.reset
+ /*sys.addShutdownHook({
+ DurableRDD.cleanupForApp(sc)
+ })*/
+ assertResult((rcnt, rsum)) {
+ (dcnt, dsum)
+ }
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/f2565bfc/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDSpec.scala
----------------------------------------------------------------------
diff --git a/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDSpec.scala b/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDSpec.scala
deleted file mode 100644
index 1937863..0000000
--- a/mnemonic-spark/mnemonic-spark-core/src/test/scala/org/apache/mnemonic/spark/rdd/DurableRDDSpec.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mnemonic.spark.rdd;
-
-import scala.util._
-import org.apache.mnemonic.spark.TestSpec
-import org.apache.spark.SparkConf
-import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
-import org.apache.mnemonic.spark.rdd.DurableRDDFunctions._
-import org.apache.mnemonic.DurableType
-import org.apache.mnemonic.NonVolatileMemAllocator
-import org.apache.mnemonic.EntityFactoryProxy
-import org.apache.mnemonic.sessions.ObjectCreator
-
-class DurableRDDSpec extends TestSpec {
-
- val defaultServiceName = "pmalloc"
- val defaultSlotKeyId = 2L
- val defaultPartitionSize = 1024 * 1024 * 1024L
- val defaultBaseDirectory = "."
- val defaultNumOfPartitions = 8
- val defaultNumOfRecordsPerPartition = 5000
-
- behavior of "A DurableRDD"
-
- it should "have the same sum value" in {
- val conf = new SparkConf()
- .setMaster("local[*]")
- .setAppName("Test")
- val sc = new SparkContext(conf)
- // sc.getConf.getAll.foreach(println)
- // DurableRDD.setDurableBaseDir(sc, defaultBaseDirectory)
- val seed: RDD[Int] = sc.parallelize(
- Seq.fill(defaultNumOfPartitions)(defaultNumOfRecordsPerPartition), defaultNumOfPartitions)
- val data = seed flatMap (recnum => Seq.fill(recnum)(Random.nextInt)) cache //must be cached to fix rand numbers
- val durdd = data.makeDurable[Long](
- defaultServiceName,
- Array(DurableType.LONG), Array(),
- defaultSlotKeyId, defaultPartitionSize,
- (v: Int, oc: ObjectCreator[Long, NonVolatileMemAllocator])=>
- { Some(v.asInstanceOf[Long]) })
- // data.collect().foreach(println)
- // durdd.collect().foreach(println)
- val (rcnt, rsum) = (data.count, data.sum)
- val (dcnt, dsum) = (durdd.count, durdd.sum)
- durdd.reset
- /*sys.addShutdownHook({
- DurableRDD.cleanupForApp(sc)
- })*/
- assertResult((rcnt, rsum)) {
- (dcnt, dsum)
- }
- }
-
- it should "produce NoSuchElementException when head is invoked" in {
- assertThrows[NoSuchElementException] {
- Set.empty.head
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/f2565bfc/mnemonic-spark/pom.xml
----------------------------------------------------------------------
diff --git a/mnemonic-spark/pom.xml b/mnemonic-spark/pom.xml
index 73978ac..ac67f93 100644
--- a/mnemonic-spark/pom.xml
+++ b/mnemonic-spark/pom.xml
@@ -160,6 +160,7 @@
<configuration>
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
<junitxml>.</junitxml>
+ <parallel>false</parallel>
<filereports>SparkIntegrationTestSuite.txt</filereports>
<forkMode>always</forkMode>
<skipTests>${skipTests}</skipTests>
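The pom.xml change above sets <parallel>false</parallel> on the scalatest-maven-plugin, presumably so the two Spark suites run sequentially: each spec builds its own SparkContext, and Spark by default allows only one active SparkContext per JVM, so running the suites concurrently in the same JVM could fail. For illustration only (not part of this commit; class and member names are hypothetical), a suite can also guard against overlapping contexts by sharing one SparkContext and releasing it in teardown:

import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, FlatSpec}

class SharedContextSpec extends FlatSpec with BeforeAndAfterAll {

  private var sc: SparkContext = _

  override def beforeAll(): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("Test")
    // getOrCreate reuses an already-active SparkContext in this JVM instead of failing
    sc = SparkContext.getOrCreate(conf)
  }

  override def afterAll(): Unit = {
    // stop the context so a later suite can create its own
    if (sc != null) sc.stop()
  }

  behavior of "A suite sharing one SparkContext"

  it should "run a trivial job on the shared context" in {
    assert(sc.parallelize(1 to 10).count() == 10)
  }
}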