You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by sm...@apache.org on 2016/04/11 10:09:43 UTC

[16/50] [abbrv] mahout git commit: Comments, cleanup

Comments, cleanup


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/e420485e
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/e420485e
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/e420485e

Branch: refs/heads/master
Commit: e420485e0b82908919985a892f85c7d6dff9d24b
Parents: f0ee522
Author: Andrew Palumbo <ap...@apache.org>
Authored: Sat Mar 26 23:49:14 2016 -0400
Committer: Andrew Palumbo <ap...@apache.org>
Committed: Sat Mar 26 23:49:14 2016 -0400

----------------------------------------------------------------------
 .../org/apache/mahout/flinkbindings/FlinkEngine.scala |  2 +-
 .../flinkbindings/drm/CheckpointedFlinkDrm.scala      | 14 ++++++++++----
 2 files changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/e420485e/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index 9adec7e..dd28e9d 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -267,7 +267,7 @@ object FlinkEngine extends DistributedEngine {
 
   private[flinkbindings] def parallelize(m: Matrix, parallelismDegree: Int)
                                         (implicit dc: DistributedContext): DrmDataSet[Int] = {
-    val rows = (0 until m.nrow).map(i => (i, m(i, ::))) //.toSeq.sortWith((ii, jj) => ii._1 < jj._1)
+    val rows = (0 until m.nrow).map(i => (i, m(i, ::)))
     val dataSetType = TypeExtractor.getForObject(rows.head)
     //TODO: Make Sure that this is the correct partitioning scheme
     dc.env.fromCollection(rows)

http://git-wip-us.apache.org/repos/asf/mahout/blob/e420485e/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
index 9b3a9f5..8424856 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
@@ -18,7 +18,7 @@
  */
 package org.apache.mahout.flinkbindings.drm
 
-import org.apache.flink.api.common.functions.{MapFunction, ReduceFunction, RichFunction, RichMapFunction}
+import org.apache.flink.api.common.functions.{MapFunction, ReduceFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.io.{TypeSerializerInputFormat, TypeSerializerOutputFormat}
 import org.apache.flink.api.scala._
@@ -60,6 +60,8 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
   // need to make sure that this is actually getting the correct properties for {{taskmanager.tmp.dirs}}
   val mahoutHome = getMahoutHome()
 
+  // this is extra I/O for each cache call.  this needs to be moved somewhere where it is called
+  // only once.  Possibly FlinkDistributedEngine.
   GlobalConfiguration.loadConfiguration(mahoutHome + "/conf/flink-config.yaml")
 
   val conf = GlobalConfiguration.getConfiguration()
@@ -92,16 +94,20 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
 
   override val keyClassTag: ClassTag[K] = classTag[K]
 
+  /** Note as of Flink 1.0.0, no direct flink caching exists so we save
+    * the dataset to the filesystem and read it back when cache is called */
   def cache() = {
-//    implicit val typeInformation = createTypeInformation[(K,Vector)]
     if (!isCached) {
       cacheFileName = persistanceRootDir + System.nanoTime().toString
       parallelismDeg = ds.getParallelism
       isCached = true
       persist(ds, cacheFileName)
     }
-
     val _ds = readPersistedDataSet(cacheFileName, ds)
+
+    // We may want to look more closely at this:
+    // since caching a drm triggers a computation,
+    // it may not make sense to keep the same parallelism degree
     if (!(parallelismDeg == _ds.getParallelism)) {
       _ds.setParallelism(parallelismDeg).rebalance()
     }
@@ -141,7 +147,7 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
     * @param path File path to read dataset from
     * @param ds persisted ds to retrieve type information and environment from
     * @tparam T key Type of the [[DataSet]] elements
-    * @return [[DataSet]] reading the just written file
+    * @return [[DataSet]] the persisted dataset
     */
   def readPersistedDataSet[T: ClassTag : TypeInformation]
        (path: String, ds: DataSet[T]): DataSet[T] = {