You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by sm...@apache.org on 2016/05/03 00:51:00 UTC

mahout git commit: MAHOUT-1847: drmSampleRows in FlinkEngine doesn't wrap Int Keys when ClassTag is of type Int, this closes apache/mahout#232

Repository: mahout
Updated Branches:
  refs/heads/master 616b87c07 -> 4c85d6a48


MAHOUT-1847: drmSampleRows in FlinkEngine doesn't wrap Int Keys when ClassTag is of type Int, this closes apache/mahout#232


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/4c85d6a4
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/4c85d6a4
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/4c85d6a4

Branch: refs/heads/master
Commit: 4c85d6a48bcdd5c161f50f85ec1e7d278d1dbae0
Parents: 616b87c
Author: smarthi <sm...@apache.org>
Authored: Mon May 2 18:50:48 2016 -0400
Committer: smarthi <sm...@apache.org>
Committed: Mon May 2 18:50:48 2016 -0400

----------------------------------------------------------------------
 .../org/apache/mahout/flinkbindings/FlinkEngine.scala     | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/4c85d6a4/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index fddb432..b3b72b0 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -365,7 +365,15 @@ object FlinkEngine extends DistributedEngine {
     implicit val typeInformation = generateTypeInformation[K]
 
     val sample = DataSetUtils(drmX.dataset).sampleWithSize(replacement, numSamples)
-    new CheckpointedFlinkDrm[K](sample)
+
+    val res = if (kTag != ClassTag.Int) {
+      new CheckpointedFlinkDrm[K](sample)
+    }
+    else {
+      blas.rekeySeqInts(new RowsFlinkDrm[K](sample, ncol = drmX.ncol), computeMap = false)._1
+    }
+
+    res.collect
   }
 
   /** Engine-specific all reduce tensor operation. */