You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by sm...@apache.org on 2016/04/11 10:09:45 UTC
[18/50] [abbrv] mahout git commit: revert commits:
48a8a8208322ed690d5356a4e0cac7667b080bab
e420485e0b82908919985a892f85c7d6dff9d24b
f0ee522c69a1f47f0d4da263e9f1669404155b5e
b45f982581b2a2bed75b771672b4f2d66ef32840 b96918bba9855fba5cefc11e1c4153b9419509cb
revert commits:
48a8a8208322ed690d5356a4e0cac7667b080bab
e420485e0b82908919985a892f85c7d6dff9d24b
f0ee522c69a1f47f0d4da263e9f1669404155b5e
b45f982581b2a2bed75b771672b4f2d66ef32840
b96918bba9855fba5cefc11e1c4153b9419509cb
100d343e4b6e66b1a7c581455cd1faab7bbdb538
ad22252ca2ab39e22d4df7cc22464af2c6179830
ad4c32ce871df686267df1f1dbff76a883b8d3fc
9c5ee59214a454f7ae25c762bf04bb30bd7982c8
a1cf7cf56e036ce12d616f8aea0af1e9dcdf2cb6
commit 48a8a8208322ed690d5356a4e0cac7667b080bab
Author: Andrew Palumbo <ap...@apache.org>
Date: Sun Mar 27 15:16:36 2016 -0400
comment out parallelization setting in cache()
commit e420485e0b82908919985a892f85c7d6dff9d24b
Author: Andrew Palumbo <ap...@apache.org>
Date: Sat Mar 26 23:49:14 2016 -0400
Comments, cleanup
commit f0ee522c69a1f47f0d4da263e9f1669404155b5e
Merge: b45f982 a77f1c1
Author: Andrew Palumbo <ap...@apache.org>
Date: Sat Mar 26 23:28:52 2016 -0400
Merge branch 'flink-binding' of https://git-wip-us.apache.org/repos/asf/mahout into MAHOUT-1817
commit a77f1c13de58d462eed7ff224ec333b22ac22bf3
Author: Andrew Palumbo <ap...@apache.org>
Date: Sat Mar 26 23:22:44 2016 -0400
MAHOUT-1749 Mahout DSL for Flink: Implement Atx closes apache/mahout#204
commit b45f982581b2a2bed75b771672b4f2d66ef32840
Author: Andrew Palumbo <ap...@apache.org>
Date: Sat Mar 26 17:35:37 2016 -0400
move getMahoutHome()
commit b96918bba9855fba5cefc11e1c4153b9419509cb
Author: Andrew Palumbo <ap...@apache.org>
Date: Fri Mar 25 20:33:22 2016 -0400
wip: use properties from /home/andy/sandbox/mahout/conf/flink-config.yaml
commit 100d343e4b6e66b1a7c581455cd1faab7bbdb538
Author: Andrew Palumbo <ap...@apache.org>
Date: Fri Mar 25 20:26:41 2016 -0400
wip: use properties from /home/andy/sandbox/mahout/conf/flink-config.yaml
commit ad22252ca2ab39e22d4df7cc22464af2c6179830
Author: Andrew Palumbo <ap...@apache.org>
Date: Fri Mar 25 19:05:39 2016 -0400
use as the base directory for cached files
commit ad4c32ce871df686267df1f1dbff76a883b8d3fc
Author: Andrew Palumbo <ap...@apache.org>
Date: Fri Mar 25 18:31:51 2016 -0400
add uncache
commit 9c5ee59214a454f7ae25c762bf04bb30bd7982c8
Author: Andrew Palumbo <ap...@apache.org>
Date: Fri Mar 25 18:10:18 2016 -0400
Persist only if the dataset has not been cached. Otherwise read back in already cached dataset
commit a1cf7cf56e036ce12d616f8aea0af1e9dcdf2cb6
Author: Andrew Palumbo <ap...@apache.org>
Date: Fri Mar 25 16:56:20 2016 -0400
Small change addressing DL's comment on apache/mahout#200, also a small fix
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/f9111ac2
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/f9111ac2
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/f9111ac2
Branch: refs/heads/master
Commit: f9111ac27334659b2fb55849319dcd8e113b3c25
Parents: 48a8a82
Author: Andrew Palumbo <ap...@apache.org>
Authored: Sun Mar 27 16:06:11 2016 -0400
Committer: Andrew Palumbo <ap...@apache.org>
Committed: Sun Mar 27 16:06:11 2016 -0400
----------------------------------------------------------------------
.../mahout/flinkbindings/FlinkEngine.scala | 18 +++--
.../mahout/flinkbindings/blas/FlinkOpAx.scala | 42 +----------
.../drm/CheckpointedFlinkDrm.scala | 73 ++++----------------
.../apache/mahout/flinkbindings/package.scala | 6 --
4 files changed, 27 insertions(+), 112 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/f9111ac2/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index dd28e9d..c355cae 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -127,7 +127,6 @@ object FlinkEngine extends DistributedEngine {
newcp.cache()
}
-
private def flinkTranslate[K](oper: DrmLike[K]): FlinkDrm[K] = {
implicit val kTag = oper.keyClassTag
implicit val typeInformation = generateTypeInformation[K]
@@ -138,7 +137,13 @@ object FlinkEngine extends DistributedEngine {
FlinkOpAx.blockifiedBroadcastAx(op, flinkTranslate(a))
case op@OpAt(a) if op.keyClassTag == ClassTag.Int ⇒ FlinkOpAt.sparseTrick(op, flinkTranslate(a)).asInstanceOf[FlinkDrm[K]]
case op@OpAtx(a, x) if op.keyClassTag == ClassTag.Int ⇒
- FlinkOpAx.atx_with_broadcast(op, flinkTranslate(a)).asInstanceOf[FlinkDrm[K]]
+ // express Atx as (A.t) %*% x
+ // TODO: create specific implementation of Atx, see MAHOUT-1749
+ val opAt = OpAt(a)
+ val at = FlinkOpAt.sparseTrick(opAt, flinkTranslate(a))
+ val atCast = new CheckpointedFlinkDrm(at.asRowWise.ds, _nrow = opAt.nrow, _ncol = opAt.ncol)
+ val opAx = OpAx(atCast, x)
+ FlinkOpAx.blockifiedBroadcastAx(opAx, flinkTranslate(atCast)).asInstanceOf[FlinkDrm[K]]
case op@OpAtB(a, b) ⇒ FlinkOpAtB.notZippable(op, flinkTranslate(a),
flinkTranslate(b)).asInstanceOf[FlinkDrm[K]]
case op@OpABt(a, b) ⇒
@@ -267,7 +272,7 @@ object FlinkEngine extends DistributedEngine {
private[flinkbindings] def parallelize(m: Matrix, parallelismDegree: Int)
(implicit dc: DistributedContext): DrmDataSet[Int] = {
- val rows = (0 until m.nrow).map(i => (i, m(i, ::)))
+ val rows = (0 until m.nrow).map(i => (i, m(i, ::))) //.toSeq.sortWith((ii, jj) => ii._1 < jj._1)
val dataSetType = TypeExtractor.getForObject(rows.head)
//TODO: Make Sure that this is the correct partitioning scheme
dc.env.fromCollection(rows)
@@ -353,9 +358,9 @@ object FlinkEngine extends DistributedEngine {
}
def generateTypeInformation[K: ClassTag]: TypeInformation[K] = {
- implicit val ktag = classTag[K]
+ val tag = implicitly[ClassTag[K]]
- generateTypeInformationFromTag(ktag)
+ generateTypeInformationFromTag(tag)
}
private def generateTypeInformationFromTag[K](tag: ClassTag[K]): TypeInformation[K] = {
@@ -369,4 +374,7 @@ object FlinkEngine extends DistributedEngine {
throw new IllegalArgumentException(s"index type $tag is not supported")
}
}
+ object FlinkEngine {
+
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mahout/blob/f9111ac2/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
index 8a333c4..ec20b6d 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
@@ -24,12 +24,9 @@ import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
-import org.apache.mahout.flinkbindings.FlinkEngine
-import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm, RowsFlinkDrm}
-import org.apache.mahout.math.drm._
-import org.apache.mahout.math.drm.logical.{OpAtx, OpAx}
+import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm}
+import org.apache.mahout.math.drm.logical.OpAx
import org.apache.mahout.math.scalabindings.RLikeOps._
-import org.apache.mahout.math.scalabindings._
import org.apache.mahout.math.{Matrix, Vector}
@@ -61,39 +58,4 @@ object FlinkOpAx {
new BlockifiedFlinkDrm(out, op.nrow.toInt)
}
-
-
- def atx_with_broadcast(op: OpAtx, srcA: FlinkDrm[Int]): FlinkDrm[Int] = {
- implicit val ctx = srcA.context
-
- val dataSetA = srcA.asBlockified.ds
-
- // broadcast the vector x to the back end
- val bcastX = drmBroadcast(op.x)
-
- implicit val typeInformation = createTypeInformation[(Array[Int],Matrix)]
- val inCoreM = dataSetA.map {
- tuple =>
- tuple._1.zipWithIndex.map {
- case (key, idx) => tuple._2(idx, ::) * bcastX.value(key)
- }
- .reduce(_ += _)
- }
- // All-reduce
- .reduce(_ += _)
-
- // collect result
- .collect()(0)
-
- // Convert back to mtx
- .toColMatrix
-
- // It is ridiculous, but in this scheme we will have to re-parallelize it again in order to plug
- // it back as a Flink drm
- val res = FlinkEngine.parallelize(inCoreM, parallelismDegree = 1)
-
- new RowsFlinkDrm[Int](res, 1)
-
- }
-
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mahout/blob/f9111ac2/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
index e59e5a5..ea96e88 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
@@ -25,10 +25,8 @@ import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.core.fs.Path
import org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat
-import org.apache.flink.configuration.GlobalConfiguration
import org.apache.hadoop.io.{IntWritable, LongWritable, Text, Writable}
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, SequenceFileOutputFormat}
-import org.apache.mahout.flinkbindings.io.Hadoop2HDFSUtil
import org.apache.mahout.flinkbindings.{DrmDataSet, _}
import org.apache.mahout.math._
import org.apache.mahout.math.drm.CacheHint._
@@ -52,26 +50,10 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
lazy val ncol: Int = if (_ncol >= 0) _ncol else dim._2
// persistance values
- var cacheFileName: String = "undefinedCacheName"
+ var cacheFileName: String = "/a"
var isCached: Boolean = false
var parallelismDeg: Int = -1
- var persistanceRootDir: String = _
-
- // need to make sure that this is actually getting the correct propertirs for {{taskmanager.tmp.dirs}}
- val mahoutHome = getMahoutHome()
-
- // this is extra I/O for each cache call. this needs to be moved somewhere where it is called
- // only once. Possibly FlinkDistributedEngine.
- GlobalConfiguration.loadConfiguration(mahoutHome + "/conf/flink-config.yaml")
-
- val conf = GlobalConfiguration.getConfiguration()
-
- if (!(conf == null )) {
- persistanceRootDir = conf.getString("taskmanager.tmp.dirs", "/tmp/")
- } else {
- persistanceRootDir = "/tmp/"
- }
-
+ val persistanceRootDir = "/tmp/"
private lazy val dim: (Long, Int) = {
// combine computation of ncol and nrow in one pass
@@ -94,38 +76,20 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
override val keyClassTag: ClassTag[K] = classTag[K]
- /** Note as of Flink 1.0.0, no direct flink caching exists so we save
- * the dataset to the filesystem and read it back when cache is called */
def cache() = {
if (!isCached) {
- cacheFileName = persistanceRootDir + System.nanoTime().toString
+ cacheFileName = System.nanoTime().toString
parallelismDeg = ds.getParallelism
isCached = true
- persist(ds, cacheFileName)
}
- val _ds = readPersistedDataSet(cacheFileName, ds)
-
- /** Leave the parallelism degree to be set the operators
- * TODO: find out a way to set the parallelism degree based on the
- * final drm after computation is actually triggered
- *
- * // We may want to look more closely at this:
- * // since we've cached a drm, triggering a computation
- * // it may not make sense to keep the same parallelism degree
- * if (!(parallelismDeg == _ds.getParallelism)) {
- * _ds.setParallelism(parallelismDeg).rebalance()
- * }
- *
- */
+ implicit val typeInformation = createTypeInformation[(K,Vector)]
+ val _ds = persist(ds, persistanceRootDir + cacheFileName)
datasetWrap(_ds)
}
- def uncache(): this.type = {
- if (isCached) {
- Hadoop2HDFSUtil.delete(cacheFileName)
- isCached = false
- }
+ def uncache() = {
+ // TODO
this
}
@@ -135,10 +99,12 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
* @param dataset [[DataSet]] to write to disk
* @param path File path to write dataset to
* @tparam T Type of the [[DataSet]] elements
+ * @return [[DataSet]] reading the just written file
*/
- def persist[T: ClassTag: TypeInformation](dataset: DataSet[T], path: String): Unit = {
+ def persist[T: ClassTag: TypeInformation](dataset: DataSet[T], path: String): DataSet[T] = {
val env = dataset.getExecutionEnvironment
val outputFormat = new TypeSerializerOutputFormat[T]
+
val filePath = new Path(path)
outputFormat.setOutputFilePath(filePath)
@@ -146,29 +112,14 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
dataset.output(outputFormat)
env.execute("FlinkTools persist")
- }
-
- /** Read a [[DataSet]] from specified path and returns it as a DataSource for subsequent
- * operations.
- *
- * @param path File path to read dataset from
- * @param ds persisted ds to retrieve type information and environment forom
- * @tparam T key Type of the [[DataSet]] elements
- * @return [[DataSet]] the persisted dataset
- */
- def readPersistedDataSet[T: ClassTag : TypeInformation]
- (path: String, ds: DataSet[T]): DataSet[T] = {
- val env = ds.getExecutionEnvironment
- val inputFormat = new TypeSerializerInputFormat[T](ds.getType())
- val filePath = new Path(path)
+ val inputFormat = new TypeSerializerInputFormat[T](dataset.getType)
inputFormat.setFilePath(filePath)
env.createInput(inputFormat)
}
-
- // Members declared in org.apache.mahout.math.drm.DrmLike
+ // Members declared in org.apache.mahout.math.drm.DrmLike
protected[mahout] def canHaveMissingRows: Boolean = _canHaveMissingRows
http://git-wip-us.apache.org/repos/asf/mahout/blob/f9111ac2/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
index e769952..10ce545 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
@@ -105,10 +105,4 @@ package object flinkbindings {
private[flinkbindings] def extractRealClassTag[K: ClassTag](drm: DrmLike[K]): ClassTag[_] = drm.keyClassTag
- private[flinkbindings] def getMahoutHome() = {
- var mhome = System.getenv("MAHOUT_HOME")
- if (mhome == null) mhome = System.getProperty("mahout.home")
- require(mhome != null, "MAHOUT_HOME is required to spawn mahout-based flink jobs")
- mhome
- }
}
\ No newline at end of file