Posted to commits@mahout.apache.org by sm...@apache.org on 2016/03/15 20:04:52 UTC
mahout git commit: MAHOUT-1812: Implement drmParallelizeEmptyLong(...) in flink Bindings
Repository: mahout
Updated Branches:
refs/heads/flink-binding e75c5f606 -> 93ebed620
MAHOUT-1812: Implement drmParallelizeEmptyLong(...) in flink Bindings
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/93ebed62
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/93ebed62
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/93ebed62
Branch: refs/heads/flink-binding
Commit: 93ebed620ae7d7da1a6ac2dbe944a8fd517de1c0
Parents: e75c5f6
Author: smarthi <sm...@apache.org>
Authored: Tue Mar 15 15:03:35 2016 -0400
Committer: smarthi <sm...@apache.org>
Committed: Tue Mar 15 15:04:21 2016 -0400
----------------------------------------------------------------------
.../mahout/flinkbindings/FlinkEngine.scala | 95 +++++++++++---------
.../standard/NaiveBayesTestSuite.scala | 18 ++++
2 files changed, 71 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/93ebed62/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index 3988d51..8db063b 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -43,11 +43,11 @@ object FlinkEngine extends DistributedEngine {
var hdfsUtils: HDFSUtil = Hadoop2HDFSUtil
/**
- * Load DRM from hdfs (as in Mahout DRM format).
- *
- * @param path The DFS path to load from
- * @param parMin Minimum parallelism after load (equivalent to #par(min=...)).
- */
+ * Load DRM from hdfs (as in Mahout DRM format).
+ *
+ * @param path The DFS path to load from
+ * @param parMin Minimum parallelism after load (equivalent to #par(min=...)).
+ */
override def drmDfsRead(path: String, parMin: Int = 1)
(implicit dc: DistributedContext): CheckpointedDrm[_] = {
@@ -103,7 +103,7 @@ object FlinkEngine extends DistributedEngine {
override def indexedDatasetDFSRead(src: String, schema: Schema, existingRowIDs: Option[BiDictionary])
(implicit sc: DistributedContext): IndexedDataset = ???
- override def indexedDatasetDFSReadElements(src: String,schema: Schema, existingRowIDs: Option[BiDictionary])
+ override def indexedDatasetDFSReadElements(src: String, schema: Schema, existingRowIDs: Option[BiDictionary])
(implicit sc: DistributedContext): IndexedDataset = ???
@@ -115,16 +115,16 @@ object FlinkEngine extends DistributedEngine {
*/
override def optimizerRewrite[K: ClassTag](action: DrmLike[K]): DrmLike[K] = super.optimizerRewrite(action)
- /**
- * Translates logical plan into Flink execution plan.
- **/
+ /**
+ * Translates logical plan into Flink execution plan.
+ **/
override def toPhysical[K: ClassTag](plan: DrmLike[K], ch: CacheHint.CacheHint): CheckpointedDrm[K] = {
// Flink-specific Physical Plan translation.
implicit val typeInformation = generateTypeInformation[K]
val drm = flinkTranslate(plan)
val newcp = new CheckpointedFlinkDrm(ds = drm.asRowWise.ds, _nrow = plan.nrow, _ncol = plan.ncol)
- // newcp.ds.getExecutionEnvironment.createProgramPlan("plan")
+ // newcp.ds.getExecutionEnvironment.createProgramPlan("plan")
newcp.cache()
}
@@ -152,10 +152,10 @@ object FlinkEngine extends DistributedEngine {
// express ABt via AtB: let C=At and D=Bt, and calculate CtD
// TODO: create specific implementation of ABt, see MAHOUT-1750
val opAt = OpAt(a.asInstanceOf[DrmLike[Int]]) // TODO: casts!
- val at = FlinkOpAt.sparseTrick(opAt, flinkTranslate(a.asInstanceOf[DrmLike[Int]]))
+ val at = FlinkOpAt.sparseTrick(opAt, flinkTranslate(a.asInstanceOf[DrmLike[Int]]))
val c = new CheckpointedFlinkDrm(at.asRowWise.ds, _nrow = opAt.nrow, _ncol = opAt.ncol)
val opBt = OpAt(b.asInstanceOf[DrmLike[Int]]) // TODO: casts!
- val bt = FlinkOpAt.sparseTrick(opBt, flinkTranslate(b.asInstanceOf[DrmLike[Int]]))
+ val bt = FlinkOpAt.sparseTrick(opBt, flinkTranslate(b.asInstanceOf[DrmLike[Int]]))
val d = new CheckpointedFlinkDrm(bt.asRowWise.ds, _nrow = opBt.nrow, _ncol = opBt.ncol)
FlinkOpAtB.notZippable(OpAtB(c, d), flinkTranslate(c), flinkTranslate(d)).asInstanceOf[FlinkDrm[K]]
case op@OpAtA(a) if op.keyClassTag == ClassTag.Int ⇒ FlinkOpAtA.at_a(op, flinkTranslate(a)).asInstanceOf[FlinkDrm[K]]
@@ -190,11 +190,11 @@ object FlinkEngine extends DistributedEngine {
}
}
- /**
- * returns a vector that contains a column-wise sum from DRM
- */
+ /**
+ * returns a vector that contains a column-wise sum from DRM
+ */
override def colSums[K](drm: CheckpointedDrm[K]): Vector = {
- implicit val kTag: ClassTag[K] = drm.keyClassTag
+ implicit val kTag: ClassTag[K] = drm.keyClassTag
implicit val typeInformation = generateTypeInformation[K]
@@ -208,7 +208,7 @@ object FlinkEngine extends DistributedEngine {
/** Engine-specific numNonZeroElementsPerColumn implementation based on a checkpoint. */
override def numNonZeroElementsPerColumn[K](drm: CheckpointedDrm[K]): Vector = {
- implicit val kTag: ClassTag[K] = drm.keyClassTag
+ implicit val kTag: ClassTag[K] = drm.keyClassTag
implicit val typeInformation = generateTypeInformation[K]
@@ -228,18 +228,18 @@ object FlinkEngine extends DistributedEngine {
list.head
}
- /**
- * returns a vector that contains a column-wise mean from DRM
- */
+ /**
+ * returns a vector that contains a column-wise mean from DRM
+ */
override def colMeans[K](drm: CheckpointedDrm[K]): Vector = {
drm.colSums() / drm.nrow
}
/**
- * Calculates the element-wise squared norm of a matrix
- */
+ * Calculates the element-wise squared norm of a matrix
+ */
override def norm[K](drm: CheckpointedDrm[K]): Double = {
- implicit val kTag: ClassTag[K] = drm.keyClassTag
+ implicit val kTag: ClassTag[K] = drm.keyClassTag
implicit val typeInformation = generateTypeInformation[K]
val sumOfSquares = drm.ds.map {
@@ -248,13 +248,13 @@ object FlinkEngine extends DistributedEngine {
}
}.reduce(_ + _)
-// val sumOfSquares = drm.ds.map(new MapFunction[(K, Vector), Double] {
-// def map(tuple: (K, Vector)): Double = tuple match {
-// case (idx, vec) => vec dot vec
-// }
-// }).reduce(new ReduceFunction[Double] {
-// def reduce(v1: Double, v2: Double) = v1 + v2
-// })
+ // val sumOfSquares = drm.ds.map(new MapFunction[(K, Vector), Double] {
+ // def map(tuple: (K, Vector)): Double = tuple match {
+ // case (idx, vec) => vec dot vec
+ // }
+ // }).reduce(new ReduceFunction[Double] {
+ // def reduce(v1: Double, v2: Double) = v1 + v2
+ // })
val list = sumOfSquares.collect
@@ -263,12 +263,12 @@ object FlinkEngine extends DistributedEngine {
}
/** Broadcast support */
- override def drmBroadcast(v: Vector)(implicit dc: DistributedContext): BCast[Vector] =
+ override def drmBroadcast(v: Vector)(implicit dc: DistributedContext): BCast[Vector] =
FlinkByteBCast.wrap(v)
/** Broadcast support */
- override def drmBroadcast(m: Matrix)(implicit dc: DistributedContext): BCast[Matrix] =
+ override def drmBroadcast(m: Matrix)(implicit dc: DistributedContext): BCast[Matrix] =
FlinkByteBCast.wrap(m)
@@ -278,19 +278,19 @@ object FlinkEngine extends DistributedEngine {
val parallelDrm = parallelize(m, numPartitions)
- new CheckpointedFlinkDrm(ds=parallelDrm, _nrow=m.numRows(), _ncol=m.numCols())
+ new CheckpointedFlinkDrm(ds = parallelDrm, _nrow = m.numRows(), _ncol = m.numCols())
}
private[flinkbindings] def parallelize(m: Matrix, parallelismDegree: Int)
- (implicit dc: DistributedContext): DrmDataSet[Int] = {
- val rows = (0 until m.nrow).map(i => (i, m(i, ::)))//.toSeq.sortWith((ii, jj) => ii._1 < jj._1)
+ (implicit dc: DistributedContext): DrmDataSet[Int] = {
+ val rows = (0 until m.nrow).map(i => (i, m(i, ::))) //.toSeq.sortWith((ii, jj) => ii._1 < jj._1)
val dataSetType = TypeExtractor.getForObject(rows.head)
//TODO: Make Sure that this is the correct partitioning scheme
dc.env.fromCollection(rows)
- .partitionByRange(0)
- .setParallelism(parallelismDegree)
- .rebalance()
+ .partitionByRange(0)
+ .setParallelism(parallelismDegree)
+ .rebalance()
}
/** Parallelize in-core matrix as flink distributed matrix, using row labels as a data set keys. */
@@ -307,7 +307,7 @@ object FlinkEngine extends DistributedEngine {
/** This creates an empty DRM with specified number of partitions and cardinality. */
override def drmParallelizeEmpty(nrow: Int, ncol: Int, numPartitions: Int = 10)
(implicit dc: DistributedContext): CheckpointedDrm[Int] = {
- val nonParallelResult = (0 to numPartitions).flatMap { part =>
+ val nonParallelResult = (0 to numPartitions).flatMap { part ⇒
val partNRow = (nrow - 1) / numPartitions + 1
val partStart = partNRow * part
val partEnd = Math.min(partStart + partNRow, nrow)
@@ -315,13 +315,24 @@ object FlinkEngine extends DistributedEngine {
for (i <- partStart until partEnd) yield (i, new RandomAccessSparseVector(ncol): Vector)
}
val result = dc.env.fromCollection(nonParallelResult)
- new CheckpointedFlinkDrm[Int](ds=result, _nrow=nrow, _ncol=ncol)
+ new CheckpointedFlinkDrm[Int](ds = result, _nrow = nrow, _ncol = ncol)
}
/** Creates empty DRM with non-trivial height */
override def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10)
- (implicit sc: DistributedContext): CheckpointedDrm[Long] = ???
-
+ (implicit dc: DistributedContext): CheckpointedDrm[Long] = {
+
+ val nonParallelResult = (0 to numPartitions).flatMap { part ⇒
+ val partNRow = (nrow - 1) / numPartitions + 1
+ val partStart = partNRow * part
+ val partEnd = Math.min(partStart + partNRow, nrow)
+
+ for (i ← partStart until partEnd) yield (i, new RandomAccessSparseVector(ncol): Vector)
+ }
+
+ val result = dc.env.fromCollection(nonParallelResult)
+ new CheckpointedFlinkDrm[Long](ds = result, nrow, ncol, cacheHint = CacheHint.NONE)
+ }
/**
* Convert non-int-keyed matrix to an int-keyed, computing optionally mapping from old keys
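----------------------------------------------------------------------
Usage sketch (illustrative, not part of the commit): the new
drmParallelizeEmptyLong(...) follows the same block arithmetic as the
Int-keyed drmParallelizeEmpty above. partNRow = (nrow - 1) / numPartitions + 1
is the ceiling of nrow / numPartitions; e.g. nrow = 10 with numPartitions = 3
gives partNRow = 4 and row ranges [0, 4), [4, 8), [8, 10), while the surplus
index produced by the inclusive "0 to numPartitions" yields an empty range
because partEnd is clamped by Math.min(partStart + partNRow, nrow). Below is
a minimal call sketch, assuming an implicit Flink DistributedContext is in
scope (as in the flink bindings' test suites); the dimensions and partition
count are illustrative values, not from the commit.

import org.apache.mahout.math.drm.CheckpointedDrm
import org.apache.mahout.flinkbindings.FlinkEngine

// Create an empty Long-keyed DRM; per the implementation above, every
// row is an empty RandomAccessSparseVector until it is written to.
val emptyDrm: CheckpointedDrm[Long] =
  FlinkEngine.drmParallelizeEmptyLong(nrow = 1000000L, ncol = 25, numPartitions = 4)

// The checkpointed DRM reports the requested geometry.
assert(emptyDrm.nrow == 1000000L)
assert(emptyDrm.ncol == 25)
----------------------------------------------------------------------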
http://git-wip-us.apache.org/repos/asf/mahout/blob/93ebed62/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/NaiveBayesTestSuite.scala
----------------------------------------------------------------------
diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/NaiveBayesTestSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/NaiveBayesTestSuite.scala
index d6feed9..0f1d6bc 100644
--- a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/NaiveBayesTestSuite.scala
+++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/NaiveBayesTestSuite.scala
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.mahout.flinkbindings.standard
import org.apache.mahout.classifier.naivebayes.NBTestBase