You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by av...@apache.org on 2015/06/24 20:14:59 UTC
[2/2] mahout git commit: MAHOUT-1736: Implement allreduceBlock() on H2O
MAHOUT-1736: Implement allreduceBlock() on H2O
This closes apache/mahout#143
Signed-off-by: Anand Avati <av...@redhat.com>
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/5924e160
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/5924e160
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/5924e160
Branch: refs/heads/master
Commit: 5924e160823905f2e6c711cdd367c78109bf3d0e
Parents: 5d02c70
Author: Anand Avati <av...@redhat.com>
Authored: Tue Jun 23 20:23:53 2015 -0700
Committer: Anand Avati <av...@redhat.com>
Committed: Wed Jun 24 11:06:35 2015 -0700
----------------------------------------------------------------------
.../apache/mahout/h2obindings/H2OHelper.java | 32 ++++++++++++++++++++
.../apache/mahout/h2obindings/H2OEngine.scala | 2 +-
2 files changed, 33 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/5924e160/h2o/src/main/java/org/apache/mahout/h2obindings/H2OHelper.java
----------------------------------------------------------------------
diff --git a/h2o/src/main/java/org/apache/mahout/h2obindings/H2OHelper.java b/h2o/src/main/java/org/apache/mahout/h2obindings/H2OHelper.java
index 0fae5a8..c9d91f9 100644
--- a/h2o/src/main/java/org/apache/mahout/h2obindings/H2OHelper.java
+++ b/h2o/src/main/java/org/apache/mahout/h2obindings/H2OHelper.java
@@ -33,14 +33,19 @@ import water.util.ArrayUtils;
import java.util.Map;
import java.util.HashMap;
+import java.io.Serializable;
import org.apache.mahout.h2obindings.drm.H2ODrm;
+import org.apache.mahout.h2obindings.drm.H2OBCast;
// for makeEmptyStrVec
import water.Key;
import water.DKV;
import water.fvec.CStrChunk;
+import scala.Function1;
+import scala.Function2;
+
/**
* Collection of helper methods for H2O backend.
*/
@@ -437,4 +442,31 @@ public class H2OHelper {
public static H2ODrm emptyDrm(long nrow, int ncol, int minHint, int exactHint) {
return new H2ODrm(emptyFrame(nrow, ncol, minHint, exactHint));
}
+
+ public static Matrix allreduceBlock(H2ODrm drmA, Object bmfn, Object rfn) { // Runs an MRTask over drmA.frame: bmfn is applied in map(), rfn merges partial results in reduce(); returns the final Matrix.
+ class MRTaskMR extends MRTask<MRTaskMR> { // Local task type; its only result field is bmf_out.
+ H2OBCast<Matrix> bmf_out; // Partial result of this task instance, held wrapped in an H2OBCast (presumably so the Matrix can travel with the task; confirm against H2OBCast).
+ Serializable bmf; // Block-map function; cast back to scala.Function1 inside map().
+ Serializable rf; // Reduce function; cast back to scala.Function2 inside reduce().
+
+ public MRTaskMR(Object _bmf, Object _rf) { // Both arguments arrive as Object and are narrowed to Serializable here.
+ bmf = (Serializable) _bmf; // Fails with ClassCastException if the supplied function is not Serializable.
+ rf = (Serializable) _rf; // Same check for the reduce function.
+ }
+
+ @Override
+ public void map(Chunk chks[]) { // Applies bmf to the chunks handed to this call and stores the resulting Matrix.
+ Function1 f = (Function1) bmf; // Raw Function1: argument and result types are not checked at compile time.
+ bmf_out = new H2OBCast((Matrix)f.apply(new scala.Tuple2(null, new H2OBlockMatrix(chks)))); // Tuple is (null, block matrix over chks). NOTE(review): the first element is always null, so bmf must not dereference it; confirm with callers.
+ }
+
+ @Override
+ public void reduce(MRTaskMR that) { // Merges another task's partial result into this one.
+ Function2 f = (Function2) rf; // Raw Function2, same caveat as in map().
+ bmf_out = new H2OBCast((Matrix)f.apply(this.bmf_out.value(), that.bmf_out.value())); // rf(thisPartial, thatPartial) replaces this.bmf_out. NOTE(review): assumes both bmf_out fields are non-null when reduce() runs; confirm.
+ }
+ }
+
+ return new MRTaskMR(bmfn, rfn).doAll(drmA.frame).bmf_out.value(); // Execute over the DRM's frame and unwrap the final Matrix.
+ }
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5924e160/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
----------------------------------------------------------------------
diff --git a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
index 4236b95..bcf3507 100644
--- a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
+++ b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
@@ -168,7 +168,7 @@ object H2OEngine extends DistributedEngine {
*
*/
override def allreduceBlock[K: ClassTag](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf: BlockReduceFunc)
- : Matrix = ???
+ : Matrix = H2OHelper.allreduceBlock(drm.h2odrm, bmf, rf) // Delegates to the Java helper, passing bmf and rf through unchanged.
/**
* TODO: implement this please.