Posted to commits@mahout.apache.org by sm...@apache.org on 2016/04/11 10:09:45 UTC

[18/50] [abbrv] mahout git commit: revert commits: 48a8a8208322ed690d5356a4e0cac7667b080bab e420485e0b82908919985a892f85c7d6dff9d24b f0ee522c69a1f47f0d4da263e9f1669404155b5e b45f982581b2a2bed75b771672b4f2d66ef32840 b96918bba9855fba5cefc11e1c4153b941

revert commits:
 48a8a8208322ed690d5356a4e0cac7667b080bab
 e420485e0b82908919985a892f85c7d6dff9d24b
 f0ee522c69a1f47f0d4da263e9f1669404155b5e
 b45f982581b2a2bed75b771672b4f2d66ef32840
 b96918bba9855fba5cefc11e1c4153b9419509cb
 100d343e4b6e66b1a7c581455cd1faab7bbdb538
 ad22252ca2ab39e22d4df7cc22464af2c6179830
 ad4c32ce871df686267df1f1dbff76a883b8d3fc
 9c5ee59214a454f7ae25c762bf04bb30bd7982c8
 a1cf7cf56e036ce12d616f8aea0af1e9dcdf2cb6

commit 48a8a8208322ed690d5356a4e0cac7667b080bab
Author: Andrew Palumbo <ap...@apache.org>
Date:   Sun Mar 27 15:16:36 2016 -0400

    comment out parallelization setting in cache()

commit e420485e0b82908919985a892f85c7d6dff9d24b
Author: Andrew Palumbo <ap...@apache.org>
Date:   Sat Mar 26 23:49:14 2016 -0400

    Comments, cleanup

commit f0ee522c69a1f47f0d4da263e9f1669404155b5e
Merge: b45f982 a77f1c1
Author: Andrew Palumbo <ap...@apache.org>
Date:   Sat Mar 26 23:28:52 2016 -0400

    Merge branch 'flink-binding' of https://git-wip-us.apache.org/repos/asf/mahout into MAHOUT-1817

commit a77f1c13de58d462eed7ff224ec333b22ac22bf3
Author: Andrew Palumbo <ap...@apache.org>
Date:   Sat Mar 26 23:22:44 2016 -0400

    MAHOUT-1749 Mahout DSL for Flink: Implement Atx closes apache/mahout#204

commit b45f982581b2a2bed75b771672b4f2d66ef32840
Author: Andrew Palumbo <ap...@apache.org>
Date:   Sat Mar 26 17:35:37 2016 -0400

    move getMahoutHome()

commit b96918bba9855fba5cefc11e1c4153b9419509cb
Author: Andrew Palumbo <ap...@apache.org>
Date:   Fri Mar 25 20:33:22 2016 -0400

    wip: use properties from /home/andy/sandbox/mahout/conf/flink-config.yaml

commit 100d343e4b6e66b1a7c581455cd1faab7bbdb538
Author: Andrew Palumbo <ap...@apache.org>
Date:   Fri Mar 25 20:26:41 2016 -0400

    wip: use properties from /home/andy/sandbox/mahout/conf/flink-config.yaml

commit ad22252ca2ab39e22d4df7cc22464af2c6179830
Author: Andrew Palumbo <ap...@apache.org>
Date:   Fri Mar 25 19:05:39 2016 -0400

    use  as the base directory for cached files

commit ad4c32ce871df686267df1f1dbff76a883b8d3fc
Author: Andrew Palumbo <ap...@apache.org>
Date:   Fri Mar 25 18:31:51 2016 -0400

    add uncache

commit 9c5ee59214a454f7ae25c762bf04bb30bd7982c8
Author: Andrew Palumbo <ap...@apache.org>
Date:   Fri Mar 25 18:10:18 2016 -0400

    Persist only if the dataset has not been cached. Otherwise, read back the already cached dataset
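
The approach this reverted commit describes is a guard around persist(): write the
DataSet to the filesystem only on the first cache() call, and on every later call
just read the persisted copy back. A minimal sketch of that guard, reusing the names
from the CheckpointedFlinkDrm diff further down (persist(), readPersistedDataSet(),
persistanceRootDir, ds, datasetWrap()):

    def cache() = {
      if (!isCached) {
        // first call: write the backing DataSet to disk once
        cacheFileName = persistanceRootDir + System.nanoTime().toString
        persist(ds, cacheFileName)
        isCached = true
      }
      // every call: hand back a DataSet that reads the persisted copy
      datasetWrap(readPersistedDataSet(cacheFileName, ds))
    }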

commit a1cf7cf56e036ce12d616f8aea0af1e9dcdf2cb6
Author: Andrew Palumbo <ap...@apache.org>
Date:   Fri Mar 25 16:56:20 2016 -0400

    Small change addressing DL's comment on apache/mahout#200, also a small fix


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/f9111ac2
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/f9111ac2
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/f9111ac2

Branch: refs/heads/master
Commit: f9111ac27334659b2fb55849319dcd8e113b3c25
Parents: 48a8a82
Author: Andrew Palumbo <ap...@apache.org>
Authored: Sun Mar 27 16:06:11 2016 -0400
Committer: Andrew Palumbo <ap...@apache.org>
Committed: Sun Mar 27 16:06:11 2016 -0400

----------------------------------------------------------------------
 .../mahout/flinkbindings/FlinkEngine.scala      | 18 +++--
 .../mahout/flinkbindings/blas/FlinkOpAx.scala   | 42 +----------
 .../drm/CheckpointedFlinkDrm.scala              | 73 ++++----------------
 .../apache/mahout/flinkbindings/package.scala   |  6 --
 4 files changed, 27 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/f9111ac2/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index dd28e9d..c355cae 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -127,7 +127,6 @@ object FlinkEngine extends DistributedEngine {
     newcp.cache()
   }
 
-
   private def flinkTranslate[K](oper: DrmLike[K]): FlinkDrm[K] = {
     implicit val kTag = oper.keyClassTag
     implicit val typeInformation = generateTypeInformation[K]
@@ -138,7 +137,13 @@ object FlinkEngine extends DistributedEngine {
         FlinkOpAx.blockifiedBroadcastAx(op, flinkTranslate(a))
       case op@OpAt(a) if op.keyClassTag == ClassTag.Int ⇒ FlinkOpAt.sparseTrick(op, flinkTranslate(a)).asInstanceOf[FlinkDrm[K]]
       case op@OpAtx(a, x) if op.keyClassTag == ClassTag.Int ⇒
-        FlinkOpAx.atx_with_broadcast(op, flinkTranslate(a)).asInstanceOf[FlinkDrm[K]]
+        // express Atx as (A.t) %*% x
+        // TODO: create specific implementation of Atx, see MAHOUT-1749
+        val opAt = OpAt(a)
+        val at = FlinkOpAt.sparseTrick(opAt, flinkTranslate(a))
+        val atCast = new CheckpointedFlinkDrm(at.asRowWise.ds, _nrow = opAt.nrow, _ncol = opAt.ncol)
+        val opAx = OpAx(atCast, x)
+        FlinkOpAx.blockifiedBroadcastAx(opAx, flinkTranslate(atCast)).asInstanceOf[FlinkDrm[K]]
       case op@OpAtB(a, b) ⇒ FlinkOpAtB.notZippable(op, flinkTranslate(a),
         flinkTranslate(b)).asInstanceOf[FlinkDrm[K]]
       case op@OpABt(a, b) ⇒
@@ -267,7 +272,7 @@ object FlinkEngine extends DistributedEngine {
 
   private[flinkbindings] def parallelize(m: Matrix, parallelismDegree: Int)
                                         (implicit dc: DistributedContext): DrmDataSet[Int] = {
-    val rows = (0 until m.nrow).map(i => (i, m(i, ::)))
+    val rows = (0 until m.nrow).map(i => (i, m(i, ::))) //.toSeq.sortWith((ii, jj) => ii._1 < jj._1)
     val dataSetType = TypeExtractor.getForObject(rows.head)
     //TODO: Make Sure that this is the correct partitioning scheme
     dc.env.fromCollection(rows)
@@ -353,9 +358,9 @@ object FlinkEngine extends DistributedEngine {
   }
 
   def generateTypeInformation[K: ClassTag]: TypeInformation[K] = {
-    implicit val ktag = classTag[K]
+    val tag = implicitly[ClassTag[K]]
 
-    generateTypeInformationFromTag(ktag)
+    generateTypeInformationFromTag(tag)
   }
 
   private def generateTypeInformationFromTag[K](tag: ClassTag[K]): TypeInformation[K] = {
@@ -369,4 +374,7 @@ object FlinkEngine extends DistributedEngine {
       throw new IllegalArgumentException(s"index type $tag is not supported")
     }
   }
+  object FlinkEngine {
+
+  }
 }
\ No newline at end of file
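
The hunk above restores the plan rewrite that expresses OpAtx as OpAt followed by
OpAx, i.e. it leans on the identity A'x = (A.t) %*% x instead of a dedicated Atx
kernel (the specialised implementation is tracked under MAHOUT-1749). A small
in-core illustration of that identity using Mahout's scalabindings, with made-up
numbers:

    import org.apache.mahout.math.scalabindings._
    import org.apache.mahout.math.scalabindings.RLikeOps._

    val a = dense((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))  // 3 x 2 matrix A
    val x = dvec(1.0, 0.5, 2.0)                        // length-3 vector x

    // A'x computed via the transpose, which is what the rewritten plan does:
    // atx(j) = sum_i A(i, j) * x(i)
    val atx = a.t %*% x                                // length-2 result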

http://git-wip-us.apache.org/repos/asf/mahout/blob/f9111ac2/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
index 8a333c4..ec20b6d 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
@@ -24,12 +24,9 @@ import org.apache.flink.api.common.functions.RichMapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
 import org.apache.flink.configuration.Configuration
-import org.apache.mahout.flinkbindings.FlinkEngine
-import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm, RowsFlinkDrm}
-import org.apache.mahout.math.drm._
-import org.apache.mahout.math.drm.logical.{OpAtx, OpAx}
+import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm}
+import org.apache.mahout.math.drm.logical.OpAx
 import org.apache.mahout.math.scalabindings.RLikeOps._
-import org.apache.mahout.math.scalabindings._
 import org.apache.mahout.math.{Matrix, Vector}
 
 
@@ -61,39 +58,4 @@ object FlinkOpAx {
 
     new BlockifiedFlinkDrm(out, op.nrow.toInt)
   }
-
-
-  def atx_with_broadcast(op: OpAtx, srcA: FlinkDrm[Int]): FlinkDrm[Int] = {
-    implicit val ctx = srcA.context
-
-    val dataSetA = srcA.asBlockified.ds
-
-    // broadcast the vector x to the back end
-    val bcastX = drmBroadcast(op.x)
-
-    implicit val typeInformation = createTypeInformation[(Array[Int],Matrix)]
-    val inCoreM = dataSetA.map {
-      tuple =>
-        tuple._1.zipWithIndex.map {
-          case (key, idx) => tuple._2(idx, ::) * bcastX.value(key)
-        }
-          .reduce(_ += _)
-    }
-      // All-reduce
-      .reduce(_ += _)
-
-      // collect result
-      .collect()(0)
-
-      // Convert back to mtx
-      .toColMatrix
-
-    // It is ridiculous, but in this scheme we will have to re-parallelize it again in order to plug
-    // it back as a Flink drm
-    val res = FlinkEngine.parallelize(inCoreM, parallelismDegree = 1)
-
-    new RowsFlinkDrm[Int](res, 1)
-
-  }
-
 }
\ No newline at end of file
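
The atx_with_broadcast kernel removed above computed A'x block-wise: the vector x is
broadcast to the workers, each blockified chunk of A contributes the partial sum of
x(rowKey) * A(rowKey, ::) over its rows, and the partials are all-reduced into a
single vector. A minimal in-core sketch of one block's contribution, with
illustrative keys and values:

    import org.apache.mahout.math.scalabindings._
    import org.apache.mahout.math.scalabindings.RLikeOps._

    val block = dense((1.0, 2.0), (3.0, 4.0))                 // one blockified chunk of A
    val keys  = Array(4, 7)                                   // global row keys of that chunk
    val x     = dvec(0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.5)  // broadcast vector

    // per-block partial: sum over the block's rows of x(key) * A(key, ::)
    val partial = keys.zipWithIndex
      .map { case (key, idx) => block(idx, ::) * x(key) }
      .reduce(_ += _)
    // reducing such partials across all blocks yields A'x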

http://git-wip-us.apache.org/repos/asf/mahout/blob/f9111ac2/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
index e59e5a5..ea96e88 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
@@ -25,10 +25,8 @@ import org.apache.flink.api.scala._
 import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.core.fs.Path
 import org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat
-import org.apache.flink.configuration.GlobalConfiguration
 import org.apache.hadoop.io.{IntWritable, LongWritable, Text, Writable}
 import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, SequenceFileOutputFormat}
-import org.apache.mahout.flinkbindings.io.Hadoop2HDFSUtil
 import org.apache.mahout.flinkbindings.{DrmDataSet, _}
 import org.apache.mahout.math._
 import org.apache.mahout.math.drm.CacheHint._
@@ -52,26 +50,10 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
   lazy val ncol: Int = if (_ncol >= 0) _ncol else dim._2
 
   // persistance values
-  var cacheFileName: String = "undefinedCacheName"
+  var cacheFileName: String = "/a"
   var isCached: Boolean = false
   var parallelismDeg: Int = -1
-  var persistanceRootDir: String = _
-
-  // need to make sure that this is actually getting the correct propertirs for {{taskmanager.tmp.dirs}}
-  val mahoutHome = getMahoutHome()
-
-  // this is extra I/O for each cache call.  this needs to be moved somewhere where it is called
-  // only once.  Possibly FlinkDistributedEngine.
-  GlobalConfiguration.loadConfiguration(mahoutHome + "/conf/flink-config.yaml")
-
-  val conf = GlobalConfiguration.getConfiguration()
-
-  if (!(conf == null )) {
-     persistanceRootDir = conf.getString("taskmanager.tmp.dirs", "/tmp/")
-  } else {
-     persistanceRootDir = "/tmp/"
-  }
-
+  val persistanceRootDir = "/tmp/"
 
   private lazy val dim: (Long, Int) = {
     // combine computation of ncol and nrow in one pass
@@ -94,38 +76,20 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
 
   override val keyClassTag: ClassTag[K] = classTag[K]
 
-  /** Note as of Flink 1.0.0, no direct flink caching exists so we save
-    * the dataset to the filesystem and read it back when cache is called */
   def cache() = {
     if (!isCached) {
-      cacheFileName = persistanceRootDir + System.nanoTime().toString
+      cacheFileName = System.nanoTime().toString
       parallelismDeg = ds.getParallelism
       isCached = true
-      persist(ds, cacheFileName)
     }
-    val _ds = readPersistedDataSet(cacheFileName, ds)
-
-    /** Leave the parallelism degree to be set the operators
-      * TODO: find out a way to set the parallelism degree based on the
-      * final drm after computation is actually triggered
-      *
-      *  // We may want to look more closely at this:
-      *  // since we've cached a drm, triggering a computation
-      *  // it may not make sense to keep the same parallelism degree
-      *  if (!(parallelismDeg == _ds.getParallelism)) {
-      *    _ds.setParallelism(parallelismDeg).rebalance()
-      *  }
-      *
-      */
+    implicit val typeInformation = createTypeInformation[(K,Vector)]
 
+    val _ds = persist(ds, persistanceRootDir + cacheFileName)
     datasetWrap(_ds)
   }
 
-  def uncache(): this.type = {
-    if (isCached) {
-      Hadoop2HDFSUtil.delete(cacheFileName)
-      isCached = false
-    }
+  def uncache() = {
+    // TODO
     this
   }
 
@@ -135,10 +99,12 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
     * @param dataset [[DataSet]] to write to disk
     * @param path File path to write dataset to
     * @tparam T Type of the [[DataSet]] elements
+    * @return [[DataSet]] reading the just written file
     */
-  def persist[T: ClassTag: TypeInformation](dataset: DataSet[T], path: String): Unit = {
+  def persist[T: ClassTag: TypeInformation](dataset: DataSet[T], path: String): DataSet[T] = {
     val env = dataset.getExecutionEnvironment
     val outputFormat = new TypeSerializerOutputFormat[T]
+
     val filePath = new Path(path)
 
     outputFormat.setOutputFilePath(filePath)
@@ -146,29 +112,14 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
 
     dataset.output(outputFormat)
     env.execute("FlinkTools persist")
-  }
-
-  /** Read a [[DataSet]] from specified path and returns it as a DataSource for subsequent
-    * operations.
-    *
-    * @param path File path to read dataset from
-    * @param ds persisted ds to retrieve type information and environment forom
-    * @tparam T key Type of the [[DataSet]] elements
-    * @return [[DataSet]] the persisted dataset
-    */
-  def readPersistedDataSet[T: ClassTag : TypeInformation]
-       (path: String, ds: DataSet[T]): DataSet[T] = {
 
-    val env = ds.getExecutionEnvironment
-    val inputFormat = new TypeSerializerInputFormat[T](ds.getType())
-    val filePath = new Path(path)
+    val inputFormat = new TypeSerializerInputFormat[T](dataset.getType)
     inputFormat.setFilePath(filePath)
 
     env.createInput(inputFormat)
   }
 
-
-  // Members declared in org.apache.mahout.math.drm.DrmLike
+  // Members declared in org.apache.mahout.math.drm.DrmLike   
 
   protected[mahout] def canHaveMissingRows: Boolean = _canHaveMissingRows
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/f9111ac2/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
index e769952..10ce545 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
@@ -105,10 +105,4 @@ package object flinkbindings {
 
   private[flinkbindings] def extractRealClassTag[K: ClassTag](drm: DrmLike[K]): ClassTag[_] = drm.keyClassTag
 
-  private[flinkbindings] def getMahoutHome() = {
-    var mhome = System.getenv("MAHOUT_HOME")
-    if (mhome == null) mhome = System.getProperty("mahout.home")
-    require(mhome != null, "MAHOUT_HOME is required to spawn mahout-based flink jobs")
-    mhome
-  }
 }
\ No newline at end of file