You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by sm...@apache.org on 2016/03/22 03:59:31 UTC

mahout git commit: MAHOUT-1816: Implement newRowCardinality in CheckpointedFlinkDrm, this closes apache/mahout#199

Repository: mahout
Updated Branches:
  refs/heads/flink-binding f4f42ae4c -> 9bf5292fd


MAHOUT-1816: Implement newRowCardinality in CheckpointedFlinkDrm, this closes apache/mahout#199


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/9bf5292f
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/9bf5292f
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/9bf5292f

Branch: refs/heads/flink-binding
Commit: 9bf5292fd7ae8bacf027378d4b692f4e70aa46e2
Parents: f4f42ae
Author: smarthi <sm...@apache.org>
Authored: Mon Mar 21 22:59:11 2016 -0400
Committer: smarthi <sm...@apache.org>
Committed: Mon Mar 21 22:59:11 2016 -0400

----------------------------------------------------------------------
 .../mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala   | 10 ++++++----
 .../mahout/sparkbindings/drm/CheckpointedDrmSpark.scala   |  4 ++--
 2 files changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/9bf5292f/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
index 6f1ba9f..a6b267b 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
@@ -192,15 +192,17 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K],
       (x: K) => new Text(x.asInstanceOf[String]) 
     } else if (keyTag.runtimeClass == classOf[Long]) {
       (x: K) => new LongWritable(x.asInstanceOf[Long])
-    // WritableTypeInfo will reject the base Writable class
-//          } else if (classOf[Writable].isAssignableFrom(keyTag.runtimeClass)) {
-//      (x: K) => x.asInstanceOf[Writable]
     } else {
       throw new IllegalArgumentException("Do not know how to convert class tag %s to Writable.".format(keyTag))
     }
   }
 
-  def newRowCardinality(n: Int): CheckpointedDrm[K] = ???
+  def newRowCardinality(n: Int): CheckpointedDrm[K] = {
+    assert(n > -1)
+    assert(n >= nrow)
+    new CheckpointedFlinkDrm(ds = ds, _nrow = n, _ncol = _ncol, cacheHint = cacheHint,
+      partitioningTag = partitioningTag, _canHaveMissingRows = _canHaveMissingRows)
+  }
 
   override val context: DistributedContext = ds.getExecutionEnvironment
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/9bf5292f/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
index 71755c5..ff150a1 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
@@ -155,7 +155,7 @@ class CheckpointedDrmSpark[K: ClassTag](
   /**
    * Dump matrix as computed Mahout's DRM into specified (HD)FS path
     *
-    * @param path
+    * @param path output path to dump Matrix to
    */
   def dfsWrite(path: String) = {
     val ktag = implicitly[ClassTag[K]]
@@ -201,7 +201,7 @@ class CheckpointedDrmSpark[K: ClassTag](
     rddInput.isBlockified match {
       case true ⇒ rddInput.asBlockified(throw new AssertionError("not reached"))
         .map(_._2.ncol).reduce(max)
-      case false ⇒ cache().rddInput.asRowWise().map(_._2.length).fold(-1)(max(_, _))
+      case false ⇒ cache().rddInput.asRowWise().map(_._2.length).fold(-1)(max)
     }
   }