You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by ap...@apache.org on 2016/03/17 23:33:58 UTC

mahout git commit: MAHOUT-1815: dsqDist(X, Y) and dsqDist(X) failing in flink tests. closes apache/mahout#197

Repository: mahout
Updated Branches:
  refs/heads/flink-binding 7b862781c -> 2e8790d5c


MAHOUT-1815: dsqDist(X,Y) and dsqDist(X) failing in flink tests. closes apache/mahout#197


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/2e8790d5
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/2e8790d5
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/2e8790d5

Branch: refs/heads/flink-binding
Commit: 2e8790d5c6e0f337abe55906e052d7236f046207
Parents: 7b86278
Author: Andrew Palumbo <ap...@apache.org>
Authored: Thu Mar 17 18:33:10 2016 -0400
Committer: Andrew Palumbo <ap...@apache.org>
Committed: Thu Mar 17 18:33:10 2016 -0400

----------------------------------------------------------------------
 .../scala/org/apache/mahout/flinkbindings/FlinkEngine.scala     | 4 ++--
 .../scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala | 5 ++++-
 2 files changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/2e8790d5/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index 958b6cf..f1e06d0 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -152,10 +152,10 @@ object FlinkEngine extends DistributedEngine {
         // express ABt via AtB: let C=At and D=Bt, and calculate CtD
         // TODO: create specific implementation of ABt, see MAHOUT-1750
         val opAt = OpAt(a.asInstanceOf[DrmLike[Int]]) // TODO: casts!
-      val at = FlinkOpAt.sparseTrick(opAt, flinkTranslate(a.asInstanceOf[DrmLike[Int]]))
+        val at = FlinkOpAt.sparseTrick(opAt, flinkTranslate(a.asInstanceOf[DrmLike[Int]]))
         val c = new CheckpointedFlinkDrm(at.asRowWise.ds, _nrow = opAt.nrow, _ncol = opAt.ncol)
         val opBt = OpAt(b.asInstanceOf[DrmLike[Int]]) // TODO: casts!
-      val bt = FlinkOpAt.sparseTrick(opBt, flinkTranslate(b.asInstanceOf[DrmLike[Int]]))
+        val bt = FlinkOpAt.sparseTrick(opBt, flinkTranslate(b.asInstanceOf[DrmLike[Int]]))
         val d = new CheckpointedFlinkDrm(bt.asRowWise.ds, _nrow = opBt.nrow, _ncol = opBt.ncol)
         FlinkOpAtB.notZippable(OpAtB(c, d), flinkTranslate(c), flinkTranslate(d)).asInstanceOf[FlinkDrm[K]]
       case op@OpAtA(a) if op.keyClassTag == ClassTag.Int ⇒ FlinkOpAtA.at_a(op, flinkTranslate(a)).asInstanceOf[FlinkDrm[K]]

http://git-wip-us.apache.org/repos/asf/mahout/blob/2e8790d5/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala
index 6a081ba..ac1e73a 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala
@@ -81,12 +81,15 @@ object FlinkOpAtB {
         val (idx, _) = it.head
 
         val block = it.map { t => t._2 }.reduce { (m1, m2) => m1 + m2 }
+        
+        val blockStart = idx * blockHeight
+        val keys = Array.tabulate(block.nrow)(blockStart + _)
 
-        val keys = idx.until(block.nrow).toArray[Int]
         out.collect(keys -> block)
       }
     })
 
+
     new BlockifiedFlinkDrm[Int](res, ncol)
   }