Posted to commits@mahout.apache.org by dl...@apache.org on 2015/10/20 07:37:09 UTC

[26/32] mahout git commit: MAHOUT-1570: Flink: drmParallelizeEmpty and extra tests

MAHOUT-1570: Flink: drmParallelizeEmpty and extra tests


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/d13f4884
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/d13f4884
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/d13f4884

Branch: refs/heads/flink-binding
Commit: d13f48849ad5658cc03acea57f7c5d9d14bad137
Parents: fcc6cf1
Author: Alexey Grigorev <al...@gmail.com>
Authored: Tue Aug 25 15:38:50 2015 +0200
Committer: Alexey Grigorev <al...@gmail.com>
Committed: Fri Sep 25 17:45:53 2015 +0200

----------------------------------------------------------------------
 .../mahout/flinkbindings/FlinkEngine.scala      | 68 +++++++-----------
 .../mahout/flinkbindings/DrmLikeOpsSuite.scala  | 72 ++++++++++++++++++++
 2 files changed, 95 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/d13f4884/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index a21591d..35c6b76 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -19,8 +19,10 @@
 package org.apache.mahout.flinkbindings
 
 import java.util.Collection
+
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
+
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.functions.ReduceFunction
 import org.apache.flink.api.java.tuple.Tuple2
@@ -29,57 +31,20 @@ import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.FileInputFormat
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapred.SequenceFileInputFormat
-import org.apache.mahout.flinkbindings.blas.FlinkOpAewB
-import org.apache.mahout.flinkbindings.blas.FlinkOpAewScalar
-import org.apache.mahout.flinkbindings.blas.FlinkOpAt
-import org.apache.mahout.flinkbindings.blas.FlinkOpAtB
-import org.apache.mahout.flinkbindings.blas.FlinkOpAx
-import org.apache.mahout.flinkbindings.blas.FlinkOpCBind
-import org.apache.mahout.flinkbindings.blas.FlinkOpMapBlock
-import org.apache.mahout.flinkbindings.blas.FlinkOpRBind
-import org.apache.mahout.flinkbindings.blas.FlinkOpRowRange
-import org.apache.mahout.flinkbindings.blas.FlinkOpTimesRightMatrix
 import org.apache.mahout.flinkbindings._
-import org.apache.mahout.flinkbindings.drm.CheckpointedFlinkDrm
-import org.apache.mahout.flinkbindings.drm.FlinkDrm
-import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm
+import org.apache.mahout.flinkbindings.blas._
+import org.apache.mahout.flinkbindings.drm._
 import org.apache.mahout.flinkbindings.io.HDFSUtil
 import org.apache.mahout.flinkbindings.io.Hadoop1HDFSUtil
-import org.apache.mahout.math.Matrix
-import org.apache.mahout.math.Vector
-import org.apache.mahout.math.VectorWritable
-import org.apache.mahout.math.drm.BCast
-import org.apache.mahout.math.drm.BlockMapFunc2
-import org.apache.mahout.math.drm.BlockReduceFunc
-import org.apache.mahout.math.drm.CacheHint
-import org.apache.mahout.math.drm.CheckpointedDrm
-import org.apache.mahout.math.drm.DistributedContext
-import org.apache.mahout.math.drm.DistributedEngine
-import org.apache.mahout.math.drm.DrmLike
-import org.apache.mahout.math.drm.DrmTuple
-import org.apache.mahout.math.drm.drm2drmCpOps
-import org.apache.mahout.math.drm.logical.OpABt
-import org.apache.mahout.math.drm.logical.OpAewB
-import org.apache.mahout.math.drm.logical.OpAewScalar
-import org.apache.mahout.math.drm.logical.OpAewUnaryFunc
-import org.apache.mahout.math.drm.logical.OpAt
-import org.apache.mahout.math.drm.logical.OpAtA
-import org.apache.mahout.math.drm.logical.OpAtB
-import org.apache.mahout.math.drm.logical.OpAtx
-import org.apache.mahout.math.drm.logical.OpAx
-import org.apache.mahout.math.drm.logical.OpCbind
-import org.apache.mahout.math.drm.logical.OpMapBlock
-import org.apache.mahout.math.drm.logical.OpRbind
-import org.apache.mahout.math.drm.logical.OpRowRange
-import org.apache.mahout.math.drm.logical.OpTimesRightMatrix
+import org.apache.mahout.math._
+import org.apache.mahout.math.drm._
+import org.apache.mahout.math.drm.logical._
 import org.apache.mahout.math.indexeddataset.BiDictionary
 import org.apache.mahout.math.indexeddataset.IndexedDataset
 import org.apache.mahout.math.indexeddataset.Schema
 import org.apache.mahout.math.scalabindings._
 import org.apache.mahout.math.scalabindings.RLikeOps._
-import org.apache.mahout.flinkbindings.blas.FlinkOpAtA
-import org.apache.mahout.math.drm.logical.OpCbindScalar
-import org.apache.mahout.math.drm.logical.OpAewUnaryFuncFusion
+
 
 object FlinkEngine extends DistributedEngine {
 
@@ -259,11 +224,24 @@ object FlinkEngine extends DistributedEngine {
 
   /** Parallelize in-core matrix as a Flink distributed matrix, using row labels as data set keys. */
   override def drmParallelizeWithRowLabels(m: Matrix, numPartitions: Int = 1)
-                                          (implicit sc: DistributedContext): CheckpointedDrm[String] = ???
+                                          (implicit dc: DistributedContext): CheckpointedDrm[String] = ???
 
   /** This creates an empty DRM with the specified number of partitions and cardinality. */
   override def drmParallelizeEmpty(nrow: Int, ncol: Int, numPartitions: Int = 10)
-                                  (implicit sc: DistributedContext): CheckpointedDrm[Int] = ???
+                                  (implicit dc: DistributedContext): CheckpointedDrm[Int] = {
+    val nonParallelResult = (0 to numPartitions).flatMap { part => 
+      val partNRow = (nrow - 1) / numPartitions + 1
+      val partStart = partNRow * part
+      val partEnd = Math.min(partStart + partNRow, nrow)
+
+      for (i <- partStart until partEnd) yield (i, new RandomAccessSparseVector(ncol): Vector)
+    }
+
+    val dataSetType = TypeExtractor.getForObject(nonParallelResult.head)
+    val result = dc.env.fromCollection(nonParallelResult.asJava, dataSetType)
+
+    new CheckpointedFlinkDrm(ds=result, _nrow=nrow, _ncol=ncol)
+  }
 
   /** Creates empty DRM with non-trivial height */
   override def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10)

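Note on the drmParallelizeEmpty hunk above: it splits the nrow rows across partitions with a ceiling division, partNRow = (nrow - 1) / numPartitions + 1 rows per slice, and clamps partEnd to nrow, so the last slice may be shorter and the extra index produced by looping "0 to numPartitions" (rather than "0 until numPartitions") simply yields an empty range. The sketch below isolates that arithmetic in plain Scala for illustration only; the partitionRanges helper and object name are ours, not part of this commit or of Mahout.

// Illustrative only: the partition-range arithmetic used by drmParallelizeEmpty,
// extracted into a standalone helper. partitionRanges is a hypothetical name.
object PartitionRangeSketch extends App {

  /** Split the row index range [0, nrow) into at most numPartitions contiguous slices. */
  def partitionRanges(nrow: Int, numPartitions: Int): Seq[Range] = {
    // Ceiling division: rows per partition, rounded up.
    val partNRow = (nrow - 1) / numPartitions + 1
    (0 until numPartitions).map { part =>
      val partStart = partNRow * part
      val partEnd = math.min(partStart + partNRow, nrow) // clamp the last slice to nrow
      partStart until partEnd
    }
  }

  // 5 rows over 2 partitions -> rows 0..2 and rows 3..4.
  println(partitionRanges(nrow = 5, numPartitions = 2))
}

In the actual method, each row index is then paired with an empty RandomAccessSparseVector(ncol), the collection is passed to dc.env.fromCollection together with the TypeInformation obtained from TypeExtractor.getForObject, and the resulting DataSet is wrapped in a CheckpointedFlinkDrm.
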
http://git-wip-us.apache.org/repos/asf/mahout/blob/d13f4884/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala
----------------------------------------------------------------------
diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala
new file mode 100644
index 0000000..4c75afa
--- /dev/null
+++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.mahout.flinkbindings
+
+import org.apache.mahout.flinkbindings._
+import org.apache.mahout.math._
+import org.apache.mahout.math.drm._
+import org.apache.mahout.math.drm.RLikeDrmOps._
+import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math.scalabindings.RLikeOps._
+import org.junit.runner.RunWith
+import org.scalatest.FunSuite
+import org.scalatest.junit.JUnitRunner
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+
+
+@RunWith(classOf[JUnitRunner])
+class DrmLikeOpsSuite extends FunSuite with DistributedFlinkSuite {
+
+  test("norm") {
+    val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5))
+    val A = drmParallelize(m = inCoreA, numPartitions = 2)
+
+    (inCoreA.norm - A.norm) should be < 1e-6
+  }
+
+  test("colSums") {
+    val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5))
+    val A = drmParallelize(m = inCoreA, numPartitions = 2)
+    
+    (inCoreA.colSums - A.colSums).norm(2) should be < 1e-6
+  }
+
+  test("rowSums") {
+    val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5))
+    val A = drmParallelize(m = inCoreA, numPartitions = 2)
+    
+    (inCoreA.rowSums - A.rowSums).norm(2) should be < 1e-6
+  }
+
+  test("rowMeans") {
+    val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5))
+    val A = drmParallelize(m = inCoreA, numPartitions = 2)
+    
+    (inCoreA.rowMeans - A.rowMeans).norm(2) should be < 1e-6
+  }
+
+  test("drmParallelizeEmpty") {
+    val emptyDrm = drmParallelizeEmpty(nrow = 2, ncol = 2, numPartitions = 2)
+    val expected = dense((0, 0), (0, 0))
+
+    (emptyDrm.collect - expected).norm should be < 1e-6
+  }
+
+}
\ No newline at end of file