Posted to commits@mahout.apache.org by dl...@apache.org on 2014/07/07 21:41:58 UTC

git commit: MAHOUT-1583: cbind() operator for Scala DRMs This closes apache/mahout#20

Repository: mahout
Updated Branches:
  refs/heads/master 9bfb76732 -> 63cebf76e


MAHOUT-1583: cbind() operator for Scala DRMs
This closes apache/mahout#20
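
For reference, a minimal usage sketch of the new operator, mirroring the new
RLikeDrmOpsSuite tests below (assumes an implicit distributed context and the
usual math-scala / sparkbindings imports):

    val A = drmParallelize(dense((1, 2), (3, 4)), numPartitions = 2)
    val B = drmParallelize(dense((3, 5), (4, 6)), numPartitions = 2)

    // R-like column binding: C is 2 x 4.
    val C = A.cbind(B)

    C.collect  // dense((1, 2, 3, 5), (3, 4, 4, 6))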

Squashed commit of the following:

commit 1baf3fcebf2cd49eb6487afe4eaed689653a3562
Merge: 41f0671 9bfb767
Author: Dmitriy Lyubimov <dl...@apache.org>
Date:   Mon Jul 7 12:32:34 2014 -0700

    Merge branch 'master' into MAHOUT-1583

commit 41f0671c9ccca81883475437fe18056563c1f8ac
Author: Dmitriy Lyubimov <dl...@apache.org>
Date:   Mon Jul 7 12:27:26 2014 -0700

    Adding assignment time comparison test

commit 6e115c87639ffb111b2e089d7ecdf0eaeecbb02b
Author: Dmitriy Lyubimov <dl...@apache.org>
Date:   Mon Jun 16 18:42:42 2014 -0700

    + Licenses.

commit d3e905b5a92735822efe717f8b7a57c80bc56478
Author: Dmitriy Lyubimov <dl...@apache.org>
Date:   Mon Jun 16 18:09:59 2014 -0700

    initial writeup


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/63cebf76
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/63cebf76
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/63cebf76

Branch: refs/heads/master
Commit: 63cebf76e9c7746a1851841e37f5873704e086e8
Parents: 9bfb767
Author: Dmitriy Lyubimov <dl...@apache.org>
Authored: Mon Jul 7 12:34:12 2014 -0700
Committer: Dmitriy Lyubimov <dl...@apache.org>
Committed: Mon Jul 7 12:34:12 2014 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |  2 +
 .../apache/mahout/math/drm/RLikeDrmOps.scala    |  2 +
 .../mahout/math/drm/logical/OpCbind.scala       | 42 +++++++++
 .../math/scalabindings/MatrixOpsSuite.scala     | 51 ++++++++++-
 .../mahout/sparkbindings/SparkEngine.scala      |  1 +
 .../mahout/sparkbindings/blas/CbindAB.scala     | 95 ++++++++++++++++++++
 .../sparkbindings/drm/RLikeDrmOpsSuite.scala    | 70 ++++++++++-----
 7 files changed, 237 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/63cebf76/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 23ad49e..7ea84c5 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -2,6 +2,8 @@ Mahout Change Log
 
 Release 1.0 - unreleased
 
+  MAHOUT-1583: cbind() operator for Scala DRMs
+
   MAHOUT-1541, MAHOUT-1568, MAHOUT-1569: Created text-delimited file I/O traits and classes on Spark, a MahoutDriver for a CLI and an ItemSimilarityDriver using the CLI
 
   MAHOUT-1573: More explicit parallelism adjustments in math-scala DRM apis; elements of automatic parallelism management (dlyubimov)

http://git-wip-us.apache.org/repos/asf/mahout/blob/63cebf76/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
index 7ac5577..d7027f2 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
@@ -60,6 +60,8 @@ class RLikeDrmOps[K: ClassTag](drm: DrmLike[K]) extends DrmLikeOps[K](drm) {
   def %*%(that: Vector): DrmLike[K] = :%*%(that)
 
   def t: DrmLike[Int] = OpAtAnyKey(A = drm)
+
+  def cbind(that: DrmLike[K]) = OpCbind(A = this.drm, B = that)
 }
 
 class RLikeDrmIntOps(drm: DrmLike[Int]) extends RLikeDrmOps[Int](drm) {

http://git-wip-us.apache.org/repos/asf/mahout/blob/63cebf76/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbind.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbind.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbind.scala
new file mode 100644
index 0000000..1425264
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbind.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.DrmLike
+import scala.util.Random
+
+/** cbind() logical operator */
+case class OpCbind[K: ClassTag](
+    override var A: DrmLike[K],
+    override var B: DrmLike[K]
+    ) extends AbstractBinaryOp[K, K, K] {
+
+  assert(A.nrow == B.nrow, "arguments must have same number of rows")
+
+  override protected[mahout] lazy val partitioningTag: Long =
+    if (A.partitioningTag == B.partitioningTag) A.partitioningTag
+    else Random.nextLong()
+
+  /** R-like syntax for number of rows. */
+  def nrow: Long = A.nrow
+
+  /** R-like syntax for number of columns. */
+  def ncol: Int = A.ncol + B.ncol
+
+}
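
A note on partitioningTag above: if both operands carry the same tag, the
result inherits it, and the physical translation (CbindAB, below) can take the
map-side zip path; a mismatch yields a fresh random tag, which forces the join
path. A short sketch of what the operator records (drmA and drmB are
hypothetical DRM handles with equal row counts):

    val op = OpCbind(A = drmA, B = drmB)

    op.nrow  // == drmA.nrow (asserted equal to drmB.nrow)
    op.ncol  // == drmA.ncol + drmB.ncol
    // op.partitioningTag == drmA.partitioningTag only when the operands' tags
    // already match; otherwise it is a fresh Random.nextLong().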

http://git-wip-us.apache.org/repos/asf/mahout/blob/63cebf76/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala b/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala
index 8374a9b..c6fccbc 100644
--- a/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala
+++ b/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala
@@ -21,19 +21,20 @@ import org.scalatest.{Matchers, FunSuite}
 import MatrixOps._
 import scala._
 import org.apache.mahout.test.MahoutSuite
+import org.apache.mahout.math.{RandomAccessSparseVector, SequentialAccessSparseVector, Matrices}
+import org.apache.mahout.common.RandomUtils
 
 
 class MatrixOpsSuite extends FunSuite with MahoutSuite {
 
-
   test("equivalence") {
     val a = dense((1, 2, 3), (3, 4, 5))
     val b = dense((1, 2, 3), (3, 4, 5))
     val c = dense((1, 4, 3), (3, 4, 5))
     assert(a === b)
     assert(a !== c)
-
   }
+
   test("elementwise plus, minus") {
     val a = dense((1, 2, 3), (3, 4, 5))
     val b = dense((1, 1, 2), (2, 1, 1))
@@ -42,7 +43,6 @@ class MatrixOpsSuite extends FunSuite with MahoutSuite {
     assert(c(0, 0) == 2)
     assert(c(1, 2) == 6)
     println(c.toString)
-
   }
 
   test("matrix, vector slicing") {
@@ -132,7 +132,52 @@ class MatrixOpsSuite extends FunSuite with MahoutSuite {
     )
 
     a.numNonZeroElementsPerColumn() should equal(dvec(3,2,4))
+  }
+
+  test("Vector Assignment performance") {
+
+    val n = 1000
+    val k = (n * 0.1).toInt
+    val nIters = 10000
+
+    val rnd = RandomUtils.getRandom
+
+    val src = new SequentialAccessSparseVector(n)
+    for (i <- 0 until k) src(rnd.nextInt(n)) = rnd.nextDouble()
+
+    val times = (0 until 50).map { i =>
+      val ms = System.currentTimeMillis()
+      var j = 0
+      while (j < nIters) {
+        new SequentialAccessSparseVector(n) := src
+        j += 1
+      }
+      System.currentTimeMillis() - ms
+    }.tail // drop the first (warm-up) measurement
+
+    val avgTime = times.sum.toDouble / times.size
+
+    printf("Average assignment seqSparse2seqSparse time: %.3f ms\n", avgTime)
+
+    val times2 = (0 until 50).map { i =>
+      val ms = System.currentTimeMillis()
+      var j = 0
+      while (j < nIters) {
+        new SequentialAccessSparseVector(n) := (new RandomAccessSparseVector(n) := src)
+        j += 1
+      }
+      System.currentTimeMillis() - ms
+    }.tail // drop the first (warm-up) measurement
+
+    val avgTime2 = times2.sum.toDouble / times2.size
+
+    printf("Average assignment seqSparse2seqSparse via Random Access Sparse time: %.3f ms\n", avgTime2)
 
   }
 
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mahout/blob/63cebf76/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
index 996eb1b..dbdc934 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -252,6 +252,7 @@ object SparkEngine extends DistributedEngine {
       case op@OpAx(a, x) => Ax.ax_with_broadcast(op, tr2phys(a)(op.classTagA))
       case op@OpAtx(a, x) => Ax.atx_with_broadcast(op, tr2phys(a)(op.classTagA))
       case op@OpAewB(a, b, opId) => AewB.a_ew_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+      case op@OpCbind(a, b) => CbindAB.cbindAB_nograph(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
       case op@OpAewScalar(a, s, _) => AewB.a_ew_scalar(op, tr2phys(a)(op.classTagA), s)
       case op@OpRowRange(a, _) => Slicing.rowRange(op, tr2phys(a)(op.classTagA))
       case op@OpTimesRightMatrix(a, _) => AinCoreB.rightMultiply(op, tr2phys(a)(op.classTagA))

http://git-wip-us.apache.org/repos/asf/mahout/blob/63cebf76/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
new file mode 100644
index 0000000..ea10ccb
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.sparkbindings.blas
+
+import org.apache.log4j.Logger
+import scala.reflect.ClassTag
+import org.apache.mahout.sparkbindings.drm.DrmRddInput
+import org.apache.mahout.math._
+import scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm.logical.OpCbind
+import org.apache.spark.SparkContext._
+
+/** Physical cbind */
+object CbindAB {
+
+  private val log = Logger.getLogger(CbindAB.getClass)
+
+  def cbindAB_nograph[K: ClassTag](op: OpCbind[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] = {
+
+    val a = srcA.toDrmRdd()
+    val b = srcB.toDrmRdd()
+    val n = op.ncol
+    val n1 = op.A.ncol
+    val n2 = n - n1
+
+    // Check if A and B are identically partitioned AND keyed. If they are, just perform a zip
+    // instead of a join and apply the op map-side. Otherwise, perform a join and apply the op
+    // reduce-side.
+    val rdd = if (op.isIdenticallyPartitioned(op.A)) {
+
+      log.debug("applying zipped cbind()")
+
+      a
+          .zip(b)
+          .map {
+        case ((keyA, vectorA), (keyB, vectorB)) =>
+          assert(keyA == keyB, "inputs are claimed identically partitioned, but they are not identically keyed")
+
+          val dense = vectorA.isDense && vectorB.isDense
+          val vec: Vector = if (dense) new DenseVector(n) else new SequentialAccessSparseVector(n)
+          vec(0 until n1) := vectorA
+          vec(n1 until n) := vectorB
+          keyA -> vec
+      }
+    } else {
+
+      log.debug("applying cbind as join")
+
+      a
+          .cogroup(b, numPartitions = a.partitions.size max b.partitions.size)
+          .map {
+        case (key, (vectorSeqA, vectorSeqB)) =>
+
+          // Generally, after co-grouping, we expect exactly one vector in each of the left and
+          // right groups. However, let's be flexible here: if more do show up, recombine them into one.
+
+          val vectorA = if (vectorSeqA.size <= 1)
+            vectorSeqA.headOption.getOrElse(new RandomAccessSparseVector(n1))
+          else
+            (vectorSeqA.head.like() /: vectorSeqA)(_ += _)
+
+          val vectorB = if (vectorSeqB.size <= 1)
+            vectorSeqB.headOption.getOrElse(new RandomAccessSparseVector(n2))
+          else
+            (vectorSeqB.head.like() /: vectorSeqB)(_ += _)
+
+          val dense = vectorA.isDense && vectorB.isDense
+          val vec: Vector = if (dense) new DenseVector(n) else new SequentialAccessSparseVector(n)
+          vec(0 until n1) := vectorA
+          vec(n1 until n) := vectorB
+          key -> vec
+      }
+    }
+
+    new DrmRddInput(rowWiseSrc = Some(op.ncol -> rdd))
+
+  }
+
+}
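
Both branches above finish with the same ranged-view splice. For reference, a
minimal in-core sketch of that idiom in the scalabindings DSL (the dvec values
are illustrative; n1 stands in for A.ncol):

    val vectorA = dvec(1, 2)  // a row fragment of A
    val vectorB = dvec(3, 5)  // the matching row fragment of B
    val n1 = 2                // A.ncol
    val n = 4                 // A.ncol + B.ncol

    // Pick the backing structure, then assign into ranged views.
    val dense = vectorA.isDense && vectorB.isDense
    val vec: Vector = if (dense) new DenseVector(n) else new SequentialAccessSparseVector(n)
    vec(0 until n1) := vectorA
    vec(n1 until n) := vectorB  // vec is now (1, 2, 3, 5)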

http://git-wip-us.apache.org/repos/asf/mahout/blob/63cebf76/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
index 3cd49cd..50f8978 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
@@ -94,12 +94,12 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
     B.colSums()
 
 
-    val x = drmBroadcast(dvec(0,0))
-    val x2 = drmBroadcast(dvec(0,0))
+    val x = drmBroadcast(dvec(0, 0))
+    val x2 = drmBroadcast(dvec(0, 0))
     // Distributed operation
     val C = (B.t %*% A.t).t.mapBlock() {
       case (keys, block) =>
-        for (row <- 0 until block.nrow) block(row,::) += x.value + x2
+        for (row <- 0 until block.nrow) block(row, ::) += x.value + x2
         keys -> block
     }
 
@@ -133,7 +133,7 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
     val B = drmParallelize(inCoreB, numPartitions = 2)
         // Re-key B into DrmLike[String] instead of [Int]
         .mapBlock()({
-      case (keys,block) => keys.map(_.toString) -> block
+      case (keys, block) => keys.map(_.toString) -> block
     })
 
     val C = A %*% B
@@ -146,7 +146,7 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
 
   test("C = At %*% B , join") {
 
-    val inCoreA = dense((1, 2), (3, 4),(-3, -5))
+    val inCoreA = dense((1, 2), (3, 4), (-3, -5))
     val inCoreB = dense((3, 5), (4, 6), (0, 1))
 
     val A = drmParallelize(inCoreA, numPartitions = 2)
@@ -154,7 +154,7 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
 
     val C = A.t %*% B
 
-    SparkEngine.optimizerRewrite(C) should equal (OpAtB[Int](A,B))
+    SparkEngine.optimizerRewrite(C) should equal(OpAtB[Int](A, B))
 
     val inCoreC = C.collect
     val inCoreControlC = inCoreA.t %*% inCoreB
@@ -165,7 +165,7 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
 
   test("C = At %*% B , join, String-keyed") {
 
-    val inCoreA = dense((1, 2), (3, 4),(-3, -5))
+    val inCoreA = dense((1, 2), (3, 4), (-3, -5))
     val inCoreB = dense((3, 5), (4, 6), (0, 1))
 
     val A = drmParallelize(inCoreA, numPartitions = 2)
@@ -180,7 +180,7 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
 
     val C = A.t %*% B
 
-    SparkEngine.optimizerRewrite(C) should equal (OpAtB[String](A,B))
+    SparkEngine.optimizerRewrite(C) should equal(OpAtB[String](A, B))
 
     val inCoreC = C.collect
     val inCoreControlC = inCoreA.t %*% inCoreB
@@ -191,18 +191,18 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
 
   test("C = At %*% B , zippable, String-keyed") {
 
-    val inCoreA = dense((1, 2), (3, 4),(-3, -5))
+    val inCoreA = dense((1, 2), (3, 4), (-3, -5))
 
     val A = drmParallelize(inCoreA, numPartitions = 2)
         .mapBlock()({
       case (keys, block) => keys.map(_.toString) -> block
     })
 
-    val B =  A + 1.0
+    val B = A + 1.0
 
     val C = A.t %*% B
 
-    SparkEngine.optimizerRewrite(C) should equal (OpAtB[String](A,B))
+    SparkEngine.optimizerRewrite(C) should equal(OpAtB[String](A, B))
 
     val inCoreC = C.collect
     val inCoreControlC = inCoreA.t %*% (inCoreA + 1.0)
@@ -319,12 +319,12 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
 
     val A = drmParallelize(inCoreA, numPartitions = 2)
 
-    printf("A.nrow=%d.\n",A.rdd.count())
+    printf("A.nrow=%d.\n", A.rdd.count())
 
     // Create B which would be identically partitioned to A. mapBlock() by default will do the trick.
     val B = A.mapBlock() {
       case (keys, block) =>
-        val bBlock = block.like() := ((r,c,v) => util.Random.nextDouble())
+        val bBlock = block.like() := ((r, c, v) => util.Random.nextDouble())
         keys -> bBlock
     }
         // Prevent repeated computation non-determinism
@@ -394,7 +394,7 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
     val inCoreB = dense((3, 5), (4, 6))
 
     val B = drmParallelize(inCoreB, numPartitions = 2)
-//    val A = (drmParallelize(inCoreA, numPartitions = 2) + B).checkpoint(CacheHint.MEMORY_ONLY_SER)
+    //    val A = (drmParallelize(inCoreA, numPartitions = 2) + B).checkpoint(CacheHint.MEMORY_ONLY_SER)
     val A = (drmParallelize(inCoreA, numPartitions = 2) + B).checkpoint(CacheHint.MEMORY_ONLY)
 
     val C = A + B
@@ -403,17 +403,17 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
     val inCoreD = (A + B).collect
 
     // Actual
-    val inCoreCControl = inCoreA +  inCoreB * 2.0
+    val inCoreCControl = inCoreA + inCoreB * 2.0
 
     (inCoreC - inCoreCControl).norm should be < 1E-10
     (inCoreD - inCoreCControl).norm should be < 1E-10
   }
 
-  test ("general side")  {
+  test("general side") {
     val sc = implicitly[DistributedContext]
-    val k1 = sc.parallelize(Seq(ArrayBuffer(0,1,2,3)))
-//      .persist(StorageLevel.MEMORY_ONLY)   // -- this will demonstrate immutability side effect!
-      .persist(StorageLevel.MEMORY_ONLY_SER)
+    val k1 = sc.parallelize(Seq(ArrayBuffer(0, 1, 2, 3)))
+        //      .persist(StorageLevel.MEMORY_ONLY)   // -- this will demonstrate immutability side effect!
+        .persist(StorageLevel.MEMORY_ONLY_SER)
 
     println(k1.map(_ += 4).collect.head)
     println(k1.map(_ += 4).collect.head)
@@ -444,7 +444,7 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
 
     val drmA = drmParallelize(inCoreA, numPartitions = 2)
 
-    SparkEngine.optimizerRewrite(drmA.t %*% x) should equal (OpAtx(drmA, x))
+    SparkEngine.optimizerRewrite(drmA.t %*% x) should equal(OpAtx(drmA, x))
 
     val atx = (drmA.t %*% x).collect(::, 0)
 
@@ -459,8 +459,8 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
     )
     val drmA = drmParallelize(inCoreA, numPartitions = 2)
 
-    drmA.colSums() should equal (inCoreA.colSums())
-    drmA.colMeans() should equal (inCoreA.colMeans())
+    drmA.colSums() should equal(inCoreA.colSums())
+    drmA.colMeans() should equal(inCoreA.colMeans())
   }
 
   test("numNonZeroElementsPerColumn") {
@@ -472,7 +472,31 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext {
     )
     val drmA = drmParallelize(inCoreA, numPartitions = 2)
 
-    drmA.numNonZeroElementsPerColumn() should equal (inCoreA.numNonZeroElementsPerColumn())
+    drmA.numNonZeroElementsPerColumn() should equal(inCoreA.numNonZeroElementsPerColumn())
+  }
+
+  test("C = A cbind B, cogroup") {
+
+    val inCoreA = dense((1, 2), (3, 4))
+    val inCoreB = dense((3, 5), (4, 6))
+    val controlC = dense((1, 2, 3, 5), (3, 4, 4, 6))
+
+    val A = drmParallelize(inCoreA, numPartitions = 2).checkpoint()
+    val B = drmParallelize(inCoreB, numPartitions = 2).checkpoint()
+
+    (A.cbind(B) -: controlC).norm should be < 1e-10
+
+  }
+
+  test("C = A cbind B, zip") {
+
+    val inCoreA = dense((1, 2), (3, 4))
+    val controlC = dense((1, 2, 2, 3), (3, 4, 4, 5))
+
+    val A = drmParallelize(inCoreA, numPartitions = 2).checkpoint()
+
+    (A.cbind(A + 1.0) -: controlC).norm should be < 1e-10
+
   }
 
 }