You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by ap...@apache.org on 2016/03/27 21:36:48 UTC
[09/10] mahout git commit: Comments, cleanup
Comments, cleanup
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/e420485e
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/e420485e
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/e420485e
Branch: refs/heads/flink-binding
Commit: e420485e0b82908919985a892f85c7d6dff9d24b
Parents: f0ee522
Author: Andrew Palumbo <ap...@apache.org>
Authored: Sat Mar 26 23:49:14 2016 -0400
Committer: Andrew Palumbo <ap...@apache.org>
Committed: Sat Mar 26 23:49:14 2016 -0400
----------------------------------------------------------------------
.../org/apache/mahout/flinkbindings/FlinkEngine.scala | 2 +-
.../flinkbindings/drm/CheckpointedFlinkDrm.scala | 14 ++++++++++----
2 files changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/e420485e/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index 9adec7e..dd28e9d 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -267,7 +267,7 @@ object FlinkEngine extends DistributedEngine {
private[flinkbindings] def parallelize(m: Matrix, parallelismDegree: Int)
(implicit dc: DistributedContext): DrmDataSet[Int] = {
- val rows = (0 until m.nrow).map(i => (i, m(i, ::))) //.toSeq.sortWith((ii, jj) => ii._1 < jj._1)
+ val rows = (0 until m.nrow).map(i => (i, m(i, ::)))
val dataSetType = TypeExtractor.getForObject(rows.head)
//TODO: Make Sure that this is the correct partitioning scheme
dc.env.fromCollection(rows)
http://git-wip-us.apache.org/repos/asf/mahout/blob/e420485e/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
index 9b3a9f5..8424856 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
@@ -18,7 +18,7 @@
*/
package org.apache.mahout.flinkbindings.drm
-import org.apache.flink.api.common.functions.{MapFunction, ReduceFunction, RichFunction, RichMapFunction}
+import org.apache.flink.api.common.functions.{MapFunction, ReduceFunction}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.io.{TypeSerializerInputFormat, TypeSerializerOutputFormat}
import org.apache.flink.api.scala._
@@ -60,6 +60,8 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
// need to make sure that this is actually getting the correct properties for {{taskmanager.tmp.dirs}}
val mahoutHome = getMahoutHome()
+ // this is extra I/O for each cache call. this needs to be moved somewhere where it is called
+ // only once. Possibly FlinkDistributedEngine.
GlobalConfiguration.loadConfiguration(mahoutHome + "/conf/flink-config.yaml")
val conf = GlobalConfiguration.getConfiguration()
@@ -92,16 +94,20 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
override val keyClassTag: ClassTag[K] = classTag[K]
+ /** Note as of Flink 1.0.0, no direct flink caching exists so we save
+ * the dataset to the filesystem and read it back when cache is called */
def cache() = {
-// implicit val typeInformation = createTypeInformation[(K,Vector)]
if (!isCached) {
cacheFileName = persistanceRootDir + System.nanoTime().toString
parallelismDeg = ds.getParallelism
isCached = true
persist(ds, cacheFileName)
}
-
val _ds = readPersistedDataSet(cacheFileName, ds)
+
+ // We may want to look more closely at this:
+ // caching a drm triggers a computation, so afterwards
+ // it may not make sense to keep the same parallelism degree
if (!(parallelismDeg == _ds.getParallelism)) {
_ds.setParallelism(parallelismDeg).rebalance()
}
@@ -141,7 +147,7 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
* @param path File path to read dataset from
* @param ds persisted ds to retrieve type information and environment from
* @tparam T key Type of the [[DataSet]] elements
- * @return [[DataSet]] reading the just written file
+ * @return [[DataSet]] the persisted dataset
*/
def readPersistedDataSet[T: ClassTag : TypeInformation]
(path: String, ds: DataSet[T]): DataSet[T] = {