Posted to commits@mahout.apache.org by sm...@apache.org on 2016/03/15 20:04:52 UTC

mahout git commit: MAHOUT-1812: Implement drmParallelizeEmptyLong(...) in flink Bindings

Repository: mahout
Updated Branches:
  refs/heads/flink-binding e75c5f606 -> 93ebed620


MAHOUT-1812: Implement drmParallelizeEmptyLong(...) in flink Bindings


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/93ebed62
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/93ebed62
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/93ebed62

Branch: refs/heads/flink-binding
Commit: 93ebed620ae7d7da1a6ac2dbe944a8fd517de1c0
Parents: e75c5f6
Author: smarthi <sm...@apache.org>
Authored: Tue Mar 15 15:03:35 2016 -0400
Committer: smarthi <sm...@apache.org>
Committed: Tue Mar 15 15:04:21 2016 -0400

----------------------------------------------------------------------
 .../mahout/flinkbindings/FlinkEngine.scala      | 95 +++++++++++---------
 .../standard/NaiveBayesTestSuite.scala          | 18 ++++
 2 files changed, 71 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
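
A minimal usage sketch of the newly implemented method (not part of the commit itself); it assumes a Flink-backed DistributedContext is already in scope, as in the flinkbindings test harness, and the signature is taken from the FlinkEngine.scala hunk below:

    import org.apache.mahout.flinkbindings.FlinkEngine
    import org.apache.mahout.math.drm.{CheckpointedDrm, DistributedContext}

    // Build an all-zero DRM keyed by Long row indices. Before this commit the
    // Flink binding left drmParallelizeEmptyLong unimplemented (???).
    def emptyLongKeyedDrm(implicit dc: DistributedContext): CheckpointedDrm[Long] =
      FlinkEngine.drmParallelizeEmptyLong(nrow = 1000L, ncol = 10, numPartitions = 4)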


http://git-wip-us.apache.org/repos/asf/mahout/blob/93ebed62/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index 3988d51..8db063b 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -43,11 +43,11 @@ object FlinkEngine extends DistributedEngine {
   var hdfsUtils: HDFSUtil = Hadoop2HDFSUtil
 
   /**
-   * Load DRM from hdfs (as in Mahout DRM format).
-   * 
-   * @param path The DFS path to load from
-   * @param parMin Minimum parallelism after load (equivalent to #par(min=...)).
-   */
+    * Load DRM from hdfs (as in Mahout DRM format).
+    *
+    * @param path   The DFS path to load from
+    * @param parMin Minimum parallelism after load (equivalent to #par(min=...)).
+    */
   override def drmDfsRead(path: String, parMin: Int = 1)
                          (implicit dc: DistributedContext): CheckpointedDrm[_] = {
 
@@ -103,7 +103,7 @@ object FlinkEngine extends DistributedEngine {
   override def indexedDatasetDFSRead(src: String, schema: Schema, existingRowIDs: Option[BiDictionary])
                                     (implicit sc: DistributedContext): IndexedDataset = ???
 
-  override def indexedDatasetDFSReadElements(src: String,schema: Schema, existingRowIDs: Option[BiDictionary])
+  override def indexedDatasetDFSReadElements(src: String, schema: Schema, existingRowIDs: Option[BiDictionary])
                                             (implicit sc: DistributedContext): IndexedDataset = ???
 
 
@@ -115,16 +115,16 @@ object FlinkEngine extends DistributedEngine {
     */
   override def optimizerRewrite[K: ClassTag](action: DrmLike[K]): DrmLike[K] = super.optimizerRewrite(action)
 
-  /** 
-   * Translates logical plan into Flink execution plan. 
-   **/
+  /**
+    * Translates logical plan into Flink execution plan.
+    **/
   override def toPhysical[K: ClassTag](plan: DrmLike[K], ch: CacheHint.CacheHint): CheckpointedDrm[K] = {
     // Flink-specific Physical Plan translation.
 
     implicit val typeInformation = generateTypeInformation[K]
     val drm = flinkTranslate(plan)
     val newcp = new CheckpointedFlinkDrm(ds = drm.asRowWise.ds, _nrow = plan.nrow, _ncol = plan.ncol)
-   // newcp.ds.getExecutionEnvironment.createProgramPlan("plan")
+    // newcp.ds.getExecutionEnvironment.createProgramPlan("plan")
     newcp.cache()
   }
 
@@ -152,10 +152,10 @@ object FlinkEngine extends DistributedEngine {
         // express ABt via AtB: let C=At and D=Bt, and calculate CtD
         // TODO: create specific implementation of ABt, see MAHOUT-1750
         val opAt = OpAt(a.asInstanceOf[DrmLike[Int]]) // TODO: casts!
-        val at = FlinkOpAt.sparseTrick(opAt, flinkTranslate(a.asInstanceOf[DrmLike[Int]]))
+      val at = FlinkOpAt.sparseTrick(opAt, flinkTranslate(a.asInstanceOf[DrmLike[Int]]))
         val c = new CheckpointedFlinkDrm(at.asRowWise.ds, _nrow = opAt.nrow, _ncol = opAt.ncol)
         val opBt = OpAt(b.asInstanceOf[DrmLike[Int]]) // TODO: casts!
-        val bt = FlinkOpAt.sparseTrick(opBt, flinkTranslate(b.asInstanceOf[DrmLike[Int]]))
+      val bt = FlinkOpAt.sparseTrick(opBt, flinkTranslate(b.asInstanceOf[DrmLike[Int]]))
         val d = new CheckpointedFlinkDrm(bt.asRowWise.ds, _nrow = opBt.nrow, _ncol = opBt.ncol)
         FlinkOpAtB.notZippable(OpAtB(c, d), flinkTranslate(c), flinkTranslate(d)).asInstanceOf[FlinkDrm[K]]
       case op@OpAtA(a) if op.keyClassTag == ClassTag.Int ⇒ FlinkOpAtA.at_a(op, flinkTranslate(a)).asInstanceOf[FlinkDrm[K]]
@@ -190,11 +190,11 @@ object FlinkEngine extends DistributedEngine {
     }
   }
 
-  /** 
-   * returns a vector that contains a column-wise sum from DRM 
-   */
+  /**
+    * returns a vector that contains a column-wise sum from DRM
+    */
   override def colSums[K](drm: CheckpointedDrm[K]): Vector = {
-    implicit val kTag: ClassTag[K] =  drm.keyClassTag
+    implicit val kTag: ClassTag[K] = drm.keyClassTag
     implicit val typeInformation = generateTypeInformation[K]
 
 
@@ -208,7 +208,7 @@ object FlinkEngine extends DistributedEngine {
 
   /** Engine-specific numNonZeroElementsPerColumn implementation based on a checkpoint. */
   override def numNonZeroElementsPerColumn[K](drm: CheckpointedDrm[K]): Vector = {
-    implicit val kTag: ClassTag[K] =  drm.keyClassTag
+    implicit val kTag: ClassTag[K] = drm.keyClassTag
     implicit val typeInformation = generateTypeInformation[K]
 
 
@@ -228,18 +228,18 @@ object FlinkEngine extends DistributedEngine {
     list.head
   }
 
-  /** 
-   * returns a vector that contains a column-wise mean from DRM 
-   */
+  /**
+    * returns a vector that contains a column-wise mean from DRM
+    */
   override def colMeans[K](drm: CheckpointedDrm[K]): Vector = {
     drm.colSums() / drm.nrow
   }
 
   /**
-   * Calculates the element-wise squared norm of a matrix
-   */
+    * Calculates the element-wise squared norm of a matrix
+    */
   override def norm[K](drm: CheckpointedDrm[K]): Double = {
-    implicit val kTag: ClassTag[K] =  drm.keyClassTag
+    implicit val kTag: ClassTag[K] = drm.keyClassTag
     implicit val typeInformation = generateTypeInformation[K]
 
     val sumOfSquares = drm.ds.map {
@@ -248,13 +248,13 @@ object FlinkEngine extends DistributedEngine {
       }
     }.reduce(_ + _)
 
-//    val sumOfSquares = drm.ds.map(new MapFunction[(K, Vector), Double] {
-//      def map(tuple: (K, Vector)): Double = tuple match {
-//        case (idx, vec) => vec dot vec
-//      }
-//    }).reduce(new ReduceFunction[Double] {
-//      def reduce(v1: Double, v2: Double) = v1 + v2
-//    })
+    //    val sumOfSquares = drm.ds.map(new MapFunction[(K, Vector), Double] {
+    //      def map(tuple: (K, Vector)): Double = tuple match {
+    //        case (idx, vec) => vec dot vec
+    //      }
+    //    }).reduce(new ReduceFunction[Double] {
+    //      def reduce(v1: Double, v2: Double) = v1 + v2
+    //    })
 
     val list = sumOfSquares.collect
 
@@ -263,12 +263,12 @@ object FlinkEngine extends DistributedEngine {
   }
 
   /** Broadcast support */
-  override def drmBroadcast(v: Vector)(implicit dc: DistributedContext): BCast[Vector] = 
+  override def drmBroadcast(v: Vector)(implicit dc: DistributedContext): BCast[Vector] =
     FlinkByteBCast.wrap(v)
 
 
   /** Broadcast support */
-  override def drmBroadcast(m: Matrix)(implicit dc: DistributedContext): BCast[Matrix] = 
+  override def drmBroadcast(m: Matrix)(implicit dc: DistributedContext): BCast[Matrix] =
     FlinkByteBCast.wrap(m)
 
 
@@ -278,19 +278,19 @@ object FlinkEngine extends DistributedEngine {
 
     val parallelDrm = parallelize(m, numPartitions)
 
-    new CheckpointedFlinkDrm(ds=parallelDrm, _nrow=m.numRows(), _ncol=m.numCols())
+    new CheckpointedFlinkDrm(ds = parallelDrm, _nrow = m.numRows(), _ncol = m.numCols())
   }
 
 
   private[flinkbindings] def parallelize(m: Matrix, parallelismDegree: Int)
-      (implicit dc: DistributedContext): DrmDataSet[Int] = {
-    val rows = (0 until m.nrow).map(i => (i, m(i, ::)))//.toSeq.sortWith((ii, jj) => ii._1 < jj._1)
+                                        (implicit dc: DistributedContext): DrmDataSet[Int] = {
+    val rows = (0 until m.nrow).map(i => (i, m(i, ::))) //.toSeq.sortWith((ii, jj) => ii._1 < jj._1)
     val dataSetType = TypeExtractor.getForObject(rows.head)
     //TODO: Make Sure that this is the correct partitioning scheme
     dc.env.fromCollection(rows)
-            .partitionByRange(0)
-            .setParallelism(parallelismDegree)
-            .rebalance()
+      .partitionByRange(0)
+      .setParallelism(parallelismDegree)
+      .rebalance()
   }
 
   /** Parallelize in-core matrix as flink distributed matrix, using row labels as a data set keys. */
@@ -307,7 +307,7 @@ object FlinkEngine extends DistributedEngine {
   /** This creates an empty DRM with specified number of partitions and cardinality. */
   override def drmParallelizeEmpty(nrow: Int, ncol: Int, numPartitions: Int = 10)
                                   (implicit dc: DistributedContext): CheckpointedDrm[Int] = {
-    val nonParallelResult = (0 to numPartitions).flatMap { part => 
+    val nonParallelResult = (0 to numPartitions).flatMap { part ⇒
       val partNRow = (nrow - 1) / numPartitions + 1
       val partStart = partNRow * part
       val partEnd = Math.min(partStart + partNRow, nrow)
@@ -315,13 +315,24 @@ object FlinkEngine extends DistributedEngine {
       for (i <- partStart until partEnd) yield (i, new RandomAccessSparseVector(ncol): Vector)
     }
     val result = dc.env.fromCollection(nonParallelResult)
-    new CheckpointedFlinkDrm[Int](ds=result, _nrow=nrow, _ncol=ncol)
+    new CheckpointedFlinkDrm[Int](ds = result, _nrow = nrow, _ncol = ncol)
   }
 
   /** Creates empty DRM with non-trivial height */
   override def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10)
-                                      (implicit sc: DistributedContext): CheckpointedDrm[Long] = ???
-  
+                                      (implicit dc: DistributedContext): CheckpointedDrm[Long] = {
+
+    val nonParallelResult = (0 to numPartitions).flatMap { part ⇒
+        val partNRow = (nrow - 1) / numPartitions + 1
+        val partStart = partNRow * part
+        val partEnd = Math.min(partStart + partNRow, nrow)
+
+      for (i ← partStart until partEnd) yield (i, new RandomAccessSparseVector(ncol): Vector)
+    }
+
+    val result = dc.env.fromCollection(nonParallelResult)
+    new CheckpointedFlinkDrm[Long](ds = result, nrow, ncol, cacheHint = CacheHint.NONE)
+  }
 
   /**
    * Convert non-int-keyed matrix to an int-keyed, computing optionally mapping from old keys

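For reference, a standalone sketch of the row-partitioning arithmetic that the new drmParallelizeEmptyLong shares with drmParallelizeEmpty (the names below are illustrative, not from the commit):

    // Each of the numPartitions + 1 iterations claims a ceil(nrow / numPartitions)-sized
    // slice of row indices, clamped to nrow; trailing iterations may yield empty ranges.
    // Example: nrow = 10, numPartitions = 4 gives slices [0,3), [3,6), [6,9), [9,10),
    // plus one empty slice from the extra iteration.
    val nrow = 10L
    val numPartitions = 4
    val rowSlices = (0 to numPartitions).map { part =>
      val partNRow = (nrow - 1) / numPartitions + 1
      val partStart = partNRow * part
      val partEnd = math.min(partStart + partNRow, nrow)
      partStart until partEnd
    }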
http://git-wip-us.apache.org/repos/asf/mahout/blob/93ebed62/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/NaiveBayesTestSuite.scala
----------------------------------------------------------------------
diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/NaiveBayesTestSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/NaiveBayesTestSuite.scala
index d6feed9..0f1d6bc 100644
--- a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/NaiveBayesTestSuite.scala
+++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/NaiveBayesTestSuite.scala
@@ -1,3 +1,21 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements. See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership. The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License. You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied. See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
 package org.apache.mahout.flinkbindings.standard
 
 import org.apache.mahout.classifier.naivebayes.NBTestBase