You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by ap...@apache.org on 2016/03/27 21:36:40 UTC

[01/10] mahout git commit: Small change addressing DL's comment on apache/mahout#200, also a small fix

Repository: mahout
Updated Branches:
  refs/heads/flink-binding a77f1c13d -> 48a8a8208


Small change addressing DL's comment on apache/mahout#200, also a small fix


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/a1cf7cf5
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/a1cf7cf5
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/a1cf7cf5

Branch: refs/heads/flink-binding
Commit: a1cf7cf56e036ce12d616f8aea0af1e9dcdf2cb6
Parents: e3c8db5
Author: Andrew Palumbo <ap...@apache.org>
Authored: Fri Mar 25 16:56:20 2016 -0400
Committer: Andrew Palumbo <ap...@apache.org>
Committed: Fri Mar 25 16:56:20 2016 -0400

----------------------------------------------------------------------
 .../scala/org/apache/mahout/flinkbindings/FlinkEngine.scala    | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/a1cf7cf5/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index c355cae..af508b3 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -127,6 +127,7 @@ object FlinkEngine extends DistributedEngine {
     newcp.cache()
   }
 
+
   private def flinkTranslate[K](oper: DrmLike[K]): FlinkDrm[K] = {
     implicit val kTag = oper.keyClassTag
     implicit val typeInformation = generateTypeInformation[K]
@@ -358,7 +359,7 @@ object FlinkEngine extends DistributedEngine {
   }
 
   def generateTypeInformation[K: ClassTag]: TypeInformation[K] = {
-    val tag = implicitly[ClassTag[K]]
+    implicit val tag = ClassTag[K]
 
     generateTypeInformationFromTag(tag)
   }
@@ -374,7 +375,4 @@ object FlinkEngine extends DistributedEngine {
       throw new IllegalArgumentException(s"index type $tag is not supported")
     }
   }
-  object FlinkEngine {
-
-  }
 }
\ No newline at end of file


[09/10] mahout git commit: Comments, cleanup

Posted by ap...@apache.org.
Comments, cleanup


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/e420485e
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/e420485e
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/e420485e

Branch: refs/heads/flink-binding
Commit: e420485e0b82908919985a892f85c7d6dff9d24b
Parents: f0ee522
Author: Andrew Palumbo <ap...@apache.org>
Authored: Sat Mar 26 23:49:14 2016 -0400
Committer: Andrew Palumbo <ap...@apache.org>
Committed: Sat Mar 26 23:49:14 2016 -0400

