You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by ap...@apache.org on 2016/03/27 21:36:40 UTC
[01/10] mahout git commit: Small change addressing DL's comment on
apache/mahout#200, also a small fix
Repository: mahout
Updated Branches:
refs/heads/flink-binding a77f1c13d -> 48a8a8208
Small change addressing DL's comment on apache/mahout#200, also a small fix
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/a1cf7cf5
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/a1cf7cf5
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/a1cf7cf5
Branch: refs/heads/flink-binding
Commit: a1cf7cf56e036ce12d616f8aea0af1e9dcdf2cb6
Parents: e3c8db5
Author: Andrew Palumbo <ap...@apache.org>
Authored: Fri Mar 25 16:56:20 2016 -0400
Committer: Andrew Palumbo <ap...@apache.org>
Committed: Fri Mar 25 16:56:20 2016 -0400
----------------------------------------------------------------------
.../scala/org/apache/mahout/flinkbindings/FlinkEngine.scala | 6 ++----
1 file changed, 2 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/a1cf7cf5/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index c355cae..af508b3 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -127,6 +127,7 @@ object FlinkEngine extends DistributedEngine {
newcp.cache()
}
+
private def flinkTranslate[K](oper: DrmLike[K]): FlinkDrm[K] = {
implicit val kTag = oper.keyClassTag
implicit val typeInformation = generateTypeInformation[K]
@@ -358,7 +359,7 @@ object FlinkEngine extends DistributedEngine {
}
def generateTypeInformation[K: ClassTag]: TypeInformation[K] = {
- val tag = implicitly[ClassTag[K]]
+ implicit val tag = ClassTag[K]
generateTypeInformationFromTag(tag)
}
@@ -374,7 +375,4 @@ object FlinkEngine extends DistributedEngine {
throw new IllegalArgumentException(s"index type $tag is not supported")
}
}
- object FlinkEngine {
-
- }
}
\ No newline at end of file
[09/10] mahout git commit: Comments, cleanup
Posted by ap...@apache.org.
Comments, cleanup
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/e420485e
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/e420485e
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/e420485e
Branch: refs/heads/flink-binding
Commit: e420485e0b82908919985a892f85c7d6dff9d24b
Parents: f0ee522
Author: Andrew Palumbo <ap...@apache.org>
Authored: Sat Mar 26 23:49:14 2016 -0400
Committer: Andrew Palumbo <ap...@apache.org>
Committed: Sat Mar 26 23:49:14 2016 -0400
----------------------------------------------------------------------
.../org/apache/mahout/flinkbindings/FlinkEngine.scala | 2 +-
.../flinkbindings/drm/CheckpointedFlinkDrm.scala | 14 ++++++++++----
2 files changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/e420485e/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index 9adec7e..dd28e9d 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -267,7 +267,7 @@ object FlinkEngine extends DistributedEngine {
private[flinkbindings] def parallelize(m: Matrix, parallelismDegree: Int)
(implicit dc: DistributedContext): DrmDataSet[Int] = {
- val rows = (0 until m.nrow).map(i => (i, m(i, ::))) //.toSeq.sortWith((ii, jj) => ii._1 < jj._1)
+ val rows = (0 until m.nrow).map(i => (i, m(i, ::)))
val dataSetType = TypeExtractor.getForObject(rows.head)
//TODO: Make Sure that this is the correct partitioning scheme
dc.env.fromCollection(rows)
http://git-wip-us.apache.org/repos/asf/mahout/blob/e420485e/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
index 9b3a9f5..8424856 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
@@ -18,7 +18,7 @@
*/
package org.apache.mahout.flinkbindings.drm
-import org.apache.flink.api.common.functions.{MapFunction, ReduceFunction, RichFunction, RichMapFunction}
+import org.apache.flink.api.common.functions.{MapFunction, ReduceFunction}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.io.{TypeSerializerInputFormat, TypeSerializerOutputFormat}
import org.apache.flink.api.scala._
@@ -60,6 +60,8 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
// need to make sure that this is actually getting the correct propertirs for {{taskmanager.tmp.dirs}}
val mahoutHome = getMahoutHome()
+ // this is extra I/O for each cache call. this needs to be moved somewhere where it is called
+ // only once. Possibly FlinkDistributedEngine.
GlobalConfiguration.loadConfiguration(mahoutHome + "/conf/flink-config.yaml")
val conf = GlobalConfiguration.getConfiguration()
@@ -92,16 +94,20 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
override val keyClassTag: ClassTag[K] = classTag[K]
+ /** Note as of Flink 1.0.0, no direct flink caching exists so we save
+ * the dataset to the filesystem and read it back when cache is called */
def cache() = {
-// implicit val typeInformation = createTypeInformation[(K,Vector)]
if (!isCached) {
cacheFileName = persistanceRootDir + System.nanoTime().toString
parallelismDeg = ds.getParallelism
isCached = true
persist(ds, cacheFileName)
}
-
val _ds = readPersistedDataSet(cacheFileName, ds)
+
+ // We may want to look more closely at this:
+ // since we've cached a drm, triggering a computation
+ // it may not make sense to keep the same parallelism degree
if (!(parallelismDeg == _ds.getParallelism)) {
_ds.setParallelism(parallelismDeg).rebalance()
}
@@ -141,7 +147,7 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
* @param path File path to read dataset from
* @param ds persisted ds to retrieve type information and environment forom
* @tparam T key Type of the [[DataSet]] elements
- * @return [[DataSet]] reading the just written file
+ * @return [[DataSet]] the persisted dataset
*/
def readPersistedDataSet[T: ClassTag : TypeInformation]
(path: String, ds: DataSet[T]): DataSet[T] = {
[05/10] mahout git commit: wip: use properties from
/home/andy/sandbox/mahout/conf/flink-config.yaml
Posted by ap...@apache.org.
wip: use properties from /home/andy/sandbox/mahout/conf/flink-config.yaml
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/100d343e
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/100d343e
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/100d343e
Branch: refs/heads/flink-binding
Commit: 100d343e4b6e66b1a7c581455cd1faab7bbdb538
Parents: ad22252
Author: Andrew Palumbo <ap...@apache.org>
Authored: Fri Mar 25 20:26:41 2016 -0400
Committer: Andrew Palumbo <ap...@apache.org>
Committed: Fri Mar 25 20:29:47 2016 -0400
----------------------------------------------------------------------
.../drm/CheckpointedFlinkDrm.scala | 22 +++++++++++++++-----
1 file changed, 17 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/100d343e/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
index 0b3d13e..a5bbbb5 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
@@ -25,6 +25,7 @@ import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.core.fs.Path
import org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat
+import org.apache.flink.configuration.GlobalConfiguration
import org.apache.hadoop.io.{IntWritable, LongWritable, Text, Writable}
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, SequenceFileOutputFormat}
import org.apache.mahout.flinkbindings.io.Hadoop2HDFSUtil
@@ -56,8 +57,18 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
var parallelismDeg: Int = -1
// need to make sure that this is actually getting the correct propertirs for {{taskmanager.tmp.dirs}}
- val persistanceRootDir = ds.getExecutionEnvironment.
- getConfig.getGlobalJobParameters.toMap.getOrDefault("taskmanager.tmp.dirs", "/tmp/")
+ val mahoutHome = System.getProperty("MAHOUT_HOME")
+
+ GlobalConfiguration.loadConfiguration(mahoutHome + "/conf/flink-config.yaml")
+
+ val conf = GlobalConfiguration.getConfiguration()
+
+ if (!(conf == null )) {
+ val persistanceRootDir = conf.getString("taskmanager.tmp.dirs", "/tmp/")
+ } else {
+ val persistanceRootDir = "/tmp/"
+ }
+
private lazy val dim: (Long, Int) = {
// combine computation of ncol and nrow in one pass
@@ -81,8 +92,7 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
override val keyClassTag: ClassTag[K] = classTag[K]
def cache() = {
- implicit val typeInformation = createTypeInformation[(K,Vector)]
- implicit val inputFormat = (ds.getType)
+// implicit val typeInformation = createTypeInformation[(K,Vector)]
if (!isCached) {
cacheFileName = persistanceRootDir + System.nanoTime().toString
parallelismDeg = ds.getParallelism
@@ -91,7 +101,9 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
}
val _ds = readPersistedDataSet(cacheFileName, ds)
-
+ if (!(parallelismDeg == _ds.getParallelism)) {
+ _ds.setParallelism(parallelismDeg).rebalance()
+ }
datasetWrap(_ds)
}
[04/10] mahout git commit: use taskmanager.tmp.dirs as the base directory for cached files
Posted by ap...@apache.org.
use taskmanager.tmp.dirs as the base directory for cached files
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/ad22252c
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/ad22252c
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/ad22252c
Branch: refs/heads/flink-binding
Commit: ad22252ca2ab39e22d4df7cc22464af2c6179830
Parents: ad4c32c
Author: Andrew Palumbo <ap...@apache.org>
Authored: Fri Mar 25 19:05:39 2016 -0400
Committer: Andrew Palumbo <ap...@apache.org>
Committed: Fri Mar 25 19:05:39 2016 -0400
----------------------------------------------------------------------
.../mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala | 9 ++++++---
1 file changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/ad22252c/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
index 5246938..0b3d13e 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
@@ -51,10 +51,13 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
lazy val ncol: Int = if (_ncol >= 0) _ncol else dim._2
// persistance values
- var cacheFileName: String = "/a"
+ var cacheFileName: String = "undefinedCacheName"
var isCached: Boolean = false
var parallelismDeg: Int = -1
- val persistanceRootDir = "/tmp/"
+
+ // need to make sure that this is actually getting the correct propertirs for {{taskmanager.tmp.dirs}}
+ val persistanceRootDir = ds.getExecutionEnvironment.
+ getConfig.getGlobalJobParameters.toMap.getOrDefault("taskmanager.tmp.dirs", "/tmp/")
private lazy val dim: (Long, Int) = {
// combine computation of ncol and nrow in one pass
@@ -92,7 +95,7 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
datasetWrap(_ds)
}
- def uncache() = {
+ def uncache():this.type = {
if (isCached) {
Hadoop2HDFSUtil.delete(cacheFileName)
isCached = false
[03/10] mahout git commit: add uncache
Posted by ap...@apache.org.
add uncache
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/ad4c32ce
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/ad4c32ce
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/ad4c32ce
Branch: refs/heads/flink-binding
Commit: ad4c32ce871df686267df1f1dbff76a883b8d3fc
Parents: 9c5ee59
Author: Andrew Palumbo <ap...@apache.org>
Authored: Fri Mar 25 18:31:51 2016 -0400
Committer: Andrew Palumbo <ap...@apache.org>
Committed: Fri Mar 25 18:31:51 2016 -0400
----------------------------------------------------------------------
.../apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/ad4c32ce/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
index 65acbd6..5246938 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
@@ -27,6 +27,7 @@ import org.apache.flink.core.fs.Path
import org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat
import org.apache.hadoop.io.{IntWritable, LongWritable, Text, Writable}
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, SequenceFileOutputFormat}
+import org.apache.mahout.flinkbindings.io.Hadoop2HDFSUtil
import org.apache.mahout.flinkbindings.{DrmDataSet, _}
import org.apache.mahout.math._
import org.apache.mahout.math.drm.CacheHint._
@@ -92,7 +93,10 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
}
def uncache() = {
- // TODO
+ if (isCached) {
+ Hadoop2HDFSUtil.delete(cacheFileName)
+ isCached = false
+ }
this
}
[07/10] mahout git commit: move getMahoutHome()
Posted by ap...@apache.org.
move getMahoutHome()
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/b45f9825
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/b45f9825
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/b45f9825
Branch: refs/heads/flink-binding
Commit: b45f982581b2a2bed75b771672b4f2d66ef32840
Parents: b96918b
Author: Andrew Palumbo <ap...@apache.org>
Authored: Sat Mar 26 17:35:37 2016 -0400
Committer: Andrew Palumbo <ap...@apache.org>
Committed: Sat Mar 26 17:38:50 2016 -0400
----------------------------------------------------------------------
.../apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala | 4 ++--
.../main/scala/org/apache/mahout/flinkbindings/package.scala | 6 ++++++
2 files changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/b45f9825/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
index b0ca4c4..9b3a9f5 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
@@ -18,7 +18,7 @@
*/
package org.apache.mahout.flinkbindings.drm
-import org.apache.flink.api.common.functions.{MapFunction, ReduceFunction}
+import org.apache.flink.api.common.functions.{MapFunction, ReduceFunction, RichFunction, RichMapFunction}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.io.{TypeSerializerInputFormat, TypeSerializerOutputFormat}
import org.apache.flink.api.scala._
@@ -58,7 +58,7 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
var persistanceRootDir: String = _
// need to make sure that this is actually getting the correct propertirs for {{taskmanager.tmp.dirs}}
- val mahoutHome = System.getProperty("MAHOUT_HOME")
+ val mahoutHome = getMahoutHome()
GlobalConfiguration.loadConfiguration(mahoutHome + "/conf/flink-config.yaml")
http://git-wip-us.apache.org/repos/asf/mahout/blob/b45f9825/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
index 10ce545..e769952 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
@@ -105,4 +105,10 @@ package object flinkbindings {
private[flinkbindings] def extractRealClassTag[K: ClassTag](drm: DrmLike[K]): ClassTag[_] = drm.keyClassTag
+ private[flinkbindings] def getMahoutHome() = {
+ var mhome = System.getenv("MAHOUT_HOME")
+ if (mhome == null) mhome = System.getProperty("mahout.home")
+ require(mhome != null, "MAHOUT_HOME is required to spawn mahout-based flink jobs")
+ mhome
+ }
}
\ No newline at end of file
[10/10] mahout git commit: comment out parallelization setting in
cache()
Posted by ap...@apache.org.
comment out parallelization setting in cache()
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/48a8a820
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/48a8a820
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/48a8a820
Branch: refs/heads/flink-binding
Commit: 48a8a8208322ed690d5356a4e0cac7667b080bab
Parents: e420485
Author: Andrew Palumbo <ap...@apache.org>
Authored: Sun Mar 27 15:16:36 2016 -0400
Committer: Andrew Palumbo <ap...@apache.org>
Committed: Sun Mar 27 15:16:36 2016 -0400
----------------------------------------------------------------------
.../drm/CheckpointedFlinkDrm.scala | 21 +++++++++++++-------
1 file changed, 14 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/48a8a820/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
index 8424856..e59e5a5 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
@@ -105,16 +105,23 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
}
val _ds = readPersistedDataSet(cacheFileName, ds)
- // We may want to look more closely at this:
- // since we've cached a drm, triggering a computation
- // it may not make sense to keep the same parallelism degree
- if (!(parallelismDeg == _ds.getParallelism)) {
- _ds.setParallelism(parallelismDeg).rebalance()
- }
+ /** Leave the parallelism degree to be set the operators
+ * TODO: find out a way to set the parallelism degree based on the
+ * final drm after computation is actually triggered
+ *
+ * // We may want to look more closely at this:
+ * // since we've cached a drm, triggering a computation
+ * // it may not make sense to keep the same parallelism degree
+ * if (!(parallelismDeg == _ds.getParallelism)) {
+ * _ds.setParallelism(parallelismDeg).rebalance()
+ * }
+ *
+ */
+
datasetWrap(_ds)
}
- def uncache():this.type = {
+ def uncache(): this.type = {
if (isCached) {
Hadoop2HDFSUtil.delete(cacheFileName)
isCached = false
[08/10] mahout git commit: Merge branch 'flink-binding' of
https://git-wip-us.apache.org/repos/asf/mahout into MAHOUT-1817
Posted by ap...@apache.org.
Merge branch 'flink-binding' of https://git-wip-us.apache.org/repos/asf/mahout into MAHOUT-1817
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/f0ee522c
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/f0ee522c
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/f0ee522c
Branch: refs/heads/flink-binding
Commit: f0ee522c69a1f47f0d4da263e9f1669404155b5e
Parents: b45f982 a77f1c1
Author: Andrew Palumbo <ap...@apache.org>
Authored: Sat Mar 26 23:28:52 2016 -0400
Committer: Andrew Palumbo <ap...@apache.org>
Committed: Sat Mar 26 23:28:52 2016 -0400
----------------------------------------------------------------------
.../mahout/flinkbindings/FlinkEngine.scala | 8 +---
.../mahout/flinkbindings/blas/FlinkOpAx.scala | 42 +++++++++++++++++++-
2 files changed, 41 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/f0ee522c/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
[06/10] mahout git commit: wip: use properties from
/home/andy/sandbox/mahout/conf/flink-config.yaml
Posted by ap...@apache.org.
wip: use properties from /home/andy/sandbox/mahout/conf/flink-config.yaml
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/b96918bb
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/b96918bb
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/b96918bb
Branch: refs/heads/flink-binding
Commit: b96918bba9855fba5cefc11e1c4153b9419509cb
Parents: 100d343
Author: Andrew Palumbo <ap...@apache.org>
Authored: Fri Mar 25 20:33:22 2016 -0400
Committer: Andrew Palumbo <ap...@apache.org>
Committed: Fri Mar 25 20:33:22 2016 -0400
----------------------------------------------------------------------
.../apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/b96918bb/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
index a5bbbb5..b0ca4c4 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
@@ -55,6 +55,7 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
var cacheFileName: String = "undefinedCacheName"
var isCached: Boolean = false
var parallelismDeg: Int = -1
+ var persistanceRootDir: String = _
// need to make sure that this is actually getting the correct propertirs for {{taskmanager.tmp.dirs}}
val mahoutHome = System.getProperty("MAHOUT_HOME")
@@ -64,9 +65,9 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
val conf = GlobalConfiguration.getConfiguration()
if (!(conf == null )) {
- val persistanceRootDir = conf.getString("taskmanager.tmp.dirs", "/tmp/")
+ persistanceRootDir = conf.getString("taskmanager.tmp.dirs", "/tmp/")
} else {
- val persistanceRootDir = "/tmp/"
+ persistanceRootDir = "/tmp/"
}
[02/10] mahout git commit: Persist only if the dataset has not been
cached. Otherwise read back in already cached dataset
Posted by ap...@apache.org.
Persist only if the dataset has not been cached. Otherwise read back in already cached dataset
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/9c5ee592
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/9c5ee592
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/9c5ee592
Branch: refs/heads/flink-binding
Commit: 9c5ee59214a454f7ae25c762bf04bb30bd7982c8
Parents: a1cf7cf
Author: Andrew Palumbo <ap...@apache.org>
Authored: Fri Mar 25 18:10:18 2016 -0400
Committer: Andrew Palumbo <ap...@apache.org>
Committed: Fri Mar 25 18:10:18 2016 -0400
----------------------------------------------------------------------
.../mahout/flinkbindings/FlinkEngine.scala | 4 +--
.../drm/CheckpointedFlinkDrm.scala | 32 +++++++++++++++-----
2 files changed, 26 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/9c5ee592/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index af508b3..0640ebe 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -359,9 +359,9 @@ object FlinkEngine extends DistributedEngine {
}
def generateTypeInformation[K: ClassTag]: TypeInformation[K] = {
- implicit val tag = ClassTag[K]
+ implicit val ktag = classTag[K]
- generateTypeInformationFromTag(tag)
+ generateTypeInformationFromTag(ktag)
}
private def generateTypeInformationFromTag[K](tag: ClassTag[K]): TypeInformation[K] = {
http://git-wip-us.apache.org/repos/asf/mahout/blob/9c5ee592/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
index ea96e88..65acbd6 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
@@ -77,14 +77,17 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
override val keyClassTag: ClassTag[K] = classTag[K]
def cache() = {
+ implicit val typeInformation = createTypeInformation[(K,Vector)]
+ implicit val inputFormat = (ds.getType)
if (!isCached) {
- cacheFileName = System.nanoTime().toString
+ cacheFileName = persistanceRootDir + System.nanoTime().toString
parallelismDeg = ds.getParallelism
isCached = true
+ persist(ds, cacheFileName)
}
- implicit val typeInformation = createTypeInformation[(K,Vector)]
- val _ds = persist(ds, persistanceRootDir + cacheFileName)
+ val _ds = readPersistedDataSet(cacheFileName, ds)
+
datasetWrap(_ds)
}
@@ -99,12 +102,10 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
* @param dataset [[DataSet]] to write to disk
* @param path File path to write dataset to
* @tparam T Type of the [[DataSet]] elements
- * @return [[DataSet]] reading the just written file
*/
- def persist[T: ClassTag: TypeInformation](dataset: DataSet[T], path: String): DataSet[T] = {
+ def persist[T: ClassTag: TypeInformation](dataset: DataSet[T], path: String): Unit = {
val env = dataset.getExecutionEnvironment
val outputFormat = new TypeSerializerOutputFormat[T]
-
val filePath = new Path(path)
outputFormat.setOutputFilePath(filePath)
@@ -112,14 +113,29 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
dataset.output(outputFormat)
env.execute("FlinkTools persist")
+ }
+
+ /** Read a [[DataSet]] from specified path and returns it as a DataSource for subsequent
+ * operations.
+ *
+ * @param path File path to read dataset from
+ * @param ds persisted ds to retrieve type information and environment forom
+ * @tparam T key Type of the [[DataSet]] elements
+ * @return [[DataSet]] reading the just written file
+ */
+ def readPersistedDataSet[T: ClassTag : TypeInformation]
+ (path: String, ds: DataSet[T]): DataSet[T] = {
- val inputFormat = new TypeSerializerInputFormat[T](dataset.getType)
+ val env = ds.getExecutionEnvironment
+ val inputFormat = new TypeSerializerInputFormat[T](ds.getType())
+ val filePath = new Path(path)
inputFormat.setFilePath(filePath)
env.createInput(inputFormat)
}
- // Members declared in org.apache.mahout.math.drm.DrmLike
+
+ // Members declared in org.apache.mahout.math.drm.DrmLike
protected[mahout] def canHaveMissingRows: Boolean = _canHaveMissingRows