You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by sm...@apache.org on 2016/04/11 10:10:01 UTC
[34/50] [abbrv] mahout git commit: MAHOUT-1814: Implement drm2intKeyed
in flink bindings, this closes apache/mahout#214
MAHOUT-1814: Implement drm2intKeyed in flink bindings, this closes apache/mahout#214
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/430310db
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/430310db
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/430310db
Branch: refs/heads/master
Commit: 430310dbf7870e263ea0340df9b226655ce82a72
Parents: 7c275f0
Author: smarthi <sm...@apache.org>
Authored: Sat Apr 9 23:04:53 2016 -0400
Committer: smarthi <sm...@apache.org>
Committed: Sat Apr 9 23:04:53 2016 -0400
----------------------------------------------------------------------
.../mahout/flinkbindings/FlinkEngine.scala | 30 +++++-
.../mahout/flinkbindings/blas/FlinkOpAtA.scala | 18 ++--
.../mahout/flinkbindings/blas/package.scala | 101 ++++++++++++++++++-
3 files changed, 133 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/430310db/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index adff30b..fddb432 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -320,8 +320,34 @@ object FlinkEngine extends DistributedEngine {
* Convert non-int-keyed matrix to an int-keyed, computing optionally mapping from old keys
* to row indices in the new one. The mapping, if requested, is returned as a 1-column matrix.
*/
- def drm2IntKeyed[K](drmX: DrmLike[K], computeMap: Boolean = false):
- (DrmLike[Int], Option[DrmLike[K]]) = ???
+ def drm2IntKeyed[K](drmX: DrmLike[K], computeMap: Boolean = false): (DrmLike[Int], Option[DrmLike[K]]) = {
+ implicit val ktag = drmX.keyClassTag
+ implicit val kTypeInformation = generateTypeInformation[K]
+
+ if (ktag == ClassTag.Int) {
+ drmX.asInstanceOf[DrmLike[Int]] → None
+ } else {
+ val drmXcp = drmX.checkpoint(CacheHint.MEMORY_ONLY)
+ val ncol = drmXcp.asInstanceOf[CheckpointedFlinkDrm[K]].ncol
+ val nrow = drmXcp.asInstanceOf[CheckpointedFlinkDrm[K]].nrow
+
+ // Compute sequential int key numbering.
+ val (intDataset, keyMap) = blas.rekeySeqInts(drmDataSet = drmXcp, computeMap = computeMap)
+
+ // Convert computed key mapping to a matrix.
+ val mxKeyMap = keyMap.map { dataSet ⇒
+ datasetWrap(dataSet.map {
+ tuple: (K, Int) => {
+ val ordinal = tuple._2
+ val key = tuple._1
+ key -> (dvec(ordinal): Vector)
+ }
+ })
+ }
+
+ intDataset -> mxKeyMap
+ }
+ }
/**
* (Optional) Sampling operation.
http://git-wip-us.apache.org/repos/asf/mahout/blob/430310db/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala
index ab99e4d..6d0221a 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala
@@ -21,27 +21,21 @@ package org.apache.mahout.flinkbindings.blas
import java.lang.Iterable
import org.apache.flink.api.common.functions._
-import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.shaded.com.google.common.collect.Lists
import org.apache.flink.util.Collector
-
-import org.apache.mahout.math.{Matrix, UpperTriangular}
-import org.apache.mahout.math.drm.{BlockifiedDrmTuple, _}
-
-import org.apache.mahout.math._
import org.apache.mahout.flinkbindings._
import org.apache.mahout.flinkbindings.drm._
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import collection._
-import JavaConversions._
import org.apache.mahout.math.drm.logical.OpAtA
+import org.apache.mahout.math.drm.{BlockifiedDrmTuple, _}
+import org.apache.mahout.math.scalabindings.RLikeOps._
+import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math.{Matrix, UpperTriangular, _}
-
+import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
+import scala.collection._
/**
* Inspired by Spark's implementation from
http://git-wip-us.apache.org/repos/asf/mahout/blob/430310db/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala
index 6a3ac0e..265951c 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala
@@ -21,18 +21,27 @@ package org.apache.mahout.flinkbindings
import java.lang.Iterable
import org.apache.flink.api.common.functions.RichMapPartitionFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
+import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector
+import org.apache.mahout.flinkbindings.drm.FlinkDrm
+import org.apache.mahout.math.drm.DrmLike
+import org.apache.mahout.math.{RandomAccessSparseVector, Vector}
+import org.apache.mahout.math._
+import scalabindings._
+import RLikeOps._
import scala.collection._
+import scala.reflect.ClassTag
package object blas {
/**
* To compute tuples (PartitionIndex, PartitionElementCount)
*
- * @param drmDataSet
- * @tparam K
+ * @param drmDataSet - DRM Dataset
+ * @tparam K - Key type
* @return (PartitionIndex, PartitionElementCount)
*/
//TODO: Remove this when FLINK-3657 is merged into Flink codebase and
@@ -48,4 +57,92 @@ package object blas {
}
}
}
+
+ /**
+ * Rekey matrix dataset keys to consecutive int keys.
+ * @param drmDataSet incoming matrix row-wise dataset
+ * @param computeMap if true, also compute mapping between old and new keys
+ * @tparam K existing key parameter
+ * @return
+ */
+ private[mahout] def rekeySeqInts[K: ClassTag: TypeInformation](drmDataSet: FlinkDrm[K],
+ computeMap: Boolean = true): (DrmLike[Int],
+ Option[DataSet[(K, Int)]]) = {
+
+ implicit val dc = drmDataSet.context
+
+ val datasetA = drmDataSet.asRowWise.ds
+
+ val ncols = drmDataSet.asRowWise.ncol
+
+ // Flink environment
+ val env = datasetA.getExecutionEnvironment
+
+ // First, compute partition sizes.
+ val partSizes = countsPerPartition(datasetA).collect().toList
+
+ // Starting indices
+ var startInd = new Array[Int](datasetA.getParallelism)
+
+ // Save counts
+ for (pc <- partSizes) startInd(pc._1) = pc._2
+
+ // compute cumulative sum
+ val cumulativeSum = startInd.scanLeft(0)(_ + _).init
+
+ val vector: Vector = new RandomAccessSparseVector(cumulativeSum.length)
+
+ cumulativeSum.indices.foreach { i => vector(i) = cumulativeSum(i).toDouble }
+
+ val bCast = FlinkEngine.drmBroadcast(vector)
+
+ implicit val typeInformation = createTypeInformation[(K, Int)]
+
+ // Compute key -> int index map:
+ val keyMap = if (computeMap) {
+ Some(
+ datasetA.mapPartition(new RichMapPartitionFunction[(K, Vector), (K, Int)] {
+
+ // partition number
+ var part: Int = 0
+
+ // get the index of the partition
+ override def open(params: Configuration): Unit = {
+ part = getRuntimeContext.getIndexOfThisSubtask
+ }
+
+ override def mapPartition(iterable: Iterable[(K, Vector)], collector: Collector[(K, Int)]): Unit = {
+ val k = iterable.iterator().next._1
+ val si = bCast.value.get(part)
+ collector.collect(k -> (part + si).toInt)
+ }
+ }))
+ } else {
+ None
+ }
+
+ // Finally, do the transform
+ val intDataSet = datasetA
+
+ // Re-number each partition
+ .mapPartition(new RichMapPartitionFunction[(K, Vector), (Int, Vector)] {
+
+ // partition number
+ var part: Int = 0
+
+ // get the index of the partition
+ override def open(params: Configuration): Unit = {
+ part = getRuntimeContext.getIndexOfThisSubtask
+ }
+
+ override def mapPartition(iterable: Iterable[(K, Vector)], collector: Collector[(Int, Vector)]): Unit = {
+ val k = iterable.iterator().next._2
+ val si = bCast.value.get(part)
+ collector.collect((part + si).toInt -> k)
+ }
+ })
+
+ // Finally, return drm -> keymap result
+ datasetWrap(intDataSet) -> keyMap
+ }
}
\ No newline at end of file