----------------------------------------------------------------------
 .../org/apache/mahout/flinkbindings/FlinkEngine.scala |  2 +-
 .../flinkbindings/drm/CheckpointedFlinkDrm.scala      | 14 ++++++++++----
 2 files changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/e420485e/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index 9adec7e..dd28e9d 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -267,7 +267,7 @@ object FlinkEngine extends DistributedEngine {
 
   private[flinkbindings] def parallelize(m: Matrix, parallelismDegree: Int)
                                         (implicit dc: DistributedContext): DrmDataSet[Int] = {
-    val rows = (0 until m.nrow).map(i => (i, m(i, ::))) //.toSeq.sortWith((ii, jj) => ii._1 < jj._1)
+    val rows = (0 until m.nrow).map(i => (i, m(i, ::)))
     val dataSetType = TypeExtractor.getForObject(rows.head)
     //TODO: Make Sure that this is the correct partitioning scheme
     dc.env.fromCollection(rows)

http://git-wip-us.apache.org/repos/asf/mahout/blob/e420485e/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
index 9b3a9f5..8424856 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
@@ -18,7 +18,7 @@
  */
 package org.apache.mahout.flinkbindings.drm
 
-import org.apache.flink.api.common.functions.{MapFunction, ReduceFunction, RichFunction, RichMapFunction}
+import org.apache.flink.api.common.functions.{MapFunction, ReduceFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.io.{TypeSerializerInputFormat, TypeSerializerOutputFormat}
 import org.apache.flink.api.scala._
@@ -60,6 +60,8 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
   // need to make sure that this is actually getting the correct propertirs for {{taskmanager.tmp.dirs}}
   val mahoutHome = getMahoutHome()
 
+  // this is extra I/O for each cache call.  this needs to be moved somewhere where it is called
+  // only once.  Possibly FlinkDistributedEngine.
   GlobalConfiguration.loadConfiguration(mahoutHome + "/conf/flink-config.yaml")
 
   val conf = GlobalConfiguration.getConfiguration()
@@ -92,16 +94,20 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
 
   override val keyClassTag: ClassTag[K] = classTag[K]
 
+  /** Note as of Flink 1.0.0, no direct flink caching exists so we save
+    * the dataset to the filesystem and read it back when cache is called */
   def cache() = {
-//    implicit val typeInformation = createTypeInformation[(K,Vector)]
     if (!isCached) {
       cacheFileName = persistanceRootDir + System.nanoTime().toString
       parallelismDeg = ds.getParallelism
       isCached = true
       persist(ds, cacheFileName)
     }
-
     val _ds = readPersistedDataSet(cacheFileName, ds)
+
+    // We may want to look more closely at this:
+    // since we've cached a drm, triggering a computation
+    // it may not make sense to keep the same parallelism degree
     if (!(parallelismDeg == _ds.getParallelism)) {
       _ds.setParallelism(parallelismDeg).rebalance()
     }
@@ -141,7 +147,7 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
     * @param path File path to read dataset from
     * @param ds persisted ds to retrieve type information and environment forom
     * @tparam T key Type of the [[DataSet]] elements
-    * @return [[DataSet]] reading the just written file
+    * @return [[DataSet]] the persisted dataset
     */
   def readPersistedDataSet[T: ClassTag : TypeInformation]
        (path: String, ds: DataSet[T]): DataSet[T] = {


[05/10] mahout git commit: wip: use properties from /home/andy/sandbox/mahout/conf/flink-config.yaml

Posted by ap...@apache.org.
wip: use properties from /home/andy/sandbox/mahout/conf/flink-config.yaml


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/100d343e
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/100d343e
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/100d343e

Branch: refs/heads/flink-binding
Commit: 100d343e4b6e66b1a7c581455cd1faab7bbdb538
Parents: ad22252
Author: Andrew Palumbo <ap...@apache.org>
Authored: Fri Mar 25 20:26:41 2016 -0400
Committer: Andrew Palumbo <ap...@apache.org>
Committed: Fri Mar 25 20:29:47 2016 -0400

----------------------------------------------------------------------
 .../drm/CheckpointedFlinkDrm.scala              | 22 +++++++++++++++-----
 1 file changed, 17 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/100d343e/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
index 0b3d13e..a5bbbb5 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
@@ -25,6 +25,7 @@ import org.apache.flink.api.scala._
 import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.core.fs.Path
 import org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat
+import org.apache.flink.configuration.GlobalConfiguration
 import org.apache.hadoop.io.{IntWritable, LongWritable, Text, Writable}
 import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, SequenceFileOutputFormat}
 import org.apache.mahout.flinkbindings.io.Hadoop2HDFSUtil
@@ -56,8 +57,18 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
   var parallelismDeg: Int = -1
 
   // need to make sure that this is actually getting the correct propertirs for {{taskmanager.tmp.dirs}}
-  val persistanceRootDir = ds.getExecutionEnvironment.
-    getConfig.getGlobalJobParameters.toMap.getOrDefault("taskmanager.tmp.dirs", "/tmp/")
+  val mahoutHome = System.getProperty("MAHOUT_HOME")
+
+  GlobalConfiguration.loadConfiguration(mahoutHome + "/conf/flink-config.yaml")
+
+  val conf = GlobalConfiguration.getConfiguration()
+
+  if (!(conf == null )) {
+    val persistanceRootDir = conf.getString("taskmanager.tmp.dirs", "/tmp/")
+  } else {
+    val persistanceRootDir = "/tmp/"
+  }
+
 
   private lazy val dim: (Long, Int) = {
     // combine computation of ncol and nrow in one pass
@@ -81,8 +92,7 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
   override val keyClassTag: ClassTag[K] = classTag[K]
 
   def cache() = {
-    implicit val typeInformation = createTypeInformation[(K,Vector)]
-    implicit val inputFormat = (ds.getType)
+//    implicit val typeInformation = createTypeInformation[(K,Vector)]
     if (!isCached) {
       cacheFileName = persistanceRootDir + System.nanoTime().toString
       parallelismDeg = ds.getParallelism
@@ -91,7 +101,9 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
     }
 
     val _ds = readPersistedDataSet(cacheFileName, ds)
-
+    if (!(parallelismDeg == _ds.getParallelism)) {
+      _ds.setParallelism(parallelismDeg).rebalance()
+    }
     datasetWrap(_ds)
   }
 


[04/10] mahout git commit: use taskmanager.tmp.dirs as the base directory for cached files

Posted by ap...@apache.org.
use taskmanager.tmp.dirs as the base directory for cached files


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/ad22252c
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/ad22252c
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/ad22252c

Branch: refs/heads/flink-binding
Commit: ad22252ca2ab39e22d4df7cc22464af2c6179830
Parents: ad4c32c
Author: Andrew Palumbo <ap...@apache.org>
Authored: Fri Mar 25 19:05:39 2016 -0400
Committer: Andrew Palumbo <ap...@apache.org>
Committed: Fri Mar 25 19:05:39 2016 -0400

----------------------------------------------------------------------
 .../mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala     | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/ad22252c/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
index 5246938..0b3d13e 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
@@ -51,10 +51,13 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
   lazy val ncol: Int = if (_ncol >= 0) _ncol else dim._2
 
   // persistance values
-  var cacheFileName: String = "/a"
+  var cacheFileName: String = "undefinedCacheName"
   var isCached: Boolean = false
   var parallelismDeg: Int = -1
-  val persistanceRootDir = "/tmp/"
+
+  // need to make sure that this is actually getting the correct propertirs for {{taskmanager.tmp.dirs}}
+  val persistanceRootDir = ds.getExecutionEnvironment.
+    getConfig.getGlobalJobParameters.toMap.getOrDefault("taskmanager.tmp.dirs", "/tmp/")
 
   private lazy val dim: (Long, Int) = {
     // combine computation of ncol and nrow in one pass
@@ -92,7 +95,7 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
     datasetWrap(_ds)
   }
 
-  def uncache() = {
+  def uncache():this.type = {
     if (isCached) {
       Hadoop2HDFSUtil.delete(cacheFileName)
       isCached = false


[03/10] mahout git commit: add uncache

Posted by ap...@apache.org.
add uncache


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/ad4c32ce
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/ad4c32ce
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/ad4c32ce

Branch: refs/heads/flink-binding
Commit: ad4c32ce871df686267df1f1dbff76a883b8d3fc
Parents: 9c5ee59
Author: Andrew Palumbo <ap...@apache.org>
Authored: Fri Mar 25 18:31:51 2016 -0400
Committer: Andrew Palumbo <ap...@apache.org>
Committed: Fri Mar 25 18:31:51 2016 -0400

----------------------------------------------------------------------
 .../apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/ad4c32ce/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
index 65acbd6..5246938 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
@@ -27,6 +27,7 @@ import org.apache.flink.core.fs.Path
 import org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat
 import org.apache.hadoop.io.{IntWritable, LongWritable, Text, Writable}
 import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, SequenceFileOutputFormat}
+import org.apache.mahout.flinkbindings.io.Hadoop2HDFSUtil
 import org.apache.mahout.flinkbindings.{DrmDataSet, _}
 import org.apache.mahout.math._
 import org.apache.mahout.math.drm.CacheHint._
@@ -92,7 +93,10 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
   }
 
   def uncache() = {
-    // TODO
+    if (isCached) {
+      Hadoop2HDFSUtil.delete(cacheFileName)
+      isCached = false
+    }
     this
   }
 


[07/10] mahout git commit: move getMahoutHome()

Posted by ap...@apache.org.
move getMahoutHome()


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/b45f9825
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/b45f9825
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/b45f9825

Branch: refs/heads/flink-binding
Commit: b45f982581b2a2bed75b771672b4f2d66ef32840
Parents: b96918b
Author: Andrew Palumbo <ap...@apache.org>
Authored: Sat Mar 26 17:35:37 2016 -0400
Committer: Andrew Palumbo <ap...@apache.org>
Committed: Sat Mar 26 17:38:50 2016 -0400

----------------------------------------------------------------------
 .../apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala | 4 ++--
 .../main/scala/org/apache/mahout/flinkbindings/package.scala   | 6 ++++++
 2 files changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/b45f9825/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
index b0ca4c4..9b3a9f5 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
@@ -18,7 +18,7 @@
  */
 package org.apache.mahout.flinkbindings.drm
 
-import org.apache.flink.api.common.functions.{MapFunction, ReduceFunction}
+import org.apache.flink.api.common.functions.{MapFunction, ReduceFunction, RichFunction, RichMapFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.io.{TypeSerializerInputFormat, TypeSerializerOutputFormat}
 import org.apache.flink.api.scala._
@@ -58,7 +58,7 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
   var persistanceRootDir: String = _
 
   // need to make sure that this is actually getting the correct propertirs for {{taskmanager.tmp.dirs}}
-  val mahoutHome = System.getProperty("MAHOUT_HOME")
+  val mahoutHome = getMahoutHome()
 
   GlobalConfiguration.loadConfiguration(mahoutHome + "/conf/flink-config.yaml")
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/b45f9825/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
index 10ce545..e769952 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
@@ -105,4 +105,10 @@ package object flinkbindings {
 
   private[flinkbindings] def extractRealClassTag[K: ClassTag](drm: DrmLike[K]): ClassTag[_] = drm.keyClassTag
 
+  private[flinkbindings] def getMahoutHome() = {
+    var mhome = System.getenv("MAHOUT_HOME")
+    if (mhome == null) mhome = System.getProperty("mahout.home")
+    require(mhome != null, "MAHOUT_HOME is required to spawn mahout-based flink jobs")
+    mhome
+  }
 }
\ No newline at end of file


[10/10] mahout git commit: comment out parallelization setting in cache()

Posted by ap...@apache.org.
comment out parallelization setting in cache()


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/48a8a820
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/48a8a820
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/48a8a820

Branch: refs/heads/flink-binding
Commit: 48a8a8208322ed690d5356a4e0cac7667b080bab
Parents: e420485
Author: Andrew Palumbo <ap...@apache.org>
Authored: Sun Mar 27 15:16:36 2016 -0400
Committer: Andrew Palumbo <ap...@apache.org>
Committed: Sun Mar 27 15:16:36 2016 -0400

----------------------------------------------------------------------
 .../drm/CheckpointedFlinkDrm.scala              | 21 +++++++++++++-------
 1 file changed, 14 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/48a8a820/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
index 8424856..e59e5a5 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
@@ -105,16 +105,23 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
     }
     val _ds = readPersistedDataSet(cacheFileName, ds)
 
-    // We may want to look more closely at this:
-    // since we've cached a drm, triggering a computation
-    // it may not make sense to keep the same parallelism degree
-    if (!(parallelismDeg == _ds.getParallelism)) {
-      _ds.setParallelism(parallelismDeg).rebalance()
-    }
+    /** Leave the parallelism degree to be set the operators
+      * TODO: find out a way to set the parallelism degree based on the
+      * final drm after computation is actually triggered
+      *
+      *  // We may want to look more closely at this:
+      *  // since we've cached a drm, triggering a computation
+      *  // it may not make sense to keep the same parallelism degree
+      *  if (!(parallelismDeg == _ds.getParallelism)) {
+      *    _ds.setParallelism(parallelismDeg).rebalance()
+      *  }
+      *
+      */
+
     datasetWrap(_ds)
   }
 
-  def uncache():this.type = {
+  def uncache(): this.type = {
     if (isCached) {
       Hadoop2HDFSUtil.delete(cacheFileName)
       isCached = false


[08/10] mahout git commit: Merge branch 'flink-binding' of https://git-wip-us.apache.org/repos/asf/mahout into MAHOUT-1817

Posted by ap...@apache.org.
Merge branch 'flink-binding' of https://git-wip-us.apache.org/repos/asf/mahout into MAHOUT-1817


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/f0ee522c
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/f0ee522c
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/f0ee522c

Branch: refs/heads/flink-binding
Commit: f0ee522c69a1f47f0d4da263e9f1669404155b5e
Parents: b45f982 a77f1c1
Author: Andrew Palumbo <ap...@apache.org>
Authored: Sat Mar 26 23:28:52 2016 -0400
Committer: Andrew Palumbo <ap...@apache.org>
Committed: Sat Mar 26 23:28:52 2016 -0400

----------------------------------------------------------------------
 .../mahout/flinkbindings/FlinkEngine.scala      |  8 +---
 .../mahout/flinkbindings/blas/FlinkOpAx.scala   | 42 +++++++++++++++++++-
 2 files changed, 41 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/f0ee522c/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------


[06/10] mahout git commit: wip: use properties from /home/andy/sandbox/mahout/conf/flink-config.yaml

Posted by ap...@apache.org.
wip: use properties from /home/andy/sandbox/mahout/conf/flink-config.yaml


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/b96918bb
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/b96918bb
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/b96918bb

Branch: refs/heads/flink-binding
Commit: b96918bba9855fba5cefc11e1c4153b9419509cb
Parents: 100d343
Author: Andrew Palumbo <ap...@apache.org>
Authored: Fri Mar 25 20:33:22 2016 -0400
Committer: Andrew Palumbo <ap...@apache.org>
Committed: Fri Mar 25 20:33:22 2016 -0400

----------------------------------------------------------------------
 .../apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala  | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/b96918bb/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
index a5bbbb5..b0ca4c4 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
@@ -55,6 +55,7 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
   var cacheFileName: String = "undefinedCacheName"
   var isCached: Boolean = false
   var parallelismDeg: Int = -1
+  var persistanceRootDir: String = _
 
   // need to make sure that this is actually getting the correct propertirs for {{taskmanager.tmp.dirs}}
   val mahoutHome = System.getProperty("MAHOUT_HOME")
@@ -64,9 +65,9 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
   val conf = GlobalConfiguration.getConfiguration()
 
   if (!(conf == null )) {
-    val persistanceRootDir = conf.getString("taskmanager.tmp.dirs", "/tmp/")
+     persistanceRootDir = conf.getString("taskmanager.tmp.dirs", "/tmp/")
   } else {
-    val persistanceRootDir = "/tmp/"
+     persistanceRootDir = "/tmp/"
   }
 
 


[02/10] mahout git commit: Persist only if the dataset has not been cached. Otherwise read back in already cached dataset

Posted by ap...@apache.org.
Persist only if the dataset has not been cached.  Otherwise read back in already cached dataset


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/9c5ee592
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/9c5ee592
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/9c5ee592

Branch: refs/heads/flink-binding
Commit: 9c5ee59214a454f7ae25c762bf04bb30bd7982c8
Parents: a1cf7cf
Author: Andrew Palumbo <ap...@apache.org>
Authored: Fri Mar 25 18:10:18 2016 -0400
Committer: Andrew Palumbo <ap...@apache.org>
Committed: Fri Mar 25 18:10:18 2016 -0400

----------------------------------------------------------------------
 .../mahout/flinkbindings/FlinkEngine.scala      |  4 +--
 .../drm/CheckpointedFlinkDrm.scala              | 32 +++++++++++++++-----
 2 files changed, 26 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/9c5ee592/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index af508b3..0640ebe 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -359,9 +359,9 @@ object FlinkEngine extends DistributedEngine {
   }
 
   def generateTypeInformation[K: ClassTag]: TypeInformation[K] = {
-    implicit val tag = ClassTag[K]
+    implicit val ktag = classTag[K]
 
-    generateTypeInformationFromTag(tag)
+    generateTypeInformationFromTag(ktag)
   }
 
   private def generateTypeInformationFromTag[K](tag: ClassTag[K]): TypeInformation[K] = {

http://git-wip-us.apache.org/repos/asf/mahout/blob/9c5ee592/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
index ea96e88..65acbd6 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
@@ -77,14 +77,17 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
   override val keyClassTag: ClassTag[K] = classTag[K]
 
   def cache() = {
+    implicit val typeInformation = createTypeInformation[(K,Vector)]
+    implicit val inputFormat = (ds.getType)
     if (!isCached) {
-      cacheFileName = System.nanoTime().toString
+      cacheFileName = persistanceRootDir + System.nanoTime().toString
       parallelismDeg = ds.getParallelism
       isCached = true
+      persist(ds, cacheFileName)
     }
-    implicit val typeInformation = createTypeInformation[(K,Vector)]
 
-    val _ds = persist(ds, persistanceRootDir + cacheFileName)
+    val _ds = readPersistedDataSet(cacheFileName, ds)
+
     datasetWrap(_ds)
   }
 
@@ -99,12 +102,10 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
     * @param dataset [[DataSet]] to write to disk
     * @param path File path to write dataset to
     * @tparam T Type of the [[DataSet]] elements
-    * @return [[DataSet]] reading the just written file
     */
-  def persist[T: ClassTag: TypeInformation](dataset: DataSet[T], path: String): DataSet[T] = {
+  def persist[T: ClassTag: TypeInformation](dataset: DataSet[T], path: String): Unit = {
     val env = dataset.getExecutionEnvironment
     val outputFormat = new TypeSerializerOutputFormat[T]
-
     val filePath = new Path(path)
 
     outputFormat.setOutputFilePath(filePath)
@@ -112,14 +113,29 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
 
     dataset.output(outputFormat)
     env.execute("FlinkTools persist")
+  }
+
+  /** Read a [[DataSet]] from specified path and returns it as a DataSource for subsequent
+    * operations.
+    *
+    * @param path File path to read dataset from
+    * @param ds persisted ds to retrieve type information and environment forom
+    * @tparam T key Type of the [[DataSet]] elements
+    * @return [[DataSet]] reading the just written file
+    */
+  def readPersistedDataSet[T: ClassTag : TypeInformation]
+       (path: String, ds: DataSet[T]): DataSet[T] = {
 
-    val inputFormat = new TypeSerializerInputFormat[T](dataset.getType)
+    val env = ds.getExecutionEnvironment
+    val inputFormat = new TypeSerializerInputFormat[T](ds.getType())
+    val filePath = new Path(path)
     inputFormat.setFilePath(filePath)
 
     env.createInput(inputFormat)
   }
 
-  // Members declared in org.apache.mahout.math.drm.DrmLike   
+
+  // Members declared in org.apache.mahout.math.drm.DrmLike
 
   protected[mahout] def canHaveMissingRows: Boolean = _canHaveMissingRows