You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by sm...@apache.org on 2016/03/28 23:19:23 UTC

mahout git commit: MAHOUT-1819: Set the default parallelism for Flink execution in FlinkDistributedContext; this closes apache/mahout#206

Repository: mahout
Updated Branches:
  refs/heads/flink-binding 99c32f2d9 -> f0e22e28c


MAHOUT-1819: Set the default parallelism for Flink execution in FlinkDistributedContext; this closes apache/mahout#206


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/f0e22e28
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/f0e22e28
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/f0e22e28

Branch: refs/heads/flink-binding
Commit: f0e22e28c8da53a0c26233afd4d363dcc035ee7d
Parents: 99c32f2
Author: smarthi <sm...@apache.org>
Authored: Mon Mar 28 17:17:48 2016 -0400
Committer: smarthi <sm...@apache.org>
Committed: Mon Mar 28 17:17:48 2016 -0400

----------------------------------------------------------------------
 .../flinkbindings/FlinkDistributedContext.scala    | 17 +++++++++++++++++
 .../apache/mahout/flinkbindings/FlinkEngine.scala  | 17 ++++++++---------
 .../mahout/flinkbindings/blas/FlinkOpAx.scala      |  3 +--
 .../flinkbindings/drm/CheckpointedFlinkDrm.scala   |  4 ++--
 4 files changed, 28 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/f0e22e28/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala
index cfc9209..49dc593 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala
@@ -19,11 +19,28 @@
 package org.apache.mahout.flinkbindings
 
 import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.configuration.GlobalConfiguration
 import org.apache.mahout.math.drm.DistributedContext
 import org.apache.mahout.math.drm.DistributedEngine
 
 class FlinkDistributedContext(val env: ExecutionEnvironment) extends DistributedContext {
 
+  val mahoutHome = getMahoutHome()
+
+  GlobalConfiguration.loadConfiguration(mahoutHome + "/conf/flink-config.yaml")
+
+  val conf = GlobalConfiguration.getConfiguration
+
+  var degreeOfParallelism: Int = 0
+
+  if (conf != null) {
+    degreeOfParallelism = conf.getInteger("parallelism.default", Runtime.getRuntime.availableProcessors)
+  } else {
+    degreeOfParallelism = Runtime.getRuntime.availableProcessors
+  }
+
+  env.setParallelism(degreeOfParallelism)
+  
   val engine: DistributedEngine = FlinkEngine
 
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/f0e22e28/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index dd28e9d..adff30b 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -249,13 +249,13 @@ object FlinkEngine extends DistributedEngine {
   override def drmBroadcast(v: Vector)(implicit dc: DistributedContext): BCast[Vector] =
     FlinkByteBCast.wrap(v)
 
-
   /** Broadcast support */
   override def drmBroadcast(m: Matrix)(implicit dc: DistributedContext): BCast[Matrix] =
     FlinkByteBCast.wrap(m)
 
-
   /** Parallelize in-core matrix as flink distributed matrix, using row ordinal indices as data set keys. */
+  // The 'numPartitions' parameter is not honored in this call,
+  // as Flink sets a global parallelism in ExecutionEnvironment
   override def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1)
                                            (implicit dc: DistributedContext): CheckpointedDrm[Int] = {
 
@@ -264,26 +264,25 @@ object FlinkEngine extends DistributedEngine {
     new CheckpointedFlinkDrm(ds = parallelDrm, _nrow = m.numRows(), _ncol = m.numCols())
   }
 
-
+  // The 'parallelismDegree' parameter is not honored in this call,
+  // as Flink sets a global parallelism in ExecutionEnvironment
   private[flinkbindings] def parallelize(m: Matrix, parallelismDegree: Int)
                                         (implicit dc: DistributedContext): DrmDataSet[Int] = {
     val rows = (0 until m.nrow).map(i => (i, m(i, ::)))
     val dataSetType = TypeExtractor.getForObject(rows.head)
-    //TODO: Make Sure that this is the correct partitioning scheme
-    dc.env.fromCollection(rows)
-      .partitionByRange(0)
-      .setParallelism(parallelismDegree)
-      .rebalance()
+    dc.env.fromCollection(rows).partitionByRange(0)
   }
 
   /** Parallelize in-core matrix as flink distributed matrix, using row labels as a data set keys. */
+  // The 'numPartitions' parameter is not honored in this call,
+  // as Flink sets a global parallelism in ExecutionEnvironment
   override def drmParallelizeWithRowLabels(m: Matrix, numPartitions: Int = 1)
                                           (implicit dc: DistributedContext): CheckpointedDrm[String] = {
 
     val rb = m.getRowLabelBindings
     val p = for (i: String ← rb.keySet().toIndexedSeq) yield i → m(rb(i), ::)
 
-    new CheckpointedFlinkDrm[String](dc.env.fromCollection(p).setParallelism(numPartitions),
+    new CheckpointedFlinkDrm[String](dc.env.fromCollection(p),
       _nrow = m.nrow, _ncol = m.ncol, cacheHint = CacheHint.NONE)
   }
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/f0e22e28/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
index fa649fb..ca43b31 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
@@ -87,8 +87,7 @@ object FlinkOpAx {
       // Convert back to mtx
       .toColMatrix
 
-    // It is ridiculous, but in this scheme we will have to re-parallelize it again in order to plug
-    // it back as a Flink drm
+    // Re-wrap the in-core result as a DataSet; the parallelismDegree argument is no longer honored
     val res = FlinkEngine.parallelize(inCoreM, parallelismDegree = 1)
 
     new RowsFlinkDrm[Int](res, 1)

http://git-wip-us.apache.org/repos/asf/mahout/blob/f0e22e28/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
index e59e5a5..794c721 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
@@ -57,14 +57,14 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
   var parallelismDeg: Int = -1
   var persistanceRootDir: String = _
 
-  // need to make sure that this is actually getting the correct propertirs for {{taskmanager.tmp.dirs}}
+  // need to make sure that this is actually getting the correct properties for {{taskmanager.tmp.dirs}}
   val mahoutHome = getMahoutHome()
 
   // this is extra I/O for each cache call.  this needs to be moved somewhere where it is called
   // only once.  Possibly FlinkDistributedEngine.
   GlobalConfiguration.loadConfiguration(mahoutHome + "/conf/flink-config.yaml")
 
-  val conf = GlobalConfiguration.getConfiguration()
+  val conf = GlobalConfiguration.getConfiguration
 
   if (!(conf == null )) {
      persistanceRootDir = conf.getString("taskmanager.tmp.dirs", "/tmp/")