You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by sm...@apache.org on 2016/05/03 00:51:00 UTC
mahout git commit: MAHOUT-1847: drmSampleRows in FlinkEngine doesn't
wrap Int Keys when ClassTag is of type Int, this closes apache/mahout#232
Repository: mahout
Updated Branches:
refs/heads/master 616b87c07 -> 4c85d6a48
MAHOUT-1847: drmSampleRows in FlinkEngine doesn't wrap Int Keys when ClassTag is of type Int, this closes apache/mahout#232
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/4c85d6a4
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/4c85d6a4
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/4c85d6a4
Branch: refs/heads/master
Commit: 4c85d6a48bcdd5c161f50f85ec1e7d278d1dbae0
Parents: 616b87c
Author: smarthi <sm...@apache.org>
Authored: Mon May 2 18:50:48 2016 -0400
Committer: smarthi <sm...@apache.org>
Committed: Mon May 2 18:50:48 2016 -0400
----------------------------------------------------------------------
.../org/apache/mahout/flinkbindings/FlinkEngine.scala | 10 +++++++++-
1 file changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/4c85d6a4/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index fddb432..b3b72b0 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -365,7 +365,15 @@ object FlinkEngine extends DistributedEngine {
implicit val typeInformation = generateTypeInformation[K]
val sample = DataSetUtils(drmX.dataset).sampleWithSize(replacement, numSamples)
- new CheckpointedFlinkDrm[K](sample)
+
+ val res = if (kTag != ClassTag.Int) {
+ new CheckpointedFlinkDrm[K](sample)
+ }
+ else {
+ blas.rekeySeqInts(new RowsFlinkDrm[K](sample, ncol = drmX.ncol), computeMap = false)._1
+ }
+
+ res.collect
}
/** Engine-specific all reduce tensor operation. */