You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by sm...@apache.org on 2016/04/23 16:18:38 UTC

mahout git commit: MAHOUT-1835: Remove countsPerPartition in Flink/blas/package.scala

Repository: mahout
Updated Branches:
  refs/heads/master add081015 -> 0c39c2999


MAHOUT-1835: Remove countsPerPartition in Flink/blas/package.scala


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/0c39c299
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/0c39c299
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/0c39c299

Branch: refs/heads/master
Commit: 0c39c29998c8b6f93174bf7458f1b4732405c3e4
Parents: add0810
Author: smarthi <sm...@apache.org>
Authored: Sat Apr 23 10:17:40 2016 -0400
Committer: smarthi <sm...@apache.org>
Committed: Sat Apr 23 10:17:40 2016 -0400

----------------------------------------------------------------------
 .../mahout/flinkbindings/blas/package.scala     | 31 +++-----------------
 pom.xml                                         |  2 +-
 2 files changed, 5 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/0c39c299/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala
index 553a422..32a8cac 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala
@@ -23,42 +23,19 @@ import java.lang.Iterable
 import org.apache.flink.api.common.functions.RichMapPartitionFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.utils._
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.util.Collector
 import org.apache.mahout.flinkbindings.drm.FlinkDrm
 import org.apache.mahout.math.drm.DrmLike
+import org.apache.mahout.math.scalabindings.RLikeOps._
 import org.apache.mahout.math.{RandomAccessSparseVector, Vector}
-import org.apache.mahout.math._
-import scalabindings._
-import RLikeOps._
 
-import scala.collection._
 import scala.reflect.ClassTag
 
 package object blas {
 
   /**
-    * To compute tuples (PartitionIndex, PartitionElementCount)
-    *
-    * @param drmDataSet - DRM Dataset
-    * @tparam K - Key type
-    * @return (PartitionIndex, PartitionElementCount)
-    */
-  //TODO: Remove this when FLINK-3657 is merged into Flink codebase and
-  // replace by call to DataSetUtils.countElementsPerPartition(DataSet[K])
-  private[flinkbindings] def countsPerPartition[K](drmDataSet: DataSet[K]): DataSet[(Int, Int)] = {
-    drmDataSet.mapPartition {
-      new RichMapPartitionFunction[K, (Int, Int)] {
-        override def mapPartition(iterable: Iterable[K], collector: Collector[(Int, Int)]) = {
-          val count: Int = Iterator(iterable).size
-          val index: Int = getRuntimeContext.getIndexOfThisSubtask
-          collector.collect((index, count))
-        }
-      }
-    }
-  }
-
-  /**
     * Rekey matrix dataset keys to consecutive int keys.
     * @param drmDataSet incoming matrix row-wise dataset
     * @param computeMap if true, also compute mapping between old and new keys
@@ -79,13 +56,13 @@ package object blas {
     val env = datasetA.getExecutionEnvironment
 
     // First, compute partition sizes.
-    val partSizes = countsPerPartition(datasetA).collect().toList
+    val partSizes = DataSetUtils(datasetA).countElementsPerPartition.collect().toList
 
     // Starting indices
     var startInd = new Array[Int](datasetA.getParallelism)
 
     // Save counts
-    for (pc <- partSizes) startInd(pc._1) = pc._2
+    for (pc <- partSizes) startInd(pc._1) = pc._2.toInt
 
     // compute cumulative sum
     val cumulativeSum = startInd.scanLeft(0)(_ + _).init

http://git-wip-us.apache.org/repos/asf/mahout/blob/0c39c299/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 5136320..c669467 100644
--- a/pom.xml
+++ b/pom.xml
@@ -122,7 +122,7 @@
     <scala.compat.version>2.10</scala.compat.version>
     <scala.version>2.10.4</scala.version>
     <spark.version>1.5.2</spark.version>
-    <flink.version>1.0.1</flink.version>
+    <flink.version>1.0.2</flink.version>
     <h2o.version>0.1.25</h2o.version>
     <jackson.version>2.7.2</jackson.version>
   </properties>