You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by dl...@apache.org on 2015/10/20 07:37:09 UTC
[26/32] mahout git commit: MAHOUT-1570: Flink: drmParallelizeEmpty
and extra tests
MAHOUT-1570: Flink: drmParallelizeEmpty and extra tests
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/d13f4884
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/d13f4884
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/d13f4884
Branch: refs/heads/flink-binding
Commit: d13f48849ad5658cc03acea57f7c5d9d14bad137
Parents: fcc6cf1
Author: Alexey Grigorev <al...@gmail.com>
Authored: Tue Aug 25 15:38:50 2015 +0200
Committer: Alexey Grigorev <al...@gmail.com>
Committed: Fri Sep 25 17:45:53 2015 +0200
----------------------------------------------------------------------
.../mahout/flinkbindings/FlinkEngine.scala | 68 +++++++-----------
.../mahout/flinkbindings/DrmLikeOpsSuite.scala | 72 ++++++++++++++++++++
2 files changed, 95 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/d13f4884/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index a21591d..35c6b76 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -19,8 +19,10 @@
package org.apache.mahout.flinkbindings
import java.util.Collection
+
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
+
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.java.tuple.Tuple2
@@ -29,57 +31,20 @@ import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.FileInputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapred.SequenceFileInputFormat
-import org.apache.mahout.flinkbindings.blas.FlinkOpAewB
-import org.apache.mahout.flinkbindings.blas.FlinkOpAewScalar
-import org.apache.mahout.flinkbindings.blas.FlinkOpAt
-import org.apache.mahout.flinkbindings.blas.FlinkOpAtB
-import org.apache.mahout.flinkbindings.blas.FlinkOpAx
-import org.apache.mahout.flinkbindings.blas.FlinkOpCBind
-import org.apache.mahout.flinkbindings.blas.FlinkOpMapBlock
-import org.apache.mahout.flinkbindings.blas.FlinkOpRBind
-import org.apache.mahout.flinkbindings.blas.FlinkOpRowRange
-import org.apache.mahout.flinkbindings.blas.FlinkOpTimesRightMatrix
import org.apache.mahout.flinkbindings._
-import org.apache.mahout.flinkbindings.drm.CheckpointedFlinkDrm
-import org.apache.mahout.flinkbindings.drm.FlinkDrm
-import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm
+import org.apache.mahout.flinkbindings.blas._
+import org.apache.mahout.flinkbindings.drm._
import org.apache.mahout.flinkbindings.io.HDFSUtil
import org.apache.mahout.flinkbindings.io.Hadoop1HDFSUtil
-import org.apache.mahout.math.Matrix
-import org.apache.mahout.math.Vector
-import org.apache.mahout.math.VectorWritable
-import org.apache.mahout.math.drm.BCast
-import org.apache.mahout.math.drm.BlockMapFunc2
-import org.apache.mahout.math.drm.BlockReduceFunc
-import org.apache.mahout.math.drm.CacheHint
-import org.apache.mahout.math.drm.CheckpointedDrm
-import org.apache.mahout.math.drm.DistributedContext
-import org.apache.mahout.math.drm.DistributedEngine
-import org.apache.mahout.math.drm.DrmLike
-import org.apache.mahout.math.drm.DrmTuple
-import org.apache.mahout.math.drm.drm2drmCpOps
-import org.apache.mahout.math.drm.logical.OpABt
-import org.apache.mahout.math.drm.logical.OpAewB
-import org.apache.mahout.math.drm.logical.OpAewScalar
-import org.apache.mahout.math.drm.logical.OpAewUnaryFunc
-import org.apache.mahout.math.drm.logical.OpAt
-import org.apache.mahout.math.drm.logical.OpAtA
-import org.apache.mahout.math.drm.logical.OpAtB
-import org.apache.mahout.math.drm.logical.OpAtx
-import org.apache.mahout.math.drm.logical.OpAx
-import org.apache.mahout.math.drm.logical.OpCbind
-import org.apache.mahout.math.drm.logical.OpMapBlock
-import org.apache.mahout.math.drm.logical.OpRbind
-import org.apache.mahout.math.drm.logical.OpRowRange
-import org.apache.mahout.math.drm.logical.OpTimesRightMatrix
+import org.apache.mahout.math._
+import org.apache.mahout.math.drm._
+import org.apache.mahout.math.drm.logical._
import org.apache.mahout.math.indexeddataset.BiDictionary
import org.apache.mahout.math.indexeddataset.IndexedDataset
import org.apache.mahout.math.indexeddataset.Schema
import org.apache.mahout.math.scalabindings._
import org.apache.mahout.math.scalabindings.RLikeOps._
-import org.apache.mahout.flinkbindings.blas.FlinkOpAtA
-import org.apache.mahout.math.drm.logical.OpCbindScalar
-import org.apache.mahout.math.drm.logical.OpAewUnaryFuncFusion
+
object FlinkEngine extends DistributedEngine {
@@ -259,11 +224,24 @@ object FlinkEngine extends DistributedEngine {
/**
 * Parallelize in-core matrix as a distributed matrix of this (Flink) engine, using row labels
 * as data set keys.
 *
 * Not implemented: the body is `???`, so any call throws `NotImplementedError`.
 */
override def drmParallelizeWithRowLabels(m: Matrix, numPartitions: Int = 1)
- (implicit sc: DistributedContext): CheckpointedDrm[String] = ???
+ (implicit dc: DistributedContext): CheckpointedDrm[String] = ???
/**
 * Creates an empty DRM with `nrow` rows and `ncol` columns: every row key in `0 until nrow`
 * is paired with a fresh, empty `RandomAccessSparseVector` of cardinality `ncol`.
 *
 * @param nrow          number of rows; must be non-negative
 * @param ncol          number of columns (cardinality of every row vector)
 * @param numPartitions number of chunks the row keys are generated in; must be positive
 * @param dc            distributed context whose Flink environment receives the collection
 * @return a checkpointed DRM with Int row keys and the requested geometry
 * @throws IllegalArgumentException if `nrow` is negative or `numPartitions` is not positive
 */
override def drmParallelizeEmpty(nrow: Int, ncol: Int, numPartitions: Int = 10)
- (implicit sc: DistributedContext): CheckpointedDrm[Int] = ???
+ (implicit dc: DistributedContext): CheckpointedDrm[Int] = {
+   require(nrow >= 0, s"nrow must be non-negative, got $nrow")
+   require(numPartitions > 0, s"numPartitions must be positive, got $numPartitions")
+
+   // Rows per chunk, rounded up so that numPartitions chunks cover all nrow rows.
+   // Loop-invariant, hence computed once outside the chunk loop.
+   val partNRow = (nrow - 1) / numPartitions + 1
+
+   // `until` (not `to`): exactly numPartitions chunks; trailing chunks may be empty.
+   val nonParallelResult = (0 until numPartitions).flatMap { part =>
+     val partStart = partNRow * part
+     val partEnd = Math.min(partStart + partNRow, nrow)
+
+     for (i <- partStart until partEnd) yield (i, new RandomAccessSparseVector(ncol): Vector)
+   }
+
+   // Derive the element type from a prototype row instead of `nonParallelResult.head`,
+   // so that nrow = 0 does not fail with NoSuchElementException on an empty collection.
+   val prototypeRow = (0, new RandomAccessSparseVector(ncol): Vector)
+   val dataSetType = TypeExtractor.getForObject(prototypeRow)
+   val result = dc.env.fromCollection(nonParallelResult.asJava, dataSetType)
+
+   new CheckpointedFlinkDrm(ds=result, _nrow=nrow, _ncol=ncol)
+ }
/** Creates empty DRM with non-trivial height */
override def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10)
http://git-wip-us.apache.org/repos/asf/mahout/blob/d13f4884/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala
----------------------------------------------------------------------
diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala
new file mode 100644
index 0000000..4c75afa
--- /dev/null
+++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.mahout.flinkbindings
+
+import org.apache.mahout.flinkbindings._
+import org.apache.mahout.math._
+import org.apache.mahout.math.drm._
+import org.apache.mahout.math.drm.RLikeDrmOps._
+import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math.scalabindings.RLikeOps._
+import org.junit.runner.RunWith
+import org.scalatest.FunSuite
+import org.scalatest.junit.JUnitRunner
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+
+
+/**
+ * Compares DRM-level operations (norm, column/row aggregates, empty-DRM creation) computed
+ * through the Flink bindings with the same operations on the in-core matrix.
+ */
+@RunWith(classOf[JUnitRunner])
+class DrmLikeOpsSuite extends FunSuite with DistributedFlinkSuite {
+
+  test("norm") {
+    val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5))
+    val A = drmParallelize(m = inCoreA, numPartitions = 2)
+
+    // Compare the absolute difference: a signed difference would also pass whenever the
+    // distributed norm is larger than the in-core one, no matter by how much.
+    scala.math.abs(inCoreA.norm - A.norm) should be < 1e-6
+  }
+
+  test("colSums") {
+    val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5))
+    val A = drmParallelize(m = inCoreA, numPartitions = 2)
+
+    // The 2-norm of the difference vector is non-negative, so no abs is needed here.
+    (inCoreA.colSums - A.colSums).norm(2) should be < 1e-6
+  }
+
+  test("rowSums") {
+    val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5))
+    val A = drmParallelize(m = inCoreA, numPartitions = 2)
+
+    (inCoreA.rowSums - A.rowSums).norm(2) should be < 1e-6
+  }
+
+  test("rowMeans") {
+    val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5))
+    val A = drmParallelize(m = inCoreA, numPartitions = 2)
+
+    (inCoreA.rowMeans - A.rowMeans).norm(2) should be < 1e-6
+  }
+
+  test("drmParallelizeEmpty") {
+    val emptyDrm = drmParallelizeEmpty(nrow = 2, ncol = 2, numPartitions = 2)
+    val expected = dense((0, 0), (0, 0))
+
+    // Collecting the empty DRM must yield an all-zero 2 x 2 matrix.
+    (emptyDrm.collect - expected).norm should be < 1e-6
+  }
+
+}
\ No newline at end of file