You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by sm...@apache.org on 2016/03/28 23:19:23 UTC
mahout git commit: MAHOUT-1819: Set the default Parallelism for Flink
execution in FlinkDistributedContext; this closes apache/mahout#206
Repository: mahout
Updated Branches:
refs/heads/flink-binding 99c32f2d9 -> f0e22e28c
MAHOUT-1819: Set the default Parallelism for Flink execution in FlinkDistributedContext; this closes apache/mahout#206
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/f0e22e28
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/f0e22e28
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/f0e22e28
Branch: refs/heads/flink-binding
Commit: f0e22e28c8da53a0c26233afd4d363dcc035ee7d
Parents: 99c32f2
Author: smarthi <sm...@apache.org>
Authored: Mon Mar 28 17:17:48 2016 -0400
Committer: smarthi <sm...@apache.org>
Committed: Mon Mar 28 17:17:48 2016 -0400
----------------------------------------------------------------------
.../flinkbindings/FlinkDistributedContext.scala | 17 +++++++++++++++++
.../apache/mahout/flinkbindings/FlinkEngine.scala | 17 ++++++++---------
.../mahout/flinkbindings/blas/FlinkOpAx.scala | 3 +--
.../flinkbindings/drm/CheckpointedFlinkDrm.scala | 4 ++--
4 files changed, 28 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/f0e22e28/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala
index cfc9209..49dc593 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala
@@ -19,11 +19,28 @@
package org.apache.mahout.flinkbindings
import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.configuration.GlobalConfiguration
import org.apache.mahout.math.drm.DistributedContext
import org.apache.mahout.math.drm.DistributedEngine
class FlinkDistributedContext(val env: ExecutionEnvironment) extends DistributedContext {
+ val mahoutHome = getMahoutHome()
+
+ GlobalConfiguration.loadConfiguration(mahoutHome + "/conf/flink-config.yaml")
+
+ val conf = GlobalConfiguration.getConfiguration
+
+ var degreeOfParallelism: Int = 0
+
+ if (conf != null) {
+ degreeOfParallelism = conf.getInteger("parallelism.default", Runtime.getRuntime.availableProcessors)
+ } else {
+ degreeOfParallelism = Runtime.getRuntime.availableProcessors
+ }
+
+ env.setParallelism(degreeOfParallelism)
+
val engine: DistributedEngine = FlinkEngine
http://git-wip-us.apache.org/repos/asf/mahout/blob/f0e22e28/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index dd28e9d..adff30b 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -249,13 +249,13 @@ object FlinkEngine extends DistributedEngine {
override def drmBroadcast(v: Vector)(implicit dc: DistributedContext): BCast[Vector] =
FlinkByteBCast.wrap(v)
-
/** Broadcast support */
override def drmBroadcast(m: Matrix)(implicit dc: DistributedContext): BCast[Matrix] =
FlinkByteBCast.wrap(m)
-
/** Parallelize in-core matrix as flink distributed matrix, using row ordinal indices as data set keys. */
+ // The 'numPartitions' parameter is not honored in this call,
+ // as Flink sets a global parallelism in ExecutionEnvironment
override def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1)
(implicit dc: DistributedContext): CheckpointedDrm[Int] = {
@@ -264,26 +264,25 @@ object FlinkEngine extends DistributedEngine {
new CheckpointedFlinkDrm(ds = parallelDrm, _nrow = m.numRows(), _ncol = m.numCols())
}
-
+ // The 'parallelismDegree' parameter is not honored in this call,
+ // as Flink sets a global parallelism in ExecutionEnvironment
private[flinkbindings] def parallelize(m: Matrix, parallelismDegree: Int)
(implicit dc: DistributedContext): DrmDataSet[Int] = {
val rows = (0 until m.nrow).map(i => (i, m(i, ::)))
val dataSetType = TypeExtractor.getForObject(rows.head)
- //TODO: Make Sure that this is the correct partitioning scheme
- dc.env.fromCollection(rows)
- .partitionByRange(0)
- .setParallelism(parallelismDegree)
- .rebalance()
+ dc.env.fromCollection(rows).partitionByRange(0)
}
/** Parallelize in-core matrix as flink distributed matrix, using row labels as a data set keys. */
+ // The 'numPartitions' parameter is not honored in this call,
+ // as Flink sets a global parallelism in ExecutionEnvironment
override def drmParallelizeWithRowLabels(m: Matrix, numPartitions: Int = 1)
(implicit dc: DistributedContext): CheckpointedDrm[String] = {
val rb = m.getRowLabelBindings
val p = for (i: String ← rb.keySet().toIndexedSeq) yield i → m(rb(i), ::)
- new CheckpointedFlinkDrm[String](dc.env.fromCollection(p).setParallelism(numPartitions),
+ new CheckpointedFlinkDrm[String](dc.env.fromCollection(p),
_nrow = m.nrow, _ncol = m.ncol, cacheHint = CacheHint.NONE)
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/f0e22e28/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
index fa649fb..ca43b31 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
@@ -87,8 +87,7 @@ object FlinkOpAx {
// Convert back to mtx
.toColMatrix
- // It is ridiculous, but in this scheme we will have to re-parallelize it again in order to plug
- // it back as a Flink drm
+ // This doesn't do anything now
val res = FlinkEngine.parallelize(inCoreM, parallelismDegree = 1)
new RowsFlinkDrm[Int](res, 1)
http://git-wip-us.apache.org/repos/asf/mahout/blob/f0e22e28/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
index e59e5a5..794c721 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
@@ -57,14 +57,14 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
var parallelismDeg: Int = -1
var persistanceRootDir: String = _
- // need to make sure that this is actually getting the correct propertirs for {{taskmanager.tmp.dirs}}
+ // need to make sure that this is actually getting the correct properties for {{taskmanager.tmp.dirs}}
val mahoutHome = getMahoutHome()
// this is extra I/O for each cache call. this needs to be moved somewhere where it is called
// only once. Possibly FlinkDistributedEngine.
GlobalConfiguration.loadConfiguration(mahoutHome + "/conf/flink-config.yaml")
- val conf = GlobalConfiguration.getConfiguration()
+ val conf = GlobalConfiguration.getConfiguration
if (!(conf == null )) {
persistanceRootDir = conf.getString("taskmanager.tmp.dirs", "/tmp/")