You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by dl...@apache.org on 2014/07/07 21:41:58 UTC
git commit: MAHOUT-1583: cbind() operator for Scala DRMs This closes
apache/mahout#20
Repository: mahout
Updated Branches:
refs/heads/master 9bfb76732 -> 63cebf76e
MAHOUT-1583: cbind() operator for Scala DRMs
This closes apache/mahout#20
Squashed commit of the following:
commit 1baf3fcebf2cd49eb6487afe4eaed689653a3562
Merge: 41f0671 9bfb767
Author: Dmitriy Lyubimov <dl...@apache.org>
Date: Mon Jul 7 12:32:34 2014 -0700
Merge branch 'master' into MAHOUT-1583
commit 41f0671c9ccca81883475437fe18056563c1f8ac
Author: Dmitriy Lyubimov <dl...@apache.org>
Date: Mon Jul 7 12:27:26 2014 -0700
Adding assignment time comparison test
commit 6e115c87639ffb111b2e089d7ecdf0eaeecbb02b
Author: Dmitriy Lyubimov <dl...@apache.org>
Date: Mon Jun 16 18:42:42 2014 -0700
+ Licenses.
commit d3e905b5a92735822efe717f8b7a57c80bc56478
Author: Dmitriy Lyubimov <dl...@apache.org>
Date: Mon Jun 16 18:09:59 2014 -0700
initial writeup
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/63cebf76
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/63cebf76
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/63cebf76
Branch: refs/heads/master
Commit: 63cebf76e9c7746a1851841e37f5873704e086e8
Parents: 9bfb767
Author: Dmitriy Lyubimov <dl...@apache.org>
Authored: Mon Jul 7 12:34:12 2014 -0700
Committer: Dmitriy Lyubimov <dl...@apache.org>
Committed: Mon Jul 7 12:34:12 2014 -0700
----------------------------------------------------------------------
CHANGELOG | 2 +
.../apache/mahout/math/drm/RLikeDrmOps.scala | 2 +
.../mahout/math/drm/logical/OpCbind.scala | 42 +++++++++
.../math/scalabindings/MatrixOpsSuite.scala | 51 ++++++++++-
.../mahout/sparkbindings/SparkEngine.scala | 1 +
.../mahout/sparkbindings/blas/CbindAB.scala | 95 ++++++++++++++++++++
.../sparkbindings/drm/RLikeDrmOpsSuite.scala | 70 ++++++++++-----
7 files changed, 237 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/63cebf76/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 23ad49e..7ea84c5 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -2,6 +2,8 @@ Mahout Change Log
Release 1.0 - unreleased
+ MAHOUT-1583: cbind() operator for Scala DRMs
+
MAHOUT-1541, MAHOUT-1568, MAHOUT-1569: Created text-delimited file I/O traits and classes on spark, a MahoutDriver for a CLI and a ItemSimilairtyDriver using the CLI
MAHOUT-1573: More explicit parallelism adjustments in math-scala DRM apis; elements of automatic parallelism management (dlyubimov)
http://git-wip-us.apache.org/repos/asf/mahout/blob/63cebf76/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
index 7ac5577..d7027f2 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
@@ -60,6 +60,8 @@ class RLikeDrmOps[K: ClassTag](drm: DrmLike[K]) extends DrmLikeOps[K](drm) {
def %*%(that: Vector): DrmLike[K] = :%*%(that)
def t: DrmLike[Int] = OpAtAnyKey(A = drm)
+ /** Builds the logical OpCbind operator with this DRM as operand A and `that` as operand B. */
+ def cbind(that: DrmLike[K]) = OpCbind(A = this.drm, B = that)
}
class RLikeDrmIntOps(drm: DrmLike[Int]) extends RLikeDrmOps[Int](drm) {
http://git-wip-us.apache.org/repos/asf/mahout/blob/63cebf76/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbind.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbind.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbind.scala
new file mode 100644
index 0000000..1425264
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbind.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.DrmLike
+import scala.util.Random
+
+/** Logical cbind() operator: a DRM with A's row count and A.ncol + B.ncol columns; both operands and the result share key type K. */
+case class OpCbind[K: ClassTag](
+ override var A: DrmLike[K],
+ override var B: DrmLike[K]
+ ) extends AbstractBinaryOp[K, K, K] {
+ // Evaluated at construction: both operands must report the same number of rows.
+ assert(A.nrow == B.nrow, "arguments must have same number of rows")
+ // Keep the operands' tag when they agree; otherwise draw a random tag (once, since this is a lazy val).
+ override protected[mahout] lazy val partitioningTag: Long =
+ if (A.partitioningTag == B.partitioningTag) A.partitioningTag
+ else Random.nextLong()
+
+ /** R-like syntax for number of rows; taken from A, which the assertion above ties to B's row count. */
+ def nrow: Long = A.nrow
+
+ /** R-like syntax for number of columns: the sum of both operands' column counts. */
+ def ncol: Int = A.ncol + B.ncol
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/63cebf76/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala b/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala
index 8374a9b..c6fccbc 100644
--- a/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala
+++ b/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala
@@ -21,19 +21,20 @@ import org.scalatest.{Matchers, FunSuite}
import MatrixOps._
import scala._
import org.apache.mahout.test.MahoutSuite
+import org.apache.mahout.math.{RandomAccessSparseVector, SequentialAccessSparseVector, Matrices}
+import org.apache.mahout.common.RandomUtils
class MatrixOpsSuite extends FunSuite with MahoutSuite {
-
test("equivalence") {
val a = dense((1, 2, 3), (3, 4, 5))
val b = dense((1, 2, 3), (3, 4, 5))
val c = dense((1, 4, 3), (3, 4, 5))
assert(a === b)
assert(a !== c)
-
}
+
test("elementwise plus, minus") {
val a = dense((1, 2, 3), (3, 4, 5))
val b = dense((1, 1, 2), (2, 1, 1))
@@ -42,7 +43,6 @@ class MatrixOpsSuite extends FunSuite with MahoutSuite {
assert(c(0, 0) == 2)
assert(c(1, 2) == 6)
println(c.toString)
-
}
test("matrix, vector slicing") {
@@ -132,7 +132,52 @@ class MatrixOpsSuite extends FunSuite with MahoutSuite {
)
a.numNonZeroElementsPerColumn() should equal(dvec(3,2,4))
+ }
+
+ test("Vector Assignment performance") {
+ // Timing comparison only: prints two average batch times and makes no assertions.
+ val n = 1000 // cardinality of every vector created below
+ val k = (n * 0.1).toInt // number of random writes into src (random indices may repeat)
+ val nIters = 10000 // assignments performed per timed batch
+
+ val rnd = RandomUtils.getRandom
+
+ val src = new SequentialAccessSparseVector(n)
+ for (i <- 0 until k) src(rnd.nextInt(n)) = rnd.nextDouble()
+ // 50 timed batches: assign src directly into a fresh SequentialAccessSparseVector.
+ val times = (0 until 50).map { i =>
+ val ms = System.currentTimeMillis()
+ var j = 0
+ while (j < nIters) {
+ new SequentialAccessSparseVector(n) := src
+ j += 1
+ }
+ System.currentTimeMillis() - ms
+ }
+
+ .tail // drop the first batch from the average (presumably to exclude JIT warm-up)
+
+ val avgTime = times.sum.toDouble / times.size
+
+ printf("Average assignment seqSparse2seqSparse time: %.3f ms\n", avgTime)
+ // Same measurement, but copying src into a RandomAccessSparseVector first and assigning from that.
+ val times2 = (0 until 50).map { i =>
+ val ms = System.currentTimeMillis()
+ var j = 0
+ while (j < nIters) {
+ new SequentialAccessSparseVector(n) := (new RandomAccessSparseVector(n) := src)
+ j += 1
+ }
+ System.currentTimeMillis() - ms
+ }
+
+ .tail // again drop the first batch
+
+ val avgTime2 = times2.sum.toDouble / times2.size
+
+ printf("Average assignment seqSparse2seqSparse via Random Access Sparse time: %.3f ms\n", avgTime2)
}
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mahout/blob/63cebf76/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
index 996eb1b..dbdc934 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -252,6 +252,7 @@ object SparkEngine extends DistributedEngine {
case op@OpAx(a, x) => Ax.ax_with_broadcast(op, tr2phys(a)(op.classTagA))
case op@OpAtx(a, x) => Ax.atx_with_broadcast(op, tr2phys(a)(op.classTagA))
case op@OpAewB(a, b, opId) => AewB.a_ew_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+ case op@OpCbind(a, b) => CbindAB.cbindAB_nograph(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
case op@OpAewScalar(a, s, _) => AewB.a_ew_scalar(op, tr2phys(a)(op.classTagA), s)
case op@OpRowRange(a, _) => Slicing.rowRange(op, tr2phys(a)(op.classTagA))
case op@OpTimesRightMatrix(a, _) => AinCoreB.rightMultiply(op, tr2phys(a)(op.classTagA))
http://git-wip-us.apache.org/repos/asf/mahout/blob/63cebf76/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
new file mode 100644
index 0000000..ea10ccb
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.sparkbindings.blas
+
+import org.apache.log4j.Logger
+import scala.reflect.ClassTag
+import org.apache.mahout.sparkbindings.drm.DrmRddInput
+import org.apache.mahout.math._
+import scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm.logical.OpCbind
+import org.apache.spark.SparkContext._
+
+/** Physical cbind: builds the Spark RDD that realizes the logical OpCbind operator. */
+object CbindAB {
+
+ private val log = Logger.getLogger(CbindAB.getClass)
+ /** Produces the row-wise input for cbind(A, B): zips the two RDDs when the partitioning check below passes, otherwise cogroups them by key. */
+ def cbindAB_nograph[K: ClassTag](op: OpCbind[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] = {
+
+ val a = srcA.toDrmRdd()
+ val b = srcB.toDrmRdd()
+ val n = op.ncol // total number of columns in the result
+ val n1 = op.A.ncol // columns taken from A; they fill result columns [0, n1)
+ val n2 = n - n1 // columns taken from B; they fill result columns [n1, n)
+ // NOTE(review): the test below compares the op against A only; presumably sufficient because OpCbind takes A's partitioning tag only when A's and B's tags are equal — confirm.
+ // Check if A and B are identically partitioned AND keyed. if they are, then just perform zip
+ // instead of join, and apply the op map-side. Otherwise, perform join and apply the op
+ // reduce-side.
+ val rdd = if (op.isIdenticallyPartitioned(op.A)) {
+
+ log.debug("applying zipped cbind()")
+
+ a
+ .zip(b)
+ .map {
+ case ((keyA, vectorA), (keyB, vectorB)) =>
+ assert(keyA == keyB, "inputs are claimed identically partitioned, but they are not identically keyed")
+ // Dense output only when both halves are dense; otherwise a sequential-access sparse vector of width n.
+ val dense = vectorA.isDense && vectorB.isDense
+ val vec: Vector = if (dense) new DenseVector(n) else new SequentialAccessSparseVector(n)
+ vec(0 until n1) := vectorA
+ vec(n1 until n) := vectorB
+ keyA -> vec
+ }
+ } else {
+
+ log.debug("applying cbind as join")
+
+ a
+ .cogroup(b, numPartitions = a.partitions.size max b.partitions.size) // use the larger of the two partition counts
+ .map {
+ case (key, (vectorSeqA, vectorSeqB)) =>
+
+ // Generally, after co-grouping, we should not accept anything but 1 to 1 in the left and
+ // the right groups. However let's be flexible here, if it does happen, recombine them into 1.
+
+ val vectorA = if (vectorSeqA.size <= 1)
+ vectorSeqA.headOption.getOrElse(new RandomAccessSparseVector(n1)) // no A row for this key: use a new sparse vector of size n1
+ else
+ (vectorSeqA.head.like() /: vectorSeqA)(_ += _) // several A rows for this key: fold them together with +=
+ // Same treatment for the B side, with size n2 for the missing-row case.
+ val vectorB = if ( vectorSeqB.size <= 1)
+ vectorSeqB.headOption.getOrElse(new RandomAccessSparseVector(n2))
+ else
+ (vectorSeqB.head.like() /: vectorSeqB)(_ += _)
+ // Assemble the combined row exactly as in the zip branch above.
+ val dense = vectorA.isDense && vectorB.isDense
+ val vec:Vector = if (dense) new DenseVector(n) else new SequentialAccessSparseVector(n)
+ vec(0 until n1) := vectorA
+ vec(n1 until n) := vectorB
+ key -> vec
+ }
+ }
+
+ new DrmRddInput(rowWiseSrc = Some(op.ncol -> rdd)) // row-wise source: result column count paired with the RDD
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/63cebf76/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
index 3cd49cd..50f8978 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
@@ -94,12 +94,12 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
B.colSums()
- val x = drmBroadcast(dvec(0,0))
- val x2 = drmBroadcast(dvec(0,0))
+ val x = drmBroadcast(dvec(0, 0))
+ val x2 = drmBroadcast(dvec(0, 0))
// Distributed operation
val C = (B.t %*% A.t).t.mapBlock() {
case (keys, block) =>
- for (row <- 0 until block.nrow) block(row,::) += x.value + x2
+ for (row <- 0 until block.nrow) block(row, ::) += x.value + x2
keys -> block
}
@@ -133,7 +133,7 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
val B = drmParallelize(inCoreB, numPartitions = 2)
// Re-key B into DrmLike[String] instead of [Int]
.mapBlock()({
- case (keys,block) => keys.map(_.toString) -> block
+ case (keys, block) => keys.map(_.toString) -> block
})
val C = A %*% B
@@ -146,7 +146,7 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
test("C = At %*% B , join") {
- val inCoreA = dense((1, 2), (3, 4),(-3, -5))
+ val inCoreA = dense((1, 2), (3, 4), (-3, -5))
val inCoreB = dense((3, 5), (4, 6), (0, 1))
val A = drmParallelize(inCoreA, numPartitions = 2)
@@ -154,7 +154,7 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
val C = A.t %*% B
- SparkEngine.optimizerRewrite(C) should equal (OpAtB[Int](A,B))
+ SparkEngine.optimizerRewrite(C) should equal(OpAtB[Int](A, B))
val inCoreC = C.collect
val inCoreControlC = inCoreA.t %*% inCoreB
@@ -165,7 +165,7 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
test("C = At %*% B , join, String-keyed") {
- val inCoreA = dense((1, 2), (3, 4),(-3, -5))
+ val inCoreA = dense((1, 2), (3, 4), (-3, -5))
val inCoreB = dense((3, 5), (4, 6), (0, 1))
val A = drmParallelize(inCoreA, numPartitions = 2)
@@ -180,7 +180,7 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
val C = A.t %*% B
- SparkEngine.optimizerRewrite(C) should equal (OpAtB[String](A,B))
+ SparkEngine.optimizerRewrite(C) should equal(OpAtB[String](A, B))
val inCoreC = C.collect
val inCoreControlC = inCoreA.t %*% inCoreB
@@ -191,18 +191,18 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
test("C = At %*% B , zippable, String-keyed") {
- val inCoreA = dense((1, 2), (3, 4),(-3, -5))
+ val inCoreA = dense((1, 2), (3, 4), (-3, -5))
val A = drmParallelize(inCoreA, numPartitions = 2)
.mapBlock()({
case (keys, block) => keys.map(_.toString) -> block
})
- val B = A + 1.0
+ val B = A + 1.0
val C = A.t %*% B
- SparkEngine.optimizerRewrite(C) should equal (OpAtB[String](A,B))
+ SparkEngine.optimizerRewrite(C) should equal(OpAtB[String](A, B))
val inCoreC = C.collect
val inCoreControlC = inCoreA.t %*% (inCoreA + 1.0)
@@ -319,12 +319,12 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
val A = drmParallelize(inCoreA, numPartitions = 2)
- printf("A.nrow=%d.\n",A.rdd.count())
+ printf("A.nrow=%d.\n", A.rdd.count())
// Create B which would be identically partitioned to A. mapBlock() by default will do the trick.
val B = A.mapBlock() {
case (keys, block) =>
- val bBlock = block.like() := ((r,c,v) => util.Random.nextDouble())
+ val bBlock = block.like() := ((r, c, v) => util.Random.nextDouble())
keys -> bBlock
}
// Prevent repeated computation non-determinism
@@ -394,7 +394,7 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
val inCoreB = dense((3, 5), (4, 6))
val B = drmParallelize(inCoreB, numPartitions = 2)
-// val A = (drmParallelize(inCoreA, numPartitions = 2) + B).checkpoint(CacheHint.MEMORY_ONLY_SER)
+ // val A = (drmParallelize(inCoreA, numPartitions = 2) + B).checkpoint(CacheHint.MEMORY_ONLY_SER)
val A = (drmParallelize(inCoreA, numPartitions = 2) + B).checkpoint(CacheHint.MEMORY_ONLY)
val C = A + B
@@ -403,17 +403,17 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
val inCoreD = (A + B).collect
// Actual
- val inCoreCControl = inCoreA + inCoreB * 2.0
+ val inCoreCControl = inCoreA + inCoreB * 2.0
(inCoreC - inCoreCControl).norm should be < 1E-10
(inCoreD - inCoreCControl).norm should be < 1E-10
}
- test ("general side") {
+ test("general side") {
val sc = implicitly[DistributedContext]
- val k1 = sc.parallelize(Seq(ArrayBuffer(0,1,2,3)))
-// .persist(StorageLevel.MEMORY_ONLY) // -- this will demonstrate immutability side effect!
- .persist(StorageLevel.MEMORY_ONLY_SER)
+ val k1 = sc.parallelize(Seq(ArrayBuffer(0, 1, 2, 3)))
+ // .persist(StorageLevel.MEMORY_ONLY) // -- this will demonstrate immutability side effect!
+ .persist(StorageLevel.MEMORY_ONLY_SER)
println(k1.map(_ += 4).collect.head)
println(k1.map(_ += 4).collect.head)
@@ -444,7 +444,7 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
val drmA = drmParallelize(inCoreA, numPartitions = 2)
- SparkEngine.optimizerRewrite(drmA.t %*% x) should equal (OpAtx(drmA, x))
+ SparkEngine.optimizerRewrite(drmA.t %*% x) should equal(OpAtx(drmA, x))
val atx = (drmA.t %*% x).collect(::, 0)
@@ -459,8 +459,8 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
)
val drmA = drmParallelize(inCoreA, numPartitions = 2)
- drmA.colSums() should equal (inCoreA.colSums())
- drmA.colMeans() should equal (inCoreA.colMeans())
+ drmA.colSums() should equal(inCoreA.colSums())
+ drmA.colMeans() should equal(inCoreA.colMeans())
}
test("numNonZeroElementsPerColumn") {
@@ -472,7 +472,31 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
)
val drmA = drmParallelize(inCoreA, numPartitions = 2)
- drmA.numNonZeroElementsPerColumn() should equal (inCoreA.numNonZeroElementsPerColumn())
+ drmA.numNonZeroElementsPerColumn() should equal(inCoreA.numNonZeroElementsPerColumn())
+ }
+
+ test("C = A cbind B, cogroup") {
+ // A and B are parallelized independently; per the test name this is meant to exercise the cogroup branch of cbind (presumably because their partitioning tags differ).
+ val inCoreA = dense((1, 2), (3, 4))
+ val inCoreB = dense((3, 5), (4, 6))
+ val controlC = dense((1, 2, 3, 5), (3, 4, 4, 6)) // each row of A followed by the matching row of B
+
+ val A = drmParallelize(inCoreA, numPartitions = 2).checkpoint()
+ val B = drmParallelize(inCoreB, numPartitions = 2).checkpoint()
+ // The cbind result must match controlC: norm of the difference below 1e-10.
+ (A.cbind(B) -: controlC).norm should be < 1e-10
+
+ }
+
+ test("C = A cbind B, zip") {
+ // The right operand is derived from A (A + 1.0); per the test name this is meant to exercise the zip branch of cbind (presumably because it keeps A's partitioning).
+ val inCoreA = dense((1, 2), (3, 4))
+ val controlC = dense((1, 2, 2, 3), (3, 4, 4, 5)) // each row of A followed by the same row plus 1.0
+
+ val A = drmParallelize(inCoreA, numPartitions = 2).checkpoint()
+ // The cbind result must match controlC: norm of the difference below 1e-10.
+ (A.cbind(A + 1.0) -: controlC).norm should be < 1e-10
+
}
}