You are viewing a plain-text version of this content. (The canonical link to it was a hyperlink in the original archive page and was lost in this plain-text copy.)
Posted to commits@mahout.apache.org by sm...@apache.org on 2016/04/23 16:18:38 UTC
mahout git commit: MAHOUT-1835: Remove countsPerPartition in Flink/blas/package.scala
Repository: mahout
Updated Branches:
refs/heads/master add081015 -> 0c39c2999
MAHOUT-1835: Remove countsPerPartition in Flink/blas/package.scala
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/0c39c299
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/0c39c299
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/0c39c299
Branch: refs/heads/master
Commit: 0c39c29998c8b6f93174bf7458f1b4732405c3e4
Parents: add0810
Author: smarthi <sm...@apache.org>
Authored: Sat Apr 23 10:17:40 2016 -0400
Committer: smarthi <sm...@apache.org>
Committed: Sat Apr 23 10:17:40 2016 -0400
----------------------------------------------------------------------
.../mahout/flinkbindings/blas/package.scala | 31 +++-----------------
pom.xml | 2 +-
2 files changed, 5 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/0c39c299/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala
index 553a422..32a8cac 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala
@@ -23,42 +23,19 @@ import java.lang.Iterable
import org.apache.flink.api.common.functions.RichMapPartitionFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.utils._
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector
import org.apache.mahout.flinkbindings.drm.FlinkDrm
import org.apache.mahout.math.drm.DrmLike
+import org.apache.mahout.math.scalabindings.RLikeOps._
import org.apache.mahout.math.{RandomAccessSparseVector, Vector}
-import org.apache.mahout.math._
-import scalabindings._
-import RLikeOps._
-import scala.collection._
import scala.reflect.ClassTag
package object blas {
/**
- * To compute tuples (PartitionIndex, PartitionElementCount)
- *
- * @param drmDataSet - DRM Dataset
- * @tparam K - Key type
- * @return (PartitionIndex, PartitionElementCount)
- */
- //TODO: Remove this when FLINK-3657 is merged into Flink codebase and
- // replace by call to DataSetUtils.countElementsPerPartition(DataSet[K])
- private[flinkbindings] def countsPerPartition[K](drmDataSet: DataSet[K]): DataSet[(Int, Int)] = {
- drmDataSet.mapPartition {
- new RichMapPartitionFunction[K, (Int, Int)] {
- override def mapPartition(iterable: Iterable[K], collector: Collector[(Int, Int)]) = {
- val count: Int = Iterator(iterable).size
- val index: Int = getRuntimeContext.getIndexOfThisSubtask
- collector.collect((index, count))
- }
- }
- }
- }
-
- /**
* Rekey matrix dataset keys to consecutive int keys.
* @param drmDataSet incoming matrix row-wise dataset
* @param computeMap if true, also compute mapping between old and new keys
@@ -79,13 +56,13 @@ package object blas {
val env = datasetA.getExecutionEnvironment
// First, compute partition sizes.
- val partSizes = countsPerPartition(datasetA).collect().toList
+ val partSizes = DataSetUtils(datasetA).countElementsPerPartition.collect().toList
// Starting indices
var startInd = new Array[Int](datasetA.getParallelism)
// Save counts
- for (pc <- partSizes) startInd(pc._1) = pc._2
+ for (pc <- partSizes) startInd(pc._1) = pc._2.toInt
// compute cumulative sum
val cumulativeSum = startInd.scanLeft(0)(_ + _).init
http://git-wip-us.apache.org/repos/asf/mahout/blob/0c39c299/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 5136320..c669467 100644
--- a/pom.xml
+++ b/pom.xml
@@ -122,7 +122,7 @@
<scala.compat.version>2.10</scala.compat.version>
<scala.version>2.10.4</scala.version>
<spark.version>1.5.2</spark.version>
- <flink.version>1.0.1</flink.version>
+ <flink.version>1.0.2</flink.version>
<h2o.version>0.1.25</h2o.version>
<jackson.version>2.7.2</jackson.version>
</properties>