You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by sm...@apache.org on 2016/03/22 03:59:31 UTC
mahout git commit: MAHOUT-1816: Implement newRowCardinality in
CheckpointedFlinkDrm, this closes apache/mahout#199
Repository: mahout
Updated Branches:
refs/heads/flink-binding f4f42ae4c -> 9bf5292fd
MAHOUT-1816: Implement newRowCardinality in CheckpointedFlinkDrm, this closes apache/mahout#199
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/9bf5292f
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/9bf5292f
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/9bf5292f
Branch: refs/heads/flink-binding
Commit: 9bf5292fd7ae8bacf027378d4b692f4e70aa46e2
Parents: f4f42ae
Author: smarthi <sm...@apache.org>
Authored: Mon Mar 21 22:59:11 2016 -0400
Committer: smarthi <sm...@apache.org>
Committed: Mon Mar 21 22:59:11 2016 -0400
----------------------------------------------------------------------
.../mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala | 10 ++++++----
.../mahout/sparkbindings/drm/CheckpointedDrmSpark.scala | 4 ++--
2 files changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/9bf5292f/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
index 6f1ba9f..a6b267b 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
@@ -192,15 +192,17 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K],
(x: K) => new Text(x.asInstanceOf[String])
} else if (keyTag.runtimeClass == classOf[Long]) {
(x: K) => new LongWritable(x.asInstanceOf[Long])
- // WritableTypeInfo will reject the base Writable class
-// } else if (classOf[Writable].isAssignableFrom(keyTag.runtimeClass)) {
-// (x: K) => x.asInstanceOf[Writable]
} else {
throw new IllegalArgumentException("Do not know how to convert class tag %s to Writable.".format(keyTag))
}
}
- def newRowCardinality(n: Int): CheckpointedDrm[K] = ???
+ def newRowCardinality(n: Int): CheckpointedDrm[K] = {
+ assert(n > -1)
+ assert(n >= nrow)
+ new CheckpointedFlinkDrm(ds = ds, _nrow = n, _ncol = _ncol, cacheHint = cacheHint,
+ partitioningTag = partitioningTag, _canHaveMissingRows = _canHaveMissingRows)
+ }
override val context: DistributedContext = ds.getExecutionEnvironment
http://git-wip-us.apache.org/repos/asf/mahout/blob/9bf5292f/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
index 71755c5..ff150a1 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
@@ -155,7 +155,7 @@ class CheckpointedDrmSpark[K: ClassTag](
/**
* Dump matrix as computed Mahout's DRM into specified (HD)FS path
*
- * @param path
+ * @param path output path to dump Matrix to
*/
def dfsWrite(path: String) = {
val ktag = implicitly[ClassTag[K]]
@@ -201,7 +201,7 @@ class CheckpointedDrmSpark[K: ClassTag](
rddInput.isBlockified match {
case true ⇒ rddInput.asBlockified(throw new AssertionError("not reached"))
.map(_._2.ncol).reduce(max)
- case false ⇒ cache().rddInput.asRowWise().map(_._2.length).fold(-1)(max(_, _))
+ case false ⇒ cache().rddInput.asRowWise().map(_._2.length).fold(-1)(max)
}
